#include <ctype.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <pthread.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <hiredis/hiredis.h>

/* Compile-time configuration for the Unix-socket-to-Redis log forwarder. */

/* Number of bytes requested per read() call in handle_client. */
#define BUFFER_SIZE 4096
/* Capacity passed to queue_init. The ring buffer always leaves one slot
 * unused, so at most MAX_QUEUE_SIZE - 1 lines are queued at once. */
#define MAX_QUEUE_SIZE 100000
/* Maximum number of lines a worker pops and sends in one RPUSH command. */
#define BATCH_SIZE 500
/* Address and port of the Redis server the workers connect to. */
#define REDIS_HOST "127.0.0.1"
#define REDIS_PORT 6380
/* Password sent with AUTH right after connecting.
 * NOTE(review): this credential is hard-coded in the source; consider
 * loading it from the environment or a configuration file instead. */
#define REDIS_PASSWORD "qazWSX123"
/* Redis list key that every batch is RPUSHed onto. */
#define REDIS_KEY "logs"
/* Filesystem path of the Unix stream socket that main binds and listens on. */
#define SOCKET_PATH "/tmp/suricata-json.sock"
/* Number of redis_worker threads started by main. */
#define NUM_WORKER_THREADS 4
/* Number of failed send attempts after which a worker drops its batch. */
#define MAX_RETRIES 1
/* Version string printed once at startup. */
#define CX_VERSION "4.1"
/*
 * Fixed-capacity ring buffer of heap-allocated strings, guarded by a mutex.
 *
 * The queue is empty when head == tail and full when advancing tail by one
 * (modulo capacity) would make it equal to head, so one slot always stays
 * unused.
 */
typedef struct {
    char** items;           /* array of `capacity` string pointers */
    size_t capacity;        /* number of slots in `items` */
    size_t head;            /* index of the next item to pop */
    size_t tail;            /* index of the next free slot to push into */
    pthread_mutex_t lock;   /* held while reading or writing head, tail and items */
    pthread_cond_t cond;    /* waited on by pushers (queue full) and poppers
                             * (queue empty); signalled by queue_push */
} Queue;

/* Single shared queue: handle_client pushes validated lines onto it and the
 * redis_worker threads pop batches from it. */
Queue queue;
/* Loop flag read by main, redis_worker and queue_pop_batch. It starts at 1;
 * nothing in the visible source ever sets it to 0. */
volatile int running = 1;

/*
 * Cheap structural check that a line looks like a JSON object.
 *
 * This is not a full JSON parser. It only verifies that:
 *   - the text starts with '{',
 *   - braces outside string literals are balanced and the nesting depth
 *     never drops below zero,
 *   - every string literal is terminated.
 *
 * A backslash is treated as an escape only inside a string literal, where it
 * makes the following character (for example an escaped quote) literal.
 *
 * Parameters:
 *   str - NUL-terminated text to check; may be NULL.
 *
 * Returns 1 if the text passes the checks, 0 otherwise (including NULL or
 * empty input).
 */
int is_valid_json(const char *str) {
    if (!str || *str != '{') return 0;

    int depth = 0;
    int in_string = 0;
    for (const char *p = str; *p; p++) {
        if (in_string) {
            if (*p == '\\' && p[1] != '\0') {
                p++;                /* skip the escaped character */
            } else if (*p == '"') {
                in_string = 0;
            }
            continue;
        }

        switch (*p) {
        case '"':
            in_string = 1;
            break;
        case '{':
            depth++;
            break;
        case '}':
            /* A closer without a matching opener can never be repaired by
             * later text, so reject immediately. */
            if (--depth < 0) return 0;
            break;
        default:
            break;
        }
    }
    return depth == 0 && !in_string;
}

/*
 * Initialise a queue as an empty ring buffer with `capacity` slots.
 *
 * Parameters:
 *   q        - queue to initialise; its previous contents are overwritten.
 *   capacity - number of slots. One slot always stays unused, so the queue
 *              holds at most capacity - 1 items and capacity must be >= 2.
 *
 * The program cannot run without its queue, so an invalid capacity, an
 * allocation failure or a pthread initialisation failure prints a message to
 * stderr and terminates the process.
 */
void queue_init(Queue* q, size_t capacity) {
    if (capacity < 2) {
        fprintf(stderr, "Queue capacity must be at least 2\n");
        exit(EXIT_FAILURE);
    }

    /* calloc rejects a capacity * element-size product that would overflow. */
    q->items = calloc(capacity, sizeof *q->items);
    if (q->items == NULL) {
        fprintf(stderr, "Memory allocation failed\n");
        exit(EXIT_FAILURE);
    }

    q->capacity = capacity;
    q->head = q->tail = 0;

    if (pthread_mutex_init(&q->lock, NULL) != 0 ||
        pthread_cond_init(&q->cond, NULL) != 0) {
        fprintf(stderr, "Failed to initialise queue synchronisation\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Append one item to the queue, blocking while the queue is full.
 *
 * Parameters:
 *   q    - queue to append to.
 *   item - pointer to store; the pointer itself is stored, not a copy of
 *          what it points to.
 *
 * After storing the item, one thread waiting on the queue's condition
 * variable is signalled.
 */
void queue_push(Queue* q, char* item) {
    size_t next_tail;

    pthread_mutex_lock(&q->lock);

    /* The queue is full when advancing tail would make it collide with head.
     * Recompute on every wake-up because other threads move both indices. */
    for (;;) {
        next_tail = (q->tail + 1) % q->capacity;
        if (next_tail != q->head) {
            break;
        }
        pthread_cond_wait(&q->cond, &q->lock);
    }

    q->items[q->tail] = item;
    q->tail = next_tail;

    pthread_cond_signal(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

/*
 * Remove up to `batch_size` items from the queue and return them in a newly
 * allocated array.
 *
 * Blocks while the queue is empty and the global `running` flag is non-zero.
 *
 * Parameters:
 *   q          - queue to pop from.
 *   batch_size - maximum number of items to remove.
 *   count      - out: number of items actually placed in the returned array.
 *
 * Returns the array of popped items. The caller owns the array and every
 * item in it and must free them. Returns NULL with *count == 0 if the array
 * cannot be allocated; in that case nothing is removed from the queue. A
 * non-NULL array with *count == 0 is returned when the wait ended because
 * `running` became zero while the queue was empty.
 */
char** queue_pop_batch(Queue* q, size_t batch_size, size_t* count) {
    *count = 0;

    /* Allocate before taking the lock so producers are not held up by the
     * allocator and a failure leaves the queue untouched. */
    char** batch = malloc(batch_size * sizeof *batch);
    if (batch == NULL) {
        return NULL;
    }

    pthread_mutex_lock(&q->lock);
    while (q->head == q->tail && running) {
        pthread_cond_wait(&q->cond, &q->lock);
    }

    while (*count < batch_size && q->head != q->tail) {
        batch[(*count)++] = q->items[q->head];
        q->head = (q->head + 1) % q->capacity;
    }

    /* Slots were freed: wake producers blocked in queue_push on a full queue.
     * Producers and consumers share one condition variable, so broadcast
     * rather than signal; a single signal could be consumed by another
     * waiting consumer and leave the producers asleep. */
    if (*count > 0) {
        pthread_cond_broadcast(&q->cond);
    }

    pthread_mutex_unlock(&q->lock);
    return batch;
}

redisContext* redis_connect() {
    redisContext* c = redisConnect(REDIS_HOST, REDIS_PORT);
    if (c == NULL || c->err) {
        if (c) {
            fprintf(stderr, "Redis connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            fprintf(stderr, "Can't allocate redis context\n");
        }
        return NULL;
    }

    redisReply* reply = redisCommand(c, "AUTH %s", REDIS_PASSWORD);
    if (reply->type == REDIS_REPLY_ERROR) {
        fprintf(stderr, "Auth error: %s\n", reply->str);
        freeReplyObject(reply);
        redisFree(c);
        return NULL;
    }
    freeReplyObject(reply);
    return c;
}

/* Free every line in a batch and then the batch array itself. */
static void discard_batch(char** batch, size_t count) {
    if (batch == NULL) return;
    for (size_t i = 0; i < count; i++) free(batch[i]);
    free(batch);
}

/*
 * Worker thread: pops batches of lines from the global queue and appends
 * them to the Redis list REDIS_KEY with one RPUSH per batch.
 *
 * Parameters:
 *   arg - unused.
 *
 * Behaviour per batch:
 *   - If no connection is open, one is established via redis_connect(); on
 *     failure the worker sleeps one second and tries again, keeping the batch.
 *   - On a successful RPUSH the batch is freed.
 *   - On any send failure (append error, transport error or error reply) the
 *     connection is closed and the retry counter is incremented. Once the
 *     counter reaches MAX_RETRIES the batch is dropped and the counter reset;
 *     otherwise the same batch is retried on a fresh connection.
 *
 * Returns NULL once the global `running` flag is zero, after releasing the
 * connection and any batch still held.
 */
void* redis_worker(void* arg) {
    (void)arg;

    redisContext* c = NULL;
    char** current_batch = NULL;
    size_t current_count = 0;
    int retries = 0;

    while (running) {
        if (current_batch == NULL) {
            current_batch = queue_pop_batch(&queue, BATCH_SIZE, &current_count);
            if (current_count == 0) {
                /* Nothing to send. Release the empty array so it is never
                 * mistaken for a pending batch and sent as an RPUSH with no
                 * values. */
                free(current_batch);
                current_batch = NULL;
                continue;
            }
        }

        if (c == NULL) {
            c = redis_connect();
            if (c == NULL) {
                sleep(1);
                continue;
            }
        }

        /* Build "RPUSH <key> <line>..." as a binary-safe argument vector. */
        size_t argc = 2 + current_count;
        const char** argv = malloc(argc * sizeof *argv);
        size_t* argvlen = malloc(argc * sizeof *argvlen);
        if (!argv || !argvlen) {
            fprintf(stderr, "Memory allocation failed\n");
            free(argv);
            free(argvlen);
            continue;
        }

        argv[0] = "RPUSH";
        argvlen[0] = 5;
        argv[1] = REDIS_KEY;
        argvlen[1] = strlen(REDIS_KEY);
        for (size_t i = 0; i < current_count; i++) {
            argv[2 + i] = current_batch[i];
            argvlen[2 + i] = strlen(current_batch[i]);
        }

        int sent = 0;
        if (redisAppendCommandArgv(c, (int)argc, argv, argvlen) != REDIS_OK) {
            fprintf(stderr, "Failed to append command: %s\n", c->errstr);
        } else {
            redisReply* reply = NULL;
            if (redisGetReply(c, (void**)&reply) != REDIS_OK || reply == NULL) {
                fprintf(stderr, "Redis error: %s\n", c->errstr);
            } else if (reply->type == REDIS_REPLY_ERROR) {
                fprintf(stderr, "Redis error: %s\n", reply->str);
            } else {
                sent = 1;
            }
            if (reply != NULL) freeReplyObject(reply);
        }

        free(argv);
        free(argvlen);

        if (sent) {
            discard_batch(current_batch, current_count);
            current_batch = NULL;
            retries = 0;
            continue;
        }

        /* Send failed: drop the connection so the next attempt reconnects. */
        redisFree(c);
        c = NULL;
        retries++;

        if (retries >= MAX_RETRIES) {
            /* Give up on this batch so one bad batch cannot stall the worker. */
            discard_batch(current_batch, current_count);
            current_batch = NULL;
            retries = 0;
        }
    }

    if (c) redisFree(c);
    discard_batch(current_batch, current_count);
    return NULL;
}

/*
 * Read newline-delimited text from a connected client until EOF or a read
 * error, and push every line that passes is_valid_json() onto the global
 * queue.
 *
 * Parameters:
 *   client_fd - connected socket descriptor; closed before returning.
 *
 * Data is read in BUFFER_SIZE chunks. A line that spans several reads is
 * accumulated in `saved` until its terminating newline arrives. Each complete
 * line is copied into its own heap buffer without the newline; ownership of
 * that buffer passes to the queue when the line is accepted, otherwise it is
 * freed here. Trailing data without a final newline is discarded at EOF.
 */
void handle_client(int client_fd) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    char* saved = NULL;       /* start of a line whose newline has not arrived yet */
    size_t saved_len = 0;

    while ((bytes_read = read(client_fd, buffer, BUFFER_SIZE)) > 0) {
        char* ptr = buffer;
        size_t remain = (size_t)bytes_read;

        while (remain > 0) {
            char* nl = memchr(ptr, '\n', remain);
            if (nl == NULL) {
                /* No newline in the rest of this chunk: stash it for later. */
                char* new_saved = realloc(saved, saved_len + remain);
                if (new_saved == NULL) {
                    fprintf(stderr, "Memory allocation failed\n");
                    /* Drop the whole partial line; keeping only its head
                     * would splice it onto unrelated data later. */
                    free(saved);
                    saved = NULL;
                    saved_len = 0;
                    break;
                }
                memcpy(new_saved + saved_len, ptr, remain);
                saved = new_saved;
                saved_len += remain;
                break;
            }

            size_t line_len = (size_t)(nl - ptr);
            /* Remember the prefix length: it is the offset at which the rest
             * of the line must be appended. */
            size_t prefix_len = saved_len;
            char* line = malloc(prefix_len + line_len + 1);
            if (line != NULL) {
                if (prefix_len > 0) {
                    memcpy(line, saved, prefix_len);
                }
                memcpy(line + prefix_len, ptr, line_len);
                line[prefix_len + line_len] = '\0';

                if (is_valid_json(line)) {
                    queue_push(&queue, line);
                } else {
                    free(line);
                }
            } else {
                fprintf(stderr, "Memory allocation failed\n");
            }

            /* The saved prefix belongs to the line just finished. */
            free(saved);
            saved = NULL;
            saved_len = 0;

            remain -= line_len + 1;
            ptr = nl + 1;
        }
    }

    free(saved);
    close(client_fd);
}

/*
 * Thread entry point with the signature pthread_create() requires. It
 * unpacks the descriptor carried in `arg` and hands it to handle_client(),
 * so handle_client is never called through an incompatible function-pointer
 * type.
 */
static void* client_thread(void* arg) {
    handle_client((int)(intptr_t)arg);
    return NULL;
}

/*
 * Program entry point.
 *
 * Creates the Unix stream socket at SOCKET_PATH, initialises the global
 * queue, starts NUM_WORKER_THREADS redis_worker threads, and then accepts
 * clients while `running` is non-zero, serving each one on its own detached
 * thread.
 *
 * Returns 0 after the accept loop ends, or EXIT_FAILURE if the socket or the
 * worker threads cannot be set up.
 */
int main(void) {
    printf("version:%s,threads nums:%d\n",CX_VERSION, NUM_WORKER_THREADS);

    int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket");
        return EXIT_FAILURE;
    }

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    /* addr is zero-filled and at most size-1 bytes are copied, so sun_path
     * is always NUL-terminated. */
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path)-1);

    /* Remove a stale socket file left by a previous run; otherwise bind fails. */
    unlink(SOCKET_PATH);
    if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        perror("bind");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, 5) == -1) {
        perror("listen");
        close(server_fd);
        unlink(SOCKET_PATH);
        return EXIT_FAILURE;
    }

    queue_init(&queue, MAX_QUEUE_SIZE);

    pthread_t workers[NUM_WORKER_THREADS];
    for (int i = 0; i < NUM_WORKER_THREADS; i++) {
        int rc = pthread_create(&workers[i], NULL, redis_worker, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            close(server_fd);
            unlink(SOCKET_PATH);
            return EXIT_FAILURE;
        }
    }

    while (running) {
        int client_fd = accept(server_fd, NULL, NULL);
        if (client_fd == -1) {
            if (errno != EINTR) {
                perror("accept");
                /* Back off instead of spinning on a persistent failure such
                 * as running out of file descriptors. */
                sleep(1);
            }
            continue;
        }

        pthread_t thread;
        int rc = pthread_create(&thread, NULL, client_thread, (void*)(intptr_t)client_fd);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            close(client_fd);   /* no thread will take ownership of it */
            continue;
        }
        pthread_detach(thread);
    }

    close(server_fd);
    unlink(SOCKET_PATH);
    return 0;
}
