spark-issues mailing list archives

From "Jack Hu (JIRA)" <>
Subject [jira] [Created] (SPARK-3275) Socket receiver can not recover when the socket server restarted
Date Thu, 28 Aug 2014 04:36:02 GMT
Jack Hu created SPARK-3275:

             Summary: Socket receiver can not recover when the socket server restarted 
                 Key: SPARK-3275
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.0.2
            Reporter: Jack Hu

To reproduce this issue:
1. create an application with a socket dstream
2. start the socket server and start the application
3. restart the socket server
4. the socket dstream will fail to reconnect (it will close the connection after a successful
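
Steps 2 and 3 need a socket server that can be stopped and started again. A minimal sketch in plain Scala is shown here; the names LineServerSketch and serveOnce, the default port 9999, and the sample lines are all assumptions made for illustration, and any line-oriented server (netcat, for example) would serve equally well:

```scala
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

// Hypothetical helper for the reproduction steps; not part of Spark.
object LineServerSketch {
  // Accepts a single client, sends the given lines, then closes the connection.
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client: Socket = server.accept()
    try {
      val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
      lines.foreach(line => out.println(line))
    } finally {
      client.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val port = if (args.nonEmpty) args(0).toInt else 9999 // assumed default port
    val server = new ServerSocket(port)
    try {
      // Serve clients one after another until the process is killed.
      while (true) serveOnce(server, Seq("hello", "world"))
    } finally {
      server.close()
    }
  }
}
```

Killing this process and starting it again on the same port stands in for "restart the socket server" in step 3.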

The main issue seems to be that the state held in SocketReceiver and ReceiverSupervisor is incorrect
after the reconnect.
In SocketReceiver.receive(), the while loop is never entered after the reconnect because
isStopped returns true:
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)

This happens because the status flag "receiverState" in ReceiverSupervisor is set to Stopped
when the connection is lost, and it is only reset to Started after the receiver's start method has been called:

def startReceiver(): Unit = synchronized {
  try {
    logInfo("Starting receiver")
    receiver.onStart()
    logInfo("Called receiver onStart")
    receiverState = Started
  } catch {
    case t: Throwable =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
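
The ordering problem described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: the Supervisor class, the setStateBeforeOnStart switch, and the synchronous onStart are hypothetical simplifications and not Spark code (the real receiver runs its loop on a separate thread, so the actual behavior is a race rather than a deterministic failure). Setting the state before calling onStart is shown as one possible remedy, not necessarily the fix that should be adopted:

```scala
// Simplified, hypothetical model of the supervisor/receiver interaction.
// Names mirror the issue for readability; none of this is Spark's own code.
object ReceiverStateSketch {
  sealed trait State
  case object Started extends State
  case object Stopped extends State

  class Supervisor(setStateBeforeOnStart: Boolean) {
    // Starts out as Stopped, the state left behind by a lost connection.
    @volatile private var receiverState: State = Stopped
    var received: Vector[String] = Vector.empty

    def isStopped: Boolean = receiverState == Stopped

    // Stand-in for the receiver's onStart(): runs the receive loop synchronously.
    private def onStart(data: Iterator[String]): Unit = {
      while (!isStopped && data.hasNext) {
        received :+= data.next()
      }
    }

    def startReceiver(data: Iterator[String]): Unit = synchronized {
      if (setStateBeforeOnStart) receiverState = Started // candidate reordering
      onStart(data)
      receiverState = Started // ordering from the excerpt: set only after onStart
    }
  }

  // Returns how many records the receive loop consumed.
  def run(setStateBeforeOnStart: Boolean): Int = {
    val supervisor = new Supervisor(setStateBeforeOnStart)
    supervisor.startReceiver(Iterator("a", "b", "c"))
    supervisor.received.size
  }

  def main(args: Array[String]): Unit = {
    println(s"state set after onStart:  ${run(setStateBeforeOnStart = false)} records")  // 0 records
    println(s"state set before onStart: ${run(setStateBeforeOnStart = true)} records")   // 3 records
  }
}
```

With the flag still Stopped while the loop condition is evaluated, no record is consumed; with the flag set first, all three are.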

This message was sent by Atlassian JIRA
