incubator-s4-user mailing list archives

From 杨定裕 <yangdin...@gmail.com>
Subject Re: How to use S4 to implement Join operator
Date Thu, 04 Oct 2012 04:58:24 GMT
Yes, the partitions are separated by the key value.
If there were no partitioning, the count values should add up to 20.
So ...
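
To make the expectation above concrete, here is a minimal sketch of key-based routing in plain Java. It is not S4 code: `KeyedRouter`, `joinKey`, and the sample rows are hypothetical, and taking the first `|`-separated field as the key is an assumption. Each key gets its own independent list, and the per-key sizes add up to the total number of rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not part of the S4 API.
class KeyedRouter {
    // One independent list per key, standing in for per-key PE state.
    private final Map<String, List<String>> statePerKey = new HashMap<>();

    // Assumption: the key is the first '|'-separated field of the row.
    static String joinKey(String row) {
        int bar = row.indexOf('|');
        return bar < 0 ? row : row.substring(0, bar);
    }

    // Append the row to the list that belongs to its key.
    void route(String row) {
        statePerKey.computeIfAbsent(joinKey(row), k -> new ArrayList<>()).add(row);
    }

    // Number of rows stored for one key (0 if the key was never seen).
    int sizeFor(String key) {
        List<String> rows = statePerKey.get(key);
        return rows == null ? 0 : rows.size();
    }

    // Sum of all per-key list sizes.
    int totalRows() {
        int total = 0;
        for (List<String> rows : statePerKey.values()) {
            total += rows.size();
        }
        return total;
    }
}
```

With rows `"1|a"`, `"1|b"`, and `"2|c"`, key `1` holds two rows, key `2` holds one, and the total is three.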

This is the code of the App:
            OLAJoinPE joinPE = createPE(OLAJoinPE.class);
            //filterPE.setDownStream(avgSeenStream);
            joinPE.setTimerInterval(10000, TimeUnit.MILLISECONDS);
            Stream<Event> JoinStream = createStream("JoinStream", new KeyFinder<Event>() {

                @Override
                public List<String> get(final Event event) {
                    String value = "";
                    if(event.containsKey("lineitem")){
                        value = event.get("lineitem");
                    }
                    else if(event.containsKey("order")){
                        value = event.get("order");
                    }
                    //System.out.println("OLA APP: value:"+value);
                    // partition key: static is one PE
                    return ImmutableList.of(value);
                }
            }, joinPE);


            OLALineitemPE lineitemPE = createPE(OLALineitemPE.class);
            lineitemPE.setDownStream(JoinStream);
            lineitemPE.setSingleton(true);// what does this mean?
            createInputStream("lineitem",lineitemPE);

            OLAOrderPE ordermPE = createPE(OLAOrderPE.class);
            ordermPE.setDownStream(JoinStream);
            ordermPE.setSingleton(true);// what does this mean?
            ordermPE.setTimerInterval(1000, TimeUnit.MILLISECONDS);
            createInputStream("order",ordermPE);
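
One possible explanation for the output quoted below (list size growing to 20 across all PEIDs while count restarts per PEID) can be sketched in plain Java. It rests on an assumption this thread does not confirm: that per-key PE instances are made by shallow-cloning a prototype object. Under that assumption, a list initialized at its field declaration is shared by every clone, while a primitive counter is copied per instance. `SketchPE`, `init`, and `newInstance` are hypothetical names, not S4 classes or methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a PE; not the S4 ProcessingElement class.
class SketchPE implements Cloneable {
    // Initialized at declaration: a shallow clone copies only the reference,
    // so every clone appends to the same list object.
    List<String> sharedRows = new ArrayList<>();
    // Created in init(): each clone gets its own list.
    List<String> ownRows;
    // Primitive field: copied by value, so each clone counts independently.
    long count = 0;

    // Assumed per-instance hook, called once right after cloning.
    void init() {
        ownRows = new ArrayList<>();
    }

    // Create a new instance by shallow-cloning this prototype.
    SketchPE newInstance() {
        try {
            SketchPE copy = (SketchPE) super.clone();
            copy.init();
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e);
        }
    }

    void onEvent(String row) {
        count++;
        sharedRows.add(row);
        ownRows.add(row);
    }
}
```

If this is what happens, creating the lists in a per-instance creation hook (in S4 that would presumably be the PE's onCreate() callback) rather than at the field declaration should give each PE its own list.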





2012/10/4 Shailendra Mishra <shailendrah@gmail.com>

> Ok, I get it. The default behavior is that both PEs in each partition
> will get all the data. The reason is as follows:
> The data coming in from the adaptor doesn't have the notion of a key,
> so it sends all the data to the PEs. If you want to partition
> the data, you can front this with a Partitioner PE which gets the
> data from the adaptor and then sends it downstream. Since the data then
> has the notion of a key, each partition won't receive all the data.
> - Shailendra
>
> On Wed, Oct 3, 2012 at 9:42 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
> > Hi, I am running on testCluster with two adapters.
> > The problem is that the list stores all the input data. In fact, it should
> > store only part of the data in each PE.
> > The list in each PE should be independent and not store all the data.
> > How can I implement this?
> >
> > This is the result:
> >
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 1||| PEID:1||| count:1
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 2||| PEID:1||| count:2
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 3||| PEID:1||| count:3
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 4||| PEID:1||| count:4
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 5||| PEID:1||| count:5
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 6||| PEID:1||| count:6
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 7||| PEID:2||| count:1
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 8||| PEID:3||| count:1
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 9||| PEID:3||| count:2
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 10||| PEID:3||| count:3
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 11||| PEID:3||| count:4
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 12||| PEID:3||| count:5
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 13||| PEID:3||| count:6
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 14||| PEID:4||| count:1
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 15||| PEID:5||| count:1
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 16||| PEID:5||| count:2
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 17||| PEID:5||| count:3
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 18||| PEID:6||| count:1
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
> > OLAJoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
> >
> >
> >
> >
> > 2012/10/4 Shailendra Mishra <shailendrah@gmail.com>
> >>
> >> Ok, this looks correct, so then what is the question? Do you have this
> >> app running in more than one partition? - Shailendra
> >>
> >> On Wed, Oct 3, 2012 at 9:18 PM, 杨定裕 <yangdingyu@gmail.com> wrote:
> >> > Hi, Shailendra,
> >> > Yes, I have a stream with two event types: lineitem and order. Events
> >> > with the same key will be sent to the same PE.
> >> >
> >> > This is my code:
> >> > ---------------------------
> >> >
> >> >     private List<Event> T_lineitem = new ArrayList<Event>();
> >> >     private List<Event> T_order = new ArrayList<Event>();
> >> >
> >> >     private long count = 0;
> >> >
> >> >     public void onEvent(Event event) {
> >> >         // in this example, we use the default generic Event type, but
> >> >         // you can also define your own type
> >> >         String value = "";
> >> >         count = count+1;
> >> >         try{
> >> >             value = event.get("lineitem").replace("|","&").split("&")[0];
> >> >
> >> >             T_lineitem.add(event);
> >> >             System.out.println("JoinPE------:Table:!||| T_lineitem.size(): "
> >> >                     +T_lineitem.size()+"||| PEID:"+getId()+"||| count:"+count);
> >> >         }catch(Exception e){
> >> >             // ignored: presumably the event carries no "lineitem" field
> >> >         }
> >> >         try{
> >> >             value = event.get("order").replace("|","&").split("&")[0];
> >> >             T_order.add(event);
> >> >             System.out.println("JoinPE------:Table:!||| T_order.size(): "
> >> >                     +T_order.size()+"||| PEID:"+getId()+"||| count:"+count);
> >> >         }catch(Exception e){
> >> >             // ignored: presumably the event carries no "order" field
> >> >         }
> >> >     }
> >> > -----------------------------
> >> >
> >> > The output looks like this:
> >> > JoinPE------:Table:!||| T_lineitem.size(): 19||| PEID:7||| count:1
> >> > JoinPE------:Table:!||| T_lineitem.size(): 20||| PEID:7||| count:2
> >> >
> >> > That is the problem: the count should be at least the size of the list.
> >> >
> >> > Dingyu
> >> >
> >
> >
>
