spark-dev mailing list archives

From "Long, Andrew" <>
Subject Which parts of a parquet read happen on the driver vs the executor?
Date Thu, 11 Apr 2019 20:00:19 GMT
Hey Friends,

I’m working on a POC that involves reading and writing parquet files mid-DAG.  Writes are
working, but I’m struggling to get reads working due to serialization issues. I’ve
got code that works with master=local but not on YARN.  So here are my questions.

  1.  Is there an easy way to tell whether a particular function in Spark will run on the driver
or on an executor?  My current heuristic is that if the function uses the SparkSession it runs
on the driver, but….
  2.  Where does FileFormat.buildReaderWithPartitionValues(..) run?  The driver or the executor?
 Due to the SparkSession parameter I suspected that it runs on the driver and that the resulting
iterator is then sent to the executor to perform the read, but I’ve been running into serialization
errors:
19/04/11 12:35:29 ERROR org.apache.spark.internal.Logging$class TaskSetManager: Failed to serialize task 26, not attempting to retry it. scala.collection.Iterator$$anon$12
Serialization stack:
                - object not serializable (class: scala.collection.Iterator$$anon$12, value: non-empty iterator)
                - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
                - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6993864a)
                - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
                - object (class scala.collection.immutable.$colon$colon, List(non-empty iterator))
                - field (class:, name: readers, type: class scala.collection.immutable.List)

Is there something I’m missing here?
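For what it’s worth, the only runtime probe I’ve found for question 1 is TaskContext.get(), which returns null on the driver and a non-null context inside a running task. A rough sketch (the helper name is mine, not a Spark API):

```scala
import org.apache.spark.TaskContext

// TaskContext.get() is only defined while a task is executing,
// so it can serve as a crude "where am I running?" probe.
def whereAmI(): String =
  Option(TaskContext.get()) match {
    case Some(ctx) => s"executor, partition ${ctx.partitionId()}"
    case None      => "driver"
  }
```

It obviously only answers the question at runtime, not statically, which is why I’m asking if there’s a better way.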

Here’s the code I’m using to read records.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

def read(path: String, spark: SparkSession, fileSchema: StructType,
         requiredSchema: StructType): Iterator[InternalRow] = {
  val partitionSchema = StructType(Seq.empty)
  val hadoopPath = new Path(path)
  val fs = hadoopPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  val status = fs.getFileStatus(hadoopPath)

  val pFile = PartitionedFile(
    partitionValues = InternalRow.empty, // should be empty for non-partitioned data
    filePath = path,
    start = 0,
    length = status.getLen)

  val readFile: PartitionedFile => Iterator[Any] = // really Iterator[InternalRow]
    new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = spark,
      dataSchema = fileSchema,
      partitionSchema = partitionSchema, // should be empty for non-partitioned fields
      requiredSchema = requiredSchema,
      filters = Seq.empty,
      options = Map.empty,
      hadoopConf = spark.sparkContext.hadoopConfiguration)

  import scala.collection.JavaConverters._

  // The vectorized reader can hand back ColumnarBatch values despite the
  // declared element type, so match on both.
  readFile(pFile).flatMap {
    case r: InternalRow   => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
}
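Reading the stack trace again, it looks like a live Iterator is being captured into a task closure (the `readers` List). The workaround I’m considering, sketched below with hypothetical names, is to ship only the PartitionedFile descriptors plus the reader function, and invoke the reader inside mapPartitions so the iterator is only ever created executor-side. I haven’t verified this end to end:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Sketch only: `readFile` is the function returned by
// buildReaderWithPartitionValues on the driver. The closure serializes the
// function and the file descriptors, never a live iterator.
def readOnExecutors(
    spark: SparkSession,
    files: Seq[PartitionedFile],
    readFile: PartitionedFile => Iterator[InternalRow]): RDD[InternalRow] =
  spark.sparkContext
    .parallelize(files, numSlices = files.size)
    .mapPartitions(_.flatMap(readFile)) // reader is invoked executor-side
```

Does that match how FileSourceScanExec is meant to be used, or am I fighting the API?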


Cheers Andrew