storm-user mailing list archives

From Nathan Leung <ncle...@gmail.com>
Subject Re: Storm Field Grouping with specific fields
Date Mon, 23 Feb 2015 13:00:45 GMT
You can put user and host in separate tuple fields and do fields grouping
on those fields.
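
A minimal sketch of that approach, assuming ParserBolt is a BaseBasicBolt (as
the stack trace below suggests) and using "response" as an assumed name for
the POJO field:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // In ParserBolt: declare host and user as their own tuple fields and
    // emit them alongside the POJO.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("host", "user", "response"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // parse(input) stands in for your existing parsing logic.
        ResponseHandler responseHandler = parse(input);
        collector.emit(new Values(responseHandler.getHost(),
                                  responseHandler.getUser(),
                                  responseHandler));
    }

    // In the topology, group FileBolt on those fields instead of globalGrouping:
    builder.setBolt("FileBolt", new PersistBolt())
           .fieldsGrouping("Parser", new Fields("host", "user"));

The POJO still travels in the third field, so PersistBolt can read it with
tuple.getValueByField("response").
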
On Feb 23, 2015 6:18 AM, "Vineet Mishra" <clearmidoubt@gmail.com> wrote:

> I tried looking for a solution and found CustomStreamGrouping.
>
> I guess this should help me out, but I am getting an exception while
> implementing it.
>
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>     at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>     at clojure.lang.AFn.run(AFn.java:24)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException
>     at clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107)
>     at clojure.lang.PersistentVector.nth(PersistentVector.java:111)
>     at clojure.lang.APersistentVector.get(APersistentVector.java:171)
>     at com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
>     at backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
>     at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158)
>     at backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
>     at backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
>     at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203)
>     at backtype.storm.task.OutputCollector.emit(OutputCollector.java:49)
>     at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
>     at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
>     at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76)
>     at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
>     at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>     ... 6 more
>
> Let me know if anyone else has faced the same issue.
>
> On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <clearmidoubt@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I have a topology with a Kafka spout implementation, using the
>> TopologyBuilder shown below,
>>
>>         TopologyBuilder builder = new TopologyBuilder();
>>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>>         builder.setBolt("Parser", new ParserBolt()).globalGrouping("KafkaSpout");
>>         builder.setBolt("FileBolt", new PersistBolt()).globalGrouping("Parser");
>>
>>         Config config = new Config();
>>         config.put(Config.TOPOLOGY_WORKERS, 4);
>>         config.setNumWorkers(2);
>>         config.setMaxSpoutPending(10);
>>         config.setMaxTaskParallelism(10);
>>
>> I have two levels of bolts:
>>
>> 1) Parser - parses the data and emits an output tuple whose value is a
>> serialized POJO.
>> 2) Persist - persists the forwarded data, received from the previous bolt
>> (Parser), after some computation.
>>
>> Now, for the last bolt (PersistBolt, "FileBolt"), I want fields grouping
>> on the Parser bolt based on some field value of the POJO that is being
>> emitted.
>>
>>
>> To make it more clear,
>>
>> Parser is emitting a POJO of the form,
>>
>> collector.emit(new Values(responseHandler));
>>
>> where responseHandler is a POJO,
>>
>> public class ResponseHandler implements Serializable {
>>
>>     private String host = null;
>>     private String user = null;
>>     private String msg = null;
>>
>>     public String getHost() {
>>         return host;
>>     }
>>     public void setHost(String host) {
>>         this.host = host;
>>     }
>>     public String getUser() {
>>         return user;
>>     }
>>     public void setUser(String user) {
>>         this.user = user;
>>     }
>>     public String getMsg() {
>>         return msg;
>>     }
>>     public void setMsg(String msg) {
>>         this.msg = msg;
>>     }
>> }
>>
>> Now I am looking for a way to do fields grouping at the host and user
>> level.
>>
>> Actively looking for a way around this!
>>
>> Thanks!
>>
>
>
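
On the IndexOutOfBoundsException itself: the chooseTasks frame points at
HostAPIGrouping indexing into the emitted values, and since the tuple carries
only the single POJO value, any index above 0 will be out of range. If you do
stay with CustomStreamGrouping, a minimal defensive sketch (class name and
field position are hypothetical) could look like this:

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

    import backtype.storm.generated.GlobalStreamId;
    import backtype.storm.grouping.CustomStreamGrouping;
    import backtype.storm.task.WorkerTopologyContext;

    public class HostUserGrouping implements CustomStreamGrouping, Serializable {

        private List<Integer> targetTasks;

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
                            List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            // Guard against tuples that carry fewer fields than expected,
            // which is what the IndexOutOfBoundsException above points to.
            if (values == null || values.isEmpty()) {
                return Arrays.asList(targetTasks.get(0));
            }
            // Route by the hash of the first field (e.g. host); adjust the
            // index to whichever position your Parser bolt actually emits.
            Object key = values.get(0);
            int hash = (key == null) ? 0 : key.hashCode();
            int index = (hash & Integer.MAX_VALUE) % targetTasks.size();
            return Arrays.asList(targetTasks.get(index));
        }
    }

That said, emitting host and user as separate fields and using the built-in
fieldsGrouping, as sketched above, avoids the custom grouping entirely.
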
