kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Eugene <far...@hotmail.com>
Subject Re: SAM Scala aggregate
Date Sun, 09 Sep 2018 19:59:19 GMT
I’m integrating with lot of other applications and it would be a little cowboy to chose my
own version.  I can commend that we all went to 2.0 but there are already working application
me so it won’t be possible to upgrade everything that everyone else is doing.  That will
have to happen sometime though. It’s just not gonna happen this week and my goal is to get
something working by tomorrow and I’ll stay up all night if I have to.  

Sent from my iPhone

> On Sep 9, 2018, at 2:11 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> Why can't you use Kafka Streams 2.0?
> 
> Note: Kafka Streams is backward compatible and it can connect to older
> brokers -- it's not required to upgrade your cluster to use Kafka
> Streams 2.0 -- updating you maven/gradle dependency is sufficient.
> 
> Also, AFAIK SAM conversions are only available in Scala 2.12.
> 
> 
> -Matthias
> 
>> On 9/9/18 11:37 AM, Michael Eugene wrote:
>> I can’t do Kafka 2.0. I am limited to this version right now.  If I continue to
struggle with it this much, I can eventually do that.  However, I know other people in the
organization have things kafka working with Scala. Probably not a good idea to say it’s
a necessity when it’s not completely necessary. Your point is well taken though, I am considering
it. 
>> 
>> Sent from my iPhone
>> 
>>> On Sep 9, 2018, at 1:10 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>> 
>>> I don't have the environment to run the Scala code right now. Will be
>>> tomorrow until I have one ..
>>> 
>>> Now that Scala API is part of the official Kafka distribution. Can u please
>>> try that out instead of kafka-streams-scala ? The library is now deprecated
>>> and I remember we ran into some SAM related issues with Scala 2.11 (which
>>> worked fine with 2.12). They were finally fixed in the Kafka distribution -
>>> there are some differences in the APIs as well ..
>>> 
>>> regards.
>>> 
>>>> On Sun, Sep 9, 2018 at 11:32 PM Michael Eugene <fartzy@hotmail.com>
wrote:
>>>> 
>>>> I’m using 2.11.11
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>>> On Sep 9, 2018, at 12:13 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
>>>> wrote:
>>>>> 
>>>>> Which version of Scala are u using ?
>>>>> 
>>>>>> On Sun, 9 Sep 2018 at 10:44 AM, Michael Eugene <fartzy@hotmail.com>
>>>> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I am using kafak-sreams-scala
>>>>>> https://github.com/lightbend/kafka-streams-scala, and I am trying
to
>>>>>> implement something very simple and I am getting a compilation error
by
>>>> the
>>>>>> "aggregate" method. The error is "Cannot resolve overload method
>>>>>> 'aggregate'" and "Unspecified value parameters: materialized:
>>>>>> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"
>>>>>> [https://avatars0.githubusercontent.com/u/16247783?s=400&v=4]<
>>>>>> https://github.com/lightbend/kafka-streams-scala>
>>>>>> 
>>>>>> GitHub
>>>>>> - lightbend/kafka-streams-scala: Thin Scala wrapper ...<
>>>>>> https://github.com/lightbend/kafka-streams-scala>
>>>>>> github.com
>>>>>> Note:
>>>>>> Scala API for Kafka Streams have been accepted for inclusion in Apache
>>>>>> Kafka. We have been working with the Kafka team since the last couple
of
>>>>>> months working towards meeting the standards and guidelines for this
>>>>>> activity. Lightbend and Alexis Seigneurin have
>>>>>> contributed this library (with ...
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> However when I add a third argument for a Materialized, I get the
>>>>>> compilation error "Too may arguments for method aggregate(() =>VR,
>>>> (K,V,VR)
>>>>>> => VR)"
>>>>>> 
>>>>>> It doesn't make sense anymore what could be breaking this.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> val myStream = builder
>>>>>> .stream(inputTopic)
>>>>>> .map{ (key: String, value: Array[Byte]) =>
>>>>>>  println(s"key = ${key}")
>>>>>>  val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
>>>>>> 
>>>>>>  val newValue = GroupByAction.getGroupByValueFromByteAray(value)
>>>>>> 
>>>>>>  println(s"newKey = ${newKey}")
>>>>>>  (newKey, serialise(newValue))}
>>>>>> 
>>>>>> .groupByKey
>>>>>> .aggregate(()=> 0L, (k,v,vr) => vr + 1)
>>>>>> 
>>>>>> --
>>>>> Sent from my iPhone
>>>> 
>>> 
>>> 
>>> -- 
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>> 
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
> 
Mime
View raw message