An interprocess queue that ensures each message is delivered to only one reader, at which time the message is removed from the queue. Ideal for producer/consumer situations where the message defines work waiting to be processed. The message itself can be any scalar character, for example, a JSON string, path to an RDS file, or a simple command like 'exit'.
Arguments
- name
Unique ID. Alphanumeric, starting with a letter.
- assert
Apply an additional constraint.
  - 'create': Error if the queue already exists.
  - 'exists': Error if the queue doesn't exist.
  - NULL: No constraint; create the queue if it doesn't exist.
- max_count
The maximum number of messages that can be stored in the queue at the same time. Attempting to send additional messages will cause send() to block or return FALSE. Ignored if the queue already exists.
- max_nchar
The maximum number of characters in each message. Attempting to send larger messages will throw an error. Ignored if the queue already exists.
- cleanup
Remove the queue when the R session exits. If FALSE, the queue will persist until $remove() is called or the operating system is restarted.
- file
Use a hash of this file/directory path as the queue name. The file itself will not be read or modified, and does not need to exist.
- data
A queue object.
- expr
Expression to evaluate if a message is received. Within this expression, the message is available as . (a single dot). See examples.
- alt_expr
Expression to evaluate if timeout_ms is reached.
- timeout_ms
Maximum time (in milliseconds) to block the process while waiting for the operation to succeed. Use 0 to return immediately, or Inf to return only when successful.
- ...
Not used.
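As a rough illustration of how max_count and timeout_ms interact, the sketch below creates a queue that holds at most two messages and then attempts three non-blocking sends. It assumes the interprocess package is installed and that queue() accepts max_count by name as listed above; the message strings and variable names are arbitrary.

```r
library(interprocess)

# Create a queue limited to two pending messages (the name is auto-generated).
mq <- queue(max_count = 2)

# timeout_ms = 0 makes send() return immediately instead of blocking.
sent <- c(
  mq$send('first',  timeout_ms = 0),
  mq$send('second', timeout_ms = 0),
  mq$send('third',  timeout_ms = 0)  # the queue is already full here
)

n_waiting <- mq$count()      # messages currently in the queue
capacity  <- mq$max_count()  # limit fixed when the queue was created

print(sent)
print(n_waiting)
print(capacity)

mq$remove()
```

With the default timeout_ms = Inf, the third send() would instead block until another process receives a message and frees a slot.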
Value
queue() returns a queue object with the following methods:
$name
Returns the message queue's name (scalar character).
$send(msg, timeout_ms = Inf, priority = 0)
Returns TRUE on success, or FALSE if the timeout is reached.
  - msg: The message (scalar character) to add to the message queue.
  - priority: Higher priority messages will be retrieved from the queue first. 0 = lowest priority; integers only.
$receive(timeout_ms = Inf)
Returns the next message from the queue, or NULL if the timeout is reached.
$count()
Returns the number of messages currently in the queue (scalar integer).
$max_count()
Returns the maximum number of messages the queue can hold (scalar integer).
$max_nchar()
Returns the maximum number of characters per message (scalar integer).
$remove()
Returns TRUE on success, or FALSE on error.
with() returns eval(expr) on success; eval(alt_expr) otherwise.
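The priority argument of $send() can be sketched as follows. This is a minimal example under the assumption that the interprocess package is installed; the message text and the priority value 5 are illustrative only.

```r
library(interprocess)

mq <- queue()

# Enqueue the low-priority message first, then a higher-priority one.
mq$send('routine task', priority = 0)
mq$send('urgent task',  priority = 5)

# Per the $send() description, the higher-priority message is retrieved first,
# even though it was sent second.
first  <- mq$receive(timeout_ms = 0)
second <- mq$receive(timeout_ms = 0)
print(c(first, second))

mq$remove()
```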
Examples
mq <- interprocess::queue()
print(mq)
#> <queue> "acHgb4k2HOb8Nj"
mq$send(paste('my favorite number is', floor(runif(1) * 100)))
mq$count()
#> [1] 1
mq$receive()
#> [1] "my favorite number is 8"
mq$receive(timeout_ms = 0)
#> NULL
mq$send('The Matrix has you...')
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "got message: The Matrix has you..."
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "no messages"
mq$remove()
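The producer/consumer pattern mentioned in the description, with 'exit' used as a stop command, might look like the sketch below. For brevity both roles run in one R session; in practice the consumer would typically be a separate process that opens the same queue by name. The job names and the one-second timeout are hypothetical, and the loop assumes that messages sent with equal priority are received in the order they were sent.

```r
library(interprocess)

mq <- queue()

# Producer side: enqueue two jobs, then a sentinel telling the consumer to stop.
for (msg in c('job-1', 'job-2', 'exit')) mq$send(msg)

# Consumer side: pull messages until the sentinel arrives, or until the queue
# stays empty for one second (receive() then returns NULL).
processed <- character(0)
repeat {
  msg <- mq$receive(timeout_ms = 1000)
  if (is.null(msg) || identical(msg, 'exit')) break
  processed <- c(processed, msg)  # placeholder for real work
}

print(processed)
mq$remove()
```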