From aaf157b7ee2f33020c9d8f266c5d847bff7975f5 Mon Sep 17 00:00:00 2001 From: emanuel Date: Tue, 27 Oct 2020 19:21:51 +0000 Subject: [PATCH] TX queue cond based --- src/CMakeLists.txt | 1 + src/cam.c | 71 +++++++++++++++++++++++++--------------------- src/facilities.c | 13 +++++---- src/facilities.h | 4 +++ src/queue.c | 5 +++- src/queue.h | 3 +- 6 files changed, 58 insertions(+), 39 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f78253a..0c4a1a0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,6 +9,7 @@ TARGET_LINK_LIBRARIES(it2s-itss-facilities -lit2s-asn-itss-facilities -lit2s-asn-itss-transport -lzmq + -lit2s-gps -lpthread -lit2s-asn-camv2 -lit2s-asn-ivim diff --git a/src/cam.c b/src/cam.c index 038d394..b243511 100644 --- a/src/cam.c +++ b/src/cam.c @@ -10,6 +10,7 @@ #include #include #include +#include #define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__) #define syslog_emerg(msg, ...) syslog(LOG_EMERG, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__) @@ -57,6 +58,11 @@ static SpeedConfidence_t getSpeedConfidence(uint32_t conf) { static int mk_cam(uint8_t *cam, uint32_t *cam_len) { int rv = 0; + struct timespec st; + if (clock_gettime(CLOCK_REALTIME, &st)) { + syslog_emerg("clock_gettime() failed"); + } + CAM_t *cam_tx = calloc(1, sizeof(CAM_t)); cam_tx->header.protocolVersion = 2; @@ -106,27 +112,27 @@ static int mk_cam(uint8_t *cam, uint32_t *cam_len) { generationdeltatime = generationdeltatime % 64536; cam_tx->cam.generationDeltaTime = generationdeltatime; -// struct it2s_gps_data gps_data; -// it2s_gps_read(&gps_data); - // if (!isnan(gps_data->gps_altitude)) { - // cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeValue = (int32_t)((gps_data->gps_altitude) * 10); - // cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeConfidence = getAltitudeConfidence(gps_data->gps_epv); - // } else { - cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeValue = AltitudeValue_unavailable; - cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeConfidence = AltitudeConfidence_unavailable; - // } + struct it2s_gps_data gps_data; + it2s_gps_read(&gps_data); + if (!isnan(gps_data.gps_altitude)) { + cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeValue = (int32_t)((gps_data.gps_altitude) * 10); + cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeConfidence = getAltitudeConfidence(gps_data.gps_epv); + } else { + cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeValue = AltitudeValue_unavailable; + cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeConfidence = AltitudeConfidence_unavailable; + } - // if (!isnan(gps_data->gps_latitude)) { - // cam_tx->cam.camParameters.basicContainer.referencePosition.latitude = (int32_t)((gps_data->gps_latitude) * 10000000); - // } else { - cam_tx->cam.camParameters.basicContainer.referencePosition.latitude = DeltaLatitude_unavailable; - // } + if (!isnan(gps_data.gps_latitude)) { + cam_tx->cam.camParameters.basicContainer.referencePosition.latitude = (int32_t)((gps_data.gps_latitude) * 10000000); + } else { + cam_tx->cam.camParameters.basicContainer.referencePosition.latitude = DeltaLatitude_unavailable; + } - // if (!isnan(gps_data->gps_longitude)) { - // cam_tx->cam.camParameters.basicContainer.referencePosition.longitude = (int32_t)((gps_data->gps_longitude) * 10000000); - // } else { - cam_tx->cam.camParameters.basicContainer.referencePosition.longitude = Longitude_unavailable; - // } + if (!isnan(gps_data.gps_longitude)) { + cam_tx->cam.camParameters.basicContainer.referencePosition.longitude = (int32_t)((gps_data.gps_longitude) * 10000000); + } else { + cam_tx->cam.camParameters.basicContainer.referencePosition.longitude = Longitude_unavailable; + } cam_tx->cam.camParameters.basicContainer.referencePosition.positionConfidenceEllipse.semiMinorConfidence = SemiAxisLength_unavailable; cam_tx->cam.camParameters.basicContainer.referencePosition.positionConfidenceEllipse.semiMajorConfidence = SemiAxisLength_unavailable; @@ -134,22 +140,22 @@ static int mk_cam(uint8_t *cam, uint32_t *cam_len) { cam_tx->cam.camParameters.highFrequencyContainer.present = HighFrequencyContainer_PR_basicVehicleContainerHighFrequency; -// if (!isnan(gps_data->gps_track)) { -// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingValue = ((uint32_t)(gps_data->gps_track * 10)); -// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingConfidence = getHeadingConfidence((uint32_t)(gps_data->gps_epd * 10)); -// } else { + if (!isnan(gps_data.gps_track)) { + cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingValue = ((uint32_t)(gps_data.gps_track * 10)); + cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingConfidence = getHeadingConfidence((uint32_t)(gps_data.gps_epd * 10)); + } else { cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingValue = HeadingValue_unavailable; cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingConfidence = HeadingConfidence_unavailable; -// } -// -// if (!isnan(gps_data->gps_speed)) { -// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedValue = ((uint32_t)(gps_data->gps_speed * 100)); // cm/s -// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedConfidence = getSpeedConfidence((uint32_t)(gps_data->gps_eps * 100)); -// } else { + } + + if (!isnan(gps_data.gps_speed)) { + cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedValue = ((uint32_t)(gps_data.gps_speed * 100)); // cm/s + cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedConfidence = getSpeedConfidence((uint32_t)(gps_data.gps_eps * 100)); + } else { cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedValue = SpeedValue_unavailable; cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedConfidence = SpeedConfidence_unavailable; -// } + } asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_CAM, NULL, cam_tx, cam, 256); if (enc.encoded == -1) { @@ -172,7 +178,7 @@ void *ca_service(void *fc) { facilities_t *facilities = (facilities_t*) fc; void* h = zmq_socket(facilities->ctx, ZMQ_REQ); - zmq_connect(h, "ipc:///tmp/itss/transport"); + zmq_connect(h, TRANSPORT_ADDRESS); BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t)); @@ -206,8 +212,9 @@ void *ca_service(void *fc) { } queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3); + pthread_cond_signal(&facilities->tx_queue->trigger); - usleep(100000); + usleep(1000*1000); } ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); diff --git a/src/facilities.c b/src/facilities.c index 38be2be..96b8fcf 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -101,6 +101,7 @@ static int transport_indication(facilities_t *facilities, void* responder, uint8 asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataIndication, NULL, fdi, buffer+1, 511); queue_add(facilities->tx_queue, buffer, enc.encoded+1, 5); + pthread_cond_signal(&facilities->tx_queue->trigger); cleanup: if (bdi->destinationPort != Port_denm) ASN_STRUCT_FREE(*its_msg_descriptor, its_msg); @@ -215,6 +216,7 @@ static int facilities_request(facilities_t *facilities, void* responder, uint8_t } queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3); + pthread_cond_signal(&facilities->tx_queue->trigger); } @@ -297,15 +299,18 @@ void* tx(void* fc) { uint8_t code; void* application_socket = zmq_socket(facilities->ctx, ZMQ_REQ); - zmq_connect(application_socket, "ipc:///tmp/itss/application"); + zmq_connect(application_socket, APPLICATION_ADDRESS); void* transport_socket = zmq_socket(facilities->ctx, ZMQ_REQ); - zmq_connect(transport_socket, "ipc:///tmp/itss/transport"); + zmq_connect(transport_socket, TRANSPORT_ADDRESS); queue_t* stream = queue_init(); while (!facilities->exit) { pthread_mutex_lock(&queue->lock); + while (!queue->len) { + pthread_cond_wait(&queue->trigger, &queue->lock); + } for (int i = 0; i < queue->len; ++i) { memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]); stream->packet_len[i] = queue->packet_len[i]; @@ -330,8 +335,6 @@ void* tx(void* fc) { break; } } - - usleep(400); } return NULL; @@ -346,7 +349,7 @@ int main() { void *context = zmq_ctx_new(); facilities.ctx = context; void *responder = zmq_socket(context, ZMQ_REP); - int rc = zmq_bind(responder, "ipc:///tmp/itss/facilities"); + int rc = zmq_bind(responder, FACILITIES_ADDRESS); facilities.tx_queue = queue_init(); diff --git a/src/facilities.h b/src/facilities.h index 290f346..f02a913 100644 --- a/src/facilities.h +++ b/src/facilities.h @@ -7,6 +7,10 @@ #include "denm.h" #include "queue.h" +#define FACILITIES_ADDRESS "ipc:///tmp/itss/facilities" +#define TRANSPORT_ADDRESS "ipc:///tmp/itss/transport" +#define APPLICATION_ADDRESS "ipc:///tmp/itss/application" + typedef struct facilities { pthread_t ca_service; pthread_t den_service; diff --git a/src/queue.c b/src/queue.c index f762977..8d16cce 100644 --- a/src/queue.c +++ b/src/queue.c @@ -5,7 +5,10 @@ queue_t* queue_init() { queue_t* queue = calloc(1, sizeof(queue_t)); + pthread_mutex_init(&queue->lock, NULL); + pthread_cond_init(&queue->trigger, NULL); + queue->len = 0; queue->packet = calloc(QUEUE_MAX_LEN, sizeof(uint8_t *)); for (int i = 0; i < QUEUE_MAX_LEN; ++i) { @@ -25,7 +28,7 @@ int queue_add(queue_t* queue, uint8_t *packet, uint16_t packet_len, uint8_t dest if (queue->len < QUEUE_MAX_LEN) { queue->packet_len[queue->len] = packet_len; memcpy(queue->packet[queue->len], packet, packet_len); - queue->destination[queue->len] = 2; + queue->destination[queue->len] = destination; ++queue->len; } else { rv = 1; diff --git a/src/queue.h b/src/queue.h index c0c5c14..a3afecb 100644 --- a/src/queue.h +++ b/src/queue.h @@ -4,7 +4,7 @@ #include #include -#define QUEUE_MAX_LEN 256 +#define QUEUE_MAX_LEN 32 #define PACKET_MAX_LEN 512 typedef struct queue { @@ -13,6 +13,7 @@ typedef struct queue { uint8_t *destination; uint16_t len; pthread_mutex_t lock; + pthread_cond_t trigger; } queue_t; queue_t* queue_init();