/*
copyright: Boaz Segev, 2016-2019
license: MIT

Feel free to copy, use and enjoy according to the license provided.
*/
/* NOTE(review): the #include directives below carry no header names in this
 * copy of the file - restore them from the original source before building. */
#define FIO_INCLUDE_STR
#include

/* subscription lists have a long lifetime */
#define FIO_FORCE_MALLOC_TMP 1
#define FIO_INCLUDE_LINKED_LIST
#include

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/* Define __BIG_ENDIAN__ when neither endianness macro is predefined and the
 * compiler reports a big-endian byte order. */
#if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__)
#include
#if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) &&                 \
    __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
#define __BIG_ENDIAN__
#endif
#endif

/*******************************************************************************
Buffer management - update to change the way the buffer is handled.

The three functions below are declared as weak symbols; this file provides
default implementations for them.
*/

/** A raw memory block (`data`) together with its capacity in bytes (`size`). */
struct buffer_s {
  void *data;
  size_t size;
};

#pragma weak create_ws_buffer
/**
 * Returns a new buffer_s for the connection `owner`.
 *
 * The default implementation requests WS_INITIAL_BUFFER_SIZE bytes; `data` is
 * NULL if the allocation failed.
 */
struct buffer_s create_ws_buffer(ws_s *owner);

#pragma weak resize_ws_buffer
/**
 * Returns a buffer_s with a buffer (at least) `size` long, where `size` is the
 * value stored in the buffer_s passed in (callers set it before the call).
 *
 * In the default implementation a failed resize releases the old memory and
 * returns a buffer whose `data` is NULL.
 */
struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s);

#pragma weak free_ws_buffer
/** Releases an existing buffer. */
void free_ws_buffer(ws_s *owner, struct buffer_s);

/** Sets the initial buffer size (4Kb). */
#define WS_INITIAL_BUFFER_SIZE 4096UL

/*******************************************************************************
Buffer management - simple implementation...

Since Websocket connections have a long life expectancy, optimizing this part of
the code probably wouldn't offer a high performance boost.
*/ // buffer increments by 4,096 Bytes (4Kb) #define round_up_buffer_size(size) (((size) >> 12) + 1) << 12 struct buffer_s create_ws_buffer(ws_s *owner) { (void)(owner); struct buffer_s buff; buff.size = WS_INITIAL_BUFFER_SIZE; buff.data = malloc(buff.size); return buff; } struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s buff) { buff.size = round_up_buffer_size(buff.size); void *tmp = realloc(buff.data, buff.size); if (!tmp) { free_ws_buffer(owner, buff); buff.data = NULL; buff.size = 0; } buff.data = tmp; return buff; } void free_ws_buffer(ws_s *owner, struct buffer_s buff) { (void)(owner); free(buff.data); } #undef round_up_buffer_size /******************************************************************************* Create/Destroy the websocket object (prototypes) */ static ws_s *new_websocket(); static void destroy_ws(ws_s *ws); /******************************************************************************* The Websocket object (protocol + parser) */ struct ws_s { /** The Websocket protocol */ fio_protocol_s protocol; /** connection data */ intptr_t fd; /** callbacks */ void (*on_message)(ws_s *ws, fio_str_info_s msg, uint8_t is_text); void (*on_shutdown)(ws_s *ws); void (*on_ready)(ws_s *ws); void (*on_open)(ws_s *ws); void (*on_close)(intptr_t uuid, void *udata); /** Opaque user data. */ void *udata; /** The maximum websocket message size */ size_t max_msg_size; /** active pub/sub subscriptions */ fio_ls_s subscriptions; fio_lock_i sub_lock; /** socket buffer. */ struct buffer_s buffer; /** data length (how much of the buffer actually used). */ size_t length; /** message buffer. */ FIOBJ msg; /** latest text state. */ uint8_t is_text; /** websocket connection type. 
*/ uint8_t is_client; }; /* ***************************************************************************** Create/Destroy the websocket subscription objects ***************************************************************************** */ static inline void clear_subscriptions(ws_s *ws) { fio_lock(&ws->sub_lock); while (fio_ls_any(&ws->subscriptions)) { fio_unsubscribe(fio_ls_pop(&ws->subscriptions)); } fio_unlock(&ws->sub_lock); } /* ***************************************************************************** Callbacks - Required functions for websocket_parser.h ***************************************************************************** */ static void websocket_on_unwrapped(void *ws_p, void *msg, uint64_t len, char first, char last, char text, unsigned char rsv) { ws_s *ws = ws_p; if (last && first) { ws->on_message(ws, (fio_str_info_s){.data = msg, .len = len}, (uint8_t)text); return; } if (first) { ws->is_text = (uint8_t)text; if (ws->msg == FIOBJ_INVALID) ws->msg = fiobj_str_buf(len); fiobj_str_resize(ws->msg, 0); } fiobj_str_write(ws->msg, msg, len); if (last) { ws->on_message(ws, fiobj_obj2cstr(ws->msg), ws->is_text); } (void)rsv; } static void websocket_on_protocol_ping(void *ws_p, void *msg_, uint64_t len) { ws_s *ws = ws_p; if (msg_) { void *buff = malloc(len + 16); len = (((ws_s *)ws)->is_client ? 
websocket_client_wrap(buff, msg_, len, 10, 1, 1, 0) : websocket_server_wrap(buff, msg_, len, 10, 1, 1, 0)); fio_write2(ws->fd, .data.buffer = buff, .length = len); } else { if (((ws_s *)ws)->is_client) { fio_write2(ws->fd, .data.buffer = "\x89\x80mask", .length = 2, .after.dealloc = FIO_DEALLOC_NOOP); } else { fio_write2(ws->fd, .data.buffer = "\x89\x00", .length = 2, .after.dealloc = FIO_DEALLOC_NOOP); } } } static void websocket_on_protocol_pong(void *ws_p, void *msg, uint64_t len) { (void)len; (void)msg; (void)ws_p; } static void websocket_on_protocol_close(void *ws_p) { ws_s *ws = ws_p; fio_close(ws->fd); } static void websocket_on_protocol_error(void *ws_p) { ws_s *ws = ws_p; fio_close(ws->fd); } /******************************************************************************* The Websocket Protocol implementation */ #define ws_protocol(fd) ((ws_s *)(server_get_protocol(fd))) static void ws_ping(intptr_t fd, fio_protocol_s *ws) { (void)(ws); if (((ws_s *)ws)->is_client) { fio_write2(fd, .data.buffer = "\x89\x80MASK", .length = 6, .after.dealloc = FIO_DEALLOC_NOOP); } else { fio_write2(fd, .data.buffer = "\x89\x00", .length = 2, .after.dealloc = FIO_DEALLOC_NOOP); } } static void on_close(intptr_t uuid, fio_protocol_s *_ws) { destroy_ws((ws_s *)_ws); (void)uuid; } static void on_ready(intptr_t fduuid, fio_protocol_s *ws) { (void)(fduuid); if (((ws_s *)ws)->on_ready) ((ws_s *)ws)->on_ready((ws_s *)ws); } static uint8_t on_shutdown(intptr_t fd, fio_protocol_s *ws) { (void)(fd); if (ws && ((ws_s *)ws)->on_shutdown) ((ws_s *)ws)->on_shutdown((ws_s *)ws); if (((ws_s *)ws)->is_client) { fio_write2(fd, .data.buffer = "\x8a\x80MASK", .length = 6, .after.dealloc = FIO_DEALLOC_NOOP); } else { fio_write2(fd, .data.buffer = "\x8a\x00", .length = 2, .after.dealloc = FIO_DEALLOC_NOOP); } return 0; } static void on_data(intptr_t sockfd, fio_protocol_s *ws_) { ws_s *const ws = (ws_s *)ws_; if (ws == NULL) return; struct websocket_packet_info_s info = 
websocket_buffer_peek(ws->buffer.data, ws->length); const uint64_t raw_length = info.packet_length + info.head_length; /* test expected data amount */ if (ws->max_msg_size < raw_length) { /* too big */ websocket_close(ws); return; } /* test buffer capacity */ if (raw_length > ws->buffer.size) { ws->buffer.size = (size_t)raw_length; ws->buffer = resize_ws_buffer(ws, ws->buffer); if (!ws->buffer.data) { // no memory. websocket_close(ws); return; } } const ssize_t len = fio_read(sockfd, (uint8_t *)ws->buffer.data + ws->length, ws->buffer.size - ws->length); if (len <= 0) { return; } ws->length = websocket_consume(ws->buffer.data, ws->length + len, ws, (~(ws->is_client) & 1)); fio_force_event(sockfd, FIO_EVENT_ON_DATA); } static void on_data_first(intptr_t sockfd, fio_protocol_s *ws_) { ws_s *const ws = (ws_s *)ws_; if (ws->on_open) ws->on_open(ws); ws->protocol.on_data = on_data; ws->protocol.on_ready = on_ready; if (ws->length) { ws->length = websocket_consume(ws->buffer.data, ws->length, ws, (~(ws->is_client) & 1)); } fio_force_event(sockfd, FIO_EVENT_ON_DATA); fio_force_event(sockfd, FIO_EVENT_ON_READY); } /* later */ static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, char first, char last, char client); /******************************************************************************* Create/Destroy the websocket object */ static ws_s *new_websocket(intptr_t uuid) { // allocate the protocol object ws_s *ws = malloc(sizeof(*ws)); *ws = (ws_s){ .protocol.ping = ws_ping, .protocol.on_data = on_data_first, .protocol.on_close = on_close, .protocol.on_ready = NULL /* filled in after `on_open` */, .protocol.on_shutdown = on_shutdown, .subscriptions = FIO_LS_INIT(ws->subscriptions), .is_client = 0, .fd = uuid, }; return ws; } static void destroy_ws(ws_s *ws) { if (ws->on_close) ws->on_close(ws->fd, ws->udata); if (ws->msg) fiobj_free(ws->msg); clear_subscriptions(ws); free_ws_buffer(ws, ws->buffer); free(ws); } void websocket_attach(intptr_t 
uuid, http_settings_s *http_settings, websocket_settings_s *args, void *data, size_t length) { ws_s *ws = new_websocket(uuid); FIO_ASSERT_ALLOC(ws); // we have an active websocket connection - prep the connection buffer ws->buffer = create_ws_buffer(ws); // Setup ws callbacks ws->on_open = args->on_open; ws->on_close = args->on_close; ws->on_message = args->on_message; ws->on_ready = args->on_ready; ws->on_shutdown = args->on_shutdown; // setup any user data ws->udata = args->udata; if (http_settings) { // client mode? ws->is_client = http_settings->is_client; // buffer limits ws->max_msg_size = http_settings->ws_max_msg_size; // update the timeout fio_timeout_set(uuid, http_settings->ws_timeout); } else { ws->max_msg_size = (1024 * 256); fio_timeout_set(uuid, 40); } if (data && length) { if (length > ws->buffer.size) { ws->buffer.size = length; ws->buffer = resize_ws_buffer(ws, ws->buffer); if (!ws->buffer.data) { // no memory. fio_attach(uuid, (fio_protocol_s *)ws); websocket_close(ws); return; } } memcpy(ws->buffer.data, data, length); ws->length = length; } // update the protocol object, cleaning up the old one fio_attach(uuid, (fio_protocol_s *)ws); // allow the on_open and on_data to take over the control. fio_force_event(uuid, FIO_EVENT_ON_DATA); } /******************************************************************************* Writing to the Websocket */ #define WS_MAX_FRAME_SIZE \ (FIO_MEMORY_BLOCK_ALLOC_LIMIT - 4096) // should be less then `unsigned short` static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, char first, char last, char client) { if (len <= WS_MAX_FRAME_SIZE) { void *buff = fio_malloc(len + 16); len = (client ? websocket_client_wrap(buff, data, len, (text ? 1 : 2), first, last, 0) : websocket_server_wrap(buff, data, len, (text ? 
1 : 2), first, last, 0)); fio_write2(fd, .data.buffer = buff, .length = len, .after.dealloc = fio_free); } else { /* frame fragmentation is better for large data then large frames */ while (len > WS_MAX_FRAME_SIZE) { websocket_write_impl(fd, data, WS_MAX_FRAME_SIZE, text, first, 0, client); data = ((uint8_t *)data) + WS_MAX_FRAME_SIZE; first = 0; len -= WS_MAX_FRAME_SIZE; } websocket_write_impl(fd, data, len, text, first, 1, client); } return; } /* ***************************************************************************** Multi-client broadcast optimizations ***************************************************************************** */ static void websocket_optimize_free(fio_msg_s *msg, void *metadata) { fiobj_free((FIOBJ)metadata); (void)msg; } static inline fio_msg_metadata_s websocket_optimize(fio_str_info_s msg, unsigned char opcode) { FIOBJ out = fiobj_str_buf(msg.len + 10); fiobj_str_resize(out, websocket_server_wrap(fiobj_obj2cstr(out).data, msg.data, msg.len, opcode, 1, 1, 0)); fio_msg_metadata_s ret = { .on_finish = websocket_optimize_free, .metadata = (void *)out, }; return ret; } static fio_msg_metadata_s websocket_optimize_generic(fio_str_info_s ch, fio_str_info_s msg, uint8_t is_json) { fio_str_s tmp = FIO_STR_INIT_EXISTING(ch.data, ch.len, 0); // don't free tmp.dealloc = NULL; unsigned char opcode = 2; if (tmp.len <= (2 << 19) && fio_str_utf8_valid(&tmp)) { opcode = 1; } fio_msg_metadata_s ret = websocket_optimize(msg, opcode); ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB; return ret; (void)ch; (void)is_json; } static fio_msg_metadata_s websocket_optimize_text(fio_str_info_s ch, fio_str_info_s msg, uint8_t is_json) { fio_msg_metadata_s ret = websocket_optimize(msg, 1); ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT; return ret; (void)ch; (void)is_json; } static fio_msg_metadata_s websocket_optimize_binary(fio_str_info_s ch, fio_str_info_s msg, uint8_t is_json) { fio_msg_metadata_s ret = websocket_optimize(msg, 2); ret.type_id = 
WEBSOCKET_OPTIMIZE_PUBSUB_BINARY; return ret; (void)ch; (void)is_json; } /** * Enables (or disables) broadcast optimizations. * * When using WebSocket pub/sub system is originally optimized for either * non-direct transmission (messages are handled by callbacks) or direct * transmission to 1-3 clients per channel (on average), meaning that the * majority of the messages are meant for a single recipient (or multiple * callback recipients) and only some are expected to be directly transmitted to * a group. * * However, when most messages are intended for direct transmission to more than * 3 clients (on average), certain optimizations can be made to improve memory * consumption (minimize duplication or WebSocket network data). * * This function allows enablement (or disablement) of these optimizations. * These optimizations include: * * * WEBSOCKET_OPTIMIZE_PUBSUB - optimize all direct transmission messages, * best attempt to detect Text vs. Binary data. * * WEBSOCKET_OPTIMIZE_PUBSUB_TEXT - optimize direct pub/sub text messages. * * WEBSOCKET_OPTIMIZE_PUBSUB_BINARY - optimize direct pub/sub binary messages. * * Note: to disable an optimization it should be disabled the same amount of * times it was enabled - multiple optimization enablements for the same type * are merged, but reference counted (disabled when reference is zero). 
*/ void websocket_optimize4broadcasts(intptr_t type, int enable) { static intptr_t generic = 0; static intptr_t text = 0; static intptr_t binary = 0; fio_msg_metadata_s (*callback)(fio_str_info_s, fio_str_info_s, uint8_t); intptr_t *counter; switch ((0 - type)) { case (0 - WEBSOCKET_OPTIMIZE_PUBSUB): counter = &generic; callback = websocket_optimize_generic; break; case (0 - WEBSOCKET_OPTIMIZE_PUBSUB_TEXT): counter = &text; callback = websocket_optimize_text; break; case (0 - WEBSOCKET_OPTIMIZE_PUBSUB_BINARY): counter = &binary; callback = websocket_optimize_binary; break; default: return; } if (enable) { if (fio_atomic_add(counter, 1) == 1) { fio_message_metadata_callback_set(callback, 1); } } else { if (fio_atomic_sub(counter, 1) == 0) { fio_message_metadata_callback_set(callback, 0); } } } /* ***************************************************************************** Subscription handling ***************************************************************************** */ typedef struct { void (*on_message)(ws_s *ws, fio_str_info_s channel, fio_str_info_s msg, void *udata); void (*on_unsubscribe)(void *udata); void *udata; } websocket_sub_data_s; static inline void websocket_on_pubsub_message_direct_internal(fio_msg_s *msg, uint8_t txt) { fio_protocol_s *pr = fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_WRITE); if (!pr) { if (errno == EBADF) return; fio_message_defer(msg); return; } FIOBJ message = FIOBJ_INVALID; FIOBJ pre_wrapped = FIOBJ_INVALID; if (!((ws_s *)pr)->is_client) { /* pre-wrapping is only for client data */ switch (txt) { case 0: pre_wrapped = (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_BINARY); break; case 1: pre_wrapped = (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_TEXT); break; case 2: pre_wrapped = (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB); break; default: break; } if (pre_wrapped) { // FIO_LOG_DEBUG( // "pub/sub WebSocket optimization route for pre-wrapped message."); 
fiobj_send_free((intptr_t)msg->udata1, fiobj_dup(pre_wrapped)); goto finish; } } if (txt == 2) { /* unknown text state */ fio_str_s tmp = FIO_STR_INIT_STATIC2(msg->msg.data, msg->msg.len); // don't free txt = (tmp.len >= (2 << 14) ? 0 : fio_str_utf8_valid(&tmp)); } websocket_write((ws_s *)pr, msg->msg, txt & 1); fiobj_free(message); finish: fio_protocol_unlock(pr, FIO_PR_LOCK_WRITE); } static void websocket_on_pubsub_message_direct(fio_msg_s *msg) { websocket_on_pubsub_message_direct_internal(msg, 2); } static void websocket_on_pubsub_message_direct_txt(fio_msg_s *msg) { websocket_on_pubsub_message_direct_internal(msg, 1); } static void websocket_on_pubsub_message_direct_bin(fio_msg_s *msg) { websocket_on_pubsub_message_direct_internal(msg, 0); } static void websocket_on_pubsub_message(fio_msg_s *msg) { fio_protocol_s *pr = fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_TASK); if (!pr) { if (errno == EBADF) return; fio_message_defer(msg); return; } websocket_sub_data_s *d = msg->udata2; if (d->on_message) d->on_message((ws_s *)pr, msg->channel, msg->msg, d->udata); fio_protocol_unlock(pr, FIO_PR_LOCK_TASK); } static void websocket_on_unsubscribe(void *u1, void *u2) { websocket_sub_data_s *d = u2; if (d->on_unsubscribe) { d->on_unsubscribe(d->udata); } if ((intptr_t)d->on_message == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB) { websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB, 0); } else if ((intptr_t)d->on_message == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_TEXT) { websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_TEXT, 0); } else if ((intptr_t)d->on_message == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_BINARY) { websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_BINARY, 0); } free(d); (void)u1; } /** * Returns a subscription ID on success and 0 on failure. 
*/ #undef websocket_subscribe uintptr_t websocket_subscribe(struct websocket_subscribe_s args) { if (!args.ws || !fio_is_valid(args.ws->fd)) goto error; websocket_sub_data_s *d = malloc(sizeof(*d)); FIO_ASSERT_ALLOC(d); *d = (websocket_sub_data_s){ .udata = args.udata, .on_message = args.on_message, .on_unsubscribe = args.on_unsubscribe, }; void (*handler)(fio_msg_s *) = websocket_on_pubsub_message; if (!args.on_message) { intptr_t br_type; if (args.force_binary) { br_type = WEBSOCKET_OPTIMIZE_PUBSUB_BINARY; handler = websocket_on_pubsub_message_direct_bin; } else if (args.force_text) { br_type = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT; handler = websocket_on_pubsub_message_direct_txt; } else { br_type = WEBSOCKET_OPTIMIZE_PUBSUB; handler = websocket_on_pubsub_message_direct; } websocket_optimize4broadcasts(br_type, 1); d->on_message = (void (*)(ws_s *, fio_str_info_s, fio_str_info_s, void *))br_type; } subscription_s *sub = fio_subscribe(.channel = args.channel, .match = args.match, .on_unsubscribe = websocket_on_unsubscribe, .on_message = handler, .udata1 = (void *)args.ws->fd, .udata2 = d); if (!sub) { /* don't free `d`, return (`d` freed by fio_subscribe) */ return 0; } fio_ls_s *pos; fio_lock(&args.ws->sub_lock); pos = fio_ls_push(&args.ws->subscriptions, sub); fio_unlock(&args.ws->sub_lock); return (uintptr_t)pos; error: if (args.on_unsubscribe) args.on_unsubscribe(args.udata); return 0; } /** * Unsubscribes from a channel. */ void websocket_unsubscribe(ws_s *ws, uintptr_t subscription_id) { fio_unsubscribe((subscription_s *)((fio_ls_s *)subscription_id)->obj); fio_lock(&ws->sub_lock); fio_ls_remove((fio_ls_s *)subscription_id); fio_unlock(&ws->sub_lock); (void)ws; } /******************************************************************************* The API implementation */ /** Returns the opaque user data associated with the websocket. */ void *websocket_udata_get(ws_s *ws) { return ws->udata; } /** Returns the the process specific connection's UUID (see `libsock`). 
*/
intptr_t websocket_uuid(ws_s *ws) { return ws->fd; }

/** Sets the opaque user data associated with the websocket.
 * Returns the old value, if any.
 */
void *websocket_udata_set(ws_s *ws, void *udata) {
  void *old = ws->udata;
  ws->udata = udata;
  return old;
}

/**
 * Returns 1 if the WebSocket connection is in Client mode (connected to a
 * remote server) and 0 if the connection is in Server mode (a connection
 * established using facil.io's HTTP server).
 */
uint8_t websocket_is_client(ws_s *ws) { return ws->is_client; }

/** Writes data to the websocket. Returns -1 on failure (0 on success). */
int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text) {
  if (!fio_is_valid(ws->fd))
    return -1;
  websocket_write_impl(ws->fd, msg.data, msg.len, is_text, 1, 1,
                       ws->is_client);
  return 0;
}

/**
 * Closes a websocket connection.
 *
 * An empty close frame (opcode 0x8) is written before the connection is
 * closed. Like every other frame this file writes, the frame is masked (mask
 * bit + 4 byte masking key) for client mode connections.
 */
void websocket_close(ws_s *ws) {
  if (ws->is_client) {
    fio_write2(ws->fd, .data.buffer = "\x88\x80MASK", .length = 6,
               .after.dealloc = FIO_DEALLOC_NOOP);
  } else {
    fio_write2(ws->fd, .data.buffer = "\x88\x00", .length = 2,
               .after.dealloc = FIO_DEALLOC_NOOP);
  }
  fio_close(ws->fd);
}