diff --git a/common/count_filter.cc b/common/count_filter.cc new file mode 100644 index 0000000..31cf911 --- /dev/null +++ b/common/count_filter.cc @@ -0,0 +1,114 @@ +//////////////////////////////////////////////////////////////////////////////// +// // +// File: count_filter.cc // +// Description: byte counting filter for wanproxy streams // +// Project: WANProxy XTech // +// Author: Andreu Vidal Bramfeld-Software // +// Last modified: 2016-02-28 // +// // +//////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include +#include "count_filter.h" + +CountFilter::CountFilter (intmax_t& p, int flg) : total_count_ (p) +{ + expected_ = count_ = 0; state_ = (flg & 1); +} + +bool CountFilter::consume (Buffer& buf, int flg) +{ + long n = buf.length (); + total_count_ += n; + + if (state_ == 1 || state_ == 2) + { + header_.append (buf); + while (! explore_stream (header_)) continue; + } + else if (state_ == 3 || state_ == 4) + { + count_ += n; + if (count_ >= expected_) + { + state_ = 1, header_.clear (); + if (count_ > expected_) + { + header_ = buf, header_.skip (n - (count_ - expected_)); + while (! explore_stream (header_)) continue; + } + } + } + + return produce (buf, flg | (state_ == 4 ? TO_BE_CONTINUED : 0)); +} + +void CountFilter::flush (int flg) +{ + state_ = 0; + Filter::flush (flg); +} + +bool CountFilter::explore_stream (Buffer& buf) +{ + if (state_ == 1 && buf.length () >= 5) + state_ = (buf.prefix ((const uint8_t*) "HTTP/", 5) ? 2 : 0); + + if (state_ == 2) + { + unsigned pos, ext = buf.length (); + for (pos = 0; pos < ext - 1 && buf.find ('\n', pos, ext - pos - 1, &pos); ++pos) + { + uint8_t sfx[4] = {0, 0, 0, 0}; + buf.copyout (sfx, pos + 1, (ext > pos + 1 ? 2 : 1)); + if (sfx[0] == '\n' || (sfx[0] == '\r' && sfx[1] == '\n')) + { + HTTPProtocol::Message msg (HTTPProtocol::Message::Response); + std::map >::iterator it; + unsigned lng = 0; + if (msg.decode (&buf) && msg.headers_.find ("Transfer-Encoding") == msg.headers_.end () && + (it = msg.headers_.find ("Content-Length")) != msg.headers_.end () && it->second.size () > 0) + { + Buffer val = it->second[0]; + while (val.length () > 0) + { + uint8_t c = val.peek (); + val.skip (1); + if (c >= '0' && c <= '9') + lng = (lng * 10) + (c - '0'); + else + { + lng = 0; + break; + } + } + } + if (lng > 0) + { + if (lng > buf.length ()) + { + expected_ = lng; + count_ = buf.length (); + state_ = (lng < 1000 ? 3 : 4); // don't wait for cookie resources + } + else + { + buf.skip (lng); + state_ = 1; + return false; + } + } + else + { + state_ = 0; + } + break; + } + } + } + + return true; +} diff --git a/common/count_filter.h b/common/count_filter.h new file mode 100644 index 0000000..b74b7c5 --- /dev/null +++ b/common/count_filter.h @@ -0,0 +1,38 @@ +//////////////////////////////////////////////////////////////////////////////// +// // +// File: count_filter.h // +// Description: byte counting filter for wanproxy streams // +// Project: WANProxy XTech // +// Author: Andreu Vidal Bramfeld-Software // +// Last modified: 2016-02-28 // +// // +//////////////////////////////////////////////////////////////////////////////// + +#ifndef COUNT_FILTER_H +#define COUNT_FILTER_H + +#include +#include + +#define TO_BE_CONTINUED 1 + +class CountFilter : public Filter +{ +private: + Buffer header_; + intmax_t& total_count_; + intmax_t expected_; + intmax_t count_; + int state_; + +public: + CountFilter (intmax_t& p, int flg = 0); + + virtual bool consume (Buffer& buf, int flg = 0); + virtual void flush (int flg); + +private: + bool explore_stream (Buffer& buf); +}; + +#endif // COUNT_FILTER_H diff --git a/common/filter.h b/common/filter.h index 4460555..be2a03f 100644 --- a/common/filter.h +++ b/common/filter.h @@ -4,7 +4,7 @@ // Description: base classes for chained data processors // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -21,23 +21,13 @@ private: Filter* recipient_; public: - Filter () { recipient_ = 0; } - virtual ~Filter () { } + Filter () { recipient_ = 0; } + virtual ~Filter () { } - void chain (Filter* nxt) { recipient_ = nxt; } - virtual bool consume (Buffer& buf) { return produce (buf); } - virtual bool produce (Buffer& buf) { return (recipient_ && recipient_->consume (buf)); } - virtual void flush (int flg) { if (recipient_) recipient_->flush (flg); } -}; - -class CountFilter : public Filter -{ -private: - intmax_t& counter_; - -public: - CountFilter (intmax_t& p) : counter_(p) { } - virtual bool consume (Buffer& buf) { counter_ += buf.length (); return produce (buf); } + void chain (Filter* nxt) { recipient_ = nxt; } + virtual bool consume (Buffer& buf, int flg = 0) { return produce (buf, flg); } + virtual bool produce (Buffer& buf, int flg = 0) { return (recipient_ && recipient_->consume (buf, flg)); } + virtual void flush (int flg) { if (recipient_) recipient_->flush (flg); } }; class BufferedFilter : public Filter diff --git a/common/lib.mk b/common/lib.mk index ac69aa2..28fc6f0 100644 --- a/common/lib.mk +++ b/common/lib.mk @@ -2,5 +2,6 @@ VPATH+= ${TOPDIR}/common SRCS+= buffer.cc SRCS+= log.cc +SRCS+= count_filter.cc CXXFLAGS+=-include common/common.h diff --git a/event/event_system.h b/event/event_system.h index d9ed427..96488c6 100644 --- a/event/event_system.h +++ b/event/event_system.h @@ -4,7 +4,7 @@ // Description: global event handling core class definition // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// diff --git a/event/io_service.cc b/event/io_service.cc index 6a9fc69..5aac0be 100644 --- a/event/io_service.cc +++ b/event/io_service.cc @@ -4,7 +4,7 @@ // Description: servicing of network IO requests for the event system // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// diff --git a/event/io_service.h b/event/io_service.h index 45a61f5..3c13b39 100644 --- a/event/io_service.h +++ b/event/io_service.h @@ -4,7 +4,7 @@ // Description: servicing of network IO requests for the event system // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// diff --git a/io/sink_filter.cc b/io/sink_filter.cc index 498b319..60ad566 100644 --- a/io/sink_filter.cc +++ b/io/sink_filter.cc @@ -4,7 +4,7 @@ // Description: a filter to write into a target device // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -22,7 +22,7 @@ SinkFilter::~SinkFilter () if (write_action_) write_action_->cancel (); } -bool SinkFilter::consume (Buffer& buf) +bool SinkFilter::consume (Buffer& buf, int flg) { if (! sink_ || closing_) return false; diff --git a/io/sink_filter.h b/io/sink_filter.h index 7ed55cf..9ebc843 100644 --- a/io/sink_filter.h +++ b/io/sink_filter.h @@ -4,7 +4,7 @@ // Description: a filter to write into a target device // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -24,7 +24,7 @@ public: SinkFilter (const LogHandle& log, Socket* sck, bool cln = 0); virtual ~SinkFilter (); - virtual bool consume (Buffer& buf); + virtual bool consume (Buffer& buf, int flg = 0); void write_complete (Event e); virtual void flush (int flg); }; diff --git a/programs/wanproxy/proxy_connector.cc b/programs/wanproxy/proxy_connector.cc index ce76ced..502f833 100644 --- a/programs/wanproxy/proxy_connector.cc +++ b/programs/wanproxy/proxy_connector.cc @@ -29,6 +29,7 @@ #include #include #include +#include #include "proxy_connector.h" //////////////////////////////////////////////////////////////////////////////// @@ -37,19 +38,19 @@ // Description: carries data between endpoints through a filter chain // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// ProxyConnector::ProxyConnector (const std::string& name, - WANProxyCodec* interface_codec, + WANProxyCodec* local_codec, WANProxyCodec* remote_codec, Socket* local_socket, SocketAddressFamily family, const std::string& remote_name, bool cln, bool ssh) : log_("/wanproxy/" + name + "/connector"), - interface_codec_(interface_codec), + local_codec_(local_codec), remote_codec_(remote_codec), local_socket_(local_socket), remote_socket_(0), @@ -61,7 +62,8 @@ ProxyConnector::ProxyConnector (const std::string& name, stop_action_(0), request_action_(0), response_action_(0), - chain_ready_(0) + close_action_(0), + flushing_(0) { if (local_socket_ && (remote_socket_ = Socket::create (family, SocketTypeStream, "tcp", remote_name))) { @@ -70,7 +72,7 @@ ProxyConnector::ProxyConnector (const std::string& name, } else { - conclude (); + close_action_ = event_system.track (0, StreamModeWait, callback (this, &ProxyConnector::conclude)); } } @@ -84,6 +86,8 @@ ProxyConnector::~ProxyConnector () request_action_->cancel (); if (response_action_) response_action_->cancel (); + if (close_action_) + close_action_->cancel (); if (local_socket_) local_socket_->close (); if (remote_socket_) @@ -103,15 +107,15 @@ void ProxyConnector::connect_complete (Event e) break; case Event::Error: INFO(log_) << "Connect failed: " << e; - conclude (); + conclude (e); return; default: ERROR(log_) << "Unexpected event: " << e; - conclude (); + conclude (e); return; } - if (build_chains (interface_codec_, remote_codec_, local_socket_, remote_socket_)) + if (build_chains (local_codec_, remote_codec_, local_socket_, remote_socket_)) { request_action_ = local_socket_->read (callback (this, &ProxyConnector::on_request_data)); response_action_ = remote_socket_->read (callback (this, &ProxyConnector::on_response_data)); @@ -158,7 +162,7 @@ bool ProxyConnector::build_chains (WANProxyCodec* cdc1, WANProxyCodec* cdc2, Soc if (cdc1->counting_) { request_chain_.append (new CountFilter (cdc1->request_output_bytes_)); - response_chain_.prepend (new CountFilter (cdc1->response_input_bytes_)); + response_chain_.prepend (new CountFilter (cdc1->response_input_bytes_, 1)); } } @@ -208,6 +212,8 @@ void ProxyConnector::on_request_data (Event e) { if (request_action_) request_action_->cancel (), request_action_ = 0; + if (flushing_ & REQUEST_CHAIN_FLUSHING) + return; switch (e.type_) { @@ -217,11 +223,12 @@ void ProxyConnector::on_request_data (Event e) break; case Event::EOS: DEBUG(log_) << "Flushing request"; + flushing_ |= REQUEST_CHAIN_FLUSHING; request_chain_.flush (REQUEST_CHAIN_READY); break; default: DEBUG(log_) << "Unexpected event: " << e; - conclude (); + conclude (e); return; } } @@ -230,6 +237,8 @@ void ProxyConnector::on_response_data (Event e) { if (response_action_) response_action_->cancel (), response_action_ = 0; + if (flushing_ & RESPONSE_CHAIN_FLUSHING) + return; switch (e.type_) { @@ -239,23 +248,25 @@ void ProxyConnector::on_response_data (Event e) break; case Event::EOS: DEBUG(log_) << "Flushing response"; + flushing_ |= RESPONSE_CHAIN_FLUSHING; response_chain_.flush (RESPONSE_CHAIN_READY); break; default: DEBUG(log_) << "Unexpected event: " << e; - conclude (); + conclude (e); return; } } void ProxyConnector::flush (int flg) { - chain_ready_ |= flg; - if ((chain_ready_ & REQUEST_CHAIN_READY) && (chain_ready_ & RESPONSE_CHAIN_READY)) - conclude (); + flushing_ |= flg; + if ((flushing_ & (REQUEST_CHAIN_READY | RESPONSE_CHAIN_READY)) == (REQUEST_CHAIN_READY | RESPONSE_CHAIN_READY)) + if (! close_action_) + close_action_ = event_system.track (0, StreamModeWait, callback (this, &ProxyConnector::conclude)); } -void ProxyConnector::conclude () +void ProxyConnector::conclude (Event e) { delete this; } diff --git a/programs/wanproxy/proxy_connector.h b/programs/wanproxy/proxy_connector.h index d949b1a..c7fc766 100644 --- a/programs/wanproxy/proxy_connector.h +++ b/programs/wanproxy/proxy_connector.h @@ -26,8 +26,10 @@ #ifndef PROGRAMS_WANPROXY_PROXY_CONNECTOR_H #define PROGRAMS_WANPROXY_PROXY_CONNECTOR_H -#define REQUEST_CHAIN_READY 0x10000 -#define RESPONSE_CHAIN_READY 0x20000 +#define REQUEST_CHAIN_FLUSHING 0x10000 +#define RESPONSE_CHAIN_FLUSHING 0x20000 +#define REQUEST_CHAIN_READY 0x40000 +#define RESPONSE_CHAIN_READY 0x80000 #include #include @@ -41,14 +43,14 @@ // Description: carries data between endpoints through a filter chain // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// class ProxyConnector : public Filter { LogHandle log_; - WANProxyCodec* interface_codec_; + WANProxyCodec* local_codec_; WANProxyCodec* remote_codec_; Socket* local_socket_; Socket* remote_socket_; @@ -59,7 +61,8 @@ class ProxyConnector : public Filter Action* stop_action_; Action* request_action_; Action* response_action_; - int chain_ready_; + Action* close_action_; + int flushing_; public: ProxyConnector (const std::string&, WANProxyCodec*, WANProxyCodec*, @@ -71,7 +74,7 @@ public: void on_request_data (Event e); void on_response_data (Event e); virtual void flush (int flg); - void conclude (); + void conclude (Event e); }; #endif /* !PROGRAMS_WANPROXY_PROXY_CONNECTOR_H */ diff --git a/programs/wanproxy/proxy_listener.cc b/programs/wanproxy/proxy_listener.cc index 5f9efb0..84d8e9a 100644 --- a/programs/wanproxy/proxy_listener.cc +++ b/programs/wanproxy/proxy_listener.cc @@ -33,7 +33,7 @@ // Description: listens on a port spawning a connector for each client // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -45,7 +45,7 @@ ProxyListener::ProxyListener (const std::string& name, SocketAddressFamily remote_family, const std::string& remote_address, bool cln, bool ssh) - : log_("/wanproxy/proxy/" + name + "/listener"), + : log_("/wanproxy/" + name + "/listener"), name_(name), local_codec_(local_codec), remote_codec_(remote_codec), diff --git a/programs/wanproxy/wanproxy.cc b/programs/wanproxy/wanproxy.cc index 9d04e6a..2eaa0d4 100644 --- a/programs/wanproxy/wanproxy.cc +++ b/programs/wanproxy/wanproxy.cc @@ -34,11 +34,11 @@ // Description: session start and global application management // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// -#define PROGRAM_VERSION "3.0" +#define PROGRAM_VERSION "3.02" WanProxyCore wanproxy; diff --git a/programs/wanproxy/wanproxy.h b/programs/wanproxy/wanproxy.h index 0a4d271..24b9d68 100644 --- a/programs/wanproxy/wanproxy.h +++ b/programs/wanproxy/wanproxy.h @@ -4,7 +4,7 @@ // Description: global data for the wanproxy application // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -109,10 +109,31 @@ public: if (reload_action_) reload_action_->cancel (), reload_action_ = 0; delete listener_; listener_ = 0; + print_stream_counts (); std::map::iterator it; for (it = caches_.begin(); it != caches_.end(); it++) delete it->second; caches_.clear (); + + } + + void print_stream_counts () + { + if (local_codec_.counting_) + { + INFO("wanproxy/core") << "Local codec request input bytes: " << local_codec_.request_input_bytes_; + INFO("wanproxy/core") << "Local codec request output bytes: " << local_codec_.request_output_bytes_; + INFO("wanproxy/core") << "Local codec response input bytes: " << local_codec_.response_input_bytes_; + INFO("wanproxy/core") << "Local codec response output bytes: " << local_codec_.response_output_bytes_; + } + + if (remote_codec_.counting_) + { + INFO("wanproxy/core") << "Remote codec request input bytes: " << remote_codec_.request_input_bytes_; + INFO("wanproxy/core") << "Remote codec request output bytes: " << remote_codec_.request_output_bytes_; + INFO("wanproxy/core") << "Remote codec response input bytes: " << remote_codec_.response_input_bytes_; + INFO("wanproxy/core") << "Remote codec response output bytes: " << remote_codec_.response_output_bytes_; + } } }; diff --git a/sample.conf b/sample.conf index e13d712..11be02d 100644 --- a/sample.conf +++ b/sample.conf @@ -31,6 +31,7 @@ set codec0.cache_path "/var/lib/wanproxy" set codec0.local_size 2048 set codec0.compressor zlib set codec0.compressor_level 6 +set codec0.byte_counts 1 activate codec0 create interface if0 diff --git a/ssh/ssh_filter.cc b/ssh/ssh_filter.cc index 783fba2..fef21aa 100644 --- a/ssh/ssh_filter.cc +++ b/ssh/ssh_filter.cc @@ -41,7 +41,7 @@ // Description: SSH encryption/decryption inside a data filter pair // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -63,13 +63,13 @@ SSH::EncryptFilter::EncryptFilter (SSH::Role role, int flg) : BufferedFilter ("/ session_.algorithm_negotiation_->add_algorithm (SSH::ServerHostKey::server (&session_, "ssh-server1.pem")); session_.algorithm_negotiation_->add_algorithms (); - Buffer str ("SSH-2.0-WANProxy " + (std::string)log_); + Buffer str ("SSH-2.0-WANProxy " + (std::string) log_); session_.local_version (str); str.append ("\r\n"); Filter::produce (str); } -bool SSH::EncryptFilter::consume (Buffer& buf) +bool SSH::EncryptFilter::consume (Buffer& buf, int flg) { buf.moveout (&pending_); @@ -83,7 +83,7 @@ bool SSH::EncryptFilter::consume (Buffer& buf) Buffer packet; packet.append (SSHStreamPacket); pending_.moveout (&packet); - return produce (packet); + return produce (packet, flg); } else { @@ -100,7 +100,7 @@ bool SSH::EncryptFilter::consume (Buffer& buf) Buffer packet; pending_.moveout (&packet, sizeof length, length); - if (! produce (packet)) + if (! produce (packet, flg)) return false; } } @@ -109,7 +109,7 @@ bool SSH::EncryptFilter::consume (Buffer& buf) return true; } -bool SSH::EncryptFilter::produce (Buffer& buf) +bool SSH::EncryptFilter::produce (Buffer& buf, int flg) { Encryption *encryption_algorithm; MAC *mac_algorithm; @@ -168,7 +168,7 @@ bool SSH::EncryptFilter::produce (Buffer& buf) session_.local_sequence_number_++; - return Filter::produce (packet); + return Filter::produce (packet, flg); } void SSH::EncryptFilter::flush (int flg) @@ -198,7 +198,7 @@ SSH::DecryptFilter::DecryptFilter (int flg) : LogisticFilter ("/ssh/decrypt") identified_ = false; } -bool SSH::DecryptFilter::consume (Buffer& buf) +bool SSH::DecryptFilter::consume (Buffer& buf, int flg) { buf.moveout (&pending_); @@ -500,7 +500,7 @@ bool SSH::DecryptFilter::consume (Buffer& buf) packet = b; } - return produce (packet); + return produce (packet, flg); } return true; diff --git a/ssh/ssh_filter.h b/ssh/ssh_filter.h index aa4abd2..170565e 100644 --- a/ssh/ssh_filter.h +++ b/ssh/ssh_filter.h @@ -4,7 +4,7 @@ // Description: SSH encryption/decryption inside a data filter pair // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -28,8 +28,8 @@ namespace SSH public: EncryptFilter (Role role, int flg = 0); - virtual bool consume (Buffer& buf); - virtual bool produce (Buffer& buf); + virtual bool consume (Buffer& buf, int flg = 0); + virtual bool produce (Buffer& buf, int flg = 0); virtual void flush (int flg); Session* current_session () { return &session_; } @@ -45,7 +45,7 @@ namespace SSH public: DecryptFilter (int flg = 0); - virtual bool consume (Buffer& buf); + virtual bool consume (Buffer& buf, int flg = 0); virtual void flush (int flg); void set_encrypter (EncryptFilter* f) { session_ = (f ? f->current_session () : 0); set_upstream (f); } diff --git a/xcodec/cache/coss/xcodec_cache_coss.cc b/xcodec/cache/coss/xcodec_cache_coss.cc index f62a3fa..93c7a8a 100644 --- a/xcodec/cache/coss/xcodec_cache_coss.cc +++ b/xcodec/cache/coss/xcodec_cache_coss.cc @@ -23,7 +23,7 @@ // Description: persistent cache on disk for xcodec protocol streams // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -61,6 +61,9 @@ XCodecCacheCOSS::XCodecCacheCOSS (const UUID& uuid, const std::string& cache_dir directory_ = new COSSMetadata[stripe_limit_]; memset (directory_, 0, sizeof (COSSMetadata) * stripe_limit_); + if (stream_.rdbuf()) + stream_.rdbuf()->pubsetbuf (0, 0); + stream_.open (file_path_.c_str(), fstream::in | fstream::out | fstream::binary); if (! read_file ()) { diff --git a/xcodec/xcodec_decoder.cc b/xcodec/xcodec_decoder.cc index cc29f4d..ba43a0c 100644 --- a/xcodec/xcodec_decoder.cc +++ b/xcodec/xcodec_decoder.cc @@ -38,7 +38,7 @@ // Description: decoding routines for the xcodex protocol // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// diff --git a/xcodec/xcodec_encoder.cc b/xcodec/xcodec_encoder.cc index fa4bc6a..935fc4d 100644 --- a/xcodec/xcodec_encoder.cc +++ b/xcodec/xcodec_encoder.cc @@ -36,7 +36,7 @@ // Description: encoding routines for the xcodex protocol // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -57,7 +57,7 @@ XCodecEncoder::~XCodecEncoder() * escaped. */ -void XCodecEncoder::encode (Buffer& output, Buffer& input, bool wait) +void XCodecEncoder::encode (Buffer& output, Buffer& input) { int off = source_.length (); Buffer old; @@ -170,9 +170,6 @@ void XCodecEncoder::encode (Buffer& output, Buffer& input, bool wait) } } } - - if (! wait) - flush (output); } bool XCodecEncoder::flush (Buffer& output) diff --git a/xcodec/xcodec_encoder.h b/xcodec/xcodec_encoder.h index f16b434..4a167f2 100644 --- a/xcodec/xcodec_encoder.h +++ b/xcodec/xcodec_encoder.h @@ -34,7 +34,7 @@ // Description: encoding routines for the xcodex protocol // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -53,7 +53,7 @@ public: XCodecEncoder(XCodecCache*); ~XCodecEncoder(); - void encode (Buffer&, Buffer&, bool); + void encode (Buffer&, Buffer&); bool flush (Buffer&); private: diff --git a/xcodec/xcodec_filter.cc b/xcodec/xcodec_filter.cc index 59d5e23..5acd594 100644 --- a/xcodec/xcodec_filter.cc +++ b/xcodec/xcodec_filter.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include #include "xcodec_filter.h" @@ -35,7 +36,7 @@ // Description: instantiation of encoder/decoder in a data filter pair // // Project: WANProxy XTech // // Adapted by: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -119,7 +120,7 @@ // Encoding -bool EncodeFilter::consume (Buffer& buf) +bool EncodeFilter::consume (Buffer& buf, int flg) { Buffer enc, output; @@ -143,18 +144,24 @@ bool EncodeFilter::consume (Buffer& buf) return false; } - encoder_->encode (enc, buf, waiting_); + encoder_->encode (enc, buf); + + if (! (flg & TO_BE_CONTINUED)) + { + if (waiting_) + { + if (wait_action_) + wait_action_->cancel (); + wait_action_ = event_system.track (150, StreamModeWait, callback (this, &EncodeFilter::on_read_timeout)); + } + else + encoder_->flush (enc); + } + while (! enc.empty ()) encode_frame (enc, output); - if (waiting_) - { - if (wait_action_) - wait_action_->cancel (); - wait_action_ = event_system.track (150, StreamModeWait, callback (this, &EncodeFilter::on_read_timeout)); - } - - return (! output.empty () ? produce (output) : true); + return (! output.empty () ? produce (output, flg) : true); } void EncodeFilter::flush (int flg) @@ -165,10 +172,12 @@ void EncodeFilter::flush (int flg) { flushing_ = true; flush_flags_ |= flg; + if (wait_action_) + wait_action_->cancel (), wait_action_ = 0; if (! sent_eos_) { Buffer enc, output; - if (waiting_ && encoder_ && encoder_->flush (enc)) + if (encoder_ && encoder_->flush (enc)) encode_frame (enc, output); output.append (XCODEC_PIPE_OP_EOS); sent_eos_ = produce (output); @@ -200,7 +209,7 @@ void EncodeFilter::on_read_timeout (Event e) wait_action_->cancel (), wait_action_ = 0; Buffer enc, output; - if (encoder_ && encoder_->flush (enc)) + if (! flushing_ && encoder_ && encoder_->flush (enc)) { encode_frame (enc, output); produce (output); @@ -209,7 +218,7 @@ void EncodeFilter::on_read_timeout (Event e) // Decoding -bool DecodeFilter::consume (Buffer& buf) +bool DecodeFilter::consume (Buffer& buf, int flg) { if (! upstream_) { @@ -412,7 +421,7 @@ bool DecodeFilter::consume (Buffer& buf) if (! output.empty ()) { ASSERT(log_, ! flushing_); - if (! produce (output)) + if (! produce (output, flg)) return false; } else @@ -512,7 +521,7 @@ void DecodeFilter::flush (int flg) if (! frame_buffer_.empty ()) DEBUG(log_) << "Flushing decoder with frame data outstanding."; if (! upflushed_ && upstream_) - upstream_->flush (XCODEC_PIPE_OP_EOS_ACK); + upflushed_ = true, upstream_->flush (XCODEC_PIPE_OP_EOS_ACK); Filter::flush (flush_flags_); } diff --git a/xcodec/xcodec_filter.h b/xcodec/xcodec_filter.h index 15f6f5f..eecb46f 100644 --- a/xcodec/xcodec_filter.h +++ b/xcodec/xcodec_filter.h @@ -4,7 +4,7 @@ // Description: instantiation of encoder/decoder in a data filter pair // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-08-31 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -44,7 +44,7 @@ public: delete encoder_; } - virtual bool consume (Buffer& buf); + virtual bool consume (Buffer& buf, int flg = 0); virtual void flush (int flg); private: @@ -77,7 +77,7 @@ public: delete decoder_; } - virtual bool consume (Buffer& buf); + virtual bool consume (Buffer& buf, int flg = 0); virtual void flush (int flg); }; diff --git a/zlib/zlib_filter.cc b/zlib/zlib_filter.cc index 4979071..46a5239 100644 --- a/zlib/zlib_filter.cc +++ b/zlib/zlib_filter.cc @@ -4,7 +4,7 @@ // Description: data filters for zlib inflate/deflate streams // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -32,7 +32,7 @@ DeflateFilter::~DeflateFilter () ERROR(log_) << "Deflate stream did not end cleanly."; } -bool DeflateFilter::consume (Buffer& buf) +bool DeflateFilter::consume (Buffer& buf, int flg) { const BufferSegment* seg; int cnt = 0, i = 0, rv; @@ -64,7 +64,7 @@ bool DeflateFilter::consume (Buffer& buf) } } - return produce (pending_); + return produce (pending_, flg); } void DeflateFilter::flush (int flg) @@ -108,7 +108,7 @@ InflateFilter::~InflateFilter() ERROR(log_) << "Inflate stream did not end cleanly."; } -bool InflateFilter::consume (Buffer& buf) +bool InflateFilter::consume (Buffer& buf, int flg) { const BufferSegment* seg; int cnt = 0, i = 0, rv; @@ -140,7 +140,7 @@ bool InflateFilter::consume (Buffer& buf) } } - return produce (pending_); + return produce (pending_, flg); } void InflateFilter::flush (int flg) diff --git a/zlib/zlib_filter.h b/zlib/zlib_filter.h index eb79e36..1dedd44 100644 --- a/zlib/zlib_filter.h +++ b/zlib/zlib_filter.h @@ -4,7 +4,7 @@ // Description: data filters for zlib inflate/deflate streams // // Project: WANProxy XTech // // Author: Andreu Vidal Bramfeld-Software // -// Last modified: 2015-04-01 // +// Last modified: 2016-02-28 // // // //////////////////////////////////////////////////////////////////////////////// @@ -27,7 +27,7 @@ public: DeflateFilter (int level = 0); virtual ~DeflateFilter (); - virtual bool consume (Buffer& buf); + virtual bool consume (Buffer& buf, int flg = 0); virtual void flush (int flg); }; @@ -41,7 +41,7 @@ public: InflateFilter (); ~InflateFilter (); - virtual bool consume (Buffer& buf); + virtual bool consume (Buffer& buf, int flg = 0); virtual void flush (int flg); };