kafka-users mailing list archives

From Henry Thacker <he...@henrythacker.com>
Subject Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages
Date Tue, 02 May 2017 16:33:42 GMT
Hi Eno,

Think I've finally cracked it - I was hit by two problems. Firstly, my listen
IP was 0.0.0.0 and clients were trying to connect to this, which obviously
wasn't going to work.
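
For anyone else who hits this, I believe the broker-side fix is to advertise
a resolvable address rather than the wildcard bind address - something like
this in server.properties (hostname and port here are placeholders for our
real values):

# Bind on all interfaces, but tell clients a reachable hostname
listeners=SSL://0.0.0.0:9093
advertised.listeners=SSL://broker1.example.com:9093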

Part two was that I had stupidly left some code in from when I was working
with the KStreamBuilder and hadn't removed it when moving to the
TopologyBuilder, so I was trying to instantiate a KafkaStreams with no input
source.
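
For the record, the minimal working shape on 0.10.0.1 looks something like
this (the node names and MyProcessor are placeholders; topic and config are
the same variables as in the configuration further down this thread). The key
point is that a source node must be wired into the builder before
constructing KafkaStreams:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// Without at least one source node there is nothing for Streams to consume
builder.addSource("source", topic);
// MyProcessor stands in for whatever Processor implementation is in use
builder.addProcessor("processor", MyProcessor::new, "source");
KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(config));
streams.start();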

Looks like everything works now - thank you very much for your help.

Now I'm creating multiple application.ids that are essentially throwaway,
and all state can be removed after 5 days; I just need to work out how to
tidy this all up in a semi-automated fashion.
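
My current thinking is a small sweeper along these lines (untested sketch,
assuming each application.id gets its own subdirectory under state.dir; the
apps' internal changelog topics would still need deleting broker-side):

import java.io.File;
import java.util.concurrent.TimeUnit;

public class StateDirSweeper {
    public static void main(String[] args) {
        long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(5);
        // args[0] is the directory configured as state.dir
        File[] appDirs = new File(args[0]).listFiles(File::isDirectory);
        if (appDirs == null) return;
        for (File appDir : appDirs) {
            // Each throwaway application.id has its own subdirectory
            if (appDir.lastModified() < cutoff) {
                delete(appDir); // remove the whole app's local state
            }
        }
    }

    private static void delete(File f) {
        File[] children = f.listFiles();
        if (children != null) {
            for (File c : children) delete(c);
        }
        f.delete();
    }
}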

Thanks,
Henry

-- 
Henry Thacker

On 2 May 2017 at 16:21:33, Eno Thereska (eno.thereska@gmail.com) wrote:

> Could you make sure you don’t have a firewall or that the Kafka brokers
> are set up correctly and can be accessed? Is the SSL port the same as the
> PLAINTEXT port in your server.config file? E.g., see this:
> https://stackoverflow.com/questions/43534220/marking-the-coordinator-dead-for-groupkafka/43537521
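>
> For example, if both protocols are enabled they need their own listener
> entries on distinct ports, along the lines of (ports illustrative):
>
> listeners=PLAINTEXT://:9092,SSL://:9093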
>
>
> Eno
>
> On May 2, 2017, at 10:59 AM, Henry Thacker <henry@henrythacker.com> wrote:
>
> Hi Eno,
>
> At the moment this is hard coded, but overridable with command line
> parameters:
>
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + "-" + topic);
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
> config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxMessageBytes);
> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes);
> config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
> config.put(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
> config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
> config.put(ProducerConfig.RETRIES_CONFIG, 2);
>
> if (ssl)
>     config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
>
> Variables:
> appId - "my-streamer-app"
> topic - "20170502_instancea_1234"
> brokers - "localhost:9092,localhost:9093,localhost:9094"
> zookeepers - "localhost:2181,localhost:2182,localhost:2183"
> maxMessageBytes - 30000000
> ssl - true
>
> Thanks,
> Henry
> --
> Henry Thacker
>
> On 2 May 2017 at 10:16:25, Eno Thereska (eno.thereska@gmail.com) wrote:
>
> Hi Henry,
>
> Could you share the streams configuration for your apps? I.e., the part
> where you assign application id and all the rest of the configs (just
> configs, not code).
>
> Thanks
> Eno
>
> On May 2, 2017, at 8:53 AM, Henry Thacker <henry@henrythacker.com> wrote:
>
> Thanks all for your replies - I have checked out the docs which were very
> helpful.
>
> I have now moved the separate topic streams to different processes, each
> with their own app.id, and I'm getting the following pattern, with no data
> consumed:
>
> "Starting stream thread [StreamThread-1]
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group ..
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group .."
>
> This discover-and-mark-dead cycle repeats every few minutes.
>
> During this time, the broker logs look happy.
>
> One other, hopefully unrelated, point: this cluster is all SSL-encrypted.
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
> On 29 April 2017 at 05:31:30, Matthias J. Sax (matthias@confluent.io) wrote:
>
> Henry,
>
> you might want to check out the docs, which give an overview of the
> architecture:
> http://docs.confluent.io/current/streams/architecture.html#example
>
> Also, I am wondering why your application did not crash: I would expect
> an exception like
>
> java.lang.IllegalArgumentException: Assigned partition foo-2 for
> non-subscribed topic regex pattern; subscription pattern is bar
>
> Maybe you just don't hit it because both topics have a single partition
> rather than multiple.
>
> > Out of interest though, had I subscribed for both topics in one subscriber
> > - I would have expected records for both topics interleaved
>
> Yes. That should happen.
>
> > why when running this in two separate processes do I not observe the same?
>
> Not sure what you mean by this?
>
> > If I fix this by changing the application ID for each streaming process -
> > does this mean I lose the ability to share state stores between the
> > applications?
>
> Yes.
>
>
> If both your topics are single partitioned, and you want to share state,
> you will not be able to run with more than one thread in your Streams app.
>
> The only way to work around this would be to copy the data into another
> topic with more partitions before you process them -- of course, this
> would mean data duplication.
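>
> For illustration only, a sketch of that workaround in the 0.10.0.x DSL
> (topic names are placeholders, and the repartitioned topic must be
> created up front with the desired partition count):
>
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<Bytes, Bytes> input = builder.stream("single-partition-input");
> // through() writes to the multi-partition topic and reads it back,
> // so the downstream work can be spread over more tasks/threads
> KStream<Bytes, Bytes> repartitioned = input.through("input-repartitioned");
> repartitioned.to("output-topic");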
>
>
> -Matthias
>
>
> On 4/28/17 12:45 PM, Henry Thacker wrote:
>
> Thanks Michael and Eno for your help - I always thought the unit of
> parallelism was a combination of topic & partition rather than just
> partition.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved, why when
> running this in two separate processes do I not observe the same? Just
> wanting to try and form a mental model of how this is all working - I will
> try and look through some code over the weekend.
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
> Unfortunately the data on the input topics are provided by a third party
> component which sends these keyless messages on a single partition per
> topic, so I have little ability to fix this at source :-(
>
> Thanks,
> Henry
>
>
