spark-user mailing list archives

From Gary Malouf <malouf.g...@gmail.com>
Subject Re:
Date Mon, 02 Sep 2013 19:03:10 GMT
That did it, thanks Matei!
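
For the archives: what fixed it was swapping getBytes() for copyBytes() in the map from the session quoted below. Roughly (a paraphrase of the working line, not a verbatim paste from my shell):

val lineItems = inputs.map { case (_, value: BytesWritable) =>
  // copyBytes() returns a fresh array trimmed to getLength() bytes, so the
  // protobuf parser never sees the stale tail of the reused backing array
  PurchasedImpression.parseFrom(value.copyBytes()).getBuyerData.getLineItemId
}

With that one change, the reduceByKey and foreach at the end of the session ran cleanly.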


On Mon, Sep 2, 2013 at 2:30 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:

> So I think the problem might be that BytesWritable.getBytes() can return
> an array bigger than the actual bytes used (see
> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/BytesWritable.html#getBytes()).
> It just returns a backing array that can be reused across records. Try
> using copyBytes instead:
> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/BytesWritable.html#copyBytes().
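
(To illustrate the reuse Matei describes, here is a quick standalone check. The values are hypothetical, nothing from our data; Hadoop's record readers keep one BytesWritable and grow its backing array as needed, so getBytes() can return more bytes than the current record holds:)

import org.apache.hadoop.io.BytesWritable

val bw = new BytesWritable()
bw.set(Array[Byte](1, 2, 3, 4, 5), 0, 5) // first record: 5 valid bytes
bw.set(Array[Byte](9, 9), 0, 2)          // writable reused for a 2-byte record
println(bw.getLength)        // 2: the number of valid bytes in this record
println(bw.getBytes.length)  // 7 here, given the 1.5x capacity growth; the point
                             // is it exceeds getLength, and the tail is stale
// parseFrom(bw.getBytes()) would read past the real record; a 0x00 consumed as a
// tag byte is protobuf field number 0, which is exactly the "invalid tag (zero)"
// error in the trace below.
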
>
> Matei
>
> On Sep 2, 2013, at 9:57 AM, Gary Malouf <malouf.gary@gmail.com> wrote:
>
> Hi Matei,
>
> I've been experimenting in the interpreter to get this to work:
>
> import org.apache.hadoop.io._;
> import com.mediacrossing.data.interfaces.PurchasedImpressionMessage.PurchasedImpression;
> val hdfsBase = "hdfs://nn-01:8020/";
> val data = hdfsBase + "flume/impressions/yr=2013/mo=06/d=16/logger=dn-01s1/Impr.1371340801351";
> val inputs = sc.sequenceFile[LongWritable, BytesWritable](data);
> val lineItems = inputs.map({ case (_, value: BytesWritable) =>
>   PurchasedImpression.parseFrom(value.getBytes()).getBuyerData.getLineItemId
> });
> val counts = lineItems.map(li => (li, 1)).reduceByKey(_ + _);
> counts.foreach { case (li, count) => println(li + ": " + count) }
>
> After all of that input, I get the following:
>
> 13/09/02 16:55:25 INFO spark.SparkContext: Starting job: foreach at <console>:27
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Got job 0 (foreach at <console>:27) with 2 output partitions (allowLocal=false)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Final stage: Stage 0 (reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24), which has no missing parents
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO cluster.ClusterScheduler: Adding task set 1.0 with 2 tasks
> 13/09/02 16:55:25 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: spark-02 (non-preferred, not one of dn-03, dn-02, dn-01)
> 13/09/02 16:55:25 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 2052 bytes in 110 ms
> 13/09/02 16:55:29 INFO cluster.TaskSetManager: Lost TID 0 (task 1.0:0)
> 13/09/02 16:55:29 INFO cluster.TaskSetManager: Loss was due to com.google.protobuf.InvalidProtocolBufferException
> com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
>     at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:68)
>     at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3639)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3371)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:300)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
>     at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:162)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:716)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
>     at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:153)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:709)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression.parseFrom(PurchasedImpressionMessage.java:3305)
>     at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
>     at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at scala.collection.Iterator$$anon$22.hasNext(Iterator.scala:458)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>     at scala.collection.Iterator$$anon$22.foreach(Iterator.scala:451)
>     at spark.Aggregator.combineValuesByKey(Aggregator.scala:20)
>     at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
>     at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
>     at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
>     at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
>     at spark.RDD.iterator(RDD.scala:196)
>     at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
>     at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
>     at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>
>
> Thanks,
>
> Gary
>
> On Sun, Sep 1, 2013 at 1:52 PM, Gary Malouf <malouf.gary@gmail.com> wrote:
>
>> We are using Spark 0.7.3 compiled (and running) against Hadoop
>> 2.0.0-mr1-cdh4.2.1.
>>
>> When I read a sequence file in, I have a series of key-value pairs
>> (specifically, the keys are longs and the values are byte arrays). When I
>> use the Scala-based Scoobi library to parse each byte array into a protobuf
>> message, I have no issues. However, when I try to parse the values in
>> these sequence files into the protobuf messages they were created from in
>> Spark, I get the following:
>>
>> com.google.protobuf.InvalidProtocolBufferException: Protocol message
>> contained an invalid tag (zero)
>>
>> Has anyone else experienced this before? Is there anything special that
>> must be done when reading in the sequence files?
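
(For reference, the read itself is nothing exotic, just sc.sequenceFile with the writable types; sketched here with a placeholder path, the real session is in my follow-up above:)

import org.apache.hadoop.io.{BytesWritable, LongWritable}

// sc is the SparkContext the interpreter provides
val pairs = sc.sequenceFile[LongWritable, BytesWritable]("hdfs://namenode:8020/path/to/file")
// each value is a BytesWritable that the reader reuses across records
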
>>
>>
>
>
