spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24910) Spark Bloom Filter Closure Serialization improvement for very high volume of Data
Date Fri, 03 Aug 2018 03:08:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-24910:
------------------------------
    Shepherd:   (was: Sean Owen)
       Flags:   (was: Patch)
      Labels:   (was: bloom-filter)
    Priority: Minor  (was: Major)

> Spark Bloom Filter Closure Serialization improvement for very high volume of Data
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-24910
>                 URL: https://issues.apache.org/jira/browse/SPARK-24910
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.1
>            Reporter: Himangshu Ranjan Borah
>            Priority: Minor
>
> I am proposing an improvement to the Bloom filter generation logic used in the
> DataFrameStatFunctions Bloom filter API: use mapPartitions() instead of aggregate() to avoid
> closure serialization, which fails for huge BitArrays.
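> As a minimal, illustrative sketch of the failing pattern (the DataFrame, column name, and
> sizing here are assumptions for the example, not from the original report):
> {code:scala}
> // Hypothetical sizing: ~2 billion expected items at fpp 0.001 makes the
> // driver-side BloomFilter's BitArray several GB; closure cleanup, which
> // serializes the closure with the Java serializer, then runs out of memory.
> val bf = df.stat.bloomFilter("id", 2000000000L, 0.001)
> {code}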
> Spark's stat functions' Bloom filter implementation uses aggregate/treeAggregate operations,
> whose closure depends on the Bloom filter created in the driver. Since Spark hard-codes the
> closure serializer to the Java serializer, closure cleanup fails for very large Bloom filters
> (typically with num items ~ billions and fpp ~ 0.001). The Kryo serializer works fine at that
> scale, but there were apparently issues using Kryo for closure serialization, due to which
> Spark 2.0 hard-coded it to Java. The call stack we get typically looks like:
> {code}
> java.lang.OutOfMemoryError
>     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>     at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
>     at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
>     at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
>     at org.apache.spark.sql.DataFrameStatFunctions.buildBloomFilter(DataFrameStatFunctions.scala:554)
>     at org.apache.spark.sql.DataFrameStatFunctions.bloomFilter(DataFrameStatFunctions.scala:505)
> {code}
> This issue can be overcome if we *don't* use the *aggregate()* operations for Bloom filter
> generation and instead use a *mapPartitions()*-style operation: create the Bloom filters inside
> the executors (giving them enough memory and controlling the number of executors and
> partitions), then return one final Bloom filter per partition to the driver, after which we
> aggregate just the Bloom filters, either in the driver itself or distributed via
> treeAggregate(). This way the Kryo serializer is needed only for the returned Bloom filters,
> and the approach works fine on big datasets and scales with the cluster specifications.
> Please comment on this from your end, or let me know of any upcoming improvements on this
> issue that you might already be working on.
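> A rough sketch of the proposed shape, assuming the public org.apache.spark.util.sketch.BloomFilter
> API; the DataFrame, its long-typed "id" column, and the sizing constants are illustrative,
> not the actual patch:
> {code:scala}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.util.sketch.BloomFilter
>
> // Illustrative sizing; in practice these come from the bloomFilter() call.
> val expectedNumItems = 2000000000L
> val fpp = 0.001
>
> def buildBloomFilter(df: DataFrame): BloomFilter =
>   df.select("id").rdd
>     // Each partition builds its own filter inside the executor, so the
>     // task closure captures only two small primitives, never a multi-GB
>     // BitArray built in the driver.
>     .mapPartitions { rows =>
>       val bf = BloomFilter.create(expectedNumItems, fpp)
>       rows.foreach(r => bf.putLong(r.getLong(0)))
>       Iterator.single(bf)
>     }
>     // Only the finished per-partition filters travel back as task
>     // results; merge them pairwise.
>     .reduce { (a, b) => a.mergeInPlace(b) }
> {code}
> Note that each per-partition filter is still sized for the full expected item count, so the
> trade-off is shipping a few large task results back rather than serializing one large filter
> inside every task closure.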



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

