Skip to contents

An interprocess queue that ensures each message is delivered to only one reader, at which time the message is removed from the queue. Ideal for producer/consumer situations where the message defines work waiting to be processed. The message itself can be any scalar character, for example, a JSON string, path to an RDS file, or a simple command like 'exit'.

Usage

queue(
  name = uid(),
  assert = NULL,
  max_count = 100,
  max_nchar = 128,
  cleanup = FALSE,
  file = NULL
)

# S3 method for class 'queue'
with(data, expr, alt_expr = NULL, timeout_ms = Inf, ...)

Arguments

name

Unique ID. Alphanumeric, starting with a letter.

assert

Apply an additional constraint.

  • 'create' - Error if the queue already exists.

  • 'exists' - Error if the queue doesn't exist.

  • NULL - No constraint; create the queue if it doesn't exist.

max_count

The maximum number of messages that can be stored in the queue at the same time. Attempting to send additional messages will cause send() to block or return FALSE. Ignored if the queue already exists.

max_nchar

The maximum number of characters in each message. Attempting to send larger messages will throw an error. Ignored if the queue already exists.

cleanup

Remove the queue when the R session exits. If FALSE, the queue will persist until $remove() is called or the operating system is restarted.

file

Use a hash of this file/directory path as the queue name. The file itself will not be read or modified, and does not need to exist.

data

A queue object.

expr

Expression to evaluate if a message is received. The message can be accessed by . in this context. See examples.

alt_expr

Expression to evaluate if timeout_ms is reached.

timeout_ms

Maximum time (in milliseconds) to block the process while waiting for the operation to succeed. Use 0 or Inf to return immediately or only when successful, respectively.

...

Not used.

Value

queue() returns a queue object with the following methods:

  • $name

    • Returns the message queue's name (scalar character).

  • $send(msg, timeout_ms = Inf, priority = 0)

    • Returns TRUE on success, or FALSE if the timeout is reached.

    • msg: The message (scalar character) to add to the message queue.

    • priority: Higher priority messages will be retrieved from the queue first. 0 = lowest priority; integers only.

  • $receive(timeout_ms = Inf)

    • Returns the next message from the queue, or NULL if the timeout is reached.

  • $count()

    • Returns the number of messages currently in the queue (scalar integer).

  • $max_count()

    • Returns the maximum number of messages the queue can hold (scalar integer).

  • $max_nchar()

    • Returns the maximum number of characters per message (scalar integer).

  • $remove()

    • Returns TRUE on success, or FALSE on error.

with() returns eval(expr) on success; eval(alt_expr) otherwise.

Examples


mq <- interprocess::queue()
print(mq)
#> <queue> "acHgb4k2HOb8Nj"

mq$send(paste('my favorite number is', floor(runif(1) * 100)))
mq$count()
#> [1] 1

mq$receive()
#> [1] "my favorite number is 8"
mq$receive(timeout_ms = 0)
#> NULL

mq$send('The Matrix has you...')
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "got message: The Matrix has you..."
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
#> [1] "no messages"

mq$remove()