New TX queue

This commit is contained in:
emanuel 2020-10-26 19:40:15 +00:00
parent e1b0521b19
commit 13da236104
6 changed files with 140 additions and 30 deletions

View File

@ -1,4 +1,5 @@
ADD_EXECUTABLE(it2s-itss-facilities ADD_EXECUTABLE(it2s-itss-facilities
queue.c
cam.c cam.c
denm.c denm.c
facilities.c facilities.c

View File

@ -171,6 +171,9 @@ void *ca_service(void *fc) {
uint8_t code = 0; uint8_t code = 0;
facilities_t *facilities = (facilities_t*) fc; facilities_t *facilities = (facilities_t*) fc;
void* h = zmq_socket(facilities->ctx, ZMQ_REQ);
zmq_connect(h, "ipc:///tmp/itss/transport");
BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t)); BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t));
bdr->btpType = BTPType_btpB; bdr->btpType = BTPType_btpB;
@ -201,10 +204,10 @@ void *ca_service(void *fc) {
syslog_err("[facilities] encoding BTPDataRequest for cam failed"); syslog_err("[facilities] encoding BTPDataRequest for cam failed");
continue; continue;
} }
syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded);
zmq_send(facilities->transport_socket, bdr_oer, enc.encoded+1, 0); queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3);
zmq_recv(facilities->transport_socket, &code, 1, 0);
sleep(1); usleep(100000);
} }
ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr);

View File

@ -29,7 +29,7 @@
#define syslog_debug(msg, ...) #define syslog_debug(msg, ...)
#endif #endif
static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t msg_len) { static int transport_indication(facilities_t *facilities, void* responder, uint8_t *msg, uint32_t msg_len) {
int rv = 0, code = 0; int rv = 0, code = 0;
FacilitiesDataIndication_t *fdi = NULL; FacilitiesDataIndication_t *fdi = NULL;
@ -40,11 +40,11 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t
syslog_err("[facilities]<- invalid bdi received"); syslog_err("[facilities]<- invalid bdi received");
rv = 1; rv = 1;
code = 1; code = 1;
zmq_send(facilities->responder, &code, 1, 0); zmq_send(responder, &code, 1, 0);
goto cleanup; goto cleanup;
} }
zmq_send(facilities->responder, &code, 1, 0); zmq_send(responder, &code, 1, 0);
syslog_debug("[facilities]<- BDI (%ldB)", bdi->data.size); syslog_debug("[facilities]<- BDI (%ldB)", bdi->data.size);
// Parse message // Parse message
@ -99,9 +99,8 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t
uint8_t buffer[512]; uint8_t buffer[512];
buffer[0] = 4; // Facilities buffer[0] = 4; // Facilities
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataIndication, NULL, fdi, buffer+1, 511); asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataIndication, NULL, fdi, buffer+1, 511);
syslog_debug("[facilities] -> sending FDI to -> [app] (%ldB)", enc.encoded);
zmq_send(facilities->app_socket, buffer, enc.encoded+1, 0); queue_add(facilities->tx_queue, buffer, enc.encoded+1, 5);
zmq_recv(facilities->app_socket, &code, 1, 0);
cleanup: cleanup:
if (bdi->destinationPort != Port_denm) ASN_STRUCT_FREE(*its_msg_descriptor, its_msg); if (bdi->destinationPort != Port_denm) ASN_STRUCT_FREE(*its_msg_descriptor, its_msg);
@ -111,7 +110,7 @@ cleanup:
return rv; return rv;
} }
static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t msg_len) { static int facilities_request(facilities_t *facilities, void* responder, uint8_t *msg, uint32_t msg_len) {
int rv = 0; int rv = 0;
int code = 0; int code = 0;
@ -193,7 +192,7 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m
rv = 1; rv = 1;
goto cleanup; goto cleanup;
} }
zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); zmq_send(responder, fdres_oer, enc.encoded, 0);
// Forward message to [transport] // Forward message to [transport]
if (fwd) { if (fwd) {
@ -214,8 +213,9 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m
rv = 1; rv = 1;
goto cleanup; goto cleanup;
} }
zmq_send(facilities->transport_socket, bdr_oer, enc.encoded+1, 0);
zmq_recv(facilities->transport_socket, &code, 1, 0); queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3);
} }
break; break;
@ -263,7 +263,7 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m
goto cleanup; goto cleanup;
} }
zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); zmq_send(responder, fdres_oer, enc.encoded, 0);
pthread_mutex_unlock(&facilities->den->lock); pthread_mutex_unlock(&facilities->den->lock);
break; break;
@ -278,7 +278,7 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m
fdres->code = ResultCode_rejected; fdres->code = ResultCode_rejected;
uint8_t fdres_oer[32]; uint8_t fdres_oer[32];
asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataResult, NULL, fdres, fdres_oer, 32); asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataResult, NULL, fdres, fdres_oer, 32);
zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); zmq_send(responder, fdres_oer, enc.encoded, 0);
} }
free(fdres_oer); free(fdres_oer);
if (its_msg_def && !managed_event) ASN_STRUCT_FREE(*its_msg_def, its_msg); if (its_msg_def && !managed_event) ASN_STRUCT_FREE(*its_msg_def, its_msg);
@ -289,6 +289,54 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m
return rv; return rv;
} }
void* tx(void* fc) {
facilities_t *facilities = (facilities_t*) fc;
queue_t* queue = facilities->tx_queue;
uint8_t code;
void* application_socket = zmq_socket(facilities->ctx, ZMQ_REQ);
zmq_connect(application_socket, "ipc:///tmp/itss/application");
void* transport_socket = zmq_socket(facilities->ctx, ZMQ_REQ);
zmq_connect(transport_socket, "ipc:///tmp/itss/transport");
queue_t* stream = queue_init();
while (!facilities->exit) {
pthread_mutex_lock(&queue->lock);
for (int i = 0; i < queue->len; ++i) {
memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]);
stream->packet_len[i] = queue->packet_len[i];
stream->destination[i] = queue->destination[i];
}
stream->len = queue->len;
queue->len = 0;
pthread_mutex_unlock(&queue->lock);
for (int i = 0; i < stream->len; ++i) {
switch (stream->destination[i]) {
case 3:
syslog_debug("[facilities]-> sending BDR to ->[transport] (%dB)", stream->packet_len[i]);
zmq_send(transport_socket, stream->packet[i], stream->packet_len[i], 0);
zmq_recv(transport_socket, &code, 1, 0);
break;
case 5:
syslog_debug("[facilities]-> sending FDI to ->[application] (%dB)", stream->packet_len[i]);
zmq_send(application_socket, stream->packet[i], stream->packet_len[i], 0);
zmq_recv(application_socket, &code, 1, 0);
break;
}
}
usleep(400);
}
return NULL;
}
int main() { int main() {
syslog_info("[facilities] starting"); syslog_info("[facilities] starting");
@ -296,17 +344,17 @@ int main() {
facilities.exit = false; facilities.exit = false;
void *context = zmq_ctx_new(); void *context = zmq_ctx_new();
facilities.responder = zmq_socket(context, ZMQ_REP); facilities.ctx = context;
int rc = zmq_bind(facilities.responder, "ipc:///tmp/itss/facilities"); void *responder = zmq_socket(context, ZMQ_REP);
int rc = zmq_bind(responder, "ipc:///tmp/itss/facilities");
facilities.app_socket = zmq_socket(context, ZMQ_REQ); facilities.tx_queue = queue_init();
rc = zmq_connect(facilities.app_socket, "ipc:///tmp/itss/application");
facilities.transport_socket = zmq_socket(context, ZMQ_REQ);
rc = zmq_connect(facilities.transport_socket, "ipc:///tmp/itss/transport");
facilities.den = calloc(1, sizeof(den_t)); facilities.den = calloc(1, sizeof(den_t));
// Tx
pthread_create(&facilities.transmitting, NULL, tx, (void*) &facilities);
// CA // CA
pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities); pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities);
@ -319,19 +367,19 @@ int main() {
while (!facilities.exit) { while (!facilities.exit) {
memset(buffer, 0x00, 512); memset(buffer, 0x00, 512);
zmq_recv(facilities.responder, buffer, 512, 0); zmq_recv(responder, buffer, 512, 0);
switch (buffer[0]) { switch (buffer[0]) {
case 3: case 3:
code = transport_indication(&facilities, buffer+1, 511); code = transport_indication(&facilities, responder, buffer+1, 511);
break; break;
case 5: case 5:
code = facilities_request(&facilities, buffer+1, 511); code = facilities_request(&facilities, responder, buffer+1, 511);
break; break;
default: default:
syslog_debug("[facilities] unrecognized service"); syslog_debug("[facilities] unrecognized service");
code = 1; code = 1;
zmq_send(facilities.responder, &code, 1, 0); zmq_send(responder, &code, 1, 0);
continue; continue;
} }
} }

View File

@ -5,15 +5,17 @@
#include <stdbool.h> #include <stdbool.h>
#include "denm.h" #include "denm.h"
#include "queue.h"
typedef struct facilities { typedef struct facilities {
pthread_t ca_service; pthread_t ca_service;
pthread_t den_service; pthread_t den_service;
pthread_t transmitting;
// ZMQ // ZMQ
void* responder; void* ctx;
void* transport_socket;
void* app_socket; queue_t* tx_queue;
// DENM // DENM
den_t *den; den_t *den;

37
src/queue.c Normal file
View File

@ -0,0 +1,37 @@
#include "queue.h"
#include <stdlib.h>
#include <string.h>
queue_t* queue_init() {
queue_t* queue = calloc(1, sizeof(queue_t));
pthread_mutex_init(&queue->lock, NULL);
queue->len = 0;
queue->packet = calloc(QUEUE_MAX_LEN, sizeof(uint8_t *));
for (int i = 0; i < QUEUE_MAX_LEN; ++i) {
queue->packet[i] = malloc(PACKET_MAX_LEN);
}
queue->packet_len = calloc(QUEUE_MAX_LEN, sizeof(uint16_t));
queue->destination = malloc(QUEUE_MAX_LEN);
return queue;
}
int queue_add(queue_t* queue, uint8_t *packet, uint16_t packet_len, uint8_t destination) {
int rv = 0;
pthread_mutex_lock(&queue->lock);
if (queue->len < QUEUE_MAX_LEN) {
queue->packet_len[queue->len] = packet_len;
memcpy(queue->packet[queue->len], packet, packet_len);
queue->destination[queue->len] = 2;
++queue->len;
} else {
rv = 1;
}
pthread_mutex_unlock(&queue->lock);
return rv;
}

19
src/queue.h Normal file
View File

@ -0,0 +1,19 @@
#pragma once
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#define QUEUE_MAX_LEN 256
#define PACKET_MAX_LEN 512
typedef struct queue {
uint8_t **packet;
uint16_t *packet_len;
uint8_t *destination;
uint16_t len;
pthread_mutex_t lock;
} queue_t;
queue_t* queue_init();
int queue_add(queue_t* queue, uint8_t *packet, uint16_t packet_len, uint8_t destination);