incubator-s4-user mailing list archives

From 杨定裕 <yangdin...@gmail.com>
Subject Re: How to use S4 to implement Join operator
Date Thu, 04 Oct 2012 06:46:23 GMT
Hi,
I debugged the program and found that a List, Map, or other collection field
declared in the PE is shared across the PE instances.
You can test it.
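
A minimal sketch of the effect, not S4 code. It assumes PE instances are created by shallow-cloning a prototype, which would explain a shared list next to a per-instance counter. PrototypePE, newInstance(), sharedRows and ownRows are hypothetical names used only for illustration; onCreate() here is a plain method standing in for a per-instance initialization hook.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedFieldDemo {

    // Hypothetical stand-in for a PE; this is NOT the S4 ProcessingElement class.
    static class PrototypePE implements Cloneable {
        // Allocated once, on the prototype. A shallow clone copies only the
        // reference, so every clone appends to the same list.
        List<String> sharedRows = new ArrayList<String>();
        // Primitive fields are copied by value, so each clone counts on its own.
        long count = 0;
        // Allocated per instance in onCreate(), so it is not shared.
        List<String> ownRows;

        void onCreate() {
            ownRows = new ArrayList<String>();
        }

        void onEvent(String row) {
            count++;
            sharedRows.add(row);
            ownRows.add(row);
        }

        // Assumption: instances are produced by shallow-cloning the prototype.
        PrototypePE newInstance() {
            try {
                PrototypePE copy = (PrototypePE) super.clone();
                copy.onCreate();
                return copy;
            } catch (CloneNotSupportedException e) {
                throw new AssertionError(e);
            }
        }
    }

    public static void main(String[] args) {
        PrototypePE prototype = new PrototypePE();
        PrototypePE a = prototype.newInstance();
        PrototypePE b = prototype.newInstance();
        a.onEvent("row-1");
        a.onEvent("row-2");
        b.onEvent("row-3");
        // The shared list grows with every event; count and ownRows stay per instance.
        System.out.println("a: shared=" + a.sharedRows.size()
                + " own=" + a.ownRows.size() + " count=" + a.count);
        System.out.println("b: shared=" + b.sharedRows.size()
                + " own=" + b.ownRows.size() + " count=" + b.count);
    }
}
```

If that assumption holds for S4, allocating the lists inside the PE's per-instance initialization rather than at field declaration should give each PE its own list.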

2012/10/4 Shailendra Mishra <shailendrah@gmail.com>

> I get all that. My point is that you need to take the data from the Adaptor
> and create keyed data out of it, roughly as follows.
>
> This is the partitioner App code:
>
>     protected void onInit() {
>         try {
>             // Key quote events by RIC.
>             KeyFinder<Event> quoteKeyFinder = new KeyFinder<Event>() {
>                 @Override
>                 public List<String> get(final Event event) {
>                     return new ArrayList<String>() {
>                         {
>                             add(((MktData) event).getRic());
>                         }
>                     };
>                 }
>             };
>
>             // Key trade events by RIC.
>             KeyFinder<Event> tradeKeyFinder = new KeyFinder<Event>() {
>                 @Override
>                 public List<String> get(final Event event) {
>                     return new ArrayList<String>() {
>                         {
>                             add(((MktData) event).getRic());
>                         }
>                     };
>                 }
>             };
>
>             RemoteStream quoteStream = createOutputStream("quotedata", quoteKeyFinder);
>             RemoteStream tradeStream = createOutputStream("tradedata", tradeKeyFinder);
>             PartitionPE partitionPE = createPE(PartitionPE.class);
>             partitionPE.setDownstream(quoteStream);
>             partitionPE.setDownstream(tradeStream);
>             partitionPE.setSingleton(true);
>             createInputStream("mktdata", partitionPE);
>         } catch (Exception e) {
>             // Keep the original cause instead of discarding it.
>             throw new RuntimeException(e);
>         }
>     }
>
> Now the downstream PE will get data keyed by the key defined in the
> Partitioner App. Pictorially, your topology will look as follows:
>
> Adaptor -> Partitioner -> JoinPE -> PrintPE
>
> - Shailendra
>
> On Wed, Oct 3, 2012 at 9:58 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
> > Yes, the data is partitioned by the key value.
> > Without partitioning, the count value would add up to 20.
> > So ...
> >
> > This is the App code:
> >             OLAJoinPE joinPE = createPE(OLAJoinPE.class);
> >             //filterPE.setDownStream(avgSeenStream);
> >             joinPE.setTimerInterval(10000, TimeUnit.MILLISECONDS);
> >             Stream<Event> JoinStream = createStream("JoinStream", new KeyFinder<Event>() {
> >
> >                 @Override
> >                 public List<String> get(final Event event) {
> >                     String value = "";
> >                     if (event.containsKey("lineitem")) {
> >                         value = event.get("lineitem");
> >                     } else if (event.containsKey("order")) {
> >                         value = event.get("order");
> >                     }
> >                     //System.out.println("OLA APP: value:"+value);
> >                     // partition key: a static (constant) key means one PE
> >                     return ImmutableList.of(value);
> >                 }
> >             }, joinPE);
> >
> >             OLALineitemPE lineitemPE = createPE(OLALineitemPE.class);
> >             lineitemPE.setDownStream(JoinStream);
> >             lineitemPE.setSingleton(true); // what does this mean?
> >             createInputStream("lineitem", lineitemPE);
> >
> >             OLAOrderPE ordermPE = createPE(OLAOrderPE.class);
> >             ordermPE.setDownStream(JoinStream);
> >             ordermPE.setSingleton(true); // what does this mean?
> >             ordermPE.setTimerInterval(1000, TimeUnit.MILLISECONDS);
> >             createInputStream("order", ordermPE);
> >
> > 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
> >>
> >> OK, I get it. The default behavior is that both PEs in each partition
> >> will get all the data. The reason is as follows:
> >> The data coming in from the adaptor has no notion of a key, so all of
> >> it is sent to the PEs. If you want to partition the data, you can
> >> front this with a Partitioner PE that gets the data from the adaptor
> >> and sends it downstream. Because the data then has a key, each
> >> partition no longer receives all the data.
> >> - Shailendra
> >>
> >> On Wed, Oct 3, 2012 at 9:42 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
> >> > Hi, I am running on the testCluster with two adapters.
> >> > The problem is that the list stores all the input data. It should
> >> > store only part of the data in each PE.
> >> > The list in each PE should be independent and not hold all the data.
> >> > How can I implement this?
> >> >
> >> > This is the result:
> >> >
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 1||| PEID:1||| count:1
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 2||| PEID:1||| count:2
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 3||| PEID:1||| count:3
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 4||| PEID:1||| count:4
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 5||| PEID:1||| count:5
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 6||| PEID:1||| count:6
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 7||| PEID:2||| count:1
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 8||| PEID:3||| count:1
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 9||| PEID:3||| count:2
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 10||| PEID:3||| count:3
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 11||| PEID:3||| count:4
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 12||| PEID:3||| count:5
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 13||| PEID:3||| count:6
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 14||| PEID:4||| count:1
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 15||| PEID:5||| count:1
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 16||| PEID:5||| count:2
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 17||| PEID:5||| count:3
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 18||| PEID:6||| count:1
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
> >> > OLAJoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
> >> >
> >> >
> >> >
> >> >
> >> > 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
> >> >>
> >> >> OK, this looks correct, so what is the question? Do you have this
> >> >> app running in more than one partition? - Shailendra
> >> >>
> >> >> On Wed, Oct 3, 2012 at 9:18 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
> >> >> > Hi, Shailendra,
> >> >> > Yes, I have a stream with two event types: lineitem and order.
> >> >> > Events with the same key are sent to the same PE.
> >> >> >
> >> >> > This is my code:
> >> >> > ---------------------------
> >> >> >
> >> >> >     private List<Event> T_lineitem = new ArrayList<Event>();
> >> >> >     private List<Event> T_order = new ArrayList<Event>();
> >> >> >
> >> >> >     private long count = 0;
> >> >> >
> >> >> >     public void onEvent(Event event) {
> >> >> >         // in this example, we use the default generic Event type,
> >> >> >         // but you can also define your own type
> >> >> >         String value = "";
> >> >> >         count = count + 1;
> >> >> >         // check the event type explicitly instead of relying on
> >> >> >         // a swallowed exception when the field is missing
> >> >> >         if (event.containsKey("lineitem")) {
> >> >> >             value = event.get("lineitem").replace("|", "&").split("&")[0];
> >> >> >             T_lineitem.add(event);
> >> >> >             System.out.println("JoinPE------:Table:!||| T_lineitem.size(): "
> >> >> >                     + T_lineitem.size() + "||| PEID:" + getId() + "||| count:" + count);
> >> >> >         }
> >> >> >         if (event.containsKey("order")) {
> >> >> >             value = event.get("order").replace("|", "&").split("&")[0];
> >> >> >             T_order.add(event);
> >> >> >             System.out.println("JoinPE------:Table:!||| T_order.size(): "
> >> >> >                     + T_order.size() + "||| PEID:" + getId() + "||| count:" + count);
> >> >> >         }
> >> >> >     }
> >> >> > -----------------------------
> >> >> >
> >> >> > The output looks like this:
> >> >> > JoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
> >> >> > JoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
> >> >> >
> >> >> > That is the problem: count should never be smaller than the size
> >> >> > of the list.
> >> >> >
> >> >> > Dingyu
> >> >> >
> >> >
> >> >
> >
> >
>
