incubator-s4-user mailing list archives

From Martin Schneider <another.martin.schnei...@gmail.com>
Subject Forwarding events to multiple PEs
Date Mon, 17 Mar 2014 19:53:18 GMT
Hi,

I would like to forward an event to multiple PEs. Therefore, I tried to use
the following code in the InputAdapter:

    PositionReportEvent positionReport = new PositionReportEvent(line);

    String vehicleId = Integer.valueOf(positionReport.getVehicleId()).toString();
    positionReport.put("vehicleId", String.class, vehicleId);
    positionReport.put("uniqueSegment", String.class, positionReport.getUniqueSegment());
    getRemoteStream().put(positionReport);

    AddCarToSegmentEvent addCar = new AddCarToSegmentEvent(positionReport);
    addCar.put("uniqueSegment", String.class, addCar.getUniqueSegment());
    addCar.put("vehicleId", String.class, vehicleId);
    getRemoteStream().put(addCar);

    RemoveCarFromSegmentEvent removeCar = new RemoveCarFromSegmentEvent(positionReport);
    removeCar.put("uniqueSegment", String.class, removeCar.getPreviousUniqueSegment());
    removeCar.put("vehicleId", String.class, vehicleId);
    getRemoteStream().put(removeCar);
}


and the following code in the App:

    @Override
    protected void onInit() {
        CarsInSegmentCounterPE carSegmentCounterPE = createPE(CarsInSegmentCounterPE.class);
        VehicleSpeedPE vehSpeedPE = createPE(VehicleSpeedPE.class);

        // Create a stream that listens to the "names" stream and passes events to the helloPE instance.
        createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
            @Override
            public List<String> get(PositionReportEvent event) {
                return Arrays.asList(new String[] { event.get("vehicleId") });
            }
        }, vehSpeedPE);

        createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
            @Override
            public List<String> get(PositionReportEvent event) {
                return Arrays.asList(new String[] { event.get("uniqueSegment") });
            }
        }, carSegmentCounterPE);
    }


However, only the second PE (here: carSegmentCounterPE) receives and
processes events. What am I doing wrong?
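
To make the expected behaviour concrete: the intent is that every event on
the "LinearRoad" stream reaches both PEs, each keyed by its own attribute.
The following is only a minimal plain-Java sketch of that fan-out, not S4
code. Event, KeyedConsumer, Router and demo are hypothetical stand-ins, and
the sketch says nothing about how S4 registers streams internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class FanOutSketch {

    // Hypothetical stand-in for an event: a bag of string attributes.
    static final class Event {
        private final Map<String, String> attributes = new HashMap<>();

        Event put(String name, String value) {
            attributes.put(name, value);
            return this;
        }

        String get(String name) {
            return attributes.get(name);
        }
    }

    // Hypothetical stand-in for a PE prototype: records the key of each event it receives.
    static final class KeyedConsumer {
        final String name;
        final Function<Event, String> keyFinder;
        final List<String> seenKeys = new ArrayList<>();

        KeyedConsumer(String name, Function<Event, String> keyFinder) {
            this.name = name;
            this.keyFinder = keyFinder;
        }

        void onEvent(Event event) {
            seenKeys.add(keyFinder.apply(event));
        }
    }

    // Hypothetical router: a stream name maps to a list of consumers,
    // so a second subscription is appended instead of replacing the first.
    static final class Router {
        private final Map<String, List<KeyedConsumer>> subscribers = new HashMap<>();

        void subscribe(String stream, KeyedConsumer consumer) {
            subscribers.computeIfAbsent(stream, s -> new ArrayList<>()).add(consumer);
        }

        void publish(String stream, Event event) {
            List<KeyedConsumer> targets =
                    subscribers.getOrDefault(stream, Collections.<KeyedConsumer>emptyList());
            for (KeyedConsumer consumer : targets) {
                consumer.onEvent(event);
            }
        }
    }

    // Publishes one event and reports which key each consumer saw.
    static List<String> demo() {
        KeyedConsumer vehSpeed =
                new KeyedConsumer("vehSpeedPE", e -> e.get("vehicleId"));
        KeyedConsumer segmentCounter =
                new KeyedConsumer("carSegmentCounterPE", e -> e.get("uniqueSegment"));

        Router router = new Router();
        router.subscribe("LinearRoad", vehSpeed);
        router.subscribe("LinearRoad", segmentCounter);
        router.publish("LinearRoad",
                new Event().put("vehicleId", "42").put("uniqueSegment", "seg-7"));

        List<String> result = new ArrayList<>();
        result.add(vehSpeed.name + ":" + vehSpeed.seenKeys);
        result.add(segmentCounter.name + ":" + segmentCounter.seenKeys);
        return result;
    }

    public static void main(String[] args) {
        // prints [vehSpeedPE:[42], carSegmentCounterPE:[seg-7]]
        System.out.println(demo());
    }
}
```

In this sketch both consumers see the event, each under its own key. That is
the result I expected from the two createInputStream calls above.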

Thanks in advance.


Best, Martin
