spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnpritchard <nicholas.pritch...@falkonry.com>
Subject Stopping a Custom Receiver
Date Fri, 20 Feb 2015 18:07:39 GMT
Hi,

I have a use case for creating a DStream from a single file. I have created
a custom receiver that reads the file, calls 'store' with the contents, then
calls 'stop'. However, I'm second guessing if this is the correct approach
due to the spark logs I see.

I always see these logs, and the 'ERROR' and 'WARN' level makes me feel
uneasy:

    19:27:21,161 ERROR ReceiverTracker:75 - Deregistered receiver for stream
2: Finished reading file from /etc/tercel/PipelineTemplate.json
    19:27:21,221  WARN ReceiverSupervisorImpl:71 - Stopped executor without
error

In some situations (i.e. server instead of laptop), I get this fatal error
(spark shuts down):

    19:35:08,213 ERROR DAGSchedulerActorSupervisor:96 - eventProcesserActor
failed; shutting down SparkContext
    org.apache.spark.SparkException: Attempted to use BlockRDD[3] at
receiverStream at BootstrapMetadata.scala:26 after its blocks have been
removed!
	at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
	at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
	at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:234)
	at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:234)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:233)
...

FYI, "BootstrapMetadata.scala:26" is where I've declared the receiverStream.

I am wondering if it is incorrect to call "stop" within the custom receiver,
and if this is the reason why the blocks are removed.

Thanks,
Nick


P.S. This is the receiver implementation:

class FileReceiver(path: String) extends
Receiver[String](StorageLevel.MEMORY_ONLY) {

  private var source: BufferedSource = null

  def onStart() {
    read()
  }

  def onStop(): Unit = {
    source.close()
  }

  private def read() {
    try {
      source = Source.fromFile(path)
      val content = source.getLines().mkString
      store(content)
      stop(s"Finished reading file from $path")
    } catch {
      case e: Exception =>
        stop(s"Error reading file from $path", e)
    }
  }

}





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-a-Custom-Receiver-tp21740.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message