spark-user mailing list archives

From Jörn Franke <jornfra...@gmail.com>
Subject Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
Date Wed, 10 Oct 2018 05:32:49 GMT
Generally, please avoid System.out.println and use a logger instead, even in examples. People may
take these examples from here and put them into their production code.
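A minimal sketch of that advice, assuming plain JDK logging (java.util.logging) so it compiles without dependencies. Inside a Spark data source, SLF4J, which Spark itself uses, would be the more usual choice. The class name LoggingReaderExample is hypothetical. It only shows a static per-class logger taking the place of the System.out.println calls quoted below.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for MyDataSourceReader, logging instead of calling System.out.println.
public class LoggingReaderExample {
  // One logger per class, named after the class, so output can be filtered and routed by configuration.
  private static final Logger LOG = Logger.getLogger(LoggingReaderExample.class.getName());

  private final String name;

  public LoggingReaderExample(String name) {
    this.name = name;
    // The record is only created if FINE is enabled for this logger;
    // {0} is filled in later, when a handler formats the record.
    LOG.log(Level.FINE, "Instantiated reader {0}", this);
  }

  @Override
  public String toString() {
    return "LoggingReaderExample(" + name + ")";
  }

  public static void main(String[] args) {
    // FINE is below the default INFO threshold, so nothing is printed unless logging is configured for it.
    new LoggingReaderExample("demo");
  }
}
```

Unlike println, the message level and destination are then controlled by logging configuration instead of code changes.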

> On 09.10.2018 at 15:39, Shubham Chaurasia <shubh.chaurasia@gmail.com> wrote:
> 
> Alright, it is a big project that uses a SQL store underneath.
> I extracted the minimal code into a smaller project, and it still creates multiple instances.
> 
> Here is my project:
> 
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │       └── shubham
> │   │   │           ├── MyDataSource.java
> │   │   │           └── reader
> │   │   │               └── MyDataSourceReader.java
> 
> 
> MyDataSource.java
> -------------------------------------------------
> package com.shubham;
> 
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
> 
> import java.util.Optional;
> 
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
> 
>   public DataSourceReader createReader(DataSourceOptions options) {
>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>     return new MyDataSourceReader(options.asMap());
>   }
> 
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
>       SaveMode mode, DataSourceOptions options) {
>     return Optional.empty();
>   }
> }
> 
> MyDataSourceReader.java
> -------------------------------------------------
> package com.shubham.reader;
> 
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> 
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
> 
>   private Map<String, String> options;
>   private StructType schema;
> 
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
> 
>   @Override
>   public StructType readSchema() {
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> 
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     return new ArrayList<>();
>   }
> }
> 
> ----------------------------------------
> spark-shell output
> ----------------------------------------
> scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show
> 
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
> MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
> +----+----+
> |col1|col2|
> +----+----+
> +----+----+
> 
> 
> Here, two reader instances are created: MyDataSourceReader@69fa5536 and MyDataSourceReader@3095c449. Consequently, schema is null in MyDataSourceReader@3095c449.
> 
> Am I doing something wrong?
> 
> Thanks,
> Shubham
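The logs show that readSchema() and planBatchInputPartitions() can run on different reader instances. One way to stop depending on which instance received readSchema() is to have every method derive what it needs from the options, which every createReader() call receives, and to cache the result per instance only as an optimization. This sketch replaces the Spark types with plain Java (a column list instead of StructType) so it compiles on its own. OrderIndependentReader, discoverSchema and planPartitions are hypothetical names standing in for MyDataSourceReader, the schema-discovery logic and planBatchInputPartitions().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MyDataSourceReader with Spark types replaced by plain Java.
public class OrderIndependentReader {
  private final Map<String, String> options;
  // Per-instance cache; never assumed to have been filled by an earlier call on this instance.
  private List<String> schema;

  public OrderIndependentReader(Map<String, String> options) {
    this.options = options;
  }

  // Hypothetical schema discovery; a real source would query its store, e.g. using options.get("query").
  static List<String> discoverSchema(Map<String, String> options) {
    return Arrays.asList("col1 int", "col2 string");
  }

  // Single access path for the schema: computed on first use by whichever instance asks.
  private List<String> schema() {
    if (schema == null) {
      schema = discoverSchema(options);
    }
    return schema;
  }

  // Plays the role of readSchema().
  public List<String> readSchema() {
    return schema();
  }

  // Plays the role of planBatchInputPartitions(): safe even on a freshly created instance.
  public List<String> planPartitions() {
    return Collections.singletonList("partition-0 " + schema());
  }

  public static void main(String[] args) {
    Map<String, String> opts = Collections.singletonMap("query", "select * from some_table");
    new OrderIndependentReader(opts).readSchema();                           // instance 1: schema only
    List<String> parts = new OrderIndependentReader(opts).planPartitions();  // instance 2: planning only
    if (parts.isEmpty()) throw new IllegalStateException("no partitions planned");
  }
}
```

In the real reader, this amounts to having planBatchInputPartitions() call readSchema() (or a shared helper) instead of reading this.schema directly.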
> 
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <Assaf.Mendelson@rsa.com> wrote:
>> I am using v2.4.0-RC2.
>> 
>> The code as is wouldn't run (e.g., planBatchInputPartitions returns null). How are you calling it?
>> 
>> When I do:
>> 
>> val df = spark.read.format(mypackage).load()
>> df.show()
>> 
>> I get a single reader instance. How are you creating the reader?
>> 
>> Thanks,
>>         Assaf
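Assaf's remark about null, as a small plain-Java sketch: whatever consumes the plan iterates the returned list, so "nothing to read" is better expressed as an empty list than as null. Shubham's later minimal version does this with new ArrayList<>(). EmptyPlanExample, planPartitions and countPartitions are hypothetical stand-ins for planBatchInputPartitions() and for whatever uses its result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmptyPlanExample {
  // Hypothetical stand-in for planBatchInputPartitions(): "nothing to read" is an empty list, never null.
  static List<String> planPartitions(boolean hasData) {
    if (!hasData) {
      return Collections.emptyList();
    }
    List<String> partitions = new ArrayList<>();
    partitions.add("partition-0");
    return partitions;
  }

  // Hypothetical consumer of the plan; with a null plan, size() would throw NullPointerException.
  static int countPartitions(List<String> plan) {
    return plan.size();
  }

  public static void main(String[] args) {
    if (countPartitions(planPartitions(false)) != 0) throw new IllegalStateException();
    if (countPartitions(planPartitions(true)) != 1) throw new IllegalStateException();
  }
}
```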
>> 
>> From: Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com] 
>> Sent: Tuesday, October 9, 2018 2:02 PM
>> To: Mendelson, Assaf; user@spark.apache.org
>> Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>> 
>> Thanks, Assaf. Did you try with tags/v2.4.0-rc2?
>> 
>> Full code:
>> 
>> MyDataSource is the entry point; it simply creates the reader and the writer.
>> 
>> public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {
>> 
>>   @Override public DataSourceReader createReader(DataSourceOptions options) {
>>     return new MyDataSourceReader(options.asMap());
>>   }
>> 
>>   @Override
>>   public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
>>       SaveMode mode, DataSourceOptions options) {
>>     // creates a dataSourcewriter here..
>>     return Optional.of(dataSourcewriter);
>>   }
>> 
>>   @Override public String keyPrefix() {
>>     return "myprefix";
>>   }
>> }
>> 
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>> 
>>   StructType schema = null;
>>   Map<String, String> options;
>> 
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>> 
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     // variable this.schema is null here since readSchema() was called on a different instance
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     // more logic......
>>     return null;
>>   }
>> 
>>   @Override
>>   public StructType readSchema() {
>>     // some logic to discover schema
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>> }
>> 
>>  
>> 
>> Thanks,
>> 
>> Shubham
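Because MyDataSource implements SessionConfigSupport with keyPrefix() returning "myprefix", session configs can also reach every reader through its options. As described in the SessionConfigSupport Javadoc for Spark 2.3/2.4, Spark turns each session config spark.datasource.<keyPrefix>.xxx -> yyy into the option xxx -> yyy for that data source. The plain-Java sketch below mimics only that key mapping. SessionConfigPrefixExample and extract are hypothetical names, and the Pattern.quote call is an extra precaution of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SessionConfigPrefixExample {
  // Maps "spark.datasource.<keyPrefix>.<key>" -> value to "<key>" -> value; all other entries are ignored.
  static Map<String, String> extract(String keyPrefix, Map<String, String> sessionConf) {
    Pattern pattern = Pattern.compile("^spark\\.datasource\\." + Pattern.quote(keyPrefix) + "\\.(.+)");
    Map<String, String> options = new HashMap<>();
    for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
      Matcher m = pattern.matcher(entry.getKey());
      if (m.matches()) {
        options.put(m.group(1), entry.getValue());
      }
    }
    return options;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("spark.datasource.myprefix.query", "select * from some_table");
    conf.put("spark.sql.shuffle.partitions", "10");
    Map<String, String> options = extract("myprefix", conf);
    if (!"select * from some_table".equals(options.get("query"))) throw new IllegalStateException();
  }
}
```

Since every reader instance receives these options, state derived from them stays consistent no matter how many instances Spark creates.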
>> 
>>  
>> 
>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <Assaf.Mendelson@rsa.com> wrote:
>> 
>> Could you share a fuller code example? I tried to reproduce this in my environment and I get only one instance of the reader.
>> 
>>  
>> 
>> Thanks,
>> 
>>         Assaf
>> 
>>  
>> 
>> From: Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com] 
>> Sent: Tuesday, October 9, 2018 9:31 AM
>> To: user@spark.apache.org
>> Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>> 
>> Hi All,
>> 
>> Spark built from tags/v2.4.0-rc2.
>> 
>> Consider the following DataSourceReader implementation:
>> 
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>> 
>>   StructType schema = null;
>>   Map<String, String> options;
>> 
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>> 
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     // variable this.schema is null here since readSchema() was called on a different instance
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     //more logic......
>>     return null;
>>   }
>> 
>>   @Override
>>   public StructType readSchema() {
>>     //some logic to discover schema
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>> }
>> 1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the instance variable schema.
>> 2) Then planBatchInputPartitions() is called on a different instance of MyDataSourceReader, so the schema value is not available in planBatchInputPartitions().
>> 
>> How can I access, in planBatchInputPartitions(), the schema value that was set in readSchema()?
>>  
>> Console Logs:
>>  
>> scala> mysource.executeQuery("select * from movie").show
>>  
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>>  
>> 
>> Thanks,
>> 
>> Shubham
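If schema discovery is expensive (the real source sits on a SQL store), recomputing it in each reader instance may be undesirable. One possible approach is a JVM-wide cache keyed by the options that determine the schema, so a second instance reuses the first one's result. DataSourceReader instances live on the driver, so a static cache there is visible to all of them. The sketch uses plain Java types. SchemaCacheExample, discoverSchema and the DISCOVERIES counter are hypothetical. The cache is never invalidated, so it assumes the underlying table's schema does not change while the application runs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reader stand-in that shares discovered schemas across instances.
public class SchemaCacheExample {
  // Shared by all instances in this JVM, keyed by the (immutable) options that determine the schema.
  private static final Map<Map<String, String>, List<String>> CACHE = new ConcurrentHashMap<>();

  // Counts calls to the expensive discovery, for illustration only.
  static final AtomicInteger DISCOVERIES = new AtomicInteger();

  // Hypothetical expensive lookup against the underlying SQL store.
  static List<String> discoverSchema(Map<String, String> options) {
    DISCOVERIES.incrementAndGet();
    return Arrays.asList("col1 int", "col2 string");
  }

  private final Map<String, String> options;

  public SchemaCacheExample(Map<String, String> options) {
    // Defensive immutable copy: a map used as a cache key must not change after insertion.
    this.options = Collections.unmodifiableMap(new HashMap<>(options));
  }

  // Plays the role of readSchema(): discovers once per distinct options, then reuses the result.
  public List<String> readSchema() {
    return CACHE.computeIfAbsent(options, SchemaCacheExample::discoverSchema);
  }

  // Plays the role of planBatchInputPartitions(): asks the cache instead of reading a field.
  public List<String> planPartitions() {
    return Collections.singletonList("partition-0 " + readSchema());
  }

  public static void main(String[] args) {
    Map<String, String> opts = Collections.singletonMap("query", "select * from movie");
    new SchemaCacheExample(opts).readSchema();      // instance 1 discovers and caches
    new SchemaCacheExample(opts).planPartitions();  // instance 2 hits the cache
    if (DISCOVERIES.get() != 1) throw new IllegalStateException("schema discovered more than once");
  }
}
```

For a long-running application, a bounded or expiring cache would be a safer design than an unbounded static map.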
>> 
>>  
