spark-user mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Re: Turning kryo on does not decrease binary output
Date Fri, 03 Jan 2014 18:34:12 GMT
Hi Aureliano,

First, check out the documentation the team has written up on using Kryo
here: http://spark.incubator.apache.org/docs/latest/tuning.html --
specifically the Data Serialization and Serialized RDD Storage sections.

If you want RDDs to spill over to disk if they don't fit in memory (rather
than be recalculated), then you must use the MEMORY_AND_DISK storage level
-- 
http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence

That focus is on using Kryo for temporary RDD serialization, though, not so
much for storing long-term binary output.  It sounds like you're going to
need to touch a little of the Hadoop APIs to get this working.

Check out how the saveAsTextFile(path, codec) method works:
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L893

Which calls the saveAsHadoopFile method in a PairRDD:
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
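For illustration, here is a minimal sketch of that route (not code from the thread; it assumes a running SparkContext `sc`, Spark's implicit pair-RDD conversions, and the saveAsHadoopFile overload that takes a codec class): writing a pair RDD as a compressed SequenceFile by calling saveAsHadoopFile directly, since saveAsSequenceFile exposes no codec parameter.

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkContext._

// Convert to Hadoop Writable types, as SequenceFiles store Writables.
val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))
  .map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Pass the compression codec explicitly through the Hadoop output format.
pairs.saveAsHadoopFile(
  "out-seq",
  classOf[IntWritable],
  classOf[Text],
  classOf[SequenceFileOutputFormat[IntWritable, Text]],
  classOf[GzipCodec])
```

This requires a Spark runtime, so it is a sketch of the approach rather than something runnable standalone.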

Hope that helps you down the right path,
Andrew





On Fri, Jan 3, 2014 at 1:18 PM, Aureliano Buendia <buendia360@gmail.com> wrote:

> Even
>
> someMap.saveAsTextFile("out", classOf[GzipCodec])
>
> has no effect.
>
> Also, I noticed that saving sequence files has no compression option (my
> original question was about compressing binary output).
>
> Having said this, I still do not understand why kryo cannot be helpful
> when saving binary output. Binary output uses java serialization, which has
> a pretty hefty overhead.
>
> How can kryo be applied to T when calling RDD[T]#saveAsObjectFile()?
>
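One possible answer to the question above, sketched for illustration (the helper name is hypothetical, not from the thread; assumes Kryo 2.x, a SparkContext in scope, and Spark's implicit pair-RDD conversions): serialize each element with Kryo by hand inside mapPartitions and write the raw bytes as a SequenceFile of BytesWritable, bypassing the Java serialization that saveAsObjectFile uses.

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical helper: Kryo-serialize every element and save the bytes
// as a SequenceFile, instead of saveAsObjectFile's Java serialization.
def saveWithKryo[T](rdd: RDD[T], path: String): Unit = {
  rdd.mapPartitions { iter =>
    val kryo = new Kryo() // Kryo is not thread-safe; one instance per partition
    iter.map { elem =>
      val buffer = new ByteArrayOutputStream()
      val output = new Output(buffer)
      kryo.writeClassAndObject(output, elem)
      output.close()
      (NullWritable.get(), new BytesWritable(buffer.toByteArray))
    }
  }.saveAsSequenceFile(path)
}
```

Reading the data back would require the symmetric kryo.readClassAndObject call on each BytesWritable payload.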
>
>
> On Fri, Jan 3, 2014 at 5:58 PM, Guillaume Pitel <
> guillaume.pitel@exensa.com> wrote:
>
>>  That's the right place. Maybe try with the HDP1 properties:
>>
>>
>> http://stackoverflow.com/questions/17241185/spark-standalone-mode-how-to-compress-spark-output-written-to-hdfs
>>
>> About your Kryo error, you can use this if you want coverage of Scala
>> types: https://github.com/romix/scala-kryo-serialization
>>
>> Guillaume
>>
>>  Thanks for clarifying this.
>>
>>  I tried setting hadoop properties before constructing SparkContext, but
>> it had no effect.
>>
>>  Where is the right place to set these properties?
>>
>>
>> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <
>> guillaume.pitel@exensa.com> wrote:
>>
>>>  Hi,
>>>
>>> I believe Kryo is only used during RDD serialization (i.e. communication
>>> between nodes), not for saving. If you want to compress output, you can use
>>> the Gzip or Snappy codec like this:
>>>
>>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for snappy
>>> val codec = "org.apache.hadoop.io.compress.GzipCodec"   // for gzip
>>>
>>> System.setProperty(
>>>   "spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
>>> System.setProperty(
>>>   "spark.hadoop.mapreduce.output.fileoutputformat.compress.codec", codec)
>>> System.setProperty(
>>>   "spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")
>>>
>>> (Those are the HDP2 keys; for HDP1 they are different.)
>>> Regards
>>> Guillaume
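Putting the advice above together, an end-to-end sketch (illustrative; assumes local mode and, per the thread's advice, that the spark.hadoop.* properties must be in place before the SparkContext is constructed so they reach the Hadoop JobConf):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Set the spark.hadoop.* properties BEFORE constructing the SparkContext;
// per the discussion above, setting them afterwards has no effect.
System.setProperty(
  "spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
System.setProperty(
  "spark.hadoop.mapreduce.output.fileoutputformat.compress.codec",
  "org.apache.hadoop.io.compress.GzipCodec")

val sc = new SparkContext("local", "compress-demo")
sc.parallelize(1 to 100).map(i => (i, i * i)).saveAsTextFile("out")
```

With these properties picked up, the text output part files should be written through the configured codec.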
>>>
>>>   Hi,
>>>
>>>  I'm trying to call saveAsObjectFile() on an RDD[(Int, Int, Double,
>>> Double)], expecting the output binary to be smaller, but it is exactly
>>> the same size as when kryo is not on.
>>>
>>>  I've checked the log, and there is no trace of kryo related errors.
>>>
>>>  The code looks something like:
>>>
>>> class MyRegistrator extends KryoRegistrator {
>>>   override def registerClasses(kryo: Kryo) {
>>>     kryo.setRegistrationRequired(true)
>>>     kryo.register(classOf[(Int, Int, Double, Double)])
>>>   }
>>> }
>>>
>>> System.setProperty("spark.serializer",
>>>   "org.apache.spark.serializer.KryoSerializer")
>>> System.setProperty("spark.kryo.registrator", "MyRegistrator")
>>>
>>>  At the end, I tried to call:
>>>
>>> kryo.setRegistrationRequired(true)
>>>
>>>  to make sure my class gets registered. But I found errors like:
>>>
>>> Exception in thread "DAGScheduler"
>>> com.esotericsoftware.kryo.KryoException:
>>> java.lang.IllegalArgumentException: Class is not registered:
>>> scala.math.Numeric$IntIsIntegral$
>>> Note: To register this class use:
>>> kryo.register(scala.math.Numeric$IntIsIntegral$.class);
>>>
>>>  It appears many scala internal types have to be registered in order to
>>> have full kryo support.
>>>
>>>  Any idea why my simple tuple type should not get kryo benefits?
>>>
>>>
>>>
>>>   --
>>>  Guillaume PITEL, Président
>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>
>>>  eXenSa S.A.S. <http://www.exensa.com/>
>>>  41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>>
>>
>>
>>
>> --
>>  *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>>  eXenSa S.A.S. <http://www.exensa.com/>
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>
