spark-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: hadoop input/output format advanced control
Date Fri, 26 Jun 2015 14:37:43 GMT
I am trying the very same thing, to configure the min split size, with Spark
1.3.1, and I get a compilation error.

Code:

<code>

    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

    sc.newAPIHadoopFile(path + "/*.avro",
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]],
      hadoopConfiguration)

</code>

Error:

[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: inferred type arguments
[org.apache.hadoop.io.NullWritable,org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord],org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]]
do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <:
org.apache.hadoop.mapreduce.InputFormat[K,V]]

[INFO]     sc.newAPIHadoopFile(path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

[INFO]        ^


Hence I modified it to:

<code>

    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](
      path + "/*.avro",
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]],
      hadoopConfiguration)

</code>


But I still get an error:

[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: overloaded method value newAPIHadoopFile with alternatives:

[INFO]   (path: String,fClass:
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
Class[org.apache.hadoop.io.NullWritable],conf:
org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)] <and>

[INFO]   (path: String)(implicit km:
scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
implicit fm:
scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)]

[INFO]  cannot be applied to (String,
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
Class[org.apache.hadoop.io.NullWritable],
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
org.apache.hadoop.conf.Configuration)

[INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

[INFO]                        ^

[ERROR] one error found

[INFO]
------------------------------------------------------------------------

[INFO] BUILD FAILURE

[INFO] -----------------------
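
Comparing the call with the first alternative listed in the error above
(path, fClass, kClass, vClass, conf), the input format class appears to be
expected before the key and value classes, whereas the call passes it last.
A minimal sketch of the reordered call follows, assuming that this argument
order is the only problem. The object name AvroSplitExample and the helpers
splitConf and readAvro are hypothetical names used only for illustration.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object AvroSplitExample {

  // Hypothetical helper: copy the base configuration so that the split
  // setting applies to a single RDD rather than to the whole SparkContext.
  def splitConf(base: Configuration, maxSplitBytes: Long): Configuration = {
    val conf = new Configuration(base)
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", maxSplitBytes)
    conf
  }

  // Hypothetical helper: same call as in the post, but with the classes in
  // the order shown by the compiler's first alternative.
  def readAvro(sc: SparkContext, path: String): RDD[(AvroKey[GenericRecord], NullWritable)] =
    sc.newAPIHadoopFile(
      path + "/*.avro",
      classOf[AvroKeyInputFormat[GenericRecord]], // fClass: the input format
      classOf[AvroKey[GenericRecord]],            // kClass: the key type
      classOf[NullWritable],                      // vClass: the value type
      splitConf(sc.hadoopConfiguration, 64L * 1024 * 1024))
}
```

With the classes in this order the explicit type arguments should not be
needed, since K, V and F can be inferred from the three Class arguments.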

On Tue, Mar 24, 2015 at 11:46 AM, Nick Pentreath <nick.pentreath@gmail.com>
wrote:

> You can indeed override the Hadoop configuration at a per-RDD level -
> though it is a little more verbose, as in the below example, and you need
> to effectively make a copy of the hadoop Configuration:
>
> val thisRDDConf = new Configuration(sc.hadoopConfiguration)
> thisRDDConf.set("mapred.min.split.size", "500000000")
> val rdd = sc.newAPIHadoopFile(path,
>   classOf[SequenceFileInputFormat[IntWritable, Text]],
>   classOf[IntWritable],
>   classOf[Text],
>   thisRDDConf
> )
> println(rdd.partitions.size)
>
> val rdd2 = sc.newAPIHadoopFile(path,
>   classOf[SequenceFileInputFormat[IntWritable, Text]],
>   classOf[IntWritable],
>   classOf[Text]
> )
> println(rdd2.partitions.size)
>
>
> For example, if I run the above on the following directory (some files I
> have lying around):
>
> -rw-r--r--  1 Nick  staff     0B Jul 11  2014 _SUCCESS
> -rw-r--r--  1 Nick  staff   291M Sep 16  2014 part-00000
> -rw-r--r--  1 Nick  staff   227M Sep 16  2014 part-00001
> -rw-r--r--  1 Nick  staff   370M Sep 16  2014 part-00002
> -rw-r--r--  1 Nick  staff   244M Sep 16  2014 part-00003
> -rw-r--r--  1 Nick  staff   240M Sep 16  2014 part-00004
>
> I get output:
>
> 15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
> *5*
>
> ... and then for the second RDD:
>
> 15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from
> newAPIHadoopFile at TestHash.scala:41
> *45*
>
> As expected.
>
> Though a more succinct way of passing in those conf options would be nice
> - but this should get you what you need.
>
>
>
> On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> currently it's pretty hard to control the Hadoop Input/Output formats used
>> in Spark. The convention seems to be to add extra parameters to all
>> methods and then somewhere deep inside the code (for example in
>> PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
>> settings on the Hadoop Configuration object.
>>
>> for example for compression i see "codec: Option[Class[_ <:
>> CompressionCodec]] = None" added to a bunch of methods.
>>
>> how scalable is this solution really?
>>
>> for example i need to read from a hadoop dataset and i don't want the
>> input (part) files to get split up. the way to do this is to set
>> "mapred.min.split.size". now i don't want to set this at the level of the
>> SparkContext (which can be done), since i don't want it to apply to input
>> formats in general. i want it to apply to just this one specific input
>> dataset i need to read. which leaves me with no options currently. i could
>> go add yet another input parameter to all the methods
>> (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
>> etc.). but that seems ineffective.
>>
>> why can we not expose a Map[String, String] or some other generic way to
>> manipulate settings for hadoop input/output formats? it would require
>> adding one more parameter to all methods to deal with hadoop input/output
>> formats, but after that it's done. one parameter to rule them all....
>>
>> then i could do:
>> val x = sc.textFile("/some/path", formatSettings =
>> Map("mapred.min.split.size" -> "12345"))
>>
>> or
>> rdd.saveAsTextFile("/some/path", formatSettings =
>> Map("mapred.output.compress" -> "true", "mapred.output.compression.codec" ->
>> "somecodec"))
>>
>
>


-- 
Deepak
