Feature #431 » 0001-Apply-changes-recommended-by-Stephen-Donnely-of-Enda.patch
| src/source-erf-dag.c | ||
|---|---|---|
|
#else /* Implied we do have DAG support */
|
||
|
#define DAG_MAX_READ_PKTS 256
|
||
|
#include "source-erf-dag.h"
|
||
|
#include <dagapi.h>
|
||
| ... | ... | |
|
int dagfd;
|
||
|
int dagstream;
|
||
|
char dagname[DAGNAME_BUFSIZE];
|
||
|
uint32_t dag_max_read_packets;
|
||
|
struct timeval maxwait, poll; /* Could possibly be made static */
|
||
|
uint32_t pkts;
|
||
|
uint64_t bytes;
|
||
|
/* Track current location in the DAG stream input buffer
|
||
|
/* Current location in the DAG stream input buffer.
|
||
|
*/
|
||
|
uint8_t* top; /* We track top as well so we don't have to
|
||
|
call dag_advance_stream again if there
|
||
|
are still pkts to process.
|
||
|
JNM: Currently not used.
|
||
|
*/
|
||
|
uint8_t* btm;
|
||
|
uint8_t *top;
|
||
|
uint8_t *btm;
|
||
|
} ErfDagThreadVars;
|
||
|
static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
|
||
|
uint32_t *pkts_read);
|
||
|
static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec);
|
||
|
TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
|
||
|
TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
|
||
|
void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
|
||
|
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
|
||
|
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top,
|
||
|
uint32_t *pkts_read);
|
||
|
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
|
||
|
TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
|
||
|
TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
|
||
| ... | ... | |
|
memset(ewtn, 0, sizeof(*ewtn));
|
||
|
/* Use max_pending_packets as our maximum number of packets read
|
||
|
from the DAG buffer.
|
||
|
*/
|
||
|
ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ?
|
||
|
DAG_MAX_READ_PKTS : max_pending_packets;
|
||
|
/* dag_parse_name will return a DAG device name and stream number
|
||
|
* to open for this thread.
|
||
|
*/
|
||
| ... | ... | |
|
* Initialise DAG Polling parameters.
|
||
|
*/
|
||
|
timerclear(&ewtn->maxwait);
|
||
|
ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */
|
||
|
ewtn->maxwait.tv_usec = 20 * 1000; /* 20ms timeout */
|
||
|
timerclear(&ewtn->poll);
|
||
|
ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */
|
||
|
ewtn->poll.tv_usec = 1 * 1000; /* 1ms poll interval */
|
||
|
/* 32kB minimum data to return -- we still restrict the number of
|
||
|
* pkts that are processed to a maximum of dag_max_read_packets.
|
||
| ... | ... | |
|
ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
|
||
|
TmSlot *s = (TmSlot *)slot;
|
||
|
dtv->slot = s->slot_next;
|
||
|
SCEnter();
|
||
|
uint16_t packet_q_len = 0;
|
||
|
uint32_t diff = 0;
|
||
|
int err;
|
||
|
uint8_t *top = NULL;
|
||
|
uint32_t pkts_read = 0;
|
||
|
SCEnter();
|
||
|
while (1)
|
||
|
{
|
||
|
if (suricata_ctl_flags & SURICATA_STOP ||
|
||
| ... | ... | |
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
/* Make sure we have at least one packet in the packet pool,
|
||
|
* to prevent us from alloc'ing packets at line rate. */
|
||
|
do {
|
||
|
packet_q_len = PacketPoolSize();
|
||
|
if (unlikely(packet_q_len == 0)) {
|
||
|
PacketPoolWait();
|
||
|
}
|
||
|
} while (packet_q_len == 0);
|
||
|
/* NOTE/JNM: This might not work well if we start restricting the
|
||
|
* number of ERF records processed per call to a small number as
|
||
|
* the over head required here could exceed the time it takes to
|
||
|
* process a small number of ERF records.
|
||
|
*
|
||
|
* XXX/JNM: Possibly process the DAG stream buffer first if there
|
||
|
* are ERF packets or else call dag_advance_stream and then process
|
||
|
* the DAG stream buffer.
|
||
|
*/
|
||
|
top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
|
||
|
if (NULL == top)
|
||
|
{
|
||
|
if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) {
|
||
|
usleep(10 * 1000);
|
||
|
dtv->btm = dtv->top;
|
||
|
if (top == NULL) {
|
||
|
if (errno == EAGAIN) {
|
||
|
if (dtv->dagstream & 0x1) {
|
||
|
usleep(10 * 1000);
|
||
|
dtv->btm = dtv->top;
|
||
|
}
|
||
|
continue;
|
||
|
}
|
||
|
else {
|
||
| ... | ... | |
|
}
|
||
|
diff = top - dtv->btm;
|
||
|
if (diff == 0)
|
||
|
{
|
||
|
if (diff == 0) {
|
||
|
continue;
|
||
|
}
|
||
| ... | ... | |
|
ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
|
||
|
SCReturnInt(err);
|
||
|
}
|
||
|
}
|
||
|
SCLogDebug("Read %d records from stream: %d, DAG: %s",
|
||
|
pkts_read, dtv->dagstream, dtv->dagname);
|
||
|
SCLogDebug("Read %d records from stream: %d, DAG: %s",
|
||
|
pkts_read, dtv->dagstream, dtv->dagname);
|
||
|
}
|
||
|
if (suricata_ctl_flags != 0) {
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
| ... | ... | |
|
* This function takes a pointer to buffer read from the DAG interface
|
||
|
* and processes its individual records.
|
||
|
*/
|
||
|
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
|
||
|
uint8_t* top,
|
||
|
uint32_t *pkts_read)
|
||
|
static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
|
||
|
uint32_t *pkts_read)
|
||
|
{
|
||
|
SCEnter();
|
||
|
Packet *p;
|
||
|
int err = 0;
|
||
|
dag_record_t* dr = NULL;
|
||
|
char *prec = NULL;
|
||
|
int rlen;
|
||
|
int err = 0;
|
||
|
dag_record_t *dr = NULL;
|
||
|
char *prec = NULL;
|
||
|
int rlen;
|
||
|
char hdr_type = 0;
|
||
|
int processed = 0;
|
||
|
int packet_q_len = 0;
|
||
|
|
||
|
*pkts_read = 0;
|
||
|
while(((top-(ewtn->btm))>=dag_record_size) &&
|
||
|
((*pkts_read)<(ewtn->dag_max_read_packets)))
|
||
|
{
|
||
|
prec = (char*)ewtn->btm;
|
||
|
dr = (dag_record_t*)prec;
|
||
|
while (((top - ewtn->btm) >= dag_record_size) &&
|
||
|
((processed + dag_record_size) < 4*1024*1024)) {
|
||
|
rlen = ntohs(dr->rlen);
|
||
|
/* Make sure we have at least one packet in the packet pool,
|
||
|
* to prevent us from alloc'ing packets at line rate. */
|
||
|
do {
|
||
|
packet_q_len = PacketPoolSize();
|
||
|
if (unlikely(packet_q_len == 0)) {
|
||
|
PacketPoolWait();
|
||
|
}
|
||
|
} while (packet_q_len == 0);
|
||
|
if (rlen == 20) {
|
||
|
rlen = 28;
|
||
|
SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED,
|
||
|
"Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s",
|
||
|
ewtn->dagstream, ewtn->dagname);
|
||
|
}
|
||
|
prec = (char *)ewtn->btm;
|
||
|
dr = (dag_record_t*)prec;
|
||
|
rlen = ntohs(dr->rlen);
|
||
|
hdr_type = dr->type;
|
||
|
/* If we don't have enough data to finish processing this ERF record
|
||
|
* return and maybe next time we will.
|
||
|
*/
|
||
|
if ((top-(ewtn->btm)) < rlen)
|
||
|
if ((top - ewtn->btm) < rlen)
|
||
|
SCReturnInt(TM_ECODE_OK);
|
||
|
p = PacketGetFromQueueOrAlloc();
|
||
|
if (p == NULL) {
|
||
|
SCLogError(SC_ERR_MEM_ALLOC,
|
||
|
"Failed to allocate a Packet on stream: %d, DAG: %s",
|
||
|
ewtn->dagstream, ewtn->dagname);
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
err = ProcessErfDagRecord(ewtn, prec, p);
|
||
|
ewtn->btm += rlen;
|
||
|
processed += rlen;
|
||
|
if (err != TM_ECODE_OK) {
|
||
|
TmqhOutputPacketpool(ewtn->tv, p);
|
||
|
SCReturnInt(err);
|
||
|
/* Only support ethernet at this time. */
|
||
|
switch (hdr_type & 0x7f) {
|
||
|
case TYPE_PAD:
|
||
|
/* Skip. */
|
||
|
continue;
|
||
|
case TYPE_ETH:
|
||
|
case TYPE_DSM_COLOR_ETH:
|
||
|
case TYPE_COLOR_ETH:
|
||
|
case TYPE_COLOR_HASH_ETH:
|
||
|
break;
|
||
|
default:
|
||
|
SCLogError(SC_ERR_UNIMPLEMENTED,
|
||
|
"Processing of DAG record type: %d not implemented.", dr->type);
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
ewtn->btm += rlen;
|
||
|
err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p);
|
||
|
err = ProcessErfDagRecord(ewtn, prec);
|
||
|
if (err != TM_ECODE_OK) {
|
||
|
return err;
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
(*pkts_read)++;
|
||
| ... | ... | |
|
* \param prec pointer to a DAG record.
|
||
|
* \param
|
||
|
*/
|
||
|
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
|
||
|
static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec)
|
||
|
{
|
||
|
SCEnter();
|
||
|
int wlen = 0;
|
||
|
int rlen = 0;
|
||
|
int hdr_num = 0;
|
||
|
char hdr_type = 0;
|
||
|
dag_record_t *dr = (dag_record_t*)prec;
|
||
|
erf_payload_t *pload;
|
||
|
Packet *p;
|
||
|
assert(prec);
|
||
|
assert(p);
|
||
|
hdr_type = dr->type;
|
||
|
wlen = ntohs(dr->wlen);
|
||
|
rlen = ntohs(dr->rlen);
|
||
|
if (p == NULL) SCReturnInt(TM_ECODE_OK);
|
||
|
/* count extension headers */
|
||
|
while (hdr_type & 0x80) {
|
||
|
if (rlen < (dag_record_size + (hdr_num * 8))) {
|
||
|
SCLogError(SC_ERR_UNIMPLEMENTED,
|
||
|
"Insufficient captured packet length.");
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
hdr_type = prec[(dag_record_size + (hdr_num * 8))];
|
||
|
hdr_num++;
|
||
|
}
|
||
|
/* Only support ethernet at this time. */
|
||
|
if (dr->type != TYPE_ETH &&
|
||
|
dr->type != TYPE_DSM_COLOR_ETH &&
|
||
|
dr->type != TYPE_COLOR_ETH &&
|
||
|
dr->type != TYPE_COLOR_HASH_ETH) {
|
||
|
SCLogError(SC_ERR_UNIMPLEMENTED,
|
||
|
"Processing of DAG record type: %d not implemented.", dr->type);
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
/* Check that the whole frame was captured */
|
||
|
if (rlen < (dag_record_size + (8 * hdr_num) + 2 + wlen)) {
|
||
|
SCLogInfo("Incomplete frame captured.");
|
||
|
SCReturnInt(TM_ECODE_OK);
|
||
|
}
|
||
|
wlen = ntohs(dr->wlen);
|
||
|
/* skip over extension headers */
|
||
|
pload = (erf_payload_t *)(prec + dag_record_size + (8 * hdr_num));
|
||
|
pload = &(dr->rec);
|
||
|
p = PacketGetFromQueueOrAlloc();
|
||
|
if (p == NULL) {
|
||
|
SCLogError(SC_ERR_MEM_ALLOC,
|
||
|
"Failed to allocate a Packet on stream: %d, DAG: %s",
|
||
|
ewtn->dagstream, ewtn->dagname);
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
SET_PKT_LEN(p, wlen - 4); /* Trim the FCS... */
|
||
|
p->datalink = LINKTYPE_ETHERNET;
|
||
| ... | ... | |
|
/* Take into account for link type Ethernet ETH frame starts
|
||
|
* after the ERF header + pad.
|
||
|
*/
|
||
|
PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p));
|
||
|
if (unlikely(PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)))) {
|
||
|
TmqhOutputPacketpool(ewtn->tv, p);
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
/* Convert ERF time to timeval - from libpcap. */
|
||
|
uint64_t ts = dr->ts;
|
||
| ... | ... | |
|
ewtn->pkts++;
|
||
|
ewtn->bytes += wlen;
|
||
|
if (TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p) != TM_ECODE_OK) {
|
||
|
TmqhOutputPacketpool(ewtn->tv, p);
|
||
|
SCReturnInt(TM_ECODE_FAILED);
|
||
|
}
|
||
|
SCReturnInt(TM_ECODE_OK);
|
||
|
}
|
||