diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 813d6d4..f78253a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,5 @@ ADD_EXECUTABLE(it2s-itss-facilities + queue.c cam.c denm.c facilities.c diff --git a/src/cam.c b/src/cam.c index f9f904f..038d394 100644 --- a/src/cam.c +++ b/src/cam.c @@ -171,6 +171,9 @@ void *ca_service(void *fc) { uint8_t code = 0; facilities_t *facilities = (facilities_t*) fc; + void* h = zmq_socket(facilities->ctx, ZMQ_REQ); + zmq_connect(h, "ipc:///tmp/itss/transport"); + BTPDataRequest_t *bdr = calloc(1, sizeof(BTPDataRequest_t)); bdr->btpType = BTPType_btpB; @@ -201,10 +204,10 @@ void *ca_service(void *fc) { syslog_err("[facilities] encoding BTPDataRequest for cam failed"); continue; } - syslog_debug("[facilities] -> [transport] BDR (%ldB)", enc.encoded); - zmq_send(facilities->transport_socket, bdr_oer, enc.encoded+1, 0); - zmq_recv(facilities->transport_socket, &code, 1, 0); - sleep(1); + + queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3); + + usleep(100000); } ASN_STRUCT_FREE(asn_DEF_BTPDataRequest, bdr); diff --git a/src/facilities.c b/src/facilities.c index 79aec1d..38be2be 100644 --- a/src/facilities.c +++ b/src/facilities.c @@ -29,7 +29,7 @@ #define syslog_debug(msg, ...) #endif -static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t msg_len) { +static int transport_indication(facilities_t *facilities, void* responder, uint8_t *msg, uint32_t msg_len) { int rv = 0, code = 0; FacilitiesDataIndication_t *fdi = NULL; @@ -40,11 +40,11 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t syslog_err("[facilities]<- invalid bdi received"); rv = 1; code = 1; - zmq_send(facilities->responder, &code, 1, 0); + zmq_send(responder, &code, 1, 0); goto cleanup; } - zmq_send(facilities->responder, &code, 1, 0); + zmq_send(responder, &code, 1, 0); syslog_debug("[facilities]<- BDI (%ldB)", bdi->data.size); // Parse message @@ -99,9 +99,8 @@ static int transport_indication(facilities_t *facilities, uint8_t *msg, uint32_t uint8_t buffer[512]; buffer[0] = 4; // Facilities asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataIndication, NULL, fdi, buffer+1, 511); - syslog_debug("[facilities] -> sending FDI to -> [app] (%ldB)", enc.encoded); - zmq_send(facilities->app_socket, buffer, enc.encoded+1, 0); - zmq_recv(facilities->app_socket, &code, 1, 0); + + queue_add(facilities->tx_queue, buffer, enc.encoded+1, 5); cleanup: if (bdi->destinationPort != Port_denm) ASN_STRUCT_FREE(*its_msg_descriptor, its_msg); @@ -111,7 +110,7 @@ cleanup: return rv; } -static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t msg_len) { +static int facilities_request(facilities_t *facilities, void* responder, uint8_t *msg, uint32_t msg_len) { int rv = 0; int code = 0; @@ -193,7 +192,7 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m rv = 1; goto cleanup; } - zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); + zmq_send(responder, fdres_oer, enc.encoded, 0); // Forward message to [transport] if (fwd) { @@ -214,8 +213,9 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m rv = 1; goto cleanup; } - zmq_send(facilities->transport_socket, bdr_oer, enc.encoded+1, 0); - zmq_recv(facilities->transport_socket, &code, 1, 0); + + queue_add(facilities->tx_queue, bdr_oer, enc.encoded+1, 3); + } break; @@ -263,7 +263,7 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m goto cleanup; } - zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); + zmq_send(responder, fdres_oer, enc.encoded, 0); pthread_mutex_unlock(&facilities->den->lock); break; @@ -278,7 +278,7 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m fdres->code = ResultCode_rejected; uint8_t fdres_oer[32]; asn_enc_rval_t enc = oer_encode_to_buffer(&asn_DEF_FacilitiesDataResult, NULL, fdres, fdres_oer, 32); - zmq_send(facilities->responder, fdres_oer, enc.encoded, 0); + zmq_send(responder, fdres_oer, enc.encoded, 0); } free(fdres_oer); if (its_msg_def && !managed_event) ASN_STRUCT_FREE(*its_msg_def, its_msg); @@ -289,6 +289,54 @@ static int facilities_request(facilities_t *facilities, uint8_t *msg, uint32_t m return rv; } +void* tx(void* fc) { + facilities_t *facilities = (facilities_t*) fc; + + queue_t* queue = facilities->tx_queue; + + uint8_t code; + + void* application_socket = zmq_socket(facilities->ctx, ZMQ_REQ); + zmq_connect(application_socket, "ipc:///tmp/itss/application"); + + void* transport_socket = zmq_socket(facilities->ctx, ZMQ_REQ); + zmq_connect(transport_socket, "ipc:///tmp/itss/transport"); + + queue_t* stream = queue_init(); + + while (!facilities->exit) { + pthread_mutex_lock(&queue->lock); + for (int i = 0; i < queue->len; ++i) { + memcpy(stream->packet[i], queue->packet[i], queue->packet_len[i]); + stream->packet_len[i] = queue->packet_len[i]; + stream->destination[i] = queue->destination[i]; + } + stream->len = queue->len; + + queue->len = 0; + pthread_mutex_unlock(&queue->lock); + + for (int i = 0; i < stream->len; ++i) { + switch (stream->destination[i]) { + case 3: + syslog_debug("[facilities]-> sending BDR to ->[transport] (%dB)", stream->packet_len[i]); + zmq_send(transport_socket, stream->packet[i], stream->packet_len[i], 0); + zmq_recv(transport_socket, &code, 1, 0); + break; + case 5: + syslog_debug("[facilities]-> sending FDI to ->[application] (%dB)", stream->packet_len[i]); + zmq_send(application_socket, stream->packet[i], stream->packet_len[i], 0); + zmq_recv(application_socket, &code, 1, 0); + break; + } + } + + usleep(400); + } + + return NULL; +} + int main() { syslog_info("[facilities] starting"); @@ -296,17 +344,17 @@ int main() { facilities.exit = false; void *context = zmq_ctx_new(); - facilities.responder = zmq_socket(context, ZMQ_REP); - int rc = zmq_bind(facilities.responder, "ipc:///tmp/itss/facilities"); + facilities.ctx = context; + void *responder = zmq_socket(context, ZMQ_REP); + int rc = zmq_bind(responder, "ipc:///tmp/itss/facilities"); - facilities.app_socket = zmq_socket(context, ZMQ_REQ); - rc = zmq_connect(facilities.app_socket, "ipc:///tmp/itss/application"); - - facilities.transport_socket = zmq_socket(context, ZMQ_REQ); - rc = zmq_connect(facilities.transport_socket, "ipc:///tmp/itss/transport"); + facilities.tx_queue = queue_init(); facilities.den = calloc(1, sizeof(den_t)); + // Tx + pthread_create(&facilities.transmitting, NULL, tx, (void*) &facilities); + // CA pthread_create(&facilities.ca_service, NULL, ca_service, (void*) &facilities); @@ -319,19 +367,19 @@ int main() { while (!facilities.exit) { memset(buffer, 0x00, 512); - zmq_recv(facilities.responder, buffer, 512, 0); + zmq_recv(responder, buffer, 512, 0); switch (buffer[0]) { case 3: - code = transport_indication(&facilities, buffer+1, 511); + code = transport_indication(&facilities, responder, buffer+1, 511); break; case 5: - code = facilities_request(&facilities, buffer+1, 511); + code = facilities_request(&facilities, responder, buffer+1, 511); break; default: syslog_debug("[facilities] unrecognized service"); code = 1; - zmq_send(facilities.responder, &code, 1, 0); + zmq_send(responder, &code, 1, 0); continue; } } diff --git a/src/facilities.h b/src/facilities.h index 529d35c..290f346 100644 --- a/src/facilities.h +++ b/src/facilities.h @@ -5,15 +5,17 @@ #include #include "denm.h" +#include "queue.h" typedef struct facilities { pthread_t ca_service; pthread_t den_service; + pthread_t transmitting; // ZMQ - void* responder; - void* transport_socket; - void* app_socket; + void* ctx; + + queue_t* tx_queue; // DENM den_t *den; diff --git a/src/queue.c b/src/queue.c new file mode 100644 index 0000000..f762977 --- /dev/null +++ b/src/queue.c @@ -0,0 +1,37 @@ +#include "queue.h" + +#include +#include + +queue_t* queue_init() { + queue_t* queue = calloc(1, sizeof(queue_t)); + pthread_mutex_init(&queue->lock, NULL); + queue->len = 0; + queue->packet = calloc(QUEUE_MAX_LEN, sizeof(uint8_t *)); + for (int i = 0; i < QUEUE_MAX_LEN; ++i) { + queue->packet[i] = malloc(PACKET_MAX_LEN); + } + queue->packet_len = calloc(QUEUE_MAX_LEN, sizeof(uint16_t)); + queue->destination = malloc(QUEUE_MAX_LEN); + + return queue; +} + +int queue_add(queue_t* queue, uint8_t *packet, uint16_t packet_len, uint8_t destination) { + int rv = 0; + + pthread_mutex_lock(&queue->lock); + + if (queue->len < QUEUE_MAX_LEN) { + queue->packet_len[queue->len] = packet_len; + memcpy(queue->packet[queue->len], packet, packet_len); + queue->destination[queue->len] = 2; + ++queue->len; + } else { + rv = 1; + } + + pthread_mutex_unlock(&queue->lock); + + return rv; +} diff --git a/src/queue.h b/src/queue.h new file mode 100644 index 0000000..c0c5c14 --- /dev/null +++ b/src/queue.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include + +#define QUEUE_MAX_LEN 256 +#define PACKET_MAX_LEN 512 + +typedef struct queue { + uint8_t **packet; + uint16_t *packet_len; + uint8_t *destination; + uint16_t len; + pthread_mutex_t lock; +} queue_t; + +queue_t* queue_init(); +int queue_add(queue_t* queue, uint8_t *packet, uint16_t packet_len, uint8_t destination);