Why is .remember not ideal?


On Sun, Jul 12, 2015 at 7:22 PM, Brandon White <bwwinthehouse@gmail.com> wrote:
Hi Yin,

Yes, there were no new rows. I fixed it by calling .remember on the context. Obviously, this is not ideal. 
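(For reference, the workaround mentioned above looks roughly like this — a sketch only; the Minutes(60) retention window is an assumed value, not from the original code:)

```scala
import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))
// Keep each batch's RDDs around longer than the default, so rows
// inserted into the temp table are not cleaned up by the streaming
// context before they can be queried.
ssc.remember(Minutes(60))
```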

On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yhuai@databricks.com> wrote:
Hi Brandon,

Can you explain what you meant by "It simply does not work"? You did not see new data files?

Thanks,

Yin

On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwinthehouse@gmail.com> wrote:
Why does this not work? Is insertInto broken in 1.3.1? It does not fail or throw any exceptions. It simply does not work. 


val ssc = new StreamingContext(sc, Minutes(10))

val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")

dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")

currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}

ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory")
val day = sqlContext.jsonFile("s3://textFileDirectory")
day.registerTempTable("rideaccepted")


currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.registerTempTable("tmp_rideaccepted")
  sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
}

ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))

val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")

dayBefore.registerTempTable("rideaccepted")

currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}

ssc.start()