A demultiplexer in Go

Imagine you have a stream of packets (requests, messages) emitted by a producer. Let's say that each packet looks like this:

type packet struct {
     sel int
     data []byte
}

Field sel carries a selector value that can range from 0 to N-1. Then you have N consumers, each waiting to process packets with a matching selector. That is, consumer 0 is waiting to process packets with selector == 0, consumer 1 packets with selector == 1, and so on. You want to demultiplex the stream of packets and send each to its appropriate consumer. It is an error to send a packet with selector == m to a consumer other than the m'th one (counting from 0).

Schematically, the system can be depicted like this:

+-----+          +------+  sel=0   +----+
|     |  sel=    |      |--------->| C0 |
|  P  |--------->| De-  |          +----+
|     |  0...N   | mux  |   ...
+-----+          |      |  sel=N   +----+
                 |      |--------->| CN |
                 +------+          +----+

Let's see how we could code something like this in Go(lang).

First let's write a simple test producer. Once every tick (a user-supplied period) our test producer emits a packet with a random sel field (in range [0..N)) on a channel. The code looks like this:

type Packet struct {
        sel int
        id  int
}

We have replaced the data field of the packet with an integer id to make it easier to trace the packets in the program's printouts (the payload of the packet is, anyway, immaterial to our problem). Next comes a type for the producer, and a New function that initializes things, starts the respective goroutine, and returns the newly created producer:

type Producer1 struct {
    Out  chan Packet
    n    int          // Selector [0..n)
    id   int          // Packet id
    tick *time.Ticker // Packet ticker
    quit chan chan int
}

func NewProducer1(n int, every time.Duration) *Producer1 {
    p := &Producer1{n: n, id: 0}
    p.Out = make(chan Packet)
    p.tick = time.NewTicker(every)
    p.quit = make(chan chan int)
    go p.run()
    return p
}

Observe that we use a quit channel to signal the producer that it's time to quit. This is accomplished by calling the Stop() method:

func (p *Producer1) Stop() int {
    r := make(chan int)
    p.quit <- r
    close(p.quit)
    return <-r
}

The producer replies to our quit request with the number of packets produced. What remains is the run() method which executes as the producer's goroutine and contains the trivial packet generation and emission logic. It basically goes like this: We wait for a tick. When the tick arrives we produce a packet and enable the "send to output channel" select clause, to send the packet. Once the packet is sent, we disable the send clause; it will be re-enabled when the next packet is produced. The send clause is enabled and disabled by setting the channel-typed local variable out to either p.Out, or nil, respectively (sends and receives to or from nil channels are never selected---this is a common idiom for enabling / disabling select clauses). If a new tick arrives before the packet is sent, the packet is replaced by the new one. This way the producer drops packets if pushed back by the consumer (if the consumer cannot keep up). We also constantly monitor the p.quit channel for termination requests:

func (p *Producer1) run() {
    var out chan<- Packet
    var pck Packet

    // Initially, no packet to emit
    out = nil
    for {
        select {
        case <-p.tick.C:
            // Generate and emit packet.
            pck = Packet{sel: rand.Intn(p.n), id: p.id}
            p.id++
            out = p.Out
        case out <- pck:
            // Packet emitted.
            log.Printf("P : %03d/%d\n", pck.id, pck.sel)
            out = nil
        case r := <-p.quit:
            p.tick.Stop()
            close(p.Out)
            r <- p.id
            return
        }
    }
}

With the producer in place, we need a similarly simple consumer. Our consumer is keyed with a specific selector value, indicating which packets it should accept, and receives packets from an in channel. Instead of consuming packets instantly, it simulates a packet processing delay:

type Consumer1 struct {
    sel   int
    in    <-chan Packet
    delay time.Duration
    end   chan int
}

func NewConsumer1(sel int, in <-chan Packet,
    delay time.Duration) *Consumer1 {

    c := &Consumer1{sel: sel, in: in, delay: delay}
    c.end = make(chan int)
    go c.run()
    return c
}

func (c *Consumer1) Wait() int {
    return <-c.end
}

The logic that runs in the consumer goroutine is quite simple: Wait for a packet on channel in. Once a packet is received, if it doesn't match our keyed selector, drop it and go back waiting for the next. If the packet matches, delay a specified amount of time (to simulate the packet processing delay) and then (after the delay) go back waiting for the next packet. Stop when the input channel is closed. After stopping, send the number of packets received on the end channel:

func (c *Consumer1) run() {
    var npck int

    for pck := range c.in {
        npck++
        if pck.sel != c.sel {
            log.Printf("C%d: %03d/%d: Bad selector!\n",
                c.sel, pck.id, pck.sel)
            continue
        }
        // Delay for "processing packet".
        <-time.After(c.delay)
        log.Printf("C%d: %03d/%d\n", c.sel, pck.id, pck.sel)
    }
    c.end <- npck
    close(c.end)
}

We can now connect the producer directly to the consumer to see if everything works. We can do it like this:

p := NewProducer1(5, 1*time.Second)
c := NewConsumer1(0, p.Out, 1*time.Second)

// Let the system run for 30 seconds
<-time.After(30 * time.Second)
p.Stop()
c.Wait()

The producer produces packets with 5 possible selector values, so our consumer will reject most of them (all but those with sel == 0, which is keyed to it). Running the program will produce an output like this:

17:35:47.958626 P : 000/1
17:35:47.958896 C0: 000/1: Bad selector!
17:35:48.958458 P : 001/1
17:35:48.958552 C0: 001/1: Bad selector!

...

17:35:54.958503 P : 007/0
17:35:55.958877 C0: 007/0
17:35:55.958963 P : 008/0
17:35:56.959350 C0: 008/0

With our simple consumer and producer in place, let's focus on the demultiplexer: the block that decides which packet should be routed to which consumer based on the packet's selector. Obviously, our demultiplexer should receive input packets from a channel (connected to the producer) and forward them to N channels, each connected to the respective consumer. Our Demux1 type could look like this:

type Demux1 struct {
    Out []chan Packet // Output channels
    in  <-chan Packet
    end chan int
}

Field in is the input channel connected to the producer and Out is a slice of N output channels. A new Demux1 is created like this:

func NewDemux1(n int, in <-chan Packet) *Demux1 {
    d := &Demux1{in: in}
    d.Out = make([]chan Packet, n)
    for i := 0; i < n; i++ {
        d.Out[i] = make(chan Packet)
    }
    d.end = make(chan int)
    go d.run()
    return d
}

The demultiplexing logic is in the run() method which executes as the demultiplexer's goroutine: We wait for a packet on channel d.in. Once the packet is received we first do a sanity check to see if its selector is within valid range (i.e. if it matches one of our output channels). If not, we just drop the packet and go back waiting for the next one. If the packet is ok, we select the appropriate output channel based on pck.sel, and send the packet to it. Once the packet has been sent we go back waiting for the next packet. We stop when the input channel is closed. After stopping, we close our output channels (to notify the consumers that it's time for them to stop) and send the number of packets received on the end channel:

func (d *Demux1) run() {
    var npck int

    for pck := range d.in {
        npck++
        if pck.sel < 0 || pck.sel >= len(d.Out) {
            // Drop packet.
            log.Printf("D : %03d/%d: Bad selector!\n",
                pck.id, pck.sel)
            continue
        }
        // Emit packet.
        d.Out[pck.sel] <- pck
        log.Printf("D : %03d/%d: --> O%d (%d)\n",
            pck.id, pck.sel, pck.sel, len(d.Out[pck.sel]))
    }
    for i := range d.Out {
        close(d.Out[i])
    }
    d.end <- npck
    close(d.end)
}
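
The test code below also calls a Wait method on the demultiplexer, which is not shown in the listing above. A minimal sketch, directly analogous to Consumer1.Wait, could look like this:

// Wait blocks until the demultiplexer goroutine exits and returns
// the number of packets received on the input channel.
func (d *Demux1) Wait() int {
    return <-d.end
}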

We can now put everything together, like this:

p := NewProducer1(n, 1*time.Second)
d := NewDemux1(n, p.Out)
c := make([]*Consumer1, n)
for i := 0; i < n; i++ {
    c[i] = NewConsumer1(i, d.Out[i], 1*time.Second)
}

<-time.After(30*time.Second)
p.Stop()
d.Wait()
for i := range c {
    c[i].Wait()
}

And we'll get an output similar to this:

18:05:41.581252 P : 000/1
18:05:41.581433 D : 000/1: --> O1 (0)
18:05:42.580966 P : 001/1
18:05:42.581788 C1: 000/1
18:05:42.581904 D : 001/1: --> O1 (0)
18:05:43.580881 P : 002/1
18:05:43.582086 C1: 001/1
...
18:05:46.581142 P : 005/0
18:05:46.581206 D : 005/0: --> O0 (0)
18:05:46.583303 C1: 004/1
18:05:47.581127 P : 006/1
18:05:47.581190 D : 006/1: --> O1 (0)
18:05:47.581430 C0: 005/0

There's still a problem with our Demux1 implementation: A slow consumer can block all other consumers. Imagine an input stream containing two packets for slow consumer 1, followed by packets for other consumers, like this:

+--------+-------+-------+-------+-------+
|   P5   |   P4  |   P3  |   P2  |   P1  |-----> to Demux
| sel=4  | sel=3 | sel=2 | sel=1 | sel=1 |
+--------+-------+-------+-------+-------+

The first packet (P1) is sent to consumer 1 (C1). Then we have to wait for C1 to process the packet and accept P2, before we can send P3, P4, and P5 to the idle consumers C2, C3, and C4. Ideally, we would like to, somehow, be able to stash away P2 until C1 finishes processing P1, and go on dispatching packets P3, P4, and P5.

Understand, though, that no matter what we do, it will always be possible to block if a large-enough number of packets for a "slow" consumer arrive close together---since we cannot "stash away" an infinite number of packets. That is, if the long-term average arrival rate of packets for a consumer is faster than the ability of the consumer to process them, we will, inevitably, eventually block. What we can do is fend against short-term fluctuations in the arrival rate (or in the consumer's processing speed). We can do this by introducing a "buffer" (bounded in size) of packets waiting to be dispatched or consumed. This way, if we size the buffer at N packets, we can say that we will not block as long as the average packet arrival rate for any consumer is less than or equal to that consumer's capacity (its packet processing rate), and no burst is larger than N packets.
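
For instance, with the one-second processing delay used by our test consumers, a 10-packet buffer lets the demultiplexer absorb a burst of 10 packets arriving back-to-back for a single consumer; the buffer then drains over roughly the next 10 seconds, provided that consumer sees, on average, no more than one packet per second in the meantime.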

The most straightforward way to implement this is to introduce buffering to the output channels of our demultiplexer. In order to be able to fend against any burst of N packets, we must add a buffer of N packets to each demultiplexer output channel. Like this:

func NewDemux1(n int, in <-chan Packet, buffer int) *Demux1 {
    ...
    for i := 0; i < n; i++ {
        d.Out[i] = make(chan Packet, buffer)
    }
    ...
}

Everything else remains the same.

To better observe and experiment with the effects of adding output buffers to our demultiplexer, it would be useful to have a producer that (apart from the configured constant rate) can also generate occasional bursts of packets. It is not very hard to modify our producer to do so:

type Producer2 struct {
    Out    chan Packet
    n      int          // Selector [0..n)
    id     int          // Packet id
    bEvery int          // Burst every bEvery slow ticks
    bSz    int          // Burst size
    stick  *time.Ticker // Slow ticker (ordinary packets)
    ftick  *time.Ticker // Fast ticker (burst packets)
    quit   chan chan int
}

func NewProducer2(n int,
    every time.Duration, burstEvery, burstSz int) *Producer2 {

    p := &Producer2{n: n, id: 0, bEvery: burstEvery, bSz: burstSz}
    p.Out = make(chan Packet)
    p.stick = time.NewTicker(every)
    p.quit = make(chan chan int)
    go p.run()
    return p
}

We add two new parameters: bEvery and bSz. The new producer will generate a burst of bSz packets after emitting bEvery packets at normal rate. During these bursts, packets are timed by the "fast" ticker ftick. This ticker is enabled only during bursts, and stopped afterwards (since it ticks fast it's, presumably, costly to leave it running all the time). The packet generation / emission logic becomes like this:

func (p *Producer2) run() {
    var tick <-chan time.Time
    var out chan<- Packet
    var pck Packet

    tick, out = p.stick.C, nil
    burst, npck := false, p.bEvery
    for {
        select {
        case <-tick:
            if npck == 0 {
                if burst {
                    // Burst ended, back to normal tick.
                    tick = p.stick.C
                    p.ftick.Stop()
                    p.ftick = nil
                    npck = p.bEvery
                    burst = false
                } else {
                    // Time for burst.
                    p.ftick = time.NewTicker(FastTick)
                    tick = p.ftick.C
                    npck = p.bSz
                    burst = true
                }
                break
            }
            // Generate and emit packet.
            pck = Packet{sel: rand.Intn(p.n), id: p.id}
            p.id++
            out = p.Out
            npck--
        case out <- pck:
            // Packet emitted.
            log.Printf("P : %03d/%d\n", pck.id, pck.sel)
            out = nil
        case r := <-p.quit:
            p.stick.Stop()
            if p.ftick != nil {
                p.ftick.Stop()
            }
            close(p.Out)
            r <- p.id
            return
        }
    }
}
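
For reference, swapping the bursty producer into the previous test harness is a small change; a hypothetical wiring, with the parameter values used for the run shown below, could look like this:

p := NewProducer2(n, 1*time.Second, 10, 10) // burst of 10, every 10 normal-rate packets
d := NewDemux1(n, p.Out, 10)                // 10-packet buffer per output channel
c := make([]*Consumer1, n)
for i := 0; i < n; i++ {
    c[i] = NewConsumer1(i, d.Out[i], 1*time.Second)
}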

Running our test with the new "bursty" producer, configured for bursts of 10 packets every 10 packets emitted at the normal rate, and using 10-packet-deep demultiplexer output buffers, we get an output like the following (focusing on the first burst):

18:13:36.299728 P : 009/0
18:13:36.299799 D : 009/0: --> O0 (1)
18:13:36.300347 C0: 008/0
18:13:37.300832 C0: 009/0
18:13:37.300974 P : 010/0
18:13:37.300994 D : 010/0: --> O0 (1)
18:13:37.302094 P : 011/1
18:13:37.302193 D : 011/1: --> O1 (1)
18:13:37.302855 P : 012/0
18:13:37.302887 D : 012/0: --> O0 (1)
18:13:37.304056 P : 013/1
18:13:37.304153 D : 013/1: --> O1 (1)
18:13:37.304847 P : 014/0
18:13:37.304915 D : 014/0: --> O0 (2)
18:13:37.305830 P : 015/0
18:13:37.305870 D : 015/0: --> O0 (3)
18:13:37.306823 P : 016/1
18:13:37.306864 D : 016/1: --> O1 (2)
18:13:37.307823 P : 017/1
18:13:37.307862 D : 017/1: --> O1 (3)
18:13:37.308820 P : 018/1
18:13:37.308860 D : 018/1: --> O1 (4)
18:13:37.310048 P : 019/0
18:13:37.310154 D : 019/0: --> O0 (4)
18:13:38.299721 P : 020/1
18:13:38.299793 D : 020/1: --> O1 (5)
18:13:38.301335 C0: 010/0

Observe how the buffers start filling up during the burst, until they hold 4 packets for consumer 0 and 5 packets for consumer 1. This input stream would, obviously, have led to lost (dropped) packets without buffers on the demultiplexer's outputs.

This demux implementation is perfectly adequate for most cases.

If we'd like, though, to pursue the problem a bit further, we can notice this: In order to handle any burst of N packets we have to allocate buffers of total capacity (measured in packets) equal to N * # of consumers (with this capacity we can also handle certain larger bursts). Ideally, we would like to be able to handle any burst of N packets with a total buffer capacity of no more than N. If we reduce each output channel's buffer capacity to N / # of consumers (reducing the total capacity to N), then we could still handle some bursts of N packets, but no longer all bursts of N packets (e.g. we could no longer handle a burst of N packets for a single consumer).

In order to achieve the desired effect we have to modify our demultiplexer implementation considerably: Instead of keeping per-consumer buffers we can keep a single buffer of packets to be dispatched, and instead of always pulling the first packet out of this buffer we have to be able to pull the packet that is destined for an available (idle) consumer, even if in front of it are packets for other (busy) consumers. Imagine the queue containing the packets:

+--------+-------+-------+-------+-------+
|   P5   |   P4  |   P3  |   P2  |   P1  |--->
| sel=0  | sel=1 | sel=0 | sel=1 | sel=1 |
+--------+-------+-------+-------+-------+

If we know that consumer 1 is busy and consumer 0 is idle, we must be able to pull out packet P3, even though P1 and P2 are in front of it in the queue. A behavior like this cannot be directly supported by the built-in channel buffers, so we have to implement our own buffering scheme. We choose to implement our queue with a slice of packets, and use linear search to locate the first available packet for a specific consumer (this is certainly not the most efficient approach, but it is the easiest to implement and demonstrate):

// queue is a queue of packets used internally by Demux2.
type queue []Packet

// NewQueue creates a new queue with a depth of "sz" packets.
func NewQueue(sz int) *queue {
    q := make(queue, 0, sz)
    return &q
}

// Full tests if the queue is full.
func (q *queue) Full() bool {
    return len(*q) == cap(*q)
}

// Put enqueues packet "p".
func (q *queue) Put(p Packet) {
    *q = append(*q, p)
}

// Get dequeues and returns the first packet with selector equal 
// to "sel". Returns ok == false if no such packet exists in 
// the queue, ok == true otherwise.
func (q *queue) Get(sel int) (p Packet, ok bool) {
    q0 := *q
    for i, p := range q0 {
        if p.sel == sel {
            copy(q0[i:], q0[i+1:])
            // Needed if Packet contains pointers.
            q0[len(q0)-1] = Packet{}
            *q = q0[:len(q0)-1]
            return p, true
        }
    }
    return p, false
}
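
As a quick, standalone illustration of the queue semantics (not part of the demultiplexer code proper), consider:

q := NewQueue(4)
q.Put(Packet{sel: 1, id: 1})
q.Put(Packet{sel: 1, id: 2})
q.Put(Packet{sel: 0, id: 3})

pck, ok := q.Get(0)               // skips the two sel == 1 packets
log.Printf("%d %v\n", pck.id, ok) // prints: 3 true
pck, ok = q.Get(2)                // no packet with sel == 2 queued
log.Printf("%d %v\n", pck.id, ok) // prints: 0 false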

In addition to our custom queue, we also need a way to know when each of our consumers becomes available (idle, ready to accept the next packet) so we can act accordingly. For this we make our consumers report their availability by writing their keyed selector to a special Avail channel, which is read by the demultiplexer. Our modified consumers look like this:

type Consumer2 struct {
    sel   int
    in    <-chan Packet
    avail chan<- int
    delay time.Duration
    end   chan int
}

func NewConsumer2(sel int, in <-chan Packet, avail chan<- int,
    delay time.Duration) *Consumer2 {

    c := &Consumer2{sel: sel, in: in, avail: avail, delay: delay}
    c.end = make(chan int)
    go c.run()
    return c
}

We have just added the avail channel, which is given as an argument to the NewConsumer2 function. The packet reception logic for the consumers now looks like this:

// run runs as the consumer's goroutine.
func (c *Consumer2) run() {
    var npck int

    // Initially, report availability.
    c.avail <- c.sel
    for pck := range c.in {
        npck++
        if pck.sel != c.sel {
            // Drop packet, report availability.
            log.Printf("C%d: %03d/%d: Bad selector!\n",
                c.sel, pck.id, pck.sel)
            c.avail <- c.sel
            continue
        }
        // Delay for processing.
        <-time.After(c.delay)
        log.Printf("C%d: %03d/%d\n", c.sel, pck.id, pck.sel)
        // Report availability
        c.avail <- c.sel
    }
    c.end <- npck
    close(c.end)
}

It is almost the same as for Consumer1, with the addition of reporting the consumer's availability by writing its selector (the c.sel field) to the c.avail channel after it finishes processing each packet (and also once initially, before the first packet is received).

With this in place, we can write our new demultiplexer logic which uses the custom queue and takes into account the availability signals from the consumers. Our new demultiplexer becomes like this:

type Demux2 struct {
    Out   []chan Packet // Output channels (per consumer).
    Avail chan int      // Avail. reports (from consumers).
    avf   []bool        // Avail. flags (per consumer).
    nbusy int           // # of busy consumers.
    pq    *queue        // Packet queue.
    in    <-chan Packet // Input channel (from producer).
    end   chan int
}

func NewDemux2(n int, in <-chan Packet, buffer int) *Demux2 {
    d := &Demux2{in: in}
    d.Out = make([]chan Packet, n)
    for i := 0; i < n; i++ {
        d.Out[i] = make(chan Packet)
    }
    d.Avail = make(chan int)
    d.avf = make([]bool, n)
    d.nbusy = n
    if buffer < 1 {
        buffer = 1
    }
    d.pq = NewQueue(buffer)
    d.end = make(chan int)
    go d.run()
    return d
}

We have added the following: the Avail channel, used by the consumers to report their availability; the avf flags, one per consumer, used by the demultiplexer logic to keep track of which consumers are available; the nbusy counter, which tracks the number of busy (not-available) consumers; and, naturally, our custom queue.

The demultiplexer packet reception and forwarding logic now becomes considerably more complex. Under normal operation we wait on two input channels: on d.in for input packets, and on d.Avail for consumer-availability reports. If a packet is received on d.in, we check the respective d.avf flag. If it indicates that the consumer is available, we send the packet to it, mark the consumer as unavailable (d.avf[consumer] = false) and increment the d.nbusy counter. If the d.avf flag indicates that the consumer is busy, we simply enqueue the packet. If an availability report arrives on d.Avail, we search the queue for the first packet for this consumer. If such a packet is found, we send it to the consumer. If not, we mark the respective consumer as available (d.avf[consumer] = true) and decrement d.nbusy.

To this add the logic required to gracefully terminate the demultiplexer: Once the close of d.in is detected, we cannot immediately terminate the demultiplexer operation. We must wait for our queue to empty and for all consumers to report their availability back before we exit (otherwise consumers may deadlock waiting to report their availability to a demultiplexer that has already exited). We use the d.nbusy counter to quickly decide if all consumers have reported back as available:

// run runs as the demultiplexer goroutine.
func (d *Demux2) run() {
    var npck int

    in := d.in
loop:
    for {
        select {
        case pck, ok := <-in:
            if !ok {
                if d.nbusy == 0 {
                    // All consumers are idle.  We
                    // can exit.
                    break loop
                }
                // Delay exit until all consumers become
                // idle.
                in, d.in = nil, nil
                break
            }
            npck++
            if pck.sel < 0 || pck.sel >= len(d.Out) {
                // Drop packet.
                log.Printf("D : %03d/%d: Bad selector!\n",
                    pck.id, pck.sel)
                break
            }
            if d.avf[pck.sel] {
                // Emit packet.
                d.avf[pck.sel] = false
                d.nbusy++
                d.Out[pck.sel] <- pck
                log.Printf("D : %03d/%d: --> O%d\n",
                    pck.id, pck.sel, pck.sel)
            } else {
                // Enqueue packet.
                d.pq.Put(pck)
                if d.pq.Full() {
                    in = nil
                }
            }
        case c := <-d.Avail:
            pck, ok := d.pq.Get(c)
            if ok {
                // Emit packet.
                in = d.in
                d.Out[pck.sel] <- pck
                log.Printf("D : %03d/%d: --> O%d\n",
                    pck.id, pck.sel, pck.sel)
            } else {
                // Mark consumer as available.
                d.avf[c] = true
                d.nbusy--
                if d.in == nil && d.nbusy == 0 {
                    // Input channel closed, and
                    // all consumers idle. We can
                    // exit.
                    break loop
                }
            }
        }
    }
    for i := range d.Out {
        close(d.Out[i])
    }
    close(d.Avail)
    d.end <- npck
    close(d.end)
}
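
Putting the second version of the system together mirrors the earlier test. A hypothetical wiring (assuming Wait methods analogous to those of the first version) could be:

p := NewProducer2(n, 1*time.Second, 10, 10)
d := NewDemux2(n, p.Out, 10)
c := make([]*Consumer2, n)
for i := 0; i < n; i++ {
    c[i] = NewConsumer2(i, d.Out[i], d.Avail, 1*time.Second)
}

<-time.After(30 * time.Second)
p.Stop()
d.Wait()
for i := range c {
    c[i].Wait()
}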

Final remark: As an optimization, in order to reduce the scheduling switches between the demultiplexer and the consumer goroutines, we could add buffering to the d.Avail channel (ideally with capacity equal to the number of consumers), and possibly also add a small buffer (perhaps 1-deep) to each of the d.Out channels. Obviously these buffers, if we decide to add them, are not required for correct operation.
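
A minimal sketch of that optimization, assuming it is applied inside NewDemux2, could be:

d.Avail = make(chan int, n) // one availability slot per consumer
for i := 0; i < n; i++ {
    d.Out[i] = make(chan Packet, 1) // small per-output buffer
}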

Is it worth all this added complexity, just to avoid wasting some buffer space? Most likely it isn't. In most cases, the simple, buffer-per-output-channel demux implementation is certainly preferable. But as an exercise and an example in concurrency and synchronization it's quite interesting, so, I believe, it's worth the discussion.

You can find the code discussed here at:

https://github.com/npat-efault/musings/tree/master/demux-go
