Project Github

https://github.com/dagster-io/dagster

Dagster origin

I like this post so much https://medium.com/dagster-io/introducing-dagster-dbd28442b2b7

This leaves data applications in a state where computations are written in a wide variety of tools and languages, but without an integration layer to describe the meaning of those computations or the relationships between them. As a result, massive amounts of metadata and context is often lost as data is flowed from tool to tool, and there is no standard for interacting with the computations crafted within those tools. Up until now, the focus of higher level tools has been the mechanics of the physical orchestration of those computations — ensuring correct ordering, retries, and so forth — and not the semantic meaning of those computations. This is the gap that Dagster is designed to fill.

Description

From my reading, the keywords are

  • schedule / orchestration
  • integration with major cloud providers
  • integration with major ETL / ML frameworks

Dagster is a data orchestrator for machine learning, analytics, and ETL

Dagster lets you define pipelines in terms of the data flow between reusable, logical components, then test locally and run anywhere. With a unified view of pipelines and the assets they produce, Dagster can schedule and orchestrate Pandas, Spark, SQL, or anything else that Python can invoke.

Dagster is designed for data platform engineers, data engineers, and full-stack data scientists. Building a data platform with Dagster makes your stakeholders more independent and your systems more robust. Developing data pipelines with Dagster makes testing easier and deploying faster.

(demo screenshot omitted)

The integrations and their Dagster libraries:

  • Apache Airflow (dagster-airflow): Allows Dagster pipelines to be scheduled and executed, either containerized or uncontainerized, as Apache Airflow DAGs.
  • Apache Spark (dagster-spark, dagster-pyspark): Libraries for interacting with Apache Spark and PySpark.
  • Dask (dagster-dask): Provides a Dagster integration with Dask / Dask.Distributed.
  • Datadog (dagster-datadog): Provides a Dagster resource for publishing metrics to Datadog.
  • Jupyter / Papermill (dagstermill): Built on the papermill library, dagstermill is meant for integrating productionized Jupyter notebooks into Dagster pipelines.
  • PagerDuty (dagster-pagerduty): A library for creating PagerDuty alerts from Dagster workflows.
  • Snowflake (dagster-snowflake): A library for interacting with the Snowflake Data Warehouse.

Cloud providers:

  • AWS (dagster-aws): A library for interacting with Amazon Web Services. Provides integrations with CloudWatch, S3, EMR, and Redshift.
  • Azure (dagster-azure): A library for interacting with Microsoft Azure.
  • GCP (dagster-gcp): A library for interacting with Google Cloud Platform. Provides integrations with GCS, BigQuery, and Cloud Dataproc.

Showcase: the CLI as an example

$ dagster pipeline execute -f hello_dagster.py
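For reference, a hello_dagster.py for that command could look roughly like this. This is just a sketch against the legacy solid / pipeline API that the rest of these notes trace; the solid names are made up.

from dagster import pipeline, solid


@solid
def get_name(_):
    return "dagster"


@solid
def hello(context, name):
    context.log.info("Hello, {}!".format(name))


@pipeline
def hello_pipeline():
    hello(get_name())

Running the CLI command above against this file builds the pipeline, executes both solids in dependency order, and prints the event stream to the terminal.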

preset.py (the PresetDefinition docstring)

Args:
    name (str): The name of this preset. Must be unique in the presets defined on a given
        pipeline.
    run_config (Optional[dict]): A dict representing the config to set with the preset.
        This is equivalent to the ``run_config`` argument to :py:func:`execute_pipeline`.
    solid_selection (Optional[List[str]]): A list of solid subselection (including single
        solid names) to execute with the preset. e.g. ``['*some_solid+', 'other_solid']``
    mode (Optional[str]): The mode to apply when executing this preset. (default: 'default')
    tags (Optional[Dict[str, Any]]): The tags to apply when executing this preset.
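To make the preset concrete, here is a minimal sketch (not taken from the Dagster docs) of attaching a preset to a pipeline and executing it; the preset name "dev" and the greeting config are made up.

from dagster import PresetDefinition, execute_pipeline, pipeline, solid


@solid(config_schema={"greeting": str})
def say(context):
    context.log.info(context.solid_config["greeting"])


@pipeline(
    preset_defs=[
        PresetDefinition(
            name="dev",
            run_config={"solids": {"say": {"config": {"greeting": "hello from the dev preset"}}}},
            mode="default",
        )
    ]
)
def preset_pipeline():
    say()


# Selecting the preset pulls in its run_config, mode, tags, and solid_selection.
result = execute_pipeline(preset_pipeline, preset="dev")
assert result.success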

0. Setup IDE

If you are using IntelliJ, mark the dagster source directory as a sources root so imports resolve.

1. API execute_pipeline()

Core module: api.py (dagster.core.execution.api)

def execute_pipeline(
    pipeline: Union[PipelineDefinition, IPipeline],
    run_config: Optional[dict] = None,
    mode: Optional[str] = None,
    preset: Optional[str] = None,
    tags: Optional[Dict[str, Any]] = None,
    solid_selection: Optional[List[str]] = None,
    instance: Optional[DagsterInstance] = None,
    raise_on_error: bool = True,
) -> PipelineExecutionResult:
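Called programmatically, it looks roughly like this (a sketch reusing the toy hello_pipeline from the CLI example above):

from dagster import execute_pipeline


result = execute_pipeline(hello_pipeline, run_config={})
assert result.success  # result is a PipelineExecutionResult
# result.result_for_solid("get_name").output_value() should give back "dagster"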

execute_pipeline delegates to a hidden helper:

def _logged_execute_pipeline(
    pipeline: Union[IPipeline, PipelineDefinition],
    instance: DagsterInstance,
    run_config: Optional[dict] = None,
    mode: Optional[str] = None,
    preset: Optional[str] = None,
    tags: Optional[Dict[str, Any]] = None,
    solid_selection: Optional[List[str]] = None,
    raise_on_error: bool = True,
) -> PipelineExecutionResult:

Internally, this calls the following (see the sketch after the list):

  • create_run_for_pipeline()
  • execute_run()
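Stripped of logging and validation, the body boils down to roughly this (a sketch, not the literal source; the kwarg names follow the signatures above):

pipeline_run = instance.create_run_for_pipeline(
    pipeline_def=pipeline.get_definition(),
    run_config=run_config,
    mode=mode,
    solid_selection=solid_selection,
    tags=tags,
)
return execute_run(pipeline, pipeline_run, instance, raise_on_error=raise_on_error)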

2. create_run_for_pipeline()

2.1 If no execution plan is given, one is created by calling another API method, create_execution_plan:

create_execution_plan(
    pipeline_def,
    run_config=run_config,
    mode=mode,
)

which returns a plan:

return ExecutionPlan.build(
    pipeline, environment_config, mode=mode, step_keys_to_execute=step_keys_to_execute
)

build() instantiates an ExecutionPlan:

    full_plan = ExecutionPlan(
        self.pipeline,
        {step.handle: step for step in self._steps.values()},
        [step.handle for step in self._steps.values()],
        self.environment_config,
    )

Some key fields of an execution plan, including the DAG of steps (an inspection sketch follows the class excerpt):

class ExecutionPlan(
    NamedTuple(
        "_ExecutionPlan",
        [
            ("pipeline", IPipeline),
            ("step_dict", Dict[StepHandleUnion, ExecutionStepUnion]),
            ("executable_map", Dict[str, Union[StepHandle, ResolvedFromDynamicStepHandle]]),
            ("resolvable_map", Dict[str, List[UnresolvedStepHandle]]),
            ("step_handles_to_execute", List[StepHandleUnion]),
            ("environment_config", EnvironmentConfig),
        ],
    )
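Since step_dict and step_handles_to_execute are plain fields on this NamedTuple, a built plan can be inspected directly (a sketch; hello_pipeline is the toy pipeline from earlier, and the import path follows the module noted above):

from dagster.core.execution.api import create_execution_plan


plan = create_execution_plan(hello_pipeline)

# step_dict maps each step handle to its execution step; step_handles_to_execute
# is the (possibly filtered) list of handles that will actually run.
for handle in plan.step_handles_to_execute:
    print(handle, plan.step_dict[handle])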

2.2 create_run() after a plan is ready

    def create_run(
        self,
        pipeline_name,
        run_id,
        run_config,
        mode,
        solids_to_execute,
        step_keys_to_execute,
        status,
        tags,
        root_run_id,
        parent_run_id,
        pipeline_snapshot,
        execution_plan_snapshot,
        parent_pipeline_snapshot,
        solid_selection=None,
        external_pipeline_origin=None,
    ):

and, after attaching some snapshots (@todo: dig deeper on this if needed), it returns a PipelineRun:

class PipelineRun(
    namedtuple(
        "_PipelineRun",
        (
            "pipeline_name run_id run_config mode solid_selection solids_to_execute "
            "step_keys_to_execute status tags root_run_id parent_run_id "
            "pipeline_snapshot_id execution_plan_snapshot_id external_pipeline_origin"
        ),
    )
):

From the code above, it basically holds metadata, presumably for later history tracking.
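That guess looks right: PipelineRun records are persisted through the instance's run storage, so past runs can be queried back later. A sketch, assuming DAGSTER_HOME points at a configured instance:

from dagster import DagsterInstance


instance = DagsterInstance.get()  # resolves $DAGSTER_HOME/dagster.yaml
for run in instance.get_runs():
    print(run.run_id, run.pipeline_name, run.status)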

3. execute_run()

def execute_run(
    pipeline: IPipeline,
    pipeline_run: PipelineRun,
    instance: DagsterInstance,
    raise_on_error: bool = False,
) -> PipelineExecutionResult:

3.1 DagsterInstance

Here I want to understand more about how execution is physically handled on an instance, so let's look at the DagsterInstance class. From the docs, it reads a yaml file (dagster.yaml) to get metadata about storage and other resources.
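For quick local experiments the yaml-backed instance is optional; an in-memory instance can be created directly and passed to execute_pipeline (sketch):

from dagster import DagsterInstance, execute_pipeline


# DagsterInstance.get() reads $DAGSTER_HOME/dagster.yaml;
# DagsterInstance.ephemeral() keeps run and event storage in memory instead.
instance = DagsterInstance.ephemeral()
result = execute_pipeline(hello_pipeline, instance=instance)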

The DagsterInstance constructor:

    def __init__(
        self,
        instance_type,
        local_artifact_storage,
        run_storage,
        event_storage,
        compute_log_manager,
        schedule_storage=None,
        scheduler=None,
        run_coordinator=None,
        run_launcher=None,
        settings=None,
        ref=None,
    ):

An example is compute log storage, which is basically stored in a database.

The core classes here are:

  • Scheduler
    • DagsterDaemonScheduler
    • SystemCronScheduler
  • RunCoordinator
    • Queue
    • default
    • mocked
  • RunLauncher (multiple launch modes)

3.2 execution

A utility class consolidates the execution logic:

    _execute_run_iterable = ExecuteRunWithPlanIterable(
        execution_plan=execution_plan,
        iterator=pipeline_execution_iterator,
        execution_context_manager=PipelineExecutionContextManager(
            execution_plan=execution_plan,
            pipeline_run=pipeline_run,
            instance=instance,
            run_config=pipeline_run.run_config,
            raise_on_error=raise_on_error,
        ),
    )

Here the core classes are ExecutionContextManager and EventGenerationManager, and the results are finally collected into a PipelineExecutionResult.
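The same plumbing backs the streaming API: execute_pipeline_iterator yields DagsterEvent objects as they are produced instead of collecting them into a result object (sketch; attribute names follow the legacy API and may vary by version):

from dagster import execute_pipeline_iterator


for event in execute_pipeline_iterator(hello_pipeline):
    print(event.event_type_value, event.message)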

Reference