From 74ddaed5c978b6e25a2ed8c6a0f0253e7841ecd5 Mon Sep 17 00:00:00 2001 From: sea-grass Date: Mon, 21 Apr 2025 09:10:38 -0400 Subject: [PATCH] Use patched facil.io/lib/facil/http/websockets.c --- facil.io/build.zig | 10 +- facil.io/lib/facil/http/websockets.c | 731 +++++++++++++++++++++++++++ 2 files changed, 740 insertions(+), 1 deletion(-) create mode 100644 facil.io/lib/facil/http/websockets.c diff --git a/facil.io/build.zig b/facil.io/build.zig index fe24ed6..79ad9d5 100644 --- a/facil.io/build.zig +++ b/facil.io/build.zig @@ -68,7 +68,6 @@ pub fn build_facilio( "lib/facil/fio.c", "lib/facil/http/http.c", "lib/facil/http/http1.c", - "lib/facil/http/websockets.c", "lib/facil/http/http_internal.c", "lib/facil/fiobj/fiobj_numbers.c", "lib/facil/fiobj/fio_siphash.c", @@ -84,6 +83,15 @@ pub fn build_facilio( .flags = flags.items, }); + // Add patched files + lib.addCSourceFiles(.{ + .root = b.path("."), + .files = &.{ + "lib/facil/http/websockets.c", + }, + .flags = flags.items, + }); + if (use_openssl) { lib.addCSourceFiles(.{ .root = upstream.path("."), diff --git a/facil.io/lib/facil/http/websockets.c b/facil.io/lib/facil/http/websockets.c new file mode 100644 index 0000000..1e882da --- /dev/null +++ b/facil.io/lib/facil/http/websockets.c @@ -0,0 +1,731 @@ +/* +copyright: Boaz Segev, 2016-2019 +license: MIT + +Feel free to copy, use and enjoy according to the license provided. +*/ +#define FIO_INCLUDE_STR +#include + +/* subscription lists have a long lifetime */ +#define FIO_FORCE_MALLOC_TMP 1 +#define FIO_INCLUDE_LINKED_LIST +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) +#include +#if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) && \ + __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ +#define __BIG_ENDIAN__ +#endif +#endif + +/******************************************************************************* +Buffer management - update to change the way the buffer is handled. +*/ +struct buffer_s { + void *data; + size_t size; +}; + +#pragma weak create_ws_buffer +/** returns a buffer_s struct, with a buffer (at least) `size` long. */ +struct buffer_s create_ws_buffer(ws_s *owner); + +#pragma weak resize_ws_buffer +/** returns a buffer_s struct, with a buffer (at least) `size` long. */ +struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s); + +#pragma weak free_ws_buffer +/** releases an existing buffer. */ +void free_ws_buffer(ws_s *owner, struct buffer_s); + +/** Sets the initial buffer size. (4Kb)*/ +#define WS_INITIAL_BUFFER_SIZE 4096UL + +/******************************************************************************* +Buffer management - simple implementation... +Since Websocket connections have a long life expectancy, optimizing this part of +the code probably wouldn't offer a high performance boost. +*/ + +// buffer increments by 4,096 Bytes (4Kb) +#define round_up_buffer_size(size) (((size) >> 12) + 1) << 12 + +struct buffer_s create_ws_buffer(ws_s *owner) { + (void)(owner); + struct buffer_s buff; + buff.size = WS_INITIAL_BUFFER_SIZE; + buff.data = malloc(buff.size); + return buff; +} + +struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s buff) { + buff.size = round_up_buffer_size(buff.size); + void *tmp = realloc(buff.data, buff.size); + if (!tmp) { + free_ws_buffer(owner, buff); + buff.data = NULL; + buff.size = 0; + } + buff.data = tmp; + return buff; +} +void free_ws_buffer(ws_s *owner, struct buffer_s buff) { + (void)(owner); + free(buff.data); +} + +#undef round_up_buffer_size + +/******************************************************************************* +Create/Destroy the websocket object (prototypes) +*/ + +static ws_s *new_websocket(intptr_t uuid); +static void destroy_ws(ws_s *ws); + +/******************************************************************************* +The Websocket object (protocol + parser) +*/ +struct ws_s { + /** The Websocket protocol */ + fio_protocol_s protocol; + /** connection data */ + intptr_t fd; + /** callbacks */ + void (*on_message)(ws_s *ws, fio_str_info_s msg, uint8_t is_text); + void (*on_shutdown)(ws_s *ws); + void (*on_ready)(ws_s *ws); + void (*on_open)(ws_s *ws); + void (*on_close)(intptr_t uuid, void *udata); + /** Opaque user data. */ + void *udata; + /** The maximum websocket message size */ + size_t max_msg_size; + /** active pub/sub subscriptions */ + fio_ls_s subscriptions; + fio_lock_i sub_lock; + /** socket buffer. */ + struct buffer_s buffer; + /** data length (how much of the buffer actually used). */ + size_t length; + /** message buffer. */ + FIOBJ msg; + /** latest text state. */ + uint8_t is_text; + /** websocket connection type. */ + uint8_t is_client; +}; + +/* ***************************************************************************** +Create/Destroy the websocket subscription objects +***************************************************************************** */ + +static inline void clear_subscriptions(ws_s *ws) { + fio_lock(&ws->sub_lock); + while (fio_ls_any(&ws->subscriptions)) { + fio_unsubscribe(fio_ls_pop(&ws->subscriptions)); + } + fio_unlock(&ws->sub_lock); +} + +/* ***************************************************************************** +Callbacks - Required functions for websocket_parser.h +***************************************************************************** */ + +static void websocket_on_unwrapped(void *ws_p, void *msg, uint64_t len, + char first, char last, char text, + unsigned char rsv) { + ws_s *ws = ws_p; + if (last && first) { + ws->on_message(ws, (fio_str_info_s){.data = msg, .len = len}, + (uint8_t)text); + return; + } + if (first) { + ws->is_text = (uint8_t)text; + if (ws->msg == FIOBJ_INVALID) + ws->msg = fiobj_str_buf(len); + fiobj_str_resize(ws->msg, 0); + } + fiobj_str_write(ws->msg, msg, len); + if (last) { + ws->on_message(ws, fiobj_obj2cstr(ws->msg), ws->is_text); + } + + (void)rsv; +} +static void websocket_on_protocol_ping(void *ws_p, void *msg_, uint64_t len) { + ws_s *ws = ws_p; + if (msg_) { + void *buff = malloc(len + 16); + len = (((ws_s *)ws)->is_client + ? websocket_client_wrap(buff, msg_, len, 10, 1, 1, 0) + : websocket_server_wrap(buff, msg_, len, 10, 1, 1, 0)); + fio_write2(ws->fd, .data.buffer = buff, .length = len); + } else { + if (((ws_s *)ws)->is_client) { + fio_write2(ws->fd, .data.buffer = "\x89\x80mask", .length = 2, + .after.dealloc = FIO_DEALLOC_NOOP); + } else { + fio_write2(ws->fd, .data.buffer = "\x89\x00", .length = 2, + .after.dealloc = FIO_DEALLOC_NOOP); + } + } +} +static void websocket_on_protocol_pong(void *ws_p, void *msg, uint64_t len) { + (void)len; + (void)msg; + (void)ws_p; +} +static void websocket_on_protocol_close(void *ws_p) { + ws_s *ws = ws_p; + fio_close(ws->fd); +} +static void websocket_on_protocol_error(void *ws_p) { + ws_s *ws = ws_p; + fio_close(ws->fd); +} + +/******************************************************************************* +The Websocket Protocol implementation +*/ + +#define ws_protocol(fd) ((ws_s *)(server_get_protocol(fd))) + +static void ws_ping(intptr_t fd, fio_protocol_s *ws) { + (void)(ws); + if (((ws_s *)ws)->is_client) { + fio_write2(fd, .data.buffer = "\x89\x80MASK", .length = 6, + .after.dealloc = FIO_DEALLOC_NOOP); + } else { + fio_write2(fd, .data.buffer = "\x89\x00", .length = 2, + .after.dealloc = FIO_DEALLOC_NOOP); + } +} + +static void on_close(intptr_t uuid, fio_protocol_s *_ws) { + destroy_ws((ws_s *)_ws); + (void)uuid; +} + +static void on_ready(intptr_t fduuid, fio_protocol_s *ws) { + (void)(fduuid); + if (((ws_s *)ws)->on_ready) + ((ws_s *)ws)->on_ready((ws_s *)ws); +} + +static uint8_t on_shutdown(intptr_t fd, fio_protocol_s *ws) { + (void)(fd); + if (ws && ((ws_s *)ws)->on_shutdown) + ((ws_s *)ws)->on_shutdown((ws_s *)ws); + if (((ws_s *)ws)->is_client) { + fio_write2(fd, .data.buffer = "\x8a\x80MASK", .length = 6, + .after.dealloc = FIO_DEALLOC_NOOP); + } else { + fio_write2(fd, .data.buffer = "\x8a\x00", .length = 2, + .after.dealloc = FIO_DEALLOC_NOOP); + } + return 0; +} + +static void on_data(intptr_t sockfd, fio_protocol_s *ws_) { + ws_s *const ws = (ws_s *)ws_; + if (ws == NULL) + return; + struct websocket_packet_info_s info = + websocket_buffer_peek(ws->buffer.data, ws->length); + const uint64_t raw_length = info.packet_length + info.head_length; + /* test expected data amount */ + if (ws->max_msg_size < raw_length) { + /* too big */ + websocket_close(ws); + return; + } + /* test buffer capacity */ + if (raw_length > ws->buffer.size) { + ws->buffer.size = (size_t)raw_length; + ws->buffer = resize_ws_buffer(ws, ws->buffer); + if (!ws->buffer.data) { + // no memory. + websocket_close(ws); + return; + } + } + + const ssize_t len = fio_read(sockfd, (uint8_t *)ws->buffer.data + ws->length, + ws->buffer.size - ws->length); + if (len <= 0) { + return; + } + ws->length = websocket_consume(ws->buffer.data, ws->length + len, ws, + (~(ws->is_client) & 1)); + + fio_force_event(sockfd, FIO_EVENT_ON_DATA); +} + +static void on_data_first(intptr_t sockfd, fio_protocol_s *ws_) { + ws_s *const ws = (ws_s *)ws_; + if (ws->on_open) + ws->on_open(ws); + ws->protocol.on_data = on_data; + ws->protocol.on_ready = on_ready; + + if (ws->length) { + ws->length = websocket_consume(ws->buffer.data, ws->length, ws, + (~(ws->is_client) & 1)); + } + fio_force_event(sockfd, FIO_EVENT_ON_DATA); + fio_force_event(sockfd, FIO_EVENT_ON_READY); +} + +/* later */ +static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, + char first, char last, char client); + +/******************************************************************************* +Create/Destroy the websocket object +*/ + +static ws_s *new_websocket(intptr_t uuid) { + // allocate the protocol object + ws_s *ws = malloc(sizeof(*ws)); + *ws = (ws_s){ + .protocol.ping = ws_ping, + .protocol.on_data = on_data_first, + .protocol.on_close = on_close, + .protocol.on_ready = NULL /* filled in after `on_open` */, + .protocol.on_shutdown = on_shutdown, + .subscriptions = FIO_LS_INIT(ws->subscriptions), + .is_client = 0, + .fd = uuid, + }; + return ws; +} +static void destroy_ws(ws_s *ws) { + if (ws->on_close) + ws->on_close(ws->fd, ws->udata); + if (ws->msg) + fiobj_free(ws->msg); + clear_subscriptions(ws); + free_ws_buffer(ws, ws->buffer); + free(ws); +} + +void websocket_attach(intptr_t uuid, http_settings_s *http_settings, + websocket_settings_s *args, void *data, size_t length) { + ws_s *ws = new_websocket(uuid); + FIO_ASSERT_ALLOC(ws); + // we have an active websocket connection - prep the connection buffer + ws->buffer = create_ws_buffer(ws); + // Setup ws callbacks + ws->on_open = args->on_open; + ws->on_close = args->on_close; + ws->on_message = args->on_message; + ws->on_ready = args->on_ready; + ws->on_shutdown = args->on_shutdown; + // setup any user data + ws->udata = args->udata; + if (http_settings) { + // client mode? + ws->is_client = http_settings->is_client; + // buffer limits + ws->max_msg_size = http_settings->ws_max_msg_size; + // update the timeout + fio_timeout_set(uuid, http_settings->ws_timeout); + } else { + ws->max_msg_size = (1024 * 256); + fio_timeout_set(uuid, 40); + } + + if (data && length) { + if (length > ws->buffer.size) { + ws->buffer.size = length; + ws->buffer = resize_ws_buffer(ws, ws->buffer); + if (!ws->buffer.data) { + // no memory. + fio_attach(uuid, (fio_protocol_s *)ws); + websocket_close(ws); + return; + } + } + memcpy(ws->buffer.data, data, length); + ws->length = length; + } + // update the protocol object, cleaning up the old one + fio_attach(uuid, (fio_protocol_s *)ws); + // allow the on_open and on_data to take over the control. + fio_force_event(uuid, FIO_EVENT_ON_DATA); +} + +/******************************************************************************* +Writing to the Websocket +*/ +#define WS_MAX_FRAME_SIZE \ + (FIO_MEMORY_BLOCK_ALLOC_LIMIT - 4096) // should be less then `unsigned short` + +static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, + char first, char last, char client) { + if (len <= WS_MAX_FRAME_SIZE) { + void *buff = fio_malloc(len + 16); + len = (client ? websocket_client_wrap(buff, data, len, (text ? 1 : 2), + first, last, 0) + : websocket_server_wrap(buff, data, len, (text ? 1 : 2), + first, last, 0)); + fio_write2(fd, .data.buffer = buff, .length = len, + .after.dealloc = fio_free); + } else { + /* frame fragmentation is better for large data then large frames */ + while (len > WS_MAX_FRAME_SIZE) { + websocket_write_impl(fd, data, WS_MAX_FRAME_SIZE, text, first, 0, client); + data = ((uint8_t *)data) + WS_MAX_FRAME_SIZE; + first = 0; + len -= WS_MAX_FRAME_SIZE; + } + websocket_write_impl(fd, data, len, text, first, 1, client); + } + return; +} + +/* ***************************************************************************** +Multi-client broadcast optimizations +***************************************************************************** */ + +static void websocket_optimize_free(fio_msg_s *msg, void *metadata) { + fiobj_free((FIOBJ)metadata); + (void)msg; +} + +static inline fio_msg_metadata_s websocket_optimize(fio_str_info_s msg, + unsigned char opcode) { + FIOBJ out = fiobj_str_buf(msg.len + 10); + fiobj_str_resize(out, + websocket_server_wrap(fiobj_obj2cstr(out).data, msg.data, + msg.len, opcode, 1, 1, 0)); + fio_msg_metadata_s ret = { + .on_finish = websocket_optimize_free, + .metadata = (void *)out, + }; + return ret; +} +static fio_msg_metadata_s websocket_optimize_generic(fio_str_info_s ch, + fio_str_info_s msg, + uint8_t is_json) { + fio_str_s tmp = FIO_STR_INIT_EXISTING(ch.data, ch.len, 0); // don't free + tmp.dealloc = NULL; + unsigned char opcode = 2; + if (tmp.len <= (2 << 19) && fio_str_utf8_valid(&tmp)) { + opcode = 1; + } + fio_msg_metadata_s ret = websocket_optimize(msg, opcode); + ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB; + return ret; + (void)ch; + (void)is_json; +} + +static fio_msg_metadata_s websocket_optimize_text(fio_str_info_s ch, + fio_str_info_s msg, + uint8_t is_json) { + fio_msg_metadata_s ret = websocket_optimize(msg, 1); + ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT; + return ret; + (void)ch; + (void)is_json; +} + +static fio_msg_metadata_s websocket_optimize_binary(fio_str_info_s ch, + fio_str_info_s msg, + uint8_t is_json) { + fio_msg_metadata_s ret = websocket_optimize(msg, 2); + ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB_BINARY; + return ret; + (void)ch; + (void)is_json; +} + +/** + * Enables (or disables) broadcast optimizations. + * + * When using WebSocket pub/sub system is originally optimized for either + * non-direct transmission (messages are handled by callbacks) or direct + * transmission to 1-3 clients per channel (on average), meaning that the + * majority of the messages are meant for a single recipient (or multiple + * callback recipients) and only some are expected to be directly transmitted to + * a group. + * + * However, when most messages are intended for direct transmission to more than + * 3 clients (on average), certain optimizations can be made to improve memory + * consumption (minimize duplication or WebSocket network data). + * + * This function allows enablement (or disablement) of these optimizations. + * These optimizations include: + * + * * WEBSOCKET_OPTIMIZE_PUBSUB - optimize all direct transmission messages, + * best attempt to detect Text vs. Binary data. + * * WEBSOCKET_OPTIMIZE_PUBSUB_TEXT - optimize direct pub/sub text messages. + * * WEBSOCKET_OPTIMIZE_PUBSUB_BINARY - optimize direct pub/sub binary messages. + * + * Note: to disable an optimization it should be disabled the same amount of + * times it was enabled - multiple optimization enablements for the same type + * are merged, but reference counted (disabled when reference is zero). + */ +void websocket_optimize4broadcasts(intptr_t type, int enable) { + static intptr_t generic = 0; + static intptr_t text = 0; + static intptr_t binary = 0; + fio_msg_metadata_s (*callback)(fio_str_info_s, fio_str_info_s, uint8_t); + intptr_t *counter; + switch ((0 - type)) { + case (0 - WEBSOCKET_OPTIMIZE_PUBSUB): + counter = &generic; + callback = websocket_optimize_generic; + break; + case (0 - WEBSOCKET_OPTIMIZE_PUBSUB_TEXT): + counter = &text; + callback = websocket_optimize_text; + break; + case (0 - WEBSOCKET_OPTIMIZE_PUBSUB_BINARY): + counter = &binary; + callback = websocket_optimize_binary; + break; + default: + return; + } + if (enable) { + if (fio_atomic_add(counter, 1) == 1) { + fio_message_metadata_callback_set(callback, 1); + } + } else { + if (fio_atomic_sub(counter, 1) == 0) { + fio_message_metadata_callback_set(callback, 0); + } + } +} + +/* ***************************************************************************** +Subscription handling +***************************************************************************** */ + +typedef struct { + void (*on_message)(ws_s *ws, fio_str_info_s channel, fio_str_info_s msg, + void *udata); + void (*on_unsubscribe)(void *udata); + void *udata; +} websocket_sub_data_s; + +static inline void websocket_on_pubsub_message_direct_internal(fio_msg_s *msg, + uint8_t txt) { + fio_protocol_s *pr = + fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_WRITE); + if (!pr) { + if (errno == EBADF) + return; + fio_message_defer(msg); + return; + } + FIOBJ message = FIOBJ_INVALID; + FIOBJ pre_wrapped = FIOBJ_INVALID; + if (!((ws_s *)pr)->is_client) { + /* pre-wrapping is only for client data */ + switch (txt) { + case 0: + pre_wrapped = + (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_BINARY); + break; + case 1: + pre_wrapped = + (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_TEXT); + break; + case 2: + pre_wrapped = (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB); + break; + default: + break; + } + if (pre_wrapped) { + // FIO_LOG_DEBUG( + // "pub/sub WebSocket optimization route for pre-wrapped message."); + fiobj_send_free((intptr_t)msg->udata1, fiobj_dup(pre_wrapped)); + goto finish; + } + } + if (txt == 2) { + /* unknown text state */ + fio_str_s tmp = + FIO_STR_INIT_STATIC2(msg->msg.data, msg->msg.len); // don't free + txt = (tmp.len >= (2 << 14) ? 0 : fio_str_utf8_valid(&tmp)); + } + websocket_write((ws_s *)pr, msg->msg, txt & 1); + fiobj_free(message); +finish: + fio_protocol_unlock(pr, FIO_PR_LOCK_WRITE); +} + +static void websocket_on_pubsub_message_direct(fio_msg_s *msg) { + websocket_on_pubsub_message_direct_internal(msg, 2); +} + +static void websocket_on_pubsub_message_direct_txt(fio_msg_s *msg) { + websocket_on_pubsub_message_direct_internal(msg, 1); +} + +static void websocket_on_pubsub_message_direct_bin(fio_msg_s *msg) { + websocket_on_pubsub_message_direct_internal(msg, 0); +} + +static void websocket_on_pubsub_message(fio_msg_s *msg) { + fio_protocol_s *pr = + fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_TASK); + if (!pr) { + if (errno == EBADF) + return; + fio_message_defer(msg); + return; + } + websocket_sub_data_s *d = msg->udata2; + + if (d->on_message) + d->on_message((ws_s *)pr, msg->channel, msg->msg, d->udata); + fio_protocol_unlock(pr, FIO_PR_LOCK_TASK); +} + +static void websocket_on_unsubscribe(void *u1, void *u2) { + websocket_sub_data_s *d = u2; + if (d->on_unsubscribe) { + d->on_unsubscribe(d->udata); + } + + if ((intptr_t)d->on_message == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB) { + websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB, 0); + } else if ((intptr_t)d->on_message == + (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_TEXT) { + websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_TEXT, 0); + } else if ((intptr_t)d->on_message == + (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_BINARY) { + websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_BINARY, 0); + } + free(d); + (void)u1; +} + +/** + * Returns a subscription ID on success and 0 on failure. + */ +#undef websocket_subscribe +uintptr_t websocket_subscribe(struct websocket_subscribe_s args) { + if (!args.ws || !fio_is_valid(args.ws->fd)) + goto error; + websocket_sub_data_s *d = malloc(sizeof(*d)); + FIO_ASSERT_ALLOC(d); + *d = (websocket_sub_data_s){ + .udata = args.udata, + .on_message = args.on_message, + .on_unsubscribe = args.on_unsubscribe, + }; + void (*handler)(fio_msg_s *) = websocket_on_pubsub_message; + if (!args.on_message) { + intptr_t br_type; + if (args.force_binary) { + br_type = WEBSOCKET_OPTIMIZE_PUBSUB_BINARY; + handler = websocket_on_pubsub_message_direct_bin; + } else if (args.force_text) { + br_type = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT; + handler = websocket_on_pubsub_message_direct_txt; + } else { + br_type = WEBSOCKET_OPTIMIZE_PUBSUB; + handler = websocket_on_pubsub_message_direct; + } + websocket_optimize4broadcasts(br_type, 1); + d->on_message = + (void (*)(ws_s *, fio_str_info_s, fio_str_info_s, void *))br_type; + } + subscription_s *sub = + fio_subscribe(.channel = args.channel, .match = args.match, + .on_unsubscribe = websocket_on_unsubscribe, + .on_message = handler, .udata1 = (void *)args.ws->fd, + .udata2 = d); + if (!sub) { + /* don't free `d`, return (`d` freed by fio_subscribe) */ + return 0; + } + fio_ls_s *pos; + fio_lock(&args.ws->sub_lock); + pos = fio_ls_push(&args.ws->subscriptions, sub); + fio_unlock(&args.ws->sub_lock); + + return (uintptr_t)pos; +error: + if (args.on_unsubscribe) + args.on_unsubscribe(args.udata); + return 0; +} + +/** + * Unsubscribes from a channel. + */ +void websocket_unsubscribe(ws_s *ws, uintptr_t subscription_id) { + fio_unsubscribe((subscription_s *)((fio_ls_s *)subscription_id)->obj); + fio_lock(&ws->sub_lock); + fio_ls_remove((fio_ls_s *)subscription_id); + fio_unlock(&ws->sub_lock); + + (void)ws; +} + +/******************************************************************************* +The API implementation +*/ + +/** Returns the opaque user data associated with the websocket. */ +void *websocket_udata_get(ws_s *ws) { return ws->udata; } + +/** Returns the the process specific connection's UUID (see `libsock`). */ +intptr_t websocket_uuid(ws_s *ws) { return ws->fd; } + +/** Sets the opaque user data associated with the websocket. + * Returns the old value, if any. */ +void *websocket_udata_set(ws_s *ws, void *udata) { + void *old = ws->udata; + ws->udata = udata; + return old; +} + +/** + * Returns 1 if the WebSocket connection is in Client mode (connected to a + * remote server) and 0 if the connection is in Server mode (a connection + * established using facil.io's HTTP server). + */ +uint8_t websocket_is_client(ws_s *ws) { return ws->is_client; } + +/** Writes data to the websocket. Returns -1 on failure (0 on success). */ +int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text) { + if (fio_is_valid(ws->fd)) { + websocket_write_impl(ws->fd, msg.data, msg.len, is_text, 1, 1, + ws->is_client); + return 0; + } + return -1; +} +/** Closes a websocket connection. */ +void websocket_close(ws_s *ws) { + fio_write2(ws->fd, .data.buffer = "\x88\x00", .length = 2, + .after.dealloc = FIO_DEALLOC_NOOP); + fio_close(ws->fd); + return; +}