Lesser Known Facts/Short cuts in Spark(PART1)

Photo by Jakub Skafiriak on Unsplash

Being a data engineer we are challenged everyday with not so usual cases to solve.

We cannot apply the same thought process to all the places. We need to think of the effort it takes and how we can reduce the time.

But at the same time, we are supposed to know the internals before we jump in and come to a conclusion.

In this Blog, I shall be talking about few tips and tricks and some lesser known facts in Spark which will come handy for most of our Data Engineer fellows.

1. count() always trigger an evaluation of each row?

So what if I run .count on the dataframe? it should cache/materialise the data right?

Not really, Count is different on DataFrames and Datasets when compared to RDDs.

In RDD, we do not have the catalyst optimiser On RDDs, so it will always evaluates everything.

But on the other hand, DataFrame/Dataset have become smarter by using the catalyst, it turns into the equivalent of “select count(*) from …” in SQL, which can be done without scanning the data for some data formats (e.g. Parquet).

But note, On the other hand though, caching a DataFrame / Dataset does require everything to be cached.

Need to know more?

https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html

2. Shortcut to find the empty columns in the dataframe?

df.select(df.columns.map(c => sum(col(c).isNull.cast(“int”)).alias(c)): _*).show()

3. What is metastore timeout? why should I even care?

Lets take one example where we use spark for ingesting the data into the ADLSGen2/S3 where we use API or HTTPS Calls for eg. SFDC using sprimgml library.

https://github.com/springml/spark-salesforce

Springml spark-salesforce library will help you to connect to SFDC and fetches the data.

It makes a HTTPS Poll unlike other JDBC calls.

The throughput varies based on the number of columns we are trying to access in every https call.

When we are trying to access a very huge object which has got millions of records, Spark will spend a lot of time in querying SFDC.

And by the time spark completes the GET request, the hive metastores times out.

Failed to acquire connection to jdbc:mysql://<>/central_metastore?useSSL=true&requireSSL=false. Sleeping for 7000 ms. Attempts left: 2

In this case we need to tune below parameters.

spark.hadoop.hive.server2.session.check.interval(default 60000ms)spark.hadoop.hive.server2.idle.operation.timeout(default 7200000ms)spark.hadoop.hive.server2.idle.session.timeout(default 900000ms)spark.hadoop.hive.server2.idle.operation.interval

The check interval for session/operation timeout, in milliseconds, which can be disabled by setting to zero or a negative value.
For example, the value of “60000” refers to 1 minute and indicates that the session will be checked every 1 minute.

b. spark.hadoop.hive.server2.idle.operation.timeout

Operation will be closed when not accessed for this duration of time, in milliseconds; disable by setting to zero.

For a positive value, checked for operations in terminal state only (FINISHED, CANCELED, CLOSED, ERROR). For a negative value, checked for all of the operations regardless of state.
For example, the value of “7200000ms” indicate that the query/operation will be timed out after 2 hours if it is still running.

c. spark.hadoop.hive.server2.idle.session.timeout

Session will be shut down if its not accessed for the given duration of time, in milliseconds.

We can disable the setting to zero or a negative value.
Value of “900000” indicate that the session will be timed out after 15mins of of inactivity.

4. What should I use, Savemode.Overwrite or mode(“overwrite”)??

Is there a difference between them?

No, “overwrite” is a shorthand command for Savemode.overwrite

Look at the below source code.

/**
* Specifies the behavior when data or table already exists. Options include:
* — `overwrite`: overwrite the existing data.
* — `append`: append the data.
* — `ignore`: ignore the operation (i.e. no-op).
* — `error` or `errorifexists`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: String): DataFrameWriter[T] = {
this.mode = saveMode.toLowerCase(Locale.ROOT) match {
case “overwrite” => SaveMode.Overwrite
case “append” => SaveMode.Append
case “ignore” => SaveMode.Ignore
case “error” | “errorifexists” | “default” => SaveMode.ErrorIfExists
case _ => throw new IllegalArgumentException(s”Unknown save mode: $saveMode. ” +
“Accepted save modes are ‘overwrite’, ‘append’, ‘ignore’, ‘error’, ‘errorifexists’.”)
}
this
}

5. Spark Distinct VS DropDuplicates

In the Distinct function we need to pass the columns in select clause prior to calling the function.

Where as in dropDuplicates(colNames)will return all the columns in the dataframe after removing the duplicates for the given columns.

From javadoc, there is no difference between distinc() and dropDuplicates().

dropDuplicatespublic DataFrame dropDuplicates()Returns a new DataFrame that contains only the unique rows from this DataFrame. This is an alias for distinct.

dropDuplicates() was introduced in 1.4 as a replacement for distinct(), as you can use it’s overloaded methods to get unique rows based on subset of columns.

Will be continued…………………..

PART 2 is already published here.

https://link.medium.com/lExxlOuM9hb

Connect with me:

https://www.linkedin.com/in/ajshetty28

BigData Engineer — Love for BigData, Analytics, Cloud and Infrastructure. Want to talk more? Ping me in Linked In: https://www.linkedin.com/in/ajshetty28/