spark-user mailing list archives

From Brandon White <bwwintheho...@gmail.com>
Subject Re: Spark Streaming - Inserting into Tables
Date Mon, 13 Jul 2015 02:22:08 GMT
Hi Yin,

Yes, there were no new rows. I fixed it by calling .remember on the
StreamingContext. Obviously, this is not ideal.
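A minimal sketch of that workaround follows. It assumes Spark 1.3.x APIs (`jsonRDD`, `insertInto`), that `sc` and `sqlContext` are supplied by the caller (as in spark-shell), and that `rideaccepted` has already been registered as in the question below. The one-hour retention window and the `RememberSketch` object are hypothetical: the message does not say which duration was used, and this only reproduces the reported workaround, not a confirmed fix for the underlying cause.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Duration, Minutes, StreamingContext}

object RememberSketch {
  // Same 10-minute batch interval as in the original question.
  val batchInterval: Duration = Minutes(10)

  // Hypothetical retention window: keep six batches (one hour) of generated RDDs.
  // The right value depends on how long later queries need that data.
  val rememberWindow: Duration = batchInterval * 6

  // Assumes "rideaccepted" was already registered as a table (see the question).
  def run(sc: SparkContext, sqlContext: SQLContext): Unit = {
    val ssc = new StreamingContext(sc, batchInterval)

    // remember() must be called before ssc.start(); it makes every DStream in
    // this context keep the RDDs it generated for at least rememberWindow.
    ssc.remember(rememberWindow)

    val currentStream = ssc.textFileStream("s3://textFileDirectory/")
    currentStream.foreachRDD { rdd =>
      val df = sqlContext.jsonRDD(rdd)
      df.insertInto("rideaccepted")
    }

    ssc.start()
  }
}
```

Because a longer remember window keeps more RDDs alive, it trades memory for the rows staying visible, which is presumably why the workaround is "not ideal".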

On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yhuai@databricks.com> wrote:

> Hi Brandon,
>
> Can you explain what you meant by "It simply does not work"? Did you
> not see any new data files?
>
> Thanks,
>
> Yin
>
> On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwinthehouse@gmail.com>
> wrote:
>
>> Why does this not work? Is inserting into tables broken in 1.3.1? It does
>> not fail or throw any errors or exceptions. It simply does not work.
>>
>>
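>> // Assumes spark-shell (Spark 1.3.1), where sc and sqlContext are predefined.
>> import org.apache.spark.streaming.{Minutes, StreamingContext}
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> val currentStream = ssc.textFileStream("s3://textFileDirectory/")
>> val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")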
>>
>> dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
>> val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
>> parquetFile.registerTempTable("rideaccepted")
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.insertInto("rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>> Or this?
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>> val currentStream = ssc.textFileStream("s3://textFileDirectory")
>> val day = sqlContext.jsonFile("s3://textFileDirectory")
>> day.registerTempTable("rideaccepted")
>>
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.registerTempTable("tmp_rideaccepted")
>>   sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>> Or this?
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> val currentStream = ssc.textFileStream("s3://textFileDirectory/")
>> val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
>>
>> dayBefore.registerTempTable("rideaccepted")
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.insertInto("rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>
