BEP2: Step Run Strategy Selection - Vipyr/BazaarCI GitHub Wiki

9 June 2019 - Opened BEP
19 June 2019 - Accepted Proposal 2 PR-13

Abstract

There are cases where the way that dependencies and the run time control flow of a Step. For example, it is reasonable for these two execution strategies to be used by different projects or even within the same graph:

  1. "Normal" execution
    1. Wait for consumed Products to be set
    2. Execute target
    3. Set all produced Products
    4. Return
  2. "Short Circuit" execution
    1. Wait for consumed Products to be set
    2. If any produced Products have not been set:
      1. Execute target
      2. Set all produced Products
    3. Return

Proposal 1 (Tom Manner)

Implement a bazaarci.runner.set_run_strategy function that can be called on a class or instance with a run method and replace the function with the selected one.

bazaarci.runner.__init__.py:

def set_run_strategy(class_or_instance, run_function):
    setattr(class_or_instance, "run", run_function)

bazaarci.runner.step.py:

class Step(Node):
    ...

    def run_default(self):
        [product.wait() for product in self.consumes()]
        if self.target is not None:
            self.output = self.target()
        [product.set() for product in self.produces()]

    def run_with_short_circuit(self):
        [product.wait() for product in self.consumes()]
        # Once all inputs are available, check that there are unset outputs.
        # If all output products have already been set, then this step is
        # not required to run.
        if reduce(lambda x, y: x and y.wait(0), self.produces(), True):
            if self.target is not None:
                self.output = self.target()
            [product.set() for product in self.produces()]

    run = Step.run_default

usercode.py:

from bazaarci.runner import *
set_run_strategy(Step, Step.run_with_short_circuit)

TSM - This feels kinda bad to be because the run functions are copy/paste except for the if reduce(...): line. It might be possible to implement more succinctly with a wait_for_producers decorator. Order could get janky, as you can see from the example below

Support Wrappers/Decorators

More code reuse for run functions via wrappers/decorators. To solve the specific problem above, it could be done with two decorators: wait_for_producers and skip_if_redundant

import functools

def wait_for_producers(func):
    @functools.wraps(func)
    def wrapped(self):
        [product.wait() for product in self.consumes()]
        func(self)
    return wrapped

def skip_if_redundant(func):
    @functools.wraps(func)
    def wrapped(self):
        if reduce(lambda x, y: x and y.wait(0), self.produces(), True):
            func(self)
    return wrapped

In this example, the Step implementation above would then be written as

class Step(Node):
    ...

    def _run(self):
        if self.target is not None:
            self.output = self.target()
        [product.set() for product in self.produces()]

    @wait_for_producers
    def run_default(self):
        self._run()

    @wait_for_producers
    @skip_if_redundant
    def run_with_short_circuit(self):
        self._run()

Proposal 2 (Tom Manner)

I wrote an implementation of this you can find in PR-13, I think it came out pretty well.

Using a pattern similar to the serialization frameworks, a set_run_behavior function could be implemented that receives an iterable of wrapper functions and a Class or instance to apply them to. The contents of the iterable are executed in order on each run call, which means they're applied in reverse.

def set_run_behavior(class_or_instance, *args):
    run_function = class_or_instance._run
    for wrapper in reversed(args):
        run_function = wrapper(run_function)
    setattr(class_or_instance, "run", run_function)

This would require moving the base run functionality into a different function (I suggest _run) so that the starting point is always consistent. The default behavior of waiting for all Products in self.consumes could be implemented through this framework.

def wait_for_producers(func):
    @wraps(func)
    def wrapped(self):
        [product.wait() for product in self.consumes()]
        func(self)
    return wrapped

set_run_behavior(Step, wait_for_producers)
⚠️ **GitHub.com Fallback** ⚠️