flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink Mongodb
Date Wed, 05 Nov 2014 17:39:05 GMT
Just shared the example at https://github.com/okkam-it/flink-mongodb-test
and tweeted :)

The next step is to show how to write the result of a Flink process back to
How can I manage to do that? Can someone help me?

On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org> wrote:

> How about going for an optional parameter for the InputFormat to determine
> into how many splits each region is split?
> That would be a lightweight option to control the number of splits with
> low effort (on our side).
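
A minimal sketch of what such an optional parameter could look like, assuming row keys can be modeled as plain long values (real HBase keys are byte arrays). RegionSplitter, KeyRange and splitsPerRegion are hypothetical names used only for illustration; they are not part of Flink or HBase. Equal key width does not imply equal data volume, so this alone does not remove skew caused by dense ranges.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink or HBase class): cuts one region's key
// range into a configurable number of contiguous sub-ranges, one per split.
class RegionSplitter {

    /** Half-open key range [start, end). Keys are modeled as longs here. */
    static final class KeyRange {
        final long start;
        final long end;

        KeyRange(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Splits [start, end) into at most splitsPerRegion contiguous ranges. */
    static List<KeyRange> split(long start, long end, int splitsPerRegion) {
        if (end <= start || splitsPerRegion < 1) {
            throw new IllegalArgumentException("need start < end and splitsPerRegion >= 1");
        }
        long width = end - start;
        // Never create more splits than there are keys in the range.
        int n = (int) Math.min((long) splitsPerRegion, width);
        long base = width / n;
        long remainder = width % n;

        List<KeyRange> ranges = new ArrayList<>(n);
        long cursor = start;
        for (int i = 0; i < n; i++) {
            // The first 'remainder' ranges take one extra key each.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new KeyRange(cursor, cursor + size));
            cursor += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        System.out.println(split(0, 100, 4));
    }
}
```

With splitsPerRegion set to 1 this degenerates to the current behavior of one split per region, which is why an optional parameter with that default would be a low-risk change.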
> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>> So how are we going to proceed here? Is someone willing to help me
>> improve the splitting policy, or do we leave it as it is now?
>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>> I agree. Going for more splits with smaller key regions is a good idea.
>>> However, it might be a bit difficult to determine a good number of
>>> splits as the size of a split depends on its density. Too large splits are
>>> prone to cause data skew, too small ones will increase the overhead of
>>> split assignment.
>>> A solution for this problem could be to add an optional parameter to
>>> the IF to give an upper bound for the number of InputSplits.
>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>> Typo: it should have meant that workers that get a larger split will
>>>> get fewer additional splits.
>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>> InputSplits are assigned lazily at runtime, which gives you many of the
>>>> benefits of re-assigning without the nastiness.
>>>> Can you write the logic that creates the splits such that it creates
>>>> multiple splits per region? Then the lazy assignment will make sure that
>>>> workers that get a larger split will get get additional splits than workers
>>>> that get smaller splits...
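
The effect of lazy assignment can be illustrated with a small simulation. This is only a sketch under simplifying assumptions: each split has a known processing cost, every worker requests its next split the moment it becomes idle, and locality is ignored. LazySplitAssignment and simulate are hypothetical names, not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.PriorityQueue;

// Hypothetical simulation (not Flink code): workers pull the next pending
// split whenever they become idle, so busy workers pull fewer splits.
class LazySplitAssignment {

    /** Returns, per worker, how many splits that worker ended up processing. */
    static int[] simulate(long[] splitCosts, int workers) {
        if (workers < 1) {
            throw new IllegalArgumentException("need at least one worker");
        }
        Deque<Long> pending = new ArrayDeque<>();
        for (long cost : splitCosts) {
            pending.add(cost);
        }

        // Each entry is {timeWhenIdle, workerId}; earliest idle worker first,
        // ties broken by worker id to keep the simulation deterministic.
        PriorityQueue<long[]> idle = new PriorityQueue<>((a, b) ->
                a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int w = 0; w < workers; w++) {
            idle.add(new long[] {0L, w});
        }

        int[] counts = new int[workers];
        while (!pending.isEmpty()) {
            long[] worker = idle.poll();   // next worker to become idle
            long cost = pending.poll();    // it requests the next pending split
            counts[(int) worker[1]]++;
            idle.add(new long[] {worker[0] + cost, worker[1]});
        }
        return counts;
    }

    public static void main(String[] args) {
        // One large split followed by six small ones, two workers:
        // worker 0 is stuck with the large split, worker 1 takes the rest.
        long[] costs = {10, 1, 1, 1, 1, 1, 1};
        System.out.println(Arrays.toString(simulate(costs, 2))); // prints [1, 6]
    }
}
```

The more splits a region is cut into, the finer the granularity at which this pull-based balancing can even out skew, at the price of more assignment requests.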
>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>> Hmm, that's a good question indeed. I am not familiar with HBase's mode
>>>>> of operation.
>>>>> I would assume that HBase uses range partitioning to partition a
>>>>> table into regions. That way it is rather easy to balance the size of
>>>>> regions, as long as there is no single key that occurs very often. I am
>>>>> not sure if it is possible to overcome data skew caused by frequent keys.
>>>>> However, as I said, these are just assumptions. I will have a look at
>>>>> HBase's internals for verification.
>>>>> In any case, Flink currently does not support reassigning or splitting
>>>>> of InputSplits at runtime.
>>>>> Also, initially generating balanced InputSplits will be tricky. That
>>>>> would be possible if we can efficiently determine the "density" of a
>>>>> range when creating the InputSplits. However, I'm a bit skeptical that
>>>>> this can be done...
>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>> From what I know, HBase manages the regions, but whether they are
>>>>>> evenly distributed depends on a well-designed key.
>>>>>> If that is not the case, you could encounter very unbalanced regions
>>>>>> (i.e. hot spotting).
>>>>>> Could it be a good idea to create a split policy that compares the
>>>>>> size of all the splits and generates equally-sized splits that can be
>>>>>> reassigned to a free worker if the originally assigned one is still busy?
>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>>>>> wrote:
>>>>>>> ad 1) HBase manages the regions and should also take care of
>>>>>>> uniform size.
>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>> moment. However, the input split generation of the IF should also be
>>>>>>> able to handle such issues upfront. In fact, the IF could also generate
>>>>>>> splits per region (this would be necessary to make sure that the
>>>>>>> minimum number of splits is generated if there are fewer regions than
>>>>>>> splits).
>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>> Ok, thanks for the explanation!
>>>>>>>> That was more or less what I thought it should be, but there are
>>>>>>>> still points I'd like to clarify:
>>>>>>>> 1 - What if one region is very big and the other regions are
>>>>>>>> small? There will be one slot that takes a very long time while the
>>>>>>>> others stay inactive.
>>>>>>>> 2 - Do you think it is possible to implement this in an adaptive
>>>>>>>> way (stop processing a huge region if it is worth it and assign
>>>>>>>> data to inactive task managers)?
>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>>> wrote:
>>>>>>>>> Local split assignment preferably assigns input splits to workers
>>>>>>>>> that can locally read the data of an input split.
>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed over the
>>>>>>>>> cluster and gives every worker access to these chunks via network
>>>>>>>>> transfer. However, if a chunk is read by a process that runs on the
>>>>>>>>> same node where the chunk is stored, the read operation directly
>>>>>>>>> accesses the local file system without going over the network. Hence,
>>>>>>>>> it is essential to assign input splits based on the locality of their
>>>>>>>>> data if you want to have reasonable performance. We call this local
>>>>>>>>> split assignment. This is a general concept of all data parallel
>>>>>>>>> systems including Hadoop, Spark, and Flink.
>>>>>>>>> This issue is not related to serializability of input
>>>>>>>>> I assume that the wrapped MongoIF is also not capable of local
>>>>>>>>> split assignment.
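
The preference described above can be sketched as follows. This is a simplified illustration, assuming each split knows a single host that stores its data; LocalSplitAssigner and Split are hypothetical names and do not reproduce Flink's actual assigner classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch (not Flink's implementation): hand out a split stored
// on the requesting worker's host if one exists, else fall back to any split.
class LocalSplitAssigner {

    /** A split together with the single host assumed to store its data. */
    static final class Split {
        final int id;
        final String host;

        Split(int id, String host) {
            this.id = id;
            this.host = host;
        }
    }

    private final List<Split> remaining;

    LocalSplitAssigner(List<Split> splits) {
        this.remaining = new ArrayList<>(splits);
    }

    /** Returns the next split for a worker on workerHost, or null if none are left. */
    synchronized Split next(String workerHost) {
        // First choice: a split whose data lives on the requesting host.
        for (Iterator<Split> it = remaining.iterator(); it.hasNext();) {
            Split candidate = it.next();
            if (candidate.host.equals(workerHost)) {
                it.remove();
                return candidate;
            }
        }
        // Fallback: a remote split, which has to be read over the network.
        return remaining.isEmpty() ? null : remaining.remove(0);
    }

    public static void main(String[] args) {
        List<Split> splits = new ArrayList<>();
        splits.add(new Split(0, "node-a"));
        splits.add(new Split(1, "node-b"));
        splits.add(new Split(2, "node-a"));
        LocalSplitAssigner assigner = new LocalSplitAssigner(splits);
        System.out.println(assigner.next("node-b").id); // local split for node-b
    }
}
```

An input format that cannot report where its data lives leaves the assigner with only the fallback path, so most reads would go over the network.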
>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>> What do you mean by "might lack support for local
>>>>>>>>>> assignment"? You mean that InputFormat is not serializable?
>>>>>>>>>> instead is not true for Mongodb?
>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <
>>>>>>>>>> fhueske@apache.org> wrote:
>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to use
>>>>>>>>>>> the wrapper.
>>>>>>>>>>> The HBase format should work as well, but might lack support for
>>>>>>>>>>> local split assignment. In that case performance would suffer a lot.
>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>> Should I start from
>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>>> ? Is it ok?
>>>>>>>>>>>> Thus, in principle, the TableInputFormat of HBase could also be
>>>>>>>>>>>> used in a similar way, couldn't it?
>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <
>>>>>>>>>>>> fhueske@apache.org> wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>>>>> documentation.
>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into the
>>>>>>>>>>>>> new IF wrapper. :-)
>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>>>> DataSource).
>>>>>>>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Flavio
