Submit your jobs to a Queue and it will run them on a background R process when one becomes available. Jobs are launched in the same order as they are submitted, but may finish in a different order based on each job's run time. Treat the returned Job object as a promise for asynchronous downstream processing of the job's result.
Starts R sessions that run in the background. Calls to run() evaluate R expressions on these background sessions. run() returns a Job object which can be passed to then() to schedule work to be done on the result when it is ready.
The Job object also has a $stop() method which can be called to make the job return a result immediately. If the job was actively running in a background R session, that process is killed and a new process is started to take its place.
Methods
Method new()
Creates as many background processes as the workers argument specifies, for handling $run() and $submit() calls. These workers are initialized according to the globals, packages, init, and options arguments. The Queue will not reserve more than max_cpus CPU cores at once, assuming the cpus argument is set appropriately for each Job.
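The interplay between max_cpus and each Job's cpus can be pictured with a small simulation. This is a toy model written for illustration, not the Queue's implementation: it assumes strictly first-in, first-out dispatch (a Job never starts before one submitted ahead of it) and known run times, and the helper name simulate_queue() is hypothetical.

```r
# Toy model of a CPU-budgeted FIFO queue (hypothetical helper, not the
# Queue's own code). cpus[i] and duration[i] describe the i-th submitted Job.
simulate_queue <- function(cpus, duration, max_cpus) {
  stopifnot(length(cpus) == length(duration), all(cpus <= max_cpus))
  n <- length(cpus)
  start <- finish <- numeric(n)
  for (i in seq_len(n)) {
    prev <- seq_len(i - 1)
    # Jobs launch in submission order: never before the previous Job started.
    t <- if (i == 1) 0 else start[i - 1]
    repeat {
      # Earlier Jobs still holding their CPU reservation at time t.
      running <- prev[finish[prev] > t]
      if (sum(cpus[running]) + cpus[i] <= max_cpus) break
      # Not enough free cores: wait for the next running Job to finish.
      t <- min(finish[running])
    }
    start[i] <- t
    finish[i] <- t + duration[i]
  }
  data.frame(job = seq_len(n), start = start, finish = finish)
}

simulate_queue(cpus = c(2, 1, 2, 1), duration = c(3, 1, 1, 1), max_cpus = 3)
# start:  0 0 3 3
# finish: 3 1 4 4
```

Job 2 finishes before Job 1 even though it was launched second, and Job 4 waits behind Job 3 even though one core is free at time 1; under the FIFO assumption, launch order is preserved while finish order is not.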
Arguments
globals
A list or similar set of values that are added to the .GlobalEnv of workers.
packages
Character vector of package names to load on workers.
init
A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to any variables defined by globals and objects from packages. Returned value is ignored.
max_cpus
Total number of CPU cores that can be reserved by all running Jobs via their combined cpus arguments. Does not enforce limits on actual CPU utilization.
workers
How many background Worker processes to start. Set to more than max_cpus to enable interrupted workers to be quickly swapped out with standby Workers while a replacement Worker boots up.
options
Passed to callr::r_session$new().
tmax, hooks, reformat, stop_id, copy_id
Defaults for this Queue's $run() method. Here, stop_id and copy_id must be a function (job) or NULL.
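The Queue-level arguments above can be prepared as ordinary R objects. The sketch below uses hypothetical names (scale_by, lookup, lookup_scaled) and shows the Queue$new() call only as a comment, since running it needs live background workers; it also assumes init is captured unevaluated, as its description suggests. As a rough local check, it evaluates the init expression in a throwaway environment seeded with the globals, which is only an approximation of what a freshly started worker sees.

```r
# Hypothetical globals: copied into each worker's .GlobalEnv.
queue_globals  <- list(scale_by = 10, lookup = c(a = 1, b = 2))
queue_packages <- c("stats")

# init as a call object; it may refer to anything defined in globals.
queue_init <- quote({ lookup_scaled <- lookup * scale_by })

# The real call needs live worker processes, so it is only shown here.
# workers = 5 keeps one standby worker beyond max_cpus = 4.
# q <- Queue$new(globals = queue_globals, packages = queue_packages,
#                init = { lookup_scaled <- lookup * scale_by },
#                max_cpus = 4, workers = 5)

# Local stand-in for a freshly started worker: globals first, then init.
worker_env <- list2env(queue_globals, envir = new.env(parent = baseenv()))
eval(queue_init, envir = worker_env)
worker_env$lookup_scaled
# a = 10, b = 20
```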
Method print()
Print method for a Queue.
Method run()
Creates a Job object and submits it to the queue for running on a background process. Arguments whose default is NA take the value set by Queue$new().
Usage
Queue$run(
  expr,
  vars = NULL,
  scan = is.null(vars),
  ignore = NULL,
  tmax = NA,
  hooks = NA,
  reformat = NA,
  cpus = 1L,
  stop_id = NA,
  copy_id = NA,
  start = TRUE
)
Arguments
expr
A call or R expression wrapped in curly braces to evaluate on a worker. Will have access to any variables defined by vars, as well as the Worker's globals, packages, and init configuration.
vars
A list of named variables to make available to expr during evaluation.
scan
Should additional variables be added to vars based on scanning expr for missing global variables? By default, scan = is.null(vars), meaning that if you set vars = list() then no scan is done. Set scan = TRUE to always scan, scan = FALSE to never scan, and scan = <an environment-like object> to look for globals there. When scanning, the worker's environment is taken into account, and globals on the worker are favored over local globals. Variables supplied by the user in vars are always left untouched.
ignore
A character vector of variable names that should NOT be added to vars by scan.
tmax
A named numeric vector giving the maximum number of seconds allowed for each state the job passes through; use the name 'total' to apply a single timeout from 'submitted' to 'done'. Example: tmax = c(total = 2.5, running = 1) will force-stop a job 2.5 seconds after it is submitted, and also limit its time in the running state to just 1 second.
hooks
A list of functions to run when the Job state changes, of the form hooks = list(created = function (job) {...}, done = ~{...}). The names of these functions should be created, submitted, queued, dispatched, starting, running, done, or '*'. The '*' hook runs every time the state changes, whereas the others run only when the Job enters that state. Duplicate names are allowed.
reformat
The underlying call to callr::r_session$call() returns information on stdout, stderr, etc. When reformat = TRUE (the default), only the result of the expression is returned. Set reformat = FALSE to return the entire callr output, or reformat = function(job, output) to post-process the callr output with a function of your own.
cpus
How many CPU cores to reserve for this Job. The Queue uses this number to limit the number of simultaneously running Jobs; it does not prevent a Job from using more CPUs than reserved.
stop_id
If an existing Job in the Queue has the same stop_id, that Job will be stopped and return an 'interrupt' condition object as its result. stop_id can also be a function (job) that returns the stop_id to assign to a given Job. A stop_id of NULL disables this feature.
copy_id
If an existing Job in the Queue has the same copy_id, the newly submitted Job will become a "proxy" for that earlier Job, returning whatever result the earlier Job returns. copy_id can also be a function (job) that returns the copy_id to assign to a given Job. A copy_id of NULL disables this feature.
start
Should a Job be submitted to the Queue (start = TRUE) or just created (start = FALSE)?
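To make vars, scan, ignore and tmax more concrete, here is a local approximation. scan_vars() is a hypothetical helper that mimics scanning with base R's all.vars(): it collects the free variable names in expr, skips names already present in vars or listed in ignore, and copies the remaining ones that exist in envir. The real scan also weighs the worker's own environment, which this sketch does not model, and all.vars() does not distinguish variables that expr assigns locally. check_tmax() is likewise hypothetical; it only checks that every name in tmax is 'total' or one of the Job states listed under hooks.

```r
# Approximates `scan`: free variables of expr that are not already in vars
# and not in ignore are looked up in envir and appended to vars.
scan_vars <- function(expr, vars = list(), ignore = NULL, envir = parent.frame()) {
  candidates <- setdiff(all.vars(expr), c(names(vars), ignore))
  present <- vapply(candidates, exists, logical(1),
                    envir = envir, inherits = FALSE)
  # User-supplied vars come first and are never overwritten.
  c(vars, mget(candidates[present], envir = envir))
}

# Job states as listed under the hooks argument.
job_states <- c("created", "submitted", "queued", "dispatched",
                "starting", "running", "done")

# TRUE when tmax is a named, positive numeric vector whose names are
# 'total' or Job states.
check_tmax <- function(tmax) {
  isTRUE(is.numeric(tmax) && !is.null(names(tmax)) &&
           all(names(tmax) %in% c("total", job_states)) && all(tmax > 0))
}

env <- new.env()
assign("x", 1:3, envir = env)
assign("offset", 100, envir = env)
scan_vars(quote(sum(x) * n + offset), vars = list(n = 5),
          ignore = "offset", envir = env)
# list(n = 5, x = 1:3)

check_tmax(c(total = 2.5, running = 1))  # TRUE
check_tmax(c(runing = 1))                # FALSE (misspelled state)
```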
Returns
The new Job object.
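Since several arguments of run() (hooks, reformat, stop_id, copy_id) receive the Job as their first argument, it can help to write these callbacks as named functions. The sketch below is illustrative only: the names run_hooks, keep_stdout, stop_by_user and copy_by_inputs are hypothetical, and so are the Job fields job$state and job$vars, the user variable, and the assumed layout of callr's output (a list with result, stdout and error elements). For that reason the checks use plain lists in place of real Jobs. The $run() call is shown as a comment, and fit() in it is a hypothetical function.

```r
# hooks: '*' fires on every state change, 'done' only when the Job finishes.
state_log <- character(0)
run_hooks <- list(
  "*"  = function(job) state_log <<- c(state_log, job$state),
  done = function(job) message("job finished")
)

# reformat = function(job, output): keep the result but attach stdout,
# or return the error object if the call failed (assumed output layout).
keep_stdout <- function(job, output) {
  if (!is.null(output$error)) return(output$error)
  structure(output$result, stdout = output$stdout)
}

# stop_id: a newer Job for the same user interrupts the older one.
# Returning NULL is assumed to mean "no stop_id" for this Job.
stop_by_user <- function(job) {
  if (is.null(job$vars$user)) NULL else paste0("user:", job$vars$user)
}

# copy_id: Jobs with identical vars share one computation (proxy Jobs).
copy_by_inputs <- function(job) paste(deparse(job$vars), collapse = "")

# q$run({ fit(n) }, vars = list(n = 10, user = "alice"),
#       hooks = run_hooks, reformat = keep_stdout,
#       stop_id = stop_by_user, copy_id = copy_by_inputs)
```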
Method submit()
Adds a Job to the Queue for running on a background process.
Arguments
job
A Job object, as created by Job$new().