wanproxy/xcodec/xcodec_filter.cc
2018-06-28 13:25:11 +02:00

526 lines
13 KiB
C++

/*
* Copyright (c) 2011-2012 Juli Mallett. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <common/buffer.h>
#include <common/endian.h>
#include <common/count_filter.h>
#include <event/event_system.h>
#include "xcodec_filter.h"
////////////////////////////////////////////////////////////////////////////////
// //
// File: xcodec_filter.cc //
// Description: instantiation of encoder/decoder in a data filter pair //
// Project: WANProxy XTech //
// Adapted by: Andreu Vidal Bramfeld-Software //
// Last modified: 2016-02-28 //
// //
////////////////////////////////////////////////////////////////////////////////
/*
 * Usage:
 *	<OP_HELLO> length[uint8_t] data[uint8_t x length]
 *
 * Effects:
 *	Must appear at the start of and only at the start of an encoded stream.
 *	In this file the data is the encoded cache UUID followed by a uint64_t
 *	holding the sender's nominal cache size, so length is always
 *	UUID_STRING_SIZE + sizeof (uint64_t).
 *
 * Side-effects:
 *	Possibly many.  The receiving decoder looks up (or adds) a cache for
 *	the announced UUID and creates its XCodecDecoder on top of it.
 */
#define XCODEC_PIPE_OP_HELLO ((uint8_t)0xff)
/*
 * Usage:
 *	<OP_LEARN> data[uint8_t x XCODEC_SEGMENT_LENGTH]
 *
 * Effects:
 *	The `data' is hashed, the hash is associated with the data if possible.
 *
 * Side-effects:
 *	None.
 */
#define XCODEC_PIPE_OP_LEARN ((uint8_t)0xfe)
/*
 * Usage:
 *	<OP_ASK> hash[uint64_t]
 *
 * Effects:
 *	An OP_LEARN will be sent in response with the data corresponding to the
 *	hash.  The hash travels in big-endian byte order.
 *
 *	If the hash is unknown, an error will be indicated.
 *
 * Side-effects:
 *	None.
 */
#define XCODEC_PIPE_OP_ASK ((uint8_t)0xfd)
/*
 * Usage:
 *	<OP_EOS>
 *
 * Effects:
 *	Alert the other party that we have no intention of sending more data.
 *
 * Side-effects:
 *	The other party will send <OP_EOS_ACK> when it has processed all of
 *	the data we have sent.
 */
#define XCODEC_PIPE_OP_EOS ((uint8_t)0xfc)
/*
 * Usage:
 *	<OP_EOS_ACK>
 *
 * Effects:
 *	Alert the other party that we have no intention of reading more data.
 *
 * Side-effects:
 *	The connection will be torn down.
 */
#define XCODEC_PIPE_OP_EOS_ACK ((uint8_t)0xfb)
/*
 * Usage:
 *	<FRAME> length[uint16_t] data[uint8_t x length]
 *
 * Effects:
 *	Frames an encoded chunk.  The length travels in big-endian byte order;
 *	the decoder rejects a length of zero or one above XCODEC_PIPE_MAX_FRAME.
 *
 * Side-effects:
 *	None.
 */
#define XCODEC_PIPE_OP_FRAME ((uint8_t)0x00)
/* Largest number of data bytes carried by a single <FRAME>. */
#define XCODEC_PIPE_MAX_FRAME (32768)
// Encoding
/*
 * Encode `buf' and hand the framed result to produce().
 *
 * On the first call the encoder is created and a <HELLO> carrying the
 * cache UUID and nominal cache size is placed ahead of any frames.
 *
 * Returns false if the <HELLO> cannot be built or the encoder cannot be
 * created; otherwise returns whatever produce() returns, or true when there
 * is nothing to hand on.
 */
bool EncodeFilter::consume (Buffer& buf, int flg)
{
    Buffer encoded, framed;

    ASSERT(log_, ! flushing_);

    /* Lazily set up the encoder and announce ourselves to the peer. */
    if (! encoder_)
    {
        if (! (cache_ && cache_->identifier().is_valid ()))
        {
            ERROR(log_) << "Could not encode UUID for <HELLO>.";
            return false;
        }

        framed.append (XCODEC_PIPE_OP_HELLO);
        uint64_t nominal = cache_->nominal_size ();
        framed.append ((uint8_t) (UUID_STRING_SIZE + sizeof nominal));
        cache_->identifier().encode (framed);
        framed.append (&nominal);

        encoder_ = new XCodecEncoder (cache_);
        if (! encoder_)
            return false;
    }

    encoder_->encode (encoded, buf);

    /*
     * When the caller does not announce more input, either arm a timer
     * that will flush the encoder later (waiting mode) or flush it now.
     */
    const bool final_chunk = ! (flg & TO_BE_CONTINUED);
    if (final_chunk && waiting_)
    {
        if (wait_action_)
            wait_action_->cancel ();
        wait_action_ = event_system.track (150, StreamModeWait, callback (this, &EncodeFilter::on_read_timeout));
    }
    else if (final_chunk)
    {
        encoder_->flush (encoded);
    }

    /* Cut the encoded stream into frames of at most XCODEC_PIPE_MAX_FRAME. */
    while (! encoded.empty ())
        encode_frame (encoded, framed);

    if (framed.empty ())
        return true;
    return produce (framed, flg);
}
/*
 * Flush the encoding direction.
 *
 * flg == XCODEC_PIPE_OP_EOS_ACK is a marker (DecodeFilter passes it when it
 * flushes its upstream_ filter) recording that the peer side is done.  Any
 * other value is an ordinary flush: the flags are accumulated, a pending
 * flush timer is cancelled and, the first time only, whatever the encoder
 * still holds is framed and sent followed by <EOS>.
 *
 * The flush is propagated with Filter::flush() only once both an ordinary
 * flush and the EOS_ACK marker have been seen.
 */
void EncodeFilter::flush (int flg)
{
    if (flg == XCODEC_PIPE_OP_EOS_ACK)
        eos_ack_ = true;
    else
    {
        flushing_ = true;
        flush_flags_ |= flg;
        if (wait_action_)
            wait_action_->cancel (), wait_action_ = 0;
        if (! sent_eos_)
        {
            Buffer enc, output;
            if (encoder_ && encoder_->flush (enc))
            {
                /*
                 * Frame everything the encoder returned, as consume() does:
                 * a single encode_frame() call would drop whatever exceeds
                 * XCODEC_PIPE_MAX_FRAME, and would emit a zero-length frame
                 * (which the decoder rejects) if nothing was returned.
                 */
                while (! enc.empty ())
                    encode_frame (enc, output);
            }
            output.append (XCODEC_PIPE_OP_EOS);
            /* Remember whether <EOS> went out so that it is sent only once. */
            sent_eos_ = produce (output);
        }
    }
    if (flushing_ && eos_ack_)
        Filter::flush (flush_flags_);
}
/*
 * Move up to XCODEC_PIPE_MAX_FRAME bytes from the front of `src' into `trg'
 * as one <FRAME>: opcode, big-endian uint16_t length, then the data.
 *
 * Does nothing when `src' is empty: a zero-length frame is rejected by the
 * decoder as an invalid framed data length, so emitting one would only
 * cause the peer to fail the stream.
 */
void EncodeFilter::encode_frame (Buffer& src, Buffer& trg)
{
    if (src.empty ())
        return;

    int n = src.length ();
    if (n > XCODEC_PIPE_MAX_FRAME)
        n = XCODEC_PIPE_MAX_FRAME;

    uint16_t len = n;
    len = BigEndian::encode (len);

    trg.append (XCODEC_PIPE_OP_FRAME);
    trg.append (&len);
    trg.append (src, n);
    /* The framed bytes are consumed; any remainder stays in src for the caller. */
    src.skip (n);
}
/*
 * Timer callback armed by consume() in waiting mode.
 *
 * Drops the timer and, unless a flush is already in progress, flushes the
 * encoder and sends whatever it returned.  The event argument is not used.
 */
void EncodeFilter::on_read_timeout (Event e)
{
    if (wait_action_)
        wait_action_->cancel (), wait_action_ = 0;

    Buffer enc, output;
    if (! flushing_ && encoder_ && encoder_->flush (enc))
    {
        /*
         * Frame everything, as consume() does; a single encode_frame() call
         * would drop any data beyond XCODEC_PIPE_MAX_FRAME.
         */
        while (! enc.empty ())
            encode_frame (enc, output);
        if (! output.empty ())
            produce (output);
    }
}
// Decoding
/*
 * Parse the incoming pipe stream and decode framed data.
 *
 * `buf' is appended to pending_, which is then consumed one operation at a
 * time.  Frame payloads are collected in frame_buffer_ and run through the
 * decoder whenever no unknown hashes are outstanding; decoded output goes to
 * produce(), while <ASK>, <LEARN> and <EOS_ACK> replies go out through
 * upstream_.
 *
 * Returns true when everything available has been processed or more input
 * is needed to complete the current operation.  Returns false on a protocol
 * error, a missing configuration, a decoder error, or when produce() fails.
 */
bool DecodeFilter::consume (Buffer& buf, int flg)
{
    if (! upstream_)
    {
        ERROR(log_) << "Decoder not configured";
        return false;
    }

    pending_.append (buf);

    while (! pending_.empty ())
    {
        /* peek() leaves the opcode in place; each case skips it once complete. */
        uint8_t op = pending_.peek ();
        switch (op)
        {
        case XCODEC_PIPE_OP_HELLO:
            if (decoder_cache_)
            {
                ERROR(log_) << "Got <HELLO> twice.";
                return false;
            }
            else if (codec_)
            {
                uint8_t len;
                if (pending_.length() < sizeof op + sizeof len)
                    return true;
                pending_.extract (&len, sizeof op);
                if (pending_.length() < sizeof op + sizeof len + len)
                    return true;

                uint64_t mb;
                if (len != UUID_STRING_SIZE + sizeof mb)
                {
                    ERROR(log_) << "Unsupported <HELLO> length: " << (unsigned)len;
                    return false;
                }

                UUID uuid;
                pending_.skip (sizeof op + sizeof len);
                if (! uuid.decode (pending_))
                {
                    ERROR(log_) << "Invalid UUID in <HELLO>.";
                    return false;
                }
                pending_.extract (&mb);
                pending_.skip (sizeof mb);

                /* Reuse the cache already known for this peer, else add one. */
                if (! (decoder_cache_ = wanproxy.find_cache (uuid)))
                    decoder_cache_ = wanproxy.add_cache (codec_->cache_type_, codec_->cache_path_, mb, uuid);

                ASSERT(log_, decoder_ == NULL);
                if (decoder_cache_)
                    decoder_ = new XCodecDecoder (decoder_cache_);

                DEBUG(log_) << "Peer connected with UUID: " << uuid;
            }
            else
            {
                /*
                 * Without a codec nothing would consume the <HELLO>, and
                 * the loop would spin on the same opcode forever.
                 */
                ERROR(log_) << "Got <HELLO> but no codec is configured.";
                return false;
            }
            break;

        case XCODEC_PIPE_OP_ASK:
            if (! encoder_cache_)
            {
                ERROR(log_) << "Decoder not configured";
                return false;
            }
            else
            {
                uint64_t hash;
                if (pending_.length() < sizeof op + sizeof hash)
                    return true;

                pending_.skip (sizeof op);
                pending_.moveout (&hash);
                hash = BigEndian::decode (hash);

                /* Answer with <LEARN> followed by the data found for the hash. */
                Buffer learn;
                learn.append (XCODEC_PIPE_OP_LEARN);
                if (encoder_cache_->lookup (hash, learn))
                {
                    DEBUG(log_) << "Responding to <ASK> with <LEARN>.";
                    if (! upstream_->produce (learn))
                        return false;
                }
                else
                {
                    ERROR(log_) << "Unknown hash in <ASK>: " << hash;
                    return false;
                }
            }
            break;

        case XCODEC_PIPE_OP_LEARN:
            if (! decoder_cache_)
            {
                ERROR(log_) << "Got <LEARN> before <HELLO>.";
                return false;
            }
            else
            {
                if (pending_.length() < sizeof op + XCODEC_SEGMENT_LENGTH)
                    return true;

                pending_.skip (sizeof op);

                /* The segment stays in pending_ until the final skip below. */
                uint8_t data[XCODEC_SEGMENT_LENGTH];
                pending_.copyout (data, XCODEC_SEGMENT_LENGTH);
                uint64_t hash = XCodecHash::hash (data);

                if (unknown_hashes_.find (hash) == unknown_hashes_.end ())
                    INFO(log_) << "Gratuitous <LEARN> without <ASK>.";
                else
                    unknown_hashes_.erase (hash);

                Buffer old;
                if (decoder_cache_->lookup (hash, old))
                {
                    if (old.equal (data, sizeof data))
                    {
                        DEBUG(log_) << "Redundant <LEARN>.";
                    }
                    else
                    {
                        ERROR(log_) << "Collision in <LEARN>.";
                        return false;
                    }
                    old.clear ();
                }
                else
                {
                    DEBUG(log_) << "Successful <LEARN>.";
                    decoder_cache_->enter (hash, pending_, 0);
                }

                pending_.skip (XCODEC_SEGMENT_LENGTH);
            }
            break;

        case XCODEC_PIPE_OP_EOS:
            if (received_eos_)
            {
                ERROR(log_) << "Duplicate <EOS>.";
                return false;
            }
            pending_.skip (sizeof op);
            received_eos_ = true;
            break;

        case XCODEC_PIPE_OP_EOS_ACK:
            if (received_eos_ack_)
            {
                ERROR(log_) << "Duplicate <EOS_ACK>.";
                return false;
            }
            pending_.skip (sizeof op);
            received_eos_ack_ = true;
            break;

        case XCODEC_PIPE_OP_FRAME:
            if (! decoder_)
            {
                ERROR(log_) << "Got frame data before decoder initialized.";
                return false;
            }
            else
            {
                uint16_t len;
                if (pending_.length() < sizeof op + sizeof len)
                    return true;

                pending_.extract (&len, sizeof op);
                len = BigEndian::decode (len);
                if (len == 0 || len > XCODEC_PIPE_MAX_FRAME)
                {
                    ERROR(log_) << "Invalid framed data length.";
                    return false;
                }

                if (pending_.length() < sizeof op + sizeof len + len)
                    return true;

                /* Move the payload, skipping the opcode and length header. */
                pending_.moveout (&frame_buffer_, sizeof op + sizeof len, len);
            }
            break;

        default:
            ERROR(log_) << "Unsupported operation in pipe stream.";
            return false;
        }

        /* Nothing framed yet: go on with the next operation. */
        if (frame_buffer_.empty ())
            continue;

        if (! unknown_hashes_.empty ())
        {
            DEBUG(log_) << "Waiting for unknown hashes to continue processing data.";
            continue;
        }

        Buffer output;
        if (! decoder_->decode (output, frame_buffer_, unknown_hashes_))
        {
            ERROR(log_) << "Decoder exiting with error.";
            return false;
        }

        if (! output.empty ())
        {
            ASSERT(log_, ! flushing_);
            if (! produce (output, flg))
                return false;
        }
        else
        {
            /*
             * We should only get no output from the decoder if
             * we're waiting on the next frame or we need an
             * unknown hash.  It would be nice to make the
             * encoder framing aware so that it would not end
             * up with encoded data that straddles a frame
             * boundary.  (Fixing that would also allow us to
             * simplify length checking within the decoder
             * considerably.)
             */
            ASSERT(log_, !frame_buffer_.empty() || !unknown_hashes_.empty());
        }

        /* Request every hash the decoder could not resolve. */
        Buffer ask;
        std::set<uint64_t>::const_iterator it;
        for (it = unknown_hashes_.begin(); it != unknown_hashes_.end(); ++it)
        {
            uint64_t hash = *it;
            hash = BigEndian::encode (hash);
            ask.append (XCODEC_PIPE_OP_ASK);
            ask.append (&hash);
        }
        if (! ask.empty ())
        {
            DEBUG(log_) << "Sending <ASK>s.";
            if (! upstream_->produce (ask))
                return false;
        }
    }

    /* Acknowledge <EOS> once no framed data is left to decode. */
    if (received_eos_ && ! sent_eos_ack_ && frame_buffer_.empty ())
    {
        DEBUG(log_) << "Decoder received <EOS>, sending <EOS_ACK>.";
        Buffer eos_ack;
        eos_ack.append (XCODEC_PIPE_OP_EOS_ACK);
        sent_eos_ack_ = true;
        if (! upstream_->produce (eos_ack))
            return false;
    }

    /*
     * If we have received EOS and not yet sent it, we can send it now.
     * The only caveat is that if we have outstanding <ASK>s, i.e. we have
     * not yet emptied unknown_hashes_, then we can't send EOS yet.
     */
    if (received_eos_ && ! flushing_)
    {
        if (unknown_hashes_.empty ())
        {
            if (! frame_buffer_.empty ())
                return false;
            DEBUG(log_) << "Decoder received <EOS>, shutting down decoder output channel.";
            flushing_ = true;
            Filter::flush (0);
        }
        else
        {
            if (frame_buffer_.empty ())
                return false;
            DEBUG(log_) << "Decoder waiting to send <EOS> until <ASK>s are answered.";
        }
    }

    /*
     * NB:
     * Along with the comment above, there is some relevance here.  If we
     * use some kind of hierarchical decoding, then we need to be able to
     * handle the case where an <ASK>'s response necessitates us to send
     * another <ASK> or something of that sort.  There are other conditions
     * where we may still need to send something out of the encoder, but
     * thankfully none seem to arise yet.
     */
    if (sent_eos_ack_ && received_eos_ack_ && ! upflushed_)
    {
        ASSERT(log_, pending_.empty());
        ASSERT(log_, frame_buffer_.empty());
        DEBUG(log_) << "Decoder finished, got <EOS_ACK>, shutting down encoder output channel.";
        upflushed_ = true;
        upstream_->flush (XCODEC_PIPE_OP_EOS_ACK);
    }

    return true;
}
/*
 * Flush the decoding direction.
 *
 * Marks the filter as flushing, accumulates the flush flags, notes in the
 * debug log any input or framed data still outstanding, passes the EOS_ACK
 * marker to upstream_ at most once, and finally propagates the flush with
 * the accumulated flags.
 */
void DecodeFilter::flush (int flg)
{
    flush_flags_ |= flg;
    flushing_ = true;

    if (! pending_.empty ())
    {
        DEBUG(log_) << "Flushing decoder with data outstanding.";
    }
    if (! frame_buffer_.empty ())
    {
        DEBUG(log_) << "Flushing decoder with frame data outstanding.";
    }

    if (upstream_ && ! upflushed_)
    {
        upflushed_ = true;
        upstream_->flush (XCODEC_PIPE_OP_EOS_ACK);
    }

    Filter::flush (flush_flags_);
}