
Submit your jobs to a Queue and it will run them on a background R process when one becomes available. Jobs are launched in the same order as they are submitted, but may finish in a different order based on each job's run time. Treat the returned Job object as a promise for asynchronous downstream processing of the job's result.
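To make the ordering guarantee concrete, here is a small base R simulation. It is a hypothetical model, not jobqueue's scheduler. simulate_queue() assumes a fixed number of identical workers and known run times. It hands each job, in submission order, to whichever worker frees up first, and it records each job's start and finish times.

```r
# Hypothetical model of a Queue's dispatch order; not jobqueue's actual code.
simulate_queue <- function(run_times, n_workers) {
  free_at <- numeric(n_workers)               # time at which each worker is idle
  start   <- numeric(length(run_times))
  finish  <- numeric(length(run_times))
  for (i in seq_along(run_times)) {           # launch strictly in submission order
    w          <- which.min(free_at)          # first worker to become available
    start[i]   <- free_at[w]
    finish[i]  <- start[i] + run_times[i]
    free_at[w] <- finish[i]
  }
  data.frame(job = seq_along(run_times), start = start, finish = finish)
}

sim <- simulate_queue(run_times = c(5, 1, 2, 1), n_workers = 2)
order(sim$start)    # 1 2 3 4: launched in submission order
order(sim$finish)   # 2 3 4 1: job 1 is slowest, so it finishes last
```

Job 1 occupies one worker for its whole run, so jobs 2 to 4 all pass through the other worker and finish first. This is why the returned Job is best handled as a promise rather than assumed to complete in order.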

Starts R sessions that run in the background. Calls to run() evaluate R expressions on these background sessions. run() returns a Job object, which can be passed to then() to schedule work to be done on the result when it is ready.

The Job object also has a $stop() method, which can be called to make the job return a result immediately. If the job was actively running in a background R session, that process is killed and a new process is started to take its place.

Active bindings

jobs

List of Jobs currently managed by this Queue.

workers

List of Workers used to process Jobs.

uid

Unique identifier, e.g. 'Q1'.

loaded

List of global variables and attached functions on the Workers.

state

Current state: starting, active, stopped, or error.

Methods


Method new()

Creates background Worker processes, as many as the workers argument specifies, for handling $run() and $submit() calls. These Workers are initialized according to the globals, packages, init, and options arguments. The Queue will not use more than max_cpus CPU cores at once, assuming the cpus argument is properly set for each Job.

Usage

Queue$new(
  globals = NULL,
  packages = NULL,
  init = NULL,
  max_cpus = availableCores(omit = 1L),
  workers = ceiling(max_cpus * 1.2),
  options = r_session_options(),
  tmax = NULL,
  hooks = NULL,
  reformat = TRUE,
  stop_id = NULL,
  copy_id = NULL
)
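As a worked example of these defaults, suppose availableCores(omit = 1L) reports 7, for instance on an 8-core machine with one core held back. That count is an assumption, since the real value depends on the machine. The snippet only evaluates the workers formula for two hypothetical max_cpus values.

```r
# Evaluate the default `workers = ceiling(max_cpus * 1.2)` for assumed core counts.
max_cpus <- c(1L, 7L)
workers  <- ceiling(max_cpus * 1.2)   # 1.2 -> 2, 8.4 -> 9
standby  <- workers - max_cpus        # Workers beyond the CPU reservation budget
```

With 7 reservable cores the Queue starts 9 Workers, which leaves 2 spare to stand in for interrupted Workers. Even a single-core budget gets one standby.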

Arguments

globals

A list or similar set of values that are added to the .GlobalEnv of workers.

packages

Character vector of package names to load on workers.

init

A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to any variables defined by globals and assets from packages. Returned value is ignored.

max_cpus

Total number of CPU cores that can be reserved by all running Jobs via their combined cpus arguments. Does not enforce limits on actual CPU utilization.

workers

How many background Worker processes to start. Set this higher than max_cpus so that an interrupted Worker can be swapped out immediately for a standby Worker while a replacement Worker boots up.

options

Passed to callr::r_session$new().

tmax, hooks, reformat, stop_id, copy_id

Defaults for this Queue's $run() method. Here, stop_id and copy_id must each be either a function (job) or NULL.

Returns

A Queue object.
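The order in which globals and init take effect on a Worker can be sketched in base R. This is a hypothetical model. A fresh environment stands in for a Worker's .GlobalEnv, package loading is left out, and init is passed as an explicitly quoted expression.

```r
# Hypothetical stand-in for one Worker's start-up; not jobqueue's implementation.
start_worker <- function(globals = NULL, init = NULL) {
  env <- new.env(parent = baseenv())                    # plays the Worker's .GlobalEnv
  if (length(globals)) list2env(globals, envir = env)   # 1. add globals
  if (!is.null(init)) eval(init, envir = env)           # 2. run init once; value ignored
  invisible(env)
}

w <- start_worker(
  globals = list(scale_by = 10),
  init    = quote({ lookup <- seq_len(3) * scale_by })  # init can see the globals
)
get("lookup", envir = w)   # 10 20 30
```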


Method print()

Print method for a Queue.

Usage

Queue$print(...)

Arguments

...

Arguments are not used currently.


Method run()

Creates a Job object and submits it to the Queue for running on a background process. Arguments that default to NA use the value set by Queue$new().
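A minimal sketch of that fallback rule follows. It uses a hypothetical helper, resolve_default(), and a hypothetical list of Queue-level defaults.

```r
# Hypothetical: Queue-level defaults as they might have been set by Queue$new().
queue_defaults <- list(tmax = NULL, reformat = TRUE)

# An argument left at NA inherits the Queue default; anything else overrides it.
resolve_default <- function(value, name, defaults = queue_defaults) {
  if (identical(value, NA)) defaults[[name]] else value
}

resolve_default(NA, "reformat")      # TRUE  (inherited)
resolve_default(FALSE, "reformat")   # FALSE (overridden)
resolve_default(NA, "tmax")          # NULL  (inherited)
```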

Usage

Queue$run(
  expr,
  vars = NULL,
  scan = is.null(vars),
  ignore = NULL,
  tmax = NA,
  hooks = NA,
  reformat = NA,
  cpus = 1L,
  stop_id = NA,
  copy_id = NA,
  start = TRUE
)

Arguments

expr

A call or R expression wrapped in curly braces to evaluate on a worker. Will have access to any variables defined by vars, as well as the Worker's globals, packages, and init configuration.

vars

A list of named variables to make available to expr during evaluation.

scan

Should additional variables be added to vars by scanning expr for missing global variables? The default, scan = is.null(vars), means that setting vars = list() disables scanning. Set scan = TRUE to always scan, scan = FALSE to never scan, or scan = <an environment-like object> to look for globals there. When scanning, the Worker's environment is taken into account, and globals on the Worker are favored over local globals. Variables supplied by the user in vars are always left untouched.

ignore

A character vector of variable names that should NOT be added to vars by scan.

tmax

A named numeric vector giving the maximum number of seconds allowed in each state the job passes through. Use the name 'total' to apply a single timeout from 'submitted' to 'done'. Example: tmax = c(total = 2.5, running = 1) will force-stop a job 2.5 seconds after it is submitted, and will also limit its time in the running state to 1 second.

hooks

A list of functions to run when the Job state changes, of the form hooks = list(created = function (job) {...}, done = ~{...}). Each name should be one of created, submitted, queued, dispatched, starting, running, done, or '*'. A '*' hook runs every time the state changes, whereas the others run only when the Job enters the named state. Duplicate names are allowed.

reformat

The underlying call to callr::r_session$call() returns information on stdout, stderr, etc. When reformat=TRUE (the default), only the result of the expression is returned. Set reformat=FALSE to return the entire callr output, or reformat=function(job,output) to use a function of your own to post-process the output from callr.

cpus

How many CPU cores to reserve for this Job. The Queue uses this number to limit the number of simultaneously running Jobs; it does not prevent a Job from using more CPUs than reserved.

stop_id

If an existing Job in the Queue has the same stop_id, that Job will be stopped and return an 'interrupt' condition object as its result. stop_id can also be a function (job) that returns the stop_id to assign to a given Job. A stop_id of NULL disables this feature.

copy_id

If an existing Job in the Queue has the same copy_id, the newly submitted Job will become a "proxy" for that earlier Job, returning whatever result the earlier Job returns. copy_id can also be a function (job) that returns the copy_id to assign to a given Job. A copy_id of NULL disables this feature.

start

Should a Job be submitted to the Queue (start = TRUE) or just created (start = FALSE)?

Returns

The new Job object.
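The scan rule can be approximated with base R's all.vars(). scan_vars() below is hypothetical and simpler than the real scanner. For example, it ignores functions called in expr. It keeps the user's vars as they are, skips names listed in ignore or already defined on the Worker, and copies the remaining names from a local environment when they exist there.

```r
# Hypothetical approximation of `scan`; not jobqueue's implementation.
scan_vars <- function(expr, vars = list(), ignore = NULL,
                      worker_globals = character(), envir = parent.frame()) {
  # Free variables in expr not covered by the user, `ignore`, or the Worker
  needed <- setdiff(all.vars(expr), c(names(vars), ignore, worker_globals))
  # Only copy names that actually exist in the local environment
  found <- needed[vapply(needed, exists, logical(1), envir = envir, inherits = FALSE)]
  c(vars, mget(found, envir = envir))   # user-supplied vars stay untouched
}

n <- 5
offset <- 2
scan_vars(quote(seq_len(n) + offset + shared),
          vars = list(offset = 100),    # user value wins over local `offset`
          worker_globals = "shared")    # already defined on the Worker
# returns list(offset = 100, n = 5)
```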
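The tmax example in the argument description can be checked with a small hypothetical function, over_tmax(). It assumes a record of the seconds spent in each state since submission. It treats the sum of those times as the 'total' time and compares each named state against its own limit.

```r
# Hypothetical timeout check; not jobqueue's implementation.
# `elapsed`: seconds spent in each state since submission, named by state.
over_tmax <- function(tmax, elapsed) {
  state_limits <- tmax[names(tmax) != "total"]
  hit_total <- "total" %in% names(tmax) && sum(elapsed) > tmax[["total"]]
  # States the job has not reached yet give NA and are ignored
  hit_state <- any(elapsed[names(state_limits)] > state_limits, na.rm = TRUE)
  hit_total || hit_state
}

tmax <- c(total = 2.5, running = 1)
over_tmax(tmax, c(submitted = 0.2, queued = 1.0, running = 0.8))  # FALSE
over_tmax(tmax, c(submitted = 0.1, queued = 0.1, running = 1.2))  # TRUE: running > 1
over_tmax(tmax, c(submitted = 0.5, queued = 2.1))                 # TRUE: 2.6 s > 2.5
```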
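Which hooks fire on a state change can be modeled by filtering the hooks list by name. hooks_for() is hypothetical. It preserves list order, which may differ from jobqueue's actual calling order, and it does not model formula hooks such as ~{...}.

```r
# Hypothetical hook selection; not jobqueue's implementation.
hooks_for <- function(hooks, state) {
  hooks[names(hooks) %in% c(state, "*")]   # '*' matches every state change
}

hooks <- list(
  "*"     = function(job) "any change",
  running = function(job) "started",
  done    = function(job) "done (first)",
  done    = function(job) "done (second)"  # duplicate names are allowed
)

names(hooks_for(hooks, "done"))     # "*" "done" "done"
names(hooks_for(hooks, "queued"))   # "*"
vapply(hooks_for(hooks, "done"), function(f) f(job = NULL), character(1))
```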
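A custom reformat function receives the Job and the raw callr output. The sketch below is hypothetical. It assumes the output is a list with result, stdout, stderr, and error fields (see the callr::r_session documentation for the exact structure), and it keeps the captured stdout alongside the result.

```r
# Hypothetical post-processor with the documented function(job, output) signature.
keep_stdout <- function(job, output) {
  if (!is.null(output$error)) return(output$error)   # pass errors through unchanged
  list(value = output$result, log = output$stdout)
}

# Mock of a callr output list; the field names are an assumption.
output <- list(result = 42, stdout = "fitting...\n", stderr = "", error = NULL)
keep_stdout(job = NULL, output)   # returns list(value = 42, log = "fitting...\n")
```

On a real Queue q, such a function would be supplied as q$run(expr, reformat = keep_stdout).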
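The effect of cpus and max_cpus on dispatch can be sketched as a budget check. dispatch() is hypothetical. It assumes strict submission order, so a Job that does not fit blocks the Jobs behind it rather than being skipped. The reservation is bookkeeping only; a running Job can still use more cores than it reserved.

```r
# Hypothetical CPU-reservation check; not jobqueue's implementation.
# Returns the positions of queued Jobs that may start now.
dispatch <- function(queued_cpus, running_cpus, max_cpus) {
  budget  <- max_cpus - sum(running_cpus)     # cores not yet reserved
  started <- integer()
  for (i in seq_along(queued_cpus)) {
    if (queued_cpus[i] > budget) break        # keep submission order: no skipping ahead
    budget  <- budget - queued_cpus[i]
    started <- c(started, i)
  }
  started
}

dispatch(queued_cpus = c(2, 4, 1), running_cpus = 1, max_cpus = 6)         # 1
dispatch(queued_cpus = c(1, 1, 1), running_cpus = numeric(), max_cpus = 2) # 1 2
```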
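The stop_id rule can be modeled by representing Jobs as environments, so that stopping an earlier Job is visible through any reference to it. new_job() and submit_job() are hypothetical. The result they assign is a base R condition of class 'interrupt', which stands in for whatever object jobqueue actually returns.

```r
# Hypothetical model of stop_id; not jobqueue's implementation.
new_job <- function(id, stop_id = NULL) {
  job <- new.env()                 # environments are mutable, like R6 objects
  job$id <- id
  job$stop_id <- stop_id
  job
}

submit_job <- function(active, job) {
  for (old in active) {
    if (!is.null(job$stop_id) && identical(old$stop_id, job$stop_id)) {
      # The earlier Job is stopped and gets an interrupt condition as its result
      old$result <- structure(
        list(message = paste("stopped by", job$id), call = NULL),
        class = c("interrupt", "condition")
      )
    }
  }
  # Keep only unfinished Jobs, then add the new one
  c(Filter(function(j) is.null(j$result), active), list(job))
}

a <- new_job("J1", stop_id = "plot")
b <- new_job("J2", stop_id = "plot")
active <- submit_job(list(), a)
active <- submit_job(active, b)
inherits(a$result, "interrupt")          # TRUE
vapply(active, function(j) j$id, "")     # "J2"
```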
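Proxying via copy_id can be sketched the same way. make_job(), link_copy(), and job_result() are hypothetical. A proxy Job never computes anything itself. Asking for its result follows the link to the earlier Job that shares its copy_id.

```r
# Hypothetical model of copy_id; not jobqueue's implementation.
make_job <- function(id, copy_id = NULL) {
  job <- new.env()
  job$id <- id
  job$copy_id <- copy_id
  job
}

# If an active Job shares the copy_id, mark the new Job as its proxy.
link_copy <- function(active, job) {
  for (old in active) {
    if (!is.null(job$copy_id) && identical(old$copy_id, job$copy_id)) {
      job$proxy_for <- old
      break
    }
  }
  job
}

# A proxy reports the earlier Job's result instead of its own.
job_result <- function(job) {
  if (is.null(job$proxy_for)) job$result else job_result(job$proxy_for)
}

first  <- make_job("J1", copy_id = "fit-v1")
second <- link_copy(list(first), make_job("J2", copy_id = "fit-v1"))
first$result <- 42       # the earlier Job finishes
job_result(second)       # 42
```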


Method submit()

Adds a Job to the Queue for running on a background process.

Usage

Queue$submit(job)

Arguments

job

A Job object, as created by Job$new().

Returns

This Queue, invisibly.


Method shutdown()

Stop all jobs and workers.

Usage

Queue$shutdown(reason = "job queue shut down by user")

Arguments

reason

Passed to <Job>$stop(reason) for any Jobs currently managed by this Queue.

Returns

This Queue, invisibly.