spark-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: How to read gzip data in Spark - Simple question
Date Thu, 06 Aug 2015 03:29:40 GMT
That seems to be working. However, I now see a new exception.

Code:
def formatStringAsDate(dateStr: String) = new SimpleDateFormat("yyyy-MM-dd").parse(dateStr)

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
val rowStructText =
sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String,
f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
f12: Integer, f13: Integer, f14: String)

val summary  = rowStructText.map(s => s.split(",")).map(
    s => Summary(formatStringAsDate(s(0)),
            s(1).replaceAll("\"", "").toLong,
            s(3).replaceAll("\"", "").toLong,
            s(4).replaceAll("\"", "").toInt,
            s(5).replaceAll("\"", ""),
            s(6).replaceAll("\"", "").toInt,
            formatStringAsDate(s(7)),
            formatStringAsDate(s(8)),
            s(9).replaceAll("\"", "").toInt,
            s(10).replaceAll("\"", "").toInt,
            s(11).replaceAll("\"", "").toFloat,
            s(12).replaceAll("\"", "").toInt,
            s(13).replaceAll("\"", "").toInt,
            s(14).replaceAll("\"", "")
        )
).toDF()
summary.registerTempTable("summary")


//Output
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Date
formatStringAsDate: (dateStr: String)java.util.Date
rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[105] at textFile at <console>:60
defined class Summary
x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at <console>:61
java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)


Any suggestions?
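
The exception says that Spark SQL's schema reflection does not accept java.util.Date. One possible way around it, sketched here as an assumption and not as a confirmed fix from this thread, is to use java.sql.Date, which Spark SQL maps to its DateType. The helper formatStringAsSqlDate and the trimmed-down case class SummaryDates are hypothetical names introduced only for illustration. The snippet uses no Spark calls, so it can be pasted into a plain Scala shell.

```scala
import java.sql.Date

// Hypothetical helper (not from the original post): java.sql.Date.valueOf
// parses the "yyyy-MM-dd" form that appears in the sample row.
def formatStringAsSqlDate(dateStr: String): Date = Date.valueOf(dateStr.trim)

// Hypothetical, trimmed-down stand-in for the Summary case class, with
// java.sql.Date in place of java.util.Date for the three date columns.
case class SummaryDates(f1: Date, f7: Date, f8: Date)

// First nine fields of the sample row shown in the code comment above.
val fields = "2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02".split(",")

val row = SummaryDates(
  formatStringAsSqlDate(fields(0)),
  formatStringAsSqlDate(fields(7)),
  formatStringAsSqlDate(fields(8)))

println(row)
```

If this approach is suitable, the same substitution would apply to f1, f7 and f8 in the full Summary class before calling toDF().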

On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver <philip.weaver@gmail.com> wrote:

> The parallelize method does not read the contents of a file. It simply
> takes a collection and distributes it to the cluster. In this case, the
> String is a collection of 67 characters.
>
> Use sc.textFile instead of sc.parallelize, and it should work as you want.
>
> On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com> wrote:
>
>> I have CSV data that is stored in gzip format on HDFS.
>>
>> *With Pig*
>>
>> a = load
>> '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz' using
>> PigStorage();
>>
>> b = limit a 10;
>>
>>
>> (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,,,,,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
>>
>>
>> (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)
>>
>>
>> However, with Spark:
>>
>> val rowStructText =
>> sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz")
>>
>> val x = rowStructText.map(s => {
>>
>>     println(s)
>>
>>     s}
>>
>>     )
>>
>> x.count
>>
>> Questions
>>
>> 1) x.count always shows 67, irrespective of the path I pass to
>> sc.parallelize
>>
>> 2) It shows x as RDD[Char] instead of String
>>
>> 3) println() never emits the rows.
>>
>> Any suggestions?
>>
>> -Deepak
>>
>>
>>
>> --
>> Deepak
>>
>>
>
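
The quoted explanation of sc.parallelize can be checked without a cluster. The sketch below is plain Scala with no Spark calls, and the variable names are illustrative. It treats the path from the original question as the collection of characters it is, which is what parallelize would distribute. That would account for both the count of 67 and the RDD[Char] element type.

```scala
// The path string from the original question.
val path = "/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz"

// A Scala String can be viewed as a sequence of Char values. Passing it to
// sc.parallelize would distribute these characters, not the file contents.
val chars: Seq[Char] = path.toSeq

println(chars.length)       // prints 67
println(chars.take(5))      // the first five characters: / u s e r
```

sc.textFile, by contrast, takes the string as a path and returns the lines of the file as an RDD[String], which is the behaviour asked for in the original question.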


-- 
Deepak
