Switch to local.conf-based config of multithreading

This commit is contained in:
Joseph Henry 2024-08-20 13:39:15 -07:00
parent 8283a6d6d4
commit b1a30ae4ff
No known key found for this signature in database
GPG key ID: C45B33FF5EBC9344
6 changed files with 177 additions and 183 deletions

View file

@ -798,12 +798,13 @@ public:
bool _serverThreadRunning;
bool _serverThreadRunningV6;
unsigned int _rxThreadCount;
BlockingQueue<PacketRecord *> _rxPacketQueue;
std::vector<PacketRecord *> _rxPacketVector;
std::vector<std::thread> _rxPacketThreads;
Mutex _rxPacketVector_m,_rxPacketThreads_m;
bool _enableMulticore;
bool _multicoreEnabled;
bool _cpuPinningEnabled;
unsigned int _concurrency;
bool _allowTcpFallbackRelay;
bool _forceTcpRelay;
@ -938,89 +939,6 @@ public:
_ports[1] = 0;
_ports[2] = 0;
_enableMulticore = false;
char* multicoreVar = std::getenv("ZT_ENABLE_MULTICORE");
if (multicoreVar) {
int tmp = atoi(multicoreVar);
if (tmp > 0) {
_enableMulticore = true;
}
}
if (_enableMulticore) {
bool _enablePinning = false;
char* pinningVar = std::getenv("ZT_CORE_PINNING");
if (pinningVar) {
int tmp = atoi(pinningVar);
if (tmp > 0) {
_enablePinning = true;
}
}
char* concurrencyVar = std::getenv("ZT_CONCURRENCY");
if (concurrencyVar) {
int tmp = atoi(concurrencyVar);
if (tmp > 0) {
_rxThreadCount = tmp;
}
else {
_rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1;
}
}
else {
_rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1;
}
fprintf(stderr, "using %d rx threads\n", _rxThreadCount);
for (unsigned int i = 0; i < _rxThreadCount; ++i) {
_rxPacketThreads.push_back(std::thread([this, i, _enablePinning]() {
if (_enablePinning) {
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
int pinCore = i % _rxThreadCount;
fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore);
pthread_t self = pthread_self();
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(pinCore, &cpuset);
#endif
#ifdef __LINUX__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#elif __FreeBSD__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#endif
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
if (rc != 0)
{
fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno));
exit(1);
}
#endif
}
PacketRecord* packet = nullptr;
for (;;) {
if (! _rxPacketQueue.get(packet)) {
break;
}
if (! packet) {
break;
}
const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline);
{
Mutex::Lock l(_rxPacketVector_m);
_rxPacketVector.push_back(packet);
}
if (ZT_ResultCode_isFatal(err)) {
char tmp[256];
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
break;
}
}
}));
}
}
prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5));
prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom");
@ -1071,6 +989,64 @@ public:
delete _rc;
}
void setUpMultithreading()
{
_node->initMultithreading(true, _concurrency, _cpuPinningEnabled);
bool pinning = _cpuPinningEnabled;
fprintf(stderr, "Starting %d RX threads\n", _concurrency);
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxPacketThreads.push_back(std::thread([this, i, pinning]() {
if (pinning) {
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
int pinCore = i % _concurrency;
fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore);
pthread_t self = pthread_self();
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(pinCore, &cpuset);
#endif
#ifdef __LINUX__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#elif __FreeBSD__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#endif
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
if (rc != 0)
{
fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno));
exit(1);
}
#endif
}
PacketRecord* packet = nullptr;
for (;;) {
if (! _rxPacketQueue.get(packet)) {
break;
}
if (! packet) {
break;
}
const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline);
{
Mutex::Lock l(_rxPacketVector_m);
_rxPacketVector.push_back(packet);
}
if (ZT_ResultCode_isFatal(err)) {
char tmp[256];
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
break;
}
}
}));
}
}
virtual ReasonForTermination run()
{
try {
@ -2672,7 +2648,18 @@ public:
fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S);
}
_portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true);
_node->setLowBandwidthMode(OSUtils::jsonBool(settings["lowBandwidthMode"],false));
_multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false);
_concurrency = OSUtils::jsonInt(settings["concurrency"],0);
_cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false);
if (_multicoreEnabled) {
unsigned int maxConcurrency = std::thread::hardware_concurrency();
if (_concurrency <= 1 || _concurrency >= maxConcurrency) {
unsigned int conservativeDefault = (std::thread::hardware_concurrency() >= 4 ? 2 : 1);
fprintf(stderr, "Concurrency level provided (%d) is invalid, assigning conservative default value of (%d)\n", _concurrency, conservativeDefault);
_concurrency = conservativeDefault;
}
setUpMultithreading();
}
#ifndef ZT_SDK
const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT));
@ -3001,7 +2988,7 @@ public:
_lastDirectReceiveFromGlobal = now;
}
if (_enableMulticore) {
if (_multicoreEnabled) {
PacketRecord* packet;
_rxPacketVector_m.lock();
if (_rxPacketVector.empty()) {
@ -3018,7 +3005,7 @@ public:
memcpy(&(packet->from), from, sizeof(struct sockaddr_storage));
packet->size = (unsigned int)len;
memcpy(packet->data, data, len);
_rxPacketQueue.postLimit(packet, 256 * _rxThreadCount);
_rxPacketQueue.postLimit(packet, 256 * _concurrency);
}
else {
const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);