spark-user mailing list archives

From Phil Kallos <phil.kal...@gmail.com>
Subject Re: Spark 1.5 Streaming and Kinesis
Date Mon, 19 Oct 2015 21:07:19 GMT
I am currently trying a few code changes to see if I can squash this error.
I have created https://issues.apache.org/jira/browse/SPARK-11193 to track
progress, hope that is okay!

In the meantime, can anyone confirm their ability to run the Kinesis-ASL
example using Spark 1.5.x or later? It would be helpful to know whether it
works in some cases but not others.
http://spark.apache.org/docs/1.5.1/streaming-kinesis-integration.html

Thanks
Phil

On Thu, Oct 15, 2015 at 10:35 PM, Jean-Baptiste Onofré <jb@nanthrax.net>
wrote:

> Hi Phil,
>
> Sorry, I didn't have time to investigate yesterday (I was busy with a couple
> of other Apache projects ;)). I will try to do it today. I'll keep you posted.
>
> Regards
> JB
>
> On 10/16/2015 07:21 AM, Phil Kallos wrote:
>
>> JB,
>>
>> To clarify, you are able to run the Amazon Kinesis example provided in
>> the spark examples dir?
>>
>> bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint url] ?
>>
>> If it helps, below are the steps I used to build spark
>>
>> mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package
>>
>> And I did this with revision 4f894dd6906311cb57add6757690069a18078783
>> (v.1.5.1)
>>
>> Thanks,
>> Phil
>>
>>
>> On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi <cepoi.eugen@gmail.com> wrote:
>>
>>     So running it using spark-submit doesn't change anything; it still works.
>>
>>     When reading the code at
>>     https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
>>     it looks like the receivers are definitely being serialized and
>>     deserialized (ser/de). I think this is the issue; I need to find a
>>     way to confirm that now...
>>
>>     2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eugen@gmail.com>:
>>
>>         Hey,
>>
>>         A quick update on other things that have been tested.
>>
>>         When looking at the compiled code of the
>>         spark-streaming-kinesis-asl jar everything looks normal (there
>>         is a class that implements SyncMap and it is used inside the
>>         receiver).
>>         Starting a spark shell and using introspection to instantiate a
>>         receiver and check that blockIdToSeqNumRanges implements SyncMap
>>         works too. So obviously it has the correct type according to that.
>>
>>         Another thing to test could be to do the same introspection
>>         stuff but inside a spark job to make sure it is not a problem in
>>         the way the jobs are run.
>>         The other idea is that this is a problem related to ser/de.
>>         For example, if the receiver is serialized and then
>>         deserialized, the library used (depending on its configuration)
>>         might not preserve the concrete type. It would then deserialize
>>         using the compile-time type instead of the runtime type.
>>
>>         Cheers,
>>         Eugen
>>
>>
>>         2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <jb@nanthrax.net>:
>>
>>             Thanks for the update Phil.
>>
>>             I'm preparing an environment to reproduce it.
>>
>>             I'll keep you posted.
>>
>>             Thanks again,
>>             Regards
>>             JB
>>
>>             On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>>                 Not a dumb question, but yes I updated all of the
>>                 library references to
>>                 1.5, including  (even tried 1.5.1).
>>
>>                 // Versions.spark set elsewhere to "1.5.0"
>>                 "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"
>>
>>                 I am experiencing the issue in my own spark project, but
>>                 also when I try
>>                 to run the spark streaming kinesis example that comes in
>>                 spark/examples
>>
>>                 Tried running the streaming job locally, and also in EMR
>>                 with release
>>                 4.1.0 that includes Spark 1.5
>>
>>                 Very strange!
>>
>>                      ---------- Forwarded message ----------
>>
>>                      From: "Jean-Baptiste Onofré" <jb@nanthrax.net>
>>                      To: user@spark.apache.org
>>                      Cc:
>>                      Date: Thu, 15 Oct 2015 08:03:55 +0200
>>                      Subject: Re: Spark 1.5 Streaming and Kinesis
>>                      Hi Phil,
>>
>>                      KinesisReceiver is part of extra. Just a dumb question: did you
>>                      update all, including the Spark Kinesis extra containing the
>>                      KinesisReceiver ?
>>
>>                      I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
>>
>>                      blockIdToSeqNumRanges.clear()
>>
>>                      which is a:
>>
>>                      private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>                          with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>>
>>                      So, it doesn't look fully correct to me.
>>                      Let me investigate a bit this morning.
>>
>>                      Regards
>>                      JB
>>
>>                      On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>
>>                      We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
>>                      streaming applications, to take advantage of the new Kinesis
>>                      checkpointing improvements in 1.5.
>>
>>                      However, after upgrading, we are consistently seeing the following error:
>>
>>                      java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
>>                          at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>                          at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>                          at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>                          at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>                          at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>                          at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>                          at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>                          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>                          at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>                          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>                          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>                          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>                          at java.lang.Thread.run(Thread.java:745)
>>
>>                      I even get this when running the Kinesis examples:
>>                      http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>                      with
>>                      bin/run-example streaming.KinesisWordCountASL
>>
>>                      Am I doing something incorrect?
>>
>>
>>                      --
>>                      Jean-Baptiste Onofré
>>                      jbonofre@apache.org
>>                      http://blog.nanthrax.net
>>                      Talend - http://www.talend.com
>>
>>                      Hi,
>>
>>
>>             --
>>             Jean-Baptiste Onofré
>>             jbonofre@apache.org
>>             http://blog.nanthrax.net
>>             Talend - http://www.talend.com
>>
>>             ---------------------------------------------------------------------
>>             To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>             For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>>
>>
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
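
The ser/de idea raised in the quoted thread (a round trip that loses the mixed-in trait and hands back the plain base type) can be tested in isolation. Below is a minimal sketch, not taken from Spark: `PlainStore`, `Synced`, and `RoundTripCheck` are hypothetical stand-ins for `mutable.HashMap`, `mutable.SynchronizedMap`, and the receiver field, and only plain Java serialization is exercised. Java serialization records the runtime class, so this check gives a baseline; the same `isInstanceOf` test could be repeated with whichever serializer the job is actually configured to use, which this sketch does not assume.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for mutable.HashMap (the compile-time base type).
class PlainStore extends Serializable {
  var entries: Map[String, Int] = Map.empty
}

// Hypothetical stand-in for mutable.SynchronizedMap (the mixed-in trait).
trait Synced

object RoundTripCheck {
  // Serialize a value to bytes and read it back using plain Java serialization.
  def javaRoundTrip[T <: AnyRef](value: T): AnyRef = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject() finally in.close()
  }

  // True when the deserialized copy still carries the mixed-in trait.
  def keepsMixin(): Boolean = {
    val original = new PlainStore with Synced
    original.entries = Map("block-0" -> 1)
    javaRoundTrip(original).isInstanceOf[Synced]
  }

  def main(args: Array[String]): Unit =
    println(s"mixin preserved after Java round trip: ${keepsMixin()}")
}
```

If a different serializer returns an object for which the `isInstanceOf` check fails, a later cast to the trait would fail in the same way as the `ClassCastException` reported in this thread.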
