spark-user mailing list archives

From Shannon Quinn <squ...@gatech.edu>
Subject Re: Iterative transformations over RDD crashes in phantom reduce
Date Tue, 18 Nov 2014 19:31:59 GMT
Sorry everyone--turns out an oft-forgotten single line of code was 
required to make this work:

index = 0
INDEX = sc.broadcast(index)
M = M.flatMap(func1).reduceByKey(func2)
M.foreach(debug_output)
M.cache()   # <-- the oft-forgotten line
index = 1
INDEX = sc.broadcast(index)
M = M.flatMap(func1)
M.foreach(debug_output)

Works as expected now, and I understand why it was failing before: RDDs 
are evaluated lazily, so without cache() Spark re-ran the whole 
flatMap/reduceByKey lineage when the second action fired, and by then 
the rebound broadcast variable held index == 1.
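For anyone who hits the same thing, here is a minimal pure-Python sketch of the pitfall (no Spark involved, and the names `lazy_map` etc. are made up for illustration): a lazy computation re-reads a rebound variable each time it is forced, just as an uncached RDD lineage re-reads the latest broadcast value on each action.

```python
# Pure-Python analogy for Spark's lazy lineage: a "lazy RDD" is just a
# thunk that recomputes its result every time an "action" forces it.
index = 0

def lazy_map(data):
    # Like flatMap/reduceByKey: nothing runs until the thunk is called,
    # and the closure reads `index` at call time, not definition time.
    return lambda: [x + index for x in data]

M = lazy_map([10, 20, 30])
first = M()          # "action" #1: runs with index == 0 -> [10, 20, 30]

index = 1            # rebind, as with the second sc.broadcast(...)
second = M()         # "action" #2 recomputes with index == 1 -> [11, 21, 31]

# The fix mirrors M.cache(): materialize the result once, so later
# reads reuse it instead of recomputing under the new binding.
index = 0
cached = lazy_map([10, 20, 30])()   # forced eagerly while index == 0
index = 1                            # rebinding no longer matters
# `cached` still holds the values computed with index == 0
```

This is only an analogy: Spark's cache() keeps partitions in executor memory rather than a Python list, but the failure mode (recomputation observing a newer closure/broadcast value) is the same.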

On 11/18/14 2:02 PM, Shannon Quinn wrote:
> To clarify about what, precisely, is impossible: the crash happens 
> with INDEX == 1 in func2, but func2 is only called in the reduceByKey 
> transformation when INDEX == 0. And according to the output of the 
> foreach() in line 4, that reduceByKey(func2) works just fine. How is 
> it then invoked again with INDEX == 1 when there clearly isn't another 
> reduce call at line 7?
>
> On 11/18/14 1:58 PM, Shannon Quinn wrote:
>> Hi all,
>>
>> This is somewhat related to my previous question 
>> (http://apache-spark-user-list.1001560.n3.nabble.com/Iterative-changes-to-RDD-and-broadcast-variables-tt19042.html 
>> , for additional context), but for all practical purposes this is its 
>> own issue.
>>
>> As in my previous question, I'm making iterative changes to an RDD, 
>> where each iteration depends on the results of the previous one. I've 
>> stripped down what was previously a loop to just be two sequential 
>> edits to try and nail down where the problem is. It looks like this:
>>
>> index = 0
>> INDEX = sc.broadcast(index)
>> M = M.flatMap(func1).reduceByKey(func2)
>> M.foreach(debug_output)
>> index = 1
>> INDEX = sc.broadcast(index)
>> M = M.flatMap(func1)
>> M.foreach(debug_output)
>>
>> M is basically a row-indexed matrix, where each index points to a 
>> dictionary (sparse matrix more or less, with some domain-specific 
>> modifications). This program crashes on the second-to-last (7th) 
>> line; the creepy part is that it says the crash happens in "func2" 
>> with the broadcast variable "INDEX" == 1 (it attempts to access an 
>> entry that doesn't exist in a dictionary of one of the rows).
>>
>> How is that even possible? Am I missing something fundamental about 
>> how Spark works under the hood?
>>
>> Thanks for your help!
>>
>> Shannon
>

