spark-user mailing list archives

From Timothy Bisson <Timothy.Bis...@microsoft.com.INVALID>
Subject Re: Implementing TableProvider in Spark 3.0
Date Thu, 10 Dec 2020 08:23:07 GMT
Following up on this older thread.

Looking at the implementation of DataFrameWriter, it doesn't seem possible to use the schema
from the DataFrame itself when writing through a V2 interface?

To have the DataFrame schema passed to a DataSourceV2 implementation, a custom write DataSource
would need to extend FileDataSourceV2. However, if the DataSource is a FileDataSourceV2,
lookupV2Provider() returns None, so maybeV2Provider.isDefined is always false. The result is
that saveToV1Source() is always called.
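
For reference, the lookupV2Provider() helper in DataFrameWriter looks roughly like this
(paraphrased from the Spark 3.0 source, so treat it as a sketch rather than the exact code):

    private def lookupV2Provider(): Option[TableProvider] = {
      DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
        // File-source V2 providers are filtered out here, so their writes fall back to the V1 path.
        case Some(_: FileDataSourceV2) => None
        case other => other
      }
    }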


Snippet of DataFrameWriter.save():

    val maybeV2Provider = lookupV2Provider()
    if (maybeV2Provider.isDefined) {
      val provider = maybeV2Provider.get
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        provider, df.sparkSession.sessionState.conf)
      val options = sessionOptions ++ extraOptions
      val dsOptions = new CaseInsensitiveStringMap(options.asJava)

      def getTable: Table = {
        // For file source, it's expensive to infer schema/partition at each write. Here we pass
        // the schema of input query and the user-specified partitioning to `getTable`. If the
        // query schema is not compatible with the existing data, the write can still success but
        // following reads would fail.
        if (provider.isInstanceOf[FileDataSourceV2]) {
          provider.getTable(
            df.schema.asNullable,
            partitioningAsV2.toArray,
            dsOptions.asCaseSensitiveMap())
        } else {
          DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
        }
      }
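
For a provider that is not a FileDataSourceV2, the else branch above hands off to
DataSourceV2Utils.getTableFromProvider with userSpecifiedSchema = None. That helper looks
roughly like this (again paraphrased from the Spark 3.0 source, so treat it as a sketch):

    def getTableFromProvider(
        provider: TableProvider,
        options: CaseInsensitiveStringMap,
        userSpecifiedSchema: Option[StructType]): Table = {
      userSpecifiedSchema match {
        case Some(schema) if provider.supportsExternalMetadata() =>
          // A user-specified schema is forwarded to getTable as-is.
          provider.getTable(schema, Array.empty, options.asCaseSensitiveMap())
        case Some(_) =>
          throw new UnsupportedOperationException(
            s"${provider.getClass.getSimpleName} source does not support user-specified schema.")
        case None =>
          // No user-specified schema: the provider must infer it from the options alone,
          // so the DataFrame's own schema never reaches a custom TableProvider on this path.
          provider.getTable(
            provider.inferSchema(options),
            provider.inferPartitioning(options),
            options.asCaseSensitiveMap())
      }
    }

So on the DataFrameWriter.save() path, a plain TableProvider only ever sees the schema returned
by its own inferSchema(options).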

<quote author="Richard Xin-2">
Saw

Sent from Yahoo Mail for iPhone


On Wednesday, July 8, 2020, 9:26 PM, Sricheta Ruj <Sricheta.Ruj@.com> wrote:

Hello Spark Team

I am trying to use the DataSourceV2 API from Spark 3.0. I wanted to ask: in the case of a write,
how do I get the user-specified schema?

This is what I am trying to achieve-


val data = Seq(
   Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
   Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
)

val schema = new StructType()
   .add(StructField("name", StringType, true))
   .add(StructField("id", IntegerType, true))
   .add(StructField("flag", BooleanType, true))
   .add(StructField("salary", DoubleType, true))
   .add(StructField("phone", LongType, true))
   .add(StructField("dob", DateType, true))
   .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
   .add(StructField("time", TimestampType, true))
   .add(StructField("float", FloatType, true))
   .add(StructField("byte", ByteType, true))
   .add(StructField("short", ShortType, true))


val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Create a new manifest and add the entity to it
df.write.format("com.microsoft.cdm")
   .option("storage", storageAccountName)
   .option("container", outputContainer)
   .option("manifest", "/root/default.manifest.cdm.json")
   .option("entity", "TestEntity")
   .option("format", "parquet")
   .option("compression", "gzip")
   .option("appId", appid)
   .option("appKey", appkey)
   .option("tenantId", tenantid)
   .mode(SaveMode.Append)
   .save()

I have my custom DataSource implementation as follows:

class DefaultSource extends DataSourceRegister with TableProvider {

  override def shortName(): String = "spark-cdm"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

  override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
    getTable(null, null, options).partitioning
  }

  override def supportsExternalMetadata = true

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    println(schema)
    new MysqlTable(schema, properties)
  }
}

I get null here, and I am not sure how I should get the StructType I created when calling
df.write. Any help would be appreciated.

Thanks and Regards,
Sricheta Ruj
</quote>