spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Stream reading from database using spark streaming
Date Thu, 02 Jun 2016 19:58:56 GMT
OK, that is fine. So the first source is an in-memory database (IMDB),
something like Oracle TimesTen, which I have worked with before.
The second source is some organised data (I assume you mean structured
tabular data).


   1. Data is read from source one, the IMDB. The assumption is that the
   data is not going to change within the batch interval.
   2. Data is read from source two (you will confirm what it is), and
   again that data is not going to change within the batch interval.
   3. You then want to register each RDD as a temp table and run SQL on
   the temp tables?

An example would be something like the below:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

val sparkConf = new SparkConf().
             setAppName("CEP_streaming_with_JDBC").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")

  val sc = new SparkContext(sparkConf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  sqlContext.sql("use oraclehadoop")
  val _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
  val _username : String = "xyz"
  val _password : String = "xyz123"

  // Get data from the Oracle table through the JDBC data source
  val d = sqlContext.load("jdbc",
    Map("url" -> _ORACLEserver,
        "dbtable" -> "(SELECT amount_sold, time_id, TO_CHAR(channel_id) AS channel_id FROM scratchpad.sales)",
        "user" -> _username,
        "password" -> _password))

  // Register the JDBC data as a temp table for SQL
  d.registerTempTable("tmp")

  val ssc = new StreamingContext(sc, Seconds(2))
  ssc.checkpoint("checkpoint")
  val kafkaParams = Map[String, String](
    "bootstrap.servers" -> "rhes564:9092",
    "schema.registry.url" -> "http://rhes564:8081",
    "zookeeper.connect" -> "rhes564:2181",
    "group.id" -> "CEP_streaming_with_JDBC")
  val topics = Set("newtopic")
  val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  dstream.cache()
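To complete step 3, each micro-batch RDD arriving on the DStream could then be registered as its own temp table inside foreachRDD and joined with the JDBC temp table via SQL. A minimal sketch only, assuming the Kafka message values are plain channel_id strings (the payload parsing, column name and join key here are illustrative assumptions, not from the original post):

```scala
// Sketch: join each micro-batch against the static JDBC temp table "tmp".
// Relies on sqlContext and import sqlContext.implicits._ defined above.
dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Take the message values of this batch and turn them into a DataFrame
    // (assumed: each value is a channel_id string)
    val batchDF = rdd.map(_._2).toDF("channel_id")
    batchDF.registerTempTable("batch")
    // SQL across the streaming batch and the static JDBC data
    val joined = sqlContext.sql(
      """SELECT t.channel_id, SUM(t.amount_sold) AS total_sold
         FROM tmp t JOIN batch b ON t.channel_id = b.channel_id
         GROUP BY t.channel_id""")
    joined.show()
  }
}
ssc.start()
ssc.awaitTermination()
```

Note that the temp table "batch" is re-registered every interval, so each micro-batch sees only its own data joined against the (assumed static) JDBC snapshot.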




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 June 2016 at 20:45, Zakaria Hili <zakahili@gmail.com> wrote:

> Ted Yu
> That's not Spark Streaming; it's just a simple batch job.
>
> Mich Talebzadeh
> In fact, I'm not working with MySQL but with an in-memory NewSQL
> database. I said MySQL because if I find something compatible with
> MySQL, it will work with NewSQL, which is very efficient for reading.
> Why Spark Streaming with a database? The answer: I have another stream
> processing job that pushes (organised) data into the in-memory database,
> and Spark should read this data and use DataFrames + machine learning
> for prediction.
>
>
>
>
> 2016-06-02 19:12 GMT+02:00 Mich Talebzadeh <mich.talebzadeh@gmail.com>:
>
>> I don't understand this. How are you going to read from an RDBMS
>> database, through JDBC?
>>
>> How often are you going to sample the transactional tables?
>>
>> You may find that a JDBC connection will take longer than your sliding
>> window length.
>>
>> Is this for real time analytics?
>>
>> Thanks
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 2 June 2016 at 18:08, Ted Yu <yuzhihong@gmail.com> wrote:
>>
>>>
>>> http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/
>>>
>>>
>>> https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.rdd.JdbcRDD
>>>
>>> FYI
>>>
>>> On Thu, Jun 2, 2016 at 6:26 AM, Zakaria Hili <zakahili@gmail.com> wrote:
>>>
>>>> I want to use Spark Streaming to read data from an RDBMS database like
>>>> MySQL,
>>>>
>>>> but I don't know how to do this using JavaStreamingContext:
>>>>
>>>>  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(500));
>>>>  DataFrame df = jssc. ??
>>>>
>>>> I searched on the internet but didn't find anything.
>>>>
>>>> thank you in advance.
>>>>
>>>
>>>
>>
>
