Developing an Embedded Job Queue With Golang - Part 2
Building a Factory
In the article, Developing an Embedded Job Queue With Golang - Part 1, we briefly reviewed the excellent concurrency primitives Go offers, then prototyped an example job queue.
We’re going to create a package that reel and other applications can use to queue work for workers and, eventually, to specify when the work should run, check on the status of a job, and send commands to workers through the dispatcher. To restate our problem: we want to hand off long-running tasks to workers so we can return immediately and respond to (for example) a web request. We’re modeling a task queue, or job queue, using goroutines and channels.
We could do this directly in reel. The use case for reel is that we might want to query an API endpoint to see the status of our task, if it’s completed, how long the task has taken, and we may also want to cancel the job. Further we can generalize this and create our own message queue system, which uses our conventions such that clients can subscribe and listen for work that’s published, perform the work and respond with the result. Before we consider doing that, we’ll create an embedded task queue package in Go.
I. ls-factory
Enter ls-factory - a simple task queue. Like all projects, we’ll start small and incrementally add in features. To start with we’ll focus on creating jobs, workers and a dispatcher. Once we have these working, we’ll extend the package to generalize a message queue with an event bus between job publishers and job subscribers (client workers) in a PubSub pattern. For now, we want an embedded task queue package we can re-use that gives us some extra features outside of simply using some channels and goroutines.
As we model a task queue, the idea of a factory comes to mind, as we’re dispatching jobs to workers and getting work done. As we proceed we’ll add in most of the following features:
1. Enqueue jobs to be executed.
2. Specify an arbitrary number of workers.
3. Handle processing functions by workers.
4. Communicate status of jobs through the dispatcher.
5. Communicate orders to workers, such as halting the work.
6. Track the result of the job.
7. Specify a time for a job to be executed.
8. Set execution priority for a job.
II. job.go
The simple part of factory is the job definition. We could, and will, extend this further over time. For now we will define a structure for jobs: a unique ID, start and end run times, and the function to execute. Whether a job is running or completed can be inferred from its start and end times.
The code for job.go follows:
package factory

import (
	"time"
)

// JobExecutable defines our spec for jobs, which are functions that return
// an error.
type JobExecutable func() error

// Job has an ID, start time, end time and reference to a JobExecutable.
type Job struct {
	ID         int
	StartTime  time.Time
	EndTime    time.Time
	Executable JobExecutable
}

// CreateNewJob returns a job with the given ID that wraps je. StartTime and
// EndTime stay at their zero values until a worker runs the job.
func CreateNewJob(id int, je JobExecutable) *Job {
	return &Job{
		ID:         id,
		Executable: je,
	}
}
III. worker.go
The worker concerns itself with waiting for jobs, processing them and responding to signals. It should:
1. Process jobs from the job queue channel.
2. Respond to orders from the dispatcher to halt or get job status.
3. Set properties on a job (start time, end time, result).
We’ll add more features to the worker during our next iteration. For now, we’ll keep things simple.
package factory

import (
	"time"

	log "github.com/sirupsen/logrus"
)

// Worker has an ID, a channel of jobs to process, a channel for reporting
// status to the dispatcher, a channel for receiving commands, and a logger.
type Worker struct {
	ID             int
	jobs           chan *Job
	dispatchStatus chan *DispatchStatus
	workerCmd      chan *WorkerCommand
	log            *log.Logger
}

// WorkerCommand carries a command (see the WCMD constants) addressed to the
// worker with the given ID.
type WorkerCommand struct {
	ID      int
	Command string
}

// CreateNewWorker accepts an ID, a channel for worker commands, a channel for
// jobs, a channel for dispatch reports, and a logger.
func CreateNewWorker(id int, wCmd chan *WorkerCommand, jobQueue chan *Job, dStatus chan *DispatchStatus, l *log.Logger) *Worker {
	w := &Worker{
		ID:             id,
		jobs:           jobQueue,
		dispatchStatus: dStatus,
		workerCmd:      wCmd,
		log:            l,
	}

	return w
}

// Start enables the worker for processing jobs.
func (w *Worker) Start() {
	go func() {
		for {
			select {
			case job := <-w.jobs:
				w.log.WithFields(log.Fields{
					"ID":     w.ID,
					"Job ID": job.ID,
				}).Info("Worker executing job.")
				job.StartTime = time.Now()
				w.dispatchStatus <- &DispatchStatus{
					Type:   DTYPE_JOB,
					ID:     job.ID,
					Status: DSTATUS_START,
				}

				// Run the job; log a failure instead of silently dropping it.
				if err := job.Executable(); err != nil {
					w.log.WithFields(log.Fields{
						"ID":     w.ID,
						"Job ID": job.ID,
					}).WithError(err).Error("Job returned an error.")
				}
				job.EndTime = time.Now()
				w.dispatchStatus <- &DispatchStatus{
					Type:   DTYPE_JOB,
					ID:     job.ID,
					Status: DSTATUS_END,
				}

			case wc := <-w.workerCmd:
				// Act on commands addressed to this worker. As written,
				// worker 0 also acts on every command it receives.
				if wc.ID == w.ID || w.ID == 0 {
					if wc.Command == WCMD_QUIT {
						w.log.WithFields(log.Fields{
							"ID": w.ID,
						}).Info("Worker received quit command.")
						w.dispatchStatus <- &DispatchStatus{
							Type:   DTYPE_WORKER,
							ID:     w.ID,
							Status: DSTATUS_QUIT,
						}
						return
					}
				}
			}
		}
	}()
}
The bulk of the worker’s logic lives in the for-select loop of the Start function, which listens for jobs or commands and handles each as it arrives.
IV. dispatcher.go
The dispatcher will handle most of the work in our library. It performs the following:
1. Accepts jobs, in the form of a wrapped anonymous function, from main.go.
2. Maintains a count of jobs.
3. Accepts a request for a number of workers to perform the work.
4. Creates workers.
5. Dispatches work to the workers.
The code:
package factory

import (
	"errors"
	"sync"

	log "github.com/sirupsen/logrus"
)

/*
 * DispatchStatus is a struct for passing job and worker status reports.
 * Type can be: worker, job (see DTYPE constants). ID represents the ID of
 * the worker or job.
 * Status: see DSTATUS constants.
 */
type DispatchStatus struct {
	Type   string
	ID     int
	Status string
}

// Dispatcher keeps track of an internal job request queue, a work queue of
// jobs that will be processed, a worker queue of workers, and a channel for
// status reports for jobs and workers.
type Dispatcher struct {
	mu             sync.Mutex           // guards jobCounter, which several goroutines touch
	jobCounter     int                  // internal counter for number of jobs
	jobQueue       chan *Job            // channel of submitted jobs
	dispatchStatus chan *DispatchStatus // channel for job/worker status reports
	workQueue      chan *Job            // channel of work dispatched
	workerQueue    chan *Worker         // channel of workers (not used yet)
	workerCommand  chan *WorkerCommand  // channel for worker commands
	log            *log.Logger          // log API (logrus)
	running        bool                 // Is the dispatcher running
}

// CreateNewDispatcher creates a new dispatcher by making the necessary channels
// for goroutines to communicate and initializes an internal job counter.
func CreateNewDispatcher(l *log.Logger) *Dispatcher {
	return &Dispatcher{
		jobCounter:     0,
		jobQueue:       make(chan *Job),
		dispatchStatus: make(chan *DispatchStatus),
		workQueue:      make(chan *Job),
		workerCommand:  make(chan *WorkerCommand),
		log:            l,
		running:        false,
	}
}

// QueueJob accepts a process (func() error) of JobExecutable, creates a job
// which will be tracked, and adds the job into the internal work queue for
// execution.
// It always returns nil for now; the error is reserved for a future
// constraint on the number of jobs.
func (d *Dispatcher) QueueJob(je JobExecutable) error {
	// Create a new job and increment the internal job counter:
	d.mu.Lock()
	j := CreateNewJob(d.jobCounter, je)
	d.jobCounter++
	count := d.jobCounter
	d.mu.Unlock()

	// Add the job to the internal queue without blocking the caller:
	go func() { d.jobQueue <- j }()

	d.log.WithFields(log.Fields{
		"jobCounter": count,
	}).Info("Job Queued.")

	return nil
}

// Finished returns true if we have no more jobs to process, otherwise false.
func (d *Dispatcher) Finished() bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.jobCounter < 1
}

// Running returns true if the dispatcher has been issued Start() and is running
func (d *Dispatcher) Running() bool {
	return d.running
}

// Start has the dispatcher create workers to handle jobs, then creates a
// goroutine to handle passing jobs in the queue off to workers and processing
// dispatch status reports.
func (d *Dispatcher) Start(numWorkers int) error {
	if numWorkers < 1 {
		return errors.New("start requires at least 1 worker")
	}

	// Create numWorkers:
	for i := 0; i < numWorkers; i++ {
		worker := CreateNewWorker(i, d.workerCommand, d.workQueue, d.dispatchStatus, d.log)
		worker.Start()
	}

	d.running = true

	// wait for work to be added then pass it off.
	go func() {
		for {
			select {
			case job := <-d.jobQueue:
				d.log.WithFields(log.Fields{
					"ID": job.ID,
				}).Info("Adding a new job to the queue for dispatching.")
				// Add the job to the work queue, and don't block the dispatcher.
				go func() { d.workQueue <- job }()

			case ds := <-d.dispatchStatus:
				d.log.WithFields(log.Fields{
					"Type":   ds.Type,
					"ID":     ds.ID,
					"Status": ds.Status,
				}).Info("Received a dispatch status report.")

				if ds.Type == DTYPE_WORKER {
					if ds.Status == DSTATUS_QUIT {
						d.log.WithFields(log.Fields{
							"ID": ds.ID,
						}).Info("Worker quit.")
					}
				}

				if ds.Type == DTYPE_JOB {
					if ds.Status == DSTATUS_START {
						d.log.WithFields(log.Fields{
							"ID": ds.ID,
						}).Info("Job started.")
					}

					if ds.Status == DSTATUS_END {
						d.log.WithFields(log.Fields{
							"ID": ds.ID,
						}).Info("Job finished.")
						d.mu.Lock()
						d.jobCounter--
						d.mu.Unlock()
					}
				}
			}
		}
	}()
	return nil
}
Like the worker, most of the dispatcher’s logic lives in its Start function.
V. constants.go
The dispatcher and worker exchange messages identified by shared constants, so we’ll define them in constants.go. These define dispatch types, dispatch statuses, and worker commands.
package factory

const (
	DTYPE_JOB    = "job"
	DTYPE_WORKER = "worker"

	DSTATUS_START = "start"
	DSTATUS_END   = "end"
	DSTATUS_QUIT  = "quit"

	WCMD_QUIT = "quit"
)
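Plain string constants are easy to mix up, since a status can be passed where a type is expected. A possible refinement, shown here only as a sketch, is to give each group its own named string type so the compiler catches that kind of mistake. The type and constant names below are hypothetical and are not what the package uses.

```go
package main

import "fmt"

// DispatchType and DispatchState are hypothetical named types; the package
// itself uses untyped string constants.
type DispatchType string
type DispatchState string

const (
	TypeJob    DispatchType = "job"
	TypeWorker DispatchType = "worker"

	StateStart DispatchState = "start"
	StateEnd   DispatchState = "end"
	StateQuit  DispatchState = "quit"
)

// report is a typed stand-in for DispatchStatus.
type report struct {
	Type  DispatchType
	ID    int
	State DispatchState
}

// describe turns a report into a log-style message.
func describe(r report) string {
	return fmt.Sprintf("%s %d: %s", r.Type, r.ID, r.State)
}

func main() {
	fmt.Println(describe(report{Type: TypeJob, ID: 3, State: StateEnd})) // prints job 3: end
	// report{Type: StateEnd} would not compile: StateEnd is not a DispatchType.
}
```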
VI. main.go
This is our example application, which uses the embedded job queue. It imports the package, creates some dummy work that emulates a long-running job, and documents our package so others can see how to use it or create their own.
The code:
package main

import (
	"time"

	factory "github.com/lakesite/ls-factory"
	"github.com/sirupsen/logrus"
)

// GenericJobStruct is a generic structure we'll use for submitting a job,
// this can be any structure.
type GenericJobStruct struct {
	Name string
}

// An example function, ProcessWork.
func (gjs *GenericJobStruct) ProcessWork() error {
	// Generic process, let's print out something, then wait for a time:
	log.WithFields(logrus.Fields{"Name": gjs.Name}).Info("Processing some work.")
	time.Sleep(5 * time.Second)
	log.Info("Work finished.")
	return nil
}

var log = logrus.New()

func main() {
	// Create two separate jobs to execute:
	gjs1 := &GenericJobStruct{Name: "Testing1"}
	gjs2 := &GenericJobStruct{Name: "Testing2"}

	// We'll use the convention of simply wrapping the work in a function literal.
	process1 := func() error {
		log.WithFields(logrus.Fields{"Name": gjs1.Name}).Info("Wrapping work in an anonymous function.")
		return gjs1.ProcessWork()
	}

	process2 := func() error {
		log.WithFields(logrus.Fields{"Name": gjs2.Name}).Info("Wrapping work in an anonymous function.")
		return gjs2.ProcessWork()
	}

	// Create a dispatcher:
	dispatcher := factory.CreateNewDispatcher(log)

	// Enqueue n*2 jobs:
	n := 10
	for i := 0; i < n; i++ {
		dispatcher.QueueJob(process1)
		dispatcher.QueueJob(process2)
	}

	// Tell the dispatcher to start processing jobs with n workers:
	if err := dispatcher.Start(n); err != nil {
		// Fatalf, not Fatal, so the format verb is expanded.
		log.Fatalf("Dispatch startup error: %s", err.Error())
	}

	// Poll until every job has finished, sleeping briefly between checks
	// so the loop doesn't spin the CPU.
	for dispatcher.Running() {
		if dispatcher.Finished() {
			log.Info("All jobs finished.")
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
}
Here’s the output of a run from the above:
$ ./main
INFO[0000] Job Queued. jobCounter=1
INFO[0000] Job Queued. jobCounter=2
INFO[0000] Job Queued. jobCounter=3
INFO[0000] Job Queued. jobCounter=4
INFO[0000] Job Queued. jobCounter=5
INFO[0000] Job Queued. jobCounter=6
INFO[0000] Job Queued. jobCounter=7
INFO[0000] Job Queued. jobCounter=8
INFO[0000] Job Queued. jobCounter=9
INFO[0000] Job Queued. jobCounter=10
INFO[0000] Job Queued. jobCounter=11
INFO[0000] Job Queued. jobCounter=12
INFO[0000] Job Queued. jobCounter=13
INFO[0000] Job Queued. jobCounter=14
INFO[0000] Job Queued. jobCounter=15
INFO[0000] Job Queued. jobCounter=16
INFO[0000] Job Queued. jobCounter=17
INFO[0000] Job Queued. jobCounter=18
INFO[0000] Job Queued. jobCounter=19
INFO[0000] Job Queued. jobCounter=20
INFO[0000] Adding a new job to the queue for dispatching. ID=0
INFO[0000] Adding a new job to the queue for dispatching. ID=1
INFO[0000] Adding a new job to the queue for dispatching. ID=2
INFO[0000] Adding a new job to the queue for dispatching. ID=3
INFO[0000] Adding a new job to the queue for dispatching. ID=4
INFO[0000] Adding a new job to the queue for dispatching. ID=5
INFO[0000] Adding a new job to the queue for dispatching. ID=6
INFO[0000] Worker executing job. ID=2 Job ID=2
INFO[0000] Worker executing job. ID=3 Job ID=1
INFO[0000] Adding a new job to the queue for dispatching. ID=7
INFO[0000] Adding a new job to the queue for dispatching. ID=8
INFO[0000] Adding a new job to the queue for dispatching. ID=9
INFO[0000] Received a dispatch status report. ID=2 Status=start Type=job
INFO[0000] Job started. ID=2
INFO[0000] Worker executing job. ID=9 Job ID=8
INFO[0000] Worker executing job. ID=6 Job ID=4
INFO[0000] Worker executing job. ID=1 Job ID=7
INFO[0000] Adding a new job to the queue for dispatching. ID=10
INFO[0000] Received a dispatch status report. ID=1 Status=start Type=job
INFO[0000] Job started. ID=1
INFO[0000] Worker executing job. ID=4 Job ID=5
INFO[0000] Worker executing job. ID=8 Job ID=6
INFO[0000] Worker executing job. ID=0 Job ID=0
INFO[0000] Worker executing job. ID=7 Job ID=3
INFO[0000] Worker executing job. ID=5 Job ID=9
INFO[0000] Wrapping work in an anonymous function. Name=Testing1
INFO[0000] Processing some work. Name=Testing1
INFO[0000] Received a dispatch status report. ID=8 Status=start Type=job
INFO[0000] Job started. ID=8
INFO[0000] Wrapping work in an anonymous function. Name=Testing1
INFO[0000] Processing some work. Name=Testing1
INFO[0000] Received a dispatch status report. ID=4 Status=start Type=job
INFO[0000] Job started. ID=4
INFO[0000] Wrapping work in an anonymous function. Name=Testing2
INFO[0000] Processing some work. Name=Testing2
INFO[0000] Wrapping work in an anonymous function. Name=Testing1
INFO[0000] Processing some work. Name=Testing1
INFO[0000] Received a dispatch status report. ID=7 Status=start Type=job
INFO[0000] Job started. ID=7
INFO[0000] Wrapping work in an anonymous function. Name=Testing2
INFO[0000] Wrapping work in an anonymous function. Name=Testing2
INFO[0000] Processing some work. Name=Testing2
INFO[0000] Processing some work. Name=Testing2
INFO[0000] Received a dispatch status report. ID=5 Status=start Type=job
INFO[0000] Job started. ID=5
INFO[0000] Received a dispatch status report. ID=6 Status=start Type=job
INFO[0000] Job started. ID=6
INFO[0000] Adding a new job to the queue for dispatching. ID=12
INFO[0000] Wrapping work in an anonymous function. Name=Testing1
INFO[0000] Processing some work. Name=Testing1
INFO[0000] Received a dispatch status report. ID=0 Status=start Type=job
INFO[0000] Job started. ID=0
INFO[0000] Adding a new job to the queue for dispatching. ID=11
INFO[0000] Adding a new job to the queue for dispatching. ID=13
INFO[0000] Wrapping work in an anonymous function. Name=Testing1
INFO[0000] Processing some work. Name=Testing1
INFO[0000] Adding a new job to the queue for dispatching. ID=14
INFO[0000] Adding a new job to the queue for dispatching. ID=15
INFO[0000] Received a dispatch status report. ID=3 Status=start Type=job
INFO[0000] Job started. ID=3
INFO[0000] Received a dispatch status report. ID=9 Status=start Type=job
INFO[0000] Wrapping work in an anonymous function. Name=Testing2
INFO[0000] Processing some work. Name=Testing2
INFO[0000] Job started. ID=9
INFO[0000] Adding a new job to the queue for dispatching. ID=16
INFO[0000] Adding a new job to the queue for dispatching. ID=17
INFO[0000] Adding a new job to the queue for dispatching. ID=18
INFO[0000] Wrapping work in an anonymous function. Name=Testing2
INFO[0000] Processing some work. Name=Testing2
INFO[0000] Adding a new job to the queue for dispatching. ID=19
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=2 Job ID=10
INFO[0005] Received a dispatch status report. ID=2 Status=end Type=job
INFO[0005] Job finished. ID=2
INFO[0005] Received a dispatch status report. ID=10 Status=start Type=job
INFO[0005] Job started. ID=10
INFO[0005] Wrapping work in an anonymous function. Name=Testing1
INFO[0005] Processing some work. Name=Testing1
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=9 Job ID=12
INFO[0005] Received a dispatch status report. ID=8 Status=end Type=job
INFO[0005] Job finished. ID=8
INFO[0005] Received a dispatch status report. ID=12 Status=start Type=job
INFO[0005] Job started. ID=12
INFO[0005] Wrapping work in an anonymous function. Name=Testing1
INFO[0005] Processing some work. Name=Testing1
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=3 Job ID=11
INFO[0005] Received a dispatch status report. ID=1 Status=end Type=job
INFO[0005] Job finished. ID=1
INFO[0005] Received a dispatch status report. ID=11 Status=start Type=job
INFO[0005] Job started. ID=11
INFO[0005] Wrapping work in an anonymous function. Name=Testing2
INFO[0005] Processing some work. Name=Testing2
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=6 Job ID=13
INFO[0005] Received a dispatch status report. ID=4 Status=end Type=job
INFO[0005] Job finished. ID=4
INFO[0005] Received a dispatch status report. ID=13 Status=start Type=job
INFO[0005] Wrapping work in an anonymous function. Name=Testing2
INFO[0005] Processing some work. Name=Testing2
INFO[0005] Job started. ID=13
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=4 Job ID=14
INFO[0005] Received a dispatch status report. ID=5 Status=end Type=job
INFO[0005] Job finished. ID=5
INFO[0005] Received a dispatch status report. ID=14 Status=start Type=job
INFO[0005] Job started. ID=14
INFO[0005] Wrapping work in an anonymous function. Name=Testing1
INFO[0005] Processing some work. Name=Testing1
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=1 Job ID=15
INFO[0005] Received a dispatch status report. ID=7 Status=end Type=job
INFO[0005] Job finished. ID=7
INFO[0005] Received a dispatch status report. ID=15 Status=start Type=job
INFO[0005] Job started. ID=15
INFO[0005] Wrapping work in an anonymous function. Name=Testing2
INFO[0005] Processing some work. Name=Testing2
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=8 Job ID=16
INFO[0005] Received a dispatch status report. ID=6 Status=end Type=job
INFO[0005] Job finished. ID=6
INFO[0005] Received a dispatch status report. ID=16 Status=start Type=job
INFO[0005] Job started. ID=16
INFO[0005] Wrapping work in an anonymous function. Name=Testing1
INFO[0005] Processing some work. Name=Testing1
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=0 Job ID=18
INFO[0005] Received a dispatch status report. ID=0 Status=end Type=job
INFO[0005] Job finished. ID=0
INFO[0005] Received a dispatch status report. ID=18 Status=start Type=job
INFO[0005] Job started. ID=18
INFO[0005] Wrapping work in an anonymous function. Name=Testing1
INFO[0005] Processing some work. Name=Testing1
INFO[0005] Work finished.
INFO[0005] Worker executing job. ID=7 Job ID=17
INFO[0005] Received a dispatch status report. ID=3 Status=end Type=job
INFO[0005] Job finished. ID=3
INFO[0005] Received a dispatch status report. ID=17 Status=start Type=job
INFO[0005] Job started. ID=17
INFO[0005] Wrapping work in an anonymous function. Name=Testing2
INFO[0005] Processing some work. Name=Testing2
INFO[0005] Work finished.
INFO[0005] Received a dispatch status report. ID=9 Status=end Type=job
INFO[0005] Job finished. ID=9
INFO[0005] Worker executing job. ID=5 Job ID=19
INFO[0005] Wrapping work in an anonymous function. Name=Testing2
INFO[0005] Processing some work. Name=Testing2
INFO[0005] Received a dispatch status report. ID=19 Status=start Type=job
INFO[0005] Job started. ID=19
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=10 Status=end Type=job
INFO[0010] Job finished. ID=10
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=12 Status=end Type=job
INFO[0010] Job finished. ID=12
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=11 Status=end Type=job
INFO[0010] Job finished. ID=11
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=13 Status=end Type=job
INFO[0010] Job finished. ID=13
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=14 Status=end Type=job
INFO[0010] Job finished. ID=14
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=15 Status=end Type=job
INFO[0010] Job finished. ID=15
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=16 Status=end Type=job
INFO[0010] Job finished. ID=16
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=18 Status=end Type=job
INFO[0010] Job finished. ID=18
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=17 Status=end Type=job
INFO[0010] Job finished. ID=17
INFO[0010] Work finished.
INFO[0010] Received a dispatch status report. ID=19 Status=end Type=job
INFO[0010] Job finished. ID=19
INFO[0010] All jobs finished.
VII. Conclusion
Our first iteration works and gives us a task queue with an arbitrary number of workers which subscribe to events through a channel to perform work. Next we’ll need to add more features to ls-factory, which will also allow us to create a stand-alone messaging system (if we desire) instead of an embedded task queue.
- We need a message bus for dispatching commands to workers.
- We should actually tell our workers to quit if needed, close channels, etc.
- We should get status reports on jobs and store results.
- We should put constraints on the system (how much we store, etc.).
- We should be able to specify a time to start a job.
- We should be able to specify priority for a job.