kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gwen Shapira <g...@confluent.io>
Subject Re: Kafka and Spark Issue
Date Mon, 02 Nov 2015 17:46:02 GMT
Since the error is from the HBase client and completely unrelated to Kafka,
you will have better luck in the HBase mailing list.

On Mon, Nov 2, 2015 at 9:16 AM, Nikhil Gs <gsnikhil1432010@gmail.com> wrote:

> Hello Team,
>
> My scenario is to load data from a Kafka producer topic into HBase using the
> Spark Streaming API. Our cluster uses Kerberos authentication, and when we run
> the KafkaToHbase.java program below we get the error that is also shown below.
> Please let me know if anyone has an idea of what can be done.
>
> package com.spark.example;
>
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.MasterNotRunningException;
> import org.apache.hadoop.hbase.client.HBaseAdmin;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.security.UserGroupInformation;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat;
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
>
> import scala.Tuple2;
>
> public class KafkaToHbase { /* Spark Streaming job: consumes the Kafka
>  * topics given on the command line and writes each message value into the
>  * HBase table named by rawTableName. */
>
> private final static String rawTableName = "kafkatest1"; // target HBase table
>
>     //final static String user = "a_rakesh.samineni@SUDDENLINK.CEQUEL3.COM
> ";
>     //sfinal static String keyPath =
> "/home/a_rakesh.samineni/a_rakesh.samineni.keytab";
>
> @SuppressWarnings({ "serial", "deprecation" })
> public static void main(String[] args) throws Exception { /* args[0] and
>  * args[1] are passed to KafkaUtils.createStream as its zkQuorum and groupId
>  * arguments; args[2] is a comma-separated topic list; args[3] is the number
>  * of consumer threads per topic. */
> // // Authenticating Kerberos principal
> //        System.out.println("Principal Authentication: ");
> //        try {
> //
> //            UserGroupInformation.loginUserFromKeytab(user, keyPath);
> //        }
> //        catch(Exception e){
> //         e.printStackTrace();
> //        }
>
>
> //1. Create the spark streaming context with a 10 second batch size
> SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingTest");
> sparkConf.set("spark.driver.allowMultipleContexts", "true"); // NOTE(review): only one context is created here; this flag looks unnecessary
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(10));
>
> int numThreads = Integer.parseInt(args[3]); //2. Map every topic in args[2] to args[3] consumer threads
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> String[] topics = args[2].split(",");
> for (String topic: topics) {
> topicMap.put(topic, numThreads);
> }
>
> //System.out.println("creating stream..");
> JavaPairReceiverInputDStream<String, String> messages = // receiver-based (key, message) stream
> KafkaUtils.createStream(ssc, args[0], args[1], topicMap,
> StorageLevels.MEMORY_AND_DISK_SER);
> //messages.print();
>          //System.out.println("messages"+messages);
>
> //3. Configure the HBase client, log in from the keytab, check HBase is up
>    Configuration config = null;
> try {
> System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); // let JGSS obtain credentials outside the current JAAS Subject
> //System.out.println("in hbase configuraiton");
> //UserGroupInformation.loginUserFromKeytab(user, keyPath);
> config = HBaseConfiguration.create();
> config.set("hadoop.security.authentication","kerberos");
> config.set("hbase.rpc.protection", "privacy");
> config.addResource(new
> org.apache.hadoop.fs.Path("/etc/hbase/conf/core-site.xml"));
>        config.addResource(new
> org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
> config.set("hbase.zookeeper.quorum", "sdldalplhdm03, sdldalplhdm02,
> sdldalplhdm01");
> config.set("hbase.zookeeper.property.clientPort","2181");
> UserGroupInformation.setConfiguration(config); // UGI reads hadoop.security.authentication=kerberos from config
> //System.out.println("after hbase configuration");
> UserGroupInformation.loginUserFromKeytab("
> a_nikhil.gopishetti@SUDDENLINK.CEQUEL3.COM",
> "/home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab"); /* NOTE(review):
>  * this login applies to the driver JVM only. The HBase writes run in Spark
>  * tasks (TableOutputFormat$TableRecordWriter under Executor$TaskRunner in
>  * the stack trace below), where UGI reports auth:SIMPLE and no Kerberos TGT.
>  * Executors need their own credentials, e.g. a keytab login inside the task
>  * or an HBase delegation token obtained on the driver and shipped to them. */
> //config.set("hbase.master", "10.48.210.248:60010");
> HBaseAdmin.checkHBaseAvailable(config); // availability check from the driver only
> //System.out.println("HBase is running!");
> }
> catch (MasterNotRunningException e) {
> System.out.println("HBase is not running!");
> System.exit(1);
> }catch (Exception ce){ /* NOTE(review): any other failure, including a
>  * failed keytab login, is only printed; execution continues, possibly
>  * without Kerberos credentials. */
> ce.printStackTrace();
> }
>
> //config.set(TableInputFormat.INPUT_TABLE, rawTableName);
>
> //4. Keep only the message value (second tuple element) of each Kafka record
> JavaDStream<String> lines = messages.map(new Function<Tuple2<String,
> String>, String>() {
> public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
> }
>
> });
> JavaPairDStream<ImmutableBytesWritable, Put> hbasePuts= lines.mapToPair( // one Put per message
> new PairFunction<String, ImmutableBytesWritable, Put>(){
>
> @SuppressWarnings("deprecation")
> public Tuple2<ImmutableBytesWritable, Put> call(String line) {
> Put put = new Put(Bytes.toBytes(Math.random())); /* NOTE(review): the row
>  * key is the 8-byte encoding of a random double, so keys are not
>  * reproducible and a collision would overwrite an existing row. */
> put.add(Bytes.toBytes("cf12"), Bytes.toBytes("firstColumn"),
> Bytes.toBytes(line)); // column cf12:firstColumn holds the message text
> return new Tuple2<ImmutableBytesWritable, Put>(new
> ImmutableBytesWritable(), put); // empty key; presumably ignored by TableOutputFormat - confirm
> }
> });
>
> final Job newAPIJobConfiguration1 = Job.getInstance(config); //5. New Hadoop API job configuration for TableOutputFormat
> hbasePuts.foreachRDD(new Function<JavaPairRDD<ImmutableBytesWritable, Put>,
> Void>() { // called on the driver once per 10-second batch
> public Void call(JavaPairRDD<ImmutableBytesWritable, Put>
> hbasePutJavaPairRDD) throws Exception {
>
>
> newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,
> rawTableName); /* NOTE(review): OUTPUT_TABLE comes from the imported
>  * org.apache.hadoop.hbase.mapred.TableOutputFormat, while the output format
>  * class set below is the mapreduce one; presumably both use the same
>  * property key - confirm. */
>
> newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
>
> hbasePutJavaPairRDD.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration()); /* The
>  * write runs inside executor tasks; the log below shows it failing there
>  * with GSSException: Failed to find any Kerberos tgt. */
> //hbasePutJavaPairRDD.saveAsNewAPIHadoopDataset(config);
> return null; // required by the Function<..., Void> return type
> }
> });
>
> ssc.start();
> //ssc.awaitTerminationOrTimeout(10000000);
> ssc.awaitTermination();
> ssc.stop(); // reached only after awaitTermination() returns
> }
>
> }
>
> *Error:*
>
>
>
> Below are the log lines showing the error that is blocking us.
>
>
>
> 15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Opening socket connection to
> server sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181. Will not
> attempt to authenticate using SASL (unknown error)
>
> 15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Socket connection established,
> initiating session, client: /10.48.210.241:52395, server:
> sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181
>
> 15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Session establishment complete
> on server sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181,
> sessionid = 0x44fbd94675cf6c7, negotiated timeout = 60000
>
> 15/11/01 21:22:24 INFO mapreduce.TableOutputFormat: Created table instance
> for kafkatest1
>
> 15/11/01 21:22:24 WARN security.UserGroupInformation:
> PriviledgedActionException as:a_nikhil.gopishetti (auth:SIMPLE)
> cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>
> 15/11/01 21:22:24 WARN ipc.AbstractRpcClient: Exception encountered while
> connecting to the server : javax.security.sasl.SaslException: GSS initiate
> failed [Caused by GSSException: No valid credentials provided (Mechanism
> level: Failed to find any Kerberos tgt)]
>
> *15/11/01 21:22:24 ERROR ipc.AbstractRpcClient: SASL authentication failed.
> The most likely cause is missing or invalid credentials. Consider 'kinit'.*
>
> *javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]*
>
>                 at
>
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>
>                 at
>
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
>
>                 at java.security.AccessController.doPrivileged(Native
> Method)
>
>                 at javax.security.auth.Subject.doAs(Subject.java:415)
>
>                 at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
>
>                 at
> org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1174)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
>
>                 at
>
> org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
>
>                 at
>
> org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
>
>                 at
>
> org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
>
>                 at
>
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
>
>                 at
>
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
>
>                 at
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:369)
>
>                 at
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
>
>                 at
>
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
>
>                 at
>
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)
>
>                 at
>
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:111)
>
>                 at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1007)
>
>                 at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>
>                 at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>
>                 at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
>                 at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
>                 at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>                 at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>                 at java.lang.Thread.run(Thread.java:745)
>
> Caused by: GSSException: No valid credentials provided (Mechanism level:
> Failed to find any Kerberos tgt)
>
>                 at
>
> sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
>
>                 at
>
> sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
>
>                 at
>
> sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
>
>                 at
>
> sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
>
>                 at
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
>
>                 at
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
>
>                 at
>
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
>
>                 ... 31 more
>
> 15/11/01 21:22:27 WARN security.UserGroupInformation:
> PriviledgedActionException as:a_nikhil.gopishetti (auth:SIMPLE)
> cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>
> 15/11/01 21:22:27 WARN ipc.AbstractRpcClient: Exception encountered while
> connecting to the server : javax.security.sasl.SaslException: GSS initiate
> failed [Caused by GSSException: No valid credentials provided (Mechanism
> level: Failed to find any Kerberos tgt)]
>
> 15/11/01 21:22:27 ERROR ipc.AbstractRpcClient: SASL authentication failed.
> The most likely cause is missing or invalid credentials. Consider 'kinit'.
>
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>
>
> Regards,
> Nik.
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message