From 96605b5c3a3d851d2d1c018409eb306f0d8cc600 Mon Sep 17 00:00:00 2001 From: emanuel Date: Mon, 19 Oct 2020 18:41:02 +0100 Subject: [PATCH] Heartbeat support --- src/cam.c | 2 +- src/denm.c | 57 ++++++++++++++++++++++++----- src/denm.h | 3 ++ src/facilities.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 144 insertions(+), 11 deletions(-) diff --git a/src/cam.c b/src/cam.c index 1c4e41c..5371bc3 100644 --- a/src/cam.c +++ b/src/cam.c @@ -204,7 +204,7 @@ void *ca_service(void *fc) { syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded); zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0); zmq_recv(facilities->transport_socket, &code, 1, 0); - sleep(1000); + sleep(1); } ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); diff --git a/src/denm.c b/src/denm.c index 8743d62..6dee2e6 100644 --- a/src/denm.c +++ b/src/denm.c @@ -138,6 +138,17 @@ static int event_add(den_t *den, DENM_t *denm) { } } + switch (state) { + case EVENT_ACTIVE: + ++den->no_active_events; + break; + case EVENT_CANCELLED: + ++den->no_cancelled_events; + break; + case EVENT_NEGATED: + ++den->no_negated_events; + break; + } if (index != -1) { den->events[index]->enabled = true; @@ -197,6 +208,30 @@ static int event_update(den_t *den, DENM_t *denm) { } } + switch (den->events[index]->state) { + case EVENT_ACTIVE: + --den->no_active_events; + break; + case EVENT_CANCELLED: + --den->no_cancelled_events; + break; + case EVENT_NEGATED: + --den->no_negated_events; + break; + } + + switch (state) { + case EVENT_ACTIVE: + ++den->no_active_events; + break; + case EVENT_CANCELLED: + ++den->no_cancelled_events; + break; + case EVENT_NEGATED: + ++den->no_negated_events; + break; + } + if (index != -1) { den->events[index]->state = state; den->events[index]->detection_time = e_detection_time; @@ -283,7 +318,6 @@ void* den_service(void *fc) { struct timespec systemtime; uint64_t now; - int active, cancelled, negated; den_t *den = facilities->den; pthread_mutex_init(&den->lock, NULL); @@ -300,35 +334,42 @@ void* den_service(void *fc) { now = (long)(systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6); now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000 - active = 0, cancelled = 0, negated = 0; - pthread_mutex_lock(&den->lock); for (int i = 0; i < den->no_max_events; ++i) { if (den->events[i]->enabled) { if (now >= den->events[i]->expiration_time) { // Remove event den->events[i]->enabled = false; ASN_STRUCT_FREE(asn_DEF_DENM, den->events[i]->denm); + switch (den->events[i]->state) { + case EVENT_ACTIVE: + --den->no_active_events; + break; + case EVENT_CANCELLED: + --den->no_cancelled_events; + break; + case EVENT_NEGATED: + --den->no_negated_events; + break; + } } else { switch (den->events[i]->state) { case EVENT_ACTIVE: - ++active; syslog_debug("[facilities] [den] event %d expiring in %ld second(s)", i, (den->events[i]->expiration_time - now)/1000); break; case EVENT_CANCELLED: - ++cancelled; break; case EVENT_NEGATED: - ++negated; break; } } } } + + syslog_debug("[facilities] [den] events :: [ %d active | %d cancelled | %d negated ]", den->no_active_events, den->no_cancelled_events, den->no_negated_events); + pthread_mutex_unlock(&den->lock); - syslog_debug("[facilities] [den] events : %d active %d : cancelled : %d negated", active, cancelled, negated); - usleep(1000000); } diff --git a/src/denm.h b/src/denm.h index 8a6250f..66b5c09 100644 --- a/src/denm.h +++ b/src/denm.h @@ -31,6 +31,9 @@ typedef struct den { event_t ** events; uint32_t sn; uint16_t no_stored_events; + uint16_t no_active_events; + uint16_t no_cancelled_events; + uint16_t no_negated_events; uint16_t no_max_events; pthread_mutex_t lock; } den_t; diff --git a/src/facilities.c b/src/facilities.c index d0ffb5c..03bf1a6 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -35,7 +36,7 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_BTPDataIndication, (void**) &bdi, msg, msg_len); if (dec.code) { - syslog_err("[facilities] <- invalid BDI received"); + syslog_err("[facilities] <- invalid bdi received"); rv = 1; code = 1; zmq_send(facilities->responder, &code, 1, 0); @@ -85,7 +86,6 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t event_manage(facilities->den, its_msg); } - // Forward to app fdi = calloc(1, sizeof(FacilitiesDataIndication_t)); @@ -110,6 +110,90 @@ cleanup: return rv; } +static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t msg_len) { + int rv = 0; + int code = 0; + + uint8_t *fdres_oer = NULL; + + FacilitiesDataRequest_t *fdreq = calloc(1, sizeof(FacilitiesDataRequest_t)); + FacilitiesDataResult_t *fdres = calloc(1, sizeof(FacilitiesDataResult_t)); + + asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_FacilitiesDataRequest, (void**) &fdreq, msg, msg_len); + if (dec.code) { + syslog_err("[facilities] <- invalid FDR received"); + rv = 1; + code = 1; + goto cleanup; + } + + switch (fdreq->present) { + case FacilitiesDataRequest_PR_singleMessage: + break; + case FacilitiesDataRequest_PR_activeEvents: + pthread_mutex_lock(&facilities->den->lock); + + fdres->code = ResultCode_accepted; + fdres->events = calloc(1, sizeof(Events_t)); + + switch (fdreq->choice.activeEvents) { + case EventType_active: + fdres->events->list.count = facilities->den->no_active_events; + fdres->events->list.size = facilities->den->no_active_events * sizeof(DENM_t *); + fdres->events->list.array = malloc(facilities->den->no_active_events * sizeof(DENM_t *)); + for (int i = 0, j = 0; i < facilities->den->no_active_events; ++i) { + if (facilities->den->events[i]->state == EVENT_ACTIVE) { + fdres->events->list.array[j]->itssMessageType = ItssMessageType_denm; + fdres->events->list.array[j]->data.buf = malloc(512); + asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_DENM, NULL, facilities->den->events[i]->denm, fdres->events->list.array[j]->data.buf, 512); + if (enc.encoded == -1) { + syslog_err("[facilities] failed encoding DENM for FDResult"); + continue; + } + fdres->events->list.array[j]->data.size = (enc.encoded + 7) / 8; + ++j; + } + } + break; + default: + syslog_err("[facilities] unrecognized FDR event type"); + pthread_mutex_unlock(&facilities->den->lock); + rv = 1; + goto cleanup; + } + fdres_oer = malloc(4096); + asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_DENM, NULL, fdres, fdres_oer, 4096); + if (enc.encoded == -1) { + syslog_err("[facilities] failed encoding FDResult"); + pthread_mutex_unlock(&facilities->den->lock); + rv = 1; + goto cleanup; + } + + zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); + + pthread_mutex_unlock(&facilities->den->lock); + break; + default: + syslog_err("[facilities] unrecognized FDR type received"); + rv = 1; + goto cleanup; + } + + cleanup: + if (rv) { + fdres->code = ResultCode_rejected; + uint8_t fdres_oer[32]; + asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataResult, NULL, fdres, fdres_oer, 32); + zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); + } + free(fdres_oer); + ASN_STRUCT_FREE(asn_DEF_FacilitiesDataResult, fdres); + ASN_STRUCT_FREE(asn_DEF_FacilitiesDataRequest, fdreq); + + return rv; +} + int main() { syslog_info("[facilities] starting"); @@ -128,7 +212,10 @@ int main() { facilities.den = calloc(1, sizeof(den_t)); + // CA pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities); + + // DEN pthread_create(&facilities.den_service, NULL, den_service, (void*) &facilities); uint8_t buffer[512]; @@ -142,6 +229,8 @@ int main() { case 3: code = transport_indication(&facilities, buffer+1, 511); break; + case 5: + code = facilities_request(&facilities, buffer+1, 511); default: syslog_debug("[facilities] unrecognized service"); continue;