An interprocess message queue that ensures each message is delivered to only one reader, at which time the message is removed from the queue. Ideal for producer/consumer situations where the message defines work waiting to be processed. The message itself can be any scalar character, for example, a JSON string or path to an RDS file.
Arguments
- name
Unique ID. Alphanumeric, starting with a letter.
- assert
Apply an additional constraint.
'create'- Error if the message queue already exists.'exists'- Error if the message queue doesn't exist.NULL- No constraint; create the message queue if it doesn't exist.
- max_count
The maximum number of messages that can be stored in the queue at the same time. Attempting to send additional messages will cause
send()to block or returnFALSE. Ignored if the message queue already exists.- max_nchar
The maximum number of characters in each message. Attempting to send larger messages will throw an error. Ignored if the message queue already exists.
- cleanup
Remove the message queue when the R session exits. If
FALSE, the message queue will persist until$remove()is called or the operating system is restarted.- file
Use a hash of this file/directory path as the message queue name. The file itself will not be read or modified, and does not need to exist.
- data
A
msg_queueobject.- expr
Expression to evaluate if a message is received. The message can be accessed by
.in this context. See examples.- alt_expr
Expression to evaluate if
timeout_msis reached.- timeout_ms
Maximum time (in milliseconds) to block the process while waiting for the operation to succeed. Use
0orInfto return immediately or only when successful, respectively.- ...
Not used.
Value
msg_queue() returns a msg_queue object with the following methods:
$nameReturns the message queue's name (scalar character).
$send(msg, timeout_ms = Inf, priority = 0)Returns
TRUEon success, orFALSEif the timeout is reached.msg: The message (scalar character) to add to the message queue.priority: Higher priority messages will be retrieved from the message queue first.0= lowest priority; integers only.
$receive(timeout_ms = Inf)Returns the next message from the message queue, or
NULLif the timeout is reached.
$count()Returns the number of messages currently in the message queue.
$max_count()Returns the maximum number of messages the queue can hold.
$max_nchar()Returns the maximum number of characters per message.
$remove()Returns
TRUEif the message queue was successfully deleted from the operating system, orFALSEon error.
with() returns eval(expr) on success; eval(alt_expr) otherwise.
Examples
mq <- interprocess::msg_queue()
print(mq)
#> <msg_queue> "Ab3qdfGwf21"
mq$send(paste('my favorite number is', floor(runif(1) * 100)))
mq$count()
#> [1] 1
mq$receive()
#> [1] "my favorite number is 60"
mq$receive(timeout_ms = 0)
#> NULL
mq$send('The Matrix has you...')
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "got message: The Matrix has you..."
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "no messages"
mq$remove()
