BEP3: Combinatorial Step Triggers - Vipyr/BazaarCI GitHub Wiki

1 Aug 2019 - Opened
2 Aug 2019 - Added API section. Added Proposal 2

Abstract

Step triggering in the graph is currently one-dimensional: a step executes only once every Product in its `consumes` set is ready. In practice, a process may need more complex trigger conditions. For example, a build for a project written in a compiled language might compile for several architectures but trigger its unit test suite as soon as the first of those compiles completes.

Suggested Frontend API


Supply a `Trigger` base class that implements `add()`, `wait()`, and `causal_products()` methods. A `Trigger` is a collection of `Product`s and other `Trigger`s that can be used as the `consumes` attribute of a `Step`. Implement some common triggers:

- `AnyOf`: one or more constituents are `set`
- `AllOf`: all constituents are `set`
- `ExactlyN`: exactly `N` of the constituents are `set`

A step `s` that requires product `a` and (`b` or `c`) in order to run would be represented like this:

s = Step("s")
s.consumes = AllOf(
    Product('a'),
    AnyOf(
        Product('b'),
        Product('c'),
    ),
)
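As a rough illustration of how such a nested `consumes` expression could be evaluated, here is a minimal sketch that checks readiness recursively without blocking. The `Product` stand-in and the `is_ready()` method are assumptions made for this example and are not part of the proposed API.

```python
import threading


class Product(threading.Event):
    """Hypothetical stand-in: a named product that is ready once set."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def is_ready(self):
        return self.is_set()


class AllOf:
    """Ready only when every constituent is ready."""

    def __init__(self, *items):
        self.items = list(items)

    def is_ready(self):
        return all(item.is_ready() for item in self.items)


class AnyOf(AllOf):
    """Ready as soon as at least one constituent is ready."""

    def is_ready(self):
        return any(item.is_ready() for item in self.items)


a, b, c = Product("a"), Product("b"), Product("c")
consumes = AllOf(a, AnyOf(b, c))

states = [consumes.is_ready()]      # nothing set yet
a.set()
states.append(consumes.is_ready())  # a alone is not enough
c.set()
states.append(consumes.is_ready())  # a and (b or c) now holds
print(states)  # [False, False, True]
```

Because triggers and products answer the same question, triggers can nest to any depth without the step needing to know the shape of the expression.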

Proposal 1 (Tom Manner)

Thread Listener

The implementation is based on listener threads, each of which waits for its member products and captures them as they are set, until the trigger's condition is satisfied.

Possible implementations:

AllOf wouldn't need a thread; it could just call wait on each product in turn. Every constituent is causal, so causal_products recursively takes the union of the constituents' causal product sets.

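class AllOf(Trigger):
    def wait(self):
        # Block on each constituent in turn; all must be set before this trigger is.
        for item in self.products:
            item.wait()
        self.set()

    def causal_products(self):
        # Union of the causal products of every constituent.
        cps = set()
        for item in self.products:
            cps |= item.causal_products()
        return cps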
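The sequential-wait idea can be exercised end to end with stand-in classes. This is only a sketch: `Product` here is a hypothetical `threading.Event` subclass whose causal set is itself, and `AllOf` is built directly on `threading.Event` rather than on the proposed `Trigger` base.

```python
import threading


class Product(threading.Event):
    """Hypothetical stand-in for the proposal's Product."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def causal_products(self):
        # A plain product is its own cause.
        return {self}


class AllOf(threading.Event):
    def __init__(self, *items):
        super().__init__()
        self.products = list(items)

    def wait(self):
        # Waiting on the items one after another takes as long as the slowest.
        for item in self.products:
            item.wait()
        self.set()
        return True

    def causal_products(self):
        return set().union(*(item.causal_products() for item in self.products))


a, b, c = Product("a"), Product("b"), Product("c")
trigger = AllOf(a, AllOf(b, c))

# Set the products from timers, deliberately out of order.
for delay, product in ((0.03, a), (0.01, b), (0.02, c)):
    threading.Timer(delay, product.set).start()

trigger.wait()
names = sorted(p.name for p in trigger.causal_products())
print(names)  # ['a', 'b', 'c']
```

The order in which the products are set does not matter, since a `wait()` on an already-set event returns immediately.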

AnyOf would need a thread per Product where each thread would wait and then update the parent. This event's causal product set is the set of products that caused the first one to be set. The thread's target function takes a product as an argument, waits for it to be set, then if it is the first one done (not self.is_set()), sets self and saves the products to be yielded later.

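from threading import Event, Lock, Thread


class AnyOf(Trigger):
    def __init__(self, *args):
        super().__init__(*args)
        self._causal_products = set()
        self._lock = Lock()

    def wait(self):
        def wait_handler(product_or_trigger):
            product_or_trigger.wait()
            # Only the first constituent to finish sets the trigger; the lock
            # keeps two handlers from both passing the is_set() check.
            with self._lock:
                if not self.is_set():
                    self._causal_products = product_or_trigger.causal_products()
                    self.set()

        # Set up a daemon thread to wait on each item. Handlers must not join
        # each other: a thread that joins itself raises RuntimeError.
        for item in self.products:
            Thread(target=wait_handler, args=(item,), daemon=True).start()
        # Block the caller until the first handler sets this trigger
        # (assumes Trigger derives from threading.Event).
        return Event.wait(self)

    def causal_products(self):
        return self._causal_products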
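To see the first-one-wins behavior in isolation, the following sketch runs the thread-per-product idea against stand-in classes. `Product`, the `_watch` helper, and the use of a `Lock` are assumptions made for this example. Only `c` is ever set, so it should be reported as the sole causal product.

```python
import threading


class Product(threading.Event):
    """Hypothetical stand-in for the proposal's Product."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def causal_products(self):
        return {self}


class AnyOf(threading.Event):
    def __init__(self, *items):
        super().__init__()
        self.products = list(items)
        self._causal_products = set()
        self._lock = threading.Lock()

    def _watch(self, item):
        item.wait()
        with self._lock:  # make "check, then set" atomic across watchers
            if not self.is_set():
                self._causal_products = item.causal_products()
                self.set()

    def wait(self):
        # Daemon threads: watchers of products that never arrive do not
        # keep the process alive.
        for item in self.products:
            threading.Thread(target=self._watch, args=(item,), daemon=True).start()
        return super().wait()

    def causal_products(self):
        return self._causal_products


b, c = Product("b"), Product("c")
trigger = AnyOf(b, c)
threading.Timer(0.05, c.set).start()  # only c ever becomes ready
trigger.wait()
winner = [p.name for p in trigger.causal_products()]
print(winner)  # ['c']
```

One cost of this design is visible here: the watcher for `b` stays blocked indefinitely, which is why the threads are marked as daemons.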

Proposal 2 (Miguel Nistal, Tom Manner)

asyncio Futures and wait

Using Futures from asyncio for Product and Trigger would allow use of the `yield from asyncio.wait(futures)` syntax (or `await asyncio.wait(futures)` inside an `async def` coroutine). Different behaviors can be selected with the built-in `return_when` argument and the concurrent.futures constants `FIRST_COMPLETED` and `ALL_COMPLETED`, which asyncio also exposes.
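The effect of `return_when` can be seen in a small standalone sketch. The `produce` coroutine is a hypothetical stand-in for a product becoming ready, and the example uses `await` rather than `yield from`.

```python
import asyncio


async def produce(name, delay):
    """Hypothetical product: becomes ready after `delay` seconds."""
    await asyncio.sleep(delay)
    return name


async def main():
    tasks = [
        asyncio.create_task(produce("a", 0.01)),
        asyncio.create_task(produce("b", 0.20)),
    ]
    # "Any" behavior: return as soon as one task finishes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = sorted(t.result() for t in done)

    # "All" behavior: wait for everything that is still outstanding.
    done_rest, pending_rest = await asyncio.wait(
        pending, return_when=asyncio.ALL_COMPLETED
    )
    rest = sorted(t.result() for t in done_rest)
    return first, rest, len(pending_rest)


first, rest, left = asyncio.run(main())
print(first, rest, left)  # ['a'] ['b'] 0
```

`asyncio.wait` returns a `(done, pending)` pair of sets, which maps naturally onto the causal and pending product sets described in this proposal.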

The base Trigger class would implement most functionality, including the wait function for AllOf and AnyOf:

import asyncio
from threading import Event  # assumed: the same Event type as in Proposal 1


class Trigger(Event, asyncio.Future):
    def __init__(self, *args, return_when):
        # Event.__init__ does not chain to the next base class,
        # so both bases are initialized explicitly.
        Event.__init__(self)
        asyncio.Future.__init__(self)
        self.products = set(args)
        self.return_when = return_when
        self.causal_products = None
        self.pending_products = None
        self.waiting = False

    def add(self, item):
        self.products.add(item)

    def wait(self):
        if (not self.waiting) and (not self.is_set()):
            self.waiting = True
            self.causal_products, self.pending_products = yield from asyncio.wait(
                self.products, return_when=self.return_when
            )
            # Constituents that did not cause the trigger are no longer needed.
            for pending in self.pending_products:
                pending.cancel()
            self.set()
            self.waiting = False
        return Event.wait(self)

AllOf and AnyOf would then simply change the return_when argument to get their behavior:

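from asyncio import ALL_COMPLETED, FIRST_COMPLETED


class AllOf(Trigger):
    def __init__(self, *args):
        # return_when is keyword-only on Trigger, so it must be passed by name.
        super().__init__(*args, return_when=ALL_COMPLETED)


class AnyOf(Trigger):
    def __init__(self, *args):
        super().__init__(*args, return_when=FIRST_COMPLETED)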

An implementation of ExactlyN would need to modify the wait behavior, calling `asyncio.wait` repeatedly with `return_when=FIRST_COMPLETED` to accumulate Futures as they complete.

class ExactlyN(Trigger):
    def __init__(self, n, *args):
        super().__init__(*args, return_when=FIRST_COMPLETED)
        self.n = n

    def wait(self):
        if (not self.waiting) and (not self.is_set()):
            self.waiting = True
            causal_products = set()
            pending_products = self.products
            for _ in range(self.n):
                # Each round returns once at least one more Future has completed.
                complete_products, pending_products = yield from asyncio.wait(
                    pending_products, return_when=self.return_when
                )
                causal_products |= complete_products
            for product in pending_products:
                product.cancel()
            self.pending_products = pending_products
            self.causal_products = causal_products
            self.set()
            self.waiting = False
        return Event.wait(self)
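The accumulation loop can be tried on its own with plain tasks. In this sketch, `produce` and `wait_for_n` are hypothetical names introduced for illustration. Note that a `FIRST_COMPLETED` round may return more than one finished task, so the loop guarantees at least `n` completions rather than exactly `n`.

```python
import asyncio


async def produce(name, delay):
    """Hypothetical product: becomes ready after `delay` seconds."""
    await asyncio.sleep(delay)
    return name


async def wait_for_n(tasks, n):
    """Wait until at least `n` of `tasks` are done, then cancel the rest."""
    done, pending = set(), set(tasks)
    while len(done) < n and pending:
        finished, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        done |= finished  # may add more than one task per round
    for task in pending:
        task.cancel()
    return done, pending


async def main():
    tasks = [
        asyncio.create_task(produce(name, delay))
        for name, delay in (("a", 0.01), ("b", 0.05), ("c", 0.50))
    ]
    done, pending = await wait_for_n(tasks, 2)
    # Give the cancelled tasks a chance to finish unwinding.
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(t.result() for t in done), [t.cancelled() for t in pending]


causal, cancelled = asyncio.run(main())
print(causal, cancelled)  # ['a', 'b'] [True]
```

Stopping the loop on the size of the done set, instead of running a fixed number of rounds, also avoids calling `asyncio.wait` on an empty set, which raises `ValueError`.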