spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: spark streaming with kafka reset offset
Date Wed, 15 Jul 2015 03:25:48 GMT
Of course, exactly-once receiving is not the same as exactly-once
processing. In the case of the direct Kafka stream, the data may actually
be pulled multiple times. But even if the data of a batch is pulled twice
because of some failure, the final result (that is, the transformed data
accessed through foreachRDD) will always be the same even if recomputed.
In other words, the data in partition x of the RDD of time t will always
be the same even if that partition gets recomputed. Now, to get end-to-end
exactly-once, you will also have to push data out to external data stores
in an exactly-once manner - either the updates are idempotent, or you can
use the unique id [(batch time, partition ID)] to update the store
transactionally (such that each partition is inserted into the data store
only once).
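
To make the transactional option concrete, here is a minimal sketch; the
store and its alreadyCommitted/writeAtomically calls are hypothetical
stand-ins for whatever external system you write to:

import org.apache.spark.TaskContext

directKafkaStream.foreachRDD { (rdd, batchTime) =>
  rdd.foreachPartition { partitionData =>
    // (batch time, partition ID) uniquely identifies this unit of output,
    // and stays the same even if the partition is recomputed
    val uniqueId = (batchTime.milliseconds, TaskContext.get.partitionId)
    if (!store.alreadyCommitted(uniqueId)) {
      store.writeAtomically(uniqueId, partitionData.toSeq)  // hypothetical store API
    }
  }
}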

This is also explained in my talk:
https://www.youtube.com/watch?v=d5UJonrruHk

On Tue, Jul 14, 2015 at 8:18 PM, Chen Song <chen.song.82@gmail.com> wrote:

> Thanks TD.
>
> As for 1), if timing is not guaranteed, how are exactly-once semantics
> supported? It feels like exactly-once receiving is not necessarily
> exactly-once processing.
>
> Chen
>
> On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das <tdas@databricks.com>
> wrote:
>
>>
>>
>> On Tue, Jul 14, 2015 at 6:42 PM, Chen Song <chen.song.82@gmail.com>
>> wrote:
>>
>>> Thanks TD and Cody. I saw that.
>>>
>>> 1. By doing that (foreachRDD), does KafkaDStream checkpoint its offsets
>>> to HDFS at the end of each batch interval?
>>>
>>
>> The timing is not guaranteed.
>>
>>
>>> 2. In the code, if I first apply transformations and actions on the
>>> directKafkaStream and then use foreachRDD on the original KafkaDStream to
>>> commit offsets myself, will offset commits always happen after the
>>> transformations and actions?
>>>
>>
>> What do you mean by "original KafkaDStream"? Did you mean the
>> directKafkaStream? If yes, then yes, output operations like foreachRDD are
>> executed in each batch in the same order as they are defined.
>>
>> dstream1.foreachRDD { rdd => func1(rdd) }
>> dstream2.foreachRDD { rdd => func2(rdd) }
>>
>> In every batch interval, func1 will be executed before func2.
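>>
>> So, as a sketch, you could do the output in one foreachRDD and commit the
>> offsets in a second one; within each batch the commit will always run
>> after the output (writeResults and saveOffsets are hypothetical helpers):
>>
>> directKafkaStream.foreachRDD { rdd => writeResults(rdd) }  // output first
>> directKafkaStream.foreachRDD { rdd =>
>>   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   saveOffsets(ranges)  // commit the offsets only after the output above ran
>> }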
>>
>>
>>
>>
>>> Chen
>>>
>>> On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das <tdas@databricks.com>
>>> wrote:
>>>
>>>> Relevant documentation -
>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
>>>> towards the end.
>>>>
>>>> directKafkaStream.foreachRDD { rdd =>
>>>>      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>      // offsetRanges.length = # of Kafka partitions being consumed
>>>>      ...
>>>>  }
>>>>
>>>>
>>>> On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger <cody@koeninger.org>
>>>> wrote:
>>>>
>>>>> You have access to the offset ranges for a given rdd in the stream by
>>>>> typecasting to HasOffsetRanges.  You can then store the offsets wherever
>>>>> you need to.
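>>>>>
>>>>> For example, a minimal sketch (recordOffset is a hypothetical helper that
>>>>> could write to ZooKeeper, a database, or whatever backs a REST endpoint):
>>>>>
>>>>> directKafkaStream.foreachRDD { rdd =>
>>>>>   for (r <- rdd.asInstanceOf[HasOffsetRanges].offsetRanges) {
>>>>>     // each OffsetRange carries topic, partition, fromOffset and untilOffset
>>>>>     recordOffset(r.topic, r.partition, r.untilOffset)
>>>>>   }
>>>>> }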
>>>>>
>>>>> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song <chen.song.82@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> A follow up question.
>>>>>>
>>>>>> When using the createDirectStream approach, the offsets are checkpointed
>>>>>> to HDFS and understood by the Spark Streaming job. Is there a way to
>>>>>> expose the offsets via a REST API to end users? Or alternatively, is there
>>>>>> a way to have offsets committed to the Kafka offset manager so users can
>>>>>> query them from a consumer programmatically?
>>>>>>
>>>>>> Essentially, all I need to do is monitor the progress of data
>>>>>> consumption of the Kafka topic.
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger <cody@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> You can't use different versions of spark in your application vs
>>>>>>> your cluster.
>>>>>>>
>>>>>>> For the direct stream, it's not 60 partitions per executor, it's 300
>>>>>>> partitions, and executors work on them as they are scheduled.  Yes, if you
>>>>>>> have no messages you will get an empty partition.  It's up to you whether
>>>>>>> it's worthwhile to call coalesce or not.
>>>>>>>
>>>>>>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> Is this 3 the number of parallel consumer threads per receiver, meaning
>>>>>>>> in total we have 2*3=6 consumers in the same consumer group consuming
>>>>>>>> from all 300 partitions? And is 3 just parallelism on the same receiver,
>>>>>>>> with the recommendation being to use 1 per receiver, since consuming from
>>>>>>>> kafka is not CPU bound but rather NIC (network) bound, so increasing
>>>>>>>> consumer threads on one receiver won't make it parallel in the ideal sense?
>>>>>>>>
>>>>>>>> In the non-receiver-based consumer in spark 1.3, if I use 5 executors and
>>>>>>>> the kafka topic has 300 partitions, will the kafkaRDD created on the 5
>>>>>>>> executors have 60 partitions per executor (total 300, one-to-one mapping)?
>>>>>>>> And if some of the kafka partitions are empty, say the offset from the
>>>>>>>> last checkpoint to current is the same for partition P123, will it still
>>>>>>>> create an empty partition in the kafkaRDD? So should we call coalesce on
>>>>>>>> the kafkaRDD?
>>>>>>>>
>>>>>>>>
>>>>>>>> And is there any incompatibility issue when I include
>>>>>>>> spark-streaming_2.10 (version 1.3) and spark-core_2.10 (version 1.3) in my
>>>>>>>> application but my cluster has spark version 1.2?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> 1. Here you are basically creating 2 receivers and asking each of
>>>>>>>>> them to consume 3 kafka partitions each.
>>>>>>>>>
>>>>>>>>> - In 1.2 we have high-level consumers, so how can we restrict the number
>>>>>>>>> of kafka partitions to consume from? Say I have 300 kafka partitions in
>>>>>>>>> the kafka topic and, as above, I gave 2 receivers and 3 kafka partitions.
>>>>>>>>> Does that mean I will read from only 6 out of 300 partitions, and the
>>>>>>>>> data of the remaining 294 partitions is lost?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2. One more doubt: in spark streaming, how is it decided which part
>>>>>>>>> of the main function of the driver will run at each batch interval? Since
>>>>>>>>> the whole code is written in one function (the main function in the
>>>>>>>>> driver), how is it determined that the kafka stream receivers are not
>>>>>>>>> registered in each batch and only the processing is done?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.ayan@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> Let me take a shot at your questions. (I am sure people like Cody
>>>>>>>>>> and TD will correct me if I am wrong.)
>>>>>>>>>>
>>>>>>>>>> 0. This is an exact copy from the similar question in a mail thread
>>>>>>>>>> from Akhil D:
>>>>>>>>>> Since you set local[4] you will have 4 threads for your
>>>>>>>>>> computation, and since you are having 2 receivers, you are left
>>>>>>>>>> with 2 threads to process ((0 + 2) <-- This 2 is your 2 threads.)
>>>>>>>>>> And the other /2 means you are having 2 tasks in that stage
>>>>>>>>>> (with id 0).
>>>>>>>>>>
>>>>>>>>>> 1. Here you are basically creating 2 receivers and asking each of
>>>>>>>>>> them to consume 3 kafka partitions each.
>>>>>>>>>> 2. How does that matter? It depends on how many receivers you
>>>>>>>>>> have created to consume that data and whether you have repartitioned it.
>>>>>>>>>> Remember, spark is lazy and executors are related to the context.
>>>>>>>>>> 3. I think in java, the factory method is fixed. You just pass around
>>>>>>>>>> the contextFactory object. (I love python :) see, the signature is so
>>>>>>>>>> much cleaner :) )
>>>>>>>>>> 4. Yes, if you use spark checkpointing. You can use your custom
>>>>>>>>>> checkpointing too.
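>>>>>>>>>>
>>>>>>>>>> To illustrate 3 and 4, a minimal Scala sketch of the checkpoint-recovery
>>>>>>>>>> pattern (the function name createContext and the checkpoint path are
>>>>>>>>>> arbitrary; in Java you pass a JavaStreamingContextFactory to
>>>>>>>>>> JavaStreamingContext.getOrCreate instead):
>>>>>>>>>>
>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>>>>>>>
>>>>>>>>>> val checkpointDir = "hdfs:///tmp/streaming-checkpoint"  // assumed path
>>>>>>>>>>
>>>>>>>>>> def createContext(): StreamingContext = {
>>>>>>>>>>   val conf = new SparkConf().setAppName("SampleSparkStreamingApp")
>>>>>>>>>>   val ssc = new StreamingContext(conf, Seconds(1))
>>>>>>>>>>   // set up DStreams and output operations here
>>>>>>>>>>   ssc.checkpoint(checkpointDir)  // offsets and DStream metadata go here
>>>>>>>>>>   ssc
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> // recover from the checkpoint if it exists, otherwise build a fresh context
>>>>>>>>>> val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>>>>>>>>>> ssc.start()
>>>>>>>>>> ssc.awaitTermination()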
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Ayan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> A few doubts:
>>>>>>>>>>>
>>>>>>>>>>> In 1.2 streaming, when I use a union of streams, my streaming
>>>>>>>>>>> application sometimes hangs and nothing gets printed on the driver.
>>>>>>>>>>>
>>>>>>>>>>> [Stage 2:>                                              (0 + 2) / 2]
>>>>>>>>>>>
>>>>>>>>>>> What does the "(0 + 2) / 2" here signify?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3);
>>>>>>>>>>> need to be the same as numStreams=2 in the unioned stream?
>>>>>>>>>>>
>>>>>>>>>>> 2. I launched the app on the YARN RM with num-executors as 5. It created
>>>>>>>>>>> 2 receivers and 5 executors. As with stream receivers, nodes get fixed at
>>>>>>>>>>> the start of the app for its whole lifetime. Do executors get allocated at
>>>>>>>>>>> the start of each job on the 1s batch interval? If yes, how is it fast
>>>>>>>>>>> enough to allocate resources? I mean, if I increase num-executors to 50,
>>>>>>>>>>> it will negotiate 50 executors from the YARN RM at the start of each job,
>>>>>>>>>>> so won't it take more time to allocate executors than the batch interval
>>>>>>>>>>> (here 1s, or say 500ms)? Can I also fix the processing executors
>>>>>>>>>>> throughout the app?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>>>>>>>>>>> JavaStreamingContext jssc =
>>>>>>>>>>>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>>>>>>>>>>>
>>>>>>>>>>> Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>>>>> kafkaParams.put("zookeeper.connect", "ipadd:2181");
>>>>>>>>>>> kafkaParams.put("group.id", "testgroup");
>>>>>>>>>>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>>>>>>>>>>
>>>>>>>>>>> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>>>>>>>>>>> topicsMap.put("testSparkPartitioned", 3);
>>>>>>>>>>>
>>>>>>>>>>> int numStreams = 2;
>>>>>>>>>>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>>>>>>>>>>>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>>>>>>>>> for (int i = 0; i < numStreams; i++) {
>>>>>>>>>>>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
>>>>>>>>>>>         kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>>>>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> JavaPairDStream<byte[], byte[]> directKafkaStream =
>>>>>>>>>>>     jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
>>>>>>>>>>>
>>>>>>>>>>> JavaDStream<String> lines = directKafkaStream.map(
>>>>>>>>>>>     new Function<Tuple2<byte[], byte[]>, String>() {
>>>>>>>>>>>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>>>>>>>>>>             // ... processing ...
>>>>>>>>>>>             return msg;
>>>>>>>>>>>         }
>>>>>>>>>>>     });
>>>>>>>>>>> lines.print();
>>>>>>>>>>> jssc.start();
>>>>>>>>>>> jssc.awaitTermination();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>> 3. For avoiding data loss when we use checkpointing and a factory
>>>>>>>>>>> method to create the sparkContext, is the method name fixed, or can we
>>>>>>>>>>> use any name, and how do we tell the app which method name to use?
>>>>>>>>>>>
>>>>>>>>>>> 4. In 1.3 non-receiver-based streaming, the kafka offset is not
>>>>>>>>>>> stored in zookeeper. Is that because zookeeper is not efficient for high
>>>>>>>>>>> write rates and its reads are not strictly consistent? So
>>>>>>>>>>>
>>>>>>>>>>> we use the simple Kafka API that does not use Zookeeper, and
>>>>>>>>>>> offsets are tracked only by Spark Streaming within its checkpoints.
>>>>>>>>>>> This eliminates inconsistencies between Spark Streaming and
>>>>>>>>>>> Zookeeper/Kafka, and so each record is received by Spark Streaming
>>>>>>>>>>> effectively exactly once despite failures.
>>>>>>>>>>>
>>>>>>>>>>> So do we have to call context.checkpoint(hdfsdir)? Or is there an
>>>>>>>>>>> implicit checkpoint location? That is, will HDFS be used for only small
>>>>>>>>>>> data (just the offsets)?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
>>>>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> There is another option to try: a receiver-based low-level
>>>>>>>>>>>> Kafka consumer which is part of Spark-Packages (
>>>>>>>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer).
>>>>>>>>>>>> This can be used with the WAL as well for end-to-end zero data loss.
>>>>>>>>>>>>
>>>>>>>>>>>> This is also a reliable receiver and commits offsets to ZK. Given
>>>>>>>>>>>> the number of Kafka partitions you have (> 100), using the high-level
>>>>>>>>>>>> Kafka API for the receiver-based approach may lead to issues related to
>>>>>>>>>>>> consumer re-balancing, which is a major issue of the Kafka high-level API.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dibyendu
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <
>>>>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In the receiver-based approach, if the receiver crashes for
>>>>>>>>>>>>> any reason (receiver crashed or executor crashed), the receiver should get
>>>>>>>>>>>>> restarted on another executor and should start reading data from the offset
>>>>>>>>>>>>> present in zookeeper. There is some chance of data loss, which can be
>>>>>>>>>>>>> alleviated using Write Ahead Logs (see the streaming programming guide for
>>>>>>>>>>>>> more details, or see my talk [Slides PDF
>>>>>>>>>>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>>>>>>>>>>>> , Video
>>>>>>>>>>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>>>>>>>>>>>> ] from Spark Summit 2015). But that approach can give
>>>>>>>>>>>>> duplicate records. The direct approach gives exactly-once guarantees, so
>>>>>>>>>>>>> you should try it out.
>>>>>>>>>>>>>
>>>>>>>>>>>>> TD
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <
>>>>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Read the spark streaming guide and the kafka integration guide
>>>>>>>>>>>>>> for a better understanding of how the receiver-based stream works.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Capacity planning is specific to your environment and what
>>>>>>>>>>>>>> the job is actually doing; you'll need to determine it empirically.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Friday, June 26, 2015, Shushant Arora <
>>>>>>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In 1.2, how do I handle offset management after the stream
>>>>>>>>>>>>>>> application starts, in each job? Should I commit offsets manually
>>>>>>>>>>>>>>> after job completion?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> And what is the recommended number of consumer threads? Say I have
>>>>>>>>>>>>>>> 300 partitions in the kafka cluster. The load is ~1 million events per
>>>>>>>>>>>>>>> second, and each event is ~500 bytes. Is having 5 receivers with 60
>>>>>>>>>>>>>>> partitions each sufficient for spark streaming to consume?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <
>>>>>>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The receiver-based kafka createStream in spark 1.2 uses
>>>>>>>>>>>>>>>> zookeeper to store offsets.  If you want finer-grained control over
>>>>>>>>>>>>>>>> offsets, you can update the values in zookeeper yourself before starting
>>>>>>>>>>>>>>>> the job.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> createDirectStream in spark 1.3 is still marked as
>>>>>>>>>>>>>>>> experimental, and subject to change.  That being said, it works better for
>>>>>>>>>>>>>>>> me in production than the receiver-based api.
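>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For reference, a minimal Scala sketch of setting up the direct stream
>>>>>>>>>>>>>>>> (the broker list is a placeholder):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> import kafka.serializer.StringDecoder
>>>>>>>>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // no receiver and no zookeeper; offsets are tracked by Spark Streaming itself
>>>>>>>>>>>>>>>> val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
>>>>>>>>>>>>>>>> val topics = Set("testSparkPartitioned")
>>>>>>>>>>>>>>>> val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>>>>>>>>>>>>>>>   ssc, kafkaParams, topics)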
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>>>>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am using spark streaming 1.2.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If processing executors crash, will the receiver reset the
>>>>>>>>>>>>>>>>> offset back to the last processed offset?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If the receiver itself crashes, is there a way to reset the
>>>>>>>>>>>>>>>>> offset, without restarting the streaming application, other than
>>>>>>>>>>>>>>>>> smallest or largest?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is spark streaming 1.3, which uses the low-level consumer api,
>>>>>>>>>>>>>>>>> stable? And which is recommended for handling data loss, 1.2 or 1.3?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards,
>>>>>>>>>> Ayan Guha
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Chen Song
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Chen Song
>>>
>>>
>>
>
>
> --
> Chen Song
>
>
