Prefect Tutorial Notes - casangi/RADPS GitHub Wiki

The main resource for Prefect tutorials and documentation is the official Prefect documentation.

Note: Prefect 3.0 was released in September 2024. This was a major change from Prefect 2.0 (See also "What's new in Prefect 3.0") and many online resources for Prefect are only applicable to 2.0 or earlier. Be careful to check the version when searching for information on how to use Prefect, as the official documentation (and much of the publicly available discussion online) still contains many references to features specific to Prefect 2.

While tutorials for Prefect 3.0 beyond the official docs are limited, there are other places to look for help:

  • Prefect is on GitHub, so it's easy to see and raise issues and related discussion on the source repository.
  • Prefect has an active Slack.

Recommended Path through the Documentation:

Installation, Quickstart, and Tutorial:

  1. Install Prefect
  2. Quickstart: Use the self-hosted local API server
  3. Tutorial - For Data Engineers
  4. Go through the sections of the documentation under Develop
    • These are heavily cross-linked to other sections of the documentation, so in the process of working through "Develop", topics in Deployment, Operations, and Automation will also be covered. These other topics document context that will no doubt be essential in creating a production-grade Prefect service.

Key concepts

The components of the example workflows employed by the documentation are:

  • workflow (composed of Python functions decorated as task or flow objects)
  • API server (launched via the SDK or CLI, hosted locally in a user-managed python environment, defaults to sqlite database at ~/.prefect/prefect.db)
  • execution/resource layer (configured by default based on a work pool composed of ProcessWorkers, could also be a remote execution environment or task scheduling service)

Examples from the tutorial docs focus on tasks and flows composed of HTTP requests, which makes sense because the Prefect API is organized around REST.

The Prefect web service is fundamental to interacting with the framework. Configuration of this service is possible (e.g., to point at the hosted Prefect Cloud, or a self-managed instance running as either a static or dynamic deployment), but the default address for the dashboard when locally hosted is:

  • http://localhost:4200

The methods for interacting with the server (generated using FastAPI) are exposed there too:

  • http://localhost:4200/docs

Tasks

Tasks are the basic units of work in a Prefect workflow. Each one represents a single logical step of a workflow. A Python function can be turned into a Prefect task by adding the @task decorator.

A Task run is a single execution of a task. An individual task run is associated with its own unique set of logs, inputs, artifacts, and other metadata.

The 3.0 release is based on an event-driven architecture.

Tasks can be nested within other tasks, allowing for more flexible and modular workflows; they can also be called outside of flows, essentially enabling Prefect to function as a background task service.

The framework imposes no technical limitation that domain applications (e.g., heuristics, WS queries, other standalone services) must be expressed as flows. Would it be helpful to decompose our whole-application concepts into Tasks before assembling Flows?

Use tasks for simpler, atomic operations.

Tags can be used to label tasks. These tags can be used to filter which runs are shown via the Prefect UI or the Prefect REST API, or to manage concurrency.

The quote task annotation is used to disable introspection on task inputs. The existence of methods like this to improve the performance of operations that scale with data size implies that task parameters are probably the intended place to pass larger objects around in Prefect.

Deferred tasks can be set up to run in the background.

Flows

Flows represent workflows. A flow can contain tasks and nested flows. Prefect flows are created by adding a @flow decorator to a Python function.

A Flow run is a single execution of a flow. An individual flow run is associated with its own unique set of component task(s), logs, parameters, variables, and other metadata.

Conditional loops in flows are handled by grouping tasks that only run under certain conditions into a nested flow, and then conditionally running the flow.

Prefect supports transactional semantics in your workflows that allow you to rollback on task failure and configure groups of tasks that run as an atomic unit.

Nested flows are used for more complex operations that involve multiple steps.

A nested flow run cannot be cancelled without cancelling its parent flow run. If you need to be able to cancel a nested flow run independent of its parent flow run, we recommend deploying it separately and starting it with the run_deployment method.

This could impact our design decisions for when to use subflows.

If a flow doesn't even start, it doesn't register with the server as a "failed or crashed run" on the dashboard.

This seems like it will be an important limitation on the kinds of arguments passed to task functions:

Flow run parameters cannot exceed 512kb in size.

There is a strong emphasis on the use of commercial cloud storage services to serialize objects within flows rather than passing them between tasks.

To save memory and time with big data, you don’t need to pass results between tasks. Instead, you can write and read data to disk directly in your flow code. In production, it’s recommended to write results to a cloud provider storage such as AWS S3.

This seems like a helpful guide for the abstractions used to define architecture requirements, especially for integration with the resource management platform.

Transactions and States

Another key feature of the Prefect orchestration engine is that each event in a constructed pipeline -- task or flow, scheduled or deferred -- is aggregated into the API server database, and relationships between them are organized into a graph of transactions via the runtime context. These (customizable) properties allow for the caching of results returned by an individual run, so that the desired state can be reached.

Under the hood, every Prefect task run is governed by a transaction. Transactions and states are similar but different in important ways. Transactions determine whether a task should or should not execute, whereas states enable visibility into code execution status.

Transaction stages:

BEGIN, STAGE, ROLLBACK, COMMIT

State types:

SCHEDULED, PENDING, RUNNING, PAUSED, CANCELLING, CANCELLED, COMPLETED, FAILED, CRASHED

It will likely be helpful to conceptualize components of a RADPS workflow using similar idioms.

Task Runners

Task runners are used for concurrent, parallel, or distributed execution of tasks.

Calling a task function directly without a task runner executes the function in the main thread by default, which blocks execution of its flow until the task completes.

Use .submit() to submit a task to a task runner. This returns a PrefectFuture object, which includes the task run id and a State that reflects the current state of the computation. These must be resolved before returning from a flow.

Use .map() to submit a task for each element of an input iterable.

Use .result() to get the result from the future when the task has completed. (This is blocking.)

When you pass a future into a task, Prefect automatically waits for the "upstream" task (the one that the future references) to reach a final state before starting the downstream task using the value that the upstream task returned.

To explicitly set upstream task dependencies, you can specify these in the wait_for=[] parameter for a task.

There are three types of task runners, which are set at the flow level: ThreadPoolTaskRunner, DaskTaskRunner, and RayTaskRunner.

Since tasks have a map method, what happens if a flow runs a task.map on a function that attempts to perform its own map operation e.g., a local deployment composed of ProcessWorkers sharing resources runs a task that contains some dask.array.map_blocks call? What happens if a deferred task using the delay method to run in the background calls a function decorated by dask.delayed?

Prefect was designed to integrate with Dask and other distributed processing libraries, so in theory these use cases should be fairly seamless, but we'll need to continue learning about deployment (from the docs as well as by building things) in order to satisfactorily answer questions like these.

Scheduling

To set a schedule for a flow, create a deployment. Schedules can be created or updated via the Prefect UI, the CLI, or a YAML file.

Prefect’s Scheduler service evaluates each deployment’s schedules and creates new runs appropriately, but is not involved in flow or task execution.

Work pools and workers

Work pools are a bridge between the Prefect orchestration layer and the infrastructure where flows are run.

The type of a work pool controls where and how flow runs are executed, for example:

  • Process: Execute flow runs as subprocesses on a worker. Works well for local execution when first getting started.
  • Docker: Execute flow runs in Docker containers.

Workers are lightweight polling services that retrieve scheduled runs from a work pool and execute them.

The diagrams in the work pools documentation illustrate the conceptual architecture of work pools.

Questions and issues

Server errors

Re-starting a local Prefect server may fail with

sqlite3.OperationalError: database is locked

This can be resolved by restarting the Prefect server again, or by resetting the database:

prefect server database reset -y

WARNING: resetting the database removes the records of all previous runs.

Resetting ~/.prefect/prefect.db with the preceding command didn't always work - in one case it resulted in a chain of traceback errors resembling a CrashLoopBackoff state, with sqlite3.OperationalError messages such as: no such column: flow_run.deployment_version and no such table: automation_event_follower. In this case, killing the process running the API server and restarting it solved the issue (and restored the state of the database).

Switching from the default sqlite database to a postgres database using the instructions here eliminated some of the "database lock" errors we were seeing.

Scheduling weirdness

  • If you disable but don't delete the deployed work pool from the tutorial (for example via the toggle on the Prefect dashboard), flow runs will continue to be scheduled but not run. They are all labeled "Late" and are orange on the dashboard. This is expected: the Scheduler runs independently from the work pool and schedules jobs for it, which are in turn executed by workers. See: https://docs.prefect.io/v3/automate/add-schedules#how-scheduling-works

  • We set the cron schedule for a basic workflow run to "every hour" just to see how it would respond out of hours. Weirdly enough, it actually seemed to run while the laptop hosting its work pool (and API server, and pipeline) was closed in sleep mode overnight:

One scheduled run that happened in sleep mode took ~50x as long as the others, but still completed. Only the scheduled flow runs with network access explicitly disabled crashed (which is also weird considering the API server is running on localhost).

Dependency errors

Installing prefect==3.1.5 in a clean python 3.10 environment managed using micromamba (default channels, conda-forge/osx-arm64 and conda-forge/noarch) caused it to fail to execute the very first demo script due to ModuleNotFoundError: No module named 'opentelemetry'. Further inspection revealed the following additional differences between dependencies in the pip and mamba installations of the same version of prefect:

  • pip only: Deprecated, opentelemetry-api, pytz, wrapt
  • conda-forge only: async-exit-stack, bcrypt, blinker, Brotli, dataclasses, dnspython, email_validator, fastapi-cli, httptools, importlib_resources, paramiko, pkgutil_resolve_name, PyJWT, PyNaCl, PySocks, python-multipart, rich-toolkit, typer-slim, uvloop, watchfiles, websocket-client, zstandard

For the future

Comparing Apache Airflow and Prefect

This page was created to support ticket #11 and to be a reference point for issue #13 and beyond.