kafka-users mailing list archives

From Daniele Tria <daniele.t...@radicalbit.io>
Subject Re: Issue with GlobalKTable on sink topic
Date Thu, 17 May 2018 09:04:57 GMT
Hi Matthias, thanks for the reply.

>> Can you elaborate? What is the behavior you expect?

I want my GlobalKTable to read records from a topic A that is populated
within the same job
(so I want my table to change at runtime). The producer of A takes a
key-mapped stream.
Next, I perform a left join between a KStream and the GlobalKTable on
multiple partitions.
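
To make the intended join concrete, here is a minimal sketch rather than my actual code. It assumes a stand-in `Event` case class with an `id` field, and the names `otherStream`, `Enriched` and `joinWithTable` are hypothetical, introduced only for illustration. It relies on the standard `KStream.leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)` method of the Kafka Streams Java API.

```scala
import org.apache.kafka.streams.kstream.{GlobalKTable, KStream, KeyValueMapper, ValueJoiner}

// Stand-in for the real Event type (assumption: it exposes an `id` field).
final case class Event(id: String)

// Hypothetical result type: the stream record plus the table record, if any.
final case class Enriched(event: Event, lookup: Option[Event])

object JoinSketch {
  // `otherStream` is a hypothetical KStream to be enriched from the table.
  def joinWithTable(otherStream: KStream[String, Event],
                    table: GlobalKTable[String, Event]): KStream[String, Enriched] =
    otherStream.leftJoin[String, Event, Enriched](
      table,
      // Derive the table lookup key from each stream record.
      new KeyValueMapper[String, Event, String] {
        override def apply(key: String, value: Event): String = value.id
      },
      // In a left join the table side is null when no entry exists for the
      // key, so it is wrapped in an Option here.
      new ValueJoiner[Event, Event, Enriched] {
        override def apply(left: Event, right: Event): Enriched =
          Enriched(left, Option(right))
      })
}
```

With this shape, any stream record that arrives before the matching entry has reached the table is emitted with an empty `lookup`.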

>> On startup, GlobalKTable consumes the specified topic to the current
>> end-offsets to pre-populate itself before any other processing starts.

So, are you implying that a `GlobalKTable` cannot consume records from a
topic populated at runtime by the same job,
and instead needs a topic that is pre-populated or populated by some other
application?
If so, I cannot execute the subsequent left join, because the GlobalKTable
records must be consumed at runtime.

Is there any possibility of using another method to obtain the expected
result?

Thank you,

Daniele

2018-05-16 17:47 GMT+02:00 Matthias J. Sax <matthias@confluent.io>:

> Should be ok to read the topic. I cannot spot any error in your
> configs/program either.
>
> However, I am not entirely sure, if I understand the problem correctly.
>
> >> The problem is in the first run, where GlobalKTable reads null records
> >> (I have a json serializer and it reads a record with null value and, of
> >> course, the records written formerly are not null).
>
> Can you elaborate? What is the behavior you expect?
>
> On startup, GlobalKTable consumes the specified topic to the current
> end-offsets to pre-populate itself before any other processing starts.
>
>
> -Matthias
>
> On 5/16/18 6:30 AM, Daniele Tria wrote:
> > Dear community, I'm struggling with an issue regarding a `GlobalKTable`
> > that is required to read from a topic A, to which I add key-mapped
> > records from another stream. This topic A is populated within the same
> > application by a `stream.to` construct with some events that I need to
> > manipulate before they can be sunk. So, the `GlobalKTable` has to
> > materialize the records it reads in its store.
> > The problem is in the first run, where the GlobalKTable reads null records
> > (I have a JSON serializer and it reads a record with a null value and, of
> > course, the records written formerly are not null). In the next runs,
> > records are correctly read from the topic and then saved in the store.
> > Here is the code snippet:
> >
> > val streamProperties = {
> >   val settings = new Properties
> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> >   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > Serdes.Bytes.getClass.getName)
> >   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> > Serdes.Bytes.getClass.getName)
> >   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >   settings.put(StreamsConfig.STATE_DIR_CONFIG,
> > "in-memory-avg-store/myApp/global")
> >   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> > StreamsConfig.AT_LEAST_ONCE)
> >   settings
> > }
> >
> > var globalKTable: Option[GlobalKTable[String, Event]] = None
> >
> > val serdeEvent: Serde[Event] = JsonSerde[Event]
> > val eventTopic = "eventTopicGlobalTable"
> >
> > val builder: StreamsBuilder = new StreamsBuilder()
> >
> > val stream: KStream[String, Event] = eventStream.selectKey(new
> > KeyValueMapper[Array[Byte], Event, String] {
> >   override def apply(key: Array[Byte], value: Event): String = {
> >     value.id
> >   }
> > })
> >
> > stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(),
> > serdeEvent))
> >
> > globalKTable match {
> >   case None => {
> >     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
> >       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
> >       Materialized.as[String, Event, KeyValueStore[Bytes,
> > Array[Byte]]]("eventStore")))
> >   }
> >   case _ => ()
> > }
> >
> > val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)
> >
> > streams.start()
> >
> >
> > So my question is: why is this happening? Am I not allowed to read from a
> > sink topic in the same run? Or is it something related to my
> > configuration?
> >
> > Thank you so much for your work.
> >
>
>
