Bulk Execution Engine or BEE is a simple job runner. While writing distributed control plane APIs, there was a need for:

  1. waiting for distributed asynchronous transaction convergence
  2. retrying for intermittent errors
  3. providing status without holding up network connection

After a brief survey of available solution landscape (and definitely not an intensive one), I found that popular products like Apache Airflow, AWS Step Functions could solve the problem, but at a high operational and cognitive cost.

A hand written solution could be built with a backend store, but would lack any reuse across multiple API servers.

BEE is my humble attempt to write a simple server. I decided to use sqlite as a backend, as there is no need to keep job status’s around forever. Since BEE interacts only with its clients, the job submission and execution protocol are the main considerations. BEE is an internal platform service, and as such can be run with mTLS or other protocols.

At its heart, BEE runs multiple tasks on behalf of its client. Further simplifications:

  1. The tasks are executed as callbacks to the client
  2. The tasks have dependency order as specified by the client
  3. A task is retried as many times as specified by the client
  4. Job status is available to the client at any time
  5. Job metadata is held for as much time as the client wants (but not forever, and is not durable)
import "time"

// Job specification by a client
type Job struct {
  // Concurrency controls parallelism for tasks
  Concurrency int
  // Tasks to execute
  Tasks []Task
  Done bool
}

// Task specification by client
type Task struct {
  // Type classifies a task
  Type string
  // Captures information on task
  Spec map[string]interface{}
  // StopRetry fails a task
  StopRetry bool
  // Stats are managed by the BEE
  Stats []TaskTelemetry
}

// TaskTelemetry is an audit of task execution
type TaskTelemetry struct {
  // Response back from client for this task
  Response map[string]interface{}
  // Result back from the client
  Result int
  // How much time was spent on the task
  Time time.Duration
}

// BeeRequest for communication to bee
type BeeRequest struct {
  Job Job
  Request string
  JobID int
}

// BeeResponse for communication from bee
type BeeResponse struct {
  Job Job
  JobID int
}

const MAX_JOBS = 10
const MAX_TASKS = 100

func (t *Task) Execute() {

}

func main() {
  // ask Bee server to process jobs
  toBee := make(chan BeeRequest, MAX_JOBS)

  // reply from Bee server
  fromBee := make(chan BeeResponse, MAX_JOBS)

  // Bee server, for now processes a single
  // job at a time
  go func(in <-chan BeeRequest, out <-chan BeeResponse) {
	  jobs := map[int]Job{}
	  jobCnt := 0

	  for _, j := range in {
		  if j.Request == "CREATE" {
			  jobs[jobCnt] = j.Job
			  jobCnt++
			  fromBee <- BeeResponse{j.Job, jobCnt-1}
			  go func() {
				  worked := false
				  for {
					  for _, t := range j.Job.Tasks {
						  if !t.StopRetry {
							  t.Execute()
							  worked = true
						  }
					  }
					  if !worked {
						  j.Job.Done = true
						  break
					  }
				  }
			  }()
		  } else if in.Request == "QUERY" {
			  fromBee <- BeeResponse{jobs[in.JobID], in.JobID}
		  }
	  }
  }(toBee, fromBee)

  toBee <- BeeRequest{ Job{0, []Task{}}, "CREATE", 0}

  for {
	  response := <- fromBee
	  if response.Job.Done {
		  break
	  }
	  time.Sleep(100*time.Millisec)
	  toBee <- BeeRequest{ Job{}, "QUERY", response.JobID}
  }

}