flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Flink Mongodb
Date Thu, 06 Nov 2014 14:58:57 GMT
Hi Flavio!

I think the general method is the same as with the inputs.

You use the "HadoopOutputFormat" wrapping the "MongoOutputFormat".

You can then call

DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;
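Stephan's snippet can be completed along these lines. This is only a sketch following the 0.7-incubating Hadoop-compatibility API: the `Job` setup, the `mongo.output.uri` value, and the `env` variable are assumptions, not from the thread; verify class and package names against your Flink and mongo-hadoop versions.

```
// Sketch (assumed API, verify against your versions): wrap mongo-hadoop's
// MongoOutputFormat in Flink's HadoopOutputFormat and use it as a data sink.
DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;

Job job = Job.getInstance();
// Hypothetical target URI.
job.getConfiguration().set("mongo.output.uri",
    "mongodb://localhost:27017/testdb.testcollection");

HadoopOutputFormat<BSONWritable, BSONWritable> hadoopOF =
    new HadoopOutputFormat<BSONWritable, BSONWritable>(
        new MongoOutputFormat<BSONWritable, BSONWritable>(), job);

data.output(hadoopOF);
env.execute("write to MongoDB");
```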



On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> Any help here..?
> On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>> Just shared the example at https://github.com/okkam-it/flink-mongodb-test
>> and tweeted :)
>> The next step is to show how to write the result of a Flink process back
>> to Mongo.
>> How can I manage to do that? Can someone help me?
>> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>> How about going for an optional parameter for the InputFormat to
>>> determine into how many splits each region is split?
>>> That would be a lightweight option to control the number of splits with
>>> low effort (on our side).
>>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>> So how are we going to proceed here? Is someone willing to help me
>>>> improve the splitting policy, or shall we leave it as it is now?
>>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>>> wrote:
>>>>> I agree. Going for more splits with smaller key regions is a
>>>>> good idea.
>>>>> However, it might be a bit difficult to determine a good number of
>>>>> splits, as the size of a split depends on its density. Too large splits
>>>>> are prone to cause data skew, too small ones will increase the overhead of
>>>>> split assignment.
>>>>> A solution for this problem could be to add an optional parameter to
>>>>> the IF to give an upper bound for the number of InputSplits.
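Fabian's idea of an optional upper bound on the total number of InputSplits can be sketched as follows. The names here are hypothetical illustration, not Flink API: each region is cut into the same number of sub-splits, as many as fit under the bound, but never fewer than one per region.

```java
// Sketch of an upper bound on the total number of InputSplits (hypothetical
// helper, not part of any Flink interface).
public class SplitBudget {
    /** Sub-splits per region so that the total stays within maxSplits. */
    public static int subSplitsPerRegion(int numRegions, int maxSplits) {
        if (numRegions <= 0) throw new IllegalArgumentException("no regions");
        // At least one split per region; otherwise share the budget evenly.
        return Math.max(1, maxSplits / numRegions);
    }

    public static void main(String[] args) {
        // 4 regions, budget of 16 splits -> 4 sub-splits per region.
        System.out.println(subSplitsPerRegion(4, 16)); // prints 4
        // 10 regions, budget of 4 -> still 1 per region (the bound is a soft cap).
        System.out.println(subSplitsPerRegion(10, 4)); // prints 1
    }
}
```

Note that the bound is soft: with more regions than budgeted splits, one split per region is still generated, matching Fabian's point that a minimum number of splits must exist.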
>>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>> Typo: it should have meant that workers that get a larger split will
>>>>>> get fewer additional splits.
>>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>>> InputSplits are assigned lazily at runtime, which gives you many of
>>>>>> the benefits of re-assigning without the nastiness.
>>>>>> Can you write the logic that creates the splits such that it creates
>>>>>> multiple splits per region? Then the lazy assignment will make sure that
>>>>>> workers that get a larger split will get fewer additional splits than
>>>>>> workers that get smaller splits...
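The lazy-assignment effect Stephan describes can be seen in a toy simulation (illustration only, not Flink scheduler code): splits are handed out one at a time, and whichever worker finishes first pulls the next split, so a worker stuck on a large split automatically receives fewer additional ones.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy simulation of lazy split assignment between two workers.
public class LazyAssignment {
    /** Number of splits each of two workers processes when splits are
     *  pulled lazily: the earliest-idle worker takes the next split. */
    public static int[] assign(int[] splitSizes) {
        Queue<Integer> splits = new ArrayDeque<>();
        for (int s : splitSizes) splits.add(s);
        long[] busyUntil = new long[2]; // simulated time each worker becomes idle
        int[] counts = new int[2];
        while (!splits.isEmpty()) {
            int w = busyUntil[0] <= busyUntil[1] ? 0 : 1; // earliest-idle worker
            busyUntil[w] += splits.poll();
            counts[w]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // One huge split, then small ones: worker 0 takes the big split and
        // ends up with fewer splits overall than worker 1.
        int[] counts = assign(new int[]{100, 10, 10, 10, 10, 10});
        System.out.println(counts[0] + " vs " + counts[1]); // prints 1 vs 5
    }
}
```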
>>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>>>> Hmm, that's a good question indeed. I am not familiar with HBase's
>>>>>>> mode of operation.
>>>>>>> I would assume that HBase uses range partitioning to partition the
>>>>>>> table into regions. That way it is rather easy to balance the size of
>>>>>>> regions, as long as there is no single key that occurs very often. I am not
>>>>>>> sure if it is possible to overcome data skew caused by frequent keys.
>>>>>>> However, as I said, these are just assumptions. I will have a look at
>>>>>>> HBase's internals for verification.
>>>>>>> In any case, Flink does currently not support reassigning
>>>>>>> or splitting of InputSplits at runtime.
>>>>>>> Also, initially generating balanced InputSplits will be tricky. It
>>>>>>> would be possible if we can efficiently determine the "density" of a key
>>>>>>> range when creating the InputSplits. However, I'm a bit skeptical that
>>>>>>> this can be done...
>>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>> From what I know, HBase manages the regions, but the fact that they
>>>>>>>> are evenly distributed depends on a well-designed key..
>>>>>>>> If that is not the case you could encounter very unbalanced regions
>>>>>>>> (i.e. hot spotting).
>>>>>>>> Could it be a good idea to create a split policy that compares the
>>>>>>>> size of all the splits and generates equally-sized splits that can be
>>>>>>>> reassigned to a free worker if the originally assigned one is still busy?
>>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>>> wrote:
>>>>>>>>> ad 1) HBase manages the regions and should also take care of
>>>>>>>>> their uniform size.
>>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>>>> moment. However, the input split generation of the IF should also be able
>>>>>>>>> to handle such issues upfront. In fact, the IF could also generate multiple
>>>>>>>>> splits per region (this would be necessary to make sure that the minimum
>>>>>>>>> number of splits is generated if there are fewer regions than required
>>>>>>>>> splits).
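Generating multiple splits per region boils down to cutting a region's [startKey, endKey) row-key range into evenly spaced sub-ranges. The following is only the key arithmetic on fixed-width keys (a hypothetical helper, not TableInputFormat code), similar in spirit to how HBase itself splits byte ranges.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: cut a region's key range into n evenly spaced sub-ranges.
public class RegionSplitter {
    /** n+1 boundary keys from start to end, evenly spaced, fixed width. */
    public static List<byte[]> boundaries(byte[] start, byte[] end, int n) {
        BigInteger lo = new BigInteger(1, start); // keys as unsigned integers
        BigInteger hi = new BigInteger(1, end);
        BigInteger range = hi.subtract(lo);
        List<byte[]> keys = new ArrayList<>();
        for (int i = 0; i <= n; i++) {
            BigInteger k = lo.add(range.multiply(BigInteger.valueOf(i))
                                       .divide(BigInteger.valueOf(n)));
            keys.add(toFixedWidth(k, start.length));
        }
        return keys;
    }

    private static byte[] toFixedWidth(BigInteger v, int width) {
        byte[] raw = v.toByteArray(); // may carry a leading sign byte
        byte[] out = new byte[width];
        int copy = Math.min(raw.length, width);
        System.arraycopy(raw, raw.length - copy, out, width - copy, copy);
        return out;
    }

    public static void main(String[] args) {
        byte[] start = {0x00, 0x00};
        byte[] end   = {0x00, 0x40}; // 64 as a 2-byte key
        for (byte[] b : boundaries(start, end, 4)) {
            System.out.println(Arrays.toString(b)); // [0,0] [0,16] [0,32] [0,48] [0,64]
        }
    }
}
```

As Fabian notes, evenly spaced key boundaries only give evenly sized splits if the keys are uniformly dense; skewed key distributions still produce skewed splits.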
>>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>>> That was more or less like I thought it should be, but there are
>>>>>>>>>> still points I'd like to clarify:
>>>>>>>>>> 1 - What if a region is very big and there are other regions that
>>>>>>>>>> are very small? There will be one slot that takes a very long time
>>>>>>>>>> while the others stay inactive..
>>>>>>>>>> 2 - Do you think it is possible to implement this in an adaptive
>>>>>>>>>> way (stop processing of a huge region if it is worth it and assign the
>>>>>>>>>> remaining data to inactive task managers)?
>>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>> Local split assignment preferably assigns input splits to workers
>>>>>>>>>>> that can locally read the data of an input split.
>>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed over
>>>>>>>>>>> the cluster and gives access to these chunks to every worker via network
>>>>>>>>>>> transfer. However, if a chunk is read from a process that runs on the same
>>>>>>>>>>> node as the chunk is stored, the read operation directly accesses the local
>>>>>>>>>>> file system without going over the network. Hence, it is essential to
>>>>>>>>>>> assign input splits based on the locality of their data if you want to have
>>>>>>>>>>> reasonable performance. We call this local split assignment. This is a
>>>>>>>>>>> general concept of all data parallel systems including Hadoop, Spark, and
>>>>>>>>>>> Flink.
>>>>>>>>>>> This issue is not related to the serializability of input formats.
>>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of local
>>>>>>>>>>> split assignment.
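Fabian's locality idea can be illustrated with a toy assigner (illustration only; Flink's actual assigner is more elaborate): each split records the host that stores its data, and when a worker asks for work it gets a local split if any remain, falling back to a remote one otherwise.

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of locality-aware lazy split assignment.
public class LocalAssigner {
    private final List<String[]> splits = new ArrayList<>(); // {splitId, hostName}

    public void add(String splitId, String host) {
        splits.add(new String[]{splitId, host});
    }

    /** Next split for a worker on 'host': local first, remote as fallback. */
    public String nextSplit(String host) {
        for (String[] s : splits) {
            if (s[1].equals(host)) { splits.remove(s); return s[0]; }
        }
        return splits.isEmpty() ? null : splits.remove(0)[0];
    }

    public static void main(String[] args) {
        LocalAssigner a = new LocalAssigner();
        a.add("split-1", "node-a");
        a.add("split-2", "node-b");
        a.add("split-3", "node-a");
        System.out.println(a.nextSplit("node-b")); // split-2: local beats queue order
        System.out.println(a.nextSplit("node-b")); // split-1: remote fallback
    }
}
```

A format like the wrapped MongoIF that reports no host information for its splits degenerates to the fallback path on every request, which is exactly the performance problem described above.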
>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>>>>> assignment"? You mean that the InputFormat is not serializable?
>>>>>>>>>>>> This instead is not true for Mongodb?
>>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to
>>>>>>>>>>>>> use the wrapper.
>>>>>>>>>>>>> The HBase format should work as well, but might lack support
>>>>>>>>>>>>> for local split assignment. In that case performance would suffer a lot.
>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>> Should I start from
>>>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>>>>> ? Is it ok?
>>>>>>>>>>>>>> Thus, in principle, also the TableInputFormat of HBase could
>>>>>>>>>>>>>> be used in a similar way.. isn't it?
>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>>>>>>> documentation.
>>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into
>>>>>>>>>>>>>>> the new IF wrapper. :-)
>>>>>>>>>>>>>>> Fabian
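In code, "plugging Mongo's Hadoop IF into the new IF wrapper" might look like the following sketch. It follows the 0.7-incubating Hadoop-compatibility docs and Flavio's linked example repository; the key/value types and the `mongo.input.uri` value are assumptions that depend on the mongo-hadoop version in use, so verify them against your setup.

```
// Sketch (assumed API): read from MongoDB with the new DataSet API by
// wrapping mongo-hadoop's mapred MongoInputFormat.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

JobConf conf = new JobConf();
// Hypothetical source URI.
conf.set("mongo.input.uri", "mongodb://localhost:27017/testdb.testcollection");

HadoopInputFormat<BSONWritable, BSONWritable> hadoopIF =
    new HadoopInputFormat<BSONWritable, BSONWritable>(
        new MongoInputFormat(), BSONWritable.class, BSONWritable.class, conf);

DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hadoopIF);
```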
>>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>>>>>> DataSource).
>>>>>>>>>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Flavio
