spark-user mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Re: Turning kryo on does not decrease binary output
Date Fri, 03 Jan 2014 17:43:06 GMT
For Hadoop properties, I find the most reliable way is to set them on a
Configuration object and use a method on SparkContext that accepts that
conf object.

From working code:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import com.hadoop.mapreduce.LzoTextInputFormat

// Reads a newline-delimited LZO file as lines of text.
// sc is the SparkContext (e.g. the one provided by spark-shell).
def nlLZOfile(path: String) = {
  val conf = new Configuration
  conf.set("textinputformat.record.delimiter", "\n")
  sc.newAPIHadoopFile(path, classOf[LzoTextInputFormat],
      classOf[LongWritable], classOf[Text], conf)
    .map(_._2.toString)
}
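
For example, from the spark-shell (the path here is just a placeholder):

val lines = nlLZOfile("hdfs:///path/to/file.lzo")
lines.take(5).foreach(println)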


On Fri, Jan 3, 2014 at 12:34 PM, Aureliano Buendia <buendia360@gmail.com> wrote:

> Thanks for clarifying this.
>
> I tried setting hadoop properties before constructing SparkContext, but it
> had no effect.
>
> Where is the right place to set these properties?
>
>
> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <guillaume.pitel@exensa.com> wrote:
>
>>  Hi,
>>
>> I believe Kryo is only used during RDD serialization (i.e. communication
>> between nodes), not for saving. If you want to compress the output, you
>> can use the GZip or Snappy codec like this:
>>
>> // pick one:
>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for Snappy
>> // val codec = "org.apache.hadoop.io.compress.GzipCodec" // for gzip
>>
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec", codec)
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")
>>
>> (That's for HDP2; for HDP1 the keys are different.)
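>>
>> For HDP1, a sketch with the older mapred.* key names (assumed from
>> Hadoop 1's API, not tested here) would be:
>>
>> System.setProperty("spark.hadoop.mapred.output.compress", "true")
>> System.setProperty("spark.hadoop.mapred.output.compression.codec", codec)
>> System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")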
>> Regards
>> Guillaume
>>
>> Hi,
>>
>> I'm trying to call saveAsObjectFile() on an RDD[(Int, Int, Double,
>> Double)], expecting the output binary to be smaller, but it is exactly
>> the same size as when Kryo is off.
>>
>> I've checked the log, and there is no trace of Kryo-related errors.
>>
>>  The code looks something like:
>>
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo) {
>>     kryo.setRegistrationRequired(true)
>>     kryo.register(classOf[(Int, Int, Double, Double)])
>>   }
>> }
>>
>> System.setProperty("spark.serializer",
>>   "org.apache.spark.serializer.KryoSerializer")
>> System.setProperty("spark.kryo.registrator", "MyRegistrator")
>>
>>  At the end, I tried to call:
>>
>> kryo.setRegistrationRequired(true)
>>
>>  to make sure my class gets registered. But I found errors like:
>>
>> Exception in thread "DAGScheduler"
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.IllegalArgumentException: Class is not registered:
>> scala.math.Numeric$IntIsIntegral$
>> Note: To register this class use:
>> kryo.register(scala.math.Numeric$IntIsIntegral$.class);
>>
>> It appears many Scala internal types have to be registered in order to
>> get full Kryo support.
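>>
>> For example, one way to register the class named in the error (a
>> sketch: kryo.register takes a java.lang.Class, and the Scala object's
>> runtime class can be looked up by name):
>>
>> kryo.register(Class.forName("scala.math.Numeric$IntIsIntegral$"))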
>>
>> Any idea why my simple tuple type doesn't get the benefits of Kryo?
>>
>>
>>
>> --
>> Guillaume PITEL, President
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>>  eXenSa S.A.S. <http://www.exensa.com/>
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>
