flink-user mailing list archives

From Camelia-Elena Ciolac <camelia-elena.cio...@inria.fr>
Subject Re: JSON file not found - StreamExecutionEnvironment
Date Fri, 07 Nov 2014 15:57:05 GMT
Dear Gyula, 

Thank you very much for your suggestion; it does get the program past that error.

Now I run into a deserialization error, and I am not sure what causes it.

Is it possible in Flink 0.7.0-incubating to parse an input JSON file containing records of
heterogeneous types (e.g. events with different field structures)?
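A common way to handle heterogeneous records is to read the file as text lines and let a single flatMap decide, line by line, which structure it is looking at: it emits output for the records it expects and skips the rest. The JDK-only sketch below illustrates this pattern under stated assumptions: one flat JSON object per line, with a string discriminator field. The field names `type`, `user` and `ts` are hypothetical. The regex-based extractor is only a stand-in for a real JSON library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HeterogeneousRecords {

    // Stand-in for a real JSON parser: finds a top-level string field.
    // Does not handle nesting or escaped quotes. Returns null if absent.
    static String stringField(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Stand-in for a real JSON parser: finds a top-level integer field.
    static Long longField(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*(-?\\d+)").matcher(json);
        return m.find() ? Long.valueOf(m.group(1)) : null;
    }

    // Same contract as a flatMap body: zero or one output per input line.
    static void flatMap(String line, List<String> out) {
        String type = stringField(line, "type");
        if (!"click".equals(type)) {
            return; // records with other structures are skipped, not parsed
        }
        String user = stringField(line, "user");
        Long ts = longField(line, "ts");
        if (user != null && ts != null) { // drop incomplete records
            out.add(user + "@" + ts);
        }
    }

    public static void main(String[] args) {
        String[] lines = {
            "{\"type\": \"click\", \"user\": \"alice\", \"ts\": 42}",
            "{\"type\": \"login\", \"user\": \"bob\", \"device\": \"phone\"}",
            "{\"type\": \"click\", \"user\": \"carol\"}"
        };
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            flatMap(line, out);
        }
        System.out.println(out); // [alice@42]
    }
}
```

In a Flink job, the body of `flatMap` would go into the `flatMap(value, collector)` method of a `FlatMapFunction`, with `out.add(...)` replaced by `out.collect(...)`.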

I have pasted the whole error trace below, as it may contain hints that could help you suggest
a workaround. This is the only output I receive after launching the program.

------------------------------------- 

Error: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
at java.lang.reflect.Method.invoke(Method.java:597) 
at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418) 
at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947) 
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230) 
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268) 
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191) 
... 10 more 
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:202) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:190) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:306) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:247) 
at java.lang.Class.forName0(Native Method) 
at java.lang.Class.forName(Class.java:249) 
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604) 
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591) 
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349) 
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224) 
... 12 more 

at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) 
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) 
at java.lang.Thread.run(Thread.java:695) 

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
at java.lang.reflect.Method.invoke(Method.java:597) 
at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418) 
at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947) 
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230) 
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268) 
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191) 
... 10 more 
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:202) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:190) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:306) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:247) 
at java.lang.Class.forName0(Native Method) 
at java.lang.Class.forName(Class.java:249) 
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604) 
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591) 
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349) 
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224) 
... 12 more 

at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) 
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) 
at java.lang.Thread.run(Thread.java:695) 

at org.apache.flink.client.program.Client.run(Client.java:325) 
at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:62) 
at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:80) 
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:42)
at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:78) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
at java.lang.reflect.Method.invoke(Method.java:597) 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
at org.apache.flink.client.program.Client.run(Client.java:244) 
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347) 
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334) 
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001) 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025) 

------------------------- 
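The innermost cause in the trace above is a plain Java deserialization failure. `ObjectInputStream.resolveClass` asks a class loader for `org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap` by name, and that loader cannot find it. This suggests the problem is which class loader can see the user jar, rather than the shape of the JSON records. The JDK-only sketch below reproduces the mechanism: the same serialized object deserializes with a loader that can see its class, but fails with `ClassNotFoundException` with a loader that cannot. `UserFunction`, `LoaderAwareInputStream` and the helper methods are hypothetical names. The sketch does not show how Flink itself chooses the loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderDemo {

    // Hypothetical stand-in for a user function such as SelectFieldsFlatMap.
    public static class UserFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        final String field = "payload";
    }

    // ObjectInputStream that looks classes up through an explicit class loader.
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // By-name lookup, as in the trace: throws ClassNotFoundException
            // when the loader cannot see the class.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    public static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // True if the bytes deserialize with this loader, false on ClassNotFoundException.
    public static boolean canDeserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in =
                 new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(new UserFunction());
        ClassLoader appLoader = ClassLoaderDemo.class.getClassLoader();
        // Parent null: only bootstrap (JDK) classes are visible,
        // so ClassLoaderDemo$UserFunction cannot be found.
        try (URLClassLoader jdkOnly = new URLClassLoader(new URL[0], null)) {
            System.out.println(canDeserialize(data, appLoader)); // true
            System.out.println(canDeserialize(data, jdkOnly));   // false
        }
    }
}
```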

Best regards, 
Camelia 

----- Original message -----

> From: "Gyula Fóra" <gyfora@apache.org>
> To: user@flink.incubator.apache.org
> Sent: Friday, 7 November 2014 14:06:47
> Subject: Re: JSON file not found - StreamExecutionEnvironment

> Hello,

> Please try running the same job, but drop the file:// prefix from the file path,
> i.e. pass just "/Users/X/Y/Z/theFile.txt".

> I think this will fix your problem; however, we need to fix this in the API.

> Regards,
> Gyula

> On Fri, Nov 7, 2014 at 1:54 PM, Camelia-Elena Ciolac <camelia-elena.ciolac@inria.fr> wrote:

> > Hello,
> >
> > I wrote a small program to test the JSON parsing capability of the new
> > streaming API in Flink 0.7.0-incubating, but I ran into a "file not found"
> > exception.
> >
> > For context:
> >
> > StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.createLocalEnvironment(); // it doesn't work with StreamExecutionEnvironment.getExecutionEnvironment() either
> >
> > DataStream<Tuple4<String, Integer, Integer, Long>> ds1 =
> >     env.readTextFile(args[0]).flatMap(....);
> >
> > At runtime I pass the arguments as follows:
> >
> > flink run --jarfile ./quickstart/target/quickstart-0.1.jar --class org.apache.flink.ReadJSONDirectly --arguments file:///Users/X/Y/Z/theFile.txt file:///Users/X/Y/Z/outputFile.txt -v
> >
> > and even though the file exists at that path, I still get this stack trace:
> >
> > Error: The main method caused an error.
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> > at org.apache.flink.client.program.Client.run(Client.java:244)
> > at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> > at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > Caused by: java.lang.IllegalArgumentException: File not found: file:///Users/X/Y/Z/theFile.txt
> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.checkIfFileExists(StreamExecutionEnvironment.java:196)
> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readTextFile(StreamExecutionEnvironment.java:164)
> > at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:26)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > at java.lang.reflect.Method.invoke(Method.java:597)
> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> > ... 6 more
> >
> > The same thing happens if I put the file in HDFS and pass
> > hdfs:///pathToFile/theFile.txt as the argument.
> >
> > What could be the cause, in your opinion?
> >
> > Thank you in advance!
> >
> > Camelia
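Gyula's workaround in the quoted message (drop the file:// prefix) suggests that `readTextFile`'s `checkIfFileExists` treats its argument as a plain local path. The JDK-only sketch below rests on an assumption: that the check builds a `java.io.File` straight from the string. The trace itself only names `StreamExecutionEnvironment.checkIfFileExists`. Under that assumption, the sketch shows why `file:///...` fails while `/...` passes. The same assumption would also explain why an `hdfs:///...` argument fails. `naiveExists` and `toLocalFile` are hypothetical helpers, not Flink API.

```java
import java.io.File;
import java.net.URI;
import java.nio.file.Paths;

public class FileArgCheck {

    // Assumed shape of the check: the raw argument, scheme included,
    // is handed to java.io.File as if it were a local path.
    public static boolean naiveExists(String arg) {
        return new File(arg).exists();
    }

    // Hypothetical helper: accept either a plain local path or a file: URI.
    public static File toLocalFile(String arg) {
        if (arg.startsWith("file:")) {
            return new File(URI.create(arg)); // File(URI) drops the scheme and decodes the path
        }
        return new File(arg);
    }

    public static void main(String[] args) {
        String plain = System.getProperty("java.io.tmpdir"); // a directory that exists
        String uri = Paths.get(plain).toUri().toString();    // e.g. file:///tmp/
        System.out.println(naiveExists(plain));               // true
        System.out.println(naiveExists(uri));                 // false
        System.out.println(toLocalFile(uri).exists());        // true
    }
}
```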
