Has nobody implemented Trident with Kafka yet?


On Sat, Apr 5, 2014 at 3:44 PM, shamsul haque <shams.hq@gmail.com> wrote:

Hi team,

I am building an application that reads data from a Kafka queue and, after some processing, pushes it back into Kafka under a different topic. I got help from this URL, but the data is not going into the Kafka queue. Below are my topology and my KafkaState (implements State) classes. Please tell me where I am going wrong.

public class TestTridentTopology {

public static Logger logger = Logger.getLogger(TestTridentTopology.class);

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    String topologyName = null;
    String[] kafkaTopic = null;
    // Guard against a null args array before dereferencing it.
    int length = (args == null) ? 0 : args.length;
    if (length > 0) {
        topologyName = args[0];
        kafkaTopic = new String[length - 1];
        for (int i = 1; i < length; i++) {
            kafkaTopic[i - 1] = args[i];
        }
    }
    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(5);
    conf.setMaxSpoutPending(5);
    conf.setMaxTaskParallelism(3);
    if (topologyName != null) {
        StormSubmitter.submitTopology(topologyName, conf, buildTopology(kafkaTopic));
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, buildTopology(kafkaTopic));
        // Give the local topology some time to run, then shut it down cleanly.
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

/**
 * Builds the Trident topology that reads from Kafka, processes each message
 * and writes the result back to Kafka via KafkaState.
 *
 * @param kafkaTopic topic names passed on the command line
 * @return the built StormTopology, or null if building fails
 */
public static StormTopology buildTopology(String[] kafkaTopic) {

    try {
        BrokerHosts brokerHost = new ZkHosts("localhost:2181", "/brokers");

        TridentKafkaConfig config = new TridentKafkaConfig(brokerHost, "testtopic1");
        config.forceStartOffsetTime(-2); // -2 = start from the earliest available offset
        config.scheme = new SchemeAsMultiScheme(new StringScheme());

        TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(config);

        TridentTopology topology = new TridentTopology();
        TridentState parallelismHint = topology.newStream("feed", spout)
                .shuffle()
                .each(new Fields("str"), new TridentFetcherBolt(), new Fields("textJSON"))
                .partitionPersist(KafkaState.transactional("testtopic2", new KafkaState.Options()), new KafkaStateUpdater("textJSON"))
                .parallelismHint(1);

        logger.warn("parallelismHint: " + parallelismHint);

        return topology.build();
    } catch (Exception e) {
        logger.error("Exception: ", e);
    }
    return null;

}
}
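
TridentFetcherBolt, which is used in the each() call above, is not included in this mail. A minimal sketch of such a Trident function, assuming it simply takes the "str" field emitted by the Kafka spout and emits a "textJSON" value (pre-Apache storm.trident package names assumed), could look like this:

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Hypothetical sketch only -- the real TridentFetcherBolt is not shown in this post.
// Assumed behaviour: map the raw Kafka message ("str") to the "textJSON" output field.
public class TridentFetcherBolt extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // Raw message string produced by the StringScheme of the Kafka spout.
        String str = tuple.getStringByField("str");

        // Whatever fetching/processing builds the JSON text would go here;
        // this sketch just passes the value through unchanged.
        collector.emit(new Values(str));
    }
}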


public class KafkaState implements State {

    private static final Logger logger = Logger.getLogger(KafkaState.class);

    ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>();

    public static class Options implements Serializable {
        public String zookeeperHost = "127.0.0.1";
        public int zookeeperPort = 2181;
        public String serializerClass = "kafka.serializer.StringEncoder";
        public String kafkaConnect = "127.0.0.1:9092";

        public Options() {
            logger.debug("KafkaState::Options()");
        }

        // Note: topicName is currently unused and kafkaConnect keeps its default value.
        public Options(String zookeeperHost, int zookeeperPort, String serializerClass, String topicName) {
            this.zookeeperHost = zookeeperHost;
            this.zookeeperPort = zookeeperPort;
            this.serializerClass = serializerClass;
        }
    }

    public static StateFactory transactional(String topic, Options options) {
        logger.debug("KafkaState::transactional: " + topic);
        return new Factory(topic, options, true);
    }

    public static StateFactory nonTransactional(String topic, Options options) {
        return new Factory(topic, options, false);
    }

    protected static class Factory implements StateFactory {
        private Options options;
        private String topic;
        boolean transactional;

        public Factory(String topic, Options options, boolean transactional) {
            this.options = options;
            this.topic = topic;
            this.transactional = transactional;
        }

        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return new KafkaState(topic, options, transactional);
        }
    }

    private Options options;
    private String topic;
    Producer<String, String> producer;
    private boolean transactional;

    public KafkaState(String topic, Options options, boolean transactional) {
        this.topic = topic;
        this.options = options;
        this.transactional = transactional;

        Properties props = new Properties();
        props.put("zk.connect", options.zookeeperHost + ":" + Integer.toString(options.zookeeperPort));
        props.put("serializer.class", options.serializerClass);
        props.put("metadata.broker.list", options.kafkaConnect);
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<>(config);
        logger.debug("producer initialized successfully." + producer);
    }

    @Override
    public void beginCommit(Long txid) {
        logger.debug("KafkaState::beginCommit");
        if (messages.size() > 0) {
            throw new RuntimeException("Kafka State is invalid, the previous transaction didn't flush");
        }
    }

    // Called for each tuple; buffers until commit() when transactional, otherwise sends immediately.
    public void enqueue(String message) {
        logger.debug("KafkaState::enqueue ^^^^^^^^^ " + message);
        if (transactional) {
            messages.add(message);
        } else {
            sendMessage(message);
        }
    }

    private void sendMessage(String message) {
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, message);
        producer.send(data);
    }

    @Override
    public void commit(Long txid) {
        // Flush everything buffered during this transaction to Kafka.
        String message = messages.poll();
        logger.debug("KafkaState::commit @@@@@@@@@@@@@@ " + message);
        while (message != null) {
            sendMessage(message);
            message = messages.poll();
        }
    }
}
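
KafkaStateUpdater, which the topology passes to partitionPersist(), is also not shown above. A minimal sketch, assuming it is a BaseStateUpdater<KafkaState> that takes the output field name in its constructor and hands each tuple's value to KafkaState.enqueue(), could look like this:

import java.util.List;

import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.tuple.TridentTuple;

// Hypothetical sketch only -- the real KafkaStateUpdater is not shown in this post.
// Assumed behaviour: forward the configured field of each tuple to KafkaState.enqueue().
public class KafkaStateUpdater extends BaseStateUpdater<KafkaState> {

    private final String fieldName;

    public KafkaStateUpdater(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public void updateState(KafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            // enqueue() buffers in transactional mode; KafkaState.commit() flushes to Kafka.
            state.enqueue(tuple.getStringByField(fieldName));
        }
    }
}

In transactional mode the messages are only sent to the Kafka producer when Trident calls commit(), so the updater must actually call enqueue() for every tuple for anything to reach the output topic.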

Thanks