This one simple trick lets you stream with dbt and Databricks

Are you using Databricks and want to use dbt, but you're not sure how to do it with all of your big data? Like, dbt can't seem to stream, and you need to stream because anything else is too expensive or too much work or too flaky?

I've used the pattern below, and seen it used across several teams of data people reading through petabytes-ish of data.

dbt doesn't natively support streaming, unless you use Databricks streaming tables or materialized views, which I don't really like to do because they have always failed me eventually. They start out great, but as soon as something happens that you want to debug, or you need to rename your table or something, then you're SOL. So I don't use them - I use plain old streaming. (No offense to any Databricks folks)

Some people might not realize that when your dbt Python model gets run on Databricks, it's really just uploaded as a notebook. dbt takes whatever DataFrame your model() function returns and feeds it into a table.

So, since it's just a notebook, you can stream... it's just a matter of getting dbt to not overwrite your existing table's data, setting up checkpointing, making sure you get alerts if things fail, setting timeouts, etc. And that turns out to be pretty simple.

So the one simple trick is....hey, have you considered supplements?

Create an empty DataFrame with the schema you want, and have model() return that empty DataFrame so that dbt appends NOTHING to your table. The stream you start inside the notebook does the actual writing.

Use the workflow_job submission method, because it breaks you out of the dbt sandbox and lets you use everything from Databricks jobs. Otherwise you get jammed into these one-time runs. And because, shameless plug, I wrote most of it, though as an act of frustration, and with a bunch of help from Databricks (shoutout Ben).

Is that just a wall of text? I hope not. I'll add fun memes one day. Until then, check out this wall of code.

This is a simple example. It streams a log file table and appends new entries to a new table. Substitute log file for telemetry or other things you might be interested in. I'm going to be honest - I haven't run this exact code, so let me know if anything is wrong.

# COMMAND ----------

# MAGIC %pip install Pillow

# Just an example for pip installing...

dbutils.library.restartPython()

# COMMAND ----------

import pyspark.sql.types as T  
import pyspark.sql.functions as F


def get_or_create_checkpoint_location(dbt):  
    """Create checkpoint location for streaming query."""
    create_volume_query = f"CREATE VOLUME IF NOT EXISTS {dbt.this.database}.{dbt.this.schema}.checkpoints"
    print("Create volume query", create_volume_query)
    spark.sql(create_volume_query)
    return f"/Volumes/{dbt.this.database}/{dbt.this.schema}/checkpoints/{dbt.this.identifier}"

def run_stream(dbt):  
    """Run streaming query to process new log files."""
    checkpoint_location = get_or_create_checkpoint_location(dbt)
    output_location = str(dbt.this)

    log_files_df = (spark.readStream
      .format('delta')
      .option('ignoreChanges', 'true')
      .option('ignoreDeletes', 'true')
      .table('my_catalog.schema.log_files')
    )

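    write_stream = (
      log_files_df.writeStream.format("delta")
      .outputMode('append')
      .option("checkpointLocation", checkpoint_location)
      .option("mergeSchema", "true")
      .trigger(availableNow=True)
      # dbt.this is a table name, not a path, so write with toTable()
      .toTable(output_location)
    )

    # Block until the availableNow run has caught up. Without this the
    # notebook can return before the stream has written anything, and a
    # failed stream would not fail the job.
    write_stream.awaitTermination()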

def model(dbt, session):  
    """Main entry point for dbt model."""
    dbt.config(
        materialized='incremental',
        submission_method='workflow_job'
    )

    # Define output schema
    output_schema = T.StructType([
        T.StructField('log_entry_id', T.StringType(), False),
        T.StructField('log_file_id', T.IntegerType(), False),
    ])

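    # Empty DataFrame: gives dbt a schema but no rows to append.
    # An empty list avoids sparkContext, which isn't available on every cluster type.
    df = spark.createDataFrame([], schema=output_schema)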

    if not dbt.is_incremental:
        # Create table if it doesn't exist
        df.write.saveAsTable(str(dbt.this), mode='append')

    # Run streaming query
    run_stream(dbt)

    return df
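
The notebook has to wait for the stream to finish before model() returns, and it helps to put a bound on that wait. Here is a minimal sketch of a helper for that, assuming the object it receives is a regular PySpark StreamingQuery. The name wait_for_stream and its timeout_seconds argument are made up for this example; awaitTermination and stop are the standard StreamingQuery methods. If the query has failed, awaitTermination raises the error, which fails the notebook and so the job.

```python
def wait_for_stream(query, timeout_seconds):
    """Block until a streaming query finishes, or give up after a timeout.

    `query` is expected to be a pyspark StreamingQuery, i.e. whatever
    writeStream...start() or toTable() returned. This helper and its
    argument names are hypothetical, just for illustration.
    """
    # awaitTermination(timeout) returns True if the query ended within the
    # timeout and False otherwise. If the query died with an error, that
    # error is raised here, which fails the notebook.
    finished = query.awaitTermination(timeout_seconds)
    if not finished:
        # Still running after the timeout: stop it and fail loudly.
        query.stop()
        raise TimeoutError(
            f"Stream did not finish within {timeout_seconds} seconds"
        )
```

In run_stream this could be called as wait_for_stream(write_stream, 3000) once the stream has been started, with the number picked to sit under the job's timeout_seconds.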

The YAML is going to be something like:

version: 2

models:  
  - name: int_device_logs

    config:
#      cluster_id: afsfs-1232819-dsfbkjbs1  # Use this when developing
      python_job_config:
        timeout_seconds: 3600
        email_notifications: { on_failure: ["me@example.com"] }
        max_retries: 2

        name: my_workflow_name

        # Override settings for your model's dbt task. For instance, you can
        # change the task key
        additional_task_settings: { "task_key": "my_dbt_task" }

        # Define tasks to run before/after the model
        # This example assumes you have already uploaded a notebook to /my_notebook_path to perform optimize and vacuum
        post_hook_tasks:
          [
            {
              "depends_on": [{ "task_key": "my_dbt_task" }],
              "task_key": "OPTIMIZE_AND_VACUUM",
              "notebook_task":
                { "notebook_path": "/my_notebook_path", "source": "WORKSPACE" },
            },
          ]

        # Simplified structure, rather than having to specify permission separately for each user
        grants:
          view: [{ "group_name": "marketing-team" }]
          run: [{ "user_name": "other_user@example.com" }]
          manage: []
      job_cluster_config:
        spark_version: "16.4.x-scala2.12"
        node_type_id: "c2-standard-4"
        runtime_engine: "STANDARD"
        data_security_mode: "SINGLE_USER"
        single_user_name: "aghodsi@databricks.com"
        autoscale: { "min_workers": 1, "max_workers": 1 }

You can override pretty much anything in a job here.

If you want to pip install a package in your notebook, use # MAGIC %pip install Pillow or whatever. If you go with the regular %pip install my-package, the arbitrarily vindictive dbt Python parser is going to get you, because a bare %pip line isn't valid Python.
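
To see why the # MAGIC prefix matters, here's a small sketch. It assumes dbt reads the model file with Python's standard parser before anything gets uploaded, which fits the behavior described above but isn't something this snippet proves. The is_valid_python helper and the two sample cells are made up for the example.

```python
import ast


def is_valid_python(source):
    """Return True if `source` parses as plain Python, False otherwise."""
    try:
        ast.parse(source)
        return True
    except SyntaxError:
        return False


# A bare notebook magic is not Python syntax, so a Python parser rejects it.
bare_magic = "%pip install Pillow\n\nimport PIL\n"

# Prefixed with "# MAGIC", the same line is just a comment to the parser.
# Databricks notebooks in source format mark magic commands this way.
commented_magic = "# MAGIC %pip install Pillow\n\nimport PIL\n"

print(is_valid_python(bare_magic))       # False
print(is_valid_python(commented_magic))  # True
```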

There are some fine people at dbt, but my feelings on the company are for another post - especially now that they are safely with Fivetran instead of (insolvently?) independent.