Reacting to Results
my_job <- q$run({ 42 })
# === A 'jobqueue'-style callback ============
my_job$on('done', ~message('Result = ', .$result))
#> Result = 42
# === A 'promises'-style callback ============
my_job %...>% message('Result = ', .)
#> Result = 42
See also vignette('hooks')
Output vs Result
When expr is finished evaluating, the result is assigned to <Job>$output.
job <- q$run({ 42 })
job$output
#> [1] 42
By default, <Job>$result will return the exact same value as <Job>$output.
job$result
#> [1] 42
However, while <Job>$output is a fixed value, <Job>$result is highly configurable, most notably through the reformat and signal parameters.
Reformatting Output
Use reformat = function (job) {...} to define what should be returned by <Job>$result.
Important
You may access job$output in your reformat function. DO NOT access job$result, which would start an infinite recursion.
reformat can be set in several places:
Queue$new(reformat = http_wrap)
Job$new(reformat = http_wrap)
<Queue>$run(reformat = http_wrap)
<Job>$reformat <- http_wrap
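The http_wrap function used above is not defined in this section. As a minimal sketch of what such a reformat function might look like, the version below is hypothetical: the status and body field names are illustrative, and it assumes that a failed job's output is a condition object. A plain list stands in for a Job so the sketch runs without a queue.

```r
# Hypothetical reformat function: wrap the job's output in a
# response-like list. It reads job$output only, never job$result.
http_wrap <- function (job) {
  output <- job$output
  if (inherits(output, 'condition')) {
    # Assumption: a failed job stores its condition in `output`.
    list(status = 500L, body = conditionMessage(output))
  } else {
    list(status = 200L, body = output)
  }
}

# Plain lists stand in for Job objects here.
mock_ok  <- list(output = 42)
mock_err <- list(output = simpleError('error XYZ'))

ok  <- http_wrap(mock_ok)
err <- http_wrap(mock_err)

ok$status
#> [1] 200
err$body
#> [1] "error XYZ"
```

Because the function only touches job$output, it avoids the recursion described in the note above.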
Non-blocking
<Job>$output blocks until the Job is done. If you want <Job>$result to not block, you can return a placeholder value instead.
reformat <- function (job) {
  # Only read job$output once the Job is done; otherwise it would block.
  if (job$is_done) { job$output }
  else             { 'result not available yet' }
}
job <- q$run({ Sys.sleep(5); 42 }, reformat = reformat)
job$result
#> [1] "result not available yet"
job$state
#> [1] "running"
Signaling Errors
By default, errors and interrupts will be caught by jobqueue and returned as a condition object from <Job>$result.
expr <- quote(stop('error XYZ'))
job <- q$run(expr)
x <- job$result
inherits(x, 'condition')
#> [1] TRUE
x
#> Error in `q$run(expr)`:
#> ! Error evaluating <Job>$expr on background process
#> Caused by error in `stop("error XYZ")`:
#> ! error XYZ
#> ---
#> Subprocess backtrace:
#> 1. base::eval(expr = expr, envir = vars, enclos = .GlobalEnv)
#> 2. base::stop("error XYZ")
#> 3. | base::.handleSimpleError(function (e) …
#> 4. global h(simpleError(msg, call))
If you want those errors to continue propagating, set signal = TRUE.
job <- q$run(expr, signal = TRUE)
x <- job$result
#> Error in "q$run(expr, signal = TRUE)" :
#> ! Error evaluating <Job>$expr on background process
#> Caused by error in `stop("error XYZ")`:
#> ! error XYZ
These signals can be caught as usual by try() and tryCatch().
print_cnd <- function (cnd) cli::cli_text('Caught {.type {cnd}}.')
job <- q$run({ Sys.sleep(5) }, signal = TRUE)$stop()
res <- tryCatch(job$result, error = print_cnd, interrupt = print_cnd)
#> Caught an <interrupt> object.
res
#> NULL
job <- q$run({ stop('error XYZ') }, signal = TRUE)
res <- tryCatch(job$result, error = print_cnd, interrupt = print_cnd)
#> Caught an <error> object.
res
#> NULL
You can also signal some conditions while catching others. Perhaps stopping a job is part of the “normal” workflow and shouldn’t be lumped in with other errors.
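The idea of signaling some condition classes while returning others as values can be sketched in base R. This is not jobqueue's implementation: result_with_signal() is a hypothetical helper, and the two condition objects are hand-built stand-ins for what a stopped job and a failed job might produce.

```r
# Hypothetical helper: return `output` unchanged unless it is a condition
# whose class is listed in `signal`, in which case re-raise it.
result_with_signal <- function (output, signal = character()) {
  if (inherits(output, 'condition') && inherits(output, signal))
    stop(output)
  output
}

# Stand-ins for the conditions a stopped job and a failed job might yield.
interrupt_cnd <- structure(
  list(message = 'job stopped', call = NULL),
  class = c('interrupt', 'condition'))
error_cnd <- simpleError('error XYZ')

# Interrupts are returned as ordinary values ...
res1 <- result_with_signal(interrupt_cnd, signal = 'error')
class(res1)
#> [1] "interrupt" "condition"

# ... while errors still propagate and can be handled by tryCatch().
res2 <- tryCatch(
  result_with_signal(error_cnd, signal = 'error'),
  error = function (e) paste('Caught:', conditionMessage(e)))
res2
#> [1] "Caught: error XYZ"
```

With this split, a deliberately stopped job can be inspected like any other return value, and only genuine errors need a handler.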
Promise Handling
You can pass a Job directly to any function that expects a ‘promise’ object, such as promises::then or promises::%...>%. These functions are re-exported by jobqueue, so calling library(promises) is optional.
q$run({ 42 }) %...>% message
#> 42
By default, errors and interrupts are caught by jobqueue and used for promise fulfillment.
onFulfilled <- function (value) cli::cli_text('Fulfilled with {.type {value}}.')
onRejected <- function (err) cli::cli_text('Rejected with {.type {err}}.')
job <- q$run({ Sys.sleep(5) })$stop()
job %>% then(onFulfilled, onRejected)
#> Fulfilled with an <interrupt> object.
job <- q$run({ stop('error XYZ') })
job %>% then(onFulfilled, onRejected)
#> Fulfilled with an <error> object.
Setting signal = TRUE will cause errors and interrupts to be sent to the promise’s rejection handler instead.