kafka-users mailing list archives

From Michael Eugene <far...@hotmail.com>
Subject SAM Scala aggregate
Date Sun, 09 Sep 2018 05:13:50 GMT
Hi,

  I am using kafka-streams-scala (https://github.com/lightbend/kafka-streams-scala) and
trying to implement something very simple, but I get a compilation error on the "aggregate"
method. The error is "Cannot resolve overload method 'aggregate'" and "Unspecified value parameters:
materialized: Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"

  However, when I add a third argument for the Materialized, I get the compilation error "Too
many arguments for method aggregate(() => VR, (K, V, VR) => VR)"

  The two errors contradict each other, and I can't see what is breaking this.



val myStream = builder
  .stream(inputTopic)
  .map { (key: String, value: Array[Byte]) =>
    println(s"key = ${key}")
    val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
    val newValue = GroupByAction.getGroupByValueFromByteAray(value)
    println(s"newKey = ${newKey}")
    (newKey, serialise(newValue))
  }
  .groupByKey
  .aggregate(() => 0L, (k, v, vr) => vr + 1)
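
To be clear about the intent: the aggregate is only meant to count records per key. Below is a minimal sketch of the same logic using plain Scala collections, with no Kafka Streams types involved. The sample records and the names CountPerKeySketch and countPerKey are hypothetical and are there for illustration only.

```scala
// Plain Scala collections only; this does not use the kafka-streams-scala API.
object CountPerKeySketch {
  // Hypothetical sample records standing in for the (newKey, value) pairs produced by .map
  val records: Seq[(String, Array[Byte])] = Seq(
    ("a", Array[Byte](1)),
    ("b", Array[Byte](2)),
    ("a", Array[Byte](3))
  )

  // Same shape as the two arguments passed to aggregate in the snippet above
  val initializer: () => Long = () => 0L
  val aggregator: (String, Array[Byte], Long) => Long = (k, v, vr) => vr + 1

  // Fold every record into a running per-key aggregate,
  // starting each new key from the initializer's value
  def countPerKey(rs: Seq[(String, Array[Byte])]): Map[String, Long] =
    rs.foldLeft(Map.empty[String, Long]) { case (acc, (k, v)) =>
      val current = acc.getOrElse(k, initializer())
      acc.updated(k, aggregator(k, v, current))
    }

  def main(args: Array[String]): Unit =
    println(countPerKey(records))
}
```

With the sample records above, key "a" ends up with a count of 2 and key "b" with a count of 1, which is the result I expect the stream aggregate to maintain per key.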

