spark-user mailing list archives

From Praseetha <prasikris...@gmail.com>
Subject Re: Verifying if DStream is empty
Date Mon, 20 Jun 2016 17:03:16 GMT
Thanks a lot for the response.
input1Pair is a DStream. I tried the code snippet below:

    result.foreachRDD { externalRDD =>
      if (!externalRDD.isEmpty()) {
        val ss = input1Pair.transform { rdd => input2Pair.leftOuterJoin(rdd) }
      } else {
        val ss = input1Pair.transform { rdd => input2Pair.leftOuterJoin(rdd) }
      }
    }

I'm getting the following exception:
java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.TransformedDStream.<init>(TransformedDStream.scala:25)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:670)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:661)

I don't think we can define transformations on DStreams that live outside
foreachRDD from inside it: input1Pair.transform creates a new DStream after the
context has already started.
My requirement is to figure out whether the DStream 'result' is empty and,
based on that, perform some operation on the input1Pair DStream and the
input2Pair RDD.
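One possible approach, sketched under assumptions: make the emptiness check and the choice of join per batch, inside a single `input1Pair.transform { rdd => ... }` that is defined before the streaming context starts, so no new DStream is created at runtime. The model below is plain Scala so it runs without Spark: `Seq` stands in for an RDD, and `leftOuter`, `rightOuter`, `multiMatchKeys` and `processBatch` are hypothetical helpers, not Spark APIs. In a real job, `rdd.leftOuterJoin(input2Pair)`, `rdd.rightOuterJoin(input2Pair)` and `isEmpty` on the RDD would play their roles. Both branches are mapped to one element type, because `transform` has to return an RDD of a single type.

```scala
// Plain-Scala model of ONE streaming batch: Seq stands in for an RDD.
// All names here (leftOuter, rightOuter, multiMatchKeys, processBatch) are hypothetical.
object BatchDecision {
  // Left outer join on Seqs, same result shape as Spark's leftOuterJoin:
  // (k, (leftValue, Some(rightValue))) per match, or (k, (leftValue, None)).
  def leftOuter[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    val rightByKey = right.groupBy(_._1)
    left.flatMap { case (k, v) =>
      rightByKey.get(k) match {
        case Some(matches) => matches.map { case (_, w) => (k, (v, Some(w): Option[W])) }
        case None          => Seq((k, (v, None: Option[W])))
      }
    }
  }

  // Right outer join via leftOuter with the sides swapped, giving the
  // rightOuterJoin shape: (k, (Option[leftValue], rightValue)).
  def rightOuter[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Option[V], W))] =
    leftOuter(right, left).map { case (k, (w, ov)) => (k, (ov, w)) }

  // Mirrors `result` in this thread: join the lookup side with the batch,
  // count 1L per match and 0L per miss for each key, keep counts > 1.
  def multiMatchKeys[K, V, W](lookup: Seq[(K, V)], batch: Seq[(K, W)]): Map[K, Long] =
    leftOuter(lookup, batch)
      .groupBy(_._1)
      .map { case (k, rows) => k -> rows.map { case (_, (_, o)) => if (o.isDefined) 1L else 0L }.sum }
      .filter(_._2 > 1)

  // Decides for one batch, as the body of input1Pair.transform { rdd => ... } could.
  def processBatch[K, V, W](batch: Seq[(K, V)], lookup: Seq[(K, W)]): Seq[(K, (Option[V], Option[W]))] =
    if (multiMatchKeys(lookup, batch).isEmpty)
      // "result" is empty for this batch: no key matched more than one row
      leftOuter(batch, lookup).map { case (k, (v, ow)) => (k, (Some(v), ow)) }
    else
      // at least one key matched several rows: use the right outer join instead
      rightOuter(batch, lookup).map { case (k, (ov, w)) => (k, (ov, Some(w))) }
}
```

With a lookup where every key occurs once, `processBatch` takes the left-join branch; if any batch key matches two lookup rows, it switches to the right-join branch for that batch only, and the next batch is decided afresh.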


On Mon, Jun 20, 2016 at 7:05 PM, nguyen duc tuan <newvalue92@gmail.com>
wrote:

> Hi Praseetha,
> To check whether a DStream batch is empty, using the isEmpty method is
> correct. I think the problem here is the call input1Pair.leftOuterJoin(input2Pair).
> I guess input1Pair comes from the transformation above, so it is a DStream,
> not an RDD. You should do the join on the DStream instead (outside
> foreachRDD), or, inside foreachRDD, apply any transformation to the x
> variable instead.
> If you use the input2Pair RDD a lot, you can consider caching it for better
> performance.
>
> 2016-06-20 19:30 GMT+07:00 Praseetha <prasikrishna@gmail.com>:
>
>>
>> Hi Experts,
>>
>> I have 2 inputs: the first input is a stream (say input1) and the second
>> one is a batch (say input2). I want to figure out whether the keys in the first
>> input match a single row or more than one row in the second input. The further
>> transformations/logic depend on the number of matching rows, i.e. whether a
>> single row matches or multiple rows match (for at least one key in the first
>> input).
>>
>> if(single row matches){
>>      // do some transformation
>> }else{
>>      // do some transformation
>> }
>>
>> Code that I have tried so far:
>>
>> val input1Pair = streamData.map(x => (x._1, x))
>> val input2Pair = input2.map(x => (x._1, x))
>> val joinData = input1Pair.transform { x => input2Pair.leftOuterJoin(x) }
>> val result = joinData.mapValues {
>>   case (v, Some(a)) => 1L
>>   case (v, None) => 0L
>> }.reduceByKey(_ + _).filter(_._2 > 1)
>>
>> With the code above, result.print prints nothing if every key matches
>> only one row in input2. Since the DStream consists of multiple RDDs (one
>> per batch), I am not sure how to figure out whether the DStream is
>> empty or not.
>>
>> I tried using foreachRDD, but the streaming app stops abruptly.
>>
>> Inside foreachRDD I was performing transformations with other RDDs, like:
>>
>> result.foreachRDD { x =>
>>   if (x.isEmpty) {
>>     val out = input1Pair.leftOuterJoin(input2Pair)
>>   } else {
>>     val out = input1Pair.rightOuterJoin(input2Pair)
>>   }
>> }
>>
>> Can you please suggest an approach?
>>
>>
>> Regds,
>> --Praseetha
>>
>
>
