Feature #431 » 0002-Update-the-ERF-file-runmodes-to-support-autofp-and-s.patch
src/runmode-erf-file.c | ||
---|---|---|
/**
 * \brief Register the ERF file run modes and set the default mode name.
 *
 * Each RunModeRegisterNewRunMode() call below associates a mode name and a
 * description string with the function that sets that mode up, all under
 * RUNMODE_ERF_FILE.
 *
 * NOTE(review): this text comes from a rendered diff. default_mode is
 * assigned twice ("auto", then "autofp") and three modes are registered;
 * the "auto" assignment and registration look like pre-patch lines -
 * confirm against the applied patch.
 */
void RunModeErfFileRegister(void)
|
||
{
|
||
default_mode = "auto";
|
||
/* "auto" mode, set up by RunModeErfFileAuto. */
RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "auto",
|
||
"Multi threaded Erf File mode",
|
||
RunModeErfFileAuto);
|
||
/* As written, this later assignment is the one left in effect. */
default_mode = "autofp";
|
||
/* "single" mode, set up by RunModeErfFileSingle. */
RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "single",
|
||
"Single threaded ERF file mode",
|
||
RunModeErfFileSingle);
|
||
/* "autofp" mode, set up by RunModeErfFileAutoFp. */
RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "autofp",
|
||
"Multi threaded ERF file mode. Packets from "
|
||
"each flow are assigned to a single detect thread",
|
||
RunModeErfFileAutoFp);
|
||
return;
|
||
}
|
||
int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
|
||
int RunModeErfFileSingle(DetectEngineCtx *de_ctx)
|
||
{
|
||
SCEnter();
|
||
char tname[12];
|
||
uint16_t cpu = 0;
|
||
int ret;
|
||
char *file;
|
||
/* Available cpus */
|
||
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
|
||
RunModeInitialize();
|
||
SCEnter();
|
||
char *file = NULL;
|
||
if (ConfGet("erf-file.file", &file) == 0) {
|
||
SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-file.file "
|
||
"from Conf");
|
||
SCLogError(SC_ERR_RUNMODE, "Failed to get erf-file.file from config.");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
SCLogDebug("file %s", file);
|
||
RunModeInitialize();
|
||
TimeModeSetOffline();
|
||
/* create the threads */
|
||
ThreadVars *tv_receiveerf =
|
||
TmThreadCreatePacketHandler("ReceiveErfFile",
|
||
"packetpool", "packetpool",
|
||
"pickup-queue", "simple",
|
||
"1slot");
|
||
if (tv_receiveerf == NULL) {
|
||
/* Basically the same setup as PCAP files. */
|
||
ThreadVars *tv = TmThreadCreatePacketHandler("ErfFile",
|
||
"packetpool", "packetpool",
|
||
"packetpool", "packetpool",
|
||
"pktacqloop");
|
||
if (tv == NULL) {
|
||
printf("ERROR: TmThreadsCreate failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmSlotSetFuncAppend(tv_receiveerf, tm_module, file);
|
||
TmSlotSetFuncAppend(tv, tm_module, file);
|
||
if (threading_set_cpu_affinity) {
|
||
TmThreadSetCPUAffinity(tv_receiveerf, 0);
|
||
if (ncpus > 1)
|
||
TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
|
||
}
|
||
if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
|
||
printf("ERROR: TmThreadSpawn failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
ThreadVars *tv_decode1 =
|
||
TmThreadCreatePacketHandler("Decode & Stream",
|
||
"pickup-queue", "simple",
|
||
"stream-queue1", "simple",
|
||
"varslot");
|
||
if (tv_decode1 == NULL) {
|
||
printf("ERROR: TmThreadsCreate failed for Decode1\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
tm_module = TmModuleGetByName("DecodeErfFile");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
|
||
TmSlotSetFuncAppend(tv, tm_module, NULL);
|
||
tm_module = TmModuleGetByName("StreamTcp");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
|
||
TmSlotSetFuncAppend(tv, tm_module, NULL);
|
||
if (threading_set_cpu_affinity) {
|
||
TmThreadSetCPUAffinity(tv_decode1, 0);
|
||
if (ncpus > 1)
|
||
TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
|
||
tm_module = TmModuleGetByName("Detect");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName Detect failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx);
|
||
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
|
||
SetupOutputs(tv);
|
||
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
|
||
printf("ERROR: TmThreadSpawn failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
SCLogInfo("RunModeErfFileSingle initialised");
|
||
SCReturnInt(0);
|
||
}
|
||
int RunModeErfFileAutoFp(DetectEngineCtx *de_ctx)
|
||
{
|
||
SCEnter();
|
||
char tname[12];
|
||
char qname[12];
|
||
uint16_t cpu = 0;
|
||
char queues[2048] = "";
|
||
RunModeInitialize();
|
||
/* Available cpus */
|
||
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
|
||
/* start with cpu 1 so that if we're creating an odd number of detect
|
||
* threads we're not creating the most on CPU0. */
|
||
if (ncpus > 0)
|
||
... | ... | |
int thread;
|
||
for (thread = 0; thread < thread_max; thread++) {
|
||
if (strlen(queues) > 0)
|
||
strlcat(queues, ",", sizeof(queues));
|
||
snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
|
||
strlcat(queues, qname, sizeof(queues));
|
||
}
|
||
SCLogDebug("queues %s", queues);
|
||
char *file = NULL;
|
||
if (ConfGet("erf-file.file", &file) == 0) {
|
||
SCLogError(SC_ERR_RUNMODE,
|
||
"Failed retrieving erf-file.file from config");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TimeModeSetOffline();
|
||
/* create the threads */
|
||
ThreadVars *tv =
|
||
TmThreadCreatePacketHandler("ReceiveErfFile",
|
||
"packetpool", "packetpool",
|
||
queues, "flow",
|
||
"pktacqloop");
|
||
if (tv == NULL) {
|
||
printf("ERROR: TmThreadsCreate failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmSlotSetFuncAppend(tv, tm_module, file);
|
||
tm_module = TmModuleGetByName("DecodeErfFile");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmSlotSetFuncAppend(tv, tm_module, NULL);
|
||
if (threading_set_cpu_affinity) {
|
||
TmThreadSetCPUAffinity(tv, 0);
|
||
if (ncpus > 1)
|
||
TmThreadSetThreadPriority(tv, PRIO_MEDIUM);
|
||
}
|
||
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
|
||
printf("ERROR: TmThreadSpawn failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
for (thread = 0; thread < thread_max; thread++) {
|
||
snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
|
||
snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
|
||
SCLogDebug("tname %s, qname %s", tname, qname);
|
||
char *thread_name = SCStrdup(tname);
|
||
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
|
||
ThreadVars *tv_detect_ncpu =
|
||
TmThreadCreatePacketHandler(thread_name,
|
||
"stream-queue1", "simple",
|
||
"alert-queue1", "simple",
|
||
"1slot");
|
||
qname, "flow",
|
||
"packetpool", "packetpool",
|
||
"varslot");
|
||
if (tv_detect_ncpu == NULL) {
|
||
printf("ERROR: TmThreadsCreate failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
tm_module = TmModuleGetByName("StreamTcp");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
|
||
tm_module = TmModuleGetByName("Detect");
|
||
if (tm_module == NULL) {
|
||
printf("ERROR: TmModuleGetByName Detect failed\n");
|
||
... | ... | |
}
|
||
tv_detect_ncpu->thread_group_name = thread_group_name;
|
||
/* add outputs as well */
|
||
SetupOutputs(tv_detect_ncpu);
|
||
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
|
||
printf("ERROR: TmThreadSpawn failed\n");
|
||
exit(EXIT_FAILURE);
|
||
... | ... | |
cpu++;
|
||
}
|
||
ThreadVars *tv_outputs =
|
||
TmThreadCreatePacketHandler("Outputs",
|
||
"alert-queue1", "simple",
|
||
"packetpool", "packetpool",
|
||
"varslot");
|
||
if (tv_outputs == NULL) {
|
||
printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
SetupOutputs(tv_outputs);
|
||
if (threading_set_cpu_affinity) {
|
||
TmThreadSetCPUAffinity(tv_outputs, 0);
|
||
if (ncpus > 1)
|
||
TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
|
||
}
|
||
if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
|
||
printf("ERROR: TmThreadSpawn failed\n");
|
||
exit(EXIT_FAILURE);
|
||
}
|
||
SCLogInfo("RunModeErfFileAutoFp initialised");
|
||
return 0;
|
||
SCReturnInt(0);
|
||
}
|
src/runmode-erf-file.h | ||
---|---|---|
#ifndef __RUNMODE_ERF_FILE_H__
|
||
#define __RUNMODE_ERF_FILE_H__
|
||
int RunModeErfFileAuto(DetectEngineCtx *);
|
||
int RunModeErfFileSingle(DetectEngineCtx *);
|
||
int RunModeErfFileAutoFp(DetectEngineCtx *);
|
||
void RunModeErfFileRegister(void);
|
||
const char *RunModeErfFileGetDefaultMode(void);
|
||
src/source-erf-file.c | ||
---|---|---|
} __attribute__((packed)) DagRecord;
|
||
/**
 * \brief State used by the ERF file reader (ReceiveErfFileLoop and
 *        ReadErfRecord both access it through their void *data argument).
 *
 * NOTE(review): "FILE *erf;" appears twice below, which is not valid C as
 * shown. This text comes from a rendered diff, so the two lines are
 * presumably the old and new positions of the same member - confirm
 * against the applied patch.
 */
typedef struct ErfFileThreadVars_ {
|
||
/* Stream the ERF records are read from (fread() in ReadErfRecord). */
FILE *erf;
|
||
/* Thread vars handed to TmqhOutputPacketpool() and
 * TmThreadsSlotProcessPkt() by ReceiveErfFileLoop. */
ThreadVars *tv;
|
||
/* Slot that read packets are passed to; ReceiveErfFileLoop sets it to the
 * slot_next of the slot it was started with. */
TmSlot *slot;
|
||
/* See the NOTE(review) above: second occurrence of the same member. */
FILE *erf;
|
||
/* Presumably a count of packets read - usage not visible here, confirm. */
uint32_t pkts;
|
||
/* Presumably a count of bytes read - usage not visible here, confirm. */
uint64_t bytes;
|
||
} ErfFileThreadVars;
|
||
TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
|
||
static inline TmEcode ReadErfRecord(ThreadVars *, Packet *, void *);
|
||
TmEcode ReceiveErfFileLoop(ThreadVars *, void *, void *);
|
||
TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **);
|
||
void ReceiveErfFileThreadExitStats(ThreadVars *, void *);
|
||
TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *);
|
||
... | ... | |
{
|
||
tmm_modules[TMM_RECEIVEERFFILE].name = "ReceiveErfFile";
|
||
tmm_modules[TMM_RECEIVEERFFILE].ThreadInit = ReceiveErfFileThreadInit;
|
||
tmm_modules[TMM_RECEIVEERFFILE].Func = ReceiveErfFile;
|
||
tmm_modules[TMM_RECEIVEERFFILE].Func = NULL;
|
||
tmm_modules[TMM_RECEIVEERFFILE].PktAcqLoop = ReceiveErfFileLoop;
|
||
tmm_modules[TMM_RECEIVEERFFILE].ThreadExitPrintStats =
|
||
ReceiveErfFileThreadExitStats;
|
||
tmm_modules[TMM_RECEIVEERFFILE].ThreadDeinit = NULL;
|
||
... | ... | |
}
|
||
/**
|
||
* \brief Thread entry function for ERF reading.
|
||
*
|
||
* Reads a new ERF record from the file and sets up the Packet for
|
||
* decoding.
|
||
* \brief ERF file reading loop.
|
||
*/
|
||
TmEcode
|
||
ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
|
||
/**
 * \brief Read ERF records in a loop and hand each packet to the next slot.
 *
 * \param tv   thread vars, forwarded to ReadErfRecord()
 * \param data ErfFileThreadVars for this reader, passed as void *
 * \param slot TmSlot this loop runs in; its slot_next receives the packets
 *
 * \retval TM_ECODE_OK     a stop or kill flag was set in suricata_ctl_flags
 * \retval TM_ECODE_FAILED no packet could be obtained, ReadErfRecord()
 *                         failed, or TmThreadsSlotProcessPkt() failed;
 *                         EngineStop() is called before returning
 *
 * NOTE(review): SCReturnInt() is used without a visible SCEnter() in this
 * function - confirm that is intended.
 */
TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
|
||
{
|
||
ErfFileThreadVars *etv = (ErfFileThreadVars *)data;
|
||
/* Packets are passed to the slot that follows this one. */
etv->slot = ((TmSlot *)slot)->slot_next;
|
||
Packet *p;
|
||
uint16_t packet_q_len = 0;
|
||
while (1) {
|
||
/* Leave the loop cleanly once a stop or kill has been requested. */
if (suricata_ctl_flags & SURICATA_STOP ||
|
||
suricata_ctl_flags & SURICATA_KILL) {
|
||
SCReturnInt(TM_ECODE_OK);
|
||
}
|
||
/* Make sure we have at least one packet in the packet pool,
|
||
* to prevent us from alloc'ing packets at line rate. */
do {
|
||
packet_q_len = PacketPoolSize();
|
||
if (unlikely(packet_q_len == 0)) {
|
||
PacketPoolWait();
|
||
}
|
||
} while (packet_q_len == 0);
|
||
p = PacketGetFromQueueOrAlloc();
|
||
if (unlikely(p == NULL)) {
|
||
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate a packet.");
|
||
EngineStop();
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
/* Fill the packet from the next ERF record. On failure the unused packet
 * is handed to TmqhOutputPacketpool() before stopping.
 * NOTE(review): as rendered here, ReadErfRecord() also returns
 * TM_ECODE_FAILED at end of file, so a normal end of input appears to
 * take this path too - confirm. */
if (ReadErfRecord(tv, p, data) != TM_ECODE_OK) {
|
||
TmqhOutputPacketpool(etv->tv, p);
|
||
EngineStop();
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
/* Pass the packet on through the slot stored above.
 * NOTE(review): etv->tv is used here rather than the tv parameter; it is
 * assumed to be set elsewhere (not visible in this section) - confirm. */
if (TmThreadsSlotProcessPkt(etv->tv, etv->slot, p) != TM_ECODE_OK) {
|
||
EngineStop();
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
}
|
||
}
|
||
static inline TmEcode ReadErfRecord(ThreadVars *tv, Packet *p, void *data)
|
||
{
|
||
SCEnter();
|
||
... | ... | |
int r = fread(&dr, sizeof(DagRecord), 1, etv->erf);
|
||
if (r < 1) {
|
||
SCLogInfo("End of ERF file reached or an error occurred.");
|
||
EngineStop();
|
||
if (feof(etv->erf)) {
|
||
SCLogInfo("End of ERF file reached");
|
||
}
|
||
else {
|
||
SCLogInfo("Error reading ERF record");
|
||
}
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
int rlen = ntohs(dr.rlen);
|
||
int wlen = ntohs(dr.wlen);
|
||
r = fread(GET_PKT_DATA(p), rlen - sizeof(DagRecord), 1, etv->erf);
|
||
if (r < 1) {
|
||
SCLogInfo("End of ERF file reached or an error occurred.");
|
||
EngineStop();
|
||
if (feof(etv->erf)) {
|
||
SCLogInfo("End of ERF file reached");
|
||
}
|
||
else {
|
||
SCLogInfo("Error reading ERF record");
|
||
}
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
- « Previous
- 1
- 2
- Next »