flink-issues mailing list archives

From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-12101) Race condition when concurrently running uploaded jars via REST
Date Mon, 08 Apr 2019 12:48:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-12101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812397#comment-16812397 ]

Till Rohrmann commented on FLINK-12101:
---------------------------------------

OK, I think I understand the problem now. The following sequence triggers it:

1. Job1 enters getOptimizedPlan [thread-1]: setAsContext(OptimizerPlanEnvironment) --> ctxEnvFactory = Optimizer
2. Job2 enters getOptimizedPlan [thread-2]: setAsContext(OptimizerPlanEnvironment) --> ctxEnvFactory = Optimizer
3. Job1 finishes getOptimizedPlan [thread-1]: unsetContext --> ctxEnvFactory = null
4. Job2 enters invokeInteractiveModeForExecution [thread-2]: starts execution with ctxEnvFactory = null --> instantiates a {{LocalEnvironment}}
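
The four steps can be replayed in a small, self-contained Java model. This is a sketch under stated assumptions, not Flink code: {{StaticContextRace}}, {{contextFactory}} and {{runInterleaving}} are hypothetical stand-ins, plain strings replace the real environment objects, and latches force the interleaving that normally depends on thread scheduling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Hypothetical model of the race: names are illustrative stand-ins, not Flink classes.
final class StaticContextRace {

    // Stand-in for the static ExecutionEnvironment#contextEnvironmentFactory shared by all jobs.
    static volatile Supplier<String> contextFactory = null;

    static void setAsContext(Supplier<String> factory) { contextFactory = factory; }

    static void unsetContext() { contextFactory = null; }

    // With no context factory installed, fall back to a local environment.
    static String getExecutionEnvironment() {
        Supplier<String> factory = contextFactory;
        return factory == null ? "LocalEnvironment" : factory.get();
    }

    // Replays steps 1-4 deterministically and returns the environment Job2 ends up with.
    static String runInterleaving() {
        CountDownLatch job1Entered = new CountDownLatch(1);
        CountDownLatch job2Entered = new CountDownLatch(1);
        CountDownLatch job1Finished = new CountDownLatch(1);
        String[] job2Env = new String[1];

        Thread thread1 = new Thread(() -> {
            try {
                setAsContext(() -> "OptimizerPlanEnvironment"); // step 1: Job1 installs its factory
                job1Entered.countDown();
                job2Entered.await();
                unsetContext();                                  // step 3: Job1 clears the shared slot
                job1Finished.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                job1Entered.await();
                setAsContext(() -> "OptimizerPlanEnvironment"); // step 2: Job2 overwrites the same slot
                job2Entered.countDown();
                job1Finished.await();
                job2Env[0] = getExecutionEnvironment();          // step 4: Job2 now sees null
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return job2Env[0];
    }

    public static void main(String[] args) {
        System.out.println(runInterleaving()); // prints LocalEnvironment
    }
}
```

Job2 observes the {{null}} left behind by Job1's {{unsetContext}}, so it falls back to a local environment, which matches step 4.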

The problem seems to be the static field {{ExecutionEnvironment#contextEnvironmentFactory}},
which is shared by all jobs. A quick fix could be to use a thread-local variable to hold the
{{ContextEnvironmentFactory}}. In order not to break existing setups, we could respect the
thread-local variable only if {{ExecutionEnvironment#contextEnvironmentFactory}} is {{null}}.
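
A minimal sketch of the proposed fallback, assuming a thread-local slot next to the existing static field. {{ThreadLocalContextFallback}}, {{staticFactory}}, {{setThreadLocalContext}} and {{job2EnvironmentAfterJob1Finishes}} are hypothetical names, and strings again stand in for environments. The static field keeps precedence; the thread-local value is consulted only when the static field is {{null}}.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the proposed fix: names are illustrative, not Flink's API.
final class ThreadLocalContextFallback {

    // Existing process-wide slot, kept so setups that set it globally behave as before.
    static volatile Supplier<String> staticFactory = null;

    // Per-thread slot: each thread that builds a job graph installs its own factory.
    private static final ThreadLocal<Supplier<String>> THREAD_FACTORY = new ThreadLocal<>();

    static void setThreadLocalContext(Supplier<String> factory) { THREAD_FACTORY.set(factory); }

    static void unsetThreadLocalContext() { THREAD_FACTORY.remove(); }

    static String getExecutionEnvironment() {
        Supplier<String> factory = staticFactory;   // the static field still takes precedence
        if (factory == null) {
            factory = THREAD_FACTORY.get();         // thread-local is consulted only when it is null
        }
        return factory == null ? "LocalEnvironment" : factory.get();
    }

    // Job2 installs its factory on the current thread; Job1 sets and clears its own slot elsewhere.
    static String job2EnvironmentAfterJob1Finishes() {
        setThreadLocalContext(() -> "OptimizerPlanEnvironment");
        Thread job1 = new Thread(() -> {
            setThreadLocalContext(() -> "OptimizerPlanEnvironment");
            unsetThreadLocalContext();              // clears only thread-1's slot
        });
        job1.start();
        try {
            job1.join();
            return getExecutionEnvironment();       // Job2 still sees its own factory
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            unsetThreadLocalContext();              // avoid leaking the factory on pooled threads
        }
    }

    public static void main(String[] args) {
        System.out.println(job2EnvironmentAfterJob1Finishes()); // prints OptimizerPlanEnvironment
    }
}
```

One caveat of this design: a thread-local value is visible only on the thread that set it, so the approach relies on the user's {{main}} creating its environment on the same thread that installed the factory, as in the quoted stack trace where {{getOptimizedPlan}} calls {{callMainMethod}} synchronously on the REST handler thread.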


> Race condition when concurrently running uploaded jars via REST
> ---------------------------------------------------------------
>
>                 Key: FLINK-12101
>                 URL: https://issues.apache.org/jira/browse/FLINK-12101
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.6.4, 1.7.2
>            Reporter: Maximilian Michels
>            Assignee: leesf
>            Priority: Major
>
> Flink allows uploading and running jars via REST. When multiple uploaded jars are invoked
interactively to generate the JobGraph and the calls interleave, their static initializations of the
{{ContextEnvironment}} override each other and produce a local execution of the jar. The local
execution uses an incorrect class loader and throws an exception like this:
> {noformat}
> 2019-04-02 14:25:05,549 ERROR <pipeline class>   - Failed to create job graph
> java.lang.RuntimeException: Pipeline execution failed
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:117)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>     at <pipeline class run>
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:117)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$7(JarRunHandler.java:151)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:125)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:114)
>     ... 18 more
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createStreamRecordWriters(StreamTask.java:1164)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
>     at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.<init>(StoppableSourceStreamTask.java:39)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1398)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:682)
>     ... 1 more
> Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>     at java.util.ArrayList.readObject(ArrayList.java:797)
>     at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
>     ... 12 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
