spark-user mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Re: Working with Avro Generic Records in the interactive scala shell
Date Tue, 27 May 2014 15:55:28 GMT
Also see this context from February.  We started working with Chill to get
Avro records automatically registered with Kryo.  I'm not sure of the final
status, but from Chill PR #172 it looks like there might be much less
friction than before.

Issue we filed: https://github.com/twitter/chill/issues/171
Pull request that adds an AvroSerializer to Chill:
https://github.com/twitter/chill/pull/172
Issue on the old Spark tracker:
https://spark-project.atlassian.net/browse/SPARK-746
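
For anyone who wants to experiment ahead of a release, here's a rough,
untested sketch of what the wiring might look like. The com.twitter.chill.avro
package and the GenericRecordSerializer name come from the PR and could change
before it ships; the .avsc path is only a placeholder:

import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.spark.serializer.KryoRegistrator

class AvroChillRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Parse the writer schema for your records (placeholder path).
    val schema: Schema =
      new Schema.Parser().parse(new java.io.File("/path/to/record.avsc"))
    // Register the concrete generic record class with chill-avro's serializer.
    kryo.register(classOf[GenericData.Record],
      AvroSerializer.GenericRecordSerializer[GenericData.Record](schema))
  }
}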

Matt, can you comment on whether this change helps you streamline that gist
even further?

Andrew

On Tue, May 27, 2014 at 8:49 AM, Jeremy Lewi <jeremy@lewi.us> wrote:

> Thanks, that's super helpful.
>
> J
>
>
> On Tue, May 27, 2014 at 8:01 AM, Matt Massie <massie@berkeley.edu> wrote:
>
>> I really should update that blog post. I created a gist (see
>> https://gist.github.com/massie/7224868) which explains a cleaner, more
>> efficient approach.
>>
>> --
>> Matt Massie <http://www.linkedin.com/in/mattmassie/> <http://www.twitter.com/matt_massie>
>> UC Berkeley AMPLab <https://twitter.com/amplab>
>>
>>
>> On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi <jeremy@lewi.us> wrote:
>>
>>> I was able to work around this by switching to the SpecificDatum
>>> interface and following this example:
>>>
>>> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java
>>>
>>> As in the example, I defined a subclass of my Avro type that implements
>>> the Serializable interface using Avro's serialization methods.
>>>
>>> I also defined a copy constructor that converts from the actual Avro type
>>> to my subclass.
>>>
>>> In Spark, after reading the Avro file, I ran a map operation to convert
>>> from the Avro type to my serializable subclass.
>>> This worked, although I'm not sure it's the most efficient solution.
>>>
>>> Here's a gist of what I run in the console:
>>> https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
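>>>
>>> In case it helps others, the pattern boils down to something like this
>>> (a minimal, untested sketch; MyRecord stands in for your Avro-generated
>>> specific class, and I'm delegating the Java serialization hooks to Avro's
>>> binary encoding):
>>>
>>> import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
>>> import scala.collection.JavaConverters._
>>> import org.apache.avro.io.{DecoderFactory, EncoderFactory}
>>> import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
>>>
>>> class SerializableMyRecord() extends MyRecord with Serializable {
>>>
>>>   // "Copy constructor": copies each field of a plain MyRecord by position.
>>>   def this(rec: MyRecord) {
>>>     this()
>>>     for (field <- MyRecord.getClassSchema.getFields.asScala)
>>>       put(field.pos, rec.get(field.pos))
>>>   }
>>>
>>>   // Write this record with Avro's binary encoding instead of default
>>>   // Java field-by-field serialization.
>>>   @throws(classOf[IOException])
>>>   private def writeObject(out: ObjectOutputStream): Unit = {
>>>     val writer = new SpecificDatumWriter[MyRecord](MyRecord.getClassSchema)
>>>     val encoder = EncoderFactory.get.binaryEncoder(out, null)
>>>     writer.write(this, encoder)
>>>     encoder.flush()
>>>   }
>>>
>>>   @throws(classOf[IOException])
>>>   private def readObject(in: ObjectInputStream): Unit = {
>>>     val reader = new SpecificDatumReader[MyRecord](MyRecord.getClassSchema)
>>>     // directBinaryDecoder avoids buffering past this record's bytes.
>>>     reader.read(this, DecoderFactory.get.directBinaryDecoder(in, null))
>>>   }
>>> }
>>>
>>> The map step in the shell is then just
>>> records.map(r => new SerializableMyRecord(r)).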
>>>
>>> I haven't gotten Kryo registration to work yet, but setting the
>>> registrator before launching the console via the SPARK_JAVA_OPTS
>>> environment variable seems like it might be better than shutting down and
>>> restarting the Spark context in the console.
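>>>
>>> For example, something like this (untested; the registrator class is the
>>> one from my repo linked in my earlier message):
>>>
>>> SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=contrail.AvroGenericRegistrator" ./bin/spark-shell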
>>>
>>> J
>>>
>>>
>>> On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi <jeremy@lewi.us> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> Thanks for the help.
>>>>
>>>> The class should be on the classpath on all nodes. Here's what I did:
>>>> 1) I built a jar from my scala code.
>>>> 2) I copied that jar to a location on all nodes in my cluster
>>>> (/usr/local/spark)
>>>> 3) I edited bin/compute-classpath.sh to add my jar to the classpath.
>>>> 4) I repeated the process with the Avro mapreduce jar, which provides
>>>> AvroKey.
>>>>
>>>> I doubt this is the best way to set the classpath, but it seems to work.
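>>>>
>>>> For the record, the edit was just appending lines like these in
>>>> compute-classpath.sh (the jar names here are illustrative):
>>>>
>>>> CLASSPATH="$CLASSPATH:/usr/local/spark/myjob.jar"
>>>> CLASSPATH="$CLASSPATH:/usr/local/spark/avro-mapred.jar"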
>>>>
>>>> J
>>>>
>>>>
>>>> On Sat, May 24, 2014 at 9:26 AM, Josh Marcus <jmarcus@meetup.com> wrote:
>>>>
>>>>> Jeremy,
>>>>>
>>>>> Just to be clear, are you assembling a jar with that class compiled
>>>>> (with its dependencies) and including the path to that jar on the command
>>>>> line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)?
>>>>>
>>>>> --j
>>>>>
>>>>>
>>>>> On Saturday, May 24, 2014, Jeremy Lewi <jeremy@lewi.us> wrote:
>>>>>
>>>>>> Hi Spark Users,
>>>>>>
>>>>>> I'm trying to read and process an Avro dataset using the interactive
>>>>>> Spark Scala shell. When my pipeline executes, I get the
>>>>>> ClassNotFoundException pasted at the end of this email.
>>>>>> I'm trying to use the generic Avro API (not the specific API).
>>>>>>
>>>>>> Here's a gist of the commands I'm running in the spark console:
>>>>>> https://gist.github.com/jlewi/2c853edddd0ceee5f00c
>>>>>>
>>>>>> Here's my registrator for Kryo.
>>>>>>
>>>>>> https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala
>>>>>>
>>>>>> Any help or suggestions would be greatly appreciated.
>>>>>>
>>>>>> Thanks
>>>>>> Jeremy
>>>>>>
>>>>>> Here's the log message that is spewed out.
>>>>>>
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
>>>>>> java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>>>   at java.lang.Class.forName0(Native Method)
>>>>>>   at java.lang.Class.forName(Class.java:270)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>>>>>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>>>>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>>>   at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
>>>>>>   at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
>>>>>>   at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>>>>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
>>>>>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>   at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 2 (task 0.0:2)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 1]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 4 (task 0.0:4)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 2]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 3 (task 0.0:3)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 3]
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:3 as TID 12 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:3 as 1703 bytes in 1 ms
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:4 as TID 13 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:4 as 1703 bytes in 1 ms
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Starting task 0.0:2 as TID 14 on executor 0: spark-slave00.c.biocloudops.internal (PROCESS_LOCAL)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Serialized task 0.0:2 as 1703 bytes in 0 ms
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 7 (task 0.0:7)
>>>>>> 14/05/24 02:00:48 INFO TaskSetManager: Loss was due to java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 [duplicate 4]
>>>>>> 14/05/24 02:00:48 WARN TaskSetManager: Lost TID 6 (task 0.0:6)
>>>>>>
>>>>>
>>>>
>>>
>>
>
