The goal of webqueue is to process HTTP requests on interruptible background processes.
Use cases:
Prevent user-submitted jobs from excessively hogging compute resources.
Stop tasks that are no longer needed.
Installation
# Install the latest stable version from CRAN:
install.packages("webqueue")
# Or the development version from GitHub:
install.packages("pak")
pak::pak("cmmr/webqueue")
Query Parameters
library(webqueue)

wq <- webqueue(~{ jsonlite::toJSON(.$ARGS) })
cat(fetch('http://localhost:8080?myvar=123'))
#> {"myvar":["123"]}
wq$stop()
Accepts both GET and POST parameters.
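Parameter values arrive as character strings, as the "123" in the output above shows, so a handler that needs a number has to convert it. A minimal sketch of one way to do that, assuming a plain list with an ARGS field can stand in for the real request object; the helper name `num_arg` and the mock request are hypothetical and not part of webqueue:

```r
# Hypothetical helper (not part of webqueue): read one numeric query
# parameter from a request, falling back to `default` when the parameter
# is missing or is not a number.
num_arg <- function (req, name, default = NA_real_) {
  value <- suppressWarnings(as.numeric(req$ARGS[[name]]))
  if (length(value) != 1 || is.na(value)) default else value
}

# Mock request: a plain list using the same field name as above.
req <- list(ARGS = list(myvar = '123', bad = 'abc'))

num_arg(req, 'myvar')
#> [1] 123
num_arg(req, 'bad', default = 0)
#> [1] 0
num_arg(req, 'missing', default = 0)
#> [1] 0
```

Inside a handler, the same helper would be called on the `req` argument.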
Simple API
wq <- webqueue(
  handler = function (req) {
    switch(
      EXPR = req$PATH_INFO,
      '/date' = date(),
      '/req'  = ls.str(as.list(req)),
      '/args' = req$ARGS,
      '/cpus' = as.numeric(parallelly::availableCores()),
      404L )
  })
fetch('http://localhost:8080/cpus')
#> [1] "6"
cat(fetch('http://localhost:8080/req?x=123&id=ABC'))
#> ARGS : List of 2
#> $ x : chr "123"
#> $ id: chr "ABC"
#> COOKIES : Named list()
#> HEADERS : List of 1
#> $ host: chr "localhost:8080"
#> PATH_INFO : chr "/req"
#> REMOTE_ADDR : chr "127.0.0.1"
#> REQUEST_METHOD : chr "GET"
#> SERVER_NAME : chr "127.0.0.1"
#> SERVER_PORT : chr "8080"
wq$stop()
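Because the handler above is an ordinary R function of `req`, its routing can be tried out without starting a server. A minimal sketch, assuming a plain list with the same field names can stand in for the real request object; the name `route` and the mock requests are hypothetical:

```r
# Hypothetical stand-alone copy of a routing handler.
route <- function (req) {
  switch(
    EXPR = req$PATH_INFO,
    '/date' = date(),
    '/args' = req$ARGS,
    404L )  # unnamed last value: returned for any unmatched path
}

# Mock requests: plain lists using the field names shown above.
known   <- list(PATH_INFO = '/args', ARGS = list(x = '123', id = 'ABC'))
unknown <- list(PATH_INFO = '/nope', ARGS = list())

str(route(known))
#> List of 2
#>  $ x : chr "123"
#>  $ id: chr "ABC"
route(unknown)
#> [1] 404
```

The final unnamed argument to `switch()` acts as the default, which is why an unrecognized path yields `404L`.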
Interrupting Requests
The strength of webqueue is its ability to cleanly abort a request at any point. See vignette('interrupts') for more detailed examples.
Merge duplicate requests
# `copy_id` will be "/a" or "/b"
wq <- webqueue(
  handler = function (req) { Sys.sleep(1); req$ARGS$x },
  copy_id = function (job) { job$req$PATH_INFO } )
# Fetch three URLs at the same time. The two "/b" requests are merged,
# so both receive the result of the first one.
dput(fetch(
  'http://localhost:8080/a?x=first',
  'http://localhost:8080/b?x=second',
  'http://localhost:8080/b?x=third' ))
#> c("first", "second", "second")
wq$stop()
Stop duplicate requests
# `stop_id` will be "/a" or "/b"
wq <- webqueue(
  handler = function (req) { Sys.sleep(1); req$ARGS$x },
  stop_id = function (job) { job$req$PATH_INFO } )
# Fetch three URLs at the same time. The earlier "/b" request is stopped
# when the later one arrives.
dput(fetch(
  'http://localhost:8080/a?x=first',
  'http://localhost:8080/b?x=second',
  'http://localhost:8080/b?x=third' ))
#> c("first", "superseded: duplicated stop_id\n", "third")
wq$stop()
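In both examples the id function receives a `job` and returns a key, and requests that share a key are treated as duplicates. The key does not have to be the path alone. A minimal sketch of a narrower key, assuming clients send a `session` query parameter; that parameter, the name `session_key`, and the mock jobs are all hypothetical:

```r
# Hypothetical key function: combine the path with a client-supplied
# `session` argument, so only requests from the same session to the same
# path share a key.
session_key <- function (job) {
  paste(job$req$PATH_INFO, job$req$ARGS$session, sep = '|')
}

# Mock jobs: plain lists mirroring the `job$req` fields used above.
mock_job <- function (path, session) {
  list(req = list(PATH_INFO = path, ARGS = list(session = session)))
}

session_key(mock_job('/b', 'u1'))
#> [1] "/b|u1"
session_key(mock_job('/b', 'u2'))
#> [1] "/b|u2"
```

Passing this function as `stop_id` or `copy_id` in place of the path-only versions above should then limit stopping or merging to requests that share both path and session.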