TX queue cond based

This commit is contained in:
emanuel 2020-10-27 19:21:51 +00:00
parent 13da236104
commit aaf157b7ee
6 changed files with 58 additions and 39 deletions

View File

@ -9,6 +9,7 @@ TARGET_LINK_LIBRARIES(it2s-itss-facilities
-lit2s-asn-itss-facilities
-lit2s-asn-itss-transport
-lzmq
-lit2s-gps
-lpthread
-lit2s-asn-camv2
-lit2s-asn-ivim

View File

@ -10,6 +10,7 @@
#include <zmq.h>
#include <unistd.h>
#include <syslog.h>
#include <math.h>
#define syslog_info(msg, ...) syslog(LOG_INFO, msg, ##__VA_ARGS__)
#define syslog_emerg(msg, ...) syslog(LOG_EMERG, "%s:%d [" msg "]", __func__, __LINE__, ##__VA_ARGS__)
@ -57,6 +58,11 @@ static SpeedConfidence_t getSpeedConfidence(uint32_t conf) {
static int mk_cam(uint8_t *cam, uint32_t *cam_len) {
int rv = 0;
struct timespec st;
if (clock_gettime(CLOCK_REALTIME, &st)) {
syslog_emerg("clock_gettime() failed");
}
CAM_t *cam_tx = calloc(1, sizeof(CAM_t));
cam_tx->header.protocolVersion = 2;
@ -106,27 +112,27 @@ static int mk_cam(uint8_t *cam, uint32_t *cam_len) {
generationdeltatime = generationdeltatime % 64536;
cam_tx->cam.generationDeltaTime = generationdeltatime;
// struct it2s_gps_data gps_data;
// it2s_gps_read(&gps_data);
// if (!isnan(gps_data->gps_altitude)) {
// cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeValue = (int32_t)((gps_data->gps_altitude) * 10);
// cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeConfidence = getAltitudeConfidence(gps_data->gps_epv);
// } else {
struct it2s_gps_data gps_data;
it2s_gps_read(&gps_data);
if (!isnan(gps_data.gps_altitude)) {
cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeValue = (int32_t)((gps_data.gps_altitude) * 10);
cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeConfidence = getAltitudeConfidence(gps_data.gps_epv);
} else {
cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeValue = AltitudeValue_unavailable;
cam_tx->cam.camParameters.basicContainer.referencePosition.altitude.altitudeConfidence = AltitudeConfidence_unavailable;
// }
}
// if (!isnan(gps_data->gps_latitude)) {
// cam_tx->cam.camParameters.basicContainer.referencePosition.latitude = (int32_t)((gps_data->gps_latitude) * 10000000);
// } else {
if (!isnan(gps_data.gps_latitude)) {
cam_tx->cam.camParameters.basicContainer.referencePosition.latitude = (int32_t)((gps_data.gps_latitude) * 10000000);
} else {
cam_tx->cam.camParameters.basicContainer.referencePosition.latitude = DeltaLatitude_unavailable;
// }
}
// if (!isnan(gps_data->gps_longitude)) {
// cam_tx->cam.camParameters.basicContainer.referencePosition.longitude = (int32_t)((gps_data->gps_longitude) * 10000000);
// } else {
if (!isnan(gps_data.gps_longitude)) {
cam_tx->cam.camParameters.basicContainer.referencePosition.longitude = (int32_t)((gps_data.gps_longitude) * 10000000);
} else {
cam_tx->cam.camParameters.basicContainer.referencePosition.longitude = Longitude_unavailable;
// }
}
cam_tx->cam.camParameters.basicContainer.referencePosition.positionConfidenceEllipse.semiMinorConfidence = SemiAxisLength_unavailable;
cam_tx->cam.camParameters.basicContainer.referencePosition.positionConfidenceEllipse.semiMajorConfidence = SemiAxisLength_unavailable;
@ -134,22 +140,22 @@ static int mk_cam(uint8_t *cam, uint32_t *cam_len) {
cam_tx->cam.camParameters.highFrequencyContainer.present = HighFrequencyContainer_PR_basicVehicleContainerHighFrequency;
// if (!isnan(gps_data->gps_track)) {
// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingValue = ((uint32_t)(gps_data->gps_track * 10));
// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingConfidence = getHeadingConfidence((uint32_t)(gps_data->gps_epd * 10));
// } else {
if (!isnan(gps_data.gps_track)) {
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingValue = ((uint32_t)(gps_data.gps_track * 10));
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingConfidence = getHeadingConfidence((uint32_t)(gps_data.gps_epd * 10));
} else {
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingValue = HeadingValue_unavailable;
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.heading.headingConfidence = HeadingConfidence_unavailable;
// }
//
// if (!isnan(gps_data->gps_speed)) {
// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedValue = ((uint32_t)(gps_data->gps_speed * 100)); // cm/s
// cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedConfidence = getSpeedConfidence((uint32_t)(gps_data->gps_eps * 100));
// } else {
}
if (!isnan(gps_data.gps_speed)) {
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedValue = ((uint32_t)(gps_data.gps_speed * 100)); // cm/s
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedConfidence = getSpeedConfidence((uint32_t)(gps_data.gps_eps * 100));
} else {
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedValue = SpeedValue_unavailable;
cam_tx->cam.camParameters.highFrequencyContainer.choice.basicVehicleContainerHighFrequency.speed.speedConfidence = SpeedConfidence_unavailable;
// }
}
asn_enc_rval_t enc = uper_encode_to_buffer(&asn_DEF_CAM, NULL, cam_tx, cam, 256);
if (enc.encoded == -1) {
@ -172,7 +178,7 @@ void *ca_service(void *fc) {
facilities_t *facilities = (facilities_t*) fc;
void* h = zmq_socket(facilities->ctx, ZMQ_REQ);
zmq_connect(h, "ipc:///tmp/itss/transport");
zmq_connect(h, TRANSPORT_ADDRESS);
BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t));
@ -206,8 +212,9 @@ void *ca_service(void *fc) {
}
queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3);
pthread_cond_signal(&facilities->tx_queue->trigger);
usleep(100000);
usleep(1000*1000);
}
ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr);

View File

@ -101,6 +101,7 @@ static int transport_indication(facilities_t *facilities, void* responder, uint8
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataIndication, NULL, fdi, buffer+1, 511);
queue_add(facilities->tx_queue, buffer, enc.encoded+1, 5);
pthread_cond_signal(&facilities->tx_queue->trigger);
cleanup:
if (bdi->destinationPort != Port_denm) ASN_STRUCT_FREE(*its_msg_descriptor, its_msg);
@ -215,6 +216,7 @@ static int facilities_request(facilities_t *facilities, void* responder, uint8_t
}
queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3);
pthread_cond_signal(&facilities->tx_queue->trigger);
}
@ -297,15 +299,18 @@ void* tx(void* fc) {
uint8_t code;
void* application_socket = zmq_socket(facilities->ctx, ZMQ_REQ);
zmq_connect(application_socket, "ipc:///tmp/itss/application");
zmq_connect(application_socket, APPLICATION_ADDRESS);
void* transport_socket = zmq_socket(facilities->ctx, ZMQ_REQ);
zmq_connect(transport_socket, "ipc:///tmp/itss/transport");
zmq_connect(transport_socket, TRANSPORT_ADDRESS);
queue_t* stream = queue_init();
while (!facilities->exit) {
pthread_mutex_lock(&queue->lock);
while (!queue->len) {
pthread_cond_wait(&queue->trigger, &queue->lock);
}
for (int i = 0; i < queue->len; ++i) {
memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]);
stream->packet_len[i] = queue->packet_len[i];
@ -330,8 +335,6 @@ void* tx(void* fc) {
break;
}
}
usleep(400);
}
return NULL;
@ -346,7 +349,7 @@ int main() {
void *context = zmq_ctx_new();
facilities.ctx = context;
void *responder = zmq_socket(context, ZMQ_REP);
int rc = zmq_bind(responder, "ipc:///tmp/itss/facilities");
int rc = zmq_bind(responder, FACILITIES_ADDRESS);
facilities.tx_queue = queue_init();

View File

@ -7,6 +7,10 @@
#include "denm.h"
#include "queue.h"
#define FACILITIES_ADDRESS "ipc:///tmp/itss/facilities"
#define TRANSPORT_ADDRESS "ipc:///tmp/itss/transport"
#define APPLICATION_ADDRESS "ipc:///tmp/itss/application"
typedef struct facilities {
pthread_t ca_service;
pthread_t den_service;

View File

@ -5,7 +5,10 @@
queue_t* queue_init() {
queue_t* queue = calloc(1, sizeof(queue_t));
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->trigger, NULL);
queue->len = 0;
queue->packet = calloc(QUEUE_MAX_LEN, sizeof(uint8_t *));
for (int i = 0; i < QUEUE_MAX_LEN; ++i) {
@ -25,7 +28,7 @@ int queue_add(queue_t* queue, uint8_t *packet, uint16_t packet_len, uint8_t dest
if (queue->len < QUEUE_MAX_LEN) {
queue->packet_len[queue->len] = packet_len;
memcpy(queue->packet[queue->len], packet, packet_len);
queue->destination[queue->len] = 2;
queue->destination[queue->len] = destination;
++queue->len;
} else {
rv = 1;

View File

@ -4,7 +4,7 @@
#include <stdbool.h>
#include <stdint.h>
#define QUEUE_MAX_LEN 256
#define QUEUE_MAX_LEN 32
#define PACKET_MAX_LEN 512
typedef struct queue {
@ -13,6 +13,7 @@ typedef struct queue {
uint8_t *destination;
uint16_t len;
pthread_mutex_t lock;
pthread_cond_t trigger;
} queue_t;
queue_t* queue_init();