incubator-s4-user mailing list archives

From Matthieu Morel <mmo...@apache.org>
Subject Re: Forwarding events to multiple PEs
Date Tue, 18 Mar 2014 10:00:10 GMT
Hi,

Apparently you are creating 2 subscriptions to the same external data stream. This is
currently not supported out of the box.
I think the second subscription is overriding the first one. You should be able to verify
that by using the CLI tools to display the state of the cluster and streams.

Ways around that could be to:
- use different streams for different kinds of events
- use 1 PE prototype to subscribe to the "LinearRoad" stream and handle processing and possible
re-dispatch there
- modify the S4 platform code to handle this case
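
The second option could be sketched roughly as follows. This is plain Java that only models the idea, not S4 API: Report, KeyedStream and Dispatcher are hypothetical stand-ins for the event class, an internal keyed stream and the single PE prototype that subscribes to "LinearRoad". The point is that one subscriber receives each event and forwards it to two internal streams, each keyed differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of "one subscriber that re-dispatches"; none of this is S4 API. */
public class FanOutSketch {

    /** Hypothetical stand-in for PositionReportEvent, carrying the two keys from the thread. */
    static class Report {
        private final String vehicleId;
        private final String uniqueSegment;

        Report(String vehicleId, String uniqueSegment) {
            this.vehicleId = vehicleId;
            this.uniqueSegment = uniqueSegment;
        }

        String vehicleId() { return vehicleId; }
        String uniqueSegment() { return uniqueSegment; }
    }

    /** Hypothetical stand-in for an internal stream that groups events by one key. */
    static class KeyedStream {
        private final Function<Report, String> keyFinder;
        private final Map<String, List<Report>> byKey = new HashMap<>();

        KeyedStream(Function<Report, String> keyFinder) {
            this.keyFinder = keyFinder;
        }

        void put(Report report) {
            byKey.computeIfAbsent(keyFinder.apply(report), k -> new ArrayList<>()).add(report);
        }

        /** Number of distinct keys seen, i.e. how many keyed instances would exist. */
        int instances() { return byKey.size(); }

        List<Report> eventsFor(String key) {
            return byKey.getOrDefault(key, Collections.emptyList());
        }
    }

    /** The only subscriber of the external stream; forwards every event downstream. */
    static class Dispatcher {
        private final List<KeyedStream> downstream;

        Dispatcher(List<KeyedStream> downstream) {
            this.downstream = downstream;
        }

        void onEvent(Report report) {
            for (KeyedStream stream : downstream) {
                stream.put(report);
            }
        }
    }

    public static void main(String[] args) {
        KeyedStream byVehicle = new KeyedStream(Report::vehicleId);
        KeyedStream bySegment = new KeyedStream(Report::uniqueSegment);
        Dispatcher dispatcher = new Dispatcher(Arrays.asList(byVehicle, bySegment));

        dispatcher.onEvent(new Report("17", "seg-3"));
        dispatcher.onEvent(new Report("42", "seg-3"));

        System.out.println(byVehicle.instances() + " vehicle keys, "
                + bySegment.instances() + " segment keys");
    }
}
```

With this shape the external stream has a single subscription, so nothing gets overridden, and each downstream consumer still sees every event under its own key.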

Hope this helps.

By the way, are you implementing the Linear Road benchmark (http://www.cs.brandeis.edu/~linearroad/)?


Regards,

Matthieu


On Mar 17, 2014, at 20:53 , Martin Schneider <another.martin.schneider@gmail.com> wrote:

> Hi,
> 
> I would like to forward an event to multiple PEs. Therefore, I tried to use the following code in the InputAdapter:
> 
>     PositionReportEvent positionReport = new PositionReportEvent(line);
>     String vehicleId = Integer.valueOf(positionReport.getVehicleId()).toString();
>     positionReport.put("vehicleId", String.class, vehicleId);
>     positionReport.put("uniqueSegment", String.class, positionReport.getUniqueSegment());
>     getRemoteStream().put(positionReport);
>
>     AddCarToSegmentEvent addCar = new AddCarToSegmentEvent(positionReport);
>     addCar.put("uniqueSegment", String.class, addCar.getUniqueSegment());
>     addCar.put("vehicleId", String.class, vehicleId);
>     getRemoteStream().put(addCar);
>
>     RemoveCarFromSegmentEvent removeCar = new RemoveCarFromSegmentEvent(positionReport);
>     removeCar.put("uniqueSegment", String.class, removeCar.getPreviousUniqueSegment());
>     removeCar.put("vehicleId", String.class, vehicleId);
>     getRemoteStream().put(removeCar);
> }
> 
> 
> 
> and the following code in the App
> 
>     @Override
>     protected void onInit() {
>         CarsInSegmentCounterPE carSegmentCounterPE = createPE(CarsInSegmentCounterPE.class);
>         VehicleSpeedPE vehSpeedPE = createPE(VehicleSpeedPE.class);
>
>         // Create a stream that listens to the "names" stream and passes events to the helloPE instance.
>         createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
>             @Override
>             public List<String> get(PositionReportEvent event) {
>                 return Arrays.asList(new String[] { event.get("vehicleId") });
>             }
>         }, vehSpeedPE);
>
>         createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
>             @Override
>             public List<String> get(PositionReportEvent event) {
>                 return Arrays.asList(new String[] { event.get("uniqueSegment") });
>             }
>         }, carSegmentCounterPE);
>     }
> 
> 
> 
> However, only the second PE (here: carSegmentCounterPE) receives and processes events. What am I doing wrong?
> 
> Thanks in advance.
> 
> 
> 
> Best, Martin
> 

