it2s-itss-facilities/src/facilities.c

997 lines
38 KiB
C

#include "facilities.h"
#include <it2s-asn/camv2/CAM.h>
#include <it2s-asn/cpm/CPM.h>
#include <it2s-asn/denmv2/DENM.h>
#include <it2s-asn/itss-facilities/FacilitiesIndication.h>
#include <it2s-asn/itss-facilities/FacilitiesReply.h>
#include <it2s-asn/itss-facilities/FacilitiesRequest.h>
#include <it2s-asn/itss-management/ManagementIndication.h>
#include <it2s-asn/itss-management/ManagementRequest.h>
#include <it2s-asn/itss-networking/NetworkingIndication.h>
#include <it2s-asn/itss-security/SecurityIndication.h>
#include <it2s-asn/itss-security/SecurityReply.h>
#include <it2s-asn/itss-security/SecurityRequest.h>
#include <it2s-asn/itss-security/SecurityResponse.h>
#include <it2s-asn/itss-transport/TransportIndication.h>
#include <it2s-asn/itss-transport/TransportRequest.h>
#include <it2s-asn/ivim/IVIM.h>
#include <it2s-asn/saem/SAEM.h>
#include <it2s-asn/verco/VERCOe.h>
#include <it2s-tender/packet.h>
#include <it2s-tender/recorder.h>
#include <it2s-tender/space.h>
#include <it2s-tender/time.h>
#include <it2s-tender/trajectory.h>
#include <signal.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <zmq.h>
#include "cam.h"
#include "config.h"
#include "cpm.h"
#include "denm.h"
#include "evm.h"
#include "indications.h"
#include "infrastructure.h"
#include "requests.h"
#include "saem.h"
#include "tpm.h"
#include "vcm.h"
facilities_t facilities = {0};
static int transport_indication(void *responder, void **security_socket, uint8_t *msg, uint32_t msg_len) {
int rv = 0;
uint8_t code = 0;
bool handled_msg = false;
FacilitiesIndication_t *fi = NULL;
SecurityRequest_t *sreq = NULL;
SecurityReply_t *srep = NULL;
TransportRequest_t *tr = NULL;
uint16_t buf_len = 2048;
uint8_t buf[2048];
TransportIndication_t *ti = calloc(1, sizeof(TransportIndication_t));
asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_TransportIndication, (void **)&ti, msg, msg_len);
if (dec.code) {
log_error("<- invalid TI received");
rv = 1;
code = 1;
itss_0send(responder, &code, 1);
goto cleanup;
}
itss_0send(responder, &code, 1);
switch (ti->present) {
case TransportIndication_PR_packet:
break;
case TransportIndication_PR_data:
transport_data_indication(&ti->choice.data, security_socket);
goto cleanup;
default:
log_debug("<- unrecognized TI.choice received");
rv = 1;
goto cleanup;
}
TransportPacketIndication_t *tpi = &ti->choice.packet; // TODO
//
bool fwd = false;
uint64_t id = 0;
asn_TYPE_descriptor_t *its_msg_descriptor = NULL;
void *its_msg = NULL;
int its_msg_type = 0;
uint8_t *packet = NULL;
uint16_t packet_len = 0;
switch (tpi->present) {
case TransportPacketIndication_PR_btp:
id = tpi->choice.btp.id;
log_debug("<- TI.packet.btp | id:%08x size:%dB", (uint32_t)id, msg_len);
// Parse message
switch (tpi->choice.btp.destinationPort) {
case Port_cam:
its_msg_descriptor = &asn_DEF_CAM;
its_msg = calloc(1, sizeof(CAM_t));
its_msg_type = messageID_cam;
handled_msg = true;
break;
case Port_denm:
its_msg_descriptor = &asn_DEF_DENM;
its_msg = calloc(1, sizeof(DENM_t));
its_msg_type = messageID_denm;
handled_msg = true;
break;
case Port_ivim:
its_msg_descriptor = &asn_DEF_IVIM;
its_msg = calloc(1, sizeof(IVIM_t));
its_msg_type = messageID_ivim;
handled_msg = true;
break;
case Port_cpm:
its_msg_descriptor = &asn_DEF_CPM;
its_msg = calloc(1, sizeof(CPM_t));
its_msg_type = 14;
handled_msg = true;
break;
case Port_saem:
its_msg_descriptor = &asn_DEF_SAEM;
its_msg = calloc(1, sizeof(SAEM_t));
its_msg_type = messageID_saem;
handled_msg = true;
break;
case 7011: /* tolling */
its_msg_descriptor = &asn_DEF_TPM;
its_msg = calloc(1, sizeof(TPM_t));
its_msg_type = 117;
handled_msg = true;
break;
case 2043: /* maneuvers */
its_msg_descriptor = &asn_DEF_VCM;
its_msg = calloc(1, sizeof(VCM_t));
its_msg_type = 43;
handled_msg = true;
break;
case 2044: /* VERCOe */
its_msg_descriptor = &asn_DEF_VERCOe;
its_msg = calloc(1, sizeof(VERCOe_t));
its_msg_type = 44;
handled_msg = true;
fwd = true;
break;
case Port_poi: /* EVCSNM */
its_msg_descriptor = &asn_DEF_EvcsnPdu;
its_msg = calloc(1, sizeof(EvcsnPdu_t));
its_msg_type = messageID_evcsn;
handled_msg = true;
fwd = true;
break;
case Port_evrsr: /* EVRSRM */
its_msg_descriptor = &asn_DEF_EV_RSR;
its_msg = calloc(1, sizeof(EV_RSR_t));
its_msg_type = messageID_evcsn;
handled_msg = true;
fwd = true;
break;
default:
log_debug("messsage with unhandled BTP port received (%lld), ignoring", tpi->choice.btp.destinationPort);
goto cleanup;
}
packet = tpi->choice.btp.data.buf;
packet_len = tpi->choice.btp.data.size;
dec = uper_decode_complete(NULL, its_msg_descriptor, (void **)&its_msg, tpi->choice.btp.data.buf, tpi->choice.btp.data.size);
if (dec.code) {
log_debug("<- invalid %s received", its_msg_descriptor->name);
rv = 1;
goto cleanup;
}
// Get permisisons
uint8_t *ssp = NULL;
uint16_t ssp_len;
if (tpi->choice.btp.gn.securityPermissions) {
ssp = tpi->choice.btp.gn.securityPermissions->ssp.buf;
ssp_len = tpi->choice.btp.gn.securityPermissions->ssp.size;
}
// Get neighbour certificate ID
uint8_t *neighbour_cert = tpi->choice.btp.gn.securityNeighbour ? tpi->choice.btp.gn.securityNeighbour->buf : NULL;
// Manage message
switch (tpi->choice.btp.destinationPort) {
case Port_cam:
switch (check_cam(&tpi->choice.btp, its_msg, ssp, ssp_len)) {
case CAM_OK:
fwd = true;
break;
case CAM_INVALID:
case CAM_BAD_PERMISSIONS:
default:
break;
}
break;
case Port_denm:;
#ifdef DEBUG
uint8_t *xml_denm = malloc(32768);
asn_enc_rval_t rve = xer_encode_to_buffer(xml_denm, 32768, 0x02, &asn_DEF_DENM, its_msg);
log_debug("DENM XER %d: %.*s", (int)rve.encoded, (int)rve.encoded, xml_denm);
free(xml_denm);
#endif
switch (event_manage(its_msg, &id, ssp, ssp_len)) {
case EVENT_NEW:
case EVENT_CANCELLATION:
case EVENT_NEGATION:
case EVENT_UPDATE:
case EVENT_NUMBER_EXCEEDED:
fwd = true;
break;
case EVENT_INVALID:
case EVENT_PASSED:
case EVENT_REPEATED:
case EVENT_BAD_PERMISSIONS:
break;
}
break;
case Port_ivim:
switch (service_eval(SERVICE_IVI, its_msg, &id, ssp, ssp_len)) {
case SERVICE_NEW:
case SERVICE_CANCELLATION:
case SERVICE_NEGATION:
case SERVICE_UPDATE:
case SERVICE_NUMBER_EXCEEDED:
fwd = true;
break;
case SERVICE_INVALID:
case SERVICE_REPEATED:
case SERVICE_PASSED:
case SERVICE_BAD_PERMISSIONS:
default:
break;
}
break;
case Port_saem:
switch (saem_check(its_msg, neighbour_cert)) {
case SAEM_NEW:
fwd = true;
break;
default:
break;
}
break;
case 7011:
if (facilities.tolling.enabled) {
tpm_recv(its_msg, security_socket, neighbour_cert, NULL);
fwd = true;
}
break;
case 2043:
if (facilities.coordination.active) {
vcm_check(its_msg);
}
fwd = true;
break;
case Port_poi:
if (facilities.evm_args.activate) {
evcsnm_check(its_msg);
fwd = true;
}
break;
case Port_evrsr:
if (facilities.evm_args.activate) {
evrsrm_check(its_msg);
fwd = true;
EV_RSR_t *evrsr_request = (EV_RSR_t *)its_msg;
evrsrm_recv(evrsr_request);
}
default:
break;
}
break;
case TransportPacketIndication_PR_tcp:
id = tpi->choice.tcp.id;
packet = tpi->choice.tcp.data.buf;
packet_len = tpi->choice.tcp.data.size;
log_debug("<- TI.packet.tcp | id:%ld size:%dB", id, msg_len);
sreq = calloc(1, sizeof(SecurityRequest_t));
sreq->present = SecurityRequest_PR_tlsRecv;
sreq->choice.tlsRecv.data.size = tpi->choice.tcp.data.size;
sreq->choice.tlsRecv.data.buf = malloc(tpi->choice.tcp.data.size);
memcpy(sreq->choice.tlsRecv.data.buf, tpi->choice.tcp.data.buf, tpi->choice.tcp.data.size);
pthread_mutex_lock(&facilities.tolling.lock);
tlsc_t *tlsc = tolling_tlsc_get(tpi->choice.tcp.sourceAddress->buf, 7011);
if (tlsc) {
id = tlsc->id;
} else {
tlsc = tolling_tlsc_new(tpi->choice.tcp.sourceAddress->buf, 7011);
id = tlsc->id;
}
++tlsc->nmsg;
pthread_mutex_unlock(&facilities.tolling.lock);
sreq->choice.tlsSend.connId = id;
uint8_t b_tx[2048], b_rx[2048];
b_tx[0] = 4;
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_SecurityRequest, NULL, sreq, b_tx + 1, 2047);
log_debug("->[security] SecurityRequest.tlsRecv (%ldB)", enc.encoded + 1);
itss_0send(*security_socket, b_tx, enc.encoded + 1);
int32_t rl = itss_0recv_rt(security_socket, b_rx, 2048, b_tx, enc.encoded + 1, 1000);
log_debug("<-[security] SecurityReply.tlsRecv (%dB)", rl);
if (oer_decode(NULL, &asn_DEF_SecurityReply, (void **)&srep, b_rx, rl).code) {
log_error("SecurityReply.tlsRecv decode failure");
rv = 1;
goto cleanup;
}
if (srep->returnCode == SecurityReplyReturnCode_rejected) {
log_error("SecurityReply.tlsRecv rejected");
SecurityRequest_t *sREQ = calloc(1, sizeof(SecurityRequest_t));
sREQ->present = SecurityRequest_PR_tlsShutdown;
sREQ->choice.tlsShutdown.connId = id;
b_tx[0] = 4;
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_SecurityRequest, NULL, sREQ, b_tx + 1, 2047);
log_debug("->[security] SecurityRequest.tlsShutdown (%ldB)", enc.encoded + 1);
itss_0send(*security_socket, b_tx, enc.encoded + 1);
int32_t rl = itss_0recv_rt(security_socket, b_rx, 2048, b_tx, enc.encoded + 1, 1000);
log_debug("<-[security] SecurityReply.tlsShutdown (%dB)", rl);
rv = 1;
goto cleanup;
}
log_debug("[tolling] tls n-msg:%d state:%d", tlsc->nmsg, tlsc->state);
// Forward to [transport]
if (srep->data->choice.tlsRecv.state != 1) {
tr = calloc(1, sizeof(TransportRequest_t));
tr->present = TransportRequest_PR_packet;
tr->choice.packet.present = TransportPacketRequest_PR_tcp;
TCPPacketRequest_t *tpr = &tr->choice.packet.choice.tcp;
tpr->data.size = srep->data->choice.tlsRecv.data.size;
tpr->data.buf = malloc(srep->data->choice.tlsRecv.data.size);
memcpy(tpr->data.buf, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size);
tpr->sourcePort = tpi->choice.tcp.destinationPort;
tpr->destinationPort = tpi->choice.tcp.sourcePort;
tpr->destinationAddress = calloc(1, sizeof(OCTET_STRING_t));
tpr->destinationAddress->buf = malloc(16);
tpr->destinationAddress->size = 16;
memcpy(tpr->destinationAddress->buf, tpi->choice.tcp.sourceAddress->buf, 16);
tpr->destinationPort = 7011;
tpr->sourcePort = 7011;
if (facilities.tolling.protocol.p == TOLLING_PROTOCOL_TLS_GN ||
(facilities.tolling.protocol.p == TOLLING_PROTOCOL_TLS_SHS && tlsc->nmsg < 2)) {
tpr->gn = calloc(1, sizeof(GeonetworkingOutboundOptions_t));
tpr->gn->packetTransportType = PacketTransportType_shb;
tpr->gn->destinationAddress.buf = calloc(1, 6);
tpr->gn->destinationAddress.size = 6;
}
tpr->id = itss_id(tpr->data.buf, tpr->data.size);
buf[0] = 4;
enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, tr, buf + 1, buf_len - 1);
if (enc.encoded == -1) {
log_error("TransportRequest encoding fail");
rv = 1;
goto cleanup;
}
itss_queue_send(facilities.tx_queue, buf, enc.encoded + 1, ITSS_TRANSPORT, tpr->id, "TR.packet.tcp");
} else {
if (facilities.tolling.enabled && srep->data->choice.tlsRecv.data.size) {
dec = uper_decode_complete(NULL, &asn_DEF_TPM, (void **)&its_msg, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size);
if (dec.code) {
log_debug("<- invalid TPM received");
rv = 1;
goto cleanup;
}
if (!dec.code) {
tpm_recv(its_msg, security_socket, NULL, tpi->choice.tcp.sourceAddress->buf);
// Fwd to [applications]
fi = calloc(1, sizeof(FacilitiesIndication_t));
fi->present = FacilitiesIndication_PR_message;
FacilitiesMessageIndication_t *fmi = &fi->choice.message;
fmi->id = id;
fmi->itsMessageType = 7011;
fmi->data.size = srep->data->choice.tlsRecv.data.size;
fmi->data.buf = malloc(srep->data->choice.tlsRecv.data.size);
memcpy(fmi->data.buf, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size);
uint8_t buffer[ITSS_SDU_MAX_LEN];
buffer[0] = 4; // Facilities
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesIndication, NULL, fi, buffer + 1, ITSS_SDU_MAX_LEN - 1);
itss_queue_send(facilities.tx_queue, buffer, enc.encoded + 1, ITSS_APPLICATIONS, id, "FI.message");
}
}
}
break;
case TransportPacketIndication_PR_udp:
id = tpi->choice.udp.id;
log_debug("<- TI.packet.udp | id:%ld size:%dB", id, msg_len);
break;
default:
break;
}
// Forward to [applications]
if (fwd) {
fi = calloc(1, sizeof(FacilitiesIndication_t));
fi->present = FacilitiesIndication_PR_message;
FacilitiesMessageIndication_t *fmi = &fi->choice.message;
fmi->id = id;
fmi->itsMessageType = tpi->choice.btp.destinationPort;
fmi->data.size = tpi->choice.btp.data.size;
fmi->data.buf = malloc(tpi->choice.btp.data.size);
memcpy(fmi->data.buf, tpi->choice.btp.data.buf, tpi->choice.btp.data.size);
uint8_t buffer[ITSS_SDU_MAX_LEN];
buffer[0] = 4; // Facilities
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesIndication, NULL, fi, buffer + 1, ITSS_SDU_MAX_LEN - 1);
itss_queue_send(facilities.tx_queue, buffer, enc.encoded + 1, ITSS_APPLICATIONS, id, "FI.message");
}
// Logging
if (facilities.logging.dbms) {
pthread_mutex_lock(&facilities.id.lock);
uint32_t station_id = facilities.id.station_id;
pthread_mutex_unlock(&facilities.id.lock);
itss_db_add(facilities.logging.dbms, station_id, id, false, its_msg_type, NULL, packet, packet_len);
}
if (facilities.logging.recorder) {
int e = itss_management_record_packet_sdu(
buf,
buf_len,
tpi->choice.btp.data.buf,
tpi->choice.btp.data.size,
tpi->choice.btp.id,
itss_time_get(),
ITSS_FACILITIES,
false);
if (e != -1) {
itss_queue_send(facilities.tx_queue, buf, e, ITSS_MANAGEMENT, tpi->choice.btp.id, "MReq.packet.set");
}
}
cleanup:
if (handled_msg && tpi->choice.btp.destinationPort != Port_denm && tpi->choice.btp.destinationPort != Port_ivim) {
ASN_STRUCT_FREE(*its_msg_descriptor, its_msg);
}
ASN_STRUCT_FREE(asn_DEF_TransportIndication, ti);
ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi);
ASN_STRUCT_FREE(asn_DEF_SecurityRequest, sreq);
ASN_STRUCT_FREE(asn_DEF_SecurityReply, srep);
ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr);
return rv;
}
static int facilities_request(void *responder, uint8_t *msg, uint32_t msg_len) {
int rv = 0;
FacilitiesRequest_t *fr = calloc(1, sizeof(FacilitiesRequest_t));
asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_FacilitiesRequest, (void **)&fr, msg, msg_len);
if (dec.code) {
log_error("<- invalid FR received");
facilities_request_result_rejected(responder);
rv = 1;
goto cleanup;
}
switch (fr->present) {
case FacilitiesRequest_PR_message:
rv = facilities_request_single_message(responder, &fr->choice.message);
break;
case FacilitiesRequest_PR_data:
switch (fr->choice.data.present) {
case FacilitiesDataRequest_PR_activeEpisodes:
rv = facilities_request_active_episodes(responder, fr);
break;
case FacilitiesDataRequest_PR_attributeTypes:
rv = facilities_request_attribute_types(responder, fr);
break;
case FacilitiesDataRequest_PR_loadedProtectionZones:
rv = facilities_request_loaded_protected_zones(responder, fr);
break;
case FacilitiesDataRequest_PR_chainInfoSet:
rv = facilities_request_chaininfo_set(responder, &fr->choice.data.choice.chainInfoSet);
break;
default:
log_error("<- unrecognized FDR type received (%d)", fr->choice.data.present);
facilities_request_result_rejected(responder);
rv = 1;
goto cleanup;
}
break;
default:
log_error("<- unrecognized FR type received (%d)", fr->present);
facilities_request_result_rejected(responder);
rv = 1;
goto cleanup;
}
cleanup:
ASN_STRUCT_FREE(asn_DEF_FacilitiesRequest, fr);
return rv;
}
static int security_indication(void *responder_secured, uint8_t *msg, uint32_t msg_len) {
int rv = 0;
SecurityIndication_t *si = calloc(1, sizeof(SecurityIndication_t));
SecurityResponse_t *sr = calloc(1, sizeof(SecurityResponse_t));
uint8_t buffer[64];
asn_enc_rval_t enc;
asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_SecurityIndication, (void **)&si, msg, msg_len);
if (dec.code) {
log_error("<- invalid SIndication received");
rv = 1;
goto cleanup;
}
pthread_mutex_lock(&facilities.id.change.lock);
if (facilities.id.change.stage == ID_CHANGE_BLOCKED) {
pthread_mutex_unlock(&facilities.id.change.lock);
log_debug("identity change is currently blocked");
rv = 1;
goto cleanup;
}
if (facilities.id.change.stage == ID_CHANGE_PREPARE &&
si->choice.idChangeEvent.command != SecurityIdChangeEventType_commit &&
si->choice.idChangeEvent.command != SecurityIdChangeEventType_abort) {
pthread_mutex_unlock(&facilities.id.change.lock);
log_debug("current identity change state is prepare, but received identity change command is not commit nor abort");
rv = 1;
goto cleanup;
}
bool id_changed = false;
switch (si->choice.idChangeEvent.command) {
case SecurityIdChangeEventType_prepare:
facilities.id.change.stage = ID_CHANGE_PREPARE;
pthread_mutex_lock(&facilities.id.lock);
pthread_mutex_lock(&facilities.lightship.lock);
break;
case SecurityIdChangeEventType_commit:;
facilities.id.change.stage = ID_CHANGE_COMMIT;
// Reset lightship
for (int i = 0; i < facilities.lightship.path_history_len; ++i) {
free(facilities.lightship.path_history[i]);
}
facilities.lightship.path_history_len = 0;
facilities.lightship.t_last_cam = 0;
facilities.lightship.t_last_cam_lfc = 0;
facilities.lightship.next_cam_max = 0;
facilities.lightship.next_cam_min = 0;
pthread_mutex_unlock(&facilities.lightship.lock);
// Change Station ID
for (int i = 0; i < si->choice.idChangeEvent.ids.list.count; ++i) {
switch (si->choice.idChangeEvent.ids.list.array[i]->present) {
case SecurityId_PR_stationId:
facilities.id.station_id = si->choice.idChangeEvent.ids.list.array[i]->choice.stationId;
break;
case SecurityId_PR_ipv6Address:
memcpy(facilities.id.ipv6_addr, si->choice.idChangeEvent.ids.list.array[i]->choice.ipv6Address.buf, 16);
break;
default:
break;
}
}
facilities.id.change.stage = ID_CHANGE_INACTIVE;
pthread_mutex_unlock(&facilities.id.lock);
id_changed = true;
break;
case SecurityIdChangeEventType_abort:
pthread_mutex_unlock(&facilities.id.lock);
pthread_mutex_unlock(&facilities.lightship.lock);
facilities.id.change.stage = ID_CHANGE_INACTIVE;
break;
default:
pthread_mutex_unlock(&facilities.id.change.lock);
log_error("[networking]<- unhandled idChangeEvent command type");
rv = 1;
goto cleanup;
}
sr->present = SecurityResponse_PR_idChangeEvent;
sr->choice.idChangeEvent.returnCode = 0;
enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64);
itss_0send(responder_secured, buffer, enc.encoded);
pthread_mutex_unlock(&facilities.id.change.lock);
cleanup:
if (rv) {
sr->present = SecurityResponse_PR_idChangeEvent;
sr->choice.idChangeEvent.returnCode = 1;
enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64);
itss_0send(responder_secured, buffer, enc.encoded);
}
pthread_mutex_unlock(&facilities.id.change.lock);
ASN_STRUCT_FREE(asn_DEF_SecurityResponse, sr);
ASN_STRUCT_FREE(asn_DEF_SecurityIndication, si);
return rv;
}
static int networking_indication(void *responder, uint8_t *msg, uint32_t msg_len) {
int rv = 0;
uint8_t code = 0;
NetworkingIndication_t *ni = calloc(1, sizeof(NetworkingIndication_t));
asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_NetworkingIndication, (void **)&ni, msg, msg_len);
if (dec.code) {
rv = 1;
code = 1;
itss_0send(responder, &code, 1);
goto cleanup;
}
itss_0send(responder, &code, 1);
if (ni->present != NetworkingIndication_PR_data) {
goto cleanup;
}
if (ni->choice.data.mobileNeighbour &&
*ni->choice.data.mobileNeighbour) {
pthread_mutex_lock(&facilities.lightship.lock);
facilities.lightship.t_last_vehicle = itss_time_get();
facilities.lightship.is_vehicle_near = true;
pthread_mutex_unlock(&facilities.lightship.lock);
}
cleanup:
ASN_STRUCT_FREE(asn_DEF_NetworkingIndication, ni);
return rv;
}
static int management_indication(void *responder, uint8_t *msg, uint32_t msg_len) {
int rv = 0;
uint8_t code = 0;
ManagementIndication_t *mi = calloc(1, sizeof(ManagementIndication_t));
asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_ManagementIndication, (void **)&mi, msg, msg_len);
if (dec.code) {
rv = 1;
code = 1;
itss_0send(responder, &code, 1);
goto cleanup;
}
itss_0send(responder, &code, 1);
if (mi->present == ManagementIndication_PR_attributes) {
itss_space_lock();
epv.space.latitude = mi->choice.attributes.coordinates.latitude;
epv.space.latitude_conf = mi->choice.attributes.coordinates.latitudeConfidence;
epv.space.longitude = mi->choice.attributes.coordinates.longitude;
epv.space.longitude_conf = mi->choice.attributes.coordinates.longitudeConfidence;
epv.space.speed = mi->choice.attributes.speed.speedValue;
epv.space.speed_conf = mi->choice.attributes.speed.speedConfidence;
epv.space.heading = mi->choice.attributes.heading.headingValue;
epv.space.heading_conf = mi->choice.attributes.heading.headingConfidence;
epv.space.accel = mi->choice.attributes.acceleration.longitudinalAccelerationValue;
epv.space.altitude = mi->choice.attributes.altitude.altitudeValue;
epv.space.altitude_conf = mi->choice.attributes.altitude.altitudeConfidence;
itss_space_unlock();
itss_trajectory_lock();
if (mi->choice.attributes.trajectory) {
epv.trajectory.len = mi->choice.attributes.trajectory->list.count;
for (int i = 0; i < mi->choice.attributes.trajectory->list.count; ++i) {
epv.trajectory.path[i].latitude = mi->choice.attributes.trajectory->list.array[i]->latitude;
epv.trajectory.path[i].longitude = mi->choice.attributes.trajectory->list.array[i]->longitude;
asn_INTEGER2ulong(&mi->choice.attributes.trajectory->list.array[i]->timestamp, (unsigned long long *)&epv.trajectory.path[i].timestamp);
}
}
itss_trajectory_unlock();
itss_time_lock();
asn_INTEGER2ulong(&mi->choice.attributes.clock, (unsigned long long *)&epv.time.clock);
itss_time_unlock();
}
cleanup:
ASN_STRUCT_FREE(asn_DEF_ManagementIndication, mi);
return rv;
}
void *tx() {
int rv = 0;
itss_queue_t *queue = facilities.tx_queue;
uint8_t code;
void *applications_socket = itss_0connect(facilities.zmq.applications_address, ZMQ_REQ);
void *transport_socket = itss_0connect(facilities.zmq.transport_address, ZMQ_REQ);
void *management_socket = itss_0connect(facilities.zmq.management_address, ZMQ_REQ);
itss_queue_t *stream = itss_queue_new();
while (!facilities.exit) {
pthread_mutex_lock(&queue->lock);
while (!queue->len && !facilities.exit) {
pthread_cond_wait(&queue->trigger, &queue->lock);
}
for (int i = 0; i < queue->len; ++i) {
memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]);
stream->packet_len[i] = queue->packet_len[i];
stream->destination[i] = queue->destination[i];
strcpy(stream->info_msg[i], queue->info_msg[i]);
stream->id[i] = queue->id[i];
}
stream->len = queue->len;
queue->len = 0;
pthread_mutex_unlock(&queue->lock);
for (int i = 0; i < stream->len; ++i) {
switch (stream->destination[i]) {
case ITSS_TRANSPORT:
log_debug("-> %s ->[transport] | id:%08x size:%dB",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
itss_0send(transport_socket, stream->packet[i], stream->packet_len[i]);
rv = itss_0recv_rt(&transport_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
if (rv == -1) {
log_error("-> %s ->[transport] | id:%08x size:%dB <TIMEOUT>",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
}
break;
case ITSS_APPLICATIONS:
log_debug("-> %s ->[applications] | id:%08x size:%dB",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
itss_0send(applications_socket, stream->packet[i], stream->packet_len[i]);
rv = itss_0recv_rt(&applications_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
if (rv == -1) {
log_error("-> %s ->[applications] | id:%08x size:%dB <TIMEOUT>",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
}
break;
case ITSS_MANAGEMENT:
itss_0send(management_socket, stream->packet[i], stream->packet_len[i]);
rv = itss_0recv_rt(&management_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
if (rv == -1) {
log_error("-> %s ->[management] | id:%08x size:%dB <TIMEOUT>",
stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
}
break;
}
}
}
itss_0close(transport_socket);
itss_0close(management_socket);
itss_0close(applications_socket);
return NULL;
}
static void sigh(int signum) {
facilities.exit = true;
uint8_t code = 0;
void *socket = itss_0connect(ZMQ_INTERNAL_ADDR, ZMQ_PAIR);
itss_0send(socket, &code, sizeof(code));
itss_0close(socket);
}
int main() {
signal(SIGTERM, sigh);
signal(SIGINT, sigh);
signal(SIGKILL, sigh);
facilities.tx_queue = itss_queue_new();
lightship_init();
den_init();
infrastructure_init();
dissemination_init();
bulletin_init();
void *security_socket = NULL;
if (facilities_config()) {
goto cleanup;
}
facilities.lightship.type = facilities.station_type;
// Tx
pthread_create(&facilities.transmitting, NULL, tx, NULL);
// CA
pthread_create(&facilities.ca_service, NULL, ca_service, NULL);
// DEN
pthread_create(&facilities.den_service, NULL, den_service, NULL);
// Infrastructure
pthread_create(&facilities.infrastructure_service, NULL, infrastructure_service, NULL);
// CP
if (facilities.dissemination.active)
pthread_create(&facilities.cp_service, NULL, cp_service, NULL);
// SA
pthread_create(&facilities.sa_service, NULL, sa_service, NULL);
// Tolling
tolling_init(facilities.station_type);
// VC
if (facilities.coordination.active) {
coordination_init();
pthread_create(&facilities.vc_service, NULL, vc_service, NULL);
}
// EVCSN
if (facilities.evm_args.activate)
pthread_create(&facilities.evcsn_service, NULL, evcsn_service, NULL);
security_socket = itss_0connect(facilities.zmq.security_address, ZMQ_REQ);
uint8_t buffer[ITSS_SDU_MAX_LEN];
log_info("listening");
uint8_t code;
bool in_idchange;
int32_t rl;
while (!facilities.exit) {
itss_0poll(facilities.zmq.responders, facilities.zmq.n_responders, -1);
for (int i = 0; i < facilities.zmq.n_responders; ++i) {
if (facilities.zmq.responders[i].revents) {
rl = itss_0recv(facilities.zmq.responders[i].socket, buffer, ITSS_SDU_MAX_LEN);
switch (buffer[0]) { /* source */
case ITSS_INTERNAL:
break;
case ITSS_NETWORKING:
networking_indication(facilities.zmq.responders[i].socket, buffer + 1, rl);
break;
case ITSS_TRANSPORT:
in_idchange = true;
pthread_mutex_lock(&facilities.id.change.lock);
if (facilities.id.change.stage == ID_CHANGE_INACTIVE) {
in_idchange = false;
facilities.id.change.stage = ID_CHANGE_BLOCKED;
}
pthread_mutex_unlock(&facilities.id.change.lock);
if (!in_idchange) {
transport_indication(facilities.zmq.responders[i].socket, &security_socket, buffer + 1, rl);
pthread_mutex_lock(&facilities.id.change.lock);
facilities.id.change.stage = ID_CHANGE_INACTIVE;
pthread_mutex_unlock(&facilities.id.change.lock);
} else {
code = 1;
itss_0send(facilities.zmq.responders[i].socket, &code, 1);
}
break;
case ITSS_APPLICATIONS:
in_idchange = true;
pthread_mutex_lock(&facilities.id.change.lock);
if (facilities.id.change.stage == ID_CHANGE_INACTIVE) {
in_idchange = false;
facilities.id.change.stage = ID_CHANGE_BLOCKED;
}
pthread_mutex_unlock(&facilities.id.change.lock);
if (!in_idchange) {
facilities_request(facilities.zmq.responders[i].socket, buffer + 1, rl);
pthread_mutex_lock(&facilities.id.change.lock);
facilities.id.change.stage = ID_CHANGE_INACTIVE;
pthread_mutex_unlock(&facilities.id.change.lock);
} else {
code = 1;
itss_0send(facilities.zmq.responders[i].socket, &code, 1);
}
break;
case ITSS_MANAGEMENT:
management_indication(facilities.zmq.responders[i].socket, buffer + 1, rl);
break;
case ITSS_SECURITY:
security_indication(facilities.zmq.responders[i].socket, buffer + 1, rl);
break;
default:
log_debug("<- unrecognized service");
code = 1;
itss_0send(facilities.zmq.responders[i].socket, &code, 1);
continue;
}
}
}
}
// Exit
cleanup:
pthread_join(facilities.evcsn_service, NULL);
pthread_join(facilities.ca_service, NULL);
pthread_join(facilities.den_service, NULL);
pthread_join(facilities.infrastructure_service, NULL);
if (facilities.dissemination.active)
pthread_join(facilities.cp_service, NULL);
pthread_join(facilities.sa_service, NULL);
itss_queue_trigger(facilities.tx_queue);
pthread_join(facilities.transmitting, NULL);
if (facilities.coordination.active)
pthread_join(facilities.vc_service, NULL);
itss_0close(security_socket);
for (int i = 0; i < facilities.zmq.n_responders; ++i) {
itss_0close(facilities.zmq.responders[i].socket);
}
itss_0destroy();
log_info("exiting");
log_close();
return 0;
}