Table Partitioning

You should default to non-partitioned tables for most use cases when working with Delta Lake.

  • Most Delta Lake tables, especially small-to-medium sized data, will not benefit from partitioning.
  • Because partitioning physically separates data files, it can lead to a small files problem and prevent efficient file compaction and data skipping.
  • The benefits observed in Hive or HDFS do not translate to Delta Lake, and you should consult with an experienced Delta Lake architect before partitioning tables.

Table Cloning

Table cloning is a great way to set up tables for testing SQL code while it is still in development; a short example follows the list below.

  • DEEP CLONE. Copies both table metadata and data files.
    • Rerunning the command will sync changes from the source to the target location.
  • SHALLOW CLONE. Copies only the table metadata.
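
For example, a minimal sketch (the table names prod_sales and dev_sales are hypothetical):

-- Deep clone: copies metadata and data files; rerunning the command syncs changes from the source
CREATE TABLE IF NOT EXISTS dev_sales
DEEP CLONE prod_sales;

-- Shallow clone: copies only the table metadata, so it is cheap to create for testing
CREATE TABLE IF NOT EXISTS dev_sales_shallow
SHALLOW CLONE prod_sales;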

Table Overwriting

We can use CREATE OR REPLACE TABLE AS SELECT (CRAS) statements to completely overwrite Delta Lake tables. Compared with deleting and recreating a table, a complete overwrite has multiple benefits (a sketch follows the list below):

  1. Overwriting a table is much faster because it doesn’t need to list the directory recursively or delete all files.
  2. The old version of the table still exists, and you can easily retrieve the old data using Time Travel.
  3. It’s an atomic operation, meaning concurrent queries can still read the table while it is being overwritten.
  4. Due to ACID transaction guarantees, if overwriting the table fails the table will automatically roll back to its previous state.
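
A minimal CRAS sketch, assuming a hypothetical sales table backed by raw Parquet files:

CREATE OR REPLACE TABLE sales AS
SELECT * FROM parquet.`/path/to/raw/sales`;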

INSERT OVERWRITE provides a nearly identical outcome to a complete overwrite, but it is a safer technique for overwriting an existing table (see the sketch after the list), because:

  1. It can only overwrite an existing table; it cannot create a new one the way a CRAS statement can.
  2. It can only overwrite with new records that match the current table schema, which helps prevent disrupting downstream consumers.
  3. It can overwrite individual partitions.
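
A minimal INSERT OVERWRITE sketch, using the same hypothetical table and source path:

INSERT OVERWRITE sales
SELECT * FROM parquet.`/path/to/raw/sales`;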

Performance Optimisation

OPTIMIZE

Delta Lake can improve the speed of read queries from a table. One way to do this is by compacting small files into larger ones; you trigger compaction by running the OPTIMIZE command.
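
For example (the table and column names are hypothetical; the optional ZORDER BY clause additionally co-locates related records to improve data skipping):

OPTIMIZE sales
ZORDER BY (order_date);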

VACUUM

Another way is to use the VACUUM command to clean up stale data files. By default, VACUUM prevents you from deleting files less than 7 days old, to ensure that no long-running operations are still referencing any of the files to be deleted. You can, however, configure Delta Lake with the following knobs to delete files younger than 7 days (see the sketch below).
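
A minimal sketch (the table name is hypothetical; disabling the retention check is only sensible for dev/test data):

-- allow VACUUM to delete files younger than the default 7-day retention period
SET spark.databricks.delta.retentionDurationCheck.enabled = false;

-- remove all stale files immediately
VACUUM sales RETAIN 0 HOURS;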

Delta Live Tables

Create a Silver Table from a Bronze Table

In Delta Live Tables, you can stream data from other tables in the same pipeline by using the STREAM() function. In this case, we must define a streaming live table using the CREATE STREAMING LIVE TABLE statement. Remember that to query another LIVE table, we must always prefix the table name with the LIVE keyword.

CREATE STREAMING LIVE TABLE sales_silver
AS
SELECT
  store_id, total + tax AS total_after_tax
FROM STREAM(LIVE.sales_bronze)
;

Data Ingestion

Multiple Data Sources

If we have scenarios where we need to ingest data from multiple tables, we can continue to use the MERGE INTO operation, but tweak it into an insert-only merge for deduplication.

This optimised command uses the same MERGE INTO syntax but provides only a WHEN NOT MATCHED clause.

Below we use this to confirm that records with the same user_id and event_timestamp aren’t already in the events table.

MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN
  INSERT *

Single Data Source

The COPY INTO command is a good friend for incrementally ingesting data from an external system. It is an idempotent option: files that have already been loaded are skipped on subsequent runs.
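
A minimal sketch (the target table, source path, and file format are hypothetical):

COPY INTO sales
FROM '/mnt/landing/sales'
FILEFORMAT = PARQUET
COPY_OPTIONS ('mergeSchema' = 'true');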

Data Quality

Quality Enforcement with CONSTRAINT

We can specify certain column-level constraints in Databricks. For example:

ALTER TABLE orders_silver ADD CONSTRAINT timestamp_within_range CHECK (order_timestamp >= '1970-01-01');

The ADD CONSTRAINT command verifies that all existing rows satisfy the constraint before adding it to the table.

General

GLOBAL TEMP VIEW

If a relational object must be used and shared with other data engineers in other sessions on the same cluster, and you want to save on storage costs by avoiding copying and storing physical data, the best option is a GLOBAL TEMP VIEW in Databricks.
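
A minimal sketch (view and table names are hypothetical); note that global temp views are registered in the global_temp schema and must be queried through it:

CREATE OR REPLACE GLOBAL TEMP VIEW shared_sales_vw AS
SELECT * FROM sales WHERE order_date >= '2022-01-01';

-- other sessions on the same cluster access it via the global_temp schema
SELECT * FROM global_temp.shared_sales_vw;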

CREATE TABLE USING

When we use a CREATE TABLE statement, we can also specify a particular data source. The following data sources are supported by Databricks:

  1. TEXT
  2. AVRO
  3. BINARYFILE
  4. CSV
  5. JSON
  6. PARQUET
  7. ORC
  8. DELTA

The following additional file formats are supported in the Databricks Runtime:

  • JDBC
  • LIBSVM
  • a fully-qualified class name of a custom implementation of org.apache.spark.sql.sources.DataSourceRegister.

The default option is DELTA. For any data_source other than DELTA you must also specify a LOCATION unless the table catalog is hive_metastore.
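
A minimal sketch of a non-Delta data source (table name, options, and path are hypothetical):

CREATE TABLE sales_csv
USING CSV
OPTIONS (header = 'true', delimiter = ';')
LOCATION '/mnt/landing/sales_csv';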

Auto Loader VS COPY INTO

There are several things to consider when choosing between Auto Loader and the COPY INTO command:

  1. Volumes of Data. If you are going to ingest files on the order of thousands, you can use the COPY INTO command. If you are expecting files on the order of millions or more over time, then use Auto Loader.
  2. If your data schema is going to evolve frequently, Auto Loader provides better primitives around schema inference and evolution.
  3. Loading a subset of re-uploaded files can be a bit easier to manage with COPY INTO. With Auto Loader, it’s harder to reprocess a select subset of files. However, you can use COPY INTO to reload that subset of files while an Auto Loader stream is running simultaneously.

SCD Type 2 Table

%sql

MERGE INTO books_silver
USING (
  SELECT updates.book_id AS merge_key, updates.*
  FROM updates

  UNION ALL

  SELECT NULL AS merge_key, updates.*
  FROM updates
  JOIN books_silver ON updates.book_id = books_silver.book_id
  WHERE books_silver.current = true and updates.price <> books_silver.price
) staged_updates
ON books_silver.book_id = merge_key
WHEN MATCHED AND books_silver.current = true and books_silver.price <> staged_updates.price THEN
  UPDATE SET current = false, end_date = staged_updates.updated
WHEN NOT MATCHED THEN
  INSERT (book_id, title, author, price, current, effective_date, end_date)
  VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author, staged_updates.price, true, staged_updates.updated, NULL)
def type2_upsert(microBatchDF, batch):
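    # Called by foreachBatch: expose the micro-batch as a temp view so the SCD Type 2 MERGE below can reference it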
    microBatchDF.createOrReplaceTempView("updates")

    query = """
MERGE INTO books_silver
USING (
  SELECT updates.book_id AS merge_key, updates.*
  FROM updates

  UNION ALL

  SELECT NULL AS merge_key, updates.*
  FROM updates
  JOIN books_silver ON updates.book_id = books_silver.book_id
  WHERE books_silver.current = true and updates.price <> books_silver.price
) staged_updates
ON books_silver.book_id = merge_key
WHEN MATCHED AND books_silver.current = true and books_silver.price <> staged_updates.price THEN
  UPDATE SET current = false, end_date = staged_updates.updated
WHEN NOT MATCHED THEN
  INSERT (book_id, title, author, price, current, effective_date, end_date)
  VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author, staged_updates.price, true, staged_updates.updated, NULL)
    """

    microBatchDF.sparkSession.sql(query)
from pyspark.sql import functions as F

def process_books():
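    # Parse the JSON payload of 'books' records from the bronze table and apply the SCD Type 2 upsert per micro-batch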
    schema = "book_id string, title string, author string, price double, updated timestamp"

    query = (spark.readStream
                    .table("bronze")
                    .filter("topic='books'")
                    .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                    .select("v.*")
                  .writeStream
                    .foreachBatch(type2_upsert)
                    .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/books_silver")
                    .trigger(availableNow=True)
                    .start())
    query.awaitTermination()

process_books()

Change Data Capture (CDC) feed

We can use the MERGE INTO command to process a CDC feed. MERGE INTO allows you to merge a set of updates, insertions, and deletions from a source table into a target Delta table.

MERGE INTO target_table t
USING source_updates s
ON t.key_field = s.key_field
WHEN MATCHED AND s.operation_field = 'DELETE' AND t.sequence_field < s.sequence_field
    THEN DELETE
WHEN MATCHED AND t.sequence_field < s.sequence_field
    THEN UPDATE SET *
WHEN NOT MATCHED
    THEN INSERT *

However, a merge operation cannot be performed if multiple source rows match and attempt to modify the same target row in the Delta table.

A CDC feed with multiple updates for the same key will therefore generate an exception.

To avoid this error, you need to ensure that you are merging only the most recent change for each key. We can use the rank() function over a window partitioned by the key to achieve this: keep only the records where rank = 1, and merge them into the target table with the MERGE INTO command, as sketched below.
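
A minimal sketch (hypothetical table and column names, matching the merge above; SELECT * EXCEPT requires a recent Databricks Runtime):

MERGE INTO target_table t
USING (
  SELECT * EXCEPT (rnk)
  FROM (
    SELECT *,
           RANK() OVER (PARTITION BY key_field ORDER BY sequence_field DESC) AS rnk
    FROM source_updates
  ) ranked
  WHERE rnk = 1
) s
ON t.key_field = s.key_field
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *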

Change Data Feed (CDF)

Generally speaking, we use CDF for sending incremental data changes to downstream tables in a multi-hop architecture. Use CDF when only a small fraction of records is updated in each batch; such updates are usually received from external sources in CDC format.

If most of the records in the table are updated, or if the table is overwritten in each batch, then don’t use CDF.
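
A minimal sketch of enabling CDF on an existing table and reading its changes (the table name and starting version are hypothetical):

-- enable the change data feed on an existing table
ALTER TABLE sales_silver SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- read the row-level changes recorded since table version 2
SELECT * FROM table_changes('sales_silver', 2);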

[Figure: when to use Change Data Feed]

Dynamic View

Dynamic views allow access control lists (ACLs) to be applied to data in a table at the column or row level. Users with sufficient privileges will be able to see all fields, while restricted users will be shown arbitrary results as defined at view creation.

Column level control

CREATE OR REPLACE VIEW customers_vw AS
  SELECT
    customer_id,
    CASE
      WHEN is_member('admin_demo') THEN email
      ELSE 'REDACTED'
    END AS email,
    gender,
    CASE
      WHEN is_member('admin_demo') THEN first_name
      ELSE 'REDACTED'
    END AS first_name,
    CASE
      WHEN is_member('admin_demo') THEN last_name
      ELSE 'REDACTED'
    END AS last_name,
    CASE
      WHEN is_member('admin_demo') THEN street
      ELSE 'REDACTED'
    END AS street,
    city,
    country,
    row_time
  FROM customers_silver

Row level control

CREATE OR REPLACE VIEW customers_fr_vw AS
SELECT * FROM customers_vw
WHERE
  CASE
    WHEN is_member("admin_demo") THEN TRUE
    ELSE country = "France" AND row_time > "2022-01-01"
  END


Structured Streaming

To restart streaming queries on failure, it’s recommended to configure Structured Streaming jobs with the following settings:

  1. Retries: Set to Unlimited
  2. Maximum concurrent runs: Set to 1. There must be only one instance of each query concurrently active.
  3. Clusters: Always set this to use a new job cluster and use the latest Spark version (or at least version 2.1).
  4. Notifications: Set this if you want email notification on failures
  5. Schedule: Do not set a schedule
  6. Timeout: Do not set a timeout. Streaming queries run for an indefinitely long time.

Streaming query to handle late-arriving data

pyspark.sql.DataFrame.withWatermark

from pyspark.sql.functions import count, avg

(spark.readStream
      .table("orders_cleaned")
      .withWatermark("order_timestamp", "30 minutes")
      .groupBy(
        "order_timestamp",
        "author")
      .agg(
        count("order_id").alias("order_cnt"),
        avg("quantity").alias("avg_qty"))
    .writeStream
      .option("checkpointLocation", "dbfs:/path/checkpoint")
      .table("order_stats"))

Streaming query to calculate business-level aggregation for each non-overlapping five-minute interval.

from pyspark.sql.functions import window, count, avg

(spark.readStream
      .table("orders_cleaned")
      .withWatermark("order_timestamp", "10 minutes")
      .groupBy(
        window("order_timestamp", "5 minutes").alias("time"),
        "author")
      .agg(
        count("order_id").alias("order_cnt"),
        avg("quantity").alias("order_qty"))
    .writeStream
      .option("checkpointLocation", "dbfs:/path/checkpoint")
      .table("order_stats"))

pyspark.sql.functions.window

Global Spark Configuration


## enable Optimised writes and Auto Compaction
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
spark.conf.set("spark.databricks.delta.autoCompact.enabled", True)

## enable Change Data Feed (CDF)
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", True)