Add multi-core concurrent packet processing
This commit is contained in:
parent
ac6d532651
commit
683d332abc
12 changed files with 400 additions and 190 deletions
|
@ -16,7 +16,6 @@
|
|||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <vector>
|
||||
|
@ -26,6 +25,11 @@
|
|||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
#ifdef __FreeBSD__
|
||||
#include <sched.h>
|
||||
#include <pthread_np.h>
|
||||
#endif
|
||||
|
||||
#include "../version.h"
|
||||
#include "../include/ZeroTierOne.h"
|
||||
|
||||
|
@ -758,7 +762,7 @@ struct TcpConnection
|
|||
Mutex writeq_m;
|
||||
};
|
||||
|
||||
struct OneServiceIncomingPacket
|
||||
struct PacketRecord
|
||||
{
|
||||
uint64_t now;
|
||||
int64_t sock;
|
||||
|
@ -785,14 +789,20 @@ public:
|
|||
SoftwareUpdater *_updater;
|
||||
bool _updateAutoApply;
|
||||
|
||||
httplib::Server _controlPlane;
|
||||
httplib::Server _controlPlane;
|
||||
httplib::Server _controlPlaneV6;
|
||||
std::thread _serverThread;
|
||||
std::thread _serverThread;
|
||||
std::thread _serverThreadV6;
|
||||
bool _serverThreadRunning;
|
||||
bool _serverThreadRunningV6;
|
||||
|
||||
bool _allowTcpFallbackRelay;
|
||||
unsigned int _rxThreadCount;
|
||||
BlockingQueue<PacketRecord *> _rxPacketQueue;
|
||||
std::vector<PacketRecord *> _rxPacketVector;
|
||||
std::vector<std::thread> _rxPacketThreads;
|
||||
Mutex _rxPacketVector_m,_rxPacketThreads_m;
|
||||
|
||||
bool _allowTcpFallbackRelay;
|
||||
bool _forceTcpRelay;
|
||||
bool _allowSecondaryPort;
|
||||
|
||||
|
@ -842,8 +852,6 @@ public:
|
|||
// Deadline for the next background task service function
|
||||
volatile int64_t _nextBackgroundTaskDeadline;
|
||||
|
||||
|
||||
|
||||
std::map<uint64_t,NetworkState> _nets;
|
||||
Mutex _nets_m;
|
||||
|
||||
|
@ -890,9 +898,9 @@ public:
|
|||
,_node((Node *)0)
|
||||
,_updater((SoftwareUpdater *)0)
|
||||
,_updateAutoApply(false)
|
||||
,_controlPlane()
|
||||
,_controlPlane()
|
||||
,_controlPlaneV6()
|
||||
,_serverThread()
|
||||
,_serverThread()
|
||||
,_serverThreadV6()
|
||||
,_serverThreadRunning(false)
|
||||
,_serverThreadRunningV6(false)
|
||||
|
@ -926,9 +934,79 @@ public:
|
|||
_ports[1] = 0;
|
||||
_ports[2] = 0;
|
||||
|
||||
prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
|
||||
prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5));
|
||||
prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom");
|
||||
bool _enablePinning = false;
|
||||
char* pinningVar = std::getenv("ZT_CPU_PINNING");
|
||||
if (pinningVar) {
|
||||
int tmp = atoi(pinningVar);
|
||||
if (tmp > 0) {
|
||||
_enablePinning = true;
|
||||
}
|
||||
}
|
||||
char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY");
|
||||
if (concurrencyVar) {
|
||||
int tmp = atoi(concurrencyVar);
|
||||
if (tmp > 0) {
|
||||
_rxThreadCount = tmp;
|
||||
}
|
||||
else {
|
||||
_rxThreadCount = std::thread::hardware_concurrency();
|
||||
}
|
||||
}
|
||||
else {
|
||||
_rxThreadCount = std::thread::hardware_concurrency();
|
||||
}
|
||||
for (unsigned int i = 0; i < _rxThreadCount; ++i) {
|
||||
_rxPacketThreads.push_back(std::thread([this, i]() {
|
||||
|
||||
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
|
||||
int pinCore = i % _rxThreadCount;
|
||||
fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore);
|
||||
pthread_t self = pthread_self();
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(pinCore, &cpuset);
|
||||
#endif
|
||||
#ifdef __LINUX__
|
||||
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
|
||||
#elif __FreeBSD__
|
||||
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
|
||||
#endif
|
||||
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
|
||||
if (rc != 0)
|
||||
{
|
||||
fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
#endif
|
||||
PacketRecord* packet = nullptr;
|
||||
for (;;) {
|
||||
if (! _rxPacketQueue.get(packet)) {
|
||||
break;
|
||||
}
|
||||
if (! packet) {
|
||||
break;
|
||||
}
|
||||
const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline);
|
||||
{
|
||||
Mutex::Lock l(_rxPacketVector_m);
|
||||
_rxPacketVector.push_back(packet);
|
||||
}
|
||||
if (ZT_ResultCode_isFatal(err)) {
|
||||
char tmp[256];
|
||||
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
|
||||
Mutex::Lock _l(_termReason_m);
|
||||
_termReason = ONE_UNRECOVERABLE_ERROR;
|
||||
_fatalErrorMessage = tmp;
|
||||
this->terminate();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
|
||||
prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5));
|
||||
prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom");
|
||||
|
||||
#if ZT_VAULT_SUPPORT
|
||||
curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||
|
@ -940,20 +1018,34 @@ public:
|
|||
#ifdef __WINDOWS__
|
||||
WinFWHelper::removeICMPRules();
|
||||
#endif
|
||||
|
||||
_rxPacketQueue.stop();
|
||||
_rxPacketThreads_m.lock();
|
||||
for(auto t=_rxPacketThreads.begin();t!=_rxPacketThreads.end();++t) {
|
||||
t->join();
|
||||
}
|
||||
_rxPacketThreads_m.unlock();
|
||||
_binder.closeAll(_phy);
|
||||
|
||||
#if ZT_VAULT_SUPPORT
|
||||
curl_global_cleanup();
|
||||
#endif
|
||||
|
||||
_controlPlane.stop();
|
||||
_controlPlane.stop();
|
||||
if (_serverThreadRunning) {
|
||||
_serverThread.join();
|
||||
_serverThread.join();
|
||||
}
|
||||
_controlPlaneV6.stop();
|
||||
if (_serverThreadRunningV6) {
|
||||
_serverThreadV6.join();
|
||||
}
|
||||
_rxPacketVector_m.lock();
|
||||
while (!_rxPacketVector.empty()) {
|
||||
delete _rxPacketVector.back();
|
||||
_rxPacketVector.pop_back();
|
||||
}
|
||||
_rxPacketVector_m.unlock();
|
||||
|
||||
|
||||
#ifdef ZT_USE_MINIUPNPC
|
||||
delete _portMapper;
|
||||
|
@ -1270,6 +1362,9 @@ public:
|
|||
const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 500;
|
||||
clockShouldBe = now + (int64_t)delay;
|
||||
_phy.poll(delay);
|
||||
|
||||
|
||||
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
Mutex::Lock _l(_termReason_m);
|
||||
|
@ -2756,25 +2851,37 @@ public:
|
|||
// Handlers for Node and Phy<> callbacks
|
||||
// =========================================================================
|
||||
|
||||
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
|
||||
|
||||
|
||||
|
||||
inline void phyOnDatagram(PhySocket* sock, void** uptr, const struct sockaddr* localAddr, const struct sockaddr* from, void* data, unsigned long len)
|
||||
{
|
||||
if (_forceTcpRelay) {
|
||||
return;
|
||||
}
|
||||
Metrics::udp_recv += len;
|
||||
Metrics::udp_recv += len;
|
||||
const uint64_t now = OSUtils::now();
|
||||
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
|
||||
if ((len >= 16) && (reinterpret_cast<const InetAddress*>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
|
||||
_lastDirectReceiveFromGlobal = now;
|
||||
}
|
||||
const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
|
||||
if (ZT_ResultCode_isFatal(rc)) {
|
||||
char tmp[256];
|
||||
OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
|
||||
Mutex::Lock _l(_termReason_m);
|
||||
_termReason = ONE_UNRECOVERABLE_ERROR;
|
||||
_fatalErrorMessage = tmp;
|
||||
this->terminate();
|
||||
}
|
||||
|
||||
PacketRecord* packet;
|
||||
_rxPacketVector_m.lock();
|
||||
if (_rxPacketVector.empty()) {
|
||||
packet = new PacketRecord;
|
||||
}
|
||||
else {
|
||||
packet = _rxPacketVector.back();
|
||||
_rxPacketVector.pop_back();
|
||||
}
|
||||
_rxPacketVector_m.unlock();
|
||||
|
||||
packet->sock = reinterpret_cast<int64_t>(sock);
|
||||
packet->now = now;
|
||||
memcpy(&(packet->from), from, sizeof(struct sockaddr_storage));
|
||||
packet->size = (unsigned int)len;
|
||||
memcpy(packet->data, data, len);
|
||||
_rxPacketQueue.postLimit(packet, 256 * _rxThreadCount);
|
||||
}
|
||||
|
||||
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
|
||||
|
@ -2996,6 +3103,7 @@ public:
|
|||
n.setTap(EthernetTap::newInstance(
|
||||
nullptr,
|
||||
_homePath.c_str(),
|
||||
_rxThreadCount,
|
||||
MAC(nwc->mac),
|
||||
nwc->mtu,
|
||||
(unsigned int)ZT_IF_METRIC,
|
||||
|
@ -3509,8 +3617,9 @@ public:
|
|||
inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
|
||||
{
|
||||
NetworkState *n = reinterpret_cast<NetworkState *>(*nuptr);
|
||||
if ((!n)||(!n->tap()))
|
||||
if ((!n)||(!n->tap())) {
|
||||
return;
|
||||
}
|
||||
n->tap()->put(MAC(sourceMac),MAC(destMac),etherType,data,len);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue