spark-issues mailing list archives

From "Dongjoon Hyun (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-27798) ConvertToLocalRelation should tolerate expression reusing output object
Date Mon, 02 Mar 2020 21:15:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-27798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-27798:
----------------------------------
    Summary: ConvertToLocalRelation should tolerate expression reusing output object  (was:
from_avro can modify variables in other rows in local mode)

> ConvertToLocalRelation should tolerate expression reusing output object
> -----------------------------------------------------------------------
>
>                 Key: SPARK-27798
>                 URL: https://issues.apache.org/jira/browse/SPARK-27798
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Yosuke Mori
>            Assignee: L. C. Hsieh
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.3.4, 2.4.4, 3.0.0
>
>         Attachments: Screen Shot 2019-05-21 at 2.39.27 PM.png
>
>
> Steps to reproduce:
> Create a local Dataset (with at least two distinct rows) containing a binary Avro field. Use the {{from_avro}}
> function to deserialize the binary into another column. Observe that all of the rows incorrectly
> end up with the same value.
> Here's a concrete example (using Spark 2.4.3). All it does is convert a list of TestPayload
> objects into Avro binary using the defined schema, then deserialize them with {{from_avro}}
> using that same schema:
> {code:scala}
> import org.apache.avro.Schema
> import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
> import org.apache.avro.io.EncoderFactory
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.avro.from_avro
> import org.apache.spark.sql.functions.col
> import java.io.ByteArrayOutputStream
> object TestApp extends App {
>   // Payload container
>   case class TestEvent(payload: Array[Byte])
>   // Deserialized Payload
>   case class TestPayload(message: String)
>   // Schema for Payload
>   val simpleSchema =
>     """
>       |{
>       |"type": "record",
>       |"name" : "Payload",
>       |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
>       |}
>     """.stripMargin
>   // Convert TestPayload into avro binary
>   def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
>     val schema = new Schema.Parser().parse(avsc)
>     val out = new ByteArrayOutputStream()
>     val writer = new GenericDatumWriter[GenericRecord](schema)
>     val encoder = EncoderFactory.get().binaryEncoder(out, null)
>     val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
>     writer.write(rootRecord, encoder)
>     encoder.flush()
>     out.toByteArray
>   }
>   val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
>   import spark.implicits._
>   List(
>     TestPayload("one"),
>     TestPayload("two"),
>     TestPayload("three"),
>     TestPayload("four")
>   ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
>     .toDS()
>     .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
>     .show(truncate = false)
> }
> {code}
> And here is what this program outputs:
> {noformat}
> +----------------------+-------------------+
> |payload               |deserializedPayload|
> +----------------------+-------------------+
> |[00 06 6F 6E 65]      |[four]             |
> |[00 06 74 77 6F]      |[four]             |
> |[00 0A 74 68 72 65 65]|[four]             |
> |[00 08 66 6F 75 72]   |[four]             |
> +----------------------+-------------------+{noformat}
> Here, we can see that the Avro binary is correctly generated, but every deserialized value
> is a copy of the last row. I have not yet verified whether this is also an issue in cluster
> mode.
>  
> I dug into the code a bit more, and it seems the reuse of {{result}} in {{AvroDataToCatalyst}}
> is overwriting the decoded values of previous rows. I set a breakpoint in {{LocalRelation}},
> and the rows in the {{data}} sequence all appear to point to the same object in memory - so
> a mutation through one reference mutates all of them.
> !Screen Shot 2019-05-21 at 2.39.27 PM.png!
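> The mechanism can be sketched without Spark or Avro at all (a hypothetical plain-Scala analogue, not the actual Catalyst code): a decoder that reuses a single mutable output object, combined with a collection step that stores references without copying, leaves every element aliased to the last decoded value:
> {code:scala}
> // Decoder that reuses one mutable output object, analogous to the
> // reused `result` in AvroDataToCatalyst
> val reused = new StringBuilder
> def decode(s: String): StringBuilder = {
>   reused.clear()
>   reused.append(s)
>   reused
> }
>
> val rows = List("one", "two", "three", "four")
>
> // Eagerly maps, but stores four references to the same object
> val collected = rows.map(decode)
> println(collected.map(_.toString))   // List(four, four, four, four)
>
> // Copying the value out per row (what a tolerant optimizer rule must
> // do with each produced row) restores the per-row results
> val copied = rows.map(s => decode(s).toString)
> println(copied)                      // List(one, two, three, four)
> {code}

```scala
// Decoder that reuses one mutable output object, analogous to the
// reused `result` in AvroDataToCatalyst
val reused = new StringBuilder
def decode(s: String): StringBuilder = {
  reused.clear()
  reused.append(s)
  reused
}

val rows = List("one", "two", "three", "four")

// Eagerly maps, but stores four references to the same object
val collected = rows.map(decode)
println(collected.map(_.toString))   // List(four, four, four, four)

// Copying the value out per row (what a tolerant optimizer rule must
// do with each produced row) restores the per-row results
val copied = rows.map(s => decode(s).toString)
println(copied)                      // List(one, two, three, four)
```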



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

