From 931cef50d0dd0dc4b06c70ab36666716e528f385 Mon Sep 17 00:00:00 2001 From: emanuel Date: Mon, 7 Mar 2022 17:13:57 +0000 Subject: [PATCH] Management logging --- src/cam.c | 22 +++++++++++++++++++-- src/config.c | 6 +++++- src/cpm.c | 22 +++++++++++++++++++-- src/facilities.c | 43 +++++++++++++++++++++++++++++++++------- src/facilities.h | 6 ++++-- src/pcm.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ src/requests.c | 20 ++++++++++++++++++- src/saem.c | 21 ++++++++++++++++++-- src/tpm.c | 43 ++++++++++++++++++++++++++++++++++++---- 9 files changed, 213 insertions(+), 21 deletions(-) diff --git a/src/cam.c b/src/cam.c index 003c008..f035035 100644 --- a/src/cam.c +++ b/src/cam.c @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -753,8 +754,8 @@ void *ca_service(void *fc) { memcpy(fmi->data.buf, bpr->data.buf, bpr->data.size); fmi->data.size = bpr->data.size; - if (facilities->logging) { - it2s_tender_db_add(&facilities->db, &facilities->epv, true, ItsPduHeader__messageID_cam, bpr->data.buf, bpr->data.size); + if (facilities->logging.dbms) { + it2s_tender_db_add(facilities->logging.dbms, &facilities->epv, true, ItsPduHeader__messageID_cam, bpr->data.buf, bpr->data.size); } // Check if inside PZ @@ -782,6 +783,23 @@ void *ca_service(void *fc) { it2s_tender_queue_add(facilities->tx_queue, fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message"); lightship_reset_timer(facilities->lightship, &facilities->epv); + + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + } + } } } diff --git a/src/config.c b/src/config.c index ed2906f..1289434 100644 --- a/src/config.c +++ b/src/config.c @@ -193,7 +193,6 @@ int facilities_config(void* facilities_s) { facilities->station_type = 5; } - facilities->logging = config->general.logging; facilities->use_security = config->security.use_security; @@ -276,6 +275,11 @@ int facilities_config(void* facilities_s) { memcpy(facilities->id.ipv6_addr+13, src_mac+3, 3); } + facilities->logging.recorder = config->facilities.logging.management; + if (config->facilities.logging.dbms) { + facilities->logging.dbms = calloc(1, sizeof(it2s_tender_database_s)); + } + // DENM facilities->den->n_max_events = config->facilities.denm.nmax_active_events; diff --git a/src/cpm.c b/src/cpm.c index 20e49b3..97aa546 100644 --- a/src/cpm.c +++ b/src/cpm.c @@ -22,6 +22,7 @@ #include #include #include +#include #define PI 3.141592654 @@ -713,8 +714,8 @@ void *cp_service(void *fc){ if(mk_cpm(facilities, bpr->data.buf, (uint32_t *) &bpr->data.size, fmi->data.buf, (uint32_t *) &fmi->data.size, history_list, valid_array, history_timestamp) == 1) continue; - if (facilities->logging) { - it2s_tender_db_add(&facilities->db, &facilities->epv, true, 14, bpr->data.buf, bpr->data.size); + if (facilities->logging.dbms) { + it2s_tender_db_add(facilities->logging.dbms, &facilities->epv, true, 14, bpr->data.buf, bpr->data.size); } uint64_t id = rand() + 1; @@ -744,6 +745,23 @@ void *cp_service(void *fc){ /*Reset Timer for dissemination control */ dissemination_reset_timer(facilities->dissemination, &facilities->epv,1); + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } + }else{ is_radar_connected = radar_connection(RADAR_PORT,facilities); } diff --git a/src/facilities.c b/src/facilities.c index 01aac21..2206bec 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -38,6 +38,7 @@ #include #include #include +#include static int transport_indication(facilities_t *facilities, void* responder, void* security_socket, uint8_t *msg, uint32_t msg_len) { int rv = 0, code = 0; @@ -141,8 +142,8 @@ static int transport_indication(facilities_t *facilities, void* responder, void* goto cleanup; } - if (facilities->logging) { - it2s_tender_db_add(&facilities->db, &facilities->epv, false, its_msg_type, tpi->choice.btp.data.buf, tpi->choice.btp.data.size); + if (facilities->logging.dbms) { + it2s_tender_db_add(facilities->logging.dbms, &facilities->epv, false, its_msg_type, tpi->choice.btp.data.buf, tpi->choice.btp.data.size); } // Get permisisons @@ -334,8 +335,23 @@ static int transport_indication(facilities_t *facilities, void* responder, void* buffer[0] = 4; // Facilities asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesIndication, NULL, fi, buffer+1, ITSS_SDU_MAX_LEN-1); - it2s_tender_queue_add(facilities->tx_queue, buffer, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message"); - pthread_cond_signal(&facilities->tx_queue->trigger); + it2s_tender_queue_send(facilities->tx_queue, buffer, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message"); + } + + // Logging + if (facilities->logging.recorder) { + int e = it2s_tender_management_record_packet_sdu( + buf, + buf_len, + tpi->choice.btp.data.buf, + tpi->choice.btp.data.size, + tpi->choice.btp.id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + false); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buf, e, ITSS_MANAGEMENT, tpi->choice.btp.id, "MReq.packet.set"); + } } cleanup: @@ -634,6 +650,10 @@ void* tx(void* fc) { zmq_setsockopt(transport_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); zmq_connect(transport_socket, facilities->zmq.transport_address); + void* management_socket = zmq_socket(facilities->zmq.ctx, ZMQ_REQ); + zmq_setsockopt(management_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); + zmq_connect(management_socket, facilities->zmq.management_address); + it2s_tender_queue_t* stream = it2s_tender_queue_new(); while (!facilities->exit) { @@ -675,6 +695,14 @@ void* tx(void* fc) { stream->info_msg[i], stream->id[i], stream->packet_len[i]); } break; + case ITSS_MANAGEMENT: + zmq_send(management_socket, stream->packet[i], stream->packet_len[i], 0); + rv = zmq_recv(management_socket, &code, 1, 0); + if (rv == -1) { + syslog_err("[facilities]-> %s ->[management] | id:%ld size:%dB ", + stream->info_msg[i], stream->id[i], stream->packet_len[i]); + } + break; } } } @@ -731,10 +759,11 @@ int main() { if (facilities.coordination.active) pthread_create(&facilities.pc_service, NULL, pc_service, (void*) &facilities); - if (facilities.logging) { - if (it2s_tender_db_init(&facilities.db, "facilities", facilities.id.station_id)) { + if (facilities.logging.dbms) { + if (it2s_tender_db_init(facilities.logging.dbms, "facilities", facilities.id.station_id)) { syslog_err("[facilities] failed to initialize the database -> turning off logging"); - facilities.logging = false; + free(facilities.logging.dbms); + facilities.logging.dbms = NULL; } } diff --git a/src/facilities.h b/src/facilities.h index 1769b21..f0460ad 100644 --- a/src/facilities.h +++ b/src/facilities.h @@ -72,8 +72,10 @@ typedef struct facilities { coordination_s coordination; // Logging - bool logging; - it2s_tender_database_s db; + struct { + bool recorder; + it2s_tender_database_s* dbms; + } logging; int station_type; bool use_security; diff --git a/src/pcm.c b/src/pcm.c index 6ba11cb..1eb6b8a 100644 --- a/src/pcm.c +++ b/src/pcm.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -162,6 +163,23 @@ static int pcm_check_handle_request(facilities_t* facilities, PCM_t* pcm, mc_nei it2s_tender_queue_send(facilities->tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } + cleanup: ASN_STRUCT_FREE(asn_DEF_PCM, pcm_rep); ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); @@ -334,6 +352,22 @@ static int pcm_check_intersection_detected(facilities_t* facilities, PCM_t* pcm, it2s_tender_queue_send(facilities->tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } cleanup: ASN_STRUCT_FREE(asn_DEF_PCM, pcm_req); @@ -583,6 +617,23 @@ void* pc_service(void* fc) { it2s_tender_queue_send(facilities->tx_queue, tr_oer, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } + coordination->t_last_send_pcm = now; } pthread_mutex_unlock(&coordination->lock); diff --git a/src/requests.c b/src/requests.c index 32d3e57..26e5e01 100644 --- a/src/requests.c +++ b/src/requests.c @@ -14,7 +14,7 @@ #include #include - +#include int facilities_request_result_accepted(void* responder) { int rv = 0; @@ -311,6 +311,24 @@ int facilities_request_single_message(facilities_t* facilities, void* responder, it2s_tender_queue_send(facilities->tx_queue, tr_oer, enc.encoded+1, ITSS_TRANSPORT, id, "TR.packet.btp"); } + // Logging + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + frm->data.buf, + frm->data.size, + id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + } + } + cleanup: if (its_msg_def && !managed_msg) ASN_STRUCT_FREE(*its_msg_def, its_msg); ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); diff --git a/src/saem.c b/src/saem.c index 6f1e0ce..88bb005 100644 --- a/src/saem.c +++ b/src/saem.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -321,8 +322,8 @@ void *sa_service(void *fc) { rv = mk_saem(facilities, bpr->data.buf, (uint32_t *) &bpr->data.size); if (!rv) { - if (facilities->logging) { - it2s_tender_db_add(&facilities->db, &facilities->epv, true, messageID_saem, bpr->data.buf, bpr->data.size); + if (facilities->logging.dbms) { + it2s_tender_db_add(facilities->logging.dbms, &facilities->epv, true, messageID_saem, bpr->data.buf, bpr->data.size); } bpr->id = rand() + 1; @@ -333,6 +334,22 @@ void *sa_service(void *fc) { continue; } else { it2s_tender_queue_send(facilities->tx_queue, tr_oer, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } } } mk_saem_n_sleep = 0; diff --git a/src/tpm.c b/src/tpm.c index 6112559..5ef44ff 100644 --- a/src/tpm.c +++ b/src/tpm.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -157,8 +158,8 @@ int tpm_pay(void* fc, uint8_t* neighbour) { bpr->gn.trafficClass = 2; bpr->gn.packetTransportType = PacketTransportType_shb; - if (facilities->logging) { - it2s_tender_db_add(&facilities->db, &facilities->epv, true, 117, bpr->data.buf, bpr->data.size); + if (facilities->logging.dbms) { + it2s_tender_db_add(facilities->logging.dbms, &facilities->epv, true, 117, bpr->data.buf, bpr->data.size); } // encode TR @@ -172,6 +173,23 @@ int tpm_pay(void* fc, uint8_t* neighbour) { it2s_tender_queue_send(facilities->tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } + cleanup: ASN_STRUCT_FREE(asn_DEF_TPM, tpm); ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); @@ -358,8 +376,8 @@ static void rsu_handle_recv(facilities_t* facilities, TPM_t* tpm_rx, uint8_t* ne memcpy(bpr->gn.securityNeighbour->buf, neighbour, 8); } - if (facilities->logging) { - it2s_tender_db_add(&facilities->db, &facilities->epv, true, 117, bpr->data.buf, bpr->data.size); + if (facilities->logging.dbms) { + it2s_tender_db_add(facilities->logging.dbms, &facilities->epv, true, 117, bpr->data.buf, bpr->data.size); } // encode TR @@ -372,6 +390,23 @@ static void rsu_handle_recv(facilities_t* facilities, TPM_t* tpm_rx, uint8_t* ne it2s_tender_queue_send(facilities->tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); + if (facilities->logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = it2s_tender_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + it2s_tender_get_clock(&facilities->epv), + ITSS_FACILITIES, + true); + if (e != -1) { + it2s_tender_queue_send(facilities->tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } + cleanup: ASN_STRUCT_FREE(asn_DEF_TPM, tpm); ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr);