spark-user mailing list archives

From Philip Weaver <philip.wea...@gmail.com>
Subject Re: How to read gzip data in Spark - Simple question
Date Thu, 06 Aug 2015 05:07:41 GMT
I encourage you to find the answer to this on your own :).
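
A hint to get started: an `ArrayIndexOutOfBoundsException: 1` like the one in the trace below is what `s(1)` throws when `split(",")` returns a single-element array, for example on an empty line or a line without commas. Here is a minimal sketch in plain Scala (no Spark needed). The sample strings and the `tooShort` helper are made up for illustration and are not taken from your file:

```scala
// Plain Scala, no Spark required. All sample inputs are hypothetical.
val good  = "2015-07-27,12459,,31242,6,Daily"
val empty = ""
val noSep = "2015-07-27\t12459" // e.g. tab-delimited rather than comma-delimited

// A line with no commas splits into a single element,
// so indexing s(1) fails with ArrayIndexOutOfBoundsException: 1.
val goodLen  = good.split(",").length  // 6
val emptyLen = empty.split(",").length // 1
val noSepLen = noSep.split(",").length // 1

// Trailing empty fields are dropped unless a negative limit is passed.
val trimmedLen = "a,b,,".split(",").length     // 2
val keptLen    = "a,b,,".split(",", -1).length // 4

// Hypothetical helper: flags lines with fewer fields than the parser indexes.
// The default of 15 matches the highest index used in the quoted code, s(14).
def tooShort(line: String, minFields: Int = 15): Boolean =
  line.split(",", -1).length < minFields
```

Against the RDD in the quoted code, something like `rowStructText.filter(line => tooShort(line)).take(5)` should surface a few of the offending lines, assuming short rows are in fact the cause.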

On Wed, Aug 5, 2015 at 9:43 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com> wrote:

> Code:
>
> val summary  = rowStructText.map(s => s.split(",")).map(
>     {
>     s =>
>     Summary(formatStringAsDate(s(0)),
>             s(1).replaceAll("\"", "").toLong,
>             s(3).replaceAll("\"", "").toLong,
>             s(4).replaceAll("\"", "").toInt,
>             s(5).replaceAll("\"", ""),
>             s(6).replaceAll("\"", "").toInt,
>             formatStringAsDate(s(7)),
>             formatStringAsDate(s(8)),
>             s(9).replaceAll("\"", "").toInt,
>             s(10).replaceAll("\"", "").toInt,
>             s(11).replaceAll("\"", "").toFloat,
>             s(12).replaceAll("\"", "").toInt,
>             s(13).replaceAll("\"", "").toInt,
>             s(14).replaceAll("\"", "")
>         )
>     }
> )
> summary.saveAsTextFile("sparkO")
>
> Exception:
> import java.text.SimpleDateFormat
> import java.util.Calendar
> import java.sql.Date
> import org.apache.spark.storage.StorageLevel
> formatStringAsDate: (dateStr: String)java.sql.Date
> rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[263] at textFile at <console>:154
> defined class Summary
> summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[265] at map at <console>:159
> sumDF: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3: bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9: int, f10: int, f11: float, f12: int, f13: int, f14: string]
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 45.0 failed 4 times, most recent failure: Lost task 0.3 in stage 45.0 (TID 1872, datanode-6-3486.phx01.dev.ebayc3.com): java.lang.ArrayIndexOutOfBoundsException: 1
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:163)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:161)
> at scala.collection.Iterator$$anon
>
> On Wed, Aug 5, 2015 at 9:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
> wrote:
>
>> How do I persist the RDD to HDFS?
>>
>> On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver <philip.weaver@gmail.com>
>> wrote:
>>
>>> This message means that java.util.Date is not supported by Spark
>>> DataFrame. You'll need to use java.sql.Date, I believe.
>>>
>>> On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>> wrote:
>>>
>>>> That seems to be working. However, I see a new exception.
>>>>
>>>> Code:
>>>> def formatStringAsDate(dateStr: String) = new
>>>> SimpleDateFormat("yyyy-MM-dd").parse(dateStr)
>>>>
>>>>
>>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>>> val rowStructText =
>>>> sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 :
>>>> String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11:
>>>> Float, f12: Integer, f13: Integer, f14: String)
>>>>
>>>> val summary  = rowStructText.map(s => s.split(",")).map(
>>>>     s => Summary(formatStringAsDate(s(0)),
>>>>             s(1).replaceAll("\"", "").toLong,
>>>>             s(3).replaceAll("\"", "").toLong,
>>>>             s(4).replaceAll("\"", "").toInt,
>>>>             s(5).replaceAll("\"", ""),
>>>>             s(6).replaceAll("\"", "").toInt,
>>>>             formatStringAsDate(s(7)),
>>>>             formatStringAsDate(s(8)),
>>>>             s(9).replaceAll("\"", "").toInt,
>>>>             s(10).replaceAll("\"", "").toInt,
>>>>             s(11).replaceAll("\"", "").toFloat,
>>>>             s(12).replaceAll("\"", "").toInt,
>>>>             s(13).replaceAll("\"", "").toInt,
>>>>             s(14).replaceAll("\"", "")
>>>>         )
>>>> ).toDF()
>>>> summary.registerTempTable("summary")
>>>>
>>>>
>>>> //Output
>>>> import java.text.SimpleDateFormat
>>>> import java.util.Calendar
>>>> import java.util.Date
>>>> formatStringAsDate: (dateStr: String)java.util.Date
>>>> rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[105] at textFile at <console>:60
>>>> defined class Summary
>>>> x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at <console>:61
>>>> java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
>>>> at
>>>> org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
>>>> at
>>>> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
>>>> at
>>>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)
>>>>
>>>>
>>>> Any suggestions?
>>>>
>>>> On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver <philip.weaver@gmail.com>
>>>> wrote:
>>>>
>>>>> The parallelize method does not read the contents of a file. It simply
>>>>> takes a collection and distributes it to the cluster. In this case, the
>>>>> String is a collection of 67 characters.
>>>>>
>>>>> Use sc.textFile instead of sc.parallelize, and it should work as you
>>>>> want.
>>>>>
>>>>> On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have CSV data stored in gzip format on HDFS.
>>>>>>
>>>>>> *With Pig*
>>>>>>
>>>>>> a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz' using PigStorage();
>>>>>>
>>>>>> b = limit a 10
>>>>>>
>>>>>>
>>>>>> (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,,,,,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
>>>>>>
>>>>>>
>>>>>> (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)
>>>>>>
>>>>>>
>>>>>> However with Spark
>>>>>>
>>>>>> val rowStructText =
>>>>>> sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz")
>>>>>>
>>>>>> val x = rowStructText.map(s => {
>>>>>>
>>>>>>     println(s)
>>>>>>
>>>>>>     s}
>>>>>>
>>>>>>     )
>>>>>>
>>>>>> x.count
>>>>>>
>>>>>> Questions
>>>>>>
>>>>>> 1) x.count always returns 67, regardless of the path I pass to
>>>>>> sc.parallelize.
>>>>>>
>>>>>> 2) It shows x as RDD[Char] instead of String
>>>>>>
>>>>>> 3) println() never emits the rows.
>>>>>>
>>>>>> Any suggestions?
>>>>>>
>>>>>> -Deepak
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>

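For reference, the two earlier fixes in this thread can be sketched in plain Scala without a Spark cluster. The first part shows why `sc.parallelize` on a path string gave 67 elements of type `Char`: a Scala `String` converts implicitly to a sequence of characters, and this particular path is 67 characters long. The second part is one possible way to make `formatStringAsDate` return `java.sql.Date`. The signature matches the REPL output quoted above, but the body is an assumption, since the revised definition was never posted:

```scala
import java.text.SimpleDateFormat

// A String is implicitly usable as Seq[Char], so parallelize(path)
// distributes the characters of the path, not the contents of the file.
val path = "/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz"
val asChars: Seq[Char] = path
val charCount = asChars.length // 67, the count reported in the thread

// Assumed body: parse to java.util.Date, then wrap the epoch millis
// in java.sql.Date, which Spark SQL maps to its date type.
def formatStringAsDate(dateStr: String): java.sql.Date =
  new java.sql.Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime)
```

`sc.textFile`, by contrast, treats its argument as a path and returns one `String` per line; it decompresses `.gz` files transparently.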