joshita / dev


Apache Flink Late Arrival Data


What is late arrival data

Late arrival data is defined by the implementor in the code. You choose a threshold, and data whose event_time falls more than that threshold behind the latest event time seen so far is considered late arrival data. For example:

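   import org.apache.spark.sql.functions.window
   import spark.implicits._ // enables the $"col" syntax
   streamingDF
     .withWatermark("event_time", "10 minutes") // tolerate up to 10 minutes of lateness
     .groupBy(window($"event_time", "5 minutes")) // 5-minute tumbling windows on event time
     .count()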

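This keeps track of event_time and still accepts data that arrives up to 10 minutes late.

QQ. What is event_time, and how is it used in Structured Streaming?
Answer: Event time is when the event actually occurred (a timestamp carried in the record itself), not when Spark receives it. Spark Structured Streaming tracks the maximum event_time it has observed and derives the watermark from it:

Watermark = max_observed_event_time - 10 minutes

A record with event_time < Watermark is considered late and may be dropped from the aggregation.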

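QQ. Is event_time fixed or variable?
Answer: Each event's event_time is fixed, because it records when that event occurred, but it differs from event to event. As newer events arrive, max_observed_event_time grows, so the watermark moves forward over time.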

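Example with a 10-minute watermark:

| Event ID | Event time (event_time) | Processing time (when it arrives) |
|---|---|---|
| E1 | 12:00:00 | 12:00:01 |
| E2 | 12:01:00 | 12:01:02 |
| E3 | 12:05:00 | 12:06:00 |
| E4 | 11:50:00 | 12:07:00 (late arrival) |

When E4 arrives, the maximum observed event_time is 12:05:00 (from E3), so the watermark is 12:05:00 - 10 minutes = 11:55:00. Since E4's event_time (11:50:00) is older than 11:55:00, it is dropped.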
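To make the arithmetic above concrete, here is a small plain-Scala replay of the four events. It is a simplified model written for illustration, not Spark code. `WatermarkReplay`, `Event` and `replay` are hypothetical names. The model also assumes the watermark is recomputed after every event, whereas Spark advances it once per micro-batch.

```scala
import java.time.{Duration, LocalTime}

// Simplified model of the watermark rule from the table above (not Spark code).
// Assumption: the watermark is recomputed after every event; Spark itself
// advances it once per micro-batch.
object WatermarkReplay {
  final case class Event(id: String, eventTime: LocalTime)

  // The four events from the table, in arrival (processing-time) order.
  val tableEvents: Seq[Event] = Seq(
    Event("E1", LocalTime.parse("12:00:00")),
    Event("E2", LocalTime.parse("12:01:00")),
    Event("E3", LocalTime.parse("12:05:00")),
    Event("E4", LocalTime.parse("11:50:00"))
  )

  /** For each event: its id, the watermark in force when it arrived, and whether it is late. */
  def replay(events: Seq[Event], delay: Duration): Vector[(String, Option[LocalTime], Boolean)] = {
    val start = (Option.empty[LocalTime], Vector.empty[(String, Option[LocalTime], Boolean)])
    val (_, results) = events.foldLeft(start) { case ((maxSeen, acc), e) =>
      // Watermark = max_observed_event_time - delay (undefined before the first event)
      val watermark = maxSeen.map(_.minus(delay))
      val late = watermark.exists(w => e.eventTime.isBefore(w))
      // The maximum only moves forward; a late event cannot pull it back.
      val newMax = maxSeen.fold(e.eventTime)(m => if (e.eventTime.isAfter(m)) e.eventTime else m)
      (Some(newMax), acc :+ ((e.id, watermark, late)))
    }
    results
  }

  def main(args: Array[String]): Unit =
    replay(tableEvents, Duration.ofMinutes(10)).foreach { case (id, wm, late) =>
      println(s"$id watermark=${wm.map(_.toString).getOrElse("none")} late=$late")
    }
}
```

Running `main` flags only E4 as late, because the watermark in force when it arrives is 11:55.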

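What's the use case

A typical use case is a streaming aggregation over event time, such as counting records per key. The watermark tells Spark how long to wait for late records. After that, Spark can finalize old results and discard their state instead of keeping it forever.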

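   import org.apache.spark.sql.functions.{count, window}
   import spark.implicits._ // enables the $"col" syntax
   val query = stream
     .withWatermark("timestamp", "10 minutes") // Specify watermark delay
     .groupBy(window($"timestamp", "5 minutes"), $"key") // illustrative 5-minute windows; grouping on event time lets the watermark expire old state
     .agg(count("value"))
     .writeStream
     .outputMode("update") // emit changed counts on each trigger
     .format("console")
     .start()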
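The point of a watermark in this kind of aggregation is bounded state. The toy model below is plain Scala, not Spark's implementation, and `WindowedCounter` is a hypothetical name. It keeps one count per (window, key), emits a window's final count once the watermark reaches the window's end, and then forgets that window. It assumes tumbling windows and a watermark updated after every event.

```scala
import java.time.{Duration, LocalTime}
import scala.collection.mutable

// Toy model of a windowed count with a watermark (not Spark's implementation).
// State holds one running count per (windowStart, key); once the watermark
// reaches a window's end, that window is emitted as final and removed.
final class WindowedCounter(windowSize: Duration, delay: Duration) {
  private val state = mutable.Map.empty[(LocalTime, String), Long]
  private var maxSeen: Option[LocalTime] = None

  // Watermark = max observed event time - delay (None until the first event).
  def watermark: Option[LocalTime] = maxSeen.map(_.minus(delay))

  // Tumbling window start: event time rounded down to a multiple of windowSize.
  private def windowStart(t: LocalTime): LocalTime = {
    val size = windowSize.getSeconds
    LocalTime.ofSecondOfDay(t.toSecondOfDay / size * size)
  }

  // A window is closed once the watermark is at or past its end.
  private def isClosed(start: LocalTime): Boolean =
    watermark.exists(w => !start.plus(windowSize).isAfter(w))

  /** Adds one event; returns the (windowStart, key, finalCount) rows closed by it. */
  def add(eventTime: LocalTime, key: String): List[(LocalTime, String, Long)] = {
    val start = windowStart(eventTime)
    if (!isClosed(start)) { // events for an already-closed window are dropped
      state((start, key)) = state.getOrElse((start, key), 0L) + 1L
    }
    maxSeen = Some(maxSeen.fold(eventTime)(m => if (eventTime.isAfter(m)) eventTime else m))
    val closed = state.keys.toList
      .filter { case (s, _) => isClosed(s) }
      .sortBy { case (s, k) => (s.toSecondOfDay, k) }
    closed.map { case (s, k) => (s, k, state.remove((s, k)).getOrElse(0L)) }
  }

  def openWindows: Int = state.size
}
```

Consider 5-minute windows and a 10-minute delay. An event at 12:16 pushes the watermark to 12:06, which finalizes the 12:00-12:05 window. A later event at 12:02 for that window is then dropped, and only the 12:15 window stays in memory.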

What is window doing here

You define windows based on event time. For example, with 10-minute tumbling windows, records are grouped into non-overlapping 10-minute intervals according to their timestamps, and each record falls into exactly one window. The code below uses 1-minute windows.

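   import org.apache.spark.sql.functions.{count, window}
   import spark.implicits._ // enables the $"col" syntax

   val windowedCounts = input
     .groupBy(
       window($"eventTime", "1 minute"), // 1-minute tumbling windows
       $"value" // Grouping by the "value" column (replace with your grouping key)
     )
     .agg(count("*").alias("count"))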
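Assigning a record to a tumbling window is simple arithmetic. This plain-Scala sketch rounds an event time down to a multiple of the window size, giving the interval [start, start + size). `TumblingWindow.assign` is a hypothetical helper, not Spark's code. It ignores time zones, assumes no window start offset, and assumes a size that divides evenly into a day.

```scala
import java.time.{Duration, LocalTime}

// Sketch of tumbling-window assignment (not Spark's implementation):
// start = eventTime rounded down to a multiple of size; the window is [start, start + size).
object TumblingWindow {
  def assign(eventTime: LocalTime, size: Duration): (LocalTime, LocalTime) = {
    val sizeSec = size.getSeconds
    val start = LocalTime.ofSecondOfDay(eventTime.toSecondOfDay / sizeSec * sizeSec)
    (start, start.plus(size))
  }

  def main(args: Array[String]): Unit =
    for (t <- Seq("12:00:05", "12:00:59", "12:01:00").map(LocalTime.parse))
      println(s"$t -> ${assign(t, Duration.ofMinutes(1))}")
}
```

Because windows do not overlap, 12:00:05 and 12:00:59 share the 12:00 window, while 12:01:00 starts the next one.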

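How to handle late arrival data

Records that fall behind the watermark are excluded from the windowed aggregation. If you also want to keep them, for example for auditing or reprocessing, one option is to write them to a separate sink: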

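   import org.apache.spark.sql.functions.{count, current_timestamp, expr, window}
   import spark.implicits._ // enables the $"col" syntax

   val withWatermark = stream.withWatermark("eventTime", "10 minutes")

   // Aggregations: records behind the watermark are left out of these counts
   val aggregatedData = withWatermark
     .groupBy(window($"eventTime", "1 minute"), $"key")
     .agg(count("*").alias("count"))
     .writeStream
     // ... (your output mode and sink) ...

   // Filter late data. The watermark is not available as a column, so this
   // approximates it with processing time: records whose eventTime is more than
   // 10 minutes behind the current clock are treated as late.
   val lateData = withWatermark
     .filter($"eventTime" < current_timestamp() - expr("INTERVAL 10 MINUTES"))

   lateData.writeStream
     .format("parquet") // Or any other file format you prefer
     .option("path", "path/to/late/data") // Path to store late data
     .option("checkpointLocation", "path/to/late/checkpoint") // file sinks require a checkpoint location
     .start()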
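Spark advances the watermark between micro-batches rather than per record. Unlike Flink's `sideOutputLateData` on windowed streams, Structured Streaming does not, as far as I know, hand you the records it drops. The plain-Scala model below illustrates the side-sink idea with batch-granular watermarks. Each batch is split against the watermark computed from earlier batches, and late records are collected separately. `MicroBatchRouter` and `Routed` are hypothetical names, and this is not a Spark API.

```scala
import java.time.{Duration, LocalTime}

// Toy model (not a Spark API): split each micro-batch into on-time and late
// records, using a watermark computed only from earlier batches, and collect
// the late ones as if writing them to a separate sink.
object MicroBatchRouter {
  final case class Routed(onTime: Vector[LocalTime], late: Vector[LocalTime])

  def route(batches: Seq[Seq[LocalTime]], delay: Duration): Routed = {
    val start = (Option.empty[LocalTime], Routed(Vector.empty, Vector.empty))
    val (_, result) = batches.foldLeft(start) { case ((maxSeen, acc), batch) =>
      // Watermark for this batch: max event time seen in previous batches - delay.
      val watermark = maxSeen.map(_.minus(delay))
      val (late, onTime) = batch.partition(t => watermark.exists(w => t.isBefore(w)))
      // Advance the maximum only after the whole batch has been routed.
      val newMax = (maxSeen.toList ++ batch).reduceOption((a, b) => if (a.isAfter(b)) a else b)
      (newMax, Routed(acc.onTime ++ onTime, acc.late ++ late))
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val t = (s: String) => LocalTime.parse(s)
    val batches = Seq(Seq(t("12:00:00"), t("12:05:00")), Seq(t("11:50:00"), t("12:06:00")))
    val routed = route(batches, Duration.ofMinutes(10))
    println(s"on time: ${routed.onTime}, late (side sink): ${routed.late}")
  }
}
```

Note the batch effect. Suppose one batch holds 12:00:00, and the next holds both 12:05:00 and 11:52:00. Then 11:52:00 is kept, because that batch's watermark (11:50:00) was computed before 12:05:00 was seen. A per-event model would have flagged it as late.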