Apache Flink Late Arrival Data
What is late arrival data
Late arrival data is defined by the implementer in code: any record whose event_time falls behind a configured threshold is treated as late arrival data. For example:
streamingDF.withWatermark("event_time", "10 minutes") // tolerate data up to 10 minutes late
  .groupBy(window($"event_time", "5 minutes")) // 5-minute tumbling windows on event time
  .count()
The watermark keeps track of event_time; data that is late by up to 10 minutes is still allowed.
QQ. What is event_time? How is it used in Structured Streaming?
Answer: Event time is when the event occurred, as recorded in the data itself. Spark Structured Streaming tracks the maximum event_time observed so far and derives the watermark from it:
Watermark = max_observed_event_time - 10 minutes
A record is considered late when event_time < (max_observed_event_time - 10 minutes).
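QQ. Is event_time fixed or variable?
Answer: event_time is variable. It differs from event to event because it records when each event occurred. The watermark is derived from the maximum observed event_time, so it moves forward as newer events arrive.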
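| Event ID | Event Time (event_time) | Processing Time (when it arrives) |
|---|---|---|
| E1 | 12:00:00 | 12:00:01 |
| E2 | 12:01:00 | 12:01:02 |
| E3 | 12:05:00 | 12:06:00 |
| E4 | 11:50:00 | 12:07:00 (late arrival) |

By the time E4 arrives, the maximum observed event_time is 12:05:00 (from E3), so the watermark is 12:05:00 - 10 minutes = 11:55:00. Since E4's event_time (11:50:00) is older than 11:55:00, it is dropped.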
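The walk-through above can be replayed in plain Scala, without Spark, to watch the watermark move. This is only a simplified sketch: `Event`, `WatermarkReplay`, and `replay` are hypothetical names made up for illustration, and the watermark is updated after every record here, whereas Spark advances it between micro-batches and applies it to stateful operators.

```scala
import java.time.{Duration, LocalTime}

// Hypothetical event type for illustration; not a Spark class.
final case class Event(id: String, eventTime: LocalTime)

object WatermarkReplay {
  // Replays events in arrival order.
  // Returns the ids of events that fell behind the watermark, plus the final watermark.
  def replay(events: Seq[Event], delay: Duration): (Seq[String], Option[LocalTime]) = {
    var maxSeen: Option[LocalTime] = None
    val late = Seq.newBuilder[String]
    for (e <- events) {
      // Watermark = max observed event time - allowed delay (undefined before the first event).
      val watermark = maxSeen.map(_.minus(delay))
      if (watermark.exists(w => e.eventTime.isBefore(w))) {
        late += e.id // older than the watermark: treated as late
      } else {
        // On-time event: it may push the maximum observed event time forward.
        val newMax = maxSeen.fold(e.eventTime)(m => if (e.eventTime.isAfter(m)) e.eventTime else m)
        maxSeen = Some(newMax)
      }
    }
    (late.result(), maxSeen.map(_.minus(delay)))
  }
}
```

Replaying E1 to E4 from the table with a 10-minute delay flags only E4 as late and leaves the watermark at 11:55:00.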
What's the use case
val query = stream
  .withWatermark("timestamp", "10 minutes") // Specify watermark delay
  .groupBy("key") // groups by key only; add a window on "timestamp" if old state should be dropped
  .agg(count("value"))
  .writeStream
What is the window doing here
You define windows based on event time. For example, with 10-minute windows, records are grouped into 10-minute intervals according to their timestamps.
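import org.apache.spark.sql.functions.{count, window}
import spark.implicits._ // assumes a SparkSession named `spark`; enables the $"col" syntax

val windowedCounts = input
  .groupBy(
    window($"eventTime", "1 minute"), // 1-minute tumbling windows
    $"value" // Grouping by the "value" column (replace with your grouping key)
  )
  .agg(count("*").alias("count"))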
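To make the grouping concrete, here is a minimal plain-Scala sketch of how a tumbling window can be derived from a timestamp: round the timestamp down to a multiple of the window size. `TumblingWindows` and `tumblingWindow` are hypothetical helpers for illustration, not Spark APIs, and the sketch assumes windows aligned to the Unix epoch with no slide or start offset.

```scala
import java.time.{Duration, Instant}

object TumblingWindows {
  // Returns the [start, end) window containing `ts`, for windows of length `size`
  // aligned to the Unix epoch (assumption: no slide, no start offset).
  def tumblingWindow(ts: Instant, size: Duration): (Instant, Instant) = {
    val sizeMs = size.toMillis
    // floorDiv rounds towards negative infinity, so pre-epoch timestamps are bucketed correctly too.
    val startMs = Math.floorDiv(ts.toEpochMilli, sizeMs) * sizeMs
    val start = Instant.ofEpochMilli(startMs)
    (start, start.plus(size))
  }
}
```

With a 1-minute size, a record stamped 12:00:42 and another stamped 12:00:05 on the same day both fall in the window starting at 12:00:00 and ending at 12:01:00, so they are counted together.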
How to handle late arrival data
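val withWatermark = stream.withWatermark("eventTime", "10 minutes")
// Aggregations
val aggregatedData = withWatermark
  .groupBy(window($"eventTime", "1 minute"), $"key")
  .agg(count("*").alias("count"))
  .writeStream
// ... (your output mode and sink) ...
// Filter late data
// Note: Spark's DataFrame API has no current_watermark() function, so the watermark
// cannot be read inside a filter. As an approximation, this compares eventTime
// against processing time (current_timestamp) instead of the watermark.
val lateData = withWatermark
  .filter($"eventTime" < current_timestamp() - expr("INTERVAL 10 MINUTES")) // more than 10 minutes behind processing time
lateData.writeStream
  .format("parquet") // Or any other format you prefer
  .option("path", "path/to/late/data") // Path to store late data
  .option("checkpointLocation", "path/to/late/checkpoint") // placeholder path; file sinks need a checkpoint location
  .start()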