Skip to content

Commit

Permalink
Improve on error
Browse files Browse the repository at this point in the history
  • Loading branch information
joente committed Jan 9, 2025
1 parent e9be638 commit 190990c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 23 deletions.
21 changes: 6 additions & 15 deletions itest/test_ws.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
#!/usr/bin/env python
import asyncio
import pickle
import time
import msgpack
import ssl
import struct
from lib import run_test
from lib import default_test_setup
from lib.testbase import TestBase
from lib.client import get_client
from thingsdb.exceptions import AssertionError
from thingsdb.exceptions import ValueError
from thingsdb.exceptions import TypeError
from thingsdb.exceptions import NumArgumentsError
from thingsdb.exceptions import BadDataError
from thingsdb.exceptions import LookupError
from thingsdb.exceptions import OverflowError
from thingsdb.exceptions import ZeroDivisionError
from thingsdb.exceptions import OperationError
from thingsdb.client import Client

# The following code can be used to lower or increase the max package size
# from thingsdb.client import protocol
# protocol.WEBSOCKET_MAX_SIZE = 2**8


class TestWS(TestBase):
Expand All @@ -39,7 +30,7 @@ async def run(self):
await client.wait_closed()
await asyncio.sleep(1) # sleep is required for nice close

async def test_simple_ws(self, client):
async def test_simple_ws(self, client: Client):
self.assertTrue(client.is_websocket())
info = client.connection_info()
self.assertIsInstance(info, str)
Expand Down
18 changes: 10 additions & 8 deletions src/ti/ws.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ static int ws__callback_established(struct lws * wsi, ti_ws_t * pss)
return -1;
}

static inline void ws__write_done(ti_ws_t * pss)
static inline void ws__done(ti_ws_t * pss, ti_write_t * req, ex_enum status)
{
(void) queue_shift(pss->queue);
pss->f = 0; /* reset to frame 0 */
req->cb_(req, status);
}

static int ws__callback_server_writable(struct lws * wsi, ti_ws_t * pss)
Expand Down Expand Up @@ -99,16 +100,12 @@ static int ws__callback_server_writable(struct lws * wsi, ti_ws_t * pss)
if (m < (int) len)
{
log_error("ERROR %d; writing to WebSocket", m);
ws__write_done(pss); /* reset to frame 0 and shift from queue */
req->cb_(req, EX_WRITE_UV);
ws__done(pss, req, EX_WRITE_UV);
return -1;
}

if (is_end)
{
ws__write_done(pss); /* reset to frame 0 and shift from queue */
req->cb_(req, 0);
}
ws__done(pss, req, 0);
else
pss->f++; /* next frame */

Expand Down Expand Up @@ -179,6 +176,11 @@ struct per_vhost_data__minimal {
const struct lws_protocols *protocol;
};

static void ws__drop_req(ti_write_t * req)
{
req->cb_(req, EX_WRITE_UV);
}

/* Callback function for WebSocket server messages */
int ws__callback(
struct lws * wsi,
Expand All @@ -202,8 +204,8 @@ int ws__callback(
case LWS_CALLBACK_CLOSED:
if (pss->stream)
{
queue_destroy(pss->queue, (queue_destroy_cb) ws__drop_req);
ti_stream_close(pss->stream);
queue_destroy(pss->queue, free);
}
break;
default:
Expand Down

0 comments on commit 190990c

Please sign in to comment.