flink-user mailing list archives

From 周思华 <summerle...@163.com>
Subject Re: keyBy and parallelism
Date Thu, 12 Apr 2018 08:04:29 GMT
Hi Christophe,
I think what you want to do is a "stream join", and I'm a bit confused: if you already know
there are only 8 keys, why would you still want a parallelism of 16? 8 of the parallel
instances will be idle (a waste of CPU). In a KeyedStream, tuples with the same key are
always sent to the same parallel instance.
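
As a rough illustration of why extra parallel instances stay idle, here is a minimal plain-Java sketch with no Flink dependency. It assumes a simplified routing in the spirit of Flink's key groups: each key is hashed to a key group in [0, maxParallelism), and the key group is mapped onto a subtask index. The class name KeyRoutingSketch, the key names, the max parallelism of 128, and the direct use of hashCode() (Flink applies an additional murmur hash) are all assumptions for illustration, so the subtask numbers will not match a real job.

```java
import java.util.Set;
import java.util.TreeSet;

class KeyRoutingSketch {
    // Simplified stand-in for keyed routing: key -> key group -> subtask index.
    // Assumption: plain hashCode() replaces the murmur hash that Flink applies.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // Returns the distinct subtasks that receive at least one of the given keys.
    static Set<Integer> busySubtasks(String[] keys, int maxParallelism, int parallelism) {
        Set<Integer> busy = new TreeSet<>();
        for (String key : keys) {
            busy.add(subtaskFor(key, maxParallelism, parallelism));
        }
        return busy;
    }

    public static void main(String[] args) {
        String[] keys = {"k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8"};
        Set<Integer> busy = busySubtasks(keys, 128, 16);
        // A key always maps to one subtask, so 8 keys can occupy at most 8 subtasks.
        System.out.println("busy subtasks: " + busy.size() + " of 16");
    }
}
```

Because several keys can hash to the same subtask, the number of busy instances can be even lower than the number of keys.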


I'm also a bit confused by the pseudo code: it looks like you assume that the tuple with a
given key in stream A will always arrive before the tuples with that key in stream B. I don't
think that can be guaranteed. You may need to store the tuples from stream B in case they
arrive before the one from A, and run the "analysis logic" in both flatMap1() and flatMap2().
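
A minimal plain-Java sketch of that buffering idea, with no Flink dependency: the two maps stand in for per-key state (in a real job these would presumably be keyed state such as a ValueState for the rule and a ListState for pending events). The class name RuleJoinSketch, the String types, and the placeholder apply() logic are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RuleJoinSketch {
    // Per-key state, standing in for Flink keyed state (hypothetical simplification).
    private final Map<String, String> ruleByKey = new HashMap<>();
    private final Map<String, List<String>> pendingByKey = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Placeholder "analysis logic": tag the event with the rule that was applied.
    private String apply(String rule, String event) {
        return event + " via " + rule;
    }

    // Stream A: store the rule, then flush events of this key that arrived earlier.
    void flatMap1(String key, String rule) {
        ruleByKey.put(key, rule);
        List<String> pending = pendingByKey.remove(key);
        if (pending != null) {
            for (String event : pending) {
                output.add(apply(rule, event));
            }
        }
    }

    // Stream B: process immediately if the rule is known, otherwise buffer the event.
    void flatMap2(String key, String event) {
        String rule = ruleByKey.get(key);
        if (rule != null) {
            output.add(apply(rule, event));
        } else {
            pendingByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        RuleJoinSketch join = new RuleJoinSketch();
        join.flatMap2("key1", "e1");     // event arrives before its rule: buffered
        join.flatMap1("key1", "ruleX");  // rule arrives: the buffered event is emitted
        join.flatMap2("key1", "e2");     // rule already known: emitted immediately
        System.out.println(join.output());
    }
}
```

Both callbacks end up calling the same logic, so the result does not depend on which stream delivers its element first.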


Regards,
Sihua Zhou


On 04/12/2018 15:44, Christophe Jolif <cjolif@gmail.com> wrote:
Thanks Chesnay (and others).


That's what I had figured out. Now let's move on to the follow-up with my exact use case.


I have two streams, A and B. A basically receives "rules" that the processing of B must
follow.


There is a "key" that allows me to know that a rule x coming in A is for events with the same
key coming in B.


I was planning to do (pseudo code):


A.connect(B).keyBy("thekey", "thekey").flatMap(   // one key per connected input
   flatMap1()
      -> store the rule in a ValueState
   flatMap2()
      -> use the state to get the rule, transform the element according to the rule, collect it
)




I think it should work, right, because the ValueState will be "per key" and contain the rule
for this key and so on?


Now, what I really care about is not having all the elements of key1 in the same parallel
instance. I just want to make sure key1 and key2 are isolated, so that I can use the keyed
state to store the corresponding rule, and key2 rules are never used for key1 or vice versa.


So ideally, instead of using a parallelism of 8, and in order to use the full power of my
system, I would like to use a parallelism of 16 even with 8 keys, since I don't care about all
elements of key1 being in the same parallel instance. All I care about is that the state
contains the rule corresponding to this key.


What would be the recommended approach here?


Thanks again for your help,
--
Christophe




On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler <chesnay@apache.org> wrote:

You will get 16 parallel executions since you specify a parallelism of 16; however, at least
8 of these will not get any data.


On 11.04.2018 23:29, Hao Sun wrote:

From what I have learnt, you have to control parallelism yourself. You can set the parallelism
on an operator or set the default one through flink-conf.yaml.
I might be wrong.


On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif <cjolif@gmail.com> wrote:

Hi all,


Imagine I have a default parallelism of 16 and I do something like


stream.keyBy("something").flatMap()


Now let's imagine I have fewer than 16 keys, maybe 8.


How many parallel executions of the flatMap function will I get? 8 because I have 8 keys,
or 16 because I have default parallelism at 16?


(and I suspect I will have follow-up questions depending on the answer ;))


Thanks,
--

Christophe

