spark-user mailing list archives

From Gary Malouf <malouf.g...@gmail.com>
Subject Re:
Date Mon, 02 Sep 2013 16:57:30 GMT
Hi Matei,

I've been experimenting in the interpreter to get this to work:

import org.apache.hadoop.io._
import com.mediacrossing.data.interfaces.PurchasedImpressionMessage.PurchasedImpression

val hdfsBase = "hdfs://nn-01:8020/"
val data = hdfsBase + "flume/impressions/yr=2013/mo=06/d=16/logger=dn-01s1/Impr.1371340801351"
val inputs = sc.sequenceFile[LongWritable, BytesWritable](data)
val lineItems = inputs.map { case (_, value: BytesWritable) =>
  PurchasedImpression.parseFrom(value.getBytes()).getBuyerData.getLineItemId
}
val counts = lineItems.map(li => (li, 1)).reduceByKey(_ + _)
counts.foreach { case (li, count) => println(li + ": " + count) }

After all of that input, I get the following:

13/09/02 16:55:25 INFO spark.SparkContext: Starting job: foreach at <console>:27
13/09/02 16:55:25 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:24)
13/09/02 16:55:25 INFO scheduler.DAGScheduler: Got job 0 (foreach at <console>:27) with 2 output partitions (allowLocal=false)
13/09/02 16:55:25 INFO scheduler.DAGScheduler: Final stage: Stage 0 (reduceByKey at <console>:24)
13/09/02 16:55:25 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
13/09/02 16:55:25 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24), which has no missing parents
13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24)
13/09/02 16:55:25 INFO cluster.ClusterScheduler: Adding task set 1.0 with 2 tasks
13/09/02 16:55:25 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: spark-02 (non-preferred, not one of dn-03, dn-02, dn-01)
13/09/02 16:55:25 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 2052 bytes in 110 ms
13/09/02 16:55:29 INFO cluster.TaskSetManager: Lost TID 0 (task 1.0:0)
13/09/02 16:55:29 INFO cluster.TaskSetManager: Loss was due to com.google.protobuf.InvalidProtocolBufferException
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:68)
    at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
    at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3639)
    at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3371)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:300)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
    at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:162)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:716)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
    at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:153)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:709)
    at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression.parseFrom(PurchasedImpressionMessage.java:3305)
    at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
    at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
    at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
    at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
    at scala.collection.Iterator$$anon$22.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.Iterator$$anon$22.foreach(Iterator.scala:451)
    at spark.Aggregator.combineValuesByKey(Aggregator.scala:20)
    at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
    at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
    at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
    at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
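
One theory I have not verified yet: BytesWritable.getBytes() returns the
whole reused backing buffer, and only the first getLength() bytes actually
belong to the record, so protobuf may be parsing trailing zero padding
(a zero tag is invalid, which would match the exception above). A rough
sketch of the variant I plan to try next, untested:

val lineItems = inputs.map { case (_, value: BytesWritable) =>
  // getBytes() exposes the full reused buffer; only the first
  // getLength() bytes are valid for this record, so copy just those.
  val bytes = java.util.Arrays.copyOfRange(value.getBytes(), 0, value.getLength())
  PurchasedImpression.parseFrom(bytes).getBuyerData.getLineItemId
}

The buffer reuse would also matter if the raw (key, value) pairs were ever
cached or collected, since Hadoop's RecordReader recycles the Writable
objects between records.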



Thanks,

Gary

On Sun, Sep 1, 2013 at 1:52 PM, Gary Malouf <malouf.gary@gmail.com> wrote:

> We are using Spark 0.7.3 compiled (and running) against Hadoop
> 2.0.0-mr1-cdh4.2.1.
>
> When I read a sequence file in, I have a series of key-value pairs
> (specifically, the keys are longs and the values are byte arrays).  When I
> use the Scala-based Scoobi library to parse each byte array into a protobuf
> message, I have no issues.  However, when I try to do the same parsing in
> Spark, turning the values back into the protobuf messages they were created
> as, I get the following:
>
> com.google.protobuf.InvalidProtocolBufferException: Protocol message
> contained an invalid tag (zero)
>
> Has anyone else experienced this before?  Is there anything special that
> must be done when reading in the sequence files?
>
>
