Make Service communicate via empty-line-delimited Dictionary objects instead of the old size prefix way.

This commit is contained in:
Adam Ierymenko 2014-05-08 21:27:59 +00:00
parent 98f0418fb9
commit 99c5fae9da
4 changed files with 64 additions and 79 deletions

View file

@ -43,6 +43,7 @@
#include <sys/select.h>
#include <sys/wait.h>
#include "Constants.hpp"
#include "Service.hpp"
#include "RuntimeEnvironment.hpp"
#include "Utils.hpp"
@ -91,32 +92,19 @@ bool Service::send(const Dictionary &msg)
{
if (_childStdin <= 0)
return false;
std::string mser = msg.toString();
if (mser.length() > ZT_SERVICE_MAX_MESSAGE_SIZE)
return false;
// This can technically block. We'll fix this if it ends up being a
// problem.
uint32_t len = Utils::hton((uint32_t)mser.length());
if (write(_childStdin,&len,4) != 4)
return false;
if ((int)write(_childStdin,mser.data(),mser.length()) != (int)mser.length())
return false;
return true;
std::string mser(msg.toString());
mser.append(ZT_EOL_S);
return ((long)::write(_childStdin,mser.data(),mser.length()) == (long)mser.length());
}
void Service::threadMain()
throw()
{
char buf[131072];
char buf[16384];
fd_set readfds,writefds,exceptfds;
struct timeval tv;
std::string stderrBuf;
std::string stdoutBuf;
unsigned int stdoutExpecting = 0;
int eolsInARow = 0;
std::string stderrBuf,stdoutBuf;
while (_run) {
if (_pid <= 0) {
@ -184,18 +172,18 @@ void Service::threadMain()
tv.tv_sec = 1;
tv.tv_usec = 0;
select(std::max(_childStdout,_childStderr)+1,&readfds,&writefds,&exceptfds,&tv);
::select(std::max(_childStdout,_childStderr)+1,&readfds,&writefds,&exceptfds,&tv);
if (!_run) {
if (_childStdin > 0) close(_childStdin);
if (_childStdin > 0) ::close(_childStdin);
_childStdin = 0;
if (_childStdout > 0) close(_childStdout);
if (_childStderr > 0) close(_childStderr);
if (_childStdout > 0) ::close(_childStdout);
if (_childStderr > 0) ::close(_childStderr);
return;
}
if ((_childStderr > 0)&&(FD_ISSET(_childStderr,&readfds))) {
int n = (int)read(_childStderr,buf,sizeof(buf));
int n = (int)::read(_childStderr,buf,sizeof(buf));
for(int i=0;i<n;++i) {
if ((buf[i] == '\r')||(buf[i] == '\n')) {
stderrBuf = Utils::trim(stderrBuf);
@ -207,29 +195,20 @@ void Service::threadMain()
}
if ((_childStdout > 0)&&(FD_ISSET(_childStdout,&readfds))) {
int n = (int)read(_childStdout,buf,sizeof(buf));
int n = (int)::read(_childStdout,buf,sizeof(buf));
for(int i=0;i<n;++i) {
stdoutBuf.push_back(buf[i]);
if (stdoutExpecting) {
if (stdoutBuf.length() == stdoutExpecting) {
try {
_handler(_arg,*this,Dictionary(stdoutBuf));
} catch ( ... ) {
LOG("unexpected exception handling message from service %s",_name.c_str());
}
if ((buf[i] == '\n')||(buf[i] == '\r')) {
if (buf[i] == '\n')
++eolsInARow;
} else eolsInARow = 0;
if (eolsInARow >= 2) {
// Two CRs in a row ends a message
try {
_handler(_arg,*this,Dictionary(stdoutBuf));
stdoutBuf = "";
stdoutExpecting = 0;
}
} else if (stdoutBuf.length() == 4) {
stdoutExpecting = Utils::ntoh(*((const uint32_t *)stdoutBuf.data()));
stdoutBuf = "";
if (stdoutExpecting > ZT_SERVICE_MAX_MESSAGE_SIZE) {
LOG("message size overrun from service %s: %u bytes -- restarting service",_name.c_str(),stdoutExpecting);
stdoutExpecting = 0;
kill(_pid,SIGKILL);
break;
}
}
} catch ( ... ) {} // handlers should not throw
} else stdoutBuf.push_back(buf[i]);
}
}
}