#include "facilities.h" #include "cam.h" #include "config.h" #include "denm.h" #include "infrastructure.h" #include "requests.h" #include "cpm.h" #include "saem.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__) #define syslog_emerg(msg, ...) syslog(LOG_EMERG, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) #define syslog_err(msg, ...) syslog(LOG_ERR, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) #ifndef NDEBUG #define syslog_debug(msg, ...) syslog(LOG_DEBUG, "%s:%d " msg "", __func__, __LINE__, ##__VA_ARGS__) #else #define syslog_debug(msg, ...) #endif static int transport_indication(facilities_t *facilities, void* responder, void* security_socket, uint8_t *msg, uint32_t msg_len) { int rv = 0, code = 0; bool handled_msg = false; FacilitiesIndication_t *fi = NULL; SecurityRequest_t* sreq = NULL; SecurityReply_t* srep = NULL; TransportRequest_t* tr = NULL; uint16_t buf_len = 2048; uint8_t buf[2048]; TransportIndication_t* ti = calloc(1, sizeof(TransportIndication_t)); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_TransportIndication, (void**) &ti, msg, msg_len); if (dec.code) { syslog_err("[facilities]<- invalid TI received"); rv = 1; code = 1; zmq_send(responder, &code, 1, 0); goto cleanup; } zmq_send(responder, &code, 1, 0); syslog_debug("[facilities]<- received TI (%dB)", msg_len); TransportPacketIndication_t* tpi = &ti->choice.packet; // TODO bool fwd = false; asn_TYPE_descriptor_t *its_msg_descriptor = NULL; void *its_msg = NULL; switch (tpi->present) { case TransportPacketIndication_PR_btp: ; // Parse message switch (tpi->choice.btp.destinationPort) { case Port_cam: its_msg_descriptor = &asn_DEF_CAM; its_msg = calloc(1, sizeof(CAM_t)); handled_msg = true; break; case Port_denm: its_msg_descriptor = &asn_DEF_DENM; its_msg = calloc(1, sizeof(DENM_t)); handled_msg = true; break; case Port_ivim: its_msg_descriptor = &asn_DEF_IVIM; its_msg = calloc(1, sizeof(IVIM_t)); handled_msg = true; break; case Port_cpm: its_msg_descriptor = &asn_DEF_CPM; its_msg = calloc(1, sizeof(CPM_t)); handled_msg = true; break; case Port_saem: its_msg_descriptor = &asn_DEF_SAEM; its_msg = calloc(1, sizeof(SAEM_t)); handled_msg = true; break; case 7011: /* tolling */ its_msg_descriptor = &asn_DEF_TPM; its_msg = calloc(1, sizeof(TPM_t)); handled_msg = true; break; default: syslog_debug("[facilities] messsage with unhandled BTP port received, ignoring"); goto cleanup; } dec = uper_decode_complete(NULL, its_msg_descriptor, (void**) &its_msg, tpi->choice.btp.data.buf, tpi->choice.btp.data.size); if (dec.code) { syslog_debug("[facilities]<- invalid %s received", its_msg_descriptor->name); rv = 1; goto cleanup; } // Get permisisons uint8_t* ssp = NULL; uint16_t ssp_len; if (tpi->choice.btp.gn.securityPermissions) { ssp = tpi->choice.btp.gn.securityPermissions->ssp.buf; ssp_len = tpi->choice.btp.gn.securityPermissions->ssp.size; } // Manage message switch (tpi->choice.btp.destinationPort) { case Port_cam: switch (check_cam(facilities, &tpi->choice.btp, its_msg, &facilities->epv, ssp, ssp_len)) { case CAM_OK: fwd = true; break; case CAM_INVALID: case CAM_BAD_PERMISSIONS: default: break; } break; case Port_denm: ; #ifdef DEBUG uint8_t* xml_denm = malloc(32768); asn_enc_rval_t rve = xer_encode_to_buffer(xml_denm, 32768, 0x02, &asn_DEF_DENM, its_msg); syslog_debug("DENM XER %d: %.*s", (int)rve.encoded, (int)rve.encoded , xml_denm); free(xml_denm); #endif int64_t id = -1; switch (event_manage(facilities->den, its_msg, &facilities->epv, &id, ssp, ssp_len)) { case EVENT_NEW: case EVENT_CANCELLATION: case EVENT_NEGATION: case EVENT_UPDATE: case EVENT_NUMBER_EXCEEDED: fwd = true; break; case EVENT_INVALID: case EVENT_PASSED: case EVENT_REPEATED: case EVENT_BAD_PERMISSIONS: break; } break; case Port_ivim: switch (service_eval(facilities->infrastructure, SERVICE_IVI, its_msg, &facilities->epv, &id, ssp, ssp_len)) { case SERVICE_NEW: case SERVICE_CANCELLATION: case SERVICE_NEGATION: case SERVICE_UPDATE: case SERVICE_NUMBER_EXCEEDED: fwd = true; break; case SERVICE_INVALID: case SERVICE_REPEATED: case SERVICE_PASSED: case SERVICE_BAD_PERMISSIONS: default: break; } break; case Port_saem: saem_check( facilities, &facilities->bulletin, its_msg, tpi->choice.btp.gn.securityNeighbour ? tpi->choice.btp.gn.securityNeighbour->buf : NULL ); break; case 7011: tpm_recv( facilities, its_msg, tpi->choice.btp.gn.securityNeighbour ? tpi->choice.btp.gn.securityNeighbour->buf : NULL ); break; default: break; } break; case TransportPacketIndication_PR_tcp: sreq = calloc(1, sizeof(SecurityRequest_t)); sreq->present = SecurityRequest_PR_tlsRecv; sreq->choice.tlsRecv.data.size = tpi->choice.tcp.data.size; sreq->choice.tlsRecv.data.buf = malloc(tpi->choice.tcp.data.size); memcpy(sreq->choice.tlsRecv.data.buf, tpi->choice.tcp.data.buf, tpi->choice.tcp.data.size); uint8_t b_sdu[2048]; b_sdu[0] = 4; asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_SecurityRequest, NULL, sreq, b_sdu+1, 2047); syslog_debug("[facilities]->[security] SecurityRequest.tlsRecv (%ldB)", enc.encoded+1); zmq_send(security_socket, b_sdu, enc.encoded+1, 0); int32_t rl = zmq_recv(security_socket, b_sdu, enc.encoded, 0); syslog_debug("[facilities]<-[security] SecurityReply.tlsRecv (%ldB)", enc.encoded); if (oer_decode(NULL, &asn_DEF_SecurityReply, (void**) &srep, b_sdu, rl).code) { syslog_err("[facilities] SecurityReply.tlsRecv decode failure"); rv = 1; goto cleanup; } if (srep->returnCode == SecurityReplyReturnCode_rejected) { syslog_err("[facilities] SecurityReply.tlsRecv rejected"); rv = 1; goto cleanup; } printf("n=%ld\n",srep->data->choice.tlsRecv.data.size); for (int m = 0; m < srep->data->choice.tlsRecv.data.size; ++m) { printf("%02x", srep->data->choice.tlsRecv.data.buf[m]); } printf("\n"); fflush(stdout); // Forward to [transport] tr = calloc(1, sizeof(TransportRequest_t)); tr->present = TransportRequest_PR_packet; tr->choice.packet.present = TransportPacketRequest_PR_tcp; TCPPacketRequest_t* tpr = &tr->choice.packet.choice.tcp; tpr->data.size = srep->data->choice.tlsRecv.data.size; tpr->data.buf = malloc(srep->data->choice.tlsRecv.data.size); memcpy(tpr->data.buf, srep->data->choice.tlsRecv.data.buf, srep->data->choice.tlsRecv.data.size); tpr->sourcePort = tpi->choice.tcp.destinationPort; tpr->destinationPort = tpi->choice.tcp.sourcePort; tpr->destinationAddress = calloc(1, sizeof(OCTET_STRING_t)); tpr->destinationAddress->buf = malloc(16); tpr->destinationAddress->size = 16; memcpy(tpr->destinationAddress->buf, tpi->choice.tcp.sourceAddress->buf, 16); tpr->destinationPort = 7011; tpr->sourcePort = 7011; tpr->gn = calloc(1, sizeof(GeonetworkingOutboundOptions_t)); tpr->gn->packetTransportType = PacketTransportType_shb; tpr->gn->destinationAddress.buf = calloc(1, 6); tpr->gn->destinationAddress.size = 6; buf[0] = 4; enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, tr, buf+1, buf_len-1); if (enc.encoded == -1) { syslog_err("TransportRequest encoding fail"); rv = 1; goto cleanup; } queue_add(facilities->tx_queue, buf, enc.encoded+1, 3); pthread_cond_signal(&facilities->tx_queue->trigger); break; case TransportPacketIndication_PR_udp: break; default: break; } // Forward to [applications] if (fwd) { fi = calloc(1, sizeof(FacilitiesIndication_t)); fi->present = FacilitiesIndication_PR_message; FacilitiesMessageIndication_t* fmi = &fi->choice.message; fmi->itsMessageType = tpi->choice.btp.destinationPort; fmi->data.size = tpi->choice.btp.data.size; fmi->data.buf = malloc(tpi->choice.btp.data.size); memcpy(fmi->data.buf, tpi->choice.btp.data.buf, tpi->choice.btp.data.size); uint8_t buffer[PACKET_MAX_LEN]; buffer[0] = 4; // Facilities asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesIndication, NULL, fi, buffer+1, PACKET_MAX_LEN-1); queue_add(facilities->tx_queue, buffer, enc.encoded+1, 5); pthread_cond_signal(&facilities->tx_queue->trigger); } cleanup: if (handled_msg && tpi->choice.btp.destinationPort != Port_denm && tpi->choice.btp.destinationPort != Port_ivim) { ASN_STRUCT_FREE(*its_msg_descriptor, its_msg); } ASN_STRUCT_FREE(asn_DEF_TransportIndication, ti); ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi); ASN_STRUCT_FREE(asn_DEF_SecurityRequest, sreq); ASN_STRUCT_FREE(asn_DEF_SecurityReply, srep); ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); return rv; } static int facilities_request(facilities_t *facilities, void* responder, uint8_t *msg, uint32_t msg_len) { int rv = 0; FacilitiesRequest_t *fr = calloc(1, sizeof(FacilitiesRequest_t)); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_FacilitiesRequest, (void**) &fr, msg, msg_len); if (dec.code) { syslog_err("[facilities]<- invalid FR received"); facilities_request_result_rejected(responder); rv = 1; goto cleanup; } switch (fr->present) { case FacilitiesRequest_PR_message: rv = facilities_request_single_message(facilities, responder, fr); break; case FacilitiesRequest_PR_data: switch (fr->choice.data.present) { case FacilitiesDataRequest_PR_activeEpisodes: rv = facilities_request_active_episodes(facilities, responder, fr); break; case FacilitiesDataRequest_PR_attributeTypes: rv = facilities_request_attribute_types(facilities, responder, fr); break; case FacilitiesDataRequest_PR_loadedProtectionZones: rv = facilities_request_loaded_protected_zones(facilities, responder, fr); break; default: syslog_err("[facilities] unrecognized FDR type received (%d)", fr->choice.data.present); facilities_request_result_rejected(responder); rv = 1; goto cleanup; } break; default: syslog_err("[facilities] unrecognized FR type received (%d)", fr->present); facilities_request_result_rejected(responder); rv = 1; goto cleanup; } cleanup: ASN_STRUCT_FREE(asn_DEF_FacilitiesRequest, fr); return rv; } static int security_indication(facilities_t *facilities, void* responder_secured, uint8_t *msg, uint32_t msg_len) { int rv = 0; SecurityIndication_t* si = calloc(1, sizeof(SecurityIndication_t)); SecurityResponse_t* sr = calloc(1, sizeof(SecurityResponse_t)); uint8_t buffer[64]; asn_enc_rval_t enc; asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_SecurityIndication, (void**) &si, msg, msg_len); if (dec.code) { syslog_err("[facilities]<- invalid SIndication received"); rv = 1; goto cleanup; } pthread_mutex_lock(&facilities->id.change.lock); if (facilities->id.change.stage == ID_CHANGE_BLOCKED) { pthread_mutex_unlock(&facilities->id.change.lock); syslog_debug("[facilities] identity change is currently blocked"); rv = 1; goto cleanup; } if (facilities->id.change.stage == ID_CHANGE_PREPARE && si->choice.idChangeEvent.command != SecurityIdChangeEventType_commit && si->choice.idChangeEvent.command != SecurityIdChangeEventType_abort) { pthread_mutex_unlock(&facilities->id.change.lock); syslog_debug("[facilities] current identity change state is prepare, but received identity change command is not commit nor abort"); rv = 1; goto cleanup; } bool id_changed = false; switch (si->choice.idChangeEvent.command) { case SecurityIdChangeEventType_prepare: facilities->id.change.stage = ID_CHANGE_PREPARE; pthread_mutex_lock(&facilities->id.lock); pthread_mutex_lock(&facilities->lightship->lock); break; case SecurityIdChangeEventType_commit: ; facilities->id.change.stage = ID_CHANGE_COMMIT; // Reset lightship for (int i = 0; i < facilities->lightship->pos_history_len; ++i) { free(facilities->lightship->pos_history[i]); } facilities->lightship->pos_history_len = 0; facilities->lightship->last_cam = 0; facilities->lightship->last_cam_lfc = 0; facilities->lightship->next_cam_max = 0; facilities->lightship->next_cam_min = 0; pthread_mutex_unlock(&facilities->lightship->lock); // Change Station ID for (int i = 0; i < si->choice.idChangeEvent.ids.list.count; ++i) { switch (si->choice.idChangeEvent.ids.list.array[i]->present) { case SecurityId_PR_stationId: facilities->id.station_id = si->choice.idChangeEvent.ids.list.array[i]->choice.stationId; break; case SecurityId_PR_ipv6Address: memcpy(facilities->id.ipv6_addr, si->choice.idChangeEvent.ids.list.array[i]->choice.ipv6Address.buf, 16); break; default: break; } } facilities->id.change.stage = ID_CHANGE_INACTIVE; pthread_mutex_unlock(&facilities->id.lock); id_changed = true; break; case SecurityIdChangeEventType_abort: pthread_mutex_unlock(&facilities->id.lock); pthread_mutex_unlock(&facilities->lightship->lock); facilities->id.change.stage = ID_CHANGE_INACTIVE; break; default: pthread_mutex_unlock(&facilities->id.change.lock); syslog_err("[networking]<- unhandled idChangeEvent command type"); rv = 1; goto cleanup; } sr->present = SecurityResponse_PR_idChangeEvent; sr->choice.idChangeEvent.returnCode = 0; enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64); zmq_send(responder_secured, buffer, enc.encoded, 0); pthread_mutex_unlock(&facilities->id.change.lock); cleanup: if (rv) { sr->present = SecurityResponse_PR_idChangeEvent; sr->choice.idChangeEvent.returnCode = 1; enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64); zmq_send(responder_secured, buffer, enc.encoded, 0); } pthread_mutex_unlock(&facilities->id.change.lock); ASN_STRUCT_FREE(asn_DEF_SecurityResponse, sr); ASN_STRUCT_FREE(asn_DEF_SecurityIndication, si); return rv; } static int networking_indication(facilities_t* facilities, void* responder, uint8_t* msg, uint32_t msg_len) { int rv = 0; uint8_t code = 0; NetworkingIndication_t* ni = calloc(1, sizeof(NetworkingIndication_t)); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_NetworkingIndication, (void**) &ni, msg, msg_len); if (dec.code) { rv = 1; code = 1; zmq_send(responder, &code, 1, 0); goto cleanup; } zmq_send(responder, &code, 1, 0); if (ni->present != NetworkingIndication_PR_data) { goto cleanup; } if (ni->choice.data.mobileNeighbour && *ni->choice.data.mobileNeighbour) { pthread_mutex_lock(&facilities->lightship->lock); facilities->lightship->last_vehicle = it2s_tender_get_clock(&facilities->epv); facilities->lightship->is_vehicle_near = true; pthread_mutex_unlock(&facilities->lightship->lock); } cleanup: ASN_STRUCT_FREE(asn_DEF_NetworkingIndication, ni); return rv; } static int management_indication(facilities_t* facilities, void* responder, uint8_t* msg, uint32_t msg_len) { int rv = 0; uint8_t code = 0; ManagementIndication_t* mi = calloc(1, sizeof(ManagementIndication_t)); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_ManagementIndication, (void**) &mi, msg, msg_len); if (dec.code) { rv = 1; code = 1; zmq_send(responder, &code, 1, 0); goto cleanup; } zmq_send(responder, &code, 1, 0); if (mi->present == ManagementIndication_PR_attributes) { it2s_tender_lock_space(&facilities->epv); facilities->epv.space.latitude = mi->choice.attributes.coordinates.latitude; facilities->epv.space.latitude_conf = mi->choice.attributes.coordinates.latitudeConfidence; facilities->epv.space.longitude = mi->choice.attributes.coordinates.longitude; facilities->epv.space.longitude_conf = mi->choice.attributes.coordinates.longitudeConfidence; facilities->epv.space.speed = mi->choice.attributes.speed.speedValue; facilities->epv.space.speed_conf = mi->choice.attributes.speed.speedConfidence; facilities->epv.space.heading = mi->choice.attributes.heading.headingValue; facilities->epv.space.heading_conf = mi->choice.attributes.heading.headingConfidence; facilities->epv.space.altitude = mi->choice.attributes.altitude.altitudeValue; facilities->epv.space.altitude_conf = mi->choice.attributes.altitude.altitudeConfidence; it2s_tender_unlock_space(&facilities->epv); it2s_tender_lock_time(&facilities->epv); asn_INTEGER2ulong(&mi->choice.attributes.clock, &facilities->epv.time.clock); it2s_tender_unlock_time(&facilities->epv); } cleanup: ASN_STRUCT_FREE(asn_DEF_ManagementIndication, mi); return rv; } void* tx(void* fc) { int rv = 0; facilities_t *facilities = (facilities_t*) fc; queue_t* queue = facilities->tx_queue; uint8_t code; int wait_ms = 1000; void* applications_socket = zmq_socket(facilities->zmq.ctx, ZMQ_REQ); zmq_setsockopt(applications_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); zmq_connect(applications_socket, facilities->zmq.applications_address); void* transport_socket = zmq_socket(facilities->zmq.ctx, ZMQ_REQ); zmq_setsockopt(transport_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); zmq_connect(transport_socket, facilities->zmq.transport_address); queue_t* stream = queue_init(); while (!facilities->exit) { pthread_mutex_lock(&queue->lock); while (!queue->len) { pthread_cond_wait(&queue->trigger, &queue->lock); } for (int i = 0; i < queue->len; ++i) { memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]); stream->packet_len[i] = queue->packet_len[i]; stream->destination[i] = queue->destination[i]; } stream->len = queue->len; queue->len = 0; pthread_mutex_unlock(&queue->lock); for (int i = 0; i < stream->len; ++i) { switch (stream->destination[i]) { case 3: syslog_debug("[facilities]-> sending BDR to ->[transport] (%dB)", stream->packet_len[i]); zmq_send(transport_socket, stream->packet[i], stream->packet_len[i], 0); rv = zmq_recv(transport_socket, &code, 1, 0); if (rv == -1) {syslog_err("[facilities]-> timeout sending BDR to ->[transport]");} break; case 5: syslog_debug("[facilities]-> sending FDI to ->[applications] (%dB)", stream->packet_len[i]); zmq_send(applications_socket, stream->packet[i], stream->packet_len[i], 0); rv = zmq_recv(applications_socket, &code, 1, 0); if (rv == -1) {syslog_err("[facilities]-> timeout sending FDI to ->[applications]");} break; } } } return NULL; } int main() { int rv = 0; syslog_info("[facilities] starting"); facilities_t facilities; memset(&facilities, 0x00, sizeof(facilities_t)); facilities.zmq.ctx = zmq_ctx_new(); facilities.lightship = lightship_init(); facilities.dissemination = dissemination_init(); facilities.tx_queue = queue_init(); facilities.den = calloc(1, sizeof(den_t)); facilities.infrastructure = calloc(1, sizeof(infrastructure_t)); bulletin_init(&facilities.bulletin); time_t t; srand((unsigned) time(&t)); if (facilities_config(&facilities)) return 1; facilities.lightship->type = facilities.station_type; // Tx pthread_create(&facilities.transmitting, NULL, tx, (void*) &facilities); // CA pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities); // DEN pthread_create(&facilities.den_service, NULL, den_service, (void*) &facilities); // Infrastructure pthread_create(&facilities.infrastructure_service, NULL, infrastructure_service, (void*) &facilities); // CPM pthread_create(&facilities.cp_service, NULL, cp_service, (void*) &facilities); // SA pthread_create(&facilities.sa_service, NULL, sa_service, (void*) &facilities); void* security_socket = zmq_socket(facilities.zmq.ctx, ZMQ_REQ); int wait_ms = 1000; zmq_setsockopt(security_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); zmq_connect(security_socket, facilities.zmq.security_address); uint8_t buffer[PACKET_MAX_LEN]; syslog_info("[facilities] listening"); uint8_t code; bool in_idchange; int32_t rl; while (!facilities.exit) { zmq_poll(facilities.zmq.responders, facilities.zmq.n_responders, -1); for (int i = 0; i < facilities.zmq.n_responders; ++i) { if (facilities.zmq.responders[i].revents) { rl = zmq_recv(facilities.zmq.responders[i].socket, buffer, PACKET_MAX_LEN, 0); switch (buffer[0]) { case 2: networking_indication(&facilities, facilities.zmq.responders[i].socket, buffer+1, rl); break; case 3: in_idchange = true; pthread_mutex_lock(&facilities.id.change.lock); if (facilities.id.change.stage == ID_CHANGE_INACTIVE) { in_idchange = false; facilities.id.change.stage = ID_CHANGE_BLOCKED; } pthread_mutex_unlock(&facilities.id.change.lock); if (!in_idchange) { transport_indication(&facilities, facilities.zmq.responders[i].socket, security_socket, buffer+1, rl); pthread_mutex_lock(&facilities.id.change.lock); facilities.id.change.stage = ID_CHANGE_INACTIVE; pthread_mutex_unlock(&facilities.id.change.lock); } else { code = 1; zmq_send(facilities.zmq.responders[i].socket, &code, 1, 0); } break; case 5: in_idchange = true; pthread_mutex_lock(&facilities.id.change.lock); if (facilities.id.change.stage == ID_CHANGE_INACTIVE) { in_idchange = false; facilities.id.change.stage = ID_CHANGE_BLOCKED; } pthread_mutex_unlock(&facilities.id.change.lock); if (!in_idchange) { facilities_request(&facilities, facilities.zmq.responders[i].socket, buffer+1, rl); pthread_mutex_lock(&facilities.id.change.lock); facilities.id.change.stage = ID_CHANGE_INACTIVE; pthread_mutex_unlock(&facilities.id.change.lock); } else { code = 1; zmq_send(facilities.zmq.responders[i].socket, &code, 1, 0); } break; case 6: management_indication(&facilities, facilities.zmq.responders[i].socket, buffer+1, rl); break; case 7: security_indication(&facilities, facilities.zmq.responders[i].socket, buffer+1, rl); break; default: syslog_debug("[facilities] unrecognized service"); code = 1; zmq_send(facilities.zmq.responders[i].socket, &code, 1, 0); continue; } } } } pthread_join(facilities.transmitting, NULL); pthread_join(facilities.ca_service, NULL); pthread_join(facilities.den_service, NULL); pthread_join(facilities.infrastructure_service, NULL); pthread_join(facilities.cp_service, NULL); return 0; }