spark-user mailing list archives

From: Adrian Tanase <atan...@adobe.com>
Subject: Re: Broadcast var is null
Date: Mon, 05 Oct 2015 20:14:05 GMT
FYI, the same happens with accumulators when recovering from a checkpoint. I'd love to see this
fixed somehow, as the workaround (using a singleton factory in foreachRDD to make sure the
accumulators are initialized instead of null) is really intrusive...
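
For reference, the workaround looks roughly like this (a minimal sketch; the object, accumulator
and stream names are just illustrative, not from a real app):

import org.apache.spark.{Accumulator, SparkContext}

// Lazily-created singleton, so the accumulator is re-created on the driver
// when the streaming context is recovered from a checkpoint, instead of
// coming back as null from the restored closure.
object DroppedEventsCounter {
  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "DroppedEventsCounter")
        }
      }
    }
    instance
  }
}

// Inside the job, go through the factory instead of capturing a
// driver-side val in the closure:
events.foreachRDD { rdd =>
  val dropped = DroppedEventsCounter.getInstance(rdd.sparkContext)
  // ... dropped.add(1L) etc. as the RDD is processed
}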

Sent from my iPhone

On 05 Oct 2015, at 22:52, Tathagata Das <tdas@databricks.com> wrote:

Make sure the broadcast variable works independently of the streaming application. Then make
sure it works without StreamingContext.getOrCreate(). That will disambiguate whether the
error is thrown when starting a new context, or when recovering a context from a checkpoint
(which is what getOrCreate is supposed to do).
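
For example, something along these lines (a sketch that reuses the factory and config names
from the code quoted below; run the two variants separately):

// Run 1: build the context directly, so no checkpoint recovery is involved.
// If the broadcast value prints correctly here, broadcasting itself is fine.
val sscDirect = SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))

// Run 2: getOrCreate returns a *recovered* context when
// config.checkpointDirectory already contains a checkpoint; the factory
// function is only invoked when no checkpoint exists yet.
val sscRecovered = StreamingContext.getOrCreate(config.checkpointDirectory,
  () => SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10)))

// If the broadcast is fine in run 1 but null in run 2 after a restart,
// the problem is in checkpoint recovery, not in the broadcast itself.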

On Mon, Oct 5, 2015 at 9:23 AM, dpristin <dpristin@gmail.com> wrote:
Hi,

Can anyone point out what I'm doing wrong? I've implemented a very basic Spark Streaming app
that uses a single broadcast variable. When it runs locally it produces the proper output (the
array I broadcast), but when deployed on the cluster I get "broadcastVar is null". We use Spark
1.4.1. Here is the code:

--- imports go here

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  // This is our custom context setup code; nothing fancy goes on here
  val config = Configuration(args)
  val ssc: StreamingContext = StreamingContext.getOrCreate(config.checkpointDirectory,
    () => { SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10)) })


  val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
  val messages = kafkaStreamFactory.Create

  // Grab the value of each message from the Kafka input DStream as a string
  val events = messages.map( s => s._2 )

  // Create a broadcast variable - straight from the dev guide
  val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

  // Try to print out the value of the broadcast var here
  val transformed = events.transform(rdd => {
    rdd.map(x => {
      if(broadcastVar == null) {
        println("broadcastVar is null")
      }  else {
        println("broadcastVar value: " + broadcastVar.value.mkString("|"))
      }
      x
    })
  })

  transformed.foreachRDD(x => logger.info("Data: " + x.collect.mkString("|")))

  ssc.start()
  ssc.awaitTermination()
}

Any input is very much appreciated!

Regards,
Dmitry.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


