
Supercharging Airflow & dbt with Astronomer Cosmos on Azure Container Instances

18 Jun, 2024

Over the past few years, dbt has become the standard for data transformations. A very typical workflow now is to have some basic data extraction and ingestion take place, after which transformations are done in one or more dbt projects (a typical ELT (Extract-Load-Transform) workflow). Essentially, dbt, as advertised, takes care of the T in ETL/ELT/EtLT. While this works well, a major downside is visibility and operability in your data orchestrator of choice. The naive approach is to "just" run dbt as a single task in your pipeline. This will work, but turns the entire T step in your ETL/ELT/EtLT process into an opaque task, making it harder to understand and debug failures in your data pipelines. Additionally, having only one task makes retrying or re-running parts of your data model impossible.

In this blog post, we’re going to show how you can turn this opaqueness into transparency by using Astronomer Cosmos to automatically render your dbt project into an Airflow DAG, while running dbt on Azure Container Instances.

Introducing Astronomer Cosmos

Astronomer Cosmos is an open-source project created and maintained by Astronomer. Its primary goal is to provide transparency within Airflow into the structure of your dbt project, while avoiding any manual configuration or changes to the project itself.

Image courtesy of Astronomer: www.astronomer.io/images/cosmos/image1.png


Astronomer Cosmos takes your dbt project and converts it into tasks in an Airflow DAG. Each model will be converted to its own task. Any tests associated with that model will be run in a separate task after the model task completes. It does this by parsing either the dbt project source code or the dbt project manifest.json. You could, of course, do this yourself by using dbt build --select to select specific models or groups of models and creating a task for each of these. That, however, would require some manual work and would also introduce a maintenance burden in order to ensure the Airflow DAG continues to be aligned with the dbt project.
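To illustrate the kind of work Cosmos automates, here is a minimal sketch of deriving a `dbt build --select` command per model from a manifest. The function names and the simplified manifest structure are hypothetical; a real `manifest.json` contains far more fields, but the `nodes` / `resource_type` layout used here matches what dbt emits.

```python
def models_from_manifest(manifest: dict) -> list[str]:
    """Collect the names of all dbt models listed in a manifest's nodes."""
    return [
        node["name"]
        for node in manifest["nodes"].values()
        if node["resource_type"] == "model"
    ]


def build_commands(models: list[str]) -> list[list[str]]:
    """One `dbt build --select <model>` command per model,
    i.e. one Airflow task per model."""
    return [["dbt", "build", "--select", name] for name in models]


# A tiny hand-written stand-in for a real dbt manifest.json.
manifest = {
    "nodes": {
        "model.jaffle_shop.stg_orders": {"name": "stg_orders", "resource_type": "model"},
        "model.jaffle_shop.orders": {"name": "orders", "resource_type": "model"},
        "test.jaffle_shop.not_null": {"name": "not_null", "resource_type": "test"},
    }
}

commands = build_commands(models_from_manifest(manifest))
```

Maintaining something like this by hand means re-syncing it every time the dbt project changes; Cosmos does this parsing (and dependency wiring) for you.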

What’s the problem?

Astronomer Cosmos needs a few things to work well out of the box:

  1. An Airflow DAG (and corresponding Airflow setup)
  2. Access to either the dbt project source code or the dbt project’s manifest.json
  3. A runtime in which the dbt commands are executed

This last point is the main focus of this blog. In some environments, this could be as simple as using the Kubernetes cluster that Airflow is running on. Tasks would then run in their own pods, which are removed once each task completes. However, there are also cases where this might not be possible:

  • You might be running Airflow in a constrained Kubernetes environment, e.g. with limited resources or capabilities/permissions for spinning up additional pods.
  • You might be running Airflow on a Virtual Machine, where you might not want to install dependencies specific to one DAG in the environment used by all DAGs.
  • Your Airflow environment might not have direct access to the target system (e.g. Databricks, PostgreSQL, BigQuery) because of networking or security constraints enforced on the environment Airflow itself runs in.
  • For compliance reasons, you may not be allowed to run dbt itself within a managed Airflow environment outside the control of your company.
  • You might not have access to a private container registry from Airflow directly.

These are just some examples where a runtime for dbt is not a given; there are sure to be more. In the next section, we’ll explain one approach that we used successfully to handle these situations.

Solution: Azure Container Instances

While working on a client project recently, we ran into one of the cases mentioned. In this section we’ll dive into what we did to address the concerns.

Previously, Astronomer Cosmos supported four distinct runtimes:

  1. Docker
  2. Kubernetes
  3. Local
  4. Virtualenv

To bridge the gap to Azure we added the option to use Azure Container Instances (ACI) as the runtime for Astronomer Cosmos (docs & PR). Azure Container Instances allow you to run containers on-demand in a dedicated environment. Using them makes our setup look something like this:

In this case, the data we are transforming with dbt is stored in an Azure Database for PostgreSQL instance. We run dbt in a container in ACI, which is created and destroyed using Airflow, based on an image stored in a private Azure Container Registry.

Running in ACI gave us some nice benefits:

  1. We are now using Azure-native tooling, making some integrations easier.
  2. The Airflow ACI operators automatically create and destroy the ACI instances to ensure you don’t leave any resources behind once jobs complete.
  3. We have full control over the runtime environment (and can run it locally), as it is a Docker image. We can now use that same Docker image in our CI/CD pipelines.

Putting all the pieces together, we’ll now show you the impact of shifting from a single-task dbt setup using Azure Container Instances to an Astronomer Cosmos-driven one, also using Azure Container Instances as the runtime.

Let’s take a look at the jaffle shop example. We’re going to assume you want to use ACI as the runtime for dbt anyway. To get this to work, you need some configuration, which the Cosmos and non-Cosmos approaches have in common:

aci_params = {
    "image": "<<< The base image you want to run in ACI >>>",
    "ci_conn_id": "<<< Airflow connection with credentials to create/delete ACI instances in the target resource group >>>",
    "registry_conn_id": "<<< Airflow connection with credentials to pull the image from a container registry >>>",
    "resource_group": "<<< Azure resource group in which the ACI instances are created >>>",
    "name": "astro-aci-{{ ti.task_id.replace('.','-').replace('_','-') }}",  # '.' and '_' are not allowed in ACI container group names, so we replace them with '-'
    "region": "West Europe",
    "environment_variables": { # Connection configuration for your setup, PostgreSQL is our target in this case.
        "POSTGRES_DATABASE": "{{ conn.aci_db.schema }}",
        "POSTGRES_HOST": "{{ conn.aci_db.host }}",
        "POSTGRES_PASSWORD": "{{ conn.aci_db.password }}",
        "POSTGRES_PORT": "{{ conn.aci_db.port }}",
        "POSTGRES_SCHEMA": "{{ conn.aci_db.schema }}",
        "POSTGRES_USER": "{{ conn.aci_db.login }}",
    },
    "secured_variables": ["POSTGRES_PASSWORD"] # Any variables that you want to pass in but need to obfuscate as they are sensitive.
}
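The `name` template above matters because ACI container group names only allow lowercase letters, digits, and hyphens. A small standalone sketch of the same sanitization that the Jinja template performs (the function name is ours, not part of any library):

```python
def aci_container_name(task_id: str, prefix: str = "astro-aci") -> str:
    """Mirror the Jinja template above: replace '.' and '_' (both invalid
    in ACI container group names) with hyphens."""
    return f"{prefix}-{task_id.replace('.', '-').replace('_', '-')}"


print(aci_container_name("dbt.stg_orders_run"))  # astro-aci-dbt-stg-orders-run
```

Cosmos task IDs contain dots (task group prefixes) and underscores (model names), so without this replacement the ACI API would reject the container group name.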

For the non-Cosmos approach, the source code of the DAG looks something like this:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.microsoft.azure.operators.container_instances import (
    AzureContainerInstancesOperator,
)

with DAG(
        dag_id="jaffle_shop_azure_container_instance",
        start_date=datetime(2022, 11, 27),
        schedule=None,
        catchup=False,
) as dag:

    pre_dbt = EmptyOperator(task_id="pre_dbt")
    dbt = AzureContainerInstancesOperator(task_id="dbt", command=["dbt", "build"], **aci_params)

    post_dbt = EmptyOperator(task_id="post_dbt")

    pre_dbt >> dbt >> post_dbt

which gives you a DAG with a single, opaque dbt task. Again, this works, but it gives you little to no granularity or visibility to understand which part failed, nor the ability to re-run only parts of the dbt build.

If we now switch to using Astronomer Cosmos, with Azure Container Instances as the runtime, we get something like this:

from pathlib import Path

from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode

DBT_ROOT_PATH = Path("...")  # the directory containing your dbt project(s)

shared_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.AZURE_CONTAINER_INSTANCE,
    dbt_project_path="..."
)

shared_profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profiles_yml_filepath=DBT_ROOT_PATH / "jaffle_shop" / "profiles.yml"
)

with DAG(
        dag_id="jaffle_shop_azure_container_instance",
        start_date=datetime(2022, 11, 27),
        schedule=None,
        catchup=False,
) as dag:

    pre_dbt = EmptyOperator(task_id="pre_dbt")

    dbt = DbtTaskGroup(
        group_id="dbt",
        project_config=ProjectConfig(
            manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
            project_name="jaffle_shop"),
        execution_config=shared_execution_config,
        operator_args=aci_params,
        profile_config=shared_profile_config,
        default_args={"retries": 2},
    )

    post_dbt = EmptyOperator(task_id="post_dbt")

    pre_dbt >> dbt >> post_dbt

giving us this DAG:

Each task you see will result in a single dbt command being run in an Azure Container Instance. The ACI instance will be created automatically and also removed automatically once the command completes. No modifications were needed at all to the dbt project itself.

One potential downside of this setup is the overhead incurred by using ACI as the runtime for dbt. Anecdotally, the spin-up time for an ACI container can be approximately one minute, though this varies with your Docker image. This overhead is incurred for every task in the dbt/Airflow DAG, which may be prohibitive depending on your situation. If so, you may want to look into runtimes with faster spin-up times, such as Kubernetes (where pod spin-up time can still be a problem, but is less impactful). However, running a Kubernetes cluster requires considerably more maintenance.
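To get a feel for that overhead, here is a back-of-the-envelope estimate, assuming roughly one minute of ACI spin-up per task (our anecdotal figure) and a run task plus a test task per model:

```python
def total_spinup_overhead_minutes(num_models: int, spinup_minutes: float = 1.0) -> float:
    """Rough worst-case estimate: each model becomes a run task plus a
    test task, and every task pays the container spin-up cost once."""
    tasks_per_model = 2  # run + test
    return num_models * tasks_per_model * spinup_minutes


# A 20-model project could add ~40 minutes of pure spin-up time per DAG run,
# though tasks without dependencies on each other can spin up in parallel.
print(total_spinup_overhead_minutes(20))
```

In practice, parallelism across independent models reduces the wall-clock impact, but the total compute overhead remains.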

Wrapping up

Astronomer Cosmos offers a nice and easy way to get better insight and control when running your dbt code in Airflow. By adding support for Azure Container Instances as a runtime, we have made it easier to integrate natively with Azure, which may be a requirement in your context.

Photo by Lars Kienle on Unsplash
