Introduction
You can save compute resources by stopping Jobs that are no longer needed. If a stopped Job is currently running on a Worker, then the associated background process is terminated. Terminated background processes are automatically replaced by new ones.
There are several ways to stop a Job:
- <Job>$stop(): return an interrupt condition.
- <Job>$output<-: return a custom result.
- <Job>$stop_id: replace redundant Jobs.
- <Job>$copy_id: combine redundant Jobs.
- <Job>$timeout: cap a Job’s compute time.
Immediate Stop
As soon as a Job is created, it can be stopped either by calling <Job>$stop() or by assigning to <Job>$output. Both methods work up until the Job state is 'done'. Once the Job state is 'done', <Job>$output is immutable and calling <Job>$stop() has no effect (the Job is already stopped). Calling <Job>$stop() and assigning to <Job>$output both have the side effect of setting the Job state to 'done'.
Interrupting with <Job>$stop()
job <- Job$new({ Sys.sleep(5); 10 })
job$stop()
job$result
#> <interrupt: job stopped by user>
job <- Job$new({ Sys.sleep(5); 10 })
job$stop('my reason')
job$result
#> <interrupt: my reason>
class(job$result)
#> [1] "interrupt" "condition"
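Because a stopped Job returns a condition object rather than a value, downstream code usually needs to check for it before using the result. The sketch below shows one way to do that in base R. The `stopped` object is built by hand as a stand-in for `job$result` (it only mimics the classes shown above and assumes the reason is stored in the standard `message` field), and `result_or()` is a hypothetical helper, not part of the package.

```r
# Stand-in for a stopped Job's result: a condition with the same classes
# as the output above. Built by hand for illustration, not by the package.
stopped <- structure(
  class = c("interrupt", "condition"),
  list(message = "my reason", call = NULL)
)

# Hypothetical helper: return the result, or a fallback if the Job was stopped.
result_or <- function(result, fallback) {
  if (inherits(result, "interrupt")) fallback else result
}

result_or(stopped, NA)     # stopped Job: the fallback is returned
result_or(10, NA)          # finished Job: the value passes through
conditionMessage(stopped)  # the reason given when stopping
```

Checking with `inherits()` rather than comparing printed text keeps the test independent of the reason string.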
You can call <Job>$stop() from within callback hooks.
Assigning to <Job>$output
<Job>$output can only be assigned to once. If the Job is not yet done when <Job>$output is assigned, then any ongoing processing will be halted.
job <- Job$new({ Sys.sleep(5); 10 })
job$state
#> [1] "created"
job$output <- "Custom result"
job$state
#> [1] "done"
job$result
#> [1] "Custom result"
If you’ve set <Job>$reformat, it will determine <Job>$result as usual. See vignette('results').
Replacing and Combining
Sometimes duplicate jobs are sent to the Queue, for instance when a user clicks a ‘submit’ button repeatedly. stop_id and copy_id can prevent such requests from overloading the server.
Stop ID
New jobs will replace existing jobs with the same stop_id.
job1 <- q$run({ Sys.sleep(5); 'A' }, stop_id = 123)
job2 <- q$run({ 'B' }, stop_id = 123)
job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] "B"
Copy ID
New jobs will mirror the output of existing jobs with the same copy_id.
job1 <- q$run({ Sys.sleep(5); 'A' }, copy_id = 456)
job2 <- q$run({ 'B' }, copy_id = 456)
job1$result
#> [1] "A"
job2$result
#> [1] "A"
In Queue$new()
If you set stop_id and/or copy_id in Queue$new(), then they must be functions (or lambda notation functions) that accept a Job object as input and return a value id. If id is NULL, then no comparisons will be done. Otherwise, id can be any data type and will be compared to other ids with identical().
q <- Queue$new(stop_id = function (job) list(job$vars$f, job$user_id))
job1 <- q$run({ f(x) }, vars = list(f = sum, x = 1:10), user_id = 'A')
job2 <- q$run({ f(x) }, vars = list(f = sum, x = 1:2), user_id = 'A')
job3 <- q$run({ f(x) }, vars = list(f = mean, x = 1:10), user_id = 'A')
job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] 3
job3$result
#> [1] 5.5
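Since ids are compared with identical(), it can help to see which ids actually match. The sketch below applies the same id function to plain lists that stand in for Job objects (an assumption for illustration; real Jobs are not plain lists, and the names `job_a`, `job_b`, and `job_c` are hypothetical). It also shows that identical() is strict about type.

```r
# Same id function as passed to Queue$new() above.
stop_id <- function(job) list(job$vars$f, job$user_id)

# Plain lists standing in for Job objects (illustration only).
job_a <- list(vars = list(f = sum,  x = 1:10), user_id = 'A')
job_b <- list(vars = list(f = sum,  x = 1:2),  user_id = 'A')
job_c <- list(vars = list(f = mean, x = 1:10), user_id = 'A')

# Same function and user: the ids match, so the newer job replaces the older.
identical(stop_id(job_a), stop_id(job_b))
#> [1] TRUE

# Different function: the ids differ, so both jobs run.
identical(stop_id(job_a), stop_id(job_c))
#> [1] FALSE

# identical() does not coerce: a double and an integer are different ids.
identical(123, 123L)
#> [1] FALSE
```

If ids come from mixed sources, such as a number parsed from a request and an integer from a database, converting them to a single type inside the id function avoids accidental mismatches.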
Timeouts
job <- Job$new({ Sys.sleep(5); 'Zzzzz' }, timeout = 0.2)
q$submit(job)
job$result
#> <interrupt: total runtime exceeded 0.2 seconds>
job <- Job$new({ 10 }, timeout = list(created = 0.2))
job$result
#> <interrupt: exceeded 0.2 seconds while in "created" state>
Limits (in seconds) can be set on:
- The total 'submitted' to 'done' time: timeout = 2
- A per-state basis: timeout = list(queued = 1, running = 2)
- Or both: timeout = list(total = 3, queued = 2, running = 2)