trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jplev...@apache.org
Subject svn commit: r1055948 - in /trafficserver/traffic/trunk: iocore/cache/Cache.cc iocore/cache/CacheDisk.cc iocore/cache/RamCacheCLFUS.cc iocore/eventsystem/I_Event.h iocore/eventsystem/Tasks.cc proxy/config/records.config.in
Date Thu, 06 Jan 2011 17:25:56 GMT
Author: jplevyak
Date: Thu Jan  6 17:25:55 2011
New Revision: 1055948

URL: http://svn.apache.org/viewvc?rev=1055948&view=rev
Log:
TS-560: RAM cache compression moved off net threads

Modified:
    trafficserver/traffic/trunk/iocore/cache/Cache.cc
    trafficserver/traffic/trunk/iocore/cache/CacheDisk.cc
    trafficserver/traffic/trunk/iocore/cache/RamCacheCLFUS.cc
    trafficserver/traffic/trunk/iocore/eventsystem/I_Event.h
    trafficserver/traffic/trunk/iocore/eventsystem/Tasks.cc
    trafficserver/traffic/trunk/proxy/config/records.config.in

Modified: trafficserver/traffic/trunk/iocore/cache/Cache.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/cache/Cache.cc?rev=1055948&r1=1055947&r2=1055948&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/cache/Cache.cc (original)
+++ trafficserver/traffic/trunk/iocore/cache/Cache.cc Thu Jan  6 17:25:55 2011
@@ -839,6 +839,23 @@ CacheProcessor::cacheInitialized()
 
         }
       }
+      switch (cache_config_ram_cache_compress) {
+        default:
+          Fatal("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
+        case CACHE_COMPRESSION_NONE: 
+        case CACHE_COMPRESSION_FASTLZ:
+          break;
+        case CACHE_COMPRESSION_LIBZ:
+#if ! TS_HAS_LIBZ
+          Fatal("libz not available for RAM cache compression");
+#endif
+          break;
+        case CACHE_COMPRESSION_LIBLZMA:
+#if ! TS_HAS_LZMA
+          Fatal("lzma not available for RAM cache compression");
+#endif
+          break;
+      }
 
       GLOBAL_CACHE_SET_DYN_STAT(cache_ram_cache_bytes_total_stat, ram_cache_bytes);
       GLOBAL_CACHE_SET_DYN_STAT(cache_bytes_total_stat, total_cache_bytes);

Modified: trafficserver/traffic/trunk/iocore/cache/CacheDisk.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/cache/CacheDisk.cc?rev=1055948&r1=1055947&r2=1055948&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/cache/CacheDisk.cc (original)
+++ trafficserver/traffic/trunk/iocore/cache/CacheDisk.cc Thu Jan  6 17:25:55 2011
@@ -37,6 +37,7 @@ CacheDisk::open(char *s, off_t blocks, o
   io.aiocb.aio_fildes = fd;
   io.aiocb.aio_reqprio = 0;
   io.action = this;
+  // determine header size and hence start point by successive approximation
   uint64_t l;
   for (int i = 0; i < 3; i++) {
     l = (len * STORE_BLOCK_SIZE) - (start - skip);

Modified: trafficserver/traffic/trunk/iocore/cache/RamCacheCLFUS.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/cache/RamCacheCLFUS.cc?rev=1055948&r1=1055947&r2=1055948&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/cache/RamCacheCLFUS.cc (original)
+++ trafficserver/traffic/trunk/iocore/cache/RamCacheCLFUS.cc Thu Jan  6 17:25:55 2011
@@ -25,6 +25,7 @@
 // See https://cwiki.apache.org/confluence/display/TS/RamCache
 
 #include "P_Cache.h"
+#include "I_Tasks.h"
 #if TS_HAS_LIBZ
 #include <zlib.h>
 #endif
@@ -87,7 +88,7 @@ struct RamCacheCLFUS : public RamCache {
   uint16_t *seen;
   int ncompressed;
   RamCacheCLFUSEntry *compressed; // first uncompressed lru[0] entry
-  void compress_entries(int do_at_most = INT_MAX);
+  void compress_entries(EThread *thread, int do_at_most = INT_MAX);
   void resize_hashtable();
   void victimize(RamCacheCLFUSEntry *e);
   void move_compressed(RamCacheCLFUSEntry *e);
@@ -270,7 +271,6 @@ void RamCacheCLFUS::victimize(RamCacheCL
 
 void RamCacheCLFUS::move_compressed(RamCacheCLFUSEntry *e) {
   if (e == compressed) {
-    ncompressed--;
     if (compressed->lru_link.next)
       compressed = compressed->lru_link.next;
     else {
@@ -298,9 +298,10 @@ RamCacheCLFUSEntry *RamCacheCLFUS::destr
   return ret;
 }
 
-void RamCacheCLFUS::compress_entries(int do_at_most) {
+void RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) {
   if (!cache_config_ram_cache_compress)
     return;
+  MUTEX_TAKE_LOCK(part->mutex, thread);
   if (!compressed) {
     compressed = lru[0].head;
     ncompressed = 0;
@@ -329,19 +330,25 @@ void RamCacheCLFUS::compress_entries(int
         case CACHE_COMPRESSION_LIBLZMA: l = e->len; break;
 #endif
       }
+      // store transient data for lock release
+      Ptr<IOBufferData> edata = e->data;
+      uint32_t elen = e->len;
+      INK_MD5 key = e->key;
+      MUTEX_UNTAKE_LOCK(part->mutex, thread);
       b = (char*)xmalloc(l);
+      bool failed = false;
       switch (ctype) {
         default: goto Lfailed;
         case CACHE_COMPRESSION_FASTLZ:
           if (e->len < 16) goto Lfailed;
-          if ((l = fastlz_compress(e->data->data(), e->len, b)) <= 0)
-            goto Lfailed;
+          if ((l = fastlz_compress(edata->data(), elen, b)) <= 0)
+            failed = true;
           break;
 #if TS_HAS_LIBZ
         case CACHE_COMPRESSION_LIBZ: {
           uLongf ll = l;
-          if ((Z_OK != compress((Bytef*)b, &ll, (Bytef*)e->data->data(), e->len)))
-            goto Lfailed;
+          if ((Z_OK != compress((Bytef*)b, &ll, (Bytef*)edata->data(), elen)))
+            failed = true;
           l = (int)ll;
           break;
         }
@@ -350,15 +357,31 @@ void RamCacheCLFUS::compress_entries(int
         case CACHE_COMPRESSION_LIBLZMA: {
           size_t pos = 0, ll = l;
           if (LZMA_OK != lzma_easy_buffer_encode(LZMA_PRESET_DEFAULT, LZMA_CHECK_NONE, NULL,
-                                                 (uint8_t*)e->data->data(), e->len,
(uint8_t*)b, &pos, ll))
-            goto Lfailed;
+                                                 (uint8_t*)edata->data(), elen, (uint8_t*)b,
&pos, ll))
+            failed = true;
           l = (int)pos;
           break;
         }
 #endif
       }
+      MUTEX_TAKE_LOCK(part->mutex, thread);
+      // see if the entry is till around
+      {
+        uint32_t i = key.word(3) % nbuckets;
+        RamCacheCLFUSEntry *ee = bucket[i].head;
+        while (ee) {
+          if (ee->key == key && ee->data == edata) break;
+          ee = ee->hash_link.next;
+        }
+        if (!ee || ee != e) {
+          e = compressed;
+          goto Lcontinue;
+        }
+        if (failed)
+          goto Lfailed;
+      }
       if (l > REQUIRED_COMPRESSION * e->len)
-        e->flag_bits.incompressible = ctype;
+        e->flag_bits.incompressible = true;
       if (l > REQUIRED_SHRINK * e->size)
         goto Lfailed;
       if (l < e->len) {
@@ -391,15 +414,17 @@ void RamCacheCLFUS::compress_entries(int
     xfree(b);
     e->flag_bits.incompressible = 1;
   Lcontinue:;
-    DDebug("ram_cache", "compress %X %d %d %d %d %d %d",
+    DDebug("ram_cache", "compress %X %d %d %d %d %d %d %d",
            e->key.word(3), e->auxkey1, e->auxkey2,
            e->flag_bits.incompressible, e->flag_bits.compressed,
-           e->len, e->compressed_len);
+           e->len, e->compressed_len, ncompressed);
     if (!e->lru_link.next)
       break;
     compressed = e->lru_link.next;
     ncompressed++;
   }
+  MUTEX_UNTAKE_LOCK(part->mutex, thread);
+  return;
 }
 
 void RamCacheCLFUS::requeue_victims(RamCacheCLFUS *c, Que(RamCacheCLFUSEntry, lru_link) &victims)
{
@@ -549,8 +574,6 @@ Linsert:
   e->len = len;
   check_accounting(this);
   DDebug("ram_cache", "put %X %d %d size %d INSERTED", key->word(3), auxkey1, auxkey2,
e->size);
-  if (cache_config_ram_cache_compress_percent)
-    compress_entries();
   return 1;
 Lhistory:
   requeue_victims(this, victims);
@@ -586,6 +609,40 @@ int RamCacheCLFUS::fixup(INK_MD5 * key, 
   return 0;
 }
 
+class RamCacheCLFUSCompressor : public Continuation { public:
+  RamCacheCLFUS *rc;
+  int mainEvent(int event, Event *e);
+  RamCacheCLFUSCompressor(RamCacheCLFUS *arc): rc(arc) { 
+   SET_HANDLER(&RamCacheCLFUSCompressor::mainEvent); 
+  }
+};
+
+int RamCacheCLFUSCompressor::mainEvent(int event, Event *e) {
+  switch (cache_config_ram_cache_compress) {
+    default:
+      Warning("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
+    case CACHE_COMPRESSION_NONE: 
+    case CACHE_COMPRESSION_FASTLZ:
+      break;
+    case CACHE_COMPRESSION_LIBZ:
+#if ! TS_HAS_LIBZ
+      Warning("libz not available for RAM cache compression");
+#endif
+      break;
+    case CACHE_COMPRESSION_LIBLZMA:
+#if ! TS_HAS_LZMA
+      Warning("lzma not available for RAM cache compression");
+#endif
+      break;
+  }
+  if (cache_config_ram_cache_compress_percent)
+    rc->compress_entries(e->ethread);
+  return EVENT_CONT;
+}
+
 RamCache *new_RamCacheCLFUS() {
-  return new RamCacheCLFUS;
+  RamCacheCLFUS *r = new RamCacheCLFUS;
+  eventProcessor.schedule_every(new RamCacheCLFUSCompressor(r), HRTIME_SECOND,
+    ET_TASK);
+  return r;
 }

Modified: trafficserver/traffic/trunk/iocore/eventsystem/I_Event.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/I_Event.h?rev=1055948&r1=1055947&r2=1055948&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/I_Event.h (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/I_Event.h Thu Jan  6 17:25:55 2011
@@ -95,7 +95,7 @@
 
 typedef int EventType;
 const int ET_CALL = 0;
-const int MAX_EVENT_TYPES = 8; // TODO: How is this sized? Where does 8 come from? Or 17
?
+const int MAX_EVENT_TYPES = 8; // conservative, these are dynamically allocated
 
 class EThread;
 

Modified: trafficserver/traffic/trunk/iocore/eventsystem/Tasks.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/Tasks.cc?rev=1055948&r1=1055947&r2=1055948&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/Tasks.cc (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/Tasks.cc Thu Jan  6 17:25:55 2011
@@ -30,13 +30,7 @@ TasksProcessor tasksProcessor;
 int
 TasksProcessor::start(int task_threads)
 {
-  if (task_threads > 0) {
+  if (task_threads > 0)
     ET_TASK = eventProcessor.spawn_event_threads(task_threads);
-
-    // TODO: Do we need to "initialize" these threads for "net" ?
-    //    for (int i = 0; i < task_threads; i++)
-    //initialize_thread_for_net(eventProcessor.eventthread[ET_TASK][i], i);
-  }
-
   return 0;
 }

Modified: trafficserver/traffic/trunk/proxy/config/records.config.in
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/proxy/config/records.config.in?rev=1055948&r1=1055947&r2=1055948&view=diff
==============================================================================
--- trafficserver/traffic/trunk/proxy/config/records.config.in (original)
+++ trafficserver/traffic/trunk/proxy/config/records.config.in Thu Jan  6 17:25:55 2011
@@ -328,6 +328,8 @@ CONFIG proxy.config.cache.ram_cache.algo
    #  1 : fastlz (extremely fast, relatively low compression)
    #  2 : libz (moderate speed, reasonable compression)
    #  3 : liblzma (very slow, high compression)
+   #  NOTE: compression runs on task threads.  To use more cores for
+   #  compression, increase proxy.config.task_threads.
 CONFIG proxy.config.cache.ram_cache.compress INT 0
    # The maximum number of alternates that are allowed for any given URL.
    # It is not possible to strictly enforce this if the variable
@@ -608,3 +610,9 @@ CONFIG proxy.config.dump_mem_info_freque
   # Log any request that takes more then x number of milliseconds, needs
   # to be > 0 to be enabled
 CONFIG proxy.config.http.slow.log.threshold INT 0
+##############################################################################
+#
+# Tasks
+#
+##############################################################################
+CONFIG proxy.config.task_threads INT 1



Mime
View raw message