kafka-users mailing list archives

From "Afshartous, Nick" <nafshart...@turbine.com>
Subject Re: Low-level Consumer Example (Scala)
Date Wed, 06 Apr 2016 18:26:02 GMT


> I use https://github.com/akka/reactive-kafka.
> This library has just been taken over by the Akka team and will be incorporated into a future
> version of Akka.

Thanks Dave for the pointer.  

I downloaded the reactive-kafka 0.8 branch since we're using Kafka 0.8.2.2.  We need at-most-once
semantics and control over individual offset commits, so I was looking at the example below
on the branch page

   https://github.com/akka/reactive-kafka/tree/0.8

Could this example be tweaked to provide at-most-once semantics?  Also, the doc mentions
configuring the concurrency level of internal thread pools, so I assume that would
be applicable to this example?
--
     Nick

Manual Commit (version 0.8 and above)

In order to achieve "at-least-once" delivery, you can use the following API to obtain
an additional Sink, where you can stream back messages that you have processed. An underlying actor
will periodically flush the offsets of these messages as committed. Reactive Kafka supports manual
commit both to Zookeeper (legacy) and Kafka storage. Dual commit is not supported. In order
to commit manually to Zookeeper, you have to add an optional module to your dependencies:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import kafka.serializer.StringDecoder // needed for the decoder passed to ConsumerProperties

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  brokerList = "localhost:9092",
  zooKeeperHost = "localhost:2181",
  topic = "lowercaseStrings",
  groupId = "groupName",
  decoder = new StringDecoder())
.commitInterval(5.seconds) // flush interval

val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffsetSink.publisher)
  .map(processMessage(_)) // your message processing
  .to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
  .run()
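
The difference between the two guarantees comes down to whether the offset is committed before or after the message is processed. The following is a minimal, library-free sketch of that ordering only; it is not reactive-kafka code, and every name in it (DeliverySemanticsSketch, OffsetStore, run, crashAt) is hypothetical. It assumes a single partition modelled as an in-memory Vector and a crash that happens between the two steps.

```scala
object DeliverySemanticsSketch {
  // Hypothetical stand-in for one partition: the Vector index is the offset.
  val log: Vector[String] = Vector("a", "b", "c", "d")

  // Hypothetical stand-in for the committed-offset store (Zookeeper or Kafka).
  final class OffsetStore { var committed: Int = 0 }

  // Consumes from the last committed offset until the log ends or a crash occurs.
  // crashAt simulates a failure between the first and second step for that offset.
  // Returns the messages whose processing completed during this run.
  def run(store: OffsetStore, commitFirst: Boolean, crashAt: Option[Int]): Vector[String] = {
    val processed = Vector.newBuilder[String]
    var offset = store.committed
    var crashed = false
    while (!crashed && offset < log.length) {
      // First step: commit (at-most-once) or process (at-least-once).
      if (commitFirst) store.committed = offset + 1 else processed += log(offset)
      if (crashAt.contains(offset)) {
        crashed = true // the second step never happens for this offset
      } else {
        // Second step: process (at-most-once) or commit (at-least-once).
        if (commitFirst) processed += log(offset) else store.committed = offset + 1
        offset += 1
      }
    }
    processed.result()
  }
}
```

With a crash at offset 2 followed by a restart, the commit-first ordering processes a, b, d (c is lost, never repeated), while the process-first ordering processes a, b, c, c, d (c is repeated, never lost).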


________________________________________
From: David Buschman <david.buschman@timeli.io>
Sent: Wednesday, April 6, 2016 5:01 AM
To: users@kafka.apache.org
Subject: Re: Low-level Consumer Example (Scala)

I use https://github.com/akka/reactive-kafka.
This library has just been taken over by the Akka team and will be incorporated into a future
version of Akka.

It allows for at-least-once semantics as well as at-most-once semantics.

WARNING:  The new API (v0.11.X) is unproven; you might start with 0.10.X first, which is
the version I am using now.

Thanks,
    DaVe.

David Buschman
dave@timeli.io



> On Apr 5, 2016, at 1:58 PM, Afshartous, Nick <nafshartous@turbine.com> wrote:
>
>
> Hi,
>
> I'm looking for a complete low-level consumer example.  Ideally one in Scala that continuously
> consumes from a topic and commits offsets.
>
> Thanks for any pointers,
>
> --
>
>     Nick
