Skip to contents

Overview

Interrupts in webqueue fall into three categories:

  1. Timeout - set before the request starts processing.
  2. Replacement - a later request replaces an earlier one.
  3. Custom - your code calls <job>$stop().

Requests vs Jobs

A request (req) is an environment with the data sent from a web browser to the web server. A job is a jobqueue::job_class R6 object containing a req (<job>$req) and the parameters for processing it.

Some useful elements of each are:

  • req$ARGS
  • req$COOKIES
  • req$PATH_INFO
  • req$REMOTE_ADDR
  • job$result
  • job$timeout
  • job$on()
  • job$stop()

Interrupts are always defined at the job level.

Setup

library(webqueue)

# By default, returns "hello" immediately.
# Adding '?return=myvalue' will return "myvalue" instead.
# Adding '?sleep=2' will wait 2 seconds before returning.
handler <- function (req) {
  if (!is.null(req$ARGS$sleep))  Sys.sleep(req$ARGS$sleep)
  if (!is.null(req$ARGS$return)) req$ARGS$return else 'hello'
}

To interrupt a job from within handler, call stop(), rlang::abort(), or cli::cli_abort() as usual, or return (webqueue::response()) to send a more informative message back.

Timeout

Basic

Let’s start by limiting all jobs to 1 second.

wq <- webqueue(handler, timeout = 1)

fetch('http://localhost:8080')
#> [1] "hello"

fetch('http://localhost:8080?return=hi')
#> [1] "hi"

fetch('http://localhost:8080?sleep=3')
#> [1] "timeout: total runtime exceeded 1 second\n"

wq$stop()

Per State

Setting timeout = 1 is shorthand for timeout = c(total = 1), which starts the clock as soon as the job is created. If these jobs are likely to wait a long time before running, you might consider setting timeout = c(running = 1) which starts the clock when the job actually begins running. Or, set timeout = c(running = 1, queued = 60) to also set a limit to how long a job can spend waiting in the queue.

See the jobqueue::job_class reference page for information on all standard job states.

wq <- webqueue(handler, timeout = c(queued = 1))

# spend 3 seconds in 'running' state
fetch('http://localhost:8080?sleep=3')
#> [1] "hello"

wq$stop()

Per Request

Perhaps some pages on your website need a different time limit.

timeout <- function (job) {
  ifelse(job$req$PATH_INFO == '/compute', 5, 1)
}

wq <- webqueue(handler, timeout = timeout)

fetch('http://localhost:8080/compute?sleep=3')
#> [1] "hello"

fetch('http://localhost:8080?sleep=3')
#> [1] "timeout: total runtime exceeded 1 second\n"

wq$stop()

Replacement

Use the stop_id field to only run the most recent request with a given hash.

For instance, if you have a session id (id) you can use that as the request hash.

wq <- webqueue(
  handler = handler, 
  stop_id = function (job) { job$req$ARGS$id } )

# Fetch three URLs at the same time.
responses <- fetch(
  first  = 'http://localhost:8080?id=1&sleep=1&return=first',
  second = 'http://localhost:8080?id=1&sleep=1&return=second',
  third  = 'http://localhost:8080?id=1&sleep=1&return=third' ))

responses$first
#> [1] "superseded: duplicated stop_id\n"

responses$second
#> [1] "superseded: duplicated stop_id\n"

responses$third
#> [1] "third"

wq$stop()

Cancel

You can also send a dummy request with the appropriate stop_id hash to cancel an actual request.

Custom

Anywhere you provide a function (job), you can examine job and req (<job>$req), and call job$stop() as needed.

IP Filter

To ignore requests from certain IP addresses:

ip_check <- function (job) {
  ip <- job$req$REMOTE_ADDR
  if (!startsWith(ip, '192.168.'))
    job$stop(paste('Unauthorized IP Address:', ip))
}

wq <- webqueue(handler, hooks = list(created = ip_check))

fetch('http://localhost:8080')
#> [1] "interrupt: Unauthorized IP Address: 127.0.0.1\n"

wq$stop()

Note: in reality, you’d want to use webqueue(onHeaders) to do this particular task more efficiently.

Limit Concurrency

Once the job is assigned to a jobqueue, you can access the list of all jobs that are currently queued or running.

Here, we’ll refuse to add more than 2 jobs to the jobqueue at once.

qlimit <- function (job) {
  if (length(job$queue$jobs) > 2)
    job$stop('Too many concurrent jobs.')
}

wq <- webqueue(handler, hooks = list(queued = qlimit))

# Fetch three URLs at the same time.
dput(fetch(
  'http://localhost:8080?sleep=1',
  'http://localhost:8080?sleep=1',
  'http://localhost:8080?sleep=1' ))
#> c("hello", "hello", "interrupt: Too many concurrent jobs.\n")

wq$stop()

Stop Other Jobs

Suppose an admin needs to stop all jobs for a particular user.

stop_user <- function (job) {

  stop_uid <- job$req$ARGS$stop
  if (!is.null(stop_uid)) {
  
    for (j in job$queue$jobs)
      if (j$req$ARGS$uid == stop_uid)
        j$stop('Stopped by admin.')
    
    job$output <- 'done'
  }
}

wq <- webqueue(handler, hooks = list(queued = stop_user))

# Fetch three URLs at the same time.
responses <- fetch(
  'http://localhost:8080?uid=1&sleep=10',
  'http://localhost:8080?uid=1&sleep=10',
  'http://localhost:8080?uid=2&stop=1' )

responses[[1]]
#> [1] "interrupt: Stopped by admin.\n"

responses[[2]]
#> [1] "interrupt: Stopped by admin.\n"

responses[[3]]
#> [1] "done"

wq$stop()