cassandra-commits mailing list archives

From "Matthew F. Dennis (JIRA)" <>
Subject [jira] [Commented] (CASSANDRA-1278) Make bulk loading into Cassandra less crappy, more pluggable
Date Wed, 04 May 2011 01:34:05 GMT


Matthew F. Dennis commented on CASSANDRA-1278:

bq. It was intentional as previously only the streaming was buffered (at 4k)
It was the other way around IIRC: (non-encrypted) streaming used channel.transferTo, which
bypassed the buffering entirely. The buffering was for internode messaging: see CASSANDRA-1943.

Yes, I see now; that was unintentional and has been corrected.

bq. We could construct something that buffers up X amount of data and then frames the data
being sent and change the inner loop to decompose that but it's extra complexity, code and
You're already buffering rows in StreamingProxyFlusher.bufferRow: the change would simply
be to continue to buffer rows until a threshold was reached. The benefit here is that the
code on the receiving side doesn't need to change when the proxy starts sending it a different
SSTable version/format. I've never heard of somebody regretting having framing in a protocol:
it's always the other way around.

Yes, I understood what you were suggesting; that was precisely the extra buffering I was talking
about.  Buffering more than one row on the client side means we keep larger buffers around
and increase the GC pressure, which is already a problem on the proxy because of Thrift.  That
being said, I've changed the protocol to be framed, but the proxy still sends just one row
at a time (each row in its own frame) to avoid the problems mentioned.  If we later want to change
the proxy to buffer more, or implement a different client, the server won't care.
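A minimal sketch of what per-row framing can look like, assuming a 4-byte big-endian length prefix per frame (the actual wire layout in the patch may differ; `RowFraming`, `encodeFrame` and `decodeFrames` are hypothetical names). It works on in-memory `ByteBuffer`s rather than sockets to stay self-contained.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-prefixed framing: the proxy puts one serialized row in each frame. */
public class RowFraming {
    /** Encodes one frame: a 4-byte big-endian length followed by the row bytes. */
    static byte[] encodeFrame(byte[] row) {
        return ByteBuffer.allocate(4 + row.length).putInt(row.length).put(row).array();
    }

    /**
     * Splits a buffer back into frames. The receiver only reads length prefixes, so it
     * does not depend on how many rows the sender packed into each frame.
     */
    static List<byte[]> decodeFrames(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        // In this sketch, trailing bytes too short to hold a length prefix are ignored.
        while (in.remaining() >= 4) {
            byte[] payload = new byte[in.getInt()];
            in.get(payload); // throws BufferUnderflowException if the frame is truncated
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encodeFrame("row-1".getBytes(StandardCharsets.UTF_8));
        byte[] second = encodeFrame("row-2".getBytes(StandardCharsets.UTF_8));
        ByteBuffer stream = ByteBuffer.allocate(first.length + second.length).put(first).put(second);
        stream.flip(); // switch the buffer from writing to reading
        List<byte[]> frames = decodeFrames(stream);
        System.out.println(frames.size() + " frames, first = "
                + new String(frames.get(0), StandardCharsets.UTF_8));
        // prints "2 frames, first = row-1"
    }
}
```

Because the receiver only looks at the length prefixes, a sender could later pack several rows into one frame without any change on the server side.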

{quote}Also, an SSTable version (as usually held by Descriptor) should be added to the header
of your protocol so that clients don't break by sending unversioned blobs: not having versioning
is my primary complaint vis-a-vis BinaryMemtables.{quote}

Added, along with a protocol version, to the header.
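A minimal sketch of such a header, assuming a 4-byte protocol version followed by a length-prefixed SSTable version string; `LoaderHeader`, the layout and the version values are all hypothetical. The point it illustrates is that the receiver can refuse protocol versions it does not know instead of misreading an unversioned blob, and can use the SSTable version to choose how to deserialize rows.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical loader-stream header: a protocol version plus the SSTable format version. */
public class LoaderHeader {
    static final int CURRENT_PROTOCOL_VERSION = 1; // hypothetical value

    final int protocolVersion;
    final String sstableVersion; // the kind of version string a Descriptor holds

    LoaderHeader(int protocolVersion, String sstableVersion) {
        this.protocolVersion = protocolVersion;
        this.sstableVersion = sstableVersion;
    }

    /** Layout: 4-byte protocol version, 2-byte length, UTF-8 SSTable version. */
    byte[] encode() {
        byte[] version = sstableVersion.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + 2 + version.length)
                .putInt(protocolVersion)
                .putShort((short) version.length)
                .put(version)
                .array();
    }

    /** Rejects streams from newer clients rather than guessing at their format. */
    static LoaderHeader decode(ByteBuffer in) {
        int protocol = in.getInt();
        if (protocol > CURRENT_PROTOCOL_VERSION) {
            throw new IllegalArgumentException("unsupported protocol version " + protocol);
        }
        byte[] version = new byte[in.getShort() & 0xFFFF]; // read the length as unsigned
        in.get(version);
        return new LoaderHeader(protocol, new String(version, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] wire = new LoaderHeader(1, "x").encode(); // "x" is an arbitrary placeholder
        LoaderHeader h = decode(ByteBuffer.wrap(wire));
        System.out.println(h.protocolVersion + " " + h.sstableVersion); // prints "1 x"
    }
}
```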

bq. If we buffer it on the other side we consume more memory for a longer period of time
I was talking about buffering on the client side: the server side can do one system call to
flush to disk, such that it never enters userspace.

I was too; it's primarily the buffering on the proxy side that is the problem.  The goal is
to get the data off the proxy as quickly as possible, and with the current serialization format
that means one row at a time (a row's size must be known before the row can be written).

bq. If you're comparing to the streams we use for repair and similar, they require table names
and byte ranges be known up front

We've had enough trouble debugging streaming when people use it all the time for repair. I
shudder to think of the bugs we'll introduce to a second-class protocol that gets used slightly
more often than BMT.

That's because the streaming used for repair is complex and fragile: independent streams are
tightly coupled in a session, sizes must be known up front, retries are complex and require
out-of-band messaging between nodes, everything is "buffered" on disk before any index/filter
building starts, et cetera.  In comparison, the protocol used for loading is extremely
simple; if it makes you feel better, we could add a CRC/MD5 to the stream.
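One way the suggested checksum could be added is per frame, using `java.util.zip.CRC32` (the comment mentions CRC or MD5; CRC32 is chosen here only because it is in the JDK). The layout of length, payload and trailing CRC32, and the class name `ChecksummedFrame`, are hypothetical; this is a sketch, not the protocol as implemented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical frame layout with a trailing checksum: length, payload, CRC32(payload). */
public class ChecksummedFrame {
    static int crcOf(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return (int) crc.getValue(); // the CRC is 32 bits wide, so the cast loses nothing
    }

    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length + 4)
                .putInt(payload.length)
                .put(payload)
                .putInt(crcOf(payload))
                .array();
    }

    /** Returns the payload, or throws if it was corrupted in transit. */
    static byte[] decode(ByteBuffer in) {
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        int expected = in.getInt();
        if (expected != crcOf(payload)) {
            throw new IllegalStateException("checksum mismatch");
        }
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("row-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(decode(ByteBuffer.wrap(frame)), StandardCharsets.UTF_8)); // prints "row-1"
        frame[5] ^= 0x01; // flip one bit inside the payload
        try {
            decode(ByteBuffer.wrap(frame));
        } catch (IllegalStateException e) {
            System.out.println("corruption detected");
        }
    }
}
```

A per-frame checksum lets the receiver reject a single bad frame; a checksum over the whole stream would be cheaper but could only reject the transfer as a whole.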

Maybe we've been too clever here: why not just write out the full sstable on the client, and
stream it over (indexes and all) so that

    * we move the [primary] index build off the server, which should give a nice performance
    * we have filenames and sizes ready to go so streaming will be happy

We're still talking about a minor change to streaming of recognizing that we're getting all
the components and not just data, but that's something we can deal with at the StreamInSession
level, I don't think we'll need to change the protocol itself.

One of the main goals of bulk loading was that no local/temp storage be required on the
client; that has been the plan from the beginning.  If you have something that generates full
tables, indexes and filters, it makes more sense to generate them locally by using SSTableWriter
directly, push them to the box, and then use CASSANDRA-2438 to "add" them to the node.  Maybe
we could add this as an option to the proxy to make it a bit easier to do, but it certainly
isn't suitable as the only option.  If we want this, it should be a separate ticket, as it's
separate functionality.  Overall though, I'm not really a fan of requiring temp space on the

The problem I can think of at the moment is that, for large clusters, this means either a lot
of seeking on the proxy (since you need to generate one table for every replica set) or a lot
of repeated passes over the same data.  Even if you do this, or make it "very fast" (tm), it
doesn't much matter: as you transfer small tables to nodes they will almost immediately be
compacted, so the index and filter generation moved off the cluster is wasted, and it was only
a small percentage of the overall work to begin with.  Compacting the tables on the clients
before sending them would just make a questionable idea worse...
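The "one table for every replica set" cost can be made concrete with a small model. This sketch assumes SimpleStrategy-style placement (a token belongs to the first node whose token is greater than or equal to it, plus the next `rf - 1` nodes clockwise), evenly spaced integer tokens, and rows already mapped to tokens; `ReplicaSets` and its methods are hypothetical. With 100 nodes it yields 100 distinct replica sets, so a client writing complete SSTables would have to keep that many outputs open (or make that many passes) if its input is not already sorted by replica set.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration: with SimpleStrategy-style placement, how many distinct replica
 * sets (and therefore separate output SSTables) would a client-side writer need?
 */
public class ReplicaSets {
    /** nodeTokens must be sorted; a token belongs to the first node whose token is >= it. */
    static List<Integer> replicasFor(long token, long[] nodeTokens, int rf) {
        int n = nodeTokens.length;
        int primary = 0;
        while (primary < n && nodeTokens[primary] < token) primary++;
        if (primary == n) primary = 0; // wrap around the ring
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < rf; i++) replicas.add((primary + i) % n); // next rf-1 nodes clockwise
        return replicas;
    }

    /** Groups row tokens by replica set; each group would become its own SSTable. */
    static Map<List<Integer>, List<Long>> groupByReplicaSet(long[] rowTokens, long[] nodeTokens, int rf) {
        Map<List<Integer>, List<Long>> groups = new HashMap<>();
        for (long t : rowTokens) {
            groups.computeIfAbsent(replicasFor(t, nodeTokens, rf), k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    public static void main(String[] args) {
        int nodes = 100;
        long[] nodeTokens = new long[nodes];
        for (int i = 0; i < nodes; i++) nodeTokens[i] = i * 100L; // evenly spaced ring
        long[] rowTokens = new long[10_000];
        for (int i = 0; i < rowTokens.length; i++) rowTokens[i] = i;
        System.out.println(groupByReplicaSet(rowTokens, nodeTokens, 3).size() + " output tables");
        // prints "100 output tables"
    }
}
```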

bq. Maybe we've been too clever here: why not just write out the full sstable on the client,
and stream it over (indexes and all) so that

As much as I want to merge the protocols, I'm not sure I like the limitations this puts on
clients: being able to send a stream without needing local tempspace is very, very beneficial,
IMO (for example, needing tempspace was by far the most annoying limitation of a Hadoop LuceneOutputFormat
I worked on).

Exactly; requiring temp space seems like an anti-feature to me.

bq. If you're comparing to the streams we use for repair and similar, they require table names
and byte ranges be known up front

bq. something we can deal with at the StreamInSession level, I don't think we'll need to change
the protocol itself

With versioned messaging, changing the protocol is at least possible, if painful... my dream
would be:

   1. Deprecate the file ranges in Streaming session objects, to be replaced with framing
in the stream
   2. Move the Streaming session object to a header of the streaming connection (almost identical
to LoaderStream)
   3. Deprecate the Messaging based setup and teardown for streaming sessions: a sender initiates
a stream by opening a streaming connection, and tears it down with success codes after each
file (again, like this protocol)

The protocol is now versioned (as is the table format), so this is possible, though certainly
on a different ticket.  If we change the existing streaming to use this protocol, I think
we end up with something a lot less fragile and a lot less complex.

Essentially, the sender is in control and keeps retrying until the receiver has the data; sessions
are deprecated altogether.  When node A wants to send things to node B, it records that fact in
the system table.  For each entry, it sends the file using the bulk loading protocol and keeps
retrying until the file is accepted.  For each range it wants to send, it frames the entire
range.  The only complex part is preventing removal of the SSTable on the source (node A)
until it has been successfully streamed to the destination (node B).
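The sender-driven scheme described above can be sketched as follows, assuming a pluggable `Receiver` that sends one file over the loader protocol and reports whether it was accepted. `SenderDrivenTransfer`, `schedule`, `canRemove`, and the in-memory sets standing in for the system-table record are all hypothetical. Retries are bounded only so the example terminates; the scheme itself keeps retrying.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical sketch of sender-driven streaming: the sender records each pending transfer,
 * retries until the receiver accepts it, and only then allows the source SSTable to be removed.
 */
public class SenderDrivenTransfer {
    /** Sends one file over the loader protocol; returns true once the receiver acknowledges it. */
    interface Receiver {
        boolean send(String file);
    }

    // Stands in for the record kept in the system table on node A.
    private final Set<String> pending = new LinkedHashSet<>();
    // SSTables that must not be deleted (e.g. by compaction) until they are streamed.
    private final Set<String> pinned = new HashSet<>();

    void schedule(String file) {
        pending.add(file);
        pinned.add(file);
    }

    boolean canRemove(String file) {
        return !pinned.contains(file);
    }

    /**
     * Tries each pending file up to maxAttempts times (a real sender would back off and keep
     * retrying); returns the files that are still pending.
     */
    Set<String> run(Receiver receiver, int maxAttempts) {
        for (String file : new LinkedHashSet<>(pending)) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                if (receiver.send(file)) {
                    pending.remove(file);
                    pinned.remove(file); // the source copy may now be removed
                    break;
                }
            }
        }
        return new LinkedHashSet<>(pending);
    }

    public static void main(String[] args) {
        SenderDrivenTransfer sender = new SenderDrivenTransfer();
        sender.schedule("sstable-1");
        sender.schedule("sstable-2");
        int[] calls = {0};
        // A flaky receiver that rejects every other attempt.
        Set<String> left = sender.run(file -> ++calls[0] % 2 == 0, 3);
        System.out.println(left.isEmpty() + " " + sender.canRemove("sstable-1")); // prints "true true"
    }
}
```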

{quote}tl;dr: I'd prefer some slight adjustments to Matt's protocol (mentioned above) over
requiring tempspace on the client.{quote}


> Make bulk loading into Cassandra less crappy, more pluggable
> ------------------------------------------------------------
>                 Key: CASSANDRA-1278
>                 URL:
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Tools
>            Reporter: Jeremy Hanna
>            Assignee: Matthew F. Dennis
>             Fix For: 0.8.1
>         Attachments: 1278-cassandra-0.7-v2.txt, 1278-cassandra-0.7.1.txt, 1278-cassandra-0.7.txt
>   Original Estimate: 40h
>          Time Spent: 40h 40m
>  Remaining Estimate: 0h
> Currently bulk loading into Cassandra is a black art.  People are either directed to
just do it responsibly with thrift or a higher level client, or they have to explore the contrib/bmt
example.  That contrib module requires delving
into the code to find out how it works and then applying it to the given problem.  Using either
method, the user also needs to keep in mind that overloading the cluster is possible, which
will hopefully be addressed in CASSANDRA-685.
> This improvement would be to create a contrib module or set of documents dealing with
bulk loading.  Perhaps it could include code in the Core to make it more pluggable for external
clients of different types.
> It is just that this is something many people new to Cassandra need to do: bulk
load their data into Cassandra.

This message is automatically generated by JIRA.
For more information on JIRA, see:
