数据流独立,网关程序测试通过

This commit is contained in:
nnhy 2015-06-16 14:51:37 +00:00
parent 5c99ffda78
commit 3acdae119c
31 changed files with 305 additions and 266 deletions

View File

@ -11,7 +11,7 @@ Modbus::Modbus()
Crc2 = 0;
}
bool Modbus::Read(MemoryStream& ms)
bool Modbus::Read(Stream& ms)
{
if(ms.Remain() < 4) return false;
@ -37,7 +37,7 @@ bool Modbus::Read(MemoryStream& ms)
Crc2 = Sys.Crc16(buf, ms.Position() - p - 2);
}
void Modbus::Write(MemoryStream& ms)
void Modbus::Write(Stream& ms)
{
uint p = ms.Position();

View File

@ -51,8 +51,8 @@ public:
Modbus();
bool Read(MemoryStream& ms);
void Write(MemoryStream& ms);
bool Read(Stream& ms);
void Write(Stream& ms);
void SetError(ModbusErrors::Errors error);
private:

View File

@ -19,7 +19,7 @@ void Slave::OnReceive(ITransport* transport, byte* buf, uint len, void* param)
Slave* slave = (Slave*)param;
MemoryStream ms(buf, len);
Stream ms(buf, len);
Modbus entity;
if(!entity.Read(ms)) return;

219
Stream.cpp Normal file
View File

@ -0,0 +1,219 @@
#include "Stream.h"
Stream::Stream(uint len)
{
if(len <= ArrayLength(_Arr))
{
len = ArrayLength(_Arr);
_Buffer = _Arr;
_needFree = false;
}
else
{
_Buffer = new byte[len];
assert_ptr(_Buffer);
_needFree = true;
}
_Capacity = len;
_Position = 0;
Length = 0;
}
// 使用缓冲区初始化数据流。注意此时指针位于0而内容长度为缓冲区长度
Stream::Stream(byte* buf, uint len)
{
assert_ptr(buf);
assert_param(len > 0);
_Buffer = buf;
_Capacity = len;
_Position = 0;
_needFree = false;
Length = len;
}
// 销毁数据流
Stream::~Stream()
{
assert_ptr(this);
if(_needFree)
{
if(_Buffer) delete[] _Buffer;
_Buffer = NULL;
}
}
// 数据流容量
uint Stream::Capacity() { return _Capacity; }
// 当前位置
uint Stream::Position() { return _Position; }
// 设置位置
void Stream::SetPosition(uint p)
{
// 允许移动到最后一个字节之后也就是Length
assert_param(p <= Length);
_Position = p;
}
// 余下的有效数据流长度。0表示已经到达终点
uint Stream::Remain() { return Length - _Position; };
// 尝试前后移动一段距离,返回成功或者失败。如果失败,不移动游标
bool Stream::Seek(int offset)
{
if(offset == 0) return true;
int p = offset + _Position;
//if(p < 0 || p >= Length) return false;
// 允许移动到最后一个字节之后也就是Length
if(p < 0 || p > Length) return false;
_Position = p;
return true;
}
// 数据流指针。注意:扩容后指针会改变!
byte* Stream::GetBuffer() { return _Buffer; }
// 数据流当前位置指针。注意:扩容后指针会改变!
byte* Stream::Current() { return &_Buffer[_Position]; }
// 从当前位置读取数据
uint Stream::Read(byte* buf, uint offset, int count)
{
assert_param(buf);
if(count == 0) return 0;
uint remain = Remain();
if(count < 0)
count = remain;
else if(count > remain)
count = remain;
// 复制需要的数据
memcpy(buf + offset, Current(), count);
// 游标移动
_Position += count;
return count;
}
uint Stream::ReadEncodeInt()
{
uint value = 0;
uint temp = 0;
for(int i = 0; i < 4; i++)
{
temp = (uint)ReadBytes(1);
if(temp<127)
{
value |= ( temp << (7*i));
return value;
}
temp &= 0x7f;
value |= ( temp << (7*i));
}
return 0xffffffff;
}
// 把数据写入当前位置
void Stream::Write(byte* buf, uint offset, uint count)
{
assert_param(buf);
if(!CheckCapacity(count)) return;
memcpy(Current(), buf + offset, count);
_Position += count;
//Length += count;
// 内容长度不是累加,而是根据位置而扩大
if(_Position > Length) Length = _Position;
}
uint Stream::WriteEncodeInt(uint value)
{
byte temp;
for( int i = 0 ; i < 4 ; i++ )
{
temp = (byte)value;
if(temp < 127)
{
Write(&temp, 0, 1);
return i+1;
}
temp |= 0x80;
Write(&temp, 0, 1);
value>>=7;
}
return 0;
}
// 写入字符串,先写入压缩编码整数表示的长度
uint Stream::Write(string str)
{
int len = 0;
string p = str;
while(*p++) len++;
WriteEncodeInt(len);
Write((byte*)str, 0, len);
return len;
}
byte* Stream::ReadBytes(int count)
{
// 默认小于0时读取全部数据
if(count < 0) count = Remain();
byte* p = Current();
if(!Seek(count)) return NULL;
return p;
}
// 读取一个字节,不移动游标。如果没有可用数据,则返回-1
int Stream::Peek()
{
if(!Remain()) return -1;
return *Current();
}
bool Stream::CheckCapacity(uint count)
{
uint remain = _Capacity - _Position;
// 容量不够,需要扩容
if(count > remain)
{
if(!_needFree)
{
debug_printf("数据流剩余容量%d不足%d而外部缓冲区无法扩容", remain, count);
assert_param(false);
return false;
}
// 原始容量成倍扩容
uint total = _Position + count;
uint size = _Capacity;
while(size < total) size <<= 1;
// 申请新的空间,并复制数据
byte* bufNew = new byte[size];
if(_Position > 0) memcpy(bufNew, _Buffer, _Position);
delete[] _Buffer;
_Buffer = bufNew;
_Capacity = size;
}
return true;
}

218
Stream.h
View File

@ -6,7 +6,7 @@
// 内存数据流
// 数据流内有一个缓冲区,游标位置,数据长度。实际有效数据仅占用缓冲区中间部分,头尾都可能有剩余
class MemoryStream
class Stream
{
private:
byte* _Buffer; // 数据缓冲区。扩容后会重新分配缓冲区
@ -21,176 +21,38 @@ public:
uint Length; // 数据长度
// 分配指定大小的数据流
MemoryStream(uint len = 0)
{
//assert_param(len > 0);
if(len <= ArrayLength(_Arr))
{
len = ArrayLength(_Arr);
_Buffer = _Arr;
_needFree = false;
}
else
{
_Buffer = new byte[len];
assert_ptr(_Buffer);
_needFree = true;
}
_Capacity = len;
_Position = 0;
Length = 0;
}
Stream(uint len = 0);
// 使用缓冲区初始化数据流。注意此时指针位于0而内容长度为缓冲区长度
MemoryStream(byte* buf, uint len)
{
assert_ptr(buf);
assert_param(len > 0);
_Buffer = buf;
_Capacity = len;
_Position = 0;
_needFree = false;
Length = len;
}
Stream(byte* buf, uint len);
// 销毁数据流
~MemoryStream()
{
assert_ptr(this);
if(_needFree)
{
if(_Buffer) delete[] _Buffer;
_Buffer = NULL;
}
}
~Stream();
// 数据流容量
uint Capacity() { return _Capacity; }
uint Capacity();
// 当前位置
uint Position() { return _Position; }
uint Position();
// 设置位置
void SetPosition(uint p)
{
// 允许移动到最后一个字节之后也就是Length
assert_param(p <= Length);
_Position = p;
}
void SetPosition(uint p);
// 余下的有效数据流长度。0表示已经到达终点
uint Remain() { return Length - _Position; };
uint Remain();
// 尝试前后移动一段距离,返回成功或者失败。如果失败,不移动游标
bool Seek(int offset)
{
if(offset == 0) return true;
int p = offset + _Position;
//if(p < 0 || p >= Length) return false;
// 允许移动到最后一个字节之后也就是Length
if(p < 0 || p > Length) return false;
_Position = p;
return true;
}
bool Seek(int offset);
// 数据流指针。注意:扩容后指针会改变!
byte* GetBuffer() { return _Buffer; }
byte* GetBuffer();
// 数据流当前位置指针。注意:扩容后指针会改变!
byte* Current() { return &_Buffer[_Position]; }
byte* Current();
// 从当前位置读取数据
uint Read(byte* buf, uint offset = 0, int count = -1)
{
assert_param(buf);
if(count == 0) return 0;
uint remain = Remain();
if(count < 0)
count = remain;
else if(count > remain)
count = remain;
// 复制需要的数据
memcpy(buf + offset, Current(), count);
// 游标移动
_Position += count;
return count;
}
uint ReadEncodeInt()
{
uint value = 0;
uint temp = 0;
for(int i = 0; i < 4; i++)
{
temp = (uint)ReadBytes(1);
if(temp<127)
{
value |= ( temp << (7*i));
return value;
}
temp &= 0x7f;
value |= ( temp << (7*i));
}
return 0xffffffff;
}
uint Read(byte* buf, uint offset = 0, int count = -1);
uint ReadEncodeInt();
// 把数据写入当前位置
void Write(byte* buf, uint offset, uint count)
{
assert_param(buf);
if(!CheckCapacity(count)) return;
memcpy(Current(), buf + offset, count);
_Position += count;
//Length += count;
// 内容长度不是累加,而是根据位置而扩大
if(_Position > Length) Length = _Position;
}
uint WriteEncodeInt(uint value)
{
byte temp;
for( int i = 0 ; i < 4 ; i++ )
{
temp = (byte)value;
if(temp < 127)
{
Write(&temp, 0, 1);
return i+1;
}
temp |= 0x80;
Write(&temp, 0, 1);
value>>=7;
}
return 0;
}
void Write(byte* buf, uint offset, uint count);
uint WriteEncodeInt(uint value);
// 写入字符串,先写入压缩编码整数表示的长度
uint Write(string str)
{
int len = 0;
string p = str;
while(*p++) len++;
WriteEncodeInt(len);
Write((byte*)str, 0, len);
return len;
}
uint Write(string str);
// 取回指定结构体指针,并移动游标位置
template<typename T>
@ -237,55 +99,13 @@ public:
if(_Position > Length) Length = _Position;
}
byte* ReadBytes(int count = -1)
{
// 默认小于0时读取全部数据
if(count < 0) count = Remain();
byte* p = Current();
if(!Seek(count)) return NULL;
return p;
}
byte* ReadBytes(int count = -1);
// 读取一个字节,不移动游标。如果没有可用数据,则返回-1
int Peek()
{
if(!Remain()) return -1;
return *Current();
}
int Peek();
private:
bool CheckCapacity(uint count)
{
uint remain = _Capacity - _Position;
// 容量不够,需要扩容
if(count > remain)
{
if(!_needFree)
{
debug_printf("数据流剩余容量%d不足%d而外部缓冲区无法扩容", remain, count);
assert_param(false);
return false;
}
// 原始容量成倍扩容
uint total = _Position + count;
uint size = _Capacity;
while(size < total) size <<= 1;
// 申请新的空间,并复制数据
byte* bufNew = new byte[size];
if(_Position > 0) memcpy(bufNew, _Buffer, _Position);
delete[] _Buffer;
_Buffer = bufNew;
_Capacity = size;
}
return true;
}
bool CheckCapacity(uint count);
};
#endif

View File

@ -27,7 +27,7 @@ ArpSocket::~ArpSocket()
_Arps = NULL;
}
bool ArpSocket::Process(MemoryStream* ms)
bool ArpSocket::Process(Stream* ms)
{
// 如果ms为空可能是纯为了更新ARP表
if(!ms)
@ -109,7 +109,7 @@ bool ArpSocket::Process(MemoryStream* ms)
return true;
}
bool RequestCallback(TinyIP* tip, void* param, MemoryStream& ms)
bool RequestCallback(TinyIP* tip, void* param, Stream& ms)
{
ETH_HEADER* eth = (ETH_HEADER*)tip->Buffer;
ARP_HEADER* arp = (ARP_HEADER*)eth->Next();
@ -138,7 +138,7 @@ const MacAddress* ArpSocket::Request(IPAddress ip, int timeout)
byte buf[sizeof(ETH_HEADER) + sizeof(ARP_HEADER) + 4];
uint bufSize = ArrayLength(buf);
// 注意此时指针位于0而内容长度为缓冲区长度
MemoryStream ms(buf, bufSize);
Stream ms(buf, bufSize);
ETH_HEADER* eth = (ETH_HEADER*)buf;
ARP_HEADER* arp = (ARP_HEADER*)eth->Next();

View File

@ -24,7 +24,7 @@ public:
virtual ~ArpSocket();
// 处理数据包
virtual bool Process(MemoryStream* ms);
virtual bool Process(Stream* ms);
// 请求Arp并返回其Mac。timeout超时3秒如果没有超时时间表示异步请求不用等待结果
const MacAddress* Request(IPAddress ip, int timeout = 3);

View File

@ -219,7 +219,7 @@ void Dhcp::SendDiscover(void* param)
_dhcp->Discover(dhcp);
}
void Dhcp::OnProcess(UDP_HEADER* udp, MemoryStream& ms)
void Dhcp::OnProcess(UDP_HEADER* udp, Stream& ms)
{
DHCP_HEADER* dhcp = (DHCP_HEADER*)udp->Next();
if(!dhcp->Valid()) return;

View File

@ -32,7 +32,7 @@ public:
EventHandler OnStop;
protected:
virtual void OnProcess(UDP_HEADER* udp, MemoryStream& ms);
virtual void OnProcess(UDP_HEADER* udp, Stream& ms);
};
#endif

View File

@ -3,7 +3,7 @@
#define NET_DEBUG DEBUG
bool Callback(TinyIP* tip, void* param, MemoryStream& ms);
bool Callback(TinyIP* tip, void* param, Stream& ms);
TcpSocket::TcpSocket(TinyIP* tip) : Socket(tip)
{
@ -60,7 +60,7 @@ void TcpSocket::OnClose()
Enable = false;
}
bool TcpSocket::Process(MemoryStream* ms)
bool TcpSocket::Process(Stream* ms)
{
TCP_HEADER* tcp = (TCP_HEADER*)ms->Current();
if(!ms->Seek(tcp->Size())) return false;
@ -87,7 +87,7 @@ bool TcpSocket::Process(MemoryStream* ms)
return true;
}
void TcpSocket::OnProcess(TCP_HEADER* tcp, MemoryStream& ms)
void TcpSocket::OnProcess(TCP_HEADER* tcp, Stream& ms)
{
// 计算标称的数据长度
//uint len = tcp->Size() - sizeof(TCP_HEADER);
@ -408,7 +408,7 @@ bool TcpSocket::Connect(IPAddress ip, ushort port)
return false;
}
bool Callback(TinyIP* tip, void* param, MemoryStream& ms)
bool Callback(TinyIP* tip, void* param, Stream& ms)
{
ETH_HEADER* eth = ms.Retrieve<ETH_HEADER>();
if(eth->Type != ETH_IP) return false;
@ -434,7 +434,7 @@ bool Callback(TinyIP* tip, void* param, MemoryStream& ms)
socket->Status = TcpSocket::SynAck;
// 处理。如果对方回发第二次握手包,或者终止握手
//MemoryStream ms(tip->Buffer, tip->BufferSize);
//Stream ms(tip->Buffer, tip->BufferSize);
socket->Process(&ms);
return true;

View File

@ -11,7 +11,7 @@ public:
HttpClient(TinyIP* tip);
// 处理数据包
virtual bool Process(MemoryStream* ms);
virtual bool Process(Stream* ms);
bool Connect(IPAddress ip, ushort port); // 连接远程服务器记录远程服务器IP和端口后续发送数据和关闭连接需要
void Send(const byte* buf, uint len); // 向Socket发送数据可能是外部数据包
@ -32,7 +32,7 @@ protected:
void SetMss(TCP_HEADER* tcp);
void Send(TCP_HEADER* tcp, uint len, byte flags);
virtual void OnProcess(TCP_HEADER* tcp, MemoryStream& ms);
virtual void OnProcess(TCP_HEADER* tcp, Stream& ms);
virtual void OnAccept(TCP_HEADER* tcp, uint len);
virtual void Accepted2(TCP_HEADER* tcp, uint len);
virtual void OnDataReceive(TCP_HEADER* tcp, uint len);

View File

@ -10,7 +10,7 @@ IcmpSocket::IcmpSocket(TinyIP* tip) : Socket(tip)
Enable = true;
}
bool IcmpSocket::Process(MemoryStream* ms)
bool IcmpSocket::Process(Stream* ms)
{
ICMP_HEADER* icmp = ms->Retrieve<ICMP_HEADER>();
if(!icmp) return false;
@ -51,7 +51,7 @@ bool IcmpSocket::Process(MemoryStream* ms)
return true;
}
bool PingCallback(TinyIP* tip, void* param, MemoryStream& ms)
bool PingCallback(TinyIP* tip, void* param, Stream& ms)
{
ETH_HEADER* eth = (ETH_HEADER*)tip->Buffer;
IP_HEADER* _ip = (IP_HEADER*)eth->Next();
@ -97,7 +97,7 @@ bool IcmpSocket::Ping(IPAddress ip, uint payloadLength)
byte buf[sizeof(ETH_HEADER) + sizeof(IP_HEADER) + sizeof(ICMP_HEADER) + 64];
uint bufSize = ArrayLength(buf);
// 注意此时指针位于0而内容长度为缓冲区长度
MemoryStream ms(buf, bufSize);
Stream ms(buf, bufSize);
ETH_HEADER* eth = (ETH_HEADER*)buf;
IP_HEADER* _ip = (IP_HEADER*)eth->Next();

View File

@ -13,7 +13,7 @@ public:
IcmpSocket(TinyIP* tip);
// 处理数据包
virtual bool Process(MemoryStream* ms);
virtual bool Process(Stream* ms);
// 收到Ping请求时触发传递结构体和负载数据长度。返回值指示是否向对方发送数据包
typedef bool (*PingHandler)(IcmpSocket* socket, ICMP_HEADER* icmp, byte* buf, uint len);

View File

@ -4,7 +4,7 @@
//#define NET_DEBUG DEBUG
bool Callback(TinyIP* tip, void* param, MemoryStream& ms);
bool Callback(TinyIP* tip, void* param, Stream& ms);
TcpSocket::TcpSocket(TinyIP* tip) : Socket(tip)
{
@ -61,7 +61,7 @@ void TcpSocket::OnClose()
Enable = false;
}
bool TcpSocket::Process(MemoryStream* ms)
bool TcpSocket::Process(Stream* ms)
{
TCP_HEADER* tcp = (TCP_HEADER*)ms->Current();
if(!ms->Seek(tcp->Size())) return false;
@ -88,7 +88,7 @@ bool TcpSocket::Process(MemoryStream* ms)
return true;
}
void TcpSocket::OnProcess(TCP_HEADER* tcp, MemoryStream& ms)
void TcpSocket::OnProcess(TCP_HEADER* tcp, Stream& ms)
{
// 计算标称的数据长度
//uint len = tcp->Size() - sizeof(TCP_HEADER);
@ -444,7 +444,7 @@ bool TcpSocket::Connect(IPAddress ip, ushort port)
return false;
}
bool Callback(TinyIP* tip, void* param, MemoryStream& ms)
bool Callback(TinyIP* tip, void* param, Stream& ms)
{
ETH_HEADER* eth = ms.Retrieve<ETH_HEADER>();
if(eth->Type != ETH_IP) return false;
@ -471,7 +471,7 @@ bool Callback(TinyIP* tip, void* param, MemoryStream& ms)
if(socket->Status == TcpSocket::SynSent) socket->Status = TcpSocket::SynAck;
// 处理。如果对方回发第二次握手包,或者终止握手
//MemoryStream ms(tip->Buffer, tip->BufferSize);
//Stream ms(tip->Buffer, tip->BufferSize);
tip->FixPayloadLength(_ip, &ms);
socket->Process(&ms);

View File

@ -36,7 +36,7 @@ public:
TcpSocket(TinyIP* tip);
// 处理数据包
virtual bool Process(MemoryStream* ms);
virtual bool Process(Stream* ms);
bool Connect(IPAddress ip, ushort port); // 连接远程服务器记录远程服务器IP和端口后续发送数据和关闭连接需要
void Send(const byte* buf, uint len); // 向Socket发送数据可能是外部数据包
@ -57,7 +57,7 @@ protected:
void SetMss(TCP_HEADER* tcp);
void Send(TCP_HEADER* tcp, uint len, byte flags);
virtual void OnProcess(TCP_HEADER* tcp, MemoryStream& ms);
virtual void OnProcess(TCP_HEADER* tcp, Stream& ms);
virtual void OnAccept(TCP_HEADER* tcp, uint len);
virtual void OnAccept3(TCP_HEADER* tcp, uint len);
virtual void OnDataReceive(TCP_HEADER* tcp, uint len);

View File

@ -87,7 +87,7 @@ uint TinyIP::Fetch(byte* buf, uint len)
return len;
}
void TinyIP::Process(MemoryStream* ms)
void TinyIP::Process(Stream* ms)
{
if(!ms) return;
@ -166,7 +166,7 @@ void TinyIP::Process(MemoryStream* ms)
}
// 修正IP包负载数据的长度。物理层送来的长度可能有误一般超长
void TinyIP::FixPayloadLength(IP_HEADER* ip, MemoryStream* ms)
void TinyIP::FixPayloadLength(IP_HEADER* ip, Stream* ms)
{
// 前面的len不准确必须以这个为准
uint size = __REV16(ip->TotalLength) - (ip->Length << 2);
@ -186,7 +186,7 @@ void TinyIP::Work(void* param)
if(len)
{
// 注意此时指针位于0而内容长度为缓冲区长度
MemoryStream ms(tip->Buffer, tip->BufferSize);
Stream ms(tip->Buffer, tip->BufferSize);
ms.Length = len;
tip->Process(&ms);
}
@ -195,7 +195,7 @@ void TinyIP::Work(void* param)
bool TinyIP::LoopWait(LoopFilter filter, void* param, uint msTimeout)
{
MemoryStream ms(Buffer, BufferSize);
Stream ms(Buffer, BufferSize);
// 总等待时间
TimeWheel tw(0, msTimeout, 0);

View File

@ -23,7 +23,7 @@ public:
virtual ~Socket();
// 处理数据包
virtual bool Process(MemoryStream* ms) = 0;
virtual bool Process(Stream* ms) = 0;
};
// Socket列表
@ -34,7 +34,7 @@ public:
};
class TinyIP;
typedef bool (*LoopFilter)(TinyIP* tip, void* param, MemoryStream& ms);
typedef bool (*LoopFilter)(TinyIP* tip, void* param, Stream& ms);
// 精简以太网协议。封装以太网帧以及IP协议不包含其它协议实现仅提供底层支持。
class TinyIP
@ -56,9 +56,9 @@ public:
// 带过滤器的轮询
bool LoopWait(LoopFilter filter, void* param, uint msTimeout);
// 处理数据包
void Process(MemoryStream* ms);
void Process(Stream* ms);
// 修正IP包负载数据的长度。物理层送来的长度可能有误一般超长
void FixPayloadLength(IP_HEADER* ip, MemoryStream* ms);
void FixPayloadLength(IP_HEADER* ip, Stream* ms);
public:
IPAddress IP; // 本地IP地址

View File

@ -43,7 +43,7 @@ void UdpSocket::OnClose()
Enable = false;
}
bool UdpSocket::Process(MemoryStream* ms)
bool UdpSocket::Process(Stream* ms)
{
UDP_HEADER* udp = ms->Retrieve<UDP_HEADER>();
if(!udp) return false;
@ -74,7 +74,7 @@ bool UdpSocket::Process(MemoryStream* ms)
return true;
}
void UdpSocket::OnProcess(UDP_HEADER* udp, MemoryStream& ms)
void UdpSocket::OnProcess(UDP_HEADER* udp, Stream& ms)
{
byte* data = ms.Current();
//uint len = ms.Remain();

View File

@ -20,7 +20,7 @@ public:
UdpSocket(TinyIP* tip);
// 处理数据包
virtual bool Process(MemoryStream* ms);
virtual bool Process(Stream* ms);
// 收到Udp数据时触发传递结构体和负载数据长度。返回值指示是否向对方发送数据包
typedef bool (*UdpHandler)(UdpSocket* socket, UDP_HEADER* udp, byte* buf, uint len);
@ -33,7 +33,7 @@ public:
protected:
void Send(UDP_HEADER* udp, uint len, bool checksum = true);
virtual void OnProcess(UDP_HEADER* udp, MemoryStream& ms);
virtual void OnProcess(UDP_HEADER* udp, Stream& ms);
virtual bool OnOpen();
virtual void OnClose();

View File

@ -106,7 +106,7 @@ uint Controller::Dispatch(ITransport* transport, byte* buf, uint len, void* para
// 这里使用数据流,可能多个消息粘包在一起
// 注意此时指针位于0而内容长度为缓冲区长度
MemoryStream ms(buf, len);
Stream ms(buf, len);
while(ms.Remain() >= control->MinSize)
{
// 如果不是有效数据包,则直接退出,避免产生死循环。当然,也可以逐字节移动测试,不过那样性能太差
@ -116,7 +116,7 @@ uint Controller::Dispatch(ITransport* transport, byte* buf, uint len, void* para
return 0;
}
bool Controller::Dispatch(MemoryStream& ms, ITransport* port)
bool Controller::Dispatch(Stream& ms, ITransport* port)
{
byte* buf = ms.Current();
@ -214,7 +214,7 @@ int Controller::Send(Message& msg, ITransport* port)
uint len = msg.Size();
// ms需要在外面这里声明否则离开大括号作用域以后变量被销毁导致缓冲区不可用
MemoryStream ms(len);
Stream ms(len);
// 带有负载数据,需要合并成为一段连续的内存
msg.Write(ms);
assert_param(len == ms.Length);

View File

@ -18,7 +18,7 @@ class Controller
private:
void Init();
static uint Dispatch(ITransport* transport, byte* buf, uint len, void* param);
bool Dispatch(MemoryStream& ms, ITransport* port);
bool Dispatch(Stream& ms, ITransport* port);
protected:
List<ITransport*> _ports; // 数据传输口数组

View File

@ -21,9 +21,9 @@ public:
virtual uint Size() const = 0;
// 从数据流中读取消息
virtual bool Read(MemoryStream& ms) = 0;
virtual bool Read(Stream& ms) = 0;
// 把消息写入数据流中
virtual void Write(MemoryStream& ms) = 0;
virtual void Write(Stream& ms) = 0;
// 验证消息校验码是否有效
virtual bool Valid() const = 0;

View File

@ -97,7 +97,7 @@ void TinyClient::Discover()
msg.Code = 1;
// 发送的广播消息设备类型和系统ID
MemoryStream ms(msg.Data, ArrayLength(msg.Data));
Stream ms(msg.Data, ArrayLength(msg.Data));
ms.Length = 0;
ms.Write(DeviceType);
ms.Write(Sys.ID, 0, 20);
@ -126,7 +126,7 @@ bool TinyClient::Discover(Message& msg, void* param)
TinyController* ctrl = (TinyController*)client->_control;
// 解析数据
MemoryStream ms(msg.Data, msg.Length);
Stream ms(msg.Data, msg.Length);
if(ms.Remain())
ctrl->Address = ms.Read<byte>();
if(ms.Remain() >= 8)

View File

@ -43,7 +43,7 @@ TinyMessage::TinyMessage(TinyMessage& msg) : Message(msg)
}
// 分析数据,转为消息。负载数据部分将指向数据区,外部不要提前释放内存
bool TinyMessage::Read(MemoryStream& ms)
bool TinyMessage::Read(Stream& ms)
{
// 消息至少4个头部字节、2字节长度和2字节校验没有负载数据的情况下
if(ms.Remain() < MinSize) return false;
@ -92,7 +92,7 @@ bool TinyMessage::Read(MemoryStream& ms)
return true;
}
void TinyMessage::Write(MemoryStream& ms)
void TinyMessage::Write(Stream& ms)
{
// 实际数据拷贝到占位符
_Code = Code;
@ -117,7 +117,7 @@ void TinyMessage::Write(MemoryStream& ms)
void TinyMessage::ComputeCrc()
{
MemoryStream ms(Size());
Stream ms(Size());
Write(ms);
@ -571,7 +571,7 @@ void MessageNode::SetMessage(TinyMessage& msg)
LastSend = 0;
// 注意此时指针位于0而内容长度为缓冲区长度
MemoryStream ms(Data, ArrayLength(Data));
Stream ms(Data, ArrayLength(Data));
ms.Length = 0;
msg.Write(ms);
Length = ms.Length;

View File

@ -57,9 +57,9 @@ public:
virtual uint Size() const;
// 分析数据,转为消息。负载数据部分将指向数据区,外部不要提前释放内存
virtual bool Read(MemoryStream& ms);
virtual bool Read(Stream& ms);
// 写入指定数据流
virtual void Write(MemoryStream& ms);
virtual void Write(Stream& ms);
// 验证消息校验码是否有效
virtual bool Valid() const;

View File

@ -86,7 +86,7 @@ void TinyServer::Discover()
msg.Code = 1;
// 发送的广播消息设备类型和系统ID
MemoryStream ms(msg.Data, ArrayLength(msg.Data));
Stream ms(msg.Data, ArrayLength(msg.Data));
ms.Length = 0;
ms.Write(DeviceType);
ms.Write(Sys.ID, 0, 20);
@ -114,7 +114,7 @@ bool TinyServer::Discover(Message& msg, void* param)
TinyServer* client = (TinyServer*)param;
// 解析数据
MemoryStream ms(msg.Data, msg.Length);
Stream ms(msg.Data, msg.Length);
TinyController* ctrl = (TinyController*)client->_control;
if(ms.Remain())
ctrl->Address = ms.Read<byte>();

View File

@ -90,7 +90,7 @@ void TokenClient::Hello()
TokenMessage msg(1);
// 发送的广播消息设备类型和系统ID
MemoryStream ms(msg._Data, ArrayLength(msg._Data));
Stream ms(msg._Data, ArrayLength(msg._Data));
ms.Length = 0;
ms.Write(Sys.Version);
@ -135,7 +135,7 @@ bool TokenClient::Hello(Message& msg, void* param)
//TokenController* ctrl = (TokenController*)client->_control;
// 解析数据
MemoryStream ms(msg.Data, msg.Length);
Stream ms(msg.Data, msg.Length);
// 取消Discover任务
debug_printf("停止寻找服务端 ");

View File

@ -11,7 +11,7 @@ TokenMessage::TokenMessage(byte code) : Message(code)
}
// 从数据流中读取消息
bool TokenMessage::Read(MemoryStream& ms)
bool TokenMessage::Read(Stream& ms)
{
assert_ptr(this);
if(ms.Remain() < MinSize) return false;
@ -34,7 +34,7 @@ bool TokenMessage::Read(MemoryStream& ms)
}
// 把消息写入到数据流中
void TokenMessage::Write(MemoryStream& ms)
void TokenMessage::Write(Stream& ms)
{
assert_ptr(this);
// 实际数据拷贝到占位符

View File

@ -27,9 +27,9 @@ public:
TokenMessage(byte code = 0);
// 从数据流中读取消息
virtual bool Read(MemoryStream& ms);
virtual bool Read(Stream& ms);
// 把消息写入数据流中
virtual void Write(MemoryStream& ms);
virtual void Write(Stream& ms);
// 消息总长度,包括头部、负载数据和校验
virtual uint Size() const;

View File

@ -23,9 +23,9 @@ public:
virtual uint Size() const = 0;
// 从数据流中读取消息
virtual bool Read(MemoryStream& ms) = 0;
virtual bool Read(Stream& ms) = 0;
// 把消息写入数据流中
virtual void Write(MemoryStream& ms) = 0;
virtual void Write(Stream& ms) = 0;
// 验证消息校验码是否有效
virtual bool Valid() const = 0;

View File

@ -21,9 +21,9 @@ public:
virtual uint Size() const = 0;
// 从数据流中读取消息
virtual bool Read(MemoryStream& ms) = 0;
virtual bool Read(Stream& ms) = 0;
// 把消息写入数据流中
virtual void Write(MemoryStream& ms) = 0;
virtual void Write(Stream& ms) = 0;
// 验证消息校验码是否有效
virtual bool Valid() const = 0;