Multiple VCM-replies delays

This commit is contained in:
emanuel 2022-11-16 17:26:41 +00:00
parent c583f7a0e2
commit 5ad8f67514
2 changed files with 97 additions and 21 deletions

View File

@ -273,6 +273,7 @@ cleanup:
static int vcm_check_handle_reply(VCM_t* vcm, mc_neighbour_s* neighbour) {
int rv = 0;
coordination_t* coordination = &facilities.coordination;
CoordinationReply_t* reply = &vcm->vcm.maneuverContainer.choice.vehicle.negotiation->choice.reply;
itss_time_lock();
@ -280,7 +281,46 @@ static int vcm_check_handle_reply(VCM_t* vcm, mc_neighbour_s* neighbour) {
itss_time_unlock();
if (neighbour->intersecting) {
log_info("[vc] received VCM.reply with %d accepted trajectories | took %ld us", reply->acceptedTrajectoriesIds.list.count, now_us-neighbour->t_iid);
for (int t = 0; t < reply->acceptedTrajectoriesIds.list.count; ++t) {
int tid = *reply->acceptedTrajectoriesIds.list.array[t];
log_debug("---chedking tid: %d @ %d", tid, t);
if (tid > MC_TRAJECTORIES_N_MAX - 1) {
return 1;
}
for (int n = 0; n < coordination->session.n_affs_neighs[tid]; ++n) {
if (neighbour->station_id == coordination->session.affs[tid][n]) {
for (int i = n; i < coordination->session.n_affs_neighs[tid] - 1; ++i) {
coordination->session.affs[tid][i] = coordination->session.affs[tid][i+1];
}
--coordination->session.n_affs_neighs[tid];
break;
}
}
}
bool all_rep = true;
for (int t = 0; t < coordination->session.n_affs_trjs; ++t) {
if (coordination->session.n_affs_neighs[t] != 0) {
all_rep = false;
break;
}
}
char fin[50];
fin[0] = 0;
if (all_rep) {
free(coordination->session.req);
coordination->session.req = NULL;
memset(coordination->session.affs, 0, sizeof(coordination->session.affs));
memset(coordination->session.n_affs_neighs, 0, sizeof(coordination->session.n_affs_neighs));
coordination->session.n_affs_trjs = 0;
sprintf(fin, ", all replies received");
}
log_info("[vc] received VCM.reply from %d with %d accepted trajectories%s | took %ld us",
vcm->header.stationID,
reply->acceptedTrajectoriesIds.list.count,
fin,
now_us-neighbour->t_iid);
} else {
log_info("[vc] received VCM.reply is response to another ITS-S VCM.request");
}
@ -309,19 +349,15 @@ static int intersection_detected(VCM_t* vcm, mc_neighbour_s* neighbour) {
uint64_t now = itss_time_get();
if (now < neighbour->t_iid/1000 + MC_RESOLUTION_TIMEOUT) {
rv = 0;
goto cleanup;
}
neighbour->intersecting = true;
itss_time_lock();
uint64_t now_us = itss_ts_get(TIME_MICROSECONDS) ;
itss_time_unlock();
neighbour->t_iid = now_us;
if (now < neighbour->t_proposal + MC_RESOLUTION_TIMEOUT) {
if (now < neighbour->t_iid/1000 + MC_RESOLUTION_TIMEOUT ||
now < neighbour->t_proposal + MC_RESOLUTION_TIMEOUT) {
if (coordination->session.req) {
free(coordination->session.req);
coordination->session.req = NULL;
memset(coordination->session.affs, 0, sizeof(coordination->session.affs));
memset(coordination->session.n_affs_neighs, 0, sizeof(coordination->session.n_affs_neighs));
coordination->session.n_affs_trjs = 0;
}
rv = 0;
goto cleanup;
}
@ -411,9 +447,16 @@ static int intersection_detected(VCM_t* vcm, mc_neighbour_s* neighbour) {
pt->offer = 5;
pt->priority = 1;
pt->id = rand();
pt->id = 0;
uint32_t intneigh[MC_MAX_NEIGHBOURS];
itss_time_lock();
uint64_t now_us = itss_ts_get(TIME_MICROSECONDS) ;
itss_time_unlock();
neighbour->intersecting = true;
neighbour->t_iid = now_us;
uint32_t intneigh[15];
intneigh[0] = neighbour->station_id;
int n_intneigh = 1;
for (int n = 0; n < coordination->neighbours_len && n_intneigh < 15; ++n) {
@ -429,6 +472,8 @@ static int intersection_detected(VCM_t* vcm, mc_neighbour_s* neighbour) {
coordination->neighbours[n].length, coordination->neighbours[n].width,
&index
)) {
coordination->neighbours[n].intersecting = true;
coordination->neighbours[n].t_iid = now_us;
intneigh[n_intneigh] = coordination->neighbours[n].station_id;
++n_intneigh;
}
@ -458,6 +503,11 @@ static int intersection_detected(VCM_t* vcm, mc_neighbour_s* neighbour) {
}
ssize_t vcm_req_len = (enc.encoded + 7) / 8;
coordination->session.req = vcm_req;
coordination->session.n_affs_trjs = 1;
coordination->session.n_affs_neighs[0] = n_intneigh;
memcpy(coordination->session.affs[0], intneigh, sizeof(intneigh));
tr = calloc(1, sizeof(TransportRequest_t));
tr->present = TransportRequest_PR_packet;
tr->choice.packet.present = TransportPacketRequest_PR_btp;
@ -520,7 +570,6 @@ static int intersection_detected(VCM_t* vcm, mc_neighbour_s* neighbour) {
}
cleanup:
ASN_STRUCT_FREE(asn_DEF_VCM, vcm_req);
ASN_STRUCT_FREE(asn_DEF_TransportRequest, tr);
ASN_STRUCT_FREE(asn_DEF_FacilitiesIndication, fi);
return rv;

View File

@ -8,6 +8,9 @@
#define MC_MAX_NEIGHBOURS 256
#define MC_RESOLUTION_TIMEOUT 1000
#define MC_TRAJECTORIES_N_MAX 4
#define MC_AFF_STATIONS_N_MAX 15
#define MC_HASH_LINK_LEN 32
typedef struct {
uint32_t station_id;
@ -18,11 +21,11 @@ typedef struct {
uint8_t trajectory_len;
bool intersecting;
uint64_t t_iid; /* initial intersection detection */
uint64_t t_iid; /* timestamp of initial intersection detection */
bool proposed;
uint32_t nonce;
uint64_t t_proposal;
uint64_t t_proposal; /* timestamp of last request received */
} mc_neighbour_s;
typedef struct coordination {
@ -30,10 +33,19 @@ typedef struct coordination {
pthread_mutex_t lock;
uint64_t t_last_send_vcm;
uint64_t t_last_send_vcm; /* timestamp of last sent (basic) VCM */
uint64_t vcm_period_min;
uint64_t vcm_period_max;
struct {
VCM_t* req; /* last VCM.request sent */
uint32_t affs[MC_TRAJECTORIES_N_MAX][MC_AFF_STATIONS_N_MAX]; /* trjs over affected stations */
uint8_t n_affs_trjs;
uint8_t n_affs_neighs[MC_TRAJECTORIES_N_MAX];
} session;
void* mgmt_socket;
mc_neighbour_s neighbours[MC_MAX_NEIGHBOURS];
@ -41,12 +53,27 @@ typedef struct coordination {
struct {
uint64_t id;
uint8_t link[32];
uint8_t link[MC_HASH_LINK_LEN];
itss_region_t region;
} chain;
} coordination_t;
/**
* Analyses a VCM
* @param vcm The VCM to be analyzed
* @return 0 on success, other value otherwise
*/
int vcm_check(VCM_t* vcm);
/**
* VC loop, threaded
* @return NULL
*/
void* vc_service();
/**
* Initializes the main VC struct
* @return Nothing
*/
void coordination_init();