Apache Spark listeners

Versions: Apache Spark 3.3.0 https://github.com/bartosz25/spark-playground/tree/master/spark-listeners

The message bus is a common architectural pattern among the Enterprise Integration Patterns. It's also present at a lower level, inside applications, to enable event-driven behavior. Apache Spark is no exception: it uses a publish/subscribe approach in several places.

Agnostic design

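A message bus architecture involves 3 actors: producers that publish messages, the bus that routes them, and consumers (listeners) that subscribe to the bus and react to the messages.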

The implementation looks the same in Apache Spark.
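To make the pattern concrete before looking at Spark, here is a minimal, Spark-free sketch in which a producer posts events to a bus and the bus forwards them to registered listeners. All names in it (`Event`, `JobStarted`, `JobEnded`, `Listener`, `SimpleBus`, `EndedJobsCollector`) are hypothetical and only mirror the shape of the pattern; this is not Spark's implementation.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical event hierarchy, used only for this illustration.
sealed trait Event
final case class JobStarted(jobId: Int) extends Event
final case class JobEnded(jobId: Int, succeeded: Boolean) extends Event

// Listener with no-op defaults, so implementations override only what they need.
trait Listener {
  def onJobStarted(event: JobStarted): Unit = ()
  def onJobEnded(event: JobEnded): Unit = ()
}

// The bus keeps the registered listeners and routes each event by pattern matching.
final class SimpleBus {
  private val listeners = ListBuffer.empty[Listener]

  def addListener(listener: Listener): Unit = listeners += listener

  def post(event: Event): Unit = listeners.foreach { listener =>
    event match {
      case started: JobStarted => listener.onJobStarted(started)
      case ended: JobEnded     => listener.onJobEnded(ended)
    }
  }
}

// A listener interested only in finished jobs.
final class EndedJobsCollector extends Listener {
  val endedJobIds: ListBuffer[Int] = ListBuffer.empty[Int]
  override def onJobEnded(event: JobEnded): Unit = endedJobIds += event.jobId
}

object SimpleBusDemo {
  def run(): List[Int] = {
    val bus = new SimpleBus
    val collector = new EndedJobsCollector
    bus.addListener(collector)
    // The producer side is anything that calls post(...).
    bus.post(JobStarted(1))
    bus.post(JobEnded(1, succeeded = true))
    bus.post(JobEnded(2, succeeded = false))
    collector.endedJobIds.toList
  }
}
```

The sketch dispatches synchronously to keep things short, whereas Spark's LiveListenerBus delivers events to its listeners asynchronously.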

SparkListener

Job-related events are the first type of messages you can listen to in Apache Spark. They expose what happens to, among others, the application, jobs, stages, tasks, and executors.

Apache Spark comes with a handy abstract class called SparkListener that defines no-op event handlers for all existing events. Thanks to this class, if you only need to react to one specific category of events, you don't have to redefine all the on* callback methods.

To register a custom listener, call the SparkContext#addSparkListener(listener: SparkListenerInterface) method. It adds your listener instance to the listener bus.
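As an illustration, the sketch below overrides a single callback and registers the listener on a local session. The class name `JobEndCounter`, the application object, and the sample `count()` job are assumptions made for the example; only the SparkListener and SparkContext calls come from Spark itself.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}
import org.apache.spark.sql.SparkSession

// Hypothetical listener: overrides one callback and inherits the no-op defaults for the rest.
class JobEndCounter extends SparkListener {
  val finishedJobs = new AtomicInteger(0)

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    finishedJobs.incrementAndGet()
    println(s"Job ${jobEnd.jobId} ended at ${jobEnd.time} with result ${jobEnd.jobResult}")
  }
}

object JobEndCounterApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-listener-sketch")
      .master("local[*]")
      .getOrCreate()

    // Programmatic registration on the SparkContext.
    spark.sparkContext.addSparkListener(new JobEndCounter)

    // Any action triggers at least one job, hence at least one onJobEnd call.
    spark.range(0, 100).count()

    spark.stop()
  }
}
```

Because the bus delivers events asynchronously, the callback may run a moment after the action itself returns.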

Publishing an event consists of calling LiveListenerBus#post(event: SparkListenerEvent) with the case class representing the event. Dispatching the event to the listeners is a simple pattern matching on the event classes:

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
// ...
  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
// ...

QueryExecutionListener

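Another category of listeners comes from Apache Spark SQL and the QueryExecutionListener class. It's a poorer sibling of the SparkListener because it only has 2 callbacks: onSuccess, called when a query execution completes, and onFailure, called when it fails with an exception.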

To register the listeners, you can specify the full class name in the spark.sql.queryExecutionListeners config parameter or directly call the ExecutionListenerManager#register(listener: QueryExecutionListener) method.
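A minimal sketch of both registration options could look like the following. The class name `QueryOutcomeRecorder`, its `messages` queue, and the application object are hypothetical; the configuration key and the register call are the ones named above.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical listener that records one message per finished query execution.
// It has a no-argument constructor so that it can also be loaded from the config entry.
class QueryOutcomeRecorder extends QueryExecutionListener {
  val messages = new ConcurrentLinkedQueue[String]()

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    messages.add(s"$funcName succeeded in ${durationNs / 1000000} ms")

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    messages.add(s"$funcName failed: ${exception.getMessage}")
}

object QueryOutcomeRecorderApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("query-execution-listener-sketch")
      .master("local[*]")
      // Option 1 (config-based), left commented out to avoid registering the listener twice:
      // .config("spark.sql.queryExecutionListeners", classOf[QueryOutcomeRecorder].getName)
      .getOrCreate()

    // Option 2 (programmatic): register an instance on the session's listener manager.
    val recorder = new QueryOutcomeRecorder
    spark.listenerManager.register(recorder)

    // An action such as collect() produces an execution the listener is notified about.
    spark.range(0, 10).collect()

    spark.stop()
  }
}
```

The programmatic option lets you keep a reference to the listener instance, which is convenient when you want to read what it collected.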

The logic responsible for dispatching the event to the handlers is defined in ExecutionListenerBus#doPostEvent(listener: QueryExecutionListener, event: SparkListenerSQLExecutionEnd). It pattern-matches on the executionFailure field of the SparkListenerSQLExecutionEnd event to choose between the success and failure handlers:

private[sql] class ExecutionListenerBus private(sessionUUID: String)
  extends SparkListener with ListenerBus[QueryExecutionListener, SparkListenerSQLExecutionEnd] {
// ...
  override protected def doPostEvent(
      listener: QueryExecutionListener,
      event: SparkListenerSQLExecutionEnd): Unit = {
    if (shouldReport(event)) {
      val funcName = event.executionName.get
      event.executionFailure match {
        case Some(ex) =>
          val exception = ex match {
            case e: Exception => e
            case other: Throwable =>
              QueryExecutionErrors.failedToExecuteQueryError(other)
          }
          listener.onFailure(funcName, event.qe, exception)
        case _ =>
          listener.onSuccess(funcName, event.qe, event.duration)
      }
    }
  }

StreamingQueryListener

The StreamingQueryListener is the last type of listener I want to share with you here. As its name indicates, the class handles the events related to Structured Streaming queries. It defines 3 callbacks: onQueryStarted, onQueryProgress, and onQueryTerminated.

There are 2 ways to attach a new listener. The first uses the spark.sql.streaming.streamingQueryListeners configuration entry where you can define a list of listeners to use in the job. The second method calls StreamingQueryManager#addListener(listener: StreamingQueryListener) directly.
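The direct registration can be sketched as below, with a rate source and a console sink standing in for a real pipeline. The class name `StreamingLifecycleLogger`, the application object, and the 5-second run are assumptions chosen for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener that logs the lifecycle of every streaming query in the session.
class StreamingLifecycleLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query ${event.id} (run ${event.runId}) started")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId} read ${event.progress.numInputRows} rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query ${event.id} terminated, exception: ${event.exception}")
}

object StreamingLifecycleLoggerApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming-query-listener-sketch")
      .master("local[*]")
      // Config-based alternative, left commented out to avoid a double registration:
      // .config("spark.sql.streaming.streamingQueryListeners",
      //   classOf[StreamingLifecycleLogger].getName)
      .getOrCreate()

    // Direct registration on the StreamingQueryManager.
    spark.streams.addListener(new StreamingLifecycleLogger)

    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .writeStream
      .format("console")
      .start()

    // Let the query run for a few seconds, then stop it to trigger onQueryTerminated.
    query.awaitTermination(5000)
    query.stop()
    spark.stop()
  }
}
```

A listener added this way receives the events of all streaming queries started in the session, which is why the callbacks log the query id.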

When it comes to the dispatch, it's defined in the same doPostEvent method but this time, in the StreamingQueryListenerBus:

class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
  extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {

// ...
  override protected def doPostEvent(
      listener: StreamingQueryListener,
      event: StreamingQueryListener.Event): Unit = {
    def shouldReport(runId: UUID): Boolean = {
      // When loaded by Spark History Server, we should process all event coming from replay
      // listener bus.
      sparkListenerBus.isEmpty ||
        activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
    }

    event match {
      case queryStarted: QueryStartedEvent =>
        if (shouldReport(queryStarted.runId)) {
          listener.onQueryStarted(queryStarted)
        }
      case queryProgress: QueryProgressEvent =>
        if (shouldReport(queryProgress.progress.runId)) {
          listener.onQueryProgress(queryProgress)
        }
      case queryTerminated: QueryTerminatedEvent =>
        if (shouldReport(queryTerminated.runId)) {
          listener.onQueryTerminated(queryTerminated)
        }
      case _ =>
    }
  }

Example

I hope you saw from the above sections that writing and registering a listener is quite easy. For each type of listener, the recipe is the same: extend the listener class, override the callbacks you need, and pass an instance to the matching registration method or declare the class in the corresponding configuration entry.

In addition to the observable metrics, Spark listeners are another way to see what happens with your job. The difference between these 2 components is the scope. The metrics operate on the data, whereas the listeners apply to the query- or job-related attributes.

