storm-user mailing list archives

From Andrew Neilson <arsneil...@gmail.com>
Subject Re: [storm-user] Re: BaseAggregator.complete(...) never called
Date Sat, 05 Apr 2014 17:12:15 GMT
I have also run into this issue more recently. I have a topology that runs
fine for ~12-48 hours and then all of a sudden the complete() method in my
BaseAggregator stops getting called entirely. Looking for some guidance:

- Under which conditions would this happen?
- I am using an OpaqueTridentKafkaSpout (
https://github.com/wurstmeister/storm-kafka-0.8-plus/blob/master/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java).
Would a property of opaque spouts maybe cause this?
- Is there anything beyond what is contained in
https://github.com/nathanmarz/storm/wiki/Trident-state#persistentaggregate,
https://github.com/nathanmarz/storm/wiki/Trident-spouts and
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview that I should
know when using aggregators in Trident?


On Fri, Jul 26, 2013 at 9:03 AM, Laurent <lau.thoulon@gmail.com> wrote:

> I'm having the same problem here.
> Actually, it works fine in LocalCluster mode, but it doesn't work when I
> deploy it using StormSubmitter.
> Well, that's not totally right: it works the first time it's executed, but
> then complete() stops being called.
>
> I noticed in init() that the batch id evolves like this:
> 1:0
> 2:0
> 2:1
> 2:2
> 2:3
> 2:4
> ...
> 2:n
>
> So I can see there's something wrong, but I can't find what.
> Have you noticed the same behavior on your side?
> Have you had any luck finding where your problem came from?
>
> I've tried switching to a CombinerAggregator: same issue.
>
> Regards
> Laurent
>
>
> On Thursday, January 17, 2013 4:57:46 PM UTC+1, Paul Bogen wrote:
>>
>> In my TridentTopology I have a portion where I split a string into words
>> and counts. I then group the tuples by the path of the file they come from
>> and finally I aggregate the groups into a List using a
>> custom BaseAggregator.
>>
>> Relevant section of topology:
>>
>> stream = stream.each(new Fields("content"), new VectorizeFunction(),
>>         new Fields("term", "count")).parallelismHint(8);
>> GroupedStream gstream = stream.groupBy(new Fields("path"));
>> stream = gstream.aggregate(new Fields("term", "weight"),
>>         new VectorAggregator(), new Fields("vector")).parallelismHint(8);
>>
>> The problem is the complete method is never called to emit the list.
>>
>> Here is the aggregator:
>>
>> import java.util.concurrent.ConcurrentHashMap;
>> import java.util.concurrent.ConcurrentMap;
>> import backtype.storm.tuple.Values;
>>
>> import storm.trident.operation.BaseAggregator;
>> import storm.trident.operation.TridentCollector;
>> import storm.trident.tuple.TridentTuple;
>>
>> public class VectorAggregator
>>         extends BaseAggregator<ConcurrentMap<String, Double>> {
>>
>>     public ConcurrentMap<String, Double> init(Object batchId,
>>             TridentCollector collector) {
>>         return new ConcurrentHashMap<>();
>>     }
>>
>>     public void aggregate(ConcurrentMap<String, Double> val,
>>             TridentTuple tuple, TridentCollector collector) {
>>         val.put(tuple.getStringByField("term"),
>>                 tuple.getDoubleByField("weight"));
>>     }
>>
>>     public void complete(ConcurrentMap<String, Double> val,
>>             TridentCollector collector) {
>>         collector.emit(new Values(val));
>>     }
>> }
>>
>> What am I doing wrong?
>>
>> plb
>>