flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chesnay Schepler <ches...@apache.org>
Subject Re: ClassNotFoundException with Flink 1.4.1 and Kafka connector
Date Wed, 28 Feb 2018 13:44:55 GMT
Well we just ended up in ClassLoader hell...

There are 2 config options that could help:

  * add "org.apache.kafka." to "classloader.parent-first-patterns"
      o make sure you include the default as well
        (|"java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback")|
  * set "classloader.resolve-order" to "parent-first"


On 28.02.2018 14:28, Debasish Ghosh wrote:
> Thanks for the suggestion. I copied the application jar to lib. The 
> error doesn't come but I get another error related to Kafka ..
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:949)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:681)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:93)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:350)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.serialization.ByteArraySerializer is not an 
> instance of org.apache.kafka.common.serialization.Serializer
> at 
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342)
> ... 17 more
>
> regards.
>
> On Wed, Feb 28, 2018 at 2:31 PM, Chesnay Schepler <chesnay@apache.org 
> <mailto:chesnay@apache.org>> wrote:
>
>     Hello,
>
>     this is probably caused by a known issue in 1.4.1:
>     https://issues.apache.org/jira/browse/FLINK-8741
>     <https://issues.apache.org/jira/browse/FLINK-8741>
>
>     This bug is not present in 1.4.0, and it will fixed in 1.4.2 which
>     should be released within the next days.
>
>     As a temporary workaround you can copy your app-assembly-1.0.jar
>     into the /lib directory.
>
>
>     On 28.02.2018 08:45, Debasish Ghosh wrote:
>>     Hi -
>>
>>     Facing a ClassNotFoundException while running Flink application
>>     that reads from Kafka. This is a modified version of the NYC Taxi
>>     App that reads from Kafka.
>>
>>     I am using Flink 1.4.1 .. The application runs ok with Flink 1.3 ..
>>
>>     Here's the exception ..
>>
>>         java.lang.ClassNotFoundException:
>>         com.lightbend.fdp.sample.flink.app.TaxiRideTSAssigner
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>         at java.lang.Class.forName0(Native Method)
>>         at java.lang.Class.forName(Class.java:348)
>>         at
>>         org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>>         at
>>         java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
>>         at
>>         java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>>         at
>>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>>         at
>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>         at
>>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>>         at
>>         org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
>>         at
>>         org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
>>         at
>>         org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
>>         at
>>         org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>>         at
>>         org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
>>         at
>>         org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
>>         at
>>         org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
>>         at
>>         org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
>>         at
>>         org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
>>         at
>>         org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
>>         at
>>         org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>>         at
>>         org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>         at
>>         org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>>         at
>>         org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>
>>     I run the application as follows ..
>>
>>         $ ./bin/flink run
>>         /Users/debasishghosh/flink/source/core/app/target/scala-2.11/app-assembly-1.0.jar
>>         --broker-list localhost:9092 --inTopic taxiin --outTopic taxiout
>>
>>
>>     I verified that the jar contains the class ..
>>
>>         $ jar tvf app-assembly-1.0.jar | grep TaxiRideTSAssigner
>>           2090 Wed Feb 28 12:59:52 IST 2018
>>         com/lightbend/fdp/sample/flink/app/TaxiRideTSAssigner.class
>>
>>
>>
>>     Here are the relevant dependencies in build ..
>>
>>     val flinkScala            = "org.apache.flink"             %%
>>      "flink-scala"                    % "1.4.1" % "provided"
>>     val flinkStreamingScala   = "org.apache.flink"             %%
>>      "flink-streaming-scala"          % "1.4.1" % "provided"
>>     val flinkKafka            = "org.apache.flink"             %%
>>      "flink-connector-kafka-0.11"     % "1.4.1" exclude("org.slf4j",
>>     "slf4j-log4j12")
>>
>>
>>     any help ?
>>
>>     regards.
>>
>>
>>     -- 
>>     Debasish Ghosh
>>     http://manning.com/ghosh2
>>     http://manning.com/ghosh
>>
>>     Twttr: @debasishg
>>     Blog: http://debasishg.blogspot.com
>>     Code: http://github.com/debasishg
>
>
>
>
>
> -- 
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg



Mime
View raw message