使用新的同步等待机制,配合时间轮,避免独占CPU太长时间

This commit is contained in:
nnhy 2015-06-23 09:00:17 +00:00
parent 4f17a22ab1
commit d434cbb5a7
5 changed files with 108 additions and 26 deletions

View File

@ -2,6 +2,22 @@
#define NET_DEBUG 0
class ArpSession
{
public:
IPAddress IP;
MacAddress Mac;
bool Success;
ArpSession(IPAddress& ip)
{
IP = ip;
Success = false;
}
};
ArpSession* _ArpSession;
ArpSocket::ArpSocket(TinyIP* tip) : Socket(tip)
{
//Type = ETH_ARP;
@ -54,6 +70,13 @@ bool ArpSocket::Process(IP_HEADER& ip, Stream& ms)
// 是否发给本机。
if(arp->DestIP != Tip->IP.Value) return true;
if(arp->Option == 0x0200 && _ArpSession && _ArpSession->IP.Value == arp->SrcIP)
{
_ArpSession->Mac = arp->SrcMac.Value();
_ArpSession->Success = true;
return true;
}
#if NET_DEBUG
// 数据校验
assert_param(arp->HardType == 0x0100);
@ -102,7 +125,7 @@ bool ArpSocket::Process(IP_HEADER& ip, Stream& ms)
return true;
}
bool RequestCallback(TinyIP* tip, void* param, Stream& ms)
/*bool RequestCallback(TinyIP* tip, void* param, Stream& ms)
{
ETH_HEADER* eth = ms.Retrieve<ETH_HEADER>();
ARP_HEADER* arp = ms.Retrieve<ARP_HEADER>();
@ -122,7 +145,7 @@ bool RequestCallback(TinyIP* tip, void* param, Stream& ms)
}
return false;
}
}*/
// 请求Arp并返回其Mac。timeout超时3秒如果没有超时时间表示异步请求不用等待结果
bool ArpSocket::Request(IPAddress& ip, MacAddress& mac, int timeout)
@ -156,7 +179,25 @@ bool ArpSocket::Request(IPAddress& ip, MacAddress& mac, int timeout)
if(timeout <= 0) return false;
// 等待反馈
if(Tip->LoopWait(RequestCallback, &mac, timeout * 1000)) return true;
//if(Tip->LoopWait(RequestCallback, &mac, timeout * 1000)) return true;
ArpSession ss(ip);
_ArpSession = &ss;
// 等待响应
TimeWheel tw(0, timeout * 1000);
tw.Sleep = 1;
do{
if(ss.Success) break;
}while(!tw.Expired());
if(ss.Success)
{
mac = ss.Mac;
return true;
}
_ArpSession = NULL;
return false;
}

View File

@ -18,10 +18,19 @@ public:
Sequence = seq;
Success = false;
}
bool Check(IPAddress& remote, ICMP_HEADER* icmp)
{
if(remote != Address) return false;
if(Identifier != icmp->Identifier) return false;
if(Sequence != icmp->Sequence) return false;
return true;
}
};
// 用于等待Ping响应的会话
PingSession* Session = NULL;
PingSession* _IcmpSession = NULL;
IcmpSocket::IcmpSocket(TinyIP* tip) : Socket(tip)
{
@ -38,9 +47,9 @@ bool IcmpSocket::Process(IP_HEADER& ip, Stream& ms)
IPAddress remote = ip.SrcIP;
// 检查有没有会话等待
if(icmp->Type == 0 && Session != NULL && remote == Session->Address)
if(icmp->Type == 0 && _IcmpSession != NULL && _IcmpSession->Check(remote, icmp))
{
Session->Success = true;
_IcmpSession->Success = true;
return true;
}
@ -80,7 +89,7 @@ bool IcmpSocket::Process(IP_HEADER& ip, Stream& ms)
return true;
}
bool PingCallback(TinyIP* tip, void* param, Stream& ms)
/*bool PingCallback(TinyIP* tip, void* param, Stream& ms)
{
ETH_HEADER* eth = ms.Retrieve<ETH_HEADER>();
IP_HEADER* _ip = ms.Retrieve<IP_HEADER>();
@ -102,7 +111,7 @@ bool PingCallback(TinyIP* tip, void* param, Stream& ms)
}
return false;
}
}*/
// Ping目的地址附带a~z重复的负载数据
bool IcmpSocket::Ping(IPAddress& ip, uint payloadLength)
@ -150,7 +159,7 @@ bool IcmpSocket::Ping(IPAddress& ip, uint payloadLength)
//if(Tip->LoopWait(PingCallback, ps, 1000)) return true;
PingSession ps(ip, id, seq);
Session = &ps;
_IcmpSession = &ps;
// 等待响应
TimeWheel tw(0, 1000);
@ -159,7 +168,7 @@ bool IcmpSocket::Ping(IPAddress& ip, uint payloadLength)
if(ps.Success) break;
}while(!tw.Expired());
Session = NULL;
_IcmpSession = NULL;
#if NET_DEBUG
uint cost = ct.Elapsed() / 1000;

View File

@ -3,6 +3,8 @@
#define NET_DEBUG 0
//#define NET_DEBUG DEBUG
bool* WaitAck;
bool Callback(TinyIP* tip, void* param, Stream& ms);
TcpSocket::TcpSocket(TinyIP* tip) : Socket(tip)
@ -82,6 +84,11 @@ bool TcpSocket::Process(IP_HEADER& ip, Stream& ms)
LocalPort = port;
LocalIP = ip.DestIP;
if(WaitAck && (tcp->Flags & TCP_FLAGS_ACK))
{
*WaitAck = true;
}
OnProcess(*tcp, ms);
return true;
@ -330,7 +337,7 @@ void TcpSocket::SendAck(uint len)
{
Stream ms(sizeof(ETH_HEADER) + sizeof(IP_HEADER) + sizeof(TCP_HEADER) + len);
ms.Seek(sizeof(ETH_HEADER) + sizeof(IP_HEADER));
TCP_HEADER* tcp = ms.Retrieve<TCP_HEADER>();
tcp->Init(true);
Send(*tcp, len, TCP_FLAGS_ACK | TCP_FLAGS_PUSH);
@ -344,7 +351,7 @@ void TcpSocket::Disconnect()
Stream ms(sizeof(ETH_HEADER) + sizeof(IP_HEADER) + sizeof(TCP_HEADER));
ms.Seek(sizeof(ETH_HEADER) + sizeof(IP_HEADER));
TCP_HEADER* tcp = ms.Retrieve<TCP_HEADER>();
tcp->Init(true);
Send(*tcp, 0, TCP_FLAGS_ACK | TCP_FLAGS_PUSH | TCP_FLAGS_FIN);
@ -369,10 +376,10 @@ void TcpSocket::Send(ByteArray& bs)
Stream ms(sizeof(ETH_HEADER) + sizeof(IP_HEADER) + sizeof(TCP_HEADER) + bs.Length());
ms.Seek(sizeof(ETH_HEADER) + sizeof(IP_HEADER));
TCP_HEADER* tcp = ms.Retrieve<TCP_HEADER>();
tcp->Init(true);
// 复制数据,确保数据不会溢出
ms.Write(bs);
@ -386,9 +393,21 @@ void TcpSocket::Send(ByteArray& bs)
//debug_printf("Seq=0x%04x Ack=0x%04x \r\n", Seq, Ack);
Send(*tcp, bs.Length(), TCP_FLAGS_PUSH | TCP_FLAGS_ACK);
Tip->LoopWait(Callback, this, 3000);
//Tip->LoopWait(Callback, this, 3000);
if(tcp->Flags & TCP_FLAGS_ACK)
bool wait = false;
WaitAck = &wait;
// 等待响应
TimeWheel tw(0, 3000);
tw.Sleep = 1;
do{
if(wait) break;
}while(!tw.Expired());
WaitAck = NULL;
if(wait)
debug_printf("发送成功!\r\n");
else
debug_printf("发送失败!\r\n");
@ -413,7 +432,7 @@ bool TcpSocket::Connect(IPAddress& ip, ushort port)
Stream ms(sizeof(ETH_HEADER) + sizeof(IP_HEADER) + sizeof(TCP_HEADER) + 3);
ms.Seek(sizeof(ETH_HEADER) + sizeof(IP_HEADER));
TCP_HEADER* tcp = ms.Retrieve<TCP_HEADER>();
tcp->Init(true);
//tcp->Seq = 0; // 仅仅是为了Ack=0tcp->Seq还是会被Socket的顺序Seq替代
@ -425,9 +444,22 @@ bool TcpSocket::Connect(IPAddress& ip, ushort port)
Status = SynSent;
Send(*tcp, 0, TCP_FLAGS_SYN);
if(Tip->LoopWait(Callback, this, 3000))
//if(Tip->LoopWait(Callback, this, 3000))
bool wait = false;
WaitAck = &wait;
// 等待响应
TimeWheel tw(0, 3000);
tw.Sleep = 1;
do{
if(wait) break;
}while(!tw.Expired());
WaitAck = NULL;
if(wait)
{
//if(tcp->Flags & TCP_FLAGS_SYN)
if(Status == SynAck)
{
Status = Established;
@ -444,7 +476,7 @@ bool TcpSocket::Connect(IPAddress& ip, ushort port)
return false;
}
bool Callback(TinyIP* tip, void* param, Stream& ms)
/*bool Callback(TinyIP* tip, void* param, Stream& ms)
{
ETH_HEADER* eth = ms.Retrieve<ETH_HEADER>();
if(eth->Type != ETH_IP) return false;
@ -480,7 +512,7 @@ bool Callback(TinyIP* tip, void* param, Stream& ms)
}
return false;
}
}*/
bool TcpSocket::OnWrite(const byte* buf, uint len)
{

View File

@ -180,7 +180,7 @@ void TinyIP::Work(void* param)
}
}
bool TinyIP::LoopWait(LoopFilter filter, void* param, uint msTimeout)
/*bool TinyIP::LoopWait(LoopFilter filter, void* param, uint msTimeout)
{
// 分配一个同样大小的缓冲区
byte buf[ArrayLength(Buffer)];
@ -209,7 +209,7 @@ bool TinyIP::LoopWait(LoopFilter filter, void* param, uint msTimeout)
}
return false;
}
}*/
bool TinyIP::Open()
{

View File

@ -33,8 +33,8 @@ public:
Socket* FindByType(ushort type);
};
class TinyIP;
typedef bool (*LoopFilter)(TinyIP* tip, void* param, Stream& ms);
//class TinyIP;
//typedef bool (*LoopFilter)(TinyIP* tip, void* param, Stream& ms);
// 精简以太网协议。封装以太网帧以及IP协议不包含其它协议实现仅提供底层支持。
class TinyIP
@ -54,7 +54,7 @@ public:
// 任务函数
static void Work(void* param);
// 带过滤器的轮询
bool LoopWait(LoopFilter filter, void* param, uint msTimeout);
//bool LoopWait(LoopFilter filter, void* param, uint msTimeout);
// 处理数据包
void Process(Stream& ms);
// 修正IP包负载数据的长度。物理层送来的长度可能有误一般超长