toys/async/async_socket_service.py

95 lines
2.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
import sys
import asyncio
import time
import threading
class AsyncSocketService:
    """TCP echo service that runs an asyncio server on a dedicated thread.

    Each accepted client is served by :meth:`handle_accept`, which echoes
    every received package back to the sender.  When a client handler ends
    normally (the peer closed the connection or sent ``b'exit'``), the
    handler asks the event loop to stop, which shuts the whole service down.
    """

    def __init__(self, address='127.0.0.1', port=5000,
                 bufsize: int = 10240,
                 required_pkg_size: int = 0, debug: bool = False):
        """Store the service configuration; no socket is opened here.

        Args:
            address: Interface the server binds to.
            port: TCP port the server listens on.
            bufsize: Maximum number of bytes requested per ``reader.read``.
            required_pkg_size: Minimum number of bytes to accumulate before a
                package is handed to the echo step (``0`` means any size).
            debug: When true, print every received package.
        """
        self.__addr = address
        self.__port = port
        self.__bufsize = bufsize
        self.__required_pkg_size = required_pkg_size
        self.__debug = debug
        # Assigned by run_service(); handle_accept() uses it to stop the loop.
        self.__loop = None

    async def _read_package(self, reader) -> bytes:
        """Read from *reader* until one package is complete.

        Reading stops when the peer signals EOF (empty read), when the
        accumulated data is exactly ``b'exit'``, or when at least
        ``required_pkg_size`` bytes have been collected.

        Returns:
            The bytes collected so far (possibly empty on EOF).
        """
        # bytearray avoids re-copying the whole buffer on every chunk.
        collected = bytearray()
        while True:
            chunk = await reader.read(self.__bufsize)
            if not chunk:
                break
            collected += chunk
            if collected == b'exit':
                break
            if len(collected) >= self.__required_pkg_size:
                break
        return bytes(collected)

    async def handle_accept(self, reader, writer):
        """Serve one client: echo packages until EOF or ``b'exit'``.

        Args:
            reader: Stream to read client data from (``read`` coroutine).
            writer: Stream to send the echo to; it is always closed on exit,
                including when the handler fails or is cancelled.
        """
        try:
            while True:
                data = await self._read_package(reader)
                if not data or data == b'exit':
                    break
                if self.__debug:
                    client = writer.get_extra_info('peername')
                    print('Received from {}: {!r}'.format(client, data))
                # Do process
                writer.write(data)
                await writer.drain()
        finally:
            print('Close the client socket')
            writer.close()
        # Only reached on a normal end of the conversation: stop the whole
        # service.  Kept out of the ``finally`` so that a cancelled handler
        # (during shutdown) does not stop the loop a second time.
        self.__loop.call_soon_threadsafe(self.__loop.stop)

    @staticmethod
    def _cancel_pending_tasks(loop):
        """Cancel every task still pending on the stopped *loop* and reap it."""
        pending = asyncio.all_tasks(loop)
        if not pending:
            return
        for task in pending:
            task.cancel()
        # return_exceptions=True collects the CancelledError results instead
        # of raising them here.
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

    def run_service(self, loop: asyncio.AbstractEventLoop):
        """Run the server on *loop* until the loop is stopped, then clean up.

        Blocks the calling thread.  The loop is closed before returning, also
        when the server cannot be started (for example, port already in use).

        Args:
            loop: Event loop to install as the current loop of this thread.
        """
        self.__loop = loop
        asyncio.set_event_loop(loop)
        try:
            # No ``loop=`` keyword: it was removed from asyncio.start_server
            # in Python 3.10.  The coroutine runs on ``loop`` anyway because
            # ``loop`` is the one executing it.
            server = loop.run_until_complete(
                asyncio.start_server(self.handle_accept, self.__addr, self.__port))
            host = server.sockets[0].getsockname()
            print('Serving on {}. Hit CTRL-C to stop.'.format(host))
            try:
                loop.run_forever()
            except KeyboardInterrupt:
                print("CTRL+C")
            print('Server shutting down.')
            server.close()
            # Other clients may still be connected; cancel their handlers so
            # their sockets get closed before the loop goes away.
            self._cancel_pending_tasks(loop)
            loop.run_until_complete(server.wait_closed())
        finally:
            loop.close()

    def start_service(self):
        """Start :meth:`run_service` on a new thread with a fresh event loop.

        Returns:
            The started ``threading.Thread`` so that callers may ``join`` it.
        """
        loop = asyncio.new_event_loop()
        worker = threading.Thread(target=self.run_service, args=(loop,))
        worker.start()
        return worker
if __name__ == '__main__':
    # Demo entry point: run two independent echo services, each on its own
    # thread and event loop.
    s1 = AsyncSocketService(address='127.0.0.1', port=5000, required_pkg_size=100)
    s2 = AsyncSocketService(address='127.0.0.1', port=6000, required_pkg_size=100)
    s1.start_service()
    s2.start_service()
    # Idle in the main thread until the user presses CTRL-C.  Only
    # KeyboardInterrupt ends the wait; any other exception is a real error
    # and is allowed to propagate instead of being silently swallowed.
    try:
        while True:
            time.sleep(1.0)
    except KeyboardInterrupt:
        pass