Compare commits
4 Commits
e26cc0d7cd
...
a22f0859ac
Author | SHA1 | Date |
---|---|---|
![]() |
a22f0859ac | |
![]() |
ca138e0137 | |
![]() |
d18981ed93 | |
![]() |
5735603eef |
|
@ -84,6 +84,9 @@ class BinanceWebsocketClosed(Exception):
|
|||
|
||||
pass
|
||||
|
||||
class ReadLoopClosed(Exception):
|
||||
"""Raised when trying to read from read loop but already closed"""
|
||||
pass
|
||||
|
||||
class NotImplementedException(Exception):
|
||||
def __init__(self, value):
|
||||
|
|
|
@ -36,6 +36,7 @@ from binance.exceptions import (
|
|||
BinanceWebsocketClosed,
|
||||
BinanceWebsocketUnableToConnect,
|
||||
BinanceWebsocketQueueOverflow,
|
||||
ReadLoopClosed,
|
||||
)
|
||||
from binance.helpers import get_loop
|
||||
from binance.ws.constants import WSListenerState
|
||||
|
@ -247,6 +248,8 @@ class ReconnectingWebsocket:
|
|||
"m": f"{e}",
|
||||
})
|
||||
break
|
||||
except Exception as e:
|
||||
self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})")
|
||||
finally:
|
||||
self._handle_read_loop = None # Signal the coro is stopped
|
||||
self._reconnects = 0
|
||||
|
@ -272,6 +275,10 @@ class ReconnectingWebsocket:
|
|||
async def recv(self):
|
||||
res = None
|
||||
while not res:
|
||||
if not self._handle_read_loop:
|
||||
raise ReadLoopClosed(
|
||||
"Read loop has been closed, please reset the websocket connection and listen to the message error."
|
||||
)
|
||||
try:
|
||||
res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT)
|
||||
except asyncio.TimeoutError:
|
||||
|
|
|
@ -216,23 +216,61 @@ can do this.
|
|||
Websocket Errors
|
||||
----------------
|
||||
|
||||
If an error occurs, a message is sent to the callback to indicate this. The format is
|
||||
If an error occurs, a message is sent to the callback to indicate this. The format is:
|
||||
|
||||
.. code:: python
|
||||
|
||||
{
|
||||
'e': 'error',
|
||||
'type': 'BinanceWebsocketUnableToConnect',
|
||||
'm': 'Max reconnect retries reached'
|
||||
'type': '<ErrorType>',
|
||||
'm': '<Error message>'
|
||||
}
|
||||
|
||||
# check for it like so
|
||||
Where:
|
||||
- `'e'`: Always `'error'` for error messages.
|
||||
- `'type'`: The type of error encountered (see table below).
|
||||
- `'m'`: A human-readable error message.
|
||||
|
||||
**Possible Error Types:**
|
||||
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| Type | Description | Typical Action |
|
||||
+===============================+==============================================================+===============================+
|
||||
| BinanceWebsocketUnableToConnect| The websocket could not connect after maximum retries. | Check network, restart socket |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| BinanceWebsocketClosed | The websocket connection was closed. The system will attempt | Usually auto-reconnects |
|
||||
| | to reconnect automatically. | |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| BinanceWebsocketQueueOverflow | The internal message queue exceeded its maximum size | Process messages faster, or |
|
||||
| | (default 100). | increase queue size |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| CancelledError | The websocket task was cancelled (e.g., on shutdown). | Usually safe to ignore |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| IncompleteReadError | The websocket connection was interrupted during a read. | Will attempt to reconnect |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| gaierror | Network address-related error (e.g., DNS failure). | Check network |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| ConnectionClosedError | The websocket connection was closed unexpectedly. | Will attempt to reconnect |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
| *Other Exception Types* | Any other unexpected error. | Check error message |
|
||||
+-------------------------------+--------------------------------------------------------------+-------------------------------+
|
||||
|
||||
**Example error handling in your callback:**
|
||||
|
||||
.. code:: python
|
||||
|
||||
def process_message(msg):
|
||||
if msg['e'] == 'error':
|
||||
# close and restart the socket
|
||||
if msg.get('e') == 'error':
|
||||
print(f"WebSocket error: {msg.get('type')} - {msg.get('m')}")
|
||||
# Optionally close and restart the socket, or handle as needed
|
||||
else:
|
||||
# process message normally
|
||||
|
||||
**Notes:**
|
||||
- Most connection-related errors will trigger automatic reconnection attempts up to 5 times.
|
||||
- If the queue overflows, consider increasing `max_queue_size` or processing messages more quickly.
|
||||
- For persistent errors, check your network connection and API credentials.
|
||||
|
||||
Websocket Examples
|
||||
----------------
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ async def test_ws_futures_create_get_edit_cancel_order_with_orjson(futuresClient
|
|||
type="LIMIT",
|
||||
timeInForce="GTC",
|
||||
quantity=0.1,
|
||||
price=str(float(ticker["bidPrice"]) + 2),
|
||||
price=str(float(ticker["bidPrice"]) + 5),
|
||||
)
|
||||
assert_contract_order(futuresClientAsync, order)
|
||||
order = await futuresClientAsync.ws_futures_edit_order(
|
||||
|
@ -106,7 +106,7 @@ async def test_ws_futures_create_get_edit_cancel_order_without_orjson(
|
|||
type="LIMIT",
|
||||
timeInForce="GTC",
|
||||
quantity=0.1,
|
||||
price=str(float(ticker["bidPrice"]) + 2),
|
||||
price=str(float(ticker["bidPrice"]) + 5),
|
||||
)
|
||||
assert_contract_order(futuresClientAsync, order)
|
||||
order = await futuresClientAsync.ws_futures_edit_order(
|
||||
|
|
|
@ -2,10 +2,10 @@ import sys
|
|||
import pytest
|
||||
import gzip
|
||||
import json
|
||||
from unittest.mock import patch, create_autospec
|
||||
from unittest.mock import patch, create_autospec, Mock
|
||||
from binance.ws.reconnecting_websocket import ReconnectingWebsocket
|
||||
from binance.ws.constants import WSListenerState
|
||||
from binance.exceptions import BinanceWebsocketUnableToConnect
|
||||
from binance.exceptions import BinanceWebsocketUnableToConnect, ReadLoopClosed
|
||||
from websockets import WebSocketClientProtocol # type: ignore
|
||||
from websockets.protocol import State
|
||||
import asyncio
|
||||
|
@ -77,6 +77,8 @@ async def test_handle_message_invalid_json():
|
|||
async def test_recv_message():
|
||||
ws = ReconnectingWebsocket(url="wss://test.url")
|
||||
await ws._queue.put({"test": "data"})
|
||||
# Simulate the read loop being active
|
||||
ws._handle_read_loop = Mock()
|
||||
result = await ws.recv()
|
||||
assert result == {"test": "data"}
|
||||
|
||||
|
@ -206,3 +208,19 @@ async def test_connect_fails_to_connect_after_disconnect():
|
|||
async def delayed_return():
|
||||
await asyncio.sleep(0.1) # 100 ms delay
|
||||
return '{"e": "value"}'
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Requires Python 3.8+")
|
||||
@pytest.mark.asyncio
|
||||
async def test_recv_read_loop_closed():
|
||||
"""Test that recv() raises ReadLoopClosed when read loop is closed."""
|
||||
ws = ReconnectingWebsocket(url="wss://test.url")
|
||||
|
||||
# Simulate read loop being closed by setting _handle_read_loop to None
|
||||
ws._handle_read_loop = None
|
||||
|
||||
with pytest.raises(ReadLoopClosed) as exc_info:
|
||||
await ws.recv()
|
||||
|
||||
assert "Read loop has been closed" in str(exc_info.value)
|
||||
assert "please reset the websocket connection" in str(exc_info.value)
|
||||
|
|
Loading…
Reference in New Issue