In depth, step by step guide to Producer iteration - acrylic-origami/HHReactor GitHub Wiki
The `Producer` implementation, at its most fundamental level, tries to extend `AsyncGenerator` and `AsyncIterator` with:
- Merge for many, sometimes higher-order, iterators into one common output
- Safe cloning for differently-paced consumers
- Explicit lifecycle control
while preserving the following nice properties of `AsyncGenerator`:
- Laziness, by deferring iteration until the first `next` call
- Low memory footprint for simple consumption patterns (e.g. a handful of fast consumers)
Vector<shape('engine' => AsyncIterator<T>, 'driver' => ?Awaitable<mixed>)> $racetrack
At the heart of the iterator is a list of `AsyncIterator`s, each paired with an `Awaitable` that comes from iterating over that whole iterator: its "lifetime", in other words. The list is created at construction from the direct output of the generator factories that are passed into the constructor.
`BaseProducer` keeps track of the number of running clones. Only when a clone is iterated and no clones were previously running does `Producer` begin iterating these `AsyncIterator`s, stashing their lifetimes in the "driver" field. This last-minute approach satisfies the laziness property. The driver fields, "combined" via `\HH\Asio\v`, make up the lifetime of a `ConditionWaitHandle<mixed>` that exists purely for signaling purposes, known as the "bell".
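The lazy start described above can be modeled outside Hack. The following is a minimal Python `asyncio` sketch, not HHReactor code: `LazyRacetrack`, `lifetime` and `numbers` are hypothetical names, and `asyncio.gather` is only a rough analogue of `\HH\Asio\v`. It shows that no engine body runs until the combined lifetime is first awaited.

```python
import asyncio


class LazyRacetrack:
    """Hypothetical model of the racetrack: engines paired with optional drivers."""

    def __init__(self, engines):
        # Mirrors shape('engine' => ..., 'driver' => ?...): drivers start out empty.
        self.racetrack = [{"engine": e, "driver": None} for e in engines]
        self.buffer = []

    async def _drive(self, engine):
        # The "lifetime" of one engine: iterate it until it is exhausted.
        async for value in engine:
            self.buffer.append(value)

    async def lifetime(self):
        # Drivers are created only now, on first use (the laziness property).
        for entry in self.racetrack:
            if entry["driver"] is None:
                entry["driver"] = asyncio.create_task(self._drive(entry["engine"]))
        # Rough analogue of combining the driver fields with \HH\Asio\v.
        await asyncio.gather(*(entry["driver"] for entry in self.racetrack))


async def demo():
    started = []

    async def numbers(tag, count):
        started.append(tag)  # records when the generator body first runs
        for i in range(count):
            await asyncio.sleep(0)
            yield (tag, i)

    track = LazyRacetrack([numbers("a", 2), numbers("b", 1)])
    await asyncio.sleep(0)  # give the event loop a turn; nothing should start
    started_before = list(started)
    await track.lifetime()
    return started_before, sorted(started), sorted(track.buffer)
```

Running `asyncio.run(demo())` should report that no generator had started before `lifetime()` was awaited, and that both ran to completion afterwards.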
`Producer` listens for values as it iterates the `AsyncIterator`s. When a value arrives, it notifies the "bell" with a `success` if the bell isn't notified already, and enqueues the value in the buffer, implemented as a `Queue` shared between all clones. The `next` value that `Producer` delivers to the consumer is blocked on this "bell", so when `next` eventually receives control, it shifts the buffer and yields to the consumer. Subsequent calls to `next` exhaust the buffer synchronously, then reset the bell by combining the driver fields again, if it hasn't been reset by that point.
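The bell-and-buffer handshake can be sketched as follows. This is a simplified single-consumer model in Python `asyncio`, not the Hack implementation: an `asyncio.Future` stands in for the `ConditionWaitHandle`, a `deque` stands in for the shared `Queue`, and the names `BellBuffer`, `push` and `waits` are assumptions made for illustration.

```python
import asyncio
from collections import deque


class BellBuffer:
    """Hypothetical single-consumer model of the bell and the buffer."""

    def __init__(self):
        self.buffer = deque()
        self.bell = asyncio.get_running_loop().create_future()
        self.waits = 0  # how many times the consumer actually blocked

    def push(self, value):
        # Producer side: enqueue, and ring the bell only if it is still silent.
        self.buffer.append(value)
        if not self.bell.done():
            self.bell.set_result(None)

    async def next(self):
        # Consumer side: drain synchronously while items remain.
        if not self.buffer:
            if self.bell.done():
                # Buffer exhausted after a ring: reset the bell before waiting.
                self.bell = asyncio.get_running_loop().create_future()
            self.waits += 1
            await self.bell
        return self.buffer.popleft()


async def demo():
    bb = BellBuffer()

    async def produce():
        for burst in ([1, 2, 3], [4]):
            await asyncio.sleep(0)
            for value in burst:
                bb.push(value)

    task = asyncio.create_task(produce())
    out = [await bb.next() for _ in range(4)]
    await task
    return out, bb.waits
```

In this model a burst of three values costs the consumer a single wait on the bell; the remaining two are taken straight from the buffer without suspending.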
The exact point where all the iterators collectively emit their last value isn't knowable by `Producer`, so the last iteration leaves the bell unnotified as the iterators end, signaling the end of the `Producer` via `Exception`, specifically `UnexpectedValueException`. Sadly, this is indistinguishable from the bell being `fail`ed with an identical but true exception further up the stack; this is an open issue.
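To make the ambiguity concrete, here is a small Python model of the situation, under loud assumptions: `BellNeverRang` is a hypothetical stand-in for the exception an unnotified bell produces, and `wait_for_bell` and `classify` are illustrative names, not HHReactor functions. A consumer that treats that exception type as "end of iteration" cannot tell a quiet finish from a genuine failure of the same type.

```python
import asyncio


class BellNeverRang(Exception):
    """Hypothetical stand-in for the exception raised by an unnotified bell."""


async def wait_for_bell(drivers):
    # Model: the bell's lifetime is the combined drivers. If they finish
    # without anyone notifying the bell, waiting on it raises.
    await drivers
    raise BellNeverRang("all iterators finished without notifying")


async def classify(drivers):
    # The consumer's only signal for "no more values" is the exception type.
    try:
        await wait_for_bell(drivers)
    except BellNeverRang:
        return "end of iteration"


async def finished_quietly():
    await asyncio.sleep(0)


async def failed_for_real():
    await asyncio.sleep(0)
    raise BellNeverRang("a genuine failure further up the stack")


async def demo():
    return await classify(finished_quietly()), await classify(failed_for_real())
```

Both calls come back as `"end of iteration"`, which is the open issue in miniature: the genuine failure is silently read as a normal end.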
The buffer is a `Queue` implemented as a linked list. This is in contrast to a `Vector`, which would leak copious amounts of memory by retaining spent items. However, the implementation must be more complicated than a naive queue if we want clones to access the same elements but step through the queue independently.
The goal is to produce the broadest safe behavior, because the best time to clone `Producer` is anytime. When there aren't any items in the buffer, however, this is not straightforward, because there are no nodes to distribute between clones; only null pointers to the head and tail. When there are multiple clones of an unstarted `Producer` (perhaps the most common initial condition), the first item must propagate to all of the head pointers, but then subsequent changes to the head of a single clone must stay confined to that clone. The `Queue` implementation manages clones of the head pointer to achieve this, with some choreographing of "start" flags.
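The cloning requirement can be illustrated with a different, simpler technique than the "start" flags described above: a shared empty sentinel node at the tail. The Python sketch below is an assumption-laden model, not the HHReactor `Queue`; `Node`, `Tail` and `SharedQueue` are hypothetical names. Every clone of an empty queue points at the same sentinel, so the first pushed item becomes visible to all of them, while each clone advances its own head privately.

```python
class Node:
    __slots__ = ("value", "next")

    def __init__(self):
        self.value = None
        self.next = None


class Tail:
    """Shared, mutable reference to the empty sentinel at the end of the list."""

    def __init__(self):
        self.node = Node()


class SharedQueue:
    """Hypothetical linked-list queue whose clones share nodes but not heads."""

    def __init__(self, tail=None, head=None):
        self.tail = tail if tail is not None else Tail()
        # The head is private to this clone; it starts at the shared sentinel.
        self.head = head if head is not None else self.tail.node

    def clone(self):
        # Same tail object, same current position, independent head afterwards.
        return SharedQueue(self.tail, self.head)

    def push(self, value):
        # Fill the current sentinel and hang a fresh sentinel after it.
        sentinel = self.tail.node
        sentinel.value = value
        sentinel.next = Node()
        self.tail.node = sentinel.next

    def is_empty(self):
        return self.head is self.tail.node

    def shift(self):
        if self.is_empty():
            raise IndexError("shift from an empty queue")
        node = self.head
        self.head = node.next  # only this clone's head moves
        return node.value
```

For example, after `a = SharedQueue()`, `b = a.clone()`, `a.push(1)` and `a.push(2)`, clone `a` can shift both items while `b` still reads `1` first. Nodes behind the slowest clone's head are no longer referenced, so spent items are not retained the way they would be in a growing vector.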
For async code to run in Hack, something has to `await` it. To let generator factories manufacture and run async code without blocking themselves, the burden to `await` is shifted to the consumer, which conveniently must already be `await`ing the `Producer`. Appending takes advantage of the bell-resetting process, which combines the lifetimes of the child iterators at reset time and blocks the consumer's `next` call on the result. All appending must do, then, is literally append the iterator to the list of child iterators and try to notify the bell if it isn't notified already, to get this new child running as quickly as possible.
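A rough Python `asyncio` model of appending is sketched below. It is not the Hack implementation: `MiniProducer` and its methods are hypothetical, `asyncio.wait` stands in for blocking on the bell together with the combined lifetimes, and end-of-stream is detected by checking for finished drivers rather than by an unnotified bell. The point it illustrates is that `append` only adds an entry and rings the bell; the consumer's next wait is what starts and drives the new child.

```python
import asyncio
from collections import deque


class MiniProducer:
    """Hypothetical model: children are driven only by the consumer's wait."""

    def __init__(self):
        self.racetrack = []
        self.buffer = deque()
        self.bell = None

    def _ring(self):
        if self.bell is not None and not self.bell.done():
            self.bell.set_result(None)

    def append(self, engine):
        # Appending is just a list append plus an attempt to ring the bell.
        self.racetrack.append({"engine": engine, "driver": None})
        self._ring()

    async def _drive(self, engine):
        async for value in engine:
            self.buffer.append(value)
            self._ring()

    def __aiter__(self):
        return self

    async def __anext__(self):
        while not self.buffer:
            # Bell reset: start any children that are not running yet.
            for entry in self.racetrack:
                if entry["driver"] is None:
                    entry["driver"] = asyncio.create_task(self._drive(entry["engine"]))
            pending = [e["driver"] for e in self.racetrack if not e["driver"].done()]
            if not pending:
                raise StopAsyncIteration
            self.bell = asyncio.get_running_loop().create_future()
            await asyncio.wait([self.bell, *pending], return_when=asyncio.FIRST_COMPLETED)
        return self.buffer.popleft()


async def demo():
    producer = MiniProducer()

    async def child():
        yield "child-0"
        yield "child-1"

    async def parent():
        yield "parent-0"
        producer.append(child())  # hand the new work to the consumer's await
        yield "parent-1"

    producer.append(parent())
    return [value async for value in producer]
```

Here `parent` never awaits `child` itself, yet the consumer's `async for` loop receives all four values.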
When all the running references to a `Producer`'s clones are garbage-collected, the internal `some_running` flag is unset, so running child iterators will produce at most one more item before their loops are broken. This way, no more CPU or memory is used when a `Producer` is paused.
Restarting is a simple matter of remaking the loops in identical fashion to the first iteration (via `_attach`) after exhausting any residual work on the child iterators that were still running while the `Producer` was paused (also seen in `_attach`).
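Pausing and restarting, as described in the last two paragraphs, can be modeled in a few lines. This Python sketch makes several assumptions: `PausableDriver`, `attach` and `pause` are hypothetical names, and `pause()` is called explicitly where HHReactor ties the `some_running` flag to garbage collection of clones. It shows the driver loop stopping after at most one more item, and a later `attach` resuming the same iterator without starting a second loop over it.

```python
import asyncio


class PausableDriver:
    """Hypothetical model of a child loop guarded by a some_running flag."""

    def __init__(self, engine):
        self.engine = engine
        self.some_running = False
        self.buffer = []
        self.driver = None

    async def _drive(self):
        async for value in self.engine:
            self.buffer.append(value)
            if not self.some_running:
                break  # at most one item arrives after the flag is cleared

    def attach(self):
        self.some_running = True
        # Remake the loop only if the previous one has finished; otherwise the
        # still-running loop simply keeps going with the flag set again.
        if self.driver is None or self.driver.done():
            self.driver = asyncio.create_task(self._drive())

    def pause(self):
        self.some_running = False


async def counter():
    n = 0
    while True:
        await asyncio.sleep(0)
        yield n
        n += 1


async def demo():
    p = PausableDriver(counter())
    p.attach()
    while len(p.buffer) < 3:
        await asyncio.sleep(0)
    p.pause()
    seen_at_pause = len(p.buffer)
    await p.driver  # the loop ends by itself after the pause
    seen_after_pause = len(p.buffer)
    p.attach()  # restart on the same engine
    while len(p.buffer) < seen_after_pause + 2:
        await asyncio.sleep(0)
    p.pause()
    await p.driver
    return seen_at_pause, seen_after_pause, list(p.buffer)
```

Because the same engine is resumed rather than recreated, the buffer in this model holds consecutive integers with no gaps or repeats across the pause.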