From: Eugene Crosser Date: Wed, 13 Mar 2019 22:24:47 +0000 (+0100) Subject: rudimentary subscribe/unsubscribe X-Git-Url: http://average.org/gitweb/?a=commitdiff_plain;h=4f4539e776640bc1e7d04af3f0a0b3af4b75bfcd;p=psmb.git rudimentary subscribe/unsubscribe --- diff --git a/examples/psmb.c b/examples/psmb.c index 2f0d6ea..8637a0b 100644 --- a/examples/psmb.c +++ b/examples/psmb.c @@ -20,6 +20,8 @@ int main(int argc, char *argv[], char *envp[]) printf("created psmb at %p\n", ctx); res = psmb_set_logf(ctx, logprt, NULL); res = psmb_open(ctx); + res = psmb_subscribe(ctx, "test-channel"); + res = psmb_unsubscribe(ctx, "test-channel"); psmb_destroy(ctx); - return 0; + return psmb_success(res); } diff --git a/include/psmb.h b/include/psmb.h index 022d5b8..c5869ec 100644 --- a/include/psmb.h +++ b/include/psmb.h @@ -21,14 +21,18 @@ psmb_result_t psmb_set_mgrp(psmb_ctx_t *ctx, struct in6_addr prefix, psmb_result_t psmb_open(psmb_ctx_t *ctx); void psmb_destroy(psmb_ctx_t *ctx); int psmb_getfd(psmb_ctx_t *ctx); +bool psmb_success(psmb_result_t result); +bool psmb_message_waiting(psmb_result_t result); bool psmb_need_write_wait(psmb_result_t result); psmb_result_t psmb_ev_rd(psmb_ctx_t *ctx); psmb_result_t psmb_ev_wr(psmb_ctx_t *ctx); psmb_result_t psmb_ev_ex(psmb_ctx_t *ctx); psmb_result_t psmb_subscribe(psmb_ctx_t *ctx, char *channel); +psmb_result_t psmb_unsubscribe(psmb_ctx_t *ctx, char *channel); psmb_result_t psmb_publish(psmb_ctx_t *ctx, char *channel, void *data, size_t size); -bool psmb_message(psmb_ctx_t *ctx, char **channel, +psmb_result_t psmb_get_message(psmb_ctx_t *ctx, char **channel, void **data, size_t *size); +psmb_result_t psmb_acknowledge(psmb_ctx_t *ctx); #endif diff --git a/src/psmb_priv.h b/src/psmb_priv.h index 5e3c2ad..c80ae62 100644 --- a/src/psmb_priv.h +++ b/src/psmb_priv.h @@ -7,6 +7,7 @@ #define PSMB_OK 0 #define PSMB_ERROR 1 #define PSMB_NEED_WRITE 2 +#define PSMB_MESSAGE 4 #define PSMB_DEFAULT_PORT 5313 #define PSMB_DEFAULT_PMTU 1452 diff --git a/src/psmb_socket.c b/src/psmb_socket.c index 538b87c..06b90a8 100644 --- a/src/psmb_socket.c +++ b/src/psmb_socket.c @@ -31,6 +31,11 @@ psmb_ctx_t *psmb_new_mm(void *(*malloc)(size_t size), .fd = -1, .malloc = malloc, .free = free, .realloc = realloc, .logf = dummy_log, + .prefix = (struct in6_addr){{{ 0xff, 0xff, 0x01, 0x05, + 0xb0, 0x55, 0xff, 0xe7, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00 }}}, + .prefixlen = 64, .pmtu = PSMB_DEFAULT_PMTU, .port = PSMB_DEFAULT_PORT}; return ctx; @@ -140,6 +145,50 @@ psmb_result_t psmb_open(psmb_ctx_t *ctx) return (psmb_result_t){PSMB_OK}; } +static psmb_result_t psmb_sub_unsub(psmb_ctx_t *ctx, char *channel, int option) +{ + struct ipv6_mreq mreq = { 0 }; + + if (ctx->fd == -1) { + LOG(ctx, LOG_ERR, "subscribe: psmb is not open"); + errno = EINVAL; + return (psmb_result_t){PSMB_ERROR}; + } + mreq.ipv6mr_multiaddr = ctx->prefix; /* use hash of the channel */ + mreq.ipv6mr_interface = 0; /* how to use this??? */ + if (setsockopt(ctx->fd, IPPROTO_IPV6, option, + (void *)&mreq, sizeof(mreq)) == -1) { + int sverr = errno; + LOG(ctx, LOG_ERR, "add_membership(): %m"); + errno = sverr; + return (psmb_result_t){PSMB_ERROR}; + } + return (psmb_result_t){PSMB_OK}; +} + +psmb_result_t psmb_subscribe(psmb_ctx_t *ctx, char *channel) { + return psmb_sub_unsub(ctx, channel, IPV6_ADD_MEMBERSHIP); +} + +psmb_result_t psmb_unsubscribe(psmb_ctx_t *ctx, char *channel) { + return psmb_sub_unsub(ctx, channel, IPV6_DROP_MEMBERSHIP); +} + +bool psmb_success(psmb_result_t result) +{ + return !(result.code & PSMB_ERROR); +} + +bool psmb_message_waiting(psmb_result_t result) +{ + return !!(result.code & PSMB_MESSAGE); +} + +bool psmb_need_write_wait(psmb_result_t result) +{ + return !!(result.code & PSMB_NEED_WRITE); +} + void psmb_destroy(psmb_ctx_t *ctx) { if (ctx->fd == -1) {