Project

General

Profile

Feature #7062 » suricata-redis-json.c

yang xing, 08/04/2025 05:11 AM

 
#include <ctype.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <pthread.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <hiredis/hiredis.h>

/* Size in bytes of the stack buffer handle_client() passes to each read(). */
#define BUFFER_SIZE 4096
/* Slot count handed to queue_init(); the ring keeps one slot free, so it holds
 * at most MAX_QUEUE_SIZE - 1 lines at a time. */
#define MAX_QUEUE_SIZE 100000
/* Upper bound on the number of lines a worker pops and sends in one RPUSH. */
#define BATCH_SIZE 500
/* Host and TCP port passed to redisConnect(). */
#define REDIS_HOST "127.0.0.1"
#define REDIS_PORT 6380
/* Password sent with the AUTH command right after connecting.
 * NOTE(review): this is a credential hard-coded in source; consider loading
 * it from the environment or a protected configuration file instead. */
#define REDIS_PASSWORD "qazWSX123"
/* Redis list key that every batch is RPUSHed onto. */
#define REDIS_KEY "logs"
/* Filesystem path of the UNIX stream socket the server binds and listens on. */
#define SOCKET_PATH "/tmp/suricata-json.sock"
/* Number of redis_worker threads started by main(). */
#define NUM_WORKER_THREADS 4
/* Failed send attempts after which a worker frees (drops) its current batch.
 * With the value 1, a batch is dropped on its first failed send. */
#define MAX_RETRIES 1
/* Version string printed once at startup. */
#define CX_VERSION "4.1"
/*
 * Ring buffer of heap-allocated strings shared between threads.
 *
 * The queue is empty when head == tail and full when advancing tail by one
 * (modulo capacity) would make it equal to head, so one slot always stays
 * unused. All fields are accessed with `lock` held.
 */
typedef struct {
    char **items;          /* slot array with `capacity` entries */
    size_t capacity;       /* number of slots in `items` */
    size_t head;           /* index of the next slot to pop */
    size_t tail;           /* index of the next slot to fill */
    pthread_mutex_t lock;  /* guards every field above */
    pthread_cond_t cond;   /* waited on by both pushers and poppers */
} Queue;

/* Process-wide queue: handle_client() pushes validated lines onto it and
 * redis_worker() pops batches from it. Initialised by main() via queue_init(). */
Queue queue;
/* Loop-continuation flag read by main(), redis_worker() and queue_pop_batch().
 * No code visible in this listing ever sets it to 0.
 * NOTE(review): `volatile` does not make cross-thread access race-free; if a
 * shutdown path is added, consider an atomic type and waking the threads
 * blocked on the queue's condition variable. */
volatile int running = 1;

/*
 * Cheap structural sanity check for one line that is expected to hold a JSON
 * object. This is NOT a JSON parser: it only verifies that
 *   - the text is non-NULL and starts with '{',
 *   - curly braces outside string literals are balanced and never close more
 *     than has been opened (so "{}}{" is rejected),
 *   - every string literal is terminated.
 * Backslash escapes are honoured only inside string literals, where they hide
 * the following character (e.g. an escaped quote) from the scan.
 *
 * Returns 1 when the checks pass, 0 otherwise.
 */
int is_valid_json(const char *str) {
    if (!str || *str != '{') return 0;

    int depth = 0;
    int in_string = 0;
    for (const char *p = str; *p; p++) {
        if (in_string) {
            if (*p == '\\' && p[1]) {
                p++;                /* skip the escaped character */
            } else if (*p == '"') {
                in_string = 0;
            }
            continue;
        }

        if (*p == '"') {
            in_string = 1;
        } else if (*p == '{') {
            depth++;
        } else if (*p == '}') {
            /* A close without a matching open can never be repaired later. */
            if (--depth < 0) return 0;
        }
    }
    return depth == 0 && !in_string;
}

/*
 * Prepares `q` as an empty ring buffer with `capacity` slots and initialises
 * its mutex and condition variable.
 *
 * One slot is always left unused to tell "full" from "empty", so the queue
 * holds at most capacity - 1 items; a capacity below 2 could never accept an
 * item (and 0 would divide by zero in the index arithmetic).
 *
 * The function has no way to report failure to its caller, so an unusable
 * capacity, a failed allocation or a failed pthread initialisation terminates
 * the process with a diagnostic instead of leaving a queue that crashes on
 * first use.
 */
void queue_init(Queue* q, size_t capacity) {
    if (capacity < 2) {
        fprintf(stderr, "queue_init: capacity must be at least 2\n");
        exit(EXIT_FAILURE);
    }

    /* calloc checks capacity * element size for overflow. */
    q->items = calloc(capacity, sizeof *q->items);
    if (q->items == NULL) {
        fprintf(stderr, "queue_init: cannot allocate %zu queue slots\n", capacity);
        exit(EXIT_FAILURE);
    }

    q->capacity = capacity;
    q->head = q->tail = 0;

    if (pthread_mutex_init(&q->lock, NULL) != 0 ||
        pthread_cond_init(&q->cond, NULL) != 0) {
        fprintf(stderr, "queue_init: cannot initialise synchronisation objects\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Appends `item` to the tail of `q`, blocking on the queue's condition
 * variable for as long as the ring is full. The pointer is stored as-is (no
 * copy is made). One waiter on the condition variable is signalled after the
 * item has been stored.
 */
void queue_push(Queue* q, char* item) {
    pthread_mutex_lock(&q->lock);

    for (;;) {
        /* Recomputed after every wake-up: tail may have moved meanwhile. */
        size_t next_tail = (q->tail + 1) % q->capacity;
        if (next_tail != q->head) {
            q->items[q->tail] = item;
            q->tail = next_tail;
            break;
        }
        pthread_cond_wait(&q->cond, &q->lock);
    }

    pthread_cond_signal(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

/*
 * Removes up to `batch_size` items from the head of `q`.
 *
 * Blocks while the queue is empty and the global `running` flag is non-zero.
 * On return `*count` holds the number of items placed in the returned array.
 *
 * Returns a malloc-family array that the caller must free(), together with
 * each of the `*count` strings stored in it. The array may be returned with
 * `*count == 0` when the wait ended because `running` became zero. Returns
 * NULL with `*count == 0` when the array cannot be allocated; in that case no
 * item is removed from the queue.
 */
char** queue_pop_batch(Queue* q, size_t batch_size, size_t* count) {
    *count = 0;

    pthread_mutex_lock(&q->lock);
    while (q->head == q->tail && running) {
        pthread_cond_wait(&q->cond, &q->lock);
    }

    /* Request at least one slot so a zero batch_size does not yield an
     * implementation-defined NULL that would look like an allocation failure. */
    char **batch = calloc(batch_size ? batch_size : 1, sizeof *batch);
    if (batch == NULL) {
        pthread_mutex_unlock(&q->lock);
        return NULL;
    }

    while (*count < batch_size && q->head != q->tail) {
        batch[(*count)++] = q->items[q->head];
        q->head = (q->head + 1) % q->capacity;
    }

    /* Space was freed: wake producers blocked in queue_push() on a full
     * queue. Broadcast rather than signal because consumers wait on the same
     * condition variable and a single wake-up could land on one of them. */
    if (*count > 0) {
        pthread_cond_broadcast(&q->cond);
    }

    pthread_mutex_unlock(&q->lock);
    return batch;
}

redisContext* redis_connect() {
redisContext* c = redisConnect(REDIS_HOST, REDIS_PORT);
if (c == NULL || c->err) {
if (c) {
fprintf(stderr, "Redis connection error: %s\n", c->errstr);
redisFree(c);
} else {
fprintf(stderr, "Can't allocate redis context\n");
}
return NULL;
}

redisReply* reply = redisCommand(c, "AUTH %s", REDIS_PASSWORD);
if (reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "Auth error: %s\n", reply->str);
freeReplyObject(reply);
redisFree(c);
return NULL;
}
freeReplyObject(reply);
return c;
}

void* redis_worker(void* arg) {
redisContext* c = NULL;
char** current_batch = NULL;
size_t current_count = 0;
int retries = 0;

while (running) {
if (current_batch == NULL) {
current_batch = queue_pop_batch(&queue, BATCH_SIZE, &current_count);
if (current_count == 0) continue;
}

if (c == NULL) {
c = redis_connect();
if (c == NULL) {
sleep(1);
continue;
}
}

size_t argc = 2 + current_count;
const char **argv = malloc(argc * sizeof(char*));
size_t *argvlen = malloc(argc * sizeof(size_t));
if (!argv || !argvlen) {
fprintf(stderr, "Memory allocation failed\n");
free(argv);
free(argvlen);
continue;
}

argv[0] = "RPUSH";
argvlen[0] = 5;
argv[1] = REDIS_KEY;
argvlen[1] = strlen(REDIS_KEY);
for (size_t i = 0; i < current_count; i++) {
argv[2+i] = current_batch[i];
argvlen[2+i] = strlen(current_batch[i]);
}

if (redisAppendCommandArgv(c, argc, argv, argvlen) != REDIS_OK) {
fprintf(stderr, "Failed to append command: %s\n", c->errstr);
free(argv);
free(argvlen);
redisFree(c);
c = NULL;
retries++;
if (retries >= MAX_RETRIES) {
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
free(current_batch);
current_batch = NULL;
retries = 0;
}
continue;
}

free(argv);
free(argvlen);

redisReply* reply;
int ret = redisGetReply(c, (void**)&reply);
if (ret == REDIS_OK) {
if (reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "Redis error: %s\n", reply->str);
redisFree(c);
c = NULL;
retries++;
} else {
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
free(current_batch);
current_batch = NULL;
retries = 0;
}
freeReplyObject(reply);
} else {
fprintf(stderr, "Redis error: %s\n", c->errstr);
redisFree(c);
c = NULL;
retries++;
}

if (retries >= MAX_RETRIES) {
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
free(current_batch);
current_batch = NULL;
retries = 0;
}
}

if (c) redisFree(c);
if (current_batch) {
for (size_t i = 0; i < current_count; i++) free(current_batch[i]);
free(current_batch);
}
return NULL;
}

/*
 * Serves one connected client: reads the stream until EOF or a read error,
 * splits it into '\n'-terminated lines, and pushes every line accepted by
 * is_valid_json() onto the global queue (the queue takes ownership of the
 * string). Rejected lines are freed.
 *
 * A line may arrive split across several read() calls; the unfinished part is
 * carried in `saved` and prepended when the terminating newline shows up.
 * Data after the last newline is discarded when the stream ends.
 *
 * If memory for a line cannot be obtained, that line is dropped in full (a
 * message is printed) and processing resumes with the next line.
 *
 * Closes `client_fd` before returning.
 */
void handle_client(int client_fd) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    char *saved = NULL;     /* start of a line whose newline has not arrived */
    size_t saved_len = 0;
    int skipping = 0;       /* non-zero while discarding the rest of a line
                               whose earlier part was lost to an allocation
                               failure */

    while ((bytes_read = read(client_fd, buffer, sizeof buffer)) > 0) {
        char *ptr = buffer;
        size_t remain = (size_t)bytes_read;

        while (remain > 0) {
            char *nl = memchr(ptr, '\n', remain);

            if (nl == NULL) {
                /* No newline in what is left: stash it for the next read. */
                if (!skipping) {
                    char *grown = realloc(saved, saved_len + remain);
                    if (grown == NULL) {
                        fprintf(stderr, "Memory allocation failed\n");
                        /* Keeping a stale prefix would glue it onto later
                         * data, so drop the whole line instead. */
                        free(saved);
                        saved = NULL;
                        saved_len = 0;
                        skipping = 1;
                    } else {
                        memcpy(grown + saved_len, ptr, remain);
                        saved = grown;
                        saved_len += remain;
                    }
                }
                break;
            }

            size_t chunk_len = (size_t)(nl - ptr);

            if (skipping) {
                /* This newline ends the damaged line. */
                skipping = 0;
            } else {
                /* Remember the prefix length before `saved` is released; it
                 * is the offset at which this chunk must be appended. */
                size_t prefix_len = saved_len;
                char *line = malloc(prefix_len + chunk_len + 1);
                if (line == NULL) {
                    fprintf(stderr, "Memory allocation failed\n");
                } else {
                    if (prefix_len > 0) {
                        memcpy(line, saved, prefix_len);
                    }
                    memcpy(line + prefix_len, ptr, chunk_len);
                    line[prefix_len + chunk_len] = '\0';

                    if (is_valid_json(line)) {
                        queue_push(&queue, line);
                    } else {
                        free(line);
                    }
                }
                free(saved);
                saved = NULL;
                saved_len = 0;
            }

            remain -= chunk_len + 1;
            ptr = nl + 1;
        }
    }

    free(saved);
    close(client_fd);
}

/*
 * pthread entry point for one client connection. The descriptor travels
 * through the void* argument as an integer; handle_client() closes it.
 * A wrapper with the exact signature pthread_create() expects is used because
 * calling handle_client() through a cast function pointer of a different
 * type is undefined behaviour.
 */
static void *client_thread(void *arg) {
    handle_client((int)(intptr_t)arg);
    return NULL;
}

/*
 * Program entry point: creates the UNIX stream socket at SOCKET_PATH, starts
 * NUM_WORKER_THREADS redis_worker threads, then accepts clients and serves
 * each one on its own detached thread while the global `running` flag is
 * non-zero.
 *
 * Returns EXIT_FAILURE after printing a diagnostic when the listening socket
 * or a worker thread cannot be set up; returns 0 once the accept loop ends.
 */
int main(void) {
    printf("version:%s,threads nums:%d\n",CX_VERSION, NUM_WORKER_THREADS);

    int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket");
        return EXIT_FAILURE;
    }

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    /* addr was zero-filled and at most size-1 bytes are copied, so sun_path
     * stays NUL-terminated. */
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path)-1);

    /* Remove a socket file left behind by a previous run so bind() can
     * succeed; failure (e.g. nothing to remove) is expected and ignored. */
    unlink(SOCKET_PATH);
    if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        perror("bind");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, 5) == -1) {
        perror("listen");
        close(server_fd);
        unlink(SOCKET_PATH);
        return EXIT_FAILURE;
    }

    queue_init(&queue, MAX_QUEUE_SIZE);

    pthread_t workers[NUM_WORKER_THREADS];
    for (int i = 0; i < NUM_WORKER_THREADS; i++) {
        int rc = pthread_create(&workers[i], NULL, redis_worker, NULL);
        if (rc != 0) {
            /* pthread functions return the error number instead of setting errno. */
            fprintf(stderr, "pthread_create (worker %d): %s\n", i, strerror(rc));
            close(server_fd);
            unlink(SOCKET_PATH);
            return EXIT_FAILURE;
        }
    }

    while (running) {
        int client_fd = accept(server_fd, NULL, NULL);
        if (client_fd == -1) {
            if (errno != EINTR) {
                perror("accept");
                /* Back off so a persistent failure (such as running out of
                 * descriptors) does not become a busy loop. */
                sleep(1);
            }
            continue;
        }

        pthread_t thread;
        int rc = pthread_create(&thread, NULL, client_thread, (void*)(intptr_t)client_fd);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (client): %s\n", strerror(rc));
            /* No thread took ownership of the descriptor; close it here. */
            close(client_fd);
            continue;
        }
        pthread_detach(thread);
    }

    close(server_fd);
    unlink(SOCKET_PATH);
    return 0;
}
    (1-1/1)