diff --git a/src/cam.c b/src/cam.c index c0ea3dd..98a0b0f 100644 --- a/src/cam.c +++ b/src/cam.c @@ -704,7 +704,6 @@ static int check_pz(lightship_t *lightship, it2s_tender_epv_t* epv) { void *ca_service(void *fc) { int rv = 0; - uint8_t code = 0; facilities_t *facilities = (facilities_t*) fc; TransportRequest_t* tr = calloc(1, sizeof(TransportRequest_t)); diff --git a/src/facilities.c b/src/facilities.c index c1d65d7..bc405fc 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -268,13 +268,15 @@ static int transport_indication(facilities_t *facilities, void* responder, void* sreq->choice.tlsRecv.data.buf = malloc(tpi->choice.tcp.data.size); memcpy(sreq->choice.tlsRecv.data.buf, tpi->choice.tcp.data.buf, tpi->choice.tcp.data.size); - tlsc_t* tlsc = tolling_tlsc_get(&facilities->tolling, tpi->choice.tcp.sourceAddress->buf, 7011); + pthread_mutex_lock(&facilities->tolling.lock); + tlsc_t* tlsc = tolling_tlsc_get(&facilities->tolling, &facilities->epv, tpi->choice.tcp.sourceAddress->buf, 7011); if (tlsc) { id = tlsc->id; } else { - tlsc = tolling_tlsc_new(&facilities->tolling, tpi->choice.tcp.sourceAddress->buf, 7011); + tlsc = tolling_tlsc_new(&facilities->tolling, &facilities->epv, tpi->choice.tcp.sourceAddress->buf, 7011); id = tlsc->id; } + pthread_mutex_unlock(&facilities->tolling.lock); sreq->choice.tlsSend.connId = id; uint8_t b_sdu[2048]; diff --git a/src/saem.c b/src/saem.c index 9472f1b..66fa7df 100644 --- a/src/saem.c +++ b/src/saem.c @@ -371,10 +371,6 @@ void *sa_service(void *fc) { pthread_mutex_lock(&bulletin->lock); for (int a = 0; a < bulletin->to_consume_len; ++a) { - /* do some checks, e.g. - * is advertised service close by? - * do we want to enjoy the advertised service? */ - // Tolling if (facilities->tolling.enabled && bulletin->to_consume[a]->its_aid == 0 && @@ -404,6 +400,11 @@ void *sa_service(void *fc) { } pthread_mutex_unlock(&bulletin->lock); + // Tolling management + if (facilities->tolling.protocol.p == TOLLING_PROTOCOL_TLS) { + tolling_tlsc_mgmt(&facilities->tolling, &facilities->epv, facilities->tx_queue, security_socket); + } + usleep(sleep_ms*1000); } diff --git a/src/tpm.c b/src/tpm.c index e66e4e8..d5d8f10 100644 --- a/src/tpm.c +++ b/src/tpm.c @@ -62,6 +62,8 @@ int tpm_pay(void* fc, tolling_info_t* info, void* security_socket, uint8_t* neig SecurityReply_t* srep = NULL; FacilitiesIndication_t* fi = NULL; + pthread_mutex_lock(&tolling->lock); + tolling->station.obu.active = true; tolling->station.obu.nonce = rand() + 1; @@ -264,11 +266,11 @@ int tpm_pay(void* fc, tolling_info_t* info, void* security_socket, uint8_t* neig sreq->choice.tlsSend.data.size = tpm_uper_len; memcpy(sreq->choice.tlsSend.data.buf, tpm_uper, tpm_uper_len); - tlsc_t* tlsc = tolling_tlsc_get(tolling, dst_addr, 7011); + tlsc_t* tlsc = tolling_tlsc_get(tolling, &facilities->epv, dst_addr, 7011); if (tlsc) { id = tlsc->id; } else { - tlsc = tolling_tlsc_new(tolling, dst_addr, 7011); + tlsc = tolling_tlsc_new(tolling, &facilities->epv, dst_addr, 7011); id = tlsc->id; } sreq->choice.tlsSend.connId = id; @@ -378,6 +380,7 @@ int tpm_pay(void* fc, tolling_info_t* info, void* security_socket, uint8_t* neig } cleanup: + pthread_mutex_unlock(&tolling->lock); ASN_STRUCT_FREE(asn_DEF_TPM, tpm); ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); ASN_STRUCT_FREE(asn_DEF_SecurityRequest, sreq); @@ -706,11 +709,11 @@ static void rsu_handle_recv(facilities_t* facilities, TPM_t* tpm_rx, void* secur sreq->choice.tlsSend.data.size = tpm_uper_len; memcpy(sreq->choice.tlsSend.data.buf, tpm_uper, tpm_uper_len); - tlsc_t* tlsc = tolling_tlsc_get(tolling, src_addr, 7011); + tlsc_t* tlsc = tolling_tlsc_get(tolling, &facilities->epv, src_addr, 7011); if (tlsc) { id = tlsc->id; } else { - tlsc = tolling_tlsc_new(tolling, src_addr, 7011); + tlsc = tolling_tlsc_new(tolling, &facilities->epv, src_addr, 7011); id = tlsc->id; } sreq->choice.tlsSend.connId = id; @@ -822,7 +825,7 @@ cleanup: ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi); } -static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_socket, it2s_tender_queue_t* tx_queue, uint8_t* neighbour, uint8_t* src_addr) { +static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_socket, it2s_tender_queue_t* tx_queue, it2s_tender_epv_t* epv, uint8_t* neighbour, uint8_t* src_addr) { if (!tpm_rx->tpm->tollingType) { syslog_err("[facilities] [tolling] received TPM does not have a type"); @@ -995,7 +998,7 @@ static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_so // Close TCP & TLS conn if (tolling->protocol.p == TOLLING_PROTOCOL_TLS) { - tlsc_t* tlsc = tolling_tlsc_get(tolling, src_addr, 7011); + tlsc_t* tlsc = tolling_tlsc_get(tolling, epv, src_addr, 7011); if (tlsc) { sreq = calloc(1, sizeof(SecurityRequest_t)); sreq->present = SecurityRequest_PR_tlsClose; @@ -1007,25 +1010,25 @@ static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_so zmq_send(security_socket, buf, enc.encoded+1, 0); int32_t rl = zmq_recv(security_socket, buf, buf_len, 0); syslog_debug("[facilities]<-[security] SecurityReply.tlsClose (%dB)", rl); + + uint64_t id = rand() + 1; + tr = calloc(1, sizeof(TransportRequest_t)); + tr->present = TransportRequest_PR_data; + tr->choice.data.present = TransportDataRequest_PR_tcp; + tr->choice.data.choice.tcp.present = TCPDataRequest_PR_connCloseReq; + tr->choice.data.choice.tcp.choice.connCloseReq.closeByPeer = false; + tr->choice.data.choice.tcp.choice.connCloseReq.destinationPort = 7011; + tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.size = 16; + tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf = malloc(16); + memcpy(tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf, src_addr, 16); + tr->choice.data.choice.tcp.choice.connCloseReq.id = id; + + buf[0] = 4; + enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, sreq, buf+1, buf_len-1); + it2s_tender_queue_send(tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, id, "TR.data.tcp.connClose"); } else { syslog_debug("[facilities] [tolling] unable to close TLS connection, not found"); } - - uint64_t id = rand() + 1; - tr = calloc(1, sizeof(TransportRequest_t)); - tr->present = TransportRequest_PR_data; - tr->choice.data.present = TransportDataRequest_PR_tcp; - tr->choice.data.choice.tcp.present = TCPDataRequest_PR_connCloseReq; - tr->choice.data.choice.tcp.choice.connCloseReq.closeByPeer = false; - tr->choice.data.choice.tcp.choice.connCloseReq.destinationPort = 7011; - tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.size = 16; - tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf = malloc(16); - memcpy(tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf, src_addr, 16); - tr->choice.data.choice.tcp.choice.connCloseReq.id = id; - - buf[0] = 4; - enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, sreq, buf+1, buf_len-1); - it2s_tender_queue_send(tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, id, "TR.data.tcp.connClose"); } @@ -1054,6 +1057,8 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour, return 1; } + pthread_mutex_lock(&tolling->lock); + switch (tpm_rx->tpm->tollingType->present) { // Entry case TollingType_PR_entry: @@ -1061,19 +1066,19 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour, case TollingSingle_PR_request: if (facilities->station_type != 15) { syslog_debug("[facilities] [tolling] received TPM.entry.request, ignoring"); - return 0; + goto cleanup; } rsu_handle_recv(facilities, tpm_rx, security_socket, neighbour, src_addr); break; case TollingSingle_PR_reply: if (facilities->station_type == 15) { syslog_debug("[facilities] [tolling] received TPM.entry.reply, ignoring"); - return 0; + goto cleanup; } pthread_mutex_lock(&facilities->epv.time.lock); syslog_info("[facilities] [tolling] entry.reply took %ld us", it2s_tender_get_now(TIME_MICROSECONDS) - tolling->tz); pthread_mutex_unlock(&facilities->epv.time.lock); - veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, neighbour, src_addr); + veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, &facilities->epv, neighbour, src_addr); break; default: break; @@ -1088,19 +1093,19 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour, case TollingSingle_PR_request: if (facilities->station_type != 15) { syslog_debug("[facilities] [tolling] received TPM.exit.request, ignoring"); - return 0; + goto cleanup; } rsu_handle_recv(facilities, tpm_rx, security_socket, neighbour, src_addr); break; case TollingSingle_PR_reply: if (facilities->station_type == 15) { syslog_debug("[facilities] [tolling] received TPM.exit.reply, ignoring"); - return 0; + goto cleanup; } pthread_mutex_lock(&facilities->epv.time.lock); syslog_info("[facilities] [tolling] exit.reply took %ld us", it2s_tender_get_now(TIME_MICROSECONDS) - tolling->tz); pthread_mutex_unlock(&facilities->epv.time.lock); - veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, neighbour, src_addr); + veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, &facilities->epv, neighbour, src_addr); break; default: break; @@ -1112,19 +1117,19 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour, case TollingSingle_PR_request: if (facilities->station_type != 15) { syslog_debug("[facilities] [tolling] received TPM.single.request, ignoring"); - return 0; + goto cleanup; } rsu_handle_recv(facilities, tpm_rx, security_socket, neighbour, src_addr); break; case TollingSingle_PR_reply: if (facilities->station_type == 15) { syslog_debug("[facilities] [tolling] received TPM.single.reply, ignoring"); - return 0; + goto cleanup; } pthread_mutex_lock(&facilities->epv.time.lock); syslog_info("[facilities] [tolling] single.reply took %ld us", it2s_tender_get_now(TIME_MICROSECONDS) - tolling->tz); pthread_mutex_unlock(&facilities->epv.time.lock); - veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, neighbour, src_addr); + veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, &facilities->epv, neighbour, src_addr); break; default: break; @@ -1135,10 +1140,15 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour, break; } +cleanup: + pthread_mutex_unlock(&tolling->lock); + return 0; } int tolling_init(tolling_t* tolling, void* zmq_ctx, char* security_address, uint8_t station_type) { + pthread_mutex_init(&tolling->lock, NULL); + switch (station_type) { case 15: break; @@ -1172,7 +1182,7 @@ void tolling_info_free(tolling_info_t* ti) { free(ti); } -tlsc_t* tolling_tlsc_new(tolling_t* tolling, uint8_t ipv6[16], uint16_t port) { +tlsc_t* tolling_tlsc_new(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port) { if (tolling->protocol.c.tls.n_tlsc >= TOLLING_MAX_CONNS - 1) { return NULL; } @@ -1181,18 +1191,72 @@ tlsc_t* tolling_tlsc_new(tolling_t* tolling, uint8_t ipv6[16], uint16_t port) { memcpy(tlsc->ipv6, ipv6, 16); tlsc->port = port; tlsc->id = rand(); + tlsc->ts = it2s_tender_get_clock(epv); ++tolling->protocol.c.tls.n_tlsc; return tlsc; } -tlsc_t* tolling_tlsc_get(tolling_t* tolling, uint8_t ipv6[16], uint16_t port) { +tlsc_t* tolling_tlsc_get(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port) { tlsc_t* tlsc = NULL; for (int i = 0; i < tolling->protocol.c.tls.n_tlsc; ++i) { if (!memcmp(tolling->protocol.c.tls.tls_conns[i]->ipv6, ipv6, 16) && tolling->protocol.c.tls.tls_conns[i]->port == port) { tlsc = tolling->protocol.c.tls.tls_conns[i]; + tlsc->ts = it2s_tender_get_clock(epv); break; } } return tlsc; } + + +void tolling_tlsc_mgmt(tolling_t* tolling, it2s_tender_epv_t* epv, it2s_tender_queue_t* tx_queue, void* security_socket) { + pthread_mutex_lock(&tolling->lock); + uint64_t now = it2s_tender_get_clock(epv); + for (int i = 0; i < tolling->protocol.c.tls.n_tlsc; ++i) { + tlsc_t* tlsc = tolling->protocol.c.tls.tls_conns[i]; + if (tlsc->ts + TOLLING_CONN_TIMEOUT_MS > now) { + SecurityRequest_t* sreq = NULL; + TransportRequest_t* tr = NULL; + const uint32_t buf_len = 1024; + uint8_t buf[buf_len]; + syslog_debug("[facilities] [tolling] closing TCP/TLS connection"); + + sreq = calloc(1, sizeof(SecurityRequest_t)); + sreq->present = SecurityRequest_PR_tlsClose; + sreq->choice.tlsClose.connId = tlsc->id; + + buf[0] = 4; + asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_SecurityRequest, NULL, sreq, buf+1, buf_len-1); + syslog_debug("[facilities]->[security] SecurityRequest.tlsClose (%ldB)", enc.encoded+1); + zmq_send(security_socket, buf, enc.encoded+1, 0); + int32_t rl = zmq_recv(security_socket, buf, buf_len, 0); + syslog_debug("[facilities]<-[security] SecurityReply.tlsClose (%dB)", rl); + + uint64_t id = rand() + 1; + tr = calloc(1, sizeof(TransportRequest_t)); + tr->present = TransportRequest_PR_data; + tr->choice.data.present = TransportDataRequest_PR_tcp; + tr->choice.data.choice.tcp.present = TCPDataRequest_PR_connCloseReq; + tr->choice.data.choice.tcp.choice.connCloseReq.closeByPeer = false; + tr->choice.data.choice.tcp.choice.connCloseReq.destinationPort = 7011; + tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.size = 16; + tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf = malloc(16); + memcpy(tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf, tlsc->ipv6, 16); + tr->choice.data.choice.tcp.choice.connCloseReq.id = id; + + buf[0] = 4; + enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, sreq, buf+1, buf_len-1); + it2s_tender_queue_send(tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, id, "TR.data.tcp.connClose"); + + free(tlsc); + for (int j = i; j < tolling->protocol.c.tls.n_tlsc-1; ++j) { + tolling->protocol.c.tls.tls_conns[j] = tolling->protocol.c.tls.tls_conns[j+1]; + } + --tolling->protocol.c.tls.n_tlsc; + --i; + } + } + + pthread_mutex_unlock(&tolling->lock); +} diff --git a/src/tpm.h b/src/tpm.h index 4c16ca4..23b1849 100644 --- a/src/tpm.h +++ b/src/tpm.h @@ -3,11 +3,13 @@ #include #include #include +#include #include #define TOLLING_INFOS_MAX_LENGTH 16 #define TOLLING_PAYMENT_MIN_PERIOD_MS 60000 #define TOLLING_MAX_CONNS 24 +#define TOLLING_CONN_TIMEOUT_MS 10000 typedef enum TOLLING_PROTOCOL { TOLLING_PROTOCOL_SIMPLE, @@ -30,11 +32,12 @@ typedef struct tlsc { uint8_t ipv6[16]; uint16_t port; uint64_t id; - uint64_t t_used; + uint64_t ts; } tlsc_t; typedef struct tolling { bool enabled; + pthread_mutex_t lock; struct { TOLLING_PROTOCOL_e p; @@ -93,5 +96,6 @@ int tpm_is_inside_zone(void* fc, tolling_info_t* ti); tolling_info_t* tolling_info_new(it2s_tender_epv_t* epv, TollingPaymentInfo_t* tpi); void tolling_info_free(tolling_info_t* ti); -tlsc_t* tolling_tlsc_new(tolling_t* tolling, uint8_t ipv6[16], uint16_t port); -tlsc_t* tolling_tlsc_get(tolling_t* tolling, uint8_t ipv6[16], uint16_t port); +tlsc_t* tolling_tlsc_new(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port); +tlsc_t* tolling_tlsc_get(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port); +void tolling_tlsc_mgmt(tolling_t* tolling, it2s_tender_epv_t* epv, it2s_tender_queue_t* tx_queue, void* security_socket);