queue update

This commit is contained in:
emanuel 2024-05-09 16:02:15 +01:00
parent 5478d29a42
commit e85f8b024e
8 changed files with 51 additions and 54 deletions

View File

@ -199,6 +199,7 @@ static int mk_cam(uint8_t *cam_oer, uint16_t *cam_len) {
bvc_hf->speed.speedValue = epv.space.data.speed.value;
bvc_hf->speed.speedConfidence = epv.space.data.speed.confidence;
// Set heading
bvc_hf->heading.headingValue = epv.space.data.heading.value;
bvc_hf->heading.headingConfidence = epv.space.data.heading.confidence;
@ -822,9 +823,9 @@ void* ca_service() {
continue;
}
itss_queue_send(facilities.tx_queue, nr_oer, enc.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(nr_oer, enc.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp"));
itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message"));
lightship_reset_timer();
@ -849,7 +850,7 @@ void* ca_service() {
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"));
}
}
}

View File

@ -782,10 +782,10 @@ void *cp_service(){
}
/* Create thread to send packet to the Networking Layer (=3) */
itss_queue_send(facilities.tx_queue, tr_oer, enc_tdr.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc_tdr.encoded+1, ITSS_NETWORKING, id, "NR.packet.btp"));
/* Create thread to send packet to the Applications Layer (=5) */
itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded+1, ITSS_APPLICATIONS, id, "FI.message"));
/*Reset Timer for dissemination control */
dissemination_reset_timer(1);
@ -810,7 +810,7 @@ void *cp_service(){
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set"));
}
}

View File

@ -193,9 +193,9 @@ int evrsrm_recv(EI1_EV_RSR_t *evrsr_request) {
rv = 1;
goto cleanup;
}
itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp"));
itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message"));
cleanup:
log_debug("[ev] evrsrm_recv done with rv %d", rv);
@ -435,9 +435,9 @@ void *evcsn_service() {
continue;
}
itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp"));
itss_queue_send(facilities.tx_queue, fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(fi_oer, enc_fdi.encoded + 1, ITSS_APPLICATIONS, id, "FI.message"));
// Logging
if (facilities.logging.dbms) {
@ -460,7 +460,7 @@ void *evcsn_service() {
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"));
}
}
}

View File

@ -301,50 +301,46 @@ void *tx() {
while (!queue->len && !facilities.exit) {
pthread_cond_wait(&queue->trigger, &queue->lock);
}
for (int i = 0; i < queue->len; ++i) {
memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]);
stream->packet_len[i] = queue->packet_len[i];
stream->destination[i] = queue->destination[i];
strcpy(stream->info_msg[i], queue->info_msg[i]);
stream->id[i] = queue->id[i];
}
stream->len = queue->len;
queue->len = 0;
itss_queue_packet_t* qp;
while ((qp = itss_queue_pop(queue))) {
itss_queue_add(stream, qp);
};
pthread_mutex_unlock(&queue->lock);
for (int i = 0; i < stream->len; ++i) {
switch (stream->destination[i]) {
while ((qp = itss_queue_pop(stream))) {
switch (qp->destination) {
case ITSS_NETWORKING:
log_debug("-> %s ->[networking] | id:%08x size:%dB",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
itss_0send(networking_socket, stream->packet[i], stream->packet_len[i]);
rv = itss_0recv_rt(&networking_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
qp->info_msg, (uint32_t)qp->id, qp->data_len);
itss_0send(networking_socket, qp->data, qp->data_len);
rv = itss_0recv_rt(&networking_socket, &code, 1, qp->data, qp->data_len, 1000);
if (rv == -1) {
log_error("-> %s ->[networking] | id:%08x size:%dB <TIMEOUT>",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
qp->info_msg, (uint32_t)qp->id, qp->data_len);
}
break;
case ITSS_APPLICATIONS:
log_debug("-> %s ->[applications] | id:%08x size:%dB",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
itss_0send(applications_socket, stream->packet[i], stream->packet_len[i]);
rv = itss_0recv_rt(&applications_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
qp->info_msg, (uint32_t)qp->id, qp->data_len);
itss_0send(applications_socket, qp->data, qp->data_len);
rv = itss_0recv_rt(&applications_socket, &code, 1, qp->data, qp->data_len, 1000);
if (rv == -1) {
log_error("-> %s ->[applications] | id:%08x size:%dB <TIMEOUT>",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
qp->info_msg, (uint32_t)qp->id, qp->data_len);
}
break;
case ITSS_MANAGEMENT:
itss_0send(management_socket, stream->packet[i], stream->packet_len[i]);
rv = itss_0recv_rt(&management_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
itss_0send(management_socket, qp->data, qp->data_len);
rv = itss_0recv_rt(&management_socket, &code, 1, qp->data, qp->data_len, 1000);
if (rv == -1) {
log_error("-> %s ->[management] | id:%08x size:%dB <TIMEOUT>",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
qp->info_msg, (uint32_t)qp->id, qp->data_len);
}
break;
}
}
itss_queue_packet_destroy(qp);
}
itss_0close(networking_socket);

View File

@ -389,7 +389,7 @@ int facilities_request_single_message(void *responder, EIS_FacilitiesMessageRequ
goto cleanup;
}
itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded + 1, ITSS_NETWORKING, id, "NR.packet.btp"));
}
// Logging
@ -406,7 +406,7 @@ int facilities_request_single_message(void *responder, EIS_FacilitiesMessageRequ
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"));
}
}
@ -919,7 +919,7 @@ static int networking_packet_indication_btp(EIS_NetworkingPacketIndication_t* np
ITSS_FACILITIES,
false);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buf, e, ITSS_MANAGEMENT, npi->id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, e, ITSS_MANAGEMENT, npi->id, "MReq.packet.set"));
}
}
@ -1053,7 +1053,7 @@ static int networking_packet_indication_tcp(EIS_NetworkingPacketIndication_t* np
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf, enc.encoded + 1, ITSS_NETWORKING, npr->id, "NR.packet.tcp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, enc.encoded + 1, ITSS_NETWORKING, npr->id, "NR.packet.tcp"));
} else {
if (facilities.tolling.enabled && srep->data->choice.tlsRecv.data.size) {
asn_dec_rval_t dec = uper_decode_complete(NULL, &asn_DEF_EI1_TPM, (void **)&its_msg, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size);
@ -1082,7 +1082,7 @@ static int networking_packet_indication_tcp(EIS_NetworkingPacketIndication_t* np
buffer[0] = 4; // Facilities
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_EIS_FacilitiesIndication, NULL, fi, buffer + 1, ITSS_SDU_MAX_LEN - 1);
itss_queue_send(facilities.tx_queue, buffer, enc.encoded + 1, ITSS_APPLICATIONS, id, "FI.message");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, enc.encoded + 1, ITSS_APPLICATIONS, id, "FI.message"));
// Logging
if (facilities.logging.dbms) {
@ -1102,7 +1102,7 @@ static int networking_packet_indication_tcp(EIS_NetworkingPacketIndication_t* np
ITSS_FACILITIES,
false);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buf, e, ITSS_MANAGEMENT, id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, e, ITSS_MANAGEMENT, id, "MReq.packet.set"));
}
}
}

View File

@ -347,7 +347,7 @@ void *sa_service() {
log_error("encoding TR for SAEM failed");
continue;
} else {
itss_queue_send(facilities.tx_queue, tr_oer, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(tr_oer, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp"));
// Logging
if (facilities.logging.dbms) {
pthread_mutex_lock(&facilities.id.lock);
@ -368,7 +368,7 @@ void *sa_service() {
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set"));
}
}
}

View File

@ -384,9 +384,9 @@ int tpm_pay(tolling_info_t* info, void** security_socket, uint8_t* neighbour, ui
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id,
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id,
(tolling->protocol.p == TOLLING_PROTOCOL_GN_SPKI || tolling->protocol.p == TOLLING_PROTOCOL_GN_DPKI)
? "NR.packet.btp" : "NR.packet.tcp");
? "NR.packet.btp" : "NR.packet.tcp"));
// Retransmission
uint64_t now = itss_ts_get(TIME_MICROSECONDS);
@ -418,7 +418,7 @@ int tpm_pay(tolling_info_t* info, void** security_socket, uint8_t* neighbour, ui
rv = 1;
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.request)");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.request)"));
if (facilities.logging.recorder) {
uint16_t buffer_len = 2048;
@ -433,7 +433,7 @@ int tpm_pay(tolling_info_t* info, void** security_socket, uint8_t* neighbour, ui
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"));
}
}
@ -1009,8 +1009,8 @@ static int rsu_handle_recv(EI1_TPM_t* tpm_rx, void** security_socket, uint8_t* n
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id,
(tolling->protocol.p == TOLLING_PROTOCOL_GN_SPKI || tolling->protocol.p == TOLLING_PROTOCOL_GN_DPKI) ? "NR.packet.btp" : "NR.packet.tcp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id,
(tolling->protocol.p == TOLLING_PROTOCOL_GN_SPKI || tolling->protocol.p == TOLLING_PROTOCOL_GN_DPKI) ? "NR.packet.btp" : "NR.packet.tcp"));
// Logging
if (facilities.logging.dbms) {
@ -1035,7 +1035,7 @@ static int rsu_handle_recv(EI1_TPM_t* tpm_rx, void** security_socket, uint8_t* n
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.reply)");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message (TPM.reply)"));
if (facilities.logging.recorder) {
uint16_t buffer_len = 2048;
uint8_t buffer[buffer_len];
@ -1049,7 +1049,7 @@ static int rsu_handle_recv(EI1_TPM_t* tpm_rx, void** security_socket, uint8_t* n
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, id, "MReq.packet.set"));
}
}
@ -1270,7 +1270,7 @@ static int veh_handle_recv(tolling_t* tolling, EI1_TPM_t* tpm_rx, void** securit
buf1[0] = 4;
enc = oer_encode_to_buffer(&asn_DEF_EIS_NetworkingRequest, NULL, tr, buf1+1, buf_len-1);
itss_queue_send(tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose");
itss_queue_send(tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose"));
free(tlsc);
for (int i = 0; i < tolling->protocol.c.tls.n_tlsc; ++i) {
@ -1523,7 +1523,7 @@ void tolling_tlsc_mgmt(itss_queue_t* tx_queue, void** security_socket) {
buf1[0] = 4;
enc = oer_encode_to_buffer(&asn_DEF_EIS_NetworkingRequest, NULL, tr, buf1+1, buf_len-1);
itss_queue_send(tx_queue, buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose");
itss_queue_send(tx_queue, itss_queue_packet_new(buf1, enc.encoded+1, ITSS_NETWORKING, id, "NR.data.tcp.connClose"));
free(tlsc);
for (int j = i; j < tolling->protocol.c.tls.n_tlsc-1; ++j) {

View File

@ -128,7 +128,7 @@ static void tx_vcm(EI1_VCM_t* vcm) {
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, enc.encoded+1, ITSS_NETWORKING, npr->id, "NR.packet.btp"));
fi = calloc(1, sizeof(EIS_FacilitiesIndication_t));
fi->present = EIS_FacilitiesIndication_PR_message;
@ -144,7 +144,7 @@ static void tx_vcm(EI1_VCM_t* vcm) {
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_APPLICATIONS, npr->id, "FI.message");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buf, enc.encoded+1, ITSS_APPLICATIONS, npr->id, "FI.message"));
facilities.coordination.t_last_send_vcm = itss_time_get();
@ -161,7 +161,7 @@ static void tx_vcm(EI1_VCM_t* vcm) {
ITSS_FACILITIES,
true);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set");
itss_queue_send(facilities.tx_queue, itss_queue_packet_new(buffer, e, ITSS_MANAGEMENT, npr->id, "MReq.packet.set"));
}
}