flink-user mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: Flink Mongodb
Date Wed, 05 Nov 2014 12:17:06 GMT
How about adding an optional parameter to the InputFormat that determines
how many splits each region is divided into?
That would be a lightweight option to control the number of splits with low
effort (on our side).
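
To make the idea concrete, here is a rough sketch of the splitting step only. It is not Flink or HBase API: the names RegionSplitter, split and splitsPerRegion are made up for illustration, and row keys are modeled as longs, whereas real HBase row keys are byte arrays. It cuts one region's key range into at most splitsPerRegion contiguous sub-ranges of nearly equal width.

```java
/** Hypothetical helper (not part of Flink or HBase): cuts one region's key range into sub-ranges. */
public class RegionSplitter {

    /**
     * Splits the half-open key range [start, end) into at most splitsPerRegion
     * contiguous, non-overlapping sub-ranges of nearly equal width.
     * Assumption: keys are longs and (end - start) does not overflow.
     * Each returned element is a pair {lowInclusive, highExclusive}.
     */
    public static long[][] split(long start, long end, int splitsPerRegion) {
        if (end <= start) {
            throw new IllegalArgumentException("empty key range");
        }
        if (splitsPerRegion < 1) {
            throw new IllegalArgumentException("splitsPerRegion must be >= 1");
        }
        long width = end - start;
        // Never create more sub-ranges than there are distinct keys in the range.
        int n = (int) Math.min((long) splitsPerRegion, width);
        long base = width / n;       // minimum width of every sub-range
        long remainder = width % n;  // the first 'remainder' sub-ranges get one extra key

        long[][] ranges = new long[n][];
        long lo = start;
        for (int i = 0; i < n; i++) {
            long hi = lo + base + (i < remainder ? 1 : 0);
            ranges[i] = new long[] {lo, hi};
            lo = hi;
        }
        return ranges;
    }
}
```

Note that equal key width does not mean equal data volume. The sub-ranges of a dense region can still be large, which is why the parameter only bounds the number of splits and leaves the balancing to the lazy split assignment.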

2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:

> So how are we going to proceed here? Is someone willing to help me
> improve the splitting policy, or do we leave it as it is now?
> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org> wrote:
>> I agree. Going for more splits with smaller key regions is a good idea.
>> However, it might be a bit difficult to determine a good number of splits
>> as the size of a split depends on its density. Too large splits are prone
>> to cause data skew, too small ones will increase the overhead of split
>> assignment.
>> A solution for this problem could be to add an optional parameter to
>> the IF to give an upper bound for the number of InputSplits.
>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>> Typo: it should have meant that workers that get a larger split will get
>>> fewer additional splits.
>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>> InputSplits are assigned lazily at runtime, which gives you many of the
>>> benefits of re-assigning without the nastiness.
>>> Can you write the logic that creates the splits such that it creates
>>> multiple splits per region? Then the lazy assignment will make sure that
>>> workers that get a larger split will get get additional splits than workers
>>> that get smaller splits...
>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>> Hmm, that's a good question indeed. I am not familiar with HBase's mode of
>>>> operation.
>>>> I would assume that HBase uses range partitioning to partition a table
>>>> into regions. That way it is rather easy to balance the size of regions, as
>>>> long as there is no single key that occurs very often. I am not sure if it
>>>> is possible to overcome data skew caused by frequent keys.
>>>> However, as I said, these are just assumptions. I will have a look at
>>>> HBase's internals for verification.
>>>> In any case, Flink currently does not support reassigning or splitting
>>>> of InputSplits at runtime.
>>>> Also, initially generating balanced InputSplits will be tricky. That
>>>> would be possible if we can efficiently determine the "density" of a key
>>>> range when creating the InputSplits. However, I'm a bit skeptical that
>>>> this can be done...
>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>> From what I know, HBase manages the regions, but whether they are
>>>>> evenly distributed depends on a well-designed key.
>>>>> If that is not the case, you can end up with very unbalanced regions
>>>>> (i.e. hot spotting).
>>>>> Could it be a good idea to create a split policy that compares the
>>>>> sizes of all the splits and generates equally sized splits that can be
>>>>> reassigned to a free worker if the originally assigned one is still busy?
>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>>>> wrote:
>>>>>> ad 1) HBase manages the regions and should also take care of their
>>>>>> uniform size.
>>>>>> ad 2) Dynamically changing InputSplits is not possible at the moment.
>>>>>> However, the input split generation of the IF should also be able to
>>>>>> handle such issues upfront. In fact, the IF could also generate multiple
>>>>>> splits per region (this would be necessary to make sure that the minimum
>>>>>> number of splits is generated if there are fewer regions than required
>>>>>> splits).
>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>> Ok, thanks for the explanation!
>>>>>>> That was more or less like I thought it should be but there are
>>>>>>> still points I'd like to clarify:
>>>>>>> 1 - What if one region is very big and the other regions are
>>>>>>> small? There will be one slot that takes a very long time while the
>>>>>>> others stay inactive.
>>>>>>> 2 - Do you think it is possible to implement this in an adaptive way
>>>>>>> (stop processing a huge region if it is worth it and assign the
>>>>>>> remaining data to inactive task managers)?
>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>> wrote:
>>>>>>>> Local split assignment preferably assigns input splits to workers
>>>>>>>> that can locally read the data of an input split.
>>>>>>>> For example, HDFS stores file chunks (blocks) distributed over the
>>>>>>>> cluster and gives every worker access to these chunks via network
>>>>>>>> transfer. However, if a chunk is read by a process that runs on the
>>>>>>>> same node where the chunk is stored, the read operation directly
>>>>>>>> accesses the local file system without going over the network. Hence,
>>>>>>>> it is essential to assign input splits based on the locality of their
>>>>>>>> data if you want to have reasonable performance. We call this local
>>>>>>>> split assignment. This is a general concept of all data parallel
>>>>>>>> systems including Hadoop, Spark, and Flink.
>>>>>>>> This issue is not related to serializability of input formats.
>>>>>>>> I assume that the wrapped MongoIF is also not capable of local
>>>>>>>> split assignment.
>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>> assignment"? Do you mean that the InputFormat is not serializable?
>>>>>>>>> And is this instead not true for Mongodb?
>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to use
>>>>>>>>>> the wrapper.
>>>>>>>>>> The HBase format should work as well, but might lack support for
>>>>>>>>>> local split assignment. In that case performance would suffer a lot.
>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>> Should I start from
>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>> ? Is it ok?
>>>>>>>>>>> So, in principle, the TableInputFormat of HBase could also be
>>>>>>>>>>> used in a similar way, couldn't it?
>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske
>>>>>>>>>>> <fhueske@apache.org> wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>>>> documentation.
>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into the
>>>>>>>>>>>> new IF wrapper. :-)
>>>>>>>>>>>> Fabian
>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>>> DataSource).
>>>>>>>>>>>>> How can I use Mongodb with the new Flink API?
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Flavio
