Cluster work -- integrating with the rest of the code.

This commit is contained in:
Adam Ierymenko 2015-10-20 15:27:53 -07:00
parent 5e6eae620b
commit 57e29857cf
9 changed files with 538 additions and 107 deletions

View file

@ -46,6 +46,7 @@
#include "Address.hpp"
#include "Identity.hpp"
#include "SelfAwareness.hpp"
#include "Cluster.hpp"
const struct sockaddr_storage ZT_SOCKADDR_NULL = {0};
@ -135,6 +136,9 @@ Node::~Node()
delete RR->antiRec;
delete RR->mc;
delete RR->sw;
#ifdef ZT_ENABLE_CLUSTER
delete RR->cluster;
#endif
}
ZT_ResultCode Node::processWirePacket(
@ -329,7 +333,18 @@ ZT_ResultCode Node::processBackgroundTasks(uint64_t now,volatile uint64_t *nextB
}
try {
*nextBackgroundTaskDeadline = now + (uint64_t)std::max(std::min(timeUntilNextPingCheck,RR->sw->doTimerTasks(now)),(unsigned long)ZT_CORE_TIMER_TASK_GRANULARITY);
#ifdef ZT_ENABLE_CLUSTER
// If clustering is enabled we have to call cluster->doPeriodicTasks() very often, so we override normal timer deadline behavior
if (RR->cluster) {
RR->sw->doTimerTasks(now);
RR->cluster->doPeriodicTasks();
*nextBackgroundTaskDeadline = now + ZT_CLUSTER_PERIODIC_TASK_PERIOD; // this is really short so just tick at this rate
} else {
#endif
*nextBackgroundTaskDeadline = now + (uint64_t)std::max(std::min(timeUntilNextPingCheck,RR->sw->doTimerTasks(now)),(unsigned long)ZT_CORE_TIMER_TASK_GRANULARITY);
#ifdef ZT_ENABLE_CLUSTER
}
#endif
} catch ( ... ) {
return ZT_RESULT_FATAL_ERROR_INTERNAL;
}
@ -554,6 +569,62 @@ void Node::circuitTestEnd(ZT_CircuitTest *test)
}
}
ZT_ResultCode Node::clusterInit(
unsigned int myId,
const struct sockaddr_storage *zeroTierPhysicalEndpoints,
unsigned int numZeroTierPhysicalEndpoints,
int x,
int y,
int z,
void (*sendFunction)(void *,unsigned int,const void *,unsigned int),
void *sendFunctionArg,
int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
void *addressToLocationFunctionArg)
{
#ifdef ZT_ENABLE_CLUSTER
if (RR->cluster)
return ZT_RESULT_ERROR_BAD_PARAMETER;
std::vector<InetAddress> eps;
for(unsigned int i=0;i<numZeroTierPhysicalEndpoints;++i)
eps.push_back(InetAddress(zeroTierPhysicalEndpoints[i]));
std::sort(eps.begin(),eps.end());
RR->cluster = new Cluster(RR,myId,eps,x,y,z,sendFunction,sendFunctionArg,addressToLocationFunction,addressToLocationFunctionArg);
return ZT_RESULT_OK;
#else
return ZT_RESULT_ERROR_UNSUPPORTED_OPERATION;
#endif
}
ZT_ResultCode Node::clusterAddMember(unsigned int memberId)
{
#ifdef ZT_ENABLE_CLUSTER
if (!RR->cluster)
return ZT_RESULT_ERROR_BAD_PARAMETER;
RR->cluster->addMember((uint16_t)memberId);
return ZT_RESULT_OK;
#else
return ZT_RESULT_ERROR_UNSUPPORTED_OPERATION;
#endif
}
void Node::clusterRemoveMember(unsigned int memberId)
{
#ifdef ZT_ENABLE_CLUSTER
if (RR->cluster)
RR->cluster->removeMember((uint16_t)memberId);
#endif
}
void Node::clusterHandleIncomingMessage(const void *msg,unsigned int len)
{
#ifdef ZT_ENABLE_CLUSTER
if (RR->cluster)
RR->cluster->handleIncomingStateMessage(msg,len);
#endif
}
/****************************************************************************/
/* Node methods used only within node/ */
/****************************************************************************/
@ -806,6 +877,22 @@ void ZT_Node_freeQueryResult(ZT_Node *node,void *qr)
} catch ( ... ) {}
}
int ZT_Node_addLocalInterfaceAddress(ZT_Node *node,const struct sockaddr_storage *addr,int metric, enum ZT_LocalInterfaceAddressTrust trust)
{
try {
return reinterpret_cast<ZeroTier::Node *>(node)->addLocalInterfaceAddress(addr,metric,trust);
} catch ( ... ) {
return 0;
}
}
void ZT_Node_clearLocalInterfaceAddresses(ZT_Node *node)
{
try {
reinterpret_cast<ZeroTier::Node *>(node)->clearLocalInterfaceAddresses();
} catch ( ... ) {}
}
void ZT_Node_setNetconfMaster(ZT_Node *node,void *networkControllerInstance)
{
try {
@ -829,19 +916,75 @@ void ZT_Node_circuitTestEnd(ZT_Node *node,ZT_CircuitTest *test)
} catch ( ... ) {}
}
int ZT_Node_addLocalInterfaceAddress(ZT_Node *node,const struct sockaddr_storage *addr,int metric, enum ZT_LocalInterfaceAddressTrust trust)
enum ZT_ResultCode ZT_Node_clusterInit(
ZT_Node *node,
unsigned int myId,
const struct sockaddr_storage *zeroTierPhysicalEndpoints,
unsigned int numZeroTierPhysicalEndpoints,
int x,
int y,
int z,
void (*sendFunction)(void *,unsigned int,const void *,unsigned int),
void *sendFunctionArg,
int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
void *addressToLocationFunctionArg)
{
try {
return reinterpret_cast<ZeroTier::Node *>(node)->addLocalInterfaceAddress(addr,metric,trust);
return reinterpret_cast<ZeroTier::Node *>(node)->clusterInit(myId,zeroTierPhysicalEndpoints,numZeroTierPhysicalEndpoints,x,y,z,sendFunction,sendFunctionArg,addressToLocationFunction,addressToLocationFunctionArg);
} catch ( ... ) {
return 0;
return ZT_RESULT_FATAL_ERROR_INTERNAL;
}
}
void ZT_Node_clearLocalInterfaceAddresses(ZT_Node *node)
/**
* Add a member to this cluster
*
* Calling this without having called clusterInit() will do nothing.
*
* @param node Node instance
* @param memberId Member ID (must be less than or equal to ZT_CLUSTER_MAX_MEMBERS)
* @return OK or error if clustering is disabled, ID invalid, etc.
*/
enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId)
{
try {
reinterpret_cast<ZeroTier::Node *>(node)->clearLocalInterfaceAddresses();
return reinterpret_cast<ZeroTier::Node *>(node)->clusterAddMember(memberId);
} catch ( ... ) {
return ZT_RESULT_FATAL_ERROR_INTERNAL;
}
}
/**
* Remove a member from this cluster
*
* Calling this without having called clusterInit() will do nothing.
*
* @param node Node instance
* @param memberId Member ID to remove (nothing happens if not present)
*/
void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId)
{
try {
reinterpret_cast<ZeroTier::Node *>(node)->clusterRemoveMember(memberId);
} catch ( ... ) {}
}
/**
* Handle an incoming cluster state message
*
* The message itself contains cluster member IDs, and invalid or badly
* addressed messages will be silently discarded.
*
* Calling this without having called clusterInit() will do nothing.
*
* @param node Node instance
* @param msg Cluster message
* @param len Length of cluster message
*/
void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len)
{
try {
reinterpret_cast<ZeroTier::Node *>(node)->clusterHandleIncomingMessage(msg,len);
} catch ( ... ) {}
}