Skip to contents
library(jobqueue)
q <- Queue$new()

Reacting to Results

my_job <- q$run({ 42 })

# ===  A 'jobqueue'-style callback  ============
my_job$on('done', ~message('Result = ', .$result))
#> Result = 42

# ===  A 'promises'-style callback  ============
my_job %...>% message('Result = ', .)
#> Result = 42

See also vignette('hooks')

Output vs Result

When expr is finished evaluating, the result is assigned to <Job>$output.

job <- q$run({ 42 })

job$output
#> [1] 42

By default, <Job>$result will return the exact same value as <Job>$output.

job$result
#> [1] 42

However, while <Job>$output is a fixed value, <Job>$result is highly configurable. Most notably by the reformat and signal parameters.

Reformatting Output

Use reformat = function (job) {...} to define what should be returned by <Job>$result.

Important

You may access job$output in your reformat function.
DO NOT access job$result, which would start an infinite recusion.

reformat can be set in several places:

  • Queue$new(reformat = http_wrap)
  • Job$new(reformat = http_wrap)
  • <Queue>$run(reformat = http_wrap)
  • <Job>$reformat <- http_wrap

Transformation

http_wrap <- function (job) {
  result <- job$output
  paste0('{"status":200,"body":{"result":', result, '}}')
}

job <- q$run({ 42 }, reformat = http_wrap)
cat(job$result)
#> {"status":200,"body":{"result":42}}

Job Details

with_id <- function (job) {
  list(result = job$output, id = job$id)
}

job <- q$run({ 42 }, reformat = with_id, id = 'abc')
dput(job$result)
#> list(result = 42, id = "abc")

Non-blocking

<Job>$output blocks until the Job is done. If you want <Job>$result to not block, you can return a placeholder value instead.

reformat <- function (job) {
  if (job$is_done) { job$output$result          }
  else             { 'result not available yet' }
}

job <- q$run({ Sys.sleep(5); 42 }, reformat = reformat)
job$result
#> [1] "result not available yet"

job$state
#> [1] "running"

Signaling Errors

By default, errors and interrupts will be caught by jobqueue and returned as a condition object from <Job>$result.

expr <- quote(stop('error XYZ'))

job <- q$run(expr)
x   <- job$result

inherits(x, 'condition')
#> TRUE

x
#> Error in `q$run(expr)`:
#> ! Error evaluating <Job>$expr on background process
#> Caused by error in `stop("error XYZ")`:
#> ! error XYZ
#> ---
#> Subprocess backtrace:
#> 1. base::eval(expr = expr, envir = vars, enclos = .GlobalEnv)
#> 2. base::stop("error XYZ")
#> 3. | base::.handleSimpleError(function (e) …
#> 4. global h(simpleError(msg, call))

If you want those errors to continue propagating, set signal = TRUE.

job <- q$run(expr, signal = TRUE)
x   <- job$result
#> Error in "q$run(expr, signal = TRUE)" : 
#>    ! Error evaluating <Job>$expr on background process
#>  Caused by error in `stop("error XYZ")`:
#>  ! error XYZ

These signals can be caught as usual by try() and tryCatch().

print_cnd <- function (cnd) cli::cli_text('Caught {.type {cnd}}.')

job <- q$run({ Sys.sleep(5) }, signal = TRUE)$stop()
res <- tryCatch(job$result, error = print_cnd, interrupt = print_cnd)
#> Caught an <interrupt> object.

res
#> NULL

job <- q$run({ stop('error XYZ') }, signal = TRUE)
res <- tryCatch(job$result, error = print_cnd, interrupt = print_cnd)
#> Caught an <error> object.

res
#> NULL

You can also signal some conditions while catching others. Perhaps stopping a job is part of the “normal” workflow and shouldn’t be lumped in with other errors.

job <- q$run({ Sys.sleep(5) }, signal = 'error')$stop()
res <- tryCatch(job$result, error = print_cnd)

res
#> <interrupt: job stopped by user>

job <- q$run({ stop('error XYZ') }, signal = 'error')
res <- tryCatch(job$result, error = print_cnd)
#> Caught an <error> object.

res
#> NULL

Promise Handling

You can pass a Job directly to any function that expects a ‘promise’ object, such as promises::then or promises::%...>%. These functions are re-exported by jobqueue, so calling library(promises) is optional.

q$run({ 42 }) %...>% message
#> 42

By default, errors and interrupts are caught by jobqueue and used for promise fulfillment.

onFulfilled <- function (value) cli::cli_text('Fulfilled with {.type {value}}.')
onRejected  <- function (err)   cli::cli_text('Rejected with {.type {err}}.')

job <- q$run({ Sys.sleep(5) })$stop()
job %>% then(onFulfilled, onRejected)
#> Fulfilled with an <interrupt> object.

job <- q$run({ stop('error XYZ') })
job %>% then(onFulfilled, onRejected)
#> Fulfilled with an <error> object.

Setting signal = TRUE will cause errors and interrupts be sent to the promise’s rejection handler instead.

job <- q$run({ Sys.sleep(5) }, signal = TRUE)$stop()
job %>% then(onFulfilled, onRejected)
#> Rejected with an <interrupt> object.

job <- q$run({ stop('error XYZ') }, signal = TRUE)
job %>% then(onFulfilled, onRejected)
#> Rejected with an <error> object.