2024.11.12 Dagster Notes

Dagster tutorial notes

  • https://docs.dagster.io/getting-started/quickstart
  • "Create declarative freshness policies instead of task-driven cron schedules"
  • "Dagster also offers ops and jobs, but we recommend starting with assets."
    • "Asset definitions enable a declarative approach to data management, in which code is the source of truth on what data assets should exist and how those assets are computed." [i.e. Ex Ante Lineage]
    • "Behind-the-scenes, [an asset] is an op. Ops are an advanced topic that isn't required to get started with Dagster. A crucial distinction between asset definitions and ops is that asset definitions know about their dependencies, while ops do not. Ops aren't connected to dependencies until they're placed inside a graph."
    • "Assets in Dagster can specify a config schema. This allows you to provide values to assets at run time."
    • Assets may be assigned a code_version. Versions help Dagster track which assets haven't been re-materialized since their code changed, and avoid performing redundant computation.
      @asset(code_version="1")
      def asset_with_version():
        with open("data/asset_with_ve....
      
  • After modifying an asset in Python:
    • "In your browser, navigate back to Dagster's Global Asset Lineage (localhost:3000/asset-groups), and click on the Reload Definitions button on the top-right region of the page."
  • Error: dagster._core.errors.DagsterInvariantViolationError: No Definitions, RepositoryDefinition, Job, Pipeline, Graph, or AssetsDefinition found in "pylever_dagster".
    • Changed the import in definitions.py from "from . import assets" to "from pylever_dagster import assets"
    • Tested with this command: poetry run dagster dev -f pylever_dagster/definitions.py
    • But the error was still occurring
    • pylever_dagster/__init__.py needed to contain from .definitions import defs
    • Now poetry run dagster dev works.
    • The possibly missing workspace.yaml file had nothing to do with it.
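    • A minimal sketch of the layout that resolved the error (module names as in this project; load_assets_from_modules is the standard helper for collecting the assets):
      # pylever_dagster/definitions.py
      from dagster import Definitions, load_assets_from_modules

      from pylever_dagster import assets

      defs = Definitions(assets=load_assets_from_modules([assets]))

      # pylever_dagster/__init__.py
      from .definitions import defs
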
  • "Up until now, you've annotated your asset functions with None, meaning the asset doesn't return anything. In this section, you'll learn about the MaterializeResult object, which lets you record metadata about your asset."
    • Similar to Memento?
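    • A minimal sketch of returning MaterializeResult to record metadata (my_asset, do_work, and the num_records key are made up):
      from dagster import MaterializeResult, asset

      @asset
      def my_asset() -> MaterializeResult:
          records = do_work()  # placeholder for the asset's real computation
          # The metadata shows up on the asset's page in the Dagster UI
          return MaterializeResult(metadata={"num_records": len(records)})
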
  • Assets, Schedules/Jobs, and Sensors
    • "You've used a schedule to update your data on a regular cadence. However, there are other ways to automatically trigger updates. For example, sensors can launch runs after routinely polling a source. Check out the Automation guide to learn more."
    • "Managing one type of definition, such as assets, is easy. However, it can quickly become unruly as your project grows to have a variety of definitions (ex. schedules, jobs, sensors). To combine definitions and have them aware of each other, Dagster provides a utility called the Definitions object."
  • Tutorial, part 6: Connecting to external services
    • "By building this asset graph, Dagster acts as a control plane of multiple different external services. However, connecting to these services and managing their use can be difficult. Resources are the recommended way to manage these connections in Dagster. Resources are Dagster objects that connect to external services, such as Slack, Snowflake, or Amazon S3."
  • Resources
    • "Verify this new asset works by reloading your definitions and materializing the signups asset. If you navigate to the hackernews_api's resource page from earlier, you’ll notice its Uses value now says 1. Click on the number and ensure that your signups asset uses the hackernews_api resource."
    • "Make resources reusable and share them across jobs or asset groups. In this example, we grouped resources (e.g., database connections, Spark sessions, API clients, and I/O managers) in the resources folder, where they are bound to configuration sets that vary based on the environment."
    • "Assets specify resource dependencies by annotating the resource as a parameter to the asset function... To specify resource dependencies on a sensor, annotate the resource type as a parameter to the sensor's function."
    • "For more complex use cases, you can override the yield_for_execution. By default, this context manager calls setup_for_execution, yields the resource, and then calls teardown_after_execution, but you can override it to provide any custom behavior. This is useful for resources that require a context to be open for the duration of a run, such as database connections or file handles."
  • Tutorial: Next steps
    • Congratulations! Having made it this far, you now have a working data pipeline.
    • What if you want to do more?
      • Data quality - Dagster provides ways to validate the data that your assets produce via asset checks, and if using Dagster+, receive alerts when problems are detected.
      • Partitioning assets - This tutorial covered assets whose entire contents get re-computed and overwritten with every materialization. When assets are large, it's common to partition them, so that each run only materializes a single partition.
      • Test your assets - This tutorial showed you how to materialize assets. To learn how to test them, check out the guide on testing assets.
      • Jobs - This tutorial showed you how to work with Dagster's primary building block, assets. However, sometimes you'll have tasks that don't produce assets. To learn how to execute tasks, check out the Intro to ops and jobs guide.
  • Intro to ops and jobs
    • Why split up code into ops instead of splitting it up into regular Python functions? There are a few reasons:
    • Dagster can execute sets of ops without executing the entire job. This means that, if we hit a failure in our job, we can re-run just the steps that didn't complete successfully, which often allows us to avoid re-executing expensive steps.
    • Parallelism - When two ops don't depend on each other, Dagster can execute them simultaneously.
    • Dagster can materialize the output of an op to persistent storage. I/O managers let us separate business logic from I/O, which lets us write code that's more testable and portable across environments.
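    • A minimal ops-and-jobs sketch (names invented): the call graph inside the @job body is what defines the dependency between the two ops:
      from dagster import job, op

      @op
      def extract_numbers() -> list:
          return [1, 2, 3]

      @op
      def total(numbers: list) -> int:
          return sum(numbers)

      @job
      def extract_and_total():
          # Nothing executes here; this just wires total downstream of extract_numbers
          total(extract_numbers())
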
    • "When using asset-based data pipelines (as opposed to when using ops and graphs), we recommend having a jobs.py file that imports the assets, partitions, sensors, etc. to build each job."
  • Re-execution in Dagster
    • Imagine a machine learning job with three ops. The first op, training the model, is the most time and resource intensive. Then, we test the model, and build analyses on the results.
  • Structuring your Dagster project
    • Example file tree
    • "use load_assets_from_package_module in the top-level definitions to load them, rather than needing to manually add assets to the top-level definitions every time you define one."
    • "A definition can be an asset, a job, a schedule, a sensor, or a resource."
  • $DAGSTER_HOME/dagster.yaml
    • "In persistent Dagster deployments, you'll typically want to configure many of the components on the instance. For example, you may want to use a Postgres instance to store runs and the corresponding event logs, but stream compute logs to an Amazon S3 bucket."
  • workspace.yaml
    • "Defines multiple code locations for local development or deploying to your infrastructure. Refer to the Workspace file documentation for more info and available options."
    • "We don't recommend over-abstracting too early; in most cases, one code location should be sufficient. To include multiple code locations in a single project, you'll need to add a configuration file (workspace.yaml) to your project: If developing locally or deploying to your infrastructure, add a workspace.yaml file to the root of your project."
  • Fully Featured Example Project
    • "this project consists of three asset groups... partitioned by hour and updated every hour"
    • "different implementations of resources can be used depending on the environment... same interface but different implementations [APIClient, SubsampleAPIClient, SnapshotClient]... helps separate the business logic in code from environments"
    • Assigning assets to groups
      • "two ways to assign assets to groups: (1) specifying a group name on an individual asset (2) using the group_name argument when calling load_assets_from_package_module... @asset(group_name="cereal_assets")"
  • Defining metadata and tags
    • https://docs.dagster.io/concepts/metadata-tags
    • "Dagster offers several ways to provide useful information and documentation alongside your data pipelines, including metadata and tagging. For example, you can attach metadata to an asset that calculates how many records are processed during each run and then view the data as a plot in the Dagster UI!"
  • Retrying failed assets
    • "If an exception occurs during asset execution, you can use a RetryPolicy to automatically retry the asset within the same run." (kws: exponential backoff)
  • Subsetting graph-backed assets
    • "Because Dagster flattens each op graph into a flat input/output mapping between ops under the hood, any op that produces an output of the graph must be structured to yield ['selectively returned'] its outputs optionally, enabling the outputs to be returned independently... must structure our code to conditionally yield its outputs"
    • Example (imports needed: from dagster import OpExecutionContext, Out, Output, op):
      @op(out={"foo_1": Out(is_required=False), "foo_2": Out(is_required=False)})
      def foo(context: OpExecutionContext, bar_1):
          # Selectively returns outputs based on selected assets
          if "foo_1" in context.selected_output_names:
              yield Output(bar_1 + 1, output_name="foo_1")
          if "foo_2" in context.selected_output_names:
              yield Output(bar_1 + 2, output_name="foo_2")
      
  • Multi-Assets
    • "When working with asset definitions, it's sometimes inconvenient or impossible to map each persisted asset to a unique op or graph. A multi-asset is a way to define a single op or graph that will produce the contents of multiple data assets at once"
    • "the underlying op will have multiple outputs -- one for each associated asset."
    • Conditional materialization
      • "In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the skippable parameter along with yield syntax and MaterializeResult to implement this behavior... Asset sensors monitoring the asset will not trigger"
      • "Sometimes, the underlying computation is sufficiently flexible to allow for computing an arbitrary subset of the assets associated with it. In these cases, set the skippable attribute of the asset specs to True, and set the can_subset parameter of the decorator to True."
  • Asset jobs
    • "An asset job is a type of job that targets a selection of assets and can be launched:
      • Manually from the Dagster UI
      • At fixed intervals, by schedules
      • When external changes occur, using sensors"
    • Conversion from asset to job (or a job wrapper around an asset): "To create an asset job, use define_asset_job. An asset-based job is based on the assets the job targets and their dependencies."
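    • A minimal sketch (names invented) of an asset job plus a schedule that launches it, all combined in a Definitions object:
      from dagster import AssetSelection, Definitions, ScheduleDefinition, asset, define_asset_job

      @asset
      def my_table():
          ...

      my_job = define_asset_job("refresh_my_table", selection=AssetSelection.assets(my_table))
      my_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 6 * * *")

      defs = Definitions(assets=[my_table], jobs=[my_job], schedules=[my_schedule])
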
  • Observable source assets
    • Subject-Observer Pattern for assets
    • Defined with the @observable_source_asset decorator, analogous to @asset. In a way these observations could be thought of as assets themselves(?)
    • "When an asset is observed to have a newer data version than the data version it had when a downstream asset was materialized, then the downstream asset will be given a label in the Dagster UI that indicates that upstream data has changed."
      • a.k.a. a dirty bit
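    • A minimal sketch (file path invented) of an observation function that reports a content hash as the DataVersion:
      import hashlib
      from dagster import DataVersion, observable_source_asset

      @observable_source_asset
      def source_file():
          # A changed hash is what flips the "upstream data has changed" label downstream
          digest = hashlib.sha256(open("source_file.csv", "rb").read()).hexdigest()
          return DataVersion(digest)
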
  • Asset checks
    • "tests some property of a data asset, such as:
      • Ensuring a particular column, like an ID, doesn't contain null values [a.k.a. validation]
      • Verifying that a tabular asset adheres to a specified schema [a.k.a. validation]
      • If the asset's data is in need of a refresh"
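    • A minimal sketch (asset and column names invented) of the first kind of check, a null-ID validation:
      import pandas as pd
      from dagster import AssetCheckResult, asset, asset_check

      @asset
      def orders() -> pd.DataFrame:
          return pd.DataFrame({"id": [1, 2, 3]})

      @asset_check(asset=orders)
      def orders_id_not_null(orders: pd.DataFrame) -> AssetCheckResult:
          num_nulls = int(orders["id"].isna().sum())
          return AssetCheckResult(passed=num_nulls == 0, metadata={"num_null_ids": num_nulls})
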
    • This means that assets must be the actual downloaded files, i.e. their destination paths and filenames, but the latter might not be known until a download has occurred if, for example, the filename is obtained via some wildcard at the source. Perhaps the log could be used to infer these filenames?
    • "Asset check results can also be used to create conditional steps in your pipelines - for example, if a quality check fails, execution can be halted to prevent issues spreading downstream."
      • => So then we should deliver every file, even if validation fails, but only trigger downstream processing when validation succeeds.
    • Built-in asset checks
      • e.g. build_column_schema_change_checks - Builds asset checks that pass if an asset's columns are the same, compared with its prior materialization
    • "Checks are currently only supported per-asset, not per-partition. See this issue for updates."
  • External Assets
    • "Attach metadata to asset definitions for documentation, tracking ownership, and so on"
  • Automation
    • Schedules
    • Sensors
    • Declarative Automation (experimental) - materialize when arbitrary criteria are met
    • Asset Sensors (experimental) - launch when a specified asset is materialized
  • Constructing schedules for partitioned assets and jobs
    • "[You can] use build_schedule_from_partitioned_job, which will build a schedule with a cadence that matches the spacing of the partitions in the asset or job."
    • "Using @schedule allows for more flexibility in determining which partitions should be run by the schedule, rather than using build_schedule_from_partitioned_job which automatically creates the schedule based on the partitioned config."
    • "This example demonstrates how to manually construct a schedule for a job with a static (not time-based) partition"
      @schedule(cron_schedule="0 0 * * *", job=continent_job)
      def continent_schedule():
          for c in CONTINENTS:
              yield RunRequest(run_key=c, partition_key=c)
      
    • "If you need to customize the ending, or most recent partition in a set, use the end_offset parameter in the partition's config. Generally, the calculation for end_offset can be expressed as: current_date - 1 type_of_partition + end_offset"
      • a.k.a. lookback_days & date macro math (which is more complex than Dagster can provide given calendars)
  • Schedules
    • "For schedules that utilize resources, you can provide the resources when invoking the schedule function."
    • A resource might be a database, as defined by the DATABASE_URL environment variable?
    • Here's how to test a schedule given a particular date using build_schedule_context.
      import datetime
      from dagster import build_schedule_context

      context = build_schedule_context(
          scheduled_execution_time=datetime.datetime(2020, 1, 1)
      )
      
  • Sensors
    • "A sensor defines an evaluation function that returns either: one or more RunRequest objects or an optional SkipReason.
    • "@sensor - The decorator used to define a sensor. The decorated function is called the sensor's evaluation function. The decorator returns a SensorDefinition."
    • Idempotence and cursors
      • "When instigating runs based on external events, you usually want to launch exactly one run for each event. There are two ways to define your sensors to avoid creating duplicate runs for your events: run_key or cursor."
      • yield RunRequest(
            run_key=filename,
            run_config={"ops": {"process_file": {"config": {"filename": filename}}}},
        )
        
        "The result is exactly one run per file."
    • "If a sensor evaluation function takes more than 60 seconds to return its results, the sensor evaluation will time out and the Dagster daemon will move on to the next sensor without submitting any runs. This 60 second timeout only applies to the time it takes to run the sensor function, not to the execution time of the runs submitted by the sensor. To avoid timeouts, slower sensors can break up their work into chunks, using cursors to let subsequent sensor calls pick up where the previous call left off"
      • => Cannot scan and download and upload in the same sensor => Sensor must detect files and create a RunRequest to download and upload them.
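      • A minimal sketch of that chunking pattern (process_rows_job and fetch_ids_after are invented placeholders), using the sensor cursor so each evaluation picks up where the last one stopped:
        from dagster import RunRequest, SensorEvaluationContext, sensor

        @sensor(job=process_rows_job)
        def cursor_sensor(context: SensorEvaluationContext):
            last_id = int(context.cursor) if context.cursor else 0
            new_ids = fetch_ids_after(last_id)  # placeholder for a quick, bounded query against the source
            for row_id in new_ids:
                yield RunRequest(run_key=str(row_id))
            if new_ids:
                context.update_cursor(str(max(new_ids)))
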
    • "Dagster's resources system can be used with sensors to make it easier to call out to external systems and to make components of a sensor easier to plug in for testing purposes."
      • Might a "resource" be an abstract_data_conn instance?
    • "Here is an example of a sensor that reports job success in a Slack message"
      • slack_client.chat_postMessage(....
  • Declarative Automation
    • "Using Declarative Automation helps you:
      • Ensure you're working with the most up-to-date data
      • Optimize resource usage by only materializing assets or executing checks when needed
      • Precisely define when specific assets should be updated based on the state of other assets"
    • "AutomationCondition.on_missing() - This condition will materialize an asset if all its dependencies have been updated, but the asset itself has not. Filling in partitioned assets as soon as upstream data is available."
    • "Operands are base conditions which can be true or false about a given target."
      • e.g. "AutomationCondition.missing - Target has not been executed"
      • "By default, AutomationCondition.eager() will only update the latest time partition of an asset."
    • "You can create your own subclass of AutomationCondition, defining the evaluate() method. For example, imagine you want to avoid executing anything on a company holiday."
  • Asset sensors
    • "Asset sensors allow you to instigate runs when materializations occur."

Questions

  1. Is it common/precedented to build @assets and @jobs dynamically, from a configuration database, for example?
  • This was answered in a Data Platform Week Q&A: "Is it common/recommended to have factory methods for computing many assets at once?" The answer was "yes." It's especially useful when recreating identical/similar assets over and over. These are otherwise referred to as Asset Factories (see the sketch below).
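  • A minimal sketch of such an asset factory (the builder function and table names are invented); the list could just as well come from a configuration database:
    from dagster import AssetsDefinition, Definitions, asset

    def build_ingest_asset(table_name: str) -> AssetsDefinition:
        @asset(name=f"ingest_{table_name}")
        def _ingest():
            ...  # load table_name from the source system
        return _ingest

    defs = Definitions(assets=[build_ingest_asset(t) for t in ["orders", "customers"]])
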
  2. How to poll a data source waiting on an @asset to arrive?
  • A: @sensors
  3. How to put a @job definition in the definitions.py?
  4. How to auto-heal/recover from a Dagster or other upstream outage?
  5. How to schedule a monitor (@sensor?) on a @job or @asset that must be generated by a particular time?
  • "When we build sensors, they are considered policies for when to trigger a particular job."
  • "Note: Certain sensors, like run status sensors, can listen to multiple jobs and do not trigger a job. We recommend keeping these sensors in the definition as they are often for alerting and monitoring at the code location level."
  6. What if asset names change each day, e.g. w/ a date?
  • "partitioned assets" - e.g. hourly or daily or a daily downstream asset based on an hourly upstream one
  • Maybe asset wildcards can be twice partitioned, once per date and again by wildcard? So the asset then is truly just the tb_files entry?
    • "A common use is for each partition to represent all the records in a data set that fall within a particular time window, e.g. hourly, daily or monthly. Alternatively, each partition can represent a region, a customer, an experiment - any dimension along which you want to be able to materialize and monitor independently. An asset can also be partitioned along multiple dimensions, e.g. by region and by hour."
  • "Default partition dependency rules can be overridden by providing a PartitionMapping when specifying a dependency on an asset. How this is accomplished depends on the type of dependency the asset has - refer to the following tabs for more info."
    • i.e. dependencies with time offsets; e.g. an equity return depends on 2 days of prices (and 1 day of cax)
  • "Date-partitioned job - With the job above, it's possible to supply any value for the date param. This means if you wanted to launch a backfill, Dagster wouldn't know what values to run it on. You can instead build a partitioned job that operates on a defined set of dates."
  7. If I have a pre-existing app that goes to an FTP server, looks for a set of files, and downloads any that are available, how should I model those in Dagster? The files will become available on a daily basis but at different times of day.
  8. What if I don't know the filenames that will be downloaded in advance? Can these still be tracked as assets? Should I always be using assets, or are jobs sufficient in many use cases?
  9. Can an @op or a @job output an asset that is determined dynamically at run-time?
  • Logging an AssetObservation from an Op
    from dagster import AssetObservation, OpExecutionContext, op

    @op
    def observation_op(context: OpExecutionContext):
        df = read_df()  # read_df is a placeholder for whatever loads the DataFrame
        context.log_event(
            AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
        )
        return 5
    
  • "Dynamically partitioned assets - Sometimes you don't know the set of partitions ahead of time when defining assets. For example, maybe you want to add a new partition every time a new data file lands in a directory or every time you want to experiment with a new set of hyperparameters. In these cases, you can use a DynamicPartitionsDefinition."
    import os

    from dagster import RunRequest, SensorEvaluationContext, SensorResult, sensor

    # Assumes images_partitions_def = DynamicPartitionsDefinition(name="images") and a
    # partitioned images_job are defined elsewhere, as in the docs example.
    @sensor(job=images_job)
    def image_sensor(context: SensorEvaluationContext):
        new_images = [
            img_filename
            for img_filename in os.listdir(os.getenv("MY_DIRECTORY"))
            if not images_partitions_def.has_partition_key(
                img_filename, dynamic_partitions_store=context.instance
            )
        ]

        return SensorResult(
            run_requests=[RunRequest(partition_key=img_filename) for img_filename in new_images],
            dynamic_partitions_requests=[images_partitions_def.build_add_request(new_images)],
        )
    
  10. What happens if a previous run of a scheduled job is still running?
  11. Is it possible to schedule static gaps between runs of a job, rather than scheduling the time between starts?
  • "By default, the Dagster daemon runs a sensor 30 seconds after that sensor's previous evaluation finishes executing. You can configure the interval using the minimum_interval_seconds argument on the @sensor decorator."
  12. What ensures that a Dagster-derived asset only accesses other assets listed explicitly as dependencies?