kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: SAM Scala aggregate
Date Sun, 09 Sep 2018 19:11:43 GMT
Why can't you use Kafka Streams 2.0?

Note: Kafka Streams is backward compatible and it can connect to older
brokers -- it's not required to upgrade your cluster to use Kafka
Streams 2.0 -- updating your Maven/Gradle dependency is sufficient.

Also, AFAIK SAM conversions are only available in Scala 2.12.
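
To illustrate the SAM point, here is a minimal sketch that does not depend on Kafka. The traits `Initializer` and `Aggregator` below are hypothetical stand-ins that only mirror the shape of the Kafka Streams Java interfaces of the same names (one abstract method each), and `SamSketch.aggregate` is an invented helper playing the role of the real `aggregate` call. Under those assumptions, the explicit anonymous classes should compile on both Scala 2.11 and 2.12, while the lambda forms rely on SAM conversion, which Scala 2.11 does not apply by default.

```scala
// Hypothetical stand-ins mirroring the shape of Kafka Streams'
// Initializer and Aggregator interfaces (a single abstract method each).
trait Initializer[VA] { def apply(): VA }
trait Aggregator[K, V, VA] { def apply(key: K, value: V, aggregate: VA): VA }

object SamSketch {
  // Hypothetical helper standing in for an aggregate call:
  // folds all values seen for one key into a single result.
  def aggregate[K, V, VA](key: K, values: Seq[V])(
      init: Initializer[VA], agg: Aggregator[K, V, VA]): VA =
    values.foldLeft(init.apply())((acc, v) => agg.apply(key, v, acc))

  // Scala 2.11 and 2.12: explicit anonymous classes, no SAM conversion needed.
  val init: Initializer[Long] = new Initializer[Long] {
    override def apply(): Long = 0L
  }
  val count: Aggregator[String, Array[Byte], Long] =
    new Aggregator[String, Array[Byte], Long] {
      override def apply(key: String, value: Array[Byte], aggregate: Long): Long =
        aggregate + 1
    }

  // Scala 2.12+: function literals are SAM-converted to the trait types.
  val initSam: Initializer[Long] = () => 0L
  val countSam: Aggregator[String, Array[Byte], Long] =
    (_, _, total) => total + 1

  def main(args: Array[String]): Unit = {
    val values = Seq(Array[Byte](1), Array[Byte](2), Array[Byte](3))
    println(aggregate("k", values)(init, count))       // prints 3
    println(aggregate("k", values)(initSam, countSam)) // prints 3
  }
}
```

On Scala 2.11, passing plain lambdas where such interfaces are expected is one plausible source of overload-resolution errors like the one quoted below; spelling out the anonymous classes is the usual workaround.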


-Matthias

On 9/9/18 11:37 AM, Michael Eugene wrote:
> I can’t do Kafka 2.0. I am limited to this version right now. If I continue to struggle
> with it this much, I can eventually do that. However, I know other people in the organization
> have Kafka working with Scala. Probably not a good idea to say it’s a necessity when
> it’s not completely necessary. Your point is well taken though, I am considering it.
> 
> Sent from my iPhone
> 
>> On Sep 9, 2018, at 1:10 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>
>> I don't have the environment to run the Scala code right now. It will be
>> tomorrow before I have one.
>>
>> The Scala API is now part of the official Kafka distribution. Can you please
>> try that out instead of kafka-streams-scala? That library is now deprecated,
>> and I remember we ran into some SAM-related issues with Scala 2.11 (which
>> worked fine with 2.12). They were finally fixed in the Kafka distribution;
>> there are some differences in the APIs as well.
>>
>> regards.
>>
>>> On Sun, Sep 9, 2018 at 11:32 PM Michael Eugene <fartzy@hotmail.com> wrote:
>>>
>>> I’m using 2.11.11
>>>
>>> Sent from my iPhone
>>>
>>>> On Sep 9, 2018, at 12:13 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>
>>>> Which version of Scala are you using?
>>>>
>>>>> On Sun, 9 Sep 2018 at 10:44 AM, Michael Eugene <fartzy@hotmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am using kafka-streams-scala
>>>>> https://github.com/lightbend/kafka-streams-scala, and I am trying to
>>>>> implement something very simple and I am getting a compilation error
>>>>> from the "aggregate" method. The error is "Cannot resolve overload method
>>>>> 'aggregate'" and "Unspecified value parameters: materialized:
>>>>> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"
>>>>>
>>>>>
>>>>>
>>>>> However, when I add a third argument for a Materialized, I get the
>>>>> compilation error "Too many arguments for method aggregate(() => VR,
>>>>> (K,V,VR) => VR)"
>>>>>
>>>>> I can't make sense of what could be breaking this.
>>>>>
>>>>>
>>>>>
>>>>> val myStream = builder
>>>>> .stream(inputTopic)
>>>>> .map{ (key: String, value: Array[Byte]) =>
>>>>>   println(s"key = ${key}")
>>>>>   val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
>>>>>
>>>>>   val newValue = GroupByAction.getGroupByValueFromByteAray(value)
>>>>>
>>>>>   println(s"newKey = ${newKey}")
>>>>>   (newKey, serialise(newValue))}
>>>>>
>>>>> .groupByKey
>>>>> .aggregate(()=> 0L, (k,v,vr) => vr + 1)
>>>>>
>>>>> --
>>>> Sent from my iPhone
>>>
>>
>>
>> -- 
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg

