flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 10:01:25 GMT
Ah, I checked the code.

The BinaryInputFormat expects metadata which is written by the
BinaryOutputFormat.
So you cannot use the BinaryInputFormat to read a file which does not
contain that metadata, such as a file written directly with Kryo.
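
To illustrate the idea, here is a minimal, Flink-independent sketch. It
assumes a simplified layout in which the writer stores a small trailer at the
end of every fixed-size block and the reader looks that trailer up before it
reads any record. The class name BlockTrailerSketch, the 4-byte trailer, and
the 64-byte block size are made up for this example; they are not Flink's
actual on-disk format or API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BlockTrailerSketch {

    // Hypothetical trailer: a single int holding the number of records in the block.
    static final int TRAILER_SIZE = Integer.BYTES;

    /** Writes length-prefixed records at the start of a block and the trailer at its end. */
    static byte[] writeBlock(List<byte[]> records, int blockSize) {
        int needed = TRAILER_SIZE;
        for (byte[] record : records) {
            needed += Integer.BYTES + record.length;
        }
        if (needed > blockSize) {
            throw new IllegalArgumentException("Records do not fit into one block");
        }
        ByteBuffer block = ByteBuffer.allocate(blockSize);
        for (byte[] record : records) {
            block.putInt(record.length);
            block.put(record);
        }
        // The reader expects the trailer at a fixed offset from the end of the block.
        block.putInt(blockSize - TRAILER_SIZE, records.size());
        return block.array();
    }

    /** Reads a block back; fails if the data is too short to contain the trailer. */
    static List<byte[]> readBlock(byte[] data, int blockSize) throws IOException {
        if (data.length < blockSize) {
            throw new IOException("Expected a " + blockSize
                    + "-byte block with a trailer, got " + data.length + " bytes");
        }
        ByteBuffer block = ByteBuffer.wrap(data, 0, blockSize);
        int recordCount = block.getInt(blockSize - TRAILER_SIZE);
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            byte[] record = new byte[block.getInt()];
            block.get(record);
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> records = new ArrayList<>();
        records.add("hello".getBytes(StandardCharsets.UTF_8));

        // A block produced by the matching writer can be read back.
        byte[] block = writeBlock(records, 64);
        System.out.println("records read: " + readBlock(block, 64).size());

        // A raw 7-byte file has no trailer, so the reader cannot make sense of it.
        try {
            readBlock(new byte[7], 64);
        } catch (IOException e) {
            System.out.println("raw file rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the reader and the writer have to agree on the layout, which
is why a file that was written without the trailer cannot be read back.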

2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> The file containing the serialized object is 7 bytes
>
> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> This might be an issue with the blockSize parameter of the
>> BinaryInputFormat.
>> How large is the file with the single object?
>>
>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> I also tried with
>>>
>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>
>>> but I get the same error :(
>>>
>>> Moreover, in this example I put exactly one object per file so it should
>>> be able to deserialize it, right?
>>>
>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> If you create your file by just sequentially writing all objects to the
>>>> file using Kryo, you can only read it with a parallelism of 1.
>>>> Writing binary files in a way that they can be read in parallel is a
>>>> bit tricky (and not specific to Flink).
>>>>
>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>
>>>>> Hi to all,
>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>> exception (because createInputSplits creates 8 input splits,
>>>>> of which only one is not empty).
>>>>>
>>>>> Caused by: java.io.IOException: Invalid argument
>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> -----------------------------------------------
>>>>> My program is basically the following:
>>>>>
>>>>> public static void main(String[] args) throws Exception {
>>>>>
>>>>>     ...
>>>>>     // try-with-resources used to autoclose resources
>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>         // serialise object
>>>>>         Kryo kryo = new Kryo();
>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>     } catch (FileNotFoundException ex) {
>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>     }
>>>>>
>>>>>     // deserialise object
>>>>>     myObj = null;
>>>>>
>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>         Kryo kryo = new Kryo();
>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>     } catch (FileNotFoundException ex) {
>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>     }
>>>>>
>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>     Configuration configuration = new Configuration();
>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>
>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>     inputFormat.configure(configuration);
>>>>>
>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>     ds.print();
>>>>>
>>>>> }
>>>>>
>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>
>>>>>     @Override
>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>         kryo.writeClassAndObject(output, object);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>     }
>>>>> }
>>>>>
>>>>> Am I doing something wrong?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
