spark-user mailing list archives

From N B <nb.nos...@gmail.com>
Subject Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Date Sat, 26 Sep 2015 18:36:24 GMT
I wanted to add that we are not configuring the WAL in our scenario.

Thanks again,
Nikunj


On Sat, Sep 26, 2015 at 11:35 AM, N B <nb.nospam@gmail.com> wrote:

> Hi Dibyendu,
>
> Thanks. Based on your explanation, I believe I understand why using S3 for
> checkpoints has been an issue. But does this limitation apply only if
> recovery is needed in case of driver failure?
>
> What if we are not interested in recovery after a driver failure? Even so,
> just for the purposes of running streaming pipelines that do
> reduceByKeyAndWindow() and updateStateByKey() operations, a checkpoint
> directory needs to be configured.
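>
> For concreteness, here is a minimal sketch of the kind of pipeline I mean
> (the 'pairs' stream, the state logic and the S3 path are placeholders, not
> our actual job):
>
> // a checkpoint directory must be set before using window/state operators
> jsc.checkpoint("s3n://my-bucket/spark/checkpoints");
>
> // 'pairs' is an assumed JavaPairDStream<String, Long>
> JavaPairDStream<String, Long> windowedCounts = pairs.reduceByKeyAndWindow(
>     (a, b) -> a + b,          // reduce function
>     new Duration(60000),      // window length: 60 seconds
>     new Duration(10000));     // slide interval: 10 seconds
>
> // running totals via updateStateByKey (Optional is Guava's Optional here)
> JavaPairDStream<String, Long> totals = windowedCounts.updateStateByKey(
>     (List<Long> values, Optional<Long> state) -> {
>         long sum = state.or(0L);
>         for (Long v : values) sum += v;
>         return Optional.of(sum);
>     });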
>
> Do you think this usage will also run into issues if an S3 location is
> provided for the checkpoint directory? We will not use it to do any
> explicit recovery, as I stated above.
>
> Thanks
> Nikunj
>
>
>
> On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> In Spark Streaming, the checkpoint directory is used for two purposes:
>>
>> 1. Metadata checkpointing
>>
>> 2. Data checkpointing
>>
>> If you enable the WAL to recover from driver failure, Spark Streaming will
>> also write the received blocks to the WAL, which is stored in the checkpoint
>> directory.
>>
>> For a streaming solution to recover from any failure without data loss, you
>> need to enable metadata checkpointing and the WAL. You do not need to enable
>> data checkpointing.
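>>
>> For example, a minimal sketch of those two settings (the app name, batch
>> interval and HDFS path here are placeholders, not from an actual deployment):
>>
>> SparkConf conf = new SparkConf().setAppName("WalExample")
>>     // receiver write ahead log; received blocks are also written under the
>>     // checkpoint directory when this is enabled
>>     .set("spark.streaming.receiver.writeAheadLog.enable", "true");
>>
>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
>> // metadata checkpoints (and the WAL files) go under this directory
>> jsc.checkpoint("hdfs://namenode:9000/user/hadoop/spark/checkpoints");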
>>
>> In my experiments and the PR I mentioned, I configured metadata checkpointing
>> in HDFS and stored the received blocks OFF_HEAP, and I did not use any WAL.
>> The PR I proposed can recover from driver fail-over without any WAL-like
>> feature because the blocks are already available in Tachyon. The metadata
>> checkpoint helps to recover the metadata about past received blocks.
>>
>> Now the question is, can I configure Tachyon as my metadata checkpoint
>> location? I tried that, and the streaming application writes the
>> receivedBlockMetadata to Tachyon, but on driver failure it cannot recover the
>> received block metadata from Tachyon. I sometimes see zero-size files in the
>> Tachyon checkpoint location, and it cannot recover past events. I need to
>> understand what the issue is with storing metadata in Tachyon; that probably
>> needs a different JIRA.
>>
>> Let me know if I was able to explain the current scenario around Spark
>> Streaming and Tachyon.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>>
>> On Sat, Sep 26, 2015 at 1:04 PM, N B <nb.nospam@gmail.com> wrote:
>>
>>> Hi Dibyendu,
>>>
>>> I am not sure I understand completely. But are you suggesting that there is
>>> currently no way to place the checkpoint directory in Tachyon?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattachary@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Recently I was working on a PR to use Tachyon as the OFF_HEAP store for
>>>> Spark Streaming and to make sure Spark Streaming can recover from driver
>>>> failure and recover the blocks from Tachyon.
>>>>
>>>> The motivation for this PR is:
>>>>
>>>> If the streaming application stores the blocks OFF_HEAP, it may not need
>>>> any WAL-like feature to recover from driver failure. As long as the writing
>>>> of blocks to Tachyon from the streaming receiver is durable, they should be
>>>> recoverable directly from Tachyon on driver failure. This can solve the
>>>> issue of the expensive WAL write and of duplicating the blocks in both
>>>> memory and the WAL, and can also guarantee an end-to-end no-data-loss
>>>> channel using the OFF_HEAP store.
>>>>
>>>> https://github.com/apache/spark/pull/8817
>>>>
>>>> This PR is still under review, but having done various failover tests in my
>>>> environment, I have seen it work perfectly fine without any data loss.
>>>> Let's see what TD and others have to say on this PR.
>>>>
>>>> Below is the configuration I used to test this PR:
>>>>
>>>>
>>>> Spark : 1.6 from Master
>>>> Tachyon : 0.7.1
>>>>
>>>> Spark configuration details:
>>>>
>>>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>>>>     .set("spark.streaming.unpersist", "true")
>>>>     .set("spark.local.dir", "/mnt1/spark/tincan")
>>>>     .set("tachyon.zookeeper.address", "10.252.5.113:2182")
>>>>     .set("tachyon.usezookeeper", "true")
>>>>     .set("spark.externalBlockStore.url",
>>>>          "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
>>>>     .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>>>>     .set("spark.externalBlockStore.folderName", "pearson")
>>>>     .set("spark.externalBlockStore.dirId", "subpub")
>>>>     .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");
>>>>
>>>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
>>>>
>>>> String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";
>>>>
>>>> jsc.checkpoint(checkpointDirectory);
>>>>
>>>>
>>>> // I am using my receiver-based consumer
>>>> // (https://github.com/dibbhatt/kafka-spark-consumer), but
>>>> // KafkaUtils.createStream will also work.
>>>>
>>>> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>>>> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
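>>>>
>>>> For reference, the stock receiver-based API mentioned in the comment above
>>>> would look roughly like this (the topic, ZooKeeper quorum and group id here
>>>> are placeholders, not from my actual test):
>>>>
>>>> Map<String, Integer> topics = new HashMap<String, Integer>();
>>>> topics.put("mytopic", 1); // one consumer thread for this topic
>>>> JavaPairReceiverInputDStream<String, String> kafkaStream =
>>>>     KafkaUtils.createStream(jsc, "10.252.5.113:2181", "mygroup", topics);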
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb.nospam@gmail.com> wrote:
>>>>
>>>>> Hi Dibyendu,
>>>>>
>>>>> How does one go about configuring Spark Streaming to use Tachyon as the
>>>>> place for storing checkpoints? Also, can one do this with Tachyon running
>>>>> on a completely different node than where the Spark processes are running?
>>>>>
>>>>> Thanks
>>>>> Nikunj
>>>>>
>>>>>
>>>>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>
>>>>>> Hi Tathagata,
>>>>>>
>>>>>> Thanks for looking into this. Investigating further, I found that the
>>>>>> issue is that Tachyon does not support file append. The streaming receiver
>>>>>> that writes to the WAL, once it has failed and been restarted, is not able
>>>>>> to append to the same WAL file after the restart.
>>>>>>
>>>>>> I raised this with the Tachyon user group, and Haoyuan said that Tachyon
>>>>>> file append should be ready within about 3 months. I will revisit this
>>>>>> issue again then.
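>>>>>>
>>>>>> To make the failure mode concrete, this is roughly what the restarted WAL
>>>>>> writer needs to be able to do (the path is a placeholder and this is an
>>>>>> illustration, not the actual Spark code):
>>>>>>
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>> import org.apache.hadoop.fs.FSDataOutputStream;
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>
>>>>>> public class WalAppendCheck {
>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>     // an existing WAL file left behind by the failed receiver
>>>>>>     Path log = new Path("tachyon-ft://host:19998/checkpoint/receivedData/2/log-0");
>>>>>>     FileSystem fs = log.getFileSystem(new Configuration());
>>>>>>     // On a FileSystem without append support (as Tachyon was at the time),
>>>>>>     // this call fails, so the restarted receiver cannot reuse the old WAL file.
>>>>>>     FSDataOutputStream out = fs.append(log);
>>>>>>     out.close();
>>>>>>   }
>>>>>> }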
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>>
>>>>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <tdas@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>>>>> Tachyon's FileSystem interface is returning zero.
>>>>>>>
>>>>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>
>>>>>>>> Just to follow up on this thread further.
>>>>>>>>
>>>>>>>> I was doing some fault-tolerance testing of Spark Streaming with Tachyon
>>>>>>>> as the OFF_HEAP block store. As I said in an earlier email, I was able to
>>>>>>>> solve the BlockNotFound exception when I used Tachyon's Hierarchical
>>>>>>>> Storage, which is good.
>>>>>>>>
>>>>>>>> I continued doing some testing around storing the Spark Streaming WAL and
>>>>>>>> checkpoint files also in Tachyon. Here are a few findings:
>>>>>>>>
>>>>>>>>
>>>>>>>> When I store the Spark Streaming checkpoint location in Tachyon, the
>>>>>>>> throughput is much higher. I tested the driver and receiver failure
>>>>>>>> cases, and Spark Streaming is able to recover without any data loss on
>>>>>>>> driver failure.
>>>>>>>>
>>>>>>>> *But on receiver failure, Spark Streaming loses data*, as I see an
>>>>>>>> exception while reading the WAL file from the Tachyon "receivedData"
>>>>>>>> location for the same receiver id that just failed.
>>>>>>>>
>>>>>>>> If I change the checkpoint location back to HDFS, Spark Streaming
>>>>>>>> can recover from both driver and receiver failure.
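>>>>>>>>
>>>>>>>> The difference between the two runs above is just the checkpoint
>>>>>>>> location (the exact paths here are illustrative):
>>>>>>>>
>>>>>>>> // receiver failure loses data with this setting
>>>>>>>> jsc.checkpoint("tachyon-ft://10.252.5.113:19998/tachyon/checkpoint");
>>>>>>>> // recovers from both driver and receiver failure with this setting
>>>>>>>> jsc.checkpoint("hdfs://10.252.5.113:9000/user/hadoop/spark/checkpoint");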
>>>>>>>>
>>>>>>>> Here are the log details from when the Spark Streaming receiver failed.
>>>>>>>> I raised a JIRA for the same issue:
>>>>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch 1)*
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster.
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor
>>>>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255
>>>>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: *Seek position is past EOF: 645603894, fileSize = 0*
>>>>>>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>>> ... 15 more
>>>>>>>>
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1]
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>>>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor app-20150511104442-0048/2 removed: worker lost
>>>>>>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 2
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage 103.0 (TID 423) on executor 10.252.5.62: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 2]
>>>>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage 103.0 failed 4 times; aborting job
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 103.0, whose tasks have all completed, from pool
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 103
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103 (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed: foreachRDD at Consumer.java:92, took 0.953482 s
>>>>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1431341145000 ms.0
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 645603894, fileSize = 0
>>>>>>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>>> ... 15 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan.li@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the updates!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Haoyuan
>>>>>>>>>
>>>>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Just a follow-up on this thread.
>>>>>>>>>>
>>>>>>>>>> I tried Hierarchical Storage on Tachyon (
>>>>>>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ),
>>>>>>>>>> and that seems to have worked: I did not see any Spark job fail
>>>>>>>>>> due to BlockNotFoundException.
>>>>>>>>>> Below are my Hierarchical Storage settings:
>>>>>>>>>>
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> > Dear All,
>>>>>>>>>> >
>>>>>>>>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP
>>>>>>>>>> > block store. The primary reason for evaluating Tachyon is to find out
>>>>>>>>>> > whether Tachyon can solve the Spark BlockNotFoundException.
>>>>>>>>>> >
>>>>>>>>>> > With the traditional MEMORY_ONLY StorageLevel, when blocks are evicted,
>>>>>>>>>> > jobs fail due to a block-not-found exception, and storing blocks in
>>>>>>>>>> > MEMORY_AND_DISK is not a good option either, as it impacts the
>>>>>>>>>> > throughput a lot.
>>>>>>>>>> >
>>>>>>>>>> >
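>>>>>>>>>> > For illustration, the trade-off above is about which StorageLevel the
>>>>>>>>>> > receiver stores its blocks with, for example via the launcher of my
>>>>>>>>>> > Kafka consumer mentioned below (props and numberOfReceivers are
>>>>>>>>>> > placeholders):
>>>>>>>>>> >
>>>>>>>>>> > // evicted blocks lead to BlockNotFoundException
>>>>>>>>>> > ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
>>>>>>>>>> > // spills to local disk, but hurts throughput
>>>>>>>>>> > ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_AND_DISK());
>>>>>>>>>> > // blocks go to Tachyon, which is what this test evaluates
>>>>>>>>>> > ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>>>>>>>>>> >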
>>>>>>>>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from master,
>>>>>>>>>> > used Tachyon 0.6.4, and configured Tachyon in Fault Tolerant Mode.
>>>>>>>>>> > Tachyon is running in a 3-node AWS x-large cluster and Spark is running
>>>>>>>>>> > in a 3-node AWS x-large cluster.
>>>>>>>>>> >
>>>>>>>>>> > I have used the low-level receiver-based Kafka consumer
>>>>>>>>>> > (https://github.com/dibbhatt/kafka-spark-consumer), which I have
>>>>>>>>>> > written, to pull from Kafka and write blocks to Tachyon.
>>>>>>>>>> >
>>>>>>>>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY
>>>>>>>>>> > case) but very good overall memory utilization (as it is an off-heap
>>>>>>>>>> > store).
>>>>>>>>>> >
>>>>>>>>>> > But I found one issue on which I need clarification.
>>>>>>>>>> >
>>>>>>>>>> > In the Tachyon case I also see a BlockNotFoundException, but for a
>>>>>>>>>> > different reason. What I see is that TachyonBlockManager.scala puts the
>>>>>>>>>> > blocks with the WriteType.TRY_CACHE configuration. Because of this,
>>>>>>>>>> > blocks are evicted from the Tachyon cache, and when Spark tries to find
>>>>>>>>>> > a block it throws a BlockNotFoundException.
>>>>>>>>>> >
>>>>>>>>>> > I see a pull request which discusses the same:
>>>>>>>>>> >
>>>>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>>>>> >
>>>>>>>>>> > When I modified the WriteType to CACHE_THROUGH, the BlockDropException
>>>>>>>>>> > is gone, but it again impacts the throughput.
>>>>>>>>>> >
>>>>>>>>>> > Just curious to know if Tachyon has any setting which can solve the
>>>>>>>>>> > block eviction from cache to disk, other than explicitly setting
>>>>>>>>>> > CACHE_THROUGH?
>>>>>>>>>> >
>>>>>>>>>> > Regards,
>>>>>>>>>> > Dibyendu
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Haoyuan Li
>>>>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
