trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [7/8] trafficserver git commit: TS-974: Partial Object Caching.
Date Mon, 29 Jun 2015 11:47:56 GMT
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/CacheRead.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheRead.cc b/iocore/cache/CacheRead.cc
index e8ff804..4523c2a 100644
--- a/iocore/cache/CacheRead.cc
+++ b/iocore/cache/CacheRead.cc
@@ -48,14 +48,14 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheFragType type, co
     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
     if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
       c = new_CacheVC(cont);
-      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+      c->vol = vol;
+      c->first_key = c->key = c->earliest_key = *key;
       c->vio.op = VIO::READ;
       c->base_stat = cache_read_active_stat;
-      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
-      c->first_key = c->key = c->earliest_key = *key;
-      c->vol = vol;
-      c->frag_type = type;
       c->od = od;
+      c->frag_type = type;
+      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
     }
     if (!c)
       goto Lmiss;
@@ -91,7 +91,46 @@ Lcallreturn:
   return &c->_action;
 }
 
-#ifdef HTTP_CACHE
+
+Action *
+Cache::open_read(Continuation *cont, CacheVConnection *vc, HTTPHdr *client_request_hdr)
+{
+  Action *zret = ACTION_RESULT_DONE;
+
+  CacheVC *write_vc = dynamic_cast<CacheVC *>(vc);
+  if (write_vc) {
+    Vol *vol = write_vc->vol;
+    ProxyMutex *mutex = cont->mutex; // needed for stat macros
+    CacheVC *c = new_CacheVC(cont);
+
+    c->vol = write_vc->vol;
+    c->first_key = write_vc->first_key;
+    // [amc] Need to fix this as it's pointless. In general @a earliest_key in the write VC
+    // won't be the correct value - it's randomly generated and for a partial fill won't be
+    // set to the actual alternate value until later (in @c set_http_info).
+    c->earliest_key = c->key = write_vc->earliest_key;
+    c->vio.op = VIO::READ;
+    c->base_stat = cache_read_active_stat;
+    c->od = write_vc->od;
+    c->frag_type = write_vc->frag_type;
+    CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+    //    write_vc->alternate.request_get(&c->request);
+    //    client_request_hdr->copy_shallow(&c->request);
+    c->request.copy_shallow(client_request_hdr);
+    c->params = write_vc->params; // seems to be a no-op, always NULL.
+    c->dir = c->first_dir = write_vc->first_dir;
+    c->write_vc = write_vc;
+    c->first_buf = write_vc->first_buf; // I don't think this is effective either.
+    SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+    zret = &c->_action; // default, override if needed.
+    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+    if (lock.is_locked() && c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE) {
+      zret = ACTION_RESULT_DONE;
+    }
+  }
+  return zret;
+}
+
 Action *
 Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type,
                  const char *hostname, int host_len)
@@ -112,15 +151,15 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
     if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
       c = new_CacheVC(cont);
-      c->first_key = c->key = c->earliest_key = *key;
       c->vol = vol;
+      c->first_key = c->key = c->earliest_key = *key;
       c->vio.op = VIO::READ;
       c->base_stat = cache_read_active_stat;
+      c->od = od;
+      c->frag_type = CACHE_FRAG_TYPE_HTTP;
       CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
       c->request.copy_shallow(request);
-      c->frag_type = CACHE_FRAG_TYPE_HTTP;
       c->params = params;
-      c->od = od;
     }
     if (!lock.is_locked()) {
       SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
@@ -160,27 +199,55 @@ Lcallreturn:
     return ACTION_RESULT_DONE;
   return &c->_action;
 }
-#endif
 
 uint32_t
 CacheVC::load_http_info(CacheHTTPInfoVector *info, Doc *doc, RefCountObj *block_ptr)
 {
   uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
-  if (cache_config_compatibility_4_2_0_fixup && // manual override not engaged
+  if (zret != static_cast<uint32_t>(-1) &&      // Make sure we haven't already failed
+      cache_config_compatibility_4_2_0_fixup && // manual override not engaged
       !this->f.doc_from_ram_cache &&            // it's already been done for ram cache fragments
       vol->header->version.ink_major == 23 && vol->header->version.ink_minor == 0) {
     for (int i = info->xcount - 1; i >= 0; --i) {
-      info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
-      info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
+      info->data(i)._alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
+      info->data(i)._alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
     }
   }
   return zret;
 }
 
+char const *
+CacheVC::get_http_range_boundary_string(int *len) const
+{
+  return resp_range.getBoundaryStr(len);
+}
+
+int64_t
+CacheVC::get_effective_content_size()
+{
+  return resp_range.hasRanges() ? resp_range.calcContentLength() : alternate.object_size_get();
+}
+
+int
+CacheVC::closeReadAndFree(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  //  cancel_trigger(); // ??
+  if (od) {
+    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+    if (!lock.is_locked()) {
+      SET_HANDLER(&CacheVC::closeReadAndFree);
+      VC_SCHED_LOCK_RETRY();
+    }
+    vol->close_read(this);
+  }
+  return free_CacheVC(this);
+}
+
 int
 CacheVC::openReadFromWriterFailure(int event, Event *e)
 {
-  od = NULL;
+  // od = NULL;
+  vol->close_read(this);
   vector.clear(false);
   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
   CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
@@ -190,105 +257,6 @@ CacheVC::openReadFromWriterFailure(int event, Event *e)
 }
 
 int
-CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
-  intptr_t err = ECACHE_DOC_BUSY;
-  CacheVC *w = NULL;
-
-  ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == NULL);
-
-  if (!od)
-    return EVENT_RETURN;
-
-  if (frag_type != CACHE_FRAG_TYPE_HTTP) {
-    ink_assert(od->num_writers == 1);
-    w = od->writers.head;
-    if (w->start_time > start_time || w->closed < 0) {
-      od = NULL;
-      return EVENT_RETURN;
-    }
-    if (!w->closed)
-      return -err;
-    write_vc = w;
-  }
-#ifdef HTTP_CACHE
-  else {
-    write_vector = &od->vector;
-    int write_vec_cnt = write_vector->count();
-    for (int c = 0; c < write_vec_cnt; c++)
-      vector.insert(write_vector->get(c));
-    // check if all the writers who came before this reader have
-    // set the http_info.
-    for (w = (CacheVC *)od->writers.head; w; w = (CacheVC *)w->opendir_link.next) {
-      if (w->start_time > start_time || w->closed < 0)
-        continue;
-      if (!w->closed && !cache_config_read_while_writer) {
-        return -err;
-      }
-      if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT)
-        continue;
-
-      if (!w->closed && !w->alternate.valid()) {
-        od = NULL;
-        ink_assert(!write_vc);
-        vector.clear(false);
-        return EVENT_CONT;
-      }
-      // construct the vector from the writers.
-      int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
-      if (w->f.update) {
-        // all Update cases. Need to get the alternate index.
-        alt_ndx = get_alternate_index(&vector, w->update_key);
-        // if its an alternate delete
-        if (!w->alternate.valid()) {
-          if (alt_ndx >= 0)
-            vector.remove(alt_ndx, false);
-          continue;
-        }
-      }
-      ink_assert(w->alternate.valid());
-      if (w->alternate.valid())
-        vector.insert(&w->alternate, alt_ndx);
-    }
-
-    if (!vector.count()) {
-      if (od->reading_vec) {
-        // the writer(s) are reading the vector, so there is probably
-        // an old vector. Since this reader came before any of the
-        // current writers, we should return the old data
-        od = NULL;
-        return EVENT_RETURN;
-      }
-      return -ECACHE_NO_DOC;
-    }
-    if (cache_config_select_alternate) {
-      alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
-      if (alternate_index < 0)
-        return -ECACHE_ALT_MISS;
-    } else
-      alternate_index = 0;
-    CacheHTTPInfo *obj = vector.get(alternate_index);
-    for (w = (CacheVC *)od->writers.head; w; w = (CacheVC *)w->opendir_link.next) {
-      if (obj->m_alt == w->alternate.m_alt) {
-        write_vc = w;
-        break;
-      }
-    }
-    vector.clear(false);
-    if (!write_vc) {
-      DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
-      od = NULL;
-      return EVENT_RETURN;
-    }
-
-    DDebug("cache_read_agg", "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p", this, first_key.slice32(1),
-           write_vc->earliest_key.slice32(1), vector.count(), alternate_index, od->num_writers, write_vc);
-  }
-#endif // HTTP_CACHE
-  return EVENT_NONE;
-}
-
-int
 CacheVC::openReadFromWriter(int event, Event *e)
 {
   if (!f.read_from_writer_called) {
@@ -304,167 +272,70 @@ CacheVC::openReadFromWriter(int event, Event *e)
     f.read_from_writer_called = 1;
   }
   cancel_trigger();
-  intptr_t err = ECACHE_DOC_BUSY;
   DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
-#ifndef READ_WHILE_WRITER
-  return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
-#else
+
   if (_action.cancelled) {
-    od = NULL; // only open for read so no need to close
-    return free_CacheVC(this);
+    return this->closeReadAndFree(0, NULL);
+    //    od = NULL; // only open for read so no need to close
+    //    return free_CacheVC(this);
   }
   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
   if (!lock.is_locked())
     VC_SCHED_LOCK_RETRY();
-  od = vol->open_read(&first_key); // recheck in case the lock failed
-  if (!od) {
+  if (!od && NULL == (od = vol->open_read(&first_key))) {
     MUTEX_RELEASE(lock);
     write_vc = NULL;
     SET_HANDLER(&CacheVC::openReadStartHead);
     return openReadStartHead(event, e);
-  } else
-    ink_assert(od == vol->open_read(&first_key));
-  if (!write_vc) {
-    int ret = openReadChooseWriter(event, e);
-    if (ret < 0) {
-      MUTEX_RELEASE(lock);
-      SET_HANDLER(&CacheVC::openReadFromWriterFailure);
-      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(ret));
-    } else if (ret == EVENT_RETURN) {
-      MUTEX_RELEASE(lock);
-      SET_HANDLER(&CacheVC::openReadStartHead);
-      return openReadStartHead(event, e);
-    } else if (ret == EVENT_CONT) {
-      ink_assert(!write_vc);
-      VC_SCHED_WRITER_RETRY();
-    } else
-      ink_assert(write_vc);
-  } else {
-    if (writer_done()) {
-      MUTEX_RELEASE(lock);
-      DDebug("cache_read_agg", "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
-      od = NULL;
-      write_vc = NULL;
-      SET_HANDLER(&CacheVC::openReadStartHead);
-      return openReadStartHead(event, e);
-    }
-  }
-#ifdef HTTP_CACHE
-  OpenDirEntry *cod = od;
-#endif
-  od = NULL;
-  // someone is currently writing the document
-  if (write_vc->closed < 0) {
-    MUTEX_RELEASE(lock);
-    write_vc = NULL;
-    // writer aborted, continue as if there is no writer
-    SET_HANDLER(&CacheVC::openReadStartHead);
-    return openReadStartHead(EVENT_IMMEDIATE, 0);
-  }
-  // allow reading from unclosed writer for http requests only.
-  ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
-  if (!write_vc->closed && !write_vc->fragment) {
-    if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP ||
-        writer_lock_retry >= cache_config_read_while_writer_max_retries) {
-      MUTEX_RELEASE(lock);
-      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
-    }
-    DDebug("cache_read_agg", "%p: key: %X writer: closed:%d, fragment:%d, retry: %d", this, first_key.slice32(1), write_vc->closed,
-           write_vc->fragment, writer_lock_retry);
-    VC_SCHED_WRITER_RETRY();
   }
 
-  CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
-  if (!writer_lock.is_locked()) {
-    DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
+  CACHE_TRY_LOCK(lock_od, od->mutex, mutex->thread_holding);
+  if (!lock_od.is_locked())
     VC_SCHED_LOCK_RETRY();
-  }
-  MUTEX_RELEASE(lock);
 
-  if (!write_vc->io.ok())
-    return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
-#ifdef HTTP_CACHE
-  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
-    DDebug("cache_read_agg", "%p: key: %X http passed stage 1, closed: %d, frag: %d", this, first_key.slice32(1), write_vc->closed,
-           write_vc->fragment);
-    if (!write_vc->alternate.valid())
-      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
-    alternate.copy(&write_vc->alternate);
-    vector.insert(&alternate);
-    alternate.object_key_get(&key);
-    write_vc->f.readers = 1;
-    if (!(write_vc->f.update && write_vc->total_len == 0)) {
-      key = write_vc->earliest_key;
-      if (!write_vc->closed)
-        alternate.object_size_set(write_vc->vio.nbytes);
-      else
-        alternate.object_size_set(write_vc->total_len);
-    } else {
-      key = write_vc->update_key;
-      ink_assert(write_vc->closed);
-      DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
-      // Update case (b) : grab doc_len from the writer's alternate
-      doc_len = alternate.object_size_get();
-      if (write_vc->update_key == cod->single_doc_key && (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) &&
-          write_vc->first_buf._ptr()) {
-        // the resident alternate is being updated and its a
-        // header only update. The first_buf of the writer has the
-        // document body.
-        Doc *doc = (Doc *)write_vc->first_buf->data();
-        writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
-        MUTEX_RELEASE(writer_lock);
-        ink_assert(doc_len == doc->data_len());
-        length = doc_len;
-        f.single_fragment = 1;
-        doc_pos = 0;
-        earliest_key = key;
-        dir_clean(&first_dir);
-        dir_clean(&earliest_dir);
-        SET_HANDLER(&CacheVC::openReadFromWriterMain);
-        CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
-        return callcont(CACHE_EVENT_OPEN_READ);
-      }
-      // want to snarf the new headers from the writer
-      // and then continue as if nothing happened
-      last_collision = NULL;
-      MUTEX_RELEASE(writer_lock);
-      SET_HANDLER(&CacheVC::openReadStartEarliest);
-      return openReadStartEarliest(event, e);
+  if (od->open_writer) {
+    // Alternates are in flux, wait for origin server response to update them.
+    if (!od->open_waiting.in(this)) {
+      wake_up_thread = mutex->thread_holding;
+      od->open_waiting.push(this);
     }
-  } else {
-#endif // HTTP_CACHE
-    DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
-    key = write_vc->earliest_key;
-#ifdef HTTP_CACHE
+    Debug("amc", "[CacheVC::openReadFromWriter] waiting for %p", od->open_writer);
+    return EVENT_CONT; // wait for the writer to wake us up.
   }
-#endif
-  if (write_vc->fragment) {
-    doc_len = write_vc->vio.nbytes;
-    last_collision = NULL;
-    DDebug("cache_read_agg", "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment", this, first_key.slice32(1),
-           write_vc->closed, write_vc->fragment, (int)doc_len);
-    MUTEX_RELEASE(writer_lock);
-    // either a header + body update or a new document
+
+  MUTEX_RELEASE(lock); // we have the OD lock now, don't need the vol lock.
+
+  if (write_vc && CACHE_ALT_INDEX_DEFAULT != (alternate_index = get_alternate_index(&(od->vector), write_vc->earliest_key))) {
+    // Found the alternate for our write VC. Really, though, if we have a write_vc we should never fail to get
+    // the alternate - we should probably check for that.
+    alternate.copy_shallow(od->vector.get(alternate_index));
+    key = earliest_key = alternate.object_key_get();
+    doc_len = alternate.object_size_get();
+    Debug("amc", "[openReadFromWriter] - setting alternate from write_vc %p to #%d : %p", write_vc, alternate_index,
+          alternate.m_alt);
+    MUTEX_RELEASE(lock_od);
     SET_HANDLER(&CacheVC::openReadStartEarliest);
     return openReadStartEarliest(event, e);
+  } else {
+    if (cache_config_select_alternate) {
+      alternate_index = HttpTransactCache::SelectFromAlternates(&od->vector, &request, params);
+      if (alternate_index < 0) {
+        MUTEX_RELEASE(lock_od);
+        SET_HANDLER(&CacheVC::openReadFromWriterFailure);
+        return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(-ECACHE_ALT_MISS));
+      }
+      Debug("amc", "[openReadFromWriter] select alt: %d %p (current %p)", alternate_index, od->vector.get(alternate_index)->m_alt,
+            alternate.m_alt);
+      write_vector = &od->vector;
+    } else {
+      alternate_index = 0;
+    }
+    MUTEX_RELEASE(lock_od);
+    SET_HANDLER(&CacheVC::openReadStartHead);
+    return openReadStartHead(event, e);
   }
-  writer_buf = write_vc->blocks;
-  writer_offset = write_vc->offset;
-  length = write_vc->length;
-  // copy the vector
-  f.single_fragment = !write_vc->fragment; // single fragment doc
-  doc_pos = 0;
-  earliest_key = write_vc->earliest_key;
-  ink_assert(earliest_key == key);
-  doc_len = write_vc->total_len;
-  dir_clean(&first_dir);
-  dir_clean(&earliest_dir);
-  DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
-  MUTEX_RELEASE(writer_lock);
-  SET_HANDLER(&CacheVC::openReadFromWriterMain);
-  CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
-  return callcont(CACHE_EVENT_OPEN_READ);
-#endif // READ_WHILE_WRITER
+  ink_assert(false);
+  return EVENT_DONE; // should not get here.
 }
 
 int
@@ -575,6 +446,8 @@ CacheVC::openReadReadDone(int event, Event *e)
         goto Lcallreturn;
       return EVENT_CONT;
     } else if (write_vc) {
+      ink_release_assert(!"[amc] Handle this");
+#if 0
       if (writer_done()) {
         last_collision = NULL;
         while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
@@ -589,6 +462,7 @@ CacheVC::openReadReadDone(int event, Event *e)
       }
       DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
       VC_SCHED_WRITER_RETRY(); // wait for writer
+#endif
     }
     // fall through for truncated documents
   }
@@ -596,126 +470,144 @@ Lerror:
   char tmpstring[100];
   Warning("Document %s truncated", earliest_key.toHexStr(tmpstring));
   return calluser(VC_EVENT_ERROR);
-Ldone:
+  // Ldone:
   return calluser(VC_EVENT_EOS);
 Lcallreturn:
   return handleEvent(AIO_EVENT_DONE, 0);
 LreadMain:
-  fragment++;
+  ++fragment;
   doc_pos = doc->prefix_len();
+  doc_pos += resp_range.getOffset() - frag_upper_bound; // used before update!
+  frag_upper_bound += doc->data_len();
   next_CacheKey(&key, &key);
   SET_HANDLER(&CacheVC::openReadMain);
   return openReadMain(event, e);
 }
 
+void
+CacheVC::update_key_to_frag_idx(int target)
+{
+  if (0 == target) {
+    fragment = 0;
+    key = earliest_key;
+  } else {
+    FragmentDescriptor *frag = alternate.force_frag_at(target);
+    ink_assert(frag);
+    key = frag->m_key;
+  }
+}
+
+int
+CacheVC::frag_idx_for_offset(uint64_t offset)
+{
+  FragmentDescriptorTable *frags = alternate.get_frag_table();
+  int count = alternate.get_frag_count();
+  uint32_t ffs = alternate.get_frag_fixed_size();
+  int idx = count / 2;
+
+  ink_assert(offset < doc_len);
+
+  if (ffs)
+    idx = offset / ffs; // good guess as to the right offset.
+
+  if (count > 1 && 0 == (*frags)[1].m_offset)
+    ++idx;
+
+  do {
+    uint64_t upper = idx >= count ? doc_len : (*frags)[idx + 1].m_offset;
+    uint64_t lower = idx <= 0 ? 0 : (*frags)[idx].m_offset;
+    if (offset < lower)
+      idx = idx / 2;
+    else if (offset >= upper)
+      idx = (count + idx + 1) / 2;
+    else
+      break;
+  } while (true);
+  return idx;
+}
+
+/* There is a fragment available, decide what do to next.
+ */
 int
 CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
 {
   cancel_trigger();
   Doc *doc = (Doc *)buf->data();
-  int64_t ntodo = vio.ntodo();
-  int64_t bytes = doc->len - doc_pos;
+  int64_t bytes = vio.ntodo();
   IOBufferBlock *b = NULL;
-  if (seek_to) { // handle do_io_pread
-    if (seek_to >= doc_len) {
-      vio.ndone = doc_len;
-      return calluser(VC_EVENT_EOS);
+  uint64_t target_offset = resp_range.getOffset();
+  uint64_t lower_bound = frag_upper_bound - doc->data_len();
+
+  if (bytes <= 0)
+    return EVENT_CONT;
+
+  // Start shipping
+  while (bytes > 0 && lower_bound <= target_offset && target_offset < frag_upper_bound) {
+    if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // wait for reader
+      return EVENT_CONT;
+
+    if (resp_range.hasPendingRangeShift()) { // in new range, shift to start location.
+      int b_len;
+      char const *b_str = resp_range.getBoundaryStr(&b_len);
+      size_t r_idx = resp_range.getIdx();
+
+      doc_pos = doc->prefix_len() + (target_offset - lower_bound);
+
+      vio.ndone +=
+        HTTPRangeSpec::writePartBoundary(vio.buffer.writer(), b_str, b_len, doc_len, resp_range[r_idx]._min, resp_range[r_idx]._max,
+                                         resp_range.getContentTypeField(), r_idx >= (resp_range.count() - 1));
+      resp_range.consumeRangeShift();
+      Debug("amc", "Range boundary for range %" PRIu64, r_idx);
     }
-#ifdef HTTP_CACHE
-    HTTPInfo::FragOffset *frags = alternate.get_frag_table();
-    if (is_debug_tag_set("cache_seek")) {
-      char b[33], c[33];
-      Debug("cache_seek", "Seek @ %" PRId64 " in %s from #%d @ %" PRId64 "/%d:%s", seek_to, first_key.toHexStr(b), fragment,
-            doc_pos, doc->len, doc->key.toHexStr(c));
+
+    bytes = std::min(doc->len - doc_pos, static_cast<int64_t>(resp_range.getRemnantSize()));
+    bytes = std::min(bytes, vio.ntodo());
+    if (bytes > 0) {
+      b = new_IOBufferBlock(buf, bytes, doc_pos);
+      b->_buf_end = b->_end;
+      vio.buffer.writer()->append_block(b);
+      vio.ndone += bytes;
+      doc_pos += bytes;
+      resp_range.consume(bytes);
+      Debug("amc", "shipped %" PRId64 " bytes at target offset %" PRIu64, bytes, target_offset);
+      target_offset = resp_range.getOffset();
     }
-    /* Because single fragment objects can migrate to hang off an alt vector
-       they can appear to the VC as multi-fragment when they are not really.
-       The essential difference is the existence of a fragment table.
-    */
-    if (frags) {
-      int target = 0;
-      HTTPInfo::FragOffset next_off = frags[target];
-      int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
-      ink_assert(lfi >= 0); // because it's not a single frag doc.
-
-      /* Note: frag[i].offset is the offset of the first byte past the
-         i'th fragment. So frag[0].offset is the offset of the first
-         byte of fragment 1. In addition the # of fragments is one
-         more than the fragment table length, the start of the last
-         fragment being the last offset in the table.
-      */
-      if (fragment == 0 || seek_to < frags[fragment - 1] || (fragment <= lfi && frags[fragment] <= seek_to)) {
-        // search from frag 0 on to find the proper frag
-        while (seek_to >= next_off && target < lfi) {
-          next_off = frags[++target];
-        }
-        if (target == lfi && seek_to >= next_off)
-          ++target;
-      } else { // shortcut if we are in the fragment already
-        target = fragment;
-      }
-      if (target != fragment) {
-        // Lread will read the next fragment always, so if that
-        // is the one we want, we don't need to do anything
-        int cfi = fragment;
-        --target;
-        while (target > fragment) {
-          next_CacheKey(&key, &key);
-          ++fragment;
-        }
-        while (target < fragment) {
-          prev_CacheKey(&key, &key);
-          --fragment;
-        }
 
-        if (is_debug_tag_set("cache_seek")) {
-          char target_key_str[33];
-          key.toHexStr(target_key_str);
-          Debug("cache_seek", "Seek #%d @ %" PRId64 " -> #%d @ %" PRId64 ":%s", cfi, doc_pos, target, seek_to, target_key_str);
-        }
-        goto Lread;
+    if (vio.ntodo() <= 0)
+      return calluser(VC_EVENT_READ_COMPLETE);
+    else if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
+      return EVENT_DONE;
+  }
+
+
+#ifdef HTTP_CACHE
+  if (resp_range.getRemnantSize()) {
+    FragmentDescriptorTable *frags = alternate.get_frag_table();
+    int n_frags = alternate.get_frag_count();
+
+    // Quick check for offset in next fragment - very common
+    if (target_offset >= frag_upper_bound && (!frags || fragment >= n_frags || target_offset <= (*frags)[fragment].m_offset)) {
+      Debug("amc", "Non-seeking continuation to next fragment");
+    } else {
+      int target = -1; // target fragment index.
+
+      if (is_debug_tag_set("amc")) {
+        char b[33], c[33];
+        Debug("amc", "Seek @ %" PRIu64 " [r#=%d] in %s from #%d @ %" PRIu64 "/%d/%" PRId64 ":%s%s", target_offset,
+              resp_range.getIdx(), first_key.toHexStr(b), fragment, frag_upper_bound, doc->len, doc->total_len,
+              doc->key.toHexStr(c), (frags ? "" : "no frag table"));
       }
+
+      target = this->frag_idx_for_offset(target_offset);
+      this->update_key_to_frag_idx(target);
+      /// one frag short, because it gets bumped when the fragment is actually read.
+      frag_upper_bound = target > 0 ? (*frags)[target].m_offset : 0;
+      Debug("amc", "Fragment seek from %d to %d target offset %" PRIu64, fragment - 1, target, target_offset);
     }
-    doc_pos = doc->prefix_len() + seek_to;
-    if (fragment)
-      doc_pos -= static_cast<int64_t>(frags[fragment - 1]);
-    vio.ndone = 0;
-    seek_to = 0;
-    ntodo = vio.ntodo();
-    bytes = doc->len - doc_pos;
-    if (is_debug_tag_set("cache_seek")) {
-      char target_key_str[33];
-      key.toHexStr(target_key_str);
-      Debug("cache_seek", "Read # %d @ %" PRId64 "/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
-    }
-#endif
   }
-  if (ntodo <= 0)
-    return EVENT_CONT;
-  if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
-    return EVENT_CONT;
-  if ((bytes <= 0) && vio.ntodo() >= 0)
-    goto Lread;
-  if (bytes > vio.ntodo())
-    bytes = vio.ntodo();
-  b = new_IOBufferBlock(buf, bytes, doc_pos);
-  b->_buf_end = b->_end;
-  vio.buffer.writer()->append_block(b);
-  vio.ndone += bytes;
-  doc_pos += bytes;
-  if (vio.ntodo() <= 0)
-    return calluser(VC_EVENT_READ_COMPLETE);
-  else {
-    if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
-      return EVENT_DONE;
-    // we have to keep reading until we give the user all the
-    // bytes it wanted or we hit the watermark.
-    if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
-      goto Lread;
-    return EVENT_CONT;
-  }
-Lread : {
-  if (vio.ndone >= (int64_t)doc_len)
+#endif
+
+  if (vio.ntodo() > 0 && 0 == resp_range.getRemnantSize())
     // reached the end of the document and the user still wants more
     return calluser(VC_EVENT_EOS);
   last_collision = 0;
@@ -733,25 +625,20 @@ Lread : {
   if (dir_probe(&key, vol, &dir, &last_collision)) {
     SET_HANDLER(&CacheVC::openReadReadDone);
     int ret = do_read_call(&key);
-    if (ret == EVENT_RETURN)
-      goto Lcallreturn;
+    if (ret == EVENT_RETURN) {
+      lock.release();
+      return handleEvent(AIO_EVENT_DONE, 0);
+    }
     return EVENT_CONT;
-  } else if (write_vc) {
-    if (writer_done()) {
-      last_collision = NULL;
-      while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
-        if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
-          DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d", this, first_key.slice32(1), (int)vio.ndone);
-          doc_len = vio.ndone;
-          goto Leos;
-        }
-      }
+  } else {
+    if (!od->wait_for(earliest_key, this, target_offset)) {
       DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
-      goto Lerror;
+      lock.release();
+      return calluser(VC_EVENT_ERROR);
     }
-    DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
+    DDebug("cache_read_agg", "%p: key: %X ReadMain waiting: %d", this, first_key.slice32(1), (int)vio.ndone);
     SET_HANDLER(&CacheVC::openReadMain);
-    VC_SCHED_WRITER_RETRY();
+    return EVENT_CONT;
   }
   if (is_action_tag_set("cache"))
     ink_release_assert(false);
@@ -759,15 +646,45 @@ Lread : {
           key.slice32(1));
   // remove the directory entry
   dir_delete(&earliest_key, vol, &earliest_dir);
-}
-Lerror:
+  lock.release();
+  // Lerror:
   return calluser(VC_EVENT_ERROR);
-Leos:
+  // Leos:
   return calluser(VC_EVENT_EOS);
-Lcallreturn:
-  return handleEvent(AIO_EVENT_DONE, 0);
 }
 
+int
+CacheVC::openReadWaitEarliest(int evid, Event *)
+{
+  int zret = EVENT_CONT;
+  cancel_trigger();
+
+  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+  if (!lock.is_locked())
+    VC_SCHED_LOCK_RETRY();
+  Debug("amc", "[CacheVC::openReadWaitEarliest] [%d]", evid);
+  if (NULL == vol->open_read(&first_key)) {
+    // Writer is gone, so no more data for which to wait.
+    // Best option is to just start over from the first frag.
+    // Most likely scenario - object turned out to be a resident alternate so
+    // there's no explicit earliest frag.
+    lock.release();
+    SET_HANDLER(&self::openReadStartHead);
+    //    od = NULL;
+    key = first_key;
+    return handleEvent(EVENT_IMMEDIATE, 0);
+  } else if (dir_probe(&key, vol, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, vol, &earliest_dir, NULL)) {
+    dir = earliest_dir;
+    SET_HANDLER(&self::openReadStartEarliest);
+    if ((zret = do_read_call(&key)) == EVENT_RETURN) {
+      lock.release();
+      return handleEvent(AIO_EVENT_DONE, 0);
+    }
+  }
+  return zret;
+}
+
+
 /*
   This code follows CacheVC::openReadStartHead closely,
   if you change this you might have to change that.
@@ -822,6 +739,8 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
     earliest_key = key;
     doc_pos = doc->prefix_len();
     next_CacheKey(&key, &doc->key);
+    fragment = 1;
+    frag_upper_bound = doc->data_len();
     vol->begin_read(this);
     if (vol->within_hit_evacuate_window(&earliest_dir) &&
         (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)) {
@@ -840,9 +759,22 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
 // read has detected that alternate does not exist in the cache.
 // rewrite the vector.
 #ifdef HTTP_CACHE
-    if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
+    // It's OK if there's a writer for this alternate, we can wait on it.
+    if (od && od->has_writer(earliest_key)) {
+      wake_up_thread = mutex->thread_holding;
+      od->wait_for(earliest_key, this, 0);
+      lock.release();
+      // The SM must be signaled that the cache read is open even if we haven't got the earliest frag
+      // yet because otherwise it won't set up the read side of the tunnel before the write side finishes
+      // and terminates the SM (in the case of a resident alternate). But the VC can't be left with this
+      // handler or it will confuse itself when it wakes up from the earliest frag read. So we put it
+      // in a special wait state / handler and then signal the SM.
+      SET_HANDLER(&self::openReadWaitEarliest);
+      return callcont(CACHE_EVENT_OPEN_READ); // must signal read is open
+    } else if (frag_type == CACHE_FRAG_TYPE_HTTP) {
       // don't want any writers while we are evacuating the vector
-      if (!vol->open_write(this, false, 1)) {
+      ink_release_assert(!"[amc] Not handling multiple writers with vector evacuate");
+      if (!vol->open_write(this)) {
         Doc *doc1 = (Doc *)first_buf->data();
         uint32_t len = this->load_http_info(write_vector, doc1);
         ink_assert(len == doc1->hlen && write_vector->count() > 0);
@@ -939,6 +871,8 @@ CacheVC::openReadVecWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */
       if (od->move_resident_alt)
         dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
       int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
+      Debug("amc", "[openReadVecWrite] select alt: %d %p (current %p)", alt_ndx, write_vector->get(alt_ndx)->m_alt,
+            alternate.m_alt);
       vol->close_write(this);
       if (alt_ndx >= 0) {
         vector.clear();
@@ -963,6 +897,10 @@ Lrestart:
 /*
   This code follows CacheVC::openReadStartEarliest closely,
   if you change this you might have to change that.
+
+  This handles the I/O completion of reading the first doc of the object.
+  If there are alternates, we chain to openreadStartEarliest to read the
+  earliest doc.
 */
 int
 CacheVC::openReadStartHead(int event, Event *e)
@@ -1044,13 +982,18 @@ CacheVC::openReadStartHead(int event, Event *e)
         err = ECACHE_BAD_META_DATA;
         goto Ldone;
       }
-      if (cache_config_select_alternate) {
+      // If @a params is @c NULL then we're a retry from a range request pair so don't do alt select.
+      // Instead try the @a earliest_key - if that's a match then that's the correct alt, written
+      // by the paired write VC.
+      if (cache_config_select_alternate && params) {
         alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
         if (alternate_index < 0) {
           err = ECACHE_ALT_MISS;
           goto Ldone;
         }
-      } else
+        Debug("amc", "[openReadStartHead] select alt: %d %p (current %p, od %p)", alternate_index,
+              vector.get(alternate_index)->m_alt, alternate.m_alt, od);
+      } else if (CACHE_ALT_INDEX_DEFAULT == (alternate_index = get_alternate_index(&vector, earliest_key)))
         alternate_index = 0;
       alternate_tmp = vector.get(alternate_index);
       if (!alternate_tmp->valid()) {
@@ -1064,12 +1007,24 @@ CacheVC::openReadStartHead(int event, Event *e)
       alternate.copy_shallow(alternate_tmp);
       alternate.object_key_get(&key);
       doc_len = alternate.object_size_get();
+
+      // If the object length is known we can check the range.
+      // Otherwise we have to leave it vague and talk to the origin to get full length info.
+      if (alternate.m_alt->m_flag.content_length_p && !resp_range.apply(doc_len)) {
+        err = ECACHE_UNSATISFIABLE_RANGE;
+        goto Ldone;
+      }
+      if (resp_range.isMulti())
+        resp_range.setContentTypeFromResponse(alternate.response_get()).generateBoundaryStr(earliest_key);
+
       if (key == doc->key) { // is this my data?
         f.single_fragment = doc->single_fragment();
         ink_assert(f.single_fragment); // otherwise need to read earliest
         ink_assert(doc->hlen);
         doc_pos = doc->prefix_len();
         next_CacheKey(&key, &doc->key);
+        fragment = 1;
+        frag_upper_bound = doc->data_len();
       } else {
         f.single_fragment = false;
       }
@@ -1077,6 +1032,8 @@ CacheVC::openReadStartHead(int event, Event *e)
 #endif
     {
       next_CacheKey(&key, &doc->key);
+      fragment = 1;
+      frag_upper_bound = doc->data_len();
       f.single_fragment = doc->single_fragment();
       doc_pos = doc->prefix_len();
       doc_len = doc->total_len;
@@ -1087,7 +1044,7 @@ CacheVC::openReadStartHead(int event, Event *e)
       Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64 " bytes, %d fragments",
             doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", doc->len, doc->total_len,
 #ifdef HTTP_CACHE
-            alternate.get_frag_offset_count()
+            alternate.get_frag_count()
 #else
             0
 #endif

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/CacheTest.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheTest.cc b/iocore/cache/CacheTest.cc
index fb55123..1921060 100644
--- a/iocore/cache/CacheTest.cc
+++ b/iocore/cache/CacheTest.cc
@@ -387,8 +387,8 @@ EXCLUSIVE_REGRESSION_TEST(cache)(RegressionTest *t, int /* atype ATS_UNUSED */,
 
   r_sequential(t, write_test.clone(), lookup_test.clone(), r_sequential(t, 10, read_test.clone()), remove_test.clone(),
                lookup_fail_test.clone(), read_fail_test.clone(), remove_fail_test.clone(), replace_write_test.clone(),
-               replace_test.clone(), replace_read_test.clone(), large_write_test.clone(), pread_test.clone(), NULL_PTR)
-    ->run(pstatus);
+               replace_test.clone(), replace_read_test.clone(), large_write_test.clone(), pread_test.clone(),
+               NULL_PTR)->run(pstatus);
   return;
 }
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/CacheVol.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
index f9047f3..d5d85bc 100644
--- a/iocore/cache/CacheVol.cc
+++ b/iocore/cache/CacheVol.cc
@@ -413,8 +413,9 @@ CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
     }
 
     Debug("cache_scan", "trying for writer lock");
-    if (vol->open_write(this, false, 1)) {
-      writer_lock_retry++;
+    if (vol->open_write(this)) {
+      // [amc] This tried to restrict to one writer, must fix at some point.
+      ++writer_lock_retry;
       SET_HANDLER(&CacheVC::scanOpenWrite);
       mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
       return EVENT_CONT;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/CacheWrite.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 3740d21..1fbb16a 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -33,13 +33,19 @@
 // used to get the alternate which is actually present in the document
 #ifdef HTTP_CACHE
 int
-get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
+get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key, int idx)
 {
   int alt_count = cache_vector->count();
   CacheHTTPInfo *obj;
   if (!alt_count)
     return -1;
+  // See if the hint is correct.
+  if (0 <= idx && idx < alt_count && cache_vector->get(idx)->compare_object_key(&key))
+    return idx;
+  // Otherwise scan the vector.
   for (int i = 0; i < alt_count; i++) {
+    if (i == idx)
+      continue; // already checked that one.
     obj = cache_vector->get(i);
     if (obj->compare_object_key(&key)) {
       // Debug("cache_key", "Resident alternate key  %X", key.slice32(0));
@@ -63,20 +69,23 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
     VC_SCHED_LOCK_RETRY();
   int ret = 0;
   {
-    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+    CACHE_TRY_LOCK(lock, od->mutex, mutex->thread_holding);
     if (!lock.is_locked() || od->writing_vec)
       VC_SCHED_LOCK_RETRY();
 
     int vec = alternate.valid();
     if (f.update) {
       // all Update cases. Need to get the alternate index.
-      alternate_index = get_alternate_index(write_vector, update_key);
+      alternate_index = get_alternate_index(write_vector, update_key, alternate_index);
       Debug("cache_update", "updating alternate index %d frags %d", alternate_index,
-            alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
+            alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_count() : -1);
       // if its an alternate delete
       if (!vec) {
         ink_assert(!total_len);
         if (alternate_index >= 0) {
+          MUTEX_TRY_LOCK(stripe_lock, vol->mutex, mutex->thread_holding);
+          if (!stripe_lock.is_locked())
+            VC_SCHED_LOCK_RETRY();
           write_vector->remove(alternate_index, true);
           alternate_index = CACHE_ALT_REMOVED;
           if (!write_vector->count())
@@ -98,16 +107,10 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
       write_vector->remove(0, true);
     }
     if (vec) {
-      /* preserve fragment offset data from old info. This method is
-         called iff the update is a header only update so the fragment
-         data should remain valid.
-      */
-      if (alternate_index >= 0)
-        alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
       alternate_index = write_vector->insert(&alternate, alternate_index);
     }
 
-    if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
+    if (od->move_resident_alt && first_buf._ptr() /* && !od->has_multiple_writers() */) {
       Doc *doc = (Doc *)first_buf->data();
       int small_doc = (int64_t)doc->data_len() < (int64_t)cache_config_alt_rewrite_max_size;
       int have_res_alt = doc->key == od->single_doc_key;
@@ -134,7 +137,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
     od->writing_vec = 1;
     f.use_first_key = 1;
     SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
-    ret = do_write_call();
+    ret = do_write_lock_call();
   }
   if (ret == EVENT_RETURN)
     return handleEvent(AIO_EVENT_DONE, 0);
@@ -161,7 +164,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
      (f.use_fist_key || f.evac_vector) is set. Write_vector is written to disk
    - alternate_index. Used only if write_vector needs to be written to disk.
      Used to find out the VC's alternate in the write_vector and set its
-     length to tatal_len.
+     length to total_len.
    - write_len. The number of bytes for this fragment.
    - total_len. The total number of bytes for the document so far.
      Doc->total_len and alternate's total len is set to this value.
@@ -316,8 +319,8 @@ Vol::aggWriteDone(int event, Event *e)
     header->last_write_pos = header->write_pos;
     header->write_pos += io.aiocb.aio_nbytes;
     ink_assert(header->write_pos >= start);
-    DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n", hash_text.get(), header->write_pos,
-           header->last_write_pos);
+    Debug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n", hash_text.get(), header->write_pos,
+          header->last_write_pos);
     ink_assert(header->write_pos == header->agg_pos);
     if (header->write_pos + EVACUATION_SIZE > scan_pos)
       periodic_scan();
@@ -722,7 +725,7 @@ agg_copy(char *p, CacheVC *vc)
     IOBufferBlock *res_alt_blk = 0;
 
     uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
-    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
+    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc || 0 == vc->fragment);
     ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
     // update copy of directory entry for this document
     dir_set_approx_size(&vc->dir, vc->agg_len);
@@ -781,16 +784,23 @@ agg_copy(char *p, CacheVC *vc)
 #ifdef HTTP_CACHE
       if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
         ink_assert(vc->write_vector->count() > 0);
-        if (!vc->f.update && !vc->f.evac_vector) {
-          ink_assert(!(vc->first_key == zero_key));
-          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-          http_info->object_size_set(vc->total_len);
-        }
-        // update + data_written =>  Update case (b)
-        // need to change the old alternate's object length
-        if (vc->f.update && vc->total_len) {
-          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-          http_info->object_size_set(vc->total_len);
+        if (vc->resp_range.hasRanges()) {
+          int64_t size = vc->alternate.object_size_get();
+          if (size >= 0)
+            doc->total_len = size;
+        } else {
+          // As the header is finalized the fragment vector should be trimmed if the object is complete.
+          if (!vc->f.update && !vc->f.evac_vector) {
+            ink_assert(!(vc->first_key == zero_key));
+            CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+            http_info->object_size_set(vc->total_len);
+          }
+          // update + data_written =>  Update case (b)
+          // need to change the old alternate's object length
+          if (vc->f.update && vc->total_len) {
+            CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+            http_info->object_size_set(vc->total_len);
+          }
         }
         ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
         ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
@@ -1058,6 +1068,35 @@ Lwait:
 }
 
 int
+CacheVC::openWriteEmptyEarliestDone(int event, Event *e)
+{
+  cancel_trigger();
+  if (event == AIO_EVENT_DONE)
+    set_io_not_in_progress();
+  else if (is_io_in_progress())
+    return EVENT_CONT;
+
+  {
+    SCOPED_MUTEX_LOCK(lock, od->mutex, this_ethread());
+    alternate_index = get_alternate_index(write_vector, this->earliest_key);
+    od->write_complete(key, this, io.ok()); // in any case, the IO is over.
+    key = od->key_for(earliest_key, write_pos);
+  }
+
+  SET_HANDLER(&CacheVC::openWriteMain);
+
+  // on error terminate if we're already closed, otherwise notify external continuation.
+  if (!io.ok()) {
+    if (closed) {
+      closed = -1;
+      return die();
+    }
+    return calluser(VC_EVENT_ERROR);
+  }
+  return this->openWriteMain(event, e); // go back to writing our actual data.
+}
+
+int
 CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
 {
   cancel_trigger();
@@ -1068,6 +1107,7 @@ CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED *
       ink_assert(!is_io_in_progress());
       VC_SCHED_LOCK_RETRY();
     }
+    od->close_writer(earliest_key, this);
     vol->close_write(this);
     if (closed < 0 && fragment)
       dir_delete(&earliest_key, vol, &earliest_dir);
@@ -1169,13 +1209,17 @@ CacheVC::openWriteCloseHead(int event, Event *e)
   cancel_trigger();
   f.use_first_key = 1;
   if (io.ok())
-    ink_assert(fragment || (length == (int64_t)total_len));
+    ink_assert(fragment || (length == (int64_t)total_len) ||
+               (resp_range.hasRanges() && alternate.object_size_get() > alternate.get_frag_fixed_size()));
   else
     return openWriteCloseDir(event, e);
-  if (f.data_done)
+  if (f.data_done) {
     write_len = 0;
-  else
+  } else {
     write_len = length;
+    // If we're writing data in the first / header doc, then it's a resident alt.
+    alternate.m_alt->m_flag.complete_p = true;
+  }
 #ifdef HTTP_CACHE
   if (frag_type == CACHE_FRAG_TYPE_HTTP) {
     SET_HANDLER(&CacheVC::updateVector);
@@ -1206,35 +1250,32 @@ CacheVC::openWriteCloseDataDone(int event, Event *e)
     CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
     if (!lock.is_locked())
       VC_LOCK_RETRY_EVENT();
-    if (!fragment) {
-      ink_assert(key == earliest_key);
-      earliest_dir = dir;
-#ifdef HTTP_CACHE
-    } else {
-      // Store the offset only if there is a table.
-      // Currently there is no alt (and thence no table) for non-HTTP.
-      if (alternate.valid())
-        alternate.push_frag_offset(write_pos);
-#endif
-    }
-    fragment++;
-    write_pos += write_len;
     dir_insert(&key, vol, &dir);
-    blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
-    next_CacheKey(&key, &key);
-    if (length) {
-      write_len = length;
-      if (write_len > MAX_FRAG_SIZE)
-        write_len = MAX_FRAG_SIZE;
-      if ((ret = do_write_call()) == EVENT_RETURN)
-        goto Lcallreturn;
-      return ret;
-    }
-    f.data_done = 1;
-    return openWriteCloseHead(event, e); // must be called under vol lock from here
   }
-Lcallreturn:
-  return handleEvent(AIO_EVENT_DONE, 0);
+
+  if (key == earliest_key)
+    earliest_dir = dir;
+
+  {
+    SCOPED_MUTEX_LOCK(lock, od->mutex, mutex->thread_holding);
+    write_vector->write_complete(earliest_key, this, true);
+  }
+
+  write_pos += write_len;
+  blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+  next_CacheKey(&key, &key);
+  if (length) {
+    write_len = length;
+    if (write_len > MAX_FRAG_SIZE)
+      write_len = MAX_FRAG_SIZE;
+    if ((ret = do_write_call()) == EVENT_RETURN)
+      return handleEvent(AIO_EVENT_DONE, 0);
+    return ret;
+  }
+
+  f.data_done = 1;
+  return openWriteCloseHead(event, e); // must be called under vol lock from here
+                                       // [amc] don't see why, guess we'll find out.
 }
 
 int
@@ -1267,8 +1308,9 @@ CacheVC::openWriteClose(int event, Event *e)
       return openWriteCloseDir(event, e);
 #endif
     }
-    if (length && (fragment || length > MAX_FRAG_SIZE)) {
+    if (length && (fragment || length > MAX_FRAG_SIZE || alternate.object_size_get() > alternate.get_frag_fixed_size())) {
       SET_HANDLER(&CacheVC::openWriteCloseDataDone);
+      this->updateWriteStateFromRange();
       write_len = length;
       if (write_len > MAX_FRAG_SIZE)
         write_len = MAX_FRAG_SIZE;
@@ -1296,48 +1338,114 @@ CacheVC::openWriteWriteDone(int event, Event *e)
     SET_HANDLER(&CacheVC::openWriteMain);
     return calluser(VC_EVENT_ERROR);
   }
+
   {
     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
     if (!lock.is_locked())
       VC_LOCK_RETRY_EVENT();
-    // store the earliest directory. Need to remove the earliest dir
-    // in case the writer aborts.
-    if (!fragment) {
-      ink_assert(key == earliest_key);
-      earliest_dir = dir;
-#ifdef HTTP_CACHE
-    } else {
-      // Store the offset only if there is a table.
-      // Currently there is no alt (and thence no table) for non-HTTP.
-      if (alternate.valid())
-        alternate.push_frag_offset(write_pos);
-#endif
-    }
-    ++fragment;
-    write_pos += write_len;
     dir_insert(&key, vol, &dir);
-    DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
-    blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
-    next_CacheKey(&key, &key);
   }
+
+  if (key == earliest_key)
+    earliest_dir = dir;
+
+  {
+    SCOPED_MUTEX_LOCK(lock, od->mutex, mutex->thread_holding);
+    write_vector->write_complete(earliest_key, this, true);
+  }
+
+  DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
+
+  resp_range.consume(write_len);
+  blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+
   if (closed)
     return die();
   SET_HANDLER(&CacheVC::openWriteMain);
   return openWriteMain(event, e);
 }
 
-static inline int
-target_fragment_size()
+int64_t
+CacheProcessor::get_fixed_fragment_size() const
 {
   return cache_config_target_fragment_size - sizeofDoc;
 }
 
+
+Action *
+CacheVC::do_write_init()
+{
+  Debug("amc", "[do_write_init] vc=%p", this);
+  SET_CONTINUATION_HANDLER(this, &CacheVC::openWriteInit);
+  return EVENT_DONE == this->openWriteInit(EVENT_IMMEDIATE, 0) ? ACTION_RESULT_DONE : &_action;
+}
+
+/* Do some initial setup and then switch over to openWriteMain
+ */
+int
+CacheVC::openWriteInit(int eid, Event *event)
+{
+  Debug("amc", "[openWriteInit] vc=%p", this);
+  {
+    CACHE_TRY_LOCK(lock, od->mutex, mutex->thread_holding);
+    if (!lock.is_locked()) {
+      trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), eid);
+      return EVENT_CONT;
+    }
+
+    if (alternate.valid() && earliest_key != alternate.object_key_get()) {
+      // When the VC is created it sets up for a new alternate write. If we're back filling we
+      // need to tweak that back to the existing alternate.
+      Debug("amc", "[CacheVC::openWriteInit] updating earliest key from alternate");
+      alternate.object_key_get(&earliest_key);
+    }
+    // Get synchronized with the OD vector.
+    if (-1 == (alternate_index = get_alternate_index(write_vector, earliest_key))) {
+      Debug("amc", "[openWriteInit] alt not found, inserted");
+      alternate_index = write_vector->insert(&alternate); // not there, add it
+    } else {
+      HTTPInfo *base = write_vector->get(alternate_index);
+      if (!base->is_writeable()) {
+        // The alternate instance is mapped directly on a read buffer, which we can't modify.
+        // It must be replaced with a live, mutable one.
+        Debug("amc", "Updating OD vector element %d : 0x%p with mutable version %p", alternate_index, base, alternate.m_alt);
+        alternate.copy(base);           // make a local copy
+        base->copy_shallow(&alternate); // paste the mutable copy back.
+      }
+    }
+    // mark us as an writer.
+    write_vector->data[alternate_index]._writers.push(this);
+    alternate.copy_shallow(write_vector->get(alternate_index));
+
+    if (this == od->open_writer) {
+      od->open_writer = NULL;
+      CacheVC *reader;
+      while (NULL != (reader = od->open_waiting.pop())) {
+        Debug("amc", "[CacheVC::openWriteInit] wake up %p", reader);
+        reader->wake_up_thread->schedule_imm(reader);
+      }
+    }
+  }
+
+  if (resp_range.hasRanges()) {
+    resp_range.start();
+    //    this->updateWriteStateFromRange();
+  }
+
+  //  key = alternate.get_frag_key_of(write_pos);
+  SET_HANDLER(&CacheVC::openWriteMain);
+  return openWriteMain(eid, event);
+  //  return callcont(CACHE_EVENT_OPEN_WRITE);
+  //  return EVENT_DONE;
+}
+
 int
 CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
 {
   cancel_trigger();
   int called_user = 0;
   ink_assert(!is_io_in_progress());
+  Debug("amc", "[CacheVC::openWriteMain]");
 Lagain:
   if (!vio.buffer.writer()) {
     if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
@@ -1353,10 +1461,16 @@ Lagain:
     if (vio.ntodo() <= 0)
       return EVENT_CONT;
   }
+
   int64_t ntodo = (int64_t)(vio.ntodo() + length);
   int64_t total_avail = vio.buffer.reader()->read_avail();
   int64_t avail = total_avail;
   int64_t towrite = avail + length;
+  int64_t ffs = cacheProcessor.get_fixed_fragment_size();
+
+  Debug("amc", "[CacheVC::openWriteMain] ntodo=%" PRId64 " avail=%" PRId64 " towrite=%" PRId64 " frag=%d", ntodo, avail, towrite,
+        fragment);
+
   if (towrite > ntodo) {
     avail -= (towrite - ntodo);
     towrite = ntodo;
@@ -1369,17 +1483,19 @@ Lagain:
     blocks = vio.buffer.reader()->block;
     offset = vio.buffer.reader()->start_offset;
   }
+
   if (avail > 0) {
     vio.buffer.reader()->consume(avail);
     vio.ndone += avail;
     total_len += avail;
   }
   length = (uint64_t)towrite;
-  if (length > target_fragment_size() && (length < target_fragment_size() + target_fragment_size() / 4))
-    write_len = target_fragment_size();
+  // [amc] Need to change this to be exactly the fixed fragment size for this alternate.
+  if (length > ffs && (length < ffs + ffs / 4))
+    write_len = ffs;
   else
     write_len = length;
-  bool not_writing = towrite != ntodo && towrite < target_fragment_size();
+  bool not_writing = towrite != ntodo && towrite < ffs;
   if (!called_user) {
     if (not_writing) {
       called_user = 1;
@@ -1391,12 +1507,61 @@ Lagain:
   }
   if (not_writing)
     return EVENT_CONT;
+
+  this->updateWriteStateFromRange();
+
+  {
+    CacheHTTPInfo *alt = &alternate;
+    SCOPED_MUTEX_LOCK(lock, od->mutex, this_ethread());
+
+#if 0
+    alternate_index = get_alternate_index(write_vector, earliest_key);
+    if (alternate_index < 0)
+      alternate_index = write_vector->insert(&alternate, alternate_index);
+
+    alt = write_vector->get(alternate_index);
+#endif
+
+    if (fragment != 0 && !alt->m_alt->m_earliest.m_flag.cached_p) {
+      SET_HANDLER(&CacheVC::openWriteEmptyEarliestDone);
+      if (!od->is_write_active(earliest_key, 0)) {
+        write_len = 0;
+        key = earliest_key;
+        Debug("amc", "[CacheVC::openWriteMain] writing empty earliest");
+      } else {
+        // go on the wait list
+        od->wait_for(earliest_key, this, 0);
+        not_writing = true;
+      }
+    } else if (od->is_write_active(earliest_key, write_pos)) {
+      od->wait_for(earliest_key, this, write_pos);
+      not_writing = true;
+    } else if (alternate.is_frag_cached(fragment)) {
+      not_writing = true;
+      Debug("amc", "Fragment %d already cached", fragment);
+      // Consume the data, as we won't be using it.
+      resp_range.consume(write_len);
+      blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+      // need to kick start things again or we'll stall.
+      return this->handleEvent(EVENT_IMMEDIATE);
+    } else {
+      od->write_active(earliest_key, this, write_pos);
+    }
+  }
+
+  if (0 == write_len) // need to set up the write not under OpenDir lock.
+    return do_write_lock_call();
+
   if (towrite == ntodo && f.close_complete) {
     closed = 1;
     SET_HANDLER(&CacheVC::openWriteClose);
     return openWriteClose(EVENT_NONE, NULL);
+  } else if (not_writing) {
+    return EVENT_CONT;
   }
+
   SET_HANDLER(&CacheVC::openWriteWriteDone);
+  Debug("amc", "[CacheVC::openWriteMain] doing write call");
   return do_write_lock_call();
 }
 
@@ -1434,7 +1599,7 @@ Lcollision : {
   }
 }
 Ldone:
-  SET_HANDLER(&CacheVC::openWriteMain);
+  SET_HANDLER(&CacheVC::openWriteInit);
   return callcont(CACHE_EVENT_OPEN_WRITE);
 Lcallreturn:
   return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
@@ -1458,7 +1623,7 @@ CacheVC::openWriteStartDone(int event, Event *e)
     if (!lock.is_locked())
       VC_LOCK_RETRY_EVENT();
 
-    if (_action.cancelled && (!od || !od->has_multiple_writers()))
+    if (_action.cancelled && (!od /* || !od->has_multiple_writers() */))
       goto Lcancel;
 
     if (event == AIO_EVENT_DONE) { // vector read done
@@ -1501,15 +1666,17 @@ CacheVC::openWriteStartDone(int event, Event *e)
     }
 
   Lcollision:
-    int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
+    //    int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
     if (!od) {
-      if ((err = vol->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+      if ((err = vol->open_write(this)) > 0)
         goto Lfailure;
-      if (od->has_multiple_writers()) {
-        MUTEX_RELEASE(lock);
-        SET_HANDLER(&CacheVC::openWriteMain);
-        return callcont(CACHE_EVENT_OPEN_WRITE);
-      }
+      /*
+            if (od->has_multiple_writers()) {
+              MUTEX_RELEASE(lock);
+              SET_HANDLER(&CacheVC::openWriteInit);
+              return this->openWriteInit(EVENT_IMMEDIATE, 0);
+            }
+      */
     }
     // check for collision
     if (dir_probe(&first_key, vol, &dir, &last_collision)) {
@@ -1528,8 +1695,9 @@ Lsuccess:
   od->reading_vec = 0;
   if (_action.cancelled)
     goto Lcancel;
-  SET_HANDLER(&CacheVC::openWriteMain);
+  SET_HANDLER(&CacheVC::openWriteInit);
   return callcont(CACHE_EVENT_OPEN_WRITE);
+//  return this->openWriteInit(EVENT_IMMEDIATE, 0);
 
 Lfailure:
   CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
@@ -1553,7 +1721,7 @@ CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED
   cancel_trigger();
   if (_action.cancelled)
     return free_CacheVC(this);
-  if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
+  if (((err = vol->open_write_lock(this)) > 0)) {
     CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
     free_CacheVC(this);
     _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
@@ -1566,7 +1734,7 @@ CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED
     return openWriteOverwrite(EVENT_IMMEDIATE, 0);
   } else {
     // write by key
-    SET_HANDLER(&CacheVC::openWriteMain);
+    SET_HANDLER(&CacheVC::openWriteInit);
     return callcont(CACHE_EVENT_OPEN_WRITE);
   }
 }
@@ -1613,7 +1781,7 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_ty
   c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
   c->pin_in_cache = (uint32_t)apin_in_cache;
 
-  if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
+  if ((res = c->vol->open_write_lock(c)) > 0) {
     // document currently being written, abort
     CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-res);
@@ -1626,9 +1794,10 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_ty
     return &c->_action;
   }
   if (!c->f.overwrite) {
-    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteInit);
     c->callcont(CACHE_EVENT_OPEN_WRITE);
     return ACTION_RESULT_DONE;
+    //    return c->do_write_init();
   } else {
     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
     if (c->openWriteOverwrite(EVENT_IMMEDIATE, 0) == EVENT_DONE)
@@ -1638,6 +1807,22 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_ty
   }
 }
 
+int
+CacheVC::updateWriteStateFromRange()
+{
+  if (resp_range.hasPendingRangeShift())
+    resp_range.consumeRangeShift();
+  write_pos = resp_range.getOffset();
+  fragment = alternate.get_frag_index_of(write_pos);
+  key = alternate.get_frag_key(fragment);
+  {
+    char tmp[64];
+    Debug("amc", "[writeMain] pos=%" PRId64 " frag=%d/%" PRId64 " key=%s", write_pos, fragment, alternate.get_frag_offset(fragment),
+          key.toHexStr(tmp));
+  }
+  return write_pos;
+}
+
 #ifdef HTTP_CACHE
 // main entry point for writing of http documents
 Action *
@@ -1651,22 +1836,28 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
 
   ink_assert(caches[type] == this);
   intptr_t err = 0;
-  int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
+  //  int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
   CacheVC *c = new_CacheVC(cont);
   ProxyMutex *mutex = cont->mutex;
   c->vio.op = VIO::WRITE;
   c->first_key = *key;
-  /*
-     The transition from single fragment document to a multi-fragment document
-     would cause a problem if the key and the first_key collide. In case of
-     a collision, old vector data could be served to HTTP. Need to avoid that.
-     Also, when evacuating a fragment, we have to decide if its the first_key
-     or the earliest_key based on the dir_tag.
-   */
-  do {
-    rand_CacheKey(&c->key, cont->mutex);
-  } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
-  c->earliest_key = c->key;
+  if (info) {
+    info->object_key_get(&c->key);
+    c->earliest_key = c->key;
+  } else {
+    /*
+      The transition from single fragment document to a multi-fragment document
+      would cause a problem if the key and the first_key collide. In case of
+      a collision, old vector data could be served to HTTP. Need to avoid that.
+      Also, when evacuating a fragment, we have to decide if its the first_key
+      or the earliest_key based on the dir_tag.
+    */
+    do {
+      rand_CacheKey(&c->key, cont->mutex);
+    } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
+    c->earliest_key = c->key;
+  }
+
   c->frag_type = CACHE_FRAG_TYPE_HTTP;
   c->vol = key_to_vol(key, hostname, host_len);
   Vol *vol = c->vol;
@@ -1715,13 +1906,15 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
   {
     CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
     if (lock.is_locked()) {
-      if ((err = c->vol->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+      if ((err = c->vol->open_write(c)) > 0)
         goto Lfailure;
       // If there are multiple writers, then this one cannot be an update.
       // Only the first writer can do an update. If that's the case, we can
       // return success to the state machine now.;
-      if (c->od->has_multiple_writers())
-        goto Lmiss;
+      /*
+            if (c->od->has_multiple_writers())
+              goto Lmiss;
+      */
       if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
         if (c->f.update) {
           // fail update because vector has been GC'd
@@ -1730,9 +1923,12 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
           goto Lfailure;
         }
         // document doesn't exist, begin write
+        ink_assert(NULL == c->od->open_writer);
+        c->od->open_writer = c;
         goto Lmiss;
       } else {
         c->od->reading_vec = 1;
+        c->od->open_writer = c;
         // document exists, read vector
         SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
         switch (c->do_read_call(&c->first_key)) {
@@ -1752,7 +1948,8 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
   }
 
 Lmiss:
-  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+  //  return c->do_write_init();
+  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteInit);
   c->callcont(CACHE_EVENT_OPEN_WRITE);
   return ACTION_RESULT_DONE;
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/I_Cache.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_Cache.h b/iocore/cache/I_Cache.h
index 77aafe9..e42ddf5 100644
--- a/iocore/cache/I_Cache.h
+++ b/iocore/cache/I_Cache.h
@@ -49,6 +49,7 @@
 #define CACHE_COMPRESSION_LIBZ 2
 #define CACHE_COMPRESSION_LIBLZMA 3
 
+struct CacheVConnection;
 struct CacheVC;
 struct CacheDisk;
 #ifdef HTTP_CACHE
@@ -56,6 +57,7 @@ class CacheLookupHttpConfig;
 class URL;
 class HTTPHdr;
 class HTTPInfo;
+class HTTPRangeSpec;
 
 typedef HTTPHdr CacheHTTPHdr;
 typedef URL CacheURL;
@@ -80,6 +82,17 @@ struct CacheProcessor : public Processor {
                             CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, const char *hostname = 0, int host_len = 0);
   inkcoreapi Action *open_read(Continuation *cont, const CacheKey *key, bool cluster_cache_local,
                                CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, const char *hostname = 0, int host_len = 0);
+
+  /** Open a cache reader from an already open writer.
+
+      This is used for partial content on a cache miss to open a reader corresponding to the
+      partial content writer.
+  */
+  inkcoreapi Action *open_read(Continuation *cont, CacheVConnection *writer, HTTPHdr *client_request_hdr);
+
+  Action *open_read_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key, CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
+                           char *hostname = 0, int host_len = 0);
+
   inkcoreapi Action *open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local,
                                 CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, int expected_size = CACHE_EXPECTED_SIZE,
                                 int options = 0, time_t pin_in_cache = (time_t)0, char *hostname = 0, int host_len = 0);
@@ -124,6 +137,9 @@ struct CacheProcessor : public Processor {
   */
   bool has_online_storage() const;
 
+  /** Get the target fragment size. */
+  int64_t get_fixed_fragment_size() const;
+
   static int IsCacheEnabled();
 
   static bool IsCacheReady(CacheFragType type);
@@ -189,6 +205,62 @@ struct CacheVConnection : public VConnection {
 #ifdef HTTP_CACHE
   virtual void set_http_info(CacheHTTPInfo *info) = 0;
   virtual void get_http_info(CacheHTTPInfo **info) = 0;
+
+  /** Get the boundary string for a multi-part range response.
+      The length of the string is returned in @a len.
+
+      @return A point to the string.
+   */
+  virtual char const *get_http_range_boundary_string(int *len) const = 0;
+
+  /** Get the effective content size.
+
+      This is the amount of actual data based on any range or framing.  Effectively this is the
+      value to be passed to the @c VIO while the content length is used in the HTTP header.
+  */
+  virtual int64_t get_effective_content_size() = 0;
+
+  /** Set the origin reported content size.
+
+      This is the content length reported by the origin server and should be considered a hint, not
+      definitive. The object size, as stored in the cache, is the actual amount of data received and
+      cached.
+
+      @note This is the total content length as reported in the HTTP header, not the partial (range based) response size.
+      Also this is the length of the HTTP content, which may differ from the size of the data stream.
+  */
+  virtual void set_full_content_length(int64_t) = 0;
+
+  /** Set the output ranges for the content.
+   */
+  virtual void set_content_range(HTTPRangeSpec const &range) = 0;
+
+  /// Get the unchanged ranges for the request range @a req.
+  /// If @a req is empty it is treated as a full request (non-partial).
+  /// @return @c true if the @a result is not empty.
+  /// @internal Currently this just returns the single range that is convex hull of the uncached request.
+  /// Someday we may want to do the exact range spec but we use the type for now because it's easier.
+  virtual bool
+  get_uncached(HTTPRangeSpec const &req, HTTPRangeSpec &result, int64_t initial)
+  {
+    (void)req;
+    (void)result;
+    (void)initial;
+    return false;
+  }
+
+  /** Set the range for the input (response content).
+      The incoming bytes will be written to this section of the object.
+      @note This range @b must be absolute.
+      @note The range is inclusive.
+      @return The # of bytes in the range.
+  */
+  virtual int64_t
+  set_inbound_range(int64_t min, int64_t max)
+  {
+    return 1 + (max - min);
+  }
+
 #endif
 
   virtual bool is_ram_cache_hit() const = 0;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/I_CacheDefs.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_CacheDefs.h b/iocore/cache/I_CacheDefs.h
index 941ff0e..02ce264 100644
--- a/iocore/cache/I_CacheDefs.h
+++ b/iocore/cache/I_CacheDefs.h
@@ -21,6 +21,7 @@
   limitations under the License.
  */
 
+#include <vector>
 
 #ifndef _I_CACHE_DEFS_H__
 #define _I_CACHE_DEFS_H__
@@ -32,7 +33,7 @@
 #define CACHE_ALT_INDEX_DEFAULT -1
 #define CACHE_ALT_REMOVED -2
 
-#define CACHE_DB_MAJOR_VERSION 24
+#define CACHE_DB_MAJOR_VERSION 25
 #define CACHE_DB_MINOR_VERSION 0
 
 #define CACHE_DIR_MAJOR_VERSION 18
@@ -144,4 +145,5 @@ struct HttpCacheKey {
    word(2) - tag (lower bits), hosttable hash (upper bits)
    word(3) - ram cache hash, lookaside cache
  */
+
 #endif // __CACHE_DEFS_H__

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/P_CacheBC.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheBC.h b/iocore/cache/P_CacheBC.h
index 2164692..2ffd4e8 100644
--- a/iocore/cache/P_CacheBC.h
+++ b/iocore/cache/P_CacheBC.h
@@ -33,9 +33,9 @@ namespace cache_bc
 */
 
 typedef HTTPHdr HTTPHdr_v21;
+typedef HTTPHdr HTTPHdr_v23;
 typedef HdrHeap HdrHeap_v23;
 typedef CryptoHash CryptoHash_v23;
-typedef HTTPCacheAlt HTTPCacheAlt_v23;
 
 /** Cache backwards compatibility structure - the fragment table.
     This is copied from @c HTTPCacheAlt in @c HTTP.h.
@@ -120,6 +120,34 @@ struct Doc_v23 {
   size_t data_len();
 };
 
+struct HTTPCacheAlt_v23 {
+  uint32_t m_magic;
+  int32_t m_writeable;
+  int32_t m_unmarshal_len;
+
+  int32_t m_id;
+  int32_t m_rid;
+
+  int32_t m_object_key[4];
+  int32_t m_object_size[2];
+
+  HTTPHdr_v23 m_request_hdr;
+  HTTPHdr_v23 m_response_hdr;
+
+  time_t m_request_sent_time;
+  time_t m_response_received_time;
+
+  int m_frag_offset_count;
+  typedef uint64_t FragOffset;
+  FragOffset *m_frag_offsets;
+  static int const N_INTEGRAL_FRAG_OFFSETS = 4;
+  FragOffset m_integral_frag_offsets[N_INTEGRAL_FRAG_OFFSETS];
+
+  RefCountObj *m_ext_buffer;
+};
+
+typedef HTTPCacheAlt_v23 HTTPCacheAlt_v24; // no changes between these versions.
+
 static size_t const sizeofDoc_v23 = sizeof(Doc_v23);
 char *
 Doc_v23::data()

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/528eab64/iocore/cache/P_CacheDir.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h
index c128537..36cf51d 100644
--- a/iocore/cache/P_CacheDir.h
+++ b/iocore/cache/P_CacheDir.h
@@ -213,48 +213,79 @@ struct FreeDir {
 #define dir_prev(_e) (_e)->w[2]
 #define dir_set_prev(_e, _o) (_e)->w[2] = (uint16_t)(_o)
 
-// INKqa11166 - Cache can not store 2 HTTP alternates simultaneously.
-// To allow this, move the vector from the CacheVC to the OpenDirEntry.
-// Each CacheVC now maintains a pointer to this vector. Adding/Deleting
-// alternates from this vector is done under the Vol::lock. The alternate
-// is deleted/inserted into the vector just before writing the vector disk
-// (CacheVC::updateVector).
-LINK_FORWARD_DECLARATION(CacheVC, opendir_link) // forward declaration
 struct OpenDirEntry {
-  DLL<CacheVC, Link_CacheVC_opendir_link> writers; // list of all the current writers
-  DLL<CacheVC, Link_CacheVC_opendir_link> readers; // list of all the current readers - not used
-  CacheHTTPInfoVector vector;                      // Vector for the http document. Each writer
-                                                   // maintains a pointer to this vector and
-                                                   // writes it down to disk.
-  CacheKey single_doc_key;                         // Key for the resident alternate.
-  Dir single_doc_dir;                              // Directory for the resident alternate
-  Dir first_dir;                                   // Dir for the vector. If empty, a new dir is
-                                                   // inserted, otherwise this dir is overwritten
-  uint16_t num_writers;                            // num of current writers
-  uint16_t max_writers;                            // max number of simultaneous writers allowed
-  bool dont_update_directory;                      // if set, the first_dir is not updated.
-  bool move_resident_alt;                          // if set, single_doc_dir is inserted.
-  volatile bool reading_vec;                       // somebody is currently reading the vector
-  volatile bool writing_vec;                       // somebody is currently writing the vector
+  typedef OpenDirEntry self; ///< Self reference type.
+
+  Ptr<ProxyMutex> mutex;
+
+  /// Vector for the http document. Each writer maintains a pointer to this vector and writes it down to disk.
+  CacheHTTPInfoVector vector;
+  CacheKey first_key;         ///< Key for first doc for this object.
+  CacheKey single_doc_key;    // Key for the resident alternate.
+  Dir single_doc_dir;         // Directory for the resident alternate
+  Dir first_dir;              // Dir for the vector. If empty, a new dir is
+                              // inserted, otherwise this dir is overwritten
+  uint16_t num_active;        // num of VCs working with this entry
+  uint16_t max_writers;       // max number of simultaneous writers allowed
+  bool dont_update_directory; // if set, the first_dir is not updated.
+  bool move_resident_alt;     // if set, single_doc_dir is inserted.
+  volatile bool reading_vec;  // somebody is currently reading the vector
+  volatile bool writing_vec;  // somebody is currently writing the vector
+
+  /** Set to a write @c CacheVC that has started but not yet updated the vector.
+
+      If this is set then there is a write @c CacheVC that is active but has not yet been able to
+      update the vector for its alternate. Any new reader should block on open if this is set and
+      enter itself on the @a _waiting list, making this effectively a write lock on the object.
+      This is necessary because we can't reliably do alternate selection in this state. The waiting
+      read @c CacheVC instances are released as soon as the vector is updated, they do not have to
+      wait until the write @c CacheVC has finished its transaction. In practice this means until the
+      server response has been received and processed.
+  */
+  volatile CacheVC *open_writer;
+  /** A list of @c CacheVC instances that are waiting for the @a open_writer.
+   */
+  DLL<CacheVC, Link_CacheVC_Active_Link> open_waiting;
 
   LINK(OpenDirEntry, link);
 
-  int wait(CacheVC *c, int msec);
-
-  bool
-  has_multiple_writers()
-  {
-    return num_writers > 1;
-  }
+  //  int wait(CacheVC *c, int msec);
+
+  /// Get the alternate index for the @a key.
+  int index_of(CacheKey const &key);
+  /// Check if there are any writers for the alternate of @a alt_key.
+  bool has_writer(CacheKey const &alt_key);
+  /// Mark a @c CacheVC as actively writing at @a offset on the alternate with @a alt_key.
+  self &write_active(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
+  /// Mark an active write by @a vc as complete and indicate whether it had @a success.
+  /// If the write is not @a success then the fragment is not marked as cached.
+  self &write_complete(CacheKey const &alt_key, CacheVC *vc, bool success = true);
+  /// Indicate if a VC is currently writing to the fragment with this @a offset.
+  bool is_write_active(CacheKey const &alt_key, int64_t offset);
+  /// Get the fragment key for a specific @a offset.
+  CacheKey const &key_for(CacheKey const &alt_key, int64_t offset);
+  /** Wait for a fragment to be written.
+
+      @return @c false if there is no writer that is scheduled to write that fragment.
+   */
+  bool wait_for(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
+  /// Close out anything related to this writer
+  self &close_writer(CacheKey const &alt_key, CacheVC *vc);
 };
 
 struct OpenDir : public Continuation {
-  Queue<CacheVC, Link_CacheVC_opendir_link> delayed_readers;
+  typedef Queue<CacheVC, Link_CacheVC_OpenDir_Link> CacheVCQ;
+  CacheVCQ delayed_readers;
+
   DLL<OpenDirEntry> bucket[OPEN_DIR_BUCKETS];
 
-  int open_write(CacheVC *c, int allow_if_writers, int max_writers);
-  int close_write(CacheVC *c);
-  OpenDirEntry *open_read(const CryptoHash *key);
+  /** Open a live directory entry for @a vc.
+
+      @a force_p is set to @c true to force the entry if it's not already there.
+  */
+  OpenDirEntry *open_entry(Vol *vol, CryptoHash const &key, bool force_p = false);
+  void close_entry(CacheVC *c);
+  //  OpenDirEntry *open_read(CryptoHash *key);
   int signal_readers(int event, Event *e);
 
   OpenDir();


Mime
View raw message