/* Copyright: Boaz segev, 2016-2019 License: MIT Feel free to copy, use and enjoy according to the license provided. */ #define FIO_INCLUDE_LINKED_LIST #define FIO_INCLUDE_STR // #define DEBUG 1 #include #include #include #include #define REDIS_READ_BUFFER 8192 /* ***************************************************************************** The Redis Engine and Callbacks Object ***************************************************************************** */ typedef struct { fio_pubsub_engine_s en; struct redis_engine_internal_s { fio_protocol_s protocol; intptr_t uuid; resp_parser_s parser; void (*on_message)(struct redis_engine_internal_s *parser, FIOBJ msg); FIOBJ str; FIOBJ ary; uint32_t ary_count; uint16_t buf_pos; uint16_t nesting; } pub_data, sub_data; subscription_s *publication_forwarder; subscription_s *cmd_forwarder; subscription_s *cmd_reply; char *address; char *port; char *auth; FIOBJ last_ch; size_t auth_len; size_t ref; fio_ls_embd_s queue; fio_lock_i lock; fio_lock_i lock_connection; uint8_t ping_int; volatile uint8_t pub_sent; volatile uint8_t flag; uint8_t buf[]; } redis_engine_s; typedef struct { fio_ls_embd_s node; void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply, void *udata); void *udata; size_t cmd_len; uint8_t cmd[]; } redis_commands_s; /** converts from a publishing protocol to an `redis_engine_s`. */ #define pub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, pub_data, (pr)) /** converts from a subscribing protocol to an `redis_engine_s`. */ #define sub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, sub_data, (pr)) /** converts from a `resp_parser_s` to the internal data structure. */ #define parser2data(prsr) \ FIO_LS_EMBD_OBJ(struct redis_engine_internal_s, parser, (prsr)) /* releases any resources used by an internal engine*/ static inline void redis_internal_reset(struct redis_engine_internal_s *i) { i->buf_pos = 0; i->parser = (resp_parser_s){.obj_countdown = 0, .expecting = 0}; fiobj_free((FIOBJ)fio_ct_if(i->ary == FIOBJ_INVALID, (uintptr_t)i->str, (uintptr_t)i->ary)); i->str = FIOBJ_INVALID; i->ary = FIOBJ_INVALID; i->ary_count = 0; i->nesting = 0; i->uuid = -1; } /** cleans up and frees the engine data. */ static inline void redis_free(redis_engine_s *r) { if (fio_atomic_sub(&r->ref, 1)) return; FIO_LOG_DEBUG("freeing redis engine for %s:%s", r->address, r->port); redis_internal_reset(&r->pub_data); redis_internal_reset(&r->sub_data); fiobj_free(r->last_ch); while (fio_ls_embd_any(&r->queue)) { fio_free( FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue))); } fio_unsubscribe(r->publication_forwarder); r->publication_forwarder = NULL; fio_unsubscribe(r->cmd_forwarder); r->cmd_forwarder = NULL; fio_unsubscribe(r->cmd_reply); r->cmd_reply = NULL; fio_free(r); } /* ***************************************************************************** Simple RESP formatting ***************************************************************************** */ inline static void fiobj2resp___internal(FIOBJ dest, FIOBJ obj) { fio_str_info_s s; switch (FIOBJ_TYPE(obj)) { case FIOBJ_T_NULL: fiobj_str_write(dest, "$-1\r\n", 5); break; case FIOBJ_T_ARRAY: fiobj_str_write(dest, "*", 1); fiobj_str_write_i(dest, fiobj_ary_count(obj)); fiobj_str_write(dest, "\r\n", 2); break; case FIOBJ_T_HASH: fiobj_str_write(dest, "*", 1); fiobj_str_write_i(dest, fiobj_hash_count(obj) * 2); fiobj_str_write(dest, "\r\n", 2); break; case FIOBJ_T_TRUE: fiobj_str_write(dest, "$4\r\ntrue\r\n", 10); break; case FIOBJ_T_FALSE: fiobj_str_write(dest, "$4\r\nfalse\r\n", 11); break; #if 0 /* Numbers aren't as good for commands as one might think... */ case FIOBJ_T_NUMBER: fiobj_str_write(dest, ":", 1); fiobj_str_write_i(dest, fiobj_obj2num(obj)); fiobj_str_write(dest, "\r\n", 2); break; #else case FIOBJ_T_NUMBER: /* overflow */ #endif case FIOBJ_T_FLOAT: /* overflow */ case FIOBJ_T_UNKNOWN: /* overflow */ case FIOBJ_T_STRING: /* overflow */ case FIOBJ_T_DATA: s = fiobj_obj2cstr(obj); fiobj_str_write(dest, "$", 1); fiobj_str_write_i(dest, s.len); fiobj_str_write(dest, "\r\n", 2); fiobj_str_write(dest, s.data, s.len); fiobj_str_write(dest, "\r\n", 2); break; } } static int fiobj2resp_task(FIOBJ o, void *dest_) { if (fiobj_hash_key_in_loop()) fiobj2resp___internal((FIOBJ)dest_, fiobj_hash_key_in_loop()); fiobj2resp___internal((FIOBJ)dest_, o); return 0; } /** * Converts FIOBJ objects into a RESP string (client mode). */ static FIOBJ fiobj2resp(FIOBJ dest, FIOBJ obj) { fiobj_each2(obj, fiobj2resp_task, (void *)dest); return dest; } /** * Converts FIOBJ objects into a RESP string (client mode). * * Don't call `fiobj_free`, object will self-destruct. */ static inline FIOBJ fiobj2resp_tmp(FIOBJ obj) { return fiobj2resp(fiobj_str_tmp(), obj); } /* ***************************************************************************** RESP parser callbacks ***************************************************************************** */ /** a local static callback, called when a parser / protocol error occurs. */ static int resp_on_parser_error(resp_parser_s *parser) { struct redis_engine_internal_s *i = parser2data(parser); FIO_LOG_ERROR("(redis) parser error - attempting to restart connection.\n"); fio_close(i->uuid); return -1; } /** a local static callback, called when the RESP message is complete. */ static int resp_on_message(resp_parser_s *parser) { struct redis_engine_internal_s *i = parser2data(parser); FIOBJ msg = i->ary ? i->ary : i->str; i->on_message(i, msg); /* cleanup */ fiobj_free(msg); i->ary = FIOBJ_INVALID; i->str = FIOBJ_INVALID; return 0; } /** a local helper to add parsed objects to the data store. */ static inline void resp_add_obj(struct redis_engine_internal_s *dest, FIOBJ o) { if (dest->ary) { fiobj_ary_push(dest->ary, o); --dest->ary_count; if (!dest->ary_count && dest->nesting) { FIOBJ tmp = fiobj_ary_shift(dest->ary); dest->ary_count = fiobj_obj2num(tmp); fiobj_free(tmp); dest->ary = fiobj_ary_shift(dest->ary); --dest->nesting; } } dest->str = o; } /** a local static callback, called when a Number object is parsed. */ static int resp_on_number(resp_parser_s *parser, int64_t num) { struct redis_engine_internal_s *data = parser2data(parser); resp_add_obj(data, fiobj_num_new(num)); return 0; } /** a local static callback, called when a OK message is received. */ static int resp_on_okay(resp_parser_s *parser) { struct redis_engine_internal_s *data = parser2data(parser); resp_add_obj(data, fiobj_true()); return 0; } /** a local static callback, called when NULL is received. */ static int resp_on_null(resp_parser_s *parser) { struct redis_engine_internal_s *data = parser2data(parser); resp_add_obj(data, fiobj_null()); return 0; } /** * a local static callback, called when a String should be allocated. * * `str_len` is the expected number of bytes that will fill the final string * object, without any NUL byte marker (the string might be binary). * * If this function returns any value besides 0, parsing is stopped. */ static int resp_on_start_string(resp_parser_s *parser, size_t str_len) { struct redis_engine_internal_s *data = parser2data(parser); resp_add_obj(data, fiobj_str_buf(str_len)); return 0; } /** a local static callback, called as String objects are streamed. */ static int resp_on_string_chunk(resp_parser_s *parser, void *data, size_t len) { struct redis_engine_internal_s *i = parser2data(parser); fiobj_str_write(i->str, data, len); return 0; } /** a local static callback, called when a String object had finished * streaming. */ static int resp_on_end_string(resp_parser_s *parser) { return 0; (void)parser; } /** a local static callback, called an error message is received. */ static int resp_on_err_msg(resp_parser_s *parser, void *data, size_t len) { struct redis_engine_internal_s *i = parser2data(parser); resp_add_obj(i, fiobj_str_new(data, len)); return 0; } /** * a local static callback, called when an Array should be allocated. * * `array_len` is the expected number of objects that will fill the Array * object. * * There's no `resp_on_end_array` callback since the RESP protocol assumes the * message is finished along with the Array (`resp_on_message` is called). * However, just in case a non-conforming client/server sends nested Arrays, * the callback should test against possible overflow or nested Array endings. * * If this function returns any value besides 0, parsing is stopped. */ static int resp_on_start_array(resp_parser_s *parser, size_t array_len) { struct redis_engine_internal_s *i = parser2data(parser); if (i->ary) { ++i->nesting; FIOBJ tmp = fiobj_ary_new2(array_len + 2); fiobj_ary_push(tmp, fiobj_num_new(i->ary_count)); fiobj_ary_push(tmp, fiobj_num_new(i->ary)); i->ary = tmp; } else { i->ary = fiobj_ary_new2(array_len + 2); } i->ary_count = array_len; return 0; } /* ***************************************************************************** Publication and Command Handling ***************************************************************************** */ /* the deferred callback handler */ static void redis_perform_callback(void *e, void *cmd_) { redis_commands_s *cmd = cmd_; FIOBJ reply = (FIOBJ)cmd->node.next; if (cmd->callback) cmd->callback(e, reply, cmd->udata); fiobj_free(reply); FIO_LOG_DEBUG("Handled: %s\n", cmd->cmd); fio_free(cmd); } /* send command within lock, to ensure flag integrity */ static void redis_send_next_command_unsafe(redis_engine_s *r) { if (!r->pub_sent && fio_ls_embd_any(&r->queue)) { r->pub_sent = 1; redis_commands_s *cmd = FIO_LS_EMBD_OBJ(redis_commands_s, node, r->queue.next); fio_write2(r->pub_data.uuid, .data.buffer = cmd->cmd, .length = cmd->cmd_len, .after.dealloc = FIO_DEALLOC_NOOP); FIO_LOG_DEBUG("(redis %d) Sending (%zu bytes):\n%s\n", (int)getpid(), cmd->cmd_len, cmd->cmd); } } /* attach a command to the queue */ static void redis_attach_cmd(redis_engine_s *r, redis_commands_s *cmd) { fio_lock(&r->lock); fio_ls_embd_push(&r->queue, &cmd->node); redis_send_next_command_unsafe(r); fio_unlock(&r->lock); } /** a local static callback, called when the RESP message is complete. */ static void resp_on_pub_message(struct redis_engine_internal_s *i, FIOBJ msg) { redis_engine_s *r = pub2redis(i); // #if DEBUG if (FIO_LOG_LEVEL >= FIO_LOG_LEVEL_DEBUG) { FIOBJ json = fiobj_obj2json(msg, 1); FIO_LOG_DEBUG("Redis reply:\n%s\n", fiobj_obj2cstr(json).data); fiobj_free(json); } // #endif /* publishing / command parser */ fio_lock(&r->lock); fio_ls_embd_s *node = fio_ls_embd_shift(&r->queue); r->pub_sent = 0; redis_send_next_command_unsafe(r); fio_unlock(&r->lock); if (!node) { /* TODO: possible ping? from server?! not likely... */ FIO_LOG_WARNING("(redis %d) received a reply when no command was sent.", (int)getpid()); return; } node->next = (void *)fiobj_dup(msg); fio_defer(redis_perform_callback, &r->en, FIO_LS_EMBD_OBJ(redis_commands_s, node, node)); } /* ***************************************************************************** Subscription Message Handling ***************************************************************************** */ /** a local static callback, called when the RESP message is complete. */ static void resp_on_sub_message(struct redis_engine_internal_s *i, FIOBJ msg) { redis_engine_s *r = sub2redis(i); /* subscriotion parser */ if (FIOBJ_TYPE(msg) != FIOBJ_T_ARRAY) { if (FIOBJ_TYPE(msg) != FIOBJ_T_STRING || fiobj_obj2cstr(msg).len != 4 || fiobj_obj2cstr(msg).data[0] != 'P') { FIO_LOG_WARNING("(redis) unexpected data format in " "subscription stream (%zu bytes):\n %s\n", fiobj_obj2cstr(msg).len, fiobj_obj2cstr(msg).data); } } else { // FIOBJ *ary = fiobj_ary2ptr(msg); // for (size_t i = 0; i < fiobj_ary_count(msg); ++i) { // fio_str_info_s tmp = fiobj_obj2cstr(ary[i]); // fprintf(stderr, "(%lu) %s\n", (unsigned long)i, tmp.data); // } fio_str_info_s tmp = fiobj_obj2cstr(fiobj_ary_index(msg, 0)); if (tmp.len == 7) { /* "message" */ fiobj_free(r->last_ch); r->last_ch = fiobj_dup(fiobj_ary_index(msg, 1)); fio_publish(.channel = fiobj_obj2cstr(r->last_ch), .message = fiobj_obj2cstr(fiobj_ary_index(msg, 2)), .engine = FIO_PUBSUB_CLUSTER); } else if (tmp.len == 8) { /* "pmessage" */ if (!fiobj_iseq(r->last_ch, fiobj_ary_index(msg, 2))) fio_publish(.channel = fiobj_obj2cstr(fiobj_ary_index(msg, 2)), .message = fiobj_obj2cstr(fiobj_ary_index(msg, 3)), .engine = FIO_PUBSUB_CLUSTER); } } } /* ***************************************************************************** Connection Callbacks (fio_protocol_s) and Engine ***************************************************************************** */ /** defined later - connects to Redis */ static void redis_connect(void *r, void *i); #define defer_redis_connect(r, i) \ do { \ fio_atomic_add(&(r)->ref, 1); \ fio_defer(redis_connect, (r), (i)); \ } while (0); /** Called when a data is available, but will not run concurrently */ static void redis_on_data(intptr_t uuid, fio_protocol_s *pr) { struct redis_engine_internal_s *internal = (struct redis_engine_internal_s *)pr; uint8_t *buf; if (internal->on_message == resp_on_sub_message) { buf = sub2redis(pr)->buf + REDIS_READ_BUFFER; } else { buf = pub2redis(pr)->buf; } ssize_t i = fio_read(uuid, buf + internal->buf_pos, REDIS_READ_BUFFER - internal->buf_pos); if (i <= 0) return; internal->buf_pos += i; i = resp_parse(&internal->parser, buf, internal->buf_pos); if (i) { memmove(buf, buf + internal->buf_pos - i, i); } internal->buf_pos = i; } /** Called when the connection was closed, but will not run concurrently */ static void redis_on_close(intptr_t uuid, fio_protocol_s *pr) { struct redis_engine_internal_s *internal = (struct redis_engine_internal_s *)pr; redis_internal_reset(internal); redis_engine_s *r; if (internal->on_message == resp_on_sub_message) { r = sub2redis(pr); fiobj_free(r->last_ch); r->last_ch = FIOBJ_INVALID; if (r->flag) { /* reconnection for subscription connection. */ if (uuid != -1) { FIO_LOG_WARNING("(redis %d) subscription connection lost. " "Reconnecting...", (int)getpid()); } fio_atomic_sub(&r->ref, 1); defer_redis_connect(r, internal); } else { redis_free(r); } } else { r = pub2redis(pr); if (r->flag && uuid != -1) { FIO_LOG_WARNING("(redis %d) publication connection lost. " "Reconnecting...", (int)getpid()); } r->pub_sent = 0; fio_close(r->sub_data.uuid); redis_free(r); } (void)uuid; } /** Called before the facil.io reactor is shut down. */ static uint8_t redis_on_shutdown(intptr_t uuid, fio_protocol_s *pr) { fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nQUIT\r\n", .length = 14, .after.dealloc = FIO_DEALLOC_NOOP); return 0; (void)pr; } /** Called on connection timeout. */ static void redis_sub_ping(intptr_t uuid, fio_protocol_s *pr) { fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nPING\r\n", .length = 14, .after.dealloc = FIO_DEALLOC_NOOP); (void)pr; } /** Called on connection timeout. */ static void redis_pub_ping(intptr_t uuid, fio_protocol_s *pr) { redis_engine_s *r = pub2redis(pr); if (fio_ls_embd_any(&r->queue)) { FIO_LOG_WARNING("(redis) Redis server unresponsive, disconnecting."); fio_close(uuid); return; } redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + 15); *cmd = (redis_commands_s){.cmd_len = 14}; memcpy(cmd->cmd, "*1\r\n$4\r\nPING\r\n\0", 15); redis_attach_cmd(r, cmd); } /* ***************************************************************************** Connecting to Redis ***************************************************************************** */ static void redis_on_auth(fio_pubsub_engine_s *e, FIOBJ reply, void *udata) { if (FIOBJ_TYPE_IS(reply, FIOBJ_T_TRUE)) { fio_str_info_s s = fiobj_obj2cstr(reply); FIO_LOG_WARNING("(redis) Authentication FAILED." " %.*s", (int)s.len, s.data); } (void)e; (void)udata; } static void redis_on_connect(intptr_t uuid, void *i_) { struct redis_engine_internal_s *i = i_; redis_engine_s *r; i->uuid = uuid; if (i->on_message == resp_on_sub_message) { r = sub2redis(i); if (r->auth_len) { fio_write2(uuid, .data.buffer = r->auth, .length = r->auth_len, .after.dealloc = FIO_DEALLOC_NOOP); } fio_pubsub_reattach(&r->en); if (r->pub_data.uuid == -1) { defer_redis_connect(r, &r->pub_data); } FIO_LOG_INFO("(redis %d) subscription connection established.", (int)getpid()); } else { r = pub2redis(i); if (r->auth_len) { redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + r->auth_len); *cmd = (redis_commands_s){.cmd_len = r->auth_len, .callback = redis_on_auth}; memcpy(cmd->cmd, r->auth, r->auth_len); fio_lock(&r->lock); r->pub_sent = 0; fio_ls_embd_unshift(&r->queue, &cmd->node); redis_send_next_command_unsafe(r); fio_unlock(&r->lock); } else { fio_lock(&r->lock); r->pub_sent = 0; redis_send_next_command_unsafe(r); fio_unlock(&r->lock); } FIO_LOG_INFO("(redis %d) publication connection established.", (int)getpid()); } i->protocol.rsv = 0; fio_attach(uuid, &i->protocol); fio_timeout_set(uuid, r->ping_int); return; } static void redis_on_connect_failed(intptr_t uuid, void *i_) { struct redis_engine_internal_s *i = i_; i->uuid = -1; i->protocol.on_close(-1, &i->protocol); (void)uuid; } static void redis_connect(void *r_, void *i_) { redis_engine_s *r = r_; struct redis_engine_internal_s *i = i_; fio_lock(&r->lock_connection); if (r->flag == 0 || i->uuid != -1 || !fio_is_running()) { fio_unlock(&r->lock_connection); redis_free(r); return; } // fio_atomic_add(&r->ref, 1); i->uuid = fio_connect(.address = r->address, .port = r->port, .on_connect = redis_on_connect, .udata = i, .on_fail = redis_on_connect_failed); fio_unlock(&r->lock_connection); } /* ***************************************************************************** Engine / Bridge Callbacks (Root Process) ***************************************************************************** */ static void redis_on_subscribe_root(const fio_pubsub_engine_s *eng, fio_str_info_s channel, fio_match_fn match) { redis_engine_s *r = (redis_engine_s *)eng; if (r->sub_data.uuid != -1) { FIOBJ cmd = fiobj_str_buf(96 + channel.len); if (match == FIO_MATCH_GLOB) fiobj_str_write(cmd, "*2\r\n$10\r\nPSUBSCRIBE\r\n$", 22); else fiobj_str_write(cmd, "*2\r\n$9\r\nSUBSCRIBE\r\n$", 20); fiobj_str_write_i(cmd, channel.len); fiobj_str_write(cmd, "\r\n", 2); fiobj_str_write(cmd, channel.data, channel.len); fiobj_str_write(cmd, "\r\n", 2); // { // fio_str_info_s s = fiobj_obj2cstr(cmd); // fprintf(stderr, "(%d) Sending Subscription (%p):\n%s\n", getpid(), // (void *)r->sub_data.uuid, s.data); // } fiobj_send_free(r->sub_data.uuid, cmd); } } static void redis_on_unsubscribe_root(const fio_pubsub_engine_s *eng, fio_str_info_s channel, fio_match_fn match) { redis_engine_s *r = (redis_engine_s *)eng; if (r->sub_data.uuid != -1) { fio_str_s *cmd = fio_str_new2(); fio_str_capa_assert(cmd, 96 + channel.len); if (match == FIO_MATCH_GLOB) fio_str_write(cmd, "*2\r\n$12\r\nPUNSUBSCRIBE\r\n$", 24); else fio_str_write(cmd, "*2\r\n$11\r\nUNSUBSCRIBE\r\n$", 23); fio_str_write_i(cmd, channel.len); fio_str_write(cmd, "\r\n", 2); fio_str_write(cmd, channel.data, channel.len); fio_str_write(cmd, "\r\n", 2); // { // fio_str_info_s s = fio_str_info(cmd); // fprintf(stderr, "(%d) Cancel Subscription (%p):\n%s\n", getpid(), // (void *)r->sub_data.uuid, s.data); // } fio_str_send_free2(r->sub_data.uuid, cmd); } } static void redis_on_publish_root(const fio_pubsub_engine_s *eng, fio_str_info_s channel, fio_str_info_s msg, uint8_t is_json) { redis_engine_s *r = (redis_engine_s *)eng; redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + channel.len + msg.len + 96); *cmd = (redis_commands_s){.cmd_len = 0}; memcpy(cmd->cmd, "*3\r\n$7\r\nPUBLISH\r\n$", 18); char *buf = (char *)cmd->cmd + 18; buf += fio_ltoa((void *)buf, channel.len, 10); *buf++ = '\r'; *buf++ = '\n'; memcpy(buf, channel.data, channel.len); buf += channel.len; *buf++ = '\r'; *buf++ = '\n'; *buf++ = '$'; buf += fio_ltoa(buf, msg.len, 10); *buf++ = '\r'; *buf++ = '\n'; memcpy(buf, msg.data, msg.len); buf += msg.len; *buf++ = '\r'; *buf++ = '\n'; *buf = 0; FIO_LOG_DEBUG("(%d) Publishing:\n%s", (int)getpid(), cmd->cmd); cmd->cmd_len = (uintptr_t)buf - (uintptr_t)(cmd + 1); redis_attach_cmd(r, cmd); return; (void)is_json; } /* ***************************************************************************** Engine / Bridge Stub Callbacks (Child Process) ***************************************************************************** */ static void redis_on_mock_subscribe_child(const fio_pubsub_engine_s *eng, fio_str_info_s channel, fio_match_fn match) { /* do nothing, root process is notified about (un)subscriptions by facil.io */ (void)eng; (void)channel; (void)match; } static void redis_on_publish_child(const fio_pubsub_engine_s *eng, fio_str_info_s channel, fio_str_info_s msg, uint8_t is_json) { /* attach engine data to channel (prepend) */ fio_str_s tmp = FIO_STR_INIT; /* by using fio_str_s, short names are allocated on the stack */ fio_str_info_s tmp_info = fio_str_resize(&tmp, channel.len + 8); fio_u2str64(tmp_info.data, (uint64_t)eng); memcpy(tmp_info.data + 8, channel.data, channel.len); /* forward publication request to Root */ fio_publish(.filter = -1, .channel = tmp_info, .message = msg, .engine = FIO_PUBSUB_ROOT, .is_json = is_json); fio_str_free(&tmp); (void)eng; } /* ***************************************************************************** Root Publication Handler ***************************************************************************** */ /* listens to filter -1 and publishes and messages */ static void redis_on_internal_publish(fio_msg_s *msg) { if (msg->channel.len < 8) return; /* internal error, unexpected data */ void *en = (void *)fio_str2u64(msg->channel.data); if (en != msg->udata1) return; /* should be delivered by a different engine */ /* step after the engine data */ msg->channel.len -= 8; msg->channel.data += 8; /* forward to publishing */ FIO_LOG_DEBUG("Forwarding to engine %p, on channel %s", msg->udata1, msg->channel.data); redis_on_publish_root(msg->udata1, msg->channel, msg->msg, msg->is_json); } /* ***************************************************************************** Sending commands using the Root connection ***************************************************************************** */ /* callback from the Redis reply */ static void redis_forward_reply(fio_pubsub_engine_s *e, FIOBJ reply, void *udata) { uint8_t *data = udata; fio_pubsub_engine_s *engine = (fio_pubsub_engine_s *)fio_str2u64(data + 0); void *callback = (void *)fio_str2u64(data + 8); if (engine != e || !callback) { FIO_LOG_DEBUG("Redis reply not forwarded (callback: %p)", callback); return; } int32_t pid = (int32_t)fio_str2u32(data + 24); FIOBJ rp = fiobj_obj2json(reply, 0); fio_publish(.filter = (-10 - (int32_t)pid), .channel.data = (char *)data, .channel.len = 28, .message = fiobj_obj2cstr(rp), .is_json = 1); fiobj_free(rp); } /* listens to channel -2 for commands that need to be sent (only ROOT) */ static void redis_on_internal_cmd(fio_msg_s *msg) { // void*(void *)fio_str2u64(msg->msg.data); fio_pubsub_engine_s *engine = (fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0); if (engine != msg->udata1) { return; } redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + msg->msg.len + 1 + 28); FIO_ASSERT_ALLOC(cmd); *cmd = (redis_commands_s){.callback = redis_forward_reply, .udata = (cmd->cmd + msg->msg.len + 1), .cmd_len = msg->msg.len}; memcpy(cmd->cmd, msg->msg.data, msg->msg.len); memcpy(cmd->cmd + msg->msg.len + 1, msg->channel.data, 28); redis_attach_cmd((redis_engine_s *)engine, cmd); // fprintf(stderr, " *** Attached CMD (%d) ***\n%s\n", getpid(), cmd->cmd); } /* Listens on filter `-10 -getpid()` for incoming reply data */ static void redis_on_internal_reply(fio_msg_s *msg) { fio_pubsub_engine_s *engine = (fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0); if (engine != msg->udata1) { FIO_LOG_DEBUG("Redis reply not forwarded (engine mismatch: %p != %p)", (void *)engine, msg->udata1); return; } FIOBJ reply; fiobj_json2obj(&reply, msg->msg.data, msg->msg.len); void (*callback)(fio_pubsub_engine_s *, FIOBJ, void *) = (void (*)( fio_pubsub_engine_s *, FIOBJ, void *))fio_str2u64(msg->channel.data + 8); void *udata = (void *)fio_str2u64(msg->channel.data + 16); callback(engine, reply, udata); fiobj_free(reply); } /* publishes a Redis command to Root's filter -2 */ intptr_t redis_engine_send(fio_pubsub_engine_s *engine, FIOBJ command, void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply, void *udata), void *udata) { if ((uintptr_t)engine < 4) { FIO_LOG_WARNING("(redis send) trying to use one of the core engines"); return -1; } // if(fio_is_master()) { // FIOBJ resp = fiobj2resp_tmp(fio_str_info_s obj1, FIOBJ obj2); // TODO... // } else { /* forward publication request to Root */ fio_str_s tmp = FIO_STR_INIT; fio_str_info_s ti = fio_str_resize(&tmp, 28); /* combine metadata */ fio_u2str64(ti.data + 0, (uint64_t)engine); fio_u2str64(ti.data + 8, (uint64_t)callback); fio_u2str64(ti.data + 16, (uint64_t)udata); fio_u2str32(ti.data + 24, (uint32_t)getpid()); FIOBJ cmd = fiobj2resp_tmp(command); fio_publish(.filter = -2, .channel = ti, .message = fiobj_obj2cstr(cmd), .engine = FIO_PUBSUB_ROOT, .is_json = 0); fio_str_free(&tmp); // } return 0; } /* ***************************************************************************** Redis Engine Creation ***************************************************************************** */ static void redis_on_facil_start(void *r_) { redis_engine_s *r = r_; r->flag = 1; if (!fio_is_valid(r->sub_data.uuid)) { defer_redis_connect(r, &r->sub_data); } } static void redis_on_facil_shutdown(void *r_) { redis_engine_s *r = r_; r->flag = 0; } static void redis_on_engine_fork(void *r_) { redis_engine_s *r = r_; r->flag = 0; r->lock = FIO_LOCK_INIT; fio_force_close(r->sub_data.uuid); r->sub_data.uuid = -1; fio_force_close(r->pub_data.uuid); r->pub_data.uuid = -1; while (fio_ls_embd_any(&r->queue)) { redis_commands_s *cmd = FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue)); fio_free(cmd); } r->en = (fio_pubsub_engine_s){ .subscribe = redis_on_mock_subscribe_child, .unsubscribe = redis_on_mock_subscribe_child, .publish = redis_on_publish_child, }; fio_unsubscribe(r->publication_forwarder); r->publication_forwarder = NULL; fio_unsubscribe(r->cmd_forwarder); r->cmd_forwarder = NULL; fio_unsubscribe(r->cmd_reply); r->cmd_reply = fio_subscribe(.filter = -10 - (int32_t)getpid(), .on_message = redis_on_internal_reply, .udata1 = r); } fio_pubsub_engine_s *redis_engine_create FIO_IGNORE_MACRO(struct redis_engine_create_args args) { if (getpid() != fio_parent_pid()) { FIO_LOG_FATAL("(redis) Redis engine initialization can only " "be performed in the Root process."); kill(0, SIGINT); fio_stop(); return NULL; } if (!args.address.len && args.address.data) args.address.len = strlen(args.address.data); if (!args.port.len && args.port.data) args.port.len = strlen(args.port.data); if (!args.auth.len && args.auth.data) { args.auth.len = strlen(args.auth.data); } if (!args.address.data || !args.address.len) { args.address = (fio_str_info_s){.len = 9, .data = (char *)"localhost"}; } if (!args.port.data || !args.port.len) { args.port = (fio_str_info_s){.len = 4, .data = (char *)"6379"}; } redis_engine_s *r = fio_malloc(sizeof(*r) + args.port.len + 1 + args.address.len + 1 + args.auth.len + 1 + (REDIS_READ_BUFFER * 2)); FIO_ASSERT_ALLOC(r); *r = (redis_engine_s){ .en = { .subscribe = redis_on_subscribe_root, .unsubscribe = redis_on_unsubscribe_root, .publish = redis_on_publish_root, }, .pub_data = { .protocol = { .on_data = redis_on_data, .on_close = redis_on_close, .on_shutdown = redis_on_shutdown, .ping = redis_pub_ping, }, .uuid = -1, .on_message = resp_on_pub_message, }, .sub_data = { .protocol = { .on_data = redis_on_data, .on_close = redis_on_close, .on_shutdown = redis_on_shutdown, .ping = redis_sub_ping, }, .on_message = resp_on_sub_message, .uuid = -1, }, .publication_forwarder = fio_subscribe(.filter = -1, .udata1 = r, .on_message = redis_on_internal_publish), .cmd_forwarder = fio_subscribe(.filter = -2, .udata1 = r, .on_message = redis_on_internal_cmd), .cmd_reply = fio_subscribe(.filter = -10 - (uint32_t)getpid(), .udata1 = r, .on_message = redis_on_internal_reply), .address = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2)), .port = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len + 1), .auth = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len + args.port.len + 2), .auth_len = args.auth.len, .ref = 1, .queue = FIO_LS_INIT(r->queue), .lock = FIO_LOCK_INIT, .lock_connection = FIO_LOCK_INIT, .ping_int = args.ping_interval, .flag = 1, }; memcpy(r->address, args.address.data, args.address.len); memcpy(r->port, args.port.data, args.port.len); if (args.auth.len) memcpy(r->auth, args.auth.data, args.auth.len); fio_pubsub_attach(&r->en); redis_on_facil_start(r); fio_state_callback_add(FIO_CALL_IN_CHILD, redis_on_engine_fork, r); fio_state_callback_add(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r); /* if restarting */ fio_state_callback_add(FIO_CALL_PRE_START, redis_on_facil_start, r); FIO_LOG_DEBUG("Redis engine initialized %p", (void *)r); return &r->en; } /* ***************************************************************************** Redis Engine Destruction ***************************************************************************** */ void redis_engine_destroy(fio_pubsub_engine_s *engine) { redis_engine_s *r = (redis_engine_s *)engine; r->flag = 0; fio_pubsub_detach(&r->en); fio_state_callback_remove(FIO_CALL_IN_CHILD, redis_on_engine_fork, r); fio_state_callback_remove(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r); fio_state_callback_remove(FIO_CALL_PRE_START, redis_on_facil_start, r); FIO_LOG_DEBUG("Redis engine destroyed %p", (void *)r); redis_free(r); }