flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
Date Wed, 08 Nov 2017 15:29:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16244148#comment-16244148

ASF GitHub Bot commented on FLINK-8005:

GitHub user GJL reopened a pull request:


    [FLINK-8005] [runtime] Set user code class loader before snapshot

    ## What is the purpose of the change
    *During checkpointing, user code may dynamically load classes from the user code
    jar. This is a problem if the thread invoking the snapshot callbacks does not
    have the user code class loader set as its context class loader. This commit
    makes sure that the correct class loader is set.*
    ## Brief change log
      - *Set user code class loader in ThreadFactory of `Task#asyncCallDispatcher`*
      - *Clean up TaskAsyncCallTest*
    ## Verifying this change
    This change added tests and can be verified as follows:
      - *Added unit tests to verify that context class loader is set*
      - *Started job with FlinkKafkaProducer011 and verified that snapshotting works*
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes
/ **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    ## Documentation
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-8005-2

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4980
commit 07e9e4206842319884e424b3636493e4d7f8c7a4
Author: gyao <gary@data-artisans.com>
Date:   2017-11-08T10:46:45Z

    [FLINK-8005] [runtime] Set user code class loader before snapshot
    During checkpointing, user code may dynamically load classes from the user code
    jar. This is a problem if the thread invoking the snapshot callbacks does not
    have the user code class loader set as its context class loader. This commit
    makes sure that the correct class loader is set.

commit e5c5a42deb27949b26698cf07d9ae88459805b0d
Author: gyao <gary@data-artisans.com>
Date:   2017-11-08T15:27:54Z

    [FLINK-8005] [runtime] Move tests in TaskStopTest to TaskAsyncCallTest


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> ------------------------------------------------------------------
>                 Key: FLINK-8005
>                 URL: https://issues.apache.org/jira/browse/FLINK-8005
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, Kafka Connector, State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Blocker
>             Fix For: 1.4.0
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s context class
loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are resolved to class
objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
>     if (value instanceof Class)
>         return value;
>     else if (value instanceof String)
>         return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());
>     else
>         throw new ConfigException(name, value, "Expected a Class instance or class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Collection
Source -> Sink: kafka-sink-1510048188383 (1/1).
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
> 	... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer
for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer
could not be found.
> 	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> 	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
> 	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
> 	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
> 	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
> 	at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
> 	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
> 	... 12 more
> {noformat}
> *How to reproduce*
> Note that the problem only appears when a job is deployed on a cluster. 
> # Build Flink 1.4
> # Build test job https://github.com/GJL/flink-kafka011-producer-test with {{mvn -o clean
install -Pbuild-jar}}
> # Start job:
> {noformat}
> bin/flink run -c com.garyyao.StreamingJob /pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar
> {noformat}

This message was sent by Atlassian JIRA

View raw message