spark-issues mailing list archives

From "Aaditya Ramesh (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19525) Enable Compression of Spark Streaming Checkpoints
Date Fri, 17 Feb 2017 23:03:44 GMT

[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872698#comment-15872698 ]

Aaditya Ramesh commented on SPARK-19525:
----------------------------------------

We are suggesting compressing only before we write the checkpoint, not in memory. This is
not happening right now: we just serialize the elements in the partition one by one and add
them to the serialization stream, as in {{ReliableCheckpointRDD.writePartitionToCheckpointFile}}:

{code}
    val fileOutputStream = if (blockSize < 0) {
      fs.create(tempOutputPath, false, bufferSize)
    } else {
      // This is mainly for testing purpose
      fs.create(tempOutputPath, false, bufferSize,
        fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
    }
    val serializer = env.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    Utils.tryWithSafeFinally {
      serializeStream.writeAll(iterator)
    } {
      serializeStream.close()
    }
{code}

As you can see, we don't apply any compression after the serialization step. In our patch, we
simply use the CompressionCodec to wrap the underlying output stream before building the
serialization stream on top of it, and do the corresponding wrapping on the read path.
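
For illustration, here is a minimal sketch of that change on the write path. This is not the committed patch: it assumes Spark's existing {{org.apache.spark.io.CompressionCodec}} ({{createCodec}}, {{compressedOutputStream}}, {{compressedInputStream}}) and reuses the surrounding names from the snippet above.

{code}
// Illustrative sketch, not the actual patch: compress only at the
// checkpoint-file boundary by wrapping the file stream in a codec.
val codec = CompressionCodec.createCodec(env.conf) // e.g. snappy

val fileOutputStream = fs.create(tempOutputPath, false, bufferSize)
val compressedOutputStream = codec.compressedOutputStream(fileOutputStream)

val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(compressedOutputStream)
Utils.tryWithSafeFinally {
  serializeStream.writeAll(iterator)
} {
  // Closing the serialize stream flushes and closes the codec and
  // file streams underneath it.
  serializeStream.close()
}

// The read path mirrors this by wrapping the input stream:
// serializer.deserializeStream(codec.compressedInputStream(fs.open(path, bufferSize)))
{code}

Because the elements are serialized and compressed in a single streaming pass, no extra in-memory copy of the partition is needed, which matches the intent of compressing only at checkpoint-write time.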

> Enable Compression of Spark Streaming Checkpoints
> -------------------------------------------------
>
>                 Key: SPARK-19525
>                 URL: https://issues.apache.org/jira/browse/SPARK-19525
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Aaditya Ramesh
>
> In our testing, compressing partitions with snappy while writing them to checkpoints on
> HDFS improved performance significantly and reduced the variability of the checkpointing
> operation. In our tests, checkpointing time was reduced by 3X and variability by 2X for
> data sets of approximately 1 GB compressed size.


