From 0518f44b9f7d00d8c8a8bd9776dcbd0958460d08 Mon Sep 17 00:00:00 2001 From: emanuel Date: Fri, 16 Oct 2020 19:22:32 +0100 Subject: [PATCH] Event mgmt pt II --- src/cam.c | 2 +- src/denm.c | 74 ++++++++++++++++++++++++++++++++++++++++++------ src/denm.h | 11 +++++-- src/facilities.c | 41 ++++++++++++++++++--------- 4 files changed, 102 insertions(+), 26 deletions(-) diff --git a/src/cam.c b/src/cam.c index 5371bc3..1c4e41c 100644 --- a/src/cam.c +++ b/src/cam.c @@ -204,7 +204,7 @@ void *ca_service(void *fc) { syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded); zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0); zmq_recv(facilities->transport_socket, &code, 1, 0); - sleep(1); + sleep(1000); } ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); diff --git a/src/denm.c b/src/denm.c index 266c361..8743d62 100644 --- a/src/denm.c +++ b/src/denm.c @@ -14,10 +14,9 @@ #define syslog_debug(msg, ...) #endif -enum EVENT_CHECK_RESULT event_check(den_t *den, DENM_t *denm) { +static enum EVENT_CHECK_RESULT event_check(den_t *den, DENM_t *denm) { int rv = 0; - uint64_t e_detection_time, e_reference_time; asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time); asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time); @@ -55,11 +54,11 @@ enum EVENT_CHECK_RESULT event_check(den_t *den, DENM_t *denm) { for (int i = 0; i < den->no_max_events; ++i) { if (den->events[i]->enabled) { if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID && - den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { + den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { is_new = false; if (den->events[i]->reference_time < e_reference_time || - den->events[i]->detection_time < e_detection_time) { + den->events[i]->detection_time < e_detection_time) { is_update = true; } @@ -102,7 +101,7 @@ enum EVENT_CHECK_RESULT event_check(den_t *den, DENM_t *denm) { return EVENT_NEW; } -int event_add(den_t *den, DENM_t *denm) { +static int event_add(den_t *den, DENM_t *denm) { struct timespec systemtime; clock_gettime(CLOCK_REALTIME, &systemtime); @@ -165,7 +164,7 @@ int event_add(den_t *den, DENM_t *denm) { else return 0; // Event added to db } -int event_update(den_t *den, DENM_t *denm) { +static int event_update(den_t *den, DENM_t *denm) { struct timespec systemtime; clock_gettime(CLOCK_REALTIME, &systemtime); @@ -191,7 +190,7 @@ int event_update(den_t *den, DENM_t *denm) { for (int i = 0; i < den->no_max_events; ++i) { if (den->events[i]->enabled) { if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID && - den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { + den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { index = i; break; } @@ -222,6 +221,63 @@ int event_update(den_t *den, DENM_t *denm) { else return 0; // Event updated } +void event_manage(den_t *den, DENM_t *denm) { + int rv; + switch (event_check(den, denm)) { + case EVENT_NEW: + rv = event_add(den, denm); + syslog_debug("[facilities] [den] new event received"); + if (rv) { + syslog_debug("[facilities] [den] failed adding event, max events reached"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + rv = 1; + } + break; + case EVENT_INVALID: + syslog_debug("[facilities] [den] invalid event received, ignoring"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + break; + case EVENT_PASSED: + syslog_debug("[facilities] [den] old event received, ignoring"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + break; + case EVENT_CANCELLATION: + syslog_debug("[facilities] [den] event cancellation received"); + rv = event_update(den, denm); + if (rv) { + syslog_debug("[facilities] [den] failed cancelling event, event not found"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + rv = 1; + } + break; + case EVENT_NEGATION: + syslog_debug("[facilities] [den] event negation received"); + rv = event_update(den, denm); + if (rv) { + syslog_debug("[facilities] [den] failed negating event, event not found"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + rv = 1; + } + break; + case EVENT_UPDATE: + syslog_debug("[facilities] [den] event update received"); + rv = event_update(den, denm); + if (rv) { + syslog_debug("[facilities] [den] failed updating event, event not found"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + } + break; + case EVENT_REPEATED: + syslog_debug("[facilities] [den] repeated event received, ignoring"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + break; + case EVENT_NUMBER_EXCEEDED: + syslog_debug("[facilities] [den] max events reached, ignoring"); + ASN_STRUCT_FREE(asn_DEF_DENM, denm); + break; + } +} + void* den_service(void *fc) { facilities_t *facilities = (facilities_t *) fc; @@ -231,7 +287,7 @@ void* den_service(void *fc) { den_t *den = facilities->den; pthread_mutex_init(&den->lock, NULL); - + den->no_max_events = 32; den->events = malloc(den->no_max_events * sizeof(void*)); @@ -245,7 +301,7 @@ void* den_service(void *fc) { now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000 active = 0, cancelled = 0, negated = 0; - + pthread_mutex_lock(&den->lock); for (int i = 0; i < den->no_max_events; ++i) { if (den->events[i]->enabled) { diff --git a/src/denm.h b/src/denm.h index 2bf8c1c..8a6250f 100644 --- a/src/denm.h +++ b/src/denm.h @@ -30,7 +30,7 @@ typedef struct event { typedef struct den { event_t ** events; uint32_t sn; - uint16_t no_active_events; + uint16_t no_stored_events; uint16_t no_max_events; pthread_mutex_t lock; } den_t; @@ -46,7 +46,14 @@ enum EVENT_CHECK_RESULT { EVENT_NUMBER_EXCEEDED }; -enum EVENT_CHECK_RESULT check_event(den_t *den, DENM_t *denm); +/** + * Evaluate a DENM event. + * Does all the checks to a DENM and adds it to the database. *Don't* free the DENM struct after calling this function + * @param den the DEN service struct + * @param denm the DENM to evaluate + * @return none + */ +void event_manage(den_t *den, DENM_t *denm); void* den_service(void *fc); diff --git a/src/facilities.c b/src/facilities.c index 4a16ce4..d0ffb5c 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -46,38 +46,51 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t syslog_debug("[facilities] <- BDI (%ldB)", bdi->data.size); // Parse message - asn_TYPE_descriptor_t *msg_descriptor; - void *msg_struct; + asn_TYPE_descriptor_t *its_msg_descriptor; + void *its_msg; switch (bdi->destinationPort) { case Port_cam: - msg_descriptor = &asn_DEF_CAM; - msg_struct = calloc(1, sizeof(CAM_t)); + its_msg_descriptor = &asn_DEF_CAM; + its_msg = calloc(1, sizeof(CAM_t)); break; case Port_denm: - msg_descriptor = &asn_DEF_DENM; - msg_struct = calloc(1, sizeof(DENM_t)); + its_msg_descriptor = &asn_DEF_DENM; + its_msg = calloc(1, sizeof(DENM_t)); break; case Port_ivim: - msg_descriptor = &asn_DEF_IVIM; - msg_struct = calloc(1, sizeof(IVIM_t)); + its_msg_descriptor = &asn_DEF_IVIM; + its_msg = calloc(1, sizeof(IVIM_t)); break; default: syslog_debug("[facilities] messsage with unhandled BTP port received, ignoring"); goto cleanup; } - dec = uper_decode_complete(NULL, msg_descriptor, (void**) msg_struct, bdi->data.buf, bdi->data.size); + dec = uper_decode_complete(NULL, its_msg_descriptor, (void**) &its_msg, bdi->data.buf, bdi->data.size); if (dec.code) { - syslog_debug("[facilities] invalid %s received", msg_descriptor->name); + syslog_debug("[facilities] invalid %s received", its_msg_descriptor->name); rv = 1; goto cleanup; } + // Manage message + if (bdi->destinationPort == Port_denm) { + #ifdef DEBUG + uint8_t* xml_denm = malloc(4096); + asn_enc_rval_t rve = xer_encode_to_buffer(xml_denm, 4096, 0x02, &asn_DEF_DENM, its_msg); + syslog_debug("DENM XER %d: %.*s", (int)rve.encoded, (int)rve.encoded , xml_denm); + free(xml_denm); + #endif + + event_manage(facilities->den, its_msg); + } + + // Forward to app fdi = calloc(1, sizeof(FacilitiesDataIndication_t)); - + fdi->itssMessageType = bdi->destinationPort; - + fdi->data.size = bdi->data.size; fdi->data.buf = malloc(bdi->data.size); memcpy(fdi->data.buf, bdi->data.buf, bdi->data.size); @@ -90,9 +103,9 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t zmq_recv(facilities->app_socket, &code, 1, 0); cleanup: + if (bdi->destinationPort != Port_denm) ASN_STRUCT_FREE(*its_msg_descriptor, its_msg); ASN_STRUCT_FREE(asn_DEF_BTPDataIndication, bdi); ASN_STRUCT_FREE(asn_DEF_FacilitiesDataIndication, fdi); - ASN_STRUCT_FREE(*msg_descriptor, msg_struct); return rv; } @@ -109,7 +122,7 @@ int main() { facilities.app_socket = zmq_socket(context, ZMQ_REQ); rc = zmq_connect(facilities.app_socket, "ipc:///tmp/itss/application"); - + facilities.transport_socket = zmq_socket(context, ZMQ_REQ); rc = zmq_connect(facilities.transport_socket, "ipc:///tmp/itss/transport");