spark-user mailing list archives

From Dave Ariens <dari...@blackberry.com>
Subject RE: Accessing Kerberos Secured HDFS Resources from Spark on Mesos
Date Mon, 29 Jun 2015 13:18:10 GMT
I'd like to toss out another idea that doesn't involve a complete end-to-end Kerberos implementation.
Essentially: have the driver authenticate to Kerberos and instantiate a Hadoop file system, then
serialize/cache it for the executors to use instead of each of them having to instantiate their
own. A rough driver-side sketch follows the list below.

- Driver authenticates to Kerberos via UserGroupInformation.loginUserFromKeytab(principal, keytab)
- Driver instantiates a Hadoop configuration via hdfs-site.xml and core-site.xml
- Driver instantiates the Hadoop file system from a path based on the Hadoop root URI (hdfs://hadoop-cluster.site.org/) and the Hadoop config
- Driver makes this file system available to all future executors
- Executors first check for an existing/cached file system object before instantiating their own
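
For what it's worth, here is a rough, untested sketch of the driver-side steps above (the config paths, principal, keytab, and root URI are placeholders, not values from this thread):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

// Hadoop configuration built from the cluster's core-site.xml and hdfs-site.xml
val hadoopConf = new Configuration()
hadoopConf.addResource(new Path("/etc/hadoop/conf/core-site.xml"))
hadoopConf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"))

// Driver authenticates to Kerberos from a keytab
UserGroupInformation.setConfiguration(hadoopConf)
UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.ORG", "/path/to/user.keytab")

// Driver instantiates the file system from the Hadoop root URI and config
val fs = FileSystem.get(new URI("hdfs://hadoop-cluster.site.org/"), hadoopConf)

The open question is the last two bullets: how the driver would actually make that file system (or at least the authenticated credentials) available to the executors.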

I've used this technique within (non-distributed) multi-threaded Java applications (https://github.com/blackberry/KaBoom)
and it works quite well. I'm not sure whether it's possible to implement in a distributed application
like Spark, but it might be worth investigating...

Here are the references to file system instantiation that I found in 1.4:

dariens@dariens-laptop1 (branch-1.4) ~/OtherProjects/spark $ grep -ir getFileSystem *
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala:    val fs = path.getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala:  @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala:    val fs = outputDir.getFileSystem(broadcastedConf.value.value)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala:    val fs = path.getFileSystem(broadcastedConf.value.value)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala:    val fs = path.getFileSystem(conf)
core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala:    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala:      val fs = path.getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:      val threadStats = getFileSystemThreadStatistics()
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:      val threadStats = getFileSystemThreadStatistics()
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:      val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:  private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:  private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:    val fs = pattern.getFileSystem(conf)
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala:        path.getFileSystem(conf.value)
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala:    val fs = outputPath.getFileSystem(conf)
core/src/main/scala/org/apache/spark/SparkContext.scala:      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
core/src/main/scala/org/apache/spark/SparkContext.scala:      val fs = path.getFileSystem(hadoopConfiguration)
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala:    val fs = file.getFileSystem(job)
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala:      val fs = pathp.getFileSystem(conf)
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala:  private[this] val fs = path.getFileSystem(
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala:        val fs = path.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala:    val fs = outputPath.getFileSystem(hadoopConf)
sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala:        val fs = hdfsPath.getFileSystem(hadoopConf)
sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala:          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala:    val fileSystem = outputPath.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala:    val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala:    val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala:    val fs: FileSystem = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala:        path.getFileSystem(conf).makeQualified(path)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala:    val fs = fspath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala:      val fs = footer.getFile.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala:      val fs = file.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala:    val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala:    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala:    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java:    fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala:      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala:      val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala:        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala:      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala:      val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala:      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala:      val fs = summaryPath.getFileSystem(configuration)
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala:      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala:      val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala:    val fs = fspath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala:    val fs = origPath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala:      new Path(path, filename).getFileSystem(conf),
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala:              val fs = pathPattern.getFileSystem(sc.hiveconf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala:        val fs = path.getFileSystem(sc.hiveconf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala:    val fs = outputPath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala:            val fs = path.getFileSystem(conf)
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala:    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala:    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala:    val fs = rootDir.getFileSystem(new Configuration())
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala:    var fs = testDir.getFileSystem(new Configuration())
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala:      fs = testDir.getFileSystem(new Configuration())
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala:    val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala:    val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala:    val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala:  def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala:    val fs = path.getFileSystem(conf)
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala:      val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala:    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala:    val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala:    if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala:    def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala:    if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala:      fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala:      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:    val destFs = destDir.getFileSystem(hadoopConf)
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:    val srcFs = srcPath.getFileSystem(hadoopConf)
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala:        val dstFs = dst.getFileSystem(conf)



From: Steve Loughran [mailto:stevel@hortonworks.com]
Sent: Sunday, June 28, 2015 10:34 AM
To: Tim Chen
Cc: Marcelo Vanzin; Dave Ariens; Olivier Girardot; user@spark.apache.org
Subject: Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos


On 27 Jun 2015, at 07:56, Tim Chen <tim@mesosphere.io> wrote:

Does YARN provide the token through that env variable you mentioned? Or how does YARN do this?



Roughly:

1. The client-side launcher creates the delegation tokens and adds them as byte[] data to the
request (a sketch of this step follows the list).
2. The YARN RM uses the HDFS token for the localisation, so the node managers can access the
content the user has the rights to.
3. There's some other stuff related to token refresh of restarted app masters, essentially
guaranteeing that even an AM restarted 3 days after the first launch will still have current
credentials.
4. It's the duty of the launched app master to download those delegated tokens and make use
of them, partly through the UGI stuff and partly through other mechanisms (for example, a subset
of the tokens is usually passed to the launched containers).
5. It's also the duty of the launched AM to deal with token renewal and expiry. Short-lived
(< 72h) apps don't have to worry about this; making the jump to long-lived services adds
a lot of extra work (which is in Spark 1.4).
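
To make step 1 a bit more concrete, here is a hedged sketch of collecting HDFS delegation tokens on the client side and serializing them to byte[] using Hadoop's Credentials API (the renewer principal is a placeholder; this is illustrative, not Spark's actual yarn/Client.scala code):

import java.io.{ByteArrayOutputStream, DataOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

val conf = new Configuration()
val creds = new Credentials()

// Ask the (already authenticated) client's file system for delegation tokens
val fs = FileSystem.get(conf)
fs.addDelegationTokens("yarn/resourcemanager@EXAMPLE.ORG", creds)

// Serialize the tokens to byte[] so they can travel with the launch request
val buffer = new ByteArrayOutputStream()
creds.writeTokenStorageToStream(new DataOutputStream(buffer))
val tokenBytes: Array[Byte] = buffer.toByteArray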



Tim

On Fri, Jun 26, 2015 at 3:51 PM, Marcelo Vanzin <vanzin@cloudera.com> wrote:
On Fri, Jun 26, 2015 at 3:44 PM, Dave Ariens <dariens@blackberry.com> wrote:
Fair. I will look into an alternative with a generated delegation token. However, the same
issue exists: how can I have the executor run some arbitrary code when it gets a task assignment
and before it proceeds to process its resources?

Hmm, good question. If it doesn't already, Mesos could have its own implementation of CoarseGrainedExecutorBackend
that provides that functionality. The only difference is that you'd run something before the
executor starts up, not before each task.

YARN actually doesn't do it that way; YARN provides the tokens to the executor before the
process starts, so that when you call "UserGroupInformation.getCurrentUser()" the tokens are
already there.

One way of doing that is by writing the tokens to a file and setting the HADOOP_TOKEN_FILE_LOCATION
env variable when starting the process. You can check the Hadoop sources for details. Not sure
if there's another way.
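
A hedged sketch of that token-file mechanism, using Hadoop's Credentials API (the file path is a placeholder; in YARN the launched process finds the file via the HADOOP_TOKEN_FILE_LOCATION environment variable and UserGroupInformation loads it automatically):

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

val conf = new Configuration()

// Launcher side: persist the previously collected tokens to a file, then start
// the child process with HADOOP_TOKEN_FILE_LOCATION pointing at that file
val creds = new Credentials()  // in practice, the tokens gathered by the launcher
creds.writeTokenStorageFile(new Path("file:///tmp/app.tokens"), conf)

// Process side (if not relying on the env variable): load the file explicitly
// and attach the tokens to the current UGI
val loaded = Credentials.readTokenStorageFile(new File("/tmp/app.tokens"), conf)
UserGroupInformation.getCurrentUser().addCredentials(loaded)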



From: Marcelo Vanzin
Sent: Friday, June 26, 2015 6:20 PM
To: Dave Ariens
Cc: Tim Chen; Olivier Girardot; user@spark.apache.org
Subject: Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos


On Fri, Jun 26, 2015 at 3:09 PM, Dave Ariens <dariens@blackberry.com> wrote:
Would there be any way to have the task instances in the slaves call the UGI login with a
principal/keytab provided to the driver?

That would only work with a very small number of executors. If you have many login requests
in a short period of time with the same principal, the KDC will start to deny logins. That's
why delegation tokens are used instead of explicit logins.

--
Marcelo



--
Marcelo


