From e85f8b024e5577c89ce76854a43ca79cc93843b7 Mon Sep 17 00:00:00 2001 From: emanuel Date: Thu, 9 May 2024 16:02:15 +0100 Subject: [PATCH] queue update --- src/cam.c | 7 ++++--- src/cpm.c | 6 +++--- src/evm.c | 10 +++++----- src/facilities.c | 40 ++++++++++++++++++---------------------- src/requests.c | 12 ++++++------ src/saem.c | 4 ++-- src/tpm.c | 20 ++++++++++---------- src/vcm.c | 6 +++--- 8 files changed, 51 insertions(+), 54 deletions(-) diff --git a/src/cam.c b/src/cam.c index 9d3c915..9feeee3 100644 --- a/src/cam.c +++ b/src/cam.c @@ -199,6 +199,7 @@ static int mk_cam(uint8_t *cam_oer, uint16_t *cam_len) { bvc_hf->speed.speedValue = epv.space.data.speed.value; bvc_hf->speed.speedConfidence = epv.space.data.speed.confidence; + // Set heading bvc_hf->heading.headingValue = epv.space.data.heading.value; bvc_hf->heading.headingConfidence = epv.space.data.heading.confidence; @@ -822,9 +823,9 @@ void* ca_service() { continue; } - itss_queue_send(facilities.tx_queue, nr_oer, enc.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(nr_oer, enc.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp")); - itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message")); lightship_reset_timer(); @@ -849,7 +850,7 @@ void* ca_service() { ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set")); } } } diff --git a/src/cpm.c b/src/cpm.c index 81b2544..901ffdd 100644 --- a/src/cpm.c +++ b/src/cpm.c @@ -782,10 +782,10 @@ void *cp_service(){ } /* Create thread to send packet to the Networking Layer (=3) */ - itss_queue_send(facilities.tx_queue, tr_oer, enc_tdr.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc_tdr.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp")); /* Create thread to send packet to the Applications Layer (=5) */ - itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message")); /*Reset Timer for dissemination control */ dissemination_reset_timer(1); @@ -810,7 +810,7 @@ void *cp_service(){ ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set")); } } diff --git a/src/evm.c b/src/evm.c index 73125b7..fc825c1 100644 --- a/src/evm.c +++ b/src/evm.c @@ -193,9 +193,9 @@ int evrsrm_recv(EI1_EV_RSR_t *evrsr_request) { rv = 1; goto cleanup; } - itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp")); - itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message")); cleanup: log_debug("[ev] evrsrm_recv done with rv %d", rv); @@ -435,9 +435,9 @@ void *evcsn_service() { continue; } - itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp")); - itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message")); // Logging if (facilities.logging.dbms) { @@ -460,7 +460,7 @@ void *evcsn_service() { ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set")); } } } diff --git a/src/facilities.c b/src/facilities.c index df3f743..bb201cf 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -301,50 +301,46 @@ void *tx() { while (!queue->len && !facilities.exit) { pthread_cond_wait(&queue->trigger, &queue->lock); } - for (int i = 0; i < queue->len; ++i) { - memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]); - stream->packet_len[i] = queue->packet_len[i]; - stream->destination[i] = queue->destination[i]; - strcpy(stream->info_msg[i], queue->info_msg[i]); - stream->id[i] = queue->id[i]; - } - stream->len = queue->len; - queue->len = 0; + itss_queue_packet_t* qp; + while ((qp = itss_queue_pop(queue))) { + itss_queue_add(stream, qp); + }; pthread_mutex_unlock(&queue->lock); - for (int i = 0; i < stream->len; ++i) { - switch (stream->destination[i]) { + while ((qp = itss_queue_pop(stream))) { + switch (qp->destination) { case ITSS_NETWORKING: log_debug("-> %s ->[networking] | id:%08x size:%dB", - stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]); - itss_0send(networking_socket, stream->packet[i], stream->packet_len[i]); - rv = itss_0recv_rt(&networking_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000); + qp->info_msg, (uint32_t)qp->id, qp->data_len); + itss_0send(networking_socket, qp->data, qp->data_len); + rv = itss_0recv_rt(&networking_socket, &code, 1, qp->data, qp->data_len, 1000); if (rv == -1) { log_error("-> %s ->[networking] | id:%08x size:%dB ", - stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]); + qp->info_msg, (uint32_t)qp->id, qp->data_len); } break; case ITSS_APPLICATIONS: log_debug("-> %s ->[applications] | id:%08x size:%dB", - stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]); - itss_0send(applications_socket, stream->packet[i], stream->packet_len[i]); - rv = itss_0recv_rt(&applications_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000); + qp->info_msg, (uint32_t)qp->id, qp->data_len); + itss_0send(applications_socket, qp->data, qp->data_len); + rv = itss_0recv_rt(&applications_socket, &code, 1, qp->data, qp->data_len, 1000); if (rv == -1) { log_error("-> %s ->[applications] | id:%08x size:%dB ", - stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]); + qp->info_msg, (uint32_t)qp->id, qp->data_len); } break; case ITSS_MANAGEMENT: - itss_0send(management_socket, stream->packet[i], stream->packet_len[i]); - rv = itss_0recv_rt(&management_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000); + itss_0send(management_socket, qp->data, qp->data_len); + rv = itss_0recv_rt(&management_socket, &code, 1, qp->data, qp->data_len, 1000); if (rv == -1) { log_error("-> %s ->[management] | id:%08x size:%dB ", - stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]); + qp->info_msg, (uint32_t)qp->id, qp->data_len); } break; } } + itss_queue_packet_destroy(qp); } itss_0close(networking_socket); diff --git a/src/requests.c b/src/requests.c index 264045d..7046e5a 100644 --- a/src/requests.c +++ b/src/requests.c @@ -389,7 +389,7 @@ int facilities_request_single_message(void *responder, EIS_FacilitiesMessageRequ goto cleanup; } - itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp")); } // Logging @@ -406,7 +406,7 @@ int facilities_request_single_message(void *responder, EIS_FacilitiesMessageRequ ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set")); } } @@ -919,7 +919,7 @@ static int networking_packet_indication_btp(EIS_NetworkingPacketIndication_t* np ITSS_FACILITIES, false); if (e != -1) { - itss_queue_send(facilities.tx_queue, buf, e, ITSS_MANAGEMENT, npi->id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, e, ITSS_MANAGEMENT, npi->id, "MReq.packet.set")); } } @@ -1053,7 +1053,7 @@ static int networking_packet_indication_tcp(EIS_NetworkingPacketIndication_t* np goto cleanup; } - itss_queue_send(facilities.tx_queue, buf, enc.encoded + 1, ITSS_NETWORKING, npr->id, "NR.packet.tcp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, enc.encoded + 1, ITSS_NETWORKING, npr->id, "NR.packet.tcp")); } else { if (facilities.tolling.enabled && srep->data->choice.tlsRecv.data.size) { asn_dec_rval_t dec = uper_decode_complete(NULL, &asn_DEF_EI1_TPM, (void **)&its_msg, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size); @@ -1082,7 +1082,7 @@ static int networking_packet_indication_tcp(EIS_NetworkingPacketIndication_t* np buffer[0] = 4; // Facilities asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_EIS_FacilitiesIndication, NULL, fi, buffer + 1, ITSS_SDU_MAX_LEN - 1); - itss_queue_send(facilities.tx_queue, buffer, enc.encoded + 1, ITSS_APPLICATIONS, id, "FI.message"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, enc.encoded + 1, ITSS_APPLICATIONS, id, "FI.message")); // Logging if (facilities.logging.dbms) { @@ -1102,7 +1102,7 @@ static int networking_packet_indication_tcp(EIS_NetworkingPacketIndication_t* np ITSS_FACILITIES, false); if (e != -1) { - itss_queue_send(facilities.tx_queue, buf, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, e, ITSS_MANAGEMENT, id, "MReq.packet.set")); } } } diff --git a/src/saem.c b/src/saem.c index 647ca73..05c70a9 100644 --- a/src/saem.c +++ b/src/saem.c @@ -347,7 +347,7 @@ void *sa_service() { log_error("encoding TR for SAEM failed"); continue; } else { - itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp")); // Logging if (facilities.logging.dbms) { pthread_mutex_lock(&facilities.id.lock); @@ -368,7 +368,7 @@ void *sa_service() { ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set")); } } } diff --git a/src/tpm.c b/src/tpm.c index b49642f..a4694c8 100644 --- a/src/tpm.c +++ b/src/tpm.c @@ -384,9 +384,9 @@ int tpm_pay(tolling_info_t* info, void** security_socket, uint8_t* neighbour, ui goto cleanup; } - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id, + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id, (tolling->protocol.p == TOLLING_PROTOCOL_GN_SPKI || tolling->protocol.p == TOLLING_PROTOCOL_GN_DPKI) - ? "NR.packet.btp" : "NR.packet.tcp"); + ? "NR.packet.btp" : "NR.packet.tcp")); // Retransmission uint64_t now = itss_ts_get(TIME_MICROSECONDS); @@ -418,7 +418,7 @@ int tpm_pay(tolling_info_t* info, void** security_socket, uint8_t* neighbour, ui rv = 1; goto cleanup; } - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.request)"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.request)")); if (facilities.logging.recorder) { uint16_t buffer_len = 2048; @@ -433,7 +433,7 @@ int tpm_pay(tolling_info_t* info, void** security_socket, uint8_t* neighbour, ui ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set")); } } @@ -1009,8 +1009,8 @@ static int rsu_handle_recv(EI1_TPM_t* tpm_rx, void** security_socket, uint8_t* n goto cleanup; } - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id, - (tolling->protocol.p == TOLLING_PROTOCOL_GN_SPKI || tolling->protocol.p == TOLLING_PROTOCOL_GN_DPKI) ? "NR.packet.btp" : "NR.packet.tcp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id, + (tolling->protocol.p == TOLLING_PROTOCOL_GN_SPKI || tolling->protocol.p == TOLLING_PROTOCOL_GN_DPKI) ? "NR.packet.btp" : "NR.packet.tcp")); // Logging if (facilities.logging.dbms) { @@ -1035,7 +1035,7 @@ static int rsu_handle_recv(EI1_TPM_t* tpm_rx, void** security_socket, uint8_t* n goto cleanup; } - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.reply)"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.reply)")); if (facilities.logging.recorder) { uint16_t buffer_len = 2048; uint8_t buffer[buffer_len]; @@ -1049,7 +1049,7 @@ static int rsu_handle_recv(EI1_TPM_t* tpm_rx, void** security_socket, uint8_t* n ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set")); } } @@ -1270,7 +1270,7 @@ static int veh_handle_recv(tolling_t* tolling, EI1_TPM_t* tpm_rx, void** securit buf1[0] = 4; enc = oer_encode_to_buffer(&asn_DEF_EIS_NetworkingRequest, NULL, tr, buf1+1, buf_len-1); - itss_queue_send(tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose"); + itss_queue_send(tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose")); free(tlsc); for (int i = 0; i < tolling->protocol.c.tls.n_tlsc; ++i) { @@ -1523,7 +1523,7 @@ void tolling_tlsc_mgmt(itss_queue_t* tx_queue, void** security_socket) { buf1[0] = 4; enc = oer_encode_to_buffer(&asn_DEF_EIS_NetworkingRequest, NULL, tr, buf1+1, buf_len-1); - itss_queue_send(tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose"); + itss_queue_send(tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose")); free(tlsc); for (int j = i; j < tolling->protocol.c.tls.n_tlsc-1; ++j) { diff --git a/src/vcm.c b/src/vcm.c index 780b7c1..a413e34 100644 --- a/src/vcm.c +++ b/src/vcm.c @@ -128,7 +128,7 @@ static void tx_vcm(EI1_VCM_t* vcm) { goto cleanup; } - itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp")); fi = calloc(1, sizeof(EIS_FacilitiesIndication_t)); fi->present = EIS_FacilitiesIndication_PR_message; @@ -144,7 +144,7 @@ static void tx_vcm(EI1_VCM_t* vcm) { goto cleanup; } - itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_APPLICATIONS, npr->id, "FI.message"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, enc.encoded+1, ITSS_APPLICATIONS, npr->id, "FI.message")); facilities.coordination.t_last_send_vcm = itss_time_get(); @@ -161,7 +161,7 @@ static void tx_vcm(EI1_VCM_t* vcm) { ITSS_FACILITIES, true); if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set"); + itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set")); } }