Project Github
https://github.com/dagster-io/dagster
Dagster origin
I like this post so much https://medium.com/dagster-io/introducing-dagster-dbd28442b2b7
This leaves data applications in a state where computations are written in a wide variety of tools and languages, but without an integration layer to describe the meaning of those computations or the relationships between them. As a result, massive amounts of metadata and context is often lost as data is flowed from tool to tool, and there is no standard for interacting with the computations crafted within those tools. Up until now, the focus of higher level tools has been the mechanics of the physical orchestration of those computations — ensuring correct ordering, retries, and so forth — and not the semantic meaning of those computations. This is the gap that Dagster is designed to fill.
Description
From my reading, the keywords are:
- schedule / orchestration
- integration with major cloud providers
- integration with major ETL / ML frameworks
Dagster is a data orchestrator for machine learning, analytics, and ETL
Dagster lets you define pipelines in terms of the data flow between reusable, logical components, then test locally and run anywhere. With a unified view of pipelines and the assets they produce, Dagster can schedule and orchestrate Pandas, Spark, SQL, or anything else that Python can invoke.
Dagster is designed for data platform engineers, data engineers, and full-stack data scientists. Building a data platform with Dagster makes your stakeholders more independent and your systems more robust. Developing data pipelines with Dagster makes testing easier and deploying faster.
Integration | Dagster Library
--- | ---
Apache Airflow | dagster-airflow: Allows Dagster pipelines to be scheduled and executed, either containerized or uncontainerized, as Apache Airflow DAGs.
Apache Spark | dagster-spark · dagster-pyspark: Libraries for interacting with Apache Spark and PySpark.
Dask | dagster-dask: Provides a Dagster integration with Dask / Dask.Distributed.
Datadog | dagster-datadog: Provides a Dagster resource for publishing metrics to Datadog.
Jupyter / Papermill | dagstermill: Built on the papermill library, dagstermill is meant for integrating productionized Jupyter notebooks into Dagster pipelines.
PagerDuty | dagster-pagerduty: A library for creating PagerDuty alerts from Dagster workflows.
Snowflake | dagster-snowflake: A library for interacting with the Snowflake Data Warehouse.

Cloud Providers | Dagster Library
--- | ---
AWS | dagster-aws: A library for interacting with Amazon Web Services. Provides integrations with CloudWatch, S3, EMR, and Redshift.
Azure | dagster-azure: A library for interacting with Microsoft Azure.
GCP | dagster-gcp: A library for interacting with Google Cloud Platform. Provides integrations with GCS, BigQuery, and Cloud Dataproc.
Showcase: CLI example
$ dagster pipeline execute -f hello_dagster.py
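For reference, a minimal hello_dagster.py could look like the sketch below. This is my own illustration using the solid/pipeline API that the rest of these notes refer to, not the exact file from the docs.

from dagster import pipeline, solid


@solid
def get_name(context):
    # A trivial solid that emits a single output value.
    return "dagster"


@solid
def hello(context, name: str):
    # Consumes the upstream output and logs a greeting.
    context.log.info(f"Hello, {name}!")


@pipeline
def hello_dagster():
    # Wiring the two solids together defines the data dependency DAG.
    hello(get_name())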
preset.py (PresetDefinition docstring)
Args:
name (str): The name of this preset. Must be unique in the presets defined on a given
pipeline.
run_config (Optional[dict]): A dict representing the config to set with the preset.
This is equivalent to the ``run_config`` argument to :py:func:`execute_pipeline`.
solid_selection (Optional[List[str]]): A list of solid subselection (including single
solid names) to execute with the preset. e.g. ``['*some_solid+', 'other_solid']``
mode (Optional[str]): The mode to apply when executing this preset. (default: 'default')
tags (Optional[Dict[str, Any]]): The tags to apply when executing this preset.
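To make the docstring concrete, here is a hedged sketch of attaching a preset to a pipeline and executing it; the pipeline, solid, and config values are made up for illustration.

from dagster import PresetDefinition, execute_pipeline, pipeline, solid


@solid(config_schema={"greeting": str})
def say(context):
    context.log.info(context.solid_config["greeting"])


@pipeline(
    preset_defs=[
        PresetDefinition(
            name="dev",
            # run_config is the same dict you could pass directly to execute_pipeline
            run_config={"solids": {"say": {"config": {"greeting": "hi from preset"}}}},
            mode="default",
            tags={"owner": "notes"},
        )
    ]
)
def greeting_pipeline():
    say()


# Selecting the preset by name is equivalent to passing run_config / mode / tags explicitly.
result = execute_pipeline(greeting_pipeline, preset="dev")
assert result.success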
0. Setup IDE
If you are using IntelliJ, mark the dagster package directory as a source root.
1. API execute_pipeline()
Core module: api.py (dagster.core.execution.api)
def execute_pipeline(
pipeline: Union[PipelineDefinition, IPipeline],
run_config: Optional[dict] = None,
mode: Optional[str] = None,
preset: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
solid_selection: Optional[List[str]] = None,
instance: Optional[DagsterInstance] = None,
raise_on_error: bool = True,
) -> PipelineExecutionResult:
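As a quick orientation before stepping through the internals, a typical call looks like this (a hedged sketch reusing the hello_dagster pipeline from the showcase above):

from dagster import execute_pipeline

result = execute_pipeline(
    hello_dagster,          # the PipelineDefinition defined earlier
    run_config={},          # same config shape the CLI would read from a yaml file
    mode="default",
    tags={"source": "notes"},
)
assert result.success       # a PipelineExecutionResult comes back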
Internally it delegates to a hidden method:
def _logged_execute_pipeline(
pipeline: Union[IPipeline, PipelineDefinition],
instance: DagsterInstance,
run_config: Optional[dict] = None,
mode: Optional[str] = None,
preset: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
solid_selection: Optional[List[str]] = None,
raise_on_error: bool = True,
) -> PipelineExecutionResult:
Inside, this method calls:
- create_run_for_pipeline()
- execute_run()
2. create_run_for_pipeline()
2.1 If no execution plan is provided, one is created by calling another API method, create_execution_plan:
create_execution_plan(pipeline_def, run_config=run_config, mode=mode)
which returns a plan:
return ExecutionPlan.build(
pipeline, environment_config, mode=mode, step_keys_to_execute=step_keys_to_execute
)
build() instantiates an ExecutionPlan:
full_plan = ExecutionPlan(
self.pipeline,
{step.handle: step for step in self._steps.values()},
[step.handle for step in self._steps.values()],
self.environment_config,
)
Some key fields of an execution plan, including the DAG of steps:
class ExecutionPlan(
NamedTuple(
"_ExecutionPlan",
[
("pipeline", IPipeline),
("step_dict", Dict[StepHandleUnion, ExecutionStepUnion]),
("executable_map", Dict[str, Union[StepHandle, ResolvedFromDynamicStepHandle]]),
("resolvable_map", Dict[str, List[UnresolvedStepHandle]]),
("step_handles_to_execute", List[StepHandleUnion]),
("environment_config", EnvironmentConfig),
],
    )
):
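A hedged sketch of building a plan without running it, poking only at the fields listed in the NamedTuple above (exact accessors may differ between versions):

from dagster import create_execution_plan

plan = create_execution_plan(hello_dagster, run_config={}, mode="default")
# step_dict and step_handles_to_execute together describe the DAG of executable
# steps derived from the solid dependency graph.
for handle in plan.step_handles_to_execute:
    print(handle, plan.step_dict[handle])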
2.2 create_run() after a plan is ready
def create_run(
self,
pipeline_name,
run_id,
run_config,
mode,
solids_to_execute,
step_keys_to_execute,
status,
tags,
root_run_id,
parent_run_id,
pipeline_snapshot,
execution_plan_snapshot,
parent_pipeline_snapshot,
solid_selection=None,
external_pipeline_origin=None,
):
and after adding some snapshots (@todo: dig deeper on this if needed) it returns a PipelineRun:
class PipelineRun(
namedtuple(
"_PipelineRun",
(
"pipeline_name run_id run_config mode solid_selection solids_to_execute "
"step_keys_to_execute status tags root_run_id parent_run_id "
"pipeline_snapshot_id execution_plan_snapshot_id external_pipeline_origin"
),
)
):
From the above code, a PipelineRun basically holds run metadata, presumably for later history tracking.
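That guess seems consistent with the instance API: run storage can be queried for past PipelineRuns. A hedged sketch:

from dagster import DagsterInstance

instance = DagsterInstance.get()           # persistent instance, see 3.1 below
for run in instance.get_runs(limit=5):     # PipelineRun records from run storage
    print(run.run_id, run.pipeline_name, run.status)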
3. execute_run()
def execute_run(
pipeline: IPipeline,
pipeline_run: PipelineRun,
instance: DagsterInstance,
raise_on_error: bool = False,
) -> PipelineExecutionResult:
3.1 DagsterInstance
Here I want to know more about how execution maps onto a physical instance, so check the DagsterInstance class. From the docs, it reads from yaml to get metadata about storage and other resources.
Its constructor is below:
def __init__(
self,
instance_type,
local_artifact_storage,
run_storage,
event_storage,
compute_log_manager,
schedule_storage=None,
scheduler=None,
run_coordinator=None,
run_launcher=None,
settings=None,
ref=None,
):
For example, run and event log storage are backed by a database (SQLite by default, Postgres in production deployments), while compute logs go through the compute log manager.
Core classes here:
- Scheduler
- DagsterDaemonScheduler
- SystemCronScheduler
- RunCoordinator
- Queue
- default
- mocked
- RunLauncher (multiple launch modes)
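A hedged sketch of how an instance is obtained and handed to the execution API; which storages, scheduler, run coordinator, and run launcher get wired in comes from the yaml config mentioned above:

from dagster import DagsterInstance, execute_pipeline

# Ephemeral instance: in-memory storages, nothing persisted. I believe this is also
# what execute_pipeline falls back to when no instance argument is passed.
result = execute_pipeline(hello_dagster, instance=DagsterInstance.ephemeral())

# Persistent instance: built from $DAGSTER_HOME/dagster.yaml, which selects the
# run / event / compute-log storages, scheduler, run coordinator, and run launcher.
result = execute_pipeline(hello_dagster, instance=DagsterInstance.get())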
3.2 Execution
ExecuteRunWithPlanIterable is a utility class that consolidates the execution logic:
_execute_run_iterable = ExecuteRunWithPlanIterable(
execution_plan=execution_plan,
iterator=pipeline_execution_iterator,
execution_context_manager=PipelineExecutionContextManager(
execution_plan=execution_plan,
pipeline_run=pipeline_run,
instance=instance,
run_config=pipeline_run.run_config,
raise_on_error=raise_on_error,
),
)
Here the core classes are ExecutionContextManager and EventGenerationManager, and the result is finally wrapped in a PipelineExecutionResult.
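To close the loop, a hedged sketch of what the caller sees in that PipelineExecutionResult (attribute names are from the legacy API and may differ by version):

from dagster import execute_pipeline

result = execute_pipeline(hello_dagster)

assert result.success                            # True if every step succeeded
for event in result.event_list:                  # DagsterEvents yielded by the iterable above
    print(event.event_type_value, event.message)
solid_result = result.result_for_solid("hello")  # per-solid results are also exposed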