#include #include static fio_lock_i global_lock = FIO_LOCK_INIT; static void ask4data_callback(fio_pubsub_engine_s *e, FIOBJ reply, void *udata) { if (udata != (void *)0x01) fprintf(stderr, "CRITICAL ERROR: redis callback udata mismatch (got %p)\n", udata); if (!FIOBJ_TYPE_IS(reply, FIOBJ_T_ARRAY)) { fprintf(stderr, "CRITICAL ERROR: redis callback return type mismatch (got %s)\n", fiobj_type_name(reply)); return; } size_t count = fiobj_ary_count(reply); fprintf(stderr, "Redis command results (%zu):\n", count); for (size_t i = 0; i < count; ++i) { fprintf(stderr, "* %s\n", fiobj_obj2cstr(fiobj_ary_index(reply, i)).data); } kill(SIGINT, 0); (void)e; } static void ask4data(void *ignr) { FIOBJ data = fiobj_ary_new(); fiobj_ary_push(data, fiobj_str_new("LRANGE", 6)); fiobj_ary_push(data, fiobj_str_new("pids", 4)); fiobj_ary_push(data, fiobj_num_new(0)); fiobj_ary_push(data, fiobj_num_new(-1)); (void)ignr; redis_engine_send(FIO_PUBSUB_DEFAULT, data, ask4data_callback, (void *)0x01); fiobj_free(data); fprintf(stderr, "* (%d) Asked redis for info.\n", getpid()); (void)ignr; } static void after_fork(void *ignr) { if (fio_is_master()) { fio_trylock(&global_lock); return; } if (fio_trylock(&global_lock) == 0) { /* runs only once */ fio_run_every(2000, 1, ask4data, NULL, NULL); } FIOBJ data = fiobj_ary_new(); fiobj_ary_push(data, fiobj_str_new("LPUSH", 5)); fiobj_ary_push(data, fiobj_str_new("pids", 4)); if (0) { /* nested arrays... but lists can't contain them */ FIOBJ tmp = fiobj_ary_new2(2); fiobj_ary_push(tmp, fiobj_str_new("worker pid", 10)); fiobj_ary_push(tmp, fiobj_str_copy(fiobj_num_tmp(getpid()))); fiobj_ary_push(data, tmp); } else { /* lists contain only Strings, so we need a string */ fiobj_ary_push(data, fiobj_str_copy(fiobj_num_tmp(getpid()))); } redis_engine_send(FIO_PUBSUB_DEFAULT, data, NULL, NULL); fiobj_free(data); fprintf(stderr, "* (%d) Sent info to redis.\n", getpid()); (void)ignr; } static void start_shutdown(void *ignr) { if (fio_is_master()) fio_stop(); (void)ignr; } int main(void) { fio_pubsub_engine_s *r = redis_engine_create(.ping_interval = 1); FIO_PUBSUB_DEFAULT = r; fio_run_every(10000, 1, start_shutdown, NULL, NULL); fio_state_callback_add(FIO_CALL_AFTER_FORK, after_fork, NULL); fio_start(.workers = 4); redis_engine_destroy(r); return 0; }