kafka-users mailing list archives

From Michael Eugene <far...@hotmail.com>
Subject Re: SAM Scala aggregate
Date Sun, 09 Sep 2018 18:02:01 GMT
I thought I had already tried that. I tried explicitly creating the KeyValueMapper and supplying
the apply override. I think it looks like SAM syntax, but it might be something else.

Sent from my iPhone

> On Sep 9, 2018, at 11:08 AM, Ryanne Dolan <ryannedolan@gmail.com> wrote:
> 
> This means the types of k, v, vr don't match those expected by the
> aggregate() function. Add explicit types to your code and you'll find the
> problem. You'll probably find that Scala is inferring an Any somewhere.
> 
> Ryanne
> 
>> On Sun, Sep 9, 2018, 12:14 AM Michael Eugene <fartzy@hotmail.com> wrote:
>> 
>> Hi,
>> 
>>  I am using kafka-streams-scala
>> https://github.com/lightbend/kafka-streams-scala, and I am trying to
>> implement something very simple, but I am getting a compilation error on the
>> "aggregate" method. The error is "Cannot resolve overloaded method
>> 'aggregate'" and "Unspecified value parameters: materialized:
>> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"
>> 
>>  However, when I add a third argument for a Materialized, I get the
>> compilation error "Too many arguments for method aggregate(() =>VR, (K,V,VR)
>> => VR)"
>> 
>>  I can no longer make sense of what could be breaking this.
>> 
>> 
>> 
>> val myStream = builder
>>   .stream(inputTopic)
>>   .map { (key: String, value: Array[Byte]) =>
>>     println(s"key = ${key}")
>>     val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
>>     val newValue = GroupByAction.getGroupByValueFromByteAray(value)
>>     println(s"newKey = ${newKey}")
>>     (newKey, serialise(newValue))
>>   }
>>   .groupByKey
>>   .aggregate(() => 0L, (k, v, vr) => vr + 1)
>> 
>> 
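
For illustration, here is a minimal sketch of the "add explicit types"
suggestion from the reply above. It does not use kafka-streams-scala at all:
ToyGroupedStream is a hypothetical in-memory stand-in, and only the shape of
its aggregate method mirrors the signature quoted in the error message,
aggregate(() => VR, (K, V, VR) => VR). The assumption is that spelling out the
type argument and every lambda parameter leaves the compiler nothing to infer,
so a stray Any (or an unresolved VR) would show up as a clear type error
instead of an overload failure.

```scala
// Hypothetical stand-in for a grouped stream; NOT the kafka-streams-scala API.
// Only the shape of aggregate follows the signature quoted in the error.
final class ToyGroupedStream[K, V](records: Seq[(K, V)]) {
  def aggregate[VR](initializer: () => VR, aggregator: (K, V, VR) => VR): Map[K, VR] =
    records.foldLeft(Map.empty[K, VR]) { case (acc, (k, v)) =>
      // Start from the initializer the first time a key is seen.
      val current = acc.getOrElse(k, initializer())
      acc.updated(k, aggregator(k, v, current))
    }
}

object AggregateSketch {
  // Sample records keyed by String with Array[Byte] values, as in the question.
  val grouped = new ToyGroupedStream[String, Array[Byte]](
    Seq("a" -> Array[Byte](1), "b" -> Array[Byte](2), "a" -> Array[Byte](3))
  )

  // The result type VR and all lambda parameter types are written out.
  val counts: Map[String, Long] =
    grouped.aggregate[Long](
      () => 0L,
      (k: String, v: Array[Byte], vr: Long) => vr + 1L
    )

  def main(args: Array[String]): Unit =
    counts.foreach { case (k, n) => println(s"$k -> $n") }
}
```

Against the real library, the same idea would be to annotate the call in the
original snippet the same way and check the types the compiler then reports
for K and V after the map and groupByKey steps.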
