spark-user mailing list archives

From Jakob Odersky <ja...@odersky.com>
Subject Re: The error to read HDFS custom file in spark.
Date Thu, 17 Mar 2016 21:38:20 GMT
Doesn't FileInputFormat require type parameters? Like so:

class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
extends FileInputFormat[LW, RD]

I haven't verified this but it could be related to the compile error
you're getting.
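
To illustrate the bound named in the error message (F <: InputFormat[K, V]), here is a minimal sketch that compiles without Hadoop on the classpath. The traits and classes below are stand-ins I made up for the Hadoop types and for RDRawDataRecord, and hadoopFileLike is a hypothetical helper that copies only the type bound of sc.hadoopFile, not its behavior. It is a sketch of the idea, not a tested fix for the original project.

```scala
// Stand-ins (assumptions) for the Hadoop and user types, so this compiles alone.
trait InputFormat[K, V]
abstract class FileInputFormat[K, V] extends InputFormat[K, V]
class LongWritable
class RDRawDataRecord

// Concrete type arguments are passed to the parent class, and the
// input format itself declares no type parameters of its own.
class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord]

object BoundsDemo {
  // Hypothetical helper: mirrors only the bound F <: InputFormat[K, V]
  // from the error message and returns the runtime class name of F.
  def hadoopFileLike[K, V, F <: InputFormat[K, V]](path: String)(
      implicit fm: scala.reflect.ClassTag[F]): String =
    fm.runtimeClass.getName

  def main(args: Array[String]): Unit = {
    // Compiles because RawDataInputFormat is a subtype of
    // InputFormat[LongWritable, RDRawDataRecord].
    println(hadoopFileLike[LongWritable, RDRawDataRecord, RawDataInputFormat]("some/path"))
  }
}
```

With the real classes, the analogous declaration would extend org.apache.hadoop.mapred.FileInputFormat[LongWritable, RDRawDataRecord], and the call site would then name RawDataInputFormat without type arguments.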

On Thu, Mar 17, 2016 at 9:53 AM, Benyi Wang <bewang.tech@gmail.com> wrote:
> I would say change
>
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends
> FileInputFormat
>
> to
>
> class RawDataInputFormat[LongWritable, RDRawDataRecord] extends
> FileInputFormat
>
>
> On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>>
>> Hi Tony,
>>
>> Is
>>
>> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> One of your own packages?
>>
>> Sounds like it is the one throwing the error
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 17 March 2016 at 15:21, Tony Liu <tony.liu0112@gmail.com> wrote:
>>>
>>> Hi,
>>>    My HDFS file is stored with a custom data structure. I want to read it
>>> with a SparkContext object, so I defined an input format:
>>>
>>> 1. code of RawDataInputFormat.scala
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/16/16.
>>>   */
>>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
>>> extends FileInputFormat {
>>>
>>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter:
>>> Reporter): RecordReader[LW, RD] = {
>>>     new RawReader(split, job, reporter)
>>>   }
>>>
>>> }
>>>
>>> 2. code of RawReader.scala
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/17/16.
>>>   */
>>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends
>>> RecordReader[LW, RD] {
>>>
>>>   var reader: SequenceFile.Reader = null
>>>   var currentPos: Long = 0L
>>>   var length: Long = 0L
>>>
>>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>>>     this()
>>>     val p = (split.asInstanceOf[FileSplit]).getPath
>>>     reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>>   }
>>>
>>>   override def next(key: LW, value: RD): Boolean = {
>>>     val flag = reader.next(key, value)
>>>     currentPos = reader.getPosition()
>>>     flag
>>>   }
>>>
>>>   override def getProgress: Float = Math.min(1.0f, currentPos /
>>> length.toFloat)
>>>
>>>   override def getPos: Long = currentPos
>>>
>>>   override def createKey(): LongWritable = {
>>>     new LongWritable()
>>>   }
>>>
>>>   override def close(): Unit = {
>>>     reader.close()
>>>   }
>>>
>>>   override def createValue(): RDRawDataRecord = {
>>>     new RDRawDataRecord()
>>>   }
>>> }
>>>
>>> 3. code of RDRawDataRecord.scala
>>>
>>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import org.apache.commons.lang.StringUtils;
>>> import org.apache.hadoop.io.Writable;
>>>
>>> public class RDRawDataRecord implements Writable {
>>>     private String smac;
>>>     private String dmac;
>>>     private int hrssi;
>>>     private int lrssi;
>>>     private long fstamp;
>>>     private long lstamp;
>>>     private long maxstamp;
>>>     private long minstamp;
>>>     private long stamp;
>>>
>>>     public void readFields(DataInput in) throws IOException {
>>>         this.smac = in.readUTF();
>>>         this.dmac = in.readUTF();
>>>         this.hrssi = in.readInt();
>>>         this.lrssi = in.readInt();
>>>         this.fstamp = in.readLong();
>>>         this.lstamp = in.readLong();
>>>         this.maxstamp = in.readLong();
>>>         this.minstamp = in.readLong();
>>>         this.stamp = in.readLong();
>>>     }
>>>
>>>     public void write(DataOutput out) throws IOException {
>>>         out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>>>         out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>>>         out.writeInt(this.hrssi);
>>>         out.writeInt(this.lrssi);
>>>         out.writeLong(this.fstamp);
>>>         out.writeLong(this.lstamp);
>>>         out.writeLong(this.maxstamp);
>>>         out.writeLong(this.minstamp);
>>>         out.writeLong(this.stamp);
>>>     }
>>>
>>>     /**
>>>
>>>         ignore getter setter
>>>
>>>     **/
>>>
>>> }
>>>
>>> Finally, I run this code:
>>>
>>> val filePath =
>>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>>> val conf = new SparkConf()
>>> conf.setMaster("local")
>>> conf.setAppName("demo")
>>> val sc = new SparkContext(conf)
>>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>> file.foreach(v => {
>>>   println(v._2.getDmac) // Attribute of custom objects
>>> })
>>>
>>> I get this error:
>>>
>>> Error:(41, 19) type arguments
>>> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>> conform to the bounds of none of the overloaded alternatives of
>>>  value hadoopFile: [K, V, F <:
>>> org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km:
>>> scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit
>>> fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)] <and> [K, V,
>>> F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions:
>>> Int)(implicit km: scala.reflect.ClassTag[K], implicit vm:
>>> scala.reflect.ClassTag[V], implicit fm:
>>> scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>>>     val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>>                   ^
>>>
>>>
>>> I also tried reading a text file with the SparkContext API
>>> 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")', and
>>> it works.
>>> What does this error mean, and how can I fix it?
>>>
>>> Thank you for your help.
>>>
>>> --
>>> Tony
>>> :)
>>
>>
>
