flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Flink Mongodb
Date Thu, 06 Nov 2014 16:10:30 GMT
Hi!
Can you:

  - either return a BSONWritable from the function
  - or type the output formats to String?

The MongoRecordWriter can work with non-BSON objects as well.
https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java
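
For example, a minimal sketch of the first option (untested; it reuses the classes
from your snippet and assumes BSONWritable's setDoc(...) setter, the counterpart of
the getDoc() you already call), keeping both key and value as BSONWritable so the
types line up with the output format:

DataSet<Tuple2<BSONWritable, BSONWritable>> fin = input
    .map(new MapFunction<Tuple2<BSONWritable, BSONWritable>, Tuple2<BSONWritable, BSONWritable>>() {

      @Override
      public Tuple2<BSONWritable, BSONWritable> map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
        BSONObject doc = record.f1.getDoc();
        BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");

        // wrap the extracted value in a new BSON document
        BasicDBObject out = new BasicDBObject("@type", jsonld.getString("@type"));
        BSONWritable result = new BSONWritable();
        result.setDoc(out);

        // keep the original key, emit the new document as the value
        return new Tuple2<BSONWritable, BSONWritable>(record.f0, result);
      }
    });

MongoConfigUtil.setOutputURI(hdIf.getJobConf(), "mongodb://localhost:27017/test.test");
fin.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
    new MongoOutputFormat<BSONWritable, BSONWritable>(), hdIf.getJobConf()));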


Stephan


On Thu, Nov 6, 2014 at 4:12 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> I'm trying to do that but I can't find the proper typing. For example:
>
> DataSet<String> fin = input.map(
>     new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {
>
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public String map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
>         BSONWritable value = record.getField(1);
>         BSONObject doc = value.getDoc();
>         BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
>         String type = jsonld.getString("@type");
>         return type;
>       }
>     });
>
> MongoConfigUtil.setOutputURI(hdIf.getJobConf(),
>     "mongodb://localhost:27017/test.test");
> fin.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
>     new MongoOutputFormat<BSONWritable, BSONWritable>(), hdIf.getJobConf()));
>
> Obviously this doesn't work because I'm emitting Strings while trying to
> write BSONWritable. Can you show me a simple working example?
>
> Best,
> Flavio
>
> On Thu, Nov 6, 2014 at 3:58 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi Flavio!
>>
>> I think the general method is the same as with the inputs.
>>
>> You use the "HadoopOutputFormat" wrapping the "MongoOutputFormat" (
>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
>> )
>>
>> You can then call
>>
>> DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;
>>
>> data.output(mongoOutput);
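>>
>> where "mongoOutput" could be built roughly like this (just a sketch; it
>> assumes a plain mapred JobConf with the output URI set via MongoConfigUtil,
>> as in your input setup):
>>
>> JobConf conf = new JobConf();
>> MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/test.test");
>>
>> HadoopOutputFormat<BSONWritable, BSONWritable> mongoOutput =
>>     new HadoopOutputFormat<BSONWritable, BSONWritable>(
>>         new MongoOutputFormat<BSONWritable, BSONWritable>(), conf);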
>>
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <pompermaier@okkam.it>
>> wrote:
>>
>>> Any help here..?
>>>
>>> On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>
>>>> Just shared the example at
>>>> https://github.com/okkam-it/flink-mongodb-test and tweeted :)
>>>>
>>>> The next step is to show how to write the result of a Flink process
>>>> back to Mongo.
>>>> How can I manage to do that? Can someone help me?
>>>>
>>>> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org>
>>>> wrote:
>>>>
>>>>> How about going for an optional parameter for the InputFormat to
>>>>> determine into how many splits each region is split?
>>>>> That would be a lightweight option to control the number of splits
>>>>> with low effort (on our side).
>>>>>
>>>>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> So how are we going to proceed here? Is someone willing to help me in
>>>>>> improving the splitting policy, or do we leave it as it is now?
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I agree. Going for more splits with smaller key regions is a
>>>>>>> good idea.
>>>>>>> However, it might be a bit difficult to determine a good number of
>>>>>>> splits, as the size of a split depends on its density. Too large
>>>>>>> splits are prone to cause data skew, too small ones will increase
>>>>>>> the overhead of split assignment.
>>>>>>>
>>>>>>> A solution for this problem could be to add an optional parameter to
>>>>>>> the IF to give an upper bound for the number of InputSplits.
>>>>>>>
>>>>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>
>>>>>>>> Typo: it should have meant that workers that get a larger split
>>>>>>>> will get fewer additional splits.
>>>>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>>>>>
>>>>>>>> InputSplits are assigned lazily at runtime, which gives you many of
>>>>>>>> the benefits of re-assigning without the nastiness.
>>>>>>>>
>>>>>>>> Can you write the logic that creates the splits such that it creates
>>>>>>>> multiple splits per region? Then the lazy assignment will make sure
>>>>>>>> that workers that get a larger split will get fewer additional splits
>>>>>>>> than workers that get smaller splits...
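>>>>>>>>
>>>>>>>> A rough illustration of the idea (not actual Flink or HBase code; it
>>>>>>>> assumes fixed-width row keys that can be read as unsigned big-endian
>>>>>>>> integers):
>>>>>>>>
>>>>>>>> import java.math.BigInteger;
>>>>>>>> import java.util.ArrayList;
>>>>>>>> import java.util.List;
>>>>>>>>
>>>>>>>> // Sketch only: split a [start, end) key range into k roughly equal
>>>>>>>> // sub-ranges; real split generation would use the actual region
>>>>>>>> // boundaries reported by HBase.
>>>>>>>> public class KeyRangeSplitter {
>>>>>>>>
>>>>>>>>   public static List<byte[][]> split(byte[] start, byte[] end, int k) {
>>>>>>>>     BigInteger lo = new BigInteger(1, start);
>>>>>>>>     BigInteger hi = new BigInteger(1, end);
>>>>>>>>     BigInteger step = hi.subtract(lo).divide(BigInteger.valueOf(k));
>>>>>>>>     List<byte[][]> ranges = new ArrayList<byte[][]>();
>>>>>>>>     BigInteger cur = lo;
>>>>>>>>     for (int i = 0; i < k; i++) {
>>>>>>>>       // last sub-range ends exactly at the region end key
>>>>>>>>       BigInteger next = (i == k - 1) ? hi : cur.add(step);
>>>>>>>>       ranges.add(new byte[][] { toKey(cur, start.length), toKey(next, start.length) });
>>>>>>>>       cur = next;
>>>>>>>>     }
>>>>>>>>     return ranges;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   // pad or truncate the value back to a fixed-width big-endian key
>>>>>>>>   private static byte[] toKey(BigInteger v, int width) {
>>>>>>>>     byte[] raw = v.toByteArray();
>>>>>>>>     byte[] key = new byte[width];
>>>>>>>>     int n = Math.min(raw.length, width);
>>>>>>>>     System.arraycopy(raw, raw.length - n, key, width - n, n);
>>>>>>>>     return key;
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Each resulting [sub-start, sub-end) pair would then back one
>>>>>>>> InputSplit, so a single dense region turns into several lazily
>>>>>>>> assigned splits.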
>>>>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hmm, that's a good question indeed. I am not familiar with HBase's
>>>>>>>>> mode of operation.
>>>>>>>>> I would assume that HBase uses range partitioning to partition a
>>>>>>>>> table into regions. That way it is rather easy to balance the size
>>>>>>>>> of regions, as long as there is no single key that occurs very
>>>>>>>>> often. I am not sure if it is possible to overcome data skew caused
>>>>>>>>> by frequent keys.
>>>>>>>>> However, as I said, these are just assumptions. I will have a look
>>>>>>>>> at HBase's internals for verification.
>>>>>>>>>
>>>>>>>>> In any case, Flink currently does not support reassigning
>>>>>>>>> or splitting of InputSplits at runtime.
>>>>>>>>> Also, initially generating balanced InputSplits will be tricky.
>>>>>>>>> That would be possible if we could efficiently determine the
>>>>>>>>> "density" of a key range when creating the InputSplits. However,
>>>>>>>>> I'm a bit skeptical that this can be done...
>>>>>>>>>
>>>>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>
>>>>>>>>>> From what I know, HBase manages the regions, but the fact that they
>>>>>>>>>> are evenly distributed depends on a well-designed key...
>>>>>>>>>> if that is not the case you could encounter very unbalanced regions
>>>>>>>>>> (i.e. hot spotting).
>>>>>>>>>>
>>>>>>>>>> Could it be a good idea to create a split policy that compares the
>>>>>>>>>> size of all the splits and generates equally-sized splits that can
>>>>>>>>>> be reassigned to a free worker if the originally assigned one is
>>>>>>>>>> still busy?
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> ad 1) HBase manages the regions and should also take care
>>>>>>>>>>> of their uniform size.
>>>>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>>>>>> moment. However, the input split generation of the IF should also
>>>>>>>>>>> be able to handle such issues upfront. In fact, the IF could also
>>>>>>>>>>> generate multiple splits per region (this would be necessary to
>>>>>>>>>>> make sure that the minimum number of splits is generated if there
>>>>>>>>>>> are fewer regions than required splits).
>>>>>>>>>>>
>>>>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>
>>>>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>>>>> That was more or less like I thought it should be, but there are
>>>>>>>>>>>> still points I'd like to clarify:
>>>>>>>>>>>>
>>>>>>>>>>>> 1 - What if a region is very big and there are other regions that
>>>>>>>>>>>> are very small? There will be one slot that takes a very long time
>>>>>>>>>>>> while the others stay inactive.
>>>>>>>>>>>> 2 - Do you think it is possible to implement this in an adaptive
>>>>>>>>>>>> way (stop processing a huge region if it is worth it and assign
>>>>>>>>>>>> the remaining data to inactive task managers)?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Local split assignment preferably assigns input splits to
>>>>>>>>>>>>> workers that can locally read the data of an input split.
>>>>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed over
>>>>>>>>>>>>> the cluster and gives access to these chunks to every worker via
>>>>>>>>>>>>> network transfer. However, if a chunk is read by a process that
>>>>>>>>>>>>> runs on the same node as the chunk is stored, the read operation
>>>>>>>>>>>>> directly accesses the local file system without going over the
>>>>>>>>>>>>> network. Hence, it is essential to assign input splits based on
>>>>>>>>>>>>> the locality of their data if you want to have reasonable
>>>>>>>>>>>>> performance. We call this local split assignment. This is a
>>>>>>>>>>>>> general concept in all data-parallel systems, including Hadoop,
>>>>>>>>>>>>> Spark, and Flink.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This issue is not related to the serializability of input formats.
>>>>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of local
>>>>>>>>>>>>> split assignment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>>>>>>> assignment"? Do you mean that the InputFormat is not serializable?
>>>>>>>>>>>>>> And that this is not the case for MongoDB?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to
>>>>>>>>>>>>>>> use the wrapper.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The HBase format should work as well, but might lack support
>>>>>>>>>>>>>>> for local split assignment. In that case performance would
>>>>>>>>>>>>>>> suffer a lot.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Should I start from
>>>>>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html ?
>>>>>>>>>>>>>>>> Is that ok?
>>>>>>>>>>>>>>>> Thus, in principle, the TableInputFormat of HBase could also
>>>>>>>>>>>>>>>> be used in a similar way, couldn't it?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>>>>>>>> This has been ported to the new API and is described in
>>>>>>>>>>>>>>>>> the documentation.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into
>>>>>>>>>>>>>>>>> the new IF wrapper. :-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>>>>>>>> DataSource).
>>>>>>>>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
