beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Guenther Grill (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (BEAM-2943) Beam Flink execution not working
Date Tue, 12 Sep 2017 06:57:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Guenther Grill updated BEAM-2943:
---------------------------------
    Description: 
Hi,

I followed the guide https://beam.apache.org/documentation/runners/flink/ to run beam program
within a flink cluster. 

The output of the dependency-command is:

{code}
{{mvn dependency:tree -Pflink-runner |grep flink                                         
                                    
[INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
[INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
[INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
}}
{code}

Then I started the flink cluster with the correct version with docker-compose

{{
export JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]
export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10

docker-compose up -d
}}

The compose file looks like this:

{{
version: '3.3'
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "6123:6123"
      - "8081:8081"
    volumes:
      - /tmp:/tmp
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]

  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]
}}

The flink cluster works, but when I execute 

{{
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=pom.xml \
      --output=/path/to/counts \
      --flinkMaster=[CORRECT_IP_OF_CONTAINER]:6123 \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"
}}

I get:

{{
2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager             
  - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager             
  - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
  - Job recovers via failover strategy: full graph restart
2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager             
  - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager             
  - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource
(at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))':
Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:150)
	... 24 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not
read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
	... 25 more
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
	... 26 more

}}

  was:
Hi,

I followed the guide https://beam.apache.org/documentation/runners/flink/ to run beam program
within a flink cluster. 

The output of the dependency-command is:

{{mvn dependency:tree -Pflink-runner |grep flink                                         
                                    
[INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
[INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
[INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
}}

Then I started the flink cluster with the correct version with docker-compose

{{
export JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]
export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10

docker-compose up -d
}}

The compose file looks like this:

{{
version: '3.3'
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "6123:6123"
      - "8081:8081"
    volumes:
      - /tmp:/tmp
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]

  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]
}}

The flink cluster works, but when I execute 

{{
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=pom.xml \
      --output=/path/to/counts \
      --flinkMaster=[CORRECT_IP_OF_CONTAINER]:6123 \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"
}}

I get:

{{
2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager             
  - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager             
  - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
  - Job recovers via failover strategy: full graph restart
2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager             
  - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager             
  - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource
(at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))':
Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:150)
	... 24 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not
read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
	... 25 more
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
	... 26 more

}}


> Beam Flink execution not working
> --------------------------------
>
>                 Key: BEAM-2943
>                 URL: https://issues.apache.org/jira/browse/BEAM-2943
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0
>         Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 (2017-08-06)
x86_64 GNU/Linux
>            Reporter: Guenther Grill
>            Assignee: Aljoscha Krettek
>              Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/documentation/runners/flink/ to run beam
program within a flink cluster. 
> The output of the dependency-command is:
> {code}
> {{mvn dependency:tree -Pflink-runner |grep flink                                    
                                         
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> }}
> {code}
> Then I started the flink cluster with the correct version with docker-compose
> {{
> export JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> }}
> The compose file looks like this:
> {{
> version: '3.3'
> services:
>   jobmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6123"
>     ports:
>       - "6123:6123"
>       - "8081:8081"
>     volumes:
>       - /tmp:/tmp
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[CORRECT_IP_OF_CONTAINER]
> }}
> The flink cluster works, but when I execute 
> {{
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>     -Pflink-runner \
>     -Dexec.args="--runner=FlinkRunner \
>       --inputFile=pom.xml \
>       --output=/path/to/counts \
>       --flinkMaster=[CORRECT_IP_OF_CONTAINER]:6123 \
>       --filesToStage=target/word-count-beam-bundled-0.1.jar"
> }}
> I get:
> {{
> 2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager        
       - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager        
       - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
       - Job recovers via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager        
       - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager        
       - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource
(at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))':
Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
> 	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:150)
> 	... 24 more
> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could
not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
> 	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
> 	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
> 	... 25 more
> Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:348)
> 	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> 	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> 	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
> 	... 26 more
> }}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message