
Setup

library(jobqueue)
q <- Queue$new(workers = 1)

Introduction

Callbacks are the cornerstone of asynchronous programming.

If you want to calculate 2 + 2 and show the result, the synchronous approach would be:

message('Result = ', 2 + 2)
#> Result = 4

In asynchronous programming, this task is broken apart into two discrete steps: computation and result handling. Using jobqueue, this looks like:

job <- q$run({ 2 + 2 })
job$on('done', ~message('Result = ', .$result))
#> Result = 4

The asynchronous format allows parts of your code to run independently, in this case on separate R processes. When a part finishes, the callback will be executed to allow you to work with the result.

Callbacks are not limited to the Job finishing. See the “Triggers” section below for a list of all events that can trigger a callback.

Important

Hooks are evaluated on the main R process, not on the background processes. Keep callback functions fast so that they do not delay Job handling.

A Callback Function

The callback function should accept one argument: the object that triggered the callback. Functions accepting zero or multiple arguments are also allowed.

This is a good place for the shorthand \(x) function syntax introduced in R 4.1.0.

jobqueue also understands the lambda syntax for functions (see rlang::as_function()).

# Queue Hooks
hook <- function (queue) { message('Queue is ', queue$state) }
hook <- \(q) message('Queue is ', q$state)
hook <- ~message('Queue is ', .$state)

# Worker Hooks
hook <- function (worker) { message('Worker is ', worker$state) }
hook <- \(w) message('Worker is ', w$state)
hook <- ~message('Worker is ', .$state)

# Job Hooks
hook <- function (job) { message('Job is ', job$state) }
hook <- \(j) message('Job is ', j$state)
hook <- ~message('Job is ', .$state)
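To check that the three spellings behave the same, you can convert the lambda with rlang::as_function() (the helper mentioned above) and call each version on a stand-in object. The plain list standing in for a Queue is an assumption for illustration, and the hooks return strings with paste() instead of calling message() so the results can be compared.

```r
library(rlang)

# A plain list standing in for a Queue object (illustration only).
fake_queue <- list(state = 'idle')

hooks <- list(
  standard  = function (queue) paste('Queue is', queue$state),
  shorthand = \(q) paste('Queue is', q$state),        # requires R >= 4.1.0
  lambda    = as_function(~paste('Queue is', .$state)) # `.` is the first argument
)

# Call every form with the same object and collect the returned strings.
results <- vapply(hooks, function (f) f(fake_queue), character(1))
results
```

All three entries of results should read "Queue is idle".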

Triggers

Queue, Worker, and Job objects update their $state as described in the tables below. Each time the state changes, any callbacks registered to that state are executed. In addition, you can register callbacks for state = '*' or state = '.next', which trigger regardless of the new state's name.

Special Triggers

State      Triggers
'*'        Every time the state changes.
'.next'    Once, the next time the state changes.
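To make the difference between '*' and '.next' concrete, here is a toy object in base R. new_stateful(), on(), and set_state() are invented for this sketch and are not jobqueue's implementation: '*' callbacks stay registered and fire on every change, while a '.next' callback fires once and is then dropped.

```r
# Toy object with a $state field and state-triggered callbacks.
new_stateful <- function () {
  self       <- new.env()
  self$state <- 'created'
  hooks      <- list()   # each entry: list(state = <trigger>, fn = <callback>)

  self$on <- function (state, fn) {
    hooks[[length(hooks) + 1]] <<- list(state = state, fn = fn)
    invisible(NULL)
  }

  self$set_state <- function (new_state) {
    self$state <- new_state
    keep <- list()
    for (h in hooks) {
      # Fire hooks registered for this exact state, for '*', or for '.next'.
      if (h$state %in% c(new_state, '*', '.next')) h$fn(self)
      # '.next' hooks are one-shot: drop them after this state change.
      if (h$state != '.next') keep[[length(keep) + 1]] <- h
    }
    hooks <<- keep
    invisible(NULL)
  }

  self
}

events <- character()
obj    <- new_stateful()
obj$on('*',     \(x) events <<- c(events, paste('any:',  x$state)))
obj$on('.next', \(x) events <<- c(events, paste('next:', x$state)))
obj$on('done',  \(x) events <<- c(events, paste('done:', x$state)))

obj$set_state('running')
obj$set_state('done')
events
```

After both state changes, events holds "any: running", "next: running", "any: done", "done: done": the '.next' callback ran only for the first change.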

Queue States

State        Triggers
'starting'   After initialization, before Workers are started.
'idle'       When all Workers are idle; also after initial startup.
'busy'       When at least one Worker is busy.
'stopped'    After <Queue>$stop() is called.
'error'      When Workers did not start cleanly.

Worker States

State        Triggers
'starting'   Background process is being configured.
'idle'       Waiting on Jobs to be submitted.
'busy'       After a Job starts running.
'stopped'    After <Worker>$stop() is called.

Job States

State         Triggers
'created'     After Job$new() initialization.
'submitted'   After <Job>$queue is assigned.
'queued'      After stop_id and copy_id are resolved.
'starting'    Before evaluation begins.
'running'     After <Job>$worker is set and evaluation begins.
'done'        After <Job>$output is assigned.

Attaching

Callbacks can be attached to Queue, Worker, or Job objects.

You can add callbacks either when you create the object with $new(), or later with $on().

hook <- ~message(.$uid, ' is ', .$state)

q <- Queue$new(hooks  = list(q_idle = hook))
w <- Worker$new(hooks = list(idle   = hook))
j <- Job$new(hooks    = list(done   = hook))

q$on('busy',     hook)
w$on('busy',     hook)
j$on('starting', hook)

In Queue$new(), the hooks argument can set hooks for the Queue, Worker, and Job objects. The rules are:

  • Prefixing with q_, w_, or j_ attaches the hook to the Queue, Workers, or Jobs, respectively.
  • Non-prefixed hooks are attached to Jobs.
  • Alternatively, a list of lists can be assigned to hooks, of the format:
Queue$new(
  'hooks' = list(
    'queue'  = list(idle = hook), 
    'worker' = list(idle = hook), 
    'job'    = list(done = hook) ))
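The prefix rules can be expressed as a small routing function. split_hooks() below is a hypothetical helper, not part of jobqueue, that sorts a flat hooks list into the same queue/worker/job buckets used by the list-of-lists format. It only handles the prefixed and non-prefixed forms.

```r
# Hypothetical helper: route each hook by its name prefix.
#   q_ -> Queue, w_ -> Worker, j_ -> Job, no prefix -> Job
split_hooks <- function (hooks) {
  out <- list(queue = list(), worker = list(), job = list())
  for (name in names(hooks)) {
    if      (startsWith(name, 'q_')) out$queue[[substring(name, 3)]]  <- hooks[[name]]
    else if (startsWith(name, 'w_')) out$worker[[substring(name, 3)]] <- hooks[[name]]
    else if (startsWith(name, 'j_')) out$job[[substring(name, 3)]]    <- hooks[[name]]
    else                             out$job[[name]]                  <- hooks[[name]]
  }
  out
}

hook    <- ~message(.$uid, ' is ', .$state)
buckets <- split_hooks(list(q_idle = hook, w_busy = hook, j_done = hook, created = hook))
lapply(buckets, names)
```

Here 'created' has no prefix, so it lands next to 'done' in the Job bucket.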

Removing

Callbacks attached with $new() cannot be removed.

When you attach a callback with $on(), it returns a function that, when called, removes that callback from the object.

job <- Job$new(
  'expr'  = { 3.14 }, 
  'hooks' = list(done = ~message('ABC')) )

off <- job$on('done', ~message('XYZ'))
off()

q$submit(job)
#> ABC
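Returning a removal function is a common closure pattern: on() records the callback under an id and returns a function that deletes that id. The toy registry below is a base-R sketch of that pattern under invented names (make_registry(), fire()); it is not jobqueue's implementation.

```r
make_registry <- function () {
  callbacks <- list()
  next_id   <- 0

  on <- function (fn) {
    next_id <<- next_id + 1
    id <- as.character(next_id)
    callbacks[[id]] <<- fn
    # The returned closure remembers `id`, so it removes exactly this callback.
    invisible(function () callbacks[[id]] <<- NULL)
  }

  # Call every callback that is still registered.
  fire <- function (...) for (fn in callbacks) fn(...)

  list(on = on, fire = fire)
}

events <- character()
reg    <- make_registry()
reg$on(\() events <<- c(events, 'ABC'))
off <- reg$on(\() events <<- c(events, 'XYZ'))
off()        # unregister the 'XYZ' callback before anything fires
reg$fire()
events
```

As in the Job example, only "ABC" is recorded.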

Default Job Hooks

When you create a Queue (with Queue$new()), you can define a set of callbacks to automatically apply to any Jobs that are created with the <Queue>$run() command.

n <- 0
q <- Queue$new(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:5) q$run({ 'Hi' })

n
#> [1] 5

How does Queue$new() know to apply the hooks to Jobs instead of Queues or Workers? Unless otherwise indicated, Queue$new() hooks are assumed to be for Jobs. You can also explicitly specify that hooks are for Jobs by using the formats described in the “Attaching” section above.

q <- Queue$new(hooks = list(j_created = ~{ n <<- n + 1 } ))
# or
q <- Queue$new(hooks = list(job = list(created = ~{ n <<- n + 1 } )))

If you set hooks in <Queue>$run(), those hooks will REPLACE the Job hooks from Queue$new().

n <- 0
q <- Queue$new(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:3) q$run({ 'Hi' }, hooks = list(done = ~message(.$result)))
#> Hi
#> Hi
#> Hi

n
#> [1] 0
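Because hooks passed to <Queue>$run() replace the defaults, one way to keep both is to store the default Job hooks in a variable and combine them with the per-call hooks yourself. The sketch below only builds the lists. The jobqueue calls are shown as comments and assume run() uses whatever hooks list it receives as-is.

```r
n <- 0

# Default Job hooks, kept in a variable so they can be reused later.
default_hooks <- list(created = ~{ n <<- n + 1 })

# c() on two named lists keeps every element, so the defaults survive
# alongside the extra per-call hook.
run_hooks <- c(default_hooks, list(done = ~message(.$result)))

names(run_hooks)

# With jobqueue loaded, these lists would be used as, for example:
# q <- Queue$new(hooks = default_hooks)
# q$run({ 'Hi' }, hooks = run_hooks)
```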

Use Case: Priority Setting

Below, we’ll set up a callback function that triggers when each Job enters the 'queued' state. It adds a custom <Job>$priority field to the Job, then sorts the Queue’s internal list of Jobs (<Queue>$jobs) by each Job’s <Job>$priority. Finally, it attaches additional callbacks to the Job that report timing information when the Job leaves the 'queued' state and when it enters the 'done' state.

library(glue)
library(jobqueue)

# Our callback/hook function.
prioritize <- function (job) {

  queue_jobs <- job$queue$jobs

  # Apply a random integer priority between -5 and 5 to this job.
  job$priority <- round(runif(1) * 10) - 5
  
  # Sort all of this Queue's jobs by priority (including this job);
  # lower numbers are dispatched first.
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
  
  # Add hooks to this job to report queued/total times.
  t1    <- Sys.time()
  tdiff <- function () format(round(Sys.time() - t1, 1))
  
  job$on('.next', ~message(glue(
    'Job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}' )))
    
  job$on('done', ~message(glue(
    'Job {.$uid} (priority {.$priority}) finished in {tdiff()}' )))
}

# A single worker best illustrates processing order.
q <- Queue$new(
  'workers' = 1, 
  'hooks'   = list(queued = prioritize) )

for (i in 1:5) {
  job <- q$run({ 3.14 })
  message(glue_data(job, 'Created Job {uid} with priority {priority}'))
}
#> Job J11 (priority -3) was dispatched after 0.1 secs
#> Created Job J11 with priority -3
#> Created Job J12 with priority -2
#> Created Job J13 with priority 1
#> Created Job J14 with priority 0
#> Created Job J15 with priority -1
#> Job J11 (priority -3) finished in 0.7 secs
#> Job J12 (priority -2) was dispatched after 0.6 secs
#> Job J12 (priority -2) finished in 1.1 secs
#> Job J15 (priority -1) was dispatched after 0.7 secs
#> Job J15 (priority -1) finished in 1.3 secs
#> Job J14 (priority 0) was dispatched after 1.4 secs
#> Job J14 (priority 0) finished in 2 secs
#> Job J13 (priority 1) was dispatched after 2.1 secs
#> Job J13 (priority 1) finished in 2.6 secs
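The sorting step in prioritize() can be tried on its own. This base-R sketch uses plain environments as stand-ins for Job objects (an assumption; real Jobs are jobqueue objects) and shows two details. First, order() sorts ascending, so lower priorities come first and ties keep their original order. Second, a Job without a priority field would stop sapply() from simplifying to a numeric vector, which order() cannot sort. The hypothetical priority_of() helper sends such Jobs to the back instead.

```r
# Stand-ins for Job objects: environments with uid and priority fields.
make_job <- function (uid, priority = NULL) {
  job          <- new.env()
  job$uid      <- uid
  job$priority <- priority
  job
}

jobs <- list(
  make_job('J12', -2), make_job('J13', 1), make_job('J14', 0),
  make_job('J15', -1), make_job('J16'),        # J16 has no priority
  make_job('J17', 0)                           # ties with J14
)

# Hypothetical helper: treat a missing priority as Inf so such Jobs go last.
priority_of <- function (job) if (is.null(job$priority)) Inf else job$priority

priorities <- vapply(jobs, priority_of, numeric(1))
sorted     <- jobs[order(priorities)]   # ascending; ties keep list order

vapply(sorted, `[[`, character(1), 'uid')
#> [1] "J12" "J15" "J14" "J17" "J13" "J16"
```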

In an actual application, you could set <Job>$priority based on <Job>$vars.

prioritize <- function (job) {
  
  # Jobs with fewer replications get a lower number, so they run first
  job$priority <- job$vars$replications
  
  queue_jobs     <- job$queue$jobs
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}
q <- Queue$new(hooks = list(queued = prioritize))

#                                        vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))

Or, define the custom <Job>$priority field in <Queue>$run().

prioritize <- function (job) {
  queue_jobs     <- job$queue$jobs
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}
q <- Queue$new(hooks = list(queued = prioritize))

#                                        vvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, priority = reps)

Or, set <Job>$priority in a hook on an earlier state (here, 'created'), so that it runs before prioritize() is triggered on 'queued'.

set_priority <- function (job) {
  job$priority <- job$vars$replications
}

prioritize <- function (job) {
  queue_jobs     <- job$queue$jobs
  priorities     <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}

#                           vvvvvvvvvvvvvvvvvvvvvv
q <- Queue$new(hooks = list(created = set_priority, queued = prioritize))
for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))

Use Case: Rate Limiting

Say you’re hosting a web service where users may submit one Job every 30 seconds. Jobs can take more than 30 seconds, so simply setting stop_id = user_id (which stops a user's earlier Job whenever a new one arrives) is not enough. And what if you want to stop the new Job instead of the old one?

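rate_limit <- function (job) {

  job$t_start <- Sys.time()

  for (j in job$queue$jobs) {
    if (identical(j, job))                  next  # don't compare the Job with itself
    if (!identical(j$user_id, job$user_id)) next  # only compare the same user's Jobs
    # Compare in seconds explicitly; a bare `-` may return minutes or hours.
    t_diff <- difftime(job$t_start, j$t_start, units = 'secs')
    if (isTRUE(t_diff < 30)) {
      job$stop('Rate Limit Exceeded')
      break
    }
  }
}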

q <- Queue$new(hooks = list(submitted = rate_limit))

j_A1 <- q$run({ 42 }, user_id = 'A')
j_B1 <- q$run({ 42 }, user_id = 'B')
j_B2 <- q$run({ 42 }, user_id = 'B')

j_B1$result
#> [1] 42

j_B2$result
#> <interrupt: Rate Limit Exceeded>
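Time comparisons like the one in rate_limit() deserve care. Subtracting two POSIXct values returns a difftime whose units R picks automatically (seconds, minutes, hours, ...), and comparing a difftime with a bare number ignores those units. This base-R sketch shows the pitfall and one way around it: fix units = 'secs'. The elapsed_secs() helper is hypothetical.

```r
t1 <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')
t2 <- t1 + 120                       # two minutes later

auto <- t2 - t1                      # difftime with automatically chosen units
units(auto)                          # "mins"
auto < 30                            # TRUE: compares 2 (minutes) with 30

secs <- difftime(t2, t1, units = 'secs')
secs < 30                            # FALSE: compares 120 (seconds) with 30

# Hypothetical helper: elapsed seconds as a plain number, whatever the gap.
elapsed_secs <- function (from, to) as.numeric(difftime(to, from, units = 'secs'))
elapsed_secs(t1, t2)                 # 120
```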

Note that the above code doesn’t fully solve the rate-limiting task, because it only compares against Jobs still in <Queue>$jobs. If a user’s Job takes only five seconds to complete, they could submit a Job every six seconds and the Queue would be none the wiser.

To make the Queue aware of previously completed Jobs, you’ll need to store per-user Job start times persistently, as in the solution below.

t_user <- list()

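rate_limit <- function (job) {
  t_start <- Sys.time()
  t_prev  <- t_user[[job$user_id]]  # NULL if this user has no earlier Job
  t_diff  <- if (is.null(t_prev)) Inf else difftime(t_start, t_prev, units = 'secs')
  if (t_diff < 30) { job$stop('Rate Limit Exceeded')   }
  else             { t_user[[job$user_id]] <<- t_start }
}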

q <- Queue$new(hooks = list(created = rate_limit))
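The same bookkeeping can live in a closure instead of a global t_user list, which also makes it easy to test with fixed timestamps. make_rate_limiter() below is a hypothetical helper, not part of jobqueue. Like the hook above, it records only accepted start times, so rejected attempts do not extend the window.

```r
# Hypothetical helper: returns allow(user_id, now), TRUE if a Job may start.
make_rate_limiter <- function (window_secs = 30) {
  t_user <- list()                 # last accepted start time per user
  function (user_id, now = Sys.time()) {
    t_prev <- t_user[[user_id]]
    if (!is.null(t_prev) &&
        as.numeric(difftime(now, t_prev, units = 'secs')) < window_secs)
      return(FALSE)                # too soon: reject, keep the old timestamp
    t_user[[user_id]] <<- now      # accept and record this start time
    TRUE
  }
}

allow <- make_rate_limiter(30)
t0    <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')

r1 <- allow('A', t0)               # first Job for user A: accepted
r2 <- allow('A', t0 + 6)           # 6 s later: rejected
r3 <- allow('B', t0 + 6)           # a different user: accepted
r4 <- allow('A', t0 + 31)          # 31 s after A's last accepted Job: accepted
results <- c(r1, r2, r3, r4)
results
```

Inside a 'created' hook this could be used as `if (!allow(job$user_id)) job$stop('Rate Limit Exceeded')`. The optional `now` argument exists only so the logic can be checked without waiting.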

In the first example, we attached a hook to 'submitted' because that’s when <Job>$queue becomes available in callbacks. In the latter example, we attached a hook to 'created' instead because that solution doesn’t need <Job>$queue. Check the trigger order listed in the “Triggers” section above, and attach callbacks as early as possible to expedite Job handling.