storm-user mailing list archives

From Bharat Jayaraman Karthick <bhara...@teclever.com>
Subject Unable to merge DRPC Spout with any other spout using Trident
Date Fri, 29 Apr 2016 17:09:48 GMT
Hi,

We have a use case that requires a bolt to consume the streams emitted by
both a Kafka spout and a DRPC spout. I used TridentTopology and tried to
merge the streams, but got the error message "Cannot join DRPC stream with
streams originating from other spouts".

To check, I used TopologyBuilder to combine the same two streams, and was
able to merge and group the tuples emitted by them.

Can you help me understand why TridentTopology throws this error when we
try to merge a DRPC stream with a stream emitted by any other spout?

For Trident, I used the following topology:

    TridentTopology topology = new TridentTopology();
    SkuUpdatesKafkaEmulateSpout spout = new SkuUpdatesKafkaEmulateSpout(10);
    Stream kafkaStream = topology.newStream("kafka_stream", spout);
    Stream drpcStream = topology.newDRPCStream("drpc_stream", drpc)
            .each(new Fields("args"), new DRPC_ArgsSplit(),
                  new Fields("sku", "new_value"));
    Stream merged = topology.merge(kafkaStream, drpcStream);
    merged.persistentAggregate(new MemoryMapState.Factory(),
                               new Fields("sku"), new Sum(), new Fields("value"))
            .parallelismHint(2);
    return topology.build();

For TopologyBuilder, I used the following topology:

    TopologyBuilder builder = new TopologyBuilder();

    Config conf = new Config();
    conf.setDebug(true);
    LocalDRPC drpc = new LocalDRPC();
    DRPCSpout spout1 = new DRPCSpout("processOrder", drpc);
    DemoSpout spout2 = new DemoSpout();

    builder.setSpout("drpc", spout1);
    builder.setSpout("demospout", spout2);
    builder.setBolt("scraperBolt", new XMLScrapperBolt())
            .shuffleGrouping("drpc")
            .shuffleGrouping("demospout");
    builder.setBolt("returnBolt", new DRPCBolt())
            .shuffleGrouping("scraperBolt", "drpc-stream");
    builder.setBolt("return", new ReturnResults())
            .shuffleGrouping("returnBolt");

Regards,
Bharat Karthick J
