incubator-s4-user mailing list archives

From 杨定裕 <yangdin...@gmail.com>
Subject Re: How to use S4 to implement Join operator
Date Thu, 04 Oct 2012 08:13:49 GMT
Thank you for your help.
I should run several apps and look into the problem.


2012/10/4 杨定裕 <yangdingyu@gmail.com>

> Hi,
> I debugged the program and found that List, Map, and other collection
> fields in the PE are shared across PE instances.
> You can test it.
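Here is a plain-Java sketch of the effect described above. It assumes that, as in S4 0.5, the framework creates each keyed PE instance by cloning a prototype PE. `clone()` makes a shallow copy, so a `List` created in a field initializer is one object shared by every instance, while a primitive such as `count` is copied into each instance. That matches the log further down the thread, where `T_lineitem.size()` keeps growing while `count` restarts for each PEID. `JoinPeSketch` and `cloneForKey` are hypothetical names. The `onCreate()` hook imitates the per-instance callback that S4 0.5's `ProcessingElement` provides, which is the usual place to allocate per-instance collections.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a keyed PE; new instances are made by cloning a prototype
// (assumed here to mirror how S4 creates one PE instance per key).
class JoinPeSketch implements Cloneable {
    // Created once, in the prototype: clone() copies the reference, so all clones share it.
    List<String> sharedBuffer = new ArrayList<>();
    // Allocated in onCreate() for every clone, so each instance gets its own list.
    // (Left null in the prototype, which never receives events.)
    List<String> ownBuffer;
    // Primitive field: clone() copies the value, so each instance counts independently.
    long count = 0;

    // Hypothetical per-instance initialization hook.
    void onCreate() {
        ownBuffer = new ArrayList<>();
    }

    void onEvent(String event) {
        count++;
        sharedBuffer.add(event);
        ownBuffer.add(event);
    }

    JoinPeSketch cloneForKey() {
        try {
            JoinPeSketch copy = (JoinPeSketch) super.clone(); // shallow copy
            copy.onCreate();
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: this class implements Cloneable
        }
    }

    public static void main(String[] args) {
        JoinPeSketch prototype = new JoinPeSketch();
        JoinPeSketch peA = prototype.cloneForKey();
        JoinPeSketch peB = prototype.cloneForKey();
        peA.onEvent("1|a");
        peA.onEvent("1|b");
        peB.onEvent("2|c");
        // Shared list size, own list size, count for peB: prints "3 1 1"
        System.out.println(peB.sharedBuffer.size() + " " + peB.ownBuffer.size() + " " + peB.count);
    }
}
```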
>
>
> 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
>
>> I get all that; my point is that you need to get the data from the adaptor
>> and create keyed data out of it, roughly as follows:
>>
>> This is the partitioner app code:
>>
>>     protected void onInit() {
>>         try {
>>             // Both output streams are keyed by the event's RIC.
>>             KeyFinder<Event> quoteKeyFinder = new KeyFinder<Event>() {
>>                 @Override
>>                 public List<String> get(final Event event) {
>>                     List<String> keys = new ArrayList<String>();
>>                     keys.add(((MktData) event).getRic());
>>                     return keys;
>>                 }
>>             };
>>
>>             KeyFinder<Event> tradeKeyFinder = new KeyFinder<Event>() {
>>                 @Override
>>                 public List<String> get(final Event event) {
>>                     List<String> keys = new ArrayList<String>();
>>                     keys.add(((MktData) event).getRic());
>>                     return keys;
>>                 }
>>             };
>>
>>             // Keyed output streams that carry the partitioned data downstream.
>>             RemoteStream quoteStream = createOutputStream("quotedata", quoteKeyFinder);
>>             RemoteStream tradeStream = createOutputStream("tradedata", tradeKeyFinder);
>>
>>             // A single PartitionPE instance reads the unkeyed "mktdata" input stream.
>>             PartitionPE partitionPE = createPE(PartitionPE.class);
>>             partitionPE.setDownstream(quoteStream);
>>             partitionPE.setDownstream(tradeStream);
>>             partitionPE.setSingleton(true);
>>             createInputStream("mktdata", partitionPE);
>>         } catch (Exception e) {
>>             // Keep the original cause instead of discarding it.
>>             throw new RuntimeException(e);
>>         }
>>     }
>>
>> Now the downstream PE will receive data keyed by the key defined
>> in the Partitioner app. Pictorially, your topology will look as follows:
>>
>> Adaptor -> Partitioner -> JoinPE -> PrintPE
>> - Shailendra
>>
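The keyed routing Shailendra describes can be sketched in plain Java. Each event's key is hashed to one of N partitions, so all events with the same key land in the same partition and no partition sees the whole stream. The hash-mod-N rule, the `KeyedRouterSketch` class, and the first-field key finder are illustrative assumptions, not S4's actual partitioning code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch of key-based partitioning (hypothetical; not S4's hashing scheme).
class KeyedRouterSketch {
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Sends every event to exactly one partition, chosen by the event's key.
    static List<List<String>> route(List<String> events, Function<String, String> keyFinder,
                                    int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String event : events) {
            partitions.get(partitionFor(keyFinder.apply(event), numPartitions)).add(event);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("1|a", "2|b", "1|c", "3|d");
        // Hypothetical key finder: the first '|'-separated field.
        Function<String, String> firstField = e -> e.split("\\|")[0];
        System.out.println(route(events, firstField, 2)); // prints "[[2|b], [1|a, 1|c, 3|d]]"
    }
}
```

Because the partition depends only on the key, adding more events for key `1` never spreads them across partitions.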
>> On Wed, Oct 3, 2012 at 9:58 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
>> > Yes, the partitions are separated by the key value.
>> > If there were no partitioning, the count value would go up to 20.
>> > So ...
>> >
>> > This is the app code:
>> >             OLAJoinPE joinPE = createPE(OLAJoinPE.class);
>> >             //filterPE.setDownStream(avgSeenStream);
>> >             joinPE.setTimerInterval(10000, TimeUnit.MILLISECONDS);
>> >             Stream<Event> JoinStream = createStream("JoinStream", new KeyFinder<Event>() {
>> >
>> >                 @Override
>> >                 public List<String> get(final Event event) {
>> >                     String value = "";
>> >                     if (event.containsKey("lineitem")) {
>> >                         value = event.get("lineitem");
>> >                     } else if (event.containsKey("order")) {
>> >                         value = event.get("order");
>> >                     }
>> >                     //System.out.println("OLA APP: value:"+value);
>> >                     return ImmutableList.of(value); // partition key: a constant key means one PE
>> >                 }
>> >             }, joinPE);
>> >
>> >
>> >             OLALineitemPE lineitemPE = createPE(OLALineitemPE.class);
>> >             lineitemPE.setDownStream(JoinStream);
>> >             lineitemPE.setSingleton(true); // what does this mean?
>> >             createInputStream("lineitem", lineitemPE);
>> >
>> >             OLAOrderPE ordermPE = createPE(OLAOrderPE.class);
>> >             ordermPE.setDownStream(JoinStream);
>> >             ordermPE.setSingleton(true); // what does this mean?
>> >             ordermPE.setTimerInterval(1000, TimeUnit.MILLISECONDS);
>> >             createInputStream("order", ordermPE);
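On the `setSingleton(true)` question: assuming the usual S4 0.5 semantics, a singleton PE has one instance per partition that receives every event regardless of key. A keyed PE instead gets one lazily created instance per distinct key, which is why the log shows several PEIDs. The sketch below models only that dispatch decision. `PeDispatchSketch` and its methods are hypothetical, not the S4 container. Note also that the KeyFinder above returns the whole `lineitem` or `order` value, so every distinct row is its own key; for a join, keying on the join attribute would presumably be closer to the intent.

```java
import java.util.HashMap;
import java.util.Map;

// Models how a container might pick the PE instance for an event (hypothetical, not S4 code).
class PeDispatchSketch {
    static final class Instance {
        long count; // per-instance event counter, like `count` in the thread's PE
    }

    private final boolean singleton;
    private final Map<String, Instance> byKey = new HashMap<>();
    private Instance singletonInstance;

    PeDispatchSketch(boolean singleton) {
        this.singleton = singleton;
    }

    Instance instanceFor(String key) {
        if (singleton) {
            // Singleton: the key is ignored and one instance handles every event.
            if (singletonInstance == null) {
                singletonInstance = new Instance();
            }
            return singletonInstance;
        }
        // Keyed: one instance per distinct key, created on first use.
        return byKey.computeIfAbsent(key, k -> new Instance());
    }

    int instanceCount() {
        if (singleton) {
            return singletonInstance == null ? 0 : 1;
        }
        return byKey.size();
    }

    void onEvent(String key) {
        instanceFor(key).count++;
    }

    public static void main(String[] args) {
        PeDispatchSketch keyed = new PeDispatchSketch(false);
        PeDispatchSketch single = new PeDispatchSketch(true);
        for (String key : new String[] {"1", "2", "1"}) {
            keyed.onEvent(key);
            single.onEvent(key);
        }
        System.out.println(keyed.instanceCount() + " " + single.instanceCount()); // prints "2 1"
    }
}
```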
>> >
>> >
>> >
>> >
>> >
>> >
>> > 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
>> >>
>> >> OK, I get it. The default behavior is that both PEs in each partition
>> >> get all the data. The reason is as follows: the data coming in from the
>> >> adaptor has no notion of a key, so it is sent to all the PEs. If you want
>> >> to partition the data, you can front this with a Partitioner PE that gets
>> >> the data from the adaptor and then sends it downstream. Since that data
>> >> has a key, each partition won't receive all the data.
>> >> - Shailendra
>> >>
>> >> On Wed, Oct 3, 2012 at 9:42 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
>> >> > Hi, I am running on the testCluster with two adapters.
>> >> > The problem is that the list stores all the input data, whereas each PE
>> >> > should store only part of the data. The list in each PE should be
>> >> > independent rather than holding all the data.
>> >> > How can I implement this?
>> >> >
>> >> > This is the result:
>> >> >
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 1||| PEID:1||| count:1
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 2||| PEID:1||| count:2
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 3||| PEID:1||| count:3
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 4||| PEID:1||| count:4
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 5||| PEID:1||| count:5
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 6||| PEID:1||| count:6
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 7||| PEID:2||| count:1
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 8||| PEID:3||| count:1
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 9||| PEID:3||| count:2
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 10||| PEID:3||| count:3
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 11||| PEID:3||| count:4
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 12||| PEID:3||| count:5
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 13||| PEID:3||| count:6
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 14||| PEID:4||| count:1
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 15||| PEID:5||| count:1
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 16||| PEID:5||| count:2
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 17||| PEID:5||| count:3
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 18||| PEID:6||| count:1
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
>> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
>> >> >>
>> >> >> OK, this looks correct, so what is the question? Do you have this
>> >> >> app running in more than one partition? - Shailendra
>> >> >>
>> >> >> On Wed, Oct 3, 2012 at 9:18 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
>> >> >> > Hi, Shailendra,
>> >> >> > Yes, I have a stream with two event types: lineitem and order. Events
>> >> >> > with the same key will be sent to the same PE.
>> >> >> >
>> >> >> > This is my code:
>> >> >> > ---------------------------
>> >> >> >
>> >> >> >     private List<Event> T_lineitem = new ArrayList<Event>();
>> >> >> >     private List<Event> T_order = new ArrayList<Event>();
>> >> >> >
>> >> >> >     private long count = 0;
>> >> >> >
>> >> >> >     public void onEvent(Event event) {
>> >> >> >         // In this example we use the default generic Event type, but you
>> >> >> >         // can also define your own type.
>> >> >> >         String value = "";
>> >> >> >         count = count + 1;
>> >> >> >         if (event.containsKey("lineitem")) {
>> >> >> >             // Join key: first '|'-separated field ("\\|" because split() takes a regex).
>> >> >> >             value = event.get("lineitem").split("\\|")[0];
>> >> >> >             T_lineitem.add(event);
>> >> >> >             System.out.println("JoinPE------:Table:!||| T_lineitem.size(): "
>> >> >> >                     + T_lineitem.size() + "||| PEID:" + getId() + "||| count:" + count);
>> >> >> >         }
>> >> >> >         if (event.containsKey("order")) {
>> >> >> >             value = event.get("order").split("\\|")[0];
>> >> >> >             T_order.add(event);
>> >> >> >             System.out.println("JoinPE------:Table:!||| T_order.size(): "
>> >> >> >                     + T_order.size() + "||| PEID:" + getId() + "||| count:" + count);
>> >> >> >         }
>> >> >> >     }
>> >> >> > -----------------------------
>> >> >> >
>> >> >> > The output looks like this:
>> >> >> > JoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
>> >> >> > JoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
>> >> >> >
>> >> >> > That is the problem: count should never be smaller than the size of
>> >> >> > the list.
>> >> >> >
>> >> >> > Dingyu
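For the join itself, one standard technique is a symmetric hash join. Each arriving row is stored in a table for its own side and then immediately probed against the other side's table on the join key, so matches are emitted no matter which side arrives first. The plain-Java sketch below assumes that the join key is the first `|`-separated field, as in Dingyu's code. It splits on the regex `\\|` because a bare `"|"` passed to `String.split` would split between every character. `SymmetricHashJoinSketch` and its methods are hypothetical. In a keyed S4 PE each instance would see only one key, so the maps would reduce to two lists, but those lists would still need to be allocated per instance rather than in field initializers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a symmetric hash join between lineitem and order rows (hypothetical, not S4 code).
class SymmetricHashJoinSketch {
    // Instance fields created per object, so two join instances never share state.
    private final Map<String, List<String>> lineitems = new HashMap<>();
    private final Map<String, List<String>> orders = new HashMap<>();

    // Join key = first '|'-separated field; "\\|" matches a literal pipe.
    static String joinKey(String row) {
        return row.split("\\|", 2)[0];
    }

    // Store the row on its own side, then probe the other side for rows with the same key.
    private static List<String> insertAndProbe(String row, Map<String, List<String>> own,
                                               Map<String, List<String>> other,
                                               boolean rowIsLineitem) {
        String key = joinKey(row);
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        List<String> results = new ArrayList<>();
        for (String match : other.getOrDefault(key, Collections.emptyList())) {
            // Always print the lineitem row first, then the order row.
            results.add(rowIsLineitem ? row + " JOIN " + match : match + " JOIN " + row);
        }
        return results;
    }

    List<String> onLineitem(String row) {
        return insertAndProbe(row, lineitems, orders, true);
    }

    List<String> onOrder(String row) {
        return insertAndProbe(row, orders, lineitems, false);
    }

    public static void main(String[] args) {
        SymmetricHashJoinSketch join = new SymmetricHashJoinSketch();
        System.out.println(join.onLineitem("1|li-a")); // prints "[]"
        System.out.println(join.onOrder("1|ord-x"));   // prints "[1|li-a JOIN 1|ord-x]"
        System.out.println(join.onLineitem("1|li-b")); // prints "[1|li-b JOIN 1|ord-x]"
        System.out.println(join.onOrder("2|ord-y"));   // prints "[]"
    }
}
```

Unbounded buffers grow forever on an infinite stream, so a real operator would also need some eviction rule (for example a time window). That choice is left open here.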
>> >> >> >
>> >> >
>> >> >
>> >
>> >
>>
>
>
