Skip to content

Commit

Permalink
Websocket write in iterations (#403)
Browse files Browse the repository at this point in the history
* Fix writing large data over websockets
  • Loading branch information
joente authored Jan 9, 2025
1 parent de7895a commit b0eacf3
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 28 deletions.
5 changes: 4 additions & 1 deletion inc/ti/ws.t.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ typedef struct ti_ws_s ti_ws_t;

struct ti_ws_s
{
queue_t * queue; /* ti_write_t */
queue_t * queue; /* ws__req_t */
ti_stream_t * stream;
struct lws * wsi;
size_t f; /* current frame */
size_t nf; /* total number of frames */
size_t n; /* total size to send */
};

#endif /* TI_WS_T_H_ */
1 change: 1 addition & 0 deletions itest/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -2687,5 +2687,6 @@ async def test_def_bool(self, client):
});
""")


if __name__ == '__main__':
run_test(TestAdvanced())
11 changes: 10 additions & 1 deletion itest/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,18 @@ async def test_simple_ws(self, client):
self.assertIsInstance(info, str)
res = await client.query('6 * 7;')
self.assertEqual(res, 42)

# test with exact module frame size
n = 10_156
res = await client.query("""//ti
[range(n).map(|i| 'aa')];
""", n=n)
self.assertEqual(len(res[0]), n)

# test with large data
n = 100_000
res = await client.query("""//ti
range(n).map(|i| `this is item number {i}`);
range(n).map(|i| `this is item with number {i}`);
""", n=n)
self.assertEqual(len(res), n)

Expand Down
6 changes: 3 additions & 3 deletions itest/ws/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

<script>
window.onload = () => {
const thingsdb = new ThingsDB('ws://localhost:9270');
const thingsdb = new ThingsDB('ws://localhost:9780');
thingsdb.connect().then(() => {
thingsdb.auth().then(() => {
thingsdb.query('@thingsdb', '"Hello World!";').then(response => {
thingsdb.query('@thingsdb', 'n=90000; range(n).map(|i| `this is item number {i}`);').then(response => {
console.log('Query done!');
console.log(response); // will be "Hello World!"
console.log(response.length); // will be "Hello World!"
});
});
});
Expand Down
81 changes: 58 additions & 23 deletions src/ti/ws.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <util/fx.h>

static struct lws_context * ws__context;
const size_t ws__mf = LWS_SS_MTU-LWS_PRE;

static int ws__callback_established(struct lws * wsi, ti_ws_t * pss)
{
Expand All @@ -34,6 +35,10 @@ static int ws__callback_established(struct lws * wsi, ti_ws_t * pss)

pss->stream->with.ws = pss;
pss->wsi = wsi;
pss->f = 0;
pss->n = 0;
pss->nf = 0;

return 0;

fail2:
Expand All @@ -46,42 +51,70 @@ static int ws__callback_established(struct lws * wsi, ti_ws_t * pss)
return -1;
}

static inline void ws__write_done(ti_ws_t * pss)
{
(void) queue_shift(pss->queue);
pss->f = 0; /* reset to frame 0 */
}

static int ws__callback_server_writable(struct lws * wsi, ti_ws_t * pss)
{
const size_t mf = LWS_SS_MTU-LWS_PRE;
unsigned char mtubuff[LWS_SS_MTU];
unsigned char * out = mtubuff + LWS_PRE;
unsigned char * pt;
int flags, m;
size_t n, f, nf, len;
ti_pkg_t * pkg;
ti_write_t * req = queue_shift(pss->queue);
int flags, m, is_end;
size_t len;
ti_write_t * req = queue_first(pss->queue);
if (!req)
return 0; /* nothing to write */

pkg = req->pkg;
if (pss->f == 0)
{
/* notice we allowed for LWS_PRE in the payload already */
size_t n = sizeof(ti_pkg_t) + req->pkg->n;

pss->f = 1;
pss->n = n;
pss->nf = (n-1)/ws__mf+1; /* calculate the number of frames */
}

/* set write flags for frame */
is_end = pss->f==pss->nf;
flags = lws_write_ws_flags(LWS_WRITE_BINARY, pss->f==1, is_end);

/* calculate how much data must be send; if this is the last frame we use
* the module, with the exception when the data is exact */
len = is_end ? pss->n % ws__mf : ws__mf;
len = len ? len : ws__mf;

/* pointer to the data */
pt = (unsigned char *) req->pkg;

/* copy data to the buffer */
memcpy(out, pt + (pss->f-1)*ws__mf, len);

/* notice we allowed for LWS_PRE in the payload already */
n = sizeof(ti_pkg_t) + pkg->n;
nf = (n-1)/mf+1;
pt = (unsigned char *) pkg;
/* write to websocket */
m = lws_write(wsi, out, len, flags);

for (f=1; f<=nf; ++f, pt+=mf, n-=mf)
if (m < (int) len)
{
flags = lws_write_ws_flags(LWS_WRITE_BINARY, f==1, f==nf);
len = mf > n ? n : mf;
memcpy(out, pt, len);
m = lws_write(wsi, out, len, flags);
if (m < (int) len)
{
log_error("ERROR %d; writing to WebSocket", m);
req->cb_(req, EX_WRITE_UV);
return -1;
}
log_error("ERROR %d; writing to WebSocket", m);
ws__write_done(pss); /* reset to frame 0 and shift from queue */
req->cb_(req, EX_WRITE_UV);
return -1;
}
lws_callback_on_writable(wsi);

req->cb_(req, 0);
if (is_end)
{
ws__write_done(pss); /* reset to frame 0 and shift from queue */
req->cb_(req, 0);
}
else
pss->f++; /* next frame */

/* request next callback, even when finished as a new package might exist
* in the queue */
lws_callback_on_writable(wsi);
return 0;
}

Expand Down Expand Up @@ -330,8 +363,10 @@ int ti_ws_write(ti_ws_t * pss, ti_write_t * req)
{
if (!pss->wsi)
return 0; /* ignore dead connections */

if (queue_push(&pss->queue, req))
return -1;

lws_callback_on_writable(pss->wsi);
return 0;
}
Expand Down

0 comments on commit b0eacf3

Please sign in to comment.