
Overview

Interrupts in webqueue fall into three categories:

  1. Timeout - set before the request starts processing.
  2. Replacement - a later request replaces an earlier one.
  3. Custom - your code calls a Job’s $stop() method.

Requests vs Jobs

A request (req) is an environment with the data sent from a web browser to the web server. A Job (job) is a jobqueue::Job R6 object containing a single req along with the parameters for processing it.

Some useful elements of each are:

  • req$ARGS
  • req$COOKIES
  • req$PATH_INFO
  • req$REMOTE_ADDR
  • job$result
  • job$timeout
  • job$on()
  • job$stop()

Interrupts are always defined at the Job level.
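The nesting described above can be tried out without a server. The sketch below is a minimal illustration: `mock_job()` is a hypothetical helper, not part of webqueue or jobqueue, and it only imitates the `job$req` layout with plain environments so the field access used throughout this article can be seen in isolation.

```r
# Hypothetical helper: a stand-in for a Job, built from plain environments.
# A real Job is a jobqueue::Job R6 object; this only mimics `job$req`.
mock_job <- function (path = '/', args = list(), ip = '127.0.0.1') {
  req <- new.env()
  req$PATH_INFO   <- path   # URL path of the request
  req$ARGS        <- args   # POST/GET parameters
  req$REMOTE_ADDR <- ip     # client IP address

  job <- new.env()
  job$req <- req            # each Job carries exactly one request
  job
}

job <- mock_job(path = '/compute', args = list(x = 'hi'))

job$req$PATH_INFO
#> [1] "/compute"

job$req$ARGS$x
#> [1] "hi"
```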

Setup

library(webqueue)
library(RCurl)

handler <- function (req) {
  args <- req$ARGS                         # POST/GET parameters
  if (!is.null(args$s)) Sys.sleep(args$s)  # sleep for `s` seconds
  if (!is.null(args$x)) return (args$x)    # return `x` if present
  return ('hello')                         # default to 'hello'
}

To interrupt a job from within handler, call stop() or cli::cli_abort() as usual. To send a more informative message back to the client, return a webqueue::response() object instead.
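As a rough sketch of interrupting from inside a handler, the variant below validates `s` and calls stop() when it is unusable. `strict_handler` is a name made up for this illustration. Whether webqueue delivers query values as strings or numbers is not stated here, so the code coerces with as.numeric(), which handles both.

```r
# Variant of the handler above that rejects an unusable `s`.
strict_handler <- function (req) {
  args <- req$ARGS
  if (!is.null(args$s)) {
    s <- suppressWarnings(as.numeric(args$s))      # NA if not a number
    if (is.na(s) || s < 0)
      stop('`s` must be a non-negative number')    # interrupts this job
    Sys.sleep(s)
  }
  if (!is.null(args$x)) return (args$x)
  return ('hello')
}

# Try it without a server by passing a plain list in place of `req`.
strict_handler(list(ARGS = list(x = 'hi')))
#> [1] "hi"

tryCatch(
  strict_handler(list(ARGS = list(s = 'abc'))),
  error = function (e) conditionMessage(e)
)
#> [1] "`s` must be a non-negative number"
```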

Timeout

Basic

Let’s start by limiting all Jobs to 1 second.

wq <- WebQueue$new(handler, timeout = 1)

getURL('http://localhost:8080')
#> [1] "hello"

getURL('http://localhost:8080?x=hi')
#> [1] "hi"

getURL('http://localhost:8080?s=3')
#> [1] "timeout: total runtime exceeded 1 second\n"

wq$stop()

Per State

Setting timeout = 1 is shorthand for timeout = c(total = 1), which starts the clock as soon as the job is created. If jobs are likely to wait a long time before running, consider timeout = c(running = 1) instead, which starts the clock only when the job actually begins running. To also limit how long a job can wait in the queue, set timeout = c(running = 1, queued = 60).

See the jobqueue::Job reference page for information on all standard Job states.

wq <- WebQueue$new(handler, timeout = c(queued = 1))

getURL('http://localhost:8080?s=3')  # spends 3 seconds in 'running' state
#> [1] "hello"

wq$stop()

Per Request

Perhaps some pages on your website need a different time limit.

timeout <- function (job) {
  # 5 seconds for the /compute page, 1 second for everything else
  if (job$req$PATH_INFO == '/compute') 5 else 1
}

wq <- WebQueue$new(handler, timeout = timeout)

getURL('http://localhost:8080/compute?s=3')
#> [1] "hello"

getURL('http://localhost:8080?s=3')
#> [1] "timeout: total runtime exceeded 1 second\n"

wq$stop()
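Per-state and per-request limits can plausibly be combined. The sketch below assumes that a timeout function may return the same kind of named vector that the timeout argument accepts directly; this article does not show that, so check it against the jobqueue::Job reference page before relying on it. `path_timeout` is a name invented for this example, and the stand-in jobs are plain lists rather than real Job objects.

```r
# Assumption: a timeout function may return a named vector such as
# c(running = 5, queued = 60), mirroring what `timeout =` accepts directly.
path_timeout <- function (job) {
  if (identical(job$req$PATH_INFO, '/compute')) {
    c(running = 5, queued = 60)   # slow page: longer run, bounded queue wait
  } else {
    c(total = 1)                  # everything else: 1 second overall
  }
}

# Check the branching logic with stand-in jobs (plain lists).
names(path_timeout(list(req = list(PATH_INFO = '/compute'))))
#> [1] "running" "queued"

names(path_timeout(list(req = list(PATH_INFO = '/'))))
#> [1] "total"

# With webqueue loaded:
# wq <- WebQueue$new(handler, timeout = path_timeout)
```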

Replacement

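Use the stop_id field to run only the most recent request with a given hash. When a new request arrives with the same stop_id as an earlier request that has not finished, the earlier Job is stopped.

For instance, if you have a session id (sid), you can use that as the request hash.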

stop_id <- function (job) { job$req$ARGS$sid }

wq <- WebQueue$new(handler, stop_id = stop_id)

# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=first')  })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=second') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=third')  })

r1$result
#> [1] "superseded: duplicated stop_id\n"

r2$result
#> [1] "superseded: duplicated stop_id\n"

r3$result
#> [1] "third"

jq$stop()
wq$stop()

Cancel

You can also send a dummy request with the appropriate stop_id hash to cancel an actual request.
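A minimal sketch of that pattern, assuming the stop_id function from the Replacement example (which hashes on `sid`) and a server listening on localhost:8080. `cancel_url()` is a hypothetical helper that only builds the URL of the dummy request; the `x=cancelled` parameter is an arbitrary marker, not something webqueue interprets.

```r
# Hypothetical helper: build a dummy request URL that carries the same
# `sid` (and therefore the same stop_id) as the request to be cancelled.
cancel_url <- function (sid, base = 'http://localhost:8080') {
  sid <- utils::URLencode(as.character(sid), reserved = TRUE)
  paste0(base, '?sid=', sid, '&x=cancelled')
}

cancel_url(1)
#> [1] "http://localhost:8080?sid=1&x=cancelled"

# With the server from the Replacement example running, fetching this URL
# would supersede any unfinished request that has sid=1:
# RCurl::getURL(cancel_url(1))
```

The dummy request is itself processed by handler, so it should be cheap to run.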

Custom

Anywhere you provide a function (job), you can examine the Job and request, and call job$stop() as needed.

IP Filter

To ignore requests from certain IP addresses:

ip_check <- function (job) {
  ip <- job$req$REMOTE_ADDR
  if (!startsWith(ip, '192.168.'))
    job$stop(paste('Unauthorized IP Address:', ip))
}

wq <- WebQueue$new(handler, hooks = list(created = ip_check))

getURL('http://localhost:8080')
#> [1] "interrupt: Unauthorized IP Address: 127.0.0.1\n"

wq$stop()

Note: in practice, the onHeaders argument of webqueue::WebQueue$new() handles this particular task more efficiently.
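A hook like ip_check can be exercised before it is attached to a server. In the sketch below, `fake_job()` is a hypothetical stand-in for a Job, not a webqueue or jobqueue function: it records the reason passed to `$stop()` instead of interrupting anything.

```r
ip_check <- function (job) {
  ip <- job$req$REMOTE_ADDR
  if (!startsWith(ip, '192.168.'))
    job$stop(paste('Unauthorized IP Address:', ip))
}

# Hypothetical stand-in for a Job: remembers the reason given to `$stop()`.
fake_job <- function (ip) {
  job <- new.env()
  job$req     <- list(REMOTE_ADDR = ip)
  job$stopped <- NULL
  job$stop    <- function (reason) job$stopped <- reason
  job
}

blocked <- fake_job('127.0.0.1')
ip_check(blocked)
blocked$stopped
#> [1] "Unauthorized IP Address: 127.0.0.1"

allowed <- fake_job('192.168.1.20')
ip_check(allowed)
is.null(allowed$stopped)
#> [1] TRUE
```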

Queue Limit

Once the job is assigned to a Queue, you can access the list of all jobs that are currently queued or running.

Here, we’ll refuse to add more than 2 jobs to the queue at once.

qlimit <- function (job) {
  if (length(job$queue$jobs) > 2)
    job$stop('Queue is too full.')
}

wq <- WebQueue$new(handler, hooks = list(queued = qlimit))

# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })

r1$result
#> [1] "hello"

r2$result
#> [1] "hello"

r3$result
#> [1] "interrupt: Queue is too full.\n"

jq$stop()
wq$stop()

Stop Other Jobs

Suppose an admin needs to stop all jobs for a particular user.

stop_user <- function (job) {

  stop_uid <- job$req$ARGS$stop
  if (!is.null(stop_uid)) {

    # isTRUE() guards against jobs whose request has no `u` parameter
    for (j in job$queue$jobs)
      if (isTRUE(j$req$ARGS$u == stop_uid))
        j$stop('Stopped by admin.')

    job$output <- 'done'
  }
}

wq <- WebQueue$new(handler, hooks = list(queued = stop_user))

# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?u=1&s=10')   })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?u=1&s=10')   })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?u=2&stop=1') })

r1$result
#> [1] "interrupt: Stopped by admin.\n"

r2$result
#> [1] "interrupt: Stopped by admin.\n"

r3$result
#> [1] "done"

jq$stop()
wq$stop()