1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

For #913, AMF0 and RTMP support complex error.

This commit is contained in:
winlin 2017-12-31 20:52:04 +08:00
parent 204ef041da
commit 60accb6e54
13 changed files with 1231 additions and 1891 deletions

File diff suppressed because it is too large Load diff

View file

@ -253,11 +253,11 @@ public:
/** /**
* read AMF0 instance from stream. * read AMF0 instance from stream.
*/ */
virtual int read(SrsBuffer* stream) = 0; virtual srs_error_t read(SrsBuffer* stream) = 0;
/** /**
* write AMF0 instance to stream. * write AMF0 instance to stream.
*/ */
virtual int write(SrsBuffer* stream) = 0; virtual srs_error_t write(SrsBuffer* stream) = 0;
/** /**
* copy current AMF0 instance. * copy current AMF0 instance.
*/ */
@ -324,7 +324,7 @@ public:
* @remark, instance is created without read from stream, user must * @remark, instance is created without read from stream, user must
* use (*ppvalue)->read(stream) to get the instance. * use (*ppvalue)->read(stream) to get the instance.
*/ */
static int discovery(SrsBuffer* stream, SrsAmf0Any** ppvalue); static srs_error_t discovery(SrsBuffer* stream, SrsAmf0Any** ppvalue);
}; };
/** /**
@ -349,8 +349,8 @@ public:
// serialize/deserialize to/from stream. // serialize/deserialize to/from stream.
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
/** /**
* convert amf0 to json. * convert amf0 to json.
@ -440,8 +440,8 @@ public:
// serialize/deserialize to/from stream. // serialize/deserialize to/from stream.
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
/** /**
* convert amf0 to json. * convert amf0 to json.
@ -525,8 +525,8 @@ public:
// serialize/deserialize to/from stream. // serialize/deserialize to/from stream.
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
/** /**
* convert amf0 to json. * convert amf0 to json.
@ -582,15 +582,15 @@ public:
* @param ppvalue, the output amf0 any elem. * @param ppvalue, the output amf0 any elem.
* NULL if error; otherwise, never NULL and user must free it. * NULL if error; otherwise, never NULL and user must free it.
*/ */
extern int srs_amf0_read_any(SrsBuffer* stream, SrsAmf0Any** ppvalue); extern srs_error_t srs_amf0_read_any(SrsBuffer* stream, SrsAmf0Any** ppvalue);
/** /**
* read amf0 string from stream. * read amf0 string from stream.
* 2.4 String Type * 2.4 String Type
* string-type = string-marker UTF-8 * string-type = string-marker UTF-8
*/ */
extern int srs_amf0_read_string(SrsBuffer* stream, std::string& value); extern srs_error_t srs_amf0_read_string(SrsBuffer* stream, std::string& value);
extern int srs_amf0_write_string(SrsBuffer* stream, std::string value); extern srs_error_t srs_amf0_write_string(SrsBuffer* stream, std::string value);
/** /**
* read amf0 boolean from stream. * read amf0 boolean from stream.
@ -598,32 +598,32 @@ extern int srs_amf0_write_string(SrsBuffer* stream, std::string value);
* boolean-type = boolean-marker U8 * boolean-type = boolean-marker U8
* 0 is false, <> 0 is true * 0 is false, <> 0 is true
*/ */
extern int srs_amf0_read_boolean(SrsBuffer* stream, bool& value); extern srs_error_t srs_amf0_read_boolean(SrsBuffer* stream, bool& value);
extern int srs_amf0_write_boolean(SrsBuffer* stream, bool value); extern srs_error_t srs_amf0_write_boolean(SrsBuffer* stream, bool value);
/** /**
* read amf0 number from stream. * read amf0 number from stream.
* 2.2 Number Type * 2.2 Number Type
* number-type = number-marker DOUBLE * number-type = number-marker DOUBLE
*/ */
extern int srs_amf0_read_number(SrsBuffer* stream, double& value); extern srs_error_t srs_amf0_read_number(SrsBuffer* stream, double& value);
extern int srs_amf0_write_number(SrsBuffer* stream, double value); extern srs_error_t srs_amf0_write_number(SrsBuffer* stream, double value);
/** /**
* read amf0 null from stream. * read amf0 null from stream.
* 2.7 null Type * 2.7 null Type
* null-type = null-marker * null-type = null-marker
*/ */
extern int srs_amf0_read_null(SrsBuffer* stream); extern srs_error_t srs_amf0_read_null(SrsBuffer* stream);
extern int srs_amf0_write_null(SrsBuffer* stream); extern srs_error_t srs_amf0_write_null(SrsBuffer* stream);
/** /**
* read amf0 undefined from stream. * read amf0 undefined from stream.
* 2.8 undefined Type * 2.8 undefined Type
* undefined-type = undefined-marker * undefined-type = undefined-marker
*/ */
extern int srs_amf0_read_undefined(SrsBuffer* stream); extern srs_error_t srs_amf0_read_undefined(SrsBuffer* stream);
extern int srs_amf0_write_undefined(SrsBuffer* stream); extern srs_error_t srs_amf0_write_undefined(SrsBuffer* stream);
// internal objects, user should never use it. // internal objects, user should never use it.
namespace _srs_internal namespace _srs_internal
@ -650,8 +650,8 @@ namespace _srs_internal
virtual ~SrsAmf0String(); virtual ~SrsAmf0String();
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
}; };
@ -677,8 +677,8 @@ namespace _srs_internal
virtual ~SrsAmf0Boolean(); virtual ~SrsAmf0Boolean();
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
}; };
@ -703,8 +703,8 @@ namespace _srs_internal
virtual ~SrsAmf0Number(); virtual ~SrsAmf0Number();
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
}; };
@ -731,8 +731,8 @@ namespace _srs_internal
// serialize/deserialize to/from stream. // serialize/deserialize to/from stream.
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
public: public:
/** /**
@ -763,8 +763,8 @@ namespace _srs_internal
virtual ~SrsAmf0Null(); virtual ~SrsAmf0Null();
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
}; };
@ -786,8 +786,8 @@ namespace _srs_internal
virtual ~SrsAmf0Undefined(); virtual ~SrsAmf0Undefined();
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
}; };
@ -837,8 +837,8 @@ namespace _srs_internal
virtual ~SrsAmf0ObjectEOF(); virtual ~SrsAmf0ObjectEOF();
public: public:
virtual int total_size(); virtual int total_size();
virtual int read(SrsBuffer* stream); virtual srs_error_t read(SrsBuffer* stream);
virtual int write(SrsBuffer* stream); virtual srs_error_t write(SrsBuffer* stream);
virtual SrsAmf0Any* copy(); virtual SrsAmf0Any* copy();
}; };
@ -850,13 +850,13 @@ namespace _srs_internal
* UTF8-1 = %x00-7F * UTF8-1 = %x00-7F
* @remark only support UTF8-1 char. * @remark only support UTF8-1 char.
*/ */
extern int srs_amf0_read_utf8(SrsBuffer* stream, std::string& value); extern srs_error_t srs_amf0_read_utf8(SrsBuffer* stream, std::string& value);
extern int srs_amf0_write_utf8(SrsBuffer* stream, std::string value); extern srs_error_t srs_amf0_write_utf8(SrsBuffer* stream, std::string value);
extern bool srs_amf0_is_object_eof(SrsBuffer* stream); extern bool srs_amf0_is_object_eof(SrsBuffer* stream);
extern int srs_amf0_write_object_eof(SrsBuffer* stream, SrsAmf0ObjectEOF* value); extern srs_error_t srs_amf0_write_object_eof(SrsBuffer* stream, SrsAmf0ObjectEOF* value);
extern int srs_amf0_write_any(SrsBuffer* stream, SrsAmf0Any* value); extern srs_error_t srs_amf0_write_any(SrsBuffer* stream, SrsAmf0Any* value);
}; };
#endif #endif

View file

@ -103,7 +103,7 @@ public:
* read specified size bytes of data * read specified size bytes of data
* @param nread, the actually read size, NULL to ignore. * @param nread, the actually read size, NULL to ignore.
*/ */
virtual int read_fully(void* buf, size_t size, ssize_t* nread) = 0; virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread) = 0;
}; };
/** /**

View file

@ -131,13 +131,13 @@ void SrsFastStream::skip(int size)
p += size; p += size;
} }
int SrsFastStream::grow(ISrsReader* reader, int required_size) srs_error_t SrsFastStream::grow(ISrsReader* reader, int required_size)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
// already got required size of bytes. // already got required size of bytes.
if (end - p >= required_size) { if (end - p >= required_size) {
return ret; return err;
} }
// must be positive. // must be positive.
@ -153,13 +153,10 @@ int SrsFastStream::grow(ISrsReader* reader, int required_size)
// resize the space when no left space. // resize the space when no left space.
if (nb_free_space < required_size - nb_exists_bytes) { if (nb_free_space < required_size - nb_exists_bytes) {
srs_verbose("move fast buffer %d bytes", nb_exists_bytes);
// reset or move to get more space. // reset or move to get more space.
if (!nb_exists_bytes) { if (!nb_exists_bytes) {
// reset when buffer is empty. // reset when buffer is empty.
p = end = buffer; p = end = buffer;
srs_verbose("all consumed, reset fast buffer");
} else if (nb_exists_bytes < nb_buffer && p > buffer) { } else if (nb_exists_bytes < nb_buffer && p > buffer) {
// move the left bytes to start of buffer. // move the left bytes to start of buffer.
// @remark Only move memory when space is enough, or failed at next check. // @remark Only move memory when space is enough, or failed at next check.
@ -172,18 +169,15 @@ int SrsFastStream::grow(ISrsReader* reader, int required_size)
// check whether enough free space in buffer. // check whether enough free space in buffer.
nb_free_space = (int)(buffer + nb_buffer - end); nb_free_space = (int)(buffer + nb_buffer - end);
if (nb_free_space < required_size - nb_exists_bytes) { if (nb_free_space < required_size - nb_exists_bytes) {
ret = ERROR_READER_BUFFER_OVERFLOW; return srs_error_new(ERROR_READER_BUFFER_OVERFLOW, "overflow, required=%d, max=%d, left=%d", required_size, nb_buffer, nb_free_space);
srs_error("buffer overflow, required=%d, max=%d, left=%d, ret=%d",
required_size, nb_buffer, nb_free_space, ret);
return ret;
} }
} }
// buffer is ok, read required size of bytes. // buffer is ok, read required size of bytes.
while (end - p < required_size) { while (end - p < required_size) {
ssize_t nread; ssize_t nread;
if ((ret = reader->read(end, nb_free_space, &nread)) != ERROR_SUCCESS) { if ((err = reader->read(end, nb_free_space, &nread)) != srs_success) {
return ret; return srs_error_wrap(err, "read bytes");
} }
#ifdef SRS_PERF_MERGED_READ #ifdef SRS_PERF_MERGED_READ
@ -204,7 +198,7 @@ int SrsFastStream::grow(ISrsReader* reader, int required_size)
nb_free_space -= nread; nb_free_space -= nread;
} }
return ret; return err;
} }
#ifdef SRS_PERF_MERGED_READ #ifdef SRS_PERF_MERGED_READ

View file

@ -134,7 +134,7 @@ public:
* @return an int error code, error if required_size negative. * @return an int error code, error if required_size negative.
* @remark, we actually maybe read more than required_size, maybe 4k for example. * @remark, we actually maybe read more than required_size, maybe 4k for example.
*/ */
virtual int grow(ISrsReader* reader, int required_size); virtual srs_error_t grow(ISrsReader* reader, int required_size);
public: public:
#ifdef SRS_PERF_MERGED_READ #ifdef SRS_PERF_MERGED_READ
/** /**

View file

@ -309,9 +309,9 @@ string srs_generate_rtmp_url(string server, int port, string vhost, string app,
return ss.str(); return ss.str();
} }
int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite) srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
// the limits of writev iovs. // the limits of writev iovs.
// for srs-librtmp, @see https://github.com/ossrs/srs/issues/213 // for srs-librtmp, @see https://github.com/ossrs/srs/issues/213
@ -324,29 +324,23 @@ int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, s
// send in a time. // send in a time.
if (size < limits) { if (size < limits) {
if ((ret = skt->writev(iovs, size, pnwrite)) != ERROR_SUCCESS) { if ((err = skt->writev(iovs, size, pnwrite)) != srs_success) {
if (!srs_is_client_gracefully_close(ret)) { return srs_error_wrap(err, "writev");
srs_error("send with writev failed. ret=%d", ret);
}
return ret;
} }
return ret; return err;
} }
// send in multiple times. // send in multiple times.
int cur_iov = 0; int cur_iov = 0;
while (cur_iov < size) { while (cur_iov < size) {
int cur_count = srs_min(limits, size - cur_iov); int cur_count = srs_min(limits, size - cur_iov);
if ((ret = skt->writev(iovs + cur_iov, cur_count, pnwrite)) != ERROR_SUCCESS) { if ((err = skt->writev(iovs + cur_iov, cur_count, pnwrite)) != srs_success) {
if (!srs_is_client_gracefully_close(ret)) { return srs_error_wrap(err, "writev");
srs_error("send with writev failed. ret=%d", ret);
}
return ret;
} }
cur_iov += cur_count; cur_iov += cur_count;
} }
return ret; return err;
} }
string srs_join_vector_string(vector<string>& vs, string separator) string srs_join_vector_string(vector<string>& vs, string separator)

View file

@ -111,7 +111,7 @@ extern void srs_parse_rtmp_url(std::string url, std::string& tcUrl, std::string&
extern std::string srs_generate_rtmp_url(std::string server, int port, std::string vhost, std::string app, std::string stream); extern std::string srs_generate_rtmp_url(std::string server, int port, std::string vhost, std::string app, std::string stream);
// write large numbers of iovs. // write large numbers of iovs.
extern int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL); extern srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL);
// join string in vector with indicated separator // join string in vector with indicated separator
extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator); extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator);

File diff suppressed because it is too large Load diff

View file

@ -42,8 +42,8 @@ namespace _srs_internal
#define SRS_OpensslHashSize 512 #define SRS_OpensslHashSize 512
extern uint8_t SrsGenuineFMSKey[]; extern uint8_t SrsGenuineFMSKey[];
extern uint8_t SrsGenuineFPKey[]; extern uint8_t SrsGenuineFPKey[];
int openssl_HMACsha256(const void* key, int key_size, const void* data, int data_size, void* digest); srs_error_t openssl_HMACsha256(const void* key, int key_size, const void* data, int data_size, void* digest);
int openssl_generate_key(char* public_key, int32_t size); srs_error_t openssl_generate_key(char* public_key, int32_t size);
/** /**
* the DH wrapper. * the DH wrapper.
@ -64,7 +64,7 @@ namespace _srs_internal
* sometimes openssl generate 127bytes public key. * sometimes openssl generate 127bytes public key.
* default to false to donot ensure. * default to false to donot ensure.
*/ */
virtual int initialize(bool ensure_128bytes_public_key = false); virtual srs_error_t initialize(bool ensure_128bytes_public_key = false);
/** /**
* copy the public key. * copy the public key.
* @param pkey the bytes to copy the public key. * @param pkey the bytes to copy the public key.
@ -72,7 +72,7 @@ namespace _srs_internal
* user should never ignore this size. * user should never ignore this size.
* @remark, when ensure_128bytes_public_key, the size always 128. * @remark, when ensure_128bytes_public_key, the size always 128.
*/ */
virtual int copy_public_key(char* pkey, int32_t& pkey_size); virtual srs_error_t copy_public_key(char* pkey, int32_t& pkey_size);
/** /**
* generate and copy the shared key. * generate and copy the shared key.
* generate the shared key with peer public key. * generate the shared key with peer public key.
@ -82,9 +82,9 @@ namespace _srs_internal
* @param skey_size the max shared key size, output the actual shared key size. * @param skey_size the max shared key size, output the actual shared key size.
* user should never ignore this size. * user should never ignore this size.
*/ */
virtual int copy_shared_key(const char* ppkey, int32_t ppkey_size, char* skey, int32_t& skey_size); virtual srs_error_t copy_shared_key(const char* ppkey, int32_t ppkey_size, char* skey, int32_t& skey_size);
private: private:
virtual int do_initialize(); virtual srs_error_t do_initialize();
}; };
/** /**
* the schema type. * the schema type.
@ -137,7 +137,7 @@ namespace _srs_internal
// parse key block from c1s1. // parse key block from c1s1.
// if created, user must free it by srs_key_block_free // if created, user must free it by srs_key_block_free
// @stream contains c1s1_key_bytes the key start bytes // @stream contains c1s1_key_bytes the key start bytes
int parse(SrsBuffer* stream); srs_error_t parse(SrsBuffer* stream);
private: private:
// calc the offset of key, // calc the offset of key,
// the key->offset cannot be used as the offset of key. // the key->offset cannot be used as the offset of key.
@ -175,7 +175,7 @@ namespace _srs_internal
// parse digest block from c1s1. // parse digest block from c1s1.
// if created, user must free it by srs_digest_block_free // if created, user must free it by srs_digest_block_free
// @stream contains c1s1_digest_bytes the digest start bytes // @stream contains c1s1_digest_bytes the digest start bytes
int parse(SrsBuffer* stream); srs_error_t parse(SrsBuffer* stream);
private: private:
// calc the offset of digest, // calc the offset of digest,
// the key->offset cannot be used as the offset of digest. // the key->offset cannot be used as the offset of digest.
@ -214,12 +214,12 @@ namespace _srs_internal
* copy to bytes. * copy to bytes.
* @param size must be 1536. * @param size must be 1536.
*/ */
virtual int dump(c1s1* owner, char* _c1s1, int size); virtual srs_error_t dump(c1s1* owner, char* _c1s1, int size);
/** /**
* server: parse the c1s1, discovery the key and digest by schema. * server: parse the c1s1, discovery the key and digest by schema.
* use the c1_validate_digest() to valid the digest of c1. * use the c1_validate_digest() to valid the digest of c1.
*/ */
virtual int parse(char* _c1s1, int size) = 0; virtual srs_error_t parse(char* _c1s1, int size) = 0;
public: public:
/** /**
* client: create and sign c1 by schema. * client: create and sign c1 by schema.
@ -236,11 +236,11 @@ namespace _srs_internal
* digest-data = calc_c1_digest(c1, schema) * digest-data = calc_c1_digest(c1, schema)
* copy digest-data to c1 * copy digest-data to c1
*/ */
virtual int c1_create(c1s1* owner); virtual srs_error_t c1_create(c1s1* owner);
/** /**
* server: validate the parsed c1 schema * server: validate the parsed c1 schema
*/ */
virtual int c1_validate_digest(c1s1* owner, bool& is_valid); virtual srs_error_t c1_validate_digest(c1s1* owner, bool& is_valid);
/** /**
* server: create and sign the s1 from c1. * server: create and sign the s1 from c1.
* // decode c1 try schema0 then schema1 * // decode c1 try schema0 then schema1
@ -268,25 +268,25 @@ namespace _srs_internal
* copy s1-digest-data and s1-key-data to s1. * copy s1-digest-data and s1-key-data to s1.
* @param c1, to get the peer_pub_key of client. * @param c1, to get the peer_pub_key of client.
*/ */
virtual int s1_create(c1s1* owner, c1s1* c1); virtual srs_error_t s1_create(c1s1* owner, c1s1* c1);
/** /**
* server: validate the parsed s1 schema * server: validate the parsed s1 schema
*/ */
virtual int s1_validate_digest(c1s1* owner, bool& is_valid); virtual srs_error_t s1_validate_digest(c1s1* owner, bool& is_valid);
public: public:
/** /**
* calc the digest for c1 * calc the digest for c1
*/ */
virtual int calc_c1_digest(c1s1* owner, char*& c1_digest); virtual srs_error_t calc_c1_digest(c1s1* owner, char*& c1_digest);
/** /**
* calc the digest for s1 * calc the digest for s1
*/ */
virtual int calc_s1_digest(c1s1* owner, char*& s1_digest); virtual srs_error_t calc_s1_digest(c1s1* owner, char*& s1_digest);
/** /**
* copy whole c1s1 to bytes. * copy whole c1s1 to bytes.
* @param size must always be 1536 with digest, and 1504 without digest. * @param size must always be 1536 with digest, and 1504 without digest.
*/ */
virtual int copy_to(c1s1* owner, char* bytes, int size, bool with_digest) = 0; virtual srs_error_t copy_to(c1s1* owner, char* bytes, int size, bool with_digest) = 0;
/** /**
* copy time and version to stream. * copy time and version to stream.
*/ */
@ -313,9 +313,9 @@ namespace _srs_internal
virtual ~c1s1_strategy_schema0(); virtual ~c1s1_strategy_schema0();
public: public:
virtual srs_schema_type schema(); virtual srs_schema_type schema();
virtual int parse(char* _c1s1, int size); virtual srs_error_t parse(char* _c1s1, int size);
public: public:
virtual int copy_to(c1s1* owner, char* bytes, int size, bool with_digest); virtual srs_error_t copy_to(c1s1* owner, char* bytes, int size, bool with_digest);
}; };
/** /**
@ -330,9 +330,9 @@ namespace _srs_internal
virtual ~c1s1_strategy_schema1(); virtual ~c1s1_strategy_schema1();
public: public:
virtual srs_schema_type schema(); virtual srs_schema_type schema();
virtual int parse(char* _c1s1, int size); virtual srs_error_t parse(char* _c1s1, int size);
public: public:
virtual int copy_to(c1s1* owner, char* bytes, int size, bool with_digest); virtual srs_error_t copy_to(c1s1* owner, char* bytes, int size, bool with_digest);
}; };
/** /**
@ -378,14 +378,14 @@ namespace _srs_internal
* copy to bytes. * copy to bytes.
* @param size, must always be 1536. * @param size, must always be 1536.
*/ */
virtual int dump(char* _c1s1, int size); virtual srs_error_t dump(char* _c1s1, int size);
/** /**
* server: parse the c1s1, discovery the key and digest by schema. * server: parse the c1s1, discovery the key and digest by schema.
* @param size, must always be 1536. * @param size, must always be 1536.
* use the c1_validate_digest() to valid the digest of c1. * use the c1_validate_digest() to valid the digest of c1.
* use the s1_validate_digest() to valid the digest of s1. * use the s1_validate_digest() to valid the digest of s1.
*/ */
virtual int parse(char* _c1s1, int size, srs_schema_type _schema); virtual srs_error_t parse(char* _c1s1, int size, srs_schema_type _schema);
public: public:
/** /**
* client: create and sign c1 by schema. * client: create and sign c1 by schema.
@ -402,11 +402,11 @@ namespace _srs_internal
* digest-data = calc_c1_digest(c1, schema) * digest-data = calc_c1_digest(c1, schema)
* copy digest-data to c1 * copy digest-data to c1
*/ */
virtual int c1_create(srs_schema_type _schema); virtual srs_error_t c1_create(srs_schema_type _schema);
/** /**
* server: validate the parsed c1 schema * server: validate the parsed c1 schema
*/ */
virtual int c1_validate_digest(bool& is_valid); virtual srs_error_t c1_validate_digest(bool& is_valid);
public: public:
/** /**
* server: create and sign the s1 from c1. * server: create and sign the s1 from c1.
@ -434,11 +434,11 @@ namespace _srs_internal
* s1-digest-data = HMACsha256(c1s1-joined, FMSKey, 36) * s1-digest-data = HMACsha256(c1s1-joined, FMSKey, 36)
* copy s1-digest-data and s1-key-data to s1. * copy s1-digest-data and s1-key-data to s1.
*/ */
virtual int s1_create(c1s1* c1); virtual srs_error_t s1_create(c1s1* c1);
/** /**
* server: validate the parsed s1 schema * server: validate the parsed s1 schema
*/ */
virtual int s1_validate_digest(bool& is_valid); virtual srs_error_t s1_validate_digest(bool& is_valid);
}; };
/** /**
@ -460,12 +460,12 @@ namespace _srs_internal
* copy to bytes. * copy to bytes.
* @param size, must always be 1536. * @param size, must always be 1536.
*/ */
virtual int dump(char* _c2s2, int size); virtual srs_error_t dump(char* _c2s2, int size);
/** /**
* parse the c2s2 * parse the c2s2
* @param size, must always be 1536. * @param size, must always be 1536.
*/ */
virtual int parse(char* _c2s2, int size); virtual srs_error_t parse(char* _c2s2, int size);
public: public:
/** /**
* create c2. * create c2.
@ -475,12 +475,12 @@ namespace _srs_internal
* temp-key = HMACsha256(s1-digest, FPKey, 62) * temp-key = HMACsha256(s1-digest, FPKey, 62)
* c2-digest-data = HMACsha256(c2-random-data, temp-key, 32) * c2-digest-data = HMACsha256(c2-random-data, temp-key, 32)
*/ */
virtual int c2_create(c1s1* s1); virtual srs_error_t c2_create(c1s1* s1);
/** /**
* validate the c2 from client. * validate the c2 from client.
*/ */
virtual int c2_validate(c1s1* s1, bool& is_valid); virtual srs_error_t c2_validate(c1s1* s1, bool& is_valid);
public: public:
/** /**
* create s2. * create s2.
@ -490,12 +490,12 @@ namespace _srs_internal
* temp-key = HMACsha256(c1-digest, FMSKey, 68) * temp-key = HMACsha256(c1-digest, FMSKey, 68)
* s2-digest-data = HMACsha256(s2-random-data, temp-key, 32) * s2-digest-data = HMACsha256(s2-random-data, temp-key, 32)
*/ */
virtual int s2_create(c1s1* c1); virtual srs_error_t s2_create(c1s1* c1);
/** /**
* validate the s2 from server. * validate the s2 from server.
*/ */
virtual int s2_validate(c1s1* c1, bool& is_valid); virtual srs_error_t s2_validate(c1s1* c1, bool& is_valid);
}; };
} }
@ -515,8 +515,8 @@ public:
/** /**
* simple handshake. * simple handshake.
*/ */
virtual int handshake_with_client(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io); virtual srs_error_t handshake_with_client(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io);
virtual int handshake_with_server(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io); virtual srs_error_t handshake_with_server(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io);
}; };
/** /**
@ -537,8 +537,8 @@ public:
* try simple handshake if error is ERROR_RTMP_TRY_SIMPLE_HS, * try simple handshake if error is ERROR_RTMP_TRY_SIMPLE_HS,
* otherwise, disconnect * otherwise, disconnect
*/ */
virtual int handshake_with_client(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io); virtual srs_error_t handshake_with_client(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io);
virtual int handshake_with_server(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io); virtual srs_error_t handshake_with_server(SrsHandshakeBytes* hs_bytes, ISrsProtocolReaderWriter* io);
}; };
#endif #endif

File diff suppressed because it is too large Load diff

View file

@ -142,15 +142,15 @@ public:
* other packet which need to serialize/encode to bytes by override the * other packet which need to serialize/encode to bytes by override the
* get_size and encode_packet. * get_size and encode_packet.
*/ */
virtual int encode(int& size, char*& payload); virtual srs_error_t encode(int& size, char*& payload);
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
/** /**
* subpacket must override to decode packet from stream. * subpacket must override to decode packet from stream.
* @remark never invoke the super.decode, it always failed. * @remark never invoke the super.decode, it always failed.
*/ */
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
/** /**
* the cid(chunk id) specifies the chunk to send data over. * the cid(chunk id) specifies the chunk to send data over.
@ -173,7 +173,7 @@ protected:
* subpacket can override to encode the payload to stream. * subpacket can override to encode the payload to stream.
* @remark never invoke the super.encode_packet, it always failed. * @remark never invoke the super.encode_packet, it always failed.
*/ */
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -207,7 +207,7 @@ private:
* value: the request command name * value: the request command name
*/ */
std::map<double, std::string> requests; std::map<double, std::string> requests;
// peer in // peer in
private: private:
/** /**
* chunk stream to decode RTMP messages. * chunk stream to decode RTMP messages.
@ -247,7 +247,7 @@ private:
* when not auto response message, manual flush the messages in queue. * when not auto response message, manual flush the messages in queue.
*/ */
std::vector<SrsPacket*> manual_response_queue; std::vector<SrsPacket*> manual_response_queue;
// peer out // peer out
private: private:
/** /**
* cache for multiple messages send, * cache for multiple messages send,
@ -287,7 +287,7 @@ public:
* need to call this api(the protocol sdk will auto send message). * need to call this api(the protocol sdk will auto send message).
* @see the auto_response_when_recv and manual_response_queue. * @see the auto_response_when_recv and manual_response_queue.
*/ */
virtual int manual_response_flush(); virtual srs_error_t manual_response_flush();
public: public:
#ifdef SRS_PERF_MERGED_READ #ifdef SRS_PERF_MERGED_READ
/** /**
@ -333,7 +333,7 @@ public:
// because it wait for server to send acknowledge, but server default to 0 which means no need // because it wait for server to send acknowledge, but server default to 0 which means no need
// to ack encoder. We can change the default input ack size. We will always response the // to ack encoder. We can change the default input ack size. We will always response the
// ack size whatever the encoder set or not. // ack size whatever the encoder set or not.
virtual int set_in_window_ack_size(int ack_size); virtual srs_error_t set_in_window_ack_size(int ack_size);
public: public:
/** /**
* recv a RTMP message, which is bytes oriented. * recv a RTMP message, which is bytes oriented.
@ -344,14 +344,14 @@ public:
* never NULL if decode success. * never NULL if decode success.
* @remark, drop message when msg is empty or payload length is empty. * @remark, drop message when msg is empty or payload length is empty.
*/ */
virtual int recv_message(SrsCommonMessage** pmsg); virtual srs_error_t recv_message(SrsCommonMessage** pmsg);
/** /**
* decode bytes oriented RTMP message to RTMP packet, * decode bytes oriented RTMP message to RTMP packet,
* @param ppacket, output decoded packet, * @param ppacket, output decoded packet,
* always NULL if error, never NULL if success. * always NULL if error, never NULL if success.
* @return error when unknown packet, error when decode failed. * @return error when unknown packet, error when decode failed.
*/ */
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
/** /**
* send the RTMP message and always free it. * send the RTMP message and always free it.
* user must never free or use the msg after this method, * user must never free or use the msg after this method,
@ -359,7 +359,7 @@ public:
* @param msg, the msg to send out, never be NULL. * @param msg, the msg to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message. * @param stream_id, the stream id of packet to send over, 0 for control message.
*/ */
virtual int send_and_free_message(SrsSharedPtrMessage* msg, int stream_id); virtual srs_error_t send_and_free_message(SrsSharedPtrMessage* msg, int stream_id);
/** /**
* send the RTMP message and always free it. * send the RTMP message and always free it.
* user must never free or use the msg after this method, * user must never free or use the msg after this method,
@ -368,7 +368,7 @@ public:
* @param nb_msgs, the size of msgs to send out. * @param nb_msgs, the size of msgs to send out.
* @param stream_id, the stream id of packet to send over, 0 for control message. * @param stream_id, the stream id of packet to send over, 0 for control message.
*/ */
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id); virtual srs_error_t send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
/** /**
* send the RTMP packet and always free it. * send the RTMP packet and always free it.
* user must never free or use the packet after this method, * user must never free or use the packet after this method,
@ -376,7 +376,7 @@ public:
* @param packet, the packet to send out, never be NULL. * @param packet, the packet to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message. * @param stream_id, the stream id of packet to send over, 0 for control message.
*/ */
virtual int send_and_free_packet(SrsPacket* packet, int stream_id); virtual srs_error_t send_and_free_packet(SrsPacket* packet, int stream_id);
public: public:
/** /**
* expect a specified message, drop others util got specified one. * expect a specified message, drop others util got specified one.
@ -396,36 +396,28 @@ public:
* if need to set timeout, use set timeout of SrsProtocol. * if need to set timeout, use set timeout of SrsProtocol.
*/ */
template<class T> template<class T>
int expect_message(SrsCommonMessage** pmsg, T** ppacket) srs_error_t expect_message(SrsCommonMessage** pmsg, T** ppacket)
{ {
*pmsg = NULL; *pmsg = NULL;
*ppacket = NULL; *ppacket = NULL;
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
while (true) { while (true) {
SrsCommonMessage* msg = NULL; SrsCommonMessage* msg = NULL;
if ((ret = recv_message(&msg)) != ERROR_SUCCESS) { if ((err = recv_message(&msg)) != srs_success) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { return srs_error_wrap(err, "recv message");
srs_error("recv message failed. ret=%d", ret);
}
return ret;
} }
srs_verbose("recv message success.");
SrsPacket* packet = NULL; SrsPacket* packet = NULL;
if ((ret = decode_message(msg, &packet)) != ERROR_SUCCESS) { if ((err = decode_message(msg, &packet)) != srs_success) {
srs_error("decode message failed. ret=%d", ret);
srs_freep(msg); srs_freep(msg);
srs_freep(packet); srs_freep(packet);
return ret; return srs_error_wrap(err, "decode message");
} }
T* pkt = dynamic_cast<T*>(packet); T* pkt = dynamic_cast<T*>(packet);
if (!pkt) { if (!pkt) {
srs_info("drop message(type=%d, size=%d, time=%" PRId64 ", sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
srs_freep(msg); srs_freep(msg);
srs_freep(packet); srs_freep(packet);
continue; continue;
@ -436,70 +428,70 @@ public:
break; break;
} }
return ret; return err;
} }
private: private:
/** /**
* send out the messages, donot free it, * send out the messages, donot free it,
* the caller must free the param msgs. * the caller must free the param msgs.
*/ */
virtual int do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs); virtual srs_error_t do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
/** /**
* send iovs. send multiple times if exceed limits. * send iovs. send multiple times if exceed limits.
*/ */
virtual int do_iovs_send(iovec* iovs, int size); virtual srs_error_t do_iovs_send(iovec* iovs, int size);
/** /**
* underlayer api for send and free packet. * underlayer api for send and free packet.
*/ */
virtual int do_send_and_free_packet(SrsPacket* packet, int stream_id); virtual srs_error_t do_send_and_free_packet(SrsPacket* packet, int stream_id);
/** /**
* use simple algorithm to send the header and bytes. * use simple algorithm to send the header and bytes.
* @remark, for do_send_and_free_packet to send. * @remark, for do_send_and_free_packet to send.
*/ */
virtual int do_simple_send(SrsMessageHeader* mh, char* payload, int size); virtual srs_error_t do_simple_send(SrsMessageHeader* mh, char* payload, int size);
/** /**
* imp for decode_message * imp for decode_message
*/ */
virtual int do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket); virtual srs_error_t do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket);
/** /**
* recv bytes oriented RTMP message from protocol stack. * recv bytes oriented RTMP message from protocol stack.
* return error if error occur and nerver set the pmsg, * return error if error occur and nerver set the pmsg,
* return success and pmsg set to NULL if no entire message got, * return success and pmsg set to NULL if no entire message got,
* return success and pmsg set to entire message if got one. * return success and pmsg set to entire message if got one.
*/ */
virtual int recv_interlaced_message(SrsCommonMessage** pmsg); virtual srs_error_t recv_interlaced_message(SrsCommonMessage** pmsg);
/** /**
* read the chunk basic header(fmt, cid) from chunk stream. * read the chunk basic header(fmt, cid) from chunk stream.
* user can discovery a SrsChunkStream by cid. * user can discovery a SrsChunkStream by cid.
*/ */
virtual int read_basic_header(char& fmt, int& cid); virtual srs_error_t read_basic_header(char& fmt, int& cid);
/** /**
* read the chunk message header(timestamp, payload_length, message_type, stream_id) * read the chunk message header(timestamp, payload_length, message_type, stream_id)
* from chunk stream and save to SrsChunkStream. * from chunk stream and save to SrsChunkStream.
*/ */
virtual int read_message_header(SrsChunkStream* chunk, char fmt); virtual srs_error_t read_message_header(SrsChunkStream* chunk, char fmt);
/** /**
* read the chunk payload, remove the used bytes in buffer, * read the chunk payload, remove the used bytes in buffer,
* if got entire message, set the pmsg. * if got entire message, set the pmsg.
*/ */
virtual int read_message_payload(SrsChunkStream* chunk, SrsCommonMessage** pmsg); virtual srs_error_t read_message_payload(SrsChunkStream* chunk, SrsCommonMessage** pmsg);
/** /**
* when recv message, update the context. * when recv message, update the context.
*/ */
virtual int on_recv_message(SrsCommonMessage* msg); virtual srs_error_t on_recv_message(SrsCommonMessage* msg);
/** /**
* when message sentout, update the context. * when message sentout, update the context.
*/ */
virtual int on_send_packet(SrsMessageHeader* mh, SrsPacket* packet); virtual srs_error_t on_send_packet(SrsMessageHeader* mh, SrsPacket* packet);
private: private:
/** /**
* auto response the ack message. * auto response the ack message.
*/ */
virtual int response_acknowledgement_message(); virtual srs_error_t response_acknowledgement_message();
/** /**
* auto response the ping message. * auto response the ping message.
*/ */
virtual int response_ping_message(int32_t timestamp); virtual srs_error_t response_ping_message(int32_t timestamp);
private: private:
virtual void print_debug_info(); virtual void print_debug_info();
}; };
@ -658,12 +650,12 @@ public:
SrsHandshakeBytes(); SrsHandshakeBytes();
virtual ~SrsHandshakeBytes(); virtual ~SrsHandshakeBytes();
public: public:
virtual int read_c0c1(ISrsProtocolReaderWriter* io); virtual srs_error_t read_c0c1(ISrsProtocolReaderWriter* io);
virtual int read_s0s1s2(ISrsProtocolReaderWriter* io); virtual srs_error_t read_s0s1s2(ISrsProtocolReaderWriter* io);
virtual int read_c2(ISrsProtocolReaderWriter* io); virtual srs_error_t read_c2(ISrsProtocolReaderWriter* io);
virtual int create_c0c1(); virtual srs_error_t create_c0c1();
virtual int create_s0s1s2(const char* c1 = NULL); virtual srs_error_t create_s0s1s2(const char* c1 = NULL);
virtual int create_c2(); virtual srs_error_t create_c2();
}; };
/** /**
@ -696,30 +688,30 @@ protected:
public: public:
SrsRtmpClient(ISrsProtocolReaderWriter* skt); SrsRtmpClient(ISrsProtocolReaderWriter* skt);
virtual ~SrsRtmpClient(); virtual ~SrsRtmpClient();
// protocol methods proxy // protocol methods proxy
public: public:
virtual void set_recv_timeout(int64_t tm); virtual void set_recv_timeout(int64_t tm);
virtual void set_send_timeout(int64_t tm); virtual void set_send_timeout(int64_t tm);
virtual int64_t get_recv_bytes(); virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes(); virtual int64_t get_send_bytes();
virtual int recv_message(SrsCommonMessage** pmsg); virtual srs_error_t recv_message(SrsCommonMessage** pmsg);
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual int send_and_free_message(SrsSharedPtrMessage* msg, int stream_id); virtual srs_error_t send_and_free_message(SrsSharedPtrMessage* msg, int stream_id);
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id); virtual srs_error_t send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
virtual int send_and_free_packet(SrsPacket* packet, int stream_id); virtual srs_error_t send_and_free_packet(SrsPacket* packet, int stream_id);
public: public:
/** /**
* handshake with server, try complex, then simple handshake. * handshake with server, try complex, then simple handshake.
*/ */
virtual int handshake(); virtual srs_error_t handshake();
/** /**
* only use simple handshake * only use simple handshake
*/ */
virtual int simple_handshake(); virtual srs_error_t simple_handshake();
/** /**
* only use complex handshake * only use complex handshake
*/ */
virtual int complex_handshake(); virtual srs_error_t complex_handshake();
/** /**
* Connect to RTMP tcUrl and app, get the server info. * Connect to RTMP tcUrl and app, get the server info.
* *
@ -729,25 +721,25 @@ public:
* @param dsu, Whether debug SRS upnode. For edge, set to true to send its info to upnode. * @param dsu, Whether debug SRS upnode. For edge, set to true to send its info to upnode.
* @param si, The server information, retrieve from response of connect app request. NULL to ignore. * @param si, The server information, retrieve from response of connect app request. NULL to ignore.
*/ */
virtual int connect_app(std::string app, std::string tcUrl, SrsRequest* r, bool dsu, SrsServerInfo* si); virtual srs_error_t connect_app(std::string app, std::string tcUrl, SrsRequest* r, bool dsu, SrsServerInfo* si);
/** /**
* create a stream, then play/publish data over this stream. * create a stream, then play/publish data over this stream.
*/ */
virtual int create_stream(int& stream_id); virtual srs_error_t create_stream(int& stream_id);
/** /**
* start play stream. * start play stream.
*/ */
virtual int play(std::string stream, int stream_id); virtual srs_error_t play(std::string stream, int stream_id);
/** /**
* start publish stream. use flash publish workflow: * start publish stream. use flash publish workflow:
* connect-app => create-stream => flash-publish * connect-app => create-stream => flash-publish
*/ */
virtual int publish(std::string stream, int stream_id); virtual srs_error_t publish(std::string stream, int stream_id);
/** /**
* start publish stream. use FMLE publish workflow: * start publish stream. use FMLE publish workflow:
* connect-app => FMLE publish * connect-app => FMLE publish
*/ */
virtual int fmle_publish(std::string stream, int& stream_id); virtual srs_error_t fmle_publish(std::string stream, int& stream_id);
public: public:
/** /**
* expect a specified message, drop others util got specified one. * expect a specified message, drop others util got specified one.
@ -767,7 +759,7 @@ public:
* if need to set timeout, use set timeout of SrsProtocol. * if need to set timeout, use set timeout of SrsProtocol.
*/ */
template<class T> template<class T>
int expect_message(SrsCommonMessage** pmsg, T** ppacket) srs_error_t expect_message(SrsCommonMessage** pmsg, T** ppacket)
{ {
return protocol->expect_message<T>(pmsg, ppacket); return protocol->expect_message<T>(pmsg, ppacket);
} }
@ -787,7 +779,7 @@ private:
public: public:
SrsRtmpServer(ISrsProtocolReaderWriter* skt); SrsRtmpServer(ISrsProtocolReaderWriter* skt);
virtual ~SrsRtmpServer(); virtual ~SrsRtmpServer();
// protocol methods proxy // protocol methods proxy
public: public:
/** /**
* set the auto response message when recv for protocol stack. * set the auto response message when recv for protocol stack.
@ -840,14 +832,14 @@ public:
* never NULL if decode success. * never NULL if decode success.
* @remark, drop message when msg is empty or payload length is empty. * @remark, drop message when msg is empty or payload length is empty.
*/ */
virtual int recv_message(SrsCommonMessage** pmsg); virtual srs_error_t recv_message(SrsCommonMessage** pmsg);
/** /**
* decode bytes oriented RTMP message to RTMP packet, * decode bytes oriented RTMP message to RTMP packet,
* @param ppacket, output decoded packet, * @param ppacket, output decoded packet,
* always NULL if error, never NULL if success. * always NULL if error, never NULL if success.
* @return error when unknown packet, error when decode failed. * @return error when unknown packet, error when decode failed.
*/ */
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
/** /**
* send the RTMP message and always free it. * send the RTMP message and always free it.
* user must never free or use the msg after this method, * user must never free or use the msg after this method,
@ -855,7 +847,7 @@ public:
* @param msg, the msg to send out, never be NULL. * @param msg, the msg to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message. * @param stream_id, the stream id of packet to send over, 0 for control message.
*/ */
virtual int send_and_free_message(SrsSharedPtrMessage* msg, int stream_id); virtual srs_error_t send_and_free_message(SrsSharedPtrMessage* msg, int stream_id);
/** /**
* send the RTMP message and always free it. * send the RTMP message and always free it.
* user must never free or use the msg after this method, * user must never free or use the msg after this method,
@ -867,7 +859,7 @@ public:
* @remark performance issue, to support 6k+ 250kbps client, * @remark performance issue, to support 6k+ 250kbps client,
* @see https://github.com/ossrs/srs/issues/194 * @see https://github.com/ossrs/srs/issues/194
*/ */
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id); virtual srs_error_t send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
/** /**
* send the RTMP packet and always free it. * send the RTMP packet and always free it.
* user must never free or use the packet after this method, * user must never free or use the packet after this method,
@ -875,37 +867,37 @@ public:
* @param packet, the packet to send out, never be NULL. * @param packet, the packet to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message. * @param stream_id, the stream id of packet to send over, 0 for control message.
*/ */
virtual int send_and_free_packet(SrsPacket* packet, int stream_id); virtual srs_error_t send_and_free_packet(SrsPacket* packet, int stream_id);
public: public:
/** /**
* handshake with client, try complex then simple. * handshake with client, try complex then simple.
*/ */
virtual int handshake(); virtual srs_error_t handshake();
/** /**
* do connect app with client, to discovery tcUrl. * do connect app with client, to discovery tcUrl.
*/ */
virtual int connect_app(SrsRequest* req); virtual srs_error_t connect_app(SrsRequest* req);
/** /**
* set output ack size to client, client will send ack-size for each ack window * set output ack size to client, client will send ack-size for each ack window
*/ */
virtual int set_window_ack_size(int ack_size); virtual srs_error_t set_window_ack_size(int ack_size);
// Set the default input ack size value. // Set the default input ack size value.
virtual int set_in_window_ack_size(int ack_size); virtual srs_error_t set_in_window_ack_size(int ack_size);
/** /**
* @type: The sender can mark this message hard (0), soft (1), or dynamic (2) * @type: The sender can mark this message hard (0), soft (1), or dynamic (2)
* using the Limit type field. * using the Limit type field.
*/ */
virtual int set_peer_bandwidth(int bandwidth, int type); virtual srs_error_t set_peer_bandwidth(int bandwidth, int type);
/** /**
* @param server_ip the ip of server. * @param server_ip the ip of server.
*/ */
virtual int response_connect_app(SrsRequest* req, const char* server_ip = NULL); virtual srs_error_t response_connect_app(SrsRequest* req, const char* server_ip = NULL);
/** /**
* redirect the connection to another rtmp server. * redirect the connection to another rtmp server.
* @param the hostname or ip of target. * @param the hostname or ip of target.
* @param whether the client accept the redirect. * @param whether the client accept the redirect.
*/ */
virtual int redirect(SrsRequest* r, std::string host, int port, bool& accepted); virtual srs_error_t redirect(SrsRequest* r, std::string host, int port, bool& accepted);
/** /**
* reject the connect app request. * reject the connect app request.
*/ */
@ -913,7 +905,7 @@ public:
/** /**
* response client the onBWDone message. * response client the onBWDone message.
*/ */
virtual int on_bw_done(); virtual srs_error_t on_bw_done();
/** /**
* recv some message to identify the client. * recv some message to identify the client.
* @stream_id, client will createStream to play or publish by flash, * @stream_id, client will createStream to play or publish by flash,
@ -922,11 +914,11 @@ public:
* @stream_name, output the client publish/play stream name. @see: SrsRequest.stream * @stream_name, output the client publish/play stream name. @see: SrsRequest.stream
* @duration, output the play client duration. @see: SrsRequest.duration * @duration, output the play client duration. @see: SrsRequest.duration
*/ */
virtual int identify_client(int stream_id, SrsRtmpConnType& type, std::string& stream_name, double& duration); virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType& type, std::string& stream_name, double& duration);
/** /**
* set the chunk size when client type identified. * set the chunk size when client type identified.
*/ */
virtual int set_chunk_size(int chunk_size); virtual srs_error_t set_chunk_size(int chunk_size);
/** /**
* when client type is play, response with packets: * when client type is play, response with packets:
* StreamBegin, * StreamBegin,
@ -934,7 +926,7 @@ public:
* |RtmpSampleAccess(false, false), * |RtmpSampleAccess(false, false),
* onStatus(NetStream.Data.Start). * onStatus(NetStream.Data.Start).
*/ */
virtual int start_play(int stream_id); virtual srs_error_t start_play(int stream_id);
/** /**
* when client(type is play) send pause message, * when client(type is play) send pause message,
* if is_pause, response the following packets: * if is_pause, response the following packets:
@ -944,7 +936,7 @@ public:
* onStatus(NetStream.Unpause.Notify) * onStatus(NetStream.Unpause.Notify)
* StreamBegin * StreamBegin
*/ */
virtual int on_play_client_pause(int stream_id, bool is_pause); virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause);
/** /**
* when client type is publish, response with packets: * when client type is publish, response with packets:
* releaseStream response * releaseStream response
@ -954,22 +946,22 @@ public:
* onFCPublish(NetStream.Publish.Start) * onFCPublish(NetStream.Publish.Start)
* onStatus(NetStream.Publish.Start) * onStatus(NetStream.Publish.Start)
*/ */
virtual int start_fmle_publish(int stream_id); virtual srs_error_t start_fmle_publish(int stream_id);
/** /**
* For encoder of Haivision, response the startup request. * For encoder of Haivision, response the startup request.
* @see https://github.com/ossrs/srs/issues/844 * @see https://github.com/ossrs/srs/issues/844
*/ */
virtual int start_haivision_publish(int stream_id); virtual srs_error_t start_haivision_publish(int stream_id);
/** /**
* process the FMLE unpublish event. * process the FMLE unpublish event.
* @unpublish_tid the unpublish request transaction id. * @unpublish_tid the unpublish request transaction id.
*/ */
virtual int fmle_unpublish(int stream_id, double unpublish_tid); virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid);
/** /**
* when client type is publish, response with packets: * when client type is publish, response with packets:
* onStatus(NetStream.Publish.Start) * onStatus(NetStream.Publish.Start)
*/ */
virtual int start_flash_publish(int stream_id); virtual srs_error_t start_flash_publish(int stream_id);
public: public:
/** /**
* expect a specified message, drop others util got specified one. * expect a specified message, drop others util got specified one.
@ -989,17 +981,17 @@ public:
* if need to set timeout, use set timeout of SrsProtocol. * if need to set timeout, use set timeout of SrsProtocol.
*/ */
template<class T> template<class T>
int expect_message(SrsCommonMessage** pmsg, T** ppacket) srs_error_t expect_message(SrsCommonMessage** pmsg, T** ppacket)
{ {
return protocol->expect_message<T>(pmsg, ppacket); return protocol->expect_message<T>(pmsg, ppacket);
} }
private: private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsRtmpConnType& type, std::string& stream_name, double& duration); virtual srs_error_t identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsRtmpConnType& type, std::string& stream_name, double& duration);
virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, std::string& stream_name); virtual srs_error_t identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, std::string& stream_name);
virtual int identify_haivision_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, std::string& stream_name); virtual srs_error_t identify_haivision_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, std::string& stream_name);
virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsRtmpConnType& type, std::string& stream_name); virtual srs_error_t identify_flash_publish_client(SrsPublishPacket* req, SrsRtmpConnType& type, std::string& stream_name);
private: private:
virtual int identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type, std::string& stream_name, double& duration); virtual srs_error_t identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type, std::string& stream_name, double& duration);
}; };
/** /**
@ -1033,16 +1025,16 @@ public:
public: public:
SrsConnectAppPacket(); SrsConnectAppPacket();
virtual ~SrsConnectAppPacket(); virtual ~SrsConnectAppPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
* response for SrsConnectAppPacket. * response for SrsConnectAppPacket.
@ -1072,16 +1064,16 @@ public:
public: public:
SrsConnectAppResPacket(); SrsConnectAppResPacket();
virtual ~SrsConnectAppResPacket(); virtual ~SrsConnectAppResPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1115,16 +1107,16 @@ public:
public: public:
SrsCallPacket(); SrsCallPacket();
virtual ~SrsCallPacket(); virtual ~SrsCallPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
* response for SrsCallPacket. * response for SrsCallPacket.
@ -1153,13 +1145,13 @@ public:
public: public:
SrsCallResPacket(double _transaction_id); SrsCallResPacket(double _transaction_id);
virtual ~SrsCallResPacket(); virtual ~SrsCallResPacket();
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1188,16 +1180,16 @@ public:
public: public:
SrsCreateStreamPacket(); SrsCreateStreamPacket();
virtual ~SrsCreateStreamPacket(); virtual ~SrsCreateStreamPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
* response for SrsCreateStreamPacket. * response for SrsCreateStreamPacket.
@ -1225,16 +1217,16 @@ public:
public: public:
SrsCreateStreamResPacket(double _transaction_id, double _stream_id); SrsCreateStreamResPacket(double _transaction_id, double _stream_id);
virtual ~SrsCreateStreamResPacket(); virtual ~SrsCreateStreamResPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1259,9 +1251,9 @@ public:
public: public:
SrsCloseStreamPacket(); SrsCloseStreamPacket();
virtual ~SrsCloseStreamPacket(); virtual ~SrsCloseStreamPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
}; };
/** /**
@ -1290,17 +1282,17 @@ public:
public: public:
SrsFMLEStartPacket(); SrsFMLEStartPacket();
virtual ~SrsFMLEStartPacket(); virtual ~SrsFMLEStartPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
// factory method to create specified FMLE packet. // factory method to create specified FMLE packet.
public: public:
static SrsFMLEStartPacket* create_release_stream(std::string stream); static SrsFMLEStartPacket* create_release_stream(std::string stream);
static SrsFMLEStartPacket* create_FC_publish(std::string stream); static SrsFMLEStartPacket* create_FC_publish(std::string stream);
@ -1332,16 +1324,16 @@ public:
public: public:
SrsFMLEStartResPacket(double _transaction_id); SrsFMLEStartResPacket(double _transaction_id);
virtual ~SrsFMLEStartResPacket(); virtual ~SrsFMLEStartResPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1387,16 +1379,16 @@ public:
public: public:
SrsPublishPacket(); SrsPublishPacket();
virtual ~SrsPublishPacket(); virtual ~SrsPublishPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1434,9 +1426,9 @@ public:
public: public:
SrsPausePacket(); SrsPausePacket();
virtual ~SrsPausePacket(); virtual ~SrsPausePacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
}; };
/** /**
@ -1506,16 +1498,16 @@ public:
public: public:
SrsPlayPacket(); SrsPlayPacket();
virtual ~SrsPlayPacket(); virtual ~SrsPlayPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1549,13 +1541,13 @@ public:
public: public:
SrsPlayResPacket(); SrsPlayResPacket();
virtual ~SrsPlayResPacket(); virtual ~SrsPlayResPacket();
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1580,13 +1572,13 @@ public:
public: public:
SrsOnBWDonePacket(); SrsOnBWDonePacket();
virtual ~SrsOnBWDonePacket(); virtual ~SrsOnBWDonePacket();
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1618,13 +1610,13 @@ public:
public: public:
SrsOnStatusCallPacket(); SrsOnStatusCallPacket();
virtual ~SrsOnStatusCallPacket(); virtual ~SrsOnStatusCallPacket();
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1658,17 +1650,17 @@ public:
public: public:
SrsBandwidthPacket(); SrsBandwidthPacket();
virtual ~SrsBandwidthPacket(); virtual ~SrsBandwidthPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
// help function for bandwidth packet. // help function for bandwidth packet.
public: public:
virtual bool is_start_play(); virtual bool is_start_play();
virtual bool is_starting_play(); virtual bool is_starting_play();
@ -1716,13 +1708,13 @@ public:
public: public:
SrsOnStatusDataPacket(); SrsOnStatusDataPacket();
virtual ~SrsOnStatusDataPacket(); virtual ~SrsOnStatusDataPacket();
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1751,13 +1743,13 @@ public:
public: public:
SrsSampleAccessPacket(); SrsSampleAccessPacket();
virtual ~SrsSampleAccessPacket(); virtual ~SrsSampleAccessPacket();
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1780,16 +1772,16 @@ public:
public: public:
SrsOnMetaDataPacket(); SrsOnMetaDataPacket();
virtual ~SrsOnMetaDataPacket(); virtual ~SrsOnMetaDataPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1804,16 +1796,16 @@ public:
public: public:
SrsSetWindowAckSizePacket(); SrsSetWindowAckSizePacket();
virtual ~SrsSetWindowAckSizePacket(); virtual ~SrsSetWindowAckSizePacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1828,16 +1820,16 @@ public:
public: public:
SrsAcknowledgementPacket(); SrsAcknowledgementPacket();
virtual ~SrsAcknowledgementPacket(); virtual ~SrsAcknowledgementPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
/** /**
@ -1856,16 +1848,16 @@ public:
public: public:
SrsSetChunkSizePacket(); SrsSetChunkSizePacket();
virtual ~SrsSetChunkSizePacket(); virtual ~SrsSetChunkSizePacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
// 5.6. Set Peer Bandwidth (6) // 5.6. Set Peer Bandwidth (6)
@ -1892,13 +1884,13 @@ public:
public: public:
SrsSetPeerBandwidthPacket(); SrsSetPeerBandwidthPacket();
virtual ~SrsSetPeerBandwidthPacket(); virtual ~SrsSetPeerBandwidthPacket();
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
// 3.7. User Control message // 3.7. User Control message
@ -2024,16 +2016,16 @@ public:
public: public:
SrsUserControlPacket(); SrsUserControlPacket();
virtual ~SrsUserControlPacket(); virtual ~SrsUserControlPacket();
// decode functions for concrete packet to override. // decode functions for concrete packet to override.
public: public:
virtual int decode(SrsBuffer* stream); virtual srs_error_t decode(SrsBuffer* stream);
// encode functions for concrete packet to override. // encode functions for concrete packet to override.
public: public:
virtual int get_prefer_cid(); virtual int get_prefer_cid();
virtual int get_message_type(); virtual int get_message_type();
protected: protected:
virtual int get_size(); virtual int get_size();
virtual int encode_packet(SrsBuffer* stream); virtual srs_error_t encode_packet(SrsBuffer* stream);
}; };
#endif #endif

View file

@ -286,9 +286,9 @@ int64_t SrsStSocket::get_send_bytes()
return sbytes; return sbytes;
} }
int SrsStSocket::read(void* buf, size_t size, ssize_t* nread) srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
ssize_t nb_read; ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) { if (rtm == SRS_CONSTS_NO_TMMS) {
@ -307,24 +307,24 @@ int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
if (nb_read <= 0) { if (nb_read <= 0) {
// @see https://github.com/ossrs/srs/issues/200 // @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) { if (nb_read < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT; return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
} }
if (nb_read == 0) { if (nb_read == 0) {
errno = ECONNRESET; errno = ECONNRESET;
} }
return ERROR_SOCKET_READ; return srs_error_new(ERROR_SOCKET_READ, "read");
} }
rbytes += nb_read; rbytes += nb_read;
return ret; return err;
} }
int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
ssize_t nb_read; ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) { if (rtm == SRS_CONSTS_NO_TMMS) {
@ -343,24 +343,24 @@ int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
if (nb_read != (ssize_t)size) { if (nb_read != (ssize_t)size) {
// @see https://github.com/ossrs/srs/issues/200 // @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) { if (nb_read < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT; return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
} }
if (nb_read >= 0) { if (nb_read >= 0) {
errno = ECONNRESET; errno = ECONNRESET;
} }
return ERROR_SOCKET_READ_FULLY; return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");
} }
rbytes += nb_read; rbytes += nb_read;
return ret; return err;
} }
int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
ssize_t nb_write; ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) { if (stm == SRS_CONSTS_NO_TMMS) {
@ -378,20 +378,20 @@ int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
if (nb_write <= 0) { if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200 // @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) { if (nb_write < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT; return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", stm);
} }
return ERROR_SOCKET_WRITE; return srs_error_new(ERROR_SOCKET_WRITE, "write");
} }
sbytes += nb_write; sbytes += nb_write;
return ret; return err;
} }
int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
ssize_t nb_write; ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) { if (stm == SRS_CONSTS_NO_TMMS) {
@ -409,15 +409,15 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
if (nb_write <= 0) { if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200 // @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) { if (nb_write < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT; return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", stm);
} }
return ERROR_SOCKET_WRITE; return srs_error_new(ERROR_SOCKET_WRITE, "writev");
} }
sbytes += nb_write; sbytes += nb_write;
return ret; return err;
} }
SrsTcpClient::SrsTcpClient(string h, int p, int64_t tm) SrsTcpClient::SrsTcpClient(string h, int p, int64_t tm)
@ -500,22 +500,22 @@ int64_t SrsTcpClient::get_send_bytes()
return io->get_send_bytes(); return io->get_send_bytes();
} }
int SrsTcpClient::read(void* buf, size_t size, ssize_t* nread) srs_error_t SrsTcpClient::read(void* buf, size_t size, ssize_t* nread)
{ {
return io->read(buf, size, nread); return io->read(buf, size, nread);
} }
int SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread) srs_error_t SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread)
{ {
return io->read_fully(buf, size, nread); return io->read_fully(buf, size, nread);
} }
int SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite) srs_error_t SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite)
{ {
return io->write(buf, size, nwrite); return io->write(buf, size, nwrite);
} }
int SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite) srs_error_t SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{ {
return io->writev(iov, iov_size, nwrite); return io->writev(iov, iov_size, nwrite);
} }

View file

@ -139,13 +139,13 @@ public:
/** /**
* @param nread, the actual read bytes, ignore if NULL. * @param nread, the actual read bytes, ignore if NULL.
*/ */
virtual int read(void* buf, size_t size, ssize_t* nread); virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual int read_fully(void* buf, size_t size, ssize_t* nread); virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
/** /**
* @param nwrite, the actual write bytes, ignore if NULL. * @param nwrite, the actual write bytes, ignore if NULL.
*/ */
virtual int write(void* buf, size_t size, ssize_t* nwrite); virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite); virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
}; };
/** /**
@ -198,10 +198,10 @@ public:
virtual int64_t get_send_timeout(); virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes(); virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes(); virtual int64_t get_send_bytes();
virtual int read(void* buf, size_t size, ssize_t* nread); virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual int read_fully(void* buf, size_t size, ssize_t* nread); virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual int write(void* buf, size_t size, ssize_t* nwrite); virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite); virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
}; };
#endif #endif