spark-user mailing list archives

From Shubham Chaurasia <shubh.chaura...@gmail.com>
Subject Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
Date Tue, 09 Oct 2018 13:39:32 GMT
Alright, so the original is a big project which uses a SQL store underneath.
I extracted the minimal code out into a smaller project, and it still
creates multiple instances.

Here is my project:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── shubham
│   │   │           ├── MyDataSource.java
│   │   │           └── reader
│   │   │               └── MyDataSourceReader.java


MyDataSource.java
-------------------------------------------------

package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    return Optional.empty();
  }
}


MyDataSourceReader.java
-------------------------------------------------

package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader,
    SupportsScanColumnarBatch {

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + "
schema: " + this.schema);
    return this.schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    System.out.println("MyDataSourceReader.planBatchInputPartitions: "
+ this + " schema: " + this.schema);
    return new ArrayList<>();
  }
}


----------------------------------------
spark-shell output
----------------------------------------
scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+----+----+
|col1|col2|
+----+----+
+----+----+


Here two instances of the reader, MyDataSourceReader@69fa5536 and
MyDataSourceReader@3095c449, are being created: readSchema() is called on
the first instance, but planBatchInputPartitions() is called on the second,
so schema is null there.

Am I not doing it the correct way?
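
In the meantime, here is the workaround I am experimenting with. This is
only a sketch, and it assumes the schema can be recomputed deterministically
from the constructor options on any instance: make the schema computation
lazy and idempotent, so that whichever instance Spark hands
planBatchInputPartitions() can rebuild it on its own.

  // Sketch only: recompute-and-cache the schema instead of relying on
  // state set by readSchema() on a different instance.
  private StructType lazySchema() {
    if (schema == null) {
      schema = (new StructType())
          .add("col1", "int")
          .add("col2", "string");
    }
    return schema;
  }

  @Override
  public StructType readSchema() {
    return lazySchema();
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // schema is no longer null here, even on a freshly created instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + lazySchema());
    return new ArrayList<>();
  }

This only sidesteps the state problem, of course; it does not explain why
two instances are created in the first place.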

Thanks,
Shubham

On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <Assaf.Mendelson@rsa.com>
wrote:

> I am using v2.4.0-RC2
>
> The code as is wouldn't run (e.g. planBatchInputPartitions returns null).
> How are you calling it?
>
> When I do:
>
> val df = spark.read.format(mypackage).load().show()
>
> I am getting a single creation. How are you creating the reader?
>
> Thanks,
>
>         Assaf
>
> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
> *Sent:* Tuesday, October 9, 2018 2:02 PM
> *To:* Mendelson, Assaf; user@spark.apache.org
> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
> Thanks Assaf. Did you try with *tags/v2.4.0-rc2*?
>
> Full Code:
>
> MyDataSource is the entry point which simply creates Reader and Writer:
>
> public class MyDataSource implements DataSourceV2, WriteSupport,
>     ReadSupport, SessionConfigSupport {
>
>   @Override
>   public DataSourceReader createReader(DataSourceOptions options) {
>     return new MyDataSourceReader(options.asMap());
>   }
>
>   @Override
>   public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
>       SaveMode mode, DataSourceOptions options) {
>     // creates a dataSourcewriter here..
>     return Optional.of(dataSourcewriter);
>   }
>
>   @Override
>   public String keyPrefix() {
>     return "myprefix";
>   }
> }
>
> public class MyDataSourceReader implements DataSourceReader,
>     SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     // variable this.schema is null here since readSchema() was called on a
>     // different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     // more logic......
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     // some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> Thanks,
>
> Shubham
>
> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <Assaf.Mendelson@rsa.com>
> wrote:
>
> Could you add a fuller code example? I tried to reproduce it in my
> environment and I am getting just one instance of the reader…
>
> Thanks,
>
>         Assaf
>
> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
> *Sent:* Tuesday, October 9, 2018 9:31 AM
> *To:* user@spark.apache.org
> *Subject:* DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
> Hi All,
>
> --Spark built with *tags/v2.4.0-rc2*
>
> Consider the following DataSourceReader implementation:
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     // variable this.schema is null here since readSchema() was called on a different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     // more logic......
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     // some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> 1) First, readSchema() is called on MyDataSourceReader@instance1, which
> sets the class variable schema.
>
> 2) Then, when planBatchInputPartitions() is called, it is called on a
> different instance of MyDataSourceReader, and hence I do not get the value
> of schema in planBatchInputPartitions().
>
> How can I get the value of schema that was set in readSchema() in the
> planBatchInputPartitions() method?
>
> Console Logs:
>
> scala> mysource.executeQuery("select * from movie").show
>
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>
> Thanks,
>
> Shubham
