kafka-users mailing list archives

From Manikumar Reddy <ku...@nmsworks.co.in>
Subject Re: Producer code to a partition
Date Thu, 04 Feb 2016 06:42:26 GMT
In Kafka, each record can have a key. This key is used to distribute
records to partitions.
Non-keyed records are distributed across partitions in round-robin fashion.
Keyed records are distributed based on the hash of the key, unless you
write a custom partitioner.
Alternatively, you can specify the partition number for each message using
the "ProducerRecord" constructor.

https://kafka.apache.org/documentation.html#theproducer
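
As a rough sketch of the three options above, the records could be built
like this. The topic name, key, and payload are placeholders chosen for
illustration and are not taken from the original code; only the
ProducerRecord constructors and accessors come from the Kafka client library.

```scala
import org.apache.kafka.clients.producer.ProducerRecord

object ProducerRecordExamples {
  // Hypothetical topic name, key, and payload, used only for illustration.
  val topic = "data-points"
  val key = "sensor-1"
  val jsonMessage = """{"value": 42}"""

  // No key and no partition: the producer chooses the partition itself.
  val unkeyed = new ProducerRecord[String, String](topic, jsonMessage)

  // Key only: the partition is derived from the key by the partitioner.
  val keyed = new ProducerRecord[String, String](topic, key, jsonMessage)

  // Explicit partition number: the record is addressed to partition 0.
  // Integer.valueOf is used because the constructor takes java.lang.Integer.
  val pinned =
    new ProducerRecord[String, String](topic, Integer.valueOf(0), key, jsonMessage)
}
```

Passing any of these records to producer.send(...) works the same way as in
the code quoted below. Note that a partition number other than 0 is only
valid once the topic has been created with more than one partition.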

On Thu, Feb 4, 2016 at 11:53 AM, Joe San <codeintheopen@gmail.com> wrote:

> What is the partition key? Why do I need to specify the partition key and a
> partition number?
>
> On Thu, Feb 4, 2016 at 7:17 AM, Manikumar Reddy <manikumar.reddy@gmail.com> wrote:
>
> > Hi,
> >
> >  You can use the ProducerRecord(java.lang.String topic, java.lang.Integer
> > partition, K key, V value) constructor to pass the partition number.
> >
> > https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
> >
> > Kumar
> >
> > On Thu, Feb 4, 2016 at 11:41 AM, Joe San <codeintheopen@gmail.com> wrote:
> >
> > > Kafka users,
> > >
> > > The code below is something that I have to write to a Topic!
> > >
> > > def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
> > >   Future {
> > >     logger.info(s"Persisting ${tsDataPoints.length} data-points in Kafka topic ${producerConfig.topic}")
> > >     val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
> > >     val jsonMessage = Json.toJson(dataPoints).toString()
> > >     val recordMetaDataF = producer.send(
> > >       new ProducerRecord[String, String](producerConfig.topic, jsonMessage)
> > >     )
> > >     // if we don't make it to Kafka within 3 seconds, we timeout
> > >     val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
> > >     logger.info(
> > >       s"persisted ${tsDataPoints.length} data-points to kafka topic: " +
> > >         s"${recordMetaData.topic()} partition: ${recordMetaData.partition()} offset: ${recordMetaData.offset()}"
> > >     )
> > >     ()
> > >   }
> > > }
> > >
> > >
> > > How could I make this code write to a specific partition? Currently my
> > > topic is not partitioned, so by default this code writes to partition
> > > 0 of the topic!
> > >
> > > I'm using Kafka 0.9.0.0! Any suggestions?
> > >
> > > Regards,
> > > Joe
> > >
> >
>
