|
#include <ctype.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <hiredis/hiredis.h>
|
|
|
|
/* Size in bytes of the stack buffer filled by each read() on a client socket. */
#define BUFFER_SIZE 4096

/*
 * Number of slots in the shared ring buffer. The ring keeps one slot unused
 * to tell "full" from "empty", so at most MAX_QUEUE_SIZE - 1 lines are queued.
 */
#define MAX_QUEUE_SIZE 100000

/* Maximum number of queued lines a worker sends to Redis in one RPUSH. */
#define BATCH_SIZE 500

/*
 * Redis endpoint and credentials. Each can be overridden at build time
 * (e.g. -DREDIS_PORT=6379); the values below are only the defaults.
 */
#ifndef REDIS_HOST
#define REDIS_HOST "127.0.0.1"
#endif

#ifndef REDIS_PORT
#define REDIS_PORT 6380
#endif

/*
 * NOTE(review): a password committed in source ends up in version control
 * and in the binary; prefer supplying it with -DREDIS_PASSWORD=... at build time.
 */
#ifndef REDIS_PASSWORD
#define REDIS_PASSWORD "qazWSX123"
#endif

/* Redis list that receives the log lines. */
#define REDIS_KEY "logs"

/* Filesystem path of the UNIX stream socket the server listens on. */
#define SOCKET_PATH "/tmp/suricata-json.sock"

/* Number of threads that drain the queue into Redis. */
#define NUM_WORKER_THREADS 4

/* Failed send attempts tolerated for one batch before it is discarded. */
#define MAX_RETRIES 1

/* Version string printed at start-up. */
#define CX_VERSION "4.1"
|
|
/*
 * Bounded FIFO of heap-allocated strings, stored as a ring buffer.
 *
 * head == tail means the queue is empty; the queue is treated as full when
 * advancing tail by one would reach head, so one slot always stays unused.
 * All fields other than the synchronisation objects must only be touched
 * while `lock` is held.
 */
typedef struct {
    char **items;          /* ring storage: `capacity` string pointers */
    size_t capacity;       /* number of slots in `items` */
    size_t head;           /* index of the next item to pop */
    size_t tail;           /* index of the next free slot to push into */
    pthread_mutex_t lock;  /* guards items, head and tail */
    pthread_cond_t cond;   /* waited on by both blocked pushers and blocked poppers */
} Queue;
|
|
|
|
/* Shared queue: client handler threads push lines, Redis workers pop batches. */
Queue queue;

/*
 * Global run flag polled by the accept loop, the workers and the queue's
 * blocking pop. No code visible in this file clears it.
 * NOTE(review): `volatile` does not provide inter-thread synchronisation;
 * consider an atomic type if a shutdown path is ever added.
 */
volatile int running = 1;
|
|
|
|
/*
 * Cheap structural check that a line looks like exactly one JSON object.
 *
 * This is not a full JSON parser. It only verifies that:
 *   - the text starts with '{';
 *   - braces outside string literals balance, and the object opened by the
 *     first '{' is closed exactly once;
 *   - every string literal is terminated (backslash escapes are honoured
 *     inside strings only);
 *   - nothing but whitespace follows the closing '}'.
 *
 * @param str  NUL-terminated candidate text; may be NULL.
 * @return 1 if the text passes the checks, 0 otherwise.
 */
int is_valid_json(const char *str) {
    if (!str || *str != '{') return 0;

    int depth = 0;      /* current brace nesting outside of strings */
    int in_string = 0;  /* non-zero while inside a "..." literal */
    int closed = 0;     /* set once the top-level object has been closed */

    for (const char *p = str; *p; p++) {
        const char ch = *p;

        if (closed) {
            /* Only trailing whitespace (e.g. '\r') may follow the object. */
            if (!isspace((unsigned char)ch)) return 0;
            continue;
        }

        if (in_string) {
            if (ch == '\\') {
                /* A backslash at the very end cannot escape anything. */
                if (p[1] == '\0') return 0;
                p++;  /* skip the escaped character */
            } else if (ch == '"') {
                in_string = 0;
            }
            continue;
        }

        if (ch == '"') {
            in_string = 1;
        } else if (ch == '{') {
            depth++;
        } else if (ch == '}') {
            /* depth is >= 1 here because the text starts with '{'. */
            if (--depth == 0) closed = 1;
        }
    }

    /* Unbalanced braces or an unterminated string never reach `closed`. */
    return closed;
}
|
|
|
|
/*
 * Initialise an empty queue with room for `capacity` slots.
 *
 * The ring keeps one slot unused, so a capacity below 2 could never hold an
 * item (and 0 would divide by zero in the index arithmetic); such values are
 * raised to 2. The function has no way to report failure to its caller, so
 * an allocation or pthread initialisation failure terminates the process
 * with a diagnostic instead of leaving a half-built queue behind.
 *
 * @param q         queue to initialise; must not be NULL.
 * @param capacity  requested number of slots.
 */
void queue_init(Queue* q, size_t capacity) {
    if (capacity < 2) capacity = 2;

    /* calloc checks capacity * sizeof for overflow. */
    q->items = calloc(capacity, sizeof *q->items);
    if (q->items == NULL) {
        fprintf(stderr, "queue_init: cannot allocate %zu slots\n", capacity);
        exit(EXIT_FAILURE);
    }

    q->capacity = capacity;
    q->head = 0;
    q->tail = 0;

    if (pthread_mutex_init(&q->lock, NULL) != 0 ||
        pthread_cond_init(&q->cond, NULL) != 0) {
        fprintf(stderr, "queue_init: cannot initialise synchronisation objects\n");
        exit(EXIT_FAILURE);
    }
}
|
|
|
|
/*
 * Append `item` to the queue, blocking while the queue is full.
 *
 * Ownership of `item` passes to the queue (and later to whoever pops it).
 *
 * @param q     initialised queue.
 * @param item  heap-allocated string to enqueue.
 */
void queue_push(Queue* q, char* item) {
    pthread_mutex_lock(&q->lock);

    /* Full when advancing tail would collide with head. */
    while ((q->tail + 1) % q->capacity == q->head) {
        pthread_cond_wait(&q->cond, &q->lock);
    }

    q->items[q->tail] = item;
    q->tail = (q->tail + 1) % q->capacity;

    /*
     * Producers and consumers wait on the same condition variable, so a
     * single signal could be consumed by another blocked producer and leave
     * every consumer asleep. Broadcasting lets each waiter re-check its own
     * predicate.
     */
    pthread_cond_broadcast(&q->cond);
    pthread_mutex_unlock(&q->lock);
}
|
|
|
|
/*
 * Remove up to `batch_size` items from the queue, blocking while the queue
 * is empty and the global `running` flag is set.
 *
 * @param q           initialised queue.
 * @param batch_size  maximum number of items to take.
 * @param count       out: number of items stored in the returned array.
 * @return a malloc'd array holding `*count` string pointers; the caller owns
 *         the array and every string in it. Returns NULL with `*count == 0`
 *         if the array cannot be allocated (nothing is dequeued then).
 */
char** queue_pop_batch(Queue* q, size_t batch_size, size_t* count) {
    *count = 0;

    /* Allocate before taking the lock so the critical section stays short. */
    char **batch = calloc(batch_size, sizeof *batch);
    if (batch == NULL && batch_size != 0) {
        fprintf(stderr, "Memory allocation failed\n");
        return NULL;
    }

    pthread_mutex_lock(&q->lock);

    while (q->head == q->tail && running) {
        pthread_cond_wait(&q->cond, &q->lock);
    }

    while (*count < batch_size && q->head != q->tail) {
        batch[(*count)++] = q->items[q->head];
        q->head = (q->head + 1) % q->capacity;
    }

    /*
     * Slots were freed: wake producers blocked on a full queue. Without this
     * a producer that found the queue full would sleep forever.
     */
    if (*count > 0) {
        pthread_cond_broadcast(&q->cond);
    }

    pthread_mutex_unlock(&q->lock);
    return batch;
}
|
|
|
|
redisContext* redis_connect() {
|
|
redisContext* c = redisConnect(REDIS_HOST, REDIS_PORT);
|
|
if (c == NULL || c->err) {
|
|
if (c) {
|
|
fprintf(stderr, "Redis connection error: %s\n", c->errstr);
|
|
redisFree(c);
|
|
} else {
|
|
fprintf(stderr, "Can't allocate redis context\n");
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
redisReply* reply = redisCommand(c, "AUTH %s", REDIS_PASSWORD);
|
|
if (reply->type == REDIS_REPLY_ERROR) {
|
|
fprintf(stderr, "Auth error: %s\n", reply->str);
|
|
freeReplyObject(reply);
|
|
redisFree(c);
|
|
return NULL;
|
|
}
|
|
freeReplyObject(reply);
|
|
return c;
|
|
}
|
|
|
|
void* redis_worker(void* arg) {
|
|
redisContext* c = NULL;
|
|
char** current_batch = NULL;
|
|
size_t current_count = 0;
|
|
int retries = 0;
|
|
|
|
while (running) {
|
|
if (current_batch == NULL) {
|
|
current_batch = queue_pop_batch(&queue, BATCH_SIZE, ¤t_count);
|
|
if (current_count == 0) continue;
|
|
}
|
|
|
|
if (c == NULL) {
|
|
c = redis_connect();
|
|
if (c == NULL) {
|
|
sleep(1);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
size_t argc = 2 + current_count;
|
|
const char **argv = malloc(argc * sizeof(char*));
|
|
size_t *argvlen = malloc(argc * sizeof(size_t));
|
|
if (!argv || !argvlen) {
|
|
fprintf(stderr, "Memory allocation failed\n");
|
|
free(argv);
|
|
free(argvlen);
|
|
continue;
|
|
}
|
|
|
|
argv[0] = "RPUSH";
|
|
argvlen[0] = 5;
|
|
argv[1] = REDIS_KEY;
|
|
argvlen[1] = strlen(REDIS_KEY);
|
|
for (size_t i = 0; i < current_count; i++) {
|
|
argv[2+i] = current_batch[i];
|
|
argvlen[2+i] = strlen(current_batch[i]);
|
|
}
|
|
|
|
if (redisAppendCommandArgv(c, argc, argv, argvlen) != REDIS_OK) {
|
|
fprintf(stderr, "Failed to append command: %s\n", c->errstr);
|
|
free(argv);
|
|
free(argvlen);
|
|
redisFree(c);
|
|
c = NULL;
|
|
retries++;
|
|
if (retries >= MAX_RETRIES) {
|
|
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
|
|
free(current_batch);
|
|
current_batch = NULL;
|
|
retries = 0;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
free(argv);
|
|
free(argvlen);
|
|
|
|
redisReply* reply;
|
|
int ret = redisGetReply(c, (void**)&reply);
|
|
if (ret == REDIS_OK) {
|
|
if (reply->type == REDIS_REPLY_ERROR) {
|
|
fprintf(stderr, "Redis error: %s\n", reply->str);
|
|
redisFree(c);
|
|
c = NULL;
|
|
retries++;
|
|
} else {
|
|
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
|
|
free(current_batch);
|
|
current_batch = NULL;
|
|
retries = 0;
|
|
}
|
|
freeReplyObject(reply);
|
|
} else {
|
|
fprintf(stderr, "Redis error: %s\n", c->errstr);
|
|
redisFree(c);
|
|
c = NULL;
|
|
retries++;
|
|
}
|
|
|
|
if (retries >= MAX_RETRIES) {
|
|
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
|
|
free(current_batch);
|
|
current_batch = NULL;
|
|
retries = 0;
|
|
}
|
|
}
|
|
|
|
if (c) redisFree(c);
|
|
if (current_batch) {
|
|
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
|
|
free(current_batch);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
void handle_client(int client_fd) {
|
|
char buffer[BUFFER_SIZE];
|
|
ssize_t bytes_read;
|
|
char* saved = NULL;
|
|
size_t saved_len = 0;
|
|
|
|
while ((bytes_read = read(client_fd, buffer, BUFFER_SIZE)) > 0) {
|
|
char* ptr = buffer;
|
|
size_t remain = bytes_read;
|
|
|
|
while (remain > 0) {
|
|
char* nl = memchr(ptr, '\n', remain);
|
|
if (nl) {
|
|
size_t line_len = nl - ptr;
|
|
char* line = malloc(saved_len + line_len + 1);
|
|
|
|
if (saved_len > 0) {
|
|
memcpy(line, saved, saved_len);
|
|
free(saved);
|
|
saved = NULL;
|
|
saved_len = 0;
|
|
}
|
|
|
|
memcpy(line + saved_len, ptr, line_len);
|
|
line[saved_len + line_len] = '\0';
|
|
|
|
if (is_valid_json(line)) {
|
|
queue_push(&queue, line);
|
|
} else {
|
|
free(line);
|
|
}
|
|
|
|
remain -= line_len + 1;
|
|
ptr = nl + 1;
|
|
} else {
|
|
char* new_saved = realloc(saved, saved_len + remain);
|
|
if (!new_saved) break;
|
|
|
|
memcpy(new_saved + saved_len, ptr, remain);
|
|
saved = new_saved;
|
|
saved_len += remain;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
free(saved);
|
|
close(client_fd);
|
|
}
|
|
|
|
int main() {
|
|
printf("version:%s,threads nums:%d\n",CX_VERSION, NUM_WORKER_THREADS);
|
|
int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
|
struct sockaddr_un addr;
|
|
memset(&addr, 0, sizeof(addr));
|
|
addr.sun_family = AF_UNIX;
|
|
strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path)-1);
|
|
|
|
unlink(SOCKET_PATH);
|
|
bind(server_fd, (struct sockaddr*)&addr, sizeof(addr));
|
|
listen(server_fd, 5);
|
|
|
|
queue_init(&queue, MAX_QUEUE_SIZE);
|
|
|
|
pthread_t workers[NUM_WORKER_THREADS];
|
|
for (int i = 0; i < NUM_WORKER_THREADS; i++) {
|
|
pthread_create(&workers[i], NULL, redis_worker, NULL);
|
|
}
|
|
|
|
while (running) {
|
|
int client_fd = accept(server_fd, NULL, NULL);
|
|
if (client_fd == -1) continue;
|
|
|
|
pthread_t thread;
|
|
pthread_create(&thread, NULL, (void*(*)(void*))handle_client, (void*)(intptr_t)client_fd);
|
|
pthread_detach(thread);
|
|
}
|
|
|
|
close(server_fd);
|
|
unlink(SOCKET_PATH);
|
|
return 0;
|
|
}
|