storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shamsul haque <shams...@gmail.com>
Subject Re: unable to push data in kafka queue using storm trident
Date Mon, 07 Apr 2014 11:25:38 GMT
No body have implemented Trident with Kafka yet ?


On Sat, Apr 5, 2014 at 3:44 PM, shamsul haque <shams.hq@gmail.com> wrote:

>
> Hi team,
>
> I am making an application to get data from kafka queue and after some
> processing push it again in kafka queue in different topic. I got a help
> from this URL<https://github.com/quintona/trident-kafka-push/blob/master/src/test/java/com/github/quintona/TestTopology.java>.
But data is not going in kafka queue. Below is my Topology and KafkaState
> (implements State) class. Please tell me where i am doing wrong.
>
> public class TestTridentTopology {
>
> public static Logger logger = Logger.getLogger(TestTridentTopology.class);
>
> public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException
{
>     String topologyName = null;
>     String[] kafkaTopic = null;
>     int length = args.length;
>     if (args != null && length > 0) {
>         topologyName = args[0];
>         kafkaTopic = new String[length - 1];
>         for (int i = 1; i < length; i++) {
>             kafkaTopic[i - 1] = args[i];
>         }
>     }
>     Config conf = new Config();
>     conf.setDebug(false);
>     conf.setNumWorkers(5);
>     conf.setMaxSpoutPending(5);
>     conf.setMaxTaskParallelism(3);
>     if (topologyName != null) {
>         StormSubmitter.submitTopology(topologyName, conf, buildTopology(kafkaTopic));
>     } else {
>         LocalCluster cluster = new LocalCluster();
>         cluster.submitTopology("test", conf, buildTopology(kafkaTopic));
>         Utils.sleep(10000);
>     }
> }
>
> /**
>  *
>  * @param service
>  * @return
>  */
> public static StormTopology buildTopology(String[] kafkaTopic) {
>
>     try {
>         BrokerHosts brokerHost = new ZkHosts("localhost:2181", "/brokers");
>
>         TridentKafkaConfig config = new TridentKafkaConfig(brokerHost, "testtopic1");
>         config.forceStartOffsetTime(-2);
>         config.scheme = new SchemeAsMultiScheme(new StringScheme());
>
>         TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(config);
>
>         TridentTopology topology = new TridentTopology();
>         TridentState parallelismHint = topology.newStream("feed", spout)
>                 .shuffle()
>                 .each(new Fields("str"), new TridentFetcherBolt(), new Fields("textJSON"))
>                 .partitionPersist(KafkaState.transactional("testtopic2", new KafkaState.Options()),
new KafkaStateUpdater("textJSON"))
>                 .parallelismHint(1);
>
>         logger.warn("parallelismHint" + parallelismHint);
>
>         return topology.build();
>     } catch (Exception e) {
>         logger.error("Exception: ", e);
>     }
>     return null;
>
> }
> }
>
>
> public class KafkaState implements State {
>
> private static final Logger logger = Logger.getLogger(KafkaState.class);
>
> ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>();
>
> public static class Options implements Serializable {
>
>     public String zookeeperHost = "127.0.0.1";
>     public int zookeeperPort = 2181;
>     public String serializerClass = "kafka.serializer.StringEncoder";
>     public String kafkaConnect = "127.0.0.1:9092";
>
>     public Options() {
>         logger.debug("KafkaState::Options()");
>     }
>
>     public Options(String zookeeperHost, int zookeeperPort, String serializerClass, String
topicName) {
>         this.zookeeperHost = zookeeperHost;
>         this.zookeeperPort = zookeeperPort;
>         this.serializerClass = serializerClass;
>     }
> }
>
> public static StateFactory transactional(String topic, Options options) {
>     logger.debug("KafkaState::transactional: " + topic);
>     return new Factory(topic, options, true);
> }
>
> public static StateFactory nonTransactional(String topic, Options options) {
>     return new Factory(topic, options, false);
> }
>
> protected static class Factory implements StateFactory {
>
>     private Options options;
>     private String topic;
>     boolean transactional;
>
>     public Factory(String topic, Options options, boolean transactional) {
>         this.options = options;
>         this.topic = topic;
>         this.transactional = transactional;
>     }
>
>     @Override
>     public State makeState(Map conf, IMetricsContext metrics,
>             int partitionIndex, int numPartitions) {
>         return new KafkaState(topic, options, transactional);
>     }
>
> }
>
> private Options options;
> private String topic;
> Producer<String, String> producer;
> private boolean transactional;
>
> public KafkaState(String topic, Options options, boolean transactional) {
>     this.topic = topic;
>     this.options = options;
>     this.transactional = transactional;
>     Properties props = new Properties();
>     props.put("zk.connect", options.zookeeperHost + ":" + Integer.toString(options.zookeeperPort));
>     props.put("serializer.class", options.serializerClass);
>     props.put("metadata.broker.list", options.kafkaConnect);
>
>     ProducerConfig config = new ProducerConfig(props);
>     producer = new Producer<>(config);
>     logger.debug("producer initialized successfully." + producer);
> }
>
> @Override
> public void beginCommit(Long txid) {
>     logger.debug("KafkaState::beginCommit");
>     if (messages.size() > 0) {
>         throw new RuntimeException("Kafka State is invalid, the previous transaction
didn't flush");
>     }
> }
>
> public void enqueue(String message) {
>     logger.debug("KafkaState::enqueue ^^^^^^^^^ " + message);
>     if (transactional) {
>         messages.add(message);
>     } else {
>         sendMessage(message);
>     }
> }
>
> private void sendMessage(String message) {
>     KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic,
message);
>     producer.send(data);
> }
>
> @Override
> public void commit(Long txid) {
>     String message = messages.poll();
>     logger.debug("KafkaState::commit @@@@@@@@@@@@@@ " + message);
>
>     while (message != null) {
>         sendMessage(message);
>         message = messages.poll();
>     }
> }
> }
>
> Thanks
>
>
>
>

Mime
View raw message