flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 10:29:11 GMT
I have a directory containing a list of files, each one containing a
kryo-serialized object.
With json serialized objects I don't have that problem (but there I use
 env.readTextFile(path).withParameters(parameters)
where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).

On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> I don't know your use case.
> The InputFormat interface is very flexible. Directories can be read
> recursively. A file can contain one or more objects. You can also write a
> smarter InputFormat that puts multiple (small) files into one split.
>
> It is up to your use case what you need to implement.
>
>
> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> Is that also necessary just to recursively read an entire directory
>> containing one object per file?
>>
>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> You could implement your own InputFormat based on FileInputFormat and
>>> override the createInputSplits method to create just a single split per
>>> file.
>>>
>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> So what should I do?
>>>>
>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> Ah, I checked the code.
>>>>>
>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>> BinaryOutputFormat.
>>>>> So you cannot use the BinaryInputFormat to read a file which does not
>>>>> provide the metadata.
>>>>>
>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> The file containing the serialized object is 7 bytes
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>> BinaryInputFormat.
>>>>>>> How large is the file with the single object?
>>>>>>>
>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>
>>>>>>>> I also tried with
>>>>>>>>
>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>
>>>>>>>> but I get the same error :(
>>>>>>>>
>>>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>>>> should be able to deserialize it, right?
>>>>>>>>
>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> If you create your file by just sequentially writing all objects
>>>>>>>>> to the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>>> Writing binary files in a way that they can be read in parallel is
>>>>>>>>> a bit tricky (and not specific to Flink).
>>>>>>>>>
>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it>:
>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>> I'm trying to read a file serialized with Kryo, but I get this
>>>>>>>>>> exception (due to the fact that createInputSplits creates 8
>>>>>>>>>> input splits, of which just one is not empty).
>>>>>>>>>>
>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> -----------------------------------------------
>>>>>>>>>> My program is basically the following:
>>>>>>>>>>
>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>
>>>>>>>>>> ...
>>>>>>>>>> //try-with-resources used to autoclose resources
>>>>>>>>>> try (Output output = new Output(new
>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>> //serialise object
>>>>>>>>>> Kryo kryo=new Kryo();
>>>>>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> //deserialise object
>>>>>>>>>>
>>>>>>>>>> myObj=null;
>>>>>>>>>>
>>>>>>>>>> try (Input input = new Input( new
>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> final ExecutionEnvironment env =
>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>>>> MyClassSerializer.class);
>>>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>>>> 64*1024*1024);
>>>>>>>>>>
>>>>>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>>>
>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>> ds.print();
>>>>>>>>>>
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private static final class MyClassSerializer extends
>>>>>>>>>> Serializer<MyClass> {
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>> kryo.writeClassAndObject(output, object);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
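
A minimal sketch of the one-split-per-file idea Fabian describes in the thread above. It is written against the plain JDK rather than Flink's FileInputFormat, so that it runs without Flink or Kryo on the classpath. WholeFileSplit, OneSplitPerFile, createSplits and readSplit are hypothetical names introduced for illustration and are not Flink API. In a real job the same logic would presumably live in an overridden createInputSplits, and the bytes returned by readSplit would be handed to kryo.readClassAndObject.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for a file input split: always one whole file, never a byte range. */
final class WholeFileSplit {
    final int splitNumber;
    final Path file;
    final long length;

    WholeFileSplit(int splitNumber, Path file, long length) {
        this.splitNumber = splitNumber;
        this.file = file;
        this.length = length;
    }
}

/** Hypothetical helper: builds splits the way a one-split-per-file input format might. */
final class OneSplitPerFile {

    /** Recursively enumerates regular files under root and emits exactly one split per file. */
    static List<WholeFileSplit> createSplits(Path root) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(root)) {
            // Sort so that split numbering is deterministic across runs.
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            // The split covers the entire file, so no empty or partial splits are produced,
            // however small the file is.
            splits.add(new WholeFileSplit(splits.size(), file, Files.size(file)));
        }
        return splits;
    }

    /** Reads all bytes behind a split; a real format would pass them to the deserializer. */
    static byte[] readSplit(WholeFileSplit split) throws IOException {
        return Files.readAllBytes(split.file);
    }
}
```

Because each split maps to exactly one file, the number of splits equals the number of files regardless of the requested parallelism, and no reader is ever asked to start in the middle of a serialized object.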
