spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout
Date Tue, 26 Jul 2016 14:02:27 GMT
Can you go ahead and open a Jira ticket with that explanation?

Is there a reason you need to use receivers instead of the direct stream?
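[For context, a minimal direct-stream sketch for Spark 1.6 / Kafka 0.8; the broker address, topic name, and app name below are placeholders, not from the original thread:]

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Hypothetical broker and topic -- replace with your own.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("input-topic")

    // The direct stream computes Kafka offset ranges per batch itself,
    // so there is no receiver and no receiver-side write-ahead log to
    // time out against HDFS.
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```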

On Tue, Jul 26, 2016 at 4:45 AM, Andy Zhao <andyrao1986@gmail.com> wrote:
> Hi guys,
>
>     I wrote a Spark Streaming program that consumes 1000 messages from one
> Kafka topic, does some transformation, and writes the results back to
> another topic. But I only found 988 messages in the second topic. I checked
> the logs and confirmed that all messages were received by the receivers,
> but I also found an HDFS write timeout message printed from the class
> BatchedWriteAheadLog.
>
>     I checked the source code and found this:
>
>   /** Add received block. This event will get written to the write ahead log (if enabled). */
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
>     try {
>       val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
>       if (writeResult) {
>         synchronized {
>           getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
>         }
>         logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
>           s"block ${receivedBlockInfo.blockStoreResult.blockId}")
>       } else {
>         logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
>           s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
>       }
>       writeResult
>     } catch {
>       case NonFatal(e) =>
>         logError(s"Error adding block $receivedBlockInfo", e)
>         false
>     }
>   }
>
>
>     It seems that the ReceiverTracker tried to write the block info to
> HDFS, but the write operation timed out. That made writeToLog return
> false, so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) +=
> receivedBlockInfo" was skipped and the block info was lost.
>
>    The Spark version I use is 1.6.1, and I did not turn on
> spark.streaming.receiver.writeAheadLog.enable.
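[For reference, the receiver write-ahead log is enabled through configuration; a sketch, with a hypothetical checkpoint path:]

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: enabling the receiver write-ahead log. It requires a reliable
// checkpoint directory (e.g. on HDFS) to hold the log files; the path
// below is a placeholder.
val conf = new SparkConf()
  .setAppName("WALExample")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs:///path/to/checkpoint")  // hypothetical path
```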
>
>    I want to know whether this is intended behaviour.
>
> Thanks
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-lost-data-when-ReceiverTracker-writes-Blockinfo-to-hdfs-timeout-tp27410.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>


