Simplifying Dagster Codebases with Dagstd

Boilerplate no more.

Photo by Diane Picchiottino on Unsplash

If you’ve worked in the data engineering space for any amount of time, you’ve probably needed to use some sort of orchestration or pipelining to create fully automatic, reliable workflows. The vast majority of companies achieve this using Apache Airflow. Airflow is incredibly powerful and well-supported, but has a steep learning curve and a fair amount of complexity.

For this reason, many have recently been turning to more modern Airflow alternatives such as Prefect and Dagster to build their data pipelines quickly and efficiently. They offer much simpler APIs and powerful web UIs to manage and monitor your automations, and a reasonable number of integrations to simplify common data tasks.

When I led the data orchestration project at my company, we evaluated a few of the alternatives and chose Dagster, which now manages all of our data automation, both internally and for external reporting. Dagster is a great tool, but sometimes you just need to pass a simple integer or string to an op, and in Dagster, inputs to ops can only be outputs of other ops. The result is a lot of boilerplate ops that do nothing but return a formatted string or even just an integer. This is why Dagstd was created.

Dagstd is a set of common ops and functions for use with Dagster, and it also includes a Sphinx Autodoc extension to allow ops to be picked up and documented automatically, which has saved me so much time in the long run.

Dagstd Ops

At the moment (as of v0.1.1), Dagstd contains:

Dagstd is, of course, installable via pip:

pip install dagstd

It was built with dagster==0.14.17, but should work with later versions too.

Here’s an example of a pure-Dagster graph that downloads a daily ZIP file and extracts a known file name. Note: the download_large_file op has been omitted for brevity.

import zipfile

from datetime import datetime

from dagster import op, job


@op
def get_todays_date() -> str:
    # strftime requires a format string; ISO-style dates are used here
    return datetime.today().strftime('%Y-%m-%d')


@op
def five() -> int:
    return 5


@op
def get_download_file_url(date: str) -> str:
    return f'https://example.com/{date}.csv'


@op
def get_nth_file_name(n: int) -> str:
    return f'file_{n:02}.txt'


@op
def extract_file_from_zip(context, zip_path: str, file_name: str) -> str:
    with zipfile.ZipFile(zip_path) as zip_file:
        with open(f'/tmp/{file_name}', 'wb') as f:
            f.write(zip_file.read(file_name))
        context.log.info(f'Extracted {file_name} from {zip_path}')
        return f'/tmp/{file_name}'


@job
def process_data():
    date = get_todays_date()
    url = get_download_file_url(date)
    zip_path = download_large_file(url)

    file_name = get_nth_file_name(five())
    file_path = extract_file_from_zip(zip_path, file_name)

And here’s the same graph, with the same functionality, but with Dagstd ops.

import zipfile

from datetime import datetime

from dagster import op, job
from dagstd.constants import Constant, Five
from dagstd.operations import fmt


@op
def get_todays_date_string() -> str:
    return datetime.today().strftime("%Y-%m-%d")


@op
def extract_file_from_zip(context, zip_path: str, file_name: str) -> str:
    with zipfile.ZipFile(zip_path) as zip_file:
        with open(f'/tmp/{file_name}', 'wb') as f:
            f.write(zip_file.read(file_name))
        context.log.info(f'Extracted {file_name} from {zip_path}')
        return f'/tmp/{file_name}'


@job
def process_data():
    date = get_todays_date_string()
    url = fmt(Constant('https://example.com/{}.csv'), [date])
    zip_path = download_large_file(url)

    file_name = fmt(Constant('file_{}.txt'), [Five()])
    file_path = extract_file_from_zip(zip_path, file_name)

This is a small example, so the savings aren’t huge, but it shows how much boilerplate can be avoided when using Dagstd.
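To see what the two templates in the job above would produce, here is a minimal plain-Python sketch. It assumes that fmt behaves like Python's str.format applied to the list of arguments; that is an assumption about Dagstd rather than something confirmed here, and format_like_fmt is a hypothetical stand-in, not part of the library.

```python
# Assumption: dagstd's fmt(template, args) behaves like template.format(*args).
# format_like_fmt is a hypothetical stand-in that runs outside of any Dagster job.
def format_like_fmt(template: str, args: list) -> str:
    """Fill the positional placeholders in template with args."""
    return template.format(*args)


# The date here is an arbitrary example value, not real output from a job run.
url = format_like_fmt('https://example.com/{}.csv', ['2022-06-01'])

# A bare {} placeholder does not zero-pad the number...
unpadded = format_like_fmt('file_{}.txt', [5])

# ...whereas {:02} reproduces the padding of f'file_{n:02}.txt' from the pure-Dagster version.
padded = format_like_fmt('file_{:02}.txt', [5])

print(url)       # https://example.com/2022-06-01.csv
print(unpadded)  # file_5.txt
print(padded)    # file_05.txt
```

If that assumption holds, a template of 'file_{:02}.txt' would be needed to match the zero-padded file name from the pure-Dagster graph exactly.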

Sphinx Autodoc Plugin

Dagstd includes a Sphinx autodoc plugin that can be used to generate documentation for Dagster ops. To use the autodoc plugin, add the following to your conf.py file:

extensions = [
    ...
    'dagstd.sphinx.parser',
]

By default, this will prefix all op documentation with (op). To change this, add the following to your conf.py file:

dagstd_op_prefix = 'My Op'

You can see an example of this in action in the Dagstd documentation.
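One likely reason ops need this kind of help is that Dagster's @op decorator returns an OpDefinition object rather than a plain function, and documentation tools that look for functions can pass over such objects. The sketch below illustrates the effect with a stand-in decorator. fake_op and FakeOpDefinition are invented for illustration and are not part of Dagster or Dagstd, and the sketch does not show how the Dagstd plugin itself works.

```python
import inspect
from datetime import datetime


class FakeOpDefinition:
    """Hypothetical stand-in for the object an op decorator returns."""

    def __init__(self, fn):
        self.fn = fn                 # the original, undecorated function
        self.name = fn.__name__
        self.__doc__ = fn.__doc__    # keep the docstring reachable on the wrapper


def fake_op(fn):
    """Hypothetical decorator: replaces the function with a wrapper object."""
    return FakeOpDefinition(fn)


@fake_op
def get_todays_date_string() -> str:
    """Return today's date as a YYYY-MM-DD string."""
    return datetime.today().strftime('%Y-%m-%d')


def plain_function() -> int:
    """An ordinary function, for comparison."""
    return 5


# A tool that only looks for functions sees the plain one but not the decorated one.
print(inspect.isfunction(plain_function))          # True
print(inspect.isfunction(get_todays_date_string))  # False

# The docstring is still there; it just has to be read from the wrapper object.
print(get_todays_date_string.__doc__)
```

A documentation extension for ops therefore has to recognise the wrapper objects and pull the name and docstring from them, which is presumably the kind of work the plugin takes off your hands.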

Contribute

Dagstd is brand new, and I’m not an expert in Dagster either, so there are definitely some more common ops that I could include in the library! If you’ve got any that you find yourself having to write a lot in your Dagster codebases, let me know and I’ll add them when I can. The next additions are likely to be ops for working with dates, so please tell me what you would want either by commenting here, or opening an issue or pull request on GitHub.

I hope that at least a few of you will find this package useful, and I’d love to hear your feedback! In the meantime, why not simplify your financial data workflows even further by taking a look at my other open-source Python package, Quiffen? It’s a great tool for working with Quicken Interchange Format (QIF) files. You can read more about it here.

If you want to get in touch, you can either email me at isaac@isaacharrisholt.com or via LinkedIn or Twitter. Happy engineering!