00 - General

  • Databricks documentation, link
  • Data Engineering With Databricks, GitHub

01 - Databricks Workspace and Services

Databricks Architecture and Services

Figure: Databricks architecture

Figure: Databricks services

Databricks Control Plane

  • Web Application
    • Databricks SQL
    • Databricks Machine Learning
    • Databricks Data Science and Engineering
  • Repos / Notebooks
  • Job Scheduling
  • Cluster Management

Figure: Databricks cluster architecture

Figure: Databricks cluster overview

Cluster

Clusters are made up of one or more virtual machine (VM) instances:

  • Driver node. Coordinates the activities of the executors (analogous to the master node in EMR).
  • Executor node. Runs the tasks composing a Spark job (analogous to core/task nodes in EMR).

Cluster Types

  • All-purpose Clusters
    • Analyse data collaboratively using interactive notebooks
    • Create clusters from the Workspace or API
    • Can be shared by multiple people for collaborative work
    • Configuration info is retained for up to 70 clusters for up to 30 days
  • Job Clusters
    • Run automated jobs
    • The Databricks job scheduler creates new job clusters when running jobs and terminates them when the job completes
    • You cannot restart a job cluster, because job clusters are provisioned in isolated, parameterised environments
    • Configuration info is retained for up to 30 of the most recently terminated job clusters

Notebook Basics

Magic Command

  • %python. Run python code in a cell.
  • %sql. Run sql code in a cell.
  • %md. Render Markdown in a cell.
  • %run. Run another notebook from within a notebook.

Keyboard Shortcut

General:

  • h. Help window.
  • esc. Switch to Command mode.
  • ⌘Command + A. Select all cells.

Cell Operations in Edit Mode:

  • ⇧Shift + ↩Enter. Run command and move to next cell.
  • ⌥Option + ↩Enter. Run command and insert new cell below
  • ⌘Command + ] / [. Indent/Unindent selection.
  • ^Control + ⌥Option + P. Insert a cell above.
  • ^Control + ⌥Option + N. Insert a cell below.
  • ^Control + ⌥Option + -. Split a cell at current cursor.
  • ^Control + ⌥Option + D. Delete current cell.
  • ^Control + ⌥Option + ↑Up. Move a cell up.
  • ^Control + ⌥Option + ↓Down. Move a cell down.

Cell Operations in Command Mode:

  • ⌘Command + ⇧ Shift + F. Format a cell.
  • ⌘Command + /. Toggle line comment.
  • ⌘Command + C. Copy current cell.
  • ⌘Command + X. Cut current cell.
  • ⌘Command + V. Paste cell.
  • ⇧Shift + M. Merge with cell below.
  • A. Insert a cell above.
  • B. Insert a cell below.
  • D. Delete current cell.
  • G. Go to first cell.
  • L. Toggle line numbers.
  • O. Toggle cell output.
  • T. Toggle cell title.

Repos

Databricks Repos supports all of the following operations:

  1. Clone, push to, or pull from a remote Git repository.
  2. Create and manage branches for development work.
  3. Create notebooks, and edit notebooks and other files.
  4. Visually compare differences upon commits.

However, the following tasks are not supported by Databricks Repos, and must be performed in your Git provider:

  1. Create a pull request.
  2. Resolve merge conflicts.
  3. Merge or delete branches.
  4. Rebase a branch.

Repos vs Notebooks

One advantage of Databricks Repos over the built-in Notebooks versioning is that Databricks Repos supports creating and managing branches for development work.

SQL

Databricks SQL is a data warehouse on the Databricks Lakehouse Platform that lets you run all your SQL and BI applications at scale.

02 - Delta Lake

2.1 What is Delta Lake?

Delta Lake is an open-source project that enables building a data lakehouse on top of existing storage systems.

Delta Lake is Not…

  • Proprietary technology
  • Storage format
  • Storage medium
  • Database service or data warehouse

Delta Lake is…

  • Open Source project.
  • Builds on standard data formats
  • Optimised for cloud object storage
    • Object storage is cheap, durable, highly available, and effectively infinitely scalable.
  • Built for scalable metadata handling
    • Delta Lake is designed to resolve the problem of quickly returning point queries.

In summary, Delta Lake

  • decouples compute and storage costs
  • provides optimised performance for your data
  • regardless of data scale

This is because Delta Lake is open source, leverages open formats, stores data in your own cloud object storage, and is designed for effectively infinite scalability.

Choosing to migrate your data to Delta Lake represents an investment in long-term performance and reliability.

The Data Lakehouse is built on top of this foundation, with the vision of powering applications and queries throughout your organisation from a single copy of the data.

2.2 ACID Guarantee

Delta Lake brings ACID to object storage

  • Atomicity means that all transactions either succeed or fail completely.
  • Consistency guarantees relate to how a given state of the data is observed by simultaneous operations.
  • Isolation refers to how simultaneous operations potentially conflict with one another.
  • Durability means committed changes are permanent.

Problems solved by ACID

  1. Hard to append data.
  2. Modification of existing data difficult.
  3. Jobs failing mid way.
  4. Real-time operations hard.
  5. Costly to keep historical data versions.

2.3 Managing Delta Lake Table

Creating Delta Lake Table

We can create a Delta Lake table students with the following SQL query:

CREATE TABLE IF NOT EXISTS students (
  id INT,
  name STRING,
  value DOUBLE
);

Note: In Databricks Runtime 8.0 and above, Delta Lake is the default format and you don’t need USING DELTA.

Now, let’s insert some data into the table

INSERT INTO students VALUES (1, "Yve", 1.0);
INSERT INTO students VALUES (2, "Omar", 2.5);
INSERT INTO students VALUES (3, "Elia", 3.0);

INSERT INTO students VALUES
  (4, "Ted", 4.7),
  (5, "Tiffany", 5.5),
  (6, "Vini", 6.3)

Note: Databricks doesn’t have a COMMIT keyword; transactions run as soon as they’re executed and commit as they succeed.

Query data from Delta Lake Tables

SELECT * FROM students

Delta Lake guarantees that any read against a table will always return the most recent version of the table, and that you’ll never encounter a state of deadlock due to ongoing operations.

Table reads can never conflict with other operations, and the newest version of your data is immediately available to all clients that can query your lakehouse. Because all transaction information is stored in cloud object storage alongside your data files, concurrent reads on Delta Lake tables are only limited by the hard limits of object storage on cloud vendors.

Note: It’s not infinite, but it’s at least thousands of reads per second.

  • AWS
  • Azure
  • GCP

Note: There are other Delta Lake limitations on AWS S3 when using multi-cluster write to same Delta Lake table, see details

Insert, update, and delete records in Delta Lake Tables

We can use the UPDATE statement to update record values in Delta Lake tables

UPDATE students
SET value = value + 1
WHERE name like 'T%'
;

We can use the DELETE statement to delete certain records in a table

DELETE FROM students
WHERE value > 6
;

Write upsert statements with Delta Lake

CREATE OR REPLACE TEMP VIEW updates (id, name, value, type) AS VALUES
  (2, "Omar", 15.2, "update"),
  (3, "", null, "delete"),
  (7, "Blue", 7.7, "insert"),
  (11, "Diya", 8.8, "update")
;

We can use the MERGE statement to upsert into a Delta Lake table

MERGE INTO students b
USING updates u
ON b.id=u.id
WHEN MATCHED AND u.type = "update"
  THEN UPDATE SET *
WHEN MATCHED AND u.type = "delete"
  THEN DELETE
WHEN NOT MATCHED AND u.type = "insert"
  THEN INSERT *

Note: A MERGE statement must have at least one field to match on, and each WHEN MATCHED or WHEN NOT MATCHED clause can have any number of additional conditional statements.

Drop Delta Lake Tables

We can use the DROP TABLE statement to drop a Delta Lake table

DROP TABLE students
;

There are two types of tables in Delta Lake (see the sketch below):

  • Managed Table.
    • Managed tables are tables whose metadata and data are both managed by Databricks.
    • When you run DROP TABLE on a managed table, both the metadata and the underlying data files are deleted.
  • External Table.
    • External tables are tables whose metadata is managed by Databricks, but whose data files live in an external location that you specify.
    • When you run DROP TABLE on an external table, only the metadata is deleted; the underlying data files are left untouched.
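
A minimal sketch of the difference (the table names and the storage path are illustrative):

-- Managed table: Databricks manages both the metadata and the data files
CREATE TABLE managed_example (id INT, name STRING);

-- External table: metadata in the metastore, data files at a location you control
CREATE TABLE external_example (id INT, name STRING)
LOCATION 'dbfs:/mnt/external-storage/external_example';

DROP TABLE managed_example;   -- deletes metadata and data files
DROP TABLE external_example;  -- deletes metadata only; files at the LOCATION remain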

2.4 Advanced Delta Lake Features

Examine Table Details

By default, Databricks uses a Hive metastore to register databases, tables, and views.

We can use DESCRIBE EXTENDED to see important metadata about our table.

DESCRIBE EXTENDED students
;

We can also use DESCRIBE DETAIL to display more table metadata.

DESCRIBE DETAIL students
;

The default location of the Hive metastore is

dbfs:/user/hive/warehouse

Describe the directory structure of Delta Lake files

A Delta Lake table is actually backed by a collection of files stored in cloud object storage. We can use the following command to take a closer look at these files.

%python
display(dbutils.fs.ls(f"{DA.paths.user_db}/students"))

It will yield something like

path,name,size,modificationTime
dbfs:/mnt/dbacademy-users/[email protected]/data-engineering-with-databricks/database.db/students/_delta_log/,_delta_log/,0,1678591408696
dbfs:/mnt/dbacademy-users/[email protected]/data-engineering-with-databricks/database.db/students/part-00000-32df286e-2449-4ca4-81f8-599eea9292ea.c000.snappy.parquet,part-00000-32df286e-2449-4ca4-81f8-599eea9292ea.c000.snappy.parquet,868,1678591174000
...

Note that you can see a folder named _delta_log and a number of Apache Parquet files. The Parquet files are the actual data files, and the _delta_log folder is the directory that stores all of the table’s transaction records.

We can peek inside the _delta_log to see more

%python
display(dbutils.fs.ls(f"{DA.paths.user_db}/students/_delta_log"))

The results will be something like

path,name,size,modificationTime
dbfs:/mnt/dbacademy-users/[email protected]/data-engineering-with-databricks/database.db/students/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1678591170000
dbfs:/mnt/dbacademy-users/[email protected]/data-engineering-with-databricks/database.db/students/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1678591170000
...

Use OPTIMIZE to compact small files

For Delta Lake tables, we can use the command OPTIMIZE to combine files toward an optimal size (scaled based on the size of the table).

When executing the OPTIMIZE command, users can optionally specify one or several fields for ZORDER indexing. While the specific math of Z-ordering is unimportant here, it speeds up data retrieval when filtering on the provided fields by colocating data with similar values within data files.

OPTIMIZE students
ZORDER BY ID
;

Result

path,metrics
dbfs:/mnt/dbacademy-users/[email protected]/data-engineering-with-databricks/database.db/students,"{""numFilesAdded"":1,""numFilesRemoved"":4,""filesAdded"":{""min"":944,""max"":944,""avg"":944,""totalFiles"":1,""totalSize"":944},""filesRemoved"":{""min"":868,""max"":891,""avg"":883.75,""totalFiles"":4,""totalSize"":3535},""partitionsOptimized"":0,""zOrderStats"":{""strategyName"":""minCubeSize(107374182400)"",""inputCubeFiles"":{""num"":0,""size"":0},""inputOtherFiles"":{""num"":4,""size"":3535},""inputNumCubes"":0,""mergedFiles"":{""num"":4,""size"":3535},""numOutputCubes"":1,""mergedNumCubes"":null},""numBatches"":1,""totalConsideredFiles"":4,""totalFilesSkipped"":0,""preserveInsertionOrder"":false,""numFilesSkippedToReduceWriteAmplification"":0,""numBytesSkippedToReduceWriteAmplification"":0,""startTimeMs"":1678592204750,""endTimeMs"":1678592209671,""totalClusterParallelism"":2,""totalScheduledTasks"":1,""autoCompactParallelismStats"":null}"

Reference: Delta Optimise

Use ZORDER to index tables

ZORDER indexing is applied via the OPTIMIZE … ZORDER BY command shown above: it colocates rows with similar values of the specified columns within data files, which speeds up queries that filter on those columns.

Review a history of table transactions

We can use the DESCRIBE HISTORY command to return provenance information, including the operation, the user, and so on, for each write to a table.

Table history is retained for 30 days.

DESCRIBE HISTORY students
;

Results

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
8,2023-03-12T03:36:48.000+0000,2843732932364956,abc@d.com,OPTIMIZE,"{""predicate"":""[]"",""zOrderBy"":""[\""id\""]"",""batchId"":""0"",""auto"":""false""}",null,"{""notebookId"":""433315738133426""}",0307-222029-nk3zyxzd,7,SnapshotIsolation,false,"{""numRemovedFiles"":""4"",""numRemovedBytes"":""3535"",""p25FileSize"":""944"",""minFileSize"":""944"",""numAddedFiles"":""1"",""maxFileSize"":""944"",""p75FileSize"":""944"",""p50FileSize"":""944"",""numAddedBytes"":""944""}",null,Databricks-Runtime/11.3.x-photon-scala2.12
7,2023-03-12T03:20:11.000+0000,2843732932364956,abc@d.com,MERGE,"{""predicate"":""(b.id = u.id)"",""matchedPredicates"":""[{\""predicate\"":\""(u.type = 'update')\"",\""actionType\"":\""update\""},{\""predicate\"":\""(u.type = 'delete')\"",\""actionType\"":\""delete\""}]"",""notMatchedPredicates"":""[{\""predicate\"":\""(u.type = 'insert')\"",\""actionType\"":\""insert\""}]""}",null,"{""notebookId"":""433315738133426""}",0307-222029-nk3zyxzd,6,WriteSerializable,false,"{""numTargetRowsCopied"":""0"",""numTargetRowsDeleted"":""1"",""numTargetFilesAdded"":""2"",""executionTimeMs"":""8668"",""numTargetRowsInserted"":""1"",""scanTimeMs"":""6140"",""numTargetRowsUpdated"":""1"",""numOutputRows"":""2"",""numTargetChangeFilesAdded"":""0"",""numSourceRows"":""4"",""numTargetFilesRemoved"":""2"",""rewriteTimeMs"":""2090""}",null,Databricks-Runtime/11.3.x-photon-scala2.12
...

Reference: Delta history

Query previous table version

With all the table history information, we can easily query a previous table version

SELECT *
FROM students VERSION AS OF 3
;

Time travel in Delta Lake does not recreate a previous state of the table by undoing transactions against the current version; rather, it queries the data files that were marked as valid as of the specified version.
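
Delta Lake also supports time travel by timestamp rather than version number; a minimal sketch (the timestamp literal is illustrative):

SELECT *
FROM students TIMESTAMP AS OF '2023-03-12'
;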

Roll back to previous table version

To roll back to a previous table version, we just need to use the RESTORE command

RESTORE TABLE students TO VERSION AS OF 8
;

Note that a RESTORE command is recorded as a transaction as well: you won’t be able to completely hide the fact that you accidentally deleted all the records in the table, but you will be able to undo the operation and bring your table back to a desired state.

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
10,2023-03-12T03:50:21.000+0000,2843732932364956,[email protected],RESTORE,"{""version"":""8"",""timestamp"":null}",null,"{""notebookId"":""433315738133426""}",0307-222029-nk3zyxzd,9,Serializable,false,"{""numRestoredFiles"":""1"",""removedFilesSize"":""0"",""numRemovedFiles"":""0"",""restoredFilesSize"":""944"",""numOfFilesAfterRestore"":""1"",""tableSizeAfterRestore"":""944""}",null,Databricks-Runtime/11.3.x-photon-scala2.12
9,2023-03-12T03:49:44.000+0000,2843732932364956,[email protected],DELETE,"{""predicate"":""[\""true\""]""}",null,"{""notebookId"":""433315738133426""}",0307-222029-nk3zyxzd,8,WriteSerializable,false,"{""numRemovedFiles"":""1"",""numAddedChangeFiles"":""0"",""executionTimeMs"":""58"",""scanTimeMs"":""58"",""rewriteTimeMs"":""0""}",null,Databricks-Runtime/11.3.x-photon-scala2.12
...

Reference: Delta RESTORE

Clean up stale data files with VACUUM

Databricks will automatically clean up stale files. If you wish to manually purge old data files, this can be performed with the VACUUM operation.

By default, VACUUM will prevent you from deleting files less than 7 days old, to ensure that no long-running operations are still referencing any of the files to be deleted. You can configure Delta Lake with the following settings to allow deleting files that are less than 7 days old:

SET spark.databricks.delta.retentionDurationCheck.enabled = false;
SET spark.databricks.delta.vacuum.logging.enabled = true;

VACUUM students RETAIN 0 HOURS DRY RUN

If you run VACUUM on a Delta table, you lose the ability to time travel back to a version older than the specified data retention period.

Reference: Delta Vacuum

03 - Relational Entities On Databricks

3.1 Databases and Tables on Databricks

Check table details

We can use the DESCRIBE DETAIL command to get detailed table information.

DESCRIBE DETAIL managed_table_in_db_with_default_location;

Load a CSV file into an external table:

  1. Create a temporary view that reads the CSV file.
  2. Create an external table at a specified external location, populated from the temporary view via CTAS.
USE ${da.schema_name}_default_location;

CREATE OR REPLACE TEMPORARY VIEW temp_delays USING CSV OPTIONS (
  path = '${DA.paths.datasets}/flights/departuredelays.csv',
  header = "true",
  mode = "FAILFAST" -- abort file parsing with a RuntimeException if any malformed lines are encountered
);
CREATE OR REPLACE TABLE external_table LOCATION '${da.paths.working_dir}/external_table' AS
  SELECT * FROM temp_delays;

SELECT * FROM external_table;

Views, Temporary View & Global Temporary View

Normal View

We can use the CREATE VIEW statement to create a normal view.

CREATE VIEW view_delays_abq_lax AS
  SELECT *
  FROM external_table
  WHERE origin = 'ABQ' AND destination = 'LAX'
;

Temporary View

We can add the TEMPORARY keyword to create a temporary view

CREATE TEMPORARY VIEW temp_view_delays_gt_120
AS SELECT * FROM external_table WHERE delay > 120 ORDER BY delay ASC
;

Global Temporary View

There is also another concept, GLOBAL TEMPORARY VIEW in Databricks

CREATE GLOBAL TEMPORARY VIEW global_temp_view_dist_gt_1000
AS SELECT * FROM external_table WHERE distance > 1000
;

Temporary view vs Global Temporary view

TEMPORARY views are session-scoped and are dropped when the session ends, because their definitions are not persisted in the underlying metastore. GLOBAL TEMPORARY views are tied to a system-preserved temporary database called global_temp, and they can be accessed by other sessions attached to the same cluster (see the sketch below).
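
A quick sketch using the views created above; note that the global temporary view must be qualified with the global_temp database:

-- Session-scoped: only resolvable in the session that created it
SELECT * FROM temp_view_delays_gt_120;

-- Tied to the cluster-wide global_temp database, so other sessions on the same cluster can query it
SELECT * FROM global_temp.global_temp_view_dist_gt_1000;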

04 - ETL with Spark SQL

Query Single File

In Databricks, we can use SQL to directly query a single file

SELECT * FROM json.`${DA.paths.kafka_events}/001.json`

Results

key,offset,partition,timestamp,topic,value
VUEwMDAwMDAxMDczODAyOTY=,219246233,0,1593880175268,clickstream,eyJkZXZpY2UiOiJBbmRyb2lkIiwiZWNvbW1lcmNlIjp7fSwiZXZlbnRfbmFtZSI6ImNhcmVlcnMiLCJldmVudF9wcmV2aW91c190aW1lc3RhbXAiOjE1OTM4ODAwMTM1MDY5NzksImV2ZW50X3RpbWVzdGFtcCI6MTU5Mzg4MDE3NTI1Mjk0OSwiZ2VvIjp7ImNpdHkiOiJZdW1hIiwic3RhdGUiOiJDTyJ9LCJpdGVtcyI6W10sInRyYWZmaWNfc291cmNlIjoiZ29vZ2xlIiwidXNlcl9maXJzdF90b3VjaF90aW1lc3RhbXAiOjE1OTM4Nzg5ODg2MjU5NTEsInVzZXJfaWQiOiJVQTAwMDAwMDEwNzM4MDI5NiJ9
VUEwMDAwMDAxMDczOTEyODU=,219428744,1,1593880175652,clickstream,eyJkZXZpY2UiOiJtYWNPUyIsImVjb21tZXJjZSI6e30sImV2ZW50X25hbWUiOiJtYWluIiwiZXZlbnRfdGltZXN0YW1wIjoxNTkzODgwMTc1NjQ3NDE4LCJnZW8iOnsiY2l0eSI6IlNwcmluZ2ZpZWxkIiwic3RhdGUiOiJNQSJ9LCJpdGVtcyI6W10sInRyYWZmaWNfc291cmNlIjoiZ29vZ2xlIiwidXNlcl9maXJzdF90b3VjaF90aW1lc3RhbXAiOjE1OTM4ODAxNzU2NDc0MTgsInVzZXJfaWQiOiJVQTAwMDAwMDEwNzM5MTI4NSJ9
...

Query a Directory of Files

Apart from querying a file, we can also use Databricks to query a directory like

SELECT * FROM json.`${DA.paths.kafka_events}`

Results

key,offset,partition,timestamp,topic,value
VUEwMDAwMDAxMDczOTgwNTQ=,219255030,0,1593880885085,clickstream,eyJkZXZpY2UiOiJBbmRyb2lkIiwiZWNvbW1lcmNlIjp7fSwiZXZlbnRfbmFtZSI6Im1haW4iLCJldmVudF90aW1lc3RhbXAiOjE1OTM4ODA4ODUwMzYxMjksImdlbyI6eyJjaXR5IjoiTmV3IFlvcmsiLCJzdGF0ZSI6Ik5ZIn0sIml0ZW1zIjpbXSwidHJhZmZpY19zb3VyY2UiOiJnb29nbGUiLCJ1c2VyX2ZpcnN0X3RvdWNoX3RpbWVzdGFtcCI6MTU5Mzg4MDg4NTAzNjEyOSwidXNlcl9pZCI6IlVBMDAwMDAwMTA3Mzk4MDU0In0=
VUEwMDAwMDAxMDczOTI0NTg=,219255043,0,1593880892303,clickstream,eyJkZXZpY2UiOiJpT1MiLCJlY29tbWVyY2UiOnt9LCJldmVudF9uYW1lIjoiYWRkX2l0ZW0iLCJldmVudF9wcmV2aW91c190aW1lc3RhbXAiOjE1OTM4ODAzMDA2OTY3NTEsImV2ZW50X3RpbWVzdGFtcCI6MTU5Mzg4MDg5MjI1MTMxMCwiZ2VvIjp7ImNpdHkiOiJXZXN0YnJvb2siLCJzdGF0ZSI6Ik1FIn0sIml0ZW1zIjpbeyJpdGVtX2lkIjoiTV9TVEFOX1QiLCJpdGVtX25hbWUiOiJTdGFuZGFyZCBUd2luIE1hdHRyZXNzIiwiaXRlbV9yZXZlbnVlX2luX3VzZCI6NTk1LjAsInByaWNlX2luX3VzZCI6NTk1LjAsInF1YW50aXR5IjoxfV0sInRyYWZmaWNfc291cmNlIjoiZ29vZ2xlIiwidXNlcl9maXJzdF90b3VjaF90aW1lc3RhbXAiOjE1OTM4ODAzMDA2OTY3NTEsInVzZXJfaWQiOiJVQTAwMDAwMDEwNzM5MjQ1OCJ9
...

Extract Text Files as Raw Strings

When working with text-based files (e.g. JSON, CSV, TSV, and TXT formats), you can use the text format to load each line of the file as a row with one string column named value.

SELECT * FROM text.`${DA.paths.kafka_events}`

Results

value
"{""key"":""VUEwMDAwMDAxMDczOTgwNTQ="",""offset"":219255030,""partition"":0,""timestamp"":1593880885085,""topic"":""clickstream"",""value"":""eyJkZXZpY2UiOiJBbmRyb2lkIiwiZWNvbW1lcmNlIjp7fSwiZXZlbnRfbmFtZSI6Im1haW4iLCJldmVudF90aW1lc3RhbXAiOjE1OTM4ODA4ODUwMzYxMjksImdlbyI6eyJjaXR5IjoiTmV3IFlvcmsiLCJzdGF0ZSI6Ik5ZIn0sIml0ZW1zIjpbXSwidHJhZmZpY19zb3VyY2UiOiJnb29nbGUiLCJ1c2VyX2ZpcnN0X3RvdWNoX3RpbWVzdGFtcCI6MTU5Mzg4MDg4NTAzNjEyOSwidXNlcl9pZCI6IlVBMDAwMDAwMTA3Mzk4MDU0In0=""}"
"{""key"":""VUEwMDAwMDAxMDczOTI0NTg="",""offset"":219255043,""partition"":0,""timestamp"":1593880892303,""topic"":""clickstream"",""value"":""eyJkZXZpY2UiOiJpT1MiLCJlY29tbWVyY2UiOnt9LCJldmVudF9uYW1lIjoiYWRkX2l0ZW0iLCJldmVudF9wcmV2aW91c190aW1lc3RhbXAiOjE1OTM4ODAzMDA2OTY3NTEsImV2ZW50X3RpbWVzdGFtcCI6MTU5Mzg4MDg5MjI1MTMxMCwiZ2VvIjp7ImNpdHkiOiJXZXN0YnJvb2siLCJzdGF0ZSI6Ik1FIn0sIml0ZW1zIjpbeyJpdGVtX2lkIjoiTV9TVEFOX1QiLCJpdGVtX25hbWUiOiJTdGFuZGFyZCBUd2luIE1hdHRyZXNzIiwiaXRlbV9yZXZlbnVlX2luX3VzZCI6NTk1LjAsInByaWNlX2luX3VzZCI6NTk1LjAsInF1YW50aXR5IjoxfV0sInRyYWZmaWNfc291cmNlIjoiZ29vZ2xlIiwidXNlcl9maXJzdF90b3VjaF90aW1lc3RhbXAiOjE1OTM4ODAzMDA2OTY3NTEsInVzZXJfaWQiOiJVQTAwMDAwMDEwNzM5MjQ1OCJ9""}"
...

This can be useful when data sources are prone to corruption and custom text parsing functions will be used to extract values from text fields.

Create a table based on External CSV files

In Databricks, we can create a table based on a CSV file or a directory of CSV files. What we need to do is specify:

  1. column names and types
  2. file format
  3. delimiter used to separate fields
  4. presence of a header
  5. path to where this data is located

Example Spark DDL query:

CREATE TABLE sales_csv
  (order_id LONG, email STRING, transactions_timestamp LONG, total_item_quantity INTEGER, purchase_revenue_in_usd DOUBLE, unique_items INTEGER, items STRING)
USING CSV
OPTIONS (
  header = "true",
  delimiter = "|"
)
LOCATION "${DA.paths.sales_csv}"

Note

  • For the sake of performance, Spark automatically caches previously queried data in local storage.
  • The external data source is not configured to tell Spark that it should refresh the underlying data files.
  • We can manually refresh the cached data by running the REFRESH TABLE command.
REFRESH TABLE sales_csv
;

Extracting Data from SQL Databases via JDBC Connection

Databricks provides a standard JDBC driver so that we can use Spark SQL to connect to an external database, as in the following (the table name and connection options are placeholders):

CREATE TABLE users_jdbc
USING JDBC
OPTIONS (
    url = "jdbc:{databaseServerType}://{jdbcHostname}:{jdbcPort}",
    dbtable = "{jdbcDatabase}.table",
    user = "{jdbcUsername}",
    password = "{jdbcPassword}"
)
;

Note

  1. Even though Databricks can talk to various databases and data warehouses, it’s recommended to either:
  • move the entire source table(s) to Databricks and then execute the logic on the currently active cluster, or
  • push down the query to the external SQL database and only transfer the results back to Databricks.
  2. In either case, working with very large datasets in external SQL databases can incur significant overhead because of:
  • network transfer latency associated with moving all data over the public internet, and
  • execution of query logic in source systems not optimised for big data queries.

Create Table as Select (CTAS)

CREATE TABLE AS SELECT statements create and populate Delta tables using data retrieved from an input query.

CREATE OR REPLACE TABLE sales AS
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-historical`;

DESCRIBE EXTENDED sales;

CTAS statements will automatically infer schema information from query results, and do not support manual declaration.

  • useful for external data ingestion from sources with a well-defined schema, such as Parquet files and tables.
  • don’t support specifying additional file options.

To create a table with comments, you can do something like

CREATE TABLE payments
COMMENT "This table contains sensitive information"
AS SELECT * FROM bank_transactions

Note: Remember that the COMMENT clause immediately follows the CREATE statement, before AS SELECT.

CTAS for External Data Source

CREATE OR REPLACE TEMP VIEW sales_tmp_vw
  (order_id LONG, email STRING, transactions_timestamp LONG, total_item_quantity INTEGER, purchase_revenue_in_usd DOUBLE, unique_items INTEGER, items STRING)
USING CSV
OPTIONS (
  path = "${da.paths.datasets}/ecommerce/raw/sales-csv",
  header = "true",
  delimiter = "|"
);

CREATE TABLE sales_delta AS SELECT * FROM sales_tmp_vw;

SELECT * FROM sales_delta

CTAS for Renaming Columns from Existing Tables

We can use CTAS to rename columns from an existing table.

CREATE OR REPLACE TABLE purchases AS
SELECT order_id AS id, transaction_timestamp, purchase_revenue_in_usd AS price
FROM sales;

SELECT * FROM purchases

Table Constraint

Because Delta Lake enforces schema on write, Databricks can support standard SQL constraint management clauses to ensure the quality and integrity of data added to a table.

Currently, Databricks supports two types of constraints:

NOT NULL Constraint

CREATE TABLE people10m (
  id INT NOT NULL,
  firstName STRING,
  middleName STRING NOT NULL,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA;

ALTER TABLE people10m ALTER COLUMN middleName DROP NOT NULL;
ALTER TABLE people10m ALTER COLUMN ssn SET NOT NULL;

CHECK Constraint

CREATE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA;

ALTER TABLE people10m ADD CONSTRAINT dateWithinRange CHECK (birthDate > '1900-01-01');
ALTER TABLE people10m DROP CONSTRAINT dateWithinRange;

Primary Key & Foreign Key Constraints

Databricks is also going to release primary key and foreign key relationships on fields in Unity Catalog tables in the near future.

You can declare primary keys and foreign keys as part of the table specification clause during table creation. This clause is not allowed during CTAS statements. You can also add constraints to existing tables.

CREATE TABLE T(
  pk1 INTEGER NOT NULL,
  pk2 INTEGER NOT NULL,
  CONSTRAINT t_pk PRIMARY KEY(pk1, pk2)
);

CREATE TABLE S(
  pk INTEGER NOT NULL PRIMARY KEY,
  fk1 INTEGER,
  fk2 INTEGER,
  CONSTRAINT s_t_fk FOREIGN KEY(fk1, fk2) REFERENCES T
);

Cloning Delta Lake Tables

Delta Lake has two options for efficiently copying Delta Lake tables.

  • DEEP CLONE.
    • Fully copies data and metadata from a source table to a target.
    • This copy occurs incrementally, so executing the command again can sync changes from the source to the target location.
  • SHALLOW CLONE.
    • Creates a copy of a table quickly, to test out applying changes without the risk of modifying the current table.
    • Only copies the Delta transaction logs (i.e. the table metadata), meaning the data files don’t move.
    • This is similar to Snowflake’s Zero-Copy Cloning.
  • In either case, data modifications applied to the cloned version of the table will be tracked and stored separately from the source. Cloning is a great way to set up tables for testing SQL code while still in development.

Figure: Snowflake zero-copy cloning

DEEP CLONE

CREATE OR REPLACE TABLE purchases_clone
DEEP CLONE purchases

SHALLOW CLONE

CREATE OR REPLACE TABLE purchases_shallow_clone
SHALLOW CLONE purchases

Writing to Delta Tables

Complete Overwrite

We can use overwrites to atomically replace all of the data in a table. There are multiple benefits to overwriting tables instead of deleting and recreating tables:

  1. overwriting a table is much faster because it doesn’t need to list the directory recursively or delete any files.
  2. the old version of the table still exists; can easily retrieve the old data using Time Travel.
  3. it’s an atomic operation. Concurrent queries can still read the table while you are deleting the table.
  4. Due to ACID transaction guarantees, if overwriting the table fails, the table will be in its previous state.

In Databricks, we can use CREATE OR REPLACE TABLE AS SELECT (CRAS) statements to fully replace the contents of a table each time they execute.

CREATE OR REPLACE TABLE events AS
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/events-historical`

And we can use DESCRIBE HISTORY command to retrieve the table history.

DESCRIBE HISTORY events

Result

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2023-03-16T02:06:18.000+0000,2843732932364956,[email protected],CREATE OR REPLACE TABLE AS SELECT,"{""isManaged"":""true"",""description"":null,""partitionBy"":""[]"",""properties"":""{}""}",null,"{""notebookId"":""433315738132925""}",0307-222029-nk3zyxzd,0,WriteSerializable,false,"{""numFiles"":""2"",""numOutputRows"":""485696"",""numOutputBytes"":""15098269""}",null,Databricks-Runtime/11.3.x-scala2.12
0,2023-03-16T01:44:11.000+0000,2843732932364956,[email protected],CLONE,"{""source"":""delta.`dbfs:/mnt/dbacademy-datasets/data-engineering-with-databricks/v02/ecommerce/delta/events_hist`"",""sourceVersion"":""1"",""isShallow"":""true""}",null,"{""notebookId"":""433315738132925""}",0307-222029-nk3zyxzd,-1,Serializable,false,"{""removedFilesSize"":""0"",""numRemovedFiles"":""0"",""sourceTableSize"":""14998176"",""numCopiedFiles"":""0"",""copiedFilesSize"":""0"",""sourceNumOfFiles"":""1""}",null,Databricks-Runtime/11.3.x-scala2.12

INSERT OVERWRITE

INSERT OVERWRITE provides a nearly identical outcome to CRAS: data in the target table is replaced by data from the query.

  • Can only overwrite an existing table, not create a new one like our CRAS statement.
  • Can overwrite only with new records that match the current table schema, and thus can be a safer technique for overwriting an existing table without disrupting downstream consumers.
  • Can overwrite individual partitions.
INSERT OVERWRITE sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-historical/`

INSERT INTO

We can use INSERT INTO to atomically append new rows to an existing Delta table. This allows for incremental updates to existing tables, which is much more efficient than overwriting each time.

INSERT INTO sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-30m`

Note that INSERT INTO does not have any built-in guarantees to prevent inserting the same records multiple times. Re-executing the above cell would write the same records to the target table, resulting in duplicate records.

MERGE INTO

You can upsert data from a source table, view or DataFrame into a target Delta table using the MERGE INTO command. Delta Lake supports inserts, updates and deletes in MERGE INTO, and supports extended syntax beyond SQL standards for advanced use cases.

MERGE INTO target a
USING source b
ON {merge_condition}
WHEN MATCHED THEN {matched_action}
WHEN NOT MATCHED THEN {not_matched_action}

The main benefits with MERGE INTO are

  1. updates, inserts and deletes are completed as a single transaction.
  2. multiple conditionals can be added in addition to matching fields.
  3. provides extensive options for implementing custom logic.

Insert-Only Merge for Deduplication

A common ETL use case is to collect logs or other ever-appending datasets into a Delta table through a series of append operations.

Many source systems can generate duplicate records. With merge, you can avoid inserting duplicate records by performing an insert-only merge.

This optimised command uses the same MERGE INTO syntax but only provides a WHEN NOT MATCHED clause.

Below we use this to confirm that records with the same user_id and event_timestamp aren’t already in the events table.

MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN
  INSERT *

COPY INTO

COPY INTO provides SQL engineers an idempotent option to incrementally ingest data from external systems.

Note that this operation does have some expectations:

  1. Data schema should be consistent.
  2. Duplicate records should be excluded or handled downstream.

This operation is potentially much cheaper than full table scans for data that grows predictably.

COPY INTO sales
FROM "${da.paths.datasets}/ecommerce/raw/sales-30m"
FILEFORMAT = PARQUET

Advanced SQL Transformation

Interacting with JSON Data

We can use the : syntax to query nested JSON data and extract the information that we need

SELECT value:device, value:geo:city
FROM events_strings
;

Spark SQL also provides the following functions for working with JSON strings:

  • from_json. Parses a JSON string column into a struct; it requires a schema.
  • schema_of_json. Derives a schema from an example JSON string, which can be passed to from_json.
CREATE OR REPLACE TEMP VIEW parsed_events AS
  SELECT from_json(value, schema_of_json('{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1075.5,"total_item_quantity":1,"unique_items":1},"event_name":"finalize","event_previous_timestamp":1593879231210816,"event_timestamp":1593879335779563,"geo":{"city":"Houston","state":"TX"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_K","item_name":"Standard King Mattress","item_revenue_in_usd":1075.5,"price_in_usd":1195.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593454417513109,"user_id":"UA000000106116176"}')) AS json
  FROM events_strings;

SELECT * FROM parsed_events

Results

json
"{""device"":""macOS"",""ecommerce"":{""purchase_revenue_in_usd"":null,""total_item_quantity"":null,""unique_items"":null},""event_name"":""checkout"",""event_previous_timestamp"":1593880801027797,""event_timestamp"":1593880822506642,""geo"":{""city"":""Traverse City"",""state"":""MI""},""items"":[{""coupon"":null,""item_id"":""M_STAN_T"",""item_name"":""Standard Twin Mattress"",""item_revenue_in_usd"":595,""price_in_usd"":595,""quantity"":1}],""traffic_source"":""google"",""user_first_touch_timestamp"":1593879413256859,""user_id"":""UA000000107384208""}"
"{""device"":""Windows"",""ecommerce"":{""purchase_revenue_in_usd"":null,""total_item_quantity"":null,""unique_items"":null},""event_name"":""email_coupon"",""event_previous_timestamp"":1593880770092554,""event_timestamp"":1593880829320848,""geo"":{""city"":""Hickory"",""state"":""NC""},""items"":[{""coupon"":""NEWBED10"",""item_id"":""M_STAN_F"",""item_name"":""Standard Full Mattress"",""item_revenue_in_usd"":850.5,""price_in_usd"":945,""quantity"":1}],""traffic_source"":""direct"",""user_first_touch_timestamp"":1593879889503719,""user_id"":""UA000000107388621""}"
...

Once a JSON string is unpacked to a struct type, Spark supports *(star) unpacking to flatten fields into columns.

CREATE OR REPLACE TEMP VIEW new_events_final AS
  SELECT json.*
  FROM parsed_events;

SELECT * FROM new_events_final;

Results

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id
macOS,"{""purchase_revenue_in_usd"":null,""total_item_quantity"":null,""unique_items"":null}",checkout,1593880801027797,1593880822506642,"{""city"":""Traverse City"",""state"":""MI""}","[{""coupon"":null,""item_id"":""M_STAN_T"",""item_name"":""Standard Twin Mattress"",""item_revenue_in_usd"":595,""price_in_usd"":595,""quantity"":1}]",google,1593879413256859,UA000000107384208
Windows,"{""purchase_revenue_in_usd"":null,""total_item_quantity"":null,""unique_items"":null}",email_coupon,1593880770092554,1593880829320848,"{""city"":""Hickory"",""state"":""NC""}","[{""coupon"":""NEWBED10"",""item_id"":""M_STAN_F"",""item_name"":""Standard Full Mattress"",""item_revenue_in_usd"":850.5,""price_in_usd"":945,""quantity"":1}]",direct,1593879889503719,UA000000107388621
...

Interacting with Arrays

  • size() function provides a count of the number of elements in an array for each row.
  • explode() function lets us put each element in an array on its own row, i.e. flatten the array.
  • collect_set() function can collect unique values for a field, including fields within arrays.
  • flatten() function allows multiple arrays to be combined into a single array.
  • array_distinct() function removes duplicate elements from an array.
SELECT
  user_id,
  collect_set(event_name) AS event_history,
  array_distinct(flatten(collect_set(items.item_id))) AS cart_history
FROM events
GROUP BY user_id
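
As an additional sketch for size() and explode() against the same events table (the items column is assumed to be an array of structs, as in the query above):

-- size() counts the elements in each items array;
-- explode() produces one output row per array element
SELECT
  user_id,
  size(items) AS item_count,
  explode(items) AS item
FROM events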

Set Operators

  • UNION returns the combined rows of two queries.
  • MINUS returns all rows found in one dataset but not the other.
  • INTERSECT returns all rows found in both relations (see the sketch below).
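
A quick sketch of the three operators, using the sales table from earlier and a hypothetical sales_update relation with the same schema:

-- Combined rows of both queries (duplicates removed; UNION ALL keeps them)
SELECT * FROM sales UNION SELECT * FROM sales_update;

-- Rows in sales that do not appear in sales_update
SELECT * FROM sales MINUS SELECT * FROM sales_update;

-- Rows that appear in both
SELECT * FROM sales INTERSECT SELECT * FROM sales_update;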

Pivot Tables

The PIVOT clause is used to change data perspective. We can get aggregated values based on specific column values, which are returned as multiple columns in the SELECT clause. The PIVOT clause can be specified after the table name or a subquery.

Pivot transforms the rows of a table by rotating unique values of a specific column list into separate columns. In other words, it converts a table from a long format to a wide format.

Here we use PIVOT to create a new transactions table that flattens out the information contained in the sales table. This flattened data format can be useful for dashboarding, but is also useful for applying machine learning algorithms for inference or prediction.

CREATE OR REPLACE TABLE transactions AS

SELECT * FROM (
  SELECT
    email,
    order_id,
    transaction_timestamp,
    total_item_quantity,
    purchase_revenue_in_usd,
    unique_items,
    item.item_id AS item_id,
    item.quantity AS quantity
  FROM sales_enriched
) PIVOT (
  sum(quantity) FOR item_id in (
    'P_FOAM_K',
    'M_STAN_Q',
    'P_FOAM_S',
    'M_PREM_Q',
    'M_STAN_F',
    'M_STAN_T',
    'M_PREM_K',
    'M_PREM_F',
    'M_STAN_K',
    'M_PREM_T',
    'P_DOWN_S',
    'P_DOWN_K'
  )
);

SELECT * FROM transactions

Higher-order Functions

  • FILTER() filters an array using the given lambda function.
  • EXISTS() tests whether a statement is true for one or more elements in an array.
  • TRANSFORM() uses the given lambda function to transform all elements in an array.
  • REDUCE() takes two lambda functions to reduce the elements of an array to a single value, by merging the elements into a buffer and then applying a finishing function on the final buffer.

filter()

FILTER(items, i -> i.item_id LIKE '%K') AS king_items

transform()

TRANSFORM(king_items, k -> CAST(k.item_revenue_in_usd * 100 AS INT)) AS item_revenues

SQL UDFs

At minimum, a SQL UDF requires a function name, optional parameters, the type to be returned, and some custom logic.

CREATE OR REPLACE FUNCTION yelling(text STRING)
RETURNS STRING
RETURN concat(upper(text), "!!!")
;

The above code defines a function named yelling() that takes one parameter, text, and returns the text in uppercase with three exclamation points appended to the end.

We can directly use the yelling() in our select statement.

SELECT yelling(food) FROM foods
;

We could use DESCRIBE FUNCTION or DESCRIBE FUNCTION EXTENDED command to get all function details

DESCRIBE FUNCTION EXTENDED yelling
;

Results

function_desc
Function:       spark_catalog.helloworldyoyoyo_se1w_da_dewd.yelling
Type:           SCALAR
Input:          text STRING
Returns:        STRING
Deterministic:  true
Data Access:    CONTAINS SQL
Configs:        spark.sql.hive.convertCTAS=true
                spark.sql.legacy.createHiveTableByDefault=false
                spark.sql.parquet.compression.codec=snappy
                spark.sql.sources.commitProtocolClass=com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol
                spark.sql.sources.default=delta
                spark.sql.streaming.stopTimeout=15s
Owner:          root
Create Time:    Sat Mar 18 04:15:38 UTC 2023
"Body:           concat(upper(text), ""!!!"")"

exists()
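
This section is a stub in the original notes; a minimal sketch, assuming the events table's items column is an array of structs with an item_name field (as in the earlier JSON examples):

-- Flag rows whose items array contains at least one mattress item
SELECT
  user_id,
  EXISTS(items, i -> i.item_name LIKE '%Mattress') AS purchased_mattress
FROM events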

aggregate()
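
Also a stub in the original notes. AGGREGATE() (the function behind the REDUCE() description above) folds an array into a single value using a merge lambda and an optional finishing lambda; a sketch under the same assumption about the items array:

-- Sum item revenue across each row's items array
SELECT
  user_id,
  AGGREGATE(items, CAST(0.0 AS DOUBLE), (acc, i) -> acc + i.item_revenue_in_usd) AS total_item_revenue
FROM events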

05 - Python for Spark SQL

06 - Incremental Data Processing

Auto Loader

Databricks Auto Loader provides an easy-to-use mechanism for incrementally and efficiently processing new data files as they arrive in cloud file storage.

Auto Loader keeps track of discovered files using checkpointing in the checkpoint location. Checkpoints allow Auto Loader to provide exactly-once ingestion guarantees.

The following example function demonstrates using Databricks Auto Loader with the PySpark API; it includes both a Structured Streaming read and write.

def autoload_to_table(data_source, source_format, table_name, checkpoint_directory):
    query = (spark.readStream
                  .format("cloudFiles")
                  .option("cloudFiles.format", source_format)
                  .option("cloudFiles.schemaLocation", checkpoint_directory)
                  .load(data_source)                                              # Auto Loader API
                  .writeStream
                  .option("checkpointLocation", checkpoint_directory)
                  .option("mergeSchema", "true")
                  .table(table_name))
    return query

query = autoload_to_table(data_source = f"{DA.paths.working_dir}/tracker",
                          source_format = "json",
                          table_name = "target_table",
                          checkpoint_directory = f"{DA.paths.checkpoints}/target_table")

The Auto Loader query we configured automatically detects and processes records from the source directory into the target table. There is a slight delay as records are ingested, but an Auto Loader query executing with default streaming configuration should update results in near real time.

Auto Loader vs Delta Live Table

Auto Loader allows incremental data ingestion into Delta Lake from a variety of data sources.

Delta Live Tables, by contrast, is used for defining end-to-end data pipelines by specifying the data source, the transformation logic, and the destination state of the data, instead of manually stitching together siloed data processing jobs.

In effect, Auto Loader is a piece of the DLT pipeline on the data source side, as raw data is incrementally brought into Delta Lake; DLT then picks up from there for the downstream transformation and processing of the data.

Spark Structured Streaming

The magic behind Spark Structured Streaming is that it allows users to interact with ever-growing data sources as if they were just a static table of records.

Figure: Spark Structured Streaming

A data stream describes any data source that grows over time. Structured Streaming lets us define a query against the data source and automatically detect new records, and propagate them through previously defined logic.

Read a Stream

The spark.readStream method returns a DataStreamReader used to configure and query the stream.

(spark.readStream
  .table("bronze")
  .createOrReplaceTempView("streaming_tmp_vw"))

When we execute a query on a streaming temporary view, we’ll continue to update the results of the query as new data arrives in the source. Think of a query executed against a streaming temp view as an always-on incremental query.
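
For example, an ordinary SQL query against the view registered above behaves as an always-on incremental query until the stream is stopped:

SELECT * FROM streaming_tmp_vw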

Write a Stream

To persist the results of a streaming query, we must write them out to durable storage. The DataFrame.writeStream method returns a DataStreamWriter used to configure the output.

When writing to Delta Lake tables, we typically will only need to worry about 3 settings:

  1. Checkpointing.
  • Databricks creates checkpoints by storing the current state of your streaming job to cloud storage.
  • Checkpointing combines with write-ahead logs to allow a terminated stream to be restarted and continue from where it left off.
  • Checkpoints cannot be shared between separate streams. A checkpoint is required for every streaming write to ensure processing guarantees.
  2. Output Modes. There are two output modes.
  • Append. .outputMode("append"). This is the default mode; only newly appended rows are incrementally appended to the target table with each batch.
  • Complete. .outputMode("complete"). The results table is recalculated each time a write is triggered; the target table is overwritten with each batch.
  3. Trigger Intervals. There are four trigger intervals.
  • Unspecified. This is the default, and is equivalent to processingTime=500ms.
  • Fixed interval micro-batches. .trigger(processingTime="2 minutes"). The query will be executed in micro-batches kicked off at the user-specified interval.
  • Triggered micro-batch. .trigger(once=True). The query will execute a single micro-batch to process all the available data and then stop on its own.
  • Triggered micro-batches. .trigger(availableNow=True). The query will execute multiple micro-batches to process all the available data and then stop on its own.

07 - Multi-hop Architecture

Delta Lake allows users to easily combine streaming and batch workloads in a unified multi-hop pipeline. Each stage of the pipeline represents a state of our data valuable to driving core use cases within the business.

Figure: Multi-hop (medallion) architecture

  • Bronze tables contain raw data ingested from various sources (JSON files, RDBMS data, IoT data, to name a few examples).
  • Silver tables provide a more refined view of our data. We can join fields from various bronze tables to enrich streaming records, or update account statuses based on recent activity.
  • Gold tables provide business-level aggregations often used for reporting and dashboarding. This would include aggregations such as daily active site users, weekly sales per store, or gross revenue per quarter by department. (A sketch of the full flow follows.)
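
A minimal SQL sketch of the flow (table and column names are illustrative; in practice the bronze-to-silver and silver-to-gold hops are often implemented as streaming reads and writes):

-- Silver: cleaned and enriched view of the raw bronze data
CREATE OR REPLACE TABLE orders_silver AS
SELECT order_id, customer_id, CAST(order_timestamp AS TIMESTAMP) AS order_ts, total_usd
FROM orders_bronze
WHERE order_id IS NOT NULL;

-- Gold: business-level aggregation for reporting and dashboarding
CREATE OR REPLACE TABLE daily_revenue_gold AS
SELECT DATE(order_ts) AS order_date, SUM(total_usd) AS daily_revenue
FROM orders_silver
GROUP BY DATE(order_ts);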

08 - Delta Live Table

Delta Live Table Pipeline

There are two types of DLT pipelines:

  1. Continuous Pipeline
  2. Triggered Pipeline

Continuous Pipeline

Continuous pipelines update tables continuously as input data changes. Once an update is started, it continues to run until the pipeline is shut down.

In Development mode, the Delta Live Tables system eases the development process by:

  1. Reusing a cluster to avoid the overhead of restarts. The cluster runs for two hours when development mode is enabled.
  2. Disabling pipeline retries so that you can immediately detect and fix errors.

Triggered Pipeline

Triggered pipelines update each table with whatever data is currently available and then shut down.

In Production Mode, the Delta Live Table system:

  1. Terminates the cluster immediately when the pipeline is stopped.
  2. Restarts the cluster for recoverable errors, e.g. memory leak or stale credentials.
  3. Retries execution in case of specific errors, e.g. a failure to start a cluster.

Managing Data Quality with Delta Live Tables

You use expectations to define data quality constraints on the contents of a dataset. Expectations allow you to guarantee data arriving in tables meets data quality requirements and provide insights into data quality for each pipeline update. You apply expectations to queries using Python decorators or SQL constraint clauses.

Retain invalid records

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Drop invalid records

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Fail on invalid records

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

With ON VIOLATION FAIL UPDATE, records that violate the expectation will cause the pipeline to fail. When a pipeline fails because of an expectation violation, you must fix the pipeline code to handle the invalid data correctly before re-running the pipeline.

SQL For Delta Live Table

At its simplest, you can think of DLT SQL as a slight modification to traditional CTAS statements. DLT tables and views are always preceded by the LIVE keyword.
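
A minimal sketch of the pattern (table names are illustrative); note that referencing another dataset defined in the same DLT pipeline requires the LIVE. prefix:

CREATE OR REFRESH LIVE TABLE orders_silver
AS SELECT order_id, customer_id, order_timestamp
FROM LIVE.orders_bronze
WHERE order_id IS NOT NULL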

Auto Loader syntax for DLT

Delta Live Tables provides slightly modified Python syntax for Auto Loader, and adds SQL support for Auto Loader.

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Note

  • Delta Live Tables automatically configures and manages the schema and checkpoint directories when using Auto Loader to read files.
  • If you manually configure either of these directories, performing a full refresh does not affect the contents of the configured directories.
  • Databricks recommends using the automatically configured directories to avoid unexpected side effects during processing.

Delta Live Table Demo

09 - Task Orchestration with Jobs

Notification

Job Alerts

To notify when runs of this job begin, complete, or fail, you can add one or more email addresses, or system destinations (e.g. webhook destination or Slack).

Notifications you set at the job level are not sent when failed tasks are retried. To receive a failure notification after every failed task, including every failed retry, we need to use task notifications instead.

Destination Type

  1. Email
  2. Slack
  3. Webhook
  4. Microsoft Teams
  5. PagerDuty
  6. Amazon Simple Email Service (SES) and Simple Notification Service (SNS)

10 - Running a DBSQL Query

11 - Managing Permissions

Data Explorer

The Data Explorer allows users and admins to

  1. Navigate databases, tables, and views.
  2. Explore data schemas, metadata, and history.
  3. Set and modify permissions on relational entities.

Configuring Permissions

By default, admins will have the ability to view all objects registered to the metastore and will be able to control permissions for other users in the workspace. Users will default to having no permissions on anything registered to the metastore, other than objects that they can create in DBSQL.

Generally, permissions will be set using groups that have been configured by an administrator, often by importing organisational structures via SCIM integration with an identity provider.

Table Access Control Lists (ACLs)

Databricks allows you to configure permissions for the following objects:

  1. CATALOG. controls access to the entire data catalog.
  2. DATABASE. controls access to a database.
  3. TABLE. controls access to managed or external tables.
  4. VIEW. controls access to SQL views.
  5. FUNCTION. controls access to a named function.
  6. ANY FILE. controls access to the underlying filesystem. Users granted access to ANY FILE can bypass the restrictions put on the catalog, databases, tables, and views by reading from the filesystem directly.

Privileges

  1. ALL PRIVILEGES. gives all privileges (is translated into all the below privileges).
  2. SELECT. gives read access to an object.
  3. MODIFY. gives ability to add, delete, and modify data to or from an object.
  4. READ_METADATA. gives ability to view an object and its metadata.
  5. USAGE. does not give any abilities, but is an additional requirement to perform any action on a database object.
  6. CREATE. gives ability to create an object, e.g. a table in a database.
  7. CREATE_NAMED_FUNCTION. gives ability to create a named UDF in an existing catalog or schema.

Admin Configuration

To grant permissions to a certain team (here, a group named hr_team), we can run:

GRANT ALL PRIVILEGES ON TABLE employees TO hr_team
;

To confirm results, we can run

SHOW GRANT ON TABLE employees
;
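
Because USAGE is an additional requirement for any action on a database object, a typical read-only grant on a database looks like the following sketch (the database and group names are illustrative):

GRANT USAGE ON DATABASE hr_db TO hr_team;
GRANT SELECT ON DATABASE hr_db TO hr_team;
GRANT READ_METADATA ON DATABASE hr_db TO hr_team;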

12 - Productionalizing Dashboards and Queries in DBSQL