Skip to contents

An interprocess message queue that ensures each message is delivered to only one reader, at which time the message is removed from the queue. Ideal for producer/consumer situations where the message defines work waiting to be processed. The message itself can be any scalar character, for example, a JSON string or path to an RDS file.

Usage

msg_queue(
  name = uid(),
  assert = NULL,
  max_count = 100,
  max_nchar = 128,
  cleanup = FALSE,
  file = NULL
)

# S3 method for class 'msg_queue'
with(data, expr, alt_expr = NULL, timeout_ms = Inf, ...)

Arguments

name

Unique ID. Alphanumeric, starting with a letter.

assert

Apply an additional constraint.

  • 'create' - Error if the message queue already exists.

  • 'exists' - Error if the message queue doesn't exist.

  • NULL - No constraint; create the message queue if it doesn't exist.

max_count

The maximum number of messages that can be stored in the queue at the same time. Attempting to send additional messages will cause send() to block or return FALSE. Ignored if the message queue already exists.

max_nchar

The maximum number of characters in each message. Attempting to send larger messages will throw an error. Ignored if the message queue already exists.

cleanup

Remove the message queue when the R session exits. If FALSE, the message queue will persist until $remove() is called or the operating system is restarted.

file

Use a hash of this file/directory path as the message queue name. The file itself will not be read or modified, and does not need to exist.

data

A msg_queue object.

expr

Expression to evaluate if a message is received. The message can be accessed by . in this context. See examples.

alt_expr

Expression to evaluate if timeout_ms is reached.

timeout_ms

Maximum time (in milliseconds) to block the process while waiting for the operation to succeed. Use 0 or Inf to return immediately or only when successful, respectively.

...

Not used.

Value

msg_queue() returns a msg_queue object with the following methods:

  • $name

    • Returns the message queue's name (scalar character).

  • $send(msg, timeout_ms = Inf, priority = 0)

    • Returns TRUE on success, or FALSE if the timeout is reached.

    • msg: The message (scalar character) to add to the message queue.

    • priority: Higher priority messages will be retrieved from the message queue first. 0 = lowest priority; integers only.

  • $receive(timeout_ms = Inf)

    • Returns the next message from the message queue, or NULL if the timeout is reached.

  • $count()

    • Returns the number of messages currently in the message queue.

  • $max_count()

    • Returns the maximum number of messages the queue can hold.

  • $max_nchar()

    • Returns the maximum number of characters per message.

  • $remove()

    • Returns TRUE if the message queue was successfully deleted from the operating system, or FALSE on error.

with() returns eval(expr) on success; eval(alt_expr) otherwise.

See also

Other shared objects: mutex(), semaphore()

Examples


mq <- interprocess::msg_queue()
print(mq)
#> <msg_queue> "Ab2ZdeYaGr7"

mq$send(paste('my favorite number is', floor(runif(1) * 100)))
mq$count()
#> [1] 1

mq$receive()
#> [1] "my favorite number is 8"
mq$receive(timeout_ms = 0)
#> NULL

mq$send('The Matrix has you...')
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "got message: The Matrix has you..."
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "no messages"

mq$remove()