From 190990cb771c4a4aca72f54c71ac91182f31b662 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Thu, 9 Jan 2025 19:11:24 +0100 Subject: [PATCH] Improve on error --- itest/test_ws.py | 21 ++++++--------------- src/ti/ws.c | 18 ++++++++++-------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/itest/test_ws.py b/itest/test_ws.py index 296b2720..9ab8634c 100755 --- a/itest/test_ws.py +++ b/itest/test_ws.py @@ -1,23 +1,14 @@ #!/usr/bin/env python import asyncio -import pickle -import time -import msgpack -import ssl -import struct from lib import run_test from lib import default_test_setup from lib.testbase import TestBase from lib.client import get_client -from thingsdb.exceptions import AssertionError -from thingsdb.exceptions import ValueError -from thingsdb.exceptions import TypeError -from thingsdb.exceptions import NumArgumentsError -from thingsdb.exceptions import BadDataError -from thingsdb.exceptions import LookupError -from thingsdb.exceptions import OverflowError -from thingsdb.exceptions import ZeroDivisionError -from thingsdb.exceptions import OperationError +from thingsdb.client import Client + +# The following code can be used to lower or increase the max package size +# from thingsdb.client import protocol +# protocol.WEBSOCKET_MAX_SIZE = 2**8 class TestWS(TestBase): @@ -39,7 +30,7 @@ async def run(self): await client.wait_closed() await asyncio.sleep(1) # sleep is required for nice close - async def test_simple_ws(self, client): + async def test_simple_ws(self, client: Client): self.assertTrue(client.is_websocket()) info = client.connection_info() self.assertIsInstance(info, str) diff --git a/src/ti/ws.c b/src/ti/ws.c index 9c432b49..3598d2d9 100644 --- a/src/ti/ws.c +++ b/src/ti/ws.c @@ -51,10 +51,11 @@ static int ws__callback_established(struct lws * wsi, ti_ws_t * pss) return -1; } -static inline void ws__write_done(ti_ws_t * pss) +static inline void ws__done(ti_ws_t * pss, ti_write_t * req, ex_enum status) { (void) queue_shift(pss->queue); pss->f = 0; /* reset to frame 0 */ + req->cb_(req, status); } static int ws__callback_server_writable(struct lws * wsi, ti_ws_t * pss) @@ -99,16 +100,12 @@ static int ws__callback_server_writable(struct lws * wsi, ti_ws_t * pss) if (m < (int) len) { log_error("ERROR %d; writing to WebSocket", m); - ws__write_done(pss); /* reset to frame 0 and shift from queue */ - req->cb_(req, EX_WRITE_UV); + ws__done(pss, req, EX_WRITE_UV); return -1; } if (is_end) - { - ws__write_done(pss); /* reset to frame 0 and shift from queue */ - req->cb_(req, 0); - } + ws__done(pss, req, 0); else pss->f++; /* next frame */ @@ -179,6 +176,11 @@ struct per_vhost_data__minimal { const struct lws_protocols *protocol; }; +static void ws__drop_req(ti_write_t * req) +{ + req->cb_(req, EX_WRITE_UV); +} + /* Callback function for WebSocket server messages */ int ws__callback( struct lws * wsi, @@ -202,8 +204,8 @@ int ws__callback( case LWS_CALLBACK_CLOSED: if (pss->stream) { + queue_destroy(pss->queue, (queue_destroy_cb) ws__drop_req); ti_stream_close(pss->stream); - queue_destroy(pss->queue, free); } break; default: