flink-user mailing list archives

From kedar mhaswade <kedar.mhasw...@gmail.com>
Subject Re: Class loading issues when using Remote Execution Environment
Date Mon, 30 Apr 2018 12:49:15 GMT
Chesnay,

I have filed https://issues.apache.org/jira/browse/FLINK-9267 to keep track
of this issue.

Regards,
Kedar

On Fri, Apr 27, 2018 at 11:50 AM, kedar mhaswade <kedar.mhaswade@gmail.com>
wrote:

> Thanks again!
>
> This is strange. With both Flink 1.3.3 and Flink 1.6.0-SNAPSHOT and
> 1) copying gradoop-demo-shaded.jar to <Flink>/lib, and
> 2) using RemoteEnvironment with just jmHost and jmPort (no jarFiles)
>
> I get the same exception [1], caused by:
> *Caused by: com.typesafe.config.ConfigException$Missing: No configuration
> setting found for key 'akka.remote.log-received-messages'.*
>
> This key is not documented anywhere, so I am confused. Also, with the
> above setup, although the JM and TM are running, the Flink dashboard on
> http://localhost:8081 is *unavailable*!
>
> With Flink 1.3.3 and Flink 1.6.0-SNAPSHOT
> 1) NOT copying gradoop-shaded.jar in <Flink>/lib, and
> 2) using RemoteEnvironment with jmHost, jmPort and jarFiles =
> {<absolute-path-to-gradoop-shaded.jar>}
>
> I get the same exception; however, the Flink dashboard on
> http://localhost:8081 is *available*! This makes me believe that this is
> somehow an insidious classloading issue :(.
> I am really perplexed by this behavior. For now, I will stick with the
> Flink 1.3.3 installation, as you suggested.
>
> If you have any other debugging tips, please let me know; I am running
> out of ideas for making it run with a non-local environment.
>
> Regards,
> Kedar
>
>
>
>
> [1] Gradoop shaded jar in <Flink>/lib -- exception on the web-app:
> org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:461)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
> at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
> at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
> at org.gradoop.demo.server.RequestHandler.getResponse(RequestHandler.java:447)
> at org.gradoop.demo.server.RequestHandler.createResponse(RequestHandler.java:430)
> at org.gradoop.demo.server.RequestHandler.executeCypher(RequestHandler.java:121)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
> at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
> at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
> at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
> at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
> at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
> at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
> at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
> at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
> at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
> at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
> at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
> at com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer._service(GrizzlyContainer.java:222)
> at com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer.service(GrizzlyContainer.java:192)
> at org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
> at org.glassfish.grizzly.http.server.HttpHandlerChain.service(HttpHandlerChain.java:196)
> at org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
> at org.glassfish.grizzly.http.server.HttpServerFilter.handleRead(HttpServerFilter.java:175)
> at org.glassfish.grizzly.filterchain.ExecutorResolver$9.execute(ExecutorResolver.java:119)
> at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:265)
> at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:200)
> at org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:134)
> at org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:112)
> at org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:78)
> at org.glassfish.grizzly.nio.transport.TCPNIOTransport.fireIOEvent(TCPNIOTransport.java:815)
> at org.glassfish.grizzly.strategies.AbstractIOStrategy.fireIOEvent(AbstractIOStrategy.java:112)
> at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.run0(WorkerThreadIOStrategy.java:115)
> at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.access$100(WorkerThreadIOStrategy.java:55)
> at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy$WorkerThreadRunnable.run(WorkerThreadIOStrategy.java:135)
> at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:567)
> at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:547)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not start the ActorSystem lazily.
> at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:230)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:459)
> ... 47 more
> *Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'*
> at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
> at com.typesafe.config.impl.SimpleConfig.getBoolean(SimpleConfig.java:174)
> at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:24)
> at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
> at scala.util.Try$.apply(Try.scala:192)
> at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
> at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
> at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
> at scala.util.Success.flatMap(Try.scala:231)
> at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
> at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
> at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
> at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
> at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:104)
> at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:92)
> at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
> at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:226)
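The innermost cause in [1] is Typesafe Config failing to find a default value. Akka reads its defaults from `reference.conf` resources on the classpath, so one hypothesis worth checking (an assumption, not confirmed anywhere in this thread) is that the `reference.conf` visible to the web-app's class loader lacks the akka-remote section, for example because a shading step kept only one of several `reference.conf` files. The sketch below is a hypothetical JDK-only diagnostic (`ReferenceConfScan` is an illustrative name, not a Flink or Akka class): it lists every `reference.conf` a class loader can see and reports whether each one mentions a given key fragment.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical diagnostic: which reference.conf files are visible, and which mention a key? */
public final class ReferenceConfScan {

    private ReferenceConfScan() {}

    /**
     * Maps every resource named {@code resourceName} visible to {@code loader} to whether its
     * text contains {@code needle}. This is a plain substring search, not a HOCON parser, so
     * nested keys should be searched by their last segment (e.g. "log-received-messages").
     */
    public static Map<URL, Boolean> scan(ClassLoader loader, String resourceName, String needle)
            throws IOException {
        Map<URL, Boolean> result = new LinkedHashMap<>();
        for (URL url : Collections.list(loader.getResources(resourceName))) {
            try (InputStream in = url.openStream()) {
                result.put(url, new String(readAll(in), StandardCharsets.UTF_8).contains(needle));
            }
        }
        return result;
    }

    // Equivalent of InputStream.readAllBytes() (Java 9+) that also works on Java 8.
    private static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Run with the same classpath as the web-app, e.g. target/classes plus the shaded jar.
        scan(Thread.currentThread().getContextClassLoader(), "reference.conf", "log-received-messages")
                .forEach((url, found) -> System.out.println(found + "  " + url));
    }
}
```

If no entry reports `true`, the akka-remote defaults are not reachable from that class loader. For Maven Shade builds, the commonly used remedy is to merge the files with the shade plugin's `AppendingTransformer` configured for `reference.conf`; treat this as a lead to verify rather than a diagnosis.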
>
>
> On Thu, Apr 26, 2018 at 11:52 PM, Chesnay Schepler <chesnay@apache.org>
> wrote:
>
>> First, a small correction for my previous mail:
>>
>> In my previous mail I wrote that I could reproduce your problems locally
>> when submitting the fat-jar. It turns out I never submitted the fat-jar,
>> as I didn't pass the jar file argument to RemoteEnvironment.
>>
>> Now on to your questions:
>>
>> *What version of Flink are you trying with?*
>> I got it working *once* with 1.6-SNAPSHOT, but I would recommend
>> sticking with 1.3.1 since that is the version Gradoop depends on. (I
>> haven't tried it with this version yet, but that's the next thing on my
>> list.)
>>
>>
>> *Are there other config changes (flink-conf.yaml) that you made in your
>> cluster?* It was the standard config.
>>
>>
>> *Is org.apache.flink.api.common.io.FileOutputFormat a good alternative
>> to LocalCollectionOutputFormat?* It can be used, but if the result is
>> small you could also use accumulators.
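Accumulators suit small results because each parallel task fills its own local accumulator, and Flink merges the partial values and returns them to the client in the `JobExecutionResult`, so nothing depends on sharing a heap with the client. In the DataSet API this is done with `getRuntimeContext().addAccumulator(...)` inside a rich function and `JobExecutionResult.getAccumulatorResult(...)` after `execute()`. The sketch below is only a JDK-only model of that add-then-merge contract, not Flink code; `ListAcc`, `runSubtask`, and `ship` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** JDK-only model of an accumulator: per-task local state, merged after the tasks finish. */
public final class AccumulatorModel {

    /** Hypothetical list accumulator; Serializable because partial values travel back. */
    static final class ListAcc<T extends Serializable> implements Serializable {
        private final ArrayList<T> values = new ArrayList<>();

        void add(T value) { values.add(value); }

        void merge(ListAcc<T> other) { values.addAll(other.values); }

        List<T> getLocalValue() { return values; }
    }

    /** Stands in for one parallel subtask: it only touches its own accumulator. */
    static ListAcc<String> runSubtask(List<String> partition) {
        ListAcc<String> acc = new ListAcc<>();
        for (String record : partition) {
            acc.add(record.toUpperCase());
        }
        return acc;
    }

    /** Serialize and deserialize, as if the partial result crossed a process boundary. */
    @SuppressWarnings("unchecked")
    static <A extends Serializable> A ship(A value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (A) in.readObject();
        }
    }

    /** Runs one "subtask" per partition and merges the shipped partial accumulators. */
    public static List<String> runJob(List<List<String>> partitions)
            throws IOException, ClassNotFoundException {
        ListAcc<String> merged = new ListAcc<>();
        for (List<String> partition : partitions) {
            merged.merge(ship(runSubtask(partition)));
        }
        return merged.getLocalValue();
    }
}
```

Because only the merged value returns to the caller, this pattern behaves the same whether the tasks run in-process or on a remote cluster, which is exactly where a local collection sink breaks down.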
>>
>>
>> *Do you think it is better to use the jarFiles argument of
>> createRemoteEnvironment?* Yes, once we get it working this is the way to
>> go.
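Since leaving out the jar list means the fat-jar is never shipped (as the correction above shows), a small pre-flight check before calling `ExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles...)` can catch that early. The sketch below is a hypothetical JDK-only helper, not part of Flink: it rejects an empty list and any path that is not an existing regular file, and returns normalized absolute paths.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical pre-flight check for the jarFiles passed to a remote environment. */
public final class JarFilesCheck {

    private JarFilesCheck() {}

    /** Returns absolute, normalized paths; throws if the list is empty or a path is not a file. */
    public static String[] validate(String... jarFiles) {
        if (jarFiles == null || jarFiles.length == 0) {
            throw new IllegalArgumentException("no jar files given; nothing would be shipped with the job");
        }
        String[] absolute = new String[jarFiles.length];
        for (int i = 0; i < jarFiles.length; i++) {
            Path p = Paths.get(jarFiles[i]).toAbsolutePath().normalize();
            if (!Files.isRegularFile(p)) {
                throw new IllegalArgumentException("not an existing regular file: " + p);
            }
            absolute[i] = p.toString();
        }
        return absolute;
    }
}
```

Typical use would be `ExecutionEnvironment.createRemoteEnvironment(jmHost, jmPort, JarFilesCheck.validate(shadedJarPath))`, where `shadedJarPath` is whatever path the app is configured with.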
>>
>>
>> On 26.04.2018 18:42, kedar mhaswade wrote:
>>
>> Thanks Chesnay for your incredible help!
>>
>> I will try out the suggestions again. A few questions:
>> - What version of Flink are you trying with? I have had issues when I
>> placed the gradoop-demo-shaded.jar in the lib folder of the Flink
>> installation (1.4 even refused to start!).
>> - Are there other config changes (flink-conf.yaml) that you made in your
>> cluster?
>> - Is org.apache.flink.api.common.io.FileOutputFormat a good alternative
>> to LocalCollectionOutputFormat, or should I use
>> HadoopOutputFormatCommonBase (I do want to run the cluster on YARN
>> later; at the moment I am trying on a standalone cluster).
>> - Do you think using the jarFiles argument of createRemoteEnvironment
>> (which deploys the JAR only for this job and does not affect the entire
>> Flink cluster) is a better option than placing the JAR(s) in the lib
>> folder?
>>
>> Thanks again,
>> Regards,
>> Kedar
>>
>>
>> On Thu, Apr 26, 2018 at 3:14 AM, Chesnay Schepler <chesnay@apache.org>
>> wrote:
>>
>>> Small update:
>>>
>>> I could reproduce your problems locally when submitting the fat-jar.
>>> I could get the job to run after placing the gradoop-demo-shaded.jar
>>> into the lib folder.
>>> I have not yet tried placing only the Gradoop jars into lib (but my
>>> guess is that you missed a Gradoop jar).
>>>
>>> Note that the job fails to run since you use
>>> "LocalCollectionOutputFormat" which can only be used for local execution,
>>> i.e. when the job submission and execution happen in the same JVM.
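A simplified JDK-only model of why a collection-backed sink needs submission and execution in the same JVM: in a remote setup the sink object is serialized and rebuilt inside another process, so the task writes into a copy and the client's list stays empty. `CollectingSink` and `shipToRemote` are hypothetical names, and the model shows the general effect rather than `LocalCollectionOutputFormat`'s actual internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public final class LocalSinkModel {

    /** Hypothetical sink that appends records to a list it holds a reference to. */
    static final class CollectingSink implements Serializable {
        private final List<String> target;

        CollectingSink(List<String> target) { this.target = target; }

        void write(String record) { target.add(record); }
    }

    /** Serialize and deserialize, standing in for shipping the job to another JVM. */
    static CollectingSink shipToRemote(CollectingSink sink) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CollectingSink) in.readObject();
        }
    }

    /** Returns {records seen by the client's list, records seen by the task-side sink}. */
    public static int[] run(boolean remote) throws IOException, ClassNotFoundException {
        List<String> clientResults = new ArrayList<>();
        CollectingSink sink = new CollectingSink(clientResults);
        // Local: the task writes into the client's own list. Remote: it writes into a copy.
        CollectingSink taskSide = remote ? shipToRemote(sink) : sink;
        for (String record : new String[] {"v1", "v2", "v3"}) {
            taskSide.write(record);
        }
        return new int[] {clientResults.size(), taskSide.target.size()};
    }
}
```

`run(false)` yields `{3, 3}` while `run(true)` yields `{0, 3}`: the records exist, just not on the client's heap.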
>>>
>>>
>>> On 25.04.2018 14:23, kedar mhaswade wrote:
>>>
>>> Thank you for your response!
>>>
>>> I have not tried the flink run app.jar route because the way the app is
>>> set up does not allow me to do it. Basically, the app is a web application
>>> which serves the UI and also submits a Flink job for running Cypher
>>> queries. It is a proof-of-concept app, but IMO, a very useful one.
>>>
>>> Here's how you can reproduce:
>>> 1) git clone git@github.com:kedarmhaswade/gradoop_demo.git (this is my
>>> fork of gradoop_demo)
>>> 2) cd gradoop_demo
>>> 3) git checkout dev => dev is the branch with my changes for making
>>> Gradoop work with a remote environment.
>>> 4) mvn clean package => should fetch the Gradoop JARs that this app
>>> needs; these JARs should then be placed in <flink-install>/lib.
>>> 5) cp ~/.m2/repository/org/gradoop/gradoop-common/0.3.2/gradoop-common-0.3.2.jar <flink-install>/lib,
>>> cp ~/.m2/repository/org/gradoop/gradoop-flink/0.3.2/gradoop-flink-0.3.2.jar <flink-install>/lib,
>>> cp target/gradoop-demo-0.2.0.jar <flink-install>/lib.
>>> 6) start the local Flink cluster with <flink-install>/bin/start-cluster.sh
>>> (I have tried the latest built-from-source 1.6-SNAPSHOT, and 1.4) and
>>> note the JM host and port.
>>> 7) <gradoop-demo>/start.sh --jmhost <host> --jmport 6123 (adjust the host
>>> and port for your cluster) => the app is now configured to talk to the
>>> RemoteEnvironment at the given JM host and port.
>>> 8) open a browser at: http://localhost:2342/gradoop/html/cypher.html
>>> 9) hit the query button => this would throw the exception
>>> 10) stop the process from step 7 with Ctrl-C and restart it as
>>> java -cp target/classes:target/gradoop-demo-shaded.jar org.gradoop.demo.server.Server
>>> => this starts a LocalEnvironment
>>> 11) repeat step 9 and see the results rendered nicely in the browser.
>>>
>>> Here is the relevant code:
>>> 1) Choosing between
>>> <https://github.com/kedarmhaswade/gradoop_demo/blob/dev/src/main/java/org/gradoop/demo/server/Server.java#L107>
>>> a Remote or a Local Environment.
>>>
>>> The instructions are correct to my knowledge. Thanks for your
>>> willingness to try. I have tried everything I can. With different Flink
>>> versions, I get different results (I have also tried on 1.6-SNAPSHOT with
>>> class loading config being parent-first, or child-first).
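For readers unfamiliar with the two orders: a parent-first loader asks its parent before looking in its own jars, while a child-first loader looks in its own jars first and falls back to the parent only when the class is not there, except for classes that must always come from the parent, such as `java.*` (Flink 1.4+ exposes the choice as `classloader.resolve-order`). The sketch below is a minimal child-first `URLClassLoader` written for illustration; it is not Flink's implementation, and the always-parent prefix list is an assumption.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Minimal child-first class loader sketch (illustrative; not Flink's implementation). */
public class ChildFirstLoader extends URLClassLoader {

    // Assumed list of package prefixes that must always be loaded by the parent.
    private static final String[] ALWAYS_PARENT = {"java.", "javax.", "sun.", "jdk."};

    public ChildFirstLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isAlwaysParent(name)) {
                    c = super.loadClass(name, false);      // JDK classes: parent-first
                } else {
                    try {
                        c = findClass(name);               // 1) this loader's own URLs first
                    } catch (ClassNotFoundException notInChild) {
                        c = super.loadClass(name, false);  // 2) then fall back to the parent
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    private static boolean isAlwaysParent(String name) {
        for (String prefix : ALWAYS_PARENT) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

A practical consequence of child-first loading is that the same class name can exist twice, once per loader, and instances of the two are not interchangeable; that is one reason a class duplicated in both the job jar and `lib` can behave differently under the two settings.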
>>>
>>> Regards,
>>> Kedar
>>>
>>>
>>> On Wed, Apr 25, 2018 at 1:08 AM, Chesnay Schepler <chesnay@apache.org>
>>> wrote:
>>>
>>>> I couldn't spot any error in what you tried to do. Does the
>>>> job-submission succeed if you submit the jar through the command-line
>>>> client?
>>>>
>>>> Can you share the project, or a minimal reproducing version?
>>>>
>>>>
>>>> On 25.04.2018 00:41, kedar mhaswade wrote:
>>>>
>>>> I am trying to get gradoop_demo
>>>> <https://github.com/dbs-leipzig/gradoop_demo> (a gradoop based graph
>>>> visualization app) working on Flink with *Remote* Execution
>>>> Environment.
>>>>
>>>> This app, which is based on Gradoop, submits a job to the
>>>> *preconfigured* execution environment, collects the results and sends
>>>> them to the UI for rendering.
>>>>
>>>> When the execution environment is configured to be a LocalEnvironment
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/java/LocalEnvironment.html>,
>>>> everything works fine. But when I start a cluster (using <
>>>> flink-install-path>/bin/start-cluster.sh), get the Job Manager
>>>> endpoint (e.g. localhost:6123) and configure a RemoteEnvironment
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/java/ExecutionEnvironment.html#createRemoteEnvironment-java.lang.String-int-org.apache.flink.configuration.Configuration-java.lang.String...-> and
>>>> use that environment to run the job, I get exceptions [1].
>>>>
>>>> Based on the class loading doc
>>>> <https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html>,
>>>> I copied the Gradoop classes (gradoop-flink-0.3.3-SNAPSHOT.jar,
>>>> gradoop-common-0.3.3-SNAPSHOT.jar) to the <flink-install-path>/lib
>>>> folder, hoping that this way those classes would be available to all the
>>>> executors in the cluster. I have verified that the class Flink fails
>>>> to load is in fact present in the Gradoop jars that I copied to the
>>>> /lib folder.
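One quick way to double-check that claim for every jar involved is to look for the class-file entry directly. The sketch below is a hypothetical JDK-only helper (`JarClassCheck` is an illustrative name) built on `java.util.jar.JarFile`. Note that finding the entry only proves the class is packaged, not that its static initializer succeeds, and the `NoClassDefFoundError: Could not initialize class` in [1] points at the latter.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.JarFile;

/** Hypothetical helper: which of the given jars contain a class file for a class name? */
public final class JarClassCheck {

    private JarClassCheck() {}

    /** Converts a binary class name such as a.b.C$D into its jar entry name a/b/C$D.class. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns the subset of {@code jarPaths} that contain an entry for {@code className}. */
    public static List<String> jarsContaining(String className, String... jarPaths) throws IOException {
        List<String> hits = new ArrayList<>();
        for (String path : jarPaths) {
            try (JarFile jar = new JarFile(path)) {
                if (jar.getJarEntry(entryName(className)) != null) {
                    hits.add(path);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarClassCheck <class name> <jar> [<jar> ...]
        String[] jars = Arrays.copyOfRange(args, 1, args.length);
        System.out.println(jarsContaining(args[0], jars));
    }
}
```

Running it with `org.gradoop.common.model.impl.id.GradoopId` and every jar in `<flink-install-path>/lib` also reveals duplicates, which matter when the same class is packaged in more than one jar.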
>>>>
>>>> I have also tried createRemoteEnvironment with the jarFiles argument,
>>>> where the passed JAR file is a fat jar containing everything (in which case
>>>> there is no Gradoop JAR file in the /lib folder).
>>>>
>>>> So, my questions are:
>>>> 1) How can I use RemoteEnvironment?
>>>> 2) Is there any other way of doing this *programmatically*? (That
>>>> means I can't use flink run, since I need the job execution result
>>>> as a blocking call; for the same reason I would ideally not use the
>>>> RESTful submit API either.) I just want RemoteEnvironment to work as well
>>>> as LocalEnvironment does.
>>>>
>>>> Regards,
>>>> Kedar
>>>>
>>>>
>>>> [1]
>>>> 2018-04-24 15:16:02,823 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job 0c987c8704f8b7eb4d7d38efcb3d708d (Flink Java Job at Tue Apr 24 15:15:59 PDT 2018)
>>>> java.lang.NoClassDefFoundError: Could not initialize class *org.gradoop.common.model.impl.id.GradoopId*
>>>>   at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
>>>>   at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1887)
>>>>   at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
>>>>   at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:263)
>>>>   at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:261)
>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>   at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:260)
>>>>   at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:682)
>>>>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
>>>>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
>>>>   at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1710)
>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1550)
>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>>>>   at java.util.HashSet.readObject(HashSet.java:341)
>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>>>>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)....
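In the JVM, `NoClassDefFoundError: Could not initialize class X` does not mean `X` is missing: it means an earlier attempt to run `X`'s static initializer threw, the class was marked as erroneous, and every later use fails with this message. Only the first failure is an `ExceptionInInitializerError` carrying the real cause, so that is the entry to look for earlier in the log of the JVM where initialization first failed. The self-contained sketch below reproduces the pattern with a hypothetical class `Fragile`; it says nothing about what actually fails inside `GradoopId`'s initializer.

```java
/** Reproduces "NoClassDefFoundError: Could not initialize class" with a hypothetical class. */
public final class InitFailureDemo {

    /** Hypothetical class whose static initializer always throws. */
    static final class Fragile {
        // Not a compile-time constant, so reading it triggers class initialization.
        static final int VALUE = compute();

        private static int compute() {
            throw new IllegalStateException("static init failed");
        }
    }

    /** Touches Fragile twice and returns the two errors that were thrown. */
    public static Throwable[] accessTwice() {
        Throwable[] errors = new Throwable[2];
        for (int i = 0; i < errors.length; i++) {
            try {
                System.out.println(Fragile.VALUE); // never prints: initialization fails
            } catch (Throwable t) {
                errors[i] = t; // 1st: ExceptionInInitializerError, 2nd: NoClassDefFoundError
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        for (Throwable t : accessTwice()) {
            System.out.println(t);
        }
    }
}
```

The second error carries no trace of the `IllegalStateException`, which mirrors why the trace in [1] alone cannot tell what went wrong in `GradoopId`.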
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
