flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 10:02:20 GMT
So what should I do?

On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Ah, I checked the code.
>
> The BinaryInputFormat expects metadata which is written by the
> BinaryOutputFormat.
> So you cannot use the BinaryInputFormat to read a file which does not
> provide the metadata.
>
> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> The file containing the serialized object is 7 bytes
>>
>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> This might be an issue with the blockSize parameter of the
>>> BinaryInputFormat.
>>> How large is the file with the single object?
>>>
>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> I also tried with
>>>>
>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>
>>>> but I get the same error :(
>>>>
>>>> Moreover, in this example I put exactly one object per file so it
>>>> should be able to deserialize it, right?
>>>>
>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> If you create your file by just sequentially writing all objects to
>>>>> the file using Kryo, you can only read it with a parallelism of 1.
>>>>> Writing binary files in a way that they can be read in parallel is a
>>>>> bit tricky (and not specific to Flink).
>>>>>
>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> Hi to all,
>>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>>> exception (because createInputSplits creates 8 input splits,
>>>>>> of which only one is non-empty).
>>>>>>
>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>> at
>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>> at
>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>> at
>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> -----------------------------------------------
>>>>>> My program is basically the following:
>>>>>>
>>>>>> public static void main(String[] args) throws Exception {
>>>>>>
>>>>>> ...
>>>>>> //try-with-resources used to autoclose resources
>>>>>> try (Output output = new Output(new
>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>> //serialise object
>>>>>> Kryo kryo=new Kryo();
>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>> } catch (FileNotFoundException ex) {
>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>> }
>>>>>>
>>>>>> //deserialise object
>>>>>>
>>>>>> myObj=null;
>>>>>>
>>>>>> try (Input input = new Input( new
>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>     Kryo kryo=new Kryo();
>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>> } catch (FileNotFoundException ex) {
>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>> }
>>>>>>
>>>>>>
>>>>>> final ExecutionEnvironment env =
>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>> MyClassSerializer.class);
>>>>>> Configuration configuration = new Configuration();
>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>> 64*1024*1024);
>>>>>>
>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>> inputFormat.configure(configuration);
>>>>>>
>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>> ds.print();
>>>>>>
>>>>>> }
>>>>>>
>>>>>> private static final class MyClassSerializer extends
>>>>>> Serializer<MyClass> {
>>>>>>
>>>>>> @Override
>>>>>> public void write(Kryo kryo, Output output, MyClass object) {
>>>>>> kryo.writeClassAndObject(output, object);
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> Am I doing something wrong?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
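
To make the point about parallel-readable binary files concrete, here is a minimal plain-Java sketch of a block-structured layout: records are packed into fixed-size blocks, and each block ends with a small trailer (here just a record count) so a reader can start at any block boundary without scanning the file from the beginning. This is NOT Flink's actual on-disk format; the class name BlockFileSketch, the 64-byte block size, and the 4-byte trailer are all assumptions chosen for illustration. A file written by plain sequential Kryo output has no such trailers, which is consistent with the explanation above that a block-based reader cannot interpret it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: not the layout used by Flink's BinaryOutputFormat.
class BlockFileSketch {
    static final int BLOCK_SIZE = 64;   // assumed, tiny so the demo spans several blocks
    static final int TRAILER_SIZE = 4;  // assumed trailer: one int holding the record count

    /** Packs length-prefixed records into fixed-size blocks, each ending with a trailer. */
    static byte[] writeBlocks(List<String> records) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        ByteBuffer block = ByteBuffer.allocate(BLOCK_SIZE);
        int count = 0;
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            int needed = 4 + payload.length; // 4-byte length prefix + payload
            if (needed > BLOCK_SIZE - TRAILER_SIZE) {
                throw new IllegalArgumentException("record larger than a block");
            }
            // Records never straddle a block boundary: start a new block instead.
            if (block.position() + needed > BLOCK_SIZE - TRAILER_SIZE) {
                flush(file, block, count);
                count = 0;
            }
            block.putInt(payload.length).put(payload);
            count++;
        }
        if (count > 0) {
            flush(file, block, count);
        }
        return file.toByteArray();
    }

    /** Writes the trailer at the end of the block, emits the block, and resets the buffer. */
    static void flush(ByteArrayOutputStream file, ByteBuffer block, int count) {
        block.putInt(BLOCK_SIZE - TRAILER_SIZE, count); // absolute write at the block's end
        file.write(block.array(), 0, BLOCK_SIZE);
        Arrays.fill(block.array(), (byte) 0);           // zero the padding for the next block
        block.clear();
    }

    /** Reads one block in isolation, as an independent parallel reader would. */
    static List<String> readBlock(byte[] file, int blockIndex) {
        ByteBuffer block = ByteBuffer.wrap(file, blockIndex * BLOCK_SIZE, BLOCK_SIZE).slice();
        int count = block.getInt(BLOCK_SIZE - TRAILER_SIZE); // trailer says how many records
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[block.getInt()];
            block.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            records.add(String.format("r%02d", i));
        }
        byte[] file = writeBlocks(records);
        int blocks = file.length / BLOCK_SIZE;
        // Blocks can be read in any order, e.g. last one first.
        for (int b = blocks - 1; b >= 0; b--) {
            System.out.println("block " + b + ": " + readBlock(file, b));
        }
    }
}
```

The design choice to notice is that the file length is always a multiple of the block size and every block carries its own metadata. A reader that assumes this layout and is handed a 7-byte file has no valid trailer to find, whatever parallelism it runs with.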
