spark-user mailing list archives

From Dmitry Goldenberg <dgoldenberg...@gmail.com>
Subject Re: Batchdurationmillis seems "sticky" with direct Spark streaming
Date Tue, 08 Sep 2015 23:38:05 GMT
>> Why are you checkpointing the direct Kafka stream? It serves no purpose.

Could you elaborate on what you mean?

Our goal is fault tolerance.  If a consumer is killed or stopped midstream,
we want to resume where we left off next time the consumer is restarted.

How would that be "not serving a purpose"?  This is already working for
us.  From our testing, we indeed resume where we left off, which is not
possible without checkpointing.  If checkpointing is turned off, we'll
resume with the later Kafka topic entries, which would lead to skipping
over some entries.

Please elaborate.

We need both checkpointing and the ability to set batch duration millis.
The Spark API provides both capabilities but somehow if checkpointing is
turned on, our batch duration millis are always set to 10 seconds
internally by Spark.  What is the resolution?
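
For context on the question above: JavaStreamingContext.getOrCreate rebuilds the context from checkpoint data when the checkpoint directory holds any, and the factory's create() runs only when none is found. A recovered context keeps the batch interval it was checkpointed with. The plain-Java sketch below is a toy model of that lookup, not Spark code. CheckpointSketch, checkpointStore, and the directory path are hypothetical names made up for illustration, and the sketch does not explain why the interval would stay at 10 seconds after the directory was emptied, as reported further down the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only: imitates the documented getOrCreate behavior, not Spark's implementation.
class CheckpointSketch {
    // Hypothetical stand-in for checkpoint storage: directory -> saved batch interval (ms).
    static final Map<String, Long> checkpointStore = new HashMap<>();

    // Returns the batch interval the "context" ends up with.
    static long getOrCreate(String checkpointDir, Supplier<Long> factory) {
        Long saved = checkpointStore.get(checkpointDir);
        if (saved != null) {
            // Checkpoint data found: the factory is never invoked,
            // so a newly configured interval is ignored.
            return saved;
        }
        // No checkpoint data: build a fresh context and record its interval.
        long fresh = factory.get();
        checkpointStore.put(checkpointDir, fresh);
        return fresh;
    }

    public static void main(String[] args) {
        String dir = "/hypothetical/checkpoint/dir";
        // First start, configured for 10 seconds.
        System.out.println(getOrCreate(dir, () -> 10_000L)); // prints 10000
        // Restart configured for 20 seconds, checkpoint still present.
        System.out.println(getOrCreate(dir, () -> 20_000L)); // prints 10000
        // Restart after the checkpoint data is removed.
        checkpointStore.remove(dir);
        System.out.println(getOrCreate(dir, () -> 20_000L)); // prints 20000
    }
}
```

If this model matches what is happening, a changed batch duration would only take effect once the context is created fresh rather than recovered, which is why logging the value actually read from the params object, as suggested below, is a useful cross-check.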


On Tue, Sep 8, 2015 at 7:23 PM, Tathagata Das <tdas@databricks.com> wrote:

> Why are you checkpointing the direct Kafka stream? It serves no purpose.
>
> TD
>
> On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <
> dgoldenberg123@gmail.com> wrote:
>
>> I just disabled checkpointing in our consumers and I can see that the
>> batch duration millis set to 20 seconds is now being honored.
>>
>> Why would that be the case?
>>
>> And how can we "untie" batch duration millis from checkpointing?
>>
>> Thanks.
>>
>> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>
>>> Well, I'm not sure why you're checkpointing messages.
>>>
>>> I'd also put in some logging to see what values are actually being read
>>> out of your params object for the various settings.
>>>
>>>
>>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
>>> dgoldenberg123@gmail.com> wrote:
>>>
>>>> I've stopped the jobs, the workers, and the master. Deleted the
>>>> contents of the checkpointing dir. Then restarted master, workers, and
>>>> consumers.
>>>>
>>>> I'm seeing the job in question still firing every 10 seconds.  I'm
>>>> seeing the 10 seconds in the Spark Jobs GUI page as well as our logs.
>>>> Seems quite strange given that the jobs used to fire every 1 second, we've
>>>> switched to 10, now trying to switch to 20 and batch duration millis is not
>>>> changing.
>>>>
>>>> Does anything stand out in the code perhaps?
>>>>
>>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger <cody@koeninger.org>
>>>> wrote:
>>>>
>>>>> Have you tried deleting or moving the contents of the checkpoint
>>>>> directory and restarting the job?
>>>>>
>>>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>
>>>>>> Sorry, more relevant code below:
>>>>>>
>>>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>>>> createCheckpointedContext(sparkConf, params) : createContext(
>>>>>> sparkConf, params);
>>>>>> jssc.start();
>>>>>> jssc.awaitTermination();
>>>>>> jssc.close();
>>>>>> ……………………………..
>>>>>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>>>>>> sparkConf, Parameters params) {
>>>>>>     JavaStreamingContextFactory factory = new
>>>>>> JavaStreamingContextFactory() {
>>>>>>       @Override
>>>>>>       public JavaStreamingContext create() {
>>>>>>         return createContext(sparkConf, params);
>>>>>>       }
>>>>>>     };
>>>>>>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>>>>> factory);
>>>>>>   }
>>>>>>
>>>>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>>>>> Parameters params) {
>>>>>>     // Create context with the specified batch interval, in
>>>>>> milliseconds.
>>>>>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>>     // Set the checkpoint directory, if we're checkpointing
>>>>>>     if (params.isCheckpointed()) {
>>>>>>       jssc.checkpoint(params.getCheckpointDir());
>>>>>>     }
>>>>>>
>>>>>>     Set<String> topicsSet = new HashSet<String>(Arrays.asList(params
>>>>>> .getTopic()));
>>>>>>
>>>>>>     // Set the Kafka parameters.
>>>>>>     Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>     kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
>>>>>> params.getBrokerList());
>>>>>>     if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>>>       kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET,
>>>>>> params.getAutoOffsetReset());
>>>>>>     }
>>>>>>
>>>>>>     // Create direct Kafka stream with the brokers and the topic.
>>>>>>     JavaPairInputDStream<String, String> messages =
>>>>>> KafkaUtils.createDirectStream(
>>>>>>       jssc,
>>>>>>       String.class,
>>>>>>       String.class,
>>>>>>       StringDecoder.class,
>>>>>>       StringDecoder.class,
>>>>>>       kafkaParams,
>>>>>>       topicsSet);
>>>>>>
>>>>>>     // See if there's an override of the default checkpoint duration.
>>>>>>     if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>>>>>       messages.checkpoint(Durations.milliseconds(params
>>>>>> .getCheckpointMillis()));
>>>>>>     }
>>>>>>
>>>>>>     JavaDStream<String> messageBodies = messages.map(new
>>>>>> Function<Tuple2<String, String>, String>() {
>>>>>>       @Override
>>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>>         return tuple2._2();
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>>     messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>       @Override
>>>>>>       public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>         ProcessPartitionFunction func = new
>>>>>> ProcessPartitionFunction(params);
>>>>>>         rdd.foreachPartition(func);
>>>>>>         return null;
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>>     return jssc;
>>>>>> }
>>>>>>
>>>>>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>
>>>>>>> I'd think that we wouldn't be "accidentally recovering from
>>>>>>> checkpoint" hours or even days after consumers have been restarted,
>>>>>>> plus the content is the fresh content that I'm feeding, not some
>>>>>>> content that had been fed before the last restart.
>>>>>>>
>>>>>>> The code is basically as follows:
>>>>>>>
>>>>>>>     SparkConf sparkConf = createSparkConf(...);
>>>>>>>     // We'd be 'checkpointed' because we specify a checkpoint
>>>>>>> directory which makes isCheckpointed true
>>>>>>>     JavaStreamingContext jssc = params.isCheckpointed() ?
>>>>>>> createCheckpointedContext(sparkConf, params) : createContext(
>>>>>>> sparkConf, params);
>>>>>>>
>>>>>>>     jssc.start();
>>>>>>>
>>>>>>>     jssc.awaitTermination();
>>>>>>>
>>>>>>>     jssc.close();
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <tdas@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Are you sure you are not accidentally recovering from checkpoint?
>>>>>>>> How are you using StreamingContext.getOrCreate() in your code?
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Tathagata,
>>>>>>>>>
>>>>>>>>> In our logs I see the batch duration millis being set first to 10
>>>>>>>>> then to 20 seconds. I don't see the 20 being reflected later during
>>>>>>>>> ingestion.
>>>>>>>>>
>>>>>>>>> In the Spark UI under Streaming I see the below output, notice the
>>>>>>>>> *10 second* Batch interval.  Can you think of a reason why it's
>>>>>>>>> stuck at 10?  It used to be 1 second by the way, then somehow over
>>>>>>>>> the course of a few restarts we managed to get it to be 10 seconds.
>>>>>>>>> Now it won't get reset to 20 seconds.  Any ideas?
>>>>>>>>>
>>>>>>>>> Streaming
>>>>>>>>>
>>>>>>>>>    - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>>>>>>>>    - *Time since start: *1 day 8 hours 44 minutes
>>>>>>>>>    - *Network receivers: *0
>>>>>>>>>    - *Batch interval: *10 seconds
>>>>>>>>>    - *Processed batches: *11790
>>>>>>>>>    - *Waiting batches: *0
>>>>>>>>>    - *Received records: *0
>>>>>>>>>    - *Processed records: *0
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Statistics over last 100 processed batches
>>>>>>>>>
>>>>>>>>> Receiver Statistics
>>>>>>>>> No receivers
>>>>>>>>>
>>>>>>>>> Batch Processing Statistics
>>>>>>>>>
>>>>>>>>>    Metric            Last batch  Minimum  25th percentile  Median  75th percentile  Maximum
>>>>>>>>>    Processing Time   23 ms       7 ms     10 ms            11 ms   14 ms            172 ms
>>>>>>>>>    Scheduling Delay  1 ms        0 ms     0 ms             0 ms    1 ms             2 ms
>>>>>>>>>    Total Delay       24 ms       8 ms     10 ms            12 ms   14 ms            173 ms
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <tdas@databricks.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Could you see what the streaming tab in the Spark UI says? It
>>>>>>>>>> should show the underlying batch duration of the StreamingContext,
>>>>>>>>>> the details of when the batch starts, etc.
>>>>>>>>>>
>>>>>>>>>> BTW, it seems that the 5.6 or 6.8 seconds delay is present only
>>>>>>>>>> when data is present (that is, *Documents processed: > 0*)
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Tathagata,
>>>>>>>>>>>
>>>>>>>>>>> Checkpointing is turned on but we were not recovering. I'm
>>>>>>>>>>> looking at the logs now, feeding fresh content hours after the
>>>>>>>>>>> restart. Here's a snippet:
>>>>>>>>>>>
>>>>>>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>>>>>>> ...
>>>>>>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>>>>>>
>>>>>>>>>>> Interestingly, the fresh content (4 documents) is fed about 5.6
>>>>>>>>>>> seconds after the previous batch, not 10 seconds. The second fresh
>>>>>>>>>>> batch comes in 6.8 seconds after the previous empty one.
>>>>>>>>>>>
>>>>>>>>>>> Granted, the log message is printed after iterating over the
>>>>>>>>>>> messages which may account for some time differences. But
>>>>>>>>>>> generally, looking at the log messages being printed before we
>>>>>>>>>>> iterate, it's still 10 seconds each time, not 20 which is what
>>>>>>>>>>> batchdurationmillis is currently set to.
>>>>>>>>>>>
>>>>>>>>>>> Code:
>>>>>>>>>>>
>>>>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>>>>>     KafkaUtils.createDirectStream(....);
>>>>>>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>>>>>>
>>>>>>>>>>> JavaDStream<String> messageBodies = messages.map(
>>>>>>>>>>>     new Function<Tuple2<String, String>, String>() {
>>>>>>>>>>>       @Override
>>>>>>>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>>>         return tuple2._2();
>>>>>>>>>>>       }
>>>>>>>>>>>     });
>>>>>>>>>>>
>>>>>>>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>>>>>>   @Override
>>>>>>>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>>>>>>>     rdd.foreachPartition(func);
>>>>>>>>>>>     return null;
>>>>>>>>>>>   }
>>>>>>>>>>> });
>>>>>>>>>>>
>>>>>>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>>>>>>
>>>>>>>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>>>>>>>     log.info("Starting data partition processing. AppName={}, topic={}.)...", appName, topic);
>>>>>>>>>>>     // ... iterate ...
>>>>>>>>>>>     log.info("Finished data partition processing (appName={}, topic={}). Documents processed: {}.", appName, topic, docCount);
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Any ideas? Thanks.
>>>>>>>>>>>
>>>>>>>>>>> - Dmitry
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <
>>>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you accidentally recovering from checkpoint files which have
>>>>>>>>>>>> 10 seconds as the batch interval?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>>>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm seeing an oddity where I initially set the
>>>>>>>>>>>>> batchdurationmillis to 1 second and it works fine:
>>>>>>>>>>>>>
>>>>>>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>>>>>> Durations.milliseconds(batchDurationMillis));
>>>>>>>>>>>>>
>>>>>>>>>>>>> Then I tried changing the value to 10 seconds. The change
>>>>>>>>>>>>> didn't seem to take. I've bounced the Spark workers and the
>>>>>>>>>>>>> consumers and now I'm seeing RDD's coming in once around 10
>>>>>>>>>>>>> seconds (not always 10 seconds according to the logs).
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, now I'm trying to change the value to 20 seconds and
>>>>>>>>>>>>> it's just not taking. I've bounced Spark master, workers, and
>>>>>>>>>>>>> consumers and the value seems "stuck" at 10 seconds.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Dmitry
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
