spark-user mailing list archives

From chandan prakash <chandanbaran...@gmail.com>
Subject Re: spark streaming Directkafka with checkpointing : changed parameters not considered
Date Fri, 19 Aug 2016 17:51:27 GMT
Thanks, Cody, for the pointer.

I am able to do this now. I am not using checkpointing; instead I store the
offsets in ZooKeeper for fault tolerance.
Spark config changes are now reflected when I redeploy the code.
*Using this API:*
*KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
messageHandler)*
*instead of:*
*KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicsSet)*
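A minimal, Spark-free sketch of the offset bookkeeping this approach relies on, assuming one stored offset per topic/partition. `PartitionKey` and `BatchRange` are hypothetical stand-ins for `kafka.common.TopicAndPartition` and Spark's `OffsetRange`, and `InMemoryOffsetStore` stands in for a ZooKeeper-backed store (all names here are illustrative, not from the thread). In the real job the ranges would come from `rdd.asInstanceOf[HasOffsetRanges].offsetRanges` inside `foreachRDD`, the restored map would be passed as `fromOffsets`, and `messageHandler` would be something like `(mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)`.

```scala
// Hypothetical stand-ins so the sketch compiles without Spark or Kafka.
final case class PartitionKey(topic: String, partition: Int)
// The [fromOffset, untilOffset) slice of one partition read by one batch.
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical offset store; a real one would write ZooKeeper znodes.
trait OffsetStore {
  def load(): Map[PartitionKey, Long]
  def save(ranges: Seq[BatchRange]): Unit
}

final class InMemoryOffsetStore extends OffsetStore {
  private var committed = Map.empty[PartitionKey, Long]

  def load(): Map[PartitionKey, Long] = committed

  // Store where the next run should resume: the exclusive end of each range.
  def save(ranges: Seq[BatchRange]): Unit =
    ranges.foreach { r =>
      committed = committed + (PartitionKey(r.topic, r.partition) -> r.untilOffset)
    }
}

object OffsetRestart {
  // Build the fromOffsets map on (re)start. Partitions with nothing stored
  // fall back to initialOffset (an assumed policy, e.g. the earliest offset).
  def fromOffsets(
      store: OffsetStore,
      partitions: Seq[PartitionKey],
      initialOffset: Long): Map[PartitionKey, Long] = {
    val stored = store.load()
    partitions.map(p => p -> stored.getOrElse(p, initialOffset)).toMap
  }
}
```

Because nothing here is serialized into a checkpoint, config values read in `main` on each deploy are the ones the job uses. Saving offsets only after a batch's output has been written means a crash replays at most that batch, so the output should tolerate replays.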

*One quick question:*
What is the need for checkpointing if we can achieve both fault tolerance
and application code/config changes without checkpointing? Is there
anything else that checkpointing provides? I might be missing something.


Regards,
Chandan


On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <cody@koeninger.org> wrote:

> Yeah, the solutions are outlined in the doc link. Or just don't rely on
> checkpoints.
> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaranwal@gmail.com>
> wrote:
>
>> Yes, I looked into the source code implementation. sparkConf is serialized
>> and saved during checkpointing and re-created from the checkpoint directory
>> at restart time. So any sparkConf parameter that you load from
>> application.config and set on the sparkConf object in code cannot be changed
>> and have the change reflected when using checkpointing. :(
>>
>> Is there any workaround for reading a changed sparkConf parameter value
>> while using checkpointing?
>> P.S. I am not adding a new parameter; I am just changing the values of some
>> existing sparkConf params.
>>
>> This is a common case, so there must be some solution for it.
>>
>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>
>>> Checkpointing is not kafka-specific.  It encompasses metadata about the
>>> application.  You can't re-use a checkpoint if your application has changed.
>>>
>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>>>
>>>
>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
>>> chandanbaranwal@gmail.com> wrote:
>>>
>>>> Is it possible to use the checkpoint directory to restart streaming but
>>>> with a modified parameter value in the config file (e.g. username/password
>>>> for the db connection)?
>>>> Thanks in advance.
>>>>
>>>> Regards,
>>>> Chandan
>>>>
>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>>>> chandanbaranwal@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am using direct Kafka with checkpointing of offsets, the same as:
>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>>>
>>>>> I need to change some parameters, such as the db connection params
>>>>> (username/password for the db connection).
>>>>> I stopped streaming gracefully, changed the parameters in the config file
>>>>> and restarted streaming.
>>>>> *Issue: the changed username/password parameters are not being
>>>>> picked up.*
>>>>>
>>>>> *Question*:
>>>>> As per my understanding, checkpointing should only save the offsets of
>>>>> Kafka partitions and not the credentials of the db connection.
>>>>> Why is it picking up the old db connection params?
>>>>>
>>>>> I am declaring the params in the main method and not in the setupSsc() method.
>>>>> My code is identical to that in the program linked above, as below:
>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>> val jdbcUser = conf.getString("jdbc.user")
>>>>> val jdbcPassword = conf.getString("jdbc.password")
>>>>> // while the job doesn't strictly need checkpointing,
>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case of failure
>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>> val ssc = StreamingContext.getOrCreate(
>>>>>   checkpointDir,
>>>>>   setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, jdbcUser,
>>>>>     jdbcPassword, checkpointDir) _
>>>>> )
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Chandan Prakash
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Chandan Prakash
>>>>
>>>>
>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
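Why the new credentials were ignored can be modeled with a small, Spark-free sketch of the `StreamingContext.getOrCreate` contract: the creating function runs only when no checkpoint exists. `JobSetup` and `CheckpointModel` are hypothetical names for illustration; in Spark the restored metadata also includes the configuration and the DStream operations, and with them values such as `jdbcUser` captured in their closures.

```scala
// Hypothetical stand-in for what setupSsc(...) bakes into the job.
final case class JobSetup(jdbcUser: String, jdbcPassword: String)

// Hypothetical model of a checkpoint directory plus getOrCreate.
final class CheckpointModel {
  // Stand-in for the serialized state written to checkpointDir.
  private var saved: Option[JobSetup] = None

  def getOrCreate(create: () => JobSetup): JobSetup = saved match {
    case Some(restored) =>
      restored // checkpoint found: create() is never called
    case None =>
      val fresh = create()
      saved = Some(fresh) // first run: state is "checkpointed"
      fresh
  }
}
```

Editing the config file only changes the argument to `create`, which a restart from an existing checkpoint never evaluates. That is why the upgrade section linked in the thread suggests starting the upgraded app with a different checkpoint directory (or deleting the old one), or, as Cody says, not relying on checkpoints at all.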


-- 
Chandan Prakash
