diff --git a/event/event_system.h b/event/event_system.h index eb7cddd..d9ed427 100644 --- a/event/event_system.h +++ b/event/event_system.h @@ -35,6 +35,7 @@ enum StreamMode StreamModeAccept, StreamModeRead, StreamModeWrite, + StreamModeWait, StreamModeEnd }; diff --git a/event/io_service.cc b/event/io_service.cc index cbe1032..6a9fc69 100644 --- a/event/io_service.cc +++ b/event/io_service.cc @@ -19,7 +19,7 @@ IoService::IoService () : Thread ("IoService"), log_ ("/io/thread") { - handle_ = rfd_ = wfd_ = -1; + timeout_ = handle_ = rfd_ = wfd_ = -1; int fd[2]; if (::pipe (fd) == 0) @@ -54,7 +54,10 @@ void IoService::main () cancel (msg.action); } - poll (-1); + poll (timeout_); + + if (timeout_ > 0) + wakeup_readers (); } set_fd (rfd_, -1, 0); @@ -81,32 +84,40 @@ void IoService::handle_request (EventAction* act) schedule (act); else track (act); - return; + break; case StreamModeAccept: track (act); - return; + break; case StreamModeRead: if (read_channel (act->fd_, (act->callback_ ? act->callback_->param () : ev), 1)) schedule (act); else track (act); - return; + break; case StreamModeWrite: if (write_channel (act->fd_, (act->callback_ ? act->callback_->param () : ev))) schedule (act); else track (act); - return; + break; + + case StreamModeWait: + { + WaitNode node = {current_time () + act->fd_, act}; + wait_list_.insert (wait_list_.end (), node); + timeout_ = IO_POLL_TIMEOUT; + } + break; case StreamModeEnd: if (close_channel (act->fd_, (act->callback_ ? act->callback_->param () : ev))) schedule (act); else track (act); - return; + break; } } } @@ -125,6 +136,7 @@ bool IoService::connect_channel (int fd, Event& ev) case 0: ev.type_ = Event::Done; break; + case -1: switch (errno) { @@ -279,43 +291,65 @@ void IoService::track (EventAction* act) void IoService::cancel (EventAction* act) { std::map::iterator it; - int fd; + std::deque::iterator w; if (act) { - fd = act->fd_; - it = fd_map_.find (fd); - switch (act->mode_) { case StreamModeAccept: case StreamModeRead: + it = fd_map_.find (act->fd_); if (it != fd_map_.end () && it->second.read_action == act) { it->second.reading = false, it->second.read_action = 0; if (it->second.write_action == 0) fd_map_.erase (it); - set_fd (fd, -1, (it->second.writing ? 2 : 0), &it->second); + set_fd (act->fd_, -1, (it->second.writing ? 2 : 0), &it->second); } break; case StreamModeConnect: case StreamModeWrite: case StreamModeEnd: + it = fd_map_.find (act->fd_); if (it != fd_map_.end () && it->second.write_action == act) { it->second.writing = false, it->second.write_action = 0; if (it->second.read_action == 0) fd_map_.erase (it); - set_fd (fd, (it->second.reading ? 2 : 0), -1, &it->second); + set_fd (act->fd_, (it->second.reading ? 2 : 0), -1, &it->second); } break; + + case StreamModeWait: + for (w = wait_list_.begin (); w != wait_list_.end (); ++w) + { + if (w->action == act) + { + wait_list_.erase (w); + break; + } + } + if (wait_list_.empty ()) + timeout_ = -1; + break; } terminate (act); } } +void IoService::wakeup_readers () +{ + std::deque::iterator w; + long t = current_time (); + + for (w = wait_list_.begin (); w != wait_list_.end (); ++w) + if (w->limit > 0 && w->limit <= t) + schedule (w->action), w->limit = 0; +} + void IoService::schedule (EventAction* act) { EventMessage msg = {1, act}; diff --git a/event/io_service.h b/event/io_service.h index 8b38d45..45a61f5 100644 --- a/event/io_service.h +++ b/event/io_service.h @@ -12,7 +12,9 @@ #define EVENT_IO_SERVICE_H #include +#include #include +#include #include #include #include @@ -22,6 +24,7 @@ #define IO_READ_BUFFER_SIZE 0x10000 #define IO_POLL_EVENT_COUNT 512 +#define IO_POLL_TIMEOUT 150 struct IoNode { @@ -32,6 +35,12 @@ struct IoNode EventAction* write_action; }; +struct WaitNode +{ + long limit; + EventAction* action; +}; + class IoService : public Thread { private: @@ -39,7 +48,10 @@ private: RingBuffer gateway_; uint8_t read_pool_[IO_READ_BUFFER_SIZE]; std::map fd_map_; - int handle_, rfd_, wfd_; + std::deque wait_list_; + int timeout_; + int handle_; + int rfd_, wfd_; public: IoService (); @@ -56,6 +68,7 @@ private: bool close_channel (int fd, Event& ev); void track (EventAction* act); void cancel (EventAction* act); + void wakeup_readers (); void schedule (EventAction* act); void terminate (EventAction* act); @@ -68,6 +81,8 @@ public: bool idle () const { return fd_map_.empty (); } void wakeup () { ::write (wfd_, "*", 1); } void take_message (const EventMessage& msg) { gateway_.write (msg); wakeup (); } + long current_time () { struct timeval tv; gettimeofday (&tv, 0); + return ((tv.tv_sec & 0xFF) * 1000 + tv.tv_usec / 1000); } }; #endif /* !EVENT_IO_SERVICE_H */ diff --git a/programs/wanproxy/proxy_connector.cc b/programs/wanproxy/proxy_connector.cc index eed8154..ce76ced 100644 --- a/programs/wanproxy/proxy_connector.cc +++ b/programs/wanproxy/proxy_connector.cc @@ -48,7 +48,7 @@ ProxyConnector::ProxyConnector (const std::string& name, SocketAddressFamily family, const std::string& remote_name, bool cln, bool ssh) - : log_("/wanproxy/proxy/" + name + "/connector"), + : log_("/wanproxy/" + name + "/connector"), interface_codec_(interface_codec), remote_codec_(remote_codec), local_socket_(local_socket), @@ -151,7 +151,7 @@ bool ProxyConnector::build_chains (WANProxyCodec* cdc1, WANProxyCodec* cdc2, Soc { EncodeFilter* enc; DecodeFilter* dec; request_chain_.append ((dec = new DecodeFilter ("/wanproxy/" + cdc1->name_ + "/dec", cdc1->xcache_))); - response_chain_.prepend ((enc = new EncodeFilter ("/wanproxy/" + cdc1->name_ + "/enc", cdc1->xcache_))); + response_chain_.prepend ((enc = new EncodeFilter ("/wanproxy/" + cdc1->name_ + "/enc", cdc1->xcache_, 1))); dec->set_upstream (enc); } diff --git a/xcodec/xcodec_decoder.cc b/xcodec/xcodec_decoder.cc index 090b3a5..cc29f4d 100644 --- a/xcodec/xcodec_decoder.cc +++ b/xcodec/xcodec_decoder.cc @@ -72,8 +72,8 @@ XCodecDecoder::~XCodecDecoder() * instance UUID in the HELLO message and can tell which streams * share an originator. */ -bool -XCodecDecoder::decode (Buffer& output, Buffer& input, std::set& unknown_hashes) + +bool XCodecDecoder::decode (Buffer& output, Buffer& input, std::set& unknown_hashes) { uint8_t data[XCODEC_SEGMENT_LENGTH]; Buffer old; diff --git a/xcodec/xcodec_encoder.cc b/xcodec/xcodec_encoder.cc index 0502b86..fa4bc6a 100644 --- a/xcodec/xcodec_encoder.cc +++ b/xcodec/xcodec_encoder.cc @@ -29,7 +29,6 @@ #include #include #include -#include //////////////////////////////////////////////////////////////////////////////// // // @@ -41,17 +40,13 @@ // // //////////////////////////////////////////////////////////////////////////////// -struct candidate_symbol -{ - bool set_; - unsigned offset_; - uint64_t symbol_; -}; - XCodecEncoder::XCodecEncoder(XCodecCache *cache) : log_("/xcodec/encoder"), cache_(cache) -{ } +{ + candidate_start_ = -1; + candidate_symbol_ = 0; +} XCodecEncoder::~XCodecEncoder() { } @@ -61,14 +56,13 @@ XCodecEncoder::~XCodecEncoder() * to other data, declarations of data to be referenced, and data that needs * escaped. */ -void -XCodecEncoder::encode (Buffer& output, Buffer& input) + +void XCodecEncoder::encode (Buffer& output, Buffer& input, bool wait) { - XCodecHash xcodec_hash; - candidate_symbol candidate = {0, 0, 0}; - unsigned offset = 0; - unsigned o = 0; + int off = source_.length (); Buffer old; + + source_.append (input); for (Buffer::SegmentIterator it = input.segments (); ! it.end (); it.next ()) { @@ -80,14 +74,14 @@ XCodecEncoder::encode (Buffer& output, Buffer& input) /* * Add bytes to the hash until we have a complete hash. */ - if (++o < XCODEC_SEGMENT_LENGTH) - xcodec_hash.add (*p); + if (++off < XCODEC_SEGMENT_LENGTH) + xcodec_hash_.add (*p); else { - if (o == XCODEC_SEGMENT_LENGTH) - xcodec_hash.add (*p); + if (off == XCODEC_SEGMENT_LENGTH) + xcodec_hash_.add (*p); else - xcodec_hash.roll (*p); + xcodec_hash_.roll (*p); /* * And then mix the hash's internal state into a @@ -95,19 +89,18 @@ XCodecEncoder::encode (Buffer& output, Buffer& input) * and to look up possible past occurances of that * data in the XCodecCache. */ - uint64_t hash = xcodec_hash.mix (); + uint64_t hash = xcodec_hash_.mix (); /* * If there is a pending candidate hash that wouldn't * overlap with the data that the rolling hash presently * covers, declare it now. */ - if (candidate.set_ && candidate.offset_ + (XCODEC_SEGMENT_LENGTH * 2) <= offset + o) + if (candidate_start_ >= 0 && candidate_start_ + (XCODEC_SEGMENT_LENGTH * 2) <= off) { - encode_declaration (output, input, offset, candidate.offset_, candidate.symbol_); - o -= (candidate.offset_ + XCODEC_SEGMENT_LENGTH - offset); - offset = (candidate.offset_ + XCODEC_SEGMENT_LENGTH); - candidate.set_ = false; + encode_declaration (output, source_, candidate_start_, candidate_symbol_); + off -= (candidate_start_ + XCODEC_SEGMENT_LENGTH); + candidate_start_ = -1; } /* @@ -122,17 +115,16 @@ XCodecEncoder::encode (Buffer& output, Buffer& input) * identical to this chunk of data, then that's * positively fantastic. */ - if (encode_reference (output, input, offset, offset + o - XCODEC_SEGMENT_LENGTH, hash, old)) + if (encode_reference (output, source_, off - XCODEC_SEGMENT_LENGTH, hash, old)) { /* * We have output any data before this hash * in escaped form, so any candidate hash * before it is invalid now. */ - offset += o; - o = 0; - xcodec_hash.reset(); - candidate.set_ = false; + off = 0; + xcodec_hash_.reset(); + candidate_start_ = -1; } else { @@ -152,7 +144,7 @@ XCodecEncoder::encode (Buffer& output, Buffer& input) * Not defined before, it's a candidate for declaration * if we don't already have one. */ - if (candidate.set_) + if (candidate_start_ >= 0) { /* * We already have a hash that occurs earlier, @@ -160,7 +152,7 @@ XCodecEncoder::encode (Buffer& output, Buffer& input) * covered by this hash, so don't remember it * and keep going. */ - ASSERT(log_, candidate.offset_ + (XCODEC_SEGMENT_LENGTH * 2) > offset + o); + ASSERT(log_, candidate_start_ + (XCODEC_SEGMENT_LENGTH * 2) > off); } else { @@ -171,80 +163,99 @@ XCodecEncoder::encode (Buffer& output, Buffer& input) * find something to reference we can declare this one * for future use. */ - candidate.offset_ = offset + o - XCODEC_SEGMENT_LENGTH; - candidate.symbol_ = hash; - candidate.set_ = true; + candidate_start_ = off - XCODEC_SEGMENT_LENGTH; + candidate_symbol_ = hash; } } } } } + + if (! wait) + flush (output); +} +bool XCodecEncoder::flush (Buffer& output) +{ + bool vld = false; + /* * There's a hash we can declare, do it. */ - if (candidate.set_) + if (candidate_start_ >= 0) { - encode_declaration (output, input, offset, candidate.offset_, candidate.symbol_); - o -= (candidate.offset_ + XCODEC_SEGMENT_LENGTH - offset); - offset = (candidate.offset_ + XCODEC_SEGMENT_LENGTH); - candidate.set_ = false; + encode_declaration (output, source_, candidate_start_, candidate_symbol_); + candidate_start_ = -1; + vld = true; } /* - * There's data after that hash or no candidate hash, so - * just escape it. + * There's data after that hash or no candidate hash, so just escape it. */ - if (offset < input.length ()) - encode_escape (output, input, offset, input.length ()); + if (source_.length () > 0) + { + encode_escape (output, source_, source_.length ()); + vld = true; + } + + xcodec_hash_.reset(); + + return vld; } -void -XCodecEncoder::encode_declaration (Buffer& output, Buffer& input, unsigned offset, unsigned start, uint64_t hash) +void XCodecEncoder::encode_declaration (Buffer& output, Buffer& input, unsigned start, uint64_t hash) { - if (offset < start) - encode_escape (output, input, offset, start); + if (start > 0) + encode_escape (output, input, start); - cache_->enter (hash, input, start); + cache_->enter (hash, input, 0); output.append (XCODEC_MAGIC); output.append (XCODEC_OP_EXTRACT); - output.append (input, start, XCODEC_SEGMENT_LENGTH); + output.append (input, XCODEC_SEGMENT_LENGTH); + + input.skip (XCODEC_SEGMENT_LENGTH); } -void -XCodecEncoder::encode_escape (Buffer& output, Buffer& input, unsigned offset, unsigned limit) +void XCodecEncoder::encode_escape (Buffer& output, Buffer& input, unsigned length) { unsigned pos; - while (offset < limit && input.find (XCODEC_MAGIC, offset, limit - offset, &pos)) + while (length > 0) { - if (offset < pos) - output.append (input, offset, pos - offset); - output.append (XCODEC_MAGIC); - output.append (XCODEC_OP_ESCAPE); - offset = pos + 1; + if (input.find (XCODEC_MAGIC, 0, length, &pos)) + { + if (pos > 0) + output.append (input, 0, pos); + output.append (XCODEC_MAGIC); + output.append (XCODEC_OP_ESCAPE); + input.skip (pos + 1); + length -= pos + 1; + } + else + { + output.append (input, length); + input.skip (length); + break; + } } - - if (offset < limit) - output.append (input, offset, limit - offset); } -bool -XCodecEncoder::encode_reference (Buffer& output, Buffer& input, unsigned offset, unsigned start, uint64_t hash, Buffer& old) +bool XCodecEncoder::encode_reference (Buffer& output, Buffer& input, unsigned start, uint64_t hash, Buffer& old) { uint8_t data[XCODEC_SEGMENT_LENGTH]; input.copyout (data, start, XCODEC_SEGMENT_LENGTH); if (old.equal (data, sizeof data)) { - if (offset < start) - encode_escape (output, input, offset, start); + if (start > 0) + encode_escape (output, input, start); output.append (XCODEC_MAGIC); output.append (XCODEC_OP_REF); uint64_t behash = BigEndian::encode (hash); output.append (&behash); + input.skip (XCODEC_SEGMENT_LENGTH); return true; } diff --git a/xcodec/xcodec_encoder.h b/xcodec/xcodec_encoder.h index 1aea77f..f16b434 100644 --- a/xcodec/xcodec_encoder.h +++ b/xcodec/xcodec_encoder.h @@ -26,6 +26,8 @@ #ifndef XCODEC_XCODEC_ENCODER_H #define XCODEC_XCODEC_ENCODER_H +#include + //////////////////////////////////////////////////////////////////////////////// // // // File: xcodec_encoder.h // @@ -42,16 +44,22 @@ class XCodecEncoder { LogHandle log_; XCodecCache* cache_; + Buffer source_; + XCodecHash xcodec_hash_; + int candidate_start_; + uint64_t candidate_symbol_; public: XCodecEncoder(XCodecCache*); ~XCodecEncoder(); - void encode (Buffer&, Buffer&); + void encode (Buffer&, Buffer&, bool); + bool flush (Buffer&); + private: - void encode_declaration (Buffer&, Buffer&, unsigned, unsigned, uint64_t); - void encode_escape (Buffer&, Buffer&, unsigned, unsigned); - bool encode_reference (Buffer&, Buffer&, unsigned, unsigned, uint64_t, Buffer&); + void encode_declaration (Buffer&, Buffer&, unsigned, uint64_t); + void encode_escape (Buffer&, Buffer&, unsigned); + bool encode_reference (Buffer&, Buffer&, unsigned, uint64_t, Buffer&); }; #endif /* !XCODEC_XCODEC_ENCODER_H */ diff --git a/xcodec/xcodec_filter.cc b/xcodec/xcodec_filter.cc index 089beb7..59d5e23 100644 --- a/xcodec/xcodec_filter.cc +++ b/xcodec/xcodec_filter.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include "xcodec_filter.h" @@ -120,8 +121,7 @@ bool EncodeFilter::consume (Buffer& buf) { - Buffer output; - Buffer enc; + Buffer enc, output; ASSERT(log_, ! flushing_); @@ -143,26 +143,18 @@ bool EncodeFilter::consume (Buffer& buf) return false; } - encoder_->encode (enc, buf); - - while (! enc.empty ()) - { - int n = enc.length (); - if (n > XCODEC_PIPE_MAX_FRAME) - n = XCODEC_PIPE_MAX_FRAME; - - Buffer frame; - enc.moveout (&frame, n); - - uint16_t len = n; - len = BigEndian::encode (len); - - output.append (XCODEC_PIPE_OP_FRAME); - output.append (&len); - output.append (frame); - } + encoder_->encode (enc, buf, waiting_); + while (! enc.empty ()) + encode_frame (enc, output); - return produce (output); + if (waiting_) + { + if (wait_action_) + wait_action_->cancel (); + wait_action_ = event_system.track (150, StreamModeWait, callback (this, &EncodeFilter::on_read_timeout)); + } + + return (! output.empty () ? produce (output) : true); } void EncodeFilter::flush (int flg) @@ -175,7 +167,9 @@ void EncodeFilter::flush (int flg) flush_flags_ |= flg; if (! sent_eos_) { - Buffer output; + Buffer enc, output; + if (waiting_ && encoder_ && encoder_->flush (enc)) + encode_frame (enc, output); output.append (XCODEC_PIPE_OP_EOS); sent_eos_ = produce (output); } @@ -184,6 +178,35 @@ void EncodeFilter::flush (int flg) Filter::flush (flush_flags_); } +void EncodeFilter::encode_frame (Buffer& src, Buffer& trg) +{ + int n = src.length (); + if (n > XCODEC_PIPE_MAX_FRAME) + n = XCODEC_PIPE_MAX_FRAME; + + uint16_t len = n; + len = BigEndian::encode (len); + + trg.append (XCODEC_PIPE_OP_FRAME); + trg.append (&len); + trg.append (src, n); + + src.skip (n); +} + +void EncodeFilter::on_read_timeout (Event e) +{ + if (wait_action_) + wait_action_->cancel (), wait_action_ = 0; + + Buffer enc, output; + if (encoder_ && encoder_->flush (enc)) + { + encode_frame (enc, output); + produce (output); + } +} + // Decoding bool DecodeFilter::consume (Buffer& buf) diff --git a/xcodec/xcodec_filter.h b/xcodec/xcodec_filter.h index f2d8574..15f6f5f 100644 --- a/xcodec/xcodec_filter.h +++ b/xcodec/xcodec_filter.h @@ -13,6 +13,8 @@ #include #include +#include +#include #include #include #include @@ -24,22 +26,30 @@ class EncodeFilter : public BufferedFilter private: XCodecCache* cache_; XCodecEncoder* encoder_; + Action* wait_action_; + bool waiting_; bool sent_eos_; bool eos_ack_; public: - EncodeFilter (const LogHandle& log, XCodecCache* cc) : BufferedFilter (log) + EncodeFilter (const LogHandle& log, XCodecCache* cc, int flg = 0) : BufferedFilter (log) { - cache_ = cc; encoder_ = 0; sent_eos_ = eos_ack_ = false; + cache_ = cc; encoder_ = 0; wait_action_ = 0; waiting_ = (flg & 1); sent_eos_ = eos_ack_ = false; } virtual ~EncodeFilter () { + if (wait_action_) + wait_action_->cancel (); delete encoder_; } virtual bool consume (Buffer& buf); virtual void flush (int flg); + +private: + void encode_frame (Buffer& src, Buffer& trg); + void on_read_timeout (Event e); }; class DecodeFilter : public LogisticFilter