SmartOS/TokenNet/TokenMessage.cpp

735 lines
15 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "Time.h"
#include "TokenMessage.h"
#include "Net\Net.h"
#include "Security\RC4.h"
#define MSG_DEBUG DEBUG
//#define MSG_DEBUG 0
/******************************** TokenMessage ********************************/
// Builds a token message for the given function code (forwarded to Message).
TokenMessage::TokenMessage(byte code) : Message(code)
{
	// Start as a plain two-way request; Seq == 0 lets TokenController::Send assign one.
	Seq		= 0;
	OneWay	= false;

	// Payload is stored in the embedded buffer.
	Data	= _Data;
}
// Parses one message from the stream.
// Layout read here: 1 header byte (bit7 = Reply, bit6 = OneWay/Error, low 6 bits = Code),
// 1 sequence byte, an encoded-int payload length, then the payload bytes.
// Returns false when the stream is too short or the payload would not fit the embedded buffer.
bool TokenMessage::Read(Stream& ms)
{
	TS("TokenMessage::Read");
	assert_ptr(this);

	if(ms.Remain() < MinSize) return false;

	// Header byte: bit 6 means OneWay on a request and Error on a reply.
	const byte head = ms.ReadByte();
	Code	= head & 0x3f;
	Reply	= head >> 7;
	const byte flag = (head >> 6) & 0x01;
	if(Reply)
		Error = flag;
	else
		OneWay = flag;

	Seq = ms.ReadByte();

	// Length is recorded before the checks below, exactly as the callers have always seen it.
	const ushort size = ms.ReadEncodeInt();
	Length = size;
	if(ms.Remain() < size) return false;

	// Guard against an oversized frame overflowing the embedded buffer.
	const bool embedded = (Data == _Data);
	if(embedded && size > ArrayLength(_Data))
	{
		debug_printf("错误指令,长度 %d 大于消息数据缓冲区长度 %d \r\n", size, ArrayLength(_Data));
		//assert_param(false);
		return false;
	}

	if(size == 0) return true;

	Buffer bs(Data, size);
	ms.Read(bs);

	return true;
}
// Serialises the message into the stream: header byte, sequence byte, then the
// payload through Stream::WriteArray.
void TokenMessage::Write(Stream& ms) const
{
	TS("TokenMessage::Write");
	assert_ptr(this);

	// Bit 6 carries OneWay for a request and Error for a reply.
	const bool flag = Reply ? Error : OneWay;

	byte head = Code | (Reply << 7);
	if(flag) head |= (1 << 6);

	ms.Write(head);
	ms.Write(Seq);
	ms.WriteArray(Buffer(Data, Length));
}
// Checksum validation hook. Always reports the message as valid.
bool TokenMessage::Valid() const
{
	return true;
}
// Total message size: HeaderSize plus the current payload length.
uint TokenMessage::Size() const
{
	uint total = HeaderSize;
	total += Length;
	return total;
}
// Capacity available for payload data: the size of the embedded buffer while it is
// in use, otherwise the current Length.
uint TokenMessage::MaxDataSize() const
{
	if(Data == _Data) return ArrayLength(_Data);

	return Length;
}
/*// 设置错误信息字符串
void TokenMessage::SetError(byte errorCode, const char* error, int errLength)
{
Error = errorCode != 0;
Length = 1 + errLength;
Data[0] = errorCode;
if(errLength > 0)
{
assert_ptr(error);
Buffer(Data + 1, errLength) = error;
}
}*/
// Creates the reply that corresponds to this message: same code, sequence number
// and State, with the Reply flag set. The payload is left empty.
TokenMessage TokenMessage::CreateReply() const
{
	TokenMessage reply;

	reply.State	= State;
	reply.Seq	= Seq;
	reply.Code	= Code;
	reply.Reply	= true;

	return reply;
}
// Debug dump of the message (compiled only when MSG_DEBUG is non-zero):
// header byte, a one-character marker, the sequence number and the payload bytes.
void TokenMessage::Show() const
{
#if MSG_DEBUG
	TS("TokenMessage::Show");
	assert_ptr(this);

	// Rebuild the header byte the same way Write() does.
	byte head = Code;
	if(Reply) head |= 0x80;
	if(Reply ? Error : OneWay) head |= (1 << 6);
	debug_printf("%02X", head);

	// Marker: '$' error reply, '#' normal reply, '~' one-way request, ' ' normal request.
	if(Reply && Error)
		debug_printf("$");
	else if(Reply)
		debug_printf("#");
	else if(OneWay)
		debug_printf("~");
	else
		debug_printf(" ");

	debug_printf(" Seq=%02X", Seq);

	const ushort size = Length;
	if(size > 0)
	{
		assert_ptr(Data);
		debug_printf(" Data[%d]=", size);
		// Payloads over 32 bytes wrap anyway, so start them on a fresh line to keep them aligned.
		if(size > 32) debug_printf("\r\n");
		ByteArray(Data, size).Show();
	}

	debug_printf("\r\n");
#endif
}
/******************************** TokenController ********************************/
// Sets up an unopened controller: no token, no server endpoint, no pending
// synchronous request, no statistics object and an empty send-tracking queue.
TokenController::TokenController() : Controller(), Key(0)
{
	Token	= 0;
	MinSize	= TokenMessage::MinSize;
	//MaxSize = 1500;

	Server	= nullptr;

	// Codes listed in NoLogCodes are skipped by ShowMessage; only 0x03 is listed by default.
	Buffer(NoLogCodes, sizeof(NoLogCodes)).Clear();
	NoLogCodes[0] = 0x03;
	//NoLogCodes[1] = 0x08;

	_Response	= nullptr;
	Stat		= nullptr;

	// Close() hands _taskID to Sys.RemoveTask, but Open() only assigns it under
	// #if DEBUG. Give it a defined value so Close() never passes garbage.
	// NOTE(review): assumes 0 means "no task" for Sys.RemoveTask - confirm.
	_taskID		= 0;

	Buffer(_Queue, ArrayLength(_Queue) * sizeof(_Queue[0])).Clear();
}
// Nothing is released here; the statistics object and task are released by Close().
TokenController::~TokenController()
{
}
void TokenController::Open()
{
if(Opened) return;
assert(Port, "还没有传输口呢");
debug_printf("TokenNet::Inited 使用传输接口 ");
#if DEBUG
auto obj = dynamic_cast<Object*>(Port);
if(obj) obj->Show(true);
//Port->Show(true);
#endif
auto sock = dynamic_cast<ISocket*>(Port);
if(sock) Server = &sock->Remote;
if(!Stat)
{
Stat = new TokenStat();
//Stat->Start();
//debug_printf("TokenStat::令牌统计 ");
#if DEBUG
_taskID = Sys.AddTask([](void* param){ ((TokenController*)param)->ShowStat(); }, this, 5000, 30000, "令牌统计");
#endif
}
Controller::Open();
}
void TokenController::Close()
{
delete Stat;
Stat = nullptr;
Sys.RemoveTask(_taskID);
}
// Convenience overload: wraps the code and payload in a TokenMessage and sends it
// through Send(Message&).
bool TokenController::Send(byte code, const Buffer& arr)
{
	TokenMessage request;
	request.Code = code;
	request.SetData(arr);

	return Send(request);
}
// Dispatches incoming stream data using a local TokenMessage as the message
// object; the pmsg argument is not used.
bool TokenController::Dispatch(Stream& ms, Message* pmsg, void* param)
{
	TokenMessage tmsg;
	Message* target = &tmsg;

	return Controller::Dispatch(ms, target, param);
}
// Decides whether a received message is passed on to the handlers.
// Code 0 is rejected; codes 0x01..0x02 always pass; once a Token is held everything passes.
// Otherwise the message's State must carry a remote endpoint that matches Server
// (any non-null endpoint is accepted while Server is null).
bool TokenController::Valid(const Message& msg)
{
	TS("TokenController::Valid");

	// Code 0 is illegal.
	if(!msg.Code) return false;

	// Codes up to 0x02 and already-tokened sessions skip the source check.
	if(msg.Code <= 0x02 || Token != 0) return true;

	// Source check against the recorded server endpoint.
	auto svr = (IPEndPoint*)Server;
	auto rmt = (IPEndPoint*)msg.State;

	const bool trusted = rmt && (!svr || !(*svr != *rmt));
	if(trusted) return true;

	debug_printf("Token::Valid 非法来源 ");
	if(rmt) rmt->Show();
	debug_printf("\r\n");

	return false;
}
// Runs RC4::Encrypt over the message payload with the given key.
// Skipped (returns false) for an empty payload, an empty key, or codes 0x01 and 0x08.
// Returns true when the payload was processed.
static bool Encrypt(Message& msg, const Buffer& pass)
{
	if(msg.Length <= 0 || pass.Length() <= 0) return false;

	// Codes 0x01 and 0x08 always travel untouched.
	if(msg.Code == 0x01 || msg.Code == 0x08) return false;

	Buffer payload(msg.Data, msg.Length);
	RC4::Encrypt(payload, pass);

	return true;
}
// Receive handler. A reply that matches the request currently waiting in
// SendAndReceive() is delivered to it directly; everything else is counted in the
// statistics, passed through Encrypt() and forwarded to Controller::OnReceive().
// Returns true when the message was consumed by the pending request, otherwise the
// result of Controller::OnReceive().
bool TokenController::OnReceive(Message& msg)
{
	TS("TokenController::OnReceive");

	// A reply belongs to the pending request when the codes match, or when it is a
	// 0x08 message whose first payload byte names the pending code.
	if(msg.Reply && _Response)
	{
		// Data[0] is only meaningful when the payload actually contains a byte.
		const bool matched = msg.Code == _Response->Code
			|| (msg.Code == 0x08 && msg.Length >= 1 && msg.Data[0] == _Response->Code);
		if(matched)
		{
			// Process the payload BEFORE copying it: previously the copy was taken
			// first, so the waiting caller received the unprocessed (still encrypted)
			// bytes. Encrypt() leaves 0x08 untouched, so the match above is unaffected.
			Encrypt(msg, Key);

			_Response->SetData(Buffer(msg.Data, msg.Length));
			_Response->Reply = true;

			ShowMessage("RecvSync", msg);

			return true;
		}
	}

	if(msg.Reply)
	{
		// A reply that matches no tracked request is counted as asynchronous.
		bool rs = EndSendStat(msg.Code, true);
		if(!rs) Stat->RecvReplyAsync++;
	}
	else
	{
		Stat->RecvRequest++;
	}

	// Same payload processing as on the send side (skipped for 0x01 and 0x08).
	Encrypt(msg, Key);

	ShowMessage("Recv", msg);

	return Controller::OnReceive(msg);
}
// Running sequence counter shared by all outgoing requests.
static byte _Sequence = 0;

// Sends a message: assigns a sequence number to requests that have none, logs it,
// runs the payload through Encrypt(), records send statistics for requests and
// hands the message to Controller::Send().
// NOTE(review): Encrypt() operates on msg.Data itself, so the caller's message
// appears to hold the processed payload after this call - confirm callers expect that.
bool TokenController::Send(Message& msg)
{
	TS("TokenController::Send");

	const bool isReply = msg.Reply;

	// Replies keep the sequence number they were given; requests get the next one.
	auto& tmsg = static_cast<TokenMessage&>(msg);
	if(!isReply && tmsg.Seq == 0) tmsg.Seq = ++_Sequence;

	ShowMessage(isReply ? "Reply" : "Send", msg);

	// Payload processing (skipped for 0x01 and 0x08).
	Encrypt(msg, Key);

	// Only requests are tracked in the statistics.
	if(!isReply) StartSendStat(msg.Code);

	return Controller::Send(msg);
}
// Sends a request and waits for its reply.
//   msg       - the request; OnReceive() writes the reply payload back into it and sets msg.Reply.
//   retry     - number of additional attempts after the first send.
//   msTimeout - time to wait for a reply after each send, in milliseconds.
// A message that is already a reply is simply sent. Returns true when a reply arrived.
bool TokenController::SendAndReceive(TokenMessage& msg, int retry, int msTimeout)
{
	TS("TokenController::SendAndReceive");

#if MSG_DEBUG
	if(_Response) debug_printf("设计错误正在等待Code=0x%02X的消息完成之前不能再次调用\r\n", _Response->Code);
	TimeCost ct;
#endif

	if(msg.Reply) return Send(msg) != 0;

	// From here on msg is a request, so only the OneWay flag can apply to bit 6.
	byte code = msg.Code;
	if(msg.OneWay) code |= (1 << 6);

	// Send() already calls StartSendStat for every request it transmits. Calling it
	// here as well counted each request twice and left an extra entry in _Queue.

	_Response = &msg;

	bool rs = false;
	while(retry-- >= 0)
	{
		if(!Send(msg)) break;

		// Wait for OnReceive() to flag the reply.
		TimeWheel tw(0, msTimeout);
		tw.Sleep = 1;
		do
		{
			if(_Response->Reply)
			{
				rs = true;
				break;
			}
		}while(!tw.Expired());

		if(rs) break;
	}

#if MSG_DEBUG
	debug_printf("Token::SendAndReceive Len=%d Time=%dus ", msg.Size(), ct.Elapsed());
	if(rs) _Response->Show();
	debug_printf("\r\n");
#endif

	_Response = nullptr;

	EndSendStat(code, rs);

	return rs;
}
// Debug log of a message (compiled only when MSG_DEBUG is non-zero).
//   action - label printed after "Token::".
//   msg    - the message to print; codes listed in NoLogCodes are skipped.
void TokenController::ShowMessage(const char* action, const Message& msg)
{
#if MSG_DEBUG
	TS("TokenController::ShowMessage");

	// NoLogCodes is terminated by its first zero entry.
	for(int i=0; i<ArrayLength(NoLogCodes); i++)
	{
		if(msg.Code == NoLogCodes[i]) return;
		if(NoLogCodes[i] == 0) break;
	}

	debug_printf("Token::%s ", action);

	// Print the remote endpoint only when it differs from the recorded server.
	if(msg.State)
	{
		auto svr = (IPEndPoint*)Server;
		auto rmt = (IPEndPoint*)msg.State;
		if(!svr || *svr != *rmt)
		{
			rmt->Show();
			debug_printf(" ");
		}
	}

	msg.Show();

	// Error details live in the payload: Data[0] is the error code and, for 0x01,
	// a string follows. An empty payload has neither, and Length - 1 would go
	// negative, so skip it.
	if(msg.Error && msg.Length >= 1)
	{
		debug_printf("Error=0x%02X ", msg.Data[0]);
		if(msg.Data[0] == 0x01 && msg.Length - 1 < 0x40)
		{
			Stream ms(msg.Data + 1, msg.Length - 1);
			ms.ReadString().Show(false);
		}
		debug_printf("\r\n");
	}
#endif
}
// Records an outgoing message in the statistics.
// A code with bit 7 set is counted as a sent reply. Anything else is counted as a
// request (plus Read/Write for codes 0x05/0x15 and 0x06/0x16) and stored in a free
// _Queue slot with the current time so EndSendStat() can match the reply.
// Returns false when the request could not be tracked.
bool TokenController::StartSendStat(byte code)
{
	TS("TokenController::StartSendStat");

	if ((code & 0x80) != 0)
	{
		Stat->SendReply++;
		return true;
	}

	Stat->SendRequest++;

	byte code2 = code & 0x3F;

	if (code2 == 0x15 || code2 == 0x05)
		Stat->Read++;
	else if (code2 == 0x16 || code2 == 0x06)
		Stat->Write++;

	// A slot whose Code is 0 is free, so a zero code cannot be tracked.
	if(code2 == 0) return false;

	for(int i=0; i<ArrayLength(_Queue); i++)
	{
		if(_Queue[i].Code == 0)
		{
			// Store the masked code: EndSendStat() compares against (code & 0x3F), so an
			// entry stored with bit 6 set could never be matched or released.
			_Queue[i].Code	= code2;
			_Queue[i].Time	= Sys.Ms();
			return true;
		}
	}

	return false;
}
// Closes the tracking entry for a code and updates the reply statistics.
// Returns true only when a tracked entry was found, success is set and the
// round trip took less than 1000 ms.
bool TokenController::EndSendStat(byte code, bool success)
{
	TS("TokenController::EndSendStat");

	const byte key = code & 0x3F;

	// Locate the first queue entry holding this code.
	int slot = -1;
	for(int i=0; i<ArrayLength(_Queue); i++)
	{
		if(_Queue[i].Code == key)
		{
			slot = i;
			break;
		}
	}

	if(slot < 0)
	{
		// No tracked request: only codes carrying bit 7 feed the read/write reply counters.
		if ((code & 0x80) != 0)
		{
			if (key == 0x15 || key == 0x05)
				Stat->ReadReply++;
			else if (key == 0x16 || key == 0x06)
				Stat->WriteReply++;
		}
		return false;
	}

	auto& item = _Queue[slot];

	bool counted = false;
	if(success)
	{
		int cost = (int)(Sys.Ms() - item.Time);
		// The difference has been observed to come out negative at times; use its magnitude.
		if(cost < 0) cost = -cost;
		if(cost < 1000)
		{
			Stat->RecvReply++;
			Stat->Time += cost;
			counted = true;
		}
	}

	// Release the slot whether or not the reply was counted.
	item.Code = 0;

	return counted;
}
void TokenController::ShowStat()
{
TS("TokenController::ShowStat");
char cs[128];
String str(cs, ArrayLength(cs));
//str.Clear();
Stat->ToStr(str);
str.Show(true);
Stat->Clear();
// 向以太网广播
auto sock = dynamic_cast<ISocket*>(Port);
if(sock)
{
IPEndPoint ep(IPAddress::Broadcast(), 514);
sock->SendTo(str, ep);
}
}
/*void TokenController::StatTask(void* param)
{
auto st = (TokenController*)param;
st->ShowStat();
}*/
/******************************** TokenStat ********************************/
// Zero-fills the object's storage from the SendRequest member up to and including _Total.
// NOTE(review): relies on the counters and the _Last/_Total pointers being laid out
// contiguously between those two members - the declaration is not visible here.
TokenStat::TokenStat()
{
	byte* const first	= (byte*)&SendRequest;
	byte* const last	= (byte*)&_Total + sizeof(_Total);

	Buffer(&SendRequest, last - first).Clear();
}
// Releases the snapshot objects allocated by Clear().
TokenStat::~TokenStat()
{
	// The previous checks were inverted (delete only when the pointer was null), so
	// neither object was ever freed. Deleting a null pointer is a no-op, so no check is needed.
	delete _Last;
	_Last = nullptr;

	delete _Total;
	_Total = nullptr;
}
// Formats d1/d2 as a percentage string with up to two decimals, using integer
// arithmetic only. Returns "0" when d2 is zero; the fractional part is omitted
// when it is zero.
String CaclPercent(int d1, int d2)
{
	String text;
	if(d2 == 0) return text + "0";

	// Integer part of the percentage.
	int scaled	= d1 * 100;
	int whole	= scaled / d2;

	// Remainder via multiply-subtract rather than %, then scaled up to two decimals.
	int rest	= scaled - whole * d2;
	int frac	= rest * 100 / d2;

	text += whole;
	if(frac <= 0) return text;

	text += ".";
	if(frac < 10) text += "0";
	text += frac;

	return text;
}
// Share of sent requests that received a reply, as a percentage string, taken over
// the current counters plus the previous snapshot when one exists.
String TokenStat::Percent() const
{
	const TokenStat* prev = _Last;

	const int send = SendRequest + (prev ? prev->SendRequest : 0);
	const int sucs = RecvReply + (prev ? prev->RecvReply : 0);

	return CaclPercent(sucs, send);
}
int TokenStat::Speed() const
{
int time = Time;
int sucs = RecvReply;
if(_Last)
{
time += _Last->Time;
sucs += _Last->RecvReply;
}
if(sucs == 0) return 0;
return time / sucs;
}
// Share of received requests that were answered, as a percentage string, taken over
// the current counters plus the previous snapshot when one exists.
String TokenStat::PercentReply() const
{
	const TokenStat* prev = _Last;

	const int req = RecvRequest + (prev ? prev->RecvRequest : 0);
	const int rep = SendReply + (prev ? prev->SendReply : 0);

	return CaclPercent(rep, req);
}
void TokenStat::Clear()
{
if (_Last == nullptr) _Last = new TokenStat();
if (_Total == nullptr) _Total = new TokenStat();
_Last->SendRequest = SendRequest;
_Last->RecvReply = RecvReply;
_Last->SendReply = SendReply;
_Last->Time = Time;
_Last->RecvRequest = RecvRequest;
_Last->RecvReplyAsync = RecvReplyAsync;
_Last->Read = Read;
_Last->ReadReply = ReadReply;
_Last->Write = Write;
_Last->WriteReply = WriteReply;
_Total->SendRequest += SendRequest;
_Total->RecvReply += RecvReply;
_Total->SendReply += SendReply;
_Total->Time += Time;
_Total->RecvRequest += RecvRequest;
_Total->RecvReplyAsync += RecvReplyAsync;
_Total->Read += Read;
_Total->ReadReply += ReadReply;
_Total->Write += Write;
_Total->WriteReply += WriteReply;
SendRequest = 0;
RecvReply = 0;
Time = 0;
SendReply = 0;
RecvRequest = 0;
RecvReplyAsync = 0;
Read = 0;
ReadReply = 0;
Write = 0;
WriteReply = 0;
}
// Appends a one-line summary of the statistics to str and returns str.
// When a _Total accumulator exists, its summary is appended to the same string as well.
String& TokenStat::ToStr(String& str) const
{
TS("TokenStat::ToStr");
assert_ptr(this);
/*debug_printf("this=0x%08X _Last=0x%08X _Total=0x%08X ", this, _Last, _Total);
Percent().Show(true);*/
// Send side: reply percentage, RecvReply/SendRequest, and Speed() with an "ms" suffix.
str = str + "发:" + Percent() + "% " + RecvReply + "/" + SendRequest + " " + Speed() + "ms";
// Receive side: answered percentage and SendReply/RecvRequest.
str = str + " 收:" + PercentReply() + "% " + SendReply + "/" + RecvRequest;
// Optional sections are only emitted when their counter is non-zero.
if(RecvReplyAsync > 0) str = str + " 异步 " + RecvReplyAsync;
// Read/Write ratios use integer division; the Read > 0 / Write > 0 guards also prevent division by zero.
if (Read > 0) str = str + " 读:" + (ReadReply * 100 / Read) + " " + ReadReply + "/" + Read;
if (Write > 0) str = str + " 写:" + (WriteReply * 100 / Write) + " " + WriteReply + "/" + Write;
if(_Total)
{
// NOTE(review): this literal appears empty here - possibly a separator whose characters were lost; confirm.
str += "";
// Append the accumulated totals in the same format.
_Total->ToStr(str);
}
return str;
}