Heartbeat support

This commit is contained in:
emanuel 2020-10-19 18:41:02 +01:00
parent 0518f44b9f
commit 96605b5c3a
4 changed files with 144 additions and 11 deletions

View File

@ -204,7 +204,7 @@ void *ca_service(void *fc) {
syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded); syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded);
zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0); zmq_send(facilities->transport_socket, bdr_oer, enc.encoded, 0);
zmq_recv(facilities->transport_socket, &code, 1, 0); zmq_recv(facilities->transport_socket, &code, 1, 0);
sleep(1000); sleep(1);
} }
ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr);

View File

@ -138,6 +138,17 @@ static int event_add(den_t *den, DENM_t *denm) {
} }
} }
switch (state) {
case EVENT_ACTIVE:
++den->no_active_events;
break;
case EVENT_CANCELLED:
++den->no_cancelled_events;
break;
case EVENT_NEGATED:
++den->no_negated_events;
break;
}
if (index != -1) { if (index != -1) {
den->events[index]->enabled = true; den->events[index]->enabled = true;
@ -197,6 +208,30 @@ static int event_update(den_t *den, DENM_t *denm) {
} }
} }
switch (den->events[index]->state) {
case EVENT_ACTIVE:
--den->no_active_events;
break;
case EVENT_CANCELLED:
--den->no_cancelled_events;
break;
case EVENT_NEGATED:
--den->no_negated_events;
break;
}
switch (state) {
case EVENT_ACTIVE:
++den->no_active_events;
break;
case EVENT_CANCELLED:
++den->no_cancelled_events;
break;
case EVENT_NEGATED:
++den->no_negated_events;
break;
}
if (index != -1) { if (index != -1) {
den->events[index]->state = state; den->events[index]->state = state;
den->events[index]->detection_time = e_detection_time; den->events[index]->detection_time = e_detection_time;
@ -283,7 +318,6 @@ void* den_service(void *fc) {
struct timespec systemtime; struct timespec systemtime;
uint64_t now; uint64_t now;
int active, cancelled, negated;
den_t *den = facilities->den; den_t *den = facilities->den;
pthread_mutex_init(&den->lock, NULL); pthread_mutex_init(&den->lock, NULL);
@ -300,34 +334,41 @@ void* den_service(void *fc) {
now = (long)(systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6); now = (long)(systemtime.tv_sec * 1000 + systemtime.tv_nsec / 1E6);
now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000 now = now - 1072915200000; // Convert EPOCH to 2004/01/01 00:00:000
active = 0, cancelled = 0, negated = 0;
pthread_mutex_lock(&den->lock); pthread_mutex_lock(&den->lock);
for (int i = 0; i < den->no_max_events; ++i) { for (int i = 0; i < den->no_max_events; ++i) {
if (den->events[i]->enabled) { if (den->events[i]->enabled) {
if (now >= den->events[i]->expiration_time) { // Remove event if (now >= den->events[i]->expiration_time) { // Remove event
den->events[i]->enabled = false; den->events[i]->enabled = false;
ASN_STRUCT_FREE(asn_DEF_DENM, den->events[i]->denm); ASN_STRUCT_FREE(asn_DEF_DENM, den->events[i]->denm);
switch (den->events[i]->state) {
case EVENT_ACTIVE:
--den->no_active_events;
break;
case EVENT_CANCELLED:
--den->no_cancelled_events;
break;
case EVENT_NEGATED:
--den->no_negated_events;
break;
}
} else { } else {
switch (den->events[i]->state) { switch (den->events[i]->state) {
case EVENT_ACTIVE: case EVENT_ACTIVE:
++active;
syslog_debug("[facilities] [den] event %d expiring in %ld second(s)", i, (den->events[i]->expiration_time - now)/1000); syslog_debug("[facilities] [den] event %d expiring in %ld second(s)", i, (den->events[i]->expiration_time - now)/1000);
break; break;
case EVENT_CANCELLED: case EVENT_CANCELLED:
++cancelled;
break; break;
case EVENT_NEGATED: case EVENT_NEGATED:
++negated;
break; break;
} }
} }
} }
} }
pthread_mutex_unlock(&den->lock);
syslog_debug("[facilities] [den] events : %d active %d : cancelled : %d negated", active, cancelled, negated); syslog_debug("[facilities] [den] events :: [ %d active | %d cancelled | %d negated ]", den->no_active_events, den->no_cancelled_events, den->no_negated_events);
pthread_mutex_unlock(&den->lock);
usleep(1000000); usleep(1000000);
} }

View File

@ -31,6 +31,9 @@ typedef struct den {
event_t ** events; event_t ** events;
uint32_t sn; uint32_t sn;
uint16_t no_stored_events; uint16_t no_stored_events;
uint16_t no_active_events;
uint16_t no_cancelled_events;
uint16_t no_negated_events;
uint16_t no_max_events; uint16_t no_max_events;
pthread_mutex_t lock; pthread_mutex_t lock;
} den_t; } den_t;

View File

@ -6,6 +6,7 @@
#include <itss-transport/BTPDataIndication.h> #include <itss-transport/BTPDataIndication.h>
#include <itss-facilities/FacilitiesDataIndication.h> #include <itss-facilities/FacilitiesDataIndication.h>
#include <itss-facilities/FacilitiesDataRequest.h> #include <itss-facilities/FacilitiesDataRequest.h>
#include <itss-facilities/FacilitiesDataResult.h>
#include <camv2/CAM.h> #include <camv2/CAM.h>
#include <denmv2/DENM.h> #include <denmv2/DENM.h>
@ -35,7 +36,7 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t
asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_BTPDataIndication, (void**) &bdi, msg, msg_len); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_BTPDataIndication, (void**) &bdi, msg, msg_len);
if (dec.code) { if (dec.code) {
syslog_err("[facilities] <- invalid BDI received"); syslog_err("[facilities] <- invalid bdi received");
rv = 1; rv = 1;
code = 1; code = 1;
zmq_send(facilities->responder, &code, 1, 0); zmq_send(facilities->responder, &code, 1, 0);
@ -85,7 +86,6 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t
event_manage(facilities->den, its_msg); event_manage(facilities->den, its_msg);
} }
// Forward to app // Forward to app
fdi = calloc(1, sizeof(FacilitiesDataIndication_t)); fdi = calloc(1, sizeof(FacilitiesDataIndication_t));
@ -110,6 +110,90 @@ cleanup:
return rv; return rv;
} }
static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t msg_len) {
int rv = 0;
int code = 0;
uint8_t *fdres_oer = NULL;
FacilitiesDataRequest_t *fdreq = calloc(1, sizeof(FacilitiesDataRequest_t));
FacilitiesDataResult_t *fdres = calloc(1, sizeof(FacilitiesDataResult_t));
asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_FacilitiesDataRequest, (void**) &fdreq, msg, msg_len);
if (dec.code) {
syslog_err("[facilities] <- invalid FDR received");
rv = 1;
code = 1;
goto cleanup;
}
switch (fdreq->present) {
case FacilitiesDataRequest_PR_singleMessage:
break;
case FacilitiesDataRequest_PR_activeEvents:
pthread_mutex_lock(&facilities->den->lock);
fdres->code = ResultCode_accepted;
fdres->events = calloc(1, sizeof(Events_t));
switch (fdreq->choice.activeEvents) {
case EventType_active:
fdres->events->list.count = facilities->den->no_active_events;
fdres->events->list.size = facilities->den->no_active_events * sizeof(DENM_t *);
fdres->events->list.array = malloc(facilities->den->no_active_events * sizeof(DENM_t *));
for (int i = 0, j = 0; i < facilities->den->no_active_events; ++i) {
if (facilities->den->events[i]->state == EVENT_ACTIVE) {
fdres->events->list.array[j]->itssMessageType = ItssMessageType_denm;
fdres->events->list.array[j]->data.buf = malloc(512);
asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_DENM, NULL, facilities->den->events[i]->denm, fdres->events->list.array[j]->data.buf, 512);
if (enc.encoded == -1) {
syslog_err("[facilities] failed encoding DENM for FDResult");
continue;
}
fdres->events->list.array[j]->data.size = (enc.encoded + 7) / 8;
++j;
}
}
break;
default:
syslog_err("[facilities] unrecognized FDR event type");
pthread_mutex_unlock(&facilities->den->lock);
rv = 1;
goto cleanup;
}
fdres_oer = malloc(4096);
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_DENM, NULL, fdres, fdres_oer, 4096);
if (enc.encoded == -1) {
syslog_err("[facilities] failed encoding FDResult");
pthread_mutex_unlock(&facilities->den->lock);
rv = 1;
goto cleanup;
}
zmq_send(facilities->responder, fdres_oer, enc.encoded, 0);
pthread_mutex_unlock(&facilities->den->lock);
break;
default:
syslog_err("[facilities] unrecognized FDR type received");
rv = 1;
goto cleanup;
}
cleanup:
if (rv) {
fdres->code = ResultCode_rejected;
uint8_t fdres_oer[32];
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataResult, NULL, fdres, fdres_oer, 32);
zmq_send(facilities->responder, fdres_oer, enc.encoded, 0);
}
free(fdres_oer);
ASN_STRUCT_FREE(asn_DEF_FacilitiesDataResult, fdres);
ASN_STRUCT_FREE(asn_DEF_FacilitiesDataRequest, fdreq);
return rv;
}
int main() { int main() {
syslog_info("[facilities] starting"); syslog_info("[facilities] starting");
@ -128,7 +212,10 @@ int main() {
facilities.den = calloc(1, sizeof(den_t)); facilities.den = calloc(1, sizeof(den_t));
// CA
pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities); pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities);
// DEN
pthread_create(&facilities.den_service, NULL, den_service, (void*) &facilities); pthread_create(&facilities.den_service, NULL, den_service, (void*) &facilities);
uint8_t buffer[512]; uint8_t buffer[512];
@ -142,6 +229,8 @@ int main() {
case 3: case 3:
code = transport_indication(&facilities, buffer+1, 511); code = transport_indication(&facilities, buffer+1, 511);
break; break;
case 5:
code = facilities_request(&facilities, buffer+1, 511);
default: default:
syslog_debug("[facilities] unrecognized service"); syslog_debug("[facilities] unrecognized service");
continue; continue;