The first state in Apache Spark Structured Streaming arbitrary stateful processing

Versions: Apache Spark 3.4.0

When you wrote your first arbitrary stateful processing pipeline, the state expiration was probably the first tricky point you had to deal with. Why is that? After all, it's just about setting a timeout, isn't it? Most of the time, yes, but there is an exception.

When you use the watermark-based state expiration, the watermark is available only from the second micro-batch onwards; in the first one, getCurrentWatermarkMs() returns 0. So when you set the state TTL as in the following snippet, the state can expire right away, in the very next micro-batch!

val tenMinutesAsMilliseconds = 600000
currentState.setTimeoutTimestamp(currentState.getCurrentWatermarkMs() + tenMinutesAsMilliseconds)
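The arithmetic behind this can be checked without Spark. The sketch below is only an illustration: it assumes the watermark read in the first micro-batch is 0 and that the second micro-batch sees a watermark of 2023-05-25 08:00 UTC (a 10:00 event minus a 2-hour allowed delay). The object and value names are hypothetical.

```scala
import java.time.Instant

object FirstBatchExpiration {
  val tenMinutesAsMilliseconds = 600000L

  // Assumption: the watermark value read in the first micro-batch
  val firstBatchWatermarkMs = 0L

  // Same formula as in the snippet above: watermark + TTL
  val expirationMs: Long = firstBatchWatermarkMs + tenMinutesAsMilliseconds
  val expiration: Instant = Instant.ofEpochMilli(expirationMs)

  // Assumption: watermark of the second micro-batch (10:00 event, 2-hour allowed delay)
  val secondBatchWatermarkMs: Long = Instant.parse("2023-05-25T08:00:00Z").toEpochMilli

  // A state times out once the watermark has moved past its timeout timestamp
  val timesOutInSecondBatch: Boolean = secondBatchWatermarkMs > expirationMs
}
```

The computed expiration falls on 1970-01-01, ten minutes after the epoch, so any realistic watermark is already far beyond it.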

When you check the getCurrentWatermarkMs declaration, you'll see that it returns a plain Long rather than an optional value:

  /**
   * Get the current event time watermark as milliseconds in epoch time.
   *
   * @note In a streaming query, this can be called only when watermark is set before calling
   *       `[map/flatMap]GroupsWithState`. In a batch query, this method always returns -1.
   */
  @throws[UnsupportedOperationException](
    "if watermark has not been set before in [map|flatMap]GroupsWithState")
  def getCurrentWatermarkMs(): Long

All groups created in the first micro-batch will expire in the second micro-batch because their timeout was computed from a watermark of 0, that is 1970-01-01, and in almost all cases you're processing data from the current year. Look at the output of such a query:

State will expire at 1970-01-01 02:00:00.0
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

State will expire at 2023-05-25 13:44:00.0
State timed out!
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|5    |
+-----+

Even though the allowed delay of the watermark is quite high (2 hours), the first state, created for an event from 10 o'clock, expires right away in the next micro-batch. The problem is real. Imagine an entity-based stateful processing where, for example, you apply some custom session logic for users visiting your website. If a user visits the page once and goes away for a while, their return will start a new session instead of continuing the existing one.

How to deal with that issue? Below you will find some of the possible solutions.

Temporary state

The first pattern is Temporary state and it relies on the watermark-aware state. Let's take a look at the state representing our simple count example:

case class CounterWithFlag(count: Int, needsUpdateWithNewWatermark: Boolean = false,
                           firstGroupMaxEventTime: Option[Timestamp] = None)

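The idea is to flag every state created while the watermark is not yet known (needsUpdateWithNewWatermark) and to remember the max event time of that first group (firstGroupMaxEventTime), so that a premature timeout can be detected and corrected later.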

Below is the stateful mapper's snippet responsible for setting these attributes:

  val (countToAdd, maxEventTime) = getStatsForState(rows)
  val (needsUpdateWithNewWatermark, eventTimeForState) =
    if (currentStateGroup.getCurrentWatermarkMs() > 0) {
      (false, None)
    } else {
      (true, Some(maxEventTime))
    }
  val currentState = currentStateGroup.getOption.getOrElse(CounterWithFlag(count = 0))
  currentStateGroup.update(currentState.copy(
    count = currentState.count + countToAdd,
    needsUpdateWithNewWatermark = needsUpdateWithNewWatermark,
    firstGroupMaxEventTime = eventTimeForState
  ))

Now, when the state expires, the mapper uses this flag value to apply the correct state expiration strategy:

  private def didStateReallyExpire(currentState: CounterWithFlag,
                                   currentWatermarkMs: Long): Boolean = {
    // A state not flagged in the first micro-batch follows the regular expiration.
    // A flagged state really expired only if its max event time is behind the watermark.
    !currentState.needsUpdateWithNewWatermark ||
      currentState.firstGroupMaxEventTime.exists(_.getTime < currentWatermarkMs)
  }

The first part of the condition prevents a state created in the first micro-batch from expiring prematurely. Unless the watermark has advanced fast between 2 micro-batch executions, the max event time from the first micro-batch should still be greater than the watermark, and the state is kept. Otherwise, we consider the state as expired; after all, it falls behind the watermark.
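To make the condition concrete, here is a small Spark-free sketch that reuses the case class and the check from the post and evaluates them on sample values. The wrapping object, the timestamps and the 2-hour allowed delay are assumptions made only for this illustration.

```scala
import java.sql.Timestamp
import java.time.Instant

object ExpirationCheck {
  case class CounterWithFlag(count: Int, needsUpdateWithNewWatermark: Boolean = false,
                             firstGroupMaxEventTime: Option[Timestamp] = None)

  // Same condition as in the post
  def didStateReallyExpire(currentState: CounterWithFlag,
                           currentWatermarkMs: Long): Boolean = {
    (currentState.needsUpdateWithNewWatermark &&
      currentState.firstGroupMaxEventTime.get.getTime < currentWatermarkMs) ||
      !currentState.needsUpdateWithNewWatermark
  }

  private def ms(iso: String): Long = Instant.parse(iso).toEpochMilli

  // State created in the first micro-batch, max event time = 10:00
  val flaggedState = CounterWithFlag(count = 5, needsUpdateWithNewWatermark = true,
    firstGroupMaxEventTime = Some(new Timestamp(ms("2023-05-25T10:00:00Z"))))
  // State created later, once the watermark was already known
  val regularState = CounterWithFlag(count = 5)

  // Watermark of 08:00 (10:00 minus the assumed 2-hour delay): premature timeout
  val prematureTimeout: Boolean = didStateReallyExpire(flaggedState, ms("2023-05-25T08:00:00Z"))
  // Watermark of 10:30: the state is really behind the watermark
  val lateTimeout: Boolean = didStateReallyExpire(flaggedState, ms("2023-05-25T10:30:00Z"))
  // Unflagged states always follow the regular expiration
  val regularTimeout: Boolean = didStateReallyExpire(regularState, ms("2023-05-25T08:00:00Z"))
}
```

Only the first case returns false, which is the signal for the mapper to extend the state instead of removing it.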

Depending on the outcome, the mapper either extends the state, this time with the correct watermark-based strategy, or expires it as expected:

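val currentWatermarkMs = currentStateGroup.getCurrentWatermarkMs()
val expirationTime = currentWatermarkMs + timeoutDurationMs
if (currentStateGroup.hasTimedOut) {
  println("State timed out!")
  val currentState = currentStateGroup.get
  if (didStateReallyExpire(currentState, currentWatermarkMs)) {
    // Genuine expiration: drop the state and emit the final count
    currentStateGroup.remove()
    Some(currentState.count)
  } else {
    // Premature timeout caused by the missing watermark: keep the state and
    // reschedule the timeout from the watermark that is now available
    currentStateGroup.update(currentState)
    println(s"Setting new expiration time to ${new Timestamp(expirationTime)}")
    currentStateGroup.setTimeoutTimestamp(expirationTime)
    None
  }
} // the branch for a state that has not timed out is omitted here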

It works and the logic is pretty close to the real watermark-based expiration. Of course, it has an extra readability cost since the stateful function not only manages the state result but also has to compensate for the missing watermark in the first micro-batch of Structured Streaming.

Watermark-like expiration

A simpler alternative from the code perspective would be the Watermark-like expiration. The only difference from the classical watermark strategy is the use of event time to compute the state expiration. We should start by transforming our previous state into something like:

case class CounterWithMaxEventTime(count: Int, maxEventTimeSoFar: Timestamp)

As you can see, we don't need the boolean flag anymore, but this time we always store the max event time seen so far. We're doing that so that late data doesn't move the expiration time backwards. Later, in the mapping function, the expiration setting simply becomes:

val expirationTime = newState.maxEventTimeSoFar.getTime + timeoutDurationMs
println(s"State will expire at ${new Timestamp(expirationTime)}")
currentStateGroup.setTimeoutTimestamp(expirationTime)
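The post doesn't show how newState is built, so the following is only a sketch of one possible way to do it. The mergeState and expirationTimeMs helpers and their parameter names are hypothetical; the point is that the stored event time only ever moves forward.

```scala
import java.sql.Timestamp

object EventTimeExpiration {
  case class CounterWithMaxEventTime(count: Int, maxEventTimeSoFar: Timestamp)

  // Hypothetical helper: merges the stats of the current micro-batch into the state
  def mergeState(previous: Option[CounterWithMaxEventTime], countToAdd: Int,
                 batchMaxEventTime: Timestamp): CounterWithMaxEventTime = previous match {
    case Some(state) =>
      // Late data must not move the stored event time backwards
      val newMax = if (batchMaxEventTime.after(state.maxEventTimeSoFar)) batchMaxEventTime
                   else state.maxEventTimeSoFar
      state.copy(count = state.count + countToAdd, maxEventTimeSoFar = newMax)
    case None =>
      CounterWithMaxEventTime(countToAdd, batchMaxEventTime)
  }

  // Same formula as in the snippet above
  def expirationTimeMs(state: CounterWithMaxEventTime, timeoutDurationMs: Long): Long =
    state.maxEventTimeSoFar.getTime + timeoutDurationMs
}
```

With this shape, a late record still increments the counter but leaves the expiration time untouched.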

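It's easier than the previous approach but also has a drawback. The timeout is now computed from the event time, but the expiration is still triggered by the watermark, which is the max event time minus the allowed delay. As a result, the state will expire later than expected, roughly by the allowed delay. For example, with a 2-hour allowed delay and a 10-minute timeout, a state whose last event happened at 10:00 gets 10:10 as its expiration time, but the watermark passes 10:10 only once the query has seen an event later than 12:10.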
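The extra delay can be worked out with a bit of arithmetic. The sketch below is a simplification under stated assumptions: a 2-hour allowed delay, a 10-minute timeout, a key whose last event is also the latest event of the stream at that moment, and no modeling of micro-batch boundaries. All names are hypothetical.

```scala
import java.time.Instant

object ExpirationDelay {
  val allowedDelayMs: Long = 2 * 60 * 60 * 1000L  // assumed allowed delay: 2 hours
  val timeoutDurationMs: Long = 10 * 60 * 1000L   // assumed state timeout: 10 minutes

  // Watermark = max event time seen so far minus the allowed delay
  def watermarkFor(maxEventTimeMs: Long): Long = maxEventTimeMs - allowedDelayMs

  // Timeout set from the watermark: the stream has to advance past this event time
  def eventTimeToPassWithWatermarkTimeout(lastEventMs: Long): Long =
    (watermarkFor(lastEventMs) + timeoutDurationMs) + allowedDelayMs

  // Timeout set from the event time: the watermark has to pass lastEvent + timeout
  def eventTimeToPassWithEventTimeTimeout(lastEventMs: Long): Long =
    (lastEventMs + timeoutDurationMs) + allowedDelayMs

  val lastEventMs: Long = Instant.parse("2023-05-25T10:00:00Z").toEpochMilli
  val watermarkBased: Instant = Instant.ofEpochMilli(eventTimeToPassWithWatermarkTimeout(lastEventMs))
  val eventTimeBased: Instant = Instant.ofEpochMilli(eventTimeToPassWithEventTimeTimeout(lastEventMs))
}
```

Under these assumptions, the watermark-based timeout fires once the stream moves past 10:10, while the event-time-based one waits until it moves past 12:10, a difference equal to the allowed delay.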

As you can see, the simplicity has a serious cost here. But it seems there is no single solution that keeps the mapper logic simple and solves the first micro-batch issue.

