flink-user mailing list archives

From Julien <jmassio...@gmail.com>
Subject Re: A "per operator instance" window all ?
Date Mon, 19 Feb 2018 08:34:04 GMT
Hello,

I've already tried to key my stream with
"resourceId.hashCode%parallelism" (with a parallelism of 4 in my example).
So all my keys will be either 0, 1, 2 or 3. I can then benefit from a
time window on this keyed stream and do only 4 queries to my external
system.
But the load is not well distributed by the default partitioner of a keyed
stream: keys 0, 1, 2 and 3 only go to operator indexes 2 and 3.
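To see where such small integer keys end up, the routing can be reproduced outside Flink. The sketch below is a standalone re-implementation based on our reading of KeyGroupRangeAssignment and MathUtils.murmurHash in Flink 1.4 (please verify against your version). It assumes the default max parallelism of 128 that Flink derives for a parallelism of 4; findKeyForOperator is a hypothetical helper, not a Flink API.

```java
// Standalone re-implementation (no Flink dependency) of how a keyed stream routes
// a key to a parallel instance. Assumption: mirrors Flink 1.4's
// KeyGroupRangeAssignment and MathUtils.murmurHash; check against your version.
final class KeyRouting {

    // Murmur3-style scrambling of key.hashCode(); always returns a non-negative int.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // key -> key group in [0, maxParallelism)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // key group -> operator index; each instance owns a contiguous range of key groups.
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism,
                assignToKeyGroup(key, maxParallelism));
    }

    // Hypothetical helper: smallest non-negative Integer key routed to operatorIndex.
    static int findKeyForOperator(int operatorIndex, int maxParallelism, int parallelism) {
        for (int k = 0; k < Integer.MAX_VALUE; k++) {
            if (assignKeyToParallelOperator(k, maxParallelism, parallelism) == operatorIndex) {
                return k;
            }
        }
        throw new IllegalStateException("no key found for operator " + operatorIndex);
    }
}
```

Calling assignKeyToParallelOperator(k, 128, 4) for k = 0..3 shows how the four bucket keys are routed. One possible workaround is to use the keys returned by findKeyForOperator(i, 128, 4), which by construction land on instance i; that mapping only holds for that exact parallelism and max parallelism.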

I think I should explore the custom partitioner, as you suggested, Xingcan.
Maybe my last question on this is: can you give me more details on
this point: "and simulate a window operation by yourself in a
ProcessFunction"?

When I look at the documentation about the custom partitioner, I can see
that the result of partitionCustom is a DataStream, not a KeyedStream.
So the only window available to me would be windowAll (which would bring
me back to a parallelism of 1, right?).

And if I do something like "myStream.partitionCustom(<my new
partitioner>, <my key>).keyBy(<myKey>).window(...)", will it preserve my
custom partitioner?
When looking at the "KeyedStream" class, it seems that it will go back
to the "KeyGroupStreamPartitioner" and forget my custom partitioner?
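For reference, the function given to partitionCustom only maps a key to an instance index, and as far as we can tell that index is used directly, without the key-group step of keyBy. A minimal sketch, assuming String resource IDs; KeyPartitioner is a local stand-in with the same shape as Flink's Partitioner<K> (int partition(K key, int numPartitions)) so that the example compiles without Flink, and ResourcePartitioner is a hypothetical name.

```java
// Local stand-in with the same shape as Flink's Partitioner<K>,
// so this example compiles without a Flink dependency.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical partitioner spreading resource IDs over all parallel instances.
final class ResourcePartitioner implements KeyPartitioner<String> {
    @Override
    public int partition(String resourceId, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for a negative
        // hashCode(), which a plain '%' would not.
        return Math.floorMod(resourceId.hashCode(), numPartitions);
    }
}
```

Since the index depends only on the resource ID, all alerts for a given resource keep reaching the same instance.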

Thanks again for your feedback,

Julien.


On 19/02/2018 03:45, 周思华 wrote:
> Hi Julien,
>     If I am not misunderstanding, I think you can key your stream on
> `Random.nextInt() % parallelism`; this way you can "group" together
> alerts from different resources and benefit from parallelism.
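One caveat on this expression: Random.nextInt() can return a negative number, so Random.nextInt() % parallelism ranges from -(parallelism - 1) to parallelism - 1. A bounded draw avoids that; a minimal sketch, where randomBucket is a hypothetical helper. The resulting key still goes through the same key-group assignment as any other keyBy key.

```java
import java.util.concurrent.ThreadLocalRandom;

final class RandomBucket {
    // Hypothetical helper: uniform in [0, parallelism), never negative,
    // unlike Random.nextInt() % parallelism.
    static int randomBucket(int parallelism) {
        return ThreadLocalRandom.current().nextInt(parallelism);
    }
}
```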
>
>
> Sent from NetEase Mail Master
>
> On 02/19/2018 09:08, Xingcan Cui <xingcanc@gmail.com> wrote:
>
>     Hi Julien,
>
>     Sorry for my misunderstanding before. For now, a window can only
>     be defined on a KeyedStream, or on an ordinary DataStream with
>     parallelism = 1. I’d like to provide three options for your scenario.
>
>     1. If your external data is static and can fit into memory,
>     you can use managed state
>     <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state> to
>     cache it without having to worry about the querying problem.
>     2. Or you can use a CustomPartitioner
>     <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning> to
>     manually distribute your alert data and simulate a window
>     operation yourself in a ProcessFunction.
>     3. You may also choose to use an external system, such as an
>     in-memory store, which can work as a cache for your queries.
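Options 1 and 3 both amount to a read-through cache in front of the external system. A minimal plain-Java sketch, assuming the resource data is static: ResourceInfoCache and the batchLookup function are hypothetical, and the HashMap stands in for either managed state or an external in-memory store. Cache misses are fetched with at most one query per call.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical read-through cache for static resource data.
final class ResourceInfoCache {
    private final Map<String, String> cache = new HashMap<>();
    // Hypothetical batched query against the external system: ids -> info.
    private final Function<Set<String>, Map<String, String>> batchLookup;

    ResourceInfoCache(Function<Set<String>, Map<String, String>> batchLookup) {
        this.batchLookup = batchLookup;
    }

    // Returns info for the requested ids, querying only the ones not cached yet.
    Map<String, String> getAll(Set<String> ids) {
        Set<String> missing = new LinkedHashSet<>();
        for (String id : ids) {
            if (!cache.containsKey(id)) {
                missing.add(id);
            }
        }
        if (!missing.isEmpty()) {
            cache.putAll(batchLookup.apply(missing)); // one external query for all misses
        }
        Map<String, String> result = new HashMap<>();
        for (String id : ids) {
            String info = cache.get(id);
            if (info != null) {
                result.put(id, info);
            }
        }
        return result;
    }
}
```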
>
>     Best,
>     Xingcan
>
>>     On 19 Feb 2018, at 5:55 AM, Julien <jmassiot77@gmail.com> wrote:
>>
>>     Hi Xingcan,
>>
>>     Thanks for your answer.
>>     Yes, I understand that point:
>>
>>       * if I have 100 resource IDs with parallelism of 4, then each
>>         operator instance will handle about 25 keys
>>
>>
>>     The issue I have is that I want, on a given operator instance, to
>>     group those 25 keys together in order to do only 1 query to an
>>     external system per operator instance:
>>
>>       * on a given operator instance, I will do 1 query for my 25 keys
>>       * so with the 4 operator instances, I will do 4 queries in
>>         parallel (with about 25 keys per query)
>>
>>
>>     I do not know how I can do that.
>>
>>     If I define a window on my keyed stream (for example
>>     stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))),
>>     then my understanding is that the window is "associated" with the
>>     key. So in this case, on a given operator instance, I will have
>>     25 of those windows (one per key), and I will do 25 queries
>>     (instead of 1).
>>
>>     Do you understand my point?
>>     Or am I missing something?
>>
>>     I'd like to find a way, on operator instance 1, to group all the
>>     alerts received for those 25 resource ids and do 1 query for those
>>     25 resource ids.
>>     Same thing for operator instances 2, 3 and 4.
>>
>>
>>     Thank you,
>>     Regards.
>>
>>
>>     On 18/02/2018 14:43, Xingcan Cui wrote:
>>>     Hi Julien,
>>>
>>>     the cardinality of your keys (e.g., resource ID) is not
>>>     restricted by the parallelism. For instance, if you have 100
>>>     resource IDs processed by a KeyedStream with parallelism 4, each
>>>     operator instance will handle about 25 keys.
>>>
>>>     Hope that helps.
>>>
>>>     Best,
>>>     Xingcan
>>>
>>>>     On 18 Feb 2018, at 8:49 PM, Julien <jmassiot77@gmail.com> wrote:
>>>>
>>>>     Hi,
>>>>
>>>>     I am pretty new to Flink and I don't know what the best
>>>>     way would be to deal with the following use case:
>>>>
>>>>       * as an input, I receive some alerts from a Kafka topic
>>>>           o an alert is linked to a network resource (like
>>>>             router-1, router-2, switch-1, switch-2, ...)
>>>>           o so an alert carries two main pieces of information (the
>>>>             alert id and the id of the resource on which this alert
>>>>             has been raised)
>>>>       * then I need to do a query to an external system in order to
>>>>         enrich the alert with additional information on the resource
>>>>
>>>>
>>>>     (A "natural" candidate for the key on this stream would be the
>>>>     resource id.)
>>>>
>>>>     The issue I have concerns the query to the external
>>>>     system:
>>>>
>>>>       * I do not want to do 1 query per resource id
>>>>       * I want to do a small number of queries in parallel (for
>>>>         example 4 queries in parallel every 500ms), each query
>>>>         asking the external system about several alerts linked to
>>>>         several resource ids
>>>>
>>>>     Currently, I don't know what the best way to deal with
>>>>     that would be:
>>>>
>>>>       * I can key my stream on the resource id, then define a
>>>>         processing-time window of 500ms, and do my query when the
>>>>         trigger fires
>>>>           o by doing so, I will "group" several alerts in a single
>>>>             query, but they will all be linked to the same resource.
>>>>           o so I will do 1 query per resource id (which will be too
>>>>             much in my use case)
>>>>       * I can also do a windowAll on a non keyed stream
>>>>           o by doing so, I will "group" together alerts from
>>>>             different resource ids, but from what I've read in such
>>>>             a case the parallelism will always be one.
>>>>           o so in this case, I will only do 1 query whereas I'd
>>>>             like to have some parallelism
>>>>
>>>>     I am thinking that a way to deal with that would be:
>>>>
>>>>       * define the resource id as the key of the stream and set a
>>>>         parallelism of 4
>>>>       * and then have a way to do a windowAll on this keyed stream
>>>>           o that is, on a given operator instance, I will
>>>>             "group" in the same window all the keys (i.e. all the
>>>>             resource ids) managed by this operator instance
>>>>           o with a parallelism of 4, I will do 4 queries in
>>>>             parallel (1 per operator instance, and each query will
>>>>             be for several alerts linked to several resource ids)
>>>>
>>>>     But after looking at the documentation, I cannot see this
>>>>     ability (having a windowAll on a keyed stream).
>>>>
>>>>     Am I missing something?
>>>>
>>>>     What would be the best way to deal with such a use case?
>>>>
>>>>
>>>>     I've tried, for example, to revise my key to something
>>>>     like "resourceId.hashCode%<max nb of queries in parallel>" and
>>>>     then to use a time window.
>>>>
>>>>     In my example above, the <max nb of queries in parallel> will
>>>>     be 4. And all my keys will be 0, 1, 2 or 3.
>>>>
>>>>     The issue with this approach is that, due to the way the
>>>>     operatorIdx is computed from the key, my processing is not
>>>>     well distributed:
>>>>
>>>>       * when this partitioning logic from the
>>>>         "KeyGroupRangeAssignment" class is applied:
>>>>
>>>>             /**
>>>>              * Assigns the given key to a parallel operator index.
>>>>              *
>>>>              * @param key the key to assign
>>>>              * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>>              * @param parallelism the current parallelism of the operator
>>>>              * @return the index of the parallel operator to which the given key should be routed.
>>>>              */
>>>>             public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>>>>                 return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
>>>>             }
>>>>
>>>>             /**
>>>>              * Assigns the given key to a key-group index.
>>>>              *
>>>>              * @param key the key to assign
>>>>              * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>>              * @return the key-group to which the given key is assigned
>>>>              */
>>>>             public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>>                 return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>>             }
>>>>
>>>>           o keys 0, 1, 2 and 3 are only assigned to operators 2 and 3
>>>>             (so 2 of my 4 operators have nothing to do)
>>>>
>>>>
>>>>     So, what would be the best way to deal with that?
>>>>
>>>>
>>>>     Thank you in advance for your support.
>>>>
>>>>     Regards.
>>>>
>>>>
>>>>     Julien.
>>>>
>>>>
>>>
>>
>

