spark-user mailing list archives

From Jean-Baptiste Onofré <jb@nanthrax.net>
Subject Re: Spark 1.5 Streaming and Kinesis
Date Tue, 20 Oct 2015 15:21:49 GMT
Hi Phil,

As you can see in the Jira, I tried with both Spark 1.5.1 and
1.6.0-SNAPSHOT, and I'm not able to reproduce your issue.

What's your Java version? And I guess you compiled against Scala 2.10,
not with -Pscala-2.11?

Regards
JB
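
For reference, a minimal way to report the versions asked about above, from a
spark-shell started against the build under test (sc is the shell's SparkContext):

    // Versions relevant to the question above, printed from spark-shell
    println("Spark: " + sc.version)                           // version of the running Spark build
    println("Scala: " + scala.util.Properties.versionString)  // Scala version the shell runs on
    println("Java:  " + System.getProperty("java.version"))   // JVM version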

On 10/20/2015 03:59 PM, Jean-Baptiste Onofré wrote:
> Hi Phil,
>
> Did you see my comments in the Jira?
>
> Can you provide an update in the Jira, please? Thanks!
>
> Regards
> JB
>
> On 10/19/2015 11:07 PM, Phil Kallos wrote:
>> I am currently trying a few code changes to see if I can squash this
>> error. I have created https://issues.apache.org/jira/browse/SPARK-11193
>> to track progress, hope that is okay!
>>
>> In the meantime, can anyone confirm their ability to run the Kinesis-ASL
>> example using Spark 1.5.x or later? It would be helpful to know if it works
>> in some cases but not others.
>> http://spark.apache.org/docs/1.5.1/streaming-kinesis-integration.html
>>
>> Thanks
>> Phil
>>
>> On Thu, Oct 15, 2015 at 10:35 PM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
>>
>>     Hi Phil,
>>
>>     sorry I didn't have time to investigate yesterday (I was busy on a couple
>>     of other Apache projects ;)). I will try to do it today. I'll keep you
>>     posted.
>>
>>     Regards
>>     JB
>>
>>     On 10/16/2015 07:21 AM, Phil Kallos wrote:
>>
>>         JB,
>>
>>         To clarify, you are able to run the Amazon Kinesis example provided
>>         in the Spark examples dir?
>>
>>         bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint url]
>>
>>         If it helps, below are the steps I used to build Spark:
>>
>>         mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package
>>
>>         And I did this with revision 4f894dd6906311cb57add6757690069a18078783 (v1.5.1).
>>
>>         Thanks,
>>         Phil
>>
>>
>>         On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi <cepoi.eugen@gmail.com> wrote:
>>
>>              So running it using spark-submit doesn't change anything; it
>>              still works.
>>
>>              When reading the code at
>> https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
>>              it looks like the receivers are definitely being serialized and
>>              deserialized. I think this is the issue; I need to find a way to
>>              confirm that now...
>>
>>              2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eugen@gmail.com>:
>>
>>                  Hey,
>>
>>                  A quick update on other things that have been tested.
>>
>>                  When looking at the compiled code of the
>>                  spark-streaming-kinesis-asl jar, everything looks normal:
>>                  there is a class that implements SyncMap and it is used
>>                  inside the receiver.
>>                  Starting a spark shell and using introspection to
>>                  instantiate a receiver and check that blockIdToSeqNumRanges
>>                  implements SyncMap works too. So it obviously has the
>>                  correct type according to that.
>>
>>                  Another thing to test would be to do the same introspection
>>                  inside a Spark job, to make sure it is not a problem in
>>                  the way the jobs are run.
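
Not the exact introspection described above (the receiver class is not public
API, so it is awkward to instantiate by hand), but a related check that is easy
to run inside a job from spark-shell: report which jars the executor's
classloader actually loads the relevant classes from, to rule out a stale
spark-streaming-kinesis-asl build on the executor classpath. A minimal sketch:

    import scala.util.Try

    // Run one task and report, from inside it, where the executor finds the
    // KinesisReceiver class and the SynchronizedMap trait (or the failure).
    val report = sc.parallelize(1 to 1, 1).map { _ =>
      def codeSource(className: String): String =
        Try {
          val c = Class.forName(className)
          Option(c.getProtectionDomain.getCodeSource)
            .map(_.getLocation.toString)
            .getOrElse("(no code source)")
        }.recover { case e => "failed: " + e }.get
      "KinesisReceiver <- " + codeSource("org.apache.spark.streaming.kinesis.KinesisReceiver") + "\n" +
        "SynchronizedMap <- " + codeSource("scala.collection.mutable.SynchronizedMap")
    }.collect()

    report.foreach(println)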
>>                  The other idea would be that this is a problem related to
>>                  serialization. For example, if the receiver is being
>>                  serialized and then deserialized, then depending on the
>>                  library used and its configuration it might not preserve
>>                  the concrete type: it would deserialize using the
>>                  compile-time type instead of the runtime type.
>>
>>                  Cheers,
>>                  Eugen
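
One way to probe that last hypothesis without the receiver itself is to
round-trip a toy map of the same compound type through whatever serializer the
running SparkConf selects. A minimal sketch for spark-shell; the toy map is
only a stand-in for the real field:

    import scala.collection.mutable
    import org.apache.spark.SparkEnv

    // Stand-in with the same shape as the receiver's blockIdToSeqNumRanges field.
    val toy = new mutable.HashMap[String, Int] with mutable.SynchronizedMap[String, Int]
    toy("block-0") = 1

    // Round-trip through the serializer Spark is configured with
    // (Java by default, Kryo if spark.serializer selects it).
    val ser = SparkEnv.get.serializer.newInstance()
    val back = ser.deserialize[AnyRef](ser.serialize[AnyRef](toy))

    println(back.getClass.getName)
    println("still a SynchronizedMap? " + back.isInstanceOf[mutable.SynchronizedMap[_, _]])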
>>
>>
>>                  2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <jb@nanthrax.net>:
>>
>>                      Thanks for the update Phil.
>>
>>                      I'm preparing an environment to reproduce it.
>>
>>                      I keep you posted.
>>
>>                      Thanks again,
>>                      Regards
>>                      JB
>>
>>                      On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>>                          Not a dumb question, but yes, I updated all of the
>>                          library references to 1.5, including the Kinesis
>>                          extra (and even tried 1.5.1):
>>
>>                          // Versions.spark set elsewhere to "1.5.0"
>>                          "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"
>>
>>                          I am experiencing the issue in my own Spark
>>                          project, but also when I try to run the Spark
>>                          Streaming Kinesis example that comes in
>>                          spark/examples.
>>
>>                          I tried running the streaming job locally, and also
>>                          in EMR with release 4.1.0, which includes Spark 1.5.
>>
>>                          Very strange!
>>
>>                               ---------- Forwarded message ----------
>>                               From: "Jean-Baptiste Onofré" <jb@nanthrax.net>
>>                               To: user@spark.apache.org
>>                               Cc:
>>                               Date: Thu, 15 Oct 2015 08:03:55 +0200
>>                               Subject: Re: Spark 1.5 Streaming and Kinesis
>>                               Hi Phil,
>>
>>                               KinesisReceiver is part of the Kinesis extra. Just a dumb
>>                               question: did you update all of them, including the Spark
>>                               Kinesis extra containing the KinesisReceiver?
>>
>>                               I checked on tag v1.5.0, and at line 175 of KinesisReceiver we see:
>>
>>                               blockIdToSeqNumRanges.clear()
>>
>>                               which is a:
>>
>>                               private val blockIdToSeqNumRanges =
>>                                 new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>                                   with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>>
>>                               So, it doesn't look fully correct to me.
>>
>>                               Let me investigate a bit this morning.
>>
>>                               Regards
>>                               JB
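
As a side note on that declaration: the fragile part is the anonymous
HashMap-with-SynchronizedMap subclass, which has to survive whatever
serialization and classloading happens between driver and executor. A sketch of
a shape that avoids the mixin entirely, illustrative only, with a toy value
type standing in for the private SequenceNumberRanges and no claim that this is
what Spark itself does:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    // Toy stand-in for the private[kinesis] SequenceNumberRanges type.
    case class Ranges(description: String)

    // A concurrent map exposed through scala.collection.mutable.Map; thread-safe
    // without relying on an anonymous SynchronizedMap mixin class.
    val blockIdToSeqNumRanges: scala.collection.mutable.Map[String, Ranges] =
      new ConcurrentHashMap[String, Ranges]().asScala

    blockIdToSeqNumRanges("block-0") = Ranges("seq 1..10")
    blockIdToSeqNumRanges.clear()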
>>                               On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>                               We are trying to migrate from Spark 1.4 to Spark 1.5
>>                               for our Kinesis streaming applications, to take
>>                               advantage of the new Kinesis checkpointing
>>                               improvements in 1.5.
>>
>>                               However, after upgrading we are consistently seeing
>>                               the following error:
>>                               java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
>>                                 at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>                                 at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>                                 at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>                                 at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>                                 at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>                                 at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>                                 at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>                                 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>                                 at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>                                 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>                                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>                                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>                                 at java.lang.Thread.run(Thread.java:745)
>>                               I even get this when running the Kinesis examples
>>                               (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html)
>>                               with bin/run-example streaming.KinesisWordCountASL.
>>
>>                               Am I doing something incorrect?
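
For context, the receiver that fails to start is the one behind the stream
created by the Kinesis integration API on that page; roughly, per the 1.5.x
docs, something like the sketch below (the app name, stream name, endpoint,
region and batch interval are all illustrative placeholders, and sc is an
existing SparkContext):

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    // Illustrative values only; replace with a real app name, stream, endpoint and region.
    val ssc = new StreamingContext(sc, Seconds(2))
    val kinesisStream = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myKinesisStream",
      "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)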
>>
>>
>>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

