spark-user mailing list archives

From Darin McBeath <ddmcbe...@yahoo.com.INVALID>
Subject Question about RDD Union and SubtractByKey
Date Tue, 11 Nov 2014 01:43:21 GMT
I have the following code, where I'm using RDD 'union' and 'subtractByKey' to create a new baseline
RDD.  All of my RDDs are key pairs, with the 'key' a String and the 'value' a String (an XML
document).
// ******************************************************
// Merge the daily deletes/updates/adds with the baseline
// ******************************************************

// Concat the Updates, Deletes into one PairRDD
JavaPairRDD<String,String> updateDeletePairRDD = updatePairRDD.union(deletePairRDD);

// Remove the update/delete keys from the baseline
JavaPairRDD<String,String> workSubtractBaselinePairRDD = baselinePairRDD.subtractByKey(updateDeletePairRDD);

// Add in the Adds
JavaPairRDD<String,String> workAddBaselinePairRDD = workSubtractBaselinePairRDD.union(addPairRDD);

// Add in the Updates
JavaPairRDD<String,String> newBaselinePairRDD = workAddBaselinePairRDD.union(updatePairRDD);
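
For clarity, here is the merge behavior I'm expecting on a toy example (the data, the variable names,
and the existing JavaSparkContext 'sc' below are purely illustrative, not part of my actual job):

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Hypothetical tiny inputs (keys k1..k4), built on an existing JavaSparkContext 'sc'
JavaPairRDD<String,String> toyBaseline = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("k1", "<doc>old1</doc>"),
    new Tuple2<>("k2", "<doc>old2</doc>"),
    new Tuple2<>("k3", "<doc>old3</doc>")));
JavaPairRDD<String,String> toyUpdates = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("k2", "<doc>new2</doc>")));
JavaPairRDD<String,String> toyDeletes = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("k3", "<doc>gone3</doc>")));
JavaPairRDD<String,String> toyAdds = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("k4", "<doc>new4</doc>")));

// Same sequence of operations as above: drop baseline keys that appear in
// updates/deletes, then union in the adds and the new versions of the updates.
JavaPairRDD<String,String> toyNewBaseline = toyBaseline
    .subtractByKey(toyUpdates.union(toyDeletes))
    .union(toyAdds)
    .union(toyUpdates);

// Expect k1 (unchanged), k2 (updated), k4 (added) => count of 3
log.info("Toy new baseline count: " + toyNewBaseline.count());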
When I go to 'count' the newBaselinePairRDD:

// Output count for new baseline
log.info("Number of new baseline records: " + newBaselinePairRDD.count());
I'm getting the following exception (the log.info above is SparkSync.java:785).  What I find
odd is the reference to Spark SQL, so I'm curious whether, under the covers, the RDD
union and/or subtractByKey operations are implemented with Spark SQL.  I wouldn't think so, but thought I
would ask.  I'm also suspicious of the reference to the '<' character and whether it comes
from the XML document in the value portion of the key pair.  Any insights would be appreciated.
If there are thoughts on how to better approach my problem (even debugging), I would be
interested in that as well.  The updatePairRDD, deletePairRDD, baselinePairRDD, addPairRDD,
and updateDeletePairRDD are all hash partitioned, as sketched below.
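
By 'hash partitioned' I mean roughly the following (NUM_PARTITIONS and the raw* variable names are
placeholders for however the RDDs are actually loaded):

import org.apache.spark.HashPartitioner;

// Each pair RDD is partitioned with the same HashPartitioner
int NUM_PARTITIONS = 16;  // placeholder value
HashPartitioner partitioner = new HashPartitioner(NUM_PARTITIONS);

JavaPairRDD<String,String> baselinePairRDD = rawBaselinePairRDD.partitionBy(partitioner);
JavaPairRDD<String,String> updatePairRDD   = rawUpdatePairRDD.partitionBy(partitioner);
JavaPairRDD<String,String> deletePairRDD   = rawDeletePairRDD.partitionBy(partitioner);
JavaPairRDD<String,String> addPairRDD      = rawAddPairRDD.partitionBy(partitioner);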
It's also a bit difficult to trace things because my application is a Java application and
the stack trace references a lot of Scala, with very few references to my application other than the one
frame (SparkSync.java:785).  My application does use Spark SQL for some other tasks, so perhaps
an RDD (or a partition of one) is being re-calculated and that is producing this error.  But, based on other
logging statements throughout my application, I don't believe that is the case.
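
In case it helps, the kind of check I was planning to run next (just a sketch; the storage level below is
an illustrative choice):

import org.apache.spark.storage.StorageLevel;

// Print the lineage of the RDD whose count fails. If the lineage reaches back
// to a Spark SQL JSON RDD (JsonRDD / InMemoryColumnarTableScan), the JSON
// parsing is being re-run as part of this count.
log.info(newBaselinePairRDD.toDebugString());

// Pin the merged result so a later action doesn't recompute it from the
// original lineage.
newBaselinePairRDD.persist(StorageLevel.MEMORY_AND_DISK());
log.info("Number of new baseline records: " + newBaselinePairRDD.count());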
Thanks.
Darin.
14/11/10 22:35:27 INFO scheduler.DAGScheduler: Failed to run count at SparkSync.java:785
14/11/10 22:35:27 WARN scheduler.TaskSetManager: Lost task 0.3 in stage 40.0 (TID 10674, ip-10-149-76-35.ec2.internal):
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: java.io.StringReader@e8f759e; line: 1, column: 2]
        com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
        com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557)
        com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475)
        com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1415)
        com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679)
        com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024)
        com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
        com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
        org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:275)
        org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:274)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
        org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
        org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
