Skip to contents

The goal of interprocess is to synchronize concurrent R processes.

Allows R to access low-level operating system mechanisms for performing atomic operations on shared data structures. Mutexes provide shared and exclusive locks. Semaphores act as counters. Message queues move text strings from one process to another. All these interprocess communication (IPC) tools can optionally block with or without a timeout.

Works cross-platform, including Windows, MacOS, and Linux, and can be used to synchronize a mixture of R sessions and other types of processes if needed.

Implemented using the Boost C++ library.

Installation

# Install the latest stable version from CRAN:
install.packages("interprocess")

# Or the development version from GitHub:
install.packages("pak")
pak::pak("cmmr/interprocess")

Usage

Mutexes, semaphores, and message queues are managed by the operating system. Any process that knows the resource’s name can access it. Therefore, sharing the resource name (a short text string) amongst cooperating processes enables communication and synchronization.

These names can be any alphanumeric string that starts with a letter and is no longer than 250 characters. The mutex(), semaphore(), and queue() functions will default to generating a unique identifier, or you can provide a custom or pre-existing one with the name parameter. If the resource is associated with a specific file or directory, the file parameter can be used to derive a name from normalizing and hashing that path.

Setting cleanup = TRUE will automatically remove the resource when the R session exits. Otherwise, the resource will persist until $remove() is called or the operating system is restarted.

Mutexes

An exclusive lock is acquired by default. For a shared lock, use shared = TRUE.

tmp <- tempfile()
mut <- interprocess::mutex(file = tmp)

mut$name
#> [1] "S6qYUQK2SwA"

with(mut, writeLines('Important Data', tmp))

with(mut, shared = TRUE, readLines(tmp))
#> [1] "Important Data"

mut$remove()
unlink(tmp)

Semaphores

The semaphore’s value can be incremented with $post() and decremented with $wait(). The initial value can also be set in the constructor.

sem <- interprocess::semaphore('mySemaphore', value = 1)

sem$name
#> [1] "mySemaphore"

sem$post()

sem$wait(timeout_ms = 0)
#> [1] TRUE

sem$wait(timeout_ms = 0)
#> [1] TRUE

sem$wait(timeout_ms = 0)
#> [1] FALSE

sem$remove()

Message Queues

The constructor’s max_count and max_nchar parameters determine how much memory is allocated for the queue.

mq <- interprocess::queue(max_count = 2, max_nchar = 5)

mq$name
#> [1] "akRKb4iymMcAFV"

mq$send('Hello')
mq$send('Hi', priority = 1)

mq$receive(timeout_ms = 0)
#> [1] "Hi"

mq$receive(timeout_ms = 0)
#> [1] "Hello"

mq$receive(timeout_ms = 0)
#> [1] NULL

mq$remove()