kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nikhil Gs <gsnikhil1432...@gmail.com>
Subject Kafka and Spark Issue
Date Mon, 02 Nov 2015 17:16:26 GMT
Hello Team,

My scenario is to load data from a Kafka producer topic into HBase using the
Spark API. Our cluster is Kerberos-authenticated. When we run the
kafkaToHbase.java below, we get the error shown after the code. Please let me
know if anyone has an idea of what can be done.

package com.spark.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaToHbase {

private final static String rawTableName = "kafkatest1";

    //final static String user = "a_rakesh.samineni@SUDDENLINK.CEQUEL3.COM";
    //sfinal static String keyPath =
"/home/a_rakesh.samineni/a_rakesh.samineni.keytab";

@SuppressWarnings({ "serial", "deprecation" })
public static void main(String[] args) throws Exception {
// // Authenticating Kerberos principal
//        System.out.println("Principal Authentication: ");
//        try {
//
//            UserGroupInformation.loginUserFromKeytab(user, keyPath);
//        }
//        catch(Exception e){
//         e.printStackTrace();
//        }


//1. Create the spark streaming context with a 10 second batch size
SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingTest");
sparkConf.set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
Durations.seconds(10));

int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}

//System.out.println("creating stream..");
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(ssc, args[0], args[1], topicMap,
StorageLevels.MEMORY_AND_DISK_SER);
//messages.print();
         //System.out.println("messages"+messages);

//3. create connection with HBase
   Configuration config = null;
try {
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
//System.out.println("in hbase configuraiton");
//UserGroupInformation.loginUserFromKeytab(user, keyPath);
config = HBaseConfiguration.create();
config.set("hadoop.security.authentication","kerberos");
config.set("hbase.rpc.protection", "privacy");
config.addResource(new
org.apache.hadoop.fs.Path("/etc/hbase/conf/core-site.xml"));
       config.addResource(new
org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
config.set("hbase.zookeeper.quorum", "sdldalplhdm03, sdldalplhdm02,
sdldalplhdm01");
config.set("hbase.zookeeper.property.clientPort","2181");
UserGroupInformation.setConfiguration(config);
//System.out.println("after hbase configuration");
UserGroupInformation.loginUserFromKeytab("
a_nikhil.gopishetti@SUDDENLINK.CEQUEL3.COM",
"/home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab");
//config.set("hbase.master", "10.48.210.248:60010");
HBaseAdmin.checkHBaseAvailable(config);
//System.out.println("HBase is running!");
}
catch (MasterNotRunningException e) {
System.out.println("HBase is not running!");
System.exit(1);
}catch (Exception ce){
ce.printStackTrace();
}

//config.set(TableInputFormat.INPUT_TABLE, rawTableName);

//4. new Hadoop API configuration
JavaDStream<String> lines = messages.map(new Function<Tuple2<String,
String>, String>() {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}

});
JavaPairDStream<ImmutableBytesWritable, Put> hbasePuts= lines.mapToPair(
new PairFunction<String, ImmutableBytesWritable, Put>(){

@SuppressWarnings("deprecation")
public Tuple2<ImmutableBytesWritable, Put> call(String line) {
Put put = new Put(Bytes.toBytes(Math.random()));
put.add(Bytes.toBytes("cf12"), Bytes.toBytes("firstColumn"),
Bytes.toBytes(line));
return new Tuple2<ImmutableBytesWritable, Put>(new
ImmutableBytesWritable(), put);
}
});

final Job newAPIJobConfiguration1 = Job.getInstance(config);
hbasePuts.foreachRDD(new Function<JavaPairRDD<ImmutableBytesWritable, Put>,
Void>() {
public Void call(JavaPairRDD<ImmutableBytesWritable, Put>
hbasePutJavaPairRDD) throws Exception {

newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,
rawTableName);
newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
hbasePutJavaPairRDD.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
//hbasePutJavaPairRDD.saveAsNewAPIHadoopDataset(config);
return null;
}
});

ssc.start();
//ssc.awaitTerminationOrTimeout(10000000);
ssc.awaitTermination();
ssc.stop();
}

}

*Error:*



Below are the log lines that are blocking us.



15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Opening socket connection to
server sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181. Will not
attempt to authenticate using SASL (unknown error)

15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Socket connection established,
initiating session, client: /10.48.210.241:52395, server:
sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181

15/11/01 21:22:23 INFO zookeeper.ClientCnxn: Session establishment complete
on server sdldalplhdm03.suddenlink.cequel3.com/10.48.210.248:2181,
sessionid = 0x44fbd94675cf6c7, negotiated timeout = 60000

15/11/01 21:22:24 INFO mapreduce.TableOutputFormat: Created table instance
for kafkatest1

15/11/01 21:22:24 WARN security.UserGroupInformation:
PriviledgedActionException as:a_nikhil.gopishetti (auth:SIMPLE)
cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]

15/11/01 21:22:24 WARN ipc.AbstractRpcClient: Exception encountered while
connecting to the server : javax.security.sasl.SaslException: GSS initiate
failed [Caused by GSSException: No valid credentials provided (Mechanism
level: Failed to find any Kerberos tgt)]

*15/11/01 21:22:24 ERROR ipc.AbstractRpcClient: SASL authentication failed.
The most likely cause is missing or invalid credentials. Consider 'kinit'.*

*javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]*

                at
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)

                at
org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)

                at java.security.AccessController.doPrivileged(Native
Method)

                at javax.security.auth.Subject.doAs(Subject.java:415)

                at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)

                at
org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1174)

                at
org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)

                at
org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)

                at
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)

                at
org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)

                at
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)

                at
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)

                at
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:369)

                at
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)

                at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)

                at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)

                at
org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:111)

                at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1007)

                at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)

                at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

                at org.apache.spark.scheduler.Task.run(Task.scala:64)

                at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

                at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

                at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

                at java.lang.Thread.run(Thread.java:745)

Caused by: GSSException: No valid credentials provided (Mechanism level:
Failed to find any Kerberos tgt)

                at
sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)

                at
sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)

                at
sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)

                at
sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)

                at
sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)

                at
sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)

                at
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)

                ... 31 more

15/11/01 21:22:27 WARN security.UserGroupInformation:
PriviledgedActionException as:a_nikhil.gopishetti (auth:SIMPLE)
cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]

15/11/01 21:22:27 WARN ipc.AbstractRpcClient: Exception encountered while
connecting to the server : javax.security.sasl.SaslException: GSS initiate
failed [Caused by GSSException: No valid credentials provided (Mechanism
level: Failed to find any Kerberos tgt)]

15/11/01 21:22:27 ERROR ipc.AbstractRpcClient: SASL authentication failed.
The most likely cause is missing or invalid credentials. Consider 'kinit'.

javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]


Regards,
Nik.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message