Graph Managers with Frontera - scrapinghub/shub-workflow GitHub Wiki
Previous Chapter: Managing Hubstorage Crawl Frontiers with Frontera
Let's suppose we have the following spider:
```python
from scrapy import Request, Spider


class MySiteArticlesSpider(Spider):
    name = 'mysite.com'

    def parse(self, response):
        # (... link discovery logic ...)
        yield Request(..., callback=self.parse_article)

    def parse_article(self, response):
        # (... actual implementation of the article parsing method ...)
        ...
```
We can run this spider as is until completion. We can also split the spider into a frontier consumer and a frontier producer, as we did in the previous chapter. In that case, we ended up building a workflow where a producer spider runs first, and a crawl manager that schedules consumers runs after it. There, we need to schedule both components either manually or by programming their scheduling separately at specific times. We can also build a more sophisticated scheduling by using another external script, a graph manager. The graph manager allows us to automate the building of Zyte Scrapy Cloud workflows by defining a graph of tasks.
With this idea in mind, we could create a graph that schedules the producer spider and the HCF crawl manager that schedules the consumers. In this chapter we will explain this approach, but without separate producer and consumer spiders. Instead, we will use a single spider that can either run standalone or be plugged into a workflow to run in two different modes, producer and consumer, without any change to its code. If you still prefer the split-spiders version, you can easily adapt the example used here for your purpose. Notice, however, that starting from a single spider will be the most common case, as most spiders are implemented that way. This approach has the advantage that you don't need to write spiders thinking about how they will run: you decouple the spider implementation from its scheduling strategy, which may vary across the stages of a project.
So, let's start by creating a manager script (e.g. scripts/manager_articles.py) by subclassing the GraphManager class (the code is explained below):
```python
import json
import time
from typing import Tuple

from shub_workflow.graph import GraphManager
from shub_workflow.graph.task import Task, SpiderTask


class MyArticlesGraphManager(GraphManager):

    loop_mode = 120
    default_max_jobs = 8
    frontier = 'mysite-articles-frontier'

    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--slot-prefix", default="test")

    def configure_workflow(self) -> Tuple[Task, ...]:
        # Producer: sends requests with the parse_article callback to the frontier.
        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": self.args.slot_prefix,
            "HCF_PRODUCER_NUMBER_OF_SLOTS": self.args.max_running_jobs,
        }
        producer_settings = {
            "FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse_article",
        }
        discover = SpiderTask(
            "discover",
            "mysite.com",
            job_settings=producer_settings,
            frontera_settings_json=json.dumps(producer_frontera_settings),
        )

        # Consumers: read the frontier slots and skip the discovery stage.
        consumer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_CONSUMER_FRONTIER": self.frontier,
            "HCF_CONSUMER_MAX_BATCHES": 50,
        }
        consumer_settings = {
            "FRONTERA_SCHEDULER_SKIP_START_REQUESTS": True,
        }
        scrapers = Task(
            "scrapers",
            "py:hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                self.args.slot_prefix,
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
                f"--max-running-jobs={self.args.max_running_jobs}",
                f"--job-settings={json.dumps(consumer_settings)}",
            ],
        )
        discover.add_next_task(scrapers)

        # Deliver a single file with the output of all the scrapers jobs.
        myfilename = str(int(time.time()))
        deliver = Task(
            "deliver",
            "py:deliver.py",
            init_args=["mysite.com", f"--output-file=s3://mybucket/myfolder/{myfilename}.jl"],
        )
        scrapers.add_next_task(deliver)

        return (discover,)


if __name__ == "__main__":
    manager = MyArticlesGraphManager()
    manager.run()
```
For an explanation of the basic features of graph managers, see Crawl Managers.
Here we will dig into the additional details introduced by the code above. The configure_workflow() method defines three tasks: discover, scrapers and deliver. The discover task is a spider task. It schedules the producer using the same frontera and scrapy settings as before, so they need no further explanation. The SpiderTask class is a wrapper that defines the scheduling of a spider: the first parameter is the name of the task, and the second one is the name of the spider. Then we pass the job_settings and frontera_settings_json parameters.
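The frontera_settings_json argument carries all the Frontera settings as a single JSON string built with json.dumps(). A minimal sketch of that round trip using only the standard library, assuming the receiving side simply decodes the string back into a dict (how scrapy-frontera then applies the values is not shown). The values test and 8 stand in for the --slot-prefix and --max-running-jobs defaults:

```python
import json

# Producer Frontera settings, as built in configure_workflow().
producer_frontera_settings = {
    "BACKEND": "hcf_backend.HCFBackend",
    "HCF_PRODUCER_FRONTIER": "mysite-articles-frontier",
    "HCF_PRODUCER_SLOT_PREFIX": "test",
    "HCF_PRODUCER_NUMBER_OF_SLOTS": 8,
}

# This string is what travels as the frontera_settings_json spider argument.
frontera_settings_json = json.dumps(producer_frontera_settings)

# Decoding restores the original dict, including the integer slot count.
decoded = json.loads(frontera_settings_json)
print(frontera_settings_json)
```

Serializing to JSON keeps value types (such as the integer slot count) intact through a single string argument, which is why the same pattern is reused for --frontera-settings-json and --job-settings.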
The scrapers task is a script task. The first parameter is the task name, the second is the script name, and then comes a list of arguments. These are the same arguments we passed manually to hcf_crawlmanager.py in the previous chapter, except that we are passing an additional scrapy setting: FRONTERA_SCHEDULER_SKIP_START_REQUESTS. This setting instructs the consumer not to execute the start requests. In the previous chapter this setting was not required, because the consumer and producer code were separate. Here we need it to prevent the consumer from performing the discovery stage.
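Since the same spider behaves as producer or consumer depending only on the settings it receives, both configurations can be derived from one place. A sketch, where the helper settings_for_mode() is hypothetical and the setting names and values are the ones used in configure_workflow() above:

```python
from typing import Dict, Tuple

Settings = Dict[str, object]


def settings_for_mode(mode: str, frontier: str, slot_prefix: str,
                      num_slots: int) -> Tuple[Settings, Settings]:
    """Return (frontera_settings, job_settings) for "producer" or "consumer".

    Hypothetical helper: it only groups the settings shown in this chapter.
    """
    if mode == "producer":
        frontera = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": frontier,
            "HCF_PRODUCER_SLOT_PREFIX": slot_prefix,
            "HCF_PRODUCER_NUMBER_OF_SLOTS": num_slots,
        }
        # Only requests with the parse_article callback go to the frontier.
        job = {"FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse_article"}
    elif mode == "consumer":
        frontera = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_CONSUMER_FRONTIER": frontier,
            "HCF_CONSUMER_MAX_BATCHES": 50,
        }
        # Consumers must not repeat the discovery stage.
        job = {"FRONTERA_SCHEDULER_SKIP_START_REQUESTS": True}
    else:
        raise ValueError(f"unknown mode: {mode!r}")
    return frontera, job


producer_frontera, producer_job = settings_for_mode(
    "producer", "mysite-articles-frontier", "test", 8)
consumer_frontera, consumer_job = settings_for_mode(
    "consumer", "mysite-articles-frontier", "test", 8)
```

The spider code itself never changes; only the dicts handed to the scheduling layer decide which role a job plays.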
The deliver task is used for delivering a single file with the output of all the scrapers jobs. See Graph Managers for an explanation of the deliver script; here we use the same approach.
The workflow declaration is completed by adding the scrapers task as the next task of discover, and the deliver task as the next task of scrapers. So, when discover finishes, scrapers is scheduled, and when scrapers finishes, deliver is scheduled. The return value of the configure_workflow() method is a tuple of the tasks that will be the root tasks. In this case, discover is the only root. To execute this workflow, the script must be invoked as follows:

```shell
$ python manager_articles.py --root-jobs
```
The same rules described before for hcf_crawlmanager.py apply regarding the target Scrapy Cloud (SC) project where the jobs are scheduled.
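The chaining rules above (a task's next tasks are scheduled when it finishes, and only the tasks returned by configure_workflow() start the workflow) can be illustrated with a toy model. This is not shub_workflow's implementation: ToyTask and run_order() are hypothetical, and real tasks run as Scrapy Cloud jobs rather than in-process:

```python
from typing import List, Tuple


class ToyTask:
    """Hypothetical stand-in for a workflow task: a name plus next tasks."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.next_tasks: List["ToyTask"] = []

    def add_next_task(self, task: "ToyTask") -> None:
        self.next_tasks.append(task)


def run_order(roots: Tuple[ToyTask, ...]) -> List[str]:
    """Return task names in the order they would be scheduled.

    Roots start first; each task's next tasks are scheduled when it
    finishes. Tasks are assumed to finish in the order they were scheduled.
    """
    order: List[str] = []
    pending = list(roots)
    while pending:
        task = pending.pop(0)
        order.append(task.name)
        pending.extend(task.next_tasks)
    return order


discover = ToyTask("discover")
scrapers = ToyTask("scrapers")
deliver = ToyTask("deliver")
discover.add_next_task(scrapers)
scrapers.add_next_task(deliver)

print(run_order((discover,)))  # ['discover', 'scrapers', 'deliver']
```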
In some use cases, the producer can take a long time to complete. However, it already sends requests to the frontier while it runs. If you don't want the consumers to wait for the producer to complete, and prefer to start them as soon as possible, you can slightly modify the workflow: add the wait_time parameter to the scrapers task, and return both discover and scrapers as root tasks instead of adding one as the next task of the other:
```python
# (...)
scrapers = Task(
    "scrapers",
    "py:hcf_crawlmanager.py",
    init_args=[
        "mysite.com",
        self.frontier,
        self.args.slot_prefix,
        "--loop-mode=60",
        f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
        f"--max-running-jobs={self.args.max_running_jobs}",
        f"--job-settings={json.dumps(consumer_settings)}",
    ],
    # Delay the first run of the consumers' manager by 600 seconds.
    wait_time=600,
)
# Both tasks are roots: scrapers is no longer chained after discover.
return (discover, scrapers)
```
The addition of wait_time=600 instructs the scrapers task to wait 10 minutes before actually starting, thus giving the discovery task a margin to generate requests for the consumers. (If hcf_crawlmanager.py doesn't find any request in the target slots, it considers the crawl completed and terminates.)
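The termination rule in parentheses is why the margin matters. A toy model of the consumers' manager first look at the slots, assuming both root tasks start at the same moment; the function first_check() and the producer timing used below are hypothetical, not hcf_crawlmanager.py's actual code:

```python
def first_check(wait_time_s: int, first_flush_s: int) -> str:
    """Outcome of the consumers' manager first look at the target slots.

    Hypothetical model: both root tasks start at t=0, the producer flushes
    its first requests to the frontier at first_flush_s, and the manager
    first checks the slots once wait_time_s seconds have elapsed.
    """
    if wait_time_s >= first_flush_s:
        return "requests found: consumers scheduled"
    return "slots empty: crawl considered completed, manager terminates"


# Suppose (hypothetically) the producer needs 5 minutes for its first flush.
print(first_check(0, 300))    # slots empty: crawl considered completed, manager terminates
print(first_check(600, 300))  # requests found: consumers scheduled
```

In practice the producer's first flush time is not known in advance, so wait_time is a safety margin you choose generously rather than an exact value.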
What if the producer spider needs a very long time to complete, and a failure in the middle stops it prematurely? This is the typical case of a broad crawler:
```python
from scrapy import Request, Spider


class MySiteArticlesSpider(Spider):
    name = 'mysite.com'

    def parse(self, response):
        # (... link discovery logic ...)
        if <some condition>:
            # articles links
            yield Request(..., callback=self.parse_article)
        else:
            # other links
            yield Request(..., callback=self.parse)

    def parse_article(self, response):
        # (... actual implementation of the article parsing method ...)
        ...
```
Restarting from zero if the producer stops is very expensive, so here we can also save the state of the discovery spider in the frontier. We need to write exploration links to a different set of slots than the one we used for article links, so we need a way to map requests to different sets of slots depending on their callback. That feature is provided by the scrapy-frontera setting FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP. So, modify the previous configure_workflow() method by changing the producer section in this way:
```python
# (...)
producer_frontera_settings = {
    "BACKEND": "hcf_backend.HCFBackend",
    "HCF_PRODUCER_FRONTIER": self.frontier,
    "HCF_PRODUCER_SLOT_PREFIX": "links",
}
producer_settings = {
    "FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse,parse_article",
    # parse_article requests go to "<slot prefix>/<number of slots>" instead of "links".
    'FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP': {
        'parse_article': f'{self.args.slot_prefix}/{self.args.max_running_jobs}',
    },
}
# The producer is now run by its own hcf crawl manager, reading the "links" slots.
discover = Task(
    "discover",
    "py:hcf_crawlmanager.py",
    init_args=[
        "mysite.com",
        self.frontier,
        "links",
        "--loop-mode=60",
        f"--frontera-settings-json={json.dumps(producer_frontera_settings)}",
        f"--job-settings={json.dumps(producer_settings)}",
    ],
)
```
Let's review the changes:
- Notice the addition of parse to the FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER list. Now we send to the frontier not only the requests with the parse_article() callback, as before, but also the ones with the parse() callback.
- We changed the value of HCF_PRODUCER_SLOT_PREFIX to links and removed HCF_PRODUCER_NUMBER_OF_SLOTS from producer_frontera_settings, so the number of slots takes its default value, 1. Thus, by default the producer writes all requests to a single slot, links0.
- We now configure the target slots for the consumers via FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP.
- The discover task is now an hcf crawl manager script Task instead of a SpiderTask. This hcf crawl manager configures the producer as a consumer too, and takes care of its periodic scheduling.
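To make the routing concrete, here is a sketch of how a request could be mapped to a slot name under these settings. It assumes the map value has the form prefix/number_of_slots (as in the code above) and that a slot number is picked by hashing the request URL; both the hashing scheme and the helper target_slot() are hypothetical, not necessarily what hcf-backend does internally:

```python
import zlib
from typing import Dict


def target_slot(callback: str, url: str, default_prefix: str,
                default_slots: int, prefix_map: Dict[str, str]) -> str:
    """Return the slot name a request would be written to (toy model)."""
    prefix, num_slots = default_prefix, default_slots
    if callback in prefix_map:
        # Map values look like "<prefix>/<number of slots>", e.g. "test/8".
        prefix, slots = prefix_map[callback].rsplit("/", 1)
        num_slots = int(slots)
    # Hypothetical slot assignment: stable hash of the URL modulo slot count.
    slot_no = zlib.crc32(url.encode("utf-8")) % num_slots
    return f"{prefix}{slot_no}"


prefix_map = {"parse_article": "test/8"}
print(target_slot("parse", "https://mysite.com/section/1", "links", 1, prefix_map))
# links0
print(target_slot("parse_article", "https://mysite.com/article/1", "links", 1, prefix_map))
# one of test0 ... test7
```

Exploration requests (callback parse) all land in links0, which only the producer's own manager reads, while article requests are spread over the consumers' slots.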
As a result, on its first job the producer scrapes the start URL, extracts links both for itself and for the consumers, and finishes. The hcf crawl manager then detects the presence of requests in the links0 slot and schedules the producer again, which consumes all the links saved in the frontier, generates more links and stops. Once the requests it generates for itself are numerous enough, they are flushed to the frontier before the spider finishes, so the producer reaches a continuous regime of reading and writing without stopping, except in an abnormal situation. Such a premature stop, however, does not significantly affect the producer crawl process: once the hcf crawl manager reschedules the producer, it continues by reading the next batch after the last one read in the failed job. If that last batch was not fully processed, though, some requests will be lost.
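The loss described above follows from batches being deleted as soon as they are read. A toy model, with hypothetical batch contents and a hypothetical failure point, that returns the requests lost when a job dies partway through a batch under this delete-on-read behavior:

```python
from collections import deque
from typing import Deque, List


def lost_requests_on_crash(batches: List[List[str]], crash_after: int) -> List[str]:
    """Simulate delete-on-read batches and a job that crashes mid-run.

    The job reads one batch at a time, and each batch leaves the frontier
    as soon as it is read. The job crashes after processing crash_after
    requests. Returns the requests that were read but never processed,
    which a rescheduled job cannot recover.
    """
    frontier: Deque[List[str]] = deque(batches)
    processed = 0
    while frontier:
        batch = frontier.popleft()  # deleted from the frontier on read
        for i, request in enumerate(batch):
            if processed == crash_after:
                return batch[i:]
            processed += 1
    return []


batches = [["a1", "a2", "a3"], ["b1", "b2", "b3"], ["c1", "c2", "c3"]]
print(lost_requests_on_crash(batches, crash_after=4))  # ['b2', 'b3']
```

Batch c is still in the frontier, so the rescheduled job resumes from it; only the unprocessed tail of batch b is gone.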
If you want to avoid this collateral loss, you can pass an additional HCF frontera setting: HCF_CONSUMER_DELETE_BATCHES_ON_STOP=True. By default, batches are deleted immediately after being read, and that is the cause of the loss when a job finishes abnormally. With this setting, batches are instead deleted when the spider stops. However, if you provide this setting with no extra configuration, the producer will keep reading the same batches over and over, because batches are deleted only on finish and the producer never reaches that stage. So, when using this setting you need additional settings that ensure the producer finishes each time it has read a certain number of batches.

You must play with HCF_CONSUMER_MAX_BATCHES and MAX_NEXT_REQUESTS. MAX_NEXT_REQUESTS governs how many requests are fetched from the frontier on each read. It is a frontera setting (not an HCF frontera backend setting), and its default value is 64. The number of batches read before stopping will depend on this value and on the batch sizes generated by the producer (see HCF_PRODUCER_BATCH_SIZE in the hcf backend documentation). Roughly, the number of batches read on each read cycle will be int(MAX_NEXT_REQUESTS / <batch size>). Take special care to set HCF_CONSUMER_MAX_BATCHES to a number equal to or smaller than this one: if it is bigger, the spider will read some batches more than once, thus losing performance. Also, HCF_CONSUMER_MAX_BATCHES must be bigger than 0, which is its default value. 0 means no limit on the number of batches to read, and in combination with HCF_CONSUMER_DELETE_BATCHES_ON_STOP=True it results in a job that reads the same batches forever and never stops.
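The two constraints in this paragraph can be checked mechanically before launching a crawl. A sketch using the rough estimate int(MAX_NEXT_REQUESTS / <batch size>) from the text; the function check_delete_on_stop_settings() is hypothetical, and the batch size is something you must supply from your own producer configuration:

```python
def check_delete_on_stop_settings(max_next_requests: int, batch_size: int,
                                  consumer_max_batches: int) -> None:
    """Raise ValueError if the settings would loop forever or re-read batches.

    Assumes HCF_CONSUMER_DELETE_BATCHES_ON_STOP=True and uses the rough
    estimate of batches read per cycle given in the text.
    """
    batches_per_cycle = int(max_next_requests / batch_size)
    if consumer_max_batches <= 0:
        raise ValueError("HCF_CONSUMER_MAX_BATCHES must be > 0, otherwise "
                         "the job re-reads the same batches forever")
    if consumer_max_batches > batches_per_cycle:
        raise ValueError(f"HCF_CONSUMER_MAX_BATCHES={consumer_max_batches} exceeds "
                         f"~{batches_per_cycle} batches per read cycle; "
                         "some batches would be read more than once")


check_delete_on_stop_settings(10000, 100, 100)  # passes: ~100 batches per cycle
```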
There is a trade-off here. The smaller the number of batches read per job, the bigger the proportion of tail requests in the total job time, and the higher the cost of setting up and shutting down SC jobs, so performance decays. But the bigger the number of batches read per job, the more memory you use, and the more links are reprocessed by a new job if the current one finishes abnormally. If the spider is stable enough, this second effect will be insignificant; the memory factor, however, remains very important. A typical starting point is to read around 100 batches per job, so an appropriate set of settings for this case would be:
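```python
producer_frontera_settings = {
    "BACKEND": "hcf_backend.HCFBackend",
    "HCF_PRODUCER_FRONTIER": self.frontier,
    "HCF_PRODUCER_SLOT_PREFIX": "links",
    # Delete batches only when the job stops, so a failed job loses nothing.
    "HCF_CONSUMER_DELETE_BATCHES_ON_STOP": True,
    # Stop the job after reading 100 batches.
    "HCF_CONSUMER_MAX_BATCHES": 100,
    # Requests fetched from the frontier per read (frontera setting, default 64).
    "MAX_NEXT_REQUESTS": 10000,
}
```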
With the combination of settings above you will never read fewer than 100 batches, and the number of requests read will be between 10000 and 10099, depending on the exact batch sizes, which are not constant. Starting from these values you can tune your project further, as the best performance ultimately depends on the exact case. As the maximum size of an HCF batch is 100, these settings also ensure that HCF_CONSUMER_MAX_BATCHES is equal to or smaller than MAX_NEXT_REQUESTS / <batch size>.
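Both claims in this paragraph can be checked by brute force, assuming (as the text implies) that a read cycle keeps fetching whole batches until it holds at least MAX_NEXT_REQUESTS requests, and that each batch holds between 1 and 100 requests. read_cycle() is a hypothetical model, not the hcf backend's code:

```python
import random
from typing import Callable


def read_cycle(next_batch_size: Callable[[], int], max_next_requests: int) -> int:
    """Read whole batches until at least max_next_requests requests are collected."""
    total = 0
    while total < max_next_requests:
        total += next_batch_size()
    return total


MAX_NEXT_REQUESTS = 10000
HCF_CONSUMER_MAX_BATCHES = 100

# Claim 1: with batches of at most 100 requests, the estimate
# int(MAX_NEXT_REQUESTS / batch_size) never drops below HCF_CONSUMER_MAX_BATCHES.
for batch_size in range(1, 101):
    assert int(MAX_NEXT_REQUESTS / batch_size) >= HCF_CONSUMER_MAX_BATCHES

# Claim 2: a cycle overshoots MAX_NEXT_REQUESTS by at most 99 requests,
# because the last batch adds at most 100 to a running total of at most 9999.
rng = random.Random(0)  # fixed seed so the check is reproducible
for _ in range(1000):
    total = read_cycle(lambda: rng.randint(1, 100), MAX_NEXT_REQUESTS)
    assert MAX_NEXT_REQUESTS <= total <= MAX_NEXT_REQUESTS + 99

print("both claims hold for the sampled cases")
```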
Finally, a good workflow that uses HCF must include cleaning the frontier slots, so they are ready for a future crawl. For our tutorial example:
```python
def configure_workflow(self):
    # (...)
    # Delete the article slots (--slot-prefix) once the scrapers finish.
    clean = Task('clean', 'py:hcfpal.py', init_args=["delete", self.frontier, self.args.slot_prefix])
    scrapers.add_next_task(clean)
    # (...)
```
And additionally, for the broad crawler example:
```python
def configure_workflow(self):
    # (...)
    # Delete the exploration slots ("links" prefix) once discovery finishes.
    clean_links = Task('cleanLinks', 'py:hcfpal.py', init_args=["delete", self.frontier, 'links'])
    discover.add_next_task(clean_links)
    # (...)
```
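Both cleanup tasks delete slots by prefix. A toy model of what that means for the slot names used in this chapter, assuming deletion matches every slot whose name starts with the given prefix (check hcfpal.py's own help for its exact matching rules); the in-memory frontier dict and delete_by_prefix() are purely illustrative:

```python
from typing import Dict, List


def delete_by_prefix(slots: Dict[str, List[str]], prefix: str) -> List[str]:
    """Remove every slot whose name starts with prefix; return removed names."""
    removed = sorted(name for name in slots if name.startswith(prefix))
    for name in removed:
        del slots[name]
    return removed


# Hypothetical frontier state after the broad crawl: 8 article slots + links0.
frontier = {f"test{i}": [] for i in range(8)}
frontier["links0"] = []

print(delete_by_prefix(frontier, "test"))   # the eight slots test0 ... test7
print(delete_by_prefix(frontier, "links"))  # ['links0']
print(frontier)                             # {}
```

Under this prefix-matching assumption, a prefix such as test would also match any other slot whose name starts with test, so it is worth choosing slot prefixes that do not overlap.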
Next Chapter: Monitors