#include "facilities.h" #include "cam.h" #include "config.h" #include "denm.h" #include "infrastructure.h" #include "requests.h" #include "cpm.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__) #define syslog_emerg(msg, ...) syslog(LOG_EMERG, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) #define syslog_err(msg, ...) syslog(LOG_ERR, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) #ifndef NDEBUG #define syslog_debug(msg, ...) syslog(LOG_DEBUG, "%s:%d " msg "", __func__, __LINE__, ##__VA_ARGS__) #else #define syslog_debug(msg, ...) #endif static int transport_indication(facilities_t *facilities, void* responder, uint8_t *msg, uint32_t msg_len) { int rv = 0, code = 0; bool handled_msg = false; FacilitiesDataIndication_t *fdi = NULL; BTPDataIndication_t *bdi = calloc(1, sizeof(BTPDataIndication_t)); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_BTPDataIndication, (void**) &bdi, msg, msg_len); if (dec.code) { syslog_err("[facilities]<- invalid bdi received"); rv = 1; code = 1; zmq_send(responder, &code, 1, 0); goto cleanup; } zmq_send(responder, &code, 1, 0); syslog_debug("[facilities]<- BDI (%ldB)", bdi->data.size); // Parse message asn_TYPE_descriptor_t *its_msg_descriptor; void *its_msg; switch (bdi->destinationPort) { case Port_cam: its_msg_descriptor = &asn_DEF_CAM; its_msg = calloc(1, sizeof(CAM_t)); handled_msg = true; break; case Port_denm: its_msg_descriptor = &asn_DEF_DENM; its_msg = calloc(1, sizeof(DENM_t)); handled_msg = true; break; case Port_ivim: its_msg_descriptor = &asn_DEF_IVIM; its_msg = calloc(1, sizeof(IVIM_t)); handled_msg = true; break; case Port_cpm: its_msg_descriptor = &asn_DEF_CPM; its_msg = calloc(1, sizeof(CPM_t)); handled_msg = true; break; default: syslog_debug("[facilities] messsage with unhandled BTP port received, ignoring"); goto cleanup; } dec = uper_decode_complete(NULL, its_msg_descriptor, (void**) &its_msg, bdi->data.buf, bdi->data.size); if (dec.code) { syslog_debug("[facilities]<- invalid %s received", its_msg_descriptor->name); rv = 1; goto cleanup; } // Get permisisons uint8_t* ssp = NULL; uint16_t ssp_len; if (bdi->gnPermissions) { ssp = bdi->gnPermissions->ssp.buf; ssp_len = bdi->gnPermissions->ssp.size; } bool fwd = false; // Manage message switch (bdi->destinationPort) { case Port_cam: switch (check_cam(facilities, bdi, its_msg, &facilities->epv, ssp, ssp_len)) { case CAM_OK: fwd = true; break; case CAM_INVALID: case CAM_BAD_PERMISSIONS: default: break; } break; case Port_denm: ; #ifdef DEBUG uint8_t* xml_denm = malloc(32768); asn_enc_rval_t rve = xer_encode_to_buffer(xml_denm, 32768, 0x02, &asn_DEF_DENM, its_msg); syslog_debug("DENM XER %d: %.*s", (int)rve.encoded, (int)rve.encoded , xml_denm); free(xml_denm); #endif int64_t id = -1; switch (event_manage(facilities->den, its_msg, &facilities->epv, &id, ssp, ssp_len)) { case EVENT_NEW: case EVENT_CANCELLATION: case EVENT_NEGATION: case EVENT_UPDATE: case EVENT_NUMBER_EXCEEDED: fwd = true; break; case EVENT_INVALID: case EVENT_PASSED: case EVENT_REPEATED: case EVENT_BAD_PERMISSIONS: break; } break; case Port_ivim: switch (service_eval(facilities->infrastructure, SERVICE_IVI, its_msg, &facilities->epv, &id, ssp, ssp_len)) { case SERVICE_NEW: case SERVICE_CANCELLATION: case SERVICE_NEGATION: case SERVICE_UPDATE: case SERVICE_NUMBER_EXCEEDED: fwd = true; break; case SERVICE_INVALID: case SERVICE_REPEATED: case SERVICE_PASSED: case SERVICE_BAD_PERMISSIONS: default: break; } break; default: break; } // Forward to application if (fwd) { fdi = calloc(1, sizeof(FacilitiesDataIndication_t)); fdi->itsMessageType = bdi->destinationPort; fdi->data.size = bdi->data.size; fdi->data.buf = malloc(bdi->data.size); memcpy(fdi->data.buf, bdi->data.buf, bdi->data.size); uint8_t buffer[PACKET_MAX_LEN]; buffer[0] = 4; // Facilities asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataIndication, NULL, fdi, buffer+1, PACKET_MAX_LEN-1); queue_add(facilities->tx_queue, buffer, enc.encoded+1, 5); pthread_cond_signal(&facilities->tx_queue->trigger); } cleanup: if (handled_msg && bdi->destinationPort != Port_denm && bdi->destinationPort != Port_ivim) { ASN_STRUCT_FREE(*its_msg_descriptor, its_msg); } ASN_STRUCT_FREE(asn_DEF_BTPDataIndication, bdi); ASN_STRUCT_FREE(asn_DEF_FacilitiesDataIndication, fdi); return rv; } static int facilities_request(facilities_t *facilities, void* responder, uint8_t *msg, uint32_t msg_len) { int rv = 0; FacilitiesDataRequest_t *fdreq = calloc(1, sizeof(FacilitiesDataRequest_t)); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_FacilitiesDataRequest, (void**) &fdreq, msg, msg_len); if (dec.code) { syslog_err("[facilities]<- invalid FDRequest received"); facilities_request_result_rejected(responder); rv = 1; goto cleanup; } switch (fdreq->present) { case FacilitiesDataRequest_PR_singleMessage: rv = facilities_request_single_message(facilities, responder, fdreq); break; case FacilitiesDataRequest_PR_activeEvents: rv = facilities_request_active_events(facilities, responder, fdreq); break; case FacilitiesDataRequest_PR_attributeTypes: rv = facilities_request_attribute_types(facilities, responder, fdreq); break; default: syslog_err("[facilities] unrecognized FDR type received (%d)", fdreq->present); facilities_request_result_rejected(responder); rv = 1; goto cleanup; } cleanup: ASN_STRUCT_FREE(asn_DEF_FacilitiesDataRequest, fdreq); return rv; } static int security_indication(facilities_t *facilities, void* responder_secured, uint8_t *msg, uint32_t msg_len) { int rv = 0; SecurityIndication_t* si = calloc(1, sizeof(SecurityIndication_t)); SecurityResponse_t* sr = calloc(1, sizeof(SecurityResponse_t)); uint8_t buffer[64]; asn_enc_rval_t enc; asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_SecurityIndication, (void**) &si, msg, msg_len); if (dec.code) { syslog_err("[facilities]<- invalid SIndication received"); rv = 1; goto cleanup; } pthread_mutex_lock(&facilities->id.change.lock); if (facilities->id.change.stage == ID_CHANGE_BLOCKED) { pthread_mutex_unlock(&facilities->id.change.lock); syslog_debug("[facilities] identity change is currently blocked"); rv = 1; goto cleanup; } if (facilities->id.change.stage == ID_CHANGE_PREPARE && si->choice.idChangeEvent.command != SecurityIdChangeEventType_commit && si->choice.idChangeEvent.command != SecurityIdChangeEventType_abort) { pthread_mutex_unlock(&facilities->id.change.lock); syslog_debug("[facilities] current identity change state is prepare, but received identity change command is not commit nor abort"); rv = 1; goto cleanup; } switch (si->choice.idChangeEvent.command) { case SecurityIdChangeEventType_prepare: facilities->id.change.stage = ID_CHANGE_PREPARE; pthread_mutex_lock(&facilities->id.lock); pthread_mutex_lock(&facilities->lightship->lock); break; case SecurityIdChangeEventType_commit: ; facilities->id.change.stage = ID_CHANGE_COMMIT; // Reset lightship for (int i = 0; i < facilities->lightship->pos_history_len; ++i) { free(facilities->lightship->pos_history[i]); } facilities->lightship->pos_history_len = 0; facilities->lightship->last_cam = 0; facilities->lightship->last_cam_lfc = 0; facilities->lightship->next_cam_max = 0; facilities->lightship->next_cam_min = 0; pthread_mutex_unlock(&facilities->lightship->lock); // Change Station ID facilities->id.value = rand(); facilities->id.change.stage = ID_CHANGE_INACTIVE; pthread_mutex_unlock(&facilities->id.lock); break; case SecurityIdChangeEventType_abort: pthread_mutex_unlock(&facilities->id.lock); pthread_mutex_unlock(&facilities->lightship->lock); facilities->id.change.stage = ID_CHANGE_INACTIVE; break; default: pthread_mutex_unlock(&facilities->id.change.lock); syslog_err("[networking]<- unhandled idChangeEvent command type"); rv = 1; goto cleanup; } sr->present = SecurityResponse_PR_idChangeEvent; sr->choice.idChangeEvent.returnCode = 0; enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64); zmq_send(responder_secured, buffer, enc.encoded, 0); if (facilities->id.change.stage == ID_CHANGE_INACTIVE) { // Inform management ManagementRequest_t* mreq = calloc(1, sizeof(ManagementRequest_t)); mreq->present = ManagementRequest_PR_attributes; mreq->choice.attributes.present = ManagementRequestAttributes_PR_set; mreq->choice.attributes.choice.set.stationID = malloc(sizeof(long)); *mreq->choice.attributes.choice.set.stationID = facilities->id.value; uint8_t b_oer[128]; asn_enc_rval_t enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_ManagementRequest, mreq, b_oer, 128); if (enc.encoded != -1) { int wait_ms = 1000; void* management_socket = zmq_socket(facilities->zmq.ctx, ZMQ_REQ); zmq_setsockopt(management_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); zmq_connect(management_socket, facilities->zmq.management_address); zmq_send(management_socket, b_oer, enc.encoded, 0); uint8_t code; rv = zmq_recv(management_socket, &code, 1, 0); if (rv == -1) {syslog_err("[facilities]-> timeout sending MReq set.stationId to ->[management]");} zmq_close(management_socket); } ASN_STRUCT_FREE(asn_DEF_ManagementRequest, mreq); } pthread_mutex_unlock(&facilities->id.change.lock); cleanup: if (rv) { sr->present = SecurityResponse_PR_idChangeEvent; sr->choice.idChangeEvent.returnCode = 1; enc = oer_encode_to_buffer(&asn_DEF_SecurityResponse, NULL, sr, buffer, 64); zmq_send(responder_secured, buffer, enc.encoded, 0); } pthread_mutex_unlock(&facilities->id.change.lock); ASN_STRUCT_FREE(asn_DEF_SecurityResponse, sr); ASN_STRUCT_FREE(asn_DEF_SecurityIndication, si); return rv; } static int management_indication(facilities_t* facilities, void* responder, uint8_t* msg, uint32_t msg_len) { int rv = 0; uint8_t code = 0; ManagementIndication_t* mi = calloc(1, sizeof(ManagementIndication_t)); asn_dec_rval_t dec = oer_decode(NULL, &asn_DEF_ManagementIndication, (void**) &mi, msg, msg_len); if (dec.code) { rv = 1; code = 1; zmq_send(responder, &code, 1, 0); goto cleanup; } zmq_send(responder, &code, 1, 0); if (mi->present == ManagementIndication_PR_attributes) { it2s_tender_lock_space(&facilities->epv); facilities->epv.space.latitude = mi->choice.attributes.coordinates.latitude; facilities->epv.space.longitude = mi->choice.attributes.coordinates.longitude; facilities->epv.space.speed = mi->choice.attributes.speed.speedValue; facilities->epv.space.speed_conf = mi->choice.attributes.speed.speedConfidence; facilities->epv.space.heading = mi->choice.attributes.heading.headingValue; facilities->epv.space.heading_conf = mi->choice.attributes.heading.headingConfidence; facilities->epv.space.altitude = mi->choice.attributes.altitude.altitudeValue; facilities->epv.space.altitude_conf = mi->choice.attributes.altitude.altitudeConfidence; it2s_tender_unlock_space(&facilities->epv); it2s_tender_lock_time(&facilities->epv); asn_INTEGER2ulong(&mi->choice.attributes.clock, &facilities->epv.time.clock); it2s_tender_unlock_time(&facilities->epv); } cleanup: ASN_STRUCT_FREE(asn_DEF_ManagementIndication, mi); return rv; } void* tx(void* fc) { int rv = 0; facilities_t *facilities = (facilities_t*) fc; queue_t* queue = facilities->tx_queue; uint8_t code; int wait_ms = 1000; void* applications_socket = zmq_socket(facilities->zmq.ctx, ZMQ_REQ); zmq_setsockopt(applications_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); zmq_connect(applications_socket, facilities->zmq.applications_address); void* transport_socket = zmq_socket(facilities->zmq.ctx, ZMQ_REQ); zmq_setsockopt(transport_socket, ZMQ_RCVTIMEO, &wait_ms, sizeof(int)); zmq_connect(transport_socket, facilities->zmq.transport_address); queue_t* stream = queue_init(); while (!facilities->exit) { pthread_mutex_lock(&queue->lock); while (!queue->len) { pthread_cond_wait(&queue->trigger, &queue->lock); } for (int i = 0; i < queue->len; ++i) { memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]); stream->packet_len[i] = queue->packet_len[i]; stream->destination[i] = queue->destination[i]; } stream->len = queue->len; queue->len = 0; pthread_mutex_unlock(&queue->lock); for (int i = 0; i < stream->len; ++i) { switch (stream->destination[i]) { case 3: syslog_debug("[facilities]-> sending BDR to ->[transport] (%dB)", stream->packet_len[i]); zmq_send(transport_socket, stream->packet[i], stream->packet_len[i], 0); rv = zmq_recv(transport_socket, &code, 1, 0); if (rv == -1) {syslog_err("[facilities]-> timeout sending BDR to ->[transport]");} break; case 5: syslog_debug("[facilities]-> sending FDI to ->[applications] (%dB)", stream->packet_len[i]); zmq_send(applications_socket, stream->packet[i], stream->packet_len[i], 0); rv = zmq_recv(applications_socket, &code, 1, 0); if (rv == -1) {syslog_err("[facilities]-> timeout sending FDI to ->[applications]");} break; } } } return NULL; } int main() { int rv = 0; syslog_info("[facilities] starting"); facilities_t facilities; memset(&facilities, 0x00, sizeof(facilities_t)); facilities.zmq.ctx = zmq_ctx_new(); facilities.lightship = lightship_init(); facilities.dissemination = dissemination_init(); facilities.tx_queue = queue_init(); facilities.den = calloc(1, sizeof(den_t)); facilities.infrastructure = calloc(1, sizeof(infrastructure_t)); time_t t; srand((unsigned) time(&t)); if (facilities_config(&facilities)) return 1; facilities.lightship->type = facilities.station_type; // Tx pthread_create(&facilities.transmitting, NULL, tx, (void*) &facilities); // CA pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities); // DEN pthread_create(&facilities.den_service, NULL, den_service, (void*) &facilities); // Infrastructure pthread_create(&facilities.infrastructure_service, NULL, infrastructure_service, (void*) &facilities); // CPM if(facilities.dissemination->active) pthread_create(&facilities.cp_service, NULL, cp_service, (void*) &facilities); uint8_t buffer[PACKET_MAX_LEN]; syslog_info("[facilities] listening"); uint8_t code; bool in_idchange; while (!facilities.exit) { zmq_poll(facilities.zmq.responders, facilities.zmq.n_responders, -1); for (int i = 0; i < facilities.zmq.n_responders; ++i) { if (facilities.zmq.responders[i].revents) { zmq_recv(facilities.zmq.responders[i].socket, buffer, PACKET_MAX_LEN, 0); switch (buffer[0]) { case 3: in_idchange = true; pthread_mutex_lock(&facilities.id.change.lock); if (facilities.id.change.stage == ID_CHANGE_INACTIVE) { in_idchange = false; facilities.id.change.stage = ID_CHANGE_BLOCKED; } pthread_mutex_unlock(&facilities.id.change.lock); if (!in_idchange) { transport_indication(&facilities, facilities.zmq.responders[i].socket, buffer+1, PACKET_MAX_LEN-1); pthread_mutex_lock(&facilities.id.change.lock); facilities.id.change.stage = ID_CHANGE_INACTIVE; pthread_mutex_unlock(&facilities.id.change.lock); } else { code = 1; zmq_send(facilities.zmq.responders[i].socket, &code, 1, 0); } break; case 5: in_idchange = true; pthread_mutex_lock(&facilities.id.change.lock); if (facilities.id.change.stage == ID_CHANGE_INACTIVE) { in_idchange = false; facilities.id.change.stage = ID_CHANGE_BLOCKED; } pthread_mutex_unlock(&facilities.id.change.lock); if (!in_idchange) { facilities_request(&facilities, facilities.zmq.responders[i].socket, buffer+1, PACKET_MAX_LEN-1); pthread_mutex_lock(&facilities.id.change.lock); facilities.id.change.stage = ID_CHANGE_INACTIVE; pthread_mutex_unlock(&facilities.id.change.lock); } else { code = 1; zmq_send(facilities.zmq.responders[i].socket, &code, 1, 0); } break; case 6: management_indication(&facilities, facilities.zmq.responders[i].socket, buffer+1, PACKET_MAX_LEN-1); break; case 7: security_indication(&facilities, facilities.zmq.responders[i].socket, buffer+1, PACKET_MAX_LEN-1); break; default: syslog_debug("[facilities] unrecognized service"); code = 1; zmq_send(facilities.zmq.responders[i].socket, &code, 1, 0); continue; } } } } pthread_join(facilities.transmitting, NULL); pthread_join(facilities.ca_service, NULL); pthread_join(facilities.den_service, NULL); pthread_join(facilities.infrastructure_service, NULL); if(facilities.dissemination->active) pthread_join(facilities.cp_service, NULL); return 0; }