flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 11:30:19 GMT
Enumeration of nested files is a feature of the FileInputFormat.
If you implement your own InputFormat based on FileInputFormat, as I
suggested before, you can use that feature.
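
As a rough, Flink-free illustration of what the split creation in such an
InputFormat would do, the sketch below walks a directory recursively and
emits exactly one split per file, so that every Kryo file is read whole by
a single reader. It uses only the JDK. The names WholeFileSplit and
createSplits are hypothetical stand-ins for Flink's split type and for an
overridden createInputSplits method; they are not Flink API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OneSplitPerFile {

    /** Hypothetical stand-in for a file input split that covers one whole file. */
    static final class WholeFileSplit {
        final int splitNumber;
        final Path file;
        final long length;

        WholeFileSplit(int splitNumber, Path file, long length) {
            this.splitNumber = splitNumber;
            this.file = file;
            this.length = length;
        }
    }

    /** Enumerates all regular files below root (nested ones too), one split each. */
    static List<WholeFileSplit> createSplits(Path root) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(root)) {
            files = walk.filter(Files::isRegularFile)
                        .sorted()
                        .collect(Collectors.toList());
        }
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            // Offset 0 and the full length are implied: the file is never cut into blocks.
            splits.add(new WholeFileSplit(splits.size(), file, Files.size(file)));
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        // Build a small nested directory with two 7-byte placeholder files.
        Path root = Files.createTempDirectory("kryo-objects");
        Files.createDirectories(root.resolve("nested"));
        Files.write(root.resolve("a.ser"), new byte[7]);
        Files.write(root.resolve("nested").resolve("b.ser"), new byte[7]);

        for (WholeFileSplit split : createSplits(root)) {
            System.out.println(split.splitNumber + " "
                    + root.relativize(split.file) + " " + split.length);
        }
    }
}
```

In an actual FileInputFormat subclass the same loop would build the
framework's own split objects, and the record-reading methods would
deserialize one object from the stream opened for each split.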

2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> I have a directory containing a list of files, each one containing a
> Kryo-serialized object.
> With JSON-serialized objects I don't have that problem (but there I use
>  env.readTextFile(path).withParameters(parameters)
> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>
> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> I don't know your use case.
>> The InputFormat interface is very flexible. Directories can be
>> read recursively. A file can contain one or more objects. You can also
>> make a smarter InputFormat and put multiple (small) files into one split...
>>
>> It is up to your use case what you need to implement.
>>
>>
>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> Is this also necessary when just recursively reading an entire directory
>>> containing one object per file?
>>>
>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> You could implement your own InputFormat based on FileInputFormat and
>>>> override the createInputSplits method to just create a single split per
>>>> file.
>>>>
>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>
>>>>> So what should I do?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ah, I checked the code.
>>>>>>
>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>> BinaryOutputFormat.
>>>>>> So you cannot use the BinaryInputFormat to read a file which does not
>>>>>> provide the metadata.
>>>>>>
>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>> BinaryInputFormat.
>>>>>>>> How large is the file with the single object?
>>>>>>>>
>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>
>>>>>>>>> I also tried with
>>>>>>>>>
>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>
>>>>>>>>> but I get the same error :(
>>>>>>>>>
>>>>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>>>>> should be able to deserialize it, right?
>>>>>>>>>
>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> If you create your file by just sequentially writing all objects
>>>>>>>>>> to the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>>>> Writing binary files in a way that they can be read in parallel
>>>>>>>>>> is a bit tricky (and not specific to Flink).
>>>>>>>>>>
>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>>>>>>>> exception (due to the fact that createInputSplits creates 8
>>>>>>>>>>> input splits, of which just one is not empty).
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>
>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>
>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>
>>>>>>>>>>>     ...
>>>>>>>>>>>     // try-with-resources used to autoclose resources
>>>>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>         // serialise object
>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     // deserialise object
>>>>>>>>>>>     myObj = null;
>>>>>>>>>>>
>>>>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>>>>>
>>>>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>>>>
>>>>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>     ds.print();
>>>>>>>>>>>
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
