spark-user mailing list archives

From Hemanth Gudela <hemanth.gud...@qvantel.com>
Subject Spark registered view in "Future" - View changes updated in "Future" are lost in main thread
Date Mon, 24 Apr 2017 12:29:34 GMT
Hi,

I’m trying to write a background thread using a “Future” that periodically re-registers
a view with the latest data from an underlying database table.
However, the data changes made in the “Future” thread are lost in the main thread.

In the code below:

1. In the beginning, the registered view “myView” has only 1 row (1, ‘a’), which
is shown as the first output:

+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+

2. After a minute, a background “Future” thread inserts a new row (2, ‘b’) into
the database table, then re-registers “myView” with the latest data from that table.

   a. The second output clearly shows that “myView” in the “Future” has 2 rows:

  +-------+-----+
  |id     |value|
  +-------+-----+
  |1      |a    |
  |2      |b    |
  +-------+-----+
3. After 2 minutes, when I query “myView” in the main thread, it doesn’t show the
newly added row (2, ‘b’), even though “myView” picked up the change in the “Future”
thread. As you can see, the third output shows only one row (1, ‘a’) again!

+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val url = "jdbc:mysql://localhost:3306/myDb?sessionVariables=sql_mode='ANSI'" //mysql jdbc url

//set database properties
val dbProperties = new java.util.Properties
dbProperties.setProperty("user","myUser")
dbProperties.setProperty("password","myPass")

//read a database table, register a temp view and cache it
spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView")  //register a temp view named "myView"
spark.sql("cache table myView")
spark.sql("select * from myView").show //in the beginning, myView has just 1 record
/*
+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+
*/

Future { //in a background thread, insert a new row in the database table, re-register the temp view and refresh the cache
  Thread.sleep(1000*60) //(not necessary but) wait for a minute

  //append the new row to the underlying database table (testTable), not to the temp view
  spark.sql("select 2 as id, 'b' as value").write.mode("append").jdbc(url,"testTable",dbProperties)
  spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView") //re-register "myView"
  spark.sql("cache table myView")    //refresh cache of myView again
  spark.sql("select * from myView").show  //myView now has 2 records
  /*
  +-------+-----+
  |id     |value|
  +-------+-----+
  |1      |a    |
  |2      |b    |
  +-------+-----+
  */
}
Thread.sleep(1000*60*2) //wait for 2 minutes
spark.sql("select * from myView").show //Why does myView have only 1 record?!
/*
+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+
*/
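In the repro above, the main thread relies on `Thread.sleep(1000*60*2)` and assumes the “Future” has finished by then. A minimal plain-Scala sketch (no Spark involved) of waiting on the “Future” explicitly with `Await.result` instead; `viewRows` is a hypothetical stand-in for the rows behind “myView”, not a Spark API. One side benefit: if the background work throws (for example, a failed JDBC write), `Await.result` rethrows the exception in the main thread instead of it being silently dropped.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitInsteadOfSleep {
  // Hypothetical stand-in for "the rows currently behind myView";
  // just a thread-safe shared reference, not anything from Spark.
  val viewRows = new AtomicReference[Seq[(Int, String)]](Seq(1 -> "a"))

  def run(): Seq[(Int, String)] = {
    viewRows.set(Seq(1 -> "a")) // start from the initial single row
    val refresh: Future[Unit] = Future {
      Thread.sleep(100) // simulate a slow insert + re-register
      viewRows.set(viewRows.get :+ (2 -> "b"))
    }
    // Block until the background work completes (or rethrow its failure)
    // instead of sleeping for a fixed time and hoping it is done.
    Await.result(refresh, 10.seconds)
    viewRows.get
  }

  def main(args: Array[String]): Unit = println(run())
}
```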

I had assumed that a temp view registered in a “Future” thread is thread-local, but that
doesn’t always seem to be the case.
When the data source is a database table, the data changes made to a registered view in the “Future”
are lost in the main thread. However, when the data source is parquet, the changes made to
a registered view persist in the main thread.
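The “thread-local” assumption can be made concrete with a small plain-Scala sketch (no Spark involved). `perThread` and `shared` are hypothetical names: a `ThreadLocal` value set inside a “Future” is not seen by the main thread, whereas a shared map updated inside the “Future” is. As far as I understand, Spark temp views are kept in the catalog of the `SparkSession` object, so threads that use the same `spark` reference should behave like the shared map; the sketch does not model Spark's caching or its data sources.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ThreadLocalVsShared {
  // Hypothetical per-thread registry: every thread has its own copy.
  val perThread: ThreadLocal[String] = new ThreadLocal[String] {
    override def initialValue(): String = "v1"
  }
  // Hypothetical shared registry: one map visible to all threads.
  val shared = new ConcurrentHashMap[String, String]()

  // Returns (what the calling thread sees in perThread, what it sees in shared).
  def run(): (String, String) = {
    shared.put("myView", "v1")
    val update = Future {
      perThread.set("v2")        // changes only the worker thread's copy
      shared.put("myView", "v2") // visible to every thread
      ()
    }
    Await.result(update, 10.seconds)
    (perThread.get, shared.get("myView"))
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Called from the main thread, `run()` returns `("v1", "v2")`: the thread-local update stays on the worker thread, while the shared update is visible.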

Could you please shed some light on what’s happening with the registered view
when the data source is a database, and why the behavior differs when the data source is parquet?

Thank you (in advance ☺)
Hemanth


