Overview
Interrupts in webqueue
fall into three categories:
- Timeout - set before the request starts processing.
- Replacement - a later request replaces an earlier one.
-
Custom - your code calls
<job>$stop()
.
Requests vs Jobs
A request (req
) is an environment with the data sent
from a web browser to the web server. A job
is a jobqueue::job_class
R6 object containing a req
(<job>$req
) and the parameters for processing it.
Some useful elements of each are:
|
|
Interrupts are always defined at the job
level.
Setup
library(webqueue)
# By default, returns "hello" immediately.
# Adding '?return=myvalue' will return "myvalue" instead.
# Adding '?sleep=2' will wait 2 seconds before returning.
handler <- function (req) {
if (!is.null(req$ARGS$sleep)) Sys.sleep(req$ARGS$sleep)
if (!is.null(req$ARGS$return)) req$ARGS$return else 'hello'
}
To interrupt a job
from within handler
, call stop()
,
rlang::abort()
, or cli::cli_abort()
as usual,
or return (webqueue::response())
to send a more informative
message back.
Timeout
Basic
Let’s start by limiting all job
s
to 1 second.
Per State
Setting timeout = 1
is shorthand for
timeout = c(total = 1)
, which starts the clock as soon as
the job
is created. If these job
s
are likely to wait a long time before running, you might consider
setting timeout = c(running = 1)
which starts the clock
when the job
actually begins running. Or, set
timeout = c(running = 1, queued = 60)
to also set a limit
to how long a job
can spend waiting in the queue.
See the jobqueue::job_class
reference page for information on all standard job
states.
Replacement
Use the stop_id
field to only run the most recent
request with a given hash.
For instance, if you have a session id (id
) you can use
that as the request hash.
wq <- webqueue(
handler = handler,
stop_id = function (job) { job$req$ARGS$id } )
# Fetch three URLs at the same time.
responses <- fetch(
first = 'http://localhost:8080?id=1&sleep=1&return=first',
second = 'http://localhost:8080?id=1&sleep=1&return=second',
third = 'http://localhost:8080?id=1&sleep=1&return=third' ))
responses$first
#> [1] "superseded: duplicated stop_id\n"
responses$second
#> [1] "superseded: duplicated stop_id\n"
responses$third
#> [1] "third"
wq$stop()
Custom
Anywhere you provide a function (job)
, you can examine
job
and req
(<job>$req
), and call
job$stop()
as needed.
IP Filter
To ignore requests from certain IP addresses:
ip_check <- function (job) {
ip <- job$req$REMOTE_ADDR
if (!startsWith(ip, '192.168.'))
job$stop(paste('Unauthorized IP Address:', ip))
}
wq <- webqueue(handler, hooks = list(created = ip_check))
fetch('http://localhost:8080')
#> [1] "interrupt: Unauthorized IP Address: 127.0.0.1\n"
wq$stop()
Note: in reality, you’d want to use webqueue(onHeaders)
to do this particular task more efficiently.
Limit Concurrency
Once the job is assigned to a jobqueue
,
you can access the list of all job
s
that are currently queued or running.
Here, we’ll refuse to add more than 2 job
s
to the jobqueue
at once.
qlimit <- function (job) {
if (length(job$queue$jobs) > 2)
job$stop('Too many concurrent jobs.')
}
wq <- webqueue(handler, hooks = list(queued = qlimit))
# Fetch three URLs at the same time.
dput(fetch(
'http://localhost:8080?sleep=1',
'http://localhost:8080?sleep=1',
'http://localhost:8080?sleep=1' ))
#> c("hello", "hello", "interrupt: Too many concurrent jobs.\n")
wq$stop()
Stop Other Jobs
Suppose an admin needs to stop all job
s
for a particular user.
stop_user <- function (job) {
stop_uid <- job$req$ARGS$stop
if (!is.null(stop_uid)) {
for (j in job$queue$jobs)
if (j$req$ARGS$uid == stop_uid)
j$stop('Stopped by admin.')
job$output <- 'done'
}
}
wq <- webqueue(handler, hooks = list(queued = stop_user))
# Fetch three URLs at the same time.
responses <- fetch(
'http://localhost:8080?uid=1&sleep=10',
'http://localhost:8080?uid=1&sleep=10',
'http://localhost:8080?uid=2&stop=1' )
responses[[1]]
#> [1] "interrupt: Stopped by admin.\n"
responses[[2]]
#> [1] "interrupt: Stopped by admin.\n"
responses[[3]]
#> [1] "done"
wq$stop()