Architecture

This document describes the key components of LuaRadio and how they interact.

Overview

Block Model

Blocks provide a process(...) method that accepts vectors of input samples as arguments, processes them, and returns vectors of output samples. Blocks may retain state.

Input sample vectors must be processed in their entirety by blocks in process(...). Samples across multiple inputs are synchronized, meaning that the multiple input vectors are always of the same length, and samples across the input vectors all correspond to the same timestep.

Blocks may produce output sample vectors asynchronously: there is no requirement on the number of output samples produced, or on which call to process(...) produces them.
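As a minimal sketch of this contract, the plain-Lua model below uses Lua tables in place of LuaRadio vectors. The PairSummer block is hypothetical and not part of LuaRadio. It consumes every input sample on each call, retains state between calls, and emits an output only once it has seen two samples, so the number of outputs per call varies.

```lua
-- Hypothetical block model in plain Lua: tables stand in for sample vectors.
local PairSummer = {}
PairSummer.__index = PairSummer

function PairSummer.new()
    -- State retained between calls to process()
    return setmetatable({pending = nil}, PairSummer)
end

-- Consumes the whole input vector; emits one output per two inputs seen.
function PairSummer:process(x)
    local out = {}
    for i = 1, #x do
        if self.pending == nil then
            self.pending = x[i]
        else
            out[#out + 1] = self.pending + x[i]
            self.pending = nil
        end
    end
    return out
end

local block = PairSummer.new()
local first = block:process({1, 2, 3})   -- one output; the 3 is retained
local second = block:process({4})        -- one output, from the retained 3 and the 4
local third = block:process({5})         -- no output; the 5 is retained
```

The third call returns an empty vector, which is acceptable under this model because outputs are not tied to any particular call.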

Serialization

Vectors of samples are serialized between output and input ports of blocks over anonymous UNIX sockets.

Vectors of CStruct data type samples are serialized raw, in their native memory representation, with no marshalling or unmarshalling.

Vectors of Object data type samples are marshalled and unmarshalled with MessagePack and serialized as byte strings, preceded by their length.
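The length-prefixed framing can be sketched in plain Lua as follows. This is an illustration only: the 4-byte big-endian prefix is an assumption rather than LuaRadio's actual wire format, and the payload strings stand in for MessagePack-encoded objects. The helper names encode_length, frame, and unframe are hypothetical.

```lua
-- Sketch of length-prefixed framing. The 4-byte big-endian length prefix is an
-- assumption for illustration; payloads stand in for MessagePack-encoded objects.
local function encode_length(n)
    local b1 = math.floor(n / 16777216) % 256
    local b2 = math.floor(n / 65536) % 256
    local b3 = math.floor(n / 256) % 256
    local b4 = n % 256
    return string.char(b1, b2, b3, b4)
end

local function frame(payload)
    return encode_length(#payload) .. payload
end

-- Splits a byte stream back into its payloads.
local function unframe(stream)
    local payloads, pos = {}, 1
    while pos + 3 <= #stream do
        local b1, b2, b3, b4 = stream:byte(pos, pos + 3)
        local len = ((b1 * 256 + b2) * 256 + b3) * 256 + b4
        payloads[#payloads + 1] = stream:sub(pos + 4, pos + 3 + len)
        pos = pos + 4 + len
    end
    return payloads
end

local stream = frame("abc") .. frame("") .. frame("hello world")
local decoded = unframe(stream)
```

The prefix is what lets variable-size Object samples share a byte stream: without it, the reader could not tell where one sample ends and the next begins.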

Concurrency

LuaRadio uses multi-processing. Every block is run in its own process under its own Lua state.

Blocks do not share memory. Blocks use IPC to serialize samples to other blocks and communicate with applications.

Memory

Memory for input samples is allocated once. A persistent buffer is allocated for each block input port, into which raw input samples are read. The raw input samples are cast into read-only vectors, which are provided as the arguments to the block’s process(...) method.

Memory for output samples is typically allocated once by each block. Blocks are responsible for managing their own output sample memory. Most blocks create a persistent output sample vector in their initialize() method, and resize it as needed in process(...) to store the computed output samples. However, this is not strictly enforced by the framework.

Blocks that do reuse output sample vectors have constant memory usage in the steady state. The output sample vectors approach a stable size, as they are resized in process(...) to accommodate the input vectors the block consumes.

All objects in LuaRadio are garbage collected, although this mainly applies to transient objects. The input and output sample memory in the situations described above is anchored throughout the processing lifetime of the block.

Areas of Improvement

Moving a vector of samples between blocks has an overhead of two copies: one from writing it to the UNIX socket in the producing block, and one from reading it from the UNIX socket in the consuming block. This overhead could be reduced to one copy, by changing the sample transport from UNIX sockets to a shared memory circular buffer (not unlike GNU Radio).

The overhead could be further reduced to zero copies, if the vectors for output samples were not allocated by the block, but were instead pointers into the persistent shared memory circular buffer (also not unlike GNU Radio) that is shared between the output and input port. Implementing this would likely require a different process(...) signature that includes the output vectors as arguments, and a new resizing mechanism for output vectors.

Multi-processing instead of multi-threading for concurrency incurs some memory and CPU overhead from requiring a process for each block. However, multi-threading poses other issues, like sharing instantiated and initialized block state of the parent thread with the Lua state of each child thread. This problem is addressed in the multi-processing architecture with forking.

Block processes currently have a slightly larger memory footprint than necessary, as other blocks and their initialized objects, e.g. buffers allocated in initialize(), are not yet released after forking. This can be addressed by pruning references to all unneeded blocks and associated connectivity after forking.

Blocks cannot be manipulated at runtime, e.g. modifying their attributes or calling methods on them, since each block runs in an independent Lua state. This could be worked around by implementing RPC for these attributes and calls, but this would add substantial complexity.

Concepts

Data Types

Samples are typed by special data types that implement the necessary interface to be serialized and deserialized between blocks. These data types can either be CStruct types, which are backed by a C structure, or Object types, which are backed by a Lua object.

CStruct types are serialized as raw contiguous samples between blocks in their native memory representation, with no marshalling and unmarshalling. These types must be of a fixed size, so the sample boundaries are well-defined in the stream.

Object types are serialized as MessagePack marshalled bytes between blocks. These types can be variable size, may nest other Lua types, and may have optional members.

LuaRadio has four basic types, all of which are CStruct types. These are the ComplexFloat32, Float32, Bit, and Byte types.

They are backed by the following C structure types:

typedef struct {
    float real;
    float imag;
} complex_float32_t;

typedef struct {
    float value;
} float32_t;

typedef struct {
    uint8_t value;
} bit_t;

typedef struct {
    uint8_t value;
} byte_t;

The use of a structure to back CStruct types, rather than the raw C type (e.g. float), is for implementation reasons. It allows the framework to associate a metatable with the type using the LuaJIT FFI library’s ffi.metatype(), which binds useful metamethods and methods to all instances of the type. It also makes those instances distinct from other occurrences of the underlying data types (e.g. float32_t vs float).

Users can define their own CStruct and Object types and bind methods to them. See the Creating Blocks guide for more details and examples.

Code reference: CStruct class, Object class.

Vectors

It would be inefficient for blocks to process one sample at a time, as the overhead of serializing the sample and calling the block to process it would exceed the cost of processing it. Instead, blocks operate on a vector of samples at a time, to amortize the overhead of serialization.

Vectors are dynamic arrays of a CStruct or Object data type. Blocks get their inputs as vectors, and return their outputs as vectors.

Each data type provides two static methods for creating a vector of itself: .vector(num) for a zero-initialized vector, or .vector_from_array(arr) for a vector initialized from a Lua array. For example:

-- ComplexFloat32 vector of length 16
local vec = radio.types.ComplexFloat32.vector(16)

-- Byte vector of length 10
local vec = radio.types.Byte.vector(10)

-- Float32 vector of length 3 from array initializer
local vec = radio.types.Float32.vector_from_array({1.0, 2.0, 3.0})

Vectors can be resized and appended to:

local vec = radio.types.Byte.vector(10)
print(vec.length) --> 10

vec:resize(5)
print(vec.length) --> 5

vec:append(radio.types.Byte(0xAA))
print(vec.length) --> 6

Vectors of CStruct typed samples are laid out contiguously in memory. The array of samples is available under the .data member, and its length under the .length member. These samples can be modified directly in Lua, or can be passed to an external library for processing.

Resizing a CStruct typed vector only causes a re-allocation when it is grown to a larger size. The underlying buffer is retained on resizing to a smaller size; just the bookkeeping is updated. This allows blocks that reuse vectors for output samples to approach constant memory usage, as the underlying memory of the vectors will reach a stable size for the inputs the block consumes.
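The resize behavior described above can be modeled in plain Lua. This is a sketch under stated assumptions, not the framework's Vector class: the capacity and allocations fields are hypothetical bookkeeping added here to make re-allocations observable, and a Lua table stands in for the underlying C buffer.

```lua
-- Hypothetical model of a vector that only re-allocates when it grows.
local Vector = {}
Vector.__index = Vector

function Vector.new(length)
    local self = setmetatable({length = 0, capacity = 0, data = {}, allocations = 0}, Vector)
    return self:resize(length)
end

function Vector:resize(length)
    if length > self.capacity then
        -- Grow: allocate a new buffer and copy the existing samples over
        local data = {}
        for i = 1, self.length do data[i] = self.data[i] end
        for i = self.length + 1, length do data[i] = 0 end
        self.data, self.capacity = data, length
        self.allocations = self.allocations + 1
    end
    -- Shrinking (or growing within capacity) only updates the bookkeeping
    self.length = length
    return self
end

local vec = Vector.new(1024)
vec:resize(512)    -- shrink: no allocation
vec:resize(1024)   -- back within capacity: no allocation
local allocations_before_growth = vec.allocations
vec:resize(2048)   -- exceeds capacity: one more allocation
```

Once the capacity has grown to the largest input the block sees, further resizes in this model never allocate, which is the steady state the paragraph above refers to.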

All CStruct typed vectors are allocated with page alignment, to enable processing with libraries that require, or perform better with, aligned buffers. This is often because SIMD operations are involved.

Vectors of Object typed samples are stored in a Lua array, but provide a compatible interface to CStruct typed vectors. These vectors cannot be passed to external libraries.

Code reference: Vector and ObjectVector classes.

Blocks

Blocks are classes derived from radio.block.Block that implement their functionality in the following methods:

  • instantiate(...) — constructor
  • initialize() — initialization (optional)
  • process(...) — main work method
  • cleanup() — clean up (optional)

The role of the instantiate() constructor is to establish the basic state of the block and to register its type signatures, which specify the block’s input/output port names and types.

The initialize() method is called after the block has been connected in a flow graph and differentiated. It allows the block to perform additional initialization based on its differentiated type signature and its sample rate.

The process() method is the main work method of the block. It receives input vectors of samples as arguments, and returns output vectors of samples. This method is called repeatedly by the framework to process inputs into outputs.

The cleanup() method is called by the framework when the flow graph has collapsed, for additional clean up of resources.

Source blocks and blocks that modify the sample rate must implement the get_rate() method, which returns the block’s sample rate as a number, in samples per second.

Code reference: Block class.

Type Signatures

A type signature is a description of the input/output port names and data types of a block. LuaRadio blocks can support multiple type signatures, all of which must share the same input/output count and names, but may differ in data types. Blocks register their type signatures in their constructor, so that they can be connected into a flow graph after they are instantiated. The framework selects the correct type signature in its differentiation phase, described in its section below.

Code reference: Block class.

Instantiation

The instantiate(...) method is called whenever a block is instantiated by name. This method takes the arguments passed to the block on instantiation.

Blocks must register type signatures with the add_type_signature() method in their instantiate() constructor. This method takes an array of input port descriptors, followed by an array of output port descriptors. Each port descriptor specifies a name and a data type. For example, an AddBlock that supports both complex-valued and real-valued inputs would register:

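function AddBlock:instantiate()
    -- Real-valued flavor: two Float32 inputs, one Float32 output
    self:add_type_signature({radio.block.Input("in1", radio.types.Float32),
                             radio.block.Input("in2", radio.types.Float32)},
                            {radio.block.Output("out", radio.types.Float32)})
    -- Complex-valued flavor: two ComplexFloat32 inputs, one ComplexFloat32 output
    self:add_type_signature({radio.block.Input("in1", radio.types.ComplexFloat32),
                             radio.block.Input("in2", radio.types.ComplexFloat32)},
                            {radio.block.Output("out", radio.types.ComplexFloat32)})
end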

Source and sink blocks may specify empty arrays for inputs or outputs, respectively, when they add type signatures. Otherwise, sources and sinks are implemented largely the same way as other blocks.

The add_type_signature() method can also be used to specify different process() and initialize() methods for individual type signatures, which are bound to the block on differentiation. See the Creating Blocks guide for examples.

Connection

After a block is instantiated, it can be connected into a flow graph under a CompositeBlock. This is described in more detail in the Composite Blocks section below.

When the add_type_signature() method is called in a block’s instantiate() constructor, it builds PipeInput and PipeOutput containers for each specified input and output port of the block, and stores them in arrays under the .inputs and .outputs members. These containers are the actual “ports” connected in a flow graph. They contain a name and block owner, and are later populated with their concrete data type and a shared Pipe object.

Connecting two blocks in a flow graph means registering the downstream block’s PipeInput instance as the key, and the upstream block’s PipeOutput instance as the value, in a hash table owned by a CompositeBlock. Together, these entries form a graph of input and output connections.
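A minimal plain-Lua sketch of such a connections table follows. The port records and the connect() helper are hypothetical stand-ins, not the framework's classes, and the rejection of a second connection to the same input is an assumption of this sketch, motivated by each input having exactly one upstream output.

```lua
-- Hypothetical port objects; only identity matters for use as table keys.
local src_out = {name = "out", owner = "source"}
local filt_in = {name = "in", owner = "filter"}
local filt_out = {name = "out", owner = "filter"}
local sink_a_in = {name = "in", owner = "sink_a"}
local sink_b_in = {name = "in", owner = "sink_b"}

-- Connections table: input port -> output port feeding it
local connections = {}

local function connect(output, input)
    assert(connections[input] == nil, "input is already connected")
    connections[input] = output
end

connect(src_out, filt_in)
connect(filt_out, sink_a_in)   -- one output may fan out...
connect(filt_out, sink_b_in)   -- ...to several inputs

-- Connecting a second output to the same input is rejected in this sketch
local ok = pcall(connect, src_out, sink_a_in)
```

Keying the table by input port makes each entry unique, since an output can feed many inputs but an input has only one source.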

When a connection is made, a Pipe object is also created and registered under both PipeInput and PipeOutput containers of the connected blocks. This Pipe object provides an interface for the serialization and deserialization of sample vectors between blocks. The Pipe is how a block reads or writes vectors of samples from or to another block.

Code reference: Block class, PipeInput, PipeOutput, Pipe classes.

Differentiation

When a flow graph is run, each block in the flow graph is first differentiated into a compatible type signature. This differentiation starts at the source blocks and ends at the sink blocks, and is carried out in a downstream order.

A block is differentiated by its input types, using the differentiate() method. This method takes an array of input data types and differentiates the block into a compatible type signature, by finding a registered type signature with matching input data types. The method raises an error if a compatible type signature is not found.

Type signatures may also specify a function predicate instead of a concrete data type for input ports. For example, the JSONSink does this to accept any data type that implements to_json(). In those cases, differentiate() calls the function predicate with the input type and expects a boolean result to indicate if the input type is compatible.

The result of differentiation is that the PipeInput and PipeOutput ports in the block’s .inputs and .outputs take on the concrete data types specified in the selected type signature, and the block’s initialize() and process() methods are bound to the ones specified in the type signature. The concrete data types are available to the block with the get_input_type() and get_output_type() methods.

Since a block is differentiated by its input types, it cannot have multiple type signatures that share the same input types, as this would cause ambiguity.

After differentiation, the block’s output types are well defined, and can then be used in the differentiation of downstream blocks.

local multiply = radio.MultiplyBlock()

-- Differentiate into the complex-valued flavor of MultiplyBlock
multiply:differentiate({radio.types.ComplexFloat32, radio.types.ComplexFloat32})
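The matching step itself can be sketched in plain Lua. This is an illustrative model, not the framework's differentiate() implementation: the data types are hypothetical table stand-ins, signatures are plain tables with inputs and outputs arrays, and the Text type exists only to exercise a function predicate like the one described for JSONSink.

```lua
-- Hypothetical stand-ins for data types; only identity matters here.
local Float32 = {name = "Float32"}
local ComplexFloat32 = {name = "ComplexFloat32"}
local Text = {name = "Text", to_json = function () return "{}" end}

-- Returns the first signature whose input types all match, or raises an error.
local function differentiate(signatures, input_types)
    for _, sig in ipairs(signatures) do
        local match = #sig.inputs == #input_types
        for i = 1, #sig.inputs do
            if not match then break end
            local expected = sig.inputs[i]
            if type(expected) == "function" then
                -- Function predicate: ask it whether the input type is compatible
                match = expected(input_types[i]) == true
            else
                -- Concrete data type: must be the same type
                match = expected == input_types[i]
            end
        end
        if match then return sig end
    end
    error("no compatible type signature found")
end

local add_signatures = {
    {inputs = {Float32, Float32}, outputs = {Float32}},
    {inputs = {ComplexFloat32, ComplexFloat32}, outputs = {ComplexFloat32}},
}
local json_sink_signatures = {
    {inputs = {function (t) return t.to_json ~= nil end}, outputs = {}},
}

local selected = differentiate(add_signatures, {ComplexFloat32, ComplexFloat32})
local accepted = differentiate(json_sink_signatures, {Text})
local mixed_ok = pcall(differentiate, add_signatures, {Float32, ComplexFloat32})
```

The selected signature's outputs are what a downstream block would then receive as its input types.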

Code reference: Block class.

Initialization

The initialization phase takes place after the differentiation phase in running a flow graph. In this phase, every block’s initialize() method is called, starting at the source blocks and ending at the sink blocks, in a downstream order.

In the initialize() method, the block can perform data type dependent initialization with the get_input_type() and get_output_type() methods, which return the differentiated input and output types of the specified port index, respectively.

The block can also perform sample rate dependent initialization with get_rate(), which recursively calls get_rate() on upstream blocks in the flow graph to determine the sample rate. Blocks may modify the sample rate for downstream blocks by overriding this method, and source blocks are required to implement it to return a concrete value.
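The recursive lookup can be sketched in plain Lua. The Block table, the new_block() helper, the upstream field, and the decimation factor are all hypothetical; the sketch assumes each block has a single upstream block, which real flow graphs do not require.

```lua
-- Hypothetical blocks: each records the upstream block feeding its input.
local Block = {}
Block.__index = Block

-- Default: a block inherits the sample rate of its upstream block.
function Block:get_rate()
    return self.upstream:get_rate()
end

local function new_block(upstream, overrides)
    local b = setmetatable({upstream = upstream}, Block)
    for k, v in pairs(overrides or {}) do b[k] = v end
    return b
end

-- A source must return a concrete rate.
local source = new_block(nil, {get_rate = function () return 1e6 end})
-- A pass-through block uses the default.
local filter = new_block(source)
-- A decimator overrides get_rate() to report a lower rate downstream.
local decimator = new_block(filter, {
    factor = 4,
    get_rate = function (self) return Block.get_rate(self) / self.factor end,
})
local sink = new_block(decimator)

local sink_rate = sink:get_rate()
```

The recursion terminates at the source, which is why sources have to return a concrete value rather than defer upstream.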

Most blocks will create persistent output sample vectors in initialize(), to be used by process(...). This allows blocks to efficiently produce new samples without excessive allocations and deallocations.

Code reference: Block class.

Processing

The block’s process() method is called repeatedly in the running phase of a flow graph to process inputs into outputs.

This method receives a set of input vectors as arguments, corresponding to the input ports it defined in its type signatures. These inputs are immutable, read-only vectors and are all of the same length. The process() method is responsible for computing output vectors from these inputs and any block state, and returning the output vectors in the order corresponding to the output ports it defined in its type signatures.

Blocks are responsible for managing their output sample vectors. Most blocks allocate persistent output sample vectors in their initialize() method, and then resize, populate, and return them in process(...). Since vector resizes only cause re-allocation when they are grown, the underlying memory of each output vector approaches a stable size as the vector is resized to accommodate the block’s inputs.

For example, the process method for adding two inputs might look like:

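function AddBlock:process(x, y)
    -- Reuse the persistent output vector (assumed to be created in initialize())
    local out = self.out:resize(x.length)

    -- CStruct vector data is a zero-indexed C array
    for i = 0, x.length-1 do
        out.data[i] = x.data[i] + y.data[i]
    end

    return out
end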

Running

After a flow graph is connected, differentiated, and initialized, every block is run concurrently.

While running, each block’s inputs share a Pipe object with another block’s output. A block output may have multiple Pipe objects to several different block inputs, but every block input only has one Pipe object.

The Pipe object provides an interface for the serialization and deserialization of sample vectors between blocks. The read() method reads a vector from the pipe. The write() method writes a vector to the pipe.

A block is run with its run() method, which is a loop that repeatedly reads input pipes into an array of vectors, calls process(...) with these input vectors, and writes the resulting array of vectors to the output pipes.
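The shape of this loop can be sketched in plain Lua with in-memory queues in place of real pipes. Everything here is a hypothetical model: new_pipe() is a simple FIFO whose read() returns nil when empty, the adder block uses Lua tables as vectors, and the writes are assumed to arrive in matching lengths, which the real framework arranges itself.

```lua
local unpack = table.unpack or unpack  -- Lua 5.2+ vs. Lua 5.1/LuaJIT

-- Hypothetical in-memory pipe: a FIFO of vectors; read() returns nil when empty.
local function new_pipe()
    local queue, head = {}, 1
    return {
        write = function (_, vec) queue[#queue + 1] = vec end,
        read = function ()
            local vec = queue[head]
            if vec ~= nil then head = head + 1 end
            return vec
        end,
    }
end

-- Sketch of the run loop: read all inputs, process, write all outputs.
local function run(block, inputs, outputs)
    while true do
        local vectors = {}
        for i, pipe in ipairs(inputs) do
            vectors[i] = pipe:read()
            if vectors[i] == nil then return end  -- no more input
        end
        local results = {block:process(unpack(vectors))}
        for i, pipe in ipairs(outputs) do
            pipe:write(results[i])
        end
    end
end

local adder = {
    process = function (_, x, y)
        local out = {}
        for i = 1, #x do out[i] = x[i] + y[i] end
        return out
    end,
}

local in1, in2, out = new_pipe(), new_pipe(), new_pipe()
in1:write({1, 2}); in2:write({10, 20})
in1:write({3});    in2:write({30})
run(adder, {in1, in2}, {out})
local r1, r2 = out:read(), out:read()
```

Because process() receives the vectors as separate arguments and returns them as separate values, the loop packs and unpacks arrays on either side of the call.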

Code reference: Block class.

Termination

A block runs indefinitely in its run() method, until a read() on one of the input Pipe objects returns nil. This indicates that the upstream block closed its output Pipe and that the flow graph is collapsing.

When an input Pipe returns nil, the block breaks its main run loop, calls cleanup(), and then closes its output pipes. This causes the downstream blocks to terminate similarly.

Sources that produce a finite number of samples will exit naturally after they have produced all of their samples, triggering the collapse of the flow graph. Sources that run indefinitely, on the other hand, can only be shut down forcibly by the framework, which sends them the SIGTERM signal in stop() (see Flow Graph Control below).

A block shutdown does not mean samples are lost. Samples are buffered in the underlying implementation of the Pipe, even after the producing block has terminated, and the consuming block will only encounter the nil on an input Pipe after all of these samples have been consumed. This allows samples from a finite source to be processed through a flow graph to completion.
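The draining behavior can be illustrated with a small plain-Lua model. The pipe here is hypothetical and tracks a closed flag explicitly; a real pipe would block on an empty read until data arrives or the writer closes, which this single-process sketch replaces with an assertion. The events table is only there to record the order of cleanup.

```lua
-- Hypothetical pipe that distinguishes "closed and drained" from buffered data.
local function new_pipe()
    local pipe = {queue = {}, head = 1, closed = false}
    function pipe:write(vec) self.queue[#self.queue + 1] = vec end
    function pipe:close() self.closed = true end
    function pipe:read()
        local vec = self.queue[self.head]
        if vec ~= nil then
            self.head = self.head + 1
            return vec
        end
        -- Empty: a real pipe would block here unless the writer has closed it
        assert(self.closed, "would block")
        return nil
    end
    return pipe
end

local events = {}

-- Pass-through block: forwards vectors until its input reports nil.
local function run(name, input, output)
    while true do
        local vec = input:read()
        if vec == nil then break end
        output:write(vec)
    end
    events[#events + 1] = name .. " cleanup"
    output:close()
end

local a, b, c = new_pipe(), new_pipe(), new_pipe()
-- A finite source writes two vectors, then exits and closes its output
a:write({1}); a:write({2}); a:close()
run("first", a, b)
run("second", b, c)
```

Both vectors reach the last pipe even though the source closed before any block read from it, and each block closes its output only after its own cleanup.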

Code reference: Block class.

Composite Blocks

The CompositeBlock is a special block used to build and run flow graphs. It can either be used as a hierarchical block: a composition of blocks abstracted into one block with redefined inputs/outputs at its boundary, or as a top-level block: a composition of blocks forming a complete flow graph that can be run.

Hierarchical Block

A CompositeBlock used for a hierarchical block builds an internal flow graph by connecting block ports with the connect() method, just as a top-level block would. However, unlike top-level blocks, which have no boundary inputs or outputs, hierarchical blocks also specify a type signature with add_type_signature() for their boundary inputs and outputs.

When a CompositeBlock adds a type signature, instead of building the PipeInput and PipeOutput input and output ports under the .inputs and .outputs arrays as a normal block would, the CompositeBlock builds AliasedPipeInput and AliasedPipeOutput ports, respectively. These are special input and output ports that alias existing input and output ports inside a flow graph.

These aliases are established in calls to connect(), when boundary input/output ports are connected to the input/output ports of concrete blocks in the flow graph.

Code reference: CompositeBlock class.

Top-level Block

A CompositeBlock used for a top-level block builds a flow graph by connecting block ports with the connect() method. The data structure for the flow graph is a table that maps block PipeInput instances to PipeOutput instances. In other words, it is a hash table of input port to output port, for each connection between blocks in the graph.

Connection

When a connection is made with connect() on a top-level CompositeBlock, the PipeInput and PipeOutput ports are looked up in the blocks and added to the connections table, and a Pipe object is created and registered under both PipeInput and PipeOutput port instances.

If a hierarchical block is connected into a top-level block, then its AliasedPipeInput and AliasedPipeOutput ports are followed to the underlying real PipeInput and PipeOutput ports, and those are the input and output port instances registered in the flow graph. This means that the hierarchical block does not exist in a top-level flow graph; it’s just a convenience for composing blocks.
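Following aliases to the real port can be sketched in plain Lua. This is a simplified model, assuming each aliased port refers to exactly one other port. The port records, the alias field, and the resolve() helper are hypothetical, and the block names are only labels.

```lua
-- Hypothetical port records. An aliased port points at the port it stands in for.
local function real_port(name, owner) return {name = name, owner = owner} end
local function aliased_port(name, owner) return {name = name, owner = owner, alias = nil} end

-- Follow aliases until a real port is reached.
local function resolve(port)
    while port.alias ~= nil do port = port.alias end
    return port
end

local filter_out = real_port("out", "LowpassFilterBlock")
local inner_out = aliased_port("out", "InnerComposite")
local outer_out = aliased_port("out", "OuterComposite")
inner_out.alias = filter_out   -- established by connect() inside the inner block
outer_out.alias = inner_out    -- established by connect() inside the outer block

-- Connecting the outer block's boundary output registers the real port
local sink_in = real_port("in", "Sink")
local connections = {}
connections[sink_in] = resolve(outer_out)
```

After resolution, the connections table refers only to ports of concrete blocks, so nested hierarchical blocks leave no trace in the top-level graph.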

Code reference: CompositeBlock class.

Run Preparation

After the flow graph is connected under a top-level block, it can be run with start() or run(). Running a flow graph requires a few initialization steps before each block can begin to consume, process, and produce samples.

First, the CompositeBlock crawls its connections table and absorbs the internal connections table of any hierarchical blocks. Since hierarchical blocks were just connected at the boundary ports, the internal connections are not yet available to the top-level block and need to be absorbed.

Next, every block referenced in the connections table is checked for unconnected inputs. If an input is unconnected, the flow graph cannot be run and an error is raised.

The connections table data structure is then used to build an auxiliary list called the evaluation order. This is a list of all of the blocks in the flow graph, arranged in a downstream and dependency-free order. Each block in this list may depend on the outputs of previous blocks in the list, but does not depend on the outputs of any successive blocks. This order is needed to correctly differentiate the flow graph, because the output types of each block are fed as the input types to downstream blocks for differentiation.
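One way to derive such an order from a connections table is a depth-first walk, sketched below in plain Lua. This is not necessarily how the framework builds it. The block and port records are hypothetical, and the sketch assumes the graph is acyclic and that every input is already connected, as the preceding check guarantees.

```lua
-- Hypothetical blocks and ports: each port records its owning block.
local function new_block(name, num_inputs)
    local block = {name = name, inputs = {}, output = nil}
    for i = 1, num_inputs do block.inputs[i] = {owner = block} end
    block.output = {owner = block}
    return block
end

local source = new_block("source", 0)
local filter = new_block("filter", 1)
local mixer = new_block("mixer", 2)
local sink = new_block("sink", 1)

-- Connections table: input port -> output port
local connections = {
    [sink.inputs[1]] = mixer.output,
    [mixer.inputs[1]] = source.output,
    [mixer.inputs[2]] = filter.output,
    [filter.inputs[1]] = source.output,
}

-- Depth-first walk: a block is appended only after everything upstream of it.
local function build_evaluation_order(connections)
    local order, visited = {}, {}
    local function visit(block)
        if visited[block] then return end
        visited[block] = true
        for _, input in ipairs(block.inputs) do
            visit(connections[input].owner)
        end
        order[#order + 1] = block
    end
    for input, output in pairs(connections) do
        visit(input.owner)
        visit(output.owner)
    end
    return order
end

local names = {}
for i, block in ipairs(build_evaluation_order(connections)) do names[i] = block.name end
local order_string = table.concat(names, ",")
```

For this graph the dependencies admit only one valid order, so the result does not depend on the iteration order of pairs().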

The evaluation order is followed three times: once to differentiate() each block, once to check that each block’s input port sample rates match, and once to initialize() each block. If input port sample rates do not match, the flow graph is not run and an error is raised.

Finally, all of the Pipe objects in the flow graph are initialized. Pipes are backed by anonymous UNIX socket pairs created by socketpair().

The flow graph is now ready to run.

Code reference: CompositeBlock class.

Running

At this stage, the flow graph is fully validated, differentiated, initialized, and is ready to run.

The CompositeBlock first blocks SIGINT and SIGCHLD signals with sigprocmask(), so that it can later synchronously detect these signals with sigpending() in wait(). The CompositeBlock then calls fork() for each block.

The child process for each block closes all unneeded file descriptors, and calls the block’s main run() method. The child process spends the majority of its processing lifetime in this method. If this method returns naturally due to a flow graph collapse, the child exits with exit code 0. If it returns because of a runtime error, the child prints the error and backtrace to standard error, and exits with exit code 1.

The parent process closes all file descriptors associated with the Pipe objects it built, so that the blocks are the only owners of these connected files, and can close them to signal block termination.

The parent returns back to the top-level script, where it can use status() to get the running status of the flow graph, wait() to wait for the flow graph to finish, or stop() to stop the flow graph.

Code reference: CompositeBlock class.

Flow Graph Control

The wait() method waits for a SIGINT or SIGCHLD signal. If it gets a SIGINT signal, which indicates a user requested exit, it calls stop() to stop the flow graph. If it gets a SIGCHLD signal, which indicates a block exited, it waits on each block PID with waitpid() until the flow graph has fully collapsed, and then unblocks the SIGINT and SIGCHLD signals.

The stop() method kills all source blocks with SIGTERM, waits on each block PID with waitpid() until the flow graph has fully collapsed, and then unblocks the SIGINT and SIGCHLD signals.

The status() method checks if any block is still running with kill(<pid>, 0). If all blocks have exited, it waits on each block PID with waitpid(), and then unblocks the SIGINT and SIGCHLD signals.

Code reference: CompositeBlock class.