Overview
Interrupts in webqueue fall into three categories:
- Timeout - set before the request starts processing.
- Replacement - a later request replaces an earlier one.
-
Custom - your code calls a Job’s
$stop()
method.
Requests vs Jobs
A request (req
) is an environment with the data sent
from a web browser to the web server. A Job (job
) is a jobqueue::Job
R6 object containing a single req
along with the
parameters for processing it.
Some useful elements of each are:
|
|
Interrupts are always defined at the Job level.
Setup
library(webqueue)
library(RCurl)
handler <- function (req) {
args <- req$ARGS # POST/GET parameters
if (!is.null(args$s)) Sys.sleep(args$s) # sleep for `s` seconds
if (!is.null(args$x)) return (args$x) # return `x` if present
return ('hello') # default to 'hello'
}
To interrupt a job from within handler
, call
stop()
or cli_abort()
as usual, or
return (webqueue::response())
to send a more informative
message back.
Timeout
Per State
Setting timeout = 1
is shorthand for
timeout = c(total = 1)
, which starts the clock as soon as
the job is created. If these jobs are likely to wait a long time before
running, you might consider setting
timeout = c(running = 1)
which starts the clock when the
job actually begins running. Or, set
timeout = c(running = 1, queued = 60)
to also set a limit
to how long a job can spend waiting in the queue.
See the jobqueue::Job reference page for information on all standard Job states.
Replacement
Use the stop_id
field to only run the most recent
request with a given hash.
For instance, if you have a session id (sid
) you can use
that as the request hash.
stop_id <- function (job) { job$req$ARGS$sid }
wq <- WebQueue$new(handler, stop_id = stop_id)
# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=first') })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=second') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=third') })
r1$result
#> [1] "superseded: duplicated stop_id\n"
r2$result
#> [1] "superseded: duplicated stop_id\n"
r3$result
#> [1] "third"
jq$stop()
wq$stop()
Custom
Anywhere you provide a function (job)
, you can examine
the Job and request, and call job$stop()
as needed.
IP Filter
To ignore requests from certain IP addresses:
ip_check <- function (job) {
ip <- job$req$REMOTE_ADDR
if (!startsWith(ip, '192.168.'))
job$stop(paste('Unauthorized IP Address:', ip))
}
wq <- WebQueue$new(handler, hooks = list(created = ip_check))
getURL('http://localhost:8080')
#> [1] "interrupt: Unauthorized IP Address: 127.0.0.1\n"
wq$stop()
Note: in reality, you’d want to use
webqueue::WebQueue$new(onHeaders)
to do this particular
task more efficiently.
Queue Limit
Once the job is assigned to a Queue, you can access the list of all jobs that are currently queued or running.
Here, we’ll refuse to add more than 2 jobs to the queue at once.
qlimit <- function (job) {
if (length(job$queue$jobs) > 2)
job$stop('Queue is too full.')
}
wq <- WebQueue$new(handler, hooks = list(queued = qlimit))
# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })
r1$result
#> [1] "hello"
r2$result
#> [1] "hello"
r3$result
#> [1] "interrupt: Queue is too full.\n"
jq$stop()
wq$stop()
Stop Other Jobs
Suppose an admin needs to stop all jobs for a particular user.
stop_user <- function (job) {
stop_uid <- job$req$ARGS$stop
if (!is.null(stop_uid)) {
for (j in job$queue$jobs)
if (j$req$ARGS$u == stop_uid)
j$stop('Stopped by admin.')
job$output <- 'done'
}
}
wq <- WebQueue$new(handler, hooks = list(queued = stop_user))
# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?u=1&s=10') })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?u=1&s=10') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?u=2&stop=1') })
r1$result
#> [1] "interrupt: Stopped by admin.\n"
r2$result
#> [1] "interrupt: Stopped by admin.\n"
r3$result
#> [1] "done"
jq$stop()
wq$stop()