flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink Mongodb
Date Fri, 07 Nov 2014 16:01:53 GMT
I managed to write back to mongo using this:

MongoConfigUtil.setOutputURI(hdIf.getJobConf(),
    "mongodb://localhost:27017/test.testData");
// emit result (this works only locally)
fin.output(new HadoopOutputFormat<Text, BSONWritable>(
    new MongoOutputFormat<Text, BSONWritable>(), hdIf.getJobConf()));

So I also updated the example at
https://github.com/okkam-it/flink-mongodb-test :)
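
A minimal sketch (imports omitted) of the kind of map that would feed fin above, i.e. one that emits Tuple2<Text, BSONWritable> so the types match the output format. The "jsonld"/"@type" fields follow the earlier snippet in this thread, while the output document and the Text key are illustrative choices that may differ from the linked example:

DataSet<Tuple2<Text, BSONWritable>> fin = input.map(
    new MapFunction<Tuple2<BSONWritable, BSONWritable>, Tuple2<Text, BSONWritable>>() {

        @Override
        public Tuple2<Text, BSONWritable> map(Tuple2<BSONWritable, BSONWritable> record)
                throws Exception {
            BSONObject doc = record.f1.getDoc();
            BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
            String type = jsonld.getString("@type");
            // build a small output document holding the extracted type;
            // the Text key is only illustrative here
            BSONObject out = new BasicDBObject("@type", type);
            return new Tuple2<Text, BSONWritable>(new Text(type), new BSONWritable(out));
        }
    });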

On Thu, Nov 6, 2014 at 5:10 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
> Can you:
>
>   - either return a BSONWritable from the function
>   - or type the output formats to String?
>
> The MongoRecordWriter can work with non-BSON objects as well.
> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java
>
>
> Stephan
>
>
> On Thu, Nov 6, 2014 at 4:12 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> I'm trying to do that but I can't find the proper typing.. For example:
>>
>> DataSet<String> fin = input.map(
>>     new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public String map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
>>             BSONWritable value = record.getField(1);
>>             BSONObject doc = value.getDoc();
>>             BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
>>             String type = jsonld.getString("@type");
>>             return type;
>>         }
>>     });
>>
>> MongoConfigUtil.setOutputURI(hdIf.getJobConf(),
>>     "mongodb://localhost:27017/test.test");
>> fin.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
>>     new MongoOutputFormat<BSONWritable, BSONWritable>(), hdIf.getJobConf()));
>>
>> Obviously this doesn't work because I'm emitting strings and trying to
>> write BSONWritable.. can you show me a simple working example?
>>
>> Best,
>> Flavio
>>
>> On Thu, Nov 6, 2014 at 3:58 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi Flavio!
>>>
>>> I think the general method is the same as with the inputs.
>>>
>>> You use the "HadoopOutputFormat" wrapping the "MongoOutputFormat" (
>>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
>>> )
>>>
>>> You can then call
>>>
>>> DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;
>>>
>>> data.output(mongoOutput);
>>>
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>
>>>> Any help here..?
>>>>
>>>> On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>
>>>>> Just shared the example at https://github.com/okkam-it/flink-mongodb-test and tweeted :)
>>>>>
>>>>> The next step is to show how to write the result of a Flink process
>>>>> back to Mongo.
>>>>> How can I manage to do that? Can someone help me?
>>>>>
>>>>> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org>
>>>>> wrote:
>>>>>
>>>>>> How about going for an optional parameter for the InputFormat to
>>>>>> determine into how many splits each region is split?
>>>>>> That would be a lightweight option to control the number of splits
>>>>>> with low effort (on our side).
>>>>>>
>>>>>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>>> So how are we going to proceed here? Is someone willing to help me in improving the splitting policy, or do we leave it as it is now?
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I agree. Going for more splits with smaller key regions is a good idea.
>>>>>>>> However, it might be a bit difficult to determine a good number of splits, as the size of a split depends on its density. Too large splits are prone to cause data skew, too small ones will increase the overhead of split assignment.
>>>>>>>>
>>>>>>>> A solution for this problem could be to add an optional parameter to the IF to give an upper bound for the number of InputSplits.
>>>>>>>>
>>>>>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>>
>>>>>>>>> Typo: it should have said that workers that get a larger split will get fewer additional splits.
>>>>>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>>>>>>
>>>>>>>>> InputSplits are assigned lazily at runtime, which gives you many of the benefits of re-assigning without the nastiness.
>>>>>>>>>
>>>>>>>>> Can you write the logic that creates the splits such that it creates multiple splits per region? Then the lazy assignment will make sure that workers that get a larger split will get fewer additional splits than workers that get smaller splits...
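
(To make the "multiple splits per region" idea above a bit more concrete, here is a rough, untested sketch of cutting one region's key range into sub-ranges. Bytes.split comes from org.apache.hadoop.hbase.util.Bytes; the helper class itself is purely illustrative and not existing Flink code.)

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionSplitSketch {

    /** Cuts the key range [regionStart, regionEnd) into splitsPerRegion sub-ranges. */
    public static List<byte[][]> splitRegion(byte[] regionStart, byte[] regionEnd,
            int splitsPerRegion) {
        // Bytes.split(a, b, num) returns a, num intermediate keys, and b
        byte[][] bounds = Bytes.split(regionStart, regionEnd, splitsPerRegion - 1);
        List<byte[][]> ranges = new ArrayList<byte[][]>();
        for (int i = 0; i < bounds.length - 1; i++) {
            ranges.add(new byte[][] { bounds[i], bounds[i + 1] });
        }
        return ranges;
    }
}
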
>>>>>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hmm, that's a good question indeed. I am not familiar with HBase's mode of operation.
>>>>>>>>>> I would assume that HBase uses range partitioning to partition a table into regions. That way it is rather easy to balance the size of regions, as long as there is no single key that occurs very often. I am not sure if it is possible to overcome data skew caused by frequent keys. However, as I said, these are just assumptions. I will have a look at HBase's internals for verification.
>>>>>>>>>>
>>>>>>>>>> In any case, Flink currently does not support reassigning or splitting of InputSplits at runtime.
>>>>>>>>>> Also, initially generating balanced InputSplits will be tricky. That would be possible if we could efficiently determine the "density" of a key range when creating the InputSplits. However, I'm a bit skeptical that this can be done...
>>>>>>>>>>
>>>>>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> From what I know, HBase manages the regions, but the fact that they are evenly distributed depends on a well-designed key.. if that is not the case you could encounter very unbalanced regions (i.e. hot spotting).
>>>>>>>>>>>
>>>>>>>>>>> Could it be a good idea to create a split policy that compares the size of all the splits and generates equally-sized splits that can be reassigned to a free worker if the originally assigned one is still busy?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> ad 1) HBase manages the regions and should also take care of their uniform size.
>>>>>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the moment. However, the input split generation of the IF should also be able to handle such issues upfront. In fact, the IF could also generate multiple splits per region (this would be necessary to make sure that the minimum number of splits is generated if there are fewer regions than required splits).
>>>>>>>>>>>>
>>>>>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>>>>>> That was more or less how I thought it should be, but there are still points I'd like to clarify:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1 - What if one region is very big and the other regions are very small..? There will be one slot that takes a very long time while the others stay inactive..
>>>>>>>>>>>>> 2 - Do you think it is possible to implement this in an adaptive way (stop processing a huge region if it is worth it and assign the remaining data to inactive task managers)?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Local split assignment preferably assigns an input split to workers that can read its data locally.
>>>>>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed over the cluster and gives every worker access to these chunks via network transfer. However, if a chunk is read by a process that runs on the same node where the chunk is stored, the read operation directly accesses the local file system without going over the network. Hence, it is essential to assign input splits based on the locality of their data if you want to have reasonable performance. We call this local split assignment. This is a general concept in all data-parallel systems, including Hadoop, Spark, and Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This issue is not related to serializability of input formats.
>>>>>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of local split assignment.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What do you mean by "might lack support for local split assignment"? Do you mean that the InputFormat is not serializable?
>>>>>>>>>>>>>>> And is this not the case for MongoDB?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to use the wrapper.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The HBase format should work as well, but might lack support for local split assignment. In that case performance would suffer a lot.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Should I start from http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html ? Is it ok?
>>>>>>>>>>>>>>>>> So, in principle, the TableInputFormat of HBase could also be used in a similar way.. couldn't it?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats. This has been ported to the new API and is described in the documentation.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into the new IF wrapper. :-)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Fabian
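
(A minimal sketch, imports omitted, of what plugging Mongo's Hadoop input format into the new wrapper can look like. HadoopInputFormat here is meant to be Flink's Hadoop-compatibility wrapper for the mapred API, MongoInputFormat and MongoConfigUtil come from mongo-hadoop, and the URI, variable names, and exact key/value classes are assumptions that depend on the mongo-hadoop version; treat this as a sketch, not the exact code from the linked example.)

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// configure the Mongo Hadoop input format via a plain Hadoop JobConf
JobConf conf = new JobConf();
MongoConfigUtil.setInputURI(conf, "mongodb://localhost:27017/test.testData");

// wrap the mapred MongoInputFormat in Flink's HadoopInputFormat
HadoopInputFormat<BSONWritable, BSONWritable> hdIf =
    new HadoopInputFormat<BSONWritable, BSONWritable>(
        new MongoInputFormat(), BSONWritable.class, BSONWritable.class, conf);

DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hdIf);
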
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I saw this post https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html but it uses the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>>>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>>>>
