spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re:
Date Mon, 02 Sep 2013 18:30:30 GMT
So I think the problem is that BytesWritable.getBytes() can return an array bigger than the actual data: it hands back the backing array, which can be reused across records, so only the first getLength() bytes are valid (see http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/BytesWritable.html#getBytes()). Try using copyBytes() instead, which returns an array of exactly the right size: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/BytesWritable.html#copyBytes()
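Since the extra padding bytes are zeros, protobuf ends up reading a tag of 0, which is exactly the "invalid tag (zero)" error in your trace. A quick (untested) sketch of the fixed map:

    val lineItems = inputs.map({ case (_, value: BytesWritable) =>
      // copyBytes() returns a fresh array trimmed to getLength() bytes,
      // instead of the reusable, possibly oversized backing array from getBytes()
      PurchasedImpression.parseFrom(value.copyBytes()).getBuyerData.getLineItemId
    });

If your Hadoop version doesn't have copyBytes() yet, java.util.Arrays.copyOfRange(value.getBytes, 0, value.getLength) should give you the same trimmed array.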

Matei

On Sep 2, 2013, at 9:57 AM, Gary Malouf <malouf.gary@gmail.com> wrote:

> Hi Matei,
> 
> I've been experimenting in the interpreter to get this to work:
> 
> import org.apache.hadoop.io._;
> import com.mediacrossing.data.interfaces.PurchasedImpressionMessage.PurchasedImpression;
> val hdfsBase="hdfs://nn-01:8020/";
> val data = hdfsBase + "flume/impressions/yr=2013/mo=06/d=16/logger=dn-01s1/Impr.1371340801351";
> val inputs = sc.sequenceFile[LongWritable, BytesWritable](data);
> val lineItems = inputs.map({ case (_, value: BytesWritable) => PurchasedImpression.parseFrom(value.getBytes()).getBuyerData.getLineItemId });
> val counts = lineItems.map(li => (li, 1)).reduceByKey(_ + _);
> counts.foreach { case (li, count) => println(li+": "+count) }
> 
> After all of that input, I get the following:
> 
> 13/09/02 16:55:25 INFO spark.SparkContext: Starting job: foreach at <console>:27
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Got job 0 (foreach at <console>:27) with 2 output partitions (allowLocal=false)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Final stage: Stage 0 (reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24), which has no missing parents
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO cluster.ClusterScheduler: Adding task set 1.0 with 2 tasks
> 13/09/02 16:55:25 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: spark-02 (non-preferred, not one of dn-03, dn-02, dn-01)
> 13/09/02 16:55:25 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 2052 bytes in 110 ms
> 13/09/02 16:55:29 INFO cluster.TaskSetManager: Lost TID 0 (task 1.0:0)
> 13/09/02 16:55:29 INFO cluster.TaskSetManager: Loss was due to com.google.protobuf.InvalidProtocolBufferException
> com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
>     at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:68)
>     at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3639)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3371)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:300)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
>     at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:162)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:716)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
>     at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:153)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:709)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression.parseFrom(PurchasedImpressionMessage.java:3305)
>     at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
>     at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at scala.collection.Iterator$$anon$22.hasNext(Iterator.scala:458)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>     at scala.collection.Iterator$$anon$22.foreach(Iterator.scala:451)
>     at spark.Aggregator.combineValuesByKey(Aggregator.scala:20)
>     at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
>     at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
>     at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
>     at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
>     at spark.RDD.iterator(RDD.scala:196)
>     at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
>     at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
>     at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 
> 
> 
> Thanks,
> 
> Gary
> 
> On Sun, Sep 1, 2013 at 1:52 PM, Gary Malouf <malouf.gary@gmail.com> wrote:
> 
> We are using Spark 0.7.3 compiled (and running) against Hadoop 2.0.0-mr1-cdh4.2.1.
> 
> When I read a sequence file in, I have a series of key-value pairs (specifically, the keys are longs and the values are byte arrays). When I use the Scala-based Scoobi library to parse each byte array into a protobuf message, I have no issues. However, when I try to parse the values in these sequence files into the protobuf messages they were created as, I get the following:
> 
> com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
> 
> Has anyone else experienced this before? Is there anything special that must be done when reading in the sequence files?
> 
> 

