kafka-users mailing list archives

From saiprasad mishra <saiprasadmis...@gmail.com>
Subject Re: Reopen KAFKA-4344 ?
Date Mon, 07 Nov 2016 20:55:30 GMT
Hi Srinivas

I raised the issue, and the way I got around it was to run Kafka Streams
with plain POJOs rather than having some of the dependent instances be
Spring-managed beans.
If you create the riakService and counterService instances inside the
processor class instead of passing the Spring-managed instances to the
processor class constructor, your Kafka Streams initialization should be
fine, and it should create the right number of tasks with the right number
of processors for all the partitions.
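
As a rough illustration of why this matters, here is a minimal sketch in
plain Java. It does not use the Kafka Streams API: PriceProcessor,
createTasks and distinctInstances are hypothetical stand-ins, and the
assumption is that the framework calls the supplier once per task and
expects a separate processor instance each time. A supplier that hands
back one pre-built object (as the closure in the quoted code below does)
makes every task share the same processor state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

class SupplierSketch {

    // Hypothetical stand-in for a stream processor that keeps per-task state.
    static class PriceProcessor {
        private int partition = -1;

        void init(int partition) {
            this.partition = partition;
        }

        int partition() {
            return partition;
        }
    }

    // Mimics a framework that asks the supplier for one processor per partition
    // and initializes each returned object with its own partition number.
    static List<PriceProcessor> createTasks(Supplier<PriceProcessor> supplier, int partitions) {
        List<PriceProcessor> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            PriceProcessor processor = supplier.get();
            processor.init(p);
            tasks.add(processor);
        }
        return tasks;
    }

    // Counts how many distinct objects (by identity) back the given tasks.
    static int distinctInstances(List<PriceProcessor> tasks) {
        Set<PriceProcessor> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.addAll(tasks);
        return seen.size();
    }

    public static void main(String[] args) {
        // Problematic pattern: the supplier always returns the same instance.
        PriceProcessor shared = new PriceProcessor();
        List<PriceProcessor> sharedTasks = createTasks(() -> shared, 3);

        // Suggested pattern: the supplier builds a new instance on every call.
        List<PriceProcessor> freshTasks = createTasks(PriceProcessor::new, 3);

        System.out.println("shared supplier, distinct processors: " + distinctInstances(sharedTasks));
        System.out.println("fresh supplier, distinct processors: " + distinctInstances(freshTasks));
    }
}
```

With the shared supplier, all three tasks point at one object, so each
init call overwrites the partition stored by the previous one. With the
fresh supplier, each task keeps its own state.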

I was fine with the POJO-based approach, as Kafka Streams has quite a few
APIs to query the state (once it has started correctly, of course). I am
running stateful processors and wanted to query the state data all the
time. I was just using a Spring Boot controller as the web container to
proxy the get APIs of the Kafka Streams state store (ReadOnlyKeyValueStore).

Alternatively, you can try making these two services prototype-scoped
components (if your use case is fine with this).

Hope this helps.

Regards
Sai

On Mon, Nov 7, 2016 at 9:08 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

>
> KAFKA-4344 was not a bug. The issue was a wrong initialization order
> of Kafka Streams by the user.
>
> Please double check your initialization order (and maybe read the old
> email thread and JIRA comments -- they might have some relevant
> information to help you fix the issue).
>
> If the problem is still there, can you please reduce your code to a
> minimum example that reproduces the problem?
>
> Thanks!
>
> -Matthias
>
> On 11/5/16 3:28 PM, srinivas koniki wrote:
> >
> > Hi, I'm still seeing the same issue with Spring Boot. The code is
> > below; sorry, it is in Groovy and not fully baked. It has just a
> > single processor. It worked well with a single partition, but when
> > I increased the number of partitions, I started seeing the error
> > described in KAFKA-4344.
> >
> >
> > import com.codahale.metrics.MetricRegistry
> > import org.apache.kafka.clients.consumer.ConsumerConfig
> > import org.apache.kafka.clients.producer.KafkaProducer
> > import org.apache.kafka.clients.producer.ProducerRecord
> > import org.apache.kafka.common.serialization.Serdes
> > import org.apache.kafka.streams.KafkaStreams
> > import org.apache.kafka.streams.StreamsConfig
> > import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
> > import org.apache.kafka.streams.processor.AbstractProcessor
> > import org.apache.kafka.streams.processor.ProcessorSupplier
> > import org.apache.kafka.streams.processor.TopologyBuilder
> > import org.aspectj.lang.ProceedingJoinPoint
> > import org.aspectj.lang.annotation.AfterReturning
> > import org.aspectj.lang.annotation.Around
> > import org.aspectj.lang.annotation.Aspect
> > import org.aspectj.lang.annotation.Pointcut
> > import org.springframework.beans.factory.annotation.Autowired
> > import org.springframework.beans.factory.annotation.Value
> > import org.springframework.boot.actuate.metrics.CounterService
> > import org.springframework.boot.actuate.metrics.GaugeService
> > import org.springframework.boot.autoconfigure.SpringBootApplication
> > import org.springframework.boot.test.context.SpringBootTest
> > import org.springframework.context.Lifecycle
> > import org.springframework.context.annotation.Bean
> > import org.springframework.context.annotation.Configuration
> > import org.springframework.context.annotation.Import
> > import org.springframework.context.support.PropertySourcesPlaceholderConfigurer
> > import org.springframework.stereotype.Component
> > import org.springframework.test.context.ContextConfiguration
> > import org.springframework.util.StopWatch
> > import spock.lang.Shared
> > import spock.lang.Specification
> >
> > import java.util.concurrent.Future
> > import java.util.stream.IntStream
> >
> > /**
> >  * Created by srinivas.koniki on 11/5/16.
> >  */
> > @ContextConfiguration(classes=[TestConfig, MetricsAspect, RiakService])
> > @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
> > class MetricsSpec extends Specification {
> >
> >     static String kafkaTopic = 'testTopic'
> >
> >     @Shared
> >     TestConfig testConfigRef
> >
> >     @Autowired
> >     TestConfig testConfig
> >
> >     @Autowired
> >     MetricRegistry metricRegistry
> >
> >     @Autowired
> >     KafkaProducer kafkaProducer
> >
> >     @Shared
> >     static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1)
> >
> >     def setupSpec() {
> >         println("Heavy init for all the tests...")
> >         CLUSTER.start()
> >         System.setProperty('broker.url',CLUSTER.bootstrapServers())
> >         System.setProperty('zk.url',CLUSTER.zKConnectString())
> >         System.setProperty('kafka.topic',kafkaTopic)
> >         CLUSTER.createTopic(kafkaTopic, 3, 1)
> >     }
> >
> >     def cleanupSpec() {
> >         testConfigRef.stop()
> >         CLUSTER.stop()
> >     }
> >
> >     def "Test send and receive" () {
> >         expect:
> >         testConfig != null
> >         metricRegistry != null
> >         println ''+metricRegistry.getGauges()
> >
> >         when:
> >         testConfigRef = testConfig
> >         testConfig.start()
> >         List<Future> futureList = new ArrayList<>()
> >         IntStream.range(1,4).forEach({ i ->
> >             Future future = kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, 'test'+i, 'testMesg'+i))
> >         })
> >
> >         futureList.forEach({ future -> println future.get() })
> >
> >         then:
> >         Thread.sleep(20000)
> >
> >         println ''+metricRegistry.getGauges()
> >         println ''+metricRegistry.counters
> >         metricRegistry.counters.keySet().forEach({ key ->
> >             println key+':'+metricRegistry.counters.get(key).count
> >         })
> >         Thread.sleep(2000)
> >     }
> >
> >     @Configuration
> >     @SpringBootApplication
> >     static class TestConfig implements Lifecycle {
> >
> >         @Value('${broker.url}')
> >         String brokerUrl
> >
> >         Map<String, String> producerConfig() {
> >             def props = [
> >                 "bootstrap.servers" : brokerUrl,
> >                 "acks" : "all",
> >                 "retries": 0,
> >                 "batch.size": 16384,
> >                 "linger.ms": 1,
> >                 "buffer.memory" : 33554432,
> >                 "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
> >                 "value.serializer" : "org.apache.kafka.common.serialization.StringSerializer"
> >             ]
> >         }
> >
> >         @Bean
> >         KafkaProducer<String, String> kafkaProducer() {
> >             new KafkaProducer<String, String>(producerConfig())
> >         }
> >
> >         @Bean
> >         public static PropertySourcesPlaceholderConfigurer properties() {
> >             return new PropertySourcesPlaceholderConfigurer()
> >         }
> >
> >         @Value('${zk.url}')
> >         String zkUrl
> >
> >         @Value('${kafka.topic}')
> >         String kafkaTopic
> >
> >         @Autowired
> >         RiakService riakService
> >
> >         @Autowired
> >         CounterService counterService
> >
> >         KafkaStreams streams
> >
> >         boolean state
> >
> >         @Override
> >         void start() {
> >             println 'starting streams'
> >             Properties props = new Properties();
> >             props.put('group.id', "streams-test-processor-group");
> >             props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-processor");
> >             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
> >             props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkUrl);
> >             props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >             props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >             props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >             TopologyBuilder builder = new TopologyBuilder();
> >             builder.addSource("Source", kafkaTopic);
> >             def processor = new PriceProcessor(riakService, counterService)
> >             def procSupplier = {processor} as ProcessorSupplier
> >             builder.addProcessor("Process", procSupplier, "Source");
> >
> >             streams = new KafkaStreams(builder, props);
> >             streams.start()
> >             println ' Streams started'
> >             state = true
> >         }
> >
> >         @Override
> >         void stop() {
> >             streams.close()
> >             state = false
> >         }
> >
> >         @Override
> >         boolean isRunning() {
> >             return state
> >         }
> >
> >     }
> >
> >     static class PriceProcessor extends AbstractProcessor<String, String> {
> >         RiakService riakClient1
> >         CounterService counterService
> >
> >         PriceProcessor(RiakService riakClient2, CounterService counterService1) {
> >             this.riakClient1 = riakClient2
> >             this.counterService = counterService1
> >         }
> >
> >         @Override
> >         void process(String key, String value) {
> >             riakClient1.save(key, value)
> >
> >             context().commit()
> >             println 'offset-'+context().offset()+', partition -'+context().partition()
> >             counterService.increment('kafka.partition.'+context().partition()+'.offset')
> >         }
> >
> >     }
> >
> >     @Aspect
> >     @Component
> >     static class MetricsAspect {
> >         final CounterService counterService
> >         final GaugeService gaugeService
> >
> >         @Autowired
> >         MetricsAspect(GaugeService gaugeService1, CounterService counterService1) {
> >             println 'aspect init'
> >             this.gaugeService = gaugeService1
> >             this.counterService = counterService1
> >         }
> >
> >         @Pointcut("execution(* com.bsb.retail.price.kafka.RiakService.save(..))")
> >         public void methodsToBeProfiled(){}
> >
> >         @Around("methodsToBeProfiled()")
> >         public Object profile(ProceedingJoinPoint pjp) throws Throwable {
> >             StopWatch sw = new StopWatch(getClass().getSimpleName());
> >             try {
> >                 sw.start(pjp.getSignature().getName());
> >                 return pjp.proceed();
> >             } finally {
> >                 sw.stop();
> >                 System.out.println(sw.prettyPrint());
> >                 gaugeService.submit('riakSave', sw.lastTaskTimeMillis)
> >             }
> >         }
> >
> >         @AfterReturning(pointcut = "execution(* com.bsb.retail.price.RiakService.save(..)) ")
> >         void metric() {
> >             println 'increment metrics counter'
> >             counterService.increment('message.count')
> >         }
> >
> >     }
> >
> > }
> >
> > @Component
> > class RiakService {
> >     void save(String key, String value) {
> >         println 'Sending to riak '+Thread.currentThread().name
> >     }
> > }
> >
>
