spark-user mailing list archives

From Peter Liu <>
Subject re: streaming, batch / spark 2.2.1
Date Thu, 02 Aug 2018 18:42:22 GMT
 Hello there,

I'm new to Spark Streaming and am having trouble understanding how a Spark
batch is "composed" (Google searches keep returning an older Spark
Streaming concept). I would appreciate any help and clarification.
I'm using Spark 2.2.1 for a streaming workload (see the quoted code in (a)
below). The general question I have is:

How is the number of records in a Spark batch (as seen in the Spark Job UI)
determined? (The default batch interval is supposedly zero in Spark
2.2.1.)

The issue I'm facing is that for the same incoming streaming source (300K
msg/sec to a Kafka broker, 220 bytes per message), I got different numbers
(2x) of processed batches on two different systems for the same
application/consumer running time (30 min). At the batch level, the input
data size per batch is the same (49.9KB), whereas the total input data size
(under the Spark executor tab) differs by ~2x, as expected given that one
system also processed 2x as many batches. Note: on both systems, the Spark
consumer seems to keep up (batch processing time did not increase over the
30 min). See (c) for the real functional concern.
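
For reference, a rough back-of-the-envelope check of the figures quoted
above, written as plain Scala with no Spark dependency. It only restates the
numbers in this message; the 1 KB = 1024 bytes conversion and the idea that
the per-batch size divides evenly into 220-byte records are my assumptions.

```scala
object BatchArithmetic {
  // Figures quoted in this message.
  val msgsPerSec: Long = 300000L
  val bytesPerMsg: Long = 220L
  val runSeconds: Long = 30L * 60L
  val batchInputKb: Double = 49.9

  // Raw ingest rate in bytes per second.
  def ingestBytesPerSec: Long = msgsPerSec * bytesPerMsg

  // Messages produced over the whole 30 min run.
  def totalMessages: Long = msgsPerSec * runSeconds

  // Messages that would fit in one batch's reported input size,
  // ASSUMING 1 KB = 1024 bytes and 220 bytes per record.
  def msgsPerReportedBatch: Double = batchInputKb * 1024.0 / bytesPerMsg

  def main(args: Array[String]): Unit = {
    println(s"ingest rate: $ingestBytesPerSec bytes/sec")
    println(s"messages in 30 min: $totalMessages")
    println(f"messages per 49.9KB batch: $msgsPerReportedBatch%.2f")
  }
}
```

The per-batch figure and the ingest rate are of very different magnitude,
which is part of why I am unsure what the UI number actually measures.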

(b) and (c) below give a bit more context and the real concern.

I have been struggling with this. Any comments and help would be very much
appreciated.



(a) code in use:
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
      .as[(String, Timestamp)]
      .select(from_json($"value", mySchema).as("data"), $"timestamp")
      .select("data.*", "timestamp")
      .where($"event_type" === "view")
      .select($"ad_id", $"event_time")
      .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
      .groupBy(millisTime(window($"event_time", "10 seconds").getField("start"))
        as 'time_window, $"campaign_id")  //original
      .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
      .select(to_json(struct("*")) as 'value)
...      .

(b) The number of records in one batch does not seem to be determined by the
batch interval (since it is zero by default in Spark 2.2), but is likely
determined, or at least influenced, by the time needed to process the
previous batch. The input data amount per batch is quite consistent and is
the same on both systems according to the Spark UI (49.9KB), which suggests
there is strict logic that prepares/caps the data per batch despite the
fluctuation in batch processing time. What is this logic?
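
To make the hypothesis above concrete, here is a toy model in plain Scala.
It is only a sketch of my assumption, not Spark's actual scheduler: with no
batch interval, each batch starts when the previous one ends and takes every
record that arrived in the meantime. The 1 s and 2 s batch durations are
hypothetical values chosen for illustration.

```scala
object TriggerModel {
  // Toy model (an assumption, not Spark's real logic): each batch picks up
  // every record that arrived while the previous batch was running.
  // Returns (number of batches, total records processed).
  def simulate(ratePerSec: Long, batchSeconds: Long, runSeconds: Long): (Long, Long) = {
    val batches = runSeconds / batchSeconds
    val recordsPerBatch = ratePerSec * batchSeconds
    (batches, batches * recordsPerBatch)
  }

  def main(args: Array[String]): Unit = {
    val slow = simulate(300000L, 2L, 1800L) // hypothetical 2 s per batch
    val fast = simulate(300000L, 1L, 1800L) // hypothetical 1 s per batch
    println(s"slow system: $slow")
    println(s"fast system: $fast")
  }
}
```

Under this model the faster system runs twice as many batches, but each one
holds half the records, so the totals match. That is not what I observe
(same input size per batch, 2x total), hence the question.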

(c) The major question is a functional one: if one system processes twice as
much data as the other, does that indicate that either that system
processed duplicated data or the other system processed only half of the
needed data? Or is it more of a reporting issue?
