flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 10:08:24 GMT
Should this be the case just reading recursively an entire directory
containing one object per file?

On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> You could implement your own InputFormat based on FileInputFormat and
> overwrite the createInputSplits method to just create a single split per
> file.
>
> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> So what should I do?
>>
>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Ah, I checked the code.
>>>
>>> The BinaryInputFormat expects metadata which is written be the
>>> BinaryOutputFormat.
>>> So you cannot use the BinaryInputFormat to read a file which does not
>>> provide the metadata.
>>>
>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> The file containing the serialized object is 7 bytes
>>>>
>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> This might be an issue with the blockSize parameter of the
>>>>> BinaryInputFormat.
>>>>> How large is the file with the single object?
>>>>>
>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> I also tried with
>>>>>>
>>>>>> DataSet<RowBundle> ds =
>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>
>>>>>> but I get the same error :(
>>>>>>
>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>> should be able to deserialize it, right?
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> If you create your file by just sequentially writing all objects
to
>>>>>>> the file using Kryo, you can only read it with a parallelism
of 1.
>>>>>>> Writing binary files in a way that they can be read in parallel
is a
>>>>>>> bit tricky (and not specific to Flink).
>>>>>>>
>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>> I;m trying to read a file serialized with kryo but I get
this
>>>>>>>> exception (due to the fact that the createInputSplits creates
8
>>>>>>>> inputsplits, where just one is not empty..).
>>>>>>>>
>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>> at
>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>> at
>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>> at
>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> -----------------------------------------------
>>>>>>>> My program is basically the following:
>>>>>>>>
>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>
>>>>>>>> ...
>>>>>>>> //try-with-resources used to autoclose resources
>>>>>>>> try (Output output = new Output(new
>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>> //serialise object
>>>>>>>> Kryo kryo=new Kryo();
>>>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>> }
>>>>>>>>
>>>>>>>> //deserialise object
>>>>>>>>
>>>>>>>> myObj=null;
>>>>>>>>
>>>>>>>> try (Input input = new Input( new
>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> final ExecutionEnvironment env =
>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>> MyClassSerializer.class);
>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>> 64*1024*1024);
>>>>>>>>
>>>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>
>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>> ds.print();
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> private static final class MyClassSerializer extends
>>>>>>>> Serializer<MyClass> {
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void write(Kryo kryo, Output output, MyClass object)
{
>>>>>>>> kryo.writeClassAndObject(output, object);
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass>
type) {
>>>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Am I doing something wrong?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Mime
View raw message