spark-user mailing list archives

From: Dmitry Goldenberg <dgoldenberg...@gmail.com>
Subject: Re: Batchdurationmillis seems "sticky" with direct Spark streaming
Date: Wed, 09 Sep 2015 04:02:57 GMT
What's wrong with creating a checkpointed context??  We WANT checkpointing,
first of all.  We therefore WANT the checkpointed context.

Second of all, it's not true that we're loading the checkpointed context
independent of whether params.isCheckpointed() is true.  I'm quoting the
code again:

// This is NOT loading a checkpointed context if isCheckpointed() is false.
JavaStreamingContext jssc = params.isCheckpointed()
    ? createCheckpointedContext(sparkConf, params)
    : createContext(sparkConf, params);

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
      Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }

  private JavaStreamingContext createContext(SparkConf sparkConf,
      Parameters params) {
    // Create context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));
    // Set the checkpoint directory, if we're checkpointing.
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }
...............
Again, this is *only* calling context.checkpoint() if isCheckpointed() is
true.  And we WANT it to be true.

What am I missing here?
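
If the answer turns out to be getOrCreate() itself, then presumably the
behavior is: once the checkpoint directory already contains a checkpoint,
getOrCreate() restores the saved context, original batch duration included,
and never invokes the factory. A minimal sketch of that contract (NOT Spark's
real code; the two helper methods are hypothetical stand-ins for what Spark
does internally):

  // Sketch only: the factory runs solely when no checkpoint exists yet, so a
  // changed batchDurationMillis has no effect once a checkpoint was written.
  abstract class GetOrCreateSketch {
    abstract boolean checkpointExists(String dir);          // hypothetical
    abstract JavaStreamingContext recoverFrom(String dir);  // hypothetical

    JavaStreamingContext getOrCreate(String checkpointDir,
                                     JavaStreamingContextFactory factory) {
      if (checkpointExists(checkpointDir)) {
        // Restores the serialized DStream graph, original batch duration
        // included; the factory (and its new duration) is ignored.
        return recoverFrom(checkpointDir);
      }
      return factory.create();  // only reached on a clean checkpoint directory
    }
  }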



On Tue, Sep 8, 2015 at 11:42 PM, Tathagata Das <tdas@databricks.com> wrote:

> Well, you are returning
> JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
> That is loading the checkpointed context, independent of whether
> params.isCheckpointed() is true.
>
>
>
>
> On Tue, Sep 8, 2015 at 8:28 PM, Dmitry Goldenberg <
> dgoldenberg123@gmail.com> wrote:
>
>> That is good to know. However, that doesn't change the problem I'm
>> seeing: even with that piece of code (stream.checkpoint()) commented out,
>> the batch duration millis aren't getting changed unless I take
>> checkpointing out completely.
>>
>> In other words, this commented out:
>>
>> //    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>> //      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
>> //    }
>>
>> doesn't change the "stickiness" of the batch duration millis value.
>> However, if the context is not checkpointed, the problem goes away and the
>> batch duration millis setting is properly updated.
>>
>> I'll take the commented-out piece out; however, I still need to figure
>> out how to fix the batch duration millis while also keeping the
>> checkpointed context.
>>
>> The checkpointed context is basically created as I stated before; note
>> the invocation of the checkpoint() method:
>>
>>   private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
>>       Parameters params) {
>>     JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>>       @Override
>>       public JavaStreamingContext create() {
>>         return createContext(sparkConf, params);
>>       }
>>     };
>>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
>>   }
>>
>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>       Parameters params) {
>>     // Create context with the specified batch interval, in milliseconds.
>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>         Durations.milliseconds(params.getBatchDurationMillis()));
>>     // Set the checkpoint directory, if we're checkpointing.
>>     if (params.isCheckpointed()) {
>>       jssc.checkpoint(params.getCheckpointDir());
>>     }
>>
>> .......
>>
>> On Tue, Sep 8, 2015 at 11:14 PM, Tathagata Das <tdas@databricks.com>
>> wrote:
>>
>>> Calling directKafkaStream.checkpoint() will make the system write the
>>> raw Kafka data into HDFS files (that is, RDD checkpointing). This is
>>> completely unnecessary with the direct Kafka stream, because it already
>>> tracks the offsets of the data in each batch (they are saved as part of
>>> the metadata checkpointing enabled by
>>> streamingContext.checkpoint(checkpointDir)), and it can recover from
>>> failure by reading the exact same data back from Kafka.
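>>>
>>> Roughly, the two kinds of checkpointing look like this (a minimal sketch;
>>> the directory path and the interval are illustrative):
>>>
>>>   // Metadata checkpointing: saves the DStream graph, configuration, and
>>>   // the per-batch Kafka offset ranges, enabling driver recovery.
>>>   jssc.checkpoint("hdfs:///checkpoints/my-app");
>>>
>>>   // RDD data checkpointing: additionally writes the raw RDD contents to
>>>   // HDFS on the given interval. Redundant for the direct stream, which
>>>   // can re-read the saved offset ranges straight from Kafka on recovery.
>>>   // directKafkaStream.checkpoint(Durations.seconds(60));  // unnecessary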
>>>
>>>
>>> TD
>>>
>>> On Tue, Sep 8, 2015 at 4:38 PM, Dmitry Goldenberg <
>>> dgoldenberg123@gmail.com> wrote:
>>>
>>>> >> Why are you checkpointing the direct kafka stream? It serves no
>>>> purpose.
>>>>
>>>> Could you elaborate on what you mean?
>>>>
>>>> Our goal is fault tolerance.  If a consumer is killed or stopped
>>>> midstream, we want to resume where we left off next time the consumer is
>>>> restarted.
>>>>
>>>> How would that be "not serving a purpose"?  This is already working for
>>>> us.  From our testing, we indeed resume where we left off, which is not
>>>> possible without checkpointing.  If checkpointing is turned off, we
>>>> resume at the latest Kafka topic entries, which means skipping over
>>>> some entries.
>>>>
>>>> Please elaborate.
>>>>
>>>> We need both checkpointing and the ability to set batch duration
>>>> millis.  The Spark API provides both capabilities but somehow if
>>>> checkpointing is turned on, our batch duration millis are always set to 10
>>>> seconds internally by Spark.  What is the resolution?
>>>>
>>>>
>>>> On Tue, Sep 8, 2015 at 7:23 PM, Tathagata Das <tdas@databricks.com>
>>>> wrote:
>>>>
>>>>> Why are you checkpointing the direct kafka stream? It serves no
>>>>> purpose.
>>>>>
>>>>> TD
>>>>>
>>>>> On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <
>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>
>>>>>> I just disabled checkpointing in our consumers and I can see that the
>>>>>> batch duration millis set to 20 seconds is now being honored.
>>>>>>
>>>>>> Why would that be the case?
>>>>>>
>>>>>> And how can we "untie" batch duration millis from checkpointing?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger <cody@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Well, I'm not sure why you're checkpointing messages.
>>>>>>>
>>>>>>> I'd also put in some logging to see what values are actually being
>>>>>>> read out of your params object for the various settings.
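>>>>>>>
>>>>>>> Something along these lines (hypothetical; the accessor names match
>>>>>>> the params object shown elsewhere in this thread):
>>>>>>>
>>>>>>>   log.info("batchDurationMillis={}, isCheckpointed={}, checkpointDir={}",
>>>>>>>       params.getBatchDurationMillis(), params.isCheckpointed(),
>>>>>>>       params.getCheckpointDir());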
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>
>>>>>>>> I've stopped the jobs, the workers, and the master. Deleted the
>>>>>>>> contents of the checkpointing dir. Then restarted master, workers,
>>>>>>>> and consumers.
>>>>>>>>
>>>>>>>> I'm seeing the job in question still firing every 10 seconds. I'm
>>>>>>>> seeing the 10 seconds in the Spark Jobs GUI page as well as in our
>>>>>>>> logs. Seems quite strange given that the jobs used to fire every 1
>>>>>>>> second; we switched to 10, and now, trying to switch to 20, the
>>>>>>>> batch duration millis is not changing.
>>>>>>>>
>>>>>>>> Does anything stand out in the code perhaps?
>>>>>>>>
>>>>>>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger <cody@koeninger.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Have you tried deleting or moving the contents of the checkpoint
>>>>>>>>> directory and restarting the job?
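>>>>>>>>>
>>>>>>>>> If the checkpoint lives on HDFS, one way to clear it programmatically
>>>>>>>>> before re-submitting with a new batch interval (a sketch; assumes
>>>>>>>>> you actually intend to discard the saved offsets along with it):
>>>>>>>>>
>>>>>>>>>   import org.apache.hadoop.conf.Configuration;
>>>>>>>>>   import org.apache.hadoop.fs.FileSystem;
>>>>>>>>>   import org.apache.hadoop.fs.Path;
>>>>>>>>>
>>>>>>>>>   FileSystem fs = FileSystem.get(new Configuration());
>>>>>>>>>   // Recursive delete: the old batch interval stored in the
>>>>>>>>>   // checkpoint disappears along with the saved offsets.
>>>>>>>>>   fs.delete(new Path(checkpointDir), true);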
>>>>>>>>>
>>>>>>>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Sorry, more relevant code below:
>>>>>>>>>>
>>>>>>>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>>>>>>>> JavaStreamingContext jssc = params.isCheckpointed()
>>>>>>>>>>     ? createCheckpointedContext(sparkConf, params)
>>>>>>>>>>     : createContext(sparkConf, params);
>>>>>>>>>> jssc.start();
>>>>>>>>>> jssc.awaitTermination();
>>>>>>>>>> jssc.close();
>>>>>>>>>> ……………………………..
>>>>>>>>>>   private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
>>>>>>>>>>       Parameters params) {
>>>>>>>>>>     JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>>>>>>>>>>       @Override
>>>>>>>>>>       public JavaStreamingContext create() {
>>>>>>>>>>         return createContext(sparkConf, params);
>>>>>>>>>>       }
>>>>>>>>>>     };
>>>>>>>>>>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>>>>>>>>>       Parameters params) {
>>>>>>>>>>     // Create context with the specified batch interval, in milliseconds.
>>>>>>>>>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>>>         Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>>>>>>     // Set the checkpoint directory, if we're checkpointing.
>>>>>>>>>>     if (params.isCheckpointed()) {
>>>>>>>>>>       jssc.checkpoint(params.getCheckpointDir());
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));
>>>>>>>>>>
>>>>>>>>>>     // Set the Kafka parameters.
>>>>>>>>>>     Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>>>>     kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
>>>>>>>>>>         params.getBrokerList());
>>>>>>>>>>     if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>>>>>>>       kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET,
>>>>>>>>>>           params.getAutoOffsetReset());
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     // Create the direct Kafka stream with the brokers and the topic.
>>>>>>>>>>     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>>>>>>         jssc,
>>>>>>>>>>         String.class,
>>>>>>>>>>         String.class,
>>>>>>>>>>         StringDecoder.class,
>>>>>>>>>>         StringDecoder.class,
>>>>>>>>>>         kafkaParams,
>>>>>>>>>>         topicsSet);
>>>>>>>>>>
>>>>>>>>>>     // See if there's an override of the default checkpoint duration.
>>>>>>>>>>     if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>>>>>>>>>       messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     JavaDStream<String> messageBodies = messages.map(
>>>>>>>>>>         new Function<Tuple2<String, String>, String>() {
>>>>>>>>>>           @Override
>>>>>>>>>>           public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>>             return tuple2._2();
>>>>>>>>>>           }
>>>>>>>>>>         });
>>>>>>>>>>
>>>>>>>>>>     messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>>>>>       @Override
>>>>>>>>>>       public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>>         ProcessPartitionFunction func = new ProcessPartitionFunction(params);
>>>>>>>>>>         rdd.foreachPartition(func);
>>>>>>>>>>         return null;
>>>>>>>>>>       }
>>>>>>>>>>     });
>>>>>>>>>>
>>>>>>>>>>     return jssc;
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
>>>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'd think that we wouldn't be "accidentally recovering from
>>>>>>>>>>> checkpoint" hours or even days after consumers have been
>>>>>>>>>>> restarted; plus, the content is the fresh content that I'm
>>>>>>>>>>> feeding, not some content that had been fed before the last
>>>>>>>>>>> restart.
>>>>>>>>>>>
>>>>>>>>>>> The code is basically as follows:
>>>>>>>>>>>
>>>>>>>>>>>     SparkConf sparkConf = createSparkConf(...);
>>>>>>>>>>>     // We'd be 'checkpointed' because we specify a checkpoint
>>>>>>>>>>>     // directory, which makes isCheckpointed true.
>>>>>>>>>>>     JavaStreamingContext jssc = params.isCheckpointed()
>>>>>>>>>>>         ? createCheckpointedContext(sparkConf, params)
>>>>>>>>>>>         : createContext(sparkConf, params);
>>>>>>>>>>>
>>>>>>>>>>>     jssc.start();
>>>>>>>>>>>     jssc.awaitTermination();
>>>>>>>>>>>
>>>>>>>>>>>     jssc.close();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <
>>>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you sure you are not accidentally recovering from
>>>>>>>>>>>> checkpoint? How are you using StreamingContext.getOrCreate() in
>>>>>>>>>>>> your code?
>>>>>>>>>>>>
>>>>>>>>>>>> TD
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>>>>>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Tathagata,
>>>>>>>>>>>>>
>>>>>>>>>>>>> In our logs I see the batch duration millis being set first to
>>>>>>>>>>>>> 10 and then to 20 seconds. I don't see the 20 being reflected
>>>>>>>>>>>>> later during ingestion.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the Spark UI under Streaming I see the output below; notice
>>>>>>>>>>>>> the *10 second* Batch interval.  Can you think of a reason why
>>>>>>>>>>>>> it's stuck at 10?  It used to be 1 second, by the way; then,
>>>>>>>>>>>>> somehow, over the course of a few restarts, we managed to get
>>>>>>>>>>>>> it to be 10 seconds.  Now it won't get reset to 20 seconds.
>>>>>>>>>>>>> Any ideas?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Streaming
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>>>>>>>>>>>>    - *Time since start: *1 day 8 hours 44 minutes
>>>>>>>>>>>>>    - *Network receivers: *0
>>>>>>>>>>>>>    - *Batch interval: *10 seconds
>>>>>>>>>>>>>    - *Processed batches: *11790
>>>>>>>>>>>>>    - *Waiting batches: *0
>>>>>>>>>>>>>    - *Received records: *0
>>>>>>>>>>>>>    - *Processed records: *0
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Statistics over last 100 processed batches
>>>>>>>>>>>>>
>>>>>>>>>>>>> Receiver Statistics: No receivers
>>>>>>>>>>>>>
>>>>>>>>>>>>> Batch Processing Statistics:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    Metric            Last batch  Minimum  25th pctile  Median  75th pctile  Maximum
>>>>>>>>>>>>>    Processing Time   23 ms       7 ms     10 ms        11 ms   14 ms        172 ms
>>>>>>>>>>>>>    Scheduling Delay  1 ms        0 ms     0 ms         0 ms    1 ms         2 ms
>>>>>>>>>>>>>    Total Delay       24 ms       8 ms     10 ms        12 ms   14 ms        173 ms
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <
>>>>>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could you see what the Streaming tab in the Spark UI says? It
>>>>>>>>>>>>>> should show the underlying batch duration of the
>>>>>>>>>>>>>> StreamingContext, the details of when the batch starts, etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> BTW, it seems that the 5.6 or 6.8 second delay is present only
>>>>>>>>>>>>>> when data is present (that is, *Documents processed: > 0*).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>>>>>>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Tathagata,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Checkpointing is turned on but we were not recovering. I'm
>>>>>>>>>>>>>>> looking at the logs now, feeding fresh content hours after
>>>>>>>>>>>>>>> the restart. Here's a snippet:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>>>>>>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>>>>>>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>>>>>>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>>>>>>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Interestingly, the fresh content (4 documents) is fed about
>>>>>>>>>>>>>>> 5.6 seconds after the previous batch, not 10 seconds. The
>>>>>>>>>>>>>>> second fresh batch comes in 6.8 seconds after the previous
>>>>>>>>>>>>>>> empty one.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Granted, the log message is printed after iterating over the
>>>>>>>>>>>>>>> messages, which may account for some of the time differences.
>>>>>>>>>>>>>>> But generally, looking at the log messages printed before we
>>>>>>>>>>>>>>> iterate, it's still 10 seconds each time, not the 20 that
>>>>>>>>>>>>>>> batchdurationmillis is currently set to.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>>>>>>>>>     KafkaUtils.createDirectStream(....);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JavaDStream<String> messageBodies = messages.map(
>>>>>>>>>>>>>>>     new Function<Tuple2<String, String>, String>() {
>>>>>>>>>>>>>>>   @Override
>>>>>>>>>>>>>>>   public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>>>>>>>     return tuple2._2();
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>>>>>>>>>>   @Override
>>>>>>>>>>>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>>>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>>>>>>>>>>>     rdd.foreachPartition(func);
>>>>>>>>>>>>>>>     return null;
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>>>>>>>>>>>     log.info("Starting data partition processing (appName={}, topic={})...",
>>>>>>>>>>>>>>>         appName, topic);
>>>>>>>>>>>>>>>     // ... iterate ...
>>>>>>>>>>>>>>>     log.info("Finished data partition processing (appName={}, topic={}). "
>>>>>>>>>>>>>>>         + "Documents processed: {}.", appName, topic, docCount);
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any ideas? Thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Dmitry
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <
>>>>>>>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Are you accidentally recovering from checkpoint files that
>>>>>>>>>>>>>>>> have 10 seconds as the batch interval?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>>>>>>>>>>>>>> dgoldenberg123@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm seeing an oddity where I initially set the
>>>>>>>>>>>>>>>>> batchdurationmillis to 1 second and it works fine:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>>>>>>>>>>     Durations.milliseconds(batchDurationMillis));
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Then I tried changing the value to 10 seconds. The change
>>>>>>>>>>>>>>>>> didn't seem to take. I've bounced the Spark workers and the
>>>>>>>>>>>>>>>>> consumers, and now I'm seeing RDDs coming in about every 10
>>>>>>>>>>>>>>>>> seconds (not always exactly 10 seconds, according to the logs).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, now I'm trying to change the value to 20 seconds and
>>>>>>>>>>>>>>>>> it's just not taking. I've bounced the Spark master, workers,
>>>>>>>>>>>>>>>>> and consumers, and the value seems "stuck" at 10 seconds.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - Dmitry
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
