spark-dev mailing list archives

From Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com>
Subject Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Date Fri, 22 May 2015 03:35:10 GMT
Hi Tathagata,

Thanks for looking into this. Investigating further, I found that the issue
is that Tachyon does not support file append. When the streaming receiver
that writes to the WAL fails and is restarted, it is not able to append to
the same WAL file after the restart.

I raised this with the Tachyon user group, and Haoyuan said that Tachyon
file append will be ready within 3 months. I will revisit this issue then.
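
To make the failure mode concrete, here is a minimal sketch of one way a
writer could avoid reopening an existing log file on a store that cannot
append: after a restart it rolls over to a fresh file name. This is only an
illustration under stated assumptions. It uses java.nio on a local temp
directory as a stand-in for Tachyon, and the class, method, and file names
are all hypothetical. It is not how Spark's FileBasedWriteAheadLog works.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RollingWalSketch {

    // Picks the file a (re)started writer should use. storeSupportsAppend is a
    // hypothetical capability flag; for a store without append it would be false.
    static Path openForWrite(Path dir, String baseName, boolean storeSupportsAppend)
            throws IOException {
        Files.createDirectories(dir);
        Path target = dir.resolve(baseName);
        if (storeSupportsAppend || !Files.exists(target)) {
            return target; // safe to create the file or append to it
        }
        // The file exists and cannot be appended to: use the next unused suffix.
        int attempt = 1;
        Path rolled;
        do {
            rolled = dir.resolve(baseName + "." + attempt++);
        } while (Files.exists(rolled));
        return rolled;
    }

    // Writes one record as a line. APPEND is fine here because the local
    // filesystem supports it; it only stands in for the real store.
    static void writeRecord(Path file, String record) throws IOException {
        Files.write(file, (record + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("wal-sketch");
        Path first = openForWrite(dir, "log-0", false);
        writeRecord(first, "block-1");
        // Simulated receiver restart: the same base name is requested again.
        Path second = openForWrite(dir, "log-0", false);
        writeRecord(second, "block-2");
        System.out.println(first.getFileName() + " " + second.getFileName());
    }
}
```

The trade-off in this sketch is that a reader then has to scan several files
per receiver instead of one, so it only shows the idea and is not a
recommendation.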

Regards,
Dibyendu


On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <tdas@databricks.com> wrote:

> Looks like the file size reported by the FSInputDStream of Tachyon's
> FileSystem interface is somehow coming back as zero.
>
> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> Just to follow up on this thread further.
>>
>> I was doing some fault-tolerance testing of Spark Streaming with Tachyon
>> as the OFF_HEAP block store. As I said in my earlier email, I was able to
>> solve the BlockNotFoundException by using Tachyon's Hierarchical Storage,
>> which is good.
>>
>> I continued testing with the Spark Streaming WAL and checkpoint files
>> also stored in Tachyon. Here are a few findings.
>>
>>
>> When I set the Spark Streaming checkpoint location to Tachyon, the
>> throughput is much higher. I tested the Driver and Receiver failure cases,
>> and Spark Streaming is able to recover without any data loss on Driver
>> failure.
>>
>> *But on Receiver failure, Spark Streaming loses data*, as I see an
>> exception while reading the WAL file from the Tachyon "receivedData"
>> location for the same Receiver id that just failed.
>>
>> If I change the checkpoint location back to HDFS, Spark Streaming can
>> recover from both Driver and Receiver failure.
>>
>> Here are the log details from when the Spark Streaming receiver failed. I
>> raised a JIRA for the same issue:
>> https://issues.apache.org/jira/browse/SPARK-7525
>>
>>
>>
>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>> (epoch 1)*
>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>> remove executor 2 from BlockManagerMaster.
>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>> successfully in removeExecutor
>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>> receiver for stream 2 from 10.252.5.62*:47255
>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
>> 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>> not read data from write ahead log record
>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at scala.Option.getOrElse(Option.scala:120)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>> EOF: 645603894, fileSize = 0*
>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>> at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>> at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>> ... 15 more
>>
>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
>> stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage
>> 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException
>> (Could not read data from write ahead log record
>> FileBasedWriteAheadLogSegment(tachyon-ft://
>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>> [duplicate 1]
>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
>> stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor
>> updated: app-20150511104442-0048/2 is now LOST (worker lost)
>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>> Executor app-20150511104442-0048/2 removed: worker lost
>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>> Asked to remove non-existent executor 2
>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage
>> 103.0 (TID 423) on executor 10.252.5.62: org.apache.spark.SparkException
>> (Could not read data from write ahead log record
>> FileBasedWriteAheadLogSegment(tachyon-ft://
>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>> [duplicate 2]
>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage 103.0
>> failed 4 times; aborting job
>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
>> 103.0, whose tasks have all completed, from pool
>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 103
>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>> foreachRDD at Consumer.java:92, took 0.953482 s
>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error running
>> job streaming job 1431341145000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
>> in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in stage
>> 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could not
>> read data from write ahead log record
>> FileBasedWriteAheadLogSegment(tachyon-ft://
>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>> )
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at scala.Option.getOrElse(Option.scala:120)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.IllegalArgumentException: Seek position is past EOF:
>> 645603894, fileSize = 0
>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>> at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>> at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>> ... 15 more
>>
>>
>>
>>
>>
>>
>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan.li@gmail.com> wrote:
>>
>>> Thanks for the updates!
>>>
>>> Best,
>>>
>>> Haoyuan
>>>
>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattachary@gmail.com> wrote:
>>>
>>>> Just a follow-up on this thread.
>>>>
>>>> I tried Hierarchical Storage on Tachyon (
>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that
>>>> seems to have worked: I did not see any Spark job fail due to
>>>> BlockNotFoundException.
>>>> Below are my Hierarchical Storage settings:
>>>>
>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>
>>>> > Dear All ,
>>>> >
>>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP
>>>> > block store. The primary reason for evaluating Tachyon is to find out
>>>> > whether it can solve the Spark BlockNotFoundException.
>>>> >
>>>> > With the traditional MEMORY_ONLY StorageLevel, jobs fail with a block
>>>> > not found exception when blocks are evicted, and storing blocks in
>>>> > MEMORY_AND_DISK is not a good option either, as it reduces throughput
>>>> > a lot.
>>>> >
>>>> >
>>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from master,
>>>> > used Tachyon 0.6.4, and configured Tachyon in fault-tolerant mode.
>>>> > Tachyon is running on a 3-node AWS x-large cluster, and Spark is running
>>>> > on a 3-node AWS x-large cluster.
>>>> >
>>>> > I have used the low-level, Receiver-based Kafka consumer (
>>>> > https://github.com/dibbhatt/kafka-spark-consumer ), which I wrote, to
>>>> > pull from Kafka and write blocks to Tachyon.
>>>> >
>>>> >
>>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY
>>>> > case) but very good overall memory utilization (as it is an off-heap
>>>> > store).
>>>> >
>>>> >
>>>> > But I found one issue on which I need clarification.
>>>> >
>>>> >
>>>> > With Tachyon I also see BlockNotFoundException, but for a different
>>>> > reason. From what I see, TachyonBlockManager.scala puts the blocks with
>>>> > the WriteType.TRY_CACHE configuration. Because of this, blocks are
>>>> > evicted from the Tachyon cache, and when Spark tries to find the block
>>>> > it throws BlockNotFoundException.
>>>> >
>>>> > I see a pull request which discusses the same issue:
>>>> >
>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>> >
>>>> >
>>>> > When I changed the WriteType to CACHE_THROUGH, the BlockDropException
>>>> > was gone, but throughput again suffered.
>>>> >
>>>> >
>>>> > Just curious to know whether Tachyon has any setting that can solve the
>>>> > block eviction from cache to disk, other than explicitly setting
>>>> > CACHE_THROUGH?
>>>> >
>>>> > Regards,
>>>> > Dibyendu
>>>> >
>>>> >
>>>> >
>>>>
>>>
>>>
>>>
>>> --
>>> Haoyuan Li
>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>
>>
>>
>
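
The "Seek position is past EOF: 645603894, fileSize = 0" error in the quoted
log can be reproduced in miniature. The sketch below assumes the two numbers
in the FileBasedWriteAheadLogSegment record are a byte offset and a length.
It checks them against the file size before seeking and reports a clear
error when the file is shorter than the segment. It uses only
java.io.RandomAccessFile on a local file; the class and method names are
hypothetical, and this is not the Spark or Tachyon implementation.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class SegmentReadSketch {

    // Reads `length` bytes starting at `offset`, failing early if the file
    // is too short to contain the requested segment.
    static byte[] readSegment(File file, long offset, int length) throws IOException {
        try (RandomAccessFile in = new RandomAccessFile(file, "r")) {
            long fileSize = in.length();
            if (offset < 0 || length < 0 || offset + length > fileSize) {
                throw new IllegalArgumentException(
                        "Segment offset=" + offset + ", length=" + length
                                + " is past EOF, fileSize = " + fileSize);
            }
            byte[] buf = new byte[length];
            in.seek(offset);
            in.readFully(buf);
            return buf;
        }
    }

    public static void main(String[] args) throws IOException {
        // An empty file plays the role of the WAL file whose size came back as 0.
        File empty = File.createTempFile("wal-segment", ".log");
        empty.deleteOnExit();
        try {
            readSegment(empty, 645603894L, 10891919);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check like this does not recover the data. It only turns the failure into
a message that names the segment and the reported file size.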
