flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 09:49:16 GMT
This might be an issue with the blockSize parameter of the
BinaryInputFormat.
How large is the file with the single object?

2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> I also tried with
>
> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>
> but I get the same error :(
>
> Moreover, in this example I put exactly one object per file so it should
> be able to deserialize it, right?
>
> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> If you create your file by just sequentially writing all objects to the
>> file using Kryo, you can only read it with a parallelism of 1.
>> Writing binary files in a way that they can be read in parallel is a bit
>> tricky (and not specific to Flink).
>>
>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> Hi to all,
>>> I;m trying to read a file serialized with kryo but I get this exception
>>> (due to the fact that the createInputSplits creates 8 inputsplits, where
>>> just one is not empty..).
>>>
>>> Caused by: java.io.IOException: Invalid argument
>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>> at
>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>> at
>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>> at
>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> -----------------------------------------------
>>> My program is basically the following:
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>> ...
>>> //try-with-resources used to autoclose resources
>>> try (Output output = new Output(new
>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>> //serialise object
>>> Kryo kryo=new Kryo();
>>> kryo.writeClassAndObject(output, myObj);
>>> } catch (FileNotFoundException ex) {
>>> LOG.error(ex.getMessage(), ex);
>>> }
>>>
>>> //deserialise object
>>>
>>> myObj=null;
>>>
>>> try (Input input = new Input( new FileInputStream("/tmp/KryoTest.ser"))){
>>>     Kryo kryo=new Kryo();
>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>> } catch (FileNotFoundException ex) {
>>> LOG.error(ex.getMessage(), ex);
>>> }
>>>
>>>
>>> final ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>> MyClassSerializer.class);
>>> Configuration configuration = new Configuration();
>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>> 64*1024*1024);
>>>
>>> TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>> final BinaryInputFormat<MyClass> inputFormat = new
>>> TypeSerializerInputFormat<>(typeInfo);
>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>> inputFormat.configure(configuration);
>>>
>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>> ds.print();
>>>
>>> }
>>>
>>> private static final class MyClassSerializer extends Serializer<MyClass>
>>> {
>>>
>>> @Override
>>> public void write(Kryo kryo, Output output, MyClass object) {
>>> kryo.writeClassAndObject(output, object);
>>> }
>>>
>>> @Override
>>> public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>> return (MyClass) kryo.readClassAndObject(input);
>>> }
>>> }
>>>
>>> Am I doing something wrong?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>>
>>
>

Mime
View raw message