Day 05 - Demystify Spark UI

Manager: What happened today? Why is it taking so long for the job to complete?

Data Engineer: Give me a sec. I'll check the Spark UI to find out.

...

...

Three hours later

Manager: Did you find the root cause?

Data Engineer: Not yet! I've just resubmitted the job. 😛

Manager: 🤔

If you want a better explanation of the underlying problem, learn how to demystify the Spark UI and turn what it shows into actionable knowledge.

Application Summary and Jobs View

When you land on the Spark UI for a specific application, say app-20220203175006-0012, you can view the Event Timeline and the Completed/Active Jobs, as shown below.

[Image: Spark Jobs View]

  • User : The username of the user who submitted the application

  • Total Uptime : Total runtime of the application

  • Scheduling Mode : From the official docs - By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.

    Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish. This mode is best for multi-user settings.

    To enable the fair scheduler, simply set the spark.scheduler.mode property to FAIR when configuring a SparkContext, as shown below:

```scala
package com.gigahex.samples

import org.apache.spark.sql.SparkSession

object HelloWord {

  def main(args: Array[String]): Unit = {
    /**
      * Setup the spark session
      */
    val spark = SparkSession.builder()
      .master("local")
      .appName("count-words-scala")
      .config("spark.scheduler.mode", "FAIR") // enable the FAIR scheduler
      .getOrCreate()

    spark.stop()
  }
}
```
  • Event Timeline : Every application requires executors for running its jobs, and the driver is the first executor created; it acts as the interface between the client and the other executors that do the actual processing. The event timeline clearly shows that the first executor is the driver, while the other executors have numeric IDs.

    Below the executors timeline, you can see the jobs that have been scheduled. As mentioned previously, every job has many stages and every stage has many tasks. For the program that we've executed, only one task was enough to complete the job.
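To reproduce such a single-task job yourself, here is a minimal sketch. It is not the exact word-count program from the earlier days; the object name, app name and dataset below are made up, but keeping the input in a single partition is what limits the job to one stage with one task.

```scala
package com.gigahex.samples

import org.apache.spark.sql.SparkSession

object SingleTaskJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("single-task-job")
      .getOrCreate()

    // A tiny dataset kept in a single partition, so the count action below
    // is scheduled as one job with one stage and a single task.
    val words = spark.sparkContext.parallelize(Seq("spark", "ui", "demystified"), numSlices = 1)
    println(s"Number of words: ${words.count()}")

    spark.stop()
  }
}
```

Running it and opening the Jobs view should show a single completed job whose one stage contains exactly one task.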

Job Details View

Clicking on an individual Job ID displays the same event timeline, along with the DAG Visualization and the list of Stages.

[Image: Spark Stage List]

  • Every program in Spark is converted into a Directed Acyclic Graph (DAG), which you can think of as a blueprint of a job, optimized to reduce the amount of data read and shuffled across the network. In scenarios where data needs to be shuffled across the network, there will be multiple stages for a given job. This happens, for example, when you're joining two different datasets using a common column, as illustrated in the sketch after this list. We will see more such examples in the coming days.

    Whenever there is a shuffle, you can see the metrics regarding Shuffle Read and Shuffle Write.

  • The Stages list shows all the stages that were part of the job. Every stage consists of tasks, and each task is allocated 1 core by default and processes 1 partition.

    A partition is a subset of the entire dataset that is processed by a single task. The larger the partition, the slower the processing; if the partition is too small, then proportionally more time is spent on task serialization and de-serialization.

    tip

    If your job is running slowly, check the Shuffle Read and Shuffle Write sizes. Reducing the size of each partition spreads the shuffle across more, smaller tasks, so each task reads and writes less shuffle data.
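To see a job with more than one stage and non-zero Shuffle Read/Write metrics, the rough sketch below joins two tiny DataFrames on a common id column. The data and names are made up, and auto broadcast joins are disabled only so that the shuffle actually happens for such small inputs; treat it as an illustration rather than the exact example we'll use later.

```scala
package com.gigahex.samples

import org.apache.spark.sql.SparkSession

object JoinStagesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("join-stages-sketch")
      // Disable auto broadcast joins so that even this tiny join
      // performs a shuffle and splits the job into multiple stages.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    import spark.implicits._

    val users  = Seq((1, "alice"), (2, "bob"), (3, "carol")).toDF("id", "name")
    val orders = Seq((1, 120.0), (2, 75.5), (1, 42.0)).toDF("id", "amount")

    // Joining on the common "id" column shuffles both sides by that key,
    // so the Stages list shows Shuffle Write for the upstream stages and
    // Shuffle Read for the stage that performs the join.
    val joined = users.join(orders, "id")
    joined.show()

    // Each post-shuffle partition is handled by one task (1 core per task
    // by default); spark.sql.shuffle.partitions (200 by default) controls
    // how many such partitions, and therefore tasks, that stage has.
    println(s"Partitions after join: ${joined.rdd.getNumPartitions}")

    spark.stop()
  }
}
```

In the Job Details view for the show() action, the DAG should show the job split across multiple stages separated by an exchange (shuffle) boundary, with Shuffle Write reported by the upstream stages and Shuffle Read by the stage that performs the join.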

Stages Detailed View

Every stage will expose additional metrics as displayed below.

[Image: Spark Stage detail]

  • Total Time Across All Tasks: Each task spends time on task serialization, computation, de-serialization, and shuffle read and write. This metric is the total time taken by all the tasks of the stage across these sub-processes.

  • Locality Level Summary: According to the official docs, Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest (a sketch showing how the related wait times can be tuned follows this list):

    • PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible

    • NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes

    • NO_PREF data is accessed equally quickly from anywhere and has no locality preference

    • RACK_LOCAL data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch

    • ANY data is elsewhere on the network and not in the same rack

      note

      Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
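Data locality is not something you set directly, but the time the scheduler waits for a slot at a given locality level before falling back to the next (less local) one is configurable. The sketch below simply restates the defaults explicitly; the property names (spark.locality.wait and its per-level variants) come from the standard Spark configuration.

```scala
package com.gigahex.samples

import org.apache.spark.sql.SparkSession

object LocalityWaitConfig {
  def main(args: Array[String]): Unit = {
    // How long the scheduler waits for a free slot at a given locality
    // level before falling back to the next (less local) level.
    val spark = SparkSession.builder()
      .master("local")
      .appName("locality-wait-example")
      .config("spark.locality.wait", "3s")          // default wait, used when no per-level value is set
      .config("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
      .config("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL
      .config("spark.locality.wait.rack", "3s")     // RACK_LOCAL -> ANY
      .getOrCreate()

    spark.stop()
  }
}
```

Increasing these waits can improve locality at the cost of idle time, while decreasing them starts tasks sooner on less-local data.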

Task Metrics Detail

Each task in Spark spends time in different steps. Let's look at the main ones; a listener sketch that logs these timings programmatically follows the list.

  • Scheduler Delay : The time interval between when the task is submitted for execution and when it actually starts running.
  • Executor Computing Time : Total time taken by the executor to run the task.
  • Task Deserialization : Time taken to deserialize the task's JVM objects, which are transferred over the network, before execution begins.
  • Shuffle Read Time : Time taken to fetch the shuffle data over the network.
  • Shuffle Write Time : Time taken to write the shuffle data. If the data is written to disk, this will be slower.
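If you'd rather capture these timings programmatically than read them off the UI, the rough sketch below uses Spark's SparkListener and TaskMetrics API (executorDeserializeTime, executorRunTime, shuffle fetch wait and write time). The job at the end exists only to generate a few tasks, and Scheduler Delay is not logged because the UI derives it rather than exposing it as a single metric field.

```scala
package com.gigahex.samples

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object TaskMetricsLogger {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("task-metrics-logger")
      .getOrCreate()

    // Log a few of the per-task timings that the Stages page displays.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val m = taskEnd.taskMetrics
        if (m != null) {
          println(
            s"task=${taskEnd.taskInfo.taskId} " +
            s"deserialize=${m.executorDeserializeTime}ms " +
            s"run=${m.executorRunTime}ms " +
            s"shuffleFetchWait=${m.shuffleReadMetrics.fetchWaitTime}ms " +
            s"shuffleWrite=${m.shuffleWriteMetrics.writeTime / 1000000}ms") // writeTime is in nanoseconds
        }
      }
    })

    // Trigger a job so that some tasks actually run.
    spark.range(0, 1000000).selectExpr("sum(id)").show()

    spark.stop()
  }
}
```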

Summary

Today we've covered the most common metrics exposed by the Spark Web UI, i.e. the Jobs, DAG and Stage metrics. There are further details provided for executors and the SQL plan, which we'll get into in the DataFrames and optimization section, covered during the last week of this guide.

For any queries or issues that you face, feel free to discuss in the Slack workspace.

What's next?

Tomorrow, we'll start building an analytics app, similar to Google Analytics, using Spark.

[Image: Plan post Day 5]