Developing an Embedded Job Queue With Golang - Part 2


head

Building a Factory

In the article, Developing an Embedded Job Queue With Golang - Part 1, we briefly reviewed the excellent concurrency primitives Go offers, then prototyped an example job queue.

We’re going to create a package for reel and other applications to use, such that they can queue work to be done by workers and eventually specify when the work should be done, check on the status of a job, and send commands to workers through the dispatcher. To restate our problem, we want to hand off long running tasks to workers so we can immediately return and respond to (for example) a web request. We’re modeling a task queue or job queue using goroutines and channels.

We could do this directly in reel. The use case for reel is that we might want to query an API endpoint to see the status of our task, if it’s completed, how long the task has taken, and we may also want to cancel the job. Further we can generalize this and create our own message queue system, which uses our conventions such that clients can subscribe and listen for work that’s published, perform the work and respond with the result. Before we consider doing that, we’ll create an embedded task queue package in Go.

I. ls-factory

Enter ls-factory - a simple task queue. Like all projects, we’ll start small and incrementally add in features. To start with we’ll focus on creating jobs, workers and a dispatcher. Once we have these working, we’ll extend the package to generalize a message queue with an event bus between job publishers and job subscribers (client workers) in a PubSub pattern. For now, we want an embedded task queue package we can re-use that gives us some extra features outside of simply using some channels and goroutines.

As we model a task queue, the idea of a factory comes to mind, as we’re dispatching jobs to workers and getting work done. As we proceed we’ll add in most of the following features:

1. Enqueue jobs to be executed.
2. Specify an arbitrary number of workers.
3. Handle processing functions by workers.
4. Communicate status of jobs through the dispatcher.
5. Communicate orders to workers, such as halting the work.
6. Track the result of the job.
7. Specify a time for a job to be executed.
8. Set execution priority for a job.

II. job.go

The simple part of factory is the job definition. We could, and will, extend this further over time. For now we will define a structure for jobs: a unique ID, start and end run time, completed, (running is inferred through start time).

The code for job.go follows:

 1package factory
 2
 3import (
 4	"time"
 5)
 6
 7// JobExecutable defines our spec for jobs, which are functions that return
 8// an error.
 9type JobExecutable func() error
10
11// Job has an ID, start time, end time and reference to a JobExecutable.
12type Job struct {
13	ID         int
14	StartTime  time.Time
15	EndTime    time.Time
16	Executable JobExecutable
17}
18
19func CreateNewJob(id int, je JobExecutable) *Job {
20	return &Job{
21		ID:         id,
22		Executable: je,
23	}
24}

III. worker.go

The worker will concern itself with waiting for jobs, processing them and responding to signals. It should perform:

1. Process jobs from the job queue channel.
2. Respond to orders from the dispatcher to halt or get job status.
3. Set properties on a job (start time, end time, result).

We’ll add additional features to the worker during our next iteration, for now we’ll keep things simple.

 1package factory
 2
 3import (
 4	log "github.com/sirupsen/logrus"
 5	"time"
 6)
 7
 8// Worker has an ID, internal channel for jobs, internal channel for reporting
 9// status to the dispatcher, and a channel for quitting.
10type Worker struct {
11	ID             int
12	jobs           chan *Job
13	dispatchStatus chan *DispatchStatus
14	workerCmd      chan *WorkerCommand
15	log            *log.Logger
16}
17
18type WorkerCommand struct {
19	ID      int
20	Command string
21}
22
23// CreateNewWorker accepts an ID, channel for worker registration, channel for
24// jobs, and a channel for dispatch reports.
25func CreateNewWorker(id int, wCmd chan *WorkerCommand, jobQueue chan *Job, dStatus chan *DispatchStatus, l *log.Logger) *Worker {
26	w := &Worker{
27		ID:             id,
28		jobs:           jobQueue,
29		dispatchStatus: dStatus,
30		workerCmd:      wCmd,
31		log:            l,
32	}
33
34	return w
35}
36
37// Start enables the worker for processing jobs.
38func (w *Worker) Start() {
39	go func() {
40		for {
41			select {
42			case job := <-w.jobs:
43				w.log.WithFields(log.Fields{
44					"ID":     w.ID,
45					"Job ID": job.ID,
46				}).Info("Worker executing job.")
47				job.StartTime = time.Now()
48				w.dispatchStatus <- &DispatchStatus{
49					Type:   DTYPE_JOB,
50					ID:     job.ID,
51					Status: DSTATUS_START,
52				}
53
54				job.Executable()
55				job.EndTime = time.Now()
56				w.dispatchStatus <- &DispatchStatus{
57					Type:   DTYPE_JOB,
58					ID:     job.ID,
59					Status: DSTATUS_END,
60				}
61
62			case wc := <-w.workerCmd:
63				if wc.ID == w.ID || w.ID == 0 {
64					if wc.Command == WCMD_QUIT {
65						w.log.WithFields(log.Fields{
66							"ID": w.ID,
67						}).Info("Worker received quit command.")
68						w.dispatchStatus <- &DispatchStatus{
69							Type:   DTYPE_WORKER,
70							ID:     w.ID,
71							Status: DSTATUS_QUIT,
72						}
73						return
74					}
75				}
76			}
77		}
78	}()
79}

The bulk of our code is handled in the for - select section of the Start function, which listens for commands or jobs and handles them appropriately.

IV. dispatcher.go

The dispatcher will handle most of the work in our library, it performs the following:

1. Accept jobs in the form of an anonymous wrapped function, from main.go
2. Maintains a count of jobs.
3. Accept a request for a number of workers to perform the work.
4. Creates workers.
5. Dispatch work to the workers.

The code:

  1package factory
  2
  3import (
  4	"errors"
  5
  6	log "github.com/sirupsen/logrus"
  7)
  8
  9/*
 10 * DispatchStatus is a struct for passing job and worker status reports.
 11 * Type can be: worker, job, ID represents the ID of the worker or job.
 12 * Status: see WSTATUS constants.
 13 */
 14type DispatchStatus struct {
 15	Type   string
 16	ID     int
 17	Status string
 18}
 19
 20// Dispatch keeps track of an internal job request queue, a work queue of jobs
 21// that will be processed, a worker queue of workers, and a channel for status
 22// reports for jobs and workers.
 23type Dispatcher struct {
 24	jobCounter     int                  // internal counter for number of jobs
 25	jobQueue       chan *Job            // channel of submitted jobs
 26	dispatchStatus chan *DispatchStatus // channel for job/worker status reports
 27	workQueue      chan *Job            // channel of work dispatched
 28	workerQueue    chan *Worker         // channel of workers
 29	workerCommand  chan *WorkerCommand  // channel for worker commands
 30	log            *log.Logger          // log API (logrus)
 31	running        bool                 // Is the dispatcher running
 32}
 33
 34// CreateNewDispatcher creates a new dispatcher by making the necessary channels
 35// for goroutines to communicate and initializes an internal job counter.
 36func CreateNewDispatcher(l *log.Logger) *Dispatcher {
 37	return &Dispatcher{
 38		jobCounter:     0,
 39		jobQueue:       make(chan *Job),
 40		dispatchStatus: make(chan *DispatchStatus),
 41		workQueue:      make(chan *Job),
 42		workerCommand:  make(chan *WorkerCommand),
 43		log:            l,
 44		running:        false,
 45	}
 46}
 47
 48// QueueJob accepts a process (func() error) of JobExecutable, creates a job
 49// which will be tracked, and adds the job into the internal work queue for
 50// execution.
 51// If there's a constraint on the number of jobs, return an error.
 52func (d *Dispatcher) QueueJob(je JobExecutable) error {
 53	// Create a new job:
 54	j := CreateNewJob(d.jobCounter, je)
 55
 56	// Add the job to the internal queue:
 57	go func() { d.jobQueue <- j }()
 58
 59	// Increment the internal job counter:
 60	d.jobCounter++
 61	d.log.WithFields(log.Fields{
 62		"jobCounter": d.jobCounter,
 63	}).Info("Job Queued.")
 64
 65	return nil
 66}
 67
 68// Finished returns true if we have no more jobs to process, otherwise false.
 69func (d *Dispatcher) Finished() bool {
 70	if d.jobCounter < 1 {
 71		return true
 72	} else {
 73		return false
 74	}
 75}
 76
 77// Running returns true if the dispatcher has been issued Start() and is running
 78func (d *Dispatcher) Running() bool {
 79	return d.running
 80}
 81
 82// Start has the dispatcher create workers to handle jobs, then creates a
 83// goroutine to handle passing jobs in the queue off to workers and processing
 84// dispatch status reports.
 85func (d *Dispatcher) Start(numWorkers int) error {
 86	if numWorkers < 1 {
 87		return errors.New("Start requires >= 1 workers.")
 88	}
 89
 90	// Create numWorkers:
 91	for i := 0; i < numWorkers; i++ {
 92		worker := CreateNewWorker(i, d.workerCommand, d.workQueue, d.dispatchStatus, d.log)
 93		worker.Start()
 94	}
 95
 96	d.running = true
 97
 98	// wait for work to be added then pass it off.
 99	go func() {
100		for {
101			select {
102			case job := <-d.jobQueue:
103				d.log.WithFields(log.Fields{
104					"ID": job.ID,
105				}).Info("Adding a new job to the queue for dispatching.")
106				// Add the job to the work queue, and don't block the dispatcher.
107				go func() { d.workQueue <- job }()
108
109			case ds := <-d.dispatchStatus:
110				d.log.WithFields(log.Fields{
111					"Type":   ds.Type,
112					"ID":     ds.ID,
113					"Status": ds.Status,
114				}).Info("Received a dispatch status report.")
115
116				if ds.Type == DTYPE_WORKER {
117					if ds.Status == DSTATUS_QUIT {
118						d.log.WithFields(log.Fields{
119							"ID": ds.ID,
120						}).Info("Worker quit.")
121					}
122				}
123
124				if ds.Type == DTYPE_JOB {
125					if ds.Status == DSTATUS_START {
126						d.log.WithFields(log.Fields{
127							"ID": ds.ID,
128						}).Info("Job started.")
129					}
130
131					if ds.Status == DSTATUS_END {
132						d.log.WithFields(log.Fields{
133							"ID": ds.ID,
134						}).Info("Job finished.")
135						d.jobCounter--
136					}
137				}
138			}
139		}
140	}()
141	return nil
142}

Like the worker, our dispatcher is primarily handled in the Start function.

V. constants.go

The dispatcher and worker need a definition of messaging through constants, so we’ll define them in constants.go. These define dispatch types, dispatch statuses, and worker commands.

 1package factory
 2
 3const (
 4	DTYPE_JOB    = "job"
 5	DTYPE_WORKER = "worker"
 6
 7	DSTATUS_START = "start"
 8	DSTATUS_END   = "end"
 9	DSTATUS_QUIT  = "quit"
10
11	WCMD_QUIT = "quit"
12)

VI. main.go

This is our example application which uses the embedded job queue. It will import the package, create some dummy work that emulates a long running job, and document our package so others can see how to use it or create their own.

The code:

 1package main
 2
 3import (
 4	"time"
 5
 6	"github.com/lakesite/ls-factory"
 7	"github.com/sirupsen/logrus"
 8)
 9
10// GenericJobStruct is a generic structure we'll use for submitting a job,
11// this can be any structure.
12type GenericJobStruct struct {
13	Name string
14}
15
16// An example function, ProcessWork.
17func (gjs *GenericJobStruct) ProcessWork() error {
18	// Generic process, let's print out something, then wait for a time:
19	log.WithFields(logrus.Fields{"Name": gjs.Name}).Info("Processing some work.")
20	time.Sleep(5 * time.Second)
21	log.Info("Work finished.")
22	return nil
23}
24
25var log = logrus.New()
26
27func main() {
28	// Create two separate jobs to execute:
29	gjs1 := &GenericJobStruct{Name: "Testing1"}
30	gjs2 := &GenericJobStruct{Name: "Testing2"}
31
32	// We'll use the convention of simply wrapping the work in a function literal.
33	process1 := func() error {
34		log.WithFields(logrus.Fields{"Name": gjs1.Name}).Info("Wrapping work in an anonymous function.")
35		return gjs1.ProcessWork()
36	}
37
38	process2 := func() error {
39		log.WithFields(logrus.Fields{"Name": gjs2.Name}).Info("Wrapping work in an anonymous function.")
40		return gjs2.ProcessWork()
41	}
42
43	// Create a dispatcher:
44	dispatcher := factory.CreateNewDispatcher(log)
45
46	// Enqueue n*2 jobs:
47	n := 10
48	for i:= 0; i<n; i++ {
49		dispatcher.QueueJob(process1)
50		dispatcher.QueueJob(process2)
51	}
52
53	// Tell the dispatcher to start processing jobs with n workers:
54	if err := dispatcher.Start(n); err != nil {
55		log.Fatal("Dispatch startup error: %s", err.Error())
56	}
57
58	for dispatcher.Running() {
59		if dispatcher.Finished() {
60			log.Info("All jobs finished.")
61			break
62		}
63	}
64}

Here’s the output of a run from the above:

$ ./main
INFO[0000] Job Queued.                                   jobCounter=1
INFO[0000] Job Queued.                                   jobCounter=2
INFO[0000] Job Queued.                                   jobCounter=3
INFO[0000] Job Queued.                                   jobCounter=4
INFO[0000] Job Queued.                                   jobCounter=5
INFO[0000] Job Queued.                                   jobCounter=6
INFO[0000] Job Queued.                                   jobCounter=7
INFO[0000] Job Queued.                                   jobCounter=8
INFO[0000] Job Queued.                                   jobCounter=9
INFO[0000] Job Queued.                                   jobCounter=10
INFO[0000] Job Queued.                                   jobCounter=11
INFO[0000] Job Queued.                                   jobCounter=12
INFO[0000] Job Queued.                                   jobCounter=13
INFO[0000] Job Queued.                                   jobCounter=14
INFO[0000] Job Queued.                                   jobCounter=15
INFO[0000] Job Queued.                                   jobCounter=16
INFO[0000] Job Queued.                                   jobCounter=17
INFO[0000] Job Queued.                                   jobCounter=18
INFO[0000] Job Queued.                                   jobCounter=19
INFO[0000] Job Queued.                                   jobCounter=20
INFO[0000] Adding a new job to the queue for dispatching.  ID=0
INFO[0000] Adding a new job to the queue for dispatching.  ID=1
INFO[0000] Adding a new job to the queue for dispatching.  ID=2
INFO[0000] Adding a new job to the queue for dispatching.  ID=3
INFO[0000] Adding a new job to the queue for dispatching.  ID=4
INFO[0000] Adding a new job to the queue for dispatching.  ID=5
INFO[0000] Adding a new job to the queue for dispatching.  ID=6
INFO[0000] Worker executing job.                         ID=2 Job ID=2
INFO[0000] Worker executing job.                         ID=3 Job ID=1
INFO[0000] Adding a new job to the queue for dispatching.  ID=7
INFO[0000] Adding a new job to the queue for dispatching.  ID=8
INFO[0000] Adding a new job to the queue for dispatching.  ID=9
INFO[0000] Received a dispatch status report.            ID=2 Status=start Type=job
INFO[0000] Job started.                                  ID=2
INFO[0000] Worker executing job.                         ID=9 Job ID=8
INFO[0000] Worker executing job.                         ID=6 Job ID=4
INFO[0000] Worker executing job.                         ID=1 Job ID=7
INFO[0000] Adding a new job to the queue for dispatching.  ID=10
INFO[0000] Received a dispatch status report.            ID=1 Status=start Type=job
INFO[0000] Job started.                                  ID=1
INFO[0000] Worker executing job.                         ID=4 Job ID=5
INFO[0000] Worker executing job.                         ID=8 Job ID=6
INFO[0000] Worker executing job.                         ID=0 Job ID=0
INFO[0000] Worker executing job.                         ID=7 Job ID=3
INFO[0000] Worker executing job.                         ID=5 Job ID=9
INFO[0000] Wrapping work in an anonymous function.       Name=Testing1
INFO[0000] Processing some work.                         Name=Testing1
INFO[0000] Received a dispatch status report.            ID=8 Status=start Type=job
INFO[0000] Job started.                                  ID=8
INFO[0000] Wrapping work in an anonymous function.       Name=Testing1
INFO[0000] Processing some work.                         Name=Testing1
INFO[0000] Received a dispatch status report.            ID=4 Status=start Type=job
INFO[0000] Job started.                                  ID=4
INFO[0000] Wrapping work in an anonymous function.       Name=Testing2
INFO[0000] Processing some work.                         Name=Testing2
INFO[0000] Wrapping work in an anonymous function.       Name=Testing1
INFO[0000] Processing some work.                         Name=Testing1
INFO[0000] Received a dispatch status report.            ID=7 Status=start Type=job
INFO[0000] Job started.                                  ID=7
INFO[0000] Wrapping work in an anonymous function.       Name=Testing2
INFO[0000] Wrapping work in an anonymous function.       Name=Testing2
INFO[0000] Processing some work.                         Name=Testing2
INFO[0000] Processing some work.                         Name=Testing2
INFO[0000] Received a dispatch status report.            ID=5 Status=start Type=job
INFO[0000] Job started.                                  ID=5
INFO[0000] Received a dispatch status report.            ID=6 Status=start Type=job
INFO[0000] Job started.                                  ID=6
INFO[0000] Adding a new job to the queue for dispatching.  ID=12
INFO[0000] Wrapping work in an anonymous function.       Name=Testing1
INFO[0000] Processing some work.                         Name=Testing1
INFO[0000] Received a dispatch status report.            ID=0 Status=start Type=job
INFO[0000] Job started.                                  ID=0
INFO[0000] Adding a new job to the queue for dispatching.  ID=11
INFO[0000] Adding a new job to the queue for dispatching.  ID=13
INFO[0000] Wrapping work in an anonymous function.       Name=Testing1
INFO[0000] Processing some work.                         Name=Testing1
INFO[0000] Adding a new job to the queue for dispatching.  ID=14
INFO[0000] Adding a new job to the queue for dispatching.  ID=15
INFO[0000] Received a dispatch status report.            ID=3 Status=start Type=job
INFO[0000] Job started.                                  ID=3
INFO[0000] Received a dispatch status report.            ID=9 Status=start Type=job
INFO[0000] Wrapping work in an anonymous function.       Name=Testing2
INFO[0000] Processing some work.                         Name=Testing2
INFO[0000] Job started.                                  ID=9
INFO[0000] Adding a new job to the queue for dispatching.  ID=16
INFO[0000] Adding a new job to the queue for dispatching.  ID=17
INFO[0000] Adding a new job to the queue for dispatching.  ID=18
INFO[0000] Wrapping work in an anonymous function.       Name=Testing2
INFO[0000] Processing some work.                         Name=Testing2
INFO[0000] Adding a new job to the queue for dispatching.  ID=19
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=2 Job ID=10
INFO[0005] Received a dispatch status report.            ID=2 Status=end Type=job
INFO[0005] Job finished.                                 ID=2
INFO[0005] Received a dispatch status report.            ID=10 Status=start Type=job
INFO[0005] Job started.                                  ID=10
INFO[0005] Wrapping work in an anonymous function.       Name=Testing1
INFO[0005] Processing some work.                         Name=Testing1
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=9 Job ID=12
INFO[0005] Received a dispatch status report.            ID=8 Status=end Type=job
INFO[0005] Job finished.                                 ID=8
INFO[0005] Received a dispatch status report.            ID=12 Status=start Type=job
INFO[0005] Job started.                                  ID=12
INFO[0005] Wrapping work in an anonymous function.       Name=Testing1
INFO[0005] Processing some work.                         Name=Testing1
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=3 Job ID=11
INFO[0005] Received a dispatch status report.            ID=1 Status=end Type=job
INFO[0005] Job finished.                                 ID=1
INFO[0005] Received a dispatch status report.            ID=11 Status=start Type=job
INFO[0005] Job started.                                  ID=11
INFO[0005] Wrapping work in an anonymous function.       Name=Testing2
INFO[0005] Processing some work.                         Name=Testing2
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=6 Job ID=13
INFO[0005] Received a dispatch status report.            ID=4 Status=end Type=job
INFO[0005] Job finished.                                 ID=4
INFO[0005] Received a dispatch status report.            ID=13 Status=start Type=job
INFO[0005] Wrapping work in an anonymous function.       Name=Testing2
INFO[0005] Processing some work.                         Name=Testing2
INFO[0005] Job started.                                  ID=13
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=4 Job ID=14
INFO[0005] Received a dispatch status report.            ID=5 Status=end Type=job
INFO[0005] Job finished.                                 ID=5
INFO[0005] Received a dispatch status report.            ID=14 Status=start Type=job
INFO[0005] Job started.                                  ID=14
INFO[0005] Wrapping work in an anonymous function.       Name=Testing1
INFO[0005] Processing some work.                         Name=Testing1
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=1 Job ID=15
INFO[0005] Received a dispatch status report.            ID=7 Status=end Type=job
INFO[0005] Job finished.                                 ID=7
INFO[0005] Received a dispatch status report.            ID=15 Status=start Type=job
INFO[0005] Job started.                                  ID=15
INFO[0005] Wrapping work in an anonymous function.       Name=Testing2
INFO[0005] Processing some work.                         Name=Testing2
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=8 Job ID=16
INFO[0005] Received a dispatch status report.            ID=6 Status=end Type=job
INFO[0005] Job finished.                                 ID=6
INFO[0005] Received a dispatch status report.            ID=16 Status=start Type=job
INFO[0005] Job started.                                  ID=16
INFO[0005] Wrapping work in an anonymous function.       Name=Testing1
INFO[0005] Processing some work.                         Name=Testing1
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=0 Job ID=18
INFO[0005] Received a dispatch status report.            ID=0 Status=end Type=job
INFO[0005] Job finished.                                 ID=0
INFO[0005] Received a dispatch status report.            ID=18 Status=start Type=job
INFO[0005] Job started.                                  ID=18
INFO[0005] Wrapping work in an anonymous function.       Name=Testing1
INFO[0005] Processing some work.                         Name=Testing1
INFO[0005] Work finished.                               
INFO[0005] Worker executing job.                         ID=7 Job ID=17
INFO[0005] Received a dispatch status report.            ID=3 Status=end Type=job
INFO[0005] Job finished.                                 ID=3
INFO[0005] Received a dispatch status report.            ID=17 Status=start Type=job
INFO[0005] Job started.                                  ID=17
INFO[0005] Wrapping work in an anonymous function.       Name=Testing2
INFO[0005] Processing some work.                         Name=Testing2
INFO[0005] Work finished.                               
INFO[0005] Received a dispatch status report.            ID=9 Status=end Type=job
INFO[0005] Job finished.                                 ID=9
INFO[0005] Worker executing job.                         ID=5 Job ID=19
INFO[0005] Wrapping work in an anonymous function.       Name=Testing2
INFO[0005] Processing some work.                         Name=Testing2
INFO[0005] Received a dispatch status report.            ID=19 Status=start Type=job
INFO[0005] Job started.                                  ID=19
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=10 Status=end Type=job
INFO[0010] Job finished.                                 ID=10
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=12 Status=end Type=job
INFO[0010] Job finished.                                 ID=12
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=11 Status=end Type=job
INFO[0010] Job finished.                                 ID=11
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=13 Status=end Type=job
INFO[0010] Job finished.                                 ID=13
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=14 Status=end Type=job
INFO[0010] Job finished.                                 ID=14
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=15 Status=end Type=job
INFO[0010] Job finished.                                 ID=15
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=16 Status=end Type=job
INFO[0010] Job finished.                                 ID=16
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=18 Status=end Type=job
INFO[0010] Job finished.                                 ID=18
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=17 Status=end Type=job
INFO[0010] Job finished.                                 ID=17
INFO[0010] Work finished.                               
INFO[0010] Received a dispatch status report.            ID=19 Status=end Type=job
INFO[0010] Job finished.                                 ID=19
INFO[0010] All jobs finished.    

VI. Conclusion

Our first iteration works and gives us a task queue with an arbitrary number of workers which subscribe to events through a channel to perform work. Next we’ll need to add more features to ls-factory, which will also allow us to create a stand-alone messaging system (if we desire) instead of an embedded task queue.

comments powered by Disqus