Skip to contents

Jobs go in. Results come out.

Active bindings

hooks

A named list of currently registered callback hooks.

jobs

Get or set - List of Jobs currently managed by this Queue.

state

The Queue's state: 'starting', 'idle', 'busy', 'stopped', or 'error.'

uid

Get or set - Unique identifier, e.g. 'Q1'.

tmp

The Queue's temporary directory.

workers

Get or set - List of Workers used for processing Jobs.

Methods


Method new()

Creates a pool of background processes for handling $run() and $submit() calls. These workers are initialized according to the globals, packages, and init arguments.

Usage

Queue$new(
  globals = NULL,
  packages = NULL,
  init = NULL,
  max_cpus = parallelly::availableCores(),
  workers = ceiling(max_cpus * 1.2),
  timeout = NULL,
  hooks = NULL,
  reformat = NULL,
  signal = FALSE,
  cpus = 1L,
  stop_id = NULL,
  copy_id = NULL
)

Arguments

globals

A named list of variables that all <Job>$exprs will have access to. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment.

packages

Character vector of package names to load on workers.

init

A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to variables defined by globals and assets from packages. Returned value is ignored.

max_cpus

Total number of CPU cores that can be reserved by all running Jobs (sum(<Job>$cpus)). Does not enforce limits on actual CPU utilization.

workers

How many background Worker processes to start. Set to more than max_cpus to enable standby Workers to quickly swap out with Workers that need to restart.

timeout, hooks, reformat, signal, cpus, stop_id, copy_id

Defaults for this Queue's $run() method. Here only, stop_id and copy_id must be either a function (job) or NULL. hooks can set queue, worker, and/or job hooks - see the "Attaching" section in vignette('hooks').

Returns

A Queue object.


Method print()

Print method for a Queue.

Usage

Queue$print(...)

Arguments

...

Arguments are not used currently.


Method run()

Creates a Job object and submits it to the queue for running. Any NA arguments will be replaced with their value from Queue$new().

Usage

Queue$run(
  expr,
  vars = list(),
  timeout = NA,
  hooks = NA,
  reformat = NA,
  signal = NA,
  cpus = NA,
  stop_id = NA,
  copy_id = NA,
  ...
)

Arguments

expr

A call or R expression wrapped in curly braces to evaluate on a worker. Will have access to any variables defined by vars, as well as the Worker's globals, packages, and init configuration. See vignette('eval').

vars

A named list of variables to make available to expr during evaluation. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment.

timeout

A named numeric vector indicating the maximum number of seconds allowed for each state the job passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Example: timeout = c(total = 2.5, running = 1). See vignette('stops').

hooks

A named list of functions to run when the Job state changes, of the form hooks = list(created = function (worker) {...}). Names of worker hooks are typically 'created', 'submitted', 'queued', 'dispatched', 'starting', 'running', 'done', or '*' (duplicates okay). See vignette('hooks').

reformat

Set reformat = function (job) to define what <Job>$result should return. The default, reformat = NULL passes <Job>$output to <Job>$result unchanged. See vignette('results').

signal

Should calling <Job>$result signal on condition objects? When FALSE, <Job>$result will return the object without taking additional action. Setting to TRUE or a character vector of condition classes, e.g. c('interrupt', 'error', 'warning'), will cause the equivalent of stop(<condition>) to be called when those conditions are produced. See vignette('results').

cpus

How many CPU cores to reserve for this Job. Used to limit the number of Jobs running simultaneously to respect <Queue>$max_cpus. Does not prevent a Job from using more CPUs than reserved.

stop_id

If an existing Job in the Queue has the same stop_id, that Job will be stopped and return an 'interrupt' condition object as its result. stop_id can also be a function (job) that returns the stop_id to assign to a given Job. A stop_id of NULL disables this feature. See vignette('stops').

copy_id

If an existing Job in the Queue has the same copy_id, the newly submitted Job will become a "proxy" for that earlier Job, returning whatever result the earlier Job returns. copy_id can also be a function (job) that returns the copy_id to assign to a given Job. A copy_id of NULL disables this feature. See vignette('stops').

...

Arbitrary named values to add to the returned Job object.

Returns

The new Job object.


Method submit()

Adds a Job to the Queue for running on a background process.

Usage

Queue$submit(job)

Arguments

job

A Job object, as created by Job$new().

Returns

This Queue, invisibly.


Method wait()

Blocks until the Queue enters the given state.

Usage

Queue$wait(state = "idle")

Arguments

state

The name of a Queue state. Typically one of:

  • '*' - Every time the state changes.

  • '.next' - Only one time, the next time the state changes.

  • 'starting' - Workers are starting.

  • 'idle' - All workers are ready/idle.

  • 'busy' - At least one worker is busy.

  • 'stopped' - Shutdown is complete.

  • 'error' - Workers did not start cleanly.

Returns

This Queue, invisibly.


Method on()

Attach a callback function to execute when the Queue enters state.

Usage

Queue$on(state, func)

Arguments

state

The name of a Queue state. Typically one of:

  • '*' - Every time the state changes.

  • '.next' - Only one time, the next time the state changes.

  • 'starting' - Workers are starting.

  • 'idle' - All workers are ready/idle.

  • 'busy' - At least one worker is busy.

  • 'stopped' - Shutdown is complete.

  • 'error' - Workers did not start cleanly.

func

A function that accepts a Queue object as input. Return value is ignored.

Returns

A function that when called removes this callback from the Queue.


Method stop()

Stop all jobs and workers.

Usage

Queue$stop(reason = "job queue shut down by user", cls = NULL)

Arguments

reason

Passed to <Job>$stop() for any Jobs currently managed by this Queue.

cls

Passed to <Job>$stop() for any Jobs currently managed by this Queue.

Returns

This Queue, invisibly.