Added DENM mgmt

This commit is contained in:
emanuel 2020-10-12 19:22:22 +01:00
parent 0aa785b088
commit 817bacca16
7 changed files with 406 additions and 57 deletions

View File

@ -1,5 +1,6 @@
ADD_EXECUTABLE(it2s-itss-facilities ADD_EXECUTABLE(it2s-itss-facilities
cam.c cam.c
denm.c
facilities.c facilities.c
) )

View File

@ -1,11 +1,14 @@
#include "cam.h" #include "cam.h"
#include "facilities.h"
#include <itss-transport/BTPDataRequest.h>
#include <camv2/CAM.h>
#include <it2s-gps.h>
#include <stdint.h> #include <stdint.h>
#include <time.h> #include <time.h>
#include <zmq.h>
#include <camv2/CAM.h> #include <unistd.h>
//#include <it2s-gps.h>
#include <syslog.h> #include <syslog.h>
#define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__) #define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__)
@ -51,12 +54,12 @@ static SpeedConfidence_t getSpeedConfidence(uint32_t conf) {
} }
int mk_cam(uint8_t *cam, uint32_t *cam_len) { static int mk_cam(uint8_t *cam, uint32_t *cam_len) {
int rv = 0; int rv = 0;
CAM_t *cam_tx = calloc(1, sizeof(CAM_t)); CAM_t *cam_tx = calloc(1, sizeof(CAM_t));
cam_tx->header.protocolVersion = protocolVersion_currentVersion; cam_tx->header.protocolVersion = 2;
cam_tx->header.messageID = ItsPduHeader__messageID_cam; cam_tx->header.messageID = ItsPduHeader__messageID_cam;
cam_tx->header.stationID = 0; cam_tx->header.stationID = 0;
cam_tx->cam.camParameters.basicContainer.stationType = StationType_passengerCar; cam_tx->cam.camParameters.basicContainer.stationType = StationType_passengerCar;
@ -160,3 +163,51 @@ cleanup:
return rv; return rv;
} }
void *ca_service(void *fc) {
int rv = 0;
uint8_t code = 0;
facilities_t *facilities = (facilities_t*) fc;
BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t));
bdr->btpType = BTPType_btpB;
bdr->destinationAddress.buf = malloc(6);
for (int i = 0; i < 6; ++i) {bdr->destinationAddress.buf[i] = 0xff;}
bdr->destinationAddress.size = 6;
bdr->packetTransportType = PacketTransportType_shb;
bdr->destinationPort = Port_cam;
bdr->trafficClass = 2;
bdr->data.buf = malloc(256);
uint8_t bdr_oer[256];
bdr_oer[0] = 4; // Facilities
while (!facilities->exit) {
rv = mk_cam(bdr->data.buf, (uint32_t *) &bdr->data.size);
if (rv) {
syslog_err("[facilities] make cam failed (%d)", rv);
continue;
}
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_BTPDataRequest, NULL, bdr, bdr_oer+1, 255);
if (enc.encoded == -1) {
syslog_err("[facilities] encoding BTPDataRequest for cam failed");
continue;
}
syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded);
zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0);
zmq_recv(facilities->transport_socket, &code, 1, 0);
sleep(1);
}
ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr);
return NULL;
}

View File

@ -3,6 +3,6 @@
#include <stdint.h> #include <stdint.h>
int mk_cam(uint8_t *cam, uint32_t *cam_len); void *ca_service(void *fc);
#endif #endif

280
src/denm.c Normal file
View File

@ -0,0 +1,280 @@
#include "denm.h"
#include "facilities.h"
#include <unistd.h>
#include <syslog.h>
#define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__)
#define syslog_emerg(msg, ...) syslog(LOG_EMERG, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__)
#define syslog_err(msg, ...) syslog(LOG_ERR, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__)
#ifndef NDEBUG
#define syslog_debug(msg, ...) syslog(LOG_DEBUG, "%s:%d " msg "", __func__, __LINE__, ##__VA_ARGS__)
#else
#define syslog_debug(msg, ...)
#endif
enum EVENT_CHECK_RESULT event_check(den_t *den, DENM_t *denm) {
int rv = 0;
uint64_t e_detection_time, e_reference_time;
asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time);
asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time);
if (denm->denm.situation == NULL) {
return EVENT_INVALID; // Ignore situationless events
}
if (e_detection_time > e_reference_time) {
return EVENT_INVALID;
}
struct timespec systemtime;
clock_gettime(CLOCK_REALTIME, &systemtime);
long now = (long) (systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6);
now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000
uint32_t e_validity_duration;
if (denm->denm.management.validityDuration != NULL) {
e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds
} else {
e_validity_duration = 30000;
}
if (e_detection_time + e_validity_duration < now) {
return EVENT_PASSED;
}
bool is_new = true;
bool is_update = false;
bool exists_vacancy = false;
pthread_mutex_lock(&den->lock);
for (int i = 0; i < den->no_max_events; ++i) {
if (den->events[i]->enabled) {
if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID &&
den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) {
is_new = false;
if (den->events[i]->reference_time < e_reference_time ||
den->events[i]->detection_time < e_detection_time) {
is_update = true;
}
break;
}
}
}
if (is_new) {
for (int i = 0; i < den->no_max_events; ++i) {
if (!den->events[i]->enabled) {
exists_vacancy = true;
break;
}
}
}
pthread_mutex_unlock(&den->lock);
if (denm->denm.management.termination != NULL) {
if (*denm->denm.management.termination == 0) {
return EVENT_CANCELLATION;
} else if (*denm->denm.management.termination == 1) {
return EVENT_NEGATION;
}
}
if (is_update) {
return EVENT_UPDATE;
}
if (!is_new) {
return EVENT_REPEATED;
}
if (!exists_vacancy) {
return EVENT_NUMBER_EXCEEDED;
}
return EVENT_NEW;
}
int event_add(den_t *den, DENM_t *denm) {
struct timespec systemtime;
clock_gettime(CLOCK_REALTIME, &systemtime);
long now = (long) (systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6);
now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000
uint64_t e_detection_time, e_reference_time;
asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time);
asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time);
uint32_t e_validity_duration;
if (denm->denm.management.validityDuration != NULL) {
e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds
} else {
e_validity_duration = 30000;
}
pthread_mutex_lock(&den->lock);
int index = -1;
for (int i = 0; i < den->no_max_events; ++i) {
if (!den->events[i]->enabled) {
index = i;
break;
}
}
uint8_t state = EVENT_ACTIVE;
if (denm->denm.management.termination != NULL) {
if (*(denm->denm.management.termination) == 0) {
state = EVENT_CANCELLED;
} else if (*(denm->denm.management.termination) == 1) {
state = EVENT_NEGATION;
}
}
if (index != -1) {
den->events[index]->enabled = true;
den->events[index]->state = state;
den->events[index]->station_id = denm->denm.management.actionID.originatingStationID;
den->events[index]->sn = denm->denm.management.actionID.sequenceNumber;
den->events[index]->cause = denm->denm.situation->eventType.causeCode;
den->events[index]->subcause = denm->denm.situation->eventType.subCauseCode;
den->events[index]->detection_time = e_detection_time;
den->events[index]->reference_time = e_reference_time;
den->events[index]->expiration_time = e_detection_time + e_validity_duration;
den->events[index]->latitude = denm->denm.management.eventPosition.latitude;
den->events[index]->longitude = denm->denm.management.eventPosition.longitude;
den->events[index]->denm = denm;
if (denm->denm.management.actionID.sequenceNumber > den->sn) {
den->sn = denm->denm.management.actionID.sequenceNumber;
}
}
pthread_mutex_unlock(&den->lock);
if (index == -1) return 1; // Max events reached
else return 0; // Event added to db
}
int event_update(den_t *den, DENM_t *denm) {
struct timespec systemtime;
clock_gettime(CLOCK_REALTIME, &systemtime);
long now = (long) (systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6);
now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000
uint64_t e_detection_time, e_reference_time;
asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time);
asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time);
uint8_t state = EVENT_ACTIVE;
if (denm->denm.management.termination != NULL) {
if (*(denm->denm.management.termination) == 0) {
state = EVENT_CANCELLED;
} else if (*(denm->denm.management.termination) == 1) {
state = EVENT_NEGATION;
}
}
int index = -1;
pthread_mutex_lock(&den->lock);
for (int i = 0; i < den->no_max_events; ++i) {
if (den->events[i]->enabled) {
if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID &&
den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) {
index = i;
break;
}
}
}
if (index != -1) {
den->events[index]->state = state;
den->events[index]->detection_time = e_detection_time;
den->events[index]->cause = denm->denm.situation->eventType.causeCode;
den->events[index]->subcause = denm->denm.situation->eventType.subCauseCode;
den->events[index]->reference_time = e_reference_time;
if (denm->denm.management.validityDuration != NULL) {
den->events[index]->expiration_time = e_detection_time + *(uint32_t *) denm->denm.management.validityDuration * 1000;
}
den->events[index]->latitude = denm->denm.management.eventPosition.latitude;
den->events[index]->longitude = denm->denm.management.eventPosition.longitude;
ASN_STRUCT_FREE(asn_DEF_DENM, den->events[index]->denm);
den->events[index]->denm = denm;
}
pthread_mutex_unlock(&den->lock);
if (index == -1) return 1; // Event not found
else return 0; // Event updated
}
void* den_service(void *fc) {
facilities_t *facilities = (facilities_t *) fc;
struct timespec systemtime;
uint64_t now;
int active, cancelled, negated;
den_t *den = facilities->den;
pthread_mutex_init(&den->lock, NULL);
den->no_max_events = 32;
den->events = malloc(den->no_max_events * sizeof(void*));
for (int i = 0; i < den->no_max_events; ++i) {
den->events[i] = calloc(1, sizeof(event_t));
}
while (!facilities->exit) {
clock_gettime(CLOCK_REALTIME, &systemtime);
now = (long)(systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6);
now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000
active = 0, cancelled = 0, negated = 0;
pthread_mutex_lock(&den->lock);
for (int i = 0; i < den->no_max_events; ++i) {
if (den->events[i]->enabled) {
if (now >= den->events[i]->expiration_time) { // Remove event
den->events[i]->enabled = false;
ASN_STRUCT_FREE(asn_DEF_DENM, den->events[i]->denm);
} else {
switch (den->events[i]->state) {
case EVENT_ACTIVE:
++active;
syslog_debug("[facilities] [den] event %d expiring in %ld second(s)", i, (den->events[i]->expiration_time - now)/1000);
break;
case EVENT_CANCELLED:
++cancelled;
break;
case EVENT_NEGATED:
++negated;
break;
}
}
}
}
pthread_mutex_unlock(&den->lock);
syslog_debug("[facilities] [den] events : %d active %d : cancelled : %d negated", active, cancelled, negated);
usleep(1000000);
}
return NULL;
}

53
src/denm.h Normal file
View File

@ -0,0 +1,53 @@
#ifndef FACILITIES_DENM_H
#define FACILITIES_DENM_H
#include <stdint.h>
#include <denmv2/DENM.h>
#include <pthread.h>
#include <stdbool.h>
enum EVENT_STATE {
EVENT_ACTIVE,
EVENT_CANCELLED,
EVENT_NEGATED
};
typedef struct event {
DENM_t *denm;
uint32_t station_id;
uint32_t sn;
uint64_t detection_time;
uint64_t reference_time;
uint64_t expiration_time;
uint8_t cause;
uint8_t subcause;
uint32_t latitude;
uint32_t longitude;
bool enabled;
enum EVENT_STATE state;
} event_t;
typedef struct den {
event_t ** events;
uint32_t sn;
uint16_t no_active_events;
uint16_t no_max_events;
pthread_mutex_t lock;
} den_t;
enum EVENT_CHECK_RESULT {
EVENT_NEW,
EVENT_INVALID,
EVENT_PASSED,
EVENT_CANCELLATION,
EVENT_NEGATION,
EVENT_UPDATE,
EVENT_REPEATED,
EVENT_NUMBER_EXCEEDED
};
enum EVENT_CHECK_RESULT check_event(den_t *den, DENM_t *denm);
void* den_service(void *fc);
#endif

View File

@ -1,5 +1,6 @@
#include "facilities.h" #include "facilities.h"
#include "cam.h" #include "cam.h"
#include "denm.h"
#include <itss-transport/BTPDataRequest.h> #include <itss-transport/BTPDataRequest.h>
#include <itss-transport/BTPDataIndication.h> #include <itss-transport/BTPDataIndication.h>
@ -8,7 +9,6 @@
#include <camv2/CAM.h> #include <camv2/CAM.h>
#include <denmv2/DENM.h> #include <denmv2/DENM.h>
#include <itss-transport/per_decoder.h>
#include <ivim/IVIM.h> #include <ivim/IVIM.h>
#include <it2s-btp.h> #include <it2s-btp.h>
@ -97,56 +97,11 @@ cleanup:
return rv; return rv;
} }
static void* handler(void *fc) {
int rv = 0;
uint8_t code = 0;
facilities_t *facilities = (facilities_t*) fc;
BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t));
bdr->btpType = BTPType_btpB;
bdr->destinationAddress.buf = malloc(6);
for (int i = 0; i < 6; ++i) {bdr->destinationAddress.buf[i] = 0xff;}
bdr->destinationAddress.size = 6;
bdr->packetTransportType = PacketTransportType_shb;
bdr->destinationPort = Port_cam;
bdr->trafficClass = 2;
bdr->data.buf = malloc(256);
uint8_t bdr_oer[256];
bdr_oer[0] = 4; // Facilities
while (!facilities->exit) {
rv = mk_cam(bdr->data.buf, (uint32_t *) &bdr->data.size);
if (rv) {
syslog_err("[facilities] make cam failed (%d)", rv);
continue;
}
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_BTPDataRequest, NULL, bdr, bdr_oer+1, 255);
if (enc.encoded == -1) {
syslog_err("[facilities] encoding BTPDataRequest for cam failed");
continue;
}
syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded);
zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0);
zmq_recv(facilities->transport_socket, &code, 1, 0);
sleep(1);
}
ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr);
return NULL;
}
int main() { int main() {
syslog_info("[facilities] starting"); syslog_info("[facilities] starting");
facilities_t facilities; facilities_t facilities;
facilities.exit = false;
void *context = zmq_ctx_new(); void *context = zmq_ctx_new();
facilities.responder = zmq_socket(context, ZMQ_REP); facilities.responder = zmq_socket(context, ZMQ_REP);
@ -158,13 +113,16 @@ int main() {
facilities.transport_socket = zmq_socket(context, ZMQ_REQ); facilities.transport_socket = zmq_socket(context, ZMQ_REQ);
rc = zmq_connect(facilities.transport_socket, "ipc:///tmp/itss/transport"); rc = zmq_connect(facilities.transport_socket, "ipc:///tmp/itss/transport");
pthread_create(&facilities.handler, NULL, handler, (void*) &facilities); facilities.den = calloc(1, sizeof(den_t));
pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities);
pthread_create(&facilities.den_service, NULL, den_service, (void*) &facilities);
uint8_t buffer[512]; uint8_t buffer[512];
uint8_t code; uint8_t code;
syslog_info("[facilities] listening"); syslog_info("[facilities] listening");
while (1) { while (!facilities.exit) {
zmq_recv(facilities.responder, buffer, 512, 0); zmq_recv(facilities.responder, buffer, 512, 0);
switch (buffer[0]) { switch (buffer[0]) {

View File

@ -4,14 +4,20 @@
#include <pthread.h> #include <pthread.h>
#include <stdbool.h> #include <stdbool.h>
#include "denm.h"
typedef struct facilities { typedef struct facilities {
pthread_t handler; pthread_t ca_service;
pthread_t den_service;
// ZMQ // ZMQ
void* responder; void* responder;
void* transport_socket; void* transport_socket;
void* app_socket; void* app_socket;
// DENM
den_t *den;
bool exit; bool exit;
} facilities_t; } facilities_t;