flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 12:40:13 GMT
Sorry Fabian, but I don't understand what I should do :(
Could you provide a simple code snippet that does this?

On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Enumeration of nested files is a feature of the FileInputFormat.
> If you implement your own InputFormat (IF) based on FileInputFormat, as I suggested
> before, you can use that feature.
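Enumerating nested files means collecting every regular file below the input path at any depth, not only its direct children; in Flink this is what setting ENUMERATE_NESTED_FILES_FLAG to true enables for a FileInputFormat. The plain-Java sketch below only illustrates the idea with `java.nio.file.Files.walk`; it is not Flink's implementation, and the class name `NestedFileLister` is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: lists files the way a recursive enumeration would.
final class NestedFileLister {

    /** Returns every regular file below root, at any depth, sorted by path. */
    static List<Path> listRecursively(Path root) {
        // Files.walk visits root itself and all descendants; directories are filtered out.
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                        .sorted()
                        .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without recursion, only the direct children of the input directory would be considered; with it, a file such as `a/b/deep.bin` is picked up too.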
>
> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> I have a directory containing a list of files, each one containing a
>> Kryo-serialized object.
>> With JSON-serialized objects I don't have this problem (but there I use
>>  env.readTextFile(path).withParameters(parameters)
>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>
>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> I don't know your use case.
>>> The InputFormat interface is very flexible. Directories can be
>>> recursively read. A file can contain one or more objects. You can also make
>>> a smarter InputFormat and put multiple (small) files into one split...
>>>
>>> What you need to implement depends on your use case.
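To make the "multiple (small) files into one split" idea concrete, here is a greedy grouping sketch in plain Java. It is not Flink's API: `SmallFileGrouper`, `FileEntry` and `targetBytes` are hypothetical names, and a real InputFormat would return such groups as its splits and then read each file in a group from its first byte.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: pack whole small files into groups ("splits") of bounded size.
final class SmallFileGrouper {

    /** A hypothetical input file: a name plus its length in bytes. */
    static final class FileEntry {
        final String name;
        final long length;

        FileEntry(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    /**
     * Greedily packs whole files into groups whose total size stays at or below
     * targetBytes. A file larger than targetBytes gets a group of its own.
     * Files are never cut, so every reader starts at the first byte of a file.
     */
    static List<List<FileEntry>> group(List<FileEntry> files, long targetBytes) {
        List<List<FileEntry>> groups = new ArrayList<>();
        List<FileEntry> current = new ArrayList<>();
        long currentBytes = 0;
        for (FileEntry f : files) {
            // Close the current group if adding this file would exceed the target.
            if (!current.isEmpty() && currentBytes + f.length > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(f);
            currentBytes += f.length;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

With files of 7, 7, 20 and 3 bytes and a 16-byte target, this yields three groups: the two 7-byte files together, then the 20-byte file alone, then the 3-byte file.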
>>>
>>>
>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> Is that also the case when I just read an entire directory recursively,
>>>> with one object per file?
>>>>
>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>
>>>>> You could implement your own InputFormat based on FileInputFormat and
>>>>> override the createInputSplits method so that it creates just a single
>>>>> split per file.
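As a rough illustration of what "one split per file" buys: each reader gets a whole file, reads it from the first byte to the end, and decodes exactly one object, so no block metadata or record boundaries are needed. The plain-Java sketch below is not Flink code; `WholeFileReader` and its `decoder` parameter are hypothetical, and in a real job the decoder could be something like `bytes -> kryo.readClassAndObject(new Input(bytes))`. In Flink itself the same idea would presumably live in a FileInputFormat subclass whose splits cover entire files (for example by setting its protected `unsplittable` flag, if your Flink version has it) and whose `nextRecord` returns the single decoded object.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "one split per file, one object per file".
final class WholeFileReader {

    /**
     * Treats every file as exactly one unit of work: the whole file is read
     * from byte 0 to EOF and handed to the decoder, which must turn those
     * bytes into exactly one object.
     */
    static <T> List<T> readOnePerFile(List<Path> files, Function<byte[], T> decoder) {
        List<T> out = new ArrayList<>();
        for (Path file : files) {
            try {
                byte[] bytes = Files.readAllBytes(file); // never starts mid-file
                out.add(decoder.apply(bytes));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return out;
    }
}
```

Combined with a recursive file listing, this covers the "directory of files, one object each" layout described in this thread.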
>>>>>
>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> So what should I do?
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>
>>>>>>> Ah, I checked the code.
>>>>>>>
>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>> BinaryOutputFormat.
>>>>>>> So you cannot use the BinaryInputFormat to read a file which does
>>>>>>> not provide this metadata.
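A format that appends metadata to each block can only locate it by seeking backwards from the end of the block. The sketch below assumes a fixed 24-byte trailer made of three `long` values; that size is an assumption for illustration, and the exact layout Flink's BinaryOutputFormat writes should be checked against its source. It shows why a tiny file written directly with Kryo, such as the 7-byte file mentioned in this thread, cannot satisfy such a reader. `TrailerCheck` is a hypothetical name.

```java
// Hypothetical sketch: where a footer-expecting reader would have to seek.
final class TrailerCheck {

    /** Assumed trailer size: three 8-byte longs (an illustrative assumption). */
    static final int TRAILER_BYTES = 3 * Long.BYTES;

    /** Offset at which the trailer would start; negative if the file is too short. */
    static long trailerOffset(long fileLength) {
        return fileLength - TRAILER_BYTES;
    }

    /** True only if the file is at least as long as the assumed trailer. */
    static boolean canHoldTrailer(long fileLength) {
        return trailerOffset(fileLength) >= 0;
    }
}
```

`trailerOffset(7)` is -17: there is no position inside a 7-byte file where a 24-byte trailer could start, so such a file can only be read with a format that does not expect one.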
>>>>>>>
>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>
>>>>>>>> The file containing the serialized object is 7 bytes long.
>>>>>>>>
>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>> BinaryInputFormat.
>>>>>>>>> How large is the file with the single object?
>>>>>>>>>
>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>
>>>>>>>>>> I also tried with
>>>>>>>>>>
>>>>>>>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>
>>>>>>>>>> but I get the same error :(
>>>>>>>>>>
>>>>>>>>>> Moreover, in this example I put exactly one object per file, so it
>>>>>>>>>> should be able to deserialize it, right?
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you create your file by just sequentially writing all objects
>>>>>>>>>>> to the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>>>>> Writing binary files in a way that they can be read in parallel
>>>>>>>>>>> is a bit tricky (and not specific to Flink).
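The parallelism point can be seen with the simplest sequential framing: if records are written back to back, a reader only learns where the next record starts by decoding the current one. The sketch below uses a length-prefixed layout as a stand-in for consecutive Kryo writes (Kryo's own encoding differs, and `SequentialRecords` is a hypothetical name); a reader handed a split that begins somewhere in the middle of such a file would have no way to find the next record boundary.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a purely sequential record layout.
final class SequentialRecords {

    /** Writes each record as [int length][bytes], back to back, with no index or sync markers. */
    static byte[] write(List<String> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String r : records) {
                byte[] payload = r.getBytes(StandardCharsets.UTF_8);
                out.writeInt(payload.length);
                out.write(payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /**
     * Reads records starting at offset 0. Boundaries are only known by walking
     * the stream from the beginning, so the data cannot simply be cut at
     * arbitrary byte offsets and read by several readers in parallel.
     */
    static List<String> readAll(byte[] data) {
        List<String> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // For an in-memory stream, available() is exactly the number of unread bytes.
            while (in.available() > 0) {
                int length = in.readInt();
                byte[] payload = new byte[length];
                in.readFully(payload);
                records.add(new String(payload, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```

Formats designed for parallel reads typically add something extra, such as per-block metadata or sync markers, so that a reader starting at an arbitrary offset can skip ahead to a known boundary.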
>>>>>>>>>>>
>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> I'm trying to read a file serialized with Kryo, but I get this
>>>>>>>>>>>> exception (because createInputSplits creates 8 input splits, of
>>>>>>>>>>>> which only one is non-empty).
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>
>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>
>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>
>>>>>>>>>>>>     ...
>>>>>>>>>>>>     // try-with-resources used to autoclose resources
>>>>>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>         // serialise object
>>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     // deserialise object
>>>>>>>>>>>>     myObj = null;
>>>>>>>>>>>>
>>>>>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>>>>>>>>>>>>
>>>>>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>>>>>
>>>>>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>     ds.print();
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Flavio
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
