spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jörn Franke <jornfra...@gmail.com>
Subject Re: Custom Spark data source in Java
Date Wed, 22 Mar 2017 21:13:41 GMT
ok, I understand. For 1) As a minimum you need to implement inferSchema and
buildReader. InferSchema must return the Schema of a row. For example, if
it contains one column of type String it returns:
StructType(collection.immutable.Seq(StructField("column1", StringType,
true))

buildreader: here you find an article how to create a function1 in Java:
http://www.codecommit.com/blog/java/interop-between-java-and-scala

It returns basically a function that takes a file as input and returns the
rows as output (Iterator).  Btw. for better readability i would recommend
Java8 Lambda functions. instead of Function 1 etc. this would look also
much more similar to Scala, but is fully Java compliant.

you can find an example in Scala here: https://github.com/ZuInnoTe/
spark-hadoopoffice-ds/blob/master/src/main/scala/org/
zuinnote/spark/office/excel/DefaultSource.scala
It is a little bit more complex, because it returns for each row an array
that contains element of a  complex type (Excel cell)

For 2) it is in fact similar. You have to create a class that inherits from
Baserelation and implements TableScan. There you need to implement schema
and buildScan. Then you return simply ;-) an object of this class.

Here another example in Scala: https://github.com/ZuInnoTe/
spark-hadoopcryptoledger-ds/blob/master/src/main/scala/
org/zuinnote/spark/bitcoin/block/BitcoinBlockRelation.scala

Sorry it is again a little bit more complex, because it returns Bitcoin
blocks from the blockchain...

I hope it helps as a start. Let me know if you have more questions.

On Wed, Mar 22, 2017 at 9:35 PM, Jean Georges Perrin <jgp@jgp.net> wrote:

> Thanks Jörn,
>
> I tried to super simplify my project so I can focus on the plumbing and I
> will add the existing code & library later. So, as of now, the project will
> not have a lot of meaning but will allow me to understand the job.
>
> my call is:
>
> String filename = "src/test/resources/simple.json";
> SparkSession spark = SparkSession.builder().appName("X-parse").master("
> local").getOrCreate();
> Dataset<Row> df = spark.read().format("x.CharCounterDataSource")
> .option("char", "a") // count the number of 'a'
> .load(filename); // local file (line 40 in the stacks below)
> df.show();
>
> Ideally, this should display something like:
>
> +--+
> | a|
> +--+
> |45|
> +--+
>
> Things gets trickier when I try to work on x.CharCounterDataSource:
>
> I looked at 2 ways to do it:
>
> 1) one based on FileFormat:
>
> public class CharCounterDataSource implements FileFormat {
>
> @Override
> public Function1<PartitionedFile, Iterator<InternalRow>>
> buildReader(SparkSession arg0, StructType arg1,
> StructType arg2, StructType arg3, Seq<Filter> arg4, Map<String, String>
> arg5, Configuration arg6) {
> // TODO Auto-generated method stub
> return null;
> }
>
> @Override
> public Function1<PartitionedFile, Iterator<InternalRow>>
> buildReaderWithPartitionValues(SparkSession arg0,
> StructType arg1, StructType arg2, StructType arg3, Seq<Filter> arg4,
> Map<String, String> arg5,
> Configuration arg6) {
> // TODO Auto-generated method stub
> return null;
> }
>
> @Override
> public Option<StructType> inferSchema(SparkSession arg0, Map<String,
> String> arg1, Seq<FileStatus> arg2) {
> // TODO Auto-generated method stub
> return null;
> }
>
> @Override
> public boolean isSplitable(SparkSession arg0, Map<String, String> arg1,
> Path arg2) {
> // TODO Auto-generated method stub
> return false;
> }
>
> @Override
> public OutputWriterFactory prepareWrite(SparkSession arg0, Job arg1,
> Map<String, String> arg2, StructType arg3) {
> // TODO Auto-generated method stub
> return null;
> }
>
> @Override
> public boolean supportBatch(SparkSession arg0, StructType arg1) {
> // TODO Auto-generated method stub
> return false;
> }
> }
>
> I know it is an empty class (generated by Eclipse) and I am not expecting
> much out of it.
>
> Running it says:
>
> java.lang.NullPointerException
> at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$
> sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(
> DataSource.scala:188)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(
> DataSource.scala:387)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
> at x.spark.datasource.counter.CharCounterDataSourceTest.test(
> CharCounterDataSourceTest.java:40)
>
> Nothing surprising...
>
> 2) One based on RelationProvider:
>
> public class CharCounterDataSource implements RelationProvider {
>
> @Override
> public BaseRelation createRelation(SQLContext arg0, Map<String, String>
> arg1) {
> // TODO Auto-generated method stub
> return null;
> }
>
> }
>
> which fails too...
>
> java.lang.NullPointerException
> at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(
> LogicalRelation.scala:40)
> at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(
> SparkSession.scala:389)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
> at x.CharCounterDataSourceTest.test(CharCounterDataSourceTest.java:40)
>
>
> Don't get me wrong - I understand it fails - but what I need is "just one
> hint" to continue building the glue ;-)...
>
> (Un)fortunately, we cannot use Scala...
>
> jg
>
> On Mar 22, 2017, at 4:00 PM, Jörn Franke <jornfranke@gmail.com> wrote:
>
> I think you can develop a Spark data source in Java, but you are right
> most use for the glue Spark even if they have a Java library (this is what
> I did for the project I open sourced). Coming back to your question, it is
> a little bit difficult to assess the exact issue without the code.
> You could also try to first have a very simple Scala data source that
> works and then translate it to Java and do the test there. You could then
> also post the code here without disclosing confidential stuff.
> Or you try directly in Java a data source that returns always a row with
> one column containing a String. I fear in any case you need to import some
> Scala classes in Java and/or have some wrappers in Scala.
> If you use fileformat that you need at least spark 2.0.
>
> On 22 Mar 2017, at 20:27, Jean Georges Perrin <jgp@jgp.net> wrote:
>
>
> Hi,
>
> I am trying to build a custom file data source for Spark, in Java. I have
> found numerous examples in Scala (including the CSV and XML data sources
> from Databricks), but I cannot bring Scala in this project. We also already
> have the parser itself written in Java, I just need to build the "glue"
> between the parser and Spark.
>
> This is how I'd like to call it:
>
>     String filename = "src/test/resources/simple.x";
>
>     SparkSession spark = SparkSession.builder().appName("X-parse").master("local").getOrCreate();
>
>     Dataset<Row> df = spark.read().format("x.RandomDataSource")
>             .option("metadataTag", "schema") // hint to find schema
>             .option("dataTag", "data") // hint to find data
>             .load(filename); // local file
>
> So far, I tried is implement x.RandomDataSource:
>
> • Based on FileFormat, which makes the most sense, but I do not have a
> clue on how to build buildReader()...
> • Based on RelationProvider, but same here...
>
> It seems that in both case, the call is made to the right class, but I get
> into NPE because I do not provide much. Any hint or example would be
> greatly appreciated!
>
> Thanks
>
> jg
>
>
>

Mime
View raw message