Using a Job Queue with Golang and Reel


In the article Restoring App State for Demos and Development with Reel, we introduced reel, which among other features allows us to rewind an app’s state for demos and development.

In the further work section, we noted the next steps included:

  1. Refactoring reel to use ls-governor
  2. Making sure API requests to rewind an app state are sent into a job queue, so we get a 200 OK response right away.

Refactoring is a simple matter of following the pattern we established with zefram, starting with the interactive command in cmd/root.go:

    rootCmd = &cobra.Command{
        Use:   "reel -c [config.toml] -a [application name]",
        Short: "run reel with a config against an app.",
        Long:  `restore app state, for demos and development`,
        Run: func(cmd *cobra.Command, args []string) {
            gms := &governor.ManagerService{}
            gms.InitManager(config)
            gapi := gms.CreateAPI(application)
            reel.InitApp(application, gapi)
            reel.Rewind(application, "", gapi)
        },
    }

Running reel here doesn’t require initializing a datastore, but we could. We’ve moved some business logic from the manager package into the reel package, which includes InitApp, PrintSources, GetSources, and Rewind. The reel package also includes the specific logic for rewinding postgres and mysql databases.

We’ve also separated the API routes and handlers into an api package, per our convention. Our pkg/api/routes.go looks like this:

    // routes handles setting up routes for our API
    package api

    import (
    	"net/http"

    	""
    )

    // SetupRoutes defines and associates routes to handlers.
    // Use a wrapper convention to pass a governor API to each handler.
    func SetupRoutes(gapi *governor.API) {
    	gapi.WebService.Router.HandleFunc(
    		"/reel/api/v1/sources/",
    		func(w http.ResponseWriter, r *http.Request) {
    			SourcesHandler(w, r, gapi)
    		},
    	)
    	gapi.WebService.Router.HandleFunc(
    		"/reel/api/v1/sources/{app}",
    		func(w http.ResponseWriter, r *http.Request) {
    			SourcesHandler(w, r, gapi)
    		},
    	)
    }

Then we define our RewindHandler and SourcesHandler in pkg/api/handlers.go.


After refactoring, we need a way to submit a job to a worker that processes it in the background. We’ll use a goroutine for the worker and a channel to communicate the work to be processed.

First we need to create a job package with the simplest way of handling jobs. Our pkg/job/worker.go looks like this:

    package job

    import (
    	"fmt"

    	""
    	""
    )

    // ReelQueue is the channel used to submit reel jobs to the worker.
    var ReelQueue = make(chan ReelJob)

    // ReelJob contains the information required to submit a job
    // to a worker for processing.
    type ReelJob struct {
    	App    string
    	Source string
    	Gapi   *governor.API
    }

    // ReelWorker processes jobs received from ReelQueue.
    type ReelWorker struct{}

    // Start launches a goroutine that waits for jobs on ReelQueue
    // and rewinds the requested app for each one.
    func (rw *ReelWorker) Start() {
    	go func() {
    		for {
    			work := <-ReelQueue
    			fmt.Printf("Received a work request for app: %s using source: %s\n", work.App, work.Source)
    			reel.Rewind(work.App, work.Source, work.Gapi)
    		}
    	}()
    }
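To experiment with this pattern outside of reel, here is a self-contained sketch of the same worker loop. It is an illustration under assumptions rather than the reel code: Job, Worker, Handle, and runJobs are hypothetical names, and Handle stands in for reel.Rewind so the example runs without governor.

```go
package main

import (
	"fmt"
	"sync"
)

// Job mirrors the shape of ReelJob, minus the governor API.
type Job struct {
	App    string
	Source string
}

// Worker receives jobs from Queue and passes each one to Handle.
type Worker struct {
	Queue  chan Job
	Handle func(Job) // hypothetical stand-in for reel.Rewind
}

// Start launches the background goroutine and returns immediately.
func (w *Worker) Start() {
	go func() {
		// range blocks until a job arrives and exits when Queue is closed.
		for job := range w.Queue {
			w.Handle(job)
		}
	}()
}

// runJobs submits the given jobs to a fresh worker and returns the app
// names in the order the worker processed them.
func runJobs(jobs []Job) []string {
	var (
		done []string
		wg   sync.WaitGroup
	)
	w := &Worker{
		Queue: make(chan Job),
		Handle: func(j Job) {
			done = append(done, j.App) // only the worker goroutine appends
			wg.Done()
		},
	}
	w.Start()
	wg.Add(len(jobs))
	for _, j := range jobs {
		w.Queue <- j // blocks until the worker is ready to receive
	}
	wg.Wait()      // every job has been handled once this returns
	close(w.Queue) // lets the worker goroutine exit
	return done
}

func main() {
	jobs := []Job{{App: "blog", Source: "nightly"}, {App: "shop", Source: "demo"}}
	fmt.Println(runJobs(jobs)) // prints [blog shop]
}
```

Ranging over the channel instead of using a bare for loop is a small variation: it gives the worker a way to stop when the queue is closed.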

The ReelQueue channel will be used in our RewindHandler to submit a job, which must be of a single type - in this case a ReelJob struct, which contains an app name to rewind, a source to use, and a governor API. Before we can go pushing items into this ReelQueue, we’ll have to start a worker in the manager command we’ve refactored in cmd/root.go:

    managerCmd = &cobra.Command{
        Use:   "manager",
        Short: "Run the manager.",
        Long:  `Run the management interface.`,
        Run: func(cmd *cobra.Command, args []string) {
            // setup the job queue
            rw := &job.ReelWorker{}
            rw.Start()

            gms := &governor.ManagerService{}
            gms.InitManager(config)
            gms.InitDatastore("reel")
            gapi := gms.CreateAPI("reel")
            api.SetupRoutes(gapi)
            gms.Daemonize(gapi)
        },
    }

Our call to rw.Start() above launches a goroutine running an anonymous function with a for loop. The goroutine blocks while it waits for work on the ReelQueue channel, and when a job arrives, it calls the reel package’s Rewind function. This allows us to submit the job in the RewindHandler and return a 200 OK status while the goroutine handles the work in the background:

    if reel.InitApp(app, gapi) {
    	// submit a rewind job
    	job.ReelQueue <- job.ReelJob{App: app, Source: source, Gapi: gapi}
    	w.WriteHeader(http.StatusOK)
    } else {
    	w.WriteHeader(http.StatusServiceUnavailable)
    }
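One detail worth keeping in mind: ReelQueue is created without a buffer, so the send in the handler waits until the worker is ready to receive. If a rewind is already in progress, the request waits for it to finish before the response is written. A possible variation, sketched here under assumptions, is a buffered channel combined with a non-blocking send. The Job type and the submit helper are hypothetical, and returning 202 Accepted instead of 200 OK is a choice made for this sketch, not something reel does.

```go
package main

import (
	"fmt"
	"net/http"
)

// Job is a simplified stand-in for ReelJob.
type Job struct {
	App    string
	Source string
}

// submit tries to enqueue a job without blocking. It returns the HTTP status
// a handler could send: 202 if the job was queued, 503 if the queue is full.
func submit(queue chan<- Job, j Job) int {
	select {
	case queue <- j:
		return http.StatusAccepted
	default:
		// No free buffer slot and no receiver waiting: report busy.
		return http.StatusServiceUnavailable
	}
}

func main() {
	// Room for one pending job; no worker is draining the queue here.
	queue := make(chan Job, 1)
	fmt.Println(submit(queue, Job{App: "blog"})) // prints 202
	fmt.Println(submit(queue, Job{App: "shop"})) // prints 503
}
```

The buffer size becomes a tuning knob: a larger buffer accepts more pending rewinds, while the default branch keeps the handler from hanging once the buffer is full.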

Further Work

We could create a library for more robust handling of job queues, including the ability to check on the status of a job, cancel a job, and get metrics about the work being done. Other programs certainly do this better, but we may choose to create a library for governor to provide the convenience of handling jobs within our framework.

Should we do this, we’ll want to look for both jobs and control signals. We may opt to have a separation between a dispatching routine and worker routine.
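As a rough idea of what listening for both jobs and control signals could look like, here is a minimal sketch built on select. Nothing in it exists in governor or reel today: Job, dispatch, run, and the quit and done channels are all hypothetical names chosen for this example.

```go
package main

import "fmt"

// Job is a simplified stand-in for ReelJob.
type Job struct {
	App string
}

// dispatch handles jobs until the quit channel is closed, then reports
// the number of jobs it handled on done.
func dispatch(jobs <-chan Job, quit <-chan struct{}, done chan<- int) {
	handled := 0
	for {
		select {
		case j := <-jobs:
			fmt.Println("rewinding", j.App) // real work would go here
			handled++
		case <-quit:
			done <- handled
			return
		}
	}
}

// run sends one job per app, then signals the dispatcher to stop and
// returns how many jobs it handled.
func run(apps []string) int {
	jobs := make(chan Job)
	quit := make(chan struct{})
	done := make(chan int)
	go dispatch(jobs, quit, done)
	for _, a := range apps {
		jobs <- Job{App: a} // returns once the dispatcher has received the job
	}
	close(quit) // control signal: stop after the jobs already received
	return <-done
}

func main() {
	fmt.Println(run([]string{"blog", "shop"})) // prints both apps, then 2
}
```

A separate worker routine could be added behind dispatch later, with the dispatcher forwarding jobs and tracking status while workers do the rewinding.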
