rip out rabbitMQ
This commit is contained in:
parent
acb4ef0f12
commit
73b1d57b13
18 changed files with 23 additions and 7813 deletions
|
@ -456,15 +456,14 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
|
|||
|
||||
} // anonymous namespace
|
||||
|
||||
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *ztPath,const char *dbPath, int listenPort, MQConfig *mqc) :
|
||||
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *ztPath,const char *dbPath, int listenPort) :
|
||||
_startTime(OSUtils::now()),
|
||||
_listenPort(listenPort),
|
||||
_node(node),
|
||||
_ztPath(ztPath),
|
||||
_path(dbPath),
|
||||
_sender((NetworkController::Sender *)0),
|
||||
_db(this),
|
||||
_mqc(mqc)
|
||||
_db(this)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -485,7 +484,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
|
|||
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) {
|
||||
_db.addDB(std::shared_ptr<DB>(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc)));
|
||||
_db.addDB(std::shared_ptr<DB>(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort)));
|
||||
} else {
|
||||
#endif
|
||||
_db.addDB(std::shared_ptr<DB>(new FileDB(_path.c_str())));
|
||||
|
|
|
@ -44,8 +44,6 @@ namespace ZeroTier {
|
|||
|
||||
class Node;
|
||||
|
||||
struct MQConfig;
|
||||
|
||||
class EmbeddedNetworkController : public NetworkController,public DB::ChangeListener
|
||||
{
|
||||
public:
|
||||
|
@ -53,7 +51,7 @@ public:
|
|||
* @param node Parent node
|
||||
* @param dbPath Database path (file path or database credentials)
|
||||
*/
|
||||
EmbeddedNetworkController(Node *node,const char *ztPath,const char *dbPath, int listenPort, MQConfig *mqc = NULL);
|
||||
EmbeddedNetworkController(Node *node,const char *ztPath,const char *dbPath, int listenPort);
|
||||
virtual ~EmbeddedNetworkController();
|
||||
|
||||
virtual void init(const Identity &signingId,Sender *sender);
|
||||
|
@ -150,8 +148,6 @@ private:
|
|||
|
||||
std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
|
||||
std::mutex _memberStatus_l;
|
||||
|
||||
MQConfig *_mqc;
|
||||
};
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
|
|
@ -17,13 +17,12 @@
|
|||
|
||||
#include "../node/Constants.hpp"
|
||||
#include "EmbeddedNetworkController.hpp"
|
||||
#include "RabbitMQ.hpp"
|
||||
#include "../version.h"
|
||||
#include "hiredis.h"
|
||||
|
||||
#include <libpq-fe.h>
|
||||
#include <sstream>
|
||||
#include <amqp.h>
|
||||
#include <amqp_tcp_socket.h>
|
||||
|
||||
|
||||
using json = nlohmann::json;
|
||||
|
||||
|
@ -69,7 +68,7 @@ std::string join(const std::vector<std::string> &elements, const char * const se
|
|||
|
||||
using namespace ZeroTier;
|
||||
|
||||
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
|
||||
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort)
|
||||
: DB()
|
||||
, _myId(myId)
|
||||
, _myAddress(myId.address())
|
||||
|
@ -78,7 +77,6 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M
|
|||
, _run(1)
|
||||
, _waitNoticePrinted(false)
|
||||
, _listenPort(listenPort)
|
||||
, _mqc(mqc)
|
||||
{
|
||||
char myAddress[64];
|
||||
_myAddressStr = myId.address().toString(myAddress);
|
||||
|
@ -593,7 +591,7 @@ void PostgreSQL::heartbeat()
|
|||
std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
|
||||
std::string now = std::to_string(OSUtils::now());
|
||||
std::string host_port = std::to_string(_listenPort);
|
||||
std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false";
|
||||
std::string use_rabbitmq = (false) ? "true" : "false";
|
||||
const char *values[10] = {
|
||||
controllerId,
|
||||
hostname,
|
||||
|
@ -645,10 +643,10 @@ void PostgreSQL::membersDbWatcher()
|
|||
|
||||
initializeMembers(conn);
|
||||
|
||||
if (this->_mqc != NULL) {
|
||||
PQfinish(conn);
|
||||
conn = NULL;
|
||||
_membersWatcher_RabbitMQ();
|
||||
if (false) {
|
||||
// PQfinish(conn);
|
||||
// conn = NULL;
|
||||
// _membersWatcher_RabbitMQ();
|
||||
} else {
|
||||
_membersWatcher_Postgres(conn);
|
||||
PQfinish(conn);
|
||||
|
@ -703,43 +701,6 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
|
|||
}
|
||||
}
|
||||
|
||||
void PostgreSQL::_membersWatcher_RabbitMQ() {
|
||||
char buf[11] = {0};
|
||||
std::string qname = "member_"+ std::string(_myAddress.toString(buf));
|
||||
RabbitMQ rmq(_mqc, qname.c_str());
|
||||
try {
|
||||
rmq.init();
|
||||
} catch (std::runtime_error &e) {
|
||||
fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
|
||||
exit(11);
|
||||
}
|
||||
while (_run == 1) {
|
||||
try {
|
||||
std::string msg = rmq.consume();
|
||||
// fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
|
||||
if (msg.empty()) {
|
||||
continue;
|
||||
}
|
||||
json tmp(json::parse(msg));
|
||||
json &ov = tmp["old_val"];
|
||||
json &nv = tmp["new_val"];
|
||||
json oldConfig, newConfig;
|
||||
if (ov.is_object()) oldConfig = ov;
|
||||
if (nv.is_object()) newConfig = nv;
|
||||
if (oldConfig.is_object() || newConfig.is_object()) {
|
||||
_memberChanged(oldConfig,newConfig,(this->_ready>=2));
|
||||
}
|
||||
} catch (std::runtime_error &e) {
|
||||
fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
|
||||
break;
|
||||
} catch(std::exception &e ) {
|
||||
fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
|
||||
} catch(...) {
|
||||
fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void PostgreSQL::_membersWatcher_Reids() {
|
||||
char buff[11] = {0};
|
||||
|
||||
|
@ -756,10 +717,10 @@ void PostgreSQL::networksDbWatcher()
|
|||
|
||||
initializeNetworks(conn);
|
||||
|
||||
if (this->_mqc != NULL) {
|
||||
PQfinish(conn);
|
||||
conn = NULL;
|
||||
_networksWatcher_RabbitMQ();
|
||||
if (false) {
|
||||
// PQfinish(conn);
|
||||
// conn = NULL;
|
||||
// _networksWatcher_RabbitMQ();
|
||||
} else {
|
||||
_networksWatcher_Postgres(conn);
|
||||
PQfinish(conn);
|
||||
|
@ -812,43 +773,6 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
|
|||
}
|
||||
}
|
||||
|
||||
void PostgreSQL::_networksWatcher_RabbitMQ() {
|
||||
char buf[11] = {0};
|
||||
std::string qname = "network_"+ std::string(_myAddress.toString(buf));
|
||||
RabbitMQ rmq(_mqc, qname.c_str());
|
||||
try {
|
||||
rmq.init();
|
||||
} catch (std::runtime_error &e) {
|
||||
fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
|
||||
exit(11);
|
||||
}
|
||||
while (_run == 1) {
|
||||
try {
|
||||
std::string msg = rmq.consume();
|
||||
if (msg.empty()) {
|
||||
continue;
|
||||
}
|
||||
// fprintf(stderr, "Got network update: %s\n", msg.c_str());
|
||||
json tmp(json::parse(msg));
|
||||
json &ov = tmp["old_val"];
|
||||
json &nv = tmp["new_val"];
|
||||
json oldConfig, newConfig;
|
||||
if (ov.is_object()) oldConfig = ov;
|
||||
if (nv.is_object()) newConfig = nv;
|
||||
if (oldConfig.is_object()||newConfig.is_object()) {
|
||||
_networkChanged(oldConfig,newConfig,(this->_ready >= 2));
|
||||
}
|
||||
} catch (std::runtime_error &e) {
|
||||
fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
|
||||
break;
|
||||
} catch (std::exception &e) {
|
||||
fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what());
|
||||
} catch(...) {
|
||||
fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void PostgreSQL::_networksWatcher_Redis() {
|
||||
|
||||
}
|
||||
|
|
|
@ -26,8 +26,6 @@ typedef struct pg_conn PGconn;
|
|||
|
||||
namespace ZeroTier {
|
||||
|
||||
struct MQConfig;
|
||||
|
||||
/**
|
||||
* A controller database driver that talks to PostgreSQL
|
||||
*
|
||||
|
@ -37,7 +35,7 @@ struct MQConfig;
|
|||
class PostgreSQL : public DB
|
||||
{
|
||||
public:
|
||||
PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
|
||||
PostgreSQL(const Identity &myId, const char *path, int listenPort);
|
||||
virtual ~PostgreSQL();
|
||||
|
||||
virtual bool waitForReady();
|
||||
|
@ -59,10 +57,8 @@ private:
|
|||
void heartbeat();
|
||||
void membersDbWatcher();
|
||||
void _membersWatcher_Postgres(PGconn *conn);
|
||||
void _membersWatcher_RabbitMQ();
|
||||
void networksDbWatcher();
|
||||
void _networksWatcher_Postgres(PGconn *conn);
|
||||
void _networksWatcher_RabbitMQ();
|
||||
|
||||
void _membersWatcher_Reids();
|
||||
void _networksWatcher_Redis();
|
||||
|
@ -98,8 +94,6 @@ private:
|
|||
mutable volatile bool _waitNoticePrinted;
|
||||
|
||||
int _listenPort;
|
||||
|
||||
MQConfig *_mqc;
|
||||
};
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
|
|
@ -1,120 +0,0 @@
|
|||
/*
|
||||
* Copyright (c)2019 ZeroTier, Inc.
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file in the project's root directory.
|
||||
*
|
||||
* Change Date: 2023-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2.0 of the Apache License.
|
||||
*/
|
||||
/****/
|
||||
|
||||
#include "RabbitMQ.hpp"
|
||||
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
|
||||
#include <amqp.h>
|
||||
#include <amqp_tcp_socket.h>
|
||||
#include <stdexcept>
|
||||
#include <cstring>
|
||||
|
||||
namespace ZeroTier
|
||||
{
|
||||
|
||||
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
|
||||
: _mqc(cfg)
|
||||
, _qName(queueName)
|
||||
, _socket(NULL)
|
||||
, _status(0)
|
||||
{
|
||||
}
|
||||
|
||||
RabbitMQ::~RabbitMQ()
|
||||
{
|
||||
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
|
||||
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
|
||||
amqp_destroy_connection(_conn);
|
||||
}
|
||||
|
||||
void RabbitMQ::init()
|
||||
{
|
||||
struct timeval tval;
|
||||
memset(&tval, 0, sizeof(struct timeval));
|
||||
tval.tv_sec = 5;
|
||||
|
||||
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
|
||||
_conn = amqp_new_connection();
|
||||
_socket = amqp_tcp_socket_new(_conn);
|
||||
if (!_socket) {
|
||||
throw std::runtime_error("Can't create socket for RabbitMQ");
|
||||
}
|
||||
|
||||
_status = amqp_socket_open_noblock(_socket, _mqc->host.c_str(), _mqc->port, &tval);
|
||||
if (_status) {
|
||||
throw std::runtime_error("Can't connect to RabbitMQ");
|
||||
}
|
||||
|
||||
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
|
||||
_mqc->username.c_str(), _mqc->password.c_str());
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("RabbitMQ Login Error");
|
||||
}
|
||||
|
||||
static int chan = 0;
|
||||
{
|
||||
Mutex::Lock l(_chan_m);
|
||||
_channel = ++chan;
|
||||
}
|
||||
amqp_channel_open(_conn, _channel);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error opening communication channel");
|
||||
}
|
||||
|
||||
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error declaring queue " + std::string(_qName));
|
||||
}
|
||||
|
||||
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error consuming queue " + std::string(_qName));
|
||||
}
|
||||
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
|
||||
}
|
||||
|
||||
std::string RabbitMQ::consume()
|
||||
{
|
||||
amqp_rpc_reply_t res;
|
||||
amqp_envelope_t envelope;
|
||||
amqp_maybe_release_buffers(_conn);
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 1;
|
||||
timeout.tv_usec = 0;
|
||||
|
||||
res = amqp_consume_message(_conn, &envelope, &timeout, 0);
|
||||
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
|
||||
// timeout waiting for message. Return empty string
|
||||
return "";
|
||||
} else {
|
||||
throw std::runtime_error("Error getting message");
|
||||
}
|
||||
}
|
||||
|
||||
std::string msg(
|
||||
(const char*)envelope.message.body.bytes,
|
||||
envelope.message.body.len
|
||||
);
|
||||
amqp_destroy_envelope(&envelope);
|
||||
return msg;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif // ZT_CONTROLLER_USE_LIBPQ
|
|
@ -1,69 +0,0 @@
|
|||
/*
|
||||
* Copyright (c)2019 ZeroTier, Inc.
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file in the project's root directory.
|
||||
*
|
||||
* Change Date: 2023-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2.0 of the Apache License.
|
||||
*/
|
||||
/****/
|
||||
|
||||
#ifndef ZT_CONTROLLER_RABBITMQ_HPP
|
||||
#define ZT_CONTROLLER_RABBITMQ_HPP
|
||||
|
||||
#include "DB.hpp"
|
||||
#include <string>
|
||||
|
||||
namespace ZeroTier
|
||||
{
|
||||
struct MQConfig {
|
||||
std::string host;
|
||||
int port;
|
||||
std::string username;
|
||||
std::string password;
|
||||
};
|
||||
}
|
||||
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
|
||||
#include "../node/Mutex.hpp"
|
||||
|
||||
#include <amqp.h>
|
||||
#include <amqp_tcp_socket.h>
|
||||
|
||||
|
||||
namespace ZeroTier
|
||||
{
|
||||
|
||||
class RabbitMQ {
|
||||
public:
|
||||
RabbitMQ(MQConfig *cfg, const char *queueName);
|
||||
~RabbitMQ();
|
||||
|
||||
void init();
|
||||
|
||||
std::string consume();
|
||||
|
||||
private:
|
||||
MQConfig *_mqc;
|
||||
const char *_qName;
|
||||
|
||||
amqp_socket_t *_socket;
|
||||
amqp_connection_state_t _conn;
|
||||
amqp_queue_declare_ok_t *_q;
|
||||
int _status;
|
||||
|
||||
int _channel;
|
||||
|
||||
Mutex _chan_m;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif // ZT_CONTROLLER_USE_LIBPQ
|
||||
|
||||
#endif // ZT_CONTROLLER_RABBITMQ_HPP
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue