storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shamsul haque <shams...@gmail.com>
Subject Fwd: unable to push data in kafka queue using storm trident
Date Sat, 05 Apr 2014 10:14:33 GMT
Hi team,

I am making an application to get data from kafka queue and after some
processing push it again in kafka queue in different topic. I got a help
from this URL<https://github.com/quintona/trident-kafka-push/blob/master/src/test/java/com/github/quintona/TestTopology.java>.
But data is not going in kafka queue. Below is my Topology and
KafkaState
(implements State) class. Please tell me where i am doing wrong.

public class TestTridentTopology {

public static Logger logger = Logger.getLogger(TestTridentTopology.class);

public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException {
    String topologyName = null;
    String[] kafkaTopic = null;
    int length = args.length;
    if (args != null && length > 0) {
        topologyName = args[0];
        kafkaTopic = new String[length - 1];
        for (int i = 1; i < length; i++) {
            kafkaTopic[i - 1] = args[i];
        }
    }
    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(5);
    conf.setMaxSpoutPending(5);
    conf.setMaxTaskParallelism(3);
    if (topologyName != null) {
        StormSubmitter.submitTopology(topologyName, conf,
buildTopology(kafkaTopic));
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, buildTopology(kafkaTopic));
        Utils.sleep(10000);
    }
}

/**
 *
 * @param service
 * @return
 */
public static StormTopology buildTopology(String[] kafkaTopic) {

    try {
        BrokerHosts brokerHost = new ZkHosts("localhost:2181", "/brokers");

        TridentKafkaConfig config = new TridentKafkaConfig(brokerHost,
"testtopic1");
        config.forceStartOffsetTime(-2);
        config.scheme = new SchemeAsMultiScheme(new StringScheme());

        TransactionalTridentKafkaSpout spout = new
TransactionalTridentKafkaSpout(config);

        TridentTopology topology = new TridentTopology();
        TridentState parallelismHint = topology.newStream("feed", spout)
                .shuffle()
                .each(new Fields("str"), new TridentFetcherBolt(), new
Fields("textJSON"))

.partitionPersist(KafkaState.transactional("testtopic2", new
KafkaState.Options()), new KafkaStateUpdater("textJSON"))
                .parallelismHint(1);

        logger.warn("parallelismHint" + parallelismHint);

        return topology.build();
    } catch (Exception e) {
        logger.error("Exception: ", e);
    }
    return null;

}
}


public class KafkaState implements State {

private static final Logger logger = Logger.getLogger(KafkaState.class);

ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>();

public static class Options implements Serializable {

    public String zookeeperHost = "127.0.0.1";
    public int zookeeperPort = 2181;
    public String serializerClass = "kafka.serializer.StringEncoder";
    public String kafkaConnect = "127.0.0.1:9092";

    public Options() {
        logger.debug("KafkaState::Options()");
    }

    public Options(String zookeeperHost, int zookeeperPort, String
serializerClass, String topicName) {
        this.zookeeperHost = zookeeperHost;
        this.zookeeperPort = zookeeperPort;
        this.serializerClass = serializerClass;
    }
}

public static StateFactory transactional(String topic, Options options) {
    logger.debug("KafkaState::transactional: " + topic);
    return new Factory(topic, options, true);
}

public static StateFactory nonTransactional(String topic, Options options) {
    return new Factory(topic, options, false);
}

protected static class Factory implements StateFactory {

    private Options options;
    private String topic;
    boolean transactional;

    public Factory(String topic, Options options, boolean transactional) {
        this.options = options;
        this.topic = topic;
        this.transactional = transactional;
    }

    @Override
    public State makeState(Map conf, IMetricsContext metrics,
            int partitionIndex, int numPartitions) {
        return new KafkaState(topic, options, transactional);
    }

}

private Options options;
private String topic;
Producer<String, String> producer;
private boolean transactional;

public KafkaState(String topic, Options options, boolean transactional) {
    this.topic = topic;
    this.options = options;
    this.transactional = transactional;
    Properties props = new Properties();
    props.put("zk.connect", options.zookeeperHost + ":" +
Integer.toString(options.zookeeperPort));
    props.put("serializer.class", options.serializerClass);
    props.put("metadata.broker.list", options.kafkaConnect);

    ProducerConfig config = new ProducerConfig(props);
    producer = new Producer<>(config);
    logger.debug("producer initialized successfully." + producer);
}

@Override
public void beginCommit(Long txid) {
    logger.debug("KafkaState::beginCommit");
    if (messages.size() > 0) {
        throw new RuntimeException("Kafka State is invalid, the
previous transaction didn't flush");
    }
}

public void enqueue(String message) {
    logger.debug("KafkaState::enqueue ^^^^^^^^^ " + message);
    if (transactional) {
        messages.add(message);
    } else {
        sendMessage(message);
    }
}

private void sendMessage(String message) {
    KeyedMessage<String, String> data = new KeyedMessage<String,
String>(topic, message);
    producer.send(data);
}

@Override
public void commit(Long txid) {
    String message = messages.poll();
    logger.debug("KafkaState::commit @@@@@@@@@@@@@@ " + message);

    while (message != null) {
        sendMessage(message);
        message = messages.poll();
    }
}
}

Thanks

Mime
View raw message