flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Flink Mongodb
Date Thu, 06 Nov 2014 14:58:57 GMT
Hi Flavio!

I think the general method is the same as with the inputs.

You use the "HadoopOutputFormat", wrapping the "MongoOutputFormat"
(https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java).

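Putting the pieces together, a rough sketch (the output URI below is a
placeholder, and this assumes the mapred-API wrapper from
flink-hadoop-compatibility and mongo-hadoop's BSONWritable):

import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoOutputFormat;

// Point the Mongo output format at the target database/collection.
JobConf conf = new JobConf();
conf.set("mongo.output.uri", "mongodb://localhost:27017/mydb.myresults");

// Wrap Mongo's mapred OutputFormat in Flink's Hadoop wrapper.
HadoopOutputFormat<BSONWritable, BSONWritable> mongoOutput =
    new HadoopOutputFormat<BSONWritable, BSONWritable>(
        new MongoOutputFormat<BSONWritable, BSONWritable>(), conf);
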
You can then call

DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;

data.output(mongoOutput);


Greetings,
Stephan


On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> Any help here..?
>
> On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> Just shared the example at https://github.com/okkam-it/flink-mongodb-test
>> and tweeted :)
>>
>> The next step is to show how to write the result of a Flink process back
>> to Mongo.
>> How can I manage to do that? Can someone help me?
>>
>> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>
>>> How about going for an optional parameter for the InputFormat to
>>> determine into how many splits each region is split?
>>> That would be a lightweight option to control the number of splits with
>>> low effort (on our side).
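>>>
>>> Just to sketch the shape of it (names made up, not an actual API):
>>>
>>> // Optional knob on the HBase IF: cut every region into this many
>>> // input splits instead of creating one split per region.
>>> private int numSplitsPerRegion = 1;
>>>
>>> public void setNumSplitsPerRegion(int numSplitsPerRegion) {
>>>     this.numSplitsPerRegion = numSplitsPerRegion;
>>> }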
>>>
>>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> So how are we going to proceed here? Is someone willing to help me in
>>>> improving the splitting policy, or do we leave it as it is now?
>>>>
>>>>
>>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>>> wrote:
>>>>
>>>>> I agree. Going for more splits with smaller key regions is a
>>>>> good idea.
>>>>> However, it might be a bit difficult to determine a good number of
>>>>> splits as the size of a split depends on its density. Too large splits are
>>>>> prone to cause data skew, too small ones will increase the overhead of
>>>>> split assignment.
>>>>>
>>>>> A solution for this problem could be to add an optional parameter to
>>>>> the IF to give an upper bound for the number of InputSplits.
>>>>>
>>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>
>>>>>> Typo: it should have meant that workers that get a larger split will
>>>>>> get fewer additional splits.
>>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>>>
>>>>>> InputSplits are assigned lazily at runtime, which gives you many of
>>>>>> the benefits of re-assigning without the nastiness.
>>>>>>
>>>>>> Can you write the logic that creates the splits such that it creates
>>>>>> multiple splits per region? Then the lazy assignment will make sure that
>>>>>> workers that get a larger split will get get additional splits than workers
>>>>>> that get smaller splits...
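>>>>>>
>>>>>> Roughly like this (a sketch only: the region type, n, and createSplit
>>>>>> are stand-ins; Bytes.split is HBase's key-range utility):
>>>>>>
>>>>>> // Cut each region's row-key range into n sub-ranges and emit one
>>>>>> // split per sub-range; lazy assignment then balances them at runtime.
>>>>>> List<TableInputSplit> splits = new ArrayList<TableInputSplit>();
>>>>>> for (HRegionInfo region : regions) {
>>>>>>     byte[][] bounds = Bytes.split(region.getStartKey(), region.getEndKey(), n - 1);
>>>>>>     for (int i = 0; i < bounds.length - 1; i++) {
>>>>>>         splits.add(createSplit(bounds[i], bounds[i + 1])); // hypothetical helper
>>>>>>     }
>>>>>> }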
>>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>>>
>>>>>>> Hmm, that's a good question indeed. I am not familiar with HBase's mode
>>>>>>> of operation.
>>>>>>> I would assume that HBase uses range partitioning to partition a
>>>>>>> table into regions. That way it is rather easy to balance the size of
>>>>>>> regions, as long as there is no single key that occurs very often. I am
>>>>>>> not sure if it is possible to overcome data skew caused by frequent keys.
>>>>>>> However, as I said, these are just assumptions. I will have a look at
>>>>>>> HBase's internals for verification.
>>>>>>>
>>>>>>> In any case, Flink currently does not support reassigning
>>>>>>> or splitting of InputSplits at runtime.
>>>>>>> Also, initially generating balanced InputSplits will be tricky. That
>>>>>>> would be possible if we could efficiently determine the "density" of a
>>>>>>> key range when creating the InputSplits. However, I'm a bit skeptical
>>>>>>> that this can be done...
>>>>>>>
>>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>
>>>>>>>> From what I know, HBase manages the regions, but whether they
>>>>>>>> are evenly distributed depends on a well-designed key;
>>>>>>>> if that is not the case, you could encounter very unbalanced regions
>>>>>>>> (i.e., hot spotting).
>>>>>>>>
>>>>>>>> Could it be a good idea to create a split policy that compares the
>>>>>>>> size of all the splits and generates equally-sized splits that can be
>>>>>>>> reassigned to a free worker if the originally assigned one is still busy?
>>>>>>>>
>>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> ad 1) HBase manages the regions and should also take care of their
>>>>>>>>> uniform size.
>>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>>>> moment. However, the input split generation of the IF should also be able
>>>>>>>>> to handle such issues upfront. In fact, the IF could also generate multiple
>>>>>>>>> splits per region (this would be necessary to make sure that the minimum
>>>>>>>>> number of splits is generated if there are fewer regions than required
>>>>>>>>> splits).
>>>>>>>>>
>>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>
>>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>>> That was more or less how I thought it should work, but there are
>>>>>>>>>> still points I'd like to clarify:
>>>>>>>>>>
>>>>>>>>>> 1 - What if one region is very big and the other regions are very
>>>>>>>>>> small? There will be one slot that takes a very long time while the
>>>>>>>>>> others stay inactive.
>>>>>>>>>> 2 - Do you think it is possible to implement this in an adaptive
>>>>>>>>>> way (stop processing a huge region if it is worth it and assign the
>>>>>>>>>> remaining data to inactive task managers)?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Local split assignment preferably assigns an input split to a worker
>>>>>>>>>>> that can locally read the data of that split.
>>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed over
>>>>>>>>>>> the cluster and gives access to these chunks to every worker via network
>>>>>>>>>>> transfer. However, if a chunk is read by a process that runs on the same
>>>>>>>>>>> node where the chunk is stored, the read operation directly accesses the
>>>>>>>>>>> local file system without going over the network. Hence, it is essential
>>>>>>>>>>> to assign input splits based on the locality of their data if you want
>>>>>>>>>>> reasonable performance. We call this local split assignment. This is a
>>>>>>>>>>> general concept of all data-parallel systems, including Hadoop, Spark, and
>>>>>>>>>>> Flink.
>>>>>>>>>>>
>>>>>>>>>>> This issue is not related to the serializability of input formats.
>>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of local
>>>>>>>>>>> split assignment.
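>>>>>>>>>>>
>>>>>>>>>>> For reference, an IF expresses locality by attaching host names to
>>>>>>>>>>> its splits, e.g. via Flink's LocatableInputSplit (the host name here
>>>>>>>>>>> is illustrative):
>>>>>>>>>>>
>>>>>>>>>>> // The scheduler prefers to hand this split to a task running on
>>>>>>>>>>> // one of the named hosts, so the read stays node-local.
>>>>>>>>>>> new LocatableInputSplit(splitNumber, new String[] {"worker-3"});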
>>>>>>>>>>>
>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>>>>> assignment"? Do you mean that the InputFormat is not serializable?
>>>>>>>>>>>> And that this is not the case for MongoDB?
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to
>>>>>>>>>>>>> use the wrapper.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The HBase format should work as well, but might lack support
>>>>>>>>>>>>> for local split assignment. In that case, performance would suffer a lot.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Should I start from
>>>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>>>>> ? Is it ok?
>>>>>>>>>>>>>> Thus, in principle, HBase's TableInputFormat could also
>>>>>>>>>>>>>> be used in a similar way, couldn't it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>>>>>>> documentation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into
>>>>>>>>>>>>>>> the new IF wrapper. :-)
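>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Roughly (a sketch; the input URI is a placeholder, assuming the
>>>>>>>>>>>>>>> mapred wrapper and mongo-hadoop's MongoInputFormat):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JobConf conf = new JobConf();
>>>>>>>>>>>>>>> conf.set("mongo.input.uri", "mongodb://localhost:27017/mydb.mycollection");
>>>>>>>>>>>>>>> DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(
>>>>>>>>>>>>>>>     new HadoopInputFormat<BSONWritable, BSONWritable>(
>>>>>>>>>>>>>>>         new MongoInputFormat(), BSONWritable.class, BSONWritable.class, conf));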
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
