spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Spark Streaming - Inserting into Tables
Date Wed, 15 Jul 2015 03:20:43 GMT
Why is .remember not ideal?
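
For context, here is a minimal sketch of what calling .remember on the
context might look like. It assumes a spark-shell session where `sc` and
`sqlContext` already exist, as in the snippets quoted below; the 20-minute
duration is an illustrative value and does not come from this thread.

```scala
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Assumption: `sc` and `sqlContext` are the ones provided by spark-shell.
val ssc = new StreamingContext(sc, Minutes(10))

// Ask every DStream in this context to keep the RDDs it generated during
// the last 20 minutes (illustrative value). Without this, Spark Streaming
// releases a batch's RDDs once its own operations no longer need them.
ssc.remember(Minutes(20))

val currentStream = ssc.textFileStream("s3://textFileDirectory/")

currentStream.foreachRDD { rdd =>
  // Parse the batch as JSON and expose it to SQL queries under a temp name.
  val df = sqlContext.jsonRDD(rdd)
  df.registerTempTable("tmp_rideaccepted")
}

ssc.start()
```

The remember duration has to be set before ssc.start(). A longer duration
keeps more batch data alive, so it trades memory for the ability to query
older batches from outside the streaming computation.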


On Sun, Jul 12, 2015 at 7:22 PM, Brandon White <bwwinthehouse@gmail.com>
wrote:

> Hi Yin,
>
> Yes, there were no new rows. I fixed it by calling .remember on the
> context. Obviously, this is not ideal.
>
> On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yhuai@databricks.com> wrote:
>
>> Hi Brandon,
>>
>> Can you explain what you meant by "It simply does not work"? Did you
>> not see new data files?
>>
>> Thanks,
>>
>> Yin
>>
>> On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwinthehouse@gmail.com>
>> wrote:
>>
>>> Why does this not work? Is insert into broken in 1.3.1? It does not
>>> fail or throw any errors or exceptions. It simply does not work.
>>>
>>>
>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>
>>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>>
>>> dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
>>> val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
>>> parquetFile.registerTempTable("rideaccepted")
>>>
>>> currentStream.foreachRDD { rdd =>
>>>   val df = sqlContext.jsonRDD(rdd)
>>>   df.insertInto("rideaccepted")
>>> }
>>>
>>> ssc.start()
>>>
>>>
>>> Or this?
>>>
>>> val ssc = new StreamingContext(sc, Minutes(10))
>>> val currentStream = ssc.textFileStream("s3://textFileDirectory")
>>> val day = sqlContext.jsonFile("s3://textFileDirectory")
>>> day.registerTempTable("rideaccepted")
>>>
>>>
>>> currentStream.foreachRDD { rdd =>
>>>   val df = sqlContext.jsonRDD(rdd)
>>>   df.registerTempTable("tmp_rideaccepted")
>>>   sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
>>> }
>>>
>>> ssc.start()
>>>
>>>
>>> or this?
>>>
>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>
>>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>>
>>> dayBefore.registerTempTable("rideaccepted")
>>>
>>> currentStream.foreachRDD { rdd =>
>>>   val df = sqlContext.jsonRDD(rdd)
>>>   df.insertInto("rideaccepted")
>>> }
>>>
>>> ssc.start()
>>>
>>>
>>
>
