Watermark and input data filtering in Apache Spark Structured Streaming

Versions: Apache Spark 3.5.0

I've already written about watermarks in a few places on the blog but, despite that, I still find topics worth revisiting. One of them is the watermark used to filter out late data, which is the topic of this blog post.

As a reminder, a watermark in streaming systems has 2 purposes. The first of them is to control how late data can be processed. This property inherently impacts another one: when a state should be removed from the state store. The latter is also called the Garbage Collection Watermark.
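A quick illustration, with numbers of my own: for a 10-minute watermark delay and a max event time of 10:30 observed in the previous micro-batch, the watermark becomes 10:20. Records with an event time before 10:20 are considered late, and any state scoped to windows ending before 10:20 becomes eligible for eviction.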

Even though the definition looks simple, Apache Spark has a particularity about watermarks: they don't filter anything without stateful processing. Put another way, if you write a job like this:

// A watermark declared on the event time column, but no stateful
// operation (aggregation, join, deduplication...) follows it
val clicksWithWatermark = clicksStream.toDF
  .withWatermark("clickTime", "10 minutes")

val query = clicksWithWatermark.writeStream
  .format("console").option("truncate", false)
  .start()

...do not expect your streaming query to filter out records older than the 10 minutes watermark, simply because there is no watermark-based filter in the execution plan!

== Physical Plan ==
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: ConsoleWriter[numRows=20, truncate=false]], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2062/0x0000000800fcc040@4c443787
+- EventTimeWatermark clickTime#3: timestamp, 10 minutes
   +- *(1) Project [clickAdId#2, clickTime#3]
      +- MicroBatchScan[clickAdId#2, clickTime#3] MemoryStreamDataSource

Why so? After all, the EventTimeWatermark node is there. Let's try to understand this in the next section.

Late data filtering

The watermark logical node does have a physical representation, the EventTimeWatermarkExec. However, it has 2 roles: collecting event time statistics (including the max event time) into an accumulator that the micro-batch engine later uses to compute the next watermark value, and annotating the event time column with the watermark delay metadata so that downstream stateful operators can recognize it.
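You can see the first role, and the absence of any filtering, in a simplified sketch of the node's doExecute method (adapted from the Apache Spark 3.5 source, details elided):

// Adapted from EventTimeWatermarkExec: every row is passed through unchanged;
// the node only accumulates event time statistics that the micro-batch engine
// later uses to compute the next watermark value.
override protected def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitions { iter =>
    val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
    iter.map { row =>
      eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
      row // no filtering here
    }
  }
}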

The node is not involved in the late data filtering! The filtering is a side effect of the watermark tracking, because the stateful processing nodes directly use the watermark values injected by the IncrementalExecution:

class IncrementalExecution(...) {
  // ...
  override val rule: PartialFunction[SparkPlan, SparkPlan] = {
    case s: StateStoreSaveExec if s.stateInfo.isDefined =>
      s.copy(
        eventTimeWatermarkForLateEvents = inputWatermarkForLateEvents(s.stateInfo.get),
        eventTimeWatermarkForEviction = inputWatermarkForEviction(s.stateInfo.get)
      )

    case s: SessionWindowStateStoreSaveExec if s.stateInfo.isDefined =>
      s.copy(
        eventTimeWatermarkForLateEvents = inputWatermarkForLateEvents(s.stateInfo.get),
        eventTimeWatermarkForEviction = inputWatermarkForEviction(s.stateInfo.get)
      )
    // ... other stateful operators are handled similarly
  }
}
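Inside the stateful operator, the injected value becomes a plain filter. The snippet below is a condensed sketch based on the WatermarkSupport trait that the stateful operators mix in (the method name comes from the Spark source, but the surrounding context is elided):

// Condensed from WatermarkSupport: the operator builds a predicate from
// eventTimeWatermarkForLateEvents and drops the rows matching it, i.e. the
// rows whose event time is older than the watermark.
protected def applyRemovingRowsOlderThanWatermark(
    iter: Iterator[InternalRow],
    predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
  iter.filterNot(row => predicateDropRowByWatermark.eval(row))
}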

As a result, it's the stateful physical node that is responsible for the state eviction and the late data filtering. But it couldn't do that without the information provided by the watermark node.
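To see the filtering actually happen, it's enough to add a stateful operation to the initial query. The sketch below is mine, not from the quoted Spark source; it assumes the same clicksStream source as before. With the aggregation in place, the physical plan gains a StateStoreSaveExec node and records older than the watermark get dropped:

import org.apache.spark.sql.functions.{col, count, window}

val clicksWithWatermark = clicksStream.toDF
  .withWatermark("clickTime", "10 minutes")

// The groupBy makes the query stateful; IncrementalExecution injects the
// current watermark into the StateStoreSaveExec node, so input rows with a
// clickTime older than (max observed clickTime - 10 minutes) are filtered out.
val query = clicksWithWatermark
  .groupBy(window(col("clickTime"), "5 minutes"))
  .agg(count(col("clickAdId")).as("clicks"))
  .writeStream
  .outputMode("update")
  .format("console").option("truncate", false)
  .start()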

