spark-user mailing list archives

From Rohit Pujari <rpuj...@hortonworks.com>
Subject Re: Spark Streaming
Date Sat, 17 Jan 2015 10:17:14 GMT
Hi Francois:

I tried using "print(kafkaStream)" as an output operator, but no luck; it throws the same error.
Any other thoughts?

Thanks,
Rohit


From: "francois.garillot@typesafe.com" <francois.garillot@typesafe.com>
Date: Saturday, January 17, 2015 at 4:10 AM
To: Rohit Pujari <rpujari@hortonworks.com>
Subject: Re: Spark Streaming

Streams are lazy. Their computation is triggered by an output operator, which is apparently
missing from your code. See the programming guide:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
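To illustrate that laziness in plain Scala (an analogy only, not the Spark API): an Iterator computes nothing until a terminal operation forces it, just as a DStream computes nothing until an output operator is registered on it.

```scala
// Plain-Scala analogy of lazy evaluation (not Spark code):
// defining the computation runs nothing; forcing it does.
var evaluated = 0
val lazyStream = Iterator.tabulate(3) { i => evaluated += 1; i * 2 }

// Merely defining the iterator evaluated no elements yet:
println(evaluated)   // 0

// A terminal operation (like a DStream output operator) triggers the work:
val result = lazyStream.toList
println(evaluated)   // 3
println(result)      // List(0, 2, 4)
```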

-
FG



On Sat, Jan 17, 2015 at 11:06 AM, Rohit Pujari <rpujari@hortonworks.com>
wrote:

Hello Folks:

I'm running into the following error while executing relatively straightforward Spark Streaming
code. Am I missing anything?

Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered,
so nothing to execute


Code:

val conf = new SparkConf().setMaster("local[2]").setAppName("Streams")
val ssc = new StreamingContext(conf, Seconds(1))

val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "node1.c.emerald-skill-783.internal:2181",
    "group.id" -> "twitter",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "twitter"
  val numPartitionsOfInputTopic = 2
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER)
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1
  unifiedStream.repartition(sparkProcessingParallelism)
}

//print(kafkaStream)
ssc.start()
ssc.awaitTermination()

--
Rohit Pujari


CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to which it is addressed
and may contain information that is confidential, privileged and exempt from disclosure under
applicable law. If the reader of this message is not the intended recipient, you are hereby
notified that any printing, copying, dissemination, distribution, disclosure or forwarding
of this communication is strictly prohibited. If you have received this communication in error,
please contact the sender immediately and delete it from your system. Thank You.

