Predicate Pushdown is for real??


Spark is used as an ingestion tool in more and more companies, and it can replace commercial ETL applications like Talend or Informatica.

Spark can connect to many different source systems: standard relational databases like SQL Server and Oracle, or NoSQL databases like Cassandra, MongoDB etc.

Each source system has its own level of optimisation available when we query it: it takes the user's query as input, tries to optimise it, and returns only the required data.

Spark predicate pushdown uses the source system's built-in optimisation techniques to filter out the unwanted data before it is read into memory.

Pushing predicates down to the database allows for better-optimised Spark queries.

Basically, Spark takes the where clause in the query and pushes it to the source to filter the data. Instead of reading the whole dataset, we ask the source to apply the where clause first and return only the final dataset.

A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance.

So yes, predicate pushdown is for real!!!

We expect Spark to be smart enough to do all of this for us. But as data engineers, we should validate whether it has actually taken effect.

Below is a small exercise showing how to verify whether predicate pushdown has been applied.

Before that, I encourage you to get familiar with the plans (logical, optimised logical, physical) that Spark generates for a query.

http://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html

The "PushedFilters" entry should appear in the DataFrame's physical plan, and the presence of the right literals in "PushedFilters" proves that predicate pushdown has been effective.
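This check can also be done programmatically. A minimal sketch (not from the article), assuming `df` is a DataFrame read over JDBC:

```scala
// Sketch: inspect the physical plan Spark will actually execute and
// assert that its JDBC scan reports non-empty pushed filters.
val plan = df.queryExecution.executedPlan.toString
assert(plan.contains("PushedFilters: ["), "no PushedFilters entry in the plan")
assert(!plan.contains("PushedFilters: []"), "PushedFilters is empty: nothing was pushed down")
```

This is handy in automated tests, so a refactor that silently breaks pushdown gets caught.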

For this exercise we will install SQL Server on Azure and connect to it using JDBC.

https://docs.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver15

val dbtable = """(select * from AZURE_SQL_TEMP.MY_TEST_TABLE
  where LAST_UPDATE_DATE >= TO_DATE('2021-06-05 00:00:00','yyyy-MM-dd HH24:mi:ss')
    and LAST_UPDATE_DATE <= TO_DATE('2021-06-05 00:00:00','yyyy-MM-dd HH24:mi:ss'))"""

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("query", dbtable)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("numPartitions", 100)
  .load()

myDF.explain(true)

Having the where clause embedded in the dbtable query does not register as pushed-down predicates.

== Physical Plan ==
Scan JDBCRelation(((select * from AZURE_SQL_TEMP.MY_TEST_TABLE where LAST_UPDATE_DATE >= TO_DATE('2021-06-05 00:00:00','yyyy-MM-dd HH24:mi:ss') and LAST_UPDATE_DATE <= TO_DATE('2021-06-05 00:00:00','yyyy-MM-dd HH24:mi:ss'))) SPARK_GEN_SUBQ_20) [numPartitions=1]
[EMPLOYEE_TYPE#474728,COLUMN1#873701,COLUMN2#47429,... 2 more fields]
PushedFilters: [],
ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_...

Here we can see that numPartitions is 1: by default, Spark reads over JDBC with a single partition unless the partitioning options are supplied (more on this later). More importantly, PushedFilters is empty.

Let's drop the where clause from the query and apply it on the DataFrame instead.

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("driver", driverClass)
  .option("query", dbtable)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("numPartitions", 100)
  .load()
  .where("""EMPLOYEE_TYPE like "%Standard%"""")

myDF.explain(true)

== Physical Plan ==
Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_21) [numPartitions=1]
[COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,... 2 more fields]
PushedFilters: [*IsNotNull(EMPLOYEE_TYPE), *StringContains(EMPLOYEE_TYPE,Standard)],
ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_...

So the where clause was picked up for predicate pushdown.

So, does filter also work like where?

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("driver", driverClass)
  .option("query", dbtable)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("numPartitions", 100)
  .load()
  .filter("""EMPLOYEE_TYPE like "%Standard%"""")

myDF.explain(true)

== Physical Plan ==
Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_21) [numPartitions=1]
[COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,... 2 more fields]
PushedFilters: [*IsNotNull(EMPLOYEE_TYPE), *StringContains(EMPLOYEE_TYPE, Standard)],
ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_...

filter behaves exactly like where. Why? Because under the hood, where is simply an alias for filter.
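You can see this in Spark's own source. From Dataset.scala (simplified), both overloads of where just forward to filter:

```scala
// Simplified from Spark's Dataset.scala: `where` is a thin alias of `filter`,
// so the two are fully interchangeable.
def where(condition: Column): Dataset[T] = filter(condition)
def where(conditionExpr: String): Dataset[T] = filter(conditionExpr)
```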

So this proves that to trigger predicate pushdown, we should apply the condition with where or filter rather than embedding it in the query string.

Let's try with dates.

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("driver", driverClass)
  .option("query", dbtable)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("numPartitions", 100)
  .load()
  .where("""LAST_UPDATE_DATE >= "2021-06-09" """)
  .where("""LAST_UPDATE_DATE <= "2021-06-10" """)

myDF.explain(true)

== Physical Plan ==
Filter ((cast(LAST_UPDATE_DATE#4696382 as string) >= 2021-06-09) && (cast(LAST_UPDATE_DATE#4696383 as string) <= 2021-06-10))
+- Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_26) [numPartitions=1]
   [COLUMN1#4696384,COLUMN2#4696385,COLUMN3#4696386,... 2 more fields]
   PushedFilters: [*IsNotNull(LAST_UPDATE_DATE)],
   ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_...

The pushed filters contain only IsNotNull. We wanted the greater-than-or-equal and less-than-or-equal conditions pushed down as well.

Looking at the Filter node, Spark has cast LAST_UPDATE_DATE to string in order to compare it against the string literals, and a comparison wrapped in a cast cannot be pushed down to the source.

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("driver", driverClass)
  .option("query", dbtable)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("numPartitions", 100)
  .load()
  .where("""LAST_UPDATE_DATE >= cast("2021-06-09" as TIMESTAMP)""")
  .where("""LAST_UPDATE_DATE <= cast("2021-06-10" as TIMESTAMP)""")

myDF.explain(true)

== Physical Plan ==
Scan JDBCRelation((AZURE_SQL_TEMP.MY_TEST_TABLE) SPARK_GEN_SUBQ_25) [numPartitions=1]
[COLUMN1#4696382,COLUMN2#4696384,COLUMN3#4696384,... 2 more fields]
PushedFilters: [*IsNotNull(LAST_UPDATE_DATE), *GreaterThanOrEqual(LAST_UPDATE_DATE,2021-06-09 00:00:00.0), *Less...,
ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_...

Great, so it worked!! We explicitly cast the string to TIMESTAMP so that the comparison could be pushed down to SQL Server.
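The same thing can be written with the typed Column API instead of casting inside a SQL string. A sketch (my variation, not from the article), using functions from org.apache.spark.sql.functions:

```scala
import org.apache.spark.sql.functions.{col, lit, to_timestamp}

// Sketch: compare against a proper timestamp value rather than a string,
// so Catalyst should keep the GreaterThanOrEqual / LessThanOrEqual
// filters in a pushable form instead of wrapping the column in a cast.
val filtered = myDF
  .where(col("LAST_UPDATE_DATE") >= to_timestamp(lit("2021-06-09")))
  .where(col("LAST_UPDATE_DATE") <= to_timestamp(lit("2021-06-10")))
```

As always, verify with explain(true) that PushedFilters contains the range conditions.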

But numPartitions is still 1.

That is because, for numPartitions to take effect, we also need to pass the partitionColumn.

Ideally the partition column is the primary key, or another evenly increasing column.

Let's try.

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("driver", driverClass)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("dbtable", dbtable)
  .option("partitionColumn", "LAST_UPDATE_DATE")
  .option("numPartitions", 10)
  .load()
  .where("""LAST_UPDATE_DATE >= cast("2021-06-09" as TIMESTAMP)""")
  .where("""LAST_UPDATE_DATE <= cast("2021-06-10" as TIMESTAMP)""")

myDF.explain(true)

java.lang.IllegalArgumentException: requirement failed: When reading JDBC data sources, users need to specify all or none for the following options: ‘partitionColumn’, ‘lowerBound’, ‘upperBound’, and ‘numPartitions’

Let's alter it a little.

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", sqlServerUrl)
  .option("driver", driverClass)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("dbtable", dbtable)
  .option("partitionColumn", "LAST_UPDATE_DATE")
  .option("lowerBound", "2021-06-05 00:00:00.00")
  .option("upperBound", "2021-06-06 00:00:00.00")
  .option("numPartitions", 10)
  .load()
  .where("""LAST_UPDATE_DATE >= cast("2021-06-09" as TIMESTAMP)""")
  .where("""LAST_UPDATE_DATE <= cast("2021-06-10" as TIMESTAMP)""")

myDF.explain(true)

== Physical Plan ==
Scan JDBCRelation(AZURE_SQL_TEMP.MY_TEST_TABLE) [numPartitions=10]
[COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,... 2 more fields]
PushedFilters: [*IsNotNull(LAST_UPDATE_DATE), *GreaterThanOrEqual(LAST_UPDATE_DATE,2021-06-09 00:00:00.0), *Less...,
ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY

So it worked: both the partitioning and the pushed filters.

What if we keep the partitioning options but drop the where clause?

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", url)
  .option("driver", driverClass)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("dbtable", dbtable)
  .option("partitionColumn", "LAST_UPDATE_DATE")
  .option("lowerBound", "2021-06-05 00:00:00.00")
  .option("upperBound", "2021-06-06 00:00:00.00")
  .option("numPartitions", 10)
  .load()

myDF.explain(true)

== Physical Plan ==
Scan JDBCRelation(AZURE_SQL_TEMP.MY_TEST_TABLE) [numPartitions=10]
[COLUMN1#877973,COLUMN2#4696382,COLUMN3#4696383,... 2 more fields]
PushedFilters: [],
ReadSchema: struct<EMPLOYEE_TYPE:string,COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_...

Having lowerBound and upperBound alone does not trigger predicate pushdown: they only decide how the partition column range is split into partitions, they do not filter rows, so we still need to pass the filter explicitly.
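To make that concrete, here is a rough sketch of the stride logic. The helper below is hypothetical (Spark's real implementation lives in JDBCRelation.columnPartition), but it illustrates the shape of the per-partition WHERE clauses Spark generates:

```scala
// Hypothetical sketch of how [lowerBound, upperBound] is split into
// numPartitions strides. The first and last partitions are open-ended,
// so rows OUTSIDE the bounds are still read: the bounds partition the
// data, they do not filter it.
def strideClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  val stride = (upper - lower) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi or $column is null"
    else if (i == numPartitions - 1) s"$column >= $lo"
    else s"$column >= $lo and $column < $hi"
  }
}

// Example: splitting ids 0..100 into 4 strides.
strideClauses("id", 0L, 100L, 4).foreach(println)
```

Each clause becomes the WHERE condition of one concurrent JDBC query, which is why a skewed partition column leads to skewed partitions.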

Adding the where clause back:

val myDF = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", sqlServerUrl)
  .option("driver", driverClass)
  .option("user", sqlServerUser)
  .option("password", sqlServerPassword)
  .option("dbtable", dbtable)
  .option("partitionColumn", "LAST_UPDATE_DATE")
  .option("lowerBound", "2021-06-05 00:00:00.00")
  .option("upperBound", "2021-06-06 00:00:00.00")
  .option("numPartitions", 10)
  .load()
  .where("""LAST_UPDATE_DATE >= cast("2021-06-09" as TIMESTAMP)""")
  .where("""LAST_UPDATE_DATE <= cast("2021-06-10" as TIMESTAMP)""")

myDF.explain(true)

== Physical Plan ==
Scan JDBCRelation(AZURE_SQL_TEMP.MY_TEST_TABLE) [numPartitions=10]
[COLUMN1#4696382,COLUMN2#4696383,COLUMN3#4696384,... 2 more fields]
PushedFilters: [*IsNotNull(LAST_UPDATE_DATE), *GreaterThanOrEqual(LAST_UPDATE_DATE,2021-06-09 00:00:00.0), *Less...,
ReadSchema: struct<COLUMN1:string,COLUMN2:string,COLUMN3:decimal(25,11),PKEY_...

So here we can confirm that predicate pushdown has made it into the plan and is applied on the LAST_UPDATE_DATE column.

Please post your feedback in the comments.

Ajith Shetty

BigData Engineer — Love for Bigdata, Analytics, Cloud and Infrastructure.


Subscribe to my weekly newsletter: Just Enough Data.

Want to talk more? Ping me on LinkedIn: https://www.linkedin.com/in/ajshetty28/