From 817bacca16dff809ab4952d02d1f37f138924fc8 Mon Sep 17 00:00:00 2001 From: emanuel Date: Mon, 12 Oct 2020 19:22:22 +0100 Subject: [PATCH] Added DENM mgmt --- src/CMakeLists.txt | 1 + src/cam.c | 63 +++++++++- src/cam.h | 2 +- src/denm.c | 280 +++++++++++++++++++++++++++++++++++++++++++++ src/denm.h | 53 +++++++++ src/facilities.c | 56 ++------- src/facilities.h | 8 +- 7 files changed, 406 insertions(+), 57 deletions(-) create mode 100644 src/denm.c create mode 100644 src/denm.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6dd30e3..813d6d4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,6 @@ ADD_EXECUTABLE(it2s-itss-facilities cam.c + denm.c facilities.c ) diff --git a/src/cam.c b/src/cam.c index c3ffb76..5371bc3 100644 --- a/src/cam.c +++ b/src/cam.c @@ -1,11 +1,14 @@ #include "cam.h" +#include "facilities.h" + +#include +#include +#include #include #include - -#include -//#include - +#include +#include #include #define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__) @@ -51,12 +54,12 @@ static SpeedConfidence_t getSpeedConfidence(uint32_t conf) { } -int mk_cam(uint8_t *cam, uint32_t *cam_len) { +static int mk_cam(uint8_t *cam, uint32_t *cam_len) { int rv = 0; CAM_t *cam_tx = calloc(1, sizeof(CAM_t)); - cam_tx->header.protocolVersion = protocolVersion_currentVersion; + cam_tx->header.protocolVersion = 2; cam_tx->header.messageID = ItsPduHeader__messageID_cam; cam_tx->header.stationID = 0; cam_tx->cam.camParameters.basicContainer.stationType = StationType_passengerCar; @@ -160,3 +163,51 @@ cleanup: return rv; } + + +void *ca_service(void *fc) { + + int rv = 0; + uint8_t code = 0; + facilities_t *facilities = (facilities_t*) fc; + + BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t)); + + bdr->btpType = BTPType_btpB; + + bdr->destinationAddress.buf = malloc(6); + for (int i = 0; i < 6; ++i) {bdr->destinationAddress.buf[i] = 0xff;} + bdr->destinationAddress.size = 6; + + bdr->packetTransportType = PacketTransportType_shb; + + bdr->destinationPort = Port_cam; + + bdr->trafficClass = 2; + + bdr->data.buf = malloc(256); + + uint8_t bdr_oer[256]; + bdr_oer[0] = 4; // Facilities + while (!facilities->exit) { + rv = mk_cam(bdr->data.buf, (uint32_t *) &bdr->data.size); + if (rv) { + syslog_err("[facilities] make cam failed (%d)", rv); + continue; + } + + asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_BTPDataRequest, NULL, bdr, bdr_oer+1, 255); + if (enc.encoded == -1) { + syslog_err("[facilities] encoding BTPDataRequest for cam failed"); + continue; + } + syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded); + zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0); + zmq_recv(facilities->transport_socket, &code, 1, 0); + sleep(1); + } + + ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); + + return NULL; +} diff --git a/src/cam.h b/src/cam.h index 76cfadd..d69c35f 100644 --- a/src/cam.h +++ b/src/cam.h @@ -3,6 +3,6 @@ #include -int mk_cam(uint8_t *cam, uint32_t *cam_len); +void *ca_service(void *fc); #endif diff --git a/src/denm.c b/src/denm.c new file mode 100644 index 0000000..266c361 --- /dev/null +++ b/src/denm.c @@ -0,0 +1,280 @@ +#include "denm.h" +#include "facilities.h" + +#include +#include + +#define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__) +#define syslog_emerg(msg, ...) syslog(LOG_EMERG, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) +#define syslog_err(msg, ...) syslog(LOG_ERR, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) + +#ifndef NDEBUG +#define syslog_debug(msg, ...) syslog(LOG_DEBUG, "%s:%d " msg "", __func__, __LINE__, ##__VA_ARGS__) +#else +#define syslog_debug(msg, ...) +#endif + +enum EVENT_CHECK_RESULT event_check(den_t *den, DENM_t *denm) { + int rv = 0; + + + uint64_t e_detection_time, e_reference_time; + asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time); + asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time); + + if (denm->denm.situation == NULL) { + return EVENT_INVALID; // Ignore situationless events + } + + if (e_detection_time > e_reference_time) { + return EVENT_INVALID; + } + + struct timespec systemtime; + clock_gettime(CLOCK_REALTIME, &systemtime); + long now = (long) (systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6); + now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000 + + uint32_t e_validity_duration; + if (denm->denm.management.validityDuration != NULL) { + e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds + } else { + e_validity_duration = 30000; + } + + if (e_detection_time + e_validity_duration < now) { + return EVENT_PASSED; + } + + bool is_new = true; + bool is_update = false; + bool exists_vacancy = false; + + pthread_mutex_lock(&den->lock); + + for (int i = 0; i < den->no_max_events; ++i) { + if (den->events[i]->enabled) { + if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID && + den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { + + is_new = false; + if (den->events[i]->reference_time < e_reference_time || + den->events[i]->detection_time < e_detection_time) { + is_update = true; + } + + break; + } + } + } + + if (is_new) { + for (int i = 0; i < den->no_max_events; ++i) { + if (!den->events[i]->enabled) { + exists_vacancy = true; + break; + } + } + } + + pthread_mutex_unlock(&den->lock); + + if (denm->denm.management.termination != NULL) { + if (*denm->denm.management.termination == 0) { + return EVENT_CANCELLATION; + } else if (*denm->denm.management.termination == 1) { + return EVENT_NEGATION; + } + } + + if (is_update) { + return EVENT_UPDATE; + } + + if (!is_new) { + return EVENT_REPEATED; + } + + if (!exists_vacancy) { + return EVENT_NUMBER_EXCEEDED; + } + + return EVENT_NEW; +} + +int event_add(den_t *den, DENM_t *denm) { + + struct timespec systemtime; + clock_gettime(CLOCK_REALTIME, &systemtime); + long now = (long) (systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6); + now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000 + + uint64_t e_detection_time, e_reference_time; + asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time); + asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time); + + uint32_t e_validity_duration; + if (denm->denm.management.validityDuration != NULL) { + e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds + } else { + e_validity_duration = 30000; + } + + pthread_mutex_lock(&den->lock); + + int index = -1; + for (int i = 0; i < den->no_max_events; ++i) { + if (!den->events[i]->enabled) { + index = i; + break; + } + } + + uint8_t state = EVENT_ACTIVE; + if (denm->denm.management.termination != NULL) { + if (*(denm->denm.management.termination) == 0) { + state = EVENT_CANCELLED; + } else if (*(denm->denm.management.termination) == 1) { + state = EVENT_NEGATION; + } + } + + + if (index != -1) { + den->events[index]->enabled = true; + den->events[index]->state = state; + den->events[index]->station_id = denm->denm.management.actionID.originatingStationID; + den->events[index]->sn = denm->denm.management.actionID.sequenceNumber; + den->events[index]->cause = denm->denm.situation->eventType.causeCode; + den->events[index]->subcause = denm->denm.situation->eventType.subCauseCode; + den->events[index]->detection_time = e_detection_time; + den->events[index]->reference_time = e_reference_time; + den->events[index]->expiration_time = e_detection_time + e_validity_duration; + den->events[index]->latitude = denm->denm.management.eventPosition.latitude; + den->events[index]->longitude = denm->denm.management.eventPosition.longitude; + den->events[index]->denm = denm; + + if (denm->denm.management.actionID.sequenceNumber > den->sn) { + den->sn = denm->denm.management.actionID.sequenceNumber; + } + } + + pthread_mutex_unlock(&den->lock); + + if (index == -1) return 1; // Max events reached + else return 0; // Event added to db +} + +int event_update(den_t *den, DENM_t *denm) { + + struct timespec systemtime; + clock_gettime(CLOCK_REALTIME, &systemtime); + long now = (long) (systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6); + now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000 + + uint64_t e_detection_time, e_reference_time; + asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time); + asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time); + + uint8_t state = EVENT_ACTIVE; + if (denm->denm.management.termination != NULL) { + if (*(denm->denm.management.termination) == 0) { + state = EVENT_CANCELLED; + } else if (*(denm->denm.management.termination) == 1) { + state = EVENT_NEGATION; + } + } + + int index = -1; + pthread_mutex_lock(&den->lock); + + for (int i = 0; i < den->no_max_events; ++i) { + if (den->events[i]->enabled) { + if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID && + den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { + index = i; + break; + } + } + } + + if (index != -1) { + den->events[index]->state = state; + den->events[index]->detection_time = e_detection_time; + den->events[index]->cause = denm->denm.situation->eventType.causeCode; + den->events[index]->subcause = denm->denm.situation->eventType.subCauseCode; + den->events[index]->reference_time = e_reference_time; + + if (denm->denm.management.validityDuration != NULL) { + den->events[index]->expiration_time = e_detection_time + *(uint32_t *) denm->denm.management.validityDuration * 1000; + } + + den->events[index]->latitude = denm->denm.management.eventPosition.latitude; + den->events[index]->longitude = denm->denm.management.eventPosition.longitude; + + ASN_STRUCT_FREE(asn_DEF_DENM, den->events[index]->denm); + den->events[index]->denm = denm; + } + + pthread_mutex_unlock(&den->lock); + + if (index == -1) return 1; // Event not found + else return 0; // Event updated +} + +void* den_service(void *fc) { + facilities_t *facilities = (facilities_t *) fc; + + struct timespec systemtime; + uint64_t now; + int active, cancelled, negated; + + den_t *den = facilities->den; + pthread_mutex_init(&den->lock, NULL); + + den->no_max_events = 32; + + den->events = malloc(den->no_max_events * sizeof(void*)); + for (int i = 0; i < den->no_max_events; ++i) { + den->events[i] = calloc(1, sizeof(event_t)); + } + + while (!facilities->exit) { + clock_gettime(CLOCK_REALTIME, &systemtime); + now = (long)(systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6); + now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000 + + active = 0, cancelled = 0, negated = 0; + + pthread_mutex_lock(&den->lock); + for (int i = 0; i < den->no_max_events; ++i) { + if (den->events[i]->enabled) { + if (now >= den->events[i]->expiration_time) { // Remove event + den->events[i]->enabled = false; + ASN_STRUCT_FREE(asn_DEF_DENM, den->events[i]->denm); + } else { + switch (den->events[i]->state) { + case EVENT_ACTIVE: + ++active; + syslog_debug("[facilities] [den] event %d expiring in %ld second(s)", i, (den->events[i]->expiration_time - now)/1000); + break; + case EVENT_CANCELLED: + ++cancelled; + break; + case EVENT_NEGATED: + ++negated; + break; + } + } + } + + } + pthread_mutex_unlock(&den->lock); + + syslog_debug("[facilities] [den] events : %d active %d : cancelled : %d negated", active, cancelled, negated); + + usleep(1000000); + } + + return NULL; +} diff --git a/src/denm.h b/src/denm.h new file mode 100644 index 0000000..2bf8c1c --- /dev/null +++ b/src/denm.h @@ -0,0 +1,53 @@ +#ifndef FACILITIES_DENM_H +#define FACILITIES_DENM_H + +#include +#include +#include +#include + +enum EVENT_STATE { + EVENT_ACTIVE, + EVENT_CANCELLED, + EVENT_NEGATED +}; + +typedef struct event { + DENM_t *denm; + uint32_t station_id; + uint32_t sn; + uint64_t detection_time; + uint64_t reference_time; + uint64_t expiration_time; + uint8_t cause; + uint8_t subcause; + uint32_t latitude; + uint32_t longitude; + bool enabled; + enum EVENT_STATE state; +} event_t; + +typedef struct den { + event_t ** events; + uint32_t sn; + uint16_t no_active_events; + uint16_t no_max_events; + pthread_mutex_t lock; +} den_t; + +enum EVENT_CHECK_RESULT { + EVENT_NEW, + EVENT_INVALID, + EVENT_PASSED, + EVENT_CANCELLATION, + EVENT_NEGATION, + EVENT_UPDATE, + EVENT_REPEATED, + EVENT_NUMBER_EXCEEDED +}; + +enum EVENT_CHECK_RESULT check_event(den_t *den, DENM_t *denm); + +void* den_service(void *fc); + +#endif diff --git a/src/facilities.c b/src/facilities.c index d99ce6f..4a16ce4 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -1,5 +1,6 @@ #include "facilities.h" #include "cam.h" +#include "denm.h" #include #include @@ -8,7 +9,6 @@ #include #include -#include #include #include @@ -97,56 +97,11 @@ cleanup: return rv; } -static void* handler(void *fc) { - int rv = 0; - uint8_t code = 0; - facilities_t *facilities = (facilities_t*) fc; - - BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t)); - - bdr->btpType = BTPType_btpB; - - bdr->destinationAddress.buf = malloc(6); - for (int i = 0; i < 6; ++i) {bdr->destinationAddress.buf[i] = 0xff;} - bdr->destinationAddress.size = 6; - - bdr->packetTransportType = PacketTransportType_shb; - - bdr->destinationPort = Port_cam; - - bdr->trafficClass = 2; - - bdr->data.buf = malloc(256); - - uint8_t bdr_oer[256]; - bdr_oer[0] = 4; // Facilities - while (!facilities->exit) { - rv = mk_cam(bdr->data.buf, (uint32_t *) &bdr->data.size); - if (rv) { - syslog_err("[facilities] make cam failed (%d)", rv); - continue; - } - - asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_BTPDataRequest, NULL, bdr, bdr_oer+1, 255); - if (enc.encoded == -1) { - syslog_err("[facilities] encoding BTPDataRequest for cam failed"); - continue; - } - syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded); - zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0); - zmq_recv(facilities->transport_socket, &code, 1, 0); - sleep(1); - } - - ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); - - return NULL; -} - int main() { syslog_info("[facilities] starting"); facilities_t facilities; + facilities.exit = false; void *context = zmq_ctx_new(); facilities.responder = zmq_socket(context, ZMQ_REP); @@ -158,13 +113,16 @@ int main() { facilities.transport_socket = zmq_socket(context, ZMQ_REQ); rc = zmq_connect(facilities.transport_socket, "ipc:///tmp/itss/transport"); - pthread_create(&facilities.handler, NULL, handler, (void*) &facilities); + facilities.den = calloc(1, sizeof(den_t)); + + pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities); + pthread_create(&facilities.den_service, NULL, den_service, (void*) &facilities); uint8_t buffer[512]; uint8_t code; syslog_info("[facilities] listening"); - while (1) { + while (!facilities.exit) { zmq_recv(facilities.responder, buffer, 512, 0); switch (buffer[0]) { diff --git a/src/facilities.h b/src/facilities.h index 2ed25e0..529d35c 100644 --- a/src/facilities.h +++ b/src/facilities.h @@ -4,14 +4,20 @@ #include #include +#include "denm.h" + typedef struct facilities { - pthread_t handler; + pthread_t ca_service; + pthread_t den_service; // ZMQ void* responder; void* transport_socket; void* app_socket; + // DENM + den_t *den; + bool exit; } facilities_t;