tolling TCP&TLS conn close

This commit is contained in:
emanuel 2022-06-22 13:50:59 +01:00
parent dccfe25548
commit a392ac1d64
5 changed files with 113 additions and 43 deletions

View File

@ -704,7 +704,6 @@ static int check_pz(lightship_t *lightship, it2s_tender_epv_t* epv) {
void *ca_service(void *fc) {
int rv = 0;
uint8_t code = 0;
facilities_t *facilities = (facilities_t*) fc;
TransportRequest_t* tr = calloc(1, sizeof(TransportRequest_t));

View File

@ -268,13 +268,15 @@ static int transport_indication(facilities_t *facilities, void* responder, void*
sreq->choice.tlsRecv.data.buf = malloc(tpi->choice.tcp.data.size);
memcpy(sreq->choice.tlsRecv.data.buf, tpi->choice.tcp.data.buf, tpi->choice.tcp.data.size);
tlsc_t* tlsc = tolling_tlsc_get(&facilities->tolling, tpi->choice.tcp.sourceAddress->buf, 7011);
pthread_mutex_lock(&facilities->tolling.lock);
tlsc_t* tlsc = tolling_tlsc_get(&facilities->tolling, &facilities->epv, tpi->choice.tcp.sourceAddress->buf, 7011);
if (tlsc) {
id = tlsc->id;
} else {
tlsc = tolling_tlsc_new(&facilities->tolling, tpi->choice.tcp.sourceAddress->buf, 7011);
tlsc = tolling_tlsc_new(&facilities->tolling, &facilities->epv, tpi->choice.tcp.sourceAddress->buf, 7011);
id = tlsc->id;
}
pthread_mutex_unlock(&facilities->tolling.lock);
sreq->choice.tlsSend.connId = id;
uint8_t b_sdu[2048];

View File

@ -371,10 +371,6 @@ void *sa_service(void *fc) {
pthread_mutex_lock(&bulletin->lock);
for (int a = 0; a < bulletin->to_consume_len; ++a) {
/* do some checks, e.g.
* is advertised service close by?
* do we want to enjoy the advertised service? */
// Tolling
if (facilities->tolling.enabled &&
bulletin->to_consume[a]->its_aid == 0 &&
@ -404,6 +400,11 @@ void *sa_service(void *fc) {
}
pthread_mutex_unlock(&bulletin->lock);
// Tolling management
if (facilities->tolling.protocol.p == TOLLING_PROTOCOL_TLS) {
tolling_tlsc_mgmt(&facilities->tolling, &facilities->epv, facilities->tx_queue, security_socket);
}
usleep(sleep_ms*1000);
}

104
src/tpm.c
View File

@ -62,6 +62,8 @@ int tpm_pay(void* fc, tolling_info_t* info, void* security_socket, uint8_t* neig
SecurityReply_t* srep = NULL;
FacilitiesIndication_t* fi = NULL;
pthread_mutex_lock(&tolling->lock);
tolling->station.obu.active = true;
tolling->station.obu.nonce = rand() + 1;
@ -264,11 +266,11 @@ int tpm_pay(void* fc, tolling_info_t* info, void* security_socket, uint8_t* neig
sreq->choice.tlsSend.data.size = tpm_uper_len;
memcpy(sreq->choice.tlsSend.data.buf, tpm_uper, tpm_uper_len);
tlsc_t* tlsc = tolling_tlsc_get(tolling, dst_addr, 7011);
tlsc_t* tlsc = tolling_tlsc_get(tolling, &facilities->epv, dst_addr, 7011);
if (tlsc) {
id = tlsc->id;
} else {
tlsc = tolling_tlsc_new(tolling, dst_addr, 7011);
tlsc = tolling_tlsc_new(tolling, &facilities->epv, dst_addr, 7011);
id = tlsc->id;
}
sreq->choice.tlsSend.connId = id;
@ -378,6 +380,7 @@ int tpm_pay(void* fc, tolling_info_t* info, void* security_socket, uint8_t* neig
}
cleanup:
pthread_mutex_unlock(&tolling->lock);
ASN_STRUCT_FREE(asn_DEF_TPM, tpm);
ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr);
ASN_STRUCT_FREE(asn_DEF_SecurityRequest, sreq);
@ -706,11 +709,11 @@ static void rsu_handle_recv(facilities_t* facilities, TPM_t* tpm_rx, void* secur
sreq->choice.tlsSend.data.size = tpm_uper_len;
memcpy(sreq->choice.tlsSend.data.buf, tpm_uper, tpm_uper_len);
tlsc_t* tlsc = tolling_tlsc_get(tolling, src_addr, 7011);
tlsc_t* tlsc = tolling_tlsc_get(tolling, &facilities->epv, src_addr, 7011);
if (tlsc) {
id = tlsc->id;
} else {
tlsc = tolling_tlsc_new(tolling, src_addr, 7011);
tlsc = tolling_tlsc_new(tolling, &facilities->epv, src_addr, 7011);
id = tlsc->id;
}
sreq->choice.tlsSend.connId = id;
@ -822,7 +825,7 @@ cleanup:
ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi);
}
static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_socket, it2s_tender_queue_t* tx_queue, uint8_t* neighbour, uint8_t* src_addr) {
static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_socket, it2s_tender_queue_t* tx_queue, it2s_tender_epv_t* epv, uint8_t* neighbour, uint8_t* src_addr) {
if (!tpm_rx->tpm->tollingType) {
syslog_err("[facilities] [tolling] received TPM does not have a type");
@ -995,7 +998,7 @@ static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_so
// Close TCP & TLS conn
if (tolling->protocol.p == TOLLING_PROTOCOL_TLS) {
tlsc_t* tlsc = tolling_tlsc_get(tolling, src_addr, 7011);
tlsc_t* tlsc = tolling_tlsc_get(tolling, epv, src_addr, 7011);
if (tlsc) {
sreq = calloc(1, sizeof(SecurityRequest_t));
sreq->present = SecurityRequest_PR_tlsClose;
@ -1007,9 +1010,6 @@ static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_so
zmq_send(security_socket, buf, enc.encoded+1, 0);
int32_t rl = zmq_recv(security_socket, buf, buf_len, 0);
syslog_debug("[facilities]<-[security] SecurityReply.tlsClose (%dB)", rl);
} else {
syslog_debug("[facilities] [tolling] unable to close TLS connection, not found");
}
uint64_t id = rand() + 1;
tr = calloc(1, sizeof(TransportRequest_t));
@ -1026,6 +1026,9 @@ static void veh_handle_recv(tolling_t* tolling, TPM_t* tpm_rx, void* security_so
buf[0] = 4;
enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, sreq, buf+1, buf_len-1);
it2s_tender_queue_send(tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, id, "TR.data.tcp.connClose");
} else {
syslog_debug("[facilities] [tolling] unable to close TLS connection, not found");
}
}
@ -1054,6 +1057,8 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour,
return 1;
}
pthread_mutex_lock(&tolling->lock);
switch (tpm_rx->tpm->tollingType->present) {
// Entry
case TollingType_PR_entry:
@ -1061,19 +1066,19 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour,
case TollingSingle_PR_request:
if (facilities->station_type != 15) {
syslog_debug("[facilities] [tolling] received TPM.entry.request, ignoring");
return 0;
goto cleanup;
}
rsu_handle_recv(facilities, tpm_rx, security_socket, neighbour, src_addr);
break;
case TollingSingle_PR_reply:
if (facilities->station_type == 15) {
syslog_debug("[facilities] [tolling] received TPM.entry.reply, ignoring");
return 0;
goto cleanup;
}
pthread_mutex_lock(&facilities->epv.time.lock);
syslog_info("[facilities] [tolling] entry.reply took %ld us", it2s_tender_get_now(TIME_MICROSECONDS) - tolling->tz);
pthread_mutex_unlock(&facilities->epv.time.lock);
veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, neighbour, src_addr);
veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, &facilities->epv, neighbour, src_addr);
break;
default:
break;
@ -1088,19 +1093,19 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour,
case TollingSingle_PR_request:
if (facilities->station_type != 15) {
syslog_debug("[facilities] [tolling] received TPM.exit.request, ignoring");
return 0;
goto cleanup;
}
rsu_handle_recv(facilities, tpm_rx, security_socket, neighbour, src_addr);
break;
case TollingSingle_PR_reply:
if (facilities->station_type == 15) {
syslog_debug("[facilities] [tolling] received TPM.exit.reply, ignoring");
return 0;
goto cleanup;
}
pthread_mutex_lock(&facilities->epv.time.lock);
syslog_info("[facilities] [tolling] exit.reply took %ld us", it2s_tender_get_now(TIME_MICROSECONDS) - tolling->tz);
pthread_mutex_unlock(&facilities->epv.time.lock);
veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, neighbour, src_addr);
veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, &facilities->epv, neighbour, src_addr);
break;
default:
break;
@ -1112,19 +1117,19 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour,
case TollingSingle_PR_request:
if (facilities->station_type != 15) {
syslog_debug("[facilities] [tolling] received TPM.single.request, ignoring");
return 0;
goto cleanup;
}
rsu_handle_recv(facilities, tpm_rx, security_socket, neighbour, src_addr);
break;
case TollingSingle_PR_reply:
if (facilities->station_type == 15) {
syslog_debug("[facilities] [tolling] received TPM.single.reply, ignoring");
return 0;
goto cleanup;
}
pthread_mutex_lock(&facilities->epv.time.lock);
syslog_info("[facilities] [tolling] single.reply took %ld us", it2s_tender_get_now(TIME_MICROSECONDS) - tolling->tz);
pthread_mutex_unlock(&facilities->epv.time.lock);
veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, neighbour, src_addr);
veh_handle_recv(tolling, tpm_rx, security_socket, facilities->tx_queue, &facilities->epv, neighbour, src_addr);
break;
default:
break;
@ -1135,10 +1140,15 @@ int tpm_recv(void* fc, TPM_t* tpm_rx, void* security_socket, uint8_t* neighbour,
break;
}
cleanup:
pthread_mutex_unlock(&tolling->lock);
return 0;
}
int tolling_init(tolling_t* tolling, void* zmq_ctx, char* security_address, uint8_t station_type) {
pthread_mutex_init(&tolling->lock, NULL);
switch (station_type) {
case 15:
break;
@ -1172,7 +1182,7 @@ void tolling_info_free(tolling_info_t* ti) {
free(ti);
}
tlsc_t* tolling_tlsc_new(tolling_t* tolling, uint8_t ipv6[16], uint16_t port) {
tlsc_t* tolling_tlsc_new(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port) {
if (tolling->protocol.c.tls.n_tlsc >= TOLLING_MAX_CONNS - 1) {
return NULL;
}
@ -1181,18 +1191,72 @@ tlsc_t* tolling_tlsc_new(tolling_t* tolling, uint8_t ipv6[16], uint16_t port) {
memcpy(tlsc->ipv6, ipv6, 16);
tlsc->port = port;
tlsc->id = rand();
tlsc->ts = it2s_tender_get_clock(epv);
++tolling->protocol.c.tls.n_tlsc;
return tlsc;
}
tlsc_t* tolling_tlsc_get(tolling_t* tolling, uint8_t ipv6[16], uint16_t port) {
tlsc_t* tolling_tlsc_get(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port) {
tlsc_t* tlsc = NULL;
for (int i = 0; i < tolling->protocol.c.tls.n_tlsc; ++i) {
if (!memcmp(tolling->protocol.c.tls.tls_conns[i]->ipv6, ipv6, 16) &&
tolling->protocol.c.tls.tls_conns[i]->port == port) {
tlsc = tolling->protocol.c.tls.tls_conns[i];
tlsc->ts = it2s_tender_get_clock(epv);
break;
}
}
return tlsc;
}
void tolling_tlsc_mgmt(tolling_t* tolling, it2s_tender_epv_t* epv, it2s_tender_queue_t* tx_queue, void* security_socket) {
pthread_mutex_lock(&tolling->lock);
uint64_t now = it2s_tender_get_clock(epv);
for (int i = 0; i < tolling->protocol.c.tls.n_tlsc; ++i) {
tlsc_t* tlsc = tolling->protocol.c.tls.tls_conns[i];
if (tlsc->ts + TOLLING_CONN_TIMEOUT_MS > now) {
SecurityRequest_t* sreq = NULL;
TransportRequest_t* tr = NULL;
const uint32_t buf_len = 1024;
uint8_t buf[buf_len];
syslog_debug("[facilities] [tolling] closing TCP/TLS connection");
sreq = calloc(1, sizeof(SecurityRequest_t));
sreq->present = SecurityRequest_PR_tlsClose;
sreq->choice.tlsClose.connId = tlsc->id;
buf[0] = 4;
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_SecurityRequest, NULL, sreq, buf+1, buf_len-1);
syslog_debug("[facilities]->[security] SecurityRequest.tlsClose (%ldB)", enc.encoded+1);
zmq_send(security_socket, buf, enc.encoded+1, 0);
int32_t rl = zmq_recv(security_socket, buf, buf_len, 0);
syslog_debug("[facilities]<-[security] SecurityReply.tlsClose (%dB)", rl);
uint64_t id = rand() + 1;
tr = calloc(1, sizeof(TransportRequest_t));
tr->present = TransportRequest_PR_data;
tr->choice.data.present = TransportDataRequest_PR_tcp;
tr->choice.data.choice.tcp.present = TCPDataRequest_PR_connCloseReq;
tr->choice.data.choice.tcp.choice.connCloseReq.closeByPeer = false;
tr->choice.data.choice.tcp.choice.connCloseReq.destinationPort = 7011;
tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.size = 16;
tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf = malloc(16);
memcpy(tr->choice.data.choice.tcp.choice.connCloseReq.destinationAddress.buf, tlsc->ipv6, 16);
tr->choice.data.choice.tcp.choice.connCloseReq.id = id;
buf[0] = 4;
enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, sreq, buf+1, buf_len-1);
it2s_tender_queue_send(tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, id, "TR.data.tcp.connClose");
free(tlsc);
for (int j = i; j < tolling->protocol.c.tls.n_tlsc-1; ++j) {
tolling->protocol.c.tls.tls_conns[j] = tolling->protocol.c.tls.tls_conns[j+1];
}
--tolling->protocol.c.tls.n_tlsc;
--i;
}
}
pthread_mutex_unlock(&tolling->lock);
}

View File

@ -3,11 +3,13 @@
#include <tpm/TPM.h>
#include <tpm/TollingPaymentInfo.h>
#include <it2s-tender/time.h>
#include <it2s-tender/queue.h>
#include <stdbool.h>
#define TOLLING_INFOS_MAX_LENGTH 16
#define TOLLING_PAYMENT_MIN_PERIOD_MS 60000
#define TOLLING_MAX_CONNS 24
#define TOLLING_CONN_TIMEOUT_MS 10000
typedef enum TOLLING_PROTOCOL {
TOLLING_PROTOCOL_SIMPLE,
@ -30,11 +32,12 @@ typedef struct tlsc {
uint8_t ipv6[16];
uint16_t port;
uint64_t id;
uint64_t t_used;
uint64_t ts;
} tlsc_t;
typedef struct tolling {
bool enabled;
pthread_mutex_t lock;
struct {
TOLLING_PROTOCOL_e p;
@ -93,5 +96,6 @@ int tpm_is_inside_zone(void* fc, tolling_info_t* ti);
tolling_info_t* tolling_info_new(it2s_tender_epv_t* epv, TollingPaymentInfo_t* tpi);
void tolling_info_free(tolling_info_t* ti);
tlsc_t* tolling_tlsc_new(tolling_t* tolling, uint8_t ipv6[16], uint16_t port);
tlsc_t* tolling_tlsc_get(tolling_t* tolling, uint8_t ipv6[16], uint16_t port);
tlsc_t* tolling_tlsc_new(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port);
tlsc_t* tolling_tlsc_get(tolling_t* tolling, it2s_tender_epv_t* epv, uint8_t ipv6[16], uint16_t port);
void tolling_tlsc_mgmt(tolling_t* tolling, it2s_tender_epv_t* epv, it2s_tender_queue_t* tx_queue, void* security_socket);