kafka-users mailing list archives

From Michael Eugene <far...@hotmail.com>
Subject Re: SAM Scala aggregate
Date Tue, 11 Sep 2018 01:43:55 GMT
Well, to move to Kafka 2.0, do I just need the kafka-clients and kafka-streams
2.0 dependencies in sbt?  And it doesn't matter if the cluster itself is not Kafka 2.0?
Upgrading Kafka itself is probably not an option for me right now.
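
For reference, a minimal sketch of what such a dependency change might look like in build.sbt. The artifact coordinates are the published Apache Kafka ones; the 2.0.0 version and the Scala 2.11.11 setting are assumptions taken from the versions mentioned in this thread, not a tested build.

```scala
// build.sbt (sketch): only the client-side libraries move to 2.0.0;
// per the replies quoted below, the brokers can stay on an older version.
scalaVersion := "2.11.11" // assumption: the Scala version mentioned in this thread

libraryDependencies ++= Seq(
  "org.apache.kafka" %  "kafka-clients"       % "2.0.0",
  "org.apache.kafka" %  "kafka-streams"       % "2.0.0",
  // Official Scala API that supersedes lightbend/kafka-streams-scala
  "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
)
```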





________________________________
From: John Roesler <john@confluent.io>
Sent: Monday, September 10, 2018 4:18:24 PM
To: users@kafka.apache.org
Subject: Re: SAM Scala aggregate

In addition to the other suggestions, if you're having too much trouble
with the interface, you can always fall back to creating anonymous
Initializer/Aggregator instances the way you would if programming in Java.
This way, you wouldn't need SAM conversion at all (all it's doing is
turning your functions into an Initializer and Aggregator).
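
A minimal sketch of that fallback, assuming the Java Kafka Streams API (1.0 or later, where aggregate accepts a Materialized) is called directly from Scala. The object name AggregateWithoutSam and the helper countPerKey are hypothetical, and the key/value types mirror the String / Array[Byte] stream from the original question:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KGroupedStream, KTable, Materialized}
import org.apache.kafka.streams.state.KeyValueStore

object AggregateWithoutSam {
  // Explicit anonymous instances, written as in Java; no SAM conversion involved.
  val initializer: Initializer[java.lang.Long] = new Initializer[java.lang.Long] {
    override def apply(): java.lang.Long = java.lang.Long.valueOf(0L)
  }

  val aggregator: Aggregator[String, Array[Byte], java.lang.Long] =
    new Aggregator[String, Array[Byte], java.lang.Long] {
      // Ignores key and value and simply counts the records seen for each key.
      override def apply(key: String, value: Array[Byte], aggregate: java.lang.Long): java.lang.Long =
        java.lang.Long.valueOf(aggregate.longValue + 1L)
    }

  // Hypothetical helper: `grouped` is a Java KGroupedStream,
  // e.g. the result of groupByKey on a Java KStream.
  def countPerKey(grouped: KGroupedStream[String, Array[Byte]]): KTable[String, java.lang.Long] =
    grouped.aggregate(
      initializer,
      aggregator,
      // `with` is a Scala keyword, so the Java static method needs backticks.
      Materialized.`with`[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](
        Serdes.String(), Serdes.Long())
    )
}
```

Spelling out the type parameters on Materialized is verbose, but it leaves the compiler nothing to infer for the value type, which is where the "NotInferedVR" message in the original question points.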

On Mon, Sep 10, 2018 at 8:32 AM Sean Glover <sean.glover@lightbend.com>
wrote:

> Hi,
>
> SAM conversions can be enabled in 2.11 by passing the -Xexperimental flag to scalac.
> However, this version of SAM conversions isn't recommended for production
> and was significantly refactored for 2.12.  When we contributed this API to
> Kafka we explicitly removed the SAM conversions because we didn't want to
> require the Kafka build (or end user builds) to enable this experimental
> feature.
>
> Echoing what Debasish has already said, the lightbend/kafka-streams-scala
> project is deprecated.  If the app you're building is destined for production
> you should use the Kafka 2.0+ client libraries which are backwards
> compatible with older versions of the brokers; you don't need to get
> everyone else to upgrade their clients or your cluster.
>
> Regards
> Sean
>
> On Sun, Sep 9, 2018 at 3:59 PM Michael Eugene <fartzy@hotmail.com> wrote:
>
> > I’m integrating with a lot of other applications and it would be a little
> > cowboy to choose my own version.  I can recommend that we all go to 2.0, but
> > there are already working applications, so it won’t be possible to upgrade
> > everything that everyone else is doing.  That will have to happen sometime
> > though. It’s just not gonna happen this week and my goal is to get
> > something working by tomorrow and I’ll stay up all night if I have to.
> >
> > Sent from my iPhone
> >
> > > On Sep 9, 2018, at 2:11 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> > >
> > > Why can't you use Kafka Streams 2.0?
> > >
> > > Note: Kafka Streams is backward compatible and it can connect to older
> > > brokers -- it's not required to upgrade your cluster to use Kafka
> > > Streams 2.0 -- updating your maven/gradle dependency is sufficient.
> > >
> > > Also, AFAIK SAM conversions are only available in Scala 2.12.
> > >
> > >
> > > -Matthias
> > >
> > >> On 9/9/18 11:37 AM, Michael Eugene wrote:
> > >> I can’t do Kafka 2.0. I am limited to this version right now.  If I
> > >> continue to struggle with it this much, I can eventually do that. However,
> > >> I know other people in the organization have Kafka working with
> > >> Scala. Probably not a good idea to say it’s a necessity when it’s not
> > >> completely necessary. Your point is well taken though, I am considering it.
> > >>
> > >> Sent from my iPhone
> > >>
> > >>> On Sep 9, 2018, at 1:10 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
> > >>>
> > >>> I don't have the environment to run the Scala code right now. It will be
> > >>> tomorrow before I have one.
> > >>>
> > >>> Now that the Scala API is part of the official Kafka distribution, can you
> > >>> please try that out instead of kafka-streams-scala? The library is now
> > >>> deprecated, and I remember we ran into some SAM-related issues with Scala 2.11
> > >>> (which worked fine with 2.12). They were finally fixed in the Kafka
> > >>> distribution - there are some differences in the APIs as well.
> > >>>
> > >>> regards.
> > >>>
> > >>>> On Sun, Sep 9, 2018 at 11:32 PM Michael Eugene <fartzy@hotmail.com> wrote:
> > >>>>
> > >>>> I’m using 2.11.11
> > >>>>
> > >>>> Sent from my iPhone
> > >>>>
> > >>>>> On Sep 9, 2018, at 12:13 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
> > >>>>>
> > >>>>> Which version of Scala are you using?
> > >>>>>
> > >>>>>> On Sun, 9 Sep 2018 at 10:44 AM, Michael Eugene <fartzy@hotmail.com> wrote:
> > >>>>>>
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> I am using kafka-streams-scala
> > >>>>>> (https://github.com/lightbend/kafka-streams-scala), and I am trying to
> > >>>>>> implement something very simple, but I am getting a compilation error on
> > >>>>>> the "aggregate" method. The error is "Cannot resolve overload method
> > >>>>>> 'aggregate'" and "Unspecified value parameters: materialized:
> > >>>>>> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"
> > >>>>>>
> > >>>>>> However, when I add a third argument for a Materialized, I get the
> > >>>>>> compilation error "Too many arguments for method aggregate(() => VR,
> > >>>>>> (K, V, VR) => VR)"
> > >>>>>>
> > >>>>>> I can no longer make sense of what could be breaking this.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> val myStream = builder
> > >>>>>> .stream(inputTopic)
> > >>>>>> .map{ (key: String, value: Array[Byte]) =>
> > >>>>>>  println(s"key = ${key}")
> > >>>>>>  val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
> > >>>>>>
> > >>>>>>  val newValue = GroupByAction.getGroupByValueFromByteAray(value)
> > >>>>>>
> > >>>>>>  println(s"newKey = ${newKey}")
> > >>>>>>  (newKey, serialise(newValue))}
> > >>>>>>
> > >>>>>> .groupByKey
> > >>>>>> .aggregate(()=> 0L, (k,v,vr) => vr + 1)
> > >>>>>>
> > >>>>>> --
> > >>>>> Sent from my iPhone
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>> Debasish Ghosh
> > >>> http://manning.com/ghosh2
> > >>> http://manning.com/ghosh
> > >>>
> > >>> Twttr: @debasishg
> > >>> Blog: http://debasishg.blogspot.com
> > >>> Code: http://github.com/debasishg
> > >
> >
>
>
> --
> Senior Software Engineer, Lightbend, Inc.
>
> <http://lightbend.com>
>
> @seg1o <https://twitter.com/seg1o>, in/seanaglover
> <https://www.linkedin.com/in/seanaglover/>
>
