spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cody Koeninger <c...@koeninger.org>
Subject Re: Batchdurationmillis seems "sticky" with direct Spark streaming
Date Tue, 08 Sep 2015 13:53:39 GMT
Have you tried deleting or moving the contents of the checkpoint directory
and restarting the job?

On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <dgoldenberg123@gmail.com>
wrote:

> Sorry, more relevant code below:
>
> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
> JavaStreamingContext jssc = params.isCheckpointed() ?
> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
> params);
> jssc.start();
> jssc.awaitTermination();
> jssc.close();
> ……………………………..
>   private JavaStreamingContext createCheckpointedContext(SparkConf
> sparkConf, Parameters params) {
>     JavaStreamingContextFactory factory = new
> JavaStreamingContextFactory() {
>       @Override
>       public JavaStreamingContext create() {
>         return createContext(sparkConf, params);
>       }
>     };
>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
> factory);
>   }
>
>   private JavaStreamingContext createContext(SparkConf sparkConf,
> Parameters params) {
>     // Create context with the specified batch interval, in milliseconds.
>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.milliseconds(params.getBatchDurationMillis()));
>     // Set the checkpoint directory, if we're checkpointing
>     if (params.isCheckpointed()) {
>       jssc.checkpoint(params.getCheckpointDir());
>     }
>
>     Set<String> topicsSet = new HashSet<String>(Arrays.asList(params
> .getTopic()));
>
>     // Set the Kafka parameters.
>     Map<String, String> kafkaParams = new HashMap<String, String>();
>     kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params
> .getBrokerList());
>     if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>       kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
> .getAutoOffsetReset());
>     }
>
>     // Create direct Kafka stream with the brokers and the topic.
>     JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(
>       jssc,
>       String.class,
>       String.class,
>       StringDecoder.class,
>       StringDecoder.class,
>       kafkaParams,
>       topicsSet);
>
>     // See if there's an override of the default checkpoint duration.
>     if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>       messages.checkpoint(Durations.milliseconds(params
> .getCheckpointMillis()));
>     }
>
>     JavaDStream<String> messageBodies = messages.map(new
> Function<Tuple2<String, String>, String>() {
>       @Override
>       public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>       }
>     });
>
>     messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>       @Override
>       public Void call(JavaRDD<String> rdd) throws Exception {
>         ProcessPartitionFunction func = new
> ProcessPartitionFunction(params);
>         rdd.foreachPartition(func);
>         return null;
>       }
>     });
>
>     return jssc;
> }
>
> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
> dgoldenberg123@gmail.com> wrote:
>
>> I'd think that we wouldn't be "accidentally recovering from checkpoint"
>> hours or even days after consumers have been restarted, plus the content is
>> the fresh content that I'm feeding, not some content that had been fed
>> before the last restart.
>>
>> The code is basically as follows:
>>
>>     SparkConf sparkConf = createSparkConf(...);
>>     // We'd be 'checkpointed' because we specify a checkpoint directory
>> which makes isCheckpointed true
>>     JavaStreamingContext jssc = params.isCheckpointed() ?
>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>> params);    jssc.start();
>>
>>     jssc.awaitTermination();
>>
>>     jssc.close();
>>
>>
>>
>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <tdas@databricks.com>
>> wrote:
>>
>>> Are you sure you are not accidentally recovering from checkpoint? How
>>> are you using StreamingContext.getOrCreate() in your code?
>>>
>>> TD
>>>
>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>>> dgoldenberg123@gmail.com> wrote:
>>>
>>>> Tathagata,
>>>>
>>>> In our logs I see the batch duration millis being set first to 10 then
>>>> to 20 seconds. I don't see the 20 being reflected later during ingestion.
>>>>
>>>> In the Spark UI under Streaming I see the below output, notice the *10
>>>> second* Batch interval.  Can you think of a reason why it's stuck at
>>>> 10?  It used to be 1 second by the way, then somehow over the course of a
>>>> few restarts we managed to get it to be 10 seconds.  Now it won't get reset
>>>> to 20 seconds.  Any ideas?
>>>>
>>>> Streaming
>>>>
>>>>    - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>>>    - *Time since start: *1 day 8 hours 44 minutes
>>>>    - *Network receivers: *0
>>>>    - *Batch interval: *10 seconds
>>>>    - *Processed batches: *11790
>>>>    - *Waiting batches: *0
>>>>    - *Received records: *0
>>>>    - *Processed records: *0
>>>>
>>>>
>>>>
>>>> Statistics over last 100 processed batchesReceiver Statistics
>>>> No receivers
>>>> Batch Processing Statistics
>>>>
>>>>    MetricLast batchMinimum25th percentileMedian75th percentileMaximumProcessing
>>>>    Time23 ms7 ms10 ms11 ms14 ms172 msScheduling Delay1 ms0 ms0 ms0 ms1
>>>>    ms2 msTotal Delay24 ms8 ms10 ms12 ms14 ms173 ms
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <tdas@databricks.com>
>>>> wrote:
>>>>
>>>>> Could you see what the streaming tab in the Spark UI says? It should
>>>>> show the underlying batch duration of the StreamingContext, the details
of
>>>>> when the batch starts, etc.
>>>>>
>>>>> BTW, it seems that the 5.6 or 6.8 seconds delay is present only when
>>>>> data is present (that is, * Documents processed: > 0)*
>>>>>
>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>
>>>>>> Tathagata,
>>>>>>
>>>>>> Checkpointing is turned on but we were not recovering. I'm looking
at
>>>>>> the logs now, feeding fresh content hours after the restart. Here's
a
>>>>>> snippet:
>>>>>>
>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>> ...
>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>
>>>>>> Interestingly, the fresh content (4 documents) is fed about 5.6
>>>>>> seconds after the previous batch, not 10 seconds. The second fresh
batch
>>>>>> comes in 6.8 seconds after the previous empty one.
>>>>>>
>>>>>> Granted, the log message is printed after iterating over the messages
>>>>>> which may account for some time differences. But generally, looking
at the
>>>>>> log messages being printed before we iterate, it's still 10 seconds
each
>>>>>> time, not 20 which is what batchdurationmillis is currently set to.
>>>>>>
>>>>>> Code:
>>>>>>
>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>> KafkaUtils.createDirectStream(....);
>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>
>>>>>>
>>>>>>   JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String,
>>>>>> String>, String>() {
>>>>>>       @Override
>>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>>         return tuple2._2();
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>>     messageBodies.foreachRDD(new Function<JavaRDD<String>,
Void>() {
>>>>>>       @Override
>>>>>>       public Void call(JavaRDD<String> rdd) throws Exception
{
>>>>>>
>>>>>>   ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>>         rdd.foreachPartition(func);
>>>>>>         return null;
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>
>>>>>> public void call(Iterator<String> messageIterator) throws Exception
{
>>>>>>     log.info("Starting data partition processing. AppName={},
>>>>>> topic={}.)...", appName, topic);
>>>>>>     // ... iterate ...
>>>>>>     log.info("Finished data partition processing (appName={},
>>>>>> topic={}). Documents processed: {}.", appName, topic, docCount);
>>>>>> }
>>>>>>
>>>>>> Any ideas? Thanks.
>>>>>>
>>>>>> - Dmitry
>>>>>>
>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <tdas@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Are you accidentally recovering from checkpoint files which has
10
>>>>>>> second as the batch interval?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm seeing an oddity where I initially set the batchdurationmillis
>>>>>>>> to 1 second and it works fine:
>>>>>>>>
>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>> Durations.milliseconds(batchDurationMillis));
>>>>>>>>
>>>>>>>> Then I tried changing the value to 10 seconds. The change
didn't
>>>>>>>> seem to take. I've bounced the Spark workers and the consumers
and now I'm
>>>>>>>> seeing RDD's coming in once around 10 seconds (not always
10 seconds
>>>>>>>> according to the logs).
>>>>>>>>
>>>>>>>> However, now I'm trying to change the value to 20 seconds
and it's
>>>>>>>> just not taking. I've bounced Spark master, workers, and
consumers and the
>>>>>>>> value seems "stuck" at 10 seconds.
>>>>>>>>
>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> - Dmitry
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message