Skip to content

Commit

Permalink
Implementing Windows socket selector with paranoid mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ktakashi committed Jan 24, 2025
1 parent d4240d0 commit 6ef0207
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
34 changes: 24 additions & 10 deletions ext/socket/selector-win.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ typedef struct win_context_rec
static void system_error(int code)
{
Sg_SystemError(code,
UC("Setting up IOCP failed: %A"),
UC("Setting up socket selector failed: %A"),
Sg_GetLastErrorMessageWithErrorCode(code));
}

Expand All @@ -75,40 +75,54 @@ static void * make_selector_context()
win_context_t *ctx = SG_NEW(win_context_t);

ctx->event = WSACreateEvent();
if (ctx->event == NULL) goto err;
if (ctx->event == WSA_INVALID_EVENT) goto err;
for (int i = 0; i < WSA_MAXIMUM_WAIT_EVENTS; i++) {
ctx->events[i] = WSACreateEvent();
if (ctx->events[i] == NULL) goto err;
if (ctx->events[i] == WSA_INVALID_EVENT) goto err;
}
ctx->lock = CreateMutex(NULL, FALSE, NULL);

return ctx;

err:
if (ctx->event) WSACloseEvent(ctx->event);
if (ctx->event != WSA_INVALID_EVENT) WSACloseEvent(ctx->event);
for (int i = 0; i < WSA_MAXIMUM_WAIT_EVENTS; i++) {
if (ctx->events[i]) WSACloseEvent(ctx->events[i]);
if (ctx->events[i] != WSA_INVALID_EVENT) WSACloseEvent(ctx->events[i]);
}
system_error(Sg_GetLastError());
return NULL; /* dummy */
}

static void selector_finalizer_win(SgObject self, void *data)
{
win_context_t *ctx = (win_context_t *)self;
HANDLE lock = (HANDLE)data;
CloseHandle(ctx->lock);
CloseHandle(lock);
}

void Sg_CloseSocketSelector(SgSocketSelector *selector)
{
if (!Sg_SocketSelectorClosedP(selector)) {
win_context_t *ctx = (win_context_t *)selector->context;
selector->context = NULL;


WaitForSingleObject(ctx->lock, INFINITE);

selector->context = NULL;
WSACloseEvent(ctx->event);
ctx->event = INVALID_HANDLE_VALUE;
for (int i = 0; i < WSA_MAXIMUM_WAIT_EVENTS; i++) {
WSACloseEvent(ctx->events[i]);
ctx->events[i] = INVALID_HANDLE_VALUE;
}
ReleaseMutex(ctx->lock);
CloseHandle(ctx->lock); /* hope it'd still work... */

Sg_UnregisterFinalizer(selector);
/*
In case the selector is waiting, we need to keep the lock...
We are even breaking the abstraction...
*/
Sg_RegisterFinalizer(ctx, selector_finalizer_win, selector->lock.mutex);
}
}

Expand Down Expand Up @@ -216,10 +230,10 @@ static SgObject win_selector_wait(win_context_t *ctx, int n,
return ret;
}

static SgObject selector_wait(SgSocketSelector *selector, int n,
static SgObject selector_wait(SgSocketSelector *selector, void *context, int n,
struct timespec *sp)
{
win_context_t *ctx = (win_context_t *)selector->context;
win_context_t *ctx = (win_context_t *)context;
SgObject sockets = Sg_Reverse(selector->sockets);
return win_selector_wait(ctx, n, selector, sockets, sp);
}
Expand Down
1 change: 0 additions & 1 deletion ext/socket/socket-selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ typedef struct
SG_HEADER;
SgObject sockets;
SgInternalMutex lock;
SgInternalCond cv;
int waiting;
int retry;
void *context; /* underlying implementation context */
Expand Down
13 changes: 9 additions & 4 deletions ext/socket/socket-selector.incl
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,13 @@ SgObject Sg_MakeSocketSelector()
selector->context = make_selector_context();

Sg_InitMutex(&selector->lock, TRUE);
Sg_InitCond(&selector->cv);

Sg_RegisterFinalizer(selector, selector_finalizer, NULL);
return SG_OBJ(selector);
}

static SgObject selector_wait(SgSocketSelector *selector, int n,
struct timespec *sp);
static SgObject selector_wait(SgSocketSelector *selector, void *context,
int n, struct timespec *sp);

static SgObject earliest_timeout(SgObject sockets, SgTime *start,
SgObject *timedout)
Expand Down Expand Up @@ -230,6 +229,7 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
int n;
unsigned long sec, usec;
struct timespec spec, *sp, sock_to, *sto;
void *context;

if (Sg_SocketSelectorClosedP(selector)) {
Sg_Error(UC("Socket selector is closed: %A"), selector);
Expand Down Expand Up @@ -262,10 +262,15 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
}

selector->waiting = TRUE;
context = selector->context;
if (context == NULL) {
Sg_UnlockMutex(&selector->lock);
Sg_Error(UC("Socket selector is closed: %A"), selector);
}

retry:
Sg_UnlockMutex(&selector->lock);
r = selector_wait(selector, n, sp);
r = selector_wait(selector, context, n, sp);

Sg_LockMutex(&selector->lock);
selector->waiting = FALSE;
Expand Down
7 changes: 5 additions & 2 deletions ext/socket/unix-socket-selector.incl
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ void Sg_CloseSocketSelector(SgSocketSelector *selector)
if (!Sg_SocketSelectorClosedP(selector)) {
unix_context_t *ctx = (unix_context_t *)selector->context;
selector->context = NULL;
Sg_DestroyMutex(&selector->lock);
close(ctx->fd);
if (!is_stop_initialised(ctx)) close_unix_stop(ctx);
Sg_UnregisterFinalizer(selector);
Expand All @@ -154,9 +155,11 @@ SgObject Sg_SocketSelectorInterrupt(SgSocketSelector *selector)
return selector;
}

static SgObject selector_wait(SgSocketSelector *selector, int n, struct timespec *sp)
static SgObject selector_wait(SgSocketSelector *selector,
void *context,
int n, struct timespec *sp)
{
unix_context_t *ctx = (unix_context_t *)selector->context;
unix_context_t *ctx = (unix_context_t *)context;
int err = 0;
/* the socket is reverse order, so correct it */
SgObject r = wait_selector(ctx, n, Sg_Reverse(selector->sockets), sp, &err);
Expand Down

0 comments on commit 6ef0207

Please sign in to comment.