Subject: Re: Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException
From: Josh Mahonin
To: Jeroen Vlek <j.vlek@anchormen.nl>
Cc: user@spark.apache.org, user@phoenix.apache.org
Date: Tue, 9 Jun 2015 11:18:08 -0400

This may or may not be helpful for your classpath issues, but I wanted to
verify that basic functionality worked, so I made a sample app here:

https://github.com/jmahonin/spark-streaming-phoenix

This consumes events off a Kafka topic using Spark Streaming, and writes
out event counts to Phoenix using the new phoenix-spark functionality:
http://phoenix.apache.org/phoenix_spark.html

It's definitely overkill, and it would probably be more efficient to use
the JDBC driver directly, but it serves as a proof of concept.

I've only tested this in local mode. To convert it to a full jobs JAR, I
suspect that keeping all of the Spark and Phoenix dependencies marked as
'provided', and including the Phoenix client JAR in the Spark classpath,
would work as well.
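A minimal sketch of that classpath setup, reusing the class and arguments
from the quoted message below. The client JAR path and the extraClassPath
route are assumptions, not something tested in this thread; adjust the
path to wherever your Phoenix distribution puts
phoenix-4.4.0-HBase-0.98-client.jar:

    /opt/mapr/spark/spark-1.3.1/bin/spark-submit \
      --conf spark.driver.extraClassPath=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar \
      --conf spark.executor.extraClassPath=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar \
      --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
      KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true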
Good luck,

Josh

On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek <j.vlek@anchormen.nl> wrote:

> Hi,
>
> I posted a question with regard to Phoenix and Spark Streaming on
> StackOverflow [1]. Please find a copy of the question below the first
> stack trace. I also already contacted the Phoenix mailing list and tried
> the suggestion of setting spark.driver.userClassPathFirst. Unfortunately,
> that only pushed me further into dependency hell, which I tried to
> resolve until I hit a wall with an UnsatisfiedLinkError on Snappy.
>
> What I am trying to achieve: to save a stream from Kafka into
> Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and the
> original exception happens both on a 3-node cluster and on the MapR
> Sandbox (a VM for experimentation), in YARN and stand-alone mode. Further
> experimentation (like the saveAsNewAPIHadoopFile below) was done only on
> the sandbox in stand-alone mode.
>
> Phoenix only supports Spark from 4.4.0 onwards, but I thought I could use
> a naive implementation in 4.3.1 that creates a new connection for every
> RDD from the DStream. This resulted in the ClassNotFoundException
> described in [1], so I switched to 4.4.0.
>
> Unfortunately, the saveToPhoenix method is only available in Scala. So I
> did find the suggestion to try it via the saveAsNewAPIHadoopFile method
> [2] and an example implementation [3], which I adapted to my own needs.
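For reference, the saveAsNewAPIHadoopFile route from [2] and [3] boils
down to roughly the following sketch. This is not the code from this
thread: EventWritable, the MAIL_EVENTS table, and its ID/EVENT_COUNT
columns are made-up placeholders, and it assumes Phoenix 4.4.0's
PhoenixMapReduceUtil/PhoenixOutputFormat MapReduce API as used in [3]:

    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
    import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
    import org.apache.spark.api.java.JavaPairRDD;

    public class PhoenixBulkWrite {

        // Placeholder writable; [3] additionally implements Hadoop's Writable.
        // The JDBC parameter order must match the column list in setOutput().
        public static class EventWritable implements DBWritable {
            String id;
            long count;

            @Override
            public void write(PreparedStatement ps) throws SQLException {
                ps.setString(1, id);
                ps.setLong(2, count);
            }

            @Override
            public void readFields(ResultSet rs) throws SQLException {
                id = rs.getString(1);
                count = rs.getLong(2);
            }
        }

        public static void save(JavaPairRDD<NullWritable, EventWritable> events)
                throws IOException {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1");
            conf.set("hbase.zookeeper.property.clientPort", "5181");
            Job job = Job.getInstance(conf);
            // Upserts each pair into MAIL_EVENTS(ID, EVENT_COUNT) via Phoenix.
            PhoenixMapReduceUtil.setOutput(job, "MAIL_EVENTS", "ID,EVENT_COUNT");
            events.saveAsNewAPIHadoopFile("", NullWritable.class,
                    EventWritable.class, PhoenixOutputFormat.class,
                    job.getConfiguration());
        }
    }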
> However, 4.4.0 + saveAsNewAPIHadoopFile raises the same
> ClassNotFoundException, just with a slightly different stack trace:
>
> java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
>         at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
>         at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
>         at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
>         at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
>         at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
>         at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
>         at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
>         at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
>         at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
>         at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
>         at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
>         at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
>         at java.sql.DriverManager.getConnection(DriverManager.java:571)
>         at java.sql.DriverManager.getConnection(DriverManager.java:187)
>         at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
>         at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
>         at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
>         at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
>         at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
>         ... 8 more
> Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
>         at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
>         at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
>         at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
>         at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
>         ... 23 more
> Caused by: java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>         at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
>         ... 26 more
> Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
>         at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
>         at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
>         at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
>         at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
>         ... 31 more
> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:191)
>         at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
>         ... 34 more
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
> ====== Below is my question from StackOverflow ======
>
> I'm trying to connect to Phoenix via Spark, and I keep getting the
> following exception when opening a connection via the JDBC driver (cut
> for brevity, full stack trace below):
>
>     Caused by: java.lang.ClassNotFoundException:
>     org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
>             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>             at java.security.AccessController.doPrivileged(Native Method)
>
> The class in question is provided by the JAR called
> phoenix-core-4.3.1.jar (despite being in the HBase package namespace, I
> guess they need it to integrate with HBase).
>
> There are numerous questions on SO about ClassNotFoundExceptions on
> Spark, and I've tried the fat-jar approach (with both Maven's assembly
> and shade plugins; I've inspected the jars, and they **do** contain
> ClientRpcControllerFactory), and I've tried a lean jar while specifying
> the jars on the command line.
> For the latter, the command I used is as follows:
>
>     /opt/mapr/spark/spark-1.3.1/bin/spark-submit \
>       --jars lib/spark-streaming-kafka_2.10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar \
>       --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
>       KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true
>
> I've also done a classpath dump from within the code, and the first
> classloader in the hierarchy already knows the Phoenix jar:
>
>     2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO
>     nl.work.kafkastreamconsumer.phoenix.LinePersister -
>     [file:/home/work/projects/customer/KafkaStreamConsumer.jar,
>     file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar,
>     file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar,
>     file:/home/work/projects/customer/lib/zkclient-0.3.jar,
>     file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar,
>     file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar,
>     file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
>
> So the question is: what am I missing here? Why can't Spark load the
> correct class? There should be only one version of the class flying
> around (namely the one from phoenix-core), so I doubt it's a versioning
> conflict.
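One way to narrow that down is to ask the JVM, from inside the executor
code, where (or whether) the class actually resolves. A sketch using only
standard JDK calls (whereIs is a made-up helper, not from the original
post):

    static void whereIs(String className) {
        try {
            java.security.CodeSource src = Class.forName(className)
                    .getProtectionDomain().getCodeSource();
            // src is null for classes loaded from the bootstrap classpath
            System.out.println(className + " -> "
                    + (src == null ? "<bootstrap>" : src.getLocation()));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " not visible from "
                    + Thread.currentThread().getContextClassLoader());
        }
    }

    // e.g.: whereIs("org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory");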
>     [Executor task launch worker-3] ERROR
>     nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
>     java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
>             at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
>             at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
>             at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
>             at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
>             at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>             at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>             at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>             at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>             at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>             at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>             at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>             at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>             at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>             at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>             at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>             at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>             at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>             at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>             at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
>             at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
>             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>             at org.apache.spark.scheduler.Task.run(Task.scala:64)
>             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>             at java.lang.Thread.run(Thread.java:745)
>     Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
>             at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
>             at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
>             at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
>             at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
>             at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
>             at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
>             at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
>             at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
>             at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
>             at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
>             at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
>             at java.sql.DriverManager.getConnection(DriverManager.java:571)
>             at java.sql.DriverManager.getConnection(DriverManager.java:233)
>             at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
>             ... 25 more
>     Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
>             at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
>             at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
>             at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
>             at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
>             ... 36 more
>     Caused by: java.lang.reflect.InvocationTargetException
>             at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
>             at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>             at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>             at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
>             ... 39 more
>     Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
>             at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
>             at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
>             at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
>             at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
>             ... 43 more
>     Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
>             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>             at java.security.AccessController.doPrivileged(Native Method)
>             at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>             at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>             at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>             at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>             at java.lang.Class.forName0(Native Method)
>             at java.lang.Class.forName(Class.java:191)
>             at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
>             ... 46 more
>
> **/edit**
>
> Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are the
> classes in question. Since the saveToPhoenix() method is not yet
> available in the Java API, and since this is just a POC, my idea was to
> simply use the JDBC driver for each mini-batch.
>     public class PhoenixConnection implements AutoCloseable, Serializable {
>         private static final long serialVersionUID = -4491057264383873689L;
>         private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
>
>         static {
>             try {
>                 Class.forName(PHOENIX_DRIVER);
>             } catch (ClassNotFoundException e) {
>                 throw new RuntimeException(e);
>             }
>         }
>
>         private Connection connection;
>
>         public PhoenixConnection(final String jdbcUri) {
>             try {
>                 connection = DriverManager.getConnection(jdbcUri);
>             } catch (SQLException e) {
>                 throw new RuntimeException(e);
>             }
>         }
>
>         public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {
>             ArrayList<Map<String, Object>> resultList = new ArrayList<>();
>             try (PreparedStatement statement = connection.prepareStatement(sql);
>                     ResultSet resultSet = statement.executeQuery()) {
>                 ResultSetMetaData metaData = resultSet.getMetaData();
>                 while (resultSet.next()) {
>                     Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
>                     // JDBC result-set columns are 1-based
>                     for (int column = 1; column <= metaData.getColumnCount(); ++column) {
>                         final String columnLabel = metaData.getColumnLabel(column);
>                         row.put(columnLabel, resultSet.getObject(columnLabel));
>                     }
>                     resultList.add(row);
>                 }
>             }
>             resultList.trimToSize();
>
>             return resultList;
>         }
>
>         @Override
>         public void close() {
>             try {
>                 connection.close();
>             } catch (SQLException e) {
>                 throw new RuntimeException(e);
>             }
>         }
>     }
>
>     public class LinePersister implements Function<JavaRDD<String>, Void> {
>         private static final long serialVersionUID = -2529724617108874989L;
>         private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
>         private static final String TABLE_NAME = "mail_events";
>
>         private final String jdbcUrl;
>
>         public LinePersister(String jdbcUrl) {
>             this.jdbcUrl = jdbcUrl;
>         }
>
>         @Override
>         public Void call(JavaRDD<String> dataSet) throws Exception {
>             LOGGER.info(String.format("Starting conversion on rdd with %d elements", dataSet.count()));
>
>             List<Void> collectResult = dataSet.map(new Function<String, Void>() {
>
>                 private static final long serialVersionUID = -6651313541439109868L;
>
>                 @Override
>                 public Void call(String line) throws Exception {
>                     LOGGER.info("Writing line " + line);
>                     Event event = EventParser.parseLine(line);
>                     try (PhoenixConnection connection = new PhoenixConnection(jdbcUrl)) {
>                         connection.executeQuery(event.createUpsertStatement(TABLE_NAME));
>                     } catch (Exception e) {
>                         LOGGER.error("Error while processing line", e);
>                         dumpClasspath(this.getClass().getClassLoader());
>                     }
>                     return null;
>                 }
>             }).collect();
>
>             LOGGER.info(String.format("Got %d results: ", collectResult.size()));
>
>             return null;
>         }
>
>         public static void dumpClasspath(ClassLoader loader) {
>             LOGGER.info("Classloader " + loader + ":");
>
>             if (loader instanceof URLClassLoader) {
>                 URLClassLoader ucl = (URLClassLoader) loader;
>                 LOGGER.info(Arrays.toString(ucl.getURLs()));
>             } else {
>                 LOGGER.error("cannot display components as not a URLClassLoader");
>             }
>
>             if (loader.getParent() != null) {
>                 dumpClasspath(loader.getParent());
>             }
>         }
>     }
>
>     <?xml version="1.0" encoding="UTF-8"?>
>     <project xmlns="http://maven.apache.org/POM/4.0.0"
>              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>                                  http://maven.apache.org/xsd/maven-4.0.0.xsd">
>         <modelVersion>4.0.0</modelVersion>
>         <groupId>nl.work</groupId>
>         <artifactId>KafkaStreamConsumer</artifactId>
>         <version>1.0</version>
>         <packaging>jar</packaging>
>         <properties>
>             <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>             <maven.compiler.source>1.7</maven.compiler.source>
>             <maven.compiler.target>1.7</maven.compiler.target>
>             <spark.version>1.3.1</spark.version>
>             <hibernate.version>4.3.10.Final</hibernate.version>
>             <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
>             <hbase.version>0.98.9-hadoop2</hbase.version>
>             <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
>         </properties>
>         <dependencies>
>             <dependency>
>                 <groupId>org.apache.spark</groupId>
>                 <artifactId>spark-core_2.10</artifactId>
>                 <version>${spark.version}</version>
>                 <scope>provided</scope>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.spark</groupId>
>                 <artifactId>spark-streaming_2.10</artifactId>
>                 <version>${spark.version}</version>
>                 <scope>provided</scope>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.spark</groupId>
>                 <artifactId>spark-streaming-kafka_2.10</artifactId>
>                 <version>${spark.version}</version>
>                 <scope>provided</scope>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.phoenix</groupId>
>                 <artifactId>phoenix-core</artifactId>
>                 <version>${phoenix.version}</version>
>                 <scope>provided</scope>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.phoenix</groupId>
>                 <artifactId>phoenix-spark</artifactId>
>                 <version>${phoenix.version}</version>
>                 <scope>provided</scope>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.hbase</groupId>
>                 <artifactId>hbase-client</artifactId>
>                 <version>${hbase.version}</version>
>                 <scope>provided</scope>
>             </dependency>
>             <dependency>
>                 <groupId>com.cloudera</groupId>
>                 <artifactId>spark-hbase</artifactId>
>                 <version>${spark-hbase.version}</version>
>                 <scope>provided</scope>
>             </dependency>
>             <dependency>
>                 <groupId>junit</groupId>
>                 <artifactId>junit</artifactId>
>                 <version>4.10</version>
>                 <scope>test</scope>
>             </dependency>
>         </dependencies>
>         <build>
>             <plugins>
>                 <plugin>
>                     <groupId>org.apache.maven.plugins</groupId>
>                     <artifactId>maven-compiler-plugin</artifactId>
>                     <version>3.3</version>
>                     <configuration>
>                         <source>${maven.compiler.source}</source>
>                         <target>${maven.compiler.target}</target>
>                     </configuration>
>                 </plugin>
>             </plugins>
>         </build>
>         <repositories>
>             <repository>
>                 <id>unknown-jars-temp-repo</id>
>                 <name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
>                 <url>file:${project.basedir}/lib</url>
>             </repository>
>         </repositories>
>     </project>
>
> Cheers,
> Jeroen
>
> [1] http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0-hbase-0-98-on-spark-1-3-1-classnotfoundexceptio
> [2] https://groups.google.com/forum/#!topic/phoenix-hbase-user/pKnvE1pd_K8
> [3] https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java
This may or may not be helpful for your classpath issues, = but I wanted to verify that basic functionality worked, so I made a sample = app here:

https://github.com/jmahonin/spark-streaming-phoenix

This c= onsumes events off a Kafka topic using spark streaming, and writes out even= t counts to Phoenix using the new phoenix-spark functionality:
http://phoenix.apache.org= /phoenix_spark.html

It's definitely overkill, and would prob= ably be more efficient to use the JDBC driver directly, but it serves as a = proof-of-concept.

I've only tested this in local mode. To conver= t it to a full jobs JAR, I suspect that keeping all of the spark and phoeni= x dependencies marked as 'provided', and including the Phoenix clie= nt JAR in the Spark classpath would work as well.

Good l= uck,

Josh
On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek <j.vlek@anchormen.nl> wrote:
Hi,

I posted a question with regards to Phoenix and Spark Streaming on
StackOverflow [1]. Please find a copy of the question to this email = below the
first stack trace. I also already contacted the Phoenix mailing list and tr= ied
the suggestion of setting spark.driver.userClassPathFirst. Unfortunately th= at
only pushed me further into the dependency hell, which I tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy.

What I am trying to achieve: To save a stream from Kafka into=C2=A0 Phoenix= /Hbase
via Spark Streaming. I'm using MapR as a platform and the original exce= ption
happens both on a 3-node cluster, as on the MapR Sandbox (a VM for
experimentation), in YARN and stand-alone mode. Further experimentation (li= ke
the saveAsNewHadoopApiFile below), was done only on the sandbox in standalo= ne
mode.

Phoenix only supports Spark from 4.4.0 onwards, but I thought I could
use a naive implementation that creates a new connection for
every RDD from the DStream in 4.3.1.=C2=A0 This resulted in the
ClassNotFoundException described in [1], so I switched to 4.4.0.

Unfortunately the saveToPhoenix method is only available in Scala. So I did=
find the suggestion to try it via the saveAsNewHadoopApiFi= le method [2] and an
example implementation [3], which I adapted to my own needs.

However, 4.4.0 + saveAsNewHadoopApiFile=C2=A0 raises the same
ClassNotFoundExeption, just a slightly different stacktrace:

=C2=A0 java.lang.RuntimeException: java.sql.SQLException: ERROR 103
(08004): Unable to establish connection.
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.mapreduce.PhoenixOutputFormat.ge= tRecordWriter(PhoenixOutputFormat.java:58)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.sc= ala:995)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunct= ions.scala:979)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at org.apache.spark.scheduler.Task.run(Task.sca= la:64)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1= 145)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:= 615)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to
establish connection.
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.exception.SQLExceptionCode$Facto= ry$1.newException(SQLExceptionCode.java:386)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionIn= fo.java:145)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(Connect= ionQueryServicesImpl.java:288)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQ= ueryServicesImpl.java:171)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQuer= yServicesImpl.java:1881)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQuer= yServicesImpl.java:1860)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.= java:77)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQuerySe= rvicesImpl.java:1860)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDri= ver.java:162)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbedde= dDriver.java:131)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.sql.DriverManager.getConnection(DriverManager.java:571)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.sql.DriverManager.getConnection(DriverManager.java:187)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.mapreduce.util.ConnectionUtil.ge= tConnection(ConnectionUtil.java:92)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(Connec= tionUtil.java:80)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(Connec= tionUtil.java:68)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecord= Writer.java:49)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(Pho= enixOutputFormat.java:55)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 8 more
Caused by: java.io.IOException:
java.lang.reflect.InvocationTargetException
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager.c= reateConnection(HConnectionManager.java:457)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnect= ionManager.java:350)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createCo= nnection(HConnectionFactory.java:47)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(= ConnectionQueryServicesImpl.java:286)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 23 more
Caused by: java.lang.reflect.InvocationTargetException
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAcce= ssorImpl.java:57)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Delegating= ConstructorAccessorImpl.java:45)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnect= ionManager.java:455)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 26 more
Caused by: java.lang.UnsupportedOperationException: Unable to find
org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.util.ReflectionUtils.instan= tiateWithCustomCtor(ReflectionUtils.java:36)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerF= actory.java:56)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation= .<init>(HConnectionManager.java:769)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImpleme= ntation.<init>(HConnectionManager.java:689)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 31 more
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.net.URLClassLoader$1.run(URLClassLoader= .java:366)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.net.URLClassLoader$1.run(URLClassLoader= .java:355)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.security.AccessController.doPrivileged(= Native Method)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.net.URLClassLoader.findClass(URLClassLo= ader.java:354)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.ClassLoader.loadClass(ClassLoader.= java:425)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.ClassLoader.loadClass(ClassLoader.= java:358)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.Class.forName0(Native Method)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.Class.forName(Class.java:191)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCt= or(ReflectionUtils.java:32)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 34 more

Driver stacktrace:
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGSche= duler.scala:1204)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSche= duler.scala:1193)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(= DAGScheduler.scala:1192)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:= 59)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGScheduler.abortStage(= DAGScheduler.scala:1192)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.appl= y(DAGScheduler.scala:693)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed= $1.apply(DAGScheduler.scala:693)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.Option.foreach(Option.scala:236)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFaile= d(DAGScheduler.scala:693)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGSchedu= ler.scala:1393)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DA= GScheduler.scala:1354)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


=3D=3D=3D=3D=3D=3D Below is my question from StackOverflow =3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D

I'm trying to connect to Phoenix via Spark and I keep getting the
following exception when opening a connection via the JDBC driver (cut
for brevity, full stacktrace below):

=C2=A0 =C2=A0 Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.net.URLClassLoader$1.run(= URLClassLoader.java:366)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.net.URLClassLoader$1.run(= URLClassLoader.java:355)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.security.AccessController= .doPrivileged(Native Method)

The class in question is provided by the jar called phoenix-
core-4.3.1.jar (despite it being in the HBase package namespace, I
guess they need it to integrate with HBase).

There are numerous questions on SO about ClassNotFoundExceptions
on Spark and I've tried the fat-jar approach (both with Maven's
assembly and shade plugins; I've inspected the jars, they **do**
contain ClientRpcControllerFactory), and I've tried a lean jar while specifying the jars on the command line. For the latter, the command
I used is as follows:

=C2=A0 =C2=A0 /opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark= -
streaming-
kafka_10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metr= ics-
core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar --
class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector
KafkaStreamConsumer.jar node1:5181 0 topic
jdbc:phoenix:node1:5181 true

I've also done a classpath dump from within the code and the first
classloader in the hierarchy already knows the Phoenix jar:

=C2=A0 =C2=A0 2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO<= br> nl.work.kafkastreamconsumer.phoenix.LinePersister -
[file:/home/work/projects/customer/KafkaStreamConsumer.jar,
file:/home/work/projects/customer/lib/spark-streaming-
kafka_2.10-1.3.1.jar,
file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar,
file:/home/work/projects/customer/lib/zkclient-0.3.jar,
file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar,
file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar,
file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]

So the question is: What am I missing here? Why can't Spark load the correct class? There should be only one version of the class flying
around (namely the one from phoenix-core), so I doubt it's a
versioning conflict.

=C2=A0 =C2=A0 [Executor task launch worker-3] ERROR
nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while
processing line
=C2=A0 =C2=A0 java.lang.RuntimeException: java.sql.SQLException: ERROR 103<= br> (08004): Unable to establish connection.
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
nl.work.kafkastreamconsumer.phoenix.PhoenixCon= nection.<init>(PhoenixConnection.java:41)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java= :40)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java= :32)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.app= ly(JavaPairRDD.scala:999)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.c= ollection.Iterator$$anon$11.next(Iterator.scala:328)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.collection.Iterator$clas= s.foreach(Iterator.scala:727)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.collection.AbstractItera= tor.foreach(Iterator.scala:1157)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.collection.generic.Growa= ble$class.
$plus$plus$eq(Growable.scala:48)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.collection.mutable.Array= Buffer.
$plus$plus$eq(ArrayBuffer.scala:103)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.collection.mutable.Array= Buffer.
$plus$plus$eq(ArrayBuffer.scala:47)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
scala.collection.TraversableOnce$
class.to(TraversableOnce.scala:273)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.collection.AbstractIterat= or.to(Iterator.scala:1157)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)<= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at scala.collection.AbstractItera= tor.toArray(Iterator.scala:1157)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:14= 98)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:14= 98)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at org.apache.spark.scheduler.Tas= k.run(Task.scala:64)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1= 145)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:= 615)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.Thread.run(Thread.ja= va:745)
=C2=A0 =C2=A0 Caused by: java.sql.SQLException: ERROR 103 (08004): Unable t= o
establish connection.
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.exception.SQLExceptionCode$Facto= ry$1.newException(SQLExceptionCode.java:362)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionIn= fo.java:133)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(Connect= ionQueryServicesImpl.java:282)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQ= ueryServicesImpl.java:166)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQuer= yServicesImpl.java:1831)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQuer= yServicesImpl.java:1810)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.= java:77)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQuerySe= rvicesImpl.java:1810)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDri= ver.java:162)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbedde= dDriver.java:126)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.sql.DriverManager.getConnection(DriverManager.java:571)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.sql.DriverManager.getConnection(DriverManager.java:233)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(P= hoenixConnection.java:39)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 25 more
=C2=A0 =C2=A0 Caused by: java.io.IOException:
java.lang.reflect.InvocationTargetException
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager.c= reateConnection(HConnectionManager.java:457)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnect= ionManager.java:350)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createCo= nnection(HConnectionFactory.java:47)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(= ConnectionQueryServicesImpl.java:280)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 36 more
=C2=A0 =C2=A0 Caused by: java.lang.reflect.InvocationTargetException
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown
Source)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Delegating= ConstructorAccessorImpl.java:45)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnect= ionManager.java:455)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 39 more
=C2=A0 =C2=A0 Caused by: java.lang.UnsupportedOperationException: Unable to=
find
org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.util.ReflectionUtils.instan= tiateWithCustomCtor(ReflectionUtils.java:36)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerF= actory.java:56)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation= .<init>(HConnectionManager.java:769)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImpleme= ntation.<init>(HConnectionManager.java:689)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 43 more
=C2=A0 =C2=A0 Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.net.URLClassLoader$1.run(= URLClassLoader.java:366)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.net.URLClassLoader$1.run(= URLClassLoader.java:355)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.security.AccessController= .doPrivileged(Native Method)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.ClassLoader.loadClas= s(ClassLoader.java:425)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.ClassLoader.loadClas= s(ClassLoader.java:358)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.Class.forName0(Nativ= e Method)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.lang.Class.forName(Class.= java:191)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 at
org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCt= or(ReflectionUtils.java:32)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 ... 46 mor= e

**/edit**

Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are the
classes in question. Since the saveToPhoenix() method is not yet
available for the Java API and since this is just a POC, my idea was to
simply use the JDBC driver for each mini-batch.

=C2=A0 =C2=A0 public class PhoenixConnection implements AutoCloseable,
Serializable {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 private static final long serialVersionUID =3D<= br> -4491057264383873689L;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 private static final String PHOENIX_DRIVER =3D<= br> "org.apache.phoenix.jdbc.PhoenixDriver";

=C2=A0 =C2=A0 =C2=A0 =C2=A0 static {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 try {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 Class.forName(PHOENIX_DRIVER);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 } catch (ClassNotFo= undException e) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 throw new RuntimeException(e);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }

=C2=A0 =C2=A0 =C2=A0 =C2=A0 private Connection connection;

=C2=A0 =C2=A0 =C2=A0 =C2=A0 public PhoenixConnection(final String jdbcUri) = {

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 try {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 connection =3D DriverManager.getConnection(jdbcUri);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 } catch (SQLExcepti= on e) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 throw new RuntimeException(e);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }

=C2=A0 =C2=A0 =C2=A0 =C2=A0 public List<Map<String, Object>> ex= ecuteQuery(final String sql)
throws SQLException {

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 ArrayList<Map<= ;String, Object>> resultList =3D new
ArrayList<>();
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 try (PreparedStatem= ent statement =3D
connection.prepareStatement(sql); ResultSet resultSet =3D
statement.executeQuery() ) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 ResultSetMetaData metaData =3D
resultSet.getMetaData();
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 while (resultSet.next()) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 Map<String, Object> row =3D ne= w
HashMap<>(metaData.getColumnCount());
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 for (int column =3D 0; column < metaData.getColumnCount(); ++column) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 final St= ring columnLabel =3D
metaData.getColumnLabel(column);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 row.put(= columnLabel,
resultSet.getObject(columnLabel));
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 resultList.trimToSi= ze();

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return resultList;<= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }

=C2=A0 =C2=A0 =C2=A0 =C2=A0 @Override
=C2=A0 =C2=A0 =C2=A0 =C2=A0 public void close() {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 try {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 connection.close();
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 } catch (SQLExcepti= on e) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 throw new RuntimeException(e);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }

=C2=A0 =C2=A0 }

=C2=A0 =C2=A0 public class LinePersister implements Function<JavaRDD<= String>,
Void> {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 private static final long serialVersionUID =3D<= br> -2529724617108874989L;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 private static final Logger LOGGER =3D
Logger.getLogger(LinePersister.class);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 private static final String TABLE_NAME =3D &quo= t;mail_events";

=C2=A0 =C2=A0 =C2=A0 =C2=A0 private final String jdbcUrl;

=C2=A0 =C2=A0 =C2=A0 =C2=A0 public LinePersister(String jdbcUrl) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 this.jdbcUrl =3D jd= bcUrl;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }



=C2=A0 =C2=A0 =C2=A0 =C2=A0 @Override
=C2=A0 =C2=A0 =C2=A0 =C2=A0 public Void call(JavaRDD<String> dataSet)= throws Exception {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 LOGGER.info(String.= format(
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 "Starting conversion on rdd wit= h %d elements",
dataSet.count()));

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 List<Void> co= llectResult =3D dataSet.map(new
Function<String, Void>() {

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 private static final long serialVersionUID =3D
-6651313541439109868L;

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 @Override
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 public Void call(String line) throws Exception {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 LOGGER.info("Writing line "= ; + line);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 Event event =3D EventParser.parseLin= e(line);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 try (PhoenixConnection connection = =3D new
PhoenixConnection(
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 jdbcUrl)) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 connecti= on.executeQuery(event

.createUpsertStatement(TABLE_NAME));
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 } catch (Exception e) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 LOGGER.e= rror("Error while processing line",
e);

dumpClasspath(this.getClass().getClassLoader());

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return null;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }).collect();

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 LOGGER.info(String.= format("Got %d results: ",
collectResult.size()));

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return null;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }

=C2=A0 =C2=A0 =C2=A0 =C2=A0 public static void dumpClasspath(ClassLoader lo= ader)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 LOGGER.info("Classloader &qu= ot; + loader + ":");

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (loader instanceof URLClassLoa= der)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 URLClassLoader ucl = =3D (URLClassLoader)loader;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 LOGGER.info(Arrays.= toString(ucl.getURLs()));
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 else
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 LOGGER.error("= cannot display components as not a
URLClassLoader)");

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (loader.getParent() !=3D null)=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 dumpClasspath(loade= r.getParent());
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 }

=C2=A0 =C2=A0 <?xml version=3D"1.0" encoding=3D"UTF-8&quo= t;?>
=C2=A0 =C2=A0 <project xmlns=3D"http://maven.apache.org/POM/4.0.0" xmlns:xsi=3D"http://www.w3.org/2001/XMLSchema-instance"
=C2=A0 =C2=A0 =C2=A0 =C2=A0 xsi:schemaLocation=3D"http://maven.apache.org/POM/4.0= .0
h= ttp://maven.apache.org/xsd/maven-4.0.0.xsd">
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <modelVersion>4.0.0</modelVersion><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 <groupId>nl.work</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <artifactId>KafkaStreamConsumer</artif= actId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <version>1.0</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <packaging>jar</packaging>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding&= gt;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <maven.compiler.= source>1.7</maven.compiler.source>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <maven.compiler.= target>1.7</maven.compiler.target>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <spark.version&g= t;1.3.1</spark.version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <hibernate.versi= on>4.3.10.Final</hibernate.version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <phoenix.version= >4.4.0-HBase-0.98</phoenix.version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <hbase.version&g= t;0.98.9-hadoop2</hbase.version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <spark-hbase.ver= sion>0.0.2-clabs-spark-1.3.1</spark-
hbase.version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 </properties>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependencies>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>org.apache.spark</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>spark-core_2.10</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>${spark.version}</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>provided</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>org.apache.spark</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>spark-streaming_2.10</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>${spark.version}</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>provided</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>org.apache.spark</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>spark-streaming-kafka_2.10</artifactId><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>${spark.version}</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>provided</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>org.apache.phoenix</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>phoenix-core</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>${phoenix.version}</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>provided</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>org.apache.phoenix</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>phoenix-spark</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>${phoenix.version}</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>provided</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>org.apache.hbase</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>hbase-client</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>${hbase.version}</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>provided</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>com.cloudera</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>spark-hbase</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>${spark-hbase.version}</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>provided</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <dependency><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <groupId>junit</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <artifactId>junit</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <version>4.10</version>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <scope>test</scope>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependency>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 </dependencies>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <build>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <plugins>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <plugin>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <groupId>org.apache.maven.plug= ins</groupId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <artifactId>maven-compiler-plu= gin</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <version>3.3</version> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <configuration>

<source>${maven.compiler.source}</source>

<target>${maven.compiler.target}</target>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </configuration>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 </plugin>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <!-- <plugin>
<groupId>org.apache.maven.plugins</groupId> <artifactId>m= aven-
shade-plugin</artifactId>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <version>2.3</version> &= lt;executions>
<execution> <phase>package</phase> <goals>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <goal>shade</goal> </= goals> <configuration>
<filters> <filter> <artifact>*:*</artifact>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <excludes> <exclude>META= -INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <exclude>META-INF/*.RSA</ex= clude>
</excludes> </filter> </filters> </configuration> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </execution> </executions&g= t; </plugin> -->
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </plugins> =C2=A0 =C2=A0 =C2=A0 =C2=A0 </build>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 <repositories>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 <repository><= br> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <id>unknown-jars-temp-repo</id>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <name>A temporary repository created by NetBeans
for libraries and jars it could not identify. Please replace the
dependencies in this repository with correct ones and delete this
repository.</name>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 <url>file:${project.basedir}/lib</url>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 </repository>=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 </repositories>
=C2=A0 =C2=A0 </project>


Cheers,
Jeroen


[1] http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0-hbase-0-98-on-spark-1-3-1-classnotfoundexceptio
[2] https://groups.google.com/forum/#!topic/phoenix-hbase-user/pKnvE1pd_K8
[3] https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java
