Bulk Execution Engine (BEE) is a simple job runner. While writing distributed control-plane APIs, I needed a way of:
- waiting for distributed asynchronous transaction convergence
- retrying on intermittent errors
- providing status without tying up a network connection
After a brief (and admittedly not exhaustive) survey of the available solutions, I found that popular products such as Apache Airflow and AWS Step Functions could solve the problem, but at a high operational and cognitive cost.
A hand-written solution could be built around a backend store, but it would not be reusable across multiple API servers.
BEE is my humble attempt to write a simple server. I decided to use SQLite as a backend, as there is no need to keep job statuses around forever. Since BEE interacts only with its clients, the job submission and execution protocols are the main design considerations. BEE is an internal platform service, and as such can be run behind mTLS or similar transport security.
At its heart, BEE runs multiple tasks on behalf of its client. Further simplifications:
- The tasks are executed as callbacks to the client
- The tasks have dependency order as specified by the client
- A task is retried as many times as specified by the client
- Job status is available to the client at any time
- Job metadata is held for as long as the client wants (but not forever, and it is not durable)
package main

import "time"

// Job specification by a client
type Job struct {
	// Concurrency controls parallelism for tasks
	Concurrency int
	// Tasks to execute
	Tasks []Task
	// Done is set by BEE once every task has finished
	Done bool
}

// Task specification by a client
type Task struct {
	// Type classifies a task
	Type string
	// Spec captures information on the task
	Spec map[string]interface{}
	// StopRetry marks a task as finished; BEE will not run it again
	StopRetry bool
	// Stats are managed by BEE
	Stats []TaskTelemetry
}

// TaskTelemetry is an audit of one task execution
type TaskTelemetry struct {
	// Response back from the client for this task
	Response map[string]interface{}
	// Result back from the client
	Result int
	// How much time was spent on the task
	Time time.Duration
}

// BeeRequest for communication to BEE
type BeeRequest struct {
	Job     Job
	Request string
	JobID   int
}

// BeeResponse for communication from BEE
type BeeResponse struct {
	Job   Job
	JobID int
}

const MAX_JOBS = 10
const MAX_TASKS = 100

// Execute runs the task callback. This sketch simply marks the
// task finished so the demo job converges.
func (t *Task) Execute() {
	t.StopRetry = true
}

func main() {
	// ask the BEE server to process jobs
	toBee := make(chan BeeRequest, MAX_JOBS)
	// replies from the BEE server
	fromBee := make(chan BeeResponse, MAX_JOBS)
	// BEE server; for now it accepts one request at a time, and
	// synchronization between the server and its workers is
	// omitted for brevity
	go func(in <-chan BeeRequest, out chan<- BeeResponse) {
		jobs := map[int]*Job{}
		jobCnt := 0
		for req := range in {
			if req.Request == "CREATE" {
				job := req.Job
				id := jobCnt
				jobs[id] = &job
				jobCnt++
				out <- BeeResponse{job, id}
				go func() {
					for {
						worked := false
						for i := range job.Tasks {
							t := &job.Tasks[i]
							if !t.StopRetry {
								t.Execute()
								worked = true
							}
						}
						if !worked {
							job.Done = true
							break
						}
					}
				}()
			} else if req.Request == "QUERY" {
				out <- BeeResponse{*jobs[req.JobID], req.JobID}
			}
		}
	}(toBee, fromBee)
	toBee <- BeeRequest{Job{Tasks: []Task{}}, "CREATE", 0}
	for {
		response := <-fromBee
		if response.Job.Done {
			break
		}
		time.Sleep(100 * time.Millisecond)
		toBee <- BeeRequest{Job{}, "QUERY", response.JobID}
	}
}