incubator-s4-user mailing list archives

From Martin Schneider <another.martin.schnei...@gmail.com>
Subject Re: Forwarding events to multiple PEs
Date Tue, 18 Mar 2014 10:05:51 GMT
Hi,

many thanks for your help!

I expected what you suggested. Good to know that this should work!

Yes, I am implementing the Linear Road benchmark. Why are you asking?
Do you have questions or advice?

Best, Martin




2014-03-18 11:00 GMT+01:00 Matthieu Morel <mmorel@apache.org>:

> Hi,
>
> apparently you are creating 2 subscriptions for the same external data
> stream. This is currently not supported out of the box.
> I think the second subscription is overriding the first one. You should be
> able to verify that using the CLI tools and displaying the state of the
> cluster and streams.
>
> Ways around that could be to:
> - use different streams for different kinds of events
> - use 1 PE prototype to subscribe to the "LinearRoad" stream and handle
> processing and possible re-dispatch there
> - modify the S4 platform code to handle this case
>
> Hope this helps.
>
> By the way are you implementing the linear road benchmark
> http://www.cs.brandeis.edu/~linearroad/ ?
>
>
> Regards,
>
> Matthieu
>
>
> On Mar 17, 2014, at 20:53, Martin Schneider <another.martin.schneider@gmail.com> wrote:
>
> Hi,
>
> I would like to forward an event to multiple PEs. Therefore, I tried to
> use the following code in the InputAdapter:
>
>         PositionReportEvent positionReport = new PositionReportEvent(line);
>         String vehicleId = Integer.valueOf(positionReport.getVehicleId()).toString();
>         positionReport.put("vehicleId", String.class, vehicleId);
>         positionReport.put("uniqueSegment", String.class, positionReport.getUniqueSegment());
>         getRemoteStream().put(positionReport);
>
>         AddCarToSegmentEvent addCar = new AddCarToSegmentEvent(positionReport);
>         addCar.put("uniqueSegment", String.class, addCar.getUniqueSegment());
>         addCar.put("vehicleId", String.class, vehicleId);
>         getRemoteStream().put(addCar);
>
>         RemoveCarFromSegmentEvent removeCar = new RemoveCarFromSegmentEvent(positionReport);
>         removeCar.put("uniqueSegment", String.class, removeCar.getPreviousUniqueSegment());
>         removeCar.put("vehicleId", String.class, vehicleId);
>         getRemoteStream().put(removeCar);
>     }
>
>
> and the following code in the App
>
>     @Override
>     protected void onInit() {
>         CarsInSegmentCounterPE carSegmentCounterPE = createPE(CarsInSegmentCounterPE.class);
>         VehicleSpeedPE vehSpeedPE = createPE(VehicleSpeedPE.class);
>
>         // Create a stream that listens to the "names" stream and passes events to the helloPE instance.
>         createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
>             @Override
>             public List<String> get(PositionReportEvent event) {
>                 return Arrays.asList(new String[] { event.get("vehicleId") });
>             }
>         }, vehSpeedPE);
>
>         createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
>             @Override
>             public List<String> get(PositionReportEvent event) {
>                 return Arrays.asList(new String[] { event.get("uniqueSegment") });
>             }
>         }, carSegmentCounterPE);
>     }
>
>
> However, only the second PE (here: carSegmentCounterPE) receives and
> processes events. What am I doing wrong?
>
> Thanks in advance.
>
>
> Best, Martin
>
>
>
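
The second workaround suggested in the quoted reply (a single PE prototype subscribed to "LinearRoad" that re-dispatches events) could look roughly like the sketch below. It is plain Java and deliberately does not use the S4 API: `PositionReport`, `KeyedStream` and `Dispatcher` are hypothetical stand-ins for an S4 event, an internal keyed stream and the dispatching PE. It only illustrates the routing idea of one subscriber fanning each event out to several streams, each keyed differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of "subscribe once, then re-dispatch".
// None of these types are S4 classes; they are assumptions for illustration.
public class RedispatchSketch {

    // Hypothetical event with the two attributes used as keys in the thread.
    static final class PositionReport {
        private final String vehicleId;
        private final String uniqueSegment;

        PositionReport(String vehicleId, String uniqueSegment) {
            this.vehicleId = vehicleId;
            this.uniqueSegment = uniqueSegment;
        }

        String getVehicleId() { return vehicleId; }
        String getUniqueSegment() { return uniqueSegment; }
    }

    // Stand-in for an internal stream: groups events by the key its finder extracts.
    static final class KeyedStream {
        private final Function<PositionReport, String> keyFinder;
        private final Map<String, List<PositionReport>> byKey = new HashMap<>();

        KeyedStream(Function<PositionReport, String> keyFinder) {
            this.keyFinder = keyFinder;
        }

        void put(PositionReport event) {
            byKey.computeIfAbsent(keyFinder.apply(event), k -> new ArrayList<>()).add(event);
        }

        List<PositionReport> eventsFor(String key) {
            return byKey.getOrDefault(key, Collections.<PositionReport>emptyList());
        }
    }

    // Stand-in for the single PE that owns the only external subscription.
    static final class Dispatcher {
        private final List<KeyedStream> downstream;

        Dispatcher(List<KeyedStream> downstream) {
            this.downstream = downstream;
        }

        // Forward every incoming event to all downstream streams.
        void onEvent(PositionReport event) {
            for (KeyedStream stream : downstream) {
                stream.put(event);
            }
        }
    }

    public static void main(String[] args) {
        KeyedStream byVehicle = new KeyedStream(PositionReport::getVehicleId);
        KeyedStream bySegment = new KeyedStream(PositionReport::getUniqueSegment);
        Dispatcher dispatcher = new Dispatcher(Arrays.asList(byVehicle, bySegment));

        dispatcher.onEvent(new PositionReport("42", "seg-7"));
        dispatcher.onEvent(new PositionReport("43", "seg-7"));

        System.out.println(byVehicle.eventsFor("42").size());    // prints 1
        System.out.println(bySegment.eventsFor("seg-7").size()); // prints 2
    }
}
```

In an actual S4 app, the dispatcher role would be played by the one PE attached to the "LinearRoad" input stream, and the two keyed streams would be internal streams feeding the speed and segment-counter PEs. The exact wiring depends on the S4 version in use.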
