Jobs go in. Results come out.
Active bindings
hooks: A named list of currently registered callback hooks.
jobs: Get or set - List of Jobs currently managed by this Queue.
state: The Queue's state: 'starting', 'idle', 'busy', 'stopped', or 'error'.
uid: A short string, e.g. 'Q1', that uniquely identifies this Queue.
tmp: The Queue's temporary directory.
workers: Get or set - List of Workers used for processing Jobs.
cnd: The error that caused the Queue to stop.
Methods
Method new()
Creates a pool of background processes for handling $run() and
$submit() calls. These workers are initialized according to the
globals, packages, and init arguments.
Usage
  Queue$new(
    globals = NULL,
    packages = NULL,
    namespace = NULL,
    init = NULL,
    max_cpus = availableCores(),
    workers = ceiling(max_cpus * 1.2),
    timeout = NULL,
    hooks = NULL,
    reformat = NULL,
    signal = FALSE,
    cpus = 1L,
    stop_id = NULL,
    copy_id = NULL
  )
Arguments
globals: A named list of variables that all <Job>$exprs will have access to. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment.
packages: Character vector of package names to load on workers.
namespace: The name of a package to attach to the worker's environment.
init: A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to variables defined by globals and assets from packages and namespace. Returned value is ignored.
max_cpus: Total number of CPU cores that can be reserved by all running Jobs (sum(<Job>$cpus)). Does not enforce limits on actual CPU utilization.
workers: How many background Worker processes to start. Set to more than max_cpus to enable standby Workers to quickly swap out with Workers that need to restart.
timeout, hooks, reformat, signal, cpus, stop_id, copy_id: Defaults for this Queue's $run() method. Here only, stop_id and copy_id must be either a function (job) or NULL. hooks can set queue, worker, and/or job hooks - see the "Attaching" section in vignette('hooks').
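As an illustration of the constructor arguments above, the following is a minimal sketch, assuming the class is exported by a package named jobqueue (the package name and every argument value here are assumptions chosen for the example, not recommended settings).

```r
library(jobqueue)  # assumed package name providing the Queue class

# Illustrative configuration only: two reservable cores, plus one
# standby worker so a restarting worker can be swapped out quickly.
q <- Queue$new(
  globals  = list(scale = 10),  # variables visible to every Job expression
  max_cpus = 2L,                # cap on sum(<Job>$cpus) across running Jobs
  workers  = 3L                 # more than max_cpus, to allow a standby worker
)

print(q)   # uses the Queue's print() method

q$stop()   # shut down all workers when finished
```

Setting workers above max_cpus follows the standby-worker advice in the workers argument; whether that trade-off is worthwhile depends on how often workers need to restart.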
Method print()
Print method for a Queue.
Method run()
Creates a Job object and submits it to the queue for running.
Any NA arguments will be replaced with their value from Queue$new().
Usage
  Queue$run(
    expr,
    vars = list(),
    timeout = NA,
    hooks = NA,
    reformat = NA,
    signal = NA,
    cpus = NA,
    stop_id = NA,
    copy_id = NA,
    ...
  )
Arguments
expr: A call or R expression wrapped in curly braces to evaluate on a worker. Will have access to any variables defined by vars, as well as the Worker's globals, packages, and init configuration. See vignette('eval').
vars: A named list of variables to make available to expr during evaluation. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment. Or a function (job) that returns such an object.
timeout: A named numeric vector indicating the maximum number of seconds allowed for each state the job passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Can also limit the 'starting' state for Workers. A function (job) can be used in place of a number. Example: timeout = c(total = 2.5, running = 1). See vignette('stops').
hooks: A named list of functions to run when the Job state changes, of the form hooks = list(created = function (worker) {...}). Or a function (job) that returns the same. Names of worker hooks are typically 'created', 'submitted', 'queued', 'dispatched', 'starting', 'running', 'done', or '*' (duplicates okay). See vignette('hooks').
reformat: Set reformat = function (job) to define what <Job>$result should return. The default, reformat = NULL, passes <Job>$output to <Job>$result unchanged. See vignette('results').
signal: Should calling <Job>$result signal on condition objects? When FALSE, <Job>$result will return the object without taking additional action. Setting to TRUE or a character vector of condition classes, e.g. c('interrupt', 'error', 'warning'), will cause the equivalent of stop(<condition>) to be called when those conditions are produced. Alternatively, a function (job) that returns TRUE or FALSE. See vignette('results').
cpus: How many CPU cores to reserve for this Job. Or a function (job) that returns the same. Used to limit the number of Jobs running simultaneously to respect <Queue>$max_cpus. Does not prevent a Job from using more CPUs than reserved.
stop_id: If an existing Job in the Queue has the same stop_id, that Job will be stopped and return an 'interrupt' condition object as its result. stop_id can also be a function (job) that returns the stop_id to assign to a given Job. A stop_id of NULL disables this feature. See vignette('stops').
copy_id: If an existing Job in the Queue has the same copy_id, the newly submitted Job will become a "proxy" for that earlier Job, returning whatever result the earlier Job returns. copy_id can also be a function (job) that returns the copy_id to assign to a given Job. A copy_id of NULL disables this feature. See vignette('stops').
...: Arbitrary named values to add to the returned Job object.
Returns
The new Job object.
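A short sketch of $run() in use may help here. It assumes the package is named jobqueue and that reading <Job>$result waits for the Job to finish before returning; the variable x and the 30-second timeout are made up for the example.

```r
library(jobqueue)  # assumed package name providing the Queue class

q <- Queue$new(workers = 1L, max_cpus = 1L)

# `x` reaches the worker through `vars`; the timeout is illustrative.
job <- q$run(
  expr    = { x * 2 },
  vars    = list(x = 21),
  timeout = c(total = 30)   # single limit from 'submitted' to 'done'
)

# Assumption: accessing $result blocks until the Job is done.
result <- job$result
print(result)

q$stop()
```

Arguments left at NA (hooks, reformat, signal, cpus, stop_id, copy_id) fall back to the values given to Queue$new(), as described above.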
Method submit()
Adds a Job to the Queue for running on a background process.
Arguments
job: A Job object, as created by Job$new().
Method wait()
Blocks until the Queue enters the given state.
Arguments
state: The name of a Queue state. Typically one of:
  '*' - Every time the state changes.
  '.next' - Only one time, the next time the state changes.
  'starting' - Workers are starting.
  'idle' - All workers are ready/idle.
  'busy' - At least one worker is busy.
  'stopped' - Shutdown is complete.
timeout: Stop the Queue if it takes longer than this number of seconds, or NULL.
signal: Raise an error if encountered (will also be recorded in <Queue>$cnd).
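The following sketch shows one way $wait() might be used to block until the workers are ready. It assumes the package is named jobqueue; the 60-second timeout is an arbitrary illustrative value.

```r
library(jobqueue)  # assumed package name providing the Queue class

q <- Queue$new(workers = 1L, max_cpus = 1L)

# Block until all workers are ready. Per the timeout argument, the
# Queue is stopped if this takes longer than 60 seconds.
q$wait('idle', timeout = 60, signal = TRUE)

state_after_wait <- q$state   # record the state once wait() returns
print(state_after_wait)

q$stop()
```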
Method on()
Attach a callback function to execute when the Queue enters state.
Arguments
state: The name of a Queue state. Typically one of:
  '*' - Every time the state changes.
  '.next' - Only one time, the next time the state changes.
  'starting' - Workers are starting.
  'idle' - All workers are ready/idle.
  'busy' - At least one worker is busy.
  'stopped' - Shutdown is complete.
func: A function that accepts a Queue object as input. Return value is ignored.
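As a sketch of attaching a callback with $on(), assuming the package is named jobqueue: the message text is invented for the example, and exactly when the callback fires relative to $stop() returning is not specified here.

```r
library(jobqueue)  # assumed package name providing the Queue class

q <- Queue$new(workers = 1L, max_cpus = 1L)

# The callback receives the Queue object; its return value is ignored.
q$on('stopped', function (queue) {
  message('Queue ', queue$uid, ' has stopped')
})

registered <- q$hooks   # named list of currently registered hooks

q$stop()
```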
Method stop()
Stop all jobs and workers.