flink-user mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: JSON file not found - StreamExecutionEnvironment
Date Fri, 07 Nov 2014 16:06:04 GMT
Dear Camelia,

The error you got means that Flink cannot deserialize your user function
when it tries to create the vertices of the processing graph. It could be
that you are using a non-static inner class for parsing the JSON, which
cannot be serialized.

So for all your user functions (for example a FlatMapFunction), the object
you pass must be serializable with default Java serialization.

(This is not a runtime error raised while parsing or sending the tuples;
it happened while the job was being set up.)
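A minimal sketch of the serialization requirement described above, in plain Java without Flink. `MyFunction` is a hypothetical stand-in for a user-function interface such as FlatMapFunction (it is not Flink's API), and `roundTrip` assumes, as stated above, that the function is shipped with default Java serialization. A static nested class survives the round trip; a non-static inner class that uses its (non-serializable) enclosing instance does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class UserFunctionSerializationCheck {

    // Hypothetical stand-in for a user-function type; like Flink's
    // function interfaces, it extends Serializable.
    interface MyFunction<IN, OUT> extends Serializable {
        OUT apply(IN value);
    }

    // Static nested class: no hidden reference to an enclosing instance.
    static class StaticUpperCase implements MyFunction<String, String> {
        @Override
        public String apply(String value) {
            return value.toUpperCase();
        }
    }

    // Field of the enclosing instance (non-final, so it is not inlined as a constant).
    private String suffix = "!";

    // Non-static inner class: reading 'suffix' makes it hold a hidden reference
    // to the enclosing UserFunctionSerializationCheck, which is NOT Serializable.
    class InnerUpperCase implements MyFunction<String, String> {
        @Override
        public String apply(String value) {
            return value.toUpperCase() + suffix;
        }
    }

    // Serialize and deserialize with default Java serialization.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // True if the object survives a serialization round trip.
    static boolean survivesRoundTrip(Object obj) {
        try {
            roundTrip(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        MyFunction<String, String> ok = new StaticUpperCase();
        MyFunction<String, String> bad = new UserFunctionSerializationCheck().new InnerUpperCase();

        System.out.println("static nested: " + survivesRoundTrip(ok));  // true
        System.out.println("inner class:   " + survivesRoundTrip(bad)); // false
        System.out.println(roundTrip(ok).apply("flink"));               // FLINK
    }
}
```

Under this reading, declaring the function as a `static` nested class (or a top-level class) and keeping all of its fields serializable is the usual way out.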

Regards,
Gyula


On Fri, Nov 7, 2014 at 4:57 PM, Camelia-Elena Ciolac <camelia-elena.ciolac@inria.fr> wrote:

> Dear Gyula,
>
> Thank you very much for your idea; it does get the program past that
> error.
>
> Now I run into a deserialization error, and I have some doubts about its
> cause.
>
> Is it possible in Flink 0.7.0-incubating to parse an input JSON file
> containing heterogeneous types of records (e.g. events with differently
> structured fields)?
>
> I am pasting the whole error trace below, as it may contain hints that
> could help you suggest a workaround. This is the only output I receive
> after launching the program.
>
>
> -------------------------------------
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
>     at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
>     at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
>     at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
>     at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
> Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
>     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
>     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
>     at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
>     ... 10 more
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:249)
>     at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
>     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
>     ... 12 more
>
>     at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
>
>     at org.apache.flink.client.program.Client.run(Client.java:325)
>     at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:62)
>     at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:80)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:42)
>     at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:78)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>     at org.apache.flink.client.program.Client.run(Client.java:244)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
>
> -------------------------
>
> Best regards,
> Camelia
>
>
> ------------------------------
>
> *From: *"Gyula Fóra" <gyfora@apache.org>
> *To: *user@flink.incubator.apache.org
> *Sent: *Friday, 7 November 2014 14:06:47
> *Subject: *Re: JSON file not found - StreamExecutionEnvironment
>
>
> Hello,
>
> Please try running the same job, but drop the file:// prefix from the file
> path, so just "/Users/X/Y/Z/theFile.txt".
>
> I think this will fix your problem; however, we need to fix this in the API.
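A minimal sketch of why dropping the file:// prefix might help, assuming (not verified against the Flink 0.7 source) that the existence check behind `checkIfFileExists` treats its argument as a plain local path, the way `java.io.File` does. `stripFileScheme` is a hypothetical helper, not part of Flink; it leaves other schemes such as hdfs:// untouched, so it does not address the HDFS case.

```java
import java.io.File;
import java.io.IOException;
import java.net.URI;

public class FilePathCheck {

    // Hypothetical helper (not a Flink API): turn "file:///Users/X/Y/Z/theFile.txt"
    // into "/Users/X/Y/Z/theFile.txt"; any other input is returned unchanged.
    static String stripFileScheme(String path) {
        URI uri = URI.create(path);
        if ("file".equals(uri.getScheme())) {
            return uri.getPath();
        }
        return path;
    }

    public static void main(String[] args) throws IOException {
        // Create a real local file so the existence checks are meaningful.
        File tmp = File.createTempFile("theFile", ".txt");
        tmp.deleteOnExit();

        // Build a "file:///..." argument like the one passed on the command line
        // (assumes a Unix-like absolute path that needs no URI escaping).
        String withScheme = "file://" + tmp.getAbsolutePath();

        // java.io.File does not understand URI schemes: it sees a relative
        // path starting with "file:", which does not exist.
        System.out.println(new File(withScheme).exists());                  // false
        // After dropping the scheme, the plain path resolves to the real file.
        System.out.println(new File(stripFileScheme(withScheme)).exists()); // true
    }
}
```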
>
> Regards,
> Gyula
>
> On Fri, Nov 7, 2014 at 1:54 PM, Camelia-Elena Ciolac <camelia-elena.ciolac@inria.fr> wrote:
>
>> Hello,
>>
>> I wrote a small program to test the JSON parsing capability of the new
>> streaming API in Flink 0.7.0-incubating, but I ran into a "file not found"
>> exception.
>> For context, here is the relevant code:
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.createLocalEnvironment();
>> // it doesn't work with StreamExecutionEnvironment.getExecutionEnvironment() either
>> DataStream<Tuple4<String,Integer,Integer,Long>> ds1 =
>>     env.readTextFile(args[0]).flatMap(....);
>>
>> At runtime I pass the arguments as follows:
>>
>> flink run --jarfile ./quickstart/target/quickstart-0.1.jar --class org.apache.flink.ReadJSONDirectly --arguments file:///Users/X/Y/Z/theFile.txt file:///Users/X/Y/Z/outputFile.txt -v
>>
>> and even though the file exists at that path, I still get this error:
>>
>> Error: The main method caused an error.
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>     at org.apache.flink.client.program.Client.run(Client.java:244)
>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>> Caused by: java.lang.IllegalArgumentException: File not found: file:///Users/X/Y/Z/theFile.txt
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.checkIfFileExists(StreamExecutionEnvironment.java:196)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readTextFile(StreamExecutionEnvironment.java:164)
>>     at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:26)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>     at java.lang.reflect.Method.invoke(Method.java:597)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>     ... 6 more
>>
>> The same thing happens if I put the file in HDFS and pass
>> hdfs:///pathToFile/theFile.txt as the argument.
>>
>> What could be the cause, in your opinion?
>>
>>
>> Thank you in advance!
>> Camelia
>>
>>
>>
>>
>
>
