Hi all,

 

I have a Spark Streaming application that ingests data from a Kafka topic and persists the received data to HBase. It works fine with Spark 1.1.1 in YARN cluster mode. Basically, I use the following code to persist each partition of each RDD to HBase:

 

    @Override
    public void call(Iterator<Metric> it) throws Exception {
        HConnection hConnection = null;
        HTableInterface htable = null;
        try {
            hConnection = HConnectionManager.createConnection(_conf.value());
            htable = hConnection.getTable(_tablePrefix + "_" + new SimpleDateFormat("yyyy_MM_dd").format(new Date()));
            htable.setAutoFlush(false, true);
            while (it.hasNext()) {
                Metric metric = it.next();
                htable.put(_put.call(metric));
            }
            htable.flushCommits();
        } finally {
            try {
                if (htable != null) {
                    htable.close();
                }
            } catch (Exception e) {
                System.err.println("error closing htable");
                System.err.println(e.toString());
            }
            try {
                if (hConnection != null) {
                    hConnection.close();
                }
            } catch (Exception e) {
                System.err.println("error closing hConnection");
                System.err.println(e.toString());
            }
        }
    }
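
For context, the call method above is the body of an anonymous VoidFunction invoked per partition from foreachRDD/foreachPartition. The wiring is roughly as follows (a simplified sketch; the stream variable and the placement inside the class are illustrative, not the exact code):

    // Simplified wiring sketch; uses org.apache.spark.api.java.function.Function,
    // org.apache.spark.api.java.function.VoidFunction and java.util.Iterator.
    metricStream.foreachRDD(new Function<JavaRDD<Metric>, Void>() {
        @Override
        public Void call(JavaRDD<Metric> rdd) throws Exception {
            rdd.foreachPartition(new VoidFunction<Iterator<Metric>>() {
                @Override
                public void call(Iterator<Metric> it) throws Exception {
                    // the HBase persist logic shown above runs here
                }
            });
            return null;
        }
    });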

 

I use the Kafka receiver to create the input stream:

KafkaUtils.createStream(jssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER());
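
The surrounding setup is roughly the following (a sketch; the app name, batch interval, Zookeeper quorum, consumer group, and topic name below are placeholders, not the real values):

    // Sketch only: concrete values are placeholders.
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaSparkHbase");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); // 10 s batches

    String zkQuorum = "zk1:2181,zk2:2181,zk3:2181";
    String group = "metric-consumers";
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("metrics", 1); // topic name -> number of receiver threads

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, zkQuorum, group, topicMap,
                                StorageLevel.MEMORY_AND_DISK_SER());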

 

With 1.2.0, receiving from Kafka still works normally. I tried both KafkaReceiver and ReliableKafkaReceiver, and both get data from Kafka without a problem. However, the application just didn't save data to HBase. The streaming page of the Spark UI showed it stuck processing the first batch.
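
(If it matters: as far as I understand, in 1.2.0 KafkaUtils.createStream picks ReliableKafkaReceiver when the receiver write-ahead log is enabled, which is how I switched between the two, e.g.:)

    // Assumption: with this flag set, KafkaUtils.createStream in Spark 1.2.0
    // uses ReliableKafkaReceiver; without it, the plain KafkaReceiver is used.
    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaSparkHbase")
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");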

 

The Executor threads stayed in TIMED_WAITING state:

Thread 54: Executor task launch worker-0 (TIMED_WAITING)
java.lang.Thread.sleep(Native Method)
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1296)
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1090)
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1047)
org.apache.hadoop.hbase.client.AsyncProcess.findDestLocation(AsyncProcess.java:365)
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:310)
org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:971)
org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:954)
org.apache.hadoop.hbase.client.HTable.put(HTable.java:915)
com.xxx.spark.streaming.JavaKafkaSparkHbase$WriteFunction.persist(JavaKafkaSparkHbase.java:125)
com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:42)
com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:35)
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
org.apache.spark.scheduler.Task.run(Task.scala:56)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

 

The KafkaMessageHandler thread is in the WAITING state:

Thread 70: KafkaMessageHandler-0 (WAITING)
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
kafka.consumer.ConsumerIterator.makeNext(Unknown Source)
kafka.consumer.ConsumerIterator.makeNext(Unknown Source)
kafka.utils.IteratorTemplate.maybeComputeNext(Unknown Source)
kafka.utils.IteratorTemplate.hasNext(Unknown Source)
org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:132)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

 

Does anybody have similar issues or know how to solve this? I am using Hadoop 2.5.2 with HBase 0.98.8.

 

Thanks very much,

Max