kafka-users mailing list archives

From Hamidreza Afzali <hamidreza.afz...@hivestreaming.com>
Subject Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue
Date Mon, 03 Oct 2016 14:18:36 GMT
Thanks Guozhang. We use ProcessorTopologyTestDriver for unit tests.

Hamid


> On 28 Sep 2016, at 11:48 AM, Hamidreza Afzali <hamidreza.afzali@hivestreaming.com> wrote:
> 
> Hi,
> 
> We are using the latest Kafka 0.10.1 branch. The combination of ProcessorTopologyTestDriver
> and WindowedStreamPartitioner results in a division-by-zero exception because the list of
> partitions is empty:
> 
> https://github.com/apache/kafka/blob/0.10.1/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java#L158
> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java#L47
> 
> Our topology looks similar to this:
> 
>  builder.stream("events")
>    .groupByKey(...)
>    .aggregate(...,
>      TimeWindows.of(1 * 60 * 1000L)
>    )
>    .mapValues(_.size: Integer)
>    .to(windowedSerde, Serdes.Integer(), "events-over-time")
> 
> If we use our own partitioner in .to() it works.
> 
>  class MyStreamPartitioner[K, V]() extends StreamPartitioner[K, V] {
>    override def partition(k: K, v: V, numPartitions: Int): Integer = {
>      // Return an integer between 0 and numPartitions - 1, or null if the
>      // default partitioning logic should be used.
>      null
>    }
>  }
> 
> Is this a bug?
> 
> Thank you in advance,
> Hamid
> 
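
To illustrate the failure mode described in the quoted message, here is a minimal sketch, assuming the partitioner picks a partition by hashing the key and taking the result modulo the partition count. The object PartitionSketch, its method names, and the hash function are hypothetical stand-ins and are not taken from the Kafka source. The sketch only shows why a partition count of 0 (an empty partition list) raises an exception, and how a guard could avoid it.

```scala
import scala.util.Try

object PartitionSketch {
  // Hypothetical stand-in for a hash-then-modulo partition choice.
  // This is NOT the Kafka source; the hash function here is an assumption.
  def choosePartition(keyBytes: Array[Byte], numPartitions: Int): Int = {
    // Mask the sign bit so the hash is non-negative.
    val positiveHash = java.util.Arrays.hashCode(keyBytes) & 0x7fffffff
    // Integer modulo by zero throws ArithmeticException on the JVM.
    positiveHash % numPartitions
  }

  // Guarded variant: None means "no partition chosen, fall back to the default".
  def choosePartitionSafe(keyBytes: Array[Byte], numPartitions: Int): Option[Int] =
    if (numPartitions > 0) Some(choosePartition(keyBytes, numPartitions)) else None

  def main(args: Array[String]): Unit = {
    val key = "some-key".getBytes("UTF-8")
    println(Try(choosePartition(key, 0)))  // a Failure wrapping ArithmeticException
    println(choosePartitionSafe(key, 0))   // None
    println(choosePartitionSafe(key, 4))   // Some(n) with 0 <= n < 4
  }
}
```

Returning None in the guarded variant plays the same role as returning null from the custom partitioner in the quoted message: the caller is left to apply its default partitioning.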

