Predicate Pushdown is for real??

Photo by Kevin Butz on Unsplash

Spark is used as an ingestion tool in more and more companies. It is a capable replacement for commercial ingestion tools like Talend or Informatica.

Spark can connect to many different source systems: relational databases like SQL Server and Oracle, or NoSQL databases like Cassandra and MongoDB.

Each of these source systems has its own optimisation capabilities that come into play when we query it.

The source system takes the user's query as input, tries to optimise it, and returns only the required data.

Spark's predicate pushdown uses the source system's built-in optimisation techniques to filter out unwanted data before it is read into memory.

Pushing predicates down to the database allows for better-optimised Spark queries.

Basically, Spark takes the where clause in the query and pushes it to the source to filter the data there. Instead of reading the whole dataset, we ask the source to filter the data based on the where clause first and return only the final dataset.

A pushed-down predicate filters the data within the database query itself, reducing the number of rows retrieved from the database and improving query performance.

So yes, predicate pushdown is for real!!!

We expect Spark to be smart enough to do all of this for us automatically. But as data engineers, we are supposed to validate whether it has actually come into effect.

Below is a small exercise showing how to verify whether predicate pushdown has been taken into account or not.

Before that, I encourage you to get familiar with the plans (logical, optimised logical, physical) which Spark generates for a query.

http://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html

The "PushedFilters" section should exist in the DataFrame's physical plan, and the presence of the right literals in "PushedFilters" is what proves that predicate pushdown has been effective.
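As a quick refresher, you can print these plans for any DataFrame df you have read; a minimal sketch:

```python
# Physical plan only: look for the "PushedFilters" section
df.explain()

# All plans: parsed logical, analysed logical, optimised logical, physical
df.explain(extended=True)
```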

For this exercise we will set up a SQL Server instance on Azure and connect to it using JDBC.

Having the where clause inside the dbtable option does not use pushdown predicates.
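Here is a minimal PySpark sketch of that setup. The connection details (URL, user, password) are placeholders and the subquery alias t is ours; only the table and column names come from the plans in this post:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pushdown-test").getOrCreate()

# Placeholder connection details; replace with your own Azure SQL settings
base_options = {
    "url": "jdbc:sqlserver://myserver.database.windows.net:1433;databaseName=mydb",
    "user": "myuser",
    "password": "mypassword",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

# The filter is buried inside the subquery passed as dbtable, so Spark
# sees one opaque relation and has nothing of its own to push down
subquery = """(select * from AZURE_SQL_TEMP.MY_TEST_TABLE
               where LAST_UPDATE_DATE >= '2021-06-05'
                 and LAST_UPDATE_DATE <= '2021-06-05') t"""

df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", subquery)
      .load())

df.explain()
```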

== Physical Plan == Scan JDBCRelation(((select * from AZURE_SQL_TEMP.MY_TEST_TABLE where LAST_UPDATE_DATE >= TO_DATE('2021-06-05 00:00:00','yyyy-MM-dd HH24:mi:ss') and LAST_UPDATE_DATE <= TO_DATE('2021-06-05 00:00:00','yyyy-MM-dd HH24:mi:ss'))) SPARK_GEN_SUBQ_20) [numPartitions=1] [EMPLOYEE_TYPE#474728,COLUMN1#873701,COLUMN2#47429,… 2 more fields] PushedFilters: [], ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_…

Here we can see that numPartitions is 1: by default, numPartitions is set to 1.

But PushedFilters is EMPTY.

Let's apply the where clause on the DataFrame instead.
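A sketch of the same read, reusing spark and base_options from above; this time dbtable is just the table name and the condition goes into where() on the DataFrame (the EMPLOYEE_TYPE value is taken from the plan below):

```python
from pyspark.sql import functions as F

df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", "AZURE_SQL_TEMP.MY_TEST_TABLE")
      .load()
      .where(F.col("EMPLOYEE_TYPE").contains("Standard")))

df.explain()
```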

== Physical Plan == Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_21) [numPartitions=1] [COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,… 2 more fields] PushedFilters: [*IsNotNull(EMPLOYEE_TYPE), *StringContains(EMPLOYEE_TYPE,Standard)], ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_…

So this time the where clause was taken into the predicate pushdown.

So does the filter option also work like where?
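The same sketch with filter() in place of where():

```python
df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", "AZURE_SQL_TEMP.MY_TEST_TABLE")
      .load()
      .filter(F.col("EMPLOYEE_TYPE").contains("Standard")))

df.explain()
```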

== Physical Plan == Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_21) [numPartitions=1] [COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,… 2 more fields] PushedFilters: [*IsNotNull(EMPLOYEE_TYPE), *StringContains(EMPLOYEE_TYPE, Standard)], ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_…

Even filter works just like the where clause. Why so?

Under the hood, where is simply an alias for filter: in Spark's Dataset API, where(condition) just calls filter(condition).

So this shows that we should use where or filter for predicate pushdown to kick in.

Let's try with dates.
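A sketch that compares the timestamp column directly against plain date strings, which is what produces the plan below:

```python
df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", "AZURE_SQL_TEMP.MY_TEST_TABLE")
      .load()
      .where((F.col("LAST_UPDATE_DATE") >= "2021-06-09") &
             (F.col("LAST_UPDATE_DATE") <= "2021-06-10")))

df.explain()
```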

Filter ((cast(LAST_UPDATE_DATE#4696382 as string) >= 2021-06-09) && (cast(LAST_UPDATE_DATE#4696383 as string) <= 2021-06-10)) +- Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_26) [numPartitions=1] [COLUMN1#4696384,COLUMN2#4696385,COLUMN3#4696386,… 2 more fields] PushedFilters: [*IsNotNull(LAST_UPDATE_DATE)], ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_…

The pushed filters contain only IsNotNull. We also need the greater-than-or-equal and less-than-or-equal predicates on the date.

That is because we compared the timestamp column against plain strings, so Spark casts LAST_UPDATE_DATE to string in the filter, and a cast expression cannot be pushed down. We need to compare against proper TIMESTAMP values instead.
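A sketch of the fix, wrapping the literals in to_timestamp so the comparison stays a timestamp comparison and can be pushed down:

```python
start = F.to_timestamp(F.lit("2021-06-09 00:00:00"), "yyyy-MM-dd HH:mm:ss")
end = F.to_timestamp(F.lit("2021-06-10 00:00:00"), "yyyy-MM-dd HH:mm:ss")

df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", "AZURE_SQL_TEMP.MY_TEST_TABLE")
      .load()
      .where((F.col("LAST_UPDATE_DATE") >= start) &
             (F.col("LAST_UPDATE_DATE") <= end)))

df.explain()
```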

== Physical Plan == Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_25) [numPartitions=1] [COLUMN1#4696382,COLUMN2#4696384,COLUMN3#4696384,… 2 more fields] PushedFilters: [*IsNotNull(LAST_UPDATE_DATE), *GreaterThanOrEqual(LAST_UPDATE_DATE,2021-06-09 00:00:00.0), *Less…, ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_…

Great, so it worked!! We have explicitly converted the string to a TIMESTAMP, so the predicate can be pushed down to SQL Server.

But my numPartitions is still 1.

That is because, for numPartitions to come into effect, we need to pass the partitionColumn.

Ideally, the partition column will be a monotonically increasing primary key.

Let's try.
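A sketch passing partitionColumn (a hypothetical numeric primary key PKEY, since the real column name is truncated in the plans) together with numPartitions, but without the bounds; load() then raises the exception below:

```python
df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", "AZURE_SQL_TEMP.MY_TEST_TABLE")
      .option("partitionColumn", "PKEY")  # hypothetical primary key column
      .option("numPartitions", 10)
      # lowerBound and upperBound are missing, which triggers the error
      .load())
```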

java.lang.IllegalArgumentException: requirement failed: When reading JDBC data sources, users need to specify all or none for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and 'numPartitions'

Let's alter it a little and pass all four options together.
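A sketch passing all four options plus the timestamp filter from earlier; the bounds for the key range are assumptions:

```python
df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", "AZURE_SQL_TEMP.MY_TEST_TABLE")
      .option("partitionColumn", "PKEY")  # hypothetical primary key column
      .option("lowerBound", 1)            # assumed key range
      .option("upperBound", 1000000)
      .option("numPartitions", 10)
      .load()
      .where((F.col("LAST_UPDATE_DATE") >=
              F.to_timestamp(F.lit("2021-06-09 00:00:00"), "yyyy-MM-dd HH:mm:ss")) &
             (F.col("LAST_UPDATE_DATE") <=
              F.to_timestamp(F.lit("2021-06-10 00:00:00"), "yyyy-MM-dd HH:mm:ss"))))

df.explain()
```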

== Physical Plan == Scan JDBCRelation(AZURE_SQL_TEMP.MY_TEST_TABLE) [numPartitions=10] [COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,… 2 more fields] PushedFilters: [*IsNotNull(LAST_UPDATE_DATE), *GreaterThanOrEqual(LAST_UPDATE_DATE,2021-06-09 00:00:00.0), *Less…, ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY

So it worked: both partitioning and pushed filters. But what happens if we pass only the partition options, lowerBound and upperBound, without any where clause?
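A sketch of that, keeping the partition options and dropping the where():

```python
df = (spark.read.format("jdbc")
      .options(**base_options)
      .option("dbtable", "AZURE_SQL_TEMP.MY_TEST_TABLE")
      .option("partitionColumn", "PKEY")  # hypothetical primary key column
      .option("lowerBound", 1)            # assumed key range
      .option("upperBound", 1000000)
      .option("numPartitions", 10)
      .load())

df.explain()
```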

== Physical Plan == Scan JDBCRelation(AZURE_SQL_TEMP.MY_TEST_TABLE) [numPartitions=10] [COLUMN1#877973,COLUMN2#4696382,COLUMN3#4696383,… 2 more fields] PushedFilters: [], ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_…

Having lowerBound and upperBound alone will not trigger predicate pushdown; we still need to pass the filter explicitly. Adding the where() back on top of the partitioned read, exactly as in the combined example above, gives us both:

== Physical Plan == Scan JDBCRelation(AZURE_SQL_TEMP.MY_TEST_TABLE) [numPartitions=10] [COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,… 2 more fields] PushedFilters: [*IsNotNull(LAST_UPDATE_DATE), *GreaterThanOrEqual(LAST_UPDATE_DATE,2021-06-09 00:00:00.0), *Less…, ReadSchema: struct<COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_…

So here we can confirm that predicate pushdown has been used in the plan and applied on the LAST_UPDATE_DATE column.

Please post your feedback in the comments.

Ajith Shetty

BigData Engineer — Love for Bigdata, Analytics, Cloud and Infrastructure.

Subscribe✉️ ||More blogs📝||Linked In📊||Profile Page📚||Git Repo👓

Subscribe to my: Weekly Newsletter Just Enough Data
