it2s-itss-facilities/src/denm.c

440 lines
15 KiB
C

#include "denm.h"
#include "facilities.h"
#include <unistd.h>
#include <it2s-tender/time.h>
#include <it2s-tender/space.h>
/*
 * Cause code -> SSP permission bit table.
 *
 * Each entry holds, in order: a textual label for the cause code, the numeric
 * cause code, and the 32-bit mask carrying the single permission bit assigned
 * to that cause code. Bits are handed out from the most significant bit
 * downwards; the lowest byte of the mask is not used by any entry.
 *
 * permissions_check() looks an entry up by its numeric cause code and tests
 * the entry's mask against the permission bytes of the received SSP.
 */
const cc_ssp_bm_t CC_SSP_BM_MAP[] = {
{"trafficCondition", 1, 0x80000000},
{"accident", 2, 0x40000000},
{"roadworks", 3, 0x20000000},
{"adverseWeatherCondition-Adhesion", 6, 0x10000000},
{"hazardousLocation-SurfaceCondition", 9, 0x08000000},
{"hazardousLocation-ObstacleOnTheRoad", 10, 0x04000000},
{"hazardousLocation-AnimalOnTheRoad", 11, 0x02000000},
{"humanPresenceOnTheRoad", 12, 0x01000000},
{"wrongWayDriving", 14, 0x00800000},
{"rescueAndRecoveryWorkInProgress", 15, 0x00400000},
{"adverseWeatherCondition-ExtremeWeatherCondition", 17, 0x00200000},
{"adverseWeatherCondition-Visibility", 18, 0x00100000},
{"adverseWeatherCondition-Precipitation", 19, 0x00080000},
{"slowVehicle", 26, 0x00040000},
{"dangerousEndOfQueue", 27, 0x00020000},
{"vehicleBreakdown", 91, 0x00010000},
{"postCrash", 92, 0x00008000},
{"humanProblem", 93, 0x00004000},
{"stationaryVehicle", 94, 0x00002000},
{"emergencyVehicleApproaching", 95, 0x00001000},
{"hazardousLocation-DangerousCurve", 96, 0x00000800},
{"collisionRisk", 97, 0x00000400},
{"signalViolation", 98, 0x00000200},
{"dangerousSituation", 99, 0x00000100},
};
/*
 * Check whether the SSP (service specific permissions) of the issuing ticket
 * allows sending a DENM with the given cause code.
 *
 * DENM SSP scheme
 *
 *  byte | description
 * ---------------------------------
 *  0    | SSP version control
 *  1-3  | Service-specific parameter
 *  4-40 | Reserved
 *
 * cause_code       numeric cause code of the received event
 * permissions      SSP bytes; at least 4 bytes are read
 * permissions_len  number of valid bytes in `permissions`
 *
 * Returns 1 when the permission bit of the cause code is set, 0 otherwise
 * (SSP too short, cause code without an entry in CC_SSP_BM_MAP, or bit clear).
 */
static int permissions_check(int cause_code, uint8_t* permissions, uint8_t permissions_len) {
    if (permissions_len < 4) {
        log_debug("[den] permissions length too small");
        return 0;
    }

    /* Assemble bytes 1..3 MSB-first so they line up with the table masks.
     * Done byte by byte: the buffer carries no alignment guarantee and the
     * result must not depend on host endianness. */
    const uint32_t granted = ((uint32_t) permissions[1] << 24) |
                             ((uint32_t) permissions[2] << 16) |
                             ((uint32_t) permissions[3] << 8);

    const size_t n_entries = sizeof(CC_SSP_BM_MAP) / sizeof(CC_SSP_BM_MAP[0]);
    for (size_t i = 0; i < n_entries; ++i) {
        if (cause_code == CC_SSP_BM_MAP[i].cause_code) {
            const uint32_t required = (uint32_t) CC_SSP_BM_MAP[i].bitmap_val;
            return (granted & required) == required;
        }
    }

    /* No permission bit is defined for this cause code: deny rather than
     * evaluate an undefined mask. */
    return 0;
}
/*
 * Classify a received DENM against the local event table without modifying it.
 *
 * denm     decoded DENM to classify
 * ssp      SSP bytes of the issuing ticket, or NULL to skip the permission check
 * ssp_len  number of bytes in `ssp`
 *
 * Returns:
 *   EVENT_INVALID          timestamps cannot be decoded, or detectionTime is
 *                          later than referenceTime
 *   EVENT_BAD_PERMISSIONS  the SSP does not allow the event's cause code
 *   EVENT_PASSED           detectionTime + validity is already in the past
 *   EVENT_CANCELLATION /
 *   EVENT_NEGATION /
 *   EVENT_UPDATE           a stored event with the same actionID has an older
 *                          referenceTime (kind chosen by the termination field)
 *   EVENT_REPEATED         same actionID stored, referenceTime not newer
 *   EVENT_NUMBER_EXCEEDED  unknown actionID and no free slot
 *   EVENT_NEW              unknown actionID and a free slot exists
 */
static enum EVENT_CHECK_R event_check(DENM_t *denm, uint8_t* ssp, uint32_t ssp_len) {
    den_t* den = &facilities.den;

    /* A failed conversion would leave the timestamps undefined, so treat it
     * as an invalid event instead of comparing garbage. */
    uint64_t e_detection_time = 0;
    uint64_t e_reference_time = 0;
    if (asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, (unsigned long long*) &e_detection_time) != 0 ||
        asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, (unsigned long long*) &e_reference_time) != 0) {
        return EVENT_INVALID;
    }

    if (e_detection_time > e_reference_time) {
        return EVENT_INVALID;
    }

    // Check if event cause code type is permitted by the issuing ticket
    if (ssp && denm->denm.situation) {
        /* permissions_check() takes an 8-bit length: clamp instead of letting
         * the value wrap (e.g. 256 would otherwise become 0). */
        uint8_t perm_len = ssp_len > 0xff ? 0xff : (uint8_t) ssp_len;
        if (!permissions_check(denm->denm.situation->eventType.causeCode, ssp, perm_len)) {
            return EVENT_BAD_PERMISSIONS;
        }
    }

    /* NOTE(review): the field is read through a uint32_t pointer as in the
     * other functions of this file - confirm this matches its declared type. */
    uint32_t e_validity_duration;
    if (denm->denm.management.validityDuration != NULL) {
        e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds
    } else {
        e_validity_duration = 600 * 1000; // default used when the field is absent
    }

    uint64_t now = itss_time_get();
    if (e_detection_time + e_validity_duration < now) {
        return EVENT_PASSED;
    }

    bool is_new = true;
    bool is_update = false;
    bool exists_vacancy = false;

    pthread_mutex_lock(&den->lock);

    /* Look for a stored event with the same actionID (station id + sequence number). */
    for (int i = 0; i < den->n_max_events; ++i) {
        if (den->events[i]->enabled) {
            if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID &&
                den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) {
                is_new = false;
                if (den->events[i]->reference_time < e_reference_time) is_update = true;
                break;
            }
        }
    }

    /* A new event additionally needs a free slot. */
    if (is_new) {
        for (int i = 0; i < den->n_max_events; ++i) {
            if (!den->events[i]->enabled) {
                exists_vacancy = true;
                break;
            }
        }
    }

    pthread_mutex_unlock(&den->lock);

    if (is_update) {
        if (denm->denm.management.termination != NULL) {
            if (*denm->denm.management.termination == 0) {
                return EVENT_CANCELLATION;
            } else if (*denm->denm.management.termination == 1) {
                return EVENT_NEGATION;
            }
        }
        return EVENT_UPDATE;
    }

    if (!is_new) {
        return EVENT_REPEATED;
    }

    if (!exists_vacancy) {
        return EVENT_NUMBER_EXCEEDED;
    }

    return EVENT_NEW;
}
/*
 * Store a new event in the first free slot of the event table.
 *
 * denm  decoded DENM; on success the table keeps this pointer, on failure the
 *       caller remains responsible for releasing it
 * id    identifier to record for the event (read, not written)
 *
 * Returns 0 when the event was stored, 1 when it was not (no free slot, or
 * the DENM timestamps could not be decoded).
 */
static int event_add(DENM_t *denm, uint64_t* id) {
    den_t* den = &facilities.den;

    uint64_t e_detection_time = 0;
    uint64_t e_reference_time = 0;
    if (asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, (unsigned long long*) &e_detection_time) != 0 ||
        asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, (unsigned long long*) &e_reference_time) != 0) {
        return 1;
    }

    uint32_t e_validity_duration;
    if (denm->denm.management.validityDuration != NULL) {
        e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds
    } else {
        e_validity_duration = 600 * 1000; // default used when the field is absent
    }

    /* The termination field decides the state the event is stored with. */
    uint8_t state = EVENT_ACTIVE;
    if (denm->denm.management.termination != NULL) {
        if (*(denm->denm.management.termination) == 0) {
            state = EVENT_CANCELLED;
        } else if (*(denm->denm.management.termination) == 1) {
            state = EVENT_NEGATED;
        }
    }

    pthread_mutex_lock(&den->lock);

    int index = -1;
    for (int i = 0; i < den->n_max_events; ++i) {
        if (!den->events[i]->enabled) {
            index = i;
            break;
        }
    }

    if (index != -1) {
        /* Count the event only once it really occupies a slot, otherwise the
         * per-state counters drift when the table is full. */
        switch (state) {
            case EVENT_ACTIVE:
                ++den->n_active_events;
                break;
            case EVENT_CANCELLED:
                ++den->n_cancelled_events;
                break;
            case EVENT_NEGATED:
                ++den->n_negated_events;
                break;
        }

        den->events[index]->id = *id;
        den->events[index]->enabled = true;
        den->events[index]->state = state;
        den->events[index]->station_id = denm->denm.management.actionID.originatingStationID;
        den->events[index]->sn = denm->denm.management.actionID.sequenceNumber;
        den->events[index]->detection_time = e_detection_time;
        den->events[index]->reference_time = e_reference_time;
        den->events[index]->expiration_time = e_detection_time + e_validity_duration;
        den->events[index]->latitude = denm->denm.management.eventPosition.latitude;
        den->events[index]->longitude = denm->denm.management.eventPosition.longitude;
        den->events[index]->denm = denm;

        /* Track the highest sequence number seen so far. */
        if (denm->denm.management.actionID.sequenceNumber > den->sn) {
            den->sn = denm->denm.management.actionID.sequenceNumber;
        }
    }

    pthread_mutex_unlock(&den->lock);

    if (index == -1) return 1; // Max events reached
    else return 0; // Event added to db
}
/*
 * Replace the stored event that has the same actionID as `denm`.
 *
 * The stored state, timestamps, position and DENM are overwritten; the
 * previously stored DENM is released. Cancelled or negated events are kept
 * for 10 more minutes counted from now.
 *
 * denm  decoded DENM; on success the table keeps this pointer, on failure the
 *       caller remains responsible for releasing it
 * id    receives the identifier of the stored event on success
 *
 * Returns 0 when the event was updated, 1 when it was not (no stored event
 * with that actionID, or the DENM timestamps could not be decoded).
 */
static int event_update(DENM_t *denm, uint64_t* id) {
    den_t* den = &facilities.den;

    uint64_t now = itss_time_get();

    uint64_t e_detection_time = 0;
    uint64_t e_reference_time = 0;
    if (asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, (unsigned long long*) &e_detection_time) != 0 ||
        asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, (unsigned long long*) &e_reference_time) != 0) {
        return 1;
    }

    uint8_t state = EVENT_ACTIVE;
    if (denm->denm.management.termination != NULL) {
        if (*(denm->denm.management.termination) == 0) {
            state = EVENT_CANCELLED;
        } else if (*(denm->denm.management.termination) == 1) {
            state = EVENT_NEGATED;
        }
    }

    int index = -1;

    pthread_mutex_lock(&den->lock);

    for (int i = 0; i < den->n_max_events; ++i) {
        if (den->events[i]->enabled) {
            if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID &&
                den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) {
                index = i;
                break;
            }
        }
    }

    /* Everything below touches events[index]; it must not run when the
     * lookup failed, otherwise events[-1] would be dereferenced. */
    if (index != -1) {
        /* Move the event from its old state counter to the new one. */
        switch (den->events[index]->state) {
            case EVENT_ACTIVE:
                --den->n_active_events;
                break;
            case EVENT_CANCELLED:
                --den->n_cancelled_events;
                break;
            case EVENT_NEGATED:
                --den->n_negated_events;
                break;
        }
        switch (state) {
            case EVENT_ACTIVE:
                ++den->n_active_events;
                break;
            case EVENT_CANCELLED:
                ++den->n_cancelled_events;
                break;
            case EVENT_NEGATED:
                ++den->n_negated_events;
                break;
        }

        *id = den->events[index]->id;
        den->events[index]->state = state;
        den->events[index]->detection_time = e_detection_time;
        den->events[index]->reference_time = e_reference_time;
        if (denm->denm.management.validityDuration != NULL) {
            den->events[index]->expiration_time = e_detection_time + *(uint32_t *) denm->denm.management.validityDuration * 1000;
        }
        if (state == EVENT_CANCELLED || state == EVENT_NEGATED) { // Keep terminated events for 10 mins
            den->events[index]->expiration_time = now + 600 * 1000;
        }
        den->events[index]->latitude = denm->denm.management.eventPosition.latitude;
        den->events[index]->longitude = denm->denm.management.eventPosition.longitude;

        /* The table owns the stored DENM: release the old one before
         * taking over the new one. */
        ASN_STRUCT_FREE(asn_DEF_DENM, den->events[index]->denm);
        den->events[index]->denm = denm;
    }

    pthread_mutex_unlock(&den->lock);

    if (index == -1) return 1; // Event not found
    else return 0; // Event updated
}
/*
 * Classify a received DENM and apply the outcome to the event table.
 *
 * denm     decoded DENM; kept by the table when it is added or used as an
 *          update, released here in every other case
 * id       identifier passed on to event_add() / event_update()
 * ssp      SSP bytes of the issuing ticket (may be NULL)
 * ssp_len  number of bytes in `ssp`
 *
 * Returns the classification produced by event_check(), except that a failed
 * add yields -1 and a failed update/cancellation/negation yields
 * EVENT_NUMBER_EXCEEDED.
 */
enum EVENT_CHECK_R event_manage(DENM_t *denm, uint64_t* id, uint8_t* ssp, uint32_t ssp_len) {
    int outcome = event_check(denm, ssp, ssp_len);
    int release_denm = 0; /* set whenever the table did not take ownership */

    switch (outcome) {
        /* Outcomes that try to hand the DENM over to the table. */
        case EVENT_NEW:
            log_debug("[den] new event received");
            if (event_add(denm, id) != 0) {
                log_debug("[den] failed adding event, max events reached");
                release_denm = 1;
                outcome = -1;
            }
            break;

        case EVENT_CANCELLATION:
            log_debug("[den] event cancellation received");
            if (event_update(denm, id) != 0) {
                log_debug("[den] failed cancelling event, event not found");
                release_denm = 1;
                outcome = EVENT_NUMBER_EXCEEDED;
            }
            break;

        case EVENT_NEGATION:
            log_debug("[den] event negation received");
            if (event_update(denm, id) != 0) {
                log_debug("[den] failed negating event, event not found");
                release_denm = 1;
                outcome = EVENT_NUMBER_EXCEEDED;
            }
            break;

        case EVENT_UPDATE:
            log_debug("[den] event update received");
            if (event_update(denm, id) != 0) {
                log_debug("[den] failed updating event, event not found");
                release_denm = 1;
                outcome = EVENT_NUMBER_EXCEEDED;
            }
            break;

        /* Outcomes where the DENM is simply dropped. */
        case EVENT_INVALID:
            log_debug("[den] invalid event received, ignoring");
            release_denm = 1;
            break;

        case EVENT_PASSED:
            log_debug("[den] old event received, ignoring");
            release_denm = 1;
            break;

        case EVENT_REPEATED:
            log_debug("[den] repeated event received or referenceTime doesn't allow an update, ignoring");
            release_denm = 1;
            break;

        case EVENT_NUMBER_EXCEEDED:
            log_debug("[den] max events reached, ignoring");
            release_denm = 1;
            break;

        case EVENT_BAD_PERMISSIONS:
            log_debug("[den] permisisons check failed for the received event, ignoring");
            release_denm = 1;
            break;
    }

    if (release_denm) {
        ASN_STRUCT_FREE(asn_DEF_DENM, denm);
    }

    return outcome;
}
/*
 * Initialise the DEN service. Currently performs no work.
 *
 * Returns 0.
 */
int den_init() {
    const int status = 0;
    return status;
}
void* den_service() {
uint64_t now;
den_t *den = &facilities.den;
pthread_mutex_init(&den->lock, NULL);
den->events = malloc(den->n_max_events * sizeof(void*));
for (int i = 0; i < den->n_max_events; ++i) {
den->events[i] = calloc(1, sizeof(event_t));
}
uint32_t sleep4us = 1e6;
uint32_t sleep_count = 0;
while (!facilities.exit) {
now = itss_time_get();
pthread_mutex_lock(&den->lock);
for (int i = 0; i < den->n_max_events; ++i) {
if (den->events[i]->enabled) {
if (now >= den->events[i]->expiration_time) { // Remove event
log_debug("[den] removed event %d (expiration)", i);
den->events[i]->enabled = false;
ASN_STRUCT_FREE(asn_DEF_DENM, den->events[i]->denm);
switch (den->events[i]->state) {
case EVENT_ACTIVE:
--den->n_active_events;
break;
case EVENT_CANCELLED:
--den->n_cancelled_events;
break;
case EVENT_NEGATED:
--den->n_negated_events;
break;
}
} else {
switch (den->events[i]->state) {
case EVENT_ACTIVE:
log_debug("[den] event %d expiring in %ld second(s)", i, (den->events[i]->expiration_time - now)/1000);
break;
case EVENT_CANCELLED:
break;
case EVENT_NEGATED:
break;
}
}
}
}
if (sleep4us * sleep_count > 5e6) { /* Print info every 5 seconds */
log_info("[den] events :: [ %d active | %d cancelled | %d negated ]", den->n_active_events, den->n_cancelled_events, den->n_negated_events);
sleep_count = 0;
}
pthread_mutex_unlock(&den->lock);
++sleep_count;
usleep(sleep4us);
}
return NULL;
}