Introduction
Callbacks are the cornerstone of asynchronous programming.
If you want to calculate 2 + 2 and show the result, the synchronous programming approach would be:
message('Result = ', 2 + 2)
#> Result = 4
In asynchronous programming, this task is broken apart into two discrete steps: computation and result handling. Using jobqueue, this looks like:
library(jobqueue)
q <- Queue$new()
job <- q$run({ 2 + 2 })
job$on('done', ~message('Result = ', .$result))
#> Result = 4
The asynchronous format allows parts of your code to run independently, in this case on separate R processes. When a part finishes, the callback will be executed to allow you to work with the result.
Callbacks are not limited to the Job finishing. See the “Triggers” section for a list of all events that can trigger a callback.
Important
Hooks are evaluated on the main process, not the background processes. Therefore, ensure that the callback functions execute quickly so as to not delay Job handling.
A Callback Function
The callback function should accept one argument: the object that triggered the callback. Functions accepting zero or multiple arguments are also allowed.
This is a great place for the \(x) shorthand function syntax introduced in R 4.1.0. jobqueue also understands the formula-based lambda syntax for functions, such as ~message(.$state) (see rlang::as_function()).
# Queue Hooks
hook <- function (queue) { message('Queue is ', queue$state) }
hook <- \(q) message('Queue is ', q$state)
hook <- ~message('Queue is ', .$state)
# Worker Hooks
hook <- function (worker) { message('Worker is ', worker$state) }
hook <- \(w) message('Worker is ', w$state)
hook <- ~message('Worker is ', .$state)
# Job Hooks
hook <- function (job) { message('Job is ', job$state) }
hook <- \(j) message('Job is ', j$state)
hook <- ~message('Job is ', .$state)
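The accepted forms can be made concrete with a small base-R sketch of how a dispatcher might normalize a hook and then call it. The helpers as_callback() and invoke_hook() are hypothetical and not part of jobqueue. The rule "call with no arguments if the hook declares none" is an assumption used for illustration, and jobqueue's own dispatch may differ.

```r
# Hypothetical helpers, NOT jobqueue's API: a sketch of hook normalization.

# Turn a one-sided formula such as ~message(.$state) into a function whose
# argument is available as `.` (and `.x`), similar in spirit to rlang lambdas.
as_callback <- function (f) {
  if (is.function(f)) return(f)
  stopifnot(inherits(f, "formula"), length(f) == 2)
  expr <- f[[2]]
  env  <- environment(f)
  function (. = NULL, ...) eval(expr, list(. = ., .x = .), env)
}

# Call a hook with the triggering object, or with no arguments if the hook
# declares none.
invoke_hook <- function (hook, obj) {
  hook <- as_callback(hook)
  if (length(formals(hook)) == 0) hook() else hook(obj)
}

job <- list(state = "done")
invoke_hook(~ paste("Job is", .$state), job)
invoke_hook(function (j) paste("Job is", j$state), job)
invoke_hook(function () "no argument needed", job)
```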
Triggers
Queue, Worker, and Job objects update their $state as described in the tables below. Each time the state changes, any callbacks registered to that state are executed. In addition, you can register state = '*' or state = '.next', which trigger regardless of the new state's name.
Special Triggers
State | Triggers |
---|---|
'*' | Every time the state changes. |
'.next' | Only once, the next time the state changes. |
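The two special triggers can be modeled with a small state holder. The sketch below is hypothetical base R, not jobqueue's implementation: new_stateful(), $on(), and $set_state() are illustrative names. It shows '*' firing on every change, '.next' firing exactly once, and a named state firing only when that state is entered.

```r
# Hypothetical sketch, NOT jobqueue's implementation.
new_stateful <- function (state = "created") {
  self <- new.env()
  self$state <- state
  self$hooks <- list()   # each entry: list(state = <trigger>, fn = <function>)

  self$on <- function (state, fn) {
    self$hooks[[length(self$hooks) + 1]] <- list(state = state, fn = fn)
    invisible(self)
  }

  self$set_state <- function (state) {
    self$state <- state
    hooks <- self$hooks
    # '.next' hooks fire only once, so drop them before running anything.
    self$hooks <- Filter(function (h) h$state != ".next", hooks)
    for (h in hooks)
      if (h$state %in% c(state, "*", ".next")) h$fn(self)
    invisible(self)
  }

  self
}

events <- character()
obj <- new_stateful()
obj$on("*",     function (o) events <<- c(events, paste("any:", o$state)))
obj$on(".next", function (o) events <<- c(events, paste("next:", o$state)))
obj$on("done",  function (o) events <<- c(events, "done!"))
obj$set_state("running")
obj$set_state("done")
events
# c("any: running", "next: running", "any: done", "done!")
```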
Queue States
State | Triggers |
---|---|
'starting' | After initialization, before Workers are started. |
'idle' | When all Workers are idle. Also, after initial startup. |
'busy' | When at least one Worker is busy. |
'stopped' | After <Queue>$stop() is called. |
'error' | When Workers did not start cleanly. |
Attaching
Callbacks can be attached to Queue, Worker, or Job objects. You can add callbacks either when you create the object with $new(), or later with $on().
hook <- ~message(.$uid, ' is ', .$state)
q <- Queue$new(hooks = list(q_idle = hook))
w <- Worker$new(hooks = list(idle = hook))
j <- Job$new(hooks = list(done = hook))
q$on('busy', hook)
w$on('busy', hook)
j$on('starting', hook)
In Queue$new(), the hooks argument can set hooks for the Queue, Worker, and Job objects. The rules are:
- Prefixing with q_, w_, or j_ attaches the hook to the Queue, Workers, or Jobs, respectively.
- Non-prefixed hooks are attached to Jobs.
- Alternatively, a list of lists can be assigned to hooks, with the outer names identifying the object type, for example hooks = list(job = list(done = hook)).
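The prefix rules amount to a simple routing step. The base-R sketch below is hypothetical (split_hooks() is not a jobqueue function). It sorts a flat hooks list into queue, worker, and job lists following the rules above. It handles only the prefixed and unprefixed forms, and it keeps duplicate state names rather than guessing how jobqueue treats them.

```r
# Hypothetical helper, NOT part of jobqueue: route a flat hooks list using
# the q_ / w_ / j_ prefix rules.
split_hooks <- function (hooks) {
  targets <- c(q_ = "queue", w_ = "worker", j_ = "job")
  out <- list(queue = list(), worker = list(), job = list())
  for (i in seq_along(hooks)) {
    nm     <- names(hooks)[[i]]
    prefix <- substr(nm, 1, 2)
    if (prefix %in% names(targets)) {
      target <- targets[[prefix]]
      state  <- substring(nm, 3)      # strip the two-character prefix
    } else {
      target <- "job"                 # non-prefixed hooks go to Jobs
      state  <- nm
    }
    entry <- list(hooks[[i]])
    names(entry) <- state
    out[[target]] <- c(out[[target]], entry)   # keeps duplicate state names
  }
  out
}

hook <- function (x) NULL
routed <- split_hooks(list(q_idle = hook, w_busy = hook, j_done = hook, created = hook))
lapply(routed, names)
```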
Removing
Callbacks attached with $new() cannot be removed.
When you attach a callback with $on(), the return value is a function that, when called, removes that callback from the object.
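Returning a removal function is a common closure idiom. The base-R sketch below is hypothetical: new_emitter(), $on(), and $emit() are illustrative names, not jobqueue internals. In it, $on() returns a function that deletes exactly the hook it registered.

```r
# Hypothetical sketch, NOT jobqueue's implementation.
new_emitter <- function () {
  self <- new.env()
  self$hooks   <- list()
  self$next_id <- 0L

  self$on <- function (state, fn) {
    self$next_id <- self$next_id + 1L
    id <- as.character(self$next_id)          # unique key for this hook
    self$hooks[[id]] <- list(state = state, fn = fn)
    # Return a closure that removes exactly this hook when called.
    invisible(function () { self$hooks[[id]] <- NULL; invisible(NULL) })
  }

  self$emit <- function (state) {
    for (h in self$hooks) if (h$state == state) h$fn(state)
    invisible(NULL)
  }

  self
}

calls <- 0
em <- new_emitter()
remove_hook <- em$on("done", function (s) calls <<- calls + 1)
em$emit("done")   # hook runs
remove_hook()     # detach it
em$emit("done")   # hook no longer runs
calls
```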
Default Job Hooks
When you create a Queue (with Queue$new()), you can define a set of callbacks to automatically apply to any Jobs that are created with the <Queue>$run() command.
n <- 0
q <- Queue$new(hooks = list(created = ~{ n <<- n + 1 } ))
for (i in 1:5) q$run({ 'Hi' })
n
#> [1] 5
How does Queue$new() know to apply the hooks to Jobs instead of Queues or Workers? Unless otherwise indicated, Queue$new() hooks are assumed to be for Jobs. You can also explicitly specify that hooks are for Jobs by using the formats described in the “Attaching” section above.
q <- Queue$new(hooks = list(j_created = ~{ n <<- n + 1 } ))
# or
q <- Queue$new(hooks = list(job = list(created = ~{ n <<- n + 1 } )))
If you set hooks in <Queue>$run(), those hooks will REPLACE the Job hooks from Queue$new().
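The default-versus-replacement rule can be illustrated without jobqueue. In the hypothetical sketch below, new_mini_queue() stands in for Queue$new(). Its run() copies the Queue-level Job hooks onto each Job, unless run() receives its own hooks, in which case those replace the defaults entirely. Only the 'created' trigger is simulated.

```r
# Hypothetical sketch, NOT jobqueue's implementation.
new_mini_queue <- function (hooks = list()) {
  default_hooks <- hooks
  run <- function (value, hooks = NULL) {
    # Hooks passed to run() replace (do not extend) the Queue defaults.
    job_hooks <- if (is.null(hooks)) default_hooks else hooks
    job <- list(state = "created", result = value)
    for (fn in job_hooks[names(job_hooks) == "created"]) fn(job)
    invisible(job)
  }
  list(run = run)
}

n <- 0
q <- new_mini_queue(hooks = list(created = function (job) n <<- n + 1))
for (i in 1:5) q$run("Hi")                             # defaults apply
q$run("Hi", hooks = list(done = function (job) NULL))  # defaults replaced
n
```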
Use Case: Priority Setting
Below, we’ll set up a callback function that triggers when each Job enters the 'queued' state. It will add a custom <Job>$priority field to the Job object. Then it will modify the Queue’s internal list of jobs (<Queue>$jobs), sorting them according to each Job’s <Job>$priority. Last, it will attach additional callbacks to the Job to output timing information upon exit from the 'queued' state and upon entry into the 'done' state.
library(glue)
library(jobqueue)
# Our callback/hook function.
prioritize <- function (job) {
  queue <- job$queue
  # Apply a random priority (an integer from -5 to 5) to this job.
  job$priority <- round(runif(1) * 10) - 5
  # Sort all this Queue's jobs by priority (including this job).
  priorities <- sapply(queue$jobs, `[[`, 'priority')
  queue$jobs <- queue$jobs[order(priorities)]
  # Add hooks to this job to report queued/total times.
  t1 <- Sys.time()
  tdiff <- function () format(round(Sys.time() - t1, 1))
  job$on('.next', ~message(glue(
    'Job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}' )))
  job$on('done', ~message(glue(
    'Job {.$uid} (priority {.$priority}) finished in {tdiff()}' )))
}
# A single worker best illustrates processing order.
q <- Queue$new(
'workers' = 1,
'hooks' = list(queued = prioritize) )
for (i in 1:5) {
job <- q$run({ 3.14 })
message(glue_data(job, 'Created Job {uid} with priority {priority}'))
}
#> Job J11 (priority -3) was dispatched after 0.1 secs
#> Created Job J11 with priority -3
#> Created Job J12 with priority -2
#> Created Job J13 with priority 1
#> Created Job J14 with priority 0
#> Created Job J15 with priority -1
#> Job J11 (priority -3) finished in 0.7 secs
#> Job J12 (priority -2) was dispatched after 0.6 secs
#> Job J12 (priority -2) finished in 1.1 secs
#> Job J15 (priority -1) was dispatched after 0.7 secs
#> Job J15 (priority -1) finished in 1.3 secs
#> Job J14 (priority 0) was dispatched after 1.4 secs
#> Job J14 (priority 0) finished in 2 secs
#> Job J13 (priority 1) was dispatched after 2.1 secs
#> Job J13 (priority 1) finished in 2.6 secs
In an actual application, you could set <Job>$priority based on <Job>$vars.
prioritize <- function (job) {
# Give priority to lower number of replications
job$priority <- job$vars$replications
queue_jobs <- job$queue$jobs
priorities <- sapply(queue_jobs, `[[`, 'priority')
job$queue$jobs <- queue_jobs[order(priorities)]
}
q <- Queue$new(hooks = list(queued = prioritize))
# vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))
Or, define the custom <Job>$priority field in <Queue>$run().
prioritize <- function (job) {
queue_jobs <- job$queue$jobs
priorities <- sapply(queue_jobs, `[[`, 'priority')
job$queue$jobs <- queue_jobs[order(priorities)]
}
q <- Queue$new(hooks = list(queued = prioritize))
# vvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, priority = reps)
Or, set <Job>$priority in a hook that triggers before prioritize() does.
set_priority <- function (job) {
job$priority <- job$vars$replications
}
prioritize <- function (job) {
queue_jobs <- job$queue$jobs
priorities <- sapply(queue_jobs, `[[`, 'priority')
job$queue$jobs <- queue_jobs[order(priorities)]
}
# vvvvvvvvvvvvvvvvvvvvvv
q <- Queue$new(hooks = list(created = set_priority, queued = prioritize))
for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))
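Each prioritize() variant above depends on sapply() returning a plain numeric vector. If any Job in <Queue>$jobs lacks a priority value, sapply() returns a list and order() fails. Below is a defensive variant in base R, using plain lists as hypothetical stand-ins for Job objects. Treating a missing priority as Inf (sorted last) is an assumption, not jobqueue behavior. Inside prioritize() you might call it as job$queue$jobs <- sort_by_priority(job$queue$jobs), assuming Jobs support [[ as in the examples above.

```r
# Hypothetical helpers: plain lists stand in for Job objects here.

# Read a job's priority, falling back to `default` when none was set.
job_priority <- function (job, default = Inf) {
  p <- job[["priority"]]
  if (is.null(p)) default else as.numeric(p)
}

# Sort jobs by ascending priority; order() leaves unresolved ties in their
# original order, so equal priorities keep submission order.
sort_by_priority <- function (jobs) {
  priorities <- vapply(jobs, job_priority, numeric(1))
  jobs[order(priorities)]
}

jobs <- list(
  list(uid = "J1", priority = 2),
  list(uid = "J2"),                 # no priority: sorted last
  list(uid = "J3", priority = -1),
  list(uid = "J4", priority = 2) )
sorted <- sort_by_priority(jobs)
vapply(sorted, `[[`, character(1), "uid")
```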
Use Case: Rate Limiting
Say you’re hosting a web service where users are allowed to submit one Job every 30 seconds. Jobs can take more than 30 seconds, so the solution is more complex than setting stop_id = user_id. And what if you want to stop the new Job instead of the old one?
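rate_limit <- function (job) {
  job$t_start <- Sys.time()
  for (j in job$queue$jobs) {
    if (identical(j, job)) next                   # don't compare a Job with itself
    if (!identical(j$user_id, job$user_id)) next  # only this user's Jobs
    elapsed <- difftime(job$t_start, j$t_start, units = 'secs')
    if (elapsed < 30) {
      job$stop('Rate Limit Exceeded')
      break
    }
  }
}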
q <- Queue$new(hooks = list(submitted = rate_limit))
j_A1 <- q$run({ 42 }, user_id = 'A')
j_B1 <- q$run({ 42 }, user_id = 'B')
j_B2 <- q$run({ 42 }, user_id = 'B')
j_B1$result
#> [1] 42
j_B2$result
#> <interrupt: Rate Limit Exceeded>
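Rate-limiting code needs to guard against one R detail. Subtracting two POSIXct times returns a difftime whose units R picks automatically, and comparing it with a bare number like 30 uses the raw value in those units. The base-R snippet below shows a two-minute gap passing a "< 30" check, and difftime(..., units = 'secs') fixing it.

```r
t0 <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC")
t1 <- t0 + 120                               # two minutes later

d_auto <- t1 - t0                            # units chosen automatically
d_secs <- difftime(t1, t0, units = "secs")   # units fixed to seconds

units(d_auto)   # "mins", so the stored value is 2
d_auto < 30     # TRUE: compares 2 (minutes) with 30
d_secs < 30     # FALSE: compares 120 (seconds) with 30
```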
Note that the above code won’t completely solve the rate limiting task. It only compares against Jobs still in <Queue>$jobs, so if a user’s Job takes only five seconds to complete, they could submit a Job every six seconds and the Queue would be none the wiser.
To give the Queue awareness of previously completed Jobs, you’ll need to persistently store per-user Job start times, as in the solution below.
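t_user <- list()   # last accepted start time per user_id
rate_limit <- function (job) {
  t_start <- Sys.time()
  t_prev  <- t_user[[job$user_id]]   # NULL for a user's first Job
  t_diff  <- if (is.null(t_prev)) NA else difftime(t_start, t_prev, units = 'secs')
  if (isTRUE(t_diff < 30)) { job$stop('Rate Limit Exceeded') }
  else { t_user[[job$user_id]] <<- t_start }
}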
q <- Queue$new(hooks = list(created = rate_limit))
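The per-user bookkeeping can be kept out of global variables and tested without a running Queue. The sketch below is hypothetical (make_rate_limiter() is not a jobqueue function). It closes over a per-user list of accepted start times and accepts an injectable clock, so the 30-second rule can be checked deterministically. Inside a 'created' hook you might call it as if (!allow(job$user_id)) job$stop('Rate Limit Exceeded'), assuming the same job$user_id and job$stop() used above.

```r
# Hypothetical helper, NOT part of jobqueue.
make_rate_limiter <- function (window_secs = 30, now = Sys.time) {
  last_start <- list()   # per-user time of the last accepted Job
  function (user_id) {
    t    <- now()
    prev <- last_start[[user_id]]
    if (!is.null(prev) &&
        as.numeric(difftime(t, prev, units = "secs")) < window_secs)
      return(FALSE)                  # reject: too soon after the last Job
    last_start[[user_id]] <<- t      # accept and remember this start time
    TRUE
  }
}

# A fake clock makes the timing deterministic.
fake_time <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC")
allow <- make_rate_limiter(30, now = function () fake_time)

r1 <- allow("A")              # first Job for user A
fake_time <- fake_time + 10
r2 <- allow("A")              # only 10 s after A's last accepted Job
r3 <- allow("B")              # different user
fake_time <- fake_time + 25
r4 <- allow("A")              # 35 s after A's last accepted Job
c(r1, r2, r3, r4)
```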
In the first example, we attached a hook to 'submitted' because that’s when <Job>$queue becomes available in callbacks. In the latter example, we attached a hook to 'created' instead because we didn’t need <Job>$queue for that solution. Check the trigger order listed in the “Triggers” section above, and attach callbacks as early as possible to expedite Job handling.