#include "facilities.h"

/*
 * NOTE(review): the targets of the angle-bracket includes that followed here
 * were lost in the copy of this file under review (only bare `#include`
 * tokens were left). The headers listed below are the ones this translation
 * unit visibly relies on (pthread_*, signal, bool, uintN_t, calloc/free,
 * memcpy/strcpy, ZMQ_REQ/ZMQ_PAIR). The project headers that provide the
 * ASN.1 message types, asn_DEF_* descriptors, oer_decode, the itss_* helpers
 * and log_* must be restored from the upstream file -- TODO confirm.
 */
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>

#include "cam.h"
#include "config.h"
#include "cpm.h"
#include "denm.h"
#include "evm.h"
#include "indications.h"
#include "infrastructure.h"
#include "requests.h"
#include "saem.h"
#include "tpm.h"
#include "vcm.h"

/* Process-wide state of the facilities service, zero-initialised at load. */
facilities_t facilities = {0};

/**
 * Handle one message coming from the transport service.
 *
 * Decodes `msg` as an OER TransportIndication, answers the sender with a
 * single status byte (0 = decoded, 1 = decode failure) and then dispatches
 * the decoded content to the matching handler.
 *
 * @param responder        socket the status byte is sent on
 * @param security_socket  forwarded untouched to the packet/data handlers
 * @param msg              encoded indication
 * @param msg_len          number of bytes available at `msg`
 * @return 0 on success, 1 if decoding failed or the choice is not recognised
 */
static int transport_indication(void *responder, void **security_socket, uint8_t *msg, uint32_t msg_len) {
    int rv = 0;
    uint8_t code = 0;

    TransportIndication_t *ti = calloc(1, sizeof(TransportIndication_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_TransportIndication, (void **)&ti, msg, msg_len);
    if (dec.code) {
        log_error("<- invalid TI received");
        rv = 1;
        code = 1;
        itss_0send(responder, &code, 1);
        goto cleanup;
    }
    /* The acknowledgement is sent before the (possibly slow) processing. */
    itss_0send(responder, &code, 1);

    switch (ti->present) {
        case TransportIndication_PR_packet: {
            TransportPacketIndication_t *tpi = &ti->choice.packet;

            /* TODO */
            switch (tpi->present) {
                case TransportPacketIndication_PR_btp:
                    log_debug("<- TI.packet.btp | id:%08x size:%dB", tpi->choice.btp.id, msg_len);
                    transport_indication_btp(&tpi->choice.btp, security_socket);
                    break;

                case TransportPacketIndication_PR_tcp:
                    log_debug("<- TI.packet.tcp | id:%ld size:%dB", tpi->choice.tcp.id, msg_len);
                    transport_indication_tcp(&tpi->choice.tcp, security_socket);
                    break;

                case TransportPacketIndication_PR_udp:
                    log_debug("<- TI.packet.udp | id:%ld size:%dB", tpi->choice.udp.id, msg_len);
                    /* TODO */
                    break;

                default:
                    break;
            }
            break;
        }

        case TransportIndication_PR_data:
            transport_data_indication(&ti->choice.data, security_socket);
            break;

        default:
            log_debug("<- unrecognized TI.choice received");
            rv = 1;
            break;
    }

cleanup:
    ASN_STRUCT_FREE(asn_DEF_TransportIndication, ti);

    return rv;
}

/**
 * Handle one request coming from the applications service.
 *
 * Decodes `msg` as an OER FacilitiesRequest and forwards it to the handler
 * for the selected choice. Undecodable or unrecognised requests are answered
 * through facilities_request_result_rejected().
 *
 * @param responder  socket the handlers reply on
 * @param msg        encoded request
 * @param msg_len    number of bytes available at `msg`
 * @return the handler's return value, or 1 when the request was rejected here
 */
static int facilities_request(void *responder, uint8_t *msg, uint32_t msg_len) {
    int rv = 0;

    FacilitiesRequest_t *fr = calloc(1, sizeof(FacilitiesRequest_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_FacilitiesRequest, (void **)&fr, msg, msg_len);
    if (dec.code) {
        log_error("<- invalid FR received");
        facilities_request_result_rejected(responder);
        rv = 1;
        goto cleanup;
    }

    switch (fr->present) {
        case FacilitiesRequest_PR_message:
            rv = facilities_request_single_message(responder, &fr->choice.message);
            break;

        case FacilitiesRequest_PR_data:
            switch (fr->choice.data.present) {
                case FacilitiesDataRequest_PR_activeEpisodes:
                    rv = facilities_request_active_episodes(responder, fr);
                    break;

                case FacilitiesDataRequest_PR_attributeTypes:
                    rv = facilities_request_attribute_types(responder, fr);
                    break;

                case FacilitiesDataRequest_PR_loadedProtectionZones:
                    rv = facilities_request_loaded_protected_zones(responder, fr);
                    break;

                case FacilitiesDataRequest_PR_chainInfoSet:
                    rv = facilities_request_chaininfo_set(responder, &fr->choice.data.choice.chainInfoSet);
                    break;

                default:
                    log_error("<- unrecognized FDR type received (%d)", fr->choice.data.present);
                    facilities_request_result_rejected(responder);
                    rv = 1;
                    break;
            }
            break;

        default:
            log_error("<- unrecognized FR type received (%d)", fr->present);
            facilities_request_result_rejected(responder);
            rv = 1;
            break;
    }

cleanup:
    ASN_STRUCT_FREE(asn_DEF_FacilitiesRequest, fr);

    return rv;
}

/**
 * Handle an identity-change event coming from the security service.
 *
 * The command carried by the indication drives facilities.id.change.stage:
 *  - prepare: stage becomes PREPARE; facilities.id.lock and
 *             facilities.lightship.lock are acquired and intentionally left
 *             held when this function returns;
 *  - commit:  lightship state is reset, the new station id / IPv6 address are
 *             copied in, both locks are released, stage becomes INACTIVE;
 *  - abort:   both locks are released, stage becomes INACTIVE.
 *
 * A SecurityResponse with returnCode 0 (accepted) or 1 (rejected) is always
 * sent on `responder_secured`.
 *
 * facilities.id.change.lock is held only between the explicit lock below and
 * the matching unlock on each path. The previous implementation unlocked it a
 * second time after the `cleanup` label (and once even when it had never been
 * locked), which is undefined behaviour for a default mutex.
 *
 * NOTE(review): a commit or abort received while the stage is not PREPARE
 * releases id.lock / lightship.lock without holding them -- confirm the
 * security service always sends prepare first.
 * NOTE(review): si->present is not checked before choice.idChangeEvent is
 * read -- confirm idChangeEvent is the only alternative.
 *
 * @return 0 if the command was applied, 1 if it was rejected
 */
static int security_indication(void *responder_secured, uint8_t *msg, uint32_t msg_len) {
    int rv = 0;

    SecurityIndication_t *si = calloc(1, sizeof(SecurityIndication_t));
    SecurityResponse_t *sr = calloc(1, sizeof(SecurityResponse_t));

    uint8_t buffer[64];
    asn_enc_rval_t enc;

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_SecurityIndication, (void **)&si, msg, msg_len);
    if (dec.code) {
        log_error("<- invalid SIndication received");
        rv = 1;
        goto cleanup;
    }

    pthread_mutex_lock(&facilities.id.change.lock);

    if (facilities.id.change.stage == ID_CHANGE_BLOCKED) {
        pthread_mutex_unlock(&facilities.id.change.lock);
        log_debug("identity change is currently blocked");
        rv = 1;
        goto cleanup;
    }

    if (facilities.id.change.stage == ID_CHANGE_PREPARE &&
        si->choice.idChangeEvent.command != SecurityIdChangeEventType_commit &&
        si->choice.idChangeEvent.command != SecurityIdChangeEventType_abort) {
        pthread_mutex_unlock(&facilities.id.change.lock);
        log_debug("current identity change state is prepare, but received identity change command is not commit nor abort");
        rv = 1;
        goto cleanup;
    }

    switch (si->choice.idChangeEvent.command) {
        case SecurityIdChangeEventType_prepare:
            facilities.id.change.stage = ID_CHANGE_PREPARE;
            /* Held until the matching commit/abort arrives. */
            pthread_mutex_lock(&facilities.id.lock);
            pthread_mutex_lock(&facilities.lightship.lock);
            break;

        case SecurityIdChangeEventType_commit:
            facilities.id.change.stage = ID_CHANGE_COMMIT;

            // Reset lightship
            for (int i = 0; i < facilities.lightship.path_history_len; ++i) {
                free(facilities.lightship.path_history[i]);
            }
            facilities.lightship.path_history_len = 0;
            facilities.lightship.t_last_cam = 0;
            facilities.lightship.t_last_cam_lfc = 0;
            facilities.lightship.next_cam_max = 0;
            facilities.lightship.next_cam_min = 0;

            pthread_mutex_unlock(&facilities.lightship.lock);

            // Change Station ID
            for (int i = 0; i < si->choice.idChangeEvent.ids.list.count; ++i) {
                switch (si->choice.idChangeEvent.ids.list.array[i]->present) {
                    case SecurityId_PR_stationId:
                        facilities.id.station_id = si->choice.idChangeEvent.ids.list.array[i]->choice.stationId;
                        break;
                    case SecurityId_PR_ipv6Address:
                        memcpy(facilities.id.ipv6_addr,
                               si->choice.idChangeEvent.ids.list.array[i]->choice.ipv6Address.buf,
                               16);
                        break;
                    default:
                        break;
                }
            }

            facilities.id.change.stage = ID_CHANGE_INACTIVE;

            pthread_mutex_unlock(&facilities.id.lock);
            break;

        case SecurityIdChangeEventType_abort:
            pthread_mutex_unlock(&facilities.id.lock);
            pthread_mutex_unlock(&facilities.lightship.lock);
            facilities.id.change.stage = ID_CHANGE_INACTIVE;
            break;

        default:
            pthread_mutex_unlock(&facilities.id.change.lock);
            log_error("[networking]<- unhandled idChangeEvent command type");
            rv = 1;
            goto cleanup;
    }

    /* Accepted: the reply is sent while the stage lock is still held. */
    sr->present = SecurityResponse_PR_idChangeEvent;
    sr->choice.idChangeEvent.returnCode = 0;
    enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64);
    itss_0send(responder_secured, buffer, enc.encoded);

    pthread_mutex_unlock(&facilities.id.change.lock);

cleanup:
    /* Every path reaching this label has already released change.lock (or
     * never took it), so it must not be unlocked here. */
    if (rv) {
        sr->present = SecurityResponse_PR_idChangeEvent;
        sr->choice.idChangeEvent.returnCode = 1;
        enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64);
        itss_0send(responder_secured, buffer, enc.encoded);
    }

    ASN_STRUCT_FREE(asn_DEF_SecurityResponse, sr);
    ASN_STRUCT_FREE(asn_DEF_SecurityIndication, si);

    return rv;
}

/**
 * Handle one message coming from the networking service.
 *
 * Decodes `msg` as an OER NetworkingIndication and answers with a status byte
 * (0 = decoded, 1 = decode failure). When the indication carries data with
 * the optional mobileNeighbour flag set, the lightship "vehicle near" state
 * and its timestamp are refreshed under facilities.lightship.lock.
 *
 * @return 0 on success, 1 if decoding failed
 */
static int networking_indication(void *responder, uint8_t *msg, uint32_t msg_len) {
    int rv = 0;
    uint8_t code = 0;

    NetworkingIndication_t *ni = calloc(1, sizeof(NetworkingIndication_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_NetworkingIndication, (void **)&ni, msg, msg_len);
    if (dec.code) {
        rv = 1;
        code = 1;
        itss_0send(responder, &code, 1);
        goto cleanup;
    }
    itss_0send(responder, &code, 1);

    if (ni->present == NetworkingIndication_PR_data &&
        ni->choice.data.mobileNeighbour &&
        *ni->choice.data.mobileNeighbour) {
        pthread_mutex_lock(&facilities.lightship.lock);
        facilities.lightship.t_last_vehicle = itss_time_get();
        facilities.lightship.is_vehicle_near = true;
        pthread_mutex_unlock(&facilities.lightship.lock);
    }

cleanup:
    ASN_STRUCT_FREE(asn_DEF_NetworkingIndication, ni);

    return rv;
}

/**
 * Handle one message coming from the management service.
 *
 * Decodes `msg` as an OER ManagementIndication and answers with a status byte
 * (0 = decoded, 1 = decode failure). For an `attributes` indication the
 * position/kinematics, the optional trajectory and the clock are copied into
 * the shared `epv` structure, each group under its own itss_*_lock().
 *
 * NOTE(review): the trajectory copy trusts list.count from the wire and does
 * not check it against the capacity of epv.trajectory.path -- confirm the
 * bound is enforced elsewhere or add a clamp.
 *
 * @return 0 on success, 1 if decoding failed
 */
static int management_indication(void *responder, uint8_t *msg, uint32_t msg_len) {
    int rv = 0;
    uint8_t code = 0;

    ManagementIndication_t *mi = calloc(1, sizeof(ManagementIndication_t));

    asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_ManagementIndication, (void **)&mi, msg, msg_len);
    if (dec.code) {
        rv = 1;
        code = 1;
        itss_0send(responder, &code, 1);
        goto cleanup;
    }
    itss_0send(responder, &code, 1);

    if (mi->present == ManagementIndication_PR_attributes) {
        itss_space_lock();
        epv.space.latitude = mi->choice.attributes.coordinates.latitude;
        epv.space.latitude_conf = mi->choice.attributes.coordinates.latitudeConfidence;
        epv.space.longitude = mi->choice.attributes.coordinates.longitude;
        epv.space.longitude_conf = mi->choice.attributes.coordinates.longitudeConfidence;
        epv.space.speed = mi->choice.attributes.speed.speedValue;
        epv.space.speed_conf = mi->choice.attributes.speed.speedConfidence;
        epv.space.heading = mi->choice.attributes.heading.headingValue;
        epv.space.heading_conf = mi->choice.attributes.heading.headingConfidence;
        epv.space.accel = mi->choice.attributes.acceleration.longitudinalAccelerationValue;
        epv.space.altitude = mi->choice.attributes.altitude.altitudeValue;
        epv.space.altitude_conf = mi->choice.attributes.altitude.altitudeConfidence;
        itss_space_unlock();

        itss_trajectory_lock();
        if (mi->choice.attributes.trajectory) {
            epv.trajectory.len = mi->choice.attributes.trajectory->list.count;
            for (int i = 0; i < mi->choice.attributes.trajectory->list.count; ++i) {
                epv.trajectory.path[i].latitude = mi->choice.attributes.trajectory->list.array[i]->latitude;
                epv.trajectory.path[i].longitude = mi->choice.attributes.trajectory->list.array[i]->longitude;
                asn_INTEGER2ulong(&mi->choice.attributes.trajectory->list.array[i]->timestamp,
                                  (unsigned long long *)&epv.trajectory.path[i].timestamp);
            }
        }
        itss_trajectory_unlock();

        itss_time_lock();
        asn_INTEGER2ulong(&mi->choice.attributes.clock, (unsigned long long *)&epv.time.clock);
        itss_time_unlock();
    }

cleanup:
    ASN_STRUCT_FREE(asn_DEF_ManagementIndication, mi);

    return rv;
}

/**
 * Transmit thread: drains facilities.tx_queue and forwards every queued
 * packet to the transport, applications or management service.
 *
 * The shared queue is copied into a private queue (`stream`) while
 * queue->lock is held, so the actual sends happen without the lock. Each
 * send is followed by itss_0recv_rt(), whose -1 return is logged as an error.
 * The loop ends once facilities.exit is set and the condition variable is
 * signalled.
 *
 * @return always NULL
 */
void *tx() {
    int rv = 0;
    uint8_t code;

    itss_queue_t *queue = facilities.tx_queue;

    void *applications_socket = itss_0connect(facilities.zmq.applications_address, ZMQ_REQ);
    void *transport_socket = itss_0connect(facilities.zmq.transport_address, ZMQ_REQ);
    void *management_socket = itss_0connect(facilities.zmq.management_address, ZMQ_REQ);

    itss_queue_t *stream = itss_queue_new();

    while (!facilities.exit) {
        /* Wait for work, then snapshot the shared queue under its lock. */
        pthread_mutex_lock(&queue->lock);
        while (!queue->len && !facilities.exit) {
            pthread_cond_wait(&queue->trigger, &queue->lock);
        }

        for (int i = 0; i < queue->len; ++i) {
            memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]);
            stream->packet_len[i] = queue->packet_len[i];
            stream->destination[i] = queue->destination[i];
            strcpy(stream->info_msg[i], queue->info_msg[i]);
            stream->id[i] = queue->id[i];
        }
        stream->len = queue->len;

        queue->len = 0;
        pthread_mutex_unlock(&queue->lock);

        /* Deliver the snapshot without holding the queue lock. */
        for (int i = 0; i < stream->len; ++i) {
            switch (stream->destination[i]) {
                case ITSS_TRANSPORT:
                    log_debug("-> %s ->[transport] | id:%08x size:%dB",
                              stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
                    itss_0send(transport_socket, stream->packet[i], stream->packet_len[i]);
                    rv = itss_0recv_rt(&transport_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
                    if (rv == -1) {
                        log_error("-> %s ->[transport] | id:%08x size:%dB ",
                                  stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
                    }
                    break;

                case ITSS_APPLICATIONS:
                    log_debug("-> %s ->[applications] | id:%08x size:%dB",
                              stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
                    itss_0send(applications_socket, stream->packet[i], stream->packet_len[i]);
                    rv = itss_0recv_rt(&applications_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
                    if (rv == -1) {
                        log_error("-> %s ->[applications] | id:%08x size:%dB ",
                                  stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
                    }
                    break;

                case ITSS_MANAGEMENT:
                    itss_0send(management_socket, stream->packet[i], stream->packet_len[i]);
                    rv = itss_0recv_rt(&management_socket, &code, 1, stream->packet[i], stream->packet_len[i], 1000);
                    if (rv == -1) {
                        log_error("-> %s ->[management] | id:%08x size:%dB ",
                                  stream->info_msg[i], (uint32_t)stream->id[i], stream->packet_len[i]);
                    }
                    break;

                default:
                    /* Packets for any other destination are dropped silently. */
                    break;
            }
        }
    }

    itss_0close(transport_socket);
    itss_0close(management_socket);
    itss_0close(applications_socket);

    return NULL;
}

/**
 * Termination handler: raises facilities.exit and sends one byte to
 * ZMQ_INTERNAL_ADDR over a temporary PAIR socket.
 *
 * NOTE(review): presumably the byte exists to wake the blocking poll in
 * main(); also, connecting/sending from signal context is only valid if the
 * itss_0* helpers are async-signal-safe -- confirm.
 */
static void sigh(int signum) {
    (void)signum;

    facilities.exit = true;

    uint8_t code = 0;
    void *socket = itss_0connect(ZMQ_INTERNAL_ADDR, ZMQ_PAIR);
    itss_0send(socket, &code, sizeof(code));
    itss_0close(socket);
}

/**
 * Try to move the identity-change stage from INACTIVE to BLOCKED.
 *
 * @return true if the stage was INACTIVE and is now BLOCKED, false otherwise
 */
static bool facilities_id_change_try_block(void) {
    bool blocked = false;

    pthread_mutex_lock(&facilities.id.change.lock);
    if (facilities.id.change.stage == ID_CHANGE_INACTIVE) {
        facilities.id.change.stage = ID_CHANGE_BLOCKED;
        blocked = true;
    }
    pthread_mutex_unlock(&facilities.id.change.lock);

    return blocked;
}

/** Put the identity-change stage back to INACTIVE after a successful block. */
static void facilities_id_change_unblock(void) {
    pthread_mutex_lock(&facilities.id.change.lock);
    facilities.id.change.stage = ID_CHANGE_INACTIVE;
    pthread_mutex_unlock(&facilities.id.change.lock);
}

/**
 * Service entry point: initialises the sub-services, loads the configuration,
 * starts the worker threads and then serves the responder sockets until
 * facilities.exit is set.
 *
 * Each received message starts with one byte naming the source service; the
 * remaining bytes are the encoded payload handed to the matching handler.
 * Transport indications and application requests are only processed while no
 * identity change is in progress; otherwise they are answered with code 1.
 *
 * NOTE(review): the process still returns 0 when facilities_config() fails,
 * as before -- confirm whether a non-zero exit status is wanted.
 */
int main() {
    signal(SIGTERM, sigh);
    signal(SIGINT, sigh);
    /* No handler is installed for SIGKILL: it cannot be caught, so the call
     * could only ever fail. */

    facilities.tx_queue = itss_queue_new();

    lightship_init();
    den_init();
    infrastructure_init();
    dissemination_init();
    bulletin_init();

    void *security_socket = NULL;
    bool services_started = false;
    uint8_t buffer[ITSS_SDU_MAX_LEN];
    uint8_t code;
    int32_t rl;

    if (facilities_config()) {
        goto cleanup;
    }

    facilities.lightship.type = facilities.station_type;

    // Tx
    pthread_create(&facilities.transmitting, NULL, tx, NULL);

    // CA
    pthread_create(&facilities.ca_service, NULL, ca_service, NULL);

    // DEN
    pthread_create(&facilities.den_service, NULL, den_service, NULL);

    // Infrastructure
    pthread_create(&facilities.infrastructure_service, NULL, infrastructure_service, NULL);

    // CP
    if (facilities.dissemination.active) {
        pthread_create(&facilities.cp_service, NULL, cp_service, NULL);
    }

    // SA
    pthread_create(&facilities.sa_service, NULL, sa_service, NULL);

    // Tolling
    tolling_init(facilities.station_type);

    // VC
    if (facilities.coordination.active) {
        coordination_init();
        pthread_create(&facilities.vc_service, NULL, vc_service, NULL);
    }

    // EVCSN
    if (facilities.evm_args.activate) {
        pthread_create(&facilities.evcsn_service, NULL, evcsn_service, NULL);
    }

    /* From here on the cleanup path has threads to join. */
    services_started = true;

    security_socket = itss_0connect(facilities.zmq.security_address, ZMQ_REQ);

    log_info("listening");

    while (!facilities.exit) {
        itss_0poll(facilities.zmq.responders, facilities.zmq.n_responders, -1);

        for (int i = 0; i < facilities.zmq.n_responders; ++i) {
            if (!facilities.zmq.responders[i].revents) {
                continue;
            }

            void *responder = facilities.zmq.responders[i].socket;

            rl = itss_0recv(responder, buffer, ITSS_SDU_MAX_LEN);
            if (rl < 0) {
                /* Presumably a receive failure (TODO confirm itss_0recv's
                 * contract): buffer still holds an older message, so it must
                 * not be dispatched again. */
                log_error("<- failed to receive message");
                continue;
            }
            if (rl == 0) {
                /* No source byte at all: reject instead of reading stale data. */
                log_debug("<- empty message received");
                code = 1;
                itss_0send(responder, &code, 1);
                continue;
            }
            if (rl > ITSS_SDU_MAX_LEN) {
                /* Never trust a length larger than the buffer handed out. */
                rl = ITSS_SDU_MAX_LEN;
            }

            /* buffer[0] names the source; the payload is what follows it. */
            uint8_t *payload = buffer + 1;
            uint32_t payload_len = (uint32_t)rl - 1;

            switch (buffer[0]) { /* source */
                case ITSS_INTERNAL:
                    break;

                case ITSS_NETWORKING:
                    networking_indication(responder, payload, payload_len);
                    break;

                case ITSS_TRANSPORT:
                    if (facilities_id_change_try_block()) {
                        transport_indication(responder, &security_socket, payload, payload_len);
                        facilities_id_change_unblock();
                    } else {
                        code = 1;
                        itss_0send(responder, &code, 1);
                    }
                    break;

                case ITSS_APPLICATIONS:
                    if (facilities_id_change_try_block()) {
                        facilities_request(responder, payload, payload_len);
                        facilities_id_change_unblock();
                    } else {
                        code = 1;
                        itss_0send(responder, &code, 1);
                    }
                    break;

                case ITSS_MANAGEMENT:
                    management_indication(responder, payload, payload_len);
                    break;

                case ITSS_SECURITY:
                    security_indication(responder, payload, payload_len);
                    break;

                default:
                    log_debug("<- unrecognized service");
                    code = 1;
                    itss_0send(responder, &code, 1);
                    break;
            }
        }
    }

    // Exit
cleanup:
    /* Only join threads that were actually created; joining a zeroed
     * pthread_t after a configuration failure is undefined. */
    if (services_started) {
        if (facilities.evm_args.activate) {
            pthread_join(facilities.evcsn_service, NULL);
        }
        pthread_join(facilities.ca_service, NULL);
        pthread_join(facilities.den_service, NULL);
        pthread_join(facilities.infrastructure_service, NULL);
        if (facilities.dissemination.active) {
            pthread_join(facilities.cp_service, NULL);
        }
        pthread_join(facilities.sa_service, NULL);

        /* Wake the tx thread so it can observe facilities.exit. */
        itss_queue_trigger(facilities.tx_queue);
        pthread_join(facilities.transmitting, NULL);

        if (facilities.coordination.active) {
            pthread_join(facilities.vc_service, NULL);
        }
    }

    if (security_socket) {
        itss_0close(security_socket);
    }
    for (int i = 0; i < facilities.zmq.n_responders; ++i) {
        itss_0close(facilities.zmq.responders[i].socket);
    }
    itss_0destroy();

    log_info("exiting");
    log_close();

    return 0;
}