spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Imran Rashid <iras...@cloudera.com>
Subject Re: reading a csv dynamically
Date Fri, 23 Jan 2015 01:26:21 GMT
Spark can definitely process data with optional fields.  It kinda depends
on what you want to do with the results -- its more of a object design /
knowing scala types question.

Eg., scala has a built in type Option specifically for handling optional
data, which works nicely in pattern matching & functional programming.
Just to save myself some typing, I'm going to show an example with 2 or 3
fields:

myProcessedRdd: RDD[(String, Double, Option[Double])] =
sc.textFile("file.csv").map{txt =>
  val split = txt.split(",")
  val opt = if split.length == 3 Some(split.toDouble) else None
  (split(0),split(1).toDouble, opt)
}

then eg., say in a later processing step, you want to make the 3rd field
have a default of 6.9, you'd do something like:

myProcessedRdd.map{ case (name, count,ageOpt) =>  //arbitrary variable
names I'm just making up
  val age = ageOpt.getOrElse(6.9)
   ...
}

You might be interested in reading up on Scala's Option type, and how you
can use it.  There are a lot of other options too, eg. the Either type if
you want to track 2 alternatives, Try for keeping track of errors, etc.
You can play around with all of them outside of spark.  Of course you could
do similar things in Java well without these types.  You just need to write
your own container for dealing w/ missing data, which could be really
simple in your use case.

I would advise against first creating a key w/ the number of fields, and
then doing a groupByKey.  Since you are only expecting 2 different lengths,
al the data will only go to two tasks, so this will not scale very well.
And though the data is now grouped by length, its all in one RDD, so you've
still got to figure out what to do with both record lengths.

Imran


On Wed, Jan 21, 2015 at 6:46 PM, Pankaj Narang <pankajnarang81@gmail.com>
wrote:

> Yes I think you need to create one map first which will keep the number of
> values in every line. Now you can group all the records with same number of
> values. Now you know how many types of arrays you will have.
>
>
> val dataRDD = sc.textFile("file.csv")
> val dataLengthRDD =   dataRDD .map(line=>(_.split(",").length,line))
> val groupedData = dataLengthRDD.groupByKey()
>
> now you can process the groupedData as it will have arrays of length x in
> one RDD.
>
> groupByKey([numTasks])  When called on a dataset of (K, V) pairs, returns a
> dataset of (K, Iterable<V>) pairs.
>
>
> I hope this helps
>
> Regards
> Pankaj
> Infoshore Software
> India
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304p21307.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Mime
View raw message