spark-issues mailing list archives

From "Jungtaek Lim (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24335) Dataset.map schema not applied in some cases
Date Mon, 25 Feb 2019 23:17:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777379#comment-16777379 ]

Jungtaek Lim commented on SPARK-24335:
--------------------------------------

As a workaround, SPARK-26987 will help Java API users explicitly create a Row that understands its schema.
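
For context, the failure can be reproduced without any streaming involved: RowFactory.create() returns a GenericRow that carries no schema, and name-based access requires one. A minimal standalone sketch (the class name is just for illustration):

{code:java}
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class RowSchemaDemo {
    public static void main(String[] args) {
        // RowFactory.create() produces a GenericRow with no attached StructType.
        Row row = RowFactory.create("id1", 42L);

        // Positional access works fine.
        System.out.println(row.getLong(1)); // 42

        // Name-based access goes through fieldIndex(), which on a schema-less
        // Row throws java.lang.UnsupportedOperationException:
        //   fieldIndex on a Row without schema is undefined.
        Long f1 = row.getAs("f1");
    }
}
{code}

This is exactly the situation inside the reporter's filter() below: the rows coming out of map() are plain GenericRow instances unless, as the report observes, something like withWatermark() causes them to be materialized as GenericRowWithSchema.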

> Dataset.map schema not applied in some cases
> --------------------------------------------
>
>                 Key: SPARK-24335
>                 URL: https://issues.apache.org/jira/browse/SPARK-24335
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.3.0
>            Reporter: Robert Reid
>            Priority: Major
>
> In the following code an {color:#808080}UnsupportedOperationException{color} is thrown in the filter() call just after the Dataset.map() call unless withWatermark() is added between them. The error reports `{color:#808080}fieldIndex on a Row without schema is undefined{color}`. I expect the map() method to have applied the schema and for it to be accessible in filter(). Without the extra withWatermark() call my debugger reports that the `row` objects in the filter lambda are `GenericRow`. With the watermark call it reports that they are `GenericRowWithSchema`.
> I should add that I'm new to working with Structured Streaming, so if I'm overlooking some implied dependency please fill me in.
> I'm encountering this in new code for a new production job. The presented code is distilled down to demonstrate the problem. While the problem can be worked around simply by adding withWatermark(), I'm concerned that this will leave the code in a fragile state. With this simplified code, if this error occurs again it will be easy to identify what change led to the error. But in the code I'm writing, with this functionality delegated to other classes, it is (and has been) very challenging to identify the cause.
>  
> {code:java}
> import org.apache.spark.api.java.function.FilterFunction;
> import org.apache.spark.api.java.function.MapFunction;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.catalyst.encoders.RowEncoder;
> import org.apache.spark.sql.streaming.StreamingQuery;
> import org.apache.spark.sql.streaming.StreamingQueryException;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> // Standalone repro for SPARK-24335.
> public class MapSchemaRepro {
>     public static void main(String[] args) {
>         SparkSession sparkSession = SparkSession.builder().master("local").getOrCreate();
>         sparkSession.conf().set(
>                 "spark.sql.streaming.checkpointLocation",
>                 "hdfs://localhost:9000/search_relevance/checkpoint" // for Spark 2.3
>                 // "spark.sql.streaming.checkpointLocation", "tmp/checkpoint" // for Spark 2.1
>         );
>         // Target schema for the mapped rows: an id, an event time, and one long field.
>         StructType inSchema = DataTypes.createStructType(
>                 new StructField[] {
>                         DataTypes.createStructField("id", DataTypes.StringType,    false),
>                         DataTypes.createStructField("ts", DataTypes.TimestampType, false),
>                         DataTypes.createStructField("f1", DataTypes.LongType,      true)
>                 }
>         );
>         Dataset<Row> rawSet = sparkSession.sqlContext().readStream()
>                 .format("rate")
>                 .option("rowsPerSecond", 1)
>                 .load()
>                 .map((MapFunction<Row, Row>) raw -> {
>                             Object[] fields = new Object[3];
>                             fields[0] = "id1";
>                             fields[1] = raw.getAs("timestamp");
>                             fields[2] = raw.getAs("value");
>                             return RowFactory.create(fields);
>                         },
>                         RowEncoder.apply(inSchema)
>                 )
>                 // If withWatermark() is included above the filter() line then this works.
>                 // Without it we get:
>                 //     Caused by: java.lang.UnsupportedOperationException:
>                 //         fieldIndex on a Row without schema is undefined.
>                 // at the row.getAs() call.
>                 // .withWatermark("ts", "10 seconds")  // <-- required for row.getAs("f1") to work ???
>                 .filter((FilterFunction<Row>) row -> !row.getAs("f1").equals(0L))
>                 .withWatermark("ts", "10 seconds");
>         StreamingQuery streamingQuery = rawSet
>                 .select("*")
>                 .writeStream()
>                 .format("console")
>                 .outputMode("append")
>                 .start();
>         try {
>             streamingQuery.awaitTermination(30_000);
>         } catch (StreamingQueryException e) {
>             System.out.println("Caught exception at 'awaitTermination':");
>             e.printStackTrace();
>         }
>     }
> }
> {code}
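
Until a public API like the one proposed in SPARK-26987 is available, one way to make the repro above robust without relying on withWatermark() is to attach the schema to each row explicitly. Below is a sketch of that idea, replacing only the map function from the code above. It relies on GenericRowWithSchema, which lives in an internal catalyst package, so treat it as a stopgap; the helper class and method names here are made up for illustration:

{code:java}
import java.sql.Timestamp;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;

public class SchemaAwareRows {
    // Same mapping as in the repro, but the returned rows carry the
    // StructType, so a downstream filter's row.getAs("f1") can resolve
    // the field name even when no watermark is added in between.
    static MapFunction<Row, Row> toSchemaAwareRow(StructType schema) {
        return raw -> {
            Object[] fields = new Object[] {
                    "id1",
                    raw.<Timestamp>getAs("timestamp"),
                    raw.<Long>getAs("value")
            };
            return new GenericRowWithSchema(fields, schema);
        };
    }
}
{code}

In the repro this would be wired in as .map(SchemaAwareRows.toSchemaAwareRow(inSchema), RowEncoder.apply(inSchema)) in place of the original lambda.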




