spark-user mailing list archives

From Dan Dutrow <dan.dut...@gmail.com>
Subject Re: Different Kafka createDirectStream implementations
Date Tue, 08 Sep 2015 20:00:00 GMT
I see... the first method takes the offsets as its third parameter, while
the second method just takes topic names, and that's the primary reason why
the implementations are different.
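
To make that difference concrete, here is a rough sketch of how the starting
offsets end up being chosen. It is not the Spark source: TopicPartitionKey,
StartOffsets.resolve, and the earliest/latest arguments are names made up for
illustration, with the two lookups standing in for queries against the
brokers. It assumes the Kafka 0.8 values for auto.offset.reset
("smallest"/"largest") and that anything other than "smallest" falls back to
the latest offsets.

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition.
final case class TopicPartitionKey(topic: String, partition: Int)

object StartOffsets {
  // `earliest` and `latest` are by-name so a lookup only runs if it is needed.
  def resolve(
      kafkaParams: Map[String, String],
      explicit: Option[Map[TopicPartitionKey, Long]],
      earliest: => Map[TopicPartitionKey, Long],
      latest: => Map[TopicPartitionKey, Long]): Map[TopicPartitionKey, Long] =
    explicit match {
      // Offsets were passed in: use them and never read auto.offset.reset.
      case Some(offsets) => offsets
      // Only topic names were given: the reset flag picks the start point.
      case None =>
        kafkaParams.get("auto.offset.reset").map(_.toLowerCase) match {
          case Some("smallest") => earliest
          case _                => latest
        }
    }
}
```

The first branch corresponds to the method that takes fromOffsets, the second
to the method that takes topics.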

In that case, what I am noticing is that there is no way to set a
messageHandler in the second method. This isn't a killer for me, but maybe
someone else would want to set one.
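
For anyone who does want a messageHandler, a minimal sketch of calling the
first method with one follows. It assumes the spark-streaming-kafka artifact
for Kafka 0.8 is on the classpath; the broker address, the topic name
"events", the starting offset of 0, and the object name are placeholders, not
anything from this thread.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamWithHandler {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-stream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder broker list; point this at a real cluster.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Explicit starting offsets, one entry per topic partition (placeholder values).
    val fromOffsets = Map(TopicAndPartition("events", 0) -> 0L)

    // The handler sees the metadata as well as the payload.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
      (mmd.topic, mmd.partition, mmd.offset, mmd.message())

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the second method the stream elements are plain (key, value) pairs, so
the topic, partition and offset captured here cannot be pulled out through a
handler.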

On Tue, Sep 8, 2015 at 2:32 PM Cody Koeninger <cody@koeninger.org> wrote:

> What exactly do you think should be done with auto offset reset if someone
> has explicitly provided offsets?
>
> auto offset reset is only useful for determining whether to start the
> stream at the beginning or the end of the log... if someone's provided
> explicit offsets, it's pretty clear where to start the stream.
>
> On Tue, Sep 8, 2015 at 1:19 PM, Dan Dutrow <dan.dutrow@gmail.com> wrote:
>
>> Yes, but one implementation checks those flags and the other one doesn't.
>> I would think they should be consistent.
>>
>> On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger <cody@koeninger.org> wrote:
>>
>>> If you're providing starting offsets explicitly, then auto offset reset
>>> isn't relevant.
>>>
>>> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow <dan.dutrow@gmail.com>
>>> wrote:
>>>
>>>> The two methods of createDirectStream appear to have different
>>>> implementations: the second checks the offset.reset flags and does some
>>>> error handling, while the first does not. Besides the use of a
>>>> messageHandler, are they intended to be used in different situations?
>>>>
>>>> def createDirectStream[K: ClassTag, V: ClassTag,
>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag](
>>>>     ssc: StreamingContext,
>>>>     kafkaParams: Map[String, String],
>>>>     fromOffsets: Map[TopicAndPartition, Long],
>>>>     messageHandler: MessageAndMetadata[K, V] => R):
>>>>
>>>> def createDirectStream[K: ClassTag, V: ClassTag,
>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
>>>>     ssc: StreamingContext,
>>>>     kafkaParams: Map[String, String],
>>>>     topics: Set[String])
>>>> --
>>>> Dan📱
>>>
>>>
>>> --
>> Dan📱
>
>
> --
Dan📱
