Jobs go in. Results come out.
Active bindings
hooks
A named list of currently registered callback hooks.
jobs
Get or set - List of Jobs currently managed by this Queue.
state
The Queue's state:
'starting'
,'idle'
,'busy'
,'stopped'
, or'error.'
uid
Get or set - Unique identifier, e.g.
'Q1'
.tmp
The Queue's temporary directory.
workers
Get or set - List of Workers used for processing Jobs.
Methods
Method new()
Creates a pool of background processes for handling $run()
and
$submit()
calls. These workers are initialized according to the
globals
, packages
, and init
arguments.
Usage
Queue$new(
globals = NULL,
packages = NULL,
init = NULL,
max_cpus = parallelly::availableCores(),
workers = ceiling(max_cpus * 1.2),
timeout = NULL,
hooks = NULL,
reformat = NULL,
signal = FALSE,
cpus = 1L,
stop_id = NULL,
copy_id = NULL
)
Arguments
globals
A named list of variables that all
<Job>$expr
s will have access to. Alternatively, an object that can be coerced to a named list withas.list()
, e.g. named vector, data.frame, or environment.packages
Character vector of package names to load on workers.
init
A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to variables defined by
globals
and assets frompackages
. Returned value is ignored.max_cpus
Total number of CPU cores that can be reserved by all running Jobs (
sum(<Job>$cpus)
). Does not enforce limits on actual CPU utilization.workers
How many background Worker processes to start. Set to more than
max_cpus
to enable standby Workers to quickly swap out with Workers that need to restart.timeout, hooks, reformat, signal, cpus, stop_id, copy_id
Defaults for this Queue's
$run()
method. Here only,stop_id
andcopy_id
must be either afunction (job)
orNULL
.hooks
can set queue, worker, and/or job hooks - see the "Attaching" section invignette('hooks')
.
Method print()
Print method for a Queue
.
Method run()
Creates a Job object and submits it to the queue for running.
Any NA
arguments will be replaced with their value from Queue$new()
.
Usage
Queue$run(
expr,
vars = list(),
timeout = NA,
hooks = NA,
reformat = NA,
signal = NA,
cpus = NA,
stop_id = NA,
copy_id = NA,
...
)
Arguments
expr
A call or R expression wrapped in curly braces to evaluate on a worker. Will have access to any variables defined by
vars
, as well as the Worker'sglobals
,packages
, andinit
configuration. Seevignette('eval')
.vars
A named list of variables to make available to
expr
during evaluation. Alternatively, an object that can be coerced to a named list withas.list()
, e.g. named vector, data.frame, or environment.timeout
A named numeric vector indicating the maximum number of seconds allowed for each state the job passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Example:
timeout = c(total = 2.5, running = 1)
. Seevignette('stops')
.hooks
A named list of functions to run when the Job state changes, of the form
hooks = list(created = function (worker) {...})
. Names of worker hooks are typically'created'
,'submitted'
,'queued'
,'dispatched'
,'starting'
,'running'
,'done'
, or'*'
(duplicates okay). Seevignette('hooks')
.reformat
Set
reformat = function (job)
to define what<Job>$result
should return. The default,reformat = NULL
passes<Job>$output
to<Job>$result
unchanged. Seevignette('results')
.signal
Should calling
<Job>$result
signal on condition objects? WhenFALSE
,<Job>$result
will return the object without taking additional action. Setting toTRUE
or a character vector of condition classes, e.g.c('interrupt', 'error', 'warning')
, will cause the equivalent ofstop(<condition>)
to be called when those conditions are produced. Seevignette('results')
.cpus
How many CPU cores to reserve for this Job. Used to limit the number of Jobs running simultaneously to respect
<Queue>$max_cpus
. Does not prevent a Job from using more CPUs than reserved.stop_id
If an existing Job in the Queue has the same
stop_id
, that Job will be stopped and return an 'interrupt' condition object as its result.stop_id
can also be afunction (job)
that returns thestop_id
to assign to a given Job. Astop_id
ofNULL
disables this feature. Seevignette('stops')
.copy_id
If an existing Job in the Queue has the same
copy_id
, the newly submitted Job will become a "proxy" for that earlier Job, returning whatever result the earlier Job returns.copy_id
can also be afunction (job)
that returns thecopy_id
to assign to a given Job. Acopy_id
ofNULL
disables this feature. Seevignette('stops')
....
Arbitrary named values to add to the returned Job object.
Returns
The new Job object.
Method submit()
Adds a Job to the Queue for running on a background process.
Arguments
job
A Job object, as created by
Job$new()
.
Method wait()
Blocks until the Queue enters the given state.
Arguments
state
The name of a Queue state. Typically one of:
'*'
- Every time the state changes.'.next'
- Only one time, the next time the state changes.'starting'
- Workers are starting.'idle'
- All workers are ready/idle.'busy'
- At least one worker is busy.'stopped'
- Shutdown is complete.'error'
- Workers did not start cleanly.
Method on()
Attach a callback function to execute when the Queue enters state
.
Arguments
state
The name of a Queue state. Typically one of:
'*'
- Every time the state changes.'.next'
- Only one time, the next time the state changes.'starting'
- Workers are starting.'idle'
- All workers are ready/idle.'busy'
- At least one worker is busy.'stopped'
- Shutdown is complete.'error'
- Workers did not start cleanly.
func
A function that accepts a Queue object as input. Return value is ignored.
Method stop()
Stop all jobs and workers.