mirror of
				https://github.com/zigzap/zap.git
				synced 2025-10-23 00:14:09 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			954 lines
		
	
	
	
		
			33 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			954 lines
		
	
	
	
		
			33 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
| Copyright: Boaz segev, 2016-2019
 | |
| License: MIT
 | |
| 
 | |
| Feel free to copy, use and enjoy according to the license provided.
 | |
| */
 | |
| 
 | |
| #define FIO_INCLUDE_LINKED_LIST
 | |
| #define FIO_INCLUDE_STR
 | |
| // #define DEBUG 1
 | |
| #include <fio.h>
 | |
| 
 | |
| #include <fiobj.h>
 | |
| 
 | |
| #include <redis_engine.h>
 | |
| #include <resp_parser.h>
 | |
| 
 | |
| #define REDIS_READ_BUFFER 8192
 | |
| /* *****************************************************************************
 | |
| The Redis Engine and Callbacks Object
 | |
| ***************************************************************************** */
 | |
| 
 | |
| typedef struct {
 | |
|   fio_pubsub_engine_s en;
 | |
|   struct redis_engine_internal_s {
 | |
|     fio_protocol_s protocol;
 | |
|     intptr_t uuid;
 | |
|     resp_parser_s parser;
 | |
|     void (*on_message)(struct redis_engine_internal_s *parser, FIOBJ msg);
 | |
|     FIOBJ str;
 | |
|     FIOBJ ary;
 | |
|     uint32_t ary_count;
 | |
|     uint16_t buf_pos;
 | |
|     uint16_t nesting;
 | |
|   } pub_data, sub_data;
 | |
|   subscription_s *publication_forwarder;
 | |
|   subscription_s *cmd_forwarder;
 | |
|   subscription_s *cmd_reply;
 | |
|   char *address;
 | |
|   char *port;
 | |
|   char *auth;
 | |
|   FIOBJ last_ch;
 | |
|   size_t auth_len;
 | |
|   size_t ref;
 | |
|   fio_ls_embd_s queue;
 | |
|   fio_lock_i lock;
 | |
|   fio_lock_i lock_connection;
 | |
|   uint8_t ping_int;
 | |
|   volatile uint8_t pub_sent;
 | |
|   volatile uint8_t flag;
 | |
|   uint8_t buf[];
 | |
| } redis_engine_s;
 | |
| 
 | |
| typedef struct {
 | |
|   fio_ls_embd_s node;
 | |
|   void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply, void *udata);
 | |
|   void *udata;
 | |
|   size_t cmd_len;
 | |
|   uint8_t cmd[];
 | |
| } redis_commands_s;
 | |
| 
 | |
| /** converts from a publishing protocol to an `redis_engine_s`. */
 | |
| #define pub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, pub_data, (pr))
 | |
| /** converts from a subscribing protocol to an `redis_engine_s`. */
 | |
| #define sub2redis(pr) FIO_LS_EMBD_OBJ(redis_engine_s, sub_data, (pr))
 | |
| 
 | |
| /** converts from a `resp_parser_s` to the internal data structure. */
 | |
| #define parser2data(prsr)                                                      \
 | |
|   FIO_LS_EMBD_OBJ(struct redis_engine_internal_s, parser, (prsr))
 | |
| 
 | |
| /* releases any resources used by an internal engine*/
 | |
| static inline void redis_internal_reset(struct redis_engine_internal_s *i) {
 | |
|   i->buf_pos = 0;
 | |
|   i->parser = (resp_parser_s){.obj_countdown = 0, .expecting = 0};
 | |
|   fiobj_free((FIOBJ)fio_ct_if(i->ary == FIOBJ_INVALID, (uintptr_t)i->str,
 | |
|                               (uintptr_t)i->ary));
 | |
|   i->str = FIOBJ_INVALID;
 | |
|   i->ary = FIOBJ_INVALID;
 | |
|   i->ary_count = 0;
 | |
|   i->nesting = 0;
 | |
|   i->uuid = -1;
 | |
| }
 | |
| 
 | |
| /** cleans up and frees the engine data. */
 | |
| static inline void redis_free(redis_engine_s *r) {
 | |
|   if (fio_atomic_sub(&r->ref, 1))
 | |
|     return;
 | |
|   FIO_LOG_DEBUG("freeing redis engine for %s:%s", r->address, r->port);
 | |
|   redis_internal_reset(&r->pub_data);
 | |
|   redis_internal_reset(&r->sub_data);
 | |
|   fiobj_free(r->last_ch);
 | |
|   while (fio_ls_embd_any(&r->queue)) {
 | |
|     fio_free(
 | |
|         FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue)));
 | |
|   }
 | |
|   fio_unsubscribe(r->publication_forwarder);
 | |
|   r->publication_forwarder = NULL;
 | |
|   fio_unsubscribe(r->cmd_forwarder);
 | |
|   r->cmd_forwarder = NULL;
 | |
|   fio_unsubscribe(r->cmd_reply);
 | |
|   r->cmd_reply = NULL;
 | |
|   fio_free(r);
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Simple RESP formatting
 | |
| ***************************************************************************** */
 | |
| 
 | |
| inline static void fiobj2resp___internal(FIOBJ dest, FIOBJ obj) {
 | |
|   fio_str_info_s s;
 | |
|   switch (FIOBJ_TYPE(obj)) {
 | |
|   case FIOBJ_T_NULL:
 | |
|     fiobj_str_write(dest, "$-1\r\n", 5);
 | |
|     break;
 | |
|   case FIOBJ_T_ARRAY:
 | |
|     fiobj_str_write(dest, "*", 1);
 | |
|     fiobj_str_write_i(dest, fiobj_ary_count(obj));
 | |
|     fiobj_str_write(dest, "\r\n", 2);
 | |
|     break;
 | |
|   case FIOBJ_T_HASH:
 | |
|     fiobj_str_write(dest, "*", 1);
 | |
|     fiobj_str_write_i(dest, fiobj_hash_count(obj) * 2);
 | |
|     fiobj_str_write(dest, "\r\n", 2);
 | |
|     break;
 | |
|   case FIOBJ_T_TRUE:
 | |
|     fiobj_str_write(dest, "$4\r\ntrue\r\n", 10);
 | |
|     break;
 | |
|   case FIOBJ_T_FALSE:
 | |
|     fiobj_str_write(dest, "$4\r\nfalse\r\n", 11);
 | |
|     break;
 | |
| #if 0
 | |
|     /* Numbers aren't as good for commands as one might think... */
 | |
|   case FIOBJ_T_NUMBER:
 | |
|     fiobj_str_write(dest, ":", 1);
 | |
|     fiobj_str_write_i(dest, fiobj_obj2num(obj));
 | |
|     fiobj_str_write(dest, "\r\n", 2);
 | |
|     break;
 | |
| #else
 | |
|   case FIOBJ_T_NUMBER: /* overflow */
 | |
| #endif
 | |
|   case FIOBJ_T_FLOAT:   /* overflow */
 | |
|   case FIOBJ_T_UNKNOWN: /* overflow */
 | |
|   case FIOBJ_T_STRING:  /* overflow */
 | |
|   case FIOBJ_T_DATA:
 | |
|     s = fiobj_obj2cstr(obj);
 | |
|     fiobj_str_write(dest, "$", 1);
 | |
|     fiobj_str_write_i(dest, s.len);
 | |
|     fiobj_str_write(dest, "\r\n", 2);
 | |
|     fiobj_str_write(dest, s.data, s.len);
 | |
|     fiobj_str_write(dest, "\r\n", 2);
 | |
|     break;
 | |
|   }
 | |
| }
 | |
| 
 | |
| static int fiobj2resp_task(FIOBJ o, void *dest_) {
 | |
|   if (fiobj_hash_key_in_loop())
 | |
|     fiobj2resp___internal((FIOBJ)dest_, fiobj_hash_key_in_loop());
 | |
|   fiobj2resp___internal((FIOBJ)dest_, o);
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Converts FIOBJ objects into a RESP string (client mode).
 | |
|  */
 | |
| static FIOBJ fiobj2resp(FIOBJ dest, FIOBJ obj) {
 | |
|   fiobj_each2(obj, fiobj2resp_task, (void *)dest);
 | |
|   return dest;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Converts FIOBJ objects into a RESP string (client mode).
 | |
|  *
 | |
|  * Don't call `fiobj_free`, object will self-destruct.
 | |
|  */
 | |
| static inline FIOBJ fiobj2resp_tmp(FIOBJ obj) {
 | |
|   return fiobj2resp(fiobj_str_tmp(), obj);
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| RESP parser callbacks
 | |
| ***************************************************************************** */
 | |
| 
 | |
| /** a local static callback, called when a parser / protocol error occurs. */
 | |
| static int resp_on_parser_error(resp_parser_s *parser) {
 | |
|   struct redis_engine_internal_s *i = parser2data(parser);
 | |
|   FIO_LOG_ERROR("(redis) parser error - attempting to restart connection.\n");
 | |
|   fio_close(i->uuid);
 | |
|   return -1;
 | |
| }
 | |
| 
 | |
| /** a local static callback, called when the RESP message is complete. */
 | |
| static int resp_on_message(resp_parser_s *parser) {
 | |
|   struct redis_engine_internal_s *i = parser2data(parser);
 | |
|   FIOBJ msg = i->ary ? i->ary : i->str;
 | |
|   i->on_message(i, msg);
 | |
|   /* cleanup */
 | |
|   fiobj_free(msg);
 | |
|   i->ary = FIOBJ_INVALID;
 | |
|   i->str = FIOBJ_INVALID;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /** a local helper to add parsed objects to the data store. */
 | |
| static inline void resp_add_obj(struct redis_engine_internal_s *dest, FIOBJ o) {
 | |
|   if (dest->ary) {
 | |
|     fiobj_ary_push(dest->ary, o);
 | |
|     --dest->ary_count;
 | |
|     if (!dest->ary_count && dest->nesting) {
 | |
|       FIOBJ tmp = fiobj_ary_shift(dest->ary);
 | |
|       dest->ary_count = fiobj_obj2num(tmp);
 | |
|       fiobj_free(tmp);
 | |
|       dest->ary = fiobj_ary_shift(dest->ary);
 | |
|       --dest->nesting;
 | |
|     }
 | |
|   }
 | |
|   dest->str = o;
 | |
| }
 | |
| 
 | |
| /** a local static callback, called when a Number object is parsed. */
 | |
| static int resp_on_number(resp_parser_s *parser, int64_t num) {
 | |
|   struct redis_engine_internal_s *data = parser2data(parser);
 | |
|   resp_add_obj(data, fiobj_num_new(num));
 | |
|   return 0;
 | |
| }
 | |
| /** a local static callback, called when a OK message is received. */
 | |
| static int resp_on_okay(resp_parser_s *parser) {
 | |
|   struct redis_engine_internal_s *data = parser2data(parser);
 | |
|   resp_add_obj(data, fiobj_true());
 | |
|   return 0;
 | |
| }
 | |
| /** a local static callback, called when NULL is received. */
 | |
| static int resp_on_null(resp_parser_s *parser) {
 | |
|   struct redis_engine_internal_s *data = parser2data(parser);
 | |
|   resp_add_obj(data, fiobj_null());
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * a local static callback, called when a String should be allocated.
 | |
|  *
 | |
|  * `str_len` is the expected number of bytes that will fill the final string
 | |
|  * object, without any NUL byte marker (the string might be binary).
 | |
|  *
 | |
|  * If this function returns any value besides 0, parsing is stopped.
 | |
|  */
 | |
| static int resp_on_start_string(resp_parser_s *parser, size_t str_len) {
 | |
|   struct redis_engine_internal_s *data = parser2data(parser);
 | |
|   resp_add_obj(data, fiobj_str_buf(str_len));
 | |
|   return 0;
 | |
| }
 | |
| /** a local static callback, called as String objects are streamed. */
 | |
| static int resp_on_string_chunk(resp_parser_s *parser, void *data, size_t len) {
 | |
|   struct redis_engine_internal_s *i = parser2data(parser);
 | |
|   fiobj_str_write(i->str, data, len);
 | |
|   return 0;
 | |
| }
 | |
| /** a local static callback, called when a String object had finished
 | |
|  * streaming.
 | |
|  */
 | |
| static int resp_on_end_string(resp_parser_s *parser) {
 | |
|   return 0;
 | |
|   (void)parser;
 | |
| }
 | |
| 
 | |
| /** a local static callback, called an error message is received. */
 | |
| static int resp_on_err_msg(resp_parser_s *parser, void *data, size_t len) {
 | |
|   struct redis_engine_internal_s *i = parser2data(parser);
 | |
|   resp_add_obj(i, fiobj_str_new(data, len));
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * a local static callback, called when an Array should be allocated.
 | |
|  *
 | |
|  * `array_len` is the expected number of objects that will fill the Array
 | |
|  * object.
 | |
|  *
 | |
|  * There's no `resp_on_end_array` callback since the RESP protocol assumes the
 | |
|  * message is finished along with the Array (`resp_on_message` is called).
 | |
|  * However, just in case a non-conforming client/server sends nested Arrays,
 | |
|  * the callback should test against possible overflow or nested Array endings.
 | |
|  *
 | |
|  * If this function returns any value besides 0, parsing is stopped.
 | |
|  */
 | |
| static int resp_on_start_array(resp_parser_s *parser, size_t array_len) {
 | |
|   struct redis_engine_internal_s *i = parser2data(parser);
 | |
|   if (i->ary) {
 | |
|     ++i->nesting;
 | |
|     FIOBJ tmp = fiobj_ary_new2(array_len + 2);
 | |
|     fiobj_ary_push(tmp, fiobj_num_new(i->ary_count));
 | |
|     fiobj_ary_push(tmp, fiobj_num_new(i->ary));
 | |
|     i->ary = tmp;
 | |
|   } else {
 | |
|     i->ary = fiobj_ary_new2(array_len + 2);
 | |
|   }
 | |
|   i->ary_count = array_len;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Publication and Command Handling
 | |
| ***************************************************************************** */
 | |
| 
 | |
| /* the deferred callback handler */
 | |
| static void redis_perform_callback(void *e, void *cmd_) {
 | |
|   redis_commands_s *cmd = cmd_;
 | |
|   FIOBJ reply = (FIOBJ)cmd->node.next;
 | |
|   if (cmd->callback)
 | |
|     cmd->callback(e, reply, cmd->udata);
 | |
|   fiobj_free(reply);
 | |
|   FIO_LOG_DEBUG("Handled: %s\n", cmd->cmd);
 | |
|   fio_free(cmd);
 | |
| }
 | |
| 
 | |
| /* send command within lock, to ensure flag integrity */
 | |
| static void redis_send_next_command_unsafe(redis_engine_s *r) {
 | |
|   if (!r->pub_sent && fio_ls_embd_any(&r->queue)) {
 | |
|     r->pub_sent = 1;
 | |
|     redis_commands_s *cmd =
 | |
|         FIO_LS_EMBD_OBJ(redis_commands_s, node, r->queue.next);
 | |
|     fio_write2(r->pub_data.uuid, .data.buffer = cmd->cmd,
 | |
|                .length = cmd->cmd_len, .after.dealloc = FIO_DEALLOC_NOOP);
 | |
|     FIO_LOG_DEBUG("(redis %d) Sending (%zu bytes):\n%s\n", (int)getpid(),
 | |
|                   cmd->cmd_len, cmd->cmd);
 | |
|   }
 | |
| }
 | |
| 
 | |
| /* attach a command to the queue */
 | |
| static void redis_attach_cmd(redis_engine_s *r, redis_commands_s *cmd) {
 | |
|   fio_lock(&r->lock);
 | |
|   fio_ls_embd_push(&r->queue, &cmd->node);
 | |
|   redis_send_next_command_unsafe(r);
 | |
|   fio_unlock(&r->lock);
 | |
| }
 | |
| 
 | |
| /** a local static callback, called when the RESP message is complete. */
 | |
| static void resp_on_pub_message(struct redis_engine_internal_s *i, FIOBJ msg) {
 | |
|   redis_engine_s *r = pub2redis(i);
 | |
|   // #if DEBUG
 | |
|   if (FIO_LOG_LEVEL >= FIO_LOG_LEVEL_DEBUG) {
 | |
|     FIOBJ json = fiobj_obj2json(msg, 1);
 | |
|     FIO_LOG_DEBUG("Redis reply:\n%s\n", fiobj_obj2cstr(json).data);
 | |
|     fiobj_free(json);
 | |
|   }
 | |
|   // #endif
 | |
|   /* publishing / command parser */
 | |
|   fio_lock(&r->lock);
 | |
|   fio_ls_embd_s *node = fio_ls_embd_shift(&r->queue);
 | |
|   r->pub_sent = 0;
 | |
|   redis_send_next_command_unsafe(r);
 | |
|   fio_unlock(&r->lock);
 | |
|   if (!node) {
 | |
|     /* TODO: possible ping? from server?! not likely... */
 | |
|     FIO_LOG_WARNING("(redis %d) received a reply when no command was sent.",
 | |
|                     (int)getpid());
 | |
|     return;
 | |
|   }
 | |
|   node->next = (void *)fiobj_dup(msg);
 | |
|   fio_defer(redis_perform_callback, &r->en,
 | |
|             FIO_LS_EMBD_OBJ(redis_commands_s, node, node));
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Subscription Message Handling
 | |
| ***************************************************************************** */
 | |
| 
 | |
| /** a local static callback, called when the RESP message is complete. */
 | |
| static void resp_on_sub_message(struct redis_engine_internal_s *i, FIOBJ msg) {
 | |
|   redis_engine_s *r = sub2redis(i);
 | |
|   /* subscriotion parser */
 | |
|   if (FIOBJ_TYPE(msg) != FIOBJ_T_ARRAY) {
 | |
|     if (FIOBJ_TYPE(msg) != FIOBJ_T_STRING || fiobj_obj2cstr(msg).len != 4 ||
 | |
|         fiobj_obj2cstr(msg).data[0] != 'P') {
 | |
|       FIO_LOG_WARNING("(redis) unexpected data format in "
 | |
|                       "subscription stream (%zu bytes):\n     %s\n",
 | |
|                       fiobj_obj2cstr(msg).len, fiobj_obj2cstr(msg).data);
 | |
|     }
 | |
|   } else {
 | |
|     // FIOBJ *ary = fiobj_ary2ptr(msg);
 | |
|     // for (size_t i = 0; i < fiobj_ary_count(msg); ++i) {
 | |
|     //   fio_str_info_s tmp = fiobj_obj2cstr(ary[i]);
 | |
|     //   fprintf(stderr, "(%lu) %s\n", (unsigned long)i, tmp.data);
 | |
|     // }
 | |
|     fio_str_info_s tmp = fiobj_obj2cstr(fiobj_ary_index(msg, 0));
 | |
|     if (tmp.len == 7) { /* "message"  */
 | |
|       fiobj_free(r->last_ch);
 | |
|       r->last_ch = fiobj_dup(fiobj_ary_index(msg, 1));
 | |
|       fio_publish(.channel = fiobj_obj2cstr(r->last_ch),
 | |
|                   .message = fiobj_obj2cstr(fiobj_ary_index(msg, 2)),
 | |
|                   .engine = FIO_PUBSUB_CLUSTER);
 | |
|     } else if (tmp.len == 8) { /* "pmessage" */
 | |
|       if (!fiobj_iseq(r->last_ch, fiobj_ary_index(msg, 2)))
 | |
|         fio_publish(.channel = fiobj_obj2cstr(fiobj_ary_index(msg, 2)),
 | |
|                     .message = fiobj_obj2cstr(fiobj_ary_index(msg, 3)),
 | |
|                     .engine = FIO_PUBSUB_CLUSTER);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Connection Callbacks (fio_protocol_s) and Engine
 | |
| ***************************************************************************** */
 | |
| 
 | |
| /** defined later - connects to Redis */
 | |
| static void redis_connect(void *r, void *i);
 | |
| 
 | |
| #define defer_redis_connect(r, i)                                              \
 | |
|   do {                                                                         \
 | |
|     fio_atomic_add(&(r)->ref, 1);                                              \
 | |
|     fio_defer(redis_connect, (r), (i));                                        \
 | |
|   } while (0);
 | |
| 
 | |
| /** Called when a data is available, but will not run concurrently */
 | |
| static void redis_on_data(intptr_t uuid, fio_protocol_s *pr) {
 | |
|   struct redis_engine_internal_s *internal =
 | |
|       (struct redis_engine_internal_s *)pr;
 | |
|   uint8_t *buf;
 | |
|   if (internal->on_message == resp_on_sub_message) {
 | |
|     buf = sub2redis(pr)->buf + REDIS_READ_BUFFER;
 | |
|   } else {
 | |
|     buf = pub2redis(pr)->buf;
 | |
|   }
 | |
|   ssize_t i = fio_read(uuid, buf + internal->buf_pos,
 | |
|                        REDIS_READ_BUFFER - internal->buf_pos);
 | |
|   if (i <= 0)
 | |
|     return;
 | |
| 
 | |
|   internal->buf_pos += i;
 | |
|   i = resp_parse(&internal->parser, buf, internal->buf_pos);
 | |
|   if (i) {
 | |
|     memmove(buf, buf + internal->buf_pos - i, i);
 | |
|   }
 | |
|   internal->buf_pos = i;
 | |
| }
 | |
| 
 | |
| /** Called when the connection was closed, but will not run concurrently */
 | |
| static void redis_on_close(intptr_t uuid, fio_protocol_s *pr) {
 | |
|   struct redis_engine_internal_s *internal =
 | |
|       (struct redis_engine_internal_s *)pr;
 | |
|   redis_internal_reset(internal);
 | |
|   redis_engine_s *r;
 | |
|   if (internal->on_message == resp_on_sub_message) {
 | |
|     r = sub2redis(pr);
 | |
|     fiobj_free(r->last_ch);
 | |
|     r->last_ch = FIOBJ_INVALID;
 | |
|     if (r->flag) {
 | |
|       /* reconnection for subscription connection. */
 | |
|       if (uuid != -1) {
 | |
|         FIO_LOG_WARNING("(redis %d) subscription connection lost. "
 | |
|                         "Reconnecting...",
 | |
|                         (int)getpid());
 | |
|       }
 | |
|       fio_atomic_sub(&r->ref, 1);
 | |
|       defer_redis_connect(r, internal);
 | |
|     } else {
 | |
|       redis_free(r);
 | |
|     }
 | |
|   } else {
 | |
|     r = pub2redis(pr);
 | |
|     if (r->flag && uuid != -1) {
 | |
|       FIO_LOG_WARNING("(redis %d) publication connection lost. "
 | |
|                       "Reconnecting...",
 | |
|                       (int)getpid());
 | |
|     }
 | |
|     r->pub_sent = 0;
 | |
|     fio_close(r->sub_data.uuid);
 | |
|     redis_free(r);
 | |
|   }
 | |
|   (void)uuid;
 | |
| }
 | |
| 
 | |
| /** Called before the facil.io reactor is shut down. */
 | |
| static uint8_t redis_on_shutdown(intptr_t uuid, fio_protocol_s *pr) {
 | |
|   fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nQUIT\r\n", .length = 14,
 | |
|              .after.dealloc = FIO_DEALLOC_NOOP);
 | |
|   return 0;
 | |
|   (void)pr;
 | |
| }
 | |
| 
 | |
| /** Called on connection timeout. */
 | |
| static void redis_sub_ping(intptr_t uuid, fio_protocol_s *pr) {
 | |
|   fio_write2(uuid, .data.buffer = "*1\r\n$4\r\nPING\r\n", .length = 14,
 | |
|              .after.dealloc = FIO_DEALLOC_NOOP);
 | |
|   (void)pr;
 | |
| }
 | |
| 
 | |
| /** Called on connection timeout. */
 | |
| static void redis_pub_ping(intptr_t uuid, fio_protocol_s *pr) {
 | |
|   redis_engine_s *r = pub2redis(pr);
 | |
|   if (fio_ls_embd_any(&r->queue)) {
 | |
|     FIO_LOG_WARNING("(redis) Redis server unresponsive, disconnecting.");
 | |
|     fio_close(uuid);
 | |
|     return;
 | |
|   }
 | |
|   redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + 15);
 | |
|   *cmd = (redis_commands_s){.cmd_len = 14};
 | |
|   memcpy(cmd->cmd, "*1\r\n$4\r\nPING\r\n\0", 15);
 | |
|   redis_attach_cmd(r, cmd);
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Connecting to Redis
 | |
| ***************************************************************************** */
 | |
| 
 | |
| static void redis_on_auth(fio_pubsub_engine_s *e, FIOBJ reply, void *udata) {
 | |
|   if (FIOBJ_TYPE_IS(reply, FIOBJ_T_TRUE)) {
 | |
|     fio_str_info_s s = fiobj_obj2cstr(reply);
 | |
|     FIO_LOG_WARNING("(redis) Authentication FAILED."
 | |
|                     "        %.*s",
 | |
|                     (int)s.len, s.data);
 | |
|   }
 | |
|   (void)e;
 | |
|   (void)udata;
 | |
| }
 | |
| 
 | |
| static void redis_on_connect(intptr_t uuid, void *i_) {
 | |
|   struct redis_engine_internal_s *i = i_;
 | |
|   redis_engine_s *r;
 | |
|   i->uuid = uuid;
 | |
| 
 | |
|   if (i->on_message == resp_on_sub_message) {
 | |
|     r = sub2redis(i);
 | |
|     if (r->auth_len) {
 | |
|       fio_write2(uuid, .data.buffer = r->auth, .length = r->auth_len,
 | |
|                  .after.dealloc = FIO_DEALLOC_NOOP);
 | |
|     }
 | |
|     fio_pubsub_reattach(&r->en);
 | |
|     if (r->pub_data.uuid == -1) {
 | |
|       defer_redis_connect(r, &r->pub_data);
 | |
|     }
 | |
|     FIO_LOG_INFO("(redis %d) subscription connection established.",
 | |
|                  (int)getpid());
 | |
|   } else {
 | |
|     r = pub2redis(i);
 | |
|     if (r->auth_len) {
 | |
|       redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + r->auth_len);
 | |
|       *cmd =
 | |
|           (redis_commands_s){.cmd_len = r->auth_len, .callback = redis_on_auth};
 | |
|       memcpy(cmd->cmd, r->auth, r->auth_len);
 | |
|       fio_lock(&r->lock);
 | |
|       r->pub_sent = 0;
 | |
|       fio_ls_embd_unshift(&r->queue, &cmd->node);
 | |
|       redis_send_next_command_unsafe(r);
 | |
|       fio_unlock(&r->lock);
 | |
|     } else {
 | |
|       fio_lock(&r->lock);
 | |
|       r->pub_sent = 0;
 | |
|       redis_send_next_command_unsafe(r);
 | |
|       fio_unlock(&r->lock);
 | |
|     }
 | |
|     FIO_LOG_INFO("(redis %d) publication connection established.",
 | |
|                  (int)getpid());
 | |
|   }
 | |
| 
 | |
|   i->protocol.rsv = 0;
 | |
|   fio_attach(uuid, &i->protocol);
 | |
|   fio_timeout_set(uuid, r->ping_int);
 | |
| 
 | |
|   return;
 | |
| }
 | |
| 
 | |
| static void redis_on_connect_failed(intptr_t uuid, void *i_) {
 | |
|   struct redis_engine_internal_s *i = i_;
 | |
|   i->uuid = -1;
 | |
|   i->protocol.on_close(-1, &i->protocol);
 | |
|   (void)uuid;
 | |
| }
 | |
| 
 | |
| static void redis_connect(void *r_, void *i_) {
 | |
|   redis_engine_s *r = r_;
 | |
|   struct redis_engine_internal_s *i = i_;
 | |
|   fio_lock(&r->lock_connection);
 | |
|   if (r->flag == 0 || i->uuid != -1 || !fio_is_running()) {
 | |
|     fio_unlock(&r->lock_connection);
 | |
|     redis_free(r);
 | |
|     return;
 | |
|   }
 | |
|   // fio_atomic_add(&r->ref, 1);
 | |
|   i->uuid = fio_connect(.address = r->address, .port = r->port,
 | |
|                         .on_connect = redis_on_connect, .udata = i,
 | |
|                         .on_fail = redis_on_connect_failed);
 | |
|   fio_unlock(&r->lock_connection);
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Engine / Bridge Callbacks (Root Process)
 | |
| ***************************************************************************** */
 | |
| 
 | |
| static void redis_on_subscribe_root(const fio_pubsub_engine_s *eng,
 | |
|                                     fio_str_info_s channel,
 | |
|                                     fio_match_fn match) {
 | |
|   redis_engine_s *r = (redis_engine_s *)eng;
 | |
|   if (r->sub_data.uuid != -1) {
 | |
|     FIOBJ cmd = fiobj_str_buf(96 + channel.len);
 | |
|     if (match == FIO_MATCH_GLOB)
 | |
|       fiobj_str_write(cmd, "*2\r\n$10\r\nPSUBSCRIBE\r\n$", 22);
 | |
|     else
 | |
|       fiobj_str_write(cmd, "*2\r\n$9\r\nSUBSCRIBE\r\n$", 20);
 | |
|     fiobj_str_write_i(cmd, channel.len);
 | |
|     fiobj_str_write(cmd, "\r\n", 2);
 | |
|     fiobj_str_write(cmd, channel.data, channel.len);
 | |
|     fiobj_str_write(cmd, "\r\n", 2);
 | |
|     // {
 | |
|     //   fio_str_info_s s = fiobj_obj2cstr(cmd);
 | |
|     //   fprintf(stderr, "(%d) Sending Subscription (%p):\n%s\n", getpid(),
 | |
|     //           (void *)r->sub_data.uuid, s.data);
 | |
|     // }
 | |
|     fiobj_send_free(r->sub_data.uuid, cmd);
 | |
|   }
 | |
| }
 | |
| 
 | |
| static void redis_on_unsubscribe_root(const fio_pubsub_engine_s *eng,
 | |
|                                       fio_str_info_s channel,
 | |
|                                       fio_match_fn match) {
 | |
|   redis_engine_s *r = (redis_engine_s *)eng;
 | |
|   if (r->sub_data.uuid != -1) {
 | |
|     fio_str_s *cmd = fio_str_new2();
 | |
|     fio_str_capa_assert(cmd, 96 + channel.len);
 | |
|     if (match == FIO_MATCH_GLOB)
 | |
|       fio_str_write(cmd, "*2\r\n$12\r\nPUNSUBSCRIBE\r\n$", 24);
 | |
|     else
 | |
|       fio_str_write(cmd, "*2\r\n$11\r\nUNSUBSCRIBE\r\n$", 23);
 | |
|     fio_str_write_i(cmd, channel.len);
 | |
|     fio_str_write(cmd, "\r\n", 2);
 | |
|     fio_str_write(cmd, channel.data, channel.len);
 | |
|     fio_str_write(cmd, "\r\n", 2);
 | |
|     // {
 | |
|     //   fio_str_info_s s = fio_str_info(cmd);
 | |
|     //   fprintf(stderr, "(%d) Cancel Subscription (%p):\n%s\n", getpid(),
 | |
|     //           (void *)r->sub_data.uuid, s.data);
 | |
|     // }
 | |
|     fio_str_send_free2(r->sub_data.uuid, cmd);
 | |
|   }
 | |
| }
 | |
| 
 | |
| static void redis_on_publish_root(const fio_pubsub_engine_s *eng,
 | |
|                                   fio_str_info_s channel, fio_str_info_s msg,
 | |
|                                   uint8_t is_json) {
 | |
|   redis_engine_s *r = (redis_engine_s *)eng;
 | |
|   redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + channel.len + msg.len + 96);
 | |
|   *cmd = (redis_commands_s){.cmd_len = 0};
 | |
|   memcpy(cmd->cmd, "*3\r\n$7\r\nPUBLISH\r\n$", 18);
 | |
|   char *buf = (char *)cmd->cmd + 18;
 | |
|   buf += fio_ltoa((void *)buf, channel.len, 10);
 | |
|   *buf++ = '\r';
 | |
|   *buf++ = '\n';
 | |
|   memcpy(buf, channel.data, channel.len);
 | |
|   buf += channel.len;
 | |
|   *buf++ = '\r';
 | |
|   *buf++ = '\n';
 | |
|   *buf++ = '$';
 | |
|   buf += fio_ltoa(buf, msg.len, 10);
 | |
|   *buf++ = '\r';
 | |
|   *buf++ = '\n';
 | |
|   memcpy(buf, msg.data, msg.len);
 | |
|   buf += msg.len;
 | |
|   *buf++ = '\r';
 | |
|   *buf++ = '\n';
 | |
|   *buf = 0;
 | |
|   FIO_LOG_DEBUG("(%d) Publishing:\n%s", (int)getpid(), cmd->cmd);
 | |
|   cmd->cmd_len = (uintptr_t)buf - (uintptr_t)(cmd + 1);
 | |
|   redis_attach_cmd(r, cmd);
 | |
|   return;
 | |
|   (void)is_json;
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Engine / Bridge Stub Callbacks (Child Process)
 | |
| ***************************************************************************** */
 | |
| 
 | |
| static void redis_on_mock_subscribe_child(const fio_pubsub_engine_s *eng,
 | |
|                                           fio_str_info_s channel,
 | |
|                                           fio_match_fn match) {
 | |
|   /* do nothing, root process is notified about (un)subscriptions by facil.io */
 | |
|   (void)eng;
 | |
|   (void)channel;
 | |
|   (void)match;
 | |
| }
 | |
| 
 | |
| static void redis_on_publish_child(const fio_pubsub_engine_s *eng,
 | |
|                                    fio_str_info_s channel, fio_str_info_s msg,
 | |
|                                    uint8_t is_json) {
 | |
|   /* attach engine data to channel (prepend) */
 | |
|   fio_str_s tmp = FIO_STR_INIT;
 | |
|   /* by using fio_str_s, short names are allocated on the stack */
 | |
|   fio_str_info_s tmp_info = fio_str_resize(&tmp, channel.len + 8);
 | |
|   fio_u2str64(tmp_info.data, (uint64_t)eng);
 | |
|   memcpy(tmp_info.data + 8, channel.data, channel.len);
 | |
|   /* forward publication request to Root */
 | |
|   fio_publish(.filter = -1, .channel = tmp_info, .message = msg,
 | |
|               .engine = FIO_PUBSUB_ROOT, .is_json = is_json);
 | |
|   fio_str_free(&tmp);
 | |
|   (void)eng;
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Root Publication Handler
 | |
| ***************************************************************************** */
 | |
| 
 | |
| /* listens to filter -1 and publishes and messages */
 | |
| static void redis_on_internal_publish(fio_msg_s *msg) {
 | |
|   if (msg->channel.len < 8)
 | |
|     return; /* internal error, unexpected data */
 | |
|   void *en = (void *)fio_str2u64(msg->channel.data);
 | |
|   if (en != msg->udata1)
 | |
|     return; /* should be delivered by a different engine */
 | |
|   /* step after the engine data */
 | |
|   msg->channel.len -= 8;
 | |
|   msg->channel.data += 8;
 | |
|   /* forward to publishing */
 | |
|   FIO_LOG_DEBUG("Forwarding to engine %p, on channel %s", msg->udata1,
 | |
|                 msg->channel.data);
 | |
|   redis_on_publish_root(msg->udata1, msg->channel, msg->msg, msg->is_json);
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Sending commands using the Root connection
 | |
| ***************************************************************************** */
 | |
| 
 | |
| /* callback from the Redis reply */
 | |
| static void redis_forward_reply(fio_pubsub_engine_s *e, FIOBJ reply,
 | |
|                                 void *udata) {
 | |
|   uint8_t *data = udata;
 | |
|   fio_pubsub_engine_s *engine = (fio_pubsub_engine_s *)fio_str2u64(data + 0);
 | |
|   void *callback = (void *)fio_str2u64(data + 8);
 | |
|   if (engine != e || !callback) {
 | |
|     FIO_LOG_DEBUG("Redis reply not forwarded (callback: %p)", callback);
 | |
|     return;
 | |
|   }
 | |
|   int32_t pid = (int32_t)fio_str2u32(data + 24);
 | |
|   FIOBJ rp = fiobj_obj2json(reply, 0);
 | |
|   fio_publish(.filter = (-10 - (int32_t)pid), .channel.data = (char *)data,
 | |
|               .channel.len = 28, .message = fiobj_obj2cstr(rp), .is_json = 1);
 | |
|   fiobj_free(rp);
 | |
| }
 | |
| 
 | |
| /* listens to channel -2 for commands that need to be sent (only ROOT) */
 | |
| static void redis_on_internal_cmd(fio_msg_s *msg) {
 | |
|   // void*(void *)fio_str2u64(msg->msg.data);
 | |
|   fio_pubsub_engine_s *engine =
 | |
|       (fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0);
 | |
|   if (engine != msg->udata1) {
 | |
|     return;
 | |
|   }
 | |
|   redis_commands_s *cmd = fio_malloc(sizeof(*cmd) + msg->msg.len + 1 + 28);
 | |
|   FIO_ASSERT_ALLOC(cmd);
 | |
|   *cmd = (redis_commands_s){.callback = redis_forward_reply,
 | |
|                             .udata = (cmd->cmd + msg->msg.len + 1),
 | |
|                             .cmd_len = msg->msg.len};
 | |
|   memcpy(cmd->cmd, msg->msg.data, msg->msg.len);
 | |
|   memcpy(cmd->cmd + msg->msg.len + 1, msg->channel.data, 28);
 | |
|   redis_attach_cmd((redis_engine_s *)engine, cmd);
 | |
|   // fprintf(stderr, " *** Attached CMD (%d) ***\n%s\n", getpid(), cmd->cmd);
 | |
| }
 | |
| 
 | |
| /* Listens on filter `-10 -getpid()` for incoming reply data */
 | |
| static void redis_on_internal_reply(fio_msg_s *msg) {
 | |
|   fio_pubsub_engine_s *engine =
 | |
|       (fio_pubsub_engine_s *)fio_str2u64(msg->channel.data + 0);
 | |
|   if (engine != msg->udata1) {
 | |
|     FIO_LOG_DEBUG("Redis reply not forwarded (engine mismatch: %p != %p)",
 | |
|                   (void *)engine, msg->udata1);
 | |
|     return;
 | |
|   }
 | |
|   FIOBJ reply;
 | |
|   fiobj_json2obj(&reply, msg->msg.data, msg->msg.len);
 | |
|   void (*callback)(fio_pubsub_engine_s *, FIOBJ, void *) = (void (*)(
 | |
|       fio_pubsub_engine_s *, FIOBJ, void *))fio_str2u64(msg->channel.data + 8);
 | |
|   void *udata = (void *)fio_str2u64(msg->channel.data + 16);
 | |
|   callback(engine, reply, udata);
 | |
|   fiobj_free(reply);
 | |
| }
 | |
| 
 | |
| /* publishes a Redis command to Root's filter -2 */
 | |
| intptr_t redis_engine_send(fio_pubsub_engine_s *engine, FIOBJ command,
 | |
|                            void (*callback)(fio_pubsub_engine_s *e, FIOBJ reply,
 | |
|                                             void *udata),
 | |
|                            void *udata) {
 | |
|   if ((uintptr_t)engine < 4) {
 | |
|     FIO_LOG_WARNING("(redis send) trying to use one of the core engines");
 | |
|     return -1;
 | |
|   }
 | |
|   // if(fio_is_master()) {
 | |
|   // FIOBJ resp = fiobj2resp_tmp(fio_str_info_s obj1, FIOBJ obj2);
 | |
|   // TODO...
 | |
|   // } else {
 | |
|   /* forward publication request to Root */
 | |
|   fio_str_s tmp = FIO_STR_INIT;
 | |
|   fio_str_info_s ti = fio_str_resize(&tmp, 28);
 | |
|   /* combine metadata */
 | |
|   fio_u2str64(ti.data + 0, (uint64_t)engine);
 | |
|   fio_u2str64(ti.data + 8, (uint64_t)callback);
 | |
|   fio_u2str64(ti.data + 16, (uint64_t)udata);
 | |
|   fio_u2str32(ti.data + 24, (uint32_t)getpid());
 | |
|   FIOBJ cmd = fiobj2resp_tmp(command);
 | |
|   fio_publish(.filter = -2, .channel = ti, .message = fiobj_obj2cstr(cmd),
 | |
|               .engine = FIO_PUBSUB_ROOT, .is_json = 0);
 | |
|   fio_str_free(&tmp);
 | |
|   // }
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Redis Engine Creation
 | |
| ***************************************************************************** */
 | |
| 
 | |
| static void redis_on_facil_start(void *r_) {
 | |
|   redis_engine_s *r = r_;
 | |
|   r->flag = 1;
 | |
|   if (!fio_is_valid(r->sub_data.uuid)) {
 | |
|     defer_redis_connect(r, &r->sub_data);
 | |
|   }
 | |
| }
 | |
| static void redis_on_facil_shutdown(void *r_) {
 | |
|   redis_engine_s *r = r_;
 | |
|   r->flag = 0;
 | |
| }
 | |
| 
 | |
| static void redis_on_engine_fork(void *r_) {
 | |
|   redis_engine_s *r = r_;
 | |
|   r->flag = 0;
 | |
|   r->lock = FIO_LOCK_INIT;
 | |
|   fio_force_close(r->sub_data.uuid);
 | |
|   r->sub_data.uuid = -1;
 | |
|   fio_force_close(r->pub_data.uuid);
 | |
|   r->pub_data.uuid = -1;
 | |
|   while (fio_ls_embd_any(&r->queue)) {
 | |
|     redis_commands_s *cmd =
 | |
|         FIO_LS_EMBD_OBJ(redis_commands_s, node, fio_ls_embd_pop(&r->queue));
 | |
|     fio_free(cmd);
 | |
|   }
 | |
|   r->en = (fio_pubsub_engine_s){
 | |
|       .subscribe = redis_on_mock_subscribe_child,
 | |
|       .unsubscribe = redis_on_mock_subscribe_child,
 | |
|       .publish = redis_on_publish_child,
 | |
|   };
 | |
|   fio_unsubscribe(r->publication_forwarder);
 | |
|   r->publication_forwarder = NULL;
 | |
|   fio_unsubscribe(r->cmd_forwarder);
 | |
|   r->cmd_forwarder = NULL;
 | |
|   fio_unsubscribe(r->cmd_reply);
 | |
|   r->cmd_reply =
 | |
|       fio_subscribe(.filter = -10 - (int32_t)getpid(),
 | |
|                     .on_message = redis_on_internal_reply, .udata1 = r);
 | |
| }
 | |
| 
 | |
| fio_pubsub_engine_s *redis_engine_create
 | |
| FIO_IGNORE_MACRO(struct redis_engine_create_args args) {
 | |
|   if (getpid() != fio_parent_pid()) {
 | |
|     FIO_LOG_FATAL("(redis) Redis engine initialization can only "
 | |
|                   "be performed in the Root process.");
 | |
|     kill(0, SIGINT);
 | |
|     fio_stop();
 | |
|     return NULL;
 | |
|   }
 | |
|   if (!args.address.len && args.address.data)
 | |
|     args.address.len = strlen(args.address.data);
 | |
|   if (!args.port.len && args.port.data)
 | |
|     args.port.len = strlen(args.port.data);
 | |
|   if (!args.auth.len && args.auth.data) {
 | |
|     args.auth.len = strlen(args.auth.data);
 | |
|   }
 | |
| 
 | |
|   if (!args.address.data || !args.address.len) {
 | |
|     args.address = (fio_str_info_s){.len = 9, .data = (char *)"localhost"};
 | |
|   }
 | |
|   if (!args.port.data || !args.port.len) {
 | |
|     args.port = (fio_str_info_s){.len = 4, .data = (char *)"6379"};
 | |
|   }
 | |
|   redis_engine_s *r =
 | |
|       fio_malloc(sizeof(*r) + args.port.len + 1 + args.address.len + 1 +
 | |
|                  args.auth.len + 1 + (REDIS_READ_BUFFER * 2));
 | |
|   FIO_ASSERT_ALLOC(r);
 | |
|   *r = (redis_engine_s){
 | |
|       .en =
 | |
|           {
 | |
|               .subscribe = redis_on_subscribe_root,
 | |
|               .unsubscribe = redis_on_unsubscribe_root,
 | |
|               .publish = redis_on_publish_root,
 | |
|           },
 | |
|       .pub_data =
 | |
|           {
 | |
|               .protocol =
 | |
|                   {
 | |
|                       .on_data = redis_on_data,
 | |
|                       .on_close = redis_on_close,
 | |
|                       .on_shutdown = redis_on_shutdown,
 | |
|                       .ping = redis_pub_ping,
 | |
|                   },
 | |
|               .uuid = -1,
 | |
|               .on_message = resp_on_pub_message,
 | |
|           },
 | |
|       .sub_data =
 | |
|           {
 | |
|               .protocol =
 | |
|                   {
 | |
|                       .on_data = redis_on_data,
 | |
|                       .on_close = redis_on_close,
 | |
|                       .on_shutdown = redis_on_shutdown,
 | |
|                       .ping = redis_sub_ping,
 | |
|                   },
 | |
|               .on_message = resp_on_sub_message,
 | |
|               .uuid = -1,
 | |
|           },
 | |
|       .publication_forwarder =
 | |
|           fio_subscribe(.filter = -1, .udata1 = r,
 | |
|                         .on_message = redis_on_internal_publish),
 | |
|       .cmd_forwarder = fio_subscribe(.filter = -2, .udata1 = r,
 | |
|                                      .on_message = redis_on_internal_cmd),
 | |
|       .cmd_reply =
 | |
|           fio_subscribe(.filter = -10 - (uint32_t)getpid(), .udata1 = r,
 | |
|                         .on_message = redis_on_internal_reply),
 | |
|       .address = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2)),
 | |
|       .port =
 | |
|           ((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len + 1),
 | |
|       .auth = ((char *)(r + 1) + (REDIS_READ_BUFFER * 2) + args.address.len +
 | |
|                args.port.len + 2),
 | |
|       .auth_len = args.auth.len,
 | |
|       .ref = 1,
 | |
|       .queue = FIO_LS_INIT(r->queue),
 | |
|       .lock = FIO_LOCK_INIT,
 | |
|       .lock_connection = FIO_LOCK_INIT,
 | |
|       .ping_int = args.ping_interval,
 | |
|       .flag = 1,
 | |
|   };
 | |
|   memcpy(r->address, args.address.data, args.address.len);
 | |
|   memcpy(r->port, args.port.data, args.port.len);
 | |
|   if (args.auth.len)
 | |
|     memcpy(r->auth, args.auth.data, args.auth.len);
 | |
|   fio_pubsub_attach(&r->en);
 | |
|   redis_on_facil_start(r);
 | |
|   fio_state_callback_add(FIO_CALL_IN_CHILD, redis_on_engine_fork, r);
 | |
|   fio_state_callback_add(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r);
 | |
|   /* if restarting */
 | |
|   fio_state_callback_add(FIO_CALL_PRE_START, redis_on_facil_start, r);
 | |
| 
 | |
|   FIO_LOG_DEBUG("Redis engine initialized %p", (void *)r);
 | |
|   return &r->en;
 | |
| }
 | |
| 
 | |
| /* *****************************************************************************
 | |
| Redis Engine Destruction
 | |
| ***************************************************************************** */
 | |
| 
 | |
| void redis_engine_destroy(fio_pubsub_engine_s *engine) {
 | |
|   redis_engine_s *r = (redis_engine_s *)engine;
 | |
|   r->flag = 0;
 | |
|   fio_pubsub_detach(&r->en);
 | |
|   fio_state_callback_remove(FIO_CALL_IN_CHILD, redis_on_engine_fork, r);
 | |
|   fio_state_callback_remove(FIO_CALL_ON_SHUTDOWN, redis_on_facil_shutdown, r);
 | |
|   fio_state_callback_remove(FIO_CALL_PRE_START, redis_on_facil_start, r);
 | |
|   FIO_LOG_DEBUG("Redis engine destroyed %p", (void *)r);
 | |
|   redis_free(r);
 | |
| }
 | 
