#include "denm.h" #include "facilities.h" #include #include #include #include #define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__) #define syslog_emerg(msg, ...) syslog(LOG_EMERG, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) #define syslog_err(msg, ...) syslog(LOG_ERR, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) #ifndef NDEBUG #define syslog_debug(msg, ...) syslog(LOG_DEBUG, "%s:%d " msg "", __func__, __LINE__, ##__VA_ARGS__) #else #define syslog_debug(msg, ...) #endif const cc_ssp_bm_t CC_SSP_BM_MAP[] = { {"trafficCondition", 1, 0x80000000}, {"accident", 2, 0x40000000}, {"roadworks", 3, 0x20000000}, {"adverseWeatherCondition-Adhesion", 6, 0x10000000}, {"hazardousLocation-SurfaceCondition", 9, 0x08000000}, {"hazardousLocation-ObstacleOnTheRoad", 10, 0x04000000}, {"hazardousLocation-AnimalOnTheRoad", 11, 0x02000000}, {"humanPresenceOnTheRoad", 12, 0x01000000}, {"wrongWayDriving", 14, 0x00800000}, {"rescueAndRecoveryWorkInProgress", 15, 0x00400000}, {"adverseWeatherCondition-ExtremeWeatherCondition", 17, 0x00200000}, {"adverseWeatherCondition-Visibility", 18, 0x00100000}, {"adverseWeatherCondition-Precipitation", 19, 0x00080000}, {"slowVehicle", 26, 0x00040000}, {"dangerousEndOfQueue", 27, 0x00020000}, {"vehicleBreakdown", 91, 0x00010000}, {"postCrash", 92, 0x00008000}, {"humanProblem", 93, 0x00004000}, {"stationaryVehicle", 94, 0x00002000}, {"emergencyVehicleApproaching", 95, 0x00001000}, {"hazardousLocation-DangerousCurve", 96, 0x00000800}, {"collisionRisk", 97, 0x00000400}, {"signalViolation", 98, 0x00000200}, {"dangerousSituation", 99, 0x00000100}, }; static int permissions_check(int cause_code, uint8_t* permissions, uint8_t permissions_len) { /* DENM SSP scheme * * byte | description * --------------------------------- * 0 | SSP version control * 1-3 | Service-specific parameter * 3-40 | Reserved */ if (permissions_len < 4) { syslog_debug("[facilities] [den] permissions length too small"); return 0; } uint32_t perms_int = (*((uint32_t*) permissions)) >> 8; uint32_t perm_val; for (int i = 0; i < 24; ++i) { if (cause_code == CC_SSP_BM_MAP[i].cause_code) { perm_val = CC_SSP_BM_MAP[i].bitmap_val; perm_val = ((perm_val>>24) & 0xff) | ((perm_val<<8) & 0xff0000) | ((perm_val>>8) & 0xff00) | ((perm_val<<24) & 0xff000000); break; } } if ((perm_val & perms_int) == perm_val) return 1; else return 0; } static enum EVENT_CHECK_R event_check(den_t *den, DENM_t *denm, it2s_tender_epv_t* epv, uint8_t* ssp, uint32_t ssp_len) { int rv = 0; uint64_t e_detection_time, e_reference_time; asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time); asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time); if (e_detection_time > e_reference_time) { return EVENT_INVALID; } // Check if event cause code type is permitted by the issuing ticket if (ssp && denm->denm.situation) { if (!permissions_check(denm->denm.situation->eventType.causeCode, ssp, ssp_len)) { return EVENT_BAD_PERMISSIONS; } } uint32_t e_validity_duration; if (denm->denm.management.validityDuration != NULL) { e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds } else { e_validity_duration = den->default_event_duration * 1000; } uint64_t now = it2s_tender_get_clock(epv); if (e_detection_time + e_validity_duration < now) { return EVENT_PASSED; } bool is_new = true; bool is_update = false; bool exists_vacancy = false; pthread_mutex_lock(&den->lock); for (int i = 0; i < den->n_max_events; ++i) { if (den->events[i]->enabled) { if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID && den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { is_new = false; if (den->events[i]->reference_time < e_reference_time) is_update = true; break; } } } if (is_new) { for (int i = 0; i < den->n_max_events; ++i) { if (!den->events[i]->enabled) { exists_vacancy = true; break; } } } pthread_mutex_unlock(&den->lock); if (is_update) { if (denm->denm.management.termination != NULL) { if (*denm->denm.management.termination == 0) { return EVENT_CANCELLATION; } else if (*denm->denm.management.termination == 1) { return EVENT_NEGATION; } } return EVENT_UPDATE; } if (!is_new) { return EVENT_REPEATED; } if (!exists_vacancy) { return EVENT_NUMBER_EXCEEDED; } return EVENT_NEW; } static int event_add(den_t *den, DENM_t *denm, it2s_tender_epv_t* epv, int64_t* id) { uint64_t now = it2s_tender_get_clock(epv); uint64_t e_detection_time, e_reference_time; asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time); asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time); uint32_t e_validity_duration; if (denm->denm.management.validityDuration != NULL) { e_validity_duration = *(uint32_t *) denm->denm.management.validityDuration * 1000; // validityDuration comes in seconds } else { e_validity_duration = den->default_event_duration * 1000; } pthread_mutex_lock(&den->lock); int index = -1; for (int i = 0; i < den->n_max_events; ++i) { if (!den->events[i]->enabled) { index = i; break; } } uint8_t state = EVENT_ACTIVE; if (denm->denm.management.termination != NULL) { if (*(denm->denm.management.termination) == 0) { state = EVENT_CANCELLED; } else if (*(denm->denm.management.termination) == 1) { state = EVENT_NEGATED; } } switch (state) { case EVENT_ACTIVE: ++den->n_active_events; break; case EVENT_CANCELLED: ++den->n_cancelled_events; break; case EVENT_NEGATED: ++den->n_negated_events; break; } if (index != -1) { den->events[index]->id = rand(); *id = den->events[index]->id; den->events[index]->enabled = true; den->events[index]->state = state; den->events[index]->station_id = denm->denm.management.actionID.originatingStationID; den->events[index]->sn = denm->denm.management.actionID.sequenceNumber; den->events[index]->detection_time = e_detection_time; den->events[index]->reference_time = e_reference_time; den->events[index]->expiration_time = e_detection_time + e_validity_duration; den->events[index]->latitude = denm->denm.management.eventPosition.latitude; den->events[index]->longitude = denm->denm.management.eventPosition.longitude; den->events[index]->denm = denm; if (denm->denm.management.actionID.sequenceNumber > den->sn) { den->sn = denm->denm.management.actionID.sequenceNumber; } } pthread_mutex_unlock(&den->lock); if (index == -1) return 1; // Max events reached else return 0; // Event added to db } static int event_update(den_t *den, DENM_t *denm, it2s_tender_epv_t* epv, int64_t* id) { uint64_t now = it2s_tender_get_clock(epv); uint64_t e_detection_time, e_reference_time; asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.detectionTime, &e_detection_time); asn_INTEGER2ulong((INTEGER_t*) &denm->denm.management.referenceTime, &e_reference_time); uint8_t state = EVENT_ACTIVE; if (denm->denm.management.termination != NULL) { if (*(denm->denm.management.termination) == 0) { state = EVENT_CANCELLED; } else if (*(denm->denm.management.termination) == 1) { state = EVENT_NEGATED; } } int index = -1; pthread_mutex_lock(&den->lock); for (int i = 0; i < den->n_max_events; ++i) { if (den->events[i]->enabled) { if (den->events[i]->station_id == denm->denm.management.actionID.originatingStationID && den->events[i]->sn == denm->denm.management.actionID.sequenceNumber) { index = i; break; } } } switch (den->events[index]->state) { case EVENT_ACTIVE: --den->n_active_events; break; case EVENT_CANCELLED: --den->n_cancelled_events; break; case EVENT_NEGATED: --den->n_negated_events; break; } switch (state) { case EVENT_ACTIVE: ++den->n_active_events; break; case EVENT_CANCELLED: ++den->n_cancelled_events; break; case EVENT_NEGATED: ++den->n_negated_events; break; } if (index != -1) { *id = den->events[index]->id; den->events[index]->state = state; den->events[index]->detection_time = e_detection_time; den->events[index]->reference_time = e_reference_time; if (denm->denm.management.validityDuration != NULL) { den->events[index]->expiration_time = e_detection_time + *(uint32_t *) denm->denm.management.validityDuration * 1000; } if (state == EVENT_CANCELLED || state == EVENT_NEGATED) { // Keep terminated events for 10 mins den->events[index]->expiration_time = now + 600 * 1000; } den->events[index]->latitude = denm->denm.management.eventPosition.latitude; den->events[index]->longitude = denm->denm.management.eventPosition.longitude; ASN_STRUCT_FREE(asn_DEF_DENM, den->events[index]->denm); den->events[index]->denm = denm; } pthread_mutex_unlock(&den->lock); if (index == -1) return 1; // Event not found else return 0; // Event updated } enum EVENT_CHECK_R event_manage(den_t *den, DENM_t *denm, it2s_tender_epv_t* epv, int64_t* id, uint8_t* ssp, uint32_t ssp_len) { int rv = 0; switch (rv = event_check(den, denm, epv, ssp, ssp_len)) { case EVENT_NEW: syslog_debug("[facilities] [den] new event received"); if (event_add(den, denm, epv, id)) { syslog_debug("[facilities] [den] failed adding event, max events reached"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); rv = -1; } break; case EVENT_INVALID: syslog_debug("[facilities] [den] invalid event received, ignoring"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); break; case EVENT_PASSED: syslog_debug("[facilities] [den] old event received, ignoring"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); break; case EVENT_CANCELLATION: syslog_debug("[facilities] [den] event cancellation received"); if (event_update(den, denm, epv, id)) { syslog_debug("[facilities] [den] failed cancelling event, event not found"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); rv = EVENT_NUMBER_EXCEEDED; } break; case EVENT_NEGATION: syslog_debug("[facilities] [den] event negation received"); if (event_update(den, denm, epv, id)) { syslog_debug("[facilities] [den] failed negating event, event not found"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); rv = EVENT_NUMBER_EXCEEDED; } break; case EVENT_UPDATE: syslog_debug("[facilities] [den] event update received"); if (event_update(den, denm, epv, id)) { syslog_debug("[facilities] [den] failed updating event, event not found"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); rv = EVENT_NUMBER_EXCEEDED; } break; case EVENT_REPEATED: syslog_debug("[facilities] [den] repeated event received or referenceTime doesn't allow an update, ignoring"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); break; case EVENT_NUMBER_EXCEEDED: syslog_debug("[facilities] [den] max events reached, ignoring"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); break; case EVENT_BAD_PERMISSIONS: syslog_debug("[facilities] [den] permisisons check failed for the received event, ignoring"); ASN_STRUCT_FREE(asn_DEF_DENM, denm); break; } return rv; } void* den_service(void *fc) { facilities_t *facilities = (facilities_t *) fc; uint64_t now; den_t *den = facilities->den; pthread_mutex_init(&den->lock, NULL); den->events = malloc(den->n_max_events * sizeof(void*)); for (int i = 0; i < den->n_max_events; ++i) { den->events[i] = calloc(1, sizeof(event_t)); } uint32_t sleep4us = 5e5; uint32_t sleep_count = 0; while (!facilities->exit) { now = it2s_tender_get_clock(&facilities->epv); pthread_mutex_lock(&den->lock); for (int i = 0; i < den->n_max_events; ++i) { if (den->events[i]->enabled) { if (now >= den->events[i]->expiration_time) { // Remove event syslog_debug("[facilities] [den] removed event %d (expiration)", i); den->events[i]->enabled = false; ASN_STRUCT_FREE(asn_DEF_DENM, den->events[i]->denm); switch (den->events[i]->state) { case EVENT_ACTIVE: --den->n_active_events; break; case EVENT_CANCELLED: --den->n_cancelled_events; break; case EVENT_NEGATED: --den->n_negated_events; break; } } else { switch (den->events[i]->state) { case EVENT_ACTIVE: syslog_debug("[facilities] [den] event %d expiring in %ld second(s)", i, (den->events[i]->expiration_time - now)/1000); break; case EVENT_CANCELLED: break; case EVENT_NEGATED: break; } } } } if (sleep4us * sleep_count > 5e6) { /* Print info every 5 seconds */ syslog_info("[facilities] [den] events :: [ %d active | %d cancelled | %d negated ]", den->n_active_events, den->n_cancelled_events, den->n_negated_events); sleep_count = 0; } pthread_mutex_unlock(&den->lock); ++sleep_count; usleep(sleep4us); } return NULL; }