version 3.02

This commit is contained in:
Bramfeld Team 2016-02-28 20:50:31 +01:00
parent 7adaaf99be
commit bc32a3a706
25 changed files with 287 additions and 99 deletions

114
common/count_filter.cc Normal file
View file

@ -0,0 +1,114 @@
////////////////////////////////////////////////////////////////////////////////
// //
// File: count_filter.cc //
// Description: byte counting filter for wanproxy streams //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
#include <map>
#include <vector>
#include <common/buffer.h>
#include <http/http_protocol.h>
#include "count_filter.h"
CountFilter::CountFilter (intmax_t& p, int flg) : total_count_ (p)
{
expected_ = count_ = 0; state_ = (flg & 1);
}
bool CountFilter::consume (Buffer& buf, int flg)
{
long n = buf.length ();
total_count_ += n;
if (state_ == 1 || state_ == 2)
{
header_.append (buf);
while (! explore_stream (header_)) continue;
}
else if (state_ == 3 || state_ == 4)
{
count_ += n;
if (count_ >= expected_)
{
state_ = 1, header_.clear ();
if (count_ > expected_)
{
header_ = buf, header_.skip (n - (count_ - expected_));
while (! explore_stream (header_)) continue;
}
}
}
return produce (buf, flg | (state_ == 4 ? TO_BE_CONTINUED : 0));
}
void CountFilter::flush (int flg)
{
state_ = 0;
Filter::flush (flg);
}
bool CountFilter::explore_stream (Buffer& buf)
{
if (state_ == 1 && buf.length () >= 5)
state_ = (buf.prefix ((const uint8_t*) "HTTP/", 5) ? 2 : 0);
if (state_ == 2)
{
unsigned pos, ext = buf.length ();
for (pos = 0; pos < ext - 1 && buf.find ('\n', pos, ext - pos - 1, &pos); ++pos)
{
uint8_t sfx[4] = {0, 0, 0, 0};
buf.copyout (sfx, pos + 1, (ext > pos + 1 ? 2 : 1));
if (sfx[0] == '\n' || (sfx[0] == '\r' && sfx[1] == '\n'))
{
HTTPProtocol::Message msg (HTTPProtocol::Message::Response);
std::map<std::string, std::vector<Buffer> >::iterator it;
unsigned lng = 0;
if (msg.decode (&buf) && msg.headers_.find ("Transfer-Encoding") == msg.headers_.end () &&
(it = msg.headers_.find ("Content-Length")) != msg.headers_.end () && it->second.size () > 0)
{
Buffer val = it->second[0];
while (val.length () > 0)
{
uint8_t c = val.peek ();
val.skip (1);
if (c >= '0' && c <= '9')
lng = (lng * 10) + (c - '0');
else
{
lng = 0;
break;
}
}
}
if (lng > 0)
{
if (lng > buf.length ())
{
expected_ = lng;
count_ = buf.length ();
state_ = (lng < 1000 ? 3 : 4); // don't wait for cookie resources
}
else
{
buf.skip (lng);
state_ = 1;
return false;
}
}
else
{
state_ = 0;
}
break;
}
}
}
return true;
}

38
common/count_filter.h Normal file
View file

@ -0,0 +1,38 @@
////////////////////////////////////////////////////////////////////////////////
// //
// File: count_filter.h //
// Description: byte counting filter for wanproxy streams //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
#ifndef COUNT_FILTER_H
#define COUNT_FILTER_H
#include <common/types.h>
#include <common/filter.h>
#define TO_BE_CONTINUED 1
class CountFilter : public Filter
{
private:
Buffer header_;
intmax_t& total_count_;
intmax_t expected_;
intmax_t count_;
int state_;
public:
CountFilter (intmax_t& p, int flg = 0);
virtual bool consume (Buffer& buf, int flg = 0);
virtual void flush (int flg);
private:
bool explore_stream (Buffer& buf);
};
#endif // COUNT_FILTER_H

View file

@ -4,7 +4,7 @@
// Description: base classes for chained data processors //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -21,23 +21,13 @@ private:
Filter* recipient_;
public:
Filter () { recipient_ = 0; }
virtual ~Filter () { }
Filter () { recipient_ = 0; }
virtual ~Filter () { }
void chain (Filter* nxt) { recipient_ = nxt; }
virtual bool consume (Buffer& buf) { return produce (buf); }
virtual bool produce (Buffer& buf) { return (recipient_ && recipient_->consume (buf)); }
virtual void flush (int flg) { if (recipient_) recipient_->flush (flg); }
};
class CountFilter : public Filter
{
private:
intmax_t& counter_;
public:
CountFilter (intmax_t& p) : counter_(p) { }
virtual bool consume (Buffer& buf) { counter_ += buf.length (); return produce (buf); }
void chain (Filter* nxt) { recipient_ = nxt; }
virtual bool consume (Buffer& buf, int flg = 0) { return produce (buf, flg); }
virtual bool produce (Buffer& buf, int flg = 0) { return (recipient_ && recipient_->consume (buf, flg)); }
virtual void flush (int flg) { if (recipient_) recipient_->flush (flg); }
};
class BufferedFilter : public Filter

View file

@ -2,5 +2,6 @@ VPATH+= ${TOPDIR}/common
SRCS+= buffer.cc
SRCS+= log.cc
SRCS+= count_filter.cc
CXXFLAGS+=-include common/common.h

View file

@ -4,7 +4,7 @@
// Description: global event handling core class definition //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////

View file

@ -4,7 +4,7 @@
// Description: servicing of network IO requests for the event system //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////

View file

@ -4,7 +4,7 @@
// Description: servicing of network IO requests for the event system //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////

View file

@ -4,7 +4,7 @@
// Description: a filter to write into a target device //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -22,7 +22,7 @@ SinkFilter::~SinkFilter ()
if (write_action_) write_action_->cancel ();
}
bool SinkFilter::consume (Buffer& buf)
bool SinkFilter::consume (Buffer& buf, int flg)
{
if (! sink_ || closing_)
return false;

View file

@ -4,7 +4,7 @@
// Description: a filter to write into a target device //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -24,7 +24,7 @@ public:
SinkFilter (const LogHandle& log, Socket* sck, bool cln = 0);
virtual ~SinkFilter ();
virtual bool consume (Buffer& buf);
virtual bool consume (Buffer& buf, int flg = 0);
void write_complete (Event e);
virtual void flush (int flg);
};

View file

@ -29,6 +29,7 @@
#include <ssh/ssh_filter.h>
#include <xcodec/xcodec_filter.h>
#include <zlib/zlib_filter.h>
#include <common/count_filter.h>
#include "proxy_connector.h"
////////////////////////////////////////////////////////////////////////////////
@ -37,19 +38,19 @@
// Description: carries data between endpoints through a filter chain //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
ProxyConnector::ProxyConnector (const std::string& name,
WANProxyCodec* interface_codec,
WANProxyCodec* local_codec,
WANProxyCodec* remote_codec,
Socket* local_socket,
SocketAddressFamily family,
const std::string& remote_name,
bool cln, bool ssh)
: log_("/wanproxy/" + name + "/connector"),
interface_codec_(interface_codec),
local_codec_(local_codec),
remote_codec_(remote_codec),
local_socket_(local_socket),
remote_socket_(0),
@ -61,7 +62,8 @@ ProxyConnector::ProxyConnector (const std::string& name,
stop_action_(0),
request_action_(0),
response_action_(0),
chain_ready_(0)
close_action_(0),
flushing_(0)
{
if (local_socket_ && (remote_socket_ = Socket::create (family, SocketTypeStream, "tcp", remote_name)))
{
@ -70,7 +72,7 @@ ProxyConnector::ProxyConnector (const std::string& name,
}
else
{
conclude ();
close_action_ = event_system.track (0, StreamModeWait, callback (this, &ProxyConnector::conclude));
}
}
@ -84,6 +86,8 @@ ProxyConnector::~ProxyConnector ()
request_action_->cancel ();
if (response_action_)
response_action_->cancel ();
if (close_action_)
close_action_->cancel ();
if (local_socket_)
local_socket_->close ();
if (remote_socket_)
@ -103,15 +107,15 @@ void ProxyConnector::connect_complete (Event e)
break;
case Event::Error:
INFO(log_) << "Connect failed: " << e;
conclude ();
conclude (e);
return;
default:
ERROR(log_) << "Unexpected event: " << e;
conclude ();
conclude (e);
return;
}
if (build_chains (interface_codec_, remote_codec_, local_socket_, remote_socket_))
if (build_chains (local_codec_, remote_codec_, local_socket_, remote_socket_))
{
request_action_ = local_socket_->read (callback (this, &ProxyConnector::on_request_data));
response_action_ = remote_socket_->read (callback (this, &ProxyConnector::on_response_data));
@ -158,7 +162,7 @@ bool ProxyConnector::build_chains (WANProxyCodec* cdc1, WANProxyCodec* cdc2, Soc
if (cdc1->counting_)
{
request_chain_.append (new CountFilter (cdc1->request_output_bytes_));
response_chain_.prepend (new CountFilter (cdc1->response_input_bytes_));
response_chain_.prepend (new CountFilter (cdc1->response_input_bytes_, 1));
}
}
@ -208,6 +212,8 @@ void ProxyConnector::on_request_data (Event e)
{
if (request_action_)
request_action_->cancel (), request_action_ = 0;
if (flushing_ & REQUEST_CHAIN_FLUSHING)
return;
switch (e.type_)
{
@ -217,11 +223,12 @@ void ProxyConnector::on_request_data (Event e)
break;
case Event::EOS:
DEBUG(log_) << "Flushing request";
flushing_ |= REQUEST_CHAIN_FLUSHING;
request_chain_.flush (REQUEST_CHAIN_READY);
break;
default:
DEBUG(log_) << "Unexpected event: " << e;
conclude ();
conclude (e);
return;
}
}
@ -230,6 +237,8 @@ void ProxyConnector::on_response_data (Event e)
{
if (response_action_)
response_action_->cancel (), response_action_ = 0;
if (flushing_ & RESPONSE_CHAIN_FLUSHING)
return;
switch (e.type_)
{
@ -239,23 +248,25 @@ void ProxyConnector::on_response_data (Event e)
break;
case Event::EOS:
DEBUG(log_) << "Flushing response";
flushing_ |= RESPONSE_CHAIN_FLUSHING;
response_chain_.flush (RESPONSE_CHAIN_READY);
break;
default:
DEBUG(log_) << "Unexpected event: " << e;
conclude ();
conclude (e);
return;
}
}
void ProxyConnector::flush (int flg)
{
chain_ready_ |= flg;
if ((chain_ready_ & REQUEST_CHAIN_READY) && (chain_ready_ & RESPONSE_CHAIN_READY))
conclude ();
flushing_ |= flg;
if ((flushing_ & (REQUEST_CHAIN_READY | RESPONSE_CHAIN_READY)) == (REQUEST_CHAIN_READY | RESPONSE_CHAIN_READY))
if (! close_action_)
close_action_ = event_system.track (0, StreamModeWait, callback (this, &ProxyConnector::conclude));
}
void ProxyConnector::conclude ()
void ProxyConnector::conclude (Event e)
{
delete this;
}

View file

@ -26,8 +26,10 @@
#ifndef PROGRAMS_WANPROXY_PROXY_CONNECTOR_H
#define PROGRAMS_WANPROXY_PROXY_CONNECTOR_H
#define REQUEST_CHAIN_READY 0x10000
#define RESPONSE_CHAIN_READY 0x20000
#define REQUEST_CHAIN_FLUSHING 0x10000
#define RESPONSE_CHAIN_FLUSHING 0x20000
#define REQUEST_CHAIN_READY 0x40000
#define RESPONSE_CHAIN_READY 0x80000
#include <common/filter.h>
#include <event/action.h>
@ -41,14 +43,14 @@
// Description: carries data between endpoints through a filter chain //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
class ProxyConnector : public Filter
{
LogHandle log_;
WANProxyCodec* interface_codec_;
WANProxyCodec* local_codec_;
WANProxyCodec* remote_codec_;
Socket* local_socket_;
Socket* remote_socket_;
@ -59,7 +61,8 @@ class ProxyConnector : public Filter
Action* stop_action_;
Action* request_action_;
Action* response_action_;
int chain_ready_;
Action* close_action_;
int flushing_;
public:
ProxyConnector (const std::string&, WANProxyCodec*, WANProxyCodec*,
@ -71,7 +74,7 @@ public:
void on_request_data (Event e);
void on_response_data (Event e);
virtual void flush (int flg);
void conclude ();
void conclude (Event e);
};
#endif /* !PROGRAMS_WANPROXY_PROXY_CONNECTOR_H */

View file

@ -33,7 +33,7 @@
// Description: listens on a port spawning a connector for each client //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -45,7 +45,7 @@ ProxyListener::ProxyListener (const std::string& name,
SocketAddressFamily remote_family,
const std::string& remote_address,
bool cln, bool ssh)
: log_("/wanproxy/proxy/" + name + "/listener"),
: log_("/wanproxy/" + name + "/listener"),
name_(name),
local_codec_(local_codec),
remote_codec_(remote_codec),

View file

@ -34,11 +34,11 @@
// Description: session start and global application management //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
#define PROGRAM_VERSION "3.0"
#define PROGRAM_VERSION "3.02"
WanProxyCore wanproxy;

View file

@ -4,7 +4,7 @@
// Description: global data for the wanproxy application //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -109,10 +109,31 @@ public:
if (reload_action_)
reload_action_->cancel (), reload_action_ = 0;
delete listener_; listener_ = 0;
print_stream_counts ();
std::map<UUID, XCodecCache*>::iterator it;
for (it = caches_.begin(); it != caches_.end(); it++)
delete it->second;
caches_.clear ();
}
void print_stream_counts ()
{
if (local_codec_.counting_)
{
INFO("wanproxy/core") << "Local codec request input bytes: " << local_codec_.request_input_bytes_;
INFO("wanproxy/core") << "Local codec request output bytes: " << local_codec_.request_output_bytes_;
INFO("wanproxy/core") << "Local codec response input bytes: " << local_codec_.response_input_bytes_;
INFO("wanproxy/core") << "Local codec response output bytes: " << local_codec_.response_output_bytes_;
}
if (remote_codec_.counting_)
{
INFO("wanproxy/core") << "Remote codec request input bytes: " << remote_codec_.request_input_bytes_;
INFO("wanproxy/core") << "Remote codec request output bytes: " << remote_codec_.request_output_bytes_;
INFO("wanproxy/core") << "Remote codec response input bytes: " << remote_codec_.response_input_bytes_;
INFO("wanproxy/core") << "Remote codec response output bytes: " << remote_codec_.response_output_bytes_;
}
}
};

View file

@ -31,6 +31,7 @@ set codec0.cache_path "/var/lib/wanproxy"
set codec0.local_size 2048
set codec0.compressor zlib
set codec0.compressor_level 6
set codec0.byte_counts 1
activate codec0
create interface if0

View file

@ -41,7 +41,7 @@
// Description: SSH encryption/decryption inside a data filter pair //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -63,13 +63,13 @@ SSH::EncryptFilter::EncryptFilter (SSH::Role role, int flg) : BufferedFilter ("/
session_.algorithm_negotiation_->add_algorithm (SSH::ServerHostKey::server (&session_, "ssh-server1.pem"));
session_.algorithm_negotiation_->add_algorithms ();
Buffer str ("SSH-2.0-WANProxy " + (std::string)log_);
Buffer str ("SSH-2.0-WANProxy " + (std::string) log_);
session_.local_version (str);
str.append ("\r\n");
Filter::produce (str);
}
bool SSH::EncryptFilter::consume (Buffer& buf)
bool SSH::EncryptFilter::consume (Buffer& buf, int flg)
{
buf.moveout (&pending_);
@ -83,7 +83,7 @@ bool SSH::EncryptFilter::consume (Buffer& buf)
Buffer packet;
packet.append (SSHStreamPacket);
pending_.moveout (&packet);
return produce (packet);
return produce (packet, flg);
}
else
{
@ -100,7 +100,7 @@ bool SSH::EncryptFilter::consume (Buffer& buf)
Buffer packet;
pending_.moveout (&packet, sizeof length, length);
if (! produce (packet))
if (! produce (packet, flg))
return false;
}
}
@ -109,7 +109,7 @@ bool SSH::EncryptFilter::consume (Buffer& buf)
return true;
}
bool SSH::EncryptFilter::produce (Buffer& buf)
bool SSH::EncryptFilter::produce (Buffer& buf, int flg)
{
Encryption *encryption_algorithm;
MAC *mac_algorithm;
@ -168,7 +168,7 @@ bool SSH::EncryptFilter::produce (Buffer& buf)
session_.local_sequence_number_++;
return Filter::produce (packet);
return Filter::produce (packet, flg);
}
void SSH::EncryptFilter::flush (int flg)
@ -198,7 +198,7 @@ SSH::DecryptFilter::DecryptFilter (int flg) : LogisticFilter ("/ssh/decrypt")
identified_ = false;
}
bool SSH::DecryptFilter::consume (Buffer& buf)
bool SSH::DecryptFilter::consume (Buffer& buf, int flg)
{
buf.moveout (&pending_);
@ -500,7 +500,7 @@ bool SSH::DecryptFilter::consume (Buffer& buf)
packet = b;
}
return produce (packet);
return produce (packet, flg);
}
return true;

View file

@ -4,7 +4,7 @@
// Description: SSH encryption/decryption inside a data filter pair //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -28,8 +28,8 @@ namespace SSH
public:
EncryptFilter (Role role, int flg = 0);
virtual bool consume (Buffer& buf);
virtual bool produce (Buffer& buf);
virtual bool consume (Buffer& buf, int flg = 0);
virtual bool produce (Buffer& buf, int flg = 0);
virtual void flush (int flg);
Session* current_session () { return &session_; }
@ -45,7 +45,7 @@ namespace SSH
public:
DecryptFilter (int flg = 0);
virtual bool consume (Buffer& buf);
virtual bool consume (Buffer& buf, int flg = 0);
virtual void flush (int flg);
void set_encrypter (EncryptFilter* f) { session_ = (f ? f->current_session () : 0); set_upstream (f); }

View file

@ -23,7 +23,7 @@
// Description: persistent cache on disk for xcodec protocol streams //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -61,6 +61,9 @@ XCodecCacheCOSS::XCodecCacheCOSS (const UUID& uuid, const std::string& cache_dir
directory_ = new COSSMetadata[stripe_limit_];
memset (directory_, 0, sizeof (COSSMetadata) * stripe_limit_);
if (stream_.rdbuf())
stream_.rdbuf()->pubsetbuf (0, 0);
stream_.open (file_path_.c_str(), fstream::in | fstream::out | fstream::binary);
if (! read_file ())
{

View file

@ -38,7 +38,7 @@
// Description: decoding routines for the xcodex protocol //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////

View file

@ -36,7 +36,7 @@
// Description: encoding routines for the xcodex protocol //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -57,7 +57,7 @@ XCodecEncoder::~XCodecEncoder()
* escaped.
*/
void XCodecEncoder::encode (Buffer& output, Buffer& input, bool wait)
void XCodecEncoder::encode (Buffer& output, Buffer& input)
{
int off = source_.length ();
Buffer old;
@ -170,9 +170,6 @@ void XCodecEncoder::encode (Buffer& output, Buffer& input, bool wait)
}
}
}
if (! wait)
flush (output);
}
bool XCodecEncoder::flush (Buffer& output)

View file

@ -34,7 +34,7 @@
// Description: encoding routines for the xcodex protocol //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -53,7 +53,7 @@ public:
XCodecEncoder(XCodecCache*);
~XCodecEncoder();
void encode (Buffer&, Buffer&, bool);
void encode (Buffer&, Buffer&);
bool flush (Buffer&);
private:

View file

@ -25,6 +25,7 @@
#include <common/buffer.h>
#include <common/endian.h>
#include <common/count_filter.h>
#include <event/event_system.h>
#include <programs/wanproxy/wanproxy.h>
#include "xcodec_filter.h"
@ -35,7 +36,7 @@
// Description: instantiation of encoder/decoder in a data filter pair //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -119,7 +120,7 @@
// Encoding
bool EncodeFilter::consume (Buffer& buf)
bool EncodeFilter::consume (Buffer& buf, int flg)
{
Buffer enc, output;
@ -143,18 +144,24 @@ bool EncodeFilter::consume (Buffer& buf)
return false;
}
encoder_->encode (enc, buf, waiting_);
encoder_->encode (enc, buf);
if (! (flg & TO_BE_CONTINUED))
{
if (waiting_)
{
if (wait_action_)
wait_action_->cancel ();
wait_action_ = event_system.track (150, StreamModeWait, callback (this, &EncodeFilter::on_read_timeout));
}
else
encoder_->flush (enc);
}
while (! enc.empty ())
encode_frame (enc, output);
if (waiting_)
{
if (wait_action_)
wait_action_->cancel ();
wait_action_ = event_system.track (150, StreamModeWait, callback (this, &EncodeFilter::on_read_timeout));
}
return (! output.empty () ? produce (output) : true);
return (! output.empty () ? produce (output, flg) : true);
}
void EncodeFilter::flush (int flg)
@ -165,10 +172,12 @@ void EncodeFilter::flush (int flg)
{
flushing_ = true;
flush_flags_ |= flg;
if (wait_action_)
wait_action_->cancel (), wait_action_ = 0;
if (! sent_eos_)
{
Buffer enc, output;
if (waiting_ && encoder_ && encoder_->flush (enc))
if (encoder_ && encoder_->flush (enc))
encode_frame (enc, output);
output.append (XCODEC_PIPE_OP_EOS);
sent_eos_ = produce (output);
@ -200,7 +209,7 @@ void EncodeFilter::on_read_timeout (Event e)
wait_action_->cancel (), wait_action_ = 0;
Buffer enc, output;
if (encoder_ && encoder_->flush (enc))
if (! flushing_ && encoder_ && encoder_->flush (enc))
{
encode_frame (enc, output);
produce (output);
@ -209,7 +218,7 @@ void EncodeFilter::on_read_timeout (Event e)
// Decoding
bool DecodeFilter::consume (Buffer& buf)
bool DecodeFilter::consume (Buffer& buf, int flg)
{
if (! upstream_)
{
@ -412,7 +421,7 @@ bool DecodeFilter::consume (Buffer& buf)
if (! output.empty ())
{
ASSERT(log_, ! flushing_);
if (! produce (output))
if (! produce (output, flg))
return false;
}
else
@ -512,7 +521,7 @@ void DecodeFilter::flush (int flg)
if (! frame_buffer_.empty ())
DEBUG(log_) << "Flushing decoder with frame data outstanding.";
if (! upflushed_ && upstream_)
upstream_->flush (XCODEC_PIPE_OP_EOS_ACK);
upflushed_ = true, upstream_->flush (XCODEC_PIPE_OP_EOS_ACK);
Filter::flush (flush_flags_);
}

View file

@ -4,7 +4,7 @@
// Description: instantiation of encoder/decoder in a data filter pair //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-08-31 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -44,7 +44,7 @@ public:
delete encoder_;
}
virtual bool consume (Buffer& buf);
virtual bool consume (Buffer& buf, int flg = 0);
virtual void flush (int flg);
private:
@ -77,7 +77,7 @@ public:
delete decoder_;
}
virtual bool consume (Buffer& buf);
virtual bool consume (Buffer& buf, int flg = 0);
virtual void flush (int flg);
};

View file

@ -4,7 +4,7 @@
// Description: data filters for zlib inflate/deflate streams //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -32,7 +32,7 @@ DeflateFilter::~DeflateFilter ()
ERROR(log_) << "Deflate stream did not end cleanly.";
}
bool DeflateFilter::consume (Buffer& buf)
bool DeflateFilter::consume (Buffer& buf, int flg)
{
const BufferSegment* seg;
int cnt = 0, i = 0, rv;
@ -64,7 +64,7 @@ bool DeflateFilter::consume (Buffer& buf)
}
}
return produce (pending_);
return produce (pending_, flg);
}
void DeflateFilter::flush (int flg)
@ -108,7 +108,7 @@ InflateFilter::~InflateFilter()
ERROR(log_) << "Inflate stream did not end cleanly.";
}
bool InflateFilter::consume (Buffer& buf)
bool InflateFilter::consume (Buffer& buf, int flg)
{
const BufferSegment* seg;
int cnt = 0, i = 0, rv;
@ -140,7 +140,7 @@ bool InflateFilter::consume (Buffer& buf)
}
}
return produce (pending_);
return produce (pending_, flg);
}
void InflateFilter::flush (int flg)

View file

@ -4,7 +4,7 @@
// Description: data filters for zlib inflate/deflate streams //
// Project: WANProxy XTech //
// Author: Andreu Vidal Bramfeld-Software //
// Last modified: 2015-04-01 //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
@ -27,7 +27,7 @@ public:
DeflateFilter (int level = 0);
virtual ~DeflateFilter ();
virtual bool consume (Buffer& buf);
virtual bool consume (Buffer& buf, int flg = 0);
virtual void flush (int flg);
};
@ -41,7 +41,7 @@ public:
InflateFilter ();
~InflateFilter ();
virtual bool consume (Buffer& buf);
virtual bool consume (Buffer& buf, int flg = 0);
virtual void flush (int flg);
};