diff --git a/src/config.c b/src/config.c index 525d3c5..66dc69d 100644 --- a/src/config.c +++ b/src/config.c @@ -354,9 +354,9 @@ int facilities_config() { facilities.tolling.station.obu.client_id = config->facilities.tpm.client_id; // VCM - facilities.coordination.active = config->facilities.vcm.activate; - facilities.coordination.vcm_period_min = config->facilities.vcm.period_min; - facilities.coordination.vcm_period_max = config->facilities.vcm.period_max; + facilities.coordination.active = config->facilities.mcm.activate; + facilities.coordination.vcm_period_min = config->facilities.mcm.period_min; + facilities.coordination.vcm_period_max = config->facilities.mcm.period_max; // Replay facilities.replay = config->networking.replay.activate; @@ -471,7 +471,7 @@ int facilities_config() { mreq->choice.attributes.choice.get.clockType = 1; mreq->choice.attributes.choice.get.clock = 1; mreq->choice.attributes.choice.get.clockOffset = 1; - mreq->choice.attributes.choice.get.trajectory = config->facilities.vcm.activate; + mreq->choice.attributes.choice.get.trajectory = config->facilities.mcm.activate; void* management_socket = itss_0connect(facilities.zmq.management_address, ZMQ_REQ); uint8_t b_tx[256], b_rx[256]; asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_ManagementRequest, NULL, mreq, b_tx, 256); @@ -517,7 +517,7 @@ int facilities_config() { asn_INTEGER2ulong(mrep->data->choice.attributes.clock, (unsigned long long*) &epv.time.clock); asn_INTEGER2ulong(mrep->data->choice.attributes.clockOffset,(unsigned long long*) &epv.time.offset); - if (config->facilities.vcm.activate) { + if (config->facilities.mcm.activate) { epv.trajectory.len = mrep->data->choice.attributes.trajectory->list.count; for (int i = 0; i < mrep->data->choice.attributes.trajectory->list.count; ++i) { epv.trajectory.path[i].latitude = mrep->data->choice.attributes.trajectory->list.array[i]->latitude; diff --git a/src/vcm.c b/src/vcm.c index 69537de..17b12b3 100644 --- a/src/vcm.c +++ b/src/vcm.c @@ -46,6 +46,84 @@ static int do_paths_intersect( return 0; } +static void tx_vcm(VCM_t* vcm) { + const uint16_t buf_len = 2048; + uint8_t buf[buf_len]; + TransportRequest_t* tr = NULL; + FacilitiesIndication_t* fi = NULL; + asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_VCM, NULL, vcm, buf, buf_len); + if (enc.encoded == -1) { + log_error("[vc] VCM.reply encode failure (%s)", enc.failed_type->name); + goto cleanup; + } + ssize_t vcm_rep_len = (enc.encoded + 7) / 8; + + tr = calloc(1, sizeof(TransportRequest_t)); + tr->present = TransportRequest_PR_packet; + tr->choice.packet.present = TransportPacketRequest_PR_btp; + BTPPacketRequest_t* bpr = &tr->choice.packet.choice.btp; + bpr->btpType = BTPType_btpB; + bpr->id = itss_id(buf, vcm_rep_len); + bpr->gn.destinationAddress.buf = malloc(6); + for (int i = 0; i < 6; ++i) { + bpr->gn.destinationAddress.buf[i] = 0xff; + } + bpr->gn.destinationAddress.size = 6; + bpr->gn.packetTransportType = PacketTransportType_shb; + bpr->destinationPort = 2043; + bpr->gn.trafficClass = 2; + bpr->data.buf = malloc(vcm_rep_len); + memcpy(bpr->data.buf, buf, vcm_rep_len); + bpr->data.size = vcm_rep_len; + buf[0] = 4; + enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_TransportRequest, tr, buf+1, buf_len-1); + if (enc.encoded == -1) { + log_error("[vc] TR VCM.reply encode failure (%s)", enc.failed_type->name); + goto cleanup; + } + + itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); + + fi = calloc(1, sizeof(FacilitiesIndication_t)); + fi->present = FacilitiesIndication_PR_message; + fi->choice.message.id = bpr->id; + fi->choice.message.itsMessageType = 2043; + fi->choice.message.data.size = bpr->data.size; + fi->choice.message.data.buf = malloc(bpr->data.size); + memcpy(fi->choice.message.data.buf, bpr->data.buf, bpr->data.size); + buf[0] = 4; + enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_FacilitiesIndication, fi, buf+1, buf_len-1); + if (enc.encoded == -1) { + log_error("[vc] TR VCM.reply encode failure (%s)", enc.failed_type->name); + goto cleanup; + } + + itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_APPLICATIONS, bpr->id, "FI.message"); + + facilities.coordination.t_last_send_vcm = itss_time_get(); + + if (facilities.logging.recorder) { + uint16_t buffer_len = 2048; + uint8_t buffer[buffer_len]; + int e = itss_management_record_packet_sdu( + buffer, + buffer_len, + bpr->data.buf, + bpr->data.size, + bpr->id, + itss_time_get(), + ITSS_FACILITIES, + true); + if (e != -1) { + itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); + } + } + +cleanup: + ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); + ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi); +} + static void vcm_reject(VCM_t* vcm, mc_neighbour_s* neighbour) { coordination_t* coordination = &facilities.coordination; @@ -136,81 +214,10 @@ static void vcm_reject(VCM_t* vcm, mc_neighbour_s* neighbour) { mvc_rep->negotiation->choice.reply.requesterId = vcm->header.stationID; mvc_rep->negotiation->choice.reply.nonce = vcm->vcm.maneuverContainer.choice.vehicle.negotiation->choice.request.nonce; - asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_VCM, NULL, vcm_rep, buf1, buf_len); - if (enc.encoded == -1) { - log_error("[vc] VCM.reply encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - ssize_t vcm_rep_len = (enc.encoded + 7) / 8; - - tr = calloc(1, sizeof(TransportRequest_t)); - tr->present = TransportRequest_PR_packet; - tr->choice.packet.present = TransportPacketRequest_PR_btp; - BTPPacketRequest_t* bpr = &tr->choice.packet.choice.btp; - bpr->btpType = BTPType_btpB; - bpr->id = itss_id(buf1, vcm_rep_len); - bpr->gn.destinationAddress.buf = malloc(6); - for (int i = 0; i < 6; ++i) { - bpr->gn.destinationAddress.buf[i] = 0xff; - } - bpr->gn.destinationAddress.size = 6; - bpr->gn.packetTransportType = PacketTransportType_shb; - bpr->destinationPort = 2043; - bpr->gn.trafficClass = 2; - bpr->data.buf = malloc(vcm_rep_len); - memcpy(bpr->data.buf, buf1, vcm_rep_len); - bpr->data.size = vcm_rep_len; - buf1[0] = 4; - enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_TransportRequest, tr, buf1+1, buf_len-1); - if (enc.encoded == -1) { - log_error("[vc] TR VCM.reply encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); - - fi = calloc(1, sizeof(FacilitiesIndication_t)); - fi->present = FacilitiesIndication_PR_message; - fi->choice.message.id = bpr->id; - fi->choice.message.itsMessageType = 2043; - fi->choice.message.data.size = bpr->data.size; - fi->choice.message.data.buf = malloc(bpr->data.size); - memcpy(fi->choice.message.data.buf, bpr->data.buf, bpr->data.size); - buf1[0] = 4; - enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_FacilitiesIndication, fi, buf1+1, buf_len-1); - if (enc.encoded == -1) { - log_error("[vc] TR VCM.reply encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_APPLICATIONS, bpr->id, "FI.message"); - - coordination->t_last_send_vcm = now; - - if (facilities.logging.recorder) { - uint16_t buffer_len = 2048; - uint8_t buffer[buffer_len]; - int e = itss_management_record_packet_sdu( - buffer, - buffer_len, - bpr->data.buf, - bpr->data.size, - bpr->id, - itss_time_get(), - ITSS_FACILITIES, - true); - if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); - } - } + tx_vcm(vcm_rep); cleanup: ASN_STRUCT_FREE(asn_DEF_VCM, vcm_rep); - ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); - ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi); } static int vcm_check_handle_request(VCM_t* vcm, mc_neighbour_s* neighbour) { @@ -222,6 +229,14 @@ static int vcm_check_handle_request(VCM_t* vcm, mc_neighbour_s* neighbour) { uint32_t my_station_id = facilities.id.station_id; pthread_mutex_unlock(&facilities.id.lock); + uint64_t now = itss_time_get(); + + if (coordination->session.req && now < coordination->session.ts + MC_RESOLUTION_TIMEOUT) { /* in maneuver */ + log_debug("[vc] rejecting VCM from %d - currently in maneuver", vcm->header.stationID); + vcm_reject(vcm, neighbour); + return 1; + } + // Is request for me? CoordinationRequest_t* request = &vcm->vcm.maneuverContainer.choice.vehicle.negotiation->choice.request; bool is_req4me = false; @@ -230,23 +245,18 @@ static int vcm_check_handle_request(VCM_t* vcm, mc_neighbour_s* neighbour) { log_debug("[vc] VCM.request meant for %d", *request->desiredTrajectories.list.array[i]->affectingStations.list.array[j]); if (my_station_id == *request->desiredTrajectories.list.array[i]->affectingStations.list.array[j]) { is_req4me = true; - break; } + coordination->session.affs[i][j] = *request->desiredTrajectories.list.array[i]->affectingStations.list.array[j]; } + coordination->session.n_affs_neighs[i] = request->desiredTrajectories.list.array[i]->affectingStations.list.count; } + coordination->session.n_affs_trjs = request->desiredTrajectories.list.count; + if (!is_req4me) { log_debug("[vc] received VCM.request not affecting me"); return 0; } - uint64_t now = itss_time_get(); - - if (coordination->session.req && now < coordination->session.ts + MC_RESOLUTION_TIMEOUT) { /* in maneuver */ - log_debug("[vc] rejecting VCM from %d - currently in maneuver", vcm->header.stationID); - vcm_reject(vcm, neighbour); - return 1; - } - const ssize_t buf_len = 512; uint8_t buf1[buf_len], buf2[buf_len]; @@ -311,9 +321,11 @@ static int vcm_check_handle_request(VCM_t* vcm, mc_neighbour_s* neighbour) { itss_st_t trajectoryA[TRAJECTORY_MAX_LEN+1]; /* ego trajectory */ ssize_t trajectoryA_len = 0; - - neighbour->proposed = true; neighbour->t_proposal = now; + neighbour->proposed = true; + neighbour->session.requested = true; + neighbour->session.nonce = request->nonce; + coordination->session.requester = neighbour; vcm_rep = calloc(1, sizeof(VCM_t)); @@ -340,14 +352,14 @@ static int vcm_check_handle_request(VCM_t* vcm, mc_neighbour_s* neighbour) { } memcpy(vcm_rep->vcm.chain->link.buf, coordination->chain.links[to_link], 32); - vcm->vcm.chain->area = calloc(1, sizeof(ChainInformationArea_t)); - vcm->vcm.chain->area->trees.list.count = 1; - vcm->vcm.chain->area->trees.list.size = 1 * sizeof(OCTET_STRING_t*); - vcm->vcm.chain->area->trees.list.array = malloc(1 * sizeof(OCTET_STRING_t*)); + vcm_rep->vcm.chain->area = calloc(1, sizeof(ChainInformationArea_t)); + vcm_rep->vcm.chain->area->trees.list.count = 1; + vcm_rep->vcm.chain->area->trees.list.size = 1 * sizeof(OCTET_STRING_t*); + vcm_rep->vcm.chain->area->trees.list.array = malloc(1 * sizeof(OCTET_STRING_t*)); for (int q = 0; q < 1; ++q) { - vcm->vcm.chain->area->trees.list.array[q] = calloc(1, sizeof(OCTET_STRING_t)); - vcm->vcm.chain->area->trees.list.array[q]->buf = malloc(4); - vcm->vcm.chain->area->trees.list.array[q]->size = 4; + vcm_rep->vcm.chain->area->trees.list.array[q] = calloc(1, sizeof(OCTET_STRING_t)); + vcm_rep->vcm.chain->area->trees.list.array[q]->buf = malloc(4); + vcm_rep->vcm.chain->area->trees.list.array[q]->size = 4; } } @@ -400,160 +412,179 @@ static int vcm_check_handle_request(VCM_t* vcm, mc_neighbour_s* neighbour) { mvc_rep->negotiation->choice.reply.requesterId = vcm->header.stationID; mvc_rep->negotiation->choice.reply.nonce = request->nonce; - enc = uper_encode_to_buffer(&asn_DEF_VCM, NULL, vcm_rep, buf1, buf_len); - if (enc.encoded == -1) { - log_error("[vc] VCM.reply encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - ssize_t vcm_rep_len = (enc.encoded + 7) / 8; - - tr = calloc(1, sizeof(TransportRequest_t)); - tr->present = TransportRequest_PR_packet; - tr->choice.packet.present = TransportPacketRequest_PR_btp; - BTPPacketRequest_t* bpr = &tr->choice.packet.choice.btp; - bpr->btpType = BTPType_btpB; - bpr->id = itss_id(buf1, vcm_rep_len); - bpr->gn.destinationAddress.buf = malloc(6); - for (int i = 0; i < 6; ++i) { - bpr->gn.destinationAddress.buf[i] = 0xff; - } - bpr->gn.destinationAddress.size = 6; - bpr->gn.packetTransportType = PacketTransportType_shb; - bpr->destinationPort = 2043; - bpr->gn.trafficClass = 2; - bpr->data.buf = malloc(vcm_rep_len); - memcpy(bpr->data.buf, buf1, vcm_rep_len); - bpr->data.size = vcm_rep_len; - buf1[0] = 4; - enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_TransportRequest, tr, buf1+1, buf_len-1); - if (enc.encoded == -1) { - log_error("[vc] TR VCM.reply encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); - - fi = calloc(1, sizeof(FacilitiesIndication_t)); - fi->present = FacilitiesIndication_PR_message; - fi->choice.message.id = bpr->id; - fi->choice.message.itsMessageType = 2043; - fi->choice.message.data.size = bpr->data.size; - fi->choice.message.data.buf = malloc(bpr->data.size); - memcpy(fi->choice.message.data.buf, bpr->data.buf, bpr->data.size); - buf1[0] = 4; - enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_FacilitiesIndication, fi, buf1+1, buf_len-1); - if (enc.encoded == -1) { - log_error("[vc] TR VCM.reply encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - - itss_queue_send(facilities.tx_queue, buf1, enc.encoded+1, ITSS_APPLICATIONS, bpr->id, "FI.message"); - - coordination->t_last_send_vcm = now; - - if (facilities.logging.recorder) { - uint16_t buffer_len = 2048; - uint8_t buffer[buffer_len]; - int e = itss_management_record_packet_sdu( - buffer, - buffer_len, - bpr->data.buf, - bpr->data.size, - bpr->id, - itss_time_get(), - ITSS_FACILITIES, - true); - if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); - } - } + tx_vcm(vcm_rep); cleanup: ASN_STRUCT_FREE(asn_DEF_VCM, vcm_rep); - ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); - ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi); return rv; } +static bool is_maneuver_approved(CoordinationReply_t* reply, mc_neighbour_s* neighbour) { + coordination_t* coordination = &facilities.coordination; + if (reply->acceptedTrajectoriesIds.list.count == 0) { + ASN_STRUCT_FREE(asn_DEF_VCM, coordination->session.req); + coordination->session.req = NULL; + coordination->session.ts = itss_time_get(); + memset(coordination->session.affs, 0, sizeof(coordination->session.affs)); + memset(coordination->session.n_affs_neighs, 0, sizeof(coordination->session.n_affs_neighs)); + coordination->session.n_affs_trjs = 0; + neighbour->intersecting = false; + neighbour->proposed = false; + return false; + } + for (int t = 0; t < reply->acceptedTrajectoriesIds.list.count; ++t) { + int tid = *reply->acceptedTrajectoriesIds.list.array[t]; + if (tid > MC_TRAJECTORIES_N_MAX - 1) { + return false; + } + for (int n = 0; n < coordination->session.n_affs_neighs[tid]; ++n) { + if (neighbour->station_id == coordination->session.affs[tid][n]) { + for (int i = n; i < coordination->session.n_affs_neighs[tid] - 1; ++i) { + coordination->session.affs[tid][i] = coordination->session.affs[tid][i+1]; + } + --coordination->session.n_affs_neighs[tid]; + break; + } + } + } + bool all_rep = true; + for (int t = 0; t < coordination->session.n_affs_trjs; ++t) { + if (coordination->session.n_affs_neighs[t] != 0) { + all_rep = false; + break; + } + } + if (all_rep) { + ASN_STRUCT_FREE(asn_DEF_VCM, coordination->session.req); + coordination->session.req = NULL; + coordination->session.ts = itss_time_get(); + memset(coordination->session.affs, 0, sizeof(coordination->session.affs)); + memset(coordination->session.n_affs_neighs, 0, sizeof(coordination->session.n_affs_neighs)); + coordination->session.n_affs_trjs = 0; + return true; + } + return false; +} + + static int vcm_check_handle_reply(VCM_t* vcm, mc_neighbour_s* neighbour) { int rv = 0; coordination_t* coordination = &facilities.coordination; - if (!coordination->session.req) { - log_debug("[vc] unknown reply context - session expired or another ITS-S rejected"); - return 1; - } - CoordinationReply_t* reply = &vcm->vcm.maneuverContainer.choice.vehicle.negotiation->choice.reply; itss_time_lock(); uint64_t now_us = itss_ts_get(TIME_MICROSECONDS); itss_time_unlock(); - if (neighbour->intersecting) { + bool send_commit = false; - if (reply->acceptedTrajectoriesIds.list.count == 0) { - log_debug("[vc] maneuver %lld rejected by %d", reply->requesterId, vcm->header.stationID); - ASN_STRUCT_FREE(asn_DEF_VCM, coordination->session.req); - coordination->session.req = NULL; - coordination->session.ts = itss_time_get(); - memset(coordination->session.affs, 0, sizeof(coordination->session.affs)); - memset(coordination->session.n_affs_neighs, 0, sizeof(coordination->session.n_affs_neighs)); - coordination->session.n_affs_trjs = 0; - neighbour->intersecting = false; - neighbour->proposed = false; + if (neighbour->intersecting) { /* REQUESTER */ + + if (!coordination->session.req) { + log_debug("[vc] unknown reply context - session expired or another ITS-S rejected"); return 1; } - for (int t = 0; t < reply->acceptedTrajectoriesIds.list.count; ++t) { - int tid = *reply->acceptedTrajectoriesIds.list.array[t]; - if (tid > MC_TRAJECTORIES_N_MAX - 1) { - return 1; - } - for (int n = 0; n < coordination->session.n_affs_neighs[tid]; ++n) { - if (neighbour->station_id == coordination->session.affs[tid][n]) { - for (int i = n; i < coordination->session.n_affs_neighs[tid] - 1; ++i) { - coordination->session.affs[tid][i] = coordination->session.affs[tid][i+1]; - } - --coordination->session.n_affs_neighs[tid]; - break; - } - } + + if (!is_maneuver_approved(reply, neighbour)) { + return 1; } - bool all_rep = true; - for (int t = 0; t < coordination->session.n_affs_trjs; ++t) { - if (coordination->session.n_affs_neighs[t] != 0) { - all_rep = false; - break; - } - } - char fin[50]; - fin[0] = 0; - if (all_rep) { - ASN_STRUCT_FREE(asn_DEF_VCM, coordination->session.req); - coordination->session.req = NULL; - coordination->session.ts = itss_time_get(); - memset(coordination->session.affs, 0, sizeof(coordination->session.affs)); - memset(coordination->session.n_affs_neighs, 0, sizeof(coordination->session.n_affs_neighs)); - coordination->session.n_affs_trjs = 0; - sprintf(fin, ", all replies received"); - } - log_info("[vc] received VCM.reply from %d with %d accepted trajectories%s | took %ld us", + + log_info("[vc] received VCM.reply from %d with %d accepted trajectories | took %ld us", vcm->header.stationID, reply->acceptedTrajectoriesIds.list.count, - fin, now_us-neighbour->t_iid); neighbour->intersecting = false; neighbour->proposed = false; - } else { - log_info("[vc] received VCM.reply is response to another ITS-S VCM.request"); + if (coordination->protocol == MC_PROTOCOL_REQ_REP_COM) { + send_commit = true; + } + + } else { /* REPLIER */ + log_debug("[vc] received VCM.reply is response to another ITS-S VCM.request"); + + if (coordination->protocol == MC_PROTOCOL_REQ_REP) { + return 1; + } + + /* Reply not meant for current session */ + if (reply->nonce != coordination->session.nonce) { + return 1; + } + + if (!coordination->session.requester) { + return 1; + } + + if (!is_maneuver_approved(reply, coordination->session.requester)) { + send_commit = true; + } } + if (send_commit) { + VCM_t* vcm_com = calloc(1, sizeof(VCM_t)); + vcm->header.messageID = 43; + vcm->header.protocolVersion = 1; + pthread_mutex_lock(&facilities.id.lock); + vcm->header.stationID = facilities.id.station_id; + pthread_mutex_unlock(&facilities.id.lock); + + int32_t lat, lon; + itss_space_lock(); + itss_space_get(); + lat = epv.space.latitude; + lon = epv.space.longitude; + itss_space_unlock(); + vcm_com->vcm.currentPosition.latitude = lat; + vcm_com->vcm.currentPosition.longitude = lon; + + uint64_t now = itss_time_get(); + asn_ulong2INTEGER(&vcm_com->vcm.currentPosition.timestamp, now); + + vcm_com->vcm.maneuverContainer.present = ManeuverContainer_PR_vehicle; + ManeuverVehicleContainer_t* mvc = &vcm_com->vcm.maneuverContainer.choice.vehicle; + + // Vehicle Dimensions + mvc->vehicleLength.vehicleLengthValue= facilities.vehicle.length; + mvc->vehicleLength.vehicleLengthConfidenceIndication = 0; + mvc->vehicleWidth = facilities.vehicle.width; + + itss_st_t trajectoryA[TRAJECTORY_MAX_LEN+1]; /* ego trajectory */ + uint16_t trajectoryA_len = 0; + + itss_trajectory_lock(); + trajectoryA_len = epv.trajectory.len; + memcpy(trajectoryA + 1, epv.trajectory.path, trajectoryA_len * sizeof(itss_st_t)); + itss_trajectory_unlock(); + + trajectoryA[0].latitude = lat; + trajectoryA[0].longitude = lon; + trajectoryA[0].timestamp = now; + ++trajectoryA_len; + + mvc->plannedTrajectory.list.count = trajectoryA_len - 1; + mvc->plannedTrajectory.list.size = (trajectoryA_len - 1) * sizeof(void*); + mvc->plannedTrajectory.list.array = malloc((trajectoryA_len - 1) * sizeof(void*)); + for (int i = 0; i < trajectoryA_len - 1; ++i) { + mvc->plannedTrajectory.list.array[i] = calloc(1, sizeof(STPoint_t)); + mvc->plannedTrajectory.list.array[i]->deltaLatitude = trajectoryA[i+1].latitude - trajectoryA[i].latitude; + mvc->plannedTrajectory.list.array[i]->deltaLongitude = trajectoryA[i+1].longitude - trajectoryA[i].longitude; + mvc->plannedTrajectory.list.array[i]->deltaTime = trajectoryA[i+1].timestamp - trajectoryA[i].timestamp; + } + + mvc->negotiation = calloc(1, sizeof(CoordinationNegotiation_t)); + mvc->negotiation->present = CoordinationNegotiation_PR_commit; + mvc->negotiation->choice.commit.nonce = coordination->session.nonce; + mvc->negotiation->choice.commit.requesterId = coordination->session.requester->station_id; + + tx_vcm(vcm_com); + ASN_STRUCT_FREE(asn_DEF_VCM, vcm); + } + + return rv; } @@ -652,7 +683,7 @@ static int intersection_detected(VCM_t* vcm, mc_neighbour_s* neighbour) { ProposedTrajectory_t* pt = mvc->negotiation->choice.request.desiredTrajectories.list.array[0]; mvc->negotiation->choice.request.requesterId = vcm->header.stationID; mvc->negotiation->choice.request.nonce = rand(); - neighbour->nonce = mvc->negotiation->choice.request.nonce; + neighbour->session.nonce = mvc->negotiation->choice.request.nonce; pt->trajectory.list.count = trajectoryA_len - 1; pt->trajectory.list.size = sizeof(void*) * (trajectoryA_len - 1); @@ -714,86 +745,14 @@ static int intersection_detected(VCM_t* vcm, mc_neighbour_s* neighbour) { } log_debug("[vc] this VCM.request affects %d station(s):%s", n_intneigh, buffer); - asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_VCM, NULL, vcm_req, buf, buf_len); - if (enc.encoded == -1) { - log_error("[vc] VCM.request encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - ssize_t vcm_req_len = (enc.encoded + 7) / 8; - coordination->session.req = vcm_req; coordination->session.ts = now; coordination->session.n_affs_trjs = 1; coordination->session.n_affs_neighs[0] = n_intneigh; memcpy(coordination->session.affs[0], intneigh, sizeof(intneigh)); - tr = calloc(1, sizeof(TransportRequest_t)); - tr->present = TransportRequest_PR_packet; - tr->choice.packet.present = TransportPacketRequest_PR_btp; - BTPPacketRequest_t* bpr = &tr->choice.packet.choice.btp; - bpr->id = itss_id(buf, vcm_req_len); - bpr->btpType = BTPType_btpB; - bpr->gn.destinationAddress.buf = malloc(6); - for (int i = 0; i < 6; ++i) { - bpr->gn.destinationAddress.buf[i] = 0xff; - } - bpr->gn.destinationAddress.size = 6; - bpr->gn.packetTransportType = PacketTransportType_shb; - bpr->destinationPort = 2043; - bpr->gn.trafficClass = 2; - bpr->data.buf = malloc(vcm_req_len); - memcpy(bpr->data.buf, buf, vcm_req_len); - bpr->data.size = vcm_req_len; - buf[0] = 4; - enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_TransportRequest, tr, buf+1, buf_len-1); - if (enc.encoded == -1) { - log_error("[vc] TR VCM.request encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } + tx_vcm(vcm_req); - itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); - - fi = calloc(1, sizeof(FacilitiesIndication_t)); - fi->present = FacilitiesIndication_PR_message; - fi->choice.message.id = bpr->id; - fi->choice.message.itsMessageType = 2043; - fi->choice.message.data.size = bpr->data.size; - fi->choice.message.data.buf = malloc(bpr->data.size); - memcpy(fi->choice.message.data.buf, bpr->data.buf, bpr->data.size); - buf[0] = 4; - enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_FacilitiesIndication, fi, buf+1, buf_len-1); - if (enc.encoded == -1) { - log_error("[vc] TR VCM.reply encode failure (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - - itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_APPLICATIONS, bpr->id, "FI.message"); - - coordination->t_last_send_vcm = now; - - if (facilities.logging.recorder) { - uint16_t buffer_len = 2048; - uint8_t buffer[buffer_len]; - int e = itss_management_record_packet_sdu( - buffer, - buffer_len, - bpr->data.buf, - bpr->data.size, - bpr->id, - itss_time_get(), - ITSS_FACILITIES, - true); - if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); - } - } - -cleanup: - ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr); - ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi); return rv; } @@ -862,43 +821,43 @@ static void intersection_check(VCM_t* vcm, mc_neighbour_s* neighbour) { } static int vcm_check_handle_commit(VCM_t* vcm, mc_neighbour_s* neighbour) { + coordination_t* coo = &facilities.coordination; + return 0; } +static mc_neighbour_s* get_neighbour(VCM_t* vcm) { + coordination_t* coordination = &facilities.coordination; + + mc_neighbour_s* neighbour = NULL; + + // Find neighbours + for (int i = 0; i < coordination->neighbours_len; ++i) { + if (coordination->neighbours[i].station_id == vcm->header.stationID) { + return &coordination->neighbours[i]; + } + } + + // Add it if not found + if (coordination->neighbours_len < MC_MAX_NEIGHBOURS) { + coordination->neighbours[coordination->neighbours_len].station_id = vcm->header.stationID; + ++coordination->neighbours_len; + return &coordination->neighbours[coordination->neighbours_len-1]; + } + + return NULL; +} int vcm_check(VCM_t* vcm) { coordination_t* coordination = &facilities.coordination; int rv = 0; - uint64_t now = itss_time_get(); - pthread_mutex_lock(&coordination->lock); - // Find neighbours - int ni = -1; - for (int i = 0; i < coordination->neighbours_len; ++i) { - if (coordination->neighbours[i].station_id == vcm->header.stationID) { - ni = i; - break; - } - } - - // Add it if not found - if (ni == -1 && coordination->neighbours_len < MC_MAX_NEIGHBOURS) { - coordination->neighbours[coordination->neighbours_len].station_id = vcm->header.stationID; - ni = coordination->neighbours_len; - ++coordination->neighbours_len; - } - - if (ni == -1) { - rv = 1; - goto cleanup; - } - - mc_neighbour_s* neighbour = &coordination->neighbours[ni]; + mc_neighbour_s* neighbour = get_neighbour(vcm); switch (vcm->vcm.maneuverContainer.present) { case ManeuverContainer_PR_vehicle: @@ -955,7 +914,7 @@ cleanup: return rv; } -static int mk_vcm(uint8_t* vcm_uper, uint16_t* vcm_uper_len) { +static int mk_vcm() { int rv = 0; coordination_t* coordination = &facilities.coordination; @@ -1032,13 +991,7 @@ static int mk_vcm(uint8_t* vcm_uper, uint16_t* vcm_uper_len) { } } - asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_VCM, NULL, vcm, vcm_uper, 512); - if (enc.encoded == -1) { - log_error("[vc] failed encoding VCM (%s)", enc.failed_type->name); - rv = 1; - goto cleanup; - } - *vcm_uper_len = (enc.encoded + 7) / 8; + tx_vcm(vcm); cleanup: ASN_STRUCT_FREE(asn_DEF_VCM, vcm); @@ -1061,32 +1014,6 @@ void* vc_service() { coordination_t* coordination = (coordination_t*) &facilities.coordination; - uint8_t vcm[512]; - - TransportRequest_t* tr = calloc(1, sizeof(TransportRequest_t)); - tr->present = TransportRequest_PR_packet; - tr->choice.packet.present = TransportPacketRequest_PR_btp; - BTPPacketRequest_t* bpr = &tr->choice.packet.choice.btp; - bpr->btpType = BTPType_btpB; - bpr->gn.destinationAddress.buf = malloc(6); - for (int i = 0; i < 6; ++i) { - bpr->gn.destinationAddress.buf[i] = 0xff; - } - bpr->gn.destinationAddress.size = 6; - bpr->gn.packetTransportType = PacketTransportType_shb; - bpr->destinationPort = 2043; - bpr->gn.trafficClass = 2; - bpr->data.buf = malloc(512); - - FacilitiesIndication_t* fi = calloc(1, sizeof(FacilitiesIndication_t)); - fi = calloc(1, sizeof(FacilitiesIndication_t)); - fi->present = FacilitiesIndication_PR_message; - fi->choice.message.itsMessageType = 2043; - fi->choice.message.data.buf = malloc(512); - - uint8_t buf[1024]; - buf[0] = 4; - int rv; while (!facilities.exit) { @@ -1095,49 +1022,11 @@ void* vc_service() { pthread_mutex_lock(&coordination->lock); if (vcm_timer_check(coordination, now)) { - rv = mk_vcm(bpr->data.buf, (uint16_t *) &bpr->data.size); + rv = mk_vcm(); if (rv) { pthread_mutex_unlock(&coordination->lock); continue; } - - bpr->id = itss_id(bpr->data.buf, bpr->data.size); - - asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_TransportRequest, NULL, tr, buf+1, 1023); - if (enc.encoded == -1) { - log_error("encoding TR for VCM failed"); - continue; - } - itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_TRANSPORT, bpr->id, "TR.packet.btp"); - - fi->choice.message.id = bpr->id; - fi->choice.message.data.size = bpr->data.size; - memcpy(fi->choice.message.data.buf, bpr->data.buf, bpr->data.size); - enc = asn_encode_to_buffer(NULL, ATS_CANONICAL_OER, &asn_DEF_FacilitiesIndication, fi, buf+1, 1023); - if (enc.encoded == -1) { - log_error("encoding FI for VCM failed"); - continue; - } - itss_queue_send(facilities.tx_queue, buf, enc.encoded+1, ITSS_APPLICATIONS, bpr->id, "FI.message"); - - if (facilities.logging.recorder) { - uint16_t buffer_len = 2048; - uint8_t buffer[buffer_len]; - int e = itss_management_record_packet_sdu( - buffer, - buffer_len, - bpr->data.buf, - bpr->data.size, - bpr->id, - itss_time_get(), - ITSS_FACILITIES, - true); - if (e != -1) { - itss_queue_send(facilities.tx_queue, buffer, e, ITSS_MANAGEMENT, bpr->id, "MReq.packet.set"); - } - } - - coordination->t_last_send_vcm = now; } pthread_mutex_unlock(&coordination->lock); diff --git a/src/vcm.h b/src/vcm.h index a745907..f08b115 100644 --- a/src/vcm.h +++ b/src/vcm.h @@ -11,12 +11,25 @@ #define MC_TRAJECTORIES_N_MAX 4 #define MC_AFF_STATIONS_N_MAX 7 #define MC_HASH_LINK_LEN 32 +#define MC_SESSIONS_N_MAX 16 -typedef struct { +typedef enum MC_PROTOCOL { + MC_PROTOCOL_REQ_REP, + MC_PROTOCOL_REQ_REP_COM +} MC_PROTOCOL_e; + +typedef struct mc_neighbour { uint32_t station_id; uint16_t length; uint16_t width; + struct { + uint32_t nonce; + bool requested; + bool replied; + bool committed; + } session; + itss_st_t trajectory[TRAJECTORY_MAX_LEN + 1]; uint8_t trajectory_len; @@ -24,13 +37,14 @@ typedef struct { uint64_t t_iid; /* timestamp of initial intersection detection */ bool proposed; - uint32_t nonce; uint64_t t_proposal; /* timestamp of last request received */ } mc_neighbour_s; typedef struct coordination { bool active; + MC_PROTOCOL_e protocol; + pthread_mutex_t lock; uint64_t t_last_send_vcm; /* timestamp of last sent (basic) VCM */ @@ -40,6 +54,9 @@ typedef struct coordination { struct { VCM_t* req; /* last VCM.request sent */ uint64_t ts; + uint32_t nonce; + + mc_neighbour_s* requester; uint32_t affs[MC_TRAJECTORIES_N_MAX][MC_AFF_STATIONS_N_MAX]; /* trjs over affected stations */ uint8_t n_affs_trjs; @@ -52,6 +69,7 @@ typedef struct coordination { mc_neighbour_s neighbours[MC_MAX_NEIGHBOURS]; uint16_t neighbours_len; + struct { uint64_t id; uint8_t links[MC_AFF_STATIONS_N_MAX + 1][MC_HASH_LINK_LEN];