kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ryanne Dolan <ryannedo...@gmail.com>
Subject Re: SAM Scala aggregate
Date Sun, 09 Sep 2018 16:08:07 GMT
This means the types of k, v, vr don't match those expected by the
aggregate() function. Add explicit types to your code and you'll find the
problem. You'll probably find that Scala is inferring an Any somewhere.

Ryanne

On Sun, Sep 9, 2018, 12:14 AM Michael Eugene <fartzy@hotmail.com> wrote:

> Hi,
>
>   I am using kafak-sreams-scala
> https://github.com/lightbend/kafka-streams-scala, and I am trying to
> implement something very simple and I am getting a compilation error by the
> "aggregate" method. The error is "Cannot resolve overload method
> 'aggregate'" and "Unspecified value parameters: materialized:
> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"
> [https://avatars0.githubusercontent.com/u/16247783?s=400&v=4]<
> https://github.com/lightbend/kafka-streams-scala>
>
> GitHub
>  - lightbend/kafka-streams-scala: Thin Scala wrapper ...<
> https://github.com/lightbend/kafka-streams-scala>
> github.com
> Note:
>  Scala API for Kafka Streams have been accepted for inclusion in Apache
> Kafka. We have been working with the Kafka team since the last couple of
> months working towards meeting the standards and guidelines for this
> activity. Lightbend and Alexis Seigneurin have
>  contributed this library (with ...
>
>
>
>   However when I add a third argument for a Materialized, I get the
> compilation error "Too may arguments for method aggregate(() =>VR, (K,V,VR)
> => VR)"
>
>   It doesn't make sense anymore what could be breaking this.
>
>
>
> val myStream = builder
>   .stream(inputTopic)
>   .map{ (key: String, value: Array[Byte]) =>
>     println(s"key = ${key}")
>     val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
>
>     val newValue = GroupByAction.getGroupByValueFromByteAray(value)
>
>     println(s"newKey = ${newKey}")
>     (newKey, serialise(newValue))}
>
> .groupByKey
> .aggregate(()=> 0L, (k,v,vr) => vr + 1)
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message