mirror of
https://github.com/zigzap/zap.git
synced 2025-10-20 15:14:08 +00:00
954 lines
33 KiB
C
954 lines
33 KiB
C
/*
Copyright: Boaz Segev, 2016-2019
License: MIT

Feel free to copy, use and enjoy according to the license provided.
*/
|
|
|
|
#define FIO_INCLUDE_LINKED_LIST
|
|
#define FIO_INCLUDE_STR
|
|
// #define DEBUG 1
|
|
#include <fio.h>
|
|
|
|
#include <fiobj.h>
|
|
|
|
#include <redis_engine.h>
|
|
#include <resp_parser.h>
|
|
|
|
#define REDIS_READ_BUFFER 8192
|
|
/* *****************************************************************************
|
|
The Redis Engine and Callbacks Object
|
|
***************************************************************************** */
|
|
|
|
typedef struct {
|
|
fio_pubsub_engine_s en;
|
|
struct redis_engine_internal_s {
|
|
fio_protocol_s protocol;
|
|
intptr_t uuid;
|
|
resp_parser_s parser;
|
|
void (*on_message)(struct redis_engine_internal_s *parser, FIOBJ msg);
|
|
FIOBJ str;
|
|
FIOBJ ary;
|
|
uint32_t ary_count;
|
|
uint16_t buf_pos;
|
|
uint16_t nesting;
|
|
} pub_data, sub_data;
|
|
subscription_s *publication_forwarder;
|
|
subscription_s *cmd_forwarder;
|
|
subscription_s *cmd_reply;
|
|
char *address;
|
|
char *port;
|
|
char *auth;
|
|
FIOBJ last_ch;
|
|
size_t auth_len;
|
|
size_t ref;
|
|
fio_ls_embd_s queue;
|
|
fio_lock_i lock;
|
|
fio_lock_i lock_connection;
|
|
uint8_t ping_int;
|
|
volatile uint8_t pub_sent;
|
|
volatile uint8_t flag;
|
|
uint8_t buf[];
|
|
} redis_engine_s;
|
|
|
|
typedef struct {
|
|
fio_ls_embd_s node;
|
|
void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply, void *udata);
|
|
void *udata;
|
|
size_t cmd_len;
|
|
uint8_t cmd[];
|
|
} redis_commands_s;
|
|
|
|
/** maps a pointer to the `pub_data` member back to its `redis_engine_s`. */
#define pub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, pub_data, (pr))
/** maps a pointer to the `sub_data` member back to its `redis_engine_s`. */
#define sub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, sub_data, (pr))

/** maps a `resp_parser_s` pointer back to the connection data embedding it. */
#define parser2data(prsr)                                                      \
  FIO_LS_EMBD_OBJ(struct redis_engine_internal_s, parser, (prsr))
|
|
|
|
/* releases any resources used by an internal engine*/
|
|
static inline void redis_internal_reset(struct redis_engine_internal_s *i) {
|
|
i->buf_pos = 0;
|
|
i->parser = (resp_parser_s){.obj_countdown = 0, .expecting = 0};
|
|
fiobj_free((FIOBJ)fio_ct_if(i->ary == FIOBJ_INVALID, (uintptr_t)i->str,
|
|
(uintptr_t)i->ary));
|
|
i->str = FIOBJ_INVALID;
|
|
i->ary = FIOBJ_INVALID;
|
|
i->ary_count = 0;
|
|
i->nesting = 0;
|
|
i->uuid = -1;
|
|
}
|
|
|
|
/** cleans up and frees the engine data. */
|
|
static inline void redis_free(redis_engine_s *r) {
|
|
if (fio_atomic_sub(&r->ref, 1))
|
|
return;
|
|
FIO_LOG_DEBUG("freeing redis engine for %s:%s", r->address, r->port);
|
|
redis_internal_reset(&r->pub_data);
|
|
redis_internal_reset(&r->sub_data);
|
|
fiobj_free(r->last_ch);
|
|
while (fio_ls_embd_any(&r->queue)) {
|
|
fio_free(
|
|
FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue)));
|
|
}
|
|
fio_unsubscribe(r->publication_forwarder);
|
|
r->publication_forwarder = NULL;
|
|
fio_unsubscribe(r->cmd_forwarder);
|
|
r->cmd_forwarder = NULL;
|
|
fio_unsubscribe(r->cmd_reply);
|
|
r->cmd_reply = NULL;
|
|
fio_free(r);
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Simple RESP formatting
|
|
***************************************************************************** */
|
|
|
|
/**
 * Appends the RESP representation of a single object to the `dest` String.
 *
 * Arrays and Hashes only emit their RESP Array header (a Hash is announced as
 * an Array with two entries per pair); NULL is written as a RESP null; every
 * other type is written as a RESP Bulk String using the object's C string
 * representation.
 */
inline static void fiobj2resp___internal(FIOBJ dest, FIOBJ obj) {
  fio_str_info_s s;
  switch (FIOBJ_TYPE(obj)) {
  case FIOBJ_T_NULL:
    fiobj_str_write(dest, "$-1\r\n", 5);
    break;
  case FIOBJ_T_ARRAY:
    fiobj_str_write(dest, "*", 1);
    fiobj_str_write_i(dest, fiobj_ary_count(obj));
    fiobj_str_write(dest, "\r\n", 2);
    break;
  case FIOBJ_T_HASH:
    fiobj_str_write(dest, "*", 1);
    fiobj_str_write_i(dest, fiobj_hash_count(obj) * 2);
    fiobj_str_write(dest, "\r\n", 2);
    break;
  case FIOBJ_T_TRUE:
    fiobj_str_write(dest, "$4\r\ntrue\r\n", 10);
    break;
  case FIOBJ_T_FALSE:
    /* "false" is 5 bytes long, the Bulk String header must say so */
    fiobj_str_write(dest, "$5\r\nfalse\r\n", 11);
    break;
  /* Numbers aren't as good for commands as one might think, so they are
   * sent as Bulk Strings rather than as RESP Integers. */
  case FIOBJ_T_NUMBER:  /* fallthrough */
  case FIOBJ_T_FLOAT:   /* fallthrough */
  case FIOBJ_T_UNKNOWN: /* fallthrough */
  case FIOBJ_T_STRING:  /* fallthrough */
  case FIOBJ_T_DATA:
    s = fiobj_obj2cstr(obj);
    fiobj_str_write(dest, "$", 1);
    fiobj_str_write_i(dest, s.len);
    fiobj_str_write(dest, "\r\n", 2);
    fiobj_str_write(dest, s.data, s.len);
    fiobj_str_write(dest, "\r\n", 2);
    break;
  }
}
|
|
|
|
/**
 * The `fiobj_each2` callback used by `fiobj2resp`.
 *
 * When a Hash key is reported for the current iteration, the key is written
 * before the object itself. Always returns 0.
 */
static int fiobj2resp_task(FIOBJ o, void *dest_) {
  FIOBJ dest = (FIOBJ)dest_;
  FIOBJ key = fiobj_hash_key_in_loop();
  if (key)
    fiobj2resp___internal(dest, key);
  fiobj2resp___internal(dest, o);
  return 0;
}
|
|
|
|
/**
|
|
* Converts FIOBJ objects into a RESP string (client mode).
|
|
*/
|
|
static FIOBJ fiobj2resp(FIOBJ dest, FIOBJ obj) {
|
|
fiobj_each2(obj, fiobj2resp_task, (void *)dest);
|
|
return dest;
|
|
}
|
|
|
|
/**
|
|
* Converts FIOBJ objects into a RESP string (client mode).
|
|
*
|
|
* Don't call `fiobj_free`, object will self-destruct.
|
|
*/
|
|
static inline FIOBJ fiobj2resp_tmp(FIOBJ obj) {
|
|
return fiobj2resp(fiobj_str_tmp(), obj);
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
RESP parser callbacks
|
|
***************************************************************************** */
|
|
|
|
/** a local static callback, called when a parser / protocol error occurs. */
|
|
static int resp_on_parser_error(resp_parser_s *parser) {
|
|
struct redis_engine_internal_s *i = parser2data(parser);
|
|
FIO_LOG_ERROR("(redis) parser error - attempting to restart connection.\n");
|
|
fio_close(i->uuid);
|
|
return -1;
|
|
}
|
|
|
|
/** a local static callback, called when the RESP message is complete. */
|
|
static int resp_on_message(resp_parser_s *parser) {
|
|
struct redis_engine_internal_s *i = parser2data(parser);
|
|
FIOBJ msg = i->ary ? i->ary : i->str;
|
|
i->on_message(i, msg);
|
|
/* cleanup */
|
|
fiobj_free(msg);
|
|
i->ary = FIOBJ_INVALID;
|
|
i->str = FIOBJ_INVALID;
|
|
return 0;
|
|
}
|
|
|
|
/** a local helper to add parsed objects to the data store. */
|
|
static inline void resp_add_obj(struct redis_engine_internal_s *dest, FIOBJ o) {
|
|
if (dest->ary) {
|
|
fiobj_ary_push(dest->ary, o);
|
|
--dest->ary_count;
|
|
if (!dest->ary_count && dest->nesting) {
|
|
FIOBJ tmp = fiobj_ary_shift(dest->ary);
|
|
dest->ary_count = fiobj_obj2num(tmp);
|
|
fiobj_free(tmp);
|
|
dest->ary = fiobj_ary_shift(dest->ary);
|
|
--dest->nesting;
|
|
}
|
|
}
|
|
dest->str = o;
|
|
}
|
|
|
|
/** a local static callback, called when a Number object is parsed. */
|
|
static int resp_on_number(resp_parser_s *parser, int64_t num) {
|
|
struct redis_engine_internal_s *data = parser2data(parser);
|
|
resp_add_obj(data, fiobj_num_new(num));
|
|
return 0;
|
|
}
|
|
/** a local static callback, called when a OK message is received. */
|
|
static int resp_on_okay(resp_parser_s *parser) {
|
|
struct redis_engine_internal_s *data = parser2data(parser);
|
|
resp_add_obj(data, fiobj_true());
|
|
return 0;
|
|
}
|
|
/** a local static callback, called when NULL is received. */
|
|
static int resp_on_null(resp_parser_s *parser) {
|
|
struct redis_engine_internal_s *data = parser2data(parser);
|
|
resp_add_obj(data, fiobj_null());
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* a local static callback, called when a String should be allocated.
|
|
*
|
|
* `str_len` is the expected number of bytes that will fill the final string
|
|
* object, without any NUL byte marker (the string might be binary).
|
|
*
|
|
* If this function returns any value besides 0, parsing is stopped.
|
|
*/
|
|
static int resp_on_start_string(resp_parser_s *parser, size_t str_len) {
|
|
struct redis_engine_internal_s *data = parser2data(parser);
|
|
resp_add_obj(data, fiobj_str_buf(str_len));
|
|
return 0;
|
|
}
|
|
/** a local static callback, called as String objects are streamed. */
|
|
static int resp_on_string_chunk(resp_parser_s *parser, void *data, size_t len) {
|
|
struct redis_engine_internal_s *i = parser2data(parser);
|
|
fiobj_str_write(i->str, data, len);
|
|
return 0;
|
|
}
|
|
/** a local static callback, called when a String object had finished
|
|
* streaming.
|
|
*/
|
|
static int resp_on_end_string(resp_parser_s *parser) {
|
|
return 0;
|
|
(void)parser;
|
|
}
|
|
|
|
/** a local static callback, called an error message is received. */
|
|
static int resp_on_err_msg(resp_parser_s *parser, void *data, size_t len) {
|
|
struct redis_engine_internal_s *i = parser2data(parser);
|
|
resp_add_obj(i, fiobj_str_new(data, len));
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* a local static callback, called when an Array should be allocated.
|
|
*
|
|
* `array_len` is the expected number of objects that will fill the Array
|
|
* object.
|
|
*
|
|
* There's no `resp_on_end_array` callback since the RESP protocol assumes the
|
|
* message is finished along with the Array (`resp_on_message` is called).
|
|
* However, just in case a non-conforming client/server sends nested Arrays,
|
|
* the callback should test against possible overflow or nested Array endings.
|
|
*
|
|
* If this function returns any value besides 0, parsing is stopped.
|
|
*/
|
|
static int resp_on_start_array(resp_parser_s *parser, size_t array_len) {
|
|
struct redis_engine_internal_s *i = parser2data(parser);
|
|
if (i->ary) {
|
|
++i->nesting;
|
|
FIOBJ tmp = fiobj_ary_new2(array_len + 2);
|
|
fiobj_ary_push(tmp, fiobj_num_new(i->ary_count));
|
|
fiobj_ary_push(tmp, fiobj_num_new(i->ary));
|
|
i->ary = tmp;
|
|
} else {
|
|
i->ary = fiobj_ary_new2(array_len + 2);
|
|
}
|
|
i->ary_count = array_len;
|
|
return 0;
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Publication and Command Handling
|
|
***************************************************************************** */
|
|
|
|
/* the deferred callback handler */
|
|
static void redis_perform_callback(void *e, void *cmd_) {
|
|
redis_commands_s *cmd = cmd_;
|
|
FIOBJ reply = (FIOBJ)cmd->node.next;
|
|
if (cmd->callback)
|
|
cmd->callback(e, reply, cmd->udata);
|
|
fiobj_free(reply);
|
|
FIO_LOG_DEBUG("Handled: %s\n", cmd->cmd);
|
|
fio_free(cmd);
|
|
}
|
|
|
|
/* send command within lock, to ensure flag integrity */
|
|
static void redis_send_next_command_unsafe(redis_engine_s *r) {
|
|
if (!r->pub_sent && fio_ls_embd_any(&r->queue)) {
|
|
r->pub_sent = 1;
|
|
redis_commands_s *cmd =
|
|
FIO_LS_EMBD_OBJ(redis_commands_s, node, r->queue.next);
|
|
fio_write2(r->pub_data.uuid, .data.buffer = cmd->cmd,
|
|
.length = cmd->cmd_len, .after.dealloc = FIO_DEALLOC_NOOP);
|
|
FIO_LOG_DEBUG("(redis %d) Sending (%zu bytes):\n%s\n", (int)getpid(),
|
|
cmd->cmd_len, cmd->cmd);
|
|
}
|
|
}
|
|
|
|
/* attach a command to the queue */
|
|
static void redis_attach_cmd(redis_engine_s *r, redis_commands_s *cmd) {
|
|
fio_lock(&r->lock);
|
|
fio_ls_embd_push(&r->queue, &cmd->node);
|
|
redis_send_next_command_unsafe(r);
|
|
fio_unlock(&r->lock);
|
|
}
|
|
|
|
/** a local static callback, called when the RESP message is complete. */
|
|
static void resp_on_pub_message(struct redis_engine_internal_s *i, FIOBJ msg) {
|
|
redis_engine_s *r = pub2redis(i);
|
|
// #if DEBUG
|
|
if (FIO_LOG_LEVEL >= FIO_LOG_LEVEL_DEBUG) {
|
|
FIOBJ json = fiobj_obj2json(msg, 1);
|
|
FIO_LOG_DEBUG("Redis reply:\n%s\n", fiobj_obj2cstr(json).data);
|
|
fiobj_free(json);
|
|
}
|
|
// #endif
|
|
/* publishing / command parser */
|
|
fio_lock(&r->lock);
|
|
fio_ls_embd_s *node = fio_ls_embd_shift(&r->queue);
|
|
r->pub_sent = 0;
|
|
redis_send_next_command_unsafe(r);
|
|
fio_unlock(&r->lock);
|
|
if (!node) {
|
|
/* TODO: possible ping? from server?! not likely... */
|
|
FIO_LOG_WARNING("(redis %d) received a reply when no command was sent.",
|
|
(int)getpid());
|
|
return;
|
|
}
|
|
node->next = (void *)fiobj_dup(msg);
|
|
fio_defer(redis_perform_callback, &r->en,
|
|
FIO_LS_EMBD_OBJ(redis_commands_s, node, node));
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Subscription Message Handling
|
|
***************************************************************************** */
|
|
|
|
/** a local static callback, called when the RESP message is complete. */
|
|
static void resp_on_sub_message(struct redis_engine_internal_s *i, FIOBJ msg) {
|
|
redis_engine_s *r = sub2redis(i);
|
|
/* subscriotion parser */
|
|
if (FIOBJ_TYPE(msg) != FIOBJ_T_ARRAY) {
|
|
if (FIOBJ_TYPE(msg) != FIOBJ_T_STRING || fiobj_obj2cstr(msg).len != 4 ||
|
|
fiobj_obj2cstr(msg).data[0] != 'P') {
|
|
FIO_LOG_WARNING("(redis) unexpected data format in "
|
|
"subscription stream (%zu bytes):\n %s\n",
|
|
fiobj_obj2cstr(msg).len, fiobj_obj2cstr(msg).data);
|
|
}
|
|
} else {
|
|
// FIOBJ *ary = fiobj_ary2ptr(msg);
|
|
// for (size_t i = 0; i < fiobj_ary_count(msg); ++i) {
|
|
// fio_str_info_s tmp = fiobj_obj2cstr(ary[i]);
|
|
// fprintf(stderr, "(%lu) %s\n", (unsigned long)i, tmp.data);
|
|
// }
|
|
fio_str_info_s tmp = fiobj_obj2cstr(fiobj_ary_index(msg, 0));
|
|
if (tmp.len == 7) { /* "message" */
|
|
fiobj_free(r->last_ch);
|
|
r->last_ch = fiobj_dup(fiobj_ary_index(msg, 1));
|
|
fio_publish(.channel = fiobj_obj2cstr(r->last_ch),
|
|
.message = fiobj_obj2cstr(fiobj_ary_index(msg, 2)),
|
|
.engine = FIO_PUBSUB_CLUSTER);
|
|
} else if (tmp.len == 8) { /* "pmessage" */
|
|
if (!fiobj_iseq(r->last_ch, fiobj_ary_index(msg, 2)))
|
|
fio_publish(.channel = fiobj_obj2cstr(fiobj_ary_index(msg, 2)),
|
|
.message = fiobj_obj2cstr(fiobj_ary_index(msg, 3)),
|
|
.engine = FIO_PUBSUB_CLUSTER);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Connection Callbacks (fio_protocol_s) and Engine
|
|
***************************************************************************** */
|
|
|
|
/** defined later - connects to Redis */
static void redis_connect(void *r, void *i);

/**
 * Schedules a connection attempt for the connection data `i` of engine `r`.
 *
 * A reference to the engine is taken before the task is deferred; the task
 * (`redis_connect`) is handed that reference.
 *
 * The macro expands to a single statement WITHOUT a trailing semicolon, so
 * `defer_redis_connect(r, i);` is safe in every statement position
 * (including an unbraced `if` / `else`).
 */
#define defer_redis_connect(r, i)                                              \
  do {                                                                         \
    fio_atomic_add(&(r)->ref, 1);                                              \
    fio_defer(redis_connect, (r), (i));                                        \
  } while (0)
|
|
|
|
/** Called when a data is available, but will not run concurrently */
|
|
static void redis_on_data(intptr_t uuid, fio_protocol_s *pr) {
|
|
struct redis_engine_internal_s *internal =
|
|
(struct redis_engine_internal_s *)pr;
|
|
uint8_t *buf;
|
|
if (internal->on_message == resp_on_sub_message) {
|
|
buf = sub2redis(pr)->buf + REDIS_READ_BUFFER;
|
|
} else {
|
|
buf = pub2redis(pr)->buf;
|
|
}
|
|
ssize_t i = fio_read(uuid, buf + internal->buf_pos,
|
|
REDIS_READ_BUFFER - internal->buf_pos);
|
|
if (i <= 0)
|
|
return;
|
|
|
|
internal->buf_pos += i;
|
|
i = resp_parse(&internal->parser, buf, internal->buf_pos);
|
|
if (i) {
|
|
memmove(buf, buf + internal->buf_pos - i, i);
|
|
}
|
|
internal->buf_pos = i;
|
|
}
|
|
|
|
/** Called when the connection was closed, but will not run concurrently */
|
|
static void redis_on_close(intptr_t uuid, fio_protocol_s *pr) {
|
|
struct redis_engine_internal_s *internal =
|
|
(struct redis_engine_internal_s *)pr;
|
|
redis_internal_reset(internal);
|
|
redis_engine_s *r;
|
|
if (internal->on_message == resp_on_sub_message) {
|
|
r = sub2redis(pr);
|
|
fiobj_free(r->last_ch);
|
|
r->last_ch = FIOBJ_INVALID;
|
|
if (r->flag) {
|
|
/* reconnection for subscription connection. */
|
|
if (uuid != -1) {
|
|
FIO_LOG_WARNING("(redis %d) subscription connection lost. "
|
|
"Reconnecting...",
|
|
(int)getpid());
|
|
}
|
|
fio_atomic_sub(&r->ref, 1);
|
|
defer_redis_connect(r, internal);
|
|
} else {
|
|
redis_free(r);
|
|
}
|
|
} else {
|
|
r = pub2redis(pr);
|
|
if (r->flag && uuid != -1) {
|
|
FIO_LOG_WARNING("(redis %d) publication connection lost. "
|
|
"Reconnecting...",
|
|
(int)getpid());
|
|
}
|
|
r->pub_sent = 0;
|
|
fio_close(r->sub_data.uuid);
|
|
redis_free(r);
|
|
}
|
|
(void)uuid;
|
|
}
|
|
|
|
/** Called before the facil.io reactor is shut down. */
|
|
static uint8_t redis_on_shutdown(intptr_t uuid, fio_protocol_s *pr) {
|
|
fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nQUIT\r\n", .length = 14,
|
|
.after.dealloc = FIO_DEALLOC_NOOP);
|
|
return 0;
|
|
(void)pr;
|
|
}
|
|
|
|
/** Called on connection timeout. */
|
|
static void redis_sub_ping(intptr_t uuid, fio_protocol_s *pr) {
|
|
fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nPING\r\n", .length = 14,
|
|
.after.dealloc = FIO_DEALLOC_NOOP);
|
|
(void)pr;
|
|
}
|
|
|
|
/** Called on connection timeout. */
|
|
static void redis_pub_ping(intptr_t uuid, fio_protocol_s *pr) {
|
|
redis_engine_s *r = pub2redis(pr);
|
|
if (fio_ls_embd_any(&r->queue)) {
|
|
FIO_LOG_WARNING("(redis) Redis server unresponsive, disconnecting.");
|
|
fio_close(uuid);
|
|
return;
|
|
}
|
|
redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + 15);
|
|
*cmd = (redis_commands_s){.cmd_len = 14};
|
|
memcpy(cmd->cmd, "*1\r\n$4\r\nPING\r\n\0", 15);
|
|
redis_attach_cmd(r, cmd);
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Connecting to Redis
|
|
***************************************************************************** */
|
|
|
|
/**
 * The callback for the authentication command's reply.
 *
 * An OK reply is parsed as `true` (see `resp_on_okay`), so any other reply
 * means that the authentication failed - in which case the reply is logged.
 */
static void redis_on_auth(fio_pubsub_engine_s *e, FIOBJ reply, void *udata) {
  if (!FIOBJ_TYPE_IS(reply, FIOBJ_T_TRUE)) {
    fio_str_info_s s = fiobj_obj2cstr(reply);
    FIO_LOG_WARNING("(redis) Authentication FAILED."
                    " %.*s",
                    (int)s.len, s.data);
  }
  (void)e;
  (void)udata;
}
|
|
|
|
static void redis_on_connect(intptr_t uuid, void *i_) {
|
|
struct redis_engine_internal_s *i = i_;
|
|
redis_engine_s *r;
|
|
i->uuid = uuid;
|
|
|
|
if (i->on_message == resp_on_sub_message) {
|
|
r = sub2redis(i);
|
|
if (r->auth_len) {
|
|
fio_write2(uuid, .data.buffer = r->auth, .length = r->auth_len,
|
|
.after.dealloc = FIO_DEALLOC_NOOP);
|
|
}
|
|
fio_pubsub_reattach(&r->en);
|
|
if (r->pub_data.uuid == -1) {
|
|
defer_redis_connect(r, &r->pub_data);
|
|
}
|
|
FIO_LOG_INFO("(redis %d) subscription connection established.",
|
|
(int)getpid());
|
|
} else {
|
|
r = pub2redis(i);
|
|
if (r->auth_len) {
|
|
redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + r->auth_len);
|
|
*cmd =
|
|
(redis_commands_s){.cmd_len = r->auth_len, .callback = redis_on_auth};
|
|
memcpy(cmd->cmd, r->auth, r->auth_len);
|
|
fio_lock(&r->lock);
|
|
r->pub_sent = 0;
|
|
fio_ls_embd_unshift(&r->queue, &cmd->node);
|
|
redis_send_next_command_unsafe(r);
|
|
fio_unlock(&r->lock);
|
|
} else {
|
|
fio_lock(&r->lock);
|
|
r->pub_sent = 0;
|
|
redis_send_next_command_unsafe(r);
|
|
fio_unlock(&r->lock);
|
|
}
|
|
FIO_LOG_INFO("(redis %d) publication connection established.",
|
|
(int)getpid());
|
|
}
|
|
|
|
i->protocol.rsv = 0;
|
|
fio_attach(uuid, &i->protocol);
|
|
fio_timeout_set(uuid, r->ping_int);
|
|
|
|
return;
|
|
}
|
|
|
|
static void redis_on_connect_failed(intptr_t uuid, void *i_) {
|
|
struct redis_engine_internal_s *i = i_;
|
|
i->uuid = -1;
|
|
i->protocol.on_close(-1, &i->protocol);
|
|
(void)uuid;
|
|
}
|
|
|
|
static void redis_connect(void *r_, void *i_) {
|
|
redis_engine_s *r = r_;
|
|
struct redis_engine_internal_s *i = i_;
|
|
fio_lock(&r->lock_connection);
|
|
if (r->flag == 0 || i->uuid != -1 || !fio_is_running()) {
|
|
fio_unlock(&r->lock_connection);
|
|
redis_free(r);
|
|
return;
|
|
}
|
|
// fio_atomic_add(&r->ref, 1);
|
|
i->uuid = fio_connect(.address = r->address, .port = r->port,
|
|
.on_connect = redis_on_connect, .udata = i,
|
|
.on_fail = redis_on_connect_failed);
|
|
fio_unlock(&r->lock_connection);
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Engine / Bridge Callbacks (Root Process)
|
|
***************************************************************************** */
|
|
|
|
static void redis_on_subscribe_root(const fio_pubsub_engine_s *eng,
|
|
fio_str_info_s channel,
|
|
fio_match_fn match) {
|
|
redis_engine_s *r = (redis_engine_s *)eng;
|
|
if (r->sub_data.uuid != -1) {
|
|
FIOBJ cmd = fiobj_str_buf(96 + channel.len);
|
|
if (match == FIO_MATCH_GLOB)
|
|
fiobj_str_write(cmd, "*2\r\n$10\r\nPSUBSCRIBE\r\n$", 22);
|
|
else
|
|
fiobj_str_write(cmd, "*2\r\n$9\r\nSUBSCRIBE\r\n$", 20);
|
|
fiobj_str_write_i(cmd, channel.len);
|
|
fiobj_str_write(cmd, "\r\n", 2);
|
|
fiobj_str_write(cmd, channel.data, channel.len);
|
|
fiobj_str_write(cmd, "\r\n", 2);
|
|
// {
|
|
// fio_str_info_s s = fiobj_obj2cstr(cmd);
|
|
// fprintf(stderr, "(%d) Sending Subscription (%p):\n%s\n", getpid(),
|
|
// (void *)r->sub_data.uuid, s.data);
|
|
// }
|
|
fiobj_send_free(r->sub_data.uuid, cmd);
|
|
}
|
|
}
|
|
|
|
static void redis_on_unsubscribe_root(const fio_pubsub_engine_s *eng,
|
|
fio_str_info_s channel,
|
|
fio_match_fn match) {
|
|
redis_engine_s *r = (redis_engine_s *)eng;
|
|
if (r->sub_data.uuid != -1) {
|
|
fio_str_s *cmd = fio_str_new2();
|
|
fio_str_capa_assert(cmd, 96 + channel.len);
|
|
if (match == FIO_MATCH_GLOB)
|
|
fio_str_write(cmd, "*2\r\n$12\r\nPUNSUBSCRIBE\r\n$", 24);
|
|
else
|
|
fio_str_write(cmd, "*2\r\n$11\r\nUNSUBSCRIBE\r\n$", 23);
|
|
fio_str_write_i(cmd, channel.len);
|
|
fio_str_write(cmd, "\r\n", 2);
|
|
fio_str_write(cmd, channel.data, channel.len);
|
|
fio_str_write(cmd, "\r\n", 2);
|
|
// {
|
|
// fio_str_info_s s = fio_str_info(cmd);
|
|
// fprintf(stderr, "(%d) Cancel Subscription (%p):\n%s\n", getpid(),
|
|
// (void *)r->sub_data.uuid, s.data);
|
|
// }
|
|
fio_str_send_free2(r->sub_data.uuid, cmd);
|
|
}
|
|
}
|
|
|
|
/**
 * Engine callback (Root process): a message should be published.
 *
 * Builds a RESP `PUBLISH` command for `channel` and `msg` and attaches it to
 * the command queue. `is_json` is ignored.
 */
static void redis_on_publish_root(const fio_pubsub_engine_s *eng,
                                  fio_str_info_s channel, fio_str_info_s msg,
                                  uint8_t is_json) {
  redis_engine_s *r = (redis_engine_s *)eng;
  /* the extra 96 bytes hold the command's framing, both length fields and
   * the NUL terminator */
  redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + channel.len + msg.len + 96);
  FIO_ASSERT_ALLOC(cmd);
  *cmd = (redis_commands_s){.cmd_len = 0};
  memcpy(cmd->cmd, "*3\r\n$7\r\nPUBLISH\r\n$", 18);
  char *buf = (char *)cmd->cmd + 18;
  /* the channel, as a Bulk String */
  buf += fio_ltoa((void *)buf, channel.len, 10);
  *buf++ = '\r';
  *buf++ = '\n';
  memcpy(buf, channel.data, channel.len);
  buf += channel.len;
  *buf++ = '\r';
  *buf++ = '\n';
  /* the message, as a Bulk String */
  *buf++ = '$';
  buf += fio_ltoa(buf, msg.len, 10);
  *buf++ = '\r';
  *buf++ = '\n';
  memcpy(buf, msg.data, msg.len);
  buf += msg.len;
  *buf++ = '\r';
  *buf++ = '\n';
  /* NUL terminate (not part of the command), the command might be logged */
  *buf = 0;
  FIO_LOG_DEBUG("(%d) Publishing:\n%s", (int)getpid(), cmd->cmd);
  cmd->cmd_len = (size_t)(buf - (char *)cmd->cmd);
  redis_attach_cmd(r, cmd);
  (void)is_json;
}
|
|
|
|
/* *****************************************************************************
|
|
Engine / Bridge Stub Callbacks (Child Process)
|
|
***************************************************************************** */
|
|
|
|
static void redis_on_mock_subscribe_child(const fio_pubsub_engine_s *eng,
|
|
fio_str_info_s channel,
|
|
fio_match_fn match) {
|
|
/* do nothing, root process is notified about (un)subscriptions by facil.io */
|
|
(void)eng;
|
|
(void)channel;
|
|
(void)match;
|
|
}
|
|
|
|
/**
 * Engine callback installed by `redis_on_engine_fork`: a message should be
 * published.
 *
 * The publication is forwarded to the Root process using filter -1. The
 * channel name is prefixed by 8 bytes encoding the engine's pointer, which is
 * how `redis_on_internal_publish` recognizes the target engine.
 */
static void redis_on_publish_child(const fio_pubsub_engine_s *eng,
                                   fio_str_info_s channel, fio_str_info_s msg,
                                   uint8_t is_json) {
  /* by using fio_str_s, short names are allocated on the stack */
  fio_str_s prefixed = FIO_STR_INIT;
  fio_str_info_s target = fio_str_resize(&prefixed, channel.len + 8);
  /* attach engine data to channel (prepend) */
  fio_u2str64(target.data, (uint64_t)eng);
  memcpy(target.data + 8, channel.data, channel.len);
  /* forward publication request to Root */
  fio_publish(.filter = -1, .channel = target, .message = msg,
              .engine = FIO_PUBSUB_ROOT, .is_json = is_json);
  fio_str_free(&prefixed);
  (void)eng;
}
|
|
|
|
/* *****************************************************************************
|
|
Root Publication Handler
|
|
***************************************************************************** */
|
|
|
|
/* listens to filter -1 and publishes and messages */
|
|
static void redis_on_internal_publish(fio_msg_s *msg) {
|
|
if (msg->channel.len < 8)
|
|
return; /* internal error, unexpected data */
|
|
void *en = (void *)fio_str2u64(msg->channel.data);
|
|
if (en != msg->udata1)
|
|
return; /* should be delivered by a different engine */
|
|
/* step after the engine data */
|
|
msg->channel.len -= 8;
|
|
msg->channel.data += 8;
|
|
/* forward to publishing */
|
|
FIO_LOG_DEBUG("Forwarding to engine %p, on channel %s", msg->udata1,
|
|
msg->channel.data);
|
|
redis_on_publish_root(msg->udata1, msg->channel, msg->msg, msg->is_json);
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Sending commands using the Root connection
|
|
***************************************************************************** */
|
|
|
|
/* callback from the Redis reply */
|
|
static void redis_forward_reply(fio_pubsub_engine_s *e, FIOBJ reply,
|
|
void *udata) {
|
|
uint8_t *data = udata;
|
|
fio_pubsub_engine_s *engine = (fio_pubsub_engine_s *)fio_str2u64(data + 0);
|
|
void *callback = (void *)fio_str2u64(data + 8);
|
|
if (engine != e || !callback) {
|
|
FIO_LOG_DEBUG("Redis reply not forwarded (callback: %p)", callback);
|
|
return;
|
|
}
|
|
int32_t pid = (int32_t)fio_str2u32(data + 24);
|
|
FIOBJ rp = fiobj_obj2json(reply, 0);
|
|
fio_publish(.filter = (-10 - (int32_t)pid), .channel.data = (char *)data,
|
|
.channel.len = 28, .message = fiobj_obj2cstr(rp), .is_json = 1);
|
|
fiobj_free(rp);
|
|
}
|
|
|
|
/* listens to channel -2 for commands that need to be sent (only ROOT) */
|
|
static void redis_on_internal_cmd(fio_msg_s *msg) {
|
|
// void*(void *)fio_str2u64(msg->msg.data);
|
|
fio_pubsub_engine_s *engine =
|
|
(fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0);
|
|
if (engine != msg->udata1) {
|
|
return;
|
|
}
|
|
redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + msg->msg.len + 1 + 28);
|
|
FIO_ASSERT_ALLOC(cmd);
|
|
*cmd = (redis_commands_s){.callback = redis_forward_reply,
|
|
.udata = (cmd->cmd + msg->msg.len + 1),
|
|
.cmd_len = msg->msg.len};
|
|
memcpy(cmd->cmd, msg->msg.data, msg->msg.len);
|
|
memcpy(cmd->cmd + msg->msg.len + 1, msg->channel.data, 28);
|
|
redis_attach_cmd((redis_engine_s *)engine, cmd);
|
|
// fprintf(stderr, " *** Attached CMD (%d) ***\n%s\n", getpid(), cmd->cmd);
|
|
}
|
|
|
|
/* Listens on filter `-10 -getpid()` for incoming reply data */
|
|
static void redis_on_internal_reply(fio_msg_s *msg) {
|
|
fio_pubsub_engine_s *engine =
|
|
(fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0);
|
|
if (engine != msg->udata1) {
|
|
FIO_LOG_DEBUG("Redis reply not forwarded (engine mismatch: %p != %p)",
|
|
(void *)engine, msg->udata1);
|
|
return;
|
|
}
|
|
FIOBJ reply;
|
|
fiobj_json2obj(&reply, msg->msg.data, msg->msg.len);
|
|
void (*callback)(fio_pubsub_engine_s *, FIOBJ, void *) = (void (*)(
|
|
fio_pubsub_engine_s *, FIOBJ, void *))fio_str2u64(msg->channel.data + 8);
|
|
void *udata = (void *)fio_str2u64(msg->channel.data + 16);
|
|
callback(engine, reply, udata);
|
|
fiobj_free(reply);
|
|
}
|
|
|
|
/* publishes a Redis command to Root's filter -2 */
|
|
intptr_t redis_engine_send(fio_pubsub_engine_s *engine, FIOBJ command,
|
|
void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply,
|
|
void *udata),
|
|
void *udata) {
|
|
if ((uintptr_t)engine < 4) {
|
|
FIO_LOG_WARNING("(redis send) trying to use one of the core engines");
|
|
return -1;
|
|
}
|
|
// if(fio_is_master()) {
|
|
// FIOBJ resp = fiobj2resp_tmp(fio_str_info_s obj1, FIOBJ obj2);
|
|
// TODO...
|
|
// } else {
|
|
/* forward publication request to Root */
|
|
fio_str_s tmp = FIO_STR_INIT;
|
|
fio_str_info_s ti = fio_str_resize(&tmp, 28);
|
|
/* combine metadata */
|
|
fio_u2str64(ti.data + 0, (uint64_t)engine);
|
|
fio_u2str64(ti.data + 8, (uint64_t)callback);
|
|
fio_u2str64(ti.data + 16, (uint64_t)udata);
|
|
fio_u2str32(ti.data + 24, (uint32_t)getpid());
|
|
FIOBJ cmd = fiobj2resp_tmp(command);
|
|
fio_publish(.filter = -2, .channel = ti, .message = fiobj_obj2cstr(cmd),
|
|
.engine = FIO_PUBSUB_ROOT, .is_json = 0);
|
|
fio_str_free(&tmp);
|
|
// }
|
|
return 0;
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Redis Engine Creation
|
|
***************************************************************************** */
|
|
|
|
static void redis_on_facil_start(void *r_) {
|
|
redis_engine_s *r = r_;
|
|
r->flag = 1;
|
|
if (!fio_is_valid(r->sub_data.uuid)) {
|
|
defer_redis_connect(r, &r->sub_data);
|
|
}
|
|
}
|
|
static void redis_on_facil_shutdown(void *r_) {
|
|
redis_engine_s *r = r_;
|
|
r->flag = 0;
|
|
}
|
|
|
|
static void redis_on_engine_fork(void *r_) {
|
|
redis_engine_s *r = r_;
|
|
r->flag = 0;
|
|
r->lock = FIO_LOCK_INIT;
|
|
fio_force_close(r->sub_data.uuid);
|
|
r->sub_data.uuid = -1;
|
|
fio_force_close(r->pub_data.uuid);
|
|
r->pub_data.uuid = -1;
|
|
while (fio_ls_embd_any(&r->queue)) {
|
|
redis_commands_s *cmd =
|
|
FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue));
|
|
fio_free(cmd);
|
|
}
|
|
r->en = (fio_pubsub_engine_s){
|
|
.subscribe = redis_on_mock_subscribe_child,
|
|
.unsubscribe = redis_on_mock_subscribe_child,
|
|
.publish = redis_on_publish_child,
|
|
};
|
|
fio_unsubscribe(r->publication_forwarder);
|
|
r->publication_forwarder = NULL;
|
|
fio_unsubscribe(r->cmd_forwarder);
|
|
r->cmd_forwarder = NULL;
|
|
fio_unsubscribe(r->cmd_reply);
|
|
r->cmd_reply =
|
|
fio_subscribe(.filter = -10 - (int32_t)getpid(),
|
|
.on_message = redis_on_internal_reply, .udata1 = r);
|
|
}
|
|
|
|
fio_pubsub_engine_s *redis_engine_create
|
|
FIO_IGNORE_MACRO(struct redis_engine_create_args args) {
|
|
if (getpid() != fio_parent_pid()) {
|
|
FIO_LOG_FATAL("(redis) Redis engine initialization can only "
|
|
"be performed in the Root process.");
|
|
kill(0, SIGINT);
|
|
fio_stop();
|
|
return NULL;
|
|
}
|
|
if (!args.address.len && args.address.data)
|
|
args.address.len = strlen(args.address.data);
|
|
if (!args.port.len && args.port.data)
|
|
args.port.len = strlen(args.port.data);
|
|
if (!args.auth.len && args.auth.data) {
|
|
args.auth.len = strlen(args.auth.data);
|
|
}
|
|
|
|
if (!args.address.data || !args.address.len) {
|
|
args.address = (fio_str_info_s){.len = 9, .data = (char *)"localhost"};
|
|
}
|
|
if (!args.port.data || !args.port.len) {
|
|
args.port = (fio_str_info_s){.len = 4, .data = (char *)"6379"};
|
|
}
|
|
redis_engine_s *r =
|
|
fio_malloc(sizeof(*r) + args.port.len + 1 + args.address.len + 1 +
|
|
args.auth.len + 1 + (REDIS_READ_BUFFER * 2));
|
|
FIO_ASSERT_ALLOC(r);
|
|
*r = (redis_engine_s){
|
|
.en =
|
|
{
|
|
.subscribe = redis_on_subscribe_root,
|
|
.unsubscribe = redis_on_unsubscribe_root,
|
|
.publish = redis_on_publish_root,
|
|
},
|
|
.pub_data =
|
|
{
|
|
.protocol =
|
|
{
|
|
.on_data = redis_on_data,
|
|
.on_close = redis_on_close,
|
|
.on_shutdown = redis_on_shutdown,
|
|
.ping = redis_pub_ping,
|
|
},
|
|
.uuid = -1,
|
|
.on_message = resp_on_pub_message,
|
|
},
|
|
.sub_data =
|
|
{
|
|
.protocol =
|
|
{
|
|
.on_data = redis_on_data,
|
|
.on_close = redis_on_close,
|
|
.on_shutdown = redis_on_shutdown,
|
|
.ping = redis_sub_ping,
|
|
},
|
|
.on_message = resp_on_sub_message,
|
|
.uuid = -1,
|
|
},
|
|
.publication_forwarder =
|
|
fio_subscribe(.filter = -1, .udata1 = r,
|
|
.on_message = redis_on_internal_publish),
|
|
.cmd_forwarder = fio_subscribe(.filter = -2, .udata1 = r,
|
|
.on_message = redis_on_internal_cmd),
|
|
.cmd_reply =
|
|
fio_subscribe(.filter = -10 - (uint32_t)getpid(), .udata1 = r,
|
|
.on_message = redis_on_internal_reply),
|
|
.address = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2)),
|
|
.port =
|
|
((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len + 1),
|
|
.auth = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len +
|
|
args.port.len + 2),
|
|
.auth_len = args.auth.len,
|
|
.ref = 1,
|
|
.queue = FIO_LS_INIT(r->queue),
|
|
.lock = FIO_LOCK_INIT,
|
|
.lock_connection = FIO_LOCK_INIT,
|
|
.ping_int = args.ping_interval,
|
|
.flag = 1,
|
|
};
|
|
memcpy(r->address, args.address.data, args.address.len);
|
|
memcpy(r->port, args.port.data, args.port.len);
|
|
if (args.auth.len)
|
|
memcpy(r->auth, args.auth.data, args.auth.len);
|
|
fio_pubsub_attach(&r->en);
|
|
redis_on_facil_start(r);
|
|
fio_state_callback_add(FIO_CALL_IN_CHILD, redis_on_engine_fork, r);
|
|
fio_state_callback_add(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r);
|
|
/* if restarting */
|
|
fio_state_callback_add(FIO_CALL_PRE_START, redis_on_facil_start, r);
|
|
|
|
FIO_LOG_DEBUG("Redis engine initialized %p", (void *)r);
|
|
return &r->en;
|
|
}
|
|
|
|
/* *****************************************************************************
|
|
Redis Engine Destruction
|
|
***************************************************************************** */
|
|
|
|
/**
 * Destroys a Redis engine created using `redis_engine_create`.
 *
 * The engine is deactivated, detached from the pub/sub system and its state
 * callbacks are removed. The caller's reference is then released; the memory
 * is freed once no reference remains (see `redis_free`).
 */
void redis_engine_destroy(fio_pubsub_engine_s *engine) {
  redis_engine_s *r = (redis_engine_s *)engine;
  /* no reconnections from this point on */
  r->flag = 0;
  fio_pubsub_detach(&r->en);
  fio_state_callback_remove(FIO_CALL_PRE_START, redis_on_facil_start, r);
  fio_state_callback_remove(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r);
  fio_state_callback_remove(FIO_CALL_IN_CHILD, redis_on_engine_fork, r);
  FIO_LOG_DEBUG("Redis engine destroyed %p", (void *)r);
  redis_free(r);
}
|