Graph Managers - scrapinghub/shub-workflow GitHub Wiki

Previous Chapter: Crawl Managers


Let's suppose we have the following spider:

from scrapy import Request, Spider


class MySiteArticlesSpider(Spider):

    name = 'mysite.com'
    # Start URL, passed as a spider argument when the job is scheduled.
    url = None

    def start_requests(self):
        assert self.url is not None, "Missing required argument 'url'."
        yield Request(self.url, dont_filter=True)

    def parse(self, response):
        (... link discovery logic ...)
        yield Request(..., callback=self.parse_article)

    def parse_article(self, response):
        (... actual implementation of the article parsing method ...)

and we need to schedule it multiple times with different start URLs. We can use a GeneratorCrawlManager instance for this, as we did in the previous chapter. The URLs may come from an external source like S3, HS collections, HCF (Hubstorage Crawl Frontier, see next chapters), etc., or may be hardcoded in the crawl manager.

In many use cases the crawl manager and the spider are enough to fulfill the project requirements. But let's suppose we want to deliver a single final file with all the items from all the scheduled spider jobs.

Here the GraphManager comes to the rescue. Graph managers let you define arbitrary workflows of spiders and scripts running in ScrapyCloud, and create dependencies between them. For our case, we need to define a workflow where the crawl manager runs first, and a deliver script runs next, once the crawl manager finishes execution:

import time
from typing import Tuple

from shub_workflow.graph import GraphManager
from shub_workflow.graph.task import Task

class MyArticlesGraphManager(GraphManager):

    loop_mode = 120

    def configure_workflow(self) -> Tuple[Task, ...]:

        # First task: run the crawl manager script.
        crawlTask = Task("crawl", "py:crawlmanager.py", init_args=["mysite.com"])

        # Second task: deliver all items into a file named after the current timestamp.
        myfilename = str(int(time.time()))
        deliverTask = Task("deliver", "py:deliver.py", init_args=["mysite.com", f"--output-file=s3://mybucket/myfolder/{myfilename}.jl"])
        crawlTask.add_next_task(deliverTask)

        # Only root tasks are returned; deliverTask is reached through crawlTask.
        return (crawlTask,)

Let's save this code into scripts/flowmanager.py. The workflow is defined via the configure_workflow() method. This method defines two tasks: crawl and deliver. The crawl task instructs the graph manager to schedule the crawl manager with the appropriate arguments. The deliver task does the same for the deliver script (see the next section on how to implement the deliver script). The line crawlTask.add_next_task(deliverTask) instructs the graph manager to schedule the deliver task immediately after the crawl task is completed. The return value of the configure_workflow() method is a tuple of tasks indicating the root tasks. In our case, just the crawl task.
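To make the scheduling order concrete, here is a minimal, self-contained sketch of the idea behind root tasks and add_next_task(). It is a toy model under stated assumptions, not shub-workflow's implementation: ToyTask and run_workflow are hypothetical names, and tasks "run" instantly instead of being scheduled as ScrapyCloud jobs.

```python
from collections import deque
from typing import List, Tuple


class ToyTask:
    """Hypothetical stand-in for a workflow task (not shub_workflow's Task)."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.next_tasks: List["ToyTask"] = []
        self.pending_parents = 0  # parents that have not completed yet

    def add_next_task(self, task: "ToyTask") -> None:
        # `task` becomes runnable only after this task completes.
        self.next_tasks.append(task)
        task.pending_parents += 1


def run_workflow(root_tasks: Tuple[ToyTask, ...]) -> List[str]:
    """Return task ids in the order a dependency-respecting scheduler runs them."""
    order: List[str] = []
    queue = deque(root_tasks)
    while queue:
        task = queue.popleft()
        order.append(task.task_id)  # a real manager would schedule a job here
        for nxt in task.next_tasks:
            nxt.pending_parents -= 1
            if nxt.pending_parents == 0:
                queue.append(nxt)
    return order


crawl = ToyTask("crawl")
deliver = ToyTask("deliver")
crawl.add_next_task(deliver)

# Only the root task is passed in; deliver is reached through crawl.
execution_order = run_workflow((crawl,))
print(execution_order)
```

In this model, as in the workflow above, the deliver task never needs to be listed among the root tasks: it becomes eligible as soon as the task it depends on has completed.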

In order to execute this workflow, the script must be invoked in this way:

$ python flowmanager.py --root-jobs

The flag --root-jobs instructs the graph manager to start execution with the root jobs defined in the return value of configure_workflow(). This starting point can optionally be altered (see the graph manager command line help).

Making deliveries

We haven't defined the deliver.py script yet. It may be any script that reads all jobs from this specific workflow instance, merges them, and delivers the result. The shub-workflow library also provides a convenient class for implementing deliver scripts. It takes advantage of the fact that all jobs within the same workflow instance share the same FLOW_ID tag, and that this value differs from the one used by other workflow instances, even if they were started by the same graph manager script. This ensures that the deliver script reads and delivers all the spider jobs belonging to the same crawl, and only those, even when multiple crawls and deliver scripts are running at the same time.
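The tag-based selection described above can be illustrated with a small standalone sketch. Everything here is an assumption made for illustration: the JOBS records, the select_jobs helper, and the "FLOW_ID=<value>" tag layout are hypothetical, and a real deliver script would obtain job metadata from ScrapyCloud rather than from a hardcoded list.

```python
from typing import Dict, List

# Hypothetical job metadata for two workflow instances ("aaa" and "bbb").
JOBS: List[Dict] = [
    {"key": "1/1/1", "spider": "mysite.com", "tags": ["FLOW_ID=aaa"]},
    {"key": "1/1/2", "spider": "mysite.com", "tags": ["FLOW_ID=bbb"]},
    {"key": "1/1/3", "spider": "mysite.com", "tags": ["FLOW_ID=aaa"]},
    {"key": "1/2/1", "spider": "othersite.com", "tags": ["FLOW_ID=aaa"]},
]


def select_jobs(jobs: List[Dict], flow_id: str, spider: str) -> List[str]:
    """Return keys of jobs of `spider` that carry the given workflow tag."""
    tag = f"FLOW_ID={flow_id}"  # assumed tag layout, for illustration only
    return [
        job["key"]
        for job in jobs
        if job["spider"] == spider and tag in job["tags"]
    ]


selected = select_jobs(JOBS, "aaa", "mysite.com")
print(selected)
```

Jobs from the other workflow instance and jobs from other spiders are left out, which is the property that lets several crawls and deliveries run side by side.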

Let's suppose we want to deliver our job items into S3 (the script also supports GCS and local file storage). We would add the script scripts/deliver.py to our project:

import json
import logging
from tempfile import mktemp

from shub_workflow.deliver import BaseDeliverScript
from shub_workflow.utils.futils import mv_file

class MyDeliverScript(BaseDeliverScript):

    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--output-file")

    def on_item(self, item: dict, scrapername: str):
        print(json.dumps(item), file=self.tempfile)

    def run(self):
        if self.args.output_file is not None:
            self.tempfile = open(mktemp(), "w", encoding="utf8")
            super().run()

    def on_close(self):
        if self.args.output_file and self.total_items_count > 0:
            self.tempfile.close()
            mv_file(self.tempfile.name, self.args.output_file)
        super().on_close()


if __name__ == '__main__':
    from shub_workflow.utils import get_kumo_loglevel
    logging.basicConfig(format="%(asctime)s %(name)s [%(levelname)s]: %(message)s", level=get_kumo_loglevel())

    deliver = MyDeliverScript()
    deliver.run()

The above subclass is very simple and it is easy to understand what it does. However, the base class is designed so that many of its attributes and methods can be overridden, which allows flexible customization for the specific needs of a project (see DeliverScript code).
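The pattern used by the script above (write every item as one JSON line into a temporary file, then move the file to its final destination only if something was written) can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: deliver_items is a hypothetical helper, shutil.move is a local-only stand-in for mv_file, and the destination is a local path instead of an S3 URL.

```python
import json
import shutil
import tempfile
from pathlib import Path
from typing import Iterable


def deliver_items(items: Iterable[dict], output_file: str) -> int:
    """Write items as JSON lines to a temp file, then move it to output_file.

    Returns the number of items written. Nothing is delivered for zero items.
    """
    count = 0
    # delete=False keeps the file on disk after closing, so it can be moved.
    with tempfile.NamedTemporaryFile(
        "w", encoding="utf8", suffix=".jl", delete=False
    ) as tmp:
        for item in items:
            print(json.dumps(item), file=tmp)
            count += 1
    if count > 0:
        shutil.move(tmp.name, output_file)  # local stand-in for mv_file
    else:
        Path(tmp.name).unlink()  # discard the empty temp file
    return count


outdir = Path(tempfile.mkdtemp())
target = outdir / "articles.jl"
written = deliver_items([{"title": "a"}, {"title": "b"}], str(target))
lines = target.read_text(encoding="utf8").splitlines()
print(written, lines)
```

Writing to a temporary file first means a partially written file never appears at the final destination if the script fails midway.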

Notice that in the graph manager we passed the spider name and the output file to the deliver script. The first one is a standard argument of the base deliver class, which allows the script to select the target spiders to deliver from. The output file option is specific to our case and is easily implemented with the help of the shub-workflow futils tools.
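As a rough illustration of how the init_args list from the graph manager maps onto the deliver script's arguments, here is a plain argparse sketch. The positional argument name scrapername is a hypothetical stand-in for whatever the base deliver class defines, and the timestamp in the file name is an arbitrary example value; only the --output-file option mirrors the script above.

```python
import argparse

parser = argparse.ArgumentParser()
# Hypothetical positional argument standing in for the one
# provided by the base deliver class.
parser.add_argument("scrapername")
# Project-specific option, as added in add_argparser_options() above.
parser.add_argument("--output-file")

# Same shape as the init_args passed to the deliver task.
init_args = ["mysite.com", "--output-file=s3://mybucket/myfolder/1700000000.jl"]
args = parser.parse_args(init_args)
print(args.scrapername, args.output_file)
```

argparse converts the dash in --output-file into an underscore, which is why the script reads the value as self.args.output_file.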

Let's explore more complex approaches in the following chapters.


Next Chapter: Managing Hubstorage Crawl Frontiers