kafka-users mailing list archives

From "Matthias J. Sax" <mj...@apache.org>
Subject Re: Stream Applications are initialized
Date Wed, 04 Mar 2020 04:21:32 GMT
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

It's hard to say. However, calling `KafkaStreams#close()` in the
uncaught exception handler is considered bad practice (using a timeout
avoids the deadlock, but it would be better to just set a boolean flag
and call `close()` from outside the handler).
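
A minimal sketch of that flag-based pattern, using only the JDK so it
runs without the Kafka libraries. `FakeStreams` is a hypothetical
stand-in for `KafkaStreams`, the thread name is made up, and a
`CountDownLatch` plays the role of the boolean flag so the controlling
thread can block instead of polling:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeferredClose {

    /** Hypothetical stand-in for KafkaStreams; records which thread called close(). */
    static class FakeStreams {
        final AtomicBoolean closed = new AtomicBoolean(false);
        volatile String closedBy = null;

        void close() {
            closedBy = Thread.currentThread().getName();
            closed.set(true);
        }
    }

    /** Runs a failing worker thread and returns the name of the thread that called close(). */
    static String runAndClose(FakeStreams streams) {
        CountDownLatch failed = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated processing error");
        }, "stream-thread-1");

        // The handler only signals the failure; it does not call close() itself.
        worker.setUncaughtExceptionHandler((thread, throwable) -> failed.countDown());
        worker.start();

        try {
            // The controlling thread waits for the signal and closes from outside the handler.
            if (failed.await(5, TimeUnit.SECONDS)) {
                streams.close();
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return streams.closedBy;
    }

    public static void main(String[] args) {
        FakeStreams streams = new FakeStreams();
        System.out.println("close() called by: " + runAndClose(streams));
    }
}
```

In a real application the same shape would apply with the actual
`KafkaStreams` instance in place of `FakeStreams`: the handler signals,
and the main thread (or a shutdown hook) performs the close.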

I assume your input topic has enough partitions and there are enough
tasks to utilize all 10 instances?
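
To illustrate why the partition count matters: for a topology with a
single source topic, Kafka Streams creates one task per input
partition, so instances beyond the task count have nothing to run. A
rough sketch of that arithmetic, assuming one stream thread per
instance (the partition counts below are hypothetical, not taken from
your setup):

```java
public class TaskCount {

    /**
     * Number of instances left without a task, assuming a single input
     * topic and one stream thread per instance.
     */
    static int idleInstances(int inputPartitions, int instances) {
        int tasks = inputPartitions; // one task per input partition
        return Math.max(0, instances - tasks);
    }

    public static void main(String[] args) {
        System.out.println(idleInstances(7, 10));  // prints 3
        System.out.println(idleInstances(10, 10)); // prints 0
    }
}
```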

You also don't need to set the `group.id` -- this config will be
ignored anyway and the `application.id` is used as consumer `group.id`.
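
As a sketch, the configuration could drop `group.id` entirely. The
example below uses the plain string keys behind
`StreamsConfig.APPLICATION_ID_CONFIG`, `CLIENT_ID_CONFIG` and
`BOOTSTRAP_SERVERS_CONFIG` so that it compiles without the Kafka
libraries; the class name, method name and broker address are
placeholders:

```java
import java.util.Properties;

public class StreamsProps {

    /** Builds a minimal Streams configuration without an explicit group.id. */
    static Properties build(String applicationId, String clientId, String bootstrapServers) {
        Properties props = new Properties();
        // application.id doubles as the consumer group.id, so group.id is not set.
        props.put("application.id", applicationId);
        props.put("client.id", clientId);
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = build("si1-appid", "si1-client", "localhost:9092");
        System.out.println(props.containsKey("group.id")); // prints false
    }
}
```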

I would try to inspect the log (if required at DEBUG level) to get
more insight into what the application is doing.


- -Matthias


On 2/23/20 1:09 PM, Michelle Francois wrote:
> I tried to create a Kafka Streams application, but there was a bug
> in my code. I believe my application deadlocked, and
> whenever I tried to run an application instance with the same
> StreamsConfig.APPLICATION_ID_CONFIG id it could not start. I had to
> create another instance with a different
> StreamsConfig.APPLICATION_ID_CONFIG id in order for it to start. I
> fixed the bug by adding:
>
>> kafkaStreams.setUncaughtExceptionHandler(
>>     (Thread thread, Throwable throwable) -> {
>>       kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
>>     });
>>
> and I never had the same deadlock problem again.
>
> *Nevertheless, my code recently had another bug, and the
> execution got stuck in a while loop.* I ran the following several
> times:
>
>> pkill -u myuser
>>
>
> with no problems. However, now when I try to run 10 instances of my
> streams application, only some of them are initialized. I mean
> that the method
>
>> public void init(ProcessorContext processorContext)
>>
> in my 10 processors is executed in only some of my 10 Kafka
> Streams application instances. To be more specific, I run a
> script file:
>
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors1 VectorSink1 VectorSink2 VectorSourceNodeName1 VectorProcessorName1 StateStoreName1 si1-client si1-group si1-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors2 VectorSink3 VectorSink4 VectorSourceNodeName2 VectorProcessorName2 StateStoreName2 si2-client si2-group si2-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors3 VectorSink5 VectorSink6 VectorSourceNodeName3 VectorProcessorName3 StateStoreName3 si3-client si3-group si3-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors4 VectorSink7 VectorSink8 VectorSourceNodeName4 VectorProcessorName4 StateStoreName4 si4-client si4-group si4-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors5 VectorSink9 VectorSink10 VectorSourceNodeName5 VectorProcessorName5 StateStoreName5 si5-client si5-group si5-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors6 VectorSink11 VectorSink12 VectorSourceNodeName6 VectorProcessorName6 StateStoreName6 si6-client si6-group si6-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors7 VectorSink13 VectorSink14 VectorSourceNodeName7 VectorProcessorName7 StateStoreName7 si7-client si7-group si7-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors8 VectorSink15 VectorSink16 VectorSourceNodeName8 VectorProcessorName8 StateStoreName8 si8-client si8-group si8-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors9 VectorSink17 VectorSink18 VectorSourceNodeName9 VectorProcessorName9 StateStoreName9 si9-client si9-group si9-appid" &
>> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors10 VectorSink19 VectorSink20 VectorSourceNodeName10 VectorProcessorName10 StateStoreName10 si10-client si10-group si10-appid" &
>>
>
> and usually only 7 out of 10 processors are initialized (each
> application has the same topology consisting of one Processor).
>
> *Whenever I don't change the StreamsConfig.APPLICATION_ID_CONFIG id,
> the same instances are not initialized.* *Whenever I change the
> StreamsConfig.APPLICATION_ID_CONFIG id of all of them, the number of
> instances that get initialized changes (8 or 9 are now initialized,
> depending on the new StreamsConfig.APPLICATION_ID_CONFIG values, and
> that number stays the same until I change the
> StreamsConfig.APPLICATION_ID_CONFIG of all of them again).*
>
>
> My streams topology is:
>
>
>> public class SiteApplication {
>>   static Logger logger =
>>       Logger.getLogger(SiteApplication.class.getName());
>>
>>   public static void main(String[] args) throws Exception {
>>
>>     PropertyConfigurator.configure(
>>         SiteApplication.class.getClassLoader()
>>             .getResourceAsStream("log4j.properties"));
>>
>>     Serde<String> stringSerde = Serdes.String();
>>     Serde<Integer> integerSerde = Serdes.Integer();
>>     Serde<byte[]> byteArraySerde = Serdes.ByteArray();
>>
>>     Serializer<String> stringSerializer = stringSerde.serializer();
>>     Deserializer<String> stringDeserializer =
>>         stringSerde.deserializer();
>>
>>     Serializer<Integer> integerSerializer = integerSerde.serializer();
>>     Deserializer<Integer> integerDeserializer =
>>         integerSerde.deserializer();
>>
>>     Serializer<byte[]> byteArraySerializer =
>>         byteArraySerde.serializer();
>>     Deserializer<byte[]> byteArrayDeserializer =
>>         byteArraySerde.deserializer();
>>
>>     Serializer<byte[]> byteArraySerializer1 =
>>         byteArraySerde.serializer();
>>     Deserializer<byte[]> byteArrayDeserializer1 =
>>         byteArraySerde.deserializer();
>>
>>
>>     Topology toplogy = new Topology();
>>
>>     String vectorSink1 = args[1];
>>     String vectorSink2 = args[2];
>>     String vectorSourceNodeName = args[3];
>>     String vectorProcessorName = args[4];
>>     String firstStateStoreName = args[5];
>>
>>     MyCustomStoreBuilder vectorStoreBuilder =
>>         new MyCustomStoreBuilder(firstStateStoreName);
>>
>>     toplogy
>>         .addSource(vectorSourceNodeName, stringDeserializer,
>>             integerDeserializer, args[0])
>>         .addProcessor(vectorProcessorName,
>>             () -> new SiteProcessor(firstStateStoreName, vectorSink1,
>>                 vectorSink2),
>>             vectorSourceNodeName)
>>         .addSink(vectorSink1, "IncreaseOfC", stringSerializer,
>>             byteArraySerializer, vectorProcessorName)
>>         .addSink(vectorSink2, "Messages", stringSerializer,
>>             byteArraySerializer1, vectorProcessorName)
>>         .addStateStore(vectorStoreBuilder, vectorProcessorName);
>>
>>
>>     Properties props = new Properties();
>>     props.put(StreamsConfig.CLIENT_ID_CONFIG, args[6]);
>>     props.put(ConsumerConfig.GROUP_ID_CONFIG, args[7]);
>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, args[8]);
>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>         "clu1.myuniversity.com:6667,clu2.myuniversity:6667,"
>>             + "clu3.myuniversity.com:6667,clu4.myuniversity.com:6667");
>>     props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
>>     props.put(StreamsConfig.STATE_DIR_CONFIG, "/home/myuser/state");
>>
>>     props.put(
>>         StreamsConfig.consumerPrefix(
>>             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
>>         "earliest");
>>     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>         Serdes.String().getClass());
>>     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>>         Serdes.String().getClass());
>>
>>     props.put(
>>         StreamsConfig.consumerPrefix(
>>             ConsumerConfig.METADATA_MAX_AGE_CONFIG),
>>         60000);
>>     props.put(
>>         StreamsConfig.producerPrefix(
>>             ProducerConfig.METADATA_MAX_AGE_CONFIG),
>>         60000);
>>
>>     KafkaStreams kafkaStreams = new KafkaStreams(toplogy, props);
>>     kafkaStreams.setUncaughtExceptionHandler(
>>         (Thread thread, Throwable throwable) -> {
>>           // here you should examine the throwable/exception and
>>           // perform an appropriate action!
>>           logger.info("There is an error in site " + args[6]);
>>           kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
>>         });
>>
>>     logger.info("Starting FGM Application now for " + args[6]);
>>     kafkaStreams.cleanUp();
>>     kafkaStreams.start();
>>
>>     // Add shutdown hook to respond to SIGTERM and gracefully close
>>     // Kafka Streams
>>     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>       try {
>>         System.out.println("Shutting down FGM Application  now");
>>         kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
>>       } catch (final Exception e) {
>>         e.printStackTrace();
>>       }
>>     }));
>>   }
>> }
>
>
>
>
>
> I have to say something to the administrator of my university's
> cluster, which runs Kafka Streams, but I do not know what
> exactly is happening.
>
> *I do not know if it is a bug in my code, because it is no longer
> stuck in a while loop and I am running a stable version of my code
> which ran in the past without problems.*
>
> *The problems began after being stuck in a while loop in each
> application instance but I fixed this.*
>
> *Does the installation of the Apache Kafka need a restart by the
> administrator or another change to configuration?*
>
> Is it a deadlock that I am missing?
>
> If it were a deadlock, why would it not happen to all the
> application instances, but only to a portion of them?
>
> Thanks in advance.
>
-----BEGIN PGP SIGNATURE-----

iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5fLMQACgkQO4miYXKq
/Og8KhAArJyvbd8TcCTY1lBbgd+NigqJSOA153fXaP0UDDiAFl8bDBQQ3MziL6uz
etpFABw5C/B5izX4OTn233M6hZXZaT2+xNd7i3z3aLkXUvmQNZJvsZxWz+feZAuT
KQDQoSKzgdwuwyjKF2BHfrJCYd+fLjgAFTGTXBjIiJHhOUnM2rkSVuzX0PkZLRR0
6f0Otz5s8+BdyLyTfWdXcsF0y5rUl9a4dlOPHqt+LMCtDfJNQkAM204TzDBlisFq
orJhcTLWPNC/UUKhe5RHGh8BY2XQBThwEIGw3T6VNlERQifZZgHOUJFfJUgN0Jps
1ejaTWWA/UETy89Jp8d93hJT3mSh46JIq/ylykiYVtfga2F80WfKBpwVEE/oYdmQ
CbPWz2ZpMrJAHwYmo/dAayiMIv2Ux36A7NtmVr87r1oeb+euRzC/58p1yGBrNoGv
xyU/bYbUbvKwoMxwE1yox9fK/bKqTfvJ4x7Ov3um49oG53fkUGcjzEiIfAEPiP5m
+autPY6Q7oGErPQYPfraVVqfh14N3dx/KWP2gAsFcs1kvv8xlsNREJWcu/rHyLP8
K8oP+Jwe50qZesyvOco+GL/okkVGkp0DjJnQgKhKrTDuS14A6ZtY9t6/iSyKkgge
RW332D6Vj+V6B/yzELACRAjJvt6dctQSirvJ+1KsRkdsiVx9VjQ=
=ZzZl
-----END PGP SIGNATURE-----
