spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Broadcast var is null
Date Mon, 05 Oct 2015 19:52:11 GMT
Make sure the broadcast variable works independently of the streaming
application. Then make sure it works without
StreamingContext.getOrCreate(). That will show whether the error
is thrown when starting a new context or when recovering a context from a
checkpoint (which is what getOrCreate does when a checkpoint exists).
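
A minimal sketch of those two checks, assuming the Scala API of Spark 1.4.x and a job submitted with spark-submit (which supplies the master URL). The names BroadcastCheck, batchCheck and streamingCheck are hypothetical, and queueStream stands in for the Kafka input so the custom factories drop out of the picture. Step 1 reads the broadcast in a plain batch job; step 2 reads it inside a DStream whose context is created with new StreamingContext instead of getOrCreate(), with no checkpointing. The sketch also defines main() explicitly, since Spark's quick-start guide notes that subclasses of scala.App may not work correctly.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

// Hypothetical diagnostic job: object and method names are illustrative, and
// queueStream replaces the Kafka input. main() is explicit (no scala.App).
object BroadcastCheck {

  // Step 1: broadcast in a plain batch job, no streaming involved.
  // Returns the distinct values the tasks saw; Array("1|2|3") means it works.
  def batchCheck(sc: SparkContext): Array[String] = {
    val bc = sc.broadcast(Array(1, 2, 3))
    sc.parallelize(1 to 4, 2)           // 2 partitions, so at least 2 tasks
      .map(_ => bc.value.mkString("|")) // read the broadcast inside each task
      .collect()
      .distinct
  }

  // Step 2: the same kind of broadcast used inside a DStream, with the context
  // created directly (no getOrCreate, no checkpoint directory).
  def streamingCheck(sc: SparkContext): Unit = {
    val ssc = new StreamingContext(sc, Seconds(10))
    val bc = sc.broadcast(Array(1, 2, 3))
    val queue = mutable.Queue(sc.parallelize(Seq("a", "b", "c")))
    ssc.queueStream(queue)
      .map(x => x + " -> " + bc.value.mkString("|"))
      .foreachRDD(rdd => rdd.collect().foreach(println)) // printed on the driver
    ssc.start()
    ssc.awaitTerminationOrTimeout(30000) // let a few 10-second batches run
    ssc.stop(stopSparkContext = false)   // keep sc alive for the caller
  }

  def main(args: Array[String]): Unit = {
    // The master URL is expected from spark-submit, as on the cluster.
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastCheck"))
    try {
      println("batch: " + batchCheck(sc).mkString(", "))
      streamingCheck(sc)
    } finally {
      sc.stop()
    }
  }
}
```

If step 1 prints "batch: 1|2|3" on the cluster and step 2 prints lines such as "a -> 1|2|3", the broadcast itself is fine, and the next place to look is what differs in the original job, starting with getOrCreate().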

On Mon, Oct 5, 2015 at 9:23 AM, dpristin <dpristin@gmail.com> wrote:

> Hi,
>
> Can anyone point out what I'm doing wrong? I've implemented a very
> basic Spark Streaming app that uses a single broadcast variable. When it
> runs locally it produces the expected output (the array I broadcast), but
> when deployed on the cluster it prints "broadcastVar is null". We use
> Spark 1.4.1. Here is the code:
>
> // imports go here
>
> object BroadcastTest extends App {
>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>   logger.info("OinkSparkMain - Setup Logger")
>
>   // This is our custom context setup code; nothing fancy goes on here
>   val config = Configuration(args)
>   val ssc: StreamingContext =
>     StreamingContext.getOrCreate(config.checkpointDirectory, () => {
>       SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
>     })
>
>   val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
>   val messages = kafkaStreamFactory.Create
>
>   // Grab the value of each Kafka message from the input DStream as a string
>   val events = messages.map(s => s._2)
>
>   // Create a broadcast variable - straight from the dev guide
>   val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))
>
>   // Try to print out the value of the broadcast var here
>   val transformed = events.transform(rdd => {
>     rdd.map(x => {
>       if (broadcastVar == null) {
>         println("broadcastVar is null")
>       } else {
>         println("broadcastVar value: " + broadcastVar.value.mkString("|"))
>       }
>       x
>     })
>   })
>
>   transformed.foreachRDD(x => logger.info("Data: " + x.collect.mkString("|")))
>
>   ssc.start()
>   ssc.awaitTermination()
> }
>
> Any input is very much appreciated!
>
> Regards,
> Dmitry.
