#include "facilities.h"
#include "cam.h"
#include "config.h"
#include "denm.h"
#include "infrastructure.h"
#include "indications.h"
#include "requests.h"
#include "cpm.h"
#include "saem.h"
#include "tpm.h"
#include "vcm.h"

/* NOTE(review): the 28 directives below have no header operand in this copy of
 * the file; they are kept untouched because the names cannot be recovered from
 * here. Restore the original <...> names before building. */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

facilities_t facilities = {0};

/**
 * Builds a FacilitiesIndication.message carrying a copy of `data`, OER-encodes
 * it behind the one-byte source tag and queues it for [applications].
 *
 * @param id                identifier stored in the indication and used as queue id
 * @param its_message_type  value stored in itsMessageType
 * @param data              payload to copy (may be NULL when data_len is 0)
 * @param data_len          payload length in bytes
 * @return the allocated indication (the caller releases it with
 *         ASN_STRUCT_FREE), or NULL when an allocation failed
 */
static FacilitiesIndication_t* forward_to_applications(uint64_t id, long its_message_type, const uint8_t* data, size_t data_len) {

    FacilitiesIndication_t* fi = calloc(1, sizeof(FacilitiesIndication_t));
    if (!fi) {
        return NULL;
    }
    fi->present = FacilitiesIndication_PR_message;

    FacilitiesMessageIndication_t* fmi = &fi->choice.message;
    fmi->id = id;
    fmi->itsMessageType = its_message_type;
    if (data_len) {
        fmi->data.buf = malloc(data_len);
        if (!fmi->data.buf) {
            ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi);
            return NULL;
        }
        memcpy(fmi->data.buf, data, data_len);
        fmi->data.size = data_len;
    }

    uint8_t buffer[ITSS_SDU_MAX_LEN];
    buffer[0] = 4;  // Facilities
    asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesIndication, NULL, fi, buffer+1, ITSS_SDU_MAX_LEN-1);
    if (enc.encoded == -1) {
        // Queueing encoded+1 bytes would send an empty message
        log_error("FacilitiesIndication encoding fail");
        return fi;
    }

    itss_queue_send(facilities.tx_queue, buffer, enc.encoded+1, ITSS_APPLICATIONS, id, "FI.message");
    return fi;
}

/**
 * Handles one TransportIndication received from [transport].
 *
 * The one-byte acknowledgement (0 = decoded, 1 = invalid) is sent on
 * `responder` as soon as the indication has been decoded. BTP packets are
 * decoded according to their destination port, checked by the matching
 * service and, when accepted, forwarded to [applications]. TCP packets are
 * passed through [security] (tlsRecv) and either answered towards [transport]
 * or decoded as a TPM. UDP packets are only logged.
 *
 * @param responder        socket the acknowledgement is sent on
 * @param security_socket  socket used for requests to [security]
 * @param msg              OER-encoded TransportIndication
 * @param msg_len          length of `msg` in bytes
 * @return 0 on success, 1 when the indication was invalid or could not be handled
 */
static int transport_indication(void* responder, void** security_socket, uint8_t *msg, uint32_t msg_len) {

    int rv = 0;
    uint8_t code = 0;

    // Everything touched at `cleanup` is declared and initialised before the first goto
    FacilitiesIndication_t *fi = NULL;
    SecurityRequest_t* sreq = NULL;
    SecurityReply_t* srep = NULL;
    TransportRequest_t* tr = NULL;
    asn_TYPE_descriptor_t *its_msg_descriptor = NULL;
    void *its_msg = NULL;
    bool owns_its_msg = false;  // true while its_msg must be released here

    uint16_t buf_len = 2048;
    uint8_t buf[2048];

    TransportIndication_t* ti = calloc(1, sizeof(TransportIndication_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_TransportIndication, (void**) &ti, msg, msg_len);
    if (dec.code) {
        log_error("<- invalid TI received");
        rv = 1;
        code = 1;
        itss_0send(responder, &code, 1);
        goto cleanup;
    }

    itss_0send(responder, &code, 1);

    switch (ti->present) {
        case TransportIndication_PR_packet:
            break;
        case TransportIndication_PR_data:
            transport_data_indication(&ti->choice.data, security_socket);
            goto cleanup;
        default:
            log_debug("<- unrecognized TI.choice received");
            rv = 1;
            goto cleanup;
    }

    TransportPacketIndication_t* tpi = &ti->choice.packet;

    bool fwd = false;

    uint64_t id = 0;         // may be rewritten by the services / the TLS connection
    uint64_t packet_id = 0;  // id of the packet exactly as received from [transport]

    int its_msg_type = 0;

    uint8_t* packet = NULL;
    size_t packet_len = 0;

    switch (tpi->present) {
        case TransportPacketIndication_PR_btp: {
            id = tpi->choice.btp.id;
            packet_id = id;
            log_debug("<- TI.packet.btp | id:%08x size:%dB", (uint32_t) id, msg_len);

            // Parse message
            switch (tpi->choice.btp.destinationPort) {
                case Port_cam:
                    its_msg_descriptor = &asn_DEF_CAM;
                    its_msg = calloc(1, sizeof(CAM_t));
                    its_msg_type = messageID_cam;
                    break;

                case Port_denm:
                    its_msg_descriptor = &asn_DEF_DENM;
                    its_msg = calloc(1, sizeof(DENM_t));
                    its_msg_type = messageID_denm;
                    break;

                case Port_ivim:
                    its_msg_descriptor = &asn_DEF_IVIM;
                    its_msg = calloc(1, sizeof(IVIM_t));
                    its_msg_type = messageID_ivim;
                    break;

                case Port_cpm:
                    its_msg_descriptor = &asn_DEF_CPM;
                    its_msg = calloc(1, sizeof(CPM_t));
                    its_msg_type = 14;
                    break;

                case Port_saem:
                    its_msg_descriptor = &asn_DEF_SAEM;
                    its_msg = calloc(1, sizeof(SAEM_t));
                    its_msg_type = messageID_saem;
                    break;

                case 7011: /* tolling */
                    its_msg_descriptor = &asn_DEF_TPM;
                    its_msg = calloc(1, sizeof(TPM_t));
                    its_msg_type = 117;
                    break;

                case 2043: /* maneuvers */
                    its_msg_descriptor = &asn_DEF_VCM;
                    its_msg = calloc(1, sizeof(VCM_t));
                    its_msg_type = 43;
                    break;

                default:
                    log_debug("messsage with unhandled BTP port received (%lld), ignoring", (long long) tpi->choice.btp.destinationPort);
                    goto cleanup;
            }
            owns_its_msg = true;

            packet = tpi->choice.btp.data.buf;
            packet_len = tpi->choice.btp.data.size;

            dec = uper_decode_complete(NULL, its_msg_descriptor, (void**) &its_msg, tpi->choice.btp.data.buf, tpi->choice.btp.data.size);
            if (dec.code) {
                log_debug("<- invalid %s received", its_msg_descriptor->name);
                rv = 1;
                goto cleanup;
            }

            // Get permissions (length stays 0 when the packet carries none)
            uint8_t* ssp = NULL;
            uint16_t ssp_len = 0;
            if (tpi->choice.btp.gn.securityPermissions) {
                ssp = tpi->choice.btp.gn.securityPermissions->ssp.buf;
                ssp_len = tpi->choice.btp.gn.securityPermissions->ssp.size;
            }

            // Get neighbour certificate ID
            uint8_t* neighbour_cert = tpi->choice.btp.gn.securityNeighbour ? tpi->choice.btp.gn.securityNeighbour->buf : NULL;

            // Manage message
            switch (tpi->choice.btp.destinationPort) {
                case Port_cam:
                    switch (check_cam(&tpi->choice.btp, its_msg, ssp, ssp_len)) {
                        case CAM_OK:
                            fwd = true;
                            break;
                        case CAM_INVALID:
                        case CAM_BAD_PERMISSIONS:
                        default:
                            break;
                    }
                    break;

                case Port_denm: {
#ifdef DEBUG
                    uint8_t* xml_denm = malloc(32768);
                    asn_enc_rval_t rve = xer_encode_to_buffer(xml_denm, 32768, 0x02, &asn_DEF_DENM, its_msg);
                    log_debug("DENM XER %d: %.*s", (int)rve.encoded, (int)rve.encoded , xml_denm);
                    free(xml_denm);
#endif
                    // A DENM handed to event_manage() was never released by this
                    // function; presumably event_manage() keeps it - TODO confirm
                    owns_its_msg = false;
                    switch (event_manage(its_msg, &id, ssp, ssp_len)) {
                        case EVENT_NEW:
                        case EVENT_CANCELLATION:
                        case EVENT_NEGATION:
                        case EVENT_UPDATE:
                        case EVENT_NUMBER_EXCEEDED:
                            fwd = true;
                            break;
                        case EVENT_INVALID:
                        case EVENT_PASSED:
                        case EVENT_REPEATED:
                        case EVENT_BAD_PERMISSIONS:
                        default:
                            break;
                    }
                    break;
                }

                case Port_ivim:
                    // Same hand-over as for DENMs - TODO confirm service_eval() keeps the IVIM
                    owns_its_msg = false;
                    switch (service_eval(SERVICE_IVI, its_msg, &id, ssp, ssp_len)) {
                        case SERVICE_NEW:
                        case SERVICE_CANCELLATION:
                        case SERVICE_NEGATION:
                        case SERVICE_UPDATE:
                        case SERVICE_NUMBER_EXCEEDED:
                            fwd = true;
                            break;
                        case SERVICE_INVALID:
                        case SERVICE_REPEATED:
                        case SERVICE_PASSED:
                        case SERVICE_BAD_PERMISSIONS:
                        default:
                            break;
                    }
                    break;

                case Port_saem:
                    switch (saem_check(its_msg, neighbour_cert)) {
                        case SAEM_NEW:
                            fwd = true;
                            break;
                        default:
                            break;
                    }
                    break;

                case 7011:
                    if (facilities.tolling.enabled) {
                        tpm_recv(its_msg, security_socket, neighbour_cert, NULL);
                        fwd = true;
                    }
                    break;

                case 2043:
                    if (facilities.coordination.active) {
                        vcm_check(its_msg);
                    }
                    fwd = true;
                    break;

                default:
                    break;
            }
            break;
        }

        case TransportPacketIndication_PR_tcp: {
            id = tpi->choice.tcp.id;
            packet_id = id;
            packet = tpi->choice.tcp.data.buf;
            packet_len = tpi->choice.tcp.data.size;
            log_debug("<- TI.packet.tcp | id:%ld size:%dB", id, msg_len);

            // The source address is dereferenced and copied as 16 bytes below
            if (!tpi->choice.tcp.sourceAddress || tpi->choice.tcp.sourceAddress->size < 16) {
                log_error("<- TI.packet.tcp without a 16-byte source address");
                rv = 1;
                goto cleanup;
            }
            uint8_t* src_addr = tpi->choice.tcp.sourceAddress->buf;

            sreq = calloc(1, sizeof(SecurityRequest_t));
            sreq->present = SecurityRequest_PR_tlsRecv;
            sreq->choice.tlsRecv.data.size = tpi->choice.tcp.data.size;
            sreq->choice.tlsRecv.data.buf = malloc(tpi->choice.tcp.data.size);
            memcpy(sreq->choice.tlsRecv.data.buf, tpi->choice.tcp.data.buf, tpi->choice.tcp.data.size);

            // Look up (or create) the TLS connection of this peer
            pthread_mutex_lock(&facilities.tolling.lock);
            tlsc_t* tlsc = tolling_tlsc_get(src_addr, 7011);
            if (!tlsc) {
                tlsc = tolling_tlsc_new(src_addr, 7011);
            }
            if (!tlsc) {
                pthread_mutex_unlock(&facilities.tolling.lock);
                log_error("[tolling] unable to get a tls connection");
                rv = 1;
                goto cleanup;
            }
            id = tlsc->id;
            ++tlsc->nmsg;
            pthread_mutex_unlock(&facilities.tolling.lock);

            // NOTE(review): written through the tlsSend member although the request
            // is a tlsRecv; left as-is, confirm both members share this layout
            sreq->choice.tlsSend.connId = id;

            uint8_t b_tx[2048], b_rx[2048];
            b_tx[0] = 4;

            asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_SecurityRequest, NULL, sreq, b_tx+1, 2047);
            if (enc.encoded == -1) {
                log_error("SecurityRequest.tlsRecv encoding fail");
                rv = 1;
                goto cleanup;
            }

            log_debug("->[security] SecurityRequest.tlsRecv (%ldB)", enc.encoded+1);
            itss_0send(*security_socket, b_tx, enc.encoded+1);
            int32_t rl = itss_0recv_rt(security_socket, b_rx, 2048, b_tx, enc.encoded+1, 1000);
            log_debug("<-[security] SecurityReply.tlsRecv (%dB)", rl);

            // A negative length must never reach the decoder as a size
            if (rl <= 0) {
                log_error("SecurityReply.tlsRecv not received");
                rv = 1;
                goto cleanup;
            }

            if (oer_decode(NULL, &asn_DEF_SecurityReply, (void**) &srep, b_rx, rl).code) {
                log_error("SecurityReply.tlsRecv decode failure");
                rv = 1;
                goto cleanup;
            }

            if (srep->returnCode == SecurityReplyReturnCode_rejected) {
                log_error("SecurityReply.tlsRecv rejected");

                SecurityRequest_t* shutdown_req = calloc(1, sizeof(SecurityRequest_t));
                shutdown_req->present = SecurityRequest_PR_tlsShutdown;
                shutdown_req->choice.tlsShutdown.connId = id;

                b_tx[0] = 4;
                asn_enc_rval_t shutdown_enc = oer_encode_to_buffer(&asn_DEF_SecurityRequest, NULL, shutdown_req, b_tx+1, 2047);
                ASN_STRUCT_FREE(asn_DEF_SecurityRequest, shutdown_req);

                if (shutdown_enc.encoded != -1) {
                    log_debug("->[security] SecurityRequest.tlsShutdown (%ldB)", shutdown_enc.encoded+1);
                    itss_0send(*security_socket, b_tx, shutdown_enc.encoded+1);
                    int32_t shutdown_rl = itss_0recv_rt(security_socket, b_rx, 2048, b_tx, shutdown_enc.encoded+1, 1000);
                    log_debug("<-[security] SecurityReply.tlsShutdown (%dB)", shutdown_rl);
                }

                rv = 1;
                goto cleanup;
            }

            log_debug("[tolling] tls n-msg:%d state:%d", tlsc->nmsg, tlsc->state);

            if (!srep->data) {
                log_error("SecurityReply.tlsRecv without data");
                rv = 1;
                goto cleanup;
            }

            if (srep->data->choice.tlsRecv.state != 1) {
                // Forward to [transport]
                tr = calloc(1, sizeof(TransportRequest_t));
                tr->present = TransportRequest_PR_packet;
                tr->choice.packet.present = TransportPacketRequest_PR_tcp;

                TCPPacketRequest_t* tpr = &tr->choice.packet.choice.tcp;

                tpr->data.size = srep->data->choice.tlsRecv.data.size;
                tpr->data.buf = malloc(srep->data->choice.tlsRecv.data.size);
                memcpy(tpr->data.buf, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size);

                tpr->destinationAddress = calloc(1, sizeof(OCTET_STRING_t));
                tpr->destinationAddress->buf = malloc(16);
                tpr->destinationAddress->size = 16;
                memcpy(tpr->destinationAddress->buf, src_addr, 16);

                // Both ports are always the tolling port, whatever the packet carried
                tpr->destinationPort = 7011;
                tpr->sourcePort = 7011;

                if (facilities.tolling.protocol.p == TOLLING_PROTOCOL_TLS_GN ||
                    (facilities.tolling.protocol.p == TOLLING_PROTOCOL_TLS_SHS && tlsc->nmsg < 2)) {
                    tpr->gn = calloc(1, sizeof(GeonetworkingOutboundOptions_t));
                    tpr->gn->packetTransportType = PacketTransportType_shb;
                    tpr->gn->destinationAddress.buf = calloc(1, 6);
                    tpr->gn->destinationAddress.size = 6;
                }

                tpr->id = itss_id(tpr->data.buf, tpr->data.size);

                buf[0] = 4;
                enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, tr, buf+1, buf_len-1);
                if (enc.encoded == -1) {
                    log_error("TransportRequest encoding fail");
                    rv = 1;
                    goto cleanup;
                }

                itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, tpr->id, "TR.packet.tcp");

            } else if (facilities.tolling.enabled && srep->data->choice.tlsRecv.data.size) {
                // The decoded TPM is released at cleanup, as on the BTP tolling path
                its_msg_descriptor = &asn_DEF_TPM;
                owns_its_msg = true;

                dec = uper_decode_complete(NULL, &asn_DEF_TPM, (void**) &its_msg, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size);
                if (dec.code) {
                    log_debug("<- invalid TPM received");
                    rv = 1;
                    goto cleanup;
                }

                tpm_recv(its_msg, security_socket, NULL, src_addr);

                // Fwd to [applications]
                fi = forward_to_applications(id, 7011, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size);
            }
            break;
        }

        case TransportPacketIndication_PR_udp:
            id = tpi->choice.udp.id;
            packet_id = id;
            log_debug("<- TI.packet.udp | id:%ld size:%dB", id, msg_len);
            break;

        default:
            break;
    }

    // Forward to [applications] (fwd is only ever set on the BTP path)
    if (fwd) {
        fi = forward_to_applications(id, tpi->choice.btp.destinationPort, tpi->choice.btp.data.buf, tpi->choice.btp.data.size);
    }

    // Logging
    if (facilities.logging.dbms) {
        pthread_mutex_lock(&facilities.id.lock);
        uint64_t station_id = facilities.id.station_id;
        pthread_mutex_unlock(&facilities.id.lock);
        itss_db_add(facilities.logging.dbms, station_id, id, false, its_msg_type, NULL, packet, packet_len);
    }

    // Record the payload of the transport that was actually received; packet is
    // only set on the BTP and TCP paths
    if (facilities.logging.recorder && packet) {
        int e = itss_management_record_packet_sdu(
                buf,
                buf_len,
                packet,
                packet_len,
                packet_id,
                itss_time_get(),
                ITSS_FACILITIES,
                false);
        if (e != -1) {
            itss_queue_send(facilities.tx_queue, buf, e, ITSS_MANAGEMENT, packet_id, "MReq.packet.set");
        }
    }

cleanup:
    if (owns_its_msg) {
        ASN_STRUCT_FREE(*its_msg_descriptor, its_msg);
    }
    ASN_STRUCT_FREE(asn_DEF_TransportIndication, ti);
    ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi);
    ASN_STRUCT_FREE(asn_DEF_SecurityRequest, sreq);
    ASN_STRUCT_FREE(asn_DEF_SecurityReply, srep);
    ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr);

    return rv;
}

/**
 * Decodes a FacilitiesRequest received from [applications] and dispatches it
 * to the matching request handler. Undecodable or unrecognized requests are
 * answered through facilities_request_result_rejected().
 *
 * @param responder  socket the handlers answer on
 * @param msg        OER-encoded FacilitiesRequest
 * @param msg_len    length of `msg` in bytes
 * @return the handler's return value, or 1 when the request was rejected here
 */
static int facilities_request(void* responder, uint8_t *msg, uint32_t msg_len) {

    int rv = 0;

    FacilitiesRequest_t *fr = calloc(1, sizeof(FacilitiesRequest_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_FacilitiesRequest, (void**) &fr, msg, msg_len);
    if (dec.code) {
        log_error("<- invalid FR received");
        facilities_request_result_rejected(responder);
        rv = 1;
        goto cleanup;
    }

    switch (fr->present) {
        case FacilitiesRequest_PR_message:
            rv = facilities_request_single_message(responder, &fr->choice.message);
            break;

        case FacilitiesRequest_PR_data:
            switch (fr->choice.data.present) {
                case FacilitiesDataRequest_PR_activeEpisodes:
                    rv = facilities_request_active_episodes(responder, fr);
                    break;

                case FacilitiesDataRequest_PR_attributeTypes:
                    rv = facilities_request_attribute_types(responder, fr);
                    break;

                case FacilitiesDataRequest_PR_loadedProtectionZones:
                    rv = facilities_request_loaded_protected_zones(responder, fr);
                    break;

                case FacilitiesDataRequest_PR_chainInfoSet:
                    rv = facilities_request_chaininfo_set(responder, &fr->choice.data.choice.chainInfoSet);
                    break;

                default:
                    log_error("<- unrecognized FDR type received (%d)", fr->choice.data.present);
                    facilities_request_result_rejected(responder);
                    rv = 1;
                    goto cleanup;
            }
            break;

        default:
            log_error("<- unrecognized FR type received (%d)", fr->present);
            facilities_request_result_rejected(responder);
            rv = 1;
            goto cleanup;
    }

cleanup:
    ASN_STRUCT_FREE(asn_DEF_FacilitiesRequest, fr);

    return rv;
}

/**
 * Handles an identity-change SecurityIndication (prepare / commit / abort).
 *
 * prepare takes facilities.id.lock and facilities.lightship.lock and leaves
 * them held; commit resets the lightship state, applies the new identifiers
 * and releases both; abort releases both. Exactly one SecurityResponse is sent
 * on `responder_secured`, with returnCode 0 on success and 1 on failure.
 *
 * facilities.id.change.lock is released exactly once, and only if it was taken.
 *
 * @param responder_secured  socket the SecurityResponse is sent on
 * @param msg                OER-encoded SecurityIndication
 * @param msg_len            length of `msg` in bytes
 * @return 0 on success, 1 when the indication was invalid or refused
 */
static int security_indication(void* responder_secured, uint8_t *msg, uint32_t msg_len) {

    int rv = 0;
    bool change_locked = false;  // whether facilities.id.change.lock is held by this call

    SecurityIndication_t* si = calloc(1, sizeof(SecurityIndication_t));
    SecurityResponse_t* sr = calloc(1, sizeof(SecurityResponse_t));

    uint8_t buffer[64];
    asn_enc_rval_t enc;

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_SecurityIndication, (void**) &si, msg, msg_len);
    if (dec.code) {
        log_error("<- invalid SIndication received");
        rv = 1;
        goto cleanup;
    }

    pthread_mutex_lock(&facilities.id.change.lock);
    change_locked = true;

    if (facilities.id.change.stage == ID_CHANGE_BLOCKED) {
        log_debug("identity change is currently blocked");
        rv = 1;
        goto cleanup;
    }

    // Only a pending prepare means id.lock and lightship.lock are currently held
    const bool prepared = facilities.id.change.stage == ID_CHANGE_PREPARE;

    if (prepared &&
        si->choice.idChangeEvent.command != SecurityIdChangeEventType_commit &&
        si->choice.idChangeEvent.command != SecurityIdChangeEventType_abort) {
        log_debug("current identity change state is prepare, but received identity change command is not commit nor abort");
        rv = 1;
        goto cleanup;
    }

    switch (si->choice.idChangeEvent.command) {
        case SecurityIdChangeEventType_prepare:
            facilities.id.change.stage = ID_CHANGE_PREPARE;
            pthread_mutex_lock(&facilities.id.lock);
            pthread_mutex_lock(&facilities.lightship.lock);
            break;

        case SecurityIdChangeEventType_commit:
            // A commit that was not preceded by a prepare has to take the locks itself
            if (!prepared) {
                pthread_mutex_lock(&facilities.id.lock);
                pthread_mutex_lock(&facilities.lightship.lock);
            }

            facilities.id.change.stage = ID_CHANGE_COMMIT;

            // Reset lightship
            for (int i = 0; i < facilities.lightship.path_history_len; ++i) {
                free(facilities.lightship.path_history[i]);
                facilities.lightship.path_history[i] = NULL;
            }
            facilities.lightship.path_history_len = 0;
            facilities.lightship.t_last_cam = 0;
            facilities.lightship.t_last_cam_lfc = 0;
            facilities.lightship.next_cam_max = 0;
            facilities.lightship.next_cam_min = 0;

            pthread_mutex_unlock(&facilities.lightship.lock);

            // Change Station ID
            for (int i = 0; i < si->choice.idChangeEvent.ids.list.count; ++i) {
                switch (si->choice.idChangeEvent.ids.list.array[i]->present) {
                    case SecurityId_PR_stationId:
                        facilities.id.station_id = si->choice.idChangeEvent.ids.list.array[i]->choice.stationId;
                        break;
                    case SecurityId_PR_ipv6Address:
                        // Never read past a short address
                        if (si->choice.idChangeEvent.ids.list.array[i]->choice.ipv6Address.size >= 16) {
                            memcpy(facilities.id.ipv6_addr, si->choice.idChangeEvent.ids.list.array[i]->choice.ipv6Address.buf, 16);
                        }
                        break;
                    default:
                        break;
                }
            }

            facilities.id.change.stage = ID_CHANGE_INACTIVE;

            pthread_mutex_unlock(&facilities.id.lock);
            break;

        case SecurityIdChangeEventType_abort:
            // Nothing to release unless a prepare took the locks
            if (prepared) {
                pthread_mutex_unlock(&facilities.id.lock);
                pthread_mutex_unlock(&facilities.lightship.lock);
            }

            facilities.id.change.stage = ID_CHANGE_INACTIVE;
            break;

        default:
            log_error("[networking]<- unhandled idChangeEvent command type");
            rv = 1;
            goto cleanup;
    }

cleanup:
    // Single reply for every path: 0 on success, 1 on failure
    sr->present = SecurityResponse_PR_idChangeEvent;
    sr->choice.idChangeEvent.returnCode = rv ? 1 : 0;
    enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64);
    itss_0send(responder_secured, buffer, enc.encoded);

    if (change_locked) {
        pthread_mutex_unlock(&facilities.id.change.lock);
    }

    ASN_STRUCT_FREE(asn_DEF_SecurityResponse, sr);
    ASN_STRUCT_FREE(asn_DEF_SecurityIndication, si);

    return rv;
}

/**
 * Handles a NetworkingIndication: acknowledges it with one byte (0 = decoded,
 * 1 = invalid) and, when the data indication flags a mobile neighbour, records
 * the current time and marks a vehicle as near in the lightship state.
 *
 * @param responder  socket the acknowledgement is sent on
 * @param msg        OER-encoded NetworkingIndication
 * @param msg_len    length of `msg` in bytes
 * @return 0 on success, 1 when the indication could not be decoded
 */
static int networking_indication(void* responder, uint8_t* msg, uint32_t msg_len) {

    int rv = 0;
    uint8_t code = 0;

    NetworkingIndication_t* ni = calloc(1, sizeof(NetworkingIndication_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_NetworkingIndication, (void**) &ni, msg, msg_len);
    if (dec.code) {
        rv = 1;
        code = 1;
        itss_0send(responder, &code, 1);
        goto cleanup;
    }

    itss_0send(responder, &code, 1);

    if (ni->present != NetworkingIndication_PR_data) {
        goto cleanup;
    }

    if (ni->choice.data.mobileNeighbour && *ni->choice.data.mobileNeighbour) {
        pthread_mutex_lock(&facilities.lightship.lock);
        facilities.lightship.last_vehicle = itss_time_get();
        facilities.lightship.is_vehicle_near = true;
        pthread_mutex_unlock(&facilities.lightship.lock);
    }

cleanup:
    ASN_STRUCT_FREE(asn_DEF_NetworkingIndication, ni);

    return rv;
}

/**
 * Handles a ManagementIndication: acknowledges it with one byte (0 = decoded,
 * 1 = invalid) and, for an attributes indication, copies position, speed,
 * heading, altitude, trajectory and clock into `epv`, each group under its
 * own itss_*_lock()/unlock() pair.
 *
 * @param responder  socket the acknowledgement is sent on
 * @param msg        OER-encoded ManagementIndication
 * @param msg_len    length of `msg` in bytes
 * @return 0 on success, 1 when the indication could not be decoded
 */
static int management_indication(void* responder, uint8_t* msg, uint32_t msg_len) {

    int rv = 0;
    uint8_t code = 0;

    ManagementIndication_t* mi = calloc(1, sizeof(ManagementIndication_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_ManagementIndication, (void**) &mi, msg, msg_len);
    if (dec.code) {
        rv = 1;
        code = 1;
        itss_0send(responder, &code, 1);
        goto cleanup;
    }

    itss_0send(responder, &code, 1);

    if (mi->present == ManagementIndication_PR_attributes) {
        itss_space_lock();
        epv.space.latitude = mi->choice.attributes.coordinates.latitude;
        epv.space.latitude_conf = mi->choice.attributes.coordinates.latitudeConfidence;
        epv.space.longitude = mi->choice.attributes.coordinates.longitude;
        epv.space.longitude_conf = mi->choice.attributes.coordinates.longitudeConfidence;
        epv.space.speed = mi->choice.attributes.speed.speedValue;
        epv.space.speed_conf = mi->choice.attributes.speed.speedConfidence;
        epv.space.heading = mi->choice.attributes.heading.headingValue;
        epv.space.heading_conf = mi->choice.attributes.heading.headingConfidence;
        epv.space.altitude = mi->choice.attributes.altitude.altitudeValue;
        epv.space.altitude_conf = mi->choice.attributes.altitude.altitudeConfidence;
        itss_space_unlock();

        itss_trajectory_lock();
        if (mi->choice.attributes.trajectory) {
            // NOTE(review): list.count is not checked against the capacity of
            // epv.trajectory.path, which is not visible from here - confirm
            epv.trajectory.len = mi->choice.attributes.trajectory->list.count;
            for (int i = 0; i < mi->choice.attributes.trajectory->list.count; ++i) {
                epv.trajectory.path[i].latitude = mi->choice.attributes.trajectory->list.array[i]->latitude;
                epv.trajectory.path[i].longitude = mi->choice.attributes.trajectory->list.array[i]->longitude;
                asn_INTEGER2ulong(&mi->choice.attributes.trajectory->list.array[i]->timestamp, (unsigned long long*) &epv.trajectory.path[i].timestamp);
            }
        }
        itss_trajectory_unlock();

        itss_time_lock();
        asn_INTEGER2ulong(&mi->choice.attributes.clock, (unsigned long long*) &epv.time.clock);
        itss_time_unlock();
    }

cleanup:
    ASN_STRUCT_FREE(asn_DEF_ManagementIndication, mi);

    return rv;
}

/**
 * Transmit thread: drains facilities.tx_queue into a private copy while
 * holding the queue lock, then sends every packet to its destination service
 * (transport, applications or management) and waits for the one-byte reply.
 * Runs until facilities.exit is set, then closes its three sockets.
 *
 * @return always NULL
 */
void* tx() {

    int rv = 0;

    itss_queue_t* queue = facilities.tx_queue;

    uint8_t code;

    void* applications_socket = itss_0connect(facilities.zmq.applications_address, ZMQ_REQ);
    void* transport_socket = itss_0connect(facilities.zmq.transport_address, ZMQ_REQ);
    void* management_socket = itss_0connect(facilities.zmq.management_address, ZMQ_REQ);

    // Private copy of the queue, so that sending happens outside the queue lock
    itss_queue_t* stream = itss_queue_new();

    while (!facilities.exit) {
        pthread_mutex_lock(&queue->lock);
        while (!queue->len && !facilities.exit) {
            pthread_cond_wait(&queue->trigger, &queue->lock);
        }
        for (int i = 0; i < queue->len; ++i) {
            memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]);
            stream->packet_len[i] = queue->packet_len[i];
            stream->destination[i] = queue->destination[i];
            strcpy(stream->info_msg[i], queue->info_msg[i]);
            stream->id[i] = queue->id[i];
        }
        stream->len = queue->len;

        queue->len = 0;
        pthread_mutex_unlock(&queue->lock);

        for (int i = 0; i < stream->len; ++i) {
            switch (stream->destination[i]) {
                case ITSS_TRANSPORT:
                    log_debug("-> %s ->[transport] | id:%08x size:%dB",
                            stream->info_msg[i], (uint32_t) stream->id[i], stream->packet_len[i]);
                    itss_0send(transport_socket, stream->packet[i], stream->packet_len[i]);
                    rv = itss_0recv_rt(&transport_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
                    if (rv == -1) {
                        log_error("-> %s ->[transport] | id:%08x size:%dB ",
                                stream->info_msg[i], (uint32_t) stream->id[i], stream->packet_len[i]);
                    }
                    break;

                case ITSS_APPLICATIONS:
                    log_debug("-> %s ->[applications] | id:%08x size:%dB",
                            stream->info_msg[i], (uint32_t) stream->id[i], stream->packet_len[i]);
                    itss_0send(applications_socket, stream->packet[i], stream->packet_len[i]);
                    rv = itss_0recv_rt(&applications_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
                    if (rv == -1) {
                        log_error("-> %s ->[applications] | id:%08x size:%dB ",
                                stream->info_msg[i], (uint32_t) stream->id[i], stream->packet_len[i]);
                    }
                    break;

                case ITSS_MANAGEMENT:
                    itss_0send(management_socket, stream->packet[i], stream->packet_len[i]);
                    rv = itss_0recv_rt(&management_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
                    if (rv == -1) {
                        log_error("-> %s ->[management] | id:%08x size:%dB ",
                                stream->info_msg[i], (uint32_t) stream->id[i], stream->packet_len[i]);
                    }
                    break;
            }
        }
    }

    itss_0close(transport_socket);
    itss_0close(management_socket);
    itss_0close(applications_socket);

    return NULL;
}

/**
 * Termination signal handler: raises facilities.exit and sends one byte to
 * ZMQ_INTERNAL_ADDR over a PAIR socket; main() ignores that message, so it
 * presumably only serves to wake the poll in main().
 *
 * NOTE(review): the socket calls made here are presumably not
 * async-signal-safe - confirm, or move the wake-up out of the handler.
 */
static void sigh(int signum) {
    facilities.exit = true;

    uint8_t code = 0;
    void* socket = itss_0connect(ZMQ_INTERNAL_ADDR, ZMQ_PAIR);
    itss_0send(socket, &code, sizeof(code));
    itss_0close(socket);
}

/**
 * Moves the identity-change stage from INACTIVE to BLOCKED under
 * facilities.id.change.lock, so that no identity change starts while a
 * message is being processed.
 *
 * @return true when the stage was INACTIVE and is now BLOCKED, false otherwise
 */
static bool rx_block_id_change(void) {
    bool blocked = false;

    pthread_mutex_lock(&facilities.id.change.lock);
    if (facilities.id.change.stage == ID_CHANGE_INACTIVE) {
        facilities.id.change.stage = ID_CHANGE_BLOCKED;
        blocked = true;
    }
    pthread_mutex_unlock(&facilities.id.change.lock);

    return blocked;
}

/** Sets the identity-change stage back to INACTIVE after rx_block_id_change(). */
static void rx_unblock_id_change(void) {
    pthread_mutex_lock(&facilities.id.change.lock);
    facilities.id.change.stage = ID_CHANGE_INACTIVE;
    pthread_mutex_unlock(&facilities.id.change.lock);
}

/**
 * Entry point: initialises the services, loads the configuration, starts the
 * service threads and then dispatches every message received on the responder
 * sockets according to its one-byte source tag until facilities.exit is set.
 *
 * @return always 0
 */
int main(void) {

    // SIGKILL can be neither caught nor ignored, so only the catchable
    // termination signals are hooked
    signal(SIGTERM, sigh);
    signal(SIGINT, sigh);

    facilities.tx_queue = itss_queue_new();

    lightship_init();
    den_init();
    infrastructure_init();
    dissemination_init();
    bulletin_init();

    void* security_socket = NULL;
    bool services_started = false;  // whether the threads joined at cleanup exist

    uint8_t buffer[ITSS_SDU_MAX_LEN];
    uint8_t code;
    int32_t rl;

    time_t t;
    srand((unsigned) time(&t));

    if (facilities_config()) {
        goto cleanup;
    }

    facilities.lightship.type = facilities.station_type;

    // Tx
    pthread_create(&facilities.transmitting, NULL, tx, NULL);

    // CA
    pthread_create(&facilities.ca_service, NULL, ca_service, NULL);

    // DEN
    pthread_create(&facilities.den_service, NULL, den_service, NULL);

    // Infrastructure
    pthread_create(&facilities.infrastructure_service, NULL, infrastructure_service, NULL);

    // CP
    if (facilities.dissemination.active) {
        pthread_create(&facilities.cp_service, NULL, cp_service, NULL);
    }

    // SA
    pthread_create(&facilities.sa_service, NULL, sa_service, NULL);

    // Tolling
    tolling_init(facilities.station_type);

    // VC
    if (facilities.coordination.active) {
        coordination_init();
        pthread_create(&facilities.vc_service, NULL, vc_service, NULL);
    }

    services_started = true;

    security_socket = itss_0connect(facilities.zmq.security_address, ZMQ_REQ);

    log_info("listening");

    while (!facilities.exit) {
        itss_0poll(facilities.zmq.responders, facilities.zmq.n_responders, -1);

        for (int i = 0; i < facilities.zmq.n_responders; ++i) {
            if (!facilities.zmq.responders[i].revents) {
                continue;
            }

            void* responder = facilities.zmq.responders[i].socket;

            rl = itss_0recv(responder, buffer, ITSS_SDU_MAX_LEN);
            if (rl < 1) {
                // Nothing usable was received: buffer[0] would be stale data.
                // An empty message is still answered so the requester is not left waiting
                if (rl == 0) {
                    code = 1;
                    itss_0send(responder, &code, 1);
                }
                continue;
            }
            // Never trust a length larger than the buffer that was offered
            if (rl > (int32_t) ITSS_SDU_MAX_LEN) {
                rl = (int32_t) ITSS_SDU_MAX_LEN;
            }

            // Byte 0 is the source tag; the payload is what follows it
            uint8_t* payload = buffer + 1;
            uint32_t payload_len = (uint32_t) rl - 1;

            switch (buffer[0]) { /* source */
                case ITSS_INTERNAL:
                    break;

                case ITSS_NETWORKING:
                    networking_indication(responder, payload, payload_len);
                    break;

                case ITSS_TRANSPORT:
                    if (rx_block_id_change()) {
                        transport_indication(responder, &security_socket, payload, payload_len);
                        rx_unblock_id_change();
                    } else {
                        code = 1;
                        itss_0send(responder, &code, 1);
                    }
                    break;

                case ITSS_APPLICATIONS:
                    if (rx_block_id_change()) {
                        facilities_request(responder, payload, payload_len);
                        rx_unblock_id_change();
                    } else {
                        code = 1;
                        itss_0send(responder, &code, 1);
                    }
                    break;

                case ITSS_MANAGEMENT:
                    management_indication(responder, payload, payload_len);
                    break;

                case ITSS_SECURITY:
                    security_indication(responder, payload, payload_len);
                    break;

                default:
                    log_debug("<- unrecognized service");
                    code = 1;
                    itss_0send(responder, &code, 1);
                    break;
            }
        }
    }

    // Exit
cleanup:
    // Only join threads that were actually created
    if (services_started) {
        pthread_join(facilities.ca_service, NULL);
        pthread_join(facilities.den_service, NULL);
        pthread_join(facilities.infrastructure_service, NULL);
        if (facilities.dissemination.active) {
            pthread_join(facilities.cp_service, NULL);
        }
        pthread_join(facilities.sa_service, NULL);
        itss_queue_trigger(facilities.tx_queue);
        pthread_join(facilities.transmitting, NULL);
        if (facilities.coordination.active) {
            pthread_join(facilities.vc_service, NULL);
        }
    }

    if (security_socket) {
        itss_0close(security_socket);
    }
    for (int i = 0; i < facilities.zmq.n_responders; ++i) {
        itss_0close(facilities.zmq.responders[i].socket);
    }
    itss_0destroy();

    log_info("exiting");
    log_close();

    return 0;
}