flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Mon, 10 Aug 2015 10:11:26 GMT
Congrats that you got your InputFormat working!
It is true, there can be a few inconsistencies in the Formats derived from
FileInputFormat.

It would be great if you could open JIRAs for these issues. Otherwise, they
might get lost on the mailing list.

Thanks, Fabian

2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> Hi Fabian,
> thanks to your help I finally managed to successfully generate a DataSet
> from my folder but I think that there are some inconsistencies in the
> hierarchy of InputFormats.
> The *BinaryOutputFormat*/*TypeSerializerInputFormat* should somehow
> inherit the behaviour of the FileInputFormat (so respect *unsplittable*
> and *enumerateNestedFiles*), but they don't take those flags into account.
> Moreover, in the *TypeSerializerInputFormat* there's a *"// TODO: fix this
> shit"* comment that maybe should be removed or fixed :)
>
> Also, keeping testForUnsplittable and decorateInputStream aligned is
> somewhat dangerous..
> And maybe the visibility of getBlockIndexForPosition should be changed to
> protected?
>
> So basically, my need was to implement
> a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to make a
> lot of overrides.. am I doing something wrong, or could those InputFormats
> be improved? This is my IF code (*remark*: from the comment *"Copied
> from FileInputFormat (override TypeSerializerInputFormat)"* onwards, the
> code is copied-and-pasted from FileInputFormat.. thus MY code ends there):
>
> public class RowBundleInputFormat extends
> TypeSerializerInputFormat<RowBundle> {
>
> private static final long serialVersionUID = 1L;
> private static final Logger LOG =
> LoggerFactory.getLogger(RowBundleInputFormat.class);
>
> /** The fraction that the last split may be larger than the others. */
> private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
> private boolean objectRead;
>
> public RowBundleInputFormat() {
> super(new GenericTypeInfo<>(RowBundle.class));
> unsplittable = true;
> }
>
> @Override
> protected FSDataInputStream decorateInputStream(FSDataInputStream
> inputStream, FileInputSplit fileSplit) throws Throwable {
> return inputStream;
> }
>
> @Override
> protected boolean testForUnsplittable(FileStatus pathFile) {
> return true;
> }
>
> @Override
> public void open(FileInputSplit split) throws IOException {
> super.open(split);
> objectRead = false;
> }
>
> @Override
> public boolean reachedEnd() throws IOException {
> return this.objectRead;
> }
>
> @Override
> public RowBundle nextRecord(RowBundle reuse) throws IOException {
> RowBundle yourObject = super.nextRecord(reuse);
> this.objectRead = true; // read only one object
> return yourObject;
> }
>
> // -------------------------------------------------------------------
> // Copied from FileInputFormat (override TypeSerializerInputFormat)
> // -------------------------------------------------------------------
> @Override
> public FileInputSplit[] createInputSplits(int minNumSplits)
> throws IOException {
> if (minNumSplits < 1) {
> throw new IllegalArgumentException(
> "Number of input splits has to be at least 1.");
> }
>
> // take the desired number of splits into account
> minNumSplits = Math.max(minNumSplits, this.numSplits);
>
> final Path path = this.filePath;
> final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(
> minNumSplits);
>
> // get all the files that are involved in the splits
> List<FileStatus> files = new ArrayList<FileStatus>();
> long totalLength = 0;
>
> final FileSystem fs = path.getFileSystem();
> final FileStatus pathFile = fs.getFileStatus(path);
>
> if (pathFile.isDir()) {
> // input is directory. list all contained files
> final FileStatus[] dir = fs.listStatus(path);
> for (int i = 0; i < dir.length; i++) {
> if (dir[i].isDir()) {
> if (enumerateNestedFiles) {
> if (acceptFile(dir[i])) {
> totalLength += addNestedFiles(dir[i].getPath(),
> files, 0, true);
> } else {
> if (LOG.isDebugEnabled()) {
> LOG.debug("Directory "
> + dir[i].getPath().toString()
> + " did not pass the file-filter and is excluded.");
> }
> }
> }
> } else {
> if (acceptFile(dir[i])) {
> files.add(dir[i]);
> totalLength += dir[i].getLen();
> // as soon as there is one deflate file in a directory,
> // we can not split it
> testForUnsplittable(dir[i]);
> } else {
> if (LOG.isDebugEnabled()) {
> LOG.debug("File "
> + dir[i].getPath().toString()
> + " did not pass the file-filter and is excluded.");
> }
> }
> }
> }
> } else {
> testForUnsplittable(pathFile);
>
> files.add(pathFile);
> totalLength += pathFile.getLen();
> }
> // returns if unsplittable
> if (unsplittable) {
> int splitNum = 0;
> for (final FileStatus file : files) {
> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
> 0, file.getLen());
> Set<String> hosts = new HashSet<String>();
> for (BlockLocation block : blocks) {
> hosts.addAll(Arrays.asList(block.getHosts()));
> }
> long len = file.getLen();
> if (testForUnsplittable(file)) {
> len = READ_WHOLE_SPLIT_FLAG;
> }
> FileInputSplit fis = new FileInputSplit(splitNum++,
> file.getPath(), 0, len, hosts.toArray(new String[hosts
> .size()]));
> inputSplits.add(fis);
> }
> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
> }
>
> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
> : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0
> : 1));
>
> // now that we have the files, generate the splits
> int splitNum = 0;
> for (final FileStatus file : files) {
>
> final long len = file.getLen();
> final long blockSize = file.getBlockSize();
>
> final long minSplitSize;
> if (this.minSplitSize <= blockSize) {
> minSplitSize = this.minSplitSize;
> } else {
> if (LOG.isWarnEnabled()) {
> LOG.warn("Minimal split size of " + this.minSplitSize
> + " is larger than the block size of " + blockSize
> + ". Decreasing minimal split size to block size.");
> }
> minSplitSize = blockSize;
> }
>
> final long splitSize = Math.max(minSplitSize,
> Math.min(maxSplitSize, blockSize));
> final long halfSplit = splitSize >>> 1;
>
> final long maxBytesForLastSplit = (long) (splitSize *
> MAX_SPLIT_SIZE_DISCREPANCY);
>
> if (len > 0) {
>
> // get the block locations and make sure they are in order with
> // respect to their offset
> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
> 0, len);
> Arrays.sort(blocks);
>
> long bytesUnassigned = len;
> long position = 0;
>
> int blockIndex = 0;
>
> while (bytesUnassigned > maxBytesForLastSplit) {
> // get the block containing the majority of the data
> blockIndex = getBlockIndexForPosition(blocks, position,
> halfSplit, blockIndex);
> // create a new split
> FileInputSplit fis = new FileInputSplit(splitNum++,
> file.getPath(), position, splitSize,
> blocks[blockIndex].getHosts());
> inputSplits.add(fis);
>
> // adjust the positions
> position += splitSize;
> bytesUnassigned -= splitSize;
> }
>
> // assign the last split
> if (bytesUnassigned > 0) {
> blockIndex = getBlockIndexForPosition(blocks, position,
> halfSplit, blockIndex);
> final FileInputSplit fis = new FileInputSplit(splitNum++,
> file.getPath(), position, bytesUnassigned,
> blocks[blockIndex].getHosts());
> inputSplits.add(fis);
> }
> } else {
> // special case with a file of zero bytes size
> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
> 0, 0);
> String[] hosts;
> if (blocks.length > 0) {
> hosts = blocks[0].getHosts();
> } else {
> hosts = new String[0];
> }
> final FileInputSplit fis = new FileInputSplit(splitNum++,
> file.getPath(), 0, 0, hosts);
> inputSplits.add(fis);
> }
> }
>
> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
> }
>
> /**
> * Recursively traverse the input directory structure and enumerate all
> * accepted nested files.
> *
> * @return the total length of accepted files.
> */
> private long addNestedFiles(Path path, List<FileStatus> files, long length,
> boolean logExcludedFiles) throws IOException {
> final FileSystem fs = path.getFileSystem();
>
> for (FileStatus dir : fs.listStatus(path)) {
> if (dir.isDir()) {
> if (acceptFile(dir)) {
> length = addNestedFiles(dir.getPath(), files, length,
> logExcludedFiles); // keep the accumulated length; discarding the return value would lose nested file sizes
> } else {
> if (logExcludedFiles && LOG.isDebugEnabled()) {
> LOG.debug("Directory "
> + dir.getPath().toString()
> + " did not pass the file-filter and is excluded.");
> }
> }
> } else {
> if (acceptFile(dir)) {
> files.add(dir);
> length += dir.getLen();
> testForUnsplittable(dir);
> } else {
> if (logExcludedFiles && LOG.isDebugEnabled()) {
> LOG.debug("File "
> + dir.getPath().toString()
> + " did not pass the file-filter and is excluded.");
> }
> }
> }
> }
> return length;
> }
>
> /**
> * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
> * of the file described by the given offset.
> *
> * @param blocks
> *            The different blocks of the file. Must be ordered by their
> *            offset.
> * @param offset
> *            The offset of the position in the file.
> * @param startIndex
> *            The earliest index to look at.
> * @return The index of the block containing the given position.
> */
> private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
> long halfSplitSize, int startIndex) {
> // go over all indexes after the startIndex
> for (int i = startIndex; i < blocks.length; i++) {
> long blockStart = blocks[i].getOffset();
> long blockEnd = blockStart + blocks[i].getLength();
>
> if (offset >= blockStart && offset < blockEnd) {
> // got the block where the split starts
> // check if the next block contains more than this one does
> if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
> return i + 1;
> } else {
> return i;
> }
> }
> }
> throw new IllegalArgumentException(
> "The given offset is not contained in any block.");
> }
>
> }
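The split-sizing arithmetic that the copied createInputSplits relies on (a ceiling-divided maxSplitSize, plus the MAX_SPLIT_SIZE_DISCREPANCY allowance for the last split) can be checked in isolation. A minimal sketch in plain Java with no Flink dependencies; the class and helper names here are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSizing {
    static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    // Mirrors the split-assignment loop: fixed-size splits until the
    // remainder is small enough to become the (possibly larger) last split.
    static List<Long> splitLengths(long len, long splitSize) {
        List<Long> splits = new ArrayList<>();
        long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
        long bytesUnassigned = len;
        while (bytesUnassigned > maxBytesForLastSplit) {
            splits.add(splitSize);
            bytesUnassigned -= splitSize;
        }
        if (bytesUnassigned > 0) {
            splits.add(bytesUnassigned); // last split may be up to 10% larger
        }
        return splits;
    }

    public static void main(String[] args) {
        long totalLength = 100;
        int minNumSplits = 3;
        // ceiling division, as in the copied code
        long maxSplitSize = totalLength / minNumSplits
                + (totalLength % minNumSplits == 0 ? 0 : 1);
        System.out.println(maxSplitSize);                    // 34
        System.out.println(splitLengths(totalLength, maxSplitSize)); // [34, 34, 32]
    }
}
```

With 100 bytes and three requested splits, the ceiling division yields 34-byte splits, and the 32-byte remainder fits under the 10% discrepancy bound, so it becomes the last split rather than spawning a fourth.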
>
>
>
>
> On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> You need to do something like this:
>>
>> public class YourInputFormat extends FileInputFormat<Object> {
>>
>>    private boolean objectRead;
>>
>>    @Override
>>    public FileInputSplit[] createInputSplits(int minNumSplits) {
>>       // Create one FileInputSplit for each file you want to read.
>>       // Check FileInputFormat for how to recursively enumerate files.
>>       // Input splits must start at 0 and have a length equal to length
>> of the file to read.
>>       return null;
>>    }
>>
>>    @Override
>>    public void open(FileInputSplit split) throws IOException {
>>       super.open(split);
>>       objectRead = false;
>>    }
>>
>>    @Override
>>    public boolean reachedEnd() throws IOException {
>>       return this.objectRead;
>>    }
>>
>>    @Override
>>    public Object nextRecord(Object reuse) throws IOException {
>>       Object yourObject = this.stream.read(); // use Kryo here to read
>> from this.stream()
>>       this.objectRead = true; // read only one object
>>       return yourObject;
>>    }
>> }
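The lifecycle the skeleton above relies on (open() resets a flag, nextRecord() flips it, reachedEnd() reports it, so exactly one object is emitted per split) can be illustrated without Flink. A hypothetical stand-in in plain Java, where each "split" is modeled as an iterator over a file's objects:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the InputFormat contract sketched above:
// open() per split, then nextRecord() until reachedEnd() returns true.
class SingleObjectReader<T> {
    private Iterator<T> split;
    private boolean objectRead;

    void open(Iterator<T> split) { this.split = split; this.objectRead = false; }
    boolean reachedEnd() { return objectRead; }
    T nextRecord() { objectRead = true; return split.next(); }
}

public class Demo {
    // Drives the reader over several "files", mimicking the runtime's loop.
    static List<String> readAll(List<List<String>> files) {
        SingleObjectReader<String> reader = new SingleObjectReader<>();
        List<String> out = new ArrayList<>();
        for (List<String> file : files) {
            reader.open(file.iterator());
            while (!reader.reachedEnd()) {
                out.add(reader.nextRecord());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // only the first object of each "file" is handed out
        System.out.println(readAll(List.of(List.of("a", "extra"), List.of("b"))));
    }
}
```

This is why the flag-based protocol pairs naturally with one-split-per-file: the flag guarantees a single record per open() call, regardless of how many bytes follow in the stream.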
>>
>> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> Sorry Fabian but I don't understand what I should do :(
>>> Could you provide me a simple snippet of code to achieve this?
>>>
>>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>
>>>> Enumeration of nested files is a feature of the FileInputFormat.
>>>> If you implement your own IF based on FileInputFormat as I suggested
>>>> before, you can use that feature.
>>>>
>>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>
>>>>> I have a directory containing a list of files, each one containing a
>>>>> kryo-serialized object.
>>>>> With json serialized objects I don't have that problem (but there I
>>>>> use  env.readTextFile(path.withParameters(parameters)
>>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>>>
>>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhueske@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I don't know your use case.
>>>>>> The InputFormat interface is very flexible. Directories can be
>>>>>> recursively read. A file can contain one or more objects. You can
>>>>>> also make a smarter IF and put multiple (small) files into one split...
>>>>>>
>>>>>> It is up to your use case what you need to implement.
>>>>>>
>>>>>>
>>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>>> Should this be the case just reading recursively an entire directory
>>>>>>> containing one object per file?
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhueske@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You could implement your own InputFormat based on FileInputFormat
>>>>>>>> and overwrite the createInputSplits method to just create a single
>>>>>>>> split per file.
>>>>>>>>
>>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it
>>>>>>>> >:
>>>>>>>>
>>>>>>>>> So what should I do?
>>>>>>>>>
>>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ah, I checked the code.
>>>>>>>>>>
>>>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>>>> BinaryOutputFormat.
>>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which does
>>>>>>>>>> not provide the metadata.
>>>>>>>>>>
>>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>>
>>>>>>>>>>>>> I also tried with
>>>>>>>>>>>>>
>>>>>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>>>
>>>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>>>
>>>>>>>>>>>>> Moreover, in this example I put exactly one object per file so
>>>>>>>>>>>>> it should be able to deserialize it, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you create your file by just sequentially writing all
>>>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a
>>>>>>>>>>>>>> parallelism of 1.
>>>>>>>>>>>>>> Writing binary files in a way that they can be read in
>>>>>>>>>>>>>> parallel is a bit tricky (and not specific to Flink).
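For the "tricky" part mentioned here, one common approach is to length-prefix each record so a reader can locate record boundaries. This is illustrative only; it is not what Flink's BinaryOutputFormat does (which, as noted elsewhere in this thread, writes metadata that the BinaryInputFormat expects):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Length-prefixed framing: every record is preceded by its byte count,
// so a reader can walk (or skip) record boundaries without deserializing.
public class Framing {
    static byte[] write(List<byte[]> records) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (byte[] r : records) {
            out.writeInt(r.length); // length prefix
            out.write(r);
        }
        return bos.toByteArray();
    }

    static List<byte[]> read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<byte[]> records = new ArrayList<>();
        while (in.available() > 0) {
            byte[] r = new byte[in.readInt()];
            in.readFully(r);
            records.add(r);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        byte[] file = write(List.of("one".getBytes(), "two".getBytes()));
        List<byte[]> back = read(file);
        System.out.println(back.size());             // 2
        System.out.println(new String(back.get(1))); // two
    }
}
```

A raw Kryo stream, by contrast, has no such markers: a reader opening at an arbitrary offset cannot tell where one object ends and the next begins, which is why a plain concatenated file is readable only with parallelism 1.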
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>> I'm trying to read a file serialized with kryo but I get
>>>>>>>>>>>>>>> this exception (due to the fact that the createInputSplits
>>>>>>>>>>>>>>> creates 8 input splits, where just one is not empty..).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>>>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>> // try-with-resources used to autoclose resources
>>>>>>>>>>>>>>> try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>>>     // serialise object
>>>>>>>>>>>>>>>     Kryo kryo = new Kryo();
>>>>>>>>>>>>>>>     kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>     LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> // deserialise object
>>>>>>>>>>>>>>> myObj = null;
>>>>>>>>>>>>>>> try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>>>     Kryo kryo = new Kryo();
>>>>>>>>>>>>>>>     myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>     LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>>>> ds.print();
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
>
> Flavio Pompermaier
>
> *Development Department*
> *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*
>
> *Phone:* +(39) 0461 283 702
> *Fax:* + (39) 0461 186 6433
> *Email:* pompermaier@okkam.it
> *Headquarters:* Trento (Italy), via G.B. Trener 8
> *Registered office:* Trento (Italy), via Segantini 23
>
>
>
