I have reached over 800M rows (813,997,990), and now it’s starting to timeout when UPSERTing data.

16/07/27 00:04:58 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 17.0 (TID 87, prod-dc1-datanode163.pdc1i.gradientx.com): com.stumbleupon.async.TimeoutException: Timed out after 30000ms when joining Deferred@1592877776(state=PENDING, result=null, callback=org.kududb.client.AsyncKuduSession$ConvertBatchToListOfResponsesCB@154c94f8 -> wakeup thread Executor task launch worker-2, errback=passthrough -> wakeup thread Executor task launch worker-2)
at com.stumbleupon.async.Deferred.doJoin(Deferred.java:1177)
at com.stumbleupon.async.Deferred.join(Deferred.java:1045)
at org.kududb.client.KuduSession.close(KuduSession.java:110)
at org.kududb.spark.kudu.KuduContext.writeRows(KuduContext.scala:181)
at org.kududb.spark.kudu.KuduContext$$anonfun$writeRows$1.apply(KuduContext.scala:131)
at org.kududb.spark.kudu.KuduContext$$anonfun$writeRows$1.apply(KuduContext.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Thanks,
Ben


On Jul 18, 2016, at 10:32 AM, Todd Lipcon <todd@cloudera.com> wrote:

On Mon, Jul 18, 2016 at 10:31 AM, Benjamin Kim <bbuild11@gmail.com> wrote:
Todd,

Thanks for the info. I was going to upgrade after the testing, but now, it looks like I will have to do it earlier than expected.

I will do the upgrade, then resume.

OK, sounds good. The upgrade shouldn't invalidate any performance testing or anything -- just fixes this important bug.

-Todd


On Jul 18, 2016, at 10:29 AM, Todd Lipcon <todd@cloudera.com> wrote:

Hi Ben,

Any chance that you are running Kudu 0.9.0 instead of 0.9.1? There's a known serious bug in 0.9.0 which can cause this kind of corruption.

Assuming that you are running with replication count 3 this time, you should be able to move aside that tablet metadata file and start the server. It will recreate a new repaired replica automatically.

-Todd

On Mon, Jul 18, 2016 at 10:28 AM, Benjamin Kim <bbuild11@gmail.com> wrote:
During my re-population of the Kudu table, I am getting this error trying to restart a tablet server after it went down. The job that populates this table has been running for over a week.

[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "kudu.tablet.TabletSuperBlockPB" because it is missing required fields: rowsets[2324].columns[15].block
F0718 17:01:26.783571   468 tablet_server_main.cc:55] Check failed: _s.ok() Bad status: IO error: Could not init Tablet Manager: Failed to open tablet metadata for tablet: 24637ee6f3e5440181ce3f20b1b298ba: Failed to load tablet metadata for tablet id 24637ee6f3e5440181ce3f20b1b298ba: Could not load tablet metadata from /mnt/data1/kudu/data/tablet-meta/24637ee6f3e5440181ce3f20b1b298ba: Unable to parse PB from path: /mnt/data1/kudu/data/tablet-meta/24637ee6f3e5440181ce3f20b1b298ba
*** Check failure stack trace: ***
    @           0x7d794d  google::LogMessage::Fail()
    @           0x7d984d  google::LogMessage::SendToLog()
    @           0x7d7489  google::LogMessage::Flush()
    @           0x7da2ef  google::LogMessageFatal::~LogMessageFatal()
    @           0x78172b  (unknown)
    @       0x344d41ed5d  (unknown)
    @           0x7811d1  (unknown)

Does anyone know what this means?

Thanks,
Ben


On Jul 11, 2016, at 10:47 AM, Todd Lipcon <todd@cloudera.com> wrote:

On Mon, Jul 11, 2016 at 10:40 AM, Benjamin Kim <bbuild11@gmail.com> wrote:
Todd,

I had it at one replica. Do I have to recreate?

We don't currently have the ability to "accept data loss" on a tablet (or set of tablets). If the machine is gone for good, then currently the only easy way to recover is to recreate the table. If this sounds really painful, though, maybe we can work up some kind of tool you could use to just recreate the missing tablets (with those rows lost).

-Todd

On Jul 11, 2016, at 10:37 AM, Todd Lipcon <todd@cloudera.com> wrote:

Hey Ben,

Is the table that you're querying replicated? Or was it created with only one replica per tablet?

-Todd

On Mon, Jul 11, 2016 at 10:35 AM, Benjamin Kim <bkim@amobee.com> wrote:
Over the weekend, a tablet server went down. It’s not coming back up. So, I decommissioned it and removed it from the cluster. Then, I restarted Kudu because I was getting a timeout  exception trying to do counts on the table. Now, when I try again. I get the same error.

16/07/11 17:32:36 WARN scheduler.TaskSetManager: Lost task 468.3 in stage 0.0 (TID 603, prod-dc1-datanode167.pdc1i.gradientx.com): com.stumbleupon.async.TimeoutException: Timed out after 30000ms when joining Deferred@712342716(state=PAUSED, result=Deferred@1765902299, callback=passthrough -> scanner opened -> wakeup thread Executor task launch worker-2, errback=openScanner errback -> passthrough -> wakeup thread Executor task launch worker-2)
at com.stumbleupon.async.Deferred.doJoin(Deferred.java:1177)
at com.stumbleupon.async.Deferred.join(Deferred.java:1045)
at org.kududb.client.KuduScanner.nextRows(KuduScanner.java:57)
at org.kududb.spark.kudu.RowResultIteratorScala.hasNext(KuduRDD.scala:99)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Does anyone know how to recover from this?

Thanks,
Benjamin Kim
Data Solutions Architect

[a•mo•bee] (n.) the company defining digital marketing.

3250 Ocean Park Blvd, Suite 200  |  Santa Monica, CA 90405  |  www.amobee.com

On Jul 6, 2016, at 9:46 AM, Dan Burkert <dan@cloudera.com> wrote:



On Wed, Jul 6, 2016 at 7:05 AM, Benjamin Kim <bbuild11@gmail.com> wrote:
Over the weekend, the row count is up to <500M. I will give it another few days to get to 1B rows. I still get consistent times ~15s for doing row counts despite the amount of data growing.

On another note, I got a solicitation email from SnappyData to evaluate their product. They claim to be the “Spark Data Store” with tight integration with Spark executors. It claims to be an OLTP and OLAP system with being an in-memory data store first then to disk. After going to several Spark events, it would seem that this is the new “hot” area for vendors. They all (MemSQL, Redis, Aerospike, Datastax, etc.) claim to be the best "Spark Data Store”. I’m wondering if Kudu will become this too? With the performance I’ve seen so far, it would seem that it can be a contender. All that is needed is a hardened Spark connector package, I would think. The next evaluation I will be conducting is to see if SnappyData’s claims are valid by doing my own tests.

It's hard to compare Kudu against any other data store without a lot of analysis and thorough benchmarking, but it is certainly a goal of Kudu to be a great platform for ingesting and analyzing data through Spark.  Up till this point most of the Spark work has been community driven, but more thorough integration testing of the Spark connector is going to be a focus going forward.

- Dan

 
Cheers,
Ben



On Jun 15, 2016, at 12:47 AM, Todd Lipcon <todd@cloudera.com> wrote:

Hi Benjamin,

What workload are you using for benchmarks? Using spark or something more custom? rdd or data frame or SQL, etc? Maybe you can share the schema and some queries

Todd

Todd

On Jun 15, 2016 8:10 AM, "Benjamin Kim" <bbuild11@gmail.com> wrote:
Hi Todd,

Now that Kudu 0.9.0 is out. I have done some tests. Already, I am impressed. Compared to HBase, read and write performance are better. Write performance has the greatest improvement (> 4x), while read is > 1.5x. Albeit, these are only preliminary tests. Do you know of a way to really do some conclusive tests? I want to see if I can match your results on my 50 node cluster.

Thanks,
Ben

On May 30, 2016, at 10:33 AM, Todd Lipcon <todd@cloudera.com> wrote:

On Sat, May 28, 2016 at 7:12 AM, Benjamin Kim <bbuild11@gmail.com> wrote:
Todd,

It sounds like Kudu can possibly top or match those numbers put out by Aerospike. Do you have any performance statistics published or any instructions as to measure them myself as good way to test? In addition, this will be a test using Spark, so should I wait for Kudu version 0.9.0 where support will be built in?

We don't have a lot of benchmarks published yet, especially on the write side. I've found that thorough cross-system benchmarks are very difficult to do fairly and accurately, and often times users end up misguided if they pay too much attention to them :) So, given a finite number of developers working on Kudu, I think we've tended to spend more time on the project itself and less time focusing on "competition". I'm sure there are use cases where Kudu will beat out Aerospike, and probably use cases where Aerospike will beat Kudu as well.

From my perspective, it would be great if you can share some details of your workload, especially if there are some areas you're finding Kudu lacking. Maybe we can spot some easy code changes we could make to improve performance, or suggest a tuning variable you could change.

-Todd


On May 27, 2016, at 9:19 PM, Todd Lipcon <todd@cloudera.com> wrote:

On Fri, May 27, 2016 at 8:20 PM, Benjamin Kim <bbuild11@gmail.com> wrote:
Hi Mike,

First of all, thanks for the link. It looks like an interesting read. I checked that Aerospike is currently at version 3.8.2.3, and in the article, they are evaluating version 3.5.4. The main thing that impressed me was their claim that they can beat Cassandra and HBase by 8x for writing and 25x for reading. Their big claim to fame is that Aerospike can write 1M records per second with only 50 nodes. I wanted to see if this is real.

1M records per second on 50 nodes is pretty doable by Kudu as well, depending on the size of your records and the insertion order. I've been playing with a ~70 node cluster recently and seen 1M+ writes/second sustained, and bursting above 4M. These are 1KB rows with 11 columns, and with pretty old HDD-only nodes. I think newer flash-based nodes could do better.
 

To answer your questions, we have a DMP with user profiles with many attributes. We create segmentation information off of these attributes to classify them. Then, we can target advertising appropriately for our sales department. Much of the data processing is for applying models on all or if not most of every profile’s attributes to find similarities (nearest neighbor/clustering) over a large number of rows when batch processing or a small subset of rows for quick online scoring. So, our use case is a typical advanced analytics scenario. We have tried HBase, but it doesn’t work well for these types of analytics.

I read, that Aerospike in the release notes, they did do many improvements for batch and scan operations.

I wonder what your thoughts are for using Kudu for this.

Sounds like a good Kudu use case to me. I've heard great things about Aerospike for the low latency random access portion, but I've also heard that it's _very_ expensive, and not particularly suited to the columnar scan workload. Lastly, I think the Apache license of Kudu is much more appealing than the AGPL3 used by Aerospike. But, that's not really a direct answer to the performance question :)
 

Thanks,
Ben


On May 27, 2016, at 6:21 PM, Mike Percy <mpercy@cloudera.com> wrote:

Have you considered whether you have a scan heavy or a random access heavy workload? Have you considered whether you always access / update a whole row vs only a partial row? Kudu is a column store so has some awesome performance characteristics when you are doing a lot of scanning of just a couple of columns.

I don't know the answer to your question but if your concern is performance then I would be interested in seeing comparisons from a perf perspective on certain workloads.

Finally, a year ago Aerospike did quite poorly in a Jepsen test: https://aphyr.com/posts/324-jepsen-aerospike

I wonder if they have addressed any of those issues.

Mike

On Friday, May 27, 2016, Benjamin Kim <bbuild11@gmail.com> wrote:
I am just curious. How will Kudu compare with Aerospike (http://www.aerospike.com)? I went to a Spark Roadshow and found out about this piece of software. It appears to fit our use case perfectly since we are an ad-tech company trying to leverage our user profiles data. Plus, it already has a Spark connector and has a SQL-like client. The tables can be accessed using Spark SQL DataFrames and, also, made into SQL tables for direct use with Spark SQL ODBC/JDBC Thriftserver. I see from the work done here http://gerrit.cloudera.org:8080/#/c/2992/ that the Spark integration is well underway and, from the looks of it lately, almost complete. I would prefer to use Kudu since we are already a Cloudera shop, and Kudu is easy to deploy and configure using Cloudera Manager. I also hope that some of Aerospike’s speed optimization techniques can make it into Kudu in the future, if they have not been already thought of or included.

Just some thoughts…

Cheers,
Ben


-- 
--
Mike Percy
Software Engineer, Cloudera






-- 
Todd Lipcon
Software Engineer, Cloudera




-- 
Todd Lipcon
Software Engineer, Cloudera







-- 
Todd Lipcon
Software Engineer, Cloudera




-- 
Todd Lipcon
Software Engineer, Cloudera




-- 
Todd Lipcon
Software Engineer, Cloudera




-- 
Todd Lipcon
Software Engineer, Cloudera