kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Issue with GlobalKTable on sink topic
Date Wed, 16 May 2018 15:47:55 GMT
Should be ok to read the topic. I cannot spot any error in your
configs/program either.

However, I am not entirely sure if I understand the problem correctly.

>> The problem occurs on the first run, where the GlobalKTable reads null
>> records (I have a JSON serializer, and it reads records with null values
>> even though the records written earlier are not null).

Can you elaborate? What is the behavior you expect?

On startup, a GlobalKTable consumes the specified topic up to the current
end offsets to pre-populate itself before any other processing starts.
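
As a sanity check, you could query the global store once the instance
reaches RUNNING and dump what was actually materialized. A minimal sketch
(untested), reusing the "eventStore" name, the `streams` instance, and the
Event type from your snippet below:

import java.util.concurrent.CountDownLatch
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreTypes

// Block until the bootstrap phase is done and the instance is RUNNING.
val running = new CountDownLatch(1)
streams.setStateListener(new KafkaStreams.StateListener {
  override def onChange(newState: KafkaStreams.State,
                        oldState: KafkaStreams.State): Unit =
    if (newState == KafkaStreams.State.RUNNING) running.countDown()
})
streams.start()
running.await()

// Dump everything the GlobalKTable materialized on startup.
val store = streams.store("eventStore",
  QueryableStoreTypes.keyValueStore[String, Event]())
val iter = store.all()
while (iter.hasNext) {
  val kv = iter.next()
  println(s"${kv.key} -> ${kv.value}")
}
iter.close()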


-Matthias

On 5/16/18 6:30 AM, Daniele Tria wrote:
> Dear community, I'm struggling with an issue regarding `GlobalKTable`: it
> is supposed to read from a topic A, to which I write re-keyed records from
> another stream. Topic A is populated within the same application via the
> `stream.to` construct, with events that I need to manipulate before they
> can be written to the sink. The `GlobalKTable` then has to materialize the
> records it reads into its store.
> The problem occurs on the first run, where the GlobalKTable reads null
> records (I have a JSON serializer, and it reads records with null values
> even though the records written earlier are not null). On subsequent runs,
> the records are correctly read from the topic and saved in the store.
> Here is the code snippet:
> 
> val streamProperties = {
>   val settings = new Properties
>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>     Serdes.Bytes.getClass.getName)
>   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>     Serdes.Bytes.getClass.getName)
>   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>   settings.put(StreamsConfig.STATE_DIR_CONFIG,
>     "in-memory-avg-store/myApp/global")
>   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     StreamsConfig.AT_LEAST_ONCE)
>   settings
> }
> 
> var globalKTable: Option[GlobalKTable[String, Event]] = None
> 
> val serdeEvent: Serde[Event] = JsonSerde[Event]
> val eventTopic = "eventTopicGlobalTable"
> 
> val builder: StreamsBuilder = new StreamsBuilder()
> 
> // eventStream is a KStream[Array[Byte], Event] defined elsewhere.
> val stream: KStream[String, Event] = eventStream.selectKey(
>   new KeyValueMapper[Array[Byte], Event, String] {
>     override def apply(key: Array[Byte], value: Event): String = value.id
>   })
> 
> stream.to(eventTopic,
>   Produced.`with`[String, Event](Serdes.String(), serdeEvent))
> 
> globalKTable match {
>   case None => {
>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
>       Materialized.as[String, Event,
>         KeyValueStore[Bytes, Array[Byte]]]("eventStore")))
>   }
>   case _ => ()
> }
> 
> val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)
> 
> streams.start()
> 
> 
> So my question is: why is this happening? Am I not allowed to read from a
> sink topic within the same run? Or is it something related to my
> configuration?
> 
> Thank you so much for your work.
> 

