trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sc...@apache.org
Subject [trafficserver] 01/06: Add QUICPollCont
Date Wed, 07 Feb 2018 06:19:47 GMT
This is an automated email from the ASF dual-hosted git repository.

scw00 pushed a commit to branch quic-latest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit d9a5b22299a8d9dbcc1d7231ae806ab692669166
Author: Oknet Xu <xuchao@skyguard.com.cn>
AuthorDate: Sun Jan 28 17:55:16 2018 +0800

    Add QUICPollCont
---
 iocore/net/I_UDPPacket.h          |   6 ++
 iocore/net/P_QUICNet.h            |  60 ++++++++++++++++++
 iocore/net/P_QUICNetProcessor.h   |   2 +
 iocore/net/P_QUICNetVConnection.h |   2 +
 iocore/net/P_UnixNet.h            |  24 +++++++-
 iocore/net/QUICNet.cc             | 125 ++++++++++++++++++++++++++++++++++++++
 iocore/net/QUICNetVConnection.cc  |  50 ++++++++++++++-
 iocore/net/QUICPacketHandler.cc   |  23 +++----
 8 files changed, 274 insertions(+), 18 deletions(-)

diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h
index df899ff..271a250 100644
--- a/iocore/net/I_UDPPacket.h
+++ b/iocore/net/I_UDPPacket.h
@@ -62,6 +62,12 @@ public:
   IpEndpoint to;   // what address to send to
 
   int from_size;
+  typedef union udppacket_data { // opaque per-packet payload; the members share one storage slot
+    void    *ptr; // pointer view of the payload
+    uint32_t u32; // 32-bit integer view of the payload
+    uint64_t u64; // 64-bit integer view of the payload
+  } udppacket_data_t;
+  udppacket_data_t data; // only the member most recently written is meaningful
 
   LINK(UDPPacket, link);
 };
diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h
new file mode 100644
index 0000000..fba0e96
--- /dev/null
+++ b/iocore/net/P_QUICNet.h
@@ -0,0 +1,60 @@
+/** @file
+
+  QUICPollCont: continuation that dispatches queued incoming QUIC UDP packets to their connections
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#ifndef __P_QUICNET_H__
+#define __P_QUICNET_H__
+
+#include <bitset>
+
+#include "ts/ink_platform.h"
+
+#include "P_Net.h"
+class NetHandler;
+typedef int (NetHandler::*NetContHandler)(int, void *);
+
+// Continuation whose pollEvent() drains inQueue and hands each packet to the
+// QUICNetVConnection attached to it, parking packets that have none.
+struct QUICPollCont : public Continuation {
+  NetHandler *net_handler        = nullptr; // overwritten by the constructors' initializer lists
+  PollDescriptor *pollDescriptor = nullptr; // no constructor assigns this, so give it a defined value
+
+  QUICPollCont(Ptr<ProxyMutex> &m);
+  QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh);
+  ~QUICPollCont();
+  int pollEvent(int, Event *);
+
+  // Atomic list of incoming packets, drained by pollEvent()
+  ASLL(UDPPacketInternal, alink) inQueue;
+
+  // Long Header (Handshake) packets that arrived with no QUICNetVC attached
+  Que(UDPPacket, link) longInQueue;
+  // Short Header packets that arrived with no QUICNetVC attached
+  Que(UDPPacket, link) shortInQueue;
+};
+
+// Return the QUICPollCont kept in thread t's storage at quic_NetProcessor.quicPollCont_offset.
+static inline QUICPollCont *
+get_QUICPollCont(EThread *t)
+{
+  QUICPollCont *poll_cont = (QUICPollCont *)ETHREAD_GET_PTR(t, quic_NetProcessor.quicPollCont_offset);
+  return poll_cont;
+}
+
+#endif
diff --git a/iocore/net/P_QUICNetProcessor.h b/iocore/net/P_QUICNetProcessor.h
index 2972060..2b7eb10 100644
--- a/iocore/net/P_QUICNetProcessor.h
+++ b/iocore/net/P_QUICNetProcessor.h
@@ -67,6 +67,8 @@ public:
 
   Action *main_accept(Continuation *cont, SOCKET fd, AcceptOptions const &opt) override;
 
+  off_t quicPollCont_offset;
+
 private:
   QUICNetProcessor(const QUICNetProcessor &);
   QUICNetProcessor &operator=(const QUICNetProcessor &);
diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h
index a82c37b..4e999d9 100644
--- a/iocore/net/P_QUICNetVConnection.h
+++ b/iocore/net/P_QUICNetVConnection.h
@@ -302,4 +302,6 @@ private:
   QUICStatelessResetToken _reset_token;
 };
 
+typedef int (QUICNetVConnection::*QUICNetVConnHandler)(int, void *);
+
 extern ClassAllocator<QUICNetVConnection> quicNetVCAllocator;
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index 6c69bd3..6b9ad5f 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -575,8 +575,15 @@ EventIO::start(EventLoop l, NetAccept *vc, int events)
 TS_INLINE int
 EventIO::start(EventLoop l, UnixNetVConnection *vc, int events)
 {
+  int r;
   type = EVENTIO_READWRITE_VC;
-  return start(l, vc->con.fd, (Continuation *)vc, events);
+  r    = start(l, vc->con.fd, (Continuation *)vc, events);
+  if (r < 0 && vc->options.ip_proto == NetVCOptions::USE_UDP) {
+    // Hack for QUICNetVC
+    return 0;
+  } else {
+    return r;
+  }
 }
 TS_INLINE int
 EventIO::start(EventLoop l, UnixUDPConnection *vc, int events)
@@ -611,6 +618,12 @@ EventIO::start(EventLoop l, int afd, Continuation *c, int e)
   data.c     = c;
   fd         = afd;
   event_loop = l;
+  // Hack for QUICNetVC:
+  //   quicnetvc->con.fd == NO_FD
+  //   quicnetvc->options.ip_proto == NetVCOptions::USE_UDP
+  if (afd == NO_FD) {
+    return -1;
+  }
 #if TS_USE_EPOLL
   struct epoll_event ev;
   memset(&ev, 0, sizeof(ev));
@@ -643,6 +656,9 @@ EventIO::start(EventLoop l, int afd, Continuation *c, int e)
 TS_INLINE int
 EventIO::modify(int e)
 {
+  if (fd == NO_FD) {
+    return 0;
+  }
   ink_assert(event_loop);
 #if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)
   struct epoll_event ev;
@@ -722,6 +738,9 @@ EventIO::modify(int e)
 TS_INLINE int
 EventIO::refresh(int e)
 {
+  if (fd == NO_FD) {
+    return 0;
+  }
   ink_assert(event_loop);
 #if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER)
   e = e & events;
@@ -763,6 +782,9 @@ EventIO::refresh(int e)
 TS_INLINE int
 EventIO::stop()
 {
+  if (fd == NO_FD) {
+    return 0;
+  }
   if (event_loop) {
     int retval = 0;
 #if TS_USE_EPOLL
diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc
new file mode 100644
index 0000000..c40121f
--- /dev/null
+++ b/iocore/net/QUICNet.cc
@@ -0,0 +1,125 @@
+/** @file
+
+  Implementation of QUICPollCont and its per-thread initialization
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "P_Net.h"
+
+// Construct a QUICPollCont guarded by mutex m with no NetHandler attached.
+// The installed handler has to be a member of this class: QUICPollCont derives
+// from Continuation, not from PollCont, so PollCont::pollEvent must not be bound here.
+QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m)
+  : Continuation(m.get()), net_handler(nullptr)
+{
+  SET_HANDLER(&QUICPollCont::pollEvent);
+}
+
+// Construct a QUICPollCont guarded by mutex m and bound to NetHandler nh;
+// events are delivered to QUICPollCont::pollEvent.
+QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh) : Continuation(m.get()), net_handler(nh)
+{
+  SET_HANDLER(&QUICPollCont::pollEvent);
+}
+
+// The destructor body is empty.
+QUICPollCont::~QUICPollCont() {}
+
+//
+// QUICPollCont continuation which traverses the inQueue (ASLL) and dispatches
+// every queued packet on its first byte:
+//   - Initial packet: pushed to its QUICNetVC, which is then scheduled immediately;
+//   - Handshake / Short Header packet: pushed to its QUICNetVC, or parked in
+//     longInQueue / shortInQueue when no QUICNetVC is attached;
+//   - 0-RTT packet: not handled yet.
+// Whenever a QUICNetVC is attached to the packet, it is also pushed into the
+// NetHandler's read enable list unless it is already flagged as being there.
+// Always returns EVENT_CONT.
+//
+int
+QUICPollCont::pollEvent(int, Event *)
+{
+  UnixUDPConnection *uc;
+  QUICPacketHandler *ph;
+  QUICNetVConnection *vc;
+  QUICConnectionId cid;
+  uint8_t *buf;
+  uint8_t ptype;
+  UDPPacketInternal *p = nullptr;
+  // The handler receives no EThread, so look up the NetHandler of the running thread.
+  NetHandler *nh = get_NetHandler(this_ethread());
+
+  // Process the ASLL: detach the whole list at once, then move it through a second
+  // queue (presumably to restore arrival order -- confirm against SList/Queue semantics).
+  SList(UDPPacketInternal, alink) aq(inQueue.popall());
+  Queue<UDPPacketInternal> result;
+  while ((p = aq.pop())) {
+    result.push(p);
+  }
+
+  while ((p = result.pop())) {
+    uc  = static_cast<UnixUDPConnection *>(p->getConnection());
+    ph  = static_cast<QUICPacketHandler *>(uc->continuation);
+    vc  = static_cast<QUICNetVConnection *>(p->data.ptr);
+    buf = (uint8_t *)p->getIOBlockChain()->buf();
+    cid = QUICPacket::connection_id(buf);
+    if (buf[0] & 0x80) { // Long Header Packet with Connection ID, has a valid type value.
+      ptype = buf[0] & 0x7f;
+      if (ptype == QUICPacketType::INITIAL) { // Initial Packet
+        // NOTE(review): vc is dereferenced unchecked here, unlike the branches below;
+        // confirm the sender always attaches a QUICNetVC to Initial packets.
+        vc->read.triggered = 1;
+        vc->push_packet(p);
+        // reschedule the vc and callback vc->acceptEvent
+        this_ethread()->schedule_imm(vc);
+      } else if (ptype == QUICPacketType::ZERO_RTT_PROTECTED) { // 0-RTT Packet
+        // TODO:
+      } else if (ptype == QUICPacketType::HANDSHAKE) { // Handshake Packet
+        if (vc) {
+          vc->read.triggered = 1;
+          vc->push_packet(p);
+        } else {
+          longInQueue.push(p);
+        }
+      } else {
+        ink_assert(!"not reached!");
+      }
+    } else if (buf[0] & 0x40) { // Short Header Packet with Connection ID, has a valid type value.
+      if (vc) {
+        vc->read.triggered = 1;
+        vc->push_packet(p);
+      } else {
+        shortInQueue.push(p);
+      }
+    } else {
+      ink_assert(!"not reached!");
+    }
+
+    // Push QUICNetVC into nethandler's enabled list; the atomic swap makes sure
+    // it is only pushed while it is not already flagged as being in the list.
+    if (vc != nullptr) {
+      int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1);
+      if (!isin) {
+        nh->read_enable_list.push(vc);
+      }
+    }
+  }
+
+  return EVENT_CONT;
+}
+
+// Construct the given thread's QUICPollCont in place, bound to that thread's
+// mutex and NetHandler, and register it with schedule_every on the same thread.
+void
+initialize_thread_for_quic_net(EThread *thread)
+{
+  QUICPollCont *poll_cont = get_QUICPollCont(thread);
+  NetHandler *handler     = get_NetHandler(thread);
+
+  // Placement-new into the storage returned by get_QUICPollCont().
+  new ((ink_dummy_for_new *)poll_cont) QUICPollCont(thread->mutex, handler);
+
+  thread->schedule_every(poll_cont, -9);
+}
+
diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc
index 3967513..b0088c8 100644
--- a/iocore/net/QUICNetVConnection.cc
+++ b/iocore/net/QUICNetVConnection.cc
@@ -63,7 +63,7 @@ void
 QUICNetVConnection::init(QUICConnectionId original_cid, UDPConnection *udp_con, QUICPacketHandler *packet_handler,
                          QUICConnectionTable *ctable)
 {
-  SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake);
+  SET_HANDLER((NetVConnHandler)&QUICNetVConnection::acceptEvent);
   this->_packet_transmitter_mutex    = new_ProxyMutex();
   this->_frame_transmitter_mutex     = new_ProxyMutex();
   this->_udp_con                     = udp_con;
@@ -94,6 +94,51 @@ QUICNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader
 }
 
 int
+QUICNetVConnection::acceptEvent(int event, Event *e) // attach this VC to the thread's NetHandler, then deliver NET_EVENT_ACCEPT
+{
+  EThread *t    = (e == nullptr) ? this_ethread() : e->ethread; // without an Event, use the calling thread
+  NetHandler *h = get_NetHandler(t);
+
+  MUTEX_TRY_LOCK(lock, h->mutex, t);
+  if (!lock.is_locked()) { // NetHandler mutex not acquired: retry after net_retry_delay milliseconds
+    if (event == EVENT_NONE) {
+      t->schedule_in(this, HRTIME_MSECONDS(net_retry_delay));
+      return EVENT_DONE;
+    } else {
+      e->schedule_in(HRTIME_MSECONDS(net_retry_delay)); // NOTE(review): assumes e is non-null whenever event != EVENT_NONE -- confirm
+      return EVENT_CONT;
+    }
+  }
+
+  thread = t;
+
+  // Hand this NetVC to the NetHandler; when startIO() fails, free the VC and stop.
+  if (h->startIO(this) < 0) {
+    free(t);
+    return EVENT_DONE;
+  }
+
+  // From now on events go to the handshake handler instead of acceptEvent.
+  SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake);
+
+  // Send this netvc to InactivityCop.
+  nh->startCop(this); // NOTE(review): uses `nh`, not the local `h`; presumably a member set elsewhere -- confirm
+
+  if (inactivity_timeout_in) {
+    set_inactivity_timeout(inactivity_timeout_in);
+  } else {
+    set_inactivity_timeout(0);
+  }
+
+  if (active_timeout_in) {
+    set_active_timeout(active_timeout_in);
+  }
+
+  action_.continuation->handleEvent(NET_EVENT_ACCEPT, this); // notify the continuation stored in action_
+  return EVENT_DONE;
+}
+
+int
 QUICNetVConnection::startEvent(int /*event ATS_UNUSED */, Event *e)
 {
   return EVENT_DONE;
@@ -589,8 +634,7 @@ QUICNetVConnection::get_udp_con()
 void
 QUICNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
 {
-  ink_assert(false);
-
+  this->handleEvent(QUIC_EVENT_PACKET_READ_READY, nullptr);
   return;
 }
 
diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc
index e60b442..8ac76c0 100644
--- a/iocore/net/QUICPacketHandler.cc
+++ b/iocore/net/QUICPacketHandler.cc
@@ -124,6 +124,7 @@ QUICPacketHandlerIn::init_accept(EThread *t = nullptr)
 void
 QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
 {
+  EThread *eth;
   IOBufferBlock *block = udp_packet->getIOBlockChain();
 
   if (is_debug_tag_set("quic_sec")) {
@@ -158,6 +159,8 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
       return;
     }
 
+    eth = eventProcessor.assign_thread(ET_NET);
+
     // Create a new NetVConnection
     QUICConnectionId original_cid = this->_read_connection_id(block);
     QUICNetVConnection *vc        = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr));
@@ -165,29 +168,21 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
     vc->id = net_next_connection_number();
     vc->con.move(con);
     vc->submit_time = Thread::get_hrtime();
-    vc->mutex       = this->mutex;
+    vc->thread      = eth;
+    vc->mutex       = new_ProxyMutex();
     vc->action_     = *this->action_;
     vc->set_is_transparent(this->opt.f_inbound_transparent);
     vc->set_context(NET_VCONNECTION_IN);
-    vc->read.triggered = 1;
-    vc->start(this->_ssl_ctx);
     vc->options.ip_proto  = NetVCOptions::USE_UDP;
     vc->options.ip_family = udp_packet->from.sa.sa_family;
 
-    this->action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc);
     qc = vc;
   }
 
-  if (qc->is_closed()) {
-    this->_ctable.erase(qc->connection_id(), qc);
-    // FIXME QUICNetVConnection is NOT freed to prevent crashes. #2674
-    // QUICNetVConnections are going to be freed by QUICNetHandler
-    // vc->free(vc->thread);
-  } else {
-    qc->handle_received_packet(udp_packet);
-    // FIXME This cast is temporal. It'll be removed when we introduce QUICNetHandler.
-    eventProcessor.schedule_imm(static_cast<QUICNetVConnection *>(qc), ET_CALL, QUIC_EVENT_PACKET_READ_READY, nullptr);
-  }
+  // Push the packet into QUICPollCont
+  udp_packet->data.ptr = vc;
+  get_QUICPollCont(eth)->inQueue.push(udp_packet);
+
 }
 
 // TODO: Should be called via eventProcessor?

-- 
To stop receiving notification emails like this one, please contact
scw00@apache.org.

Mime
View raw message