What’s inside DELTA LAKE

Photo by Franki Chamaki on Unsplash

Delta Lake is a term you have probably heard of, read about in hundreds of blogs, or perhaps even used in your own project.

The intention of this blog is not only to talk about Delta Lake and its concepts, but also to familiarise you with how it works under the hood.

Before we get any deeper, let's set the base.

What is Delta Lake?

Delta Lake is an open format storage layer that delivers reliability, security and performance on your data lake — for both streaming and batch operations. By replacing data silos with a single home for structured, semi-structured and unstructured data, Delta Lake is the foundation of a cost-effective, highly scalable lakehouse.

Source: https://databricks.com/product/delta-lake-on-databricks

Features of Delta Lake

· ACID Compliant

· Schema Enforcement

· Support for streaming data

· Time Travel

· Upsert and Delete compatible

https://delta.io/

Components of Delta Lake

1. Delta Lake Storage Layer

We store the data using Delta Lake and access it using Spark. This approach provides a performance boost as well as consistency of the data.

2. Delta Tables

Parquet files: A Delta table stores the data in the Parquet file format.

Transaction logs: An ordered record of every transaction that has ever been performed on this table.

Upon firing any query on top of the Delta table, Spark checks the transaction log to see which new transactions have been posted to the table, and updates the table with those changes.

This ensures that the user's view of the table is always synchronised with the latest changes.

Metastore: Stores the schema and metadata.

3. Delta Engine

Delta Engine is a high-performance, Apache Spark-compatible query engine that provides an efficient way to process data in data lakes, including data stored in open source Delta Lake. Delta Engine accelerates data lake operations, supporting a variety of workloads ranging from large-scale ETL processing to ad-hoc, interactive queries. Many of these optimisations take place automatically; you get the benefits of these Delta Engine capabilities just by using Databricks for your data lakes.

Source: https://docs.databricks.com/delta/optimizations/index.html

Let's get inside Delta Tables

When we talk about a Delta table, how is it different from a normal table?

A Delta table mainly consists of the two components below.

Parquet files: The file format by which the files/data are stored.

Transaction logs: An ordered record of the transactions that occurred on the Delta table. Consider this the source of truth used by the Delta engine to guarantee atomicity.

Visualising the Delta Table path:

Let’s dig deeper inside the _delta_log folder.

The _delta_log folder contains JSON files which keep track of all the transactions that occurred on the given table.

We will have a demo soon and talk about it in more detail.

Delta Lake time travel

Using Delta Lake time travel, we can refer to previous versions of the data.

A Delta table tracks every transaction, which gives us the flexibility to refer back to any previous snapshot at a given point in time.

Time travel helps in:

· Comparing a previous version of the data with the current one

· Rolling back to a previous snapshot in case of data corruption

DEMO

Let’s create a simple dataframe by using a CSV file.
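Roughly, in PySpark this looks like the following (a sketch, assuming the notebook's built-in spark session; the CSV path is a placeholder, not the exact file used here):

# read the raw CSV into a dataframe, assuming a header row and letting Spark infer the schema
csv_path = "/tmp/salaries.csv"  # hypothetical input path
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(csv_path))

df.show(5)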

Let's convert the data to Delta by specifying the format as “delta”.
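A sketch of that conversion, writing the dataframe out as Delta to a hypothetical delta_path:

# write the dataframe in the Delta format; this becomes version 0 of the table
delta_path = "/tmp/delta/salaries"  # hypothetical output path
df.write.format("delta").mode("overwrite").save(delta_path)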

We can also create the table in the metastore.
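One way to do that (a sketch; the table name salaries is made up) is to register the Delta location, or to use saveAsTable directly:

# register the existing Delta files as a table in the metastore
spark.sql(f"CREATE TABLE IF NOT EXISTS salaries USING DELTA LOCATION '{delta_path}'")

# alternatively, write and register in a single step
# df.write.format("delta").mode("overwrite").saveAsTable("salaries")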

Delta supports partitioning of the data, so let's use Year as our partitioning column.
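A sketch of the partitioned write; overwriteSchema is set here because the partition layout of the existing table changes with this write:

# rewrite the data partitioned by Year; this becomes version 1 of the table
(df.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema", "true")  # allow the partition layout to change on overwrite
   .partitionBy("Year")
   .save(delta_path))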

Let's see what we have under the delta path:
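On Databricks the path can be listed with the notebook utilities (a sketch):

# list everything stored under the Delta path
display(dbutils.fs.ls(delta_path))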

ROW 6,7,8,9: the parquet files which got created when we inserted the data into the table for the first time.

ROW 1,2,3,4: the partition folders which got created when we wrote the data to the same delta path, this time partitioned.

ROW 5: _delta_log, which contains the transaction logs.

Let's dig into the _delta_log:

ROW 5,6: the transaction log files which hold the transactions that occurred against the table.

The four columns each represent a different part of the very first commit to the Delta table, which created the table.

  • add: contains information about the individual data files added to the table (their path, partition values, size and stats).
  • commitInfo: contains details about the operation performed (e.g. WRITE), plus the timestamp and the user.
  • metaData: contains the schema information.
  • protocol: contains the Delta protocol version (the minimum reader and writer versions).
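You can also inspect a commit file directly with Spark; a sketch, reusing the hypothetical delta_path from above:

# read the very first commit (version 0) of the transaction log as JSON
log_v0 = spark.read.json(f"{delta_path}/_delta_log/00000000000000000000.json")

# each action type (add, commitInfo, metaData, protocol) appears as a column
log_v0.show(truncate=False)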

So far we have applied two writes:

  1. Initial write reading from the CSV > version 0
  2. Partitioned write on the Year column > version 1

We can look at the version 1 file and confirm that it captured the change.

The partitionValues field records the partition column and its value for each file we have written.

You can confirm the same in the below screenshot.
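A sketch of the same check, reading the version 1 commit file and pulling out the partition information:

# read the second commit (version 1) of the transaction log
log_v1 = spark.read.json(f"{delta_path}/_delta_log/00000000000000000001.json")

# the add entries record the partitionValues (Year) of each newly written data file
log_v1.where("add is not null").select("add.path", "add.partitionValues").show(truncate=False)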

Let's do our third write (which becomes version 2): put a filter on the same data and write it back to the same delta path.
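A sketch of that third write; the filter condition on BasePay is made up, and the Year partitioning is kept so the layout matches the existing table:

# overwrite the table with a filtered subset; this becomes version 2
filtered_df = df.filter("BasePay > 100000")  # hypothetical filter condition

(filtered_df.write
   .format("delta")
   .mode("overwrite")
   .partitionBy("Year")  # assumed: keep the same partition layout as before
   .save(delta_path))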

So even after the overwrite, the data files belonging to the previous commits still exist on disk.

Access previous versions of table using Time Travel

Until now we have performed 3 writes:

  1. Read the initial CSV file and wrote it to the delta path
  2. Wrote the data partitioned on Year
  3. Applied a filter on the BasePay column and wrote the result back to the delta path

So as per our 3 writes, we have 3 different versions (0, 1 and 2) available in the table.
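One way to see every available version is the table history (a sketch):

# show the commit history of the table: one row per version (0, 1 and 2)
spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").show(truncate=False)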

We can travel back to a specific time or version.

To access a specific version, we can pass “versionAsOf” with the version number.

To travel back to a specific point in time, we can pass “timestampAsOf” with the timestamp value.
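A sketch of both options (the timestamp value is made up):

# read a specific version of the table
df_v0 = (spark.read
         .format("delta")
         .option("versionAsOf", 0)
         .load(delta_path))

# read the table as it was at a specific point in time
df_ts = (spark.read
         .format("delta")
         .option("timestampAsOf", "2021-01-01 00:00:00")  # hypothetical timestamp
         .load(delta_path))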

Vacuum

VACUUM is the command which can be used to clean up our directory and delete previous versions of the data.

We get an error when we try to delete recent versions with a retention period shorter than the default. This is by design, as a fail-safe against accidental deletes.

We can bypass the default retention period check by setting the “spark.databricks.delta.retentionDurationCheck.enabled” to false.
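A sketch of the bypass, followed by a VACUUM that keeps only the files referenced by the current version:

# the retention check blocks any VACUUM shorter than the 7-day default, so disable it first
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# remove every data file that is no longer referenced by the current version
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN 0 HOURS")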

And to confirm that version 0 is deleted, let's try running the read command for version 0.
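A sketch of that check; the read should now fail because the underlying parquet files of version 0 have been vacuumed away:

# time travelling to version 0 should raise a missing-file error
try:
    spark.read.format("delta").option("versionAsOf", 0).load(delta_path).count()
except Exception as e:
    print("version 0 is no longer readable:", e)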

Now, since we have deleted version 0, we should still see version 1, which is the partitioned write.

Recursively vacuums directories associated with the Spark table and removes uncommitted files older than a retention threshold. The default threshold is 7 days. Databricks automatically triggers VACUUM operations as data is written.

VACUUM [ table_identifier | path] [RETAIN num HOURS]

To clean up uncommitted files left over from Spark jobs, use the VACUUM command to remove them. Normally VACUUM happens automatically after Spark jobs complete, but you can also run it manually if a job is aborted.

For example, VACUUM ... RETAIN 1 HOUR removes uncommitted files older than one hour.

# recursively vacuum an output path (add RETAIN <N> HOURS to override the default retention)
spark.sql("VACUUM '/path/to/output/directory'")

# vacuum all partitions of a catalog table
spark.sql("VACUUM tableName")

You may find the above code in my GitHub repo:

Bigdata Engineer — Love for BigData, Analytics, Cloud and Infrastructure. Want to talk more? Ping me on LinkedIn: https://www.linkedin.com/in/ajshetty28/