Feature #431 » 0001-Apply-changes-recommended-by-Stephen-Donnely-of-Enda.patch
src/source-erf-dag.c | ||
---|---|---|
#else /* Implied we do have DAG support */
|
||
#define DAG_MAX_READ_PKTS 256
|
||
#include "source-erf-dag.h"
|
||
#include <dagapi.h>
|
||
... | ... | |
int dagfd;
|
||
int dagstream;
|
||
char dagname[DAGNAME_BUFSIZE];
|
||
uint32_t dag_max_read_packets;
|
||
struct timeval maxwait, poll; /* Could possibly be made static */
|
||
uint32_t pkts;
|
||
uint64_t bytes;
|
||
/* Track current location in the DAG stream input buffer
|
||
/* Current location in the DAG stream input buffer.
|
||
*/
|
||
uint8_t* top; /* We track top as well so we don't have to
|
||
call dag_advance_stream again if there
|
||
are still pkts to process.
|
||
JNM: Currently not used.
|
||
*/
|
||
uint8_t* btm;
|
||
uint8_t *top;
|
||
uint8_t *btm;
|
||
} ErfDagThreadVars;
|
||
static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
|
||
uint32_t *pkts_read);
|
||
static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec);
|
||
TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
|
||
TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
|
||
void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
|
||
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
|
||
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top,
|
||
uint32_t *pkts_read);
|
||
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
|
||
TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
|
||
TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
|
||
... | ... | |
memset(ewtn, 0, sizeof(*ewtn));
|
||
/* Use max_pending_packets as our maximum number of packets read
|
||
from the DAG buffer.
|
||
*/
|
||
ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ?
|
||
DAG_MAX_READ_PKTS : max_pending_packets;
|
||
/* dag_parse_name will return a DAG device name and stream number
|
||
* to open for this thread.
|
||
*/
|
||
... | ... | |
* Initialise DAG Polling parameters.
|
||
*/
|
||
timerclear(&ewtn->maxwait);
|
||
ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */
|
||
ewtn->maxwait.tv_usec = 20 * 1000; /* 20ms timeout */
|
||
timerclear(&ewtn->poll);
|
||
ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */
|
||
ewtn->poll.tv_usec = 1 * 1000; /* 1ms poll interval */
|
||
/* 32kB minimum data to return -- we still restrict the number of
|
||
* pkts that are processed to a maximum of dag_max_read_packets.
|
||
... | ... | |
ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
|
||
TmSlot *s = (TmSlot *)slot;
|
||
dtv->slot = s->slot_next;
|
||
SCEnter();
|
||
uint16_t packet_q_len = 0;
|
||
uint32_t diff = 0;
|
||
int err;
|
||
uint8_t *top = NULL;
|
||
uint32_t pkts_read = 0;
|
||
SCEnter();
|
||
while (1)
|
||
{
|
||
if (suricata_ctl_flags & SURICATA_STOP ||
|
||
... | ... | |
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
/* Make sure we have at least one packet in the packet pool,
|
||
* to prevent us from alloc'ing packets at line rate. */
|
||
do {
|
||
packet_q_len = PacketPoolSize();
|
||
if (unlikely(packet_q_len == 0)) {
|
||
PacketPoolWait();
|
||
}
|
||
} while (packet_q_len == 0);
|
||
/* NOTE/JNM: This might not work well if we start restricting the
|
||
* number of ERF records processed per call to a small number as
|
||
* the over head required here could exceed the time it takes to
|
||
* process a small number of ERF records.
|
||
*
|
||
* XXX/JNM: Possibly process the DAG stream buffer first if there
|
||
* are ERF packets or else call dag_advance_stream and then process
|
||
* the DAG stream buffer.
|
||
*/
|
||
top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
|
||
if (NULL == top)
|
||
{
|
||
if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) {
|
||
usleep(10 * 1000);
|
||
dtv->btm = dtv->top;
|
||
if (top == NULL) {
|
||
if (errno == EAGAIN) {
|
||
if (dtv->dagstream & 0x1) {
|
||
usleep(10 * 1000);
|
||
dtv->btm = dtv->top;
|
||
}
|
||
continue;
|
||
}
|
||
else {
|
||
... | ... | |
}
|
||
diff = top - dtv->btm;
|
||
if (diff == 0)
|
||
{
|
||
if (diff == 0) {
|
||
continue;
|
||
}
|
||
... | ... | |
ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
|
||
SCReturnInt(err);
|
||
}
|
||
}
|
||
SCLogDebug("Read %d records from stream: %d, DAG: %s",
|
||
pkts_read, dtv->dagstream, dtv->dagname);
|
||
SCLogDebug("Read %d records from stream: %d, DAG: %s",
|
||
pkts_read, dtv->dagstream, dtv->dagname);
|
||
}
|
||
if (suricata_ctl_flags != 0) {
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
... | ... | |
* This function takes a pointer to buffer read from the DAG interface
|
||
* and processes it individual records.
|
||
*/
|
||
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
|
||
uint8_t* top,
|
||
uint32_t *pkts_read)
|
||
static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
|
||
uint32_t *pkts_read)
|
||
{
|
||
SCEnter();
|
||
Packet *p;
|
||
int err = 0;
|
||
dag_record_t* dr = NULL;
|
||
char *prec = NULL;
|
||
int rlen;
|
||
int err = 0;
|
||
dag_record_t *dr = NULL;
|
||
char *prec = NULL;
|
||
int rlen;
|
||
char hdr_type = 0;
|
||
int processed = 0;
|
||
int packet_q_len = 0;
|
||
|
||
*pkts_read = 0;
|
||
while(((top-(ewtn->btm))>=dag_record_size) &&
|
||
((*pkts_read)<(ewtn->dag_max_read_packets)))
|
||
{
|
||
prec = (char*)ewtn->btm;
|
||
dr = (dag_record_t*)prec;
|
||
while (((top - ewtn->btm) >= dag_record_size) &&
|
||
((processed + dag_record_size) < 4*1024*1024)) {
|
||
rlen = ntohs(dr->rlen);
|
||
/* Make sure we have at least one packet in the packet pool,
|
||
* to prevent us from alloc'ing packets at line rate. */
|
||
do {
|
||
packet_q_len = PacketPoolSize();
|
||
if (unlikely(packet_q_len == 0)) {
|
||
PacketPoolWait();
|
||
}
|
||
} while (packet_q_len == 0);
|
||
if (rlen == 20) {
|
||
rlen = 28;
|
||
SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED,
|
||
"Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s",
|
||
ewtn->dagstream, ewtn->dagname);
|
||
}
|
||
prec = (char *)ewtn->btm;
|
||
dr = (dag_record_t*)prec;
|
||
rlen = ntohs(dr->rlen);
|
||
hdr_type = dr->type;
|
||
/* If we don't have enough data to finsih processing this ERF record
|
||
* return and maybe next time we will.
|
||
*/
|
||
if ((top-(ewtn->btm)) < rlen)
|
||
if ((top - ewtn->btm) < rlen)
|
||
SCReturnInt(TM_ECODE_OK);
|
||
p = PacketGetFromQueueOrAlloc();
|
||
if (p == NULL) {
|
||
SCLogError(SC_ERR_MEM_ALLOC,
|
||
"Failed to allocate a Packet on stream: %d, DAG: %s",
|
||
ewtn->dagstream, ewtn->dagname);
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
err = ProcessErfDagRecord(ewtn, prec, p);
|
||
ewtn->btm += rlen;
|
||
processed += rlen;
|
||
if (err != TM_ECODE_OK) {
|
||
TmqhOutputPacketpool(ewtn->tv, p);
|
||
SCReturnInt(err);
|
||
/* Only support ethernet at this time. */
|
||
switch (hdr_type & 0x7f) {
|
||
case TYPE_PAD:
|
||
/* Skip. */
|
||
continue;
|
||
case TYPE_ETH:
|
||
case TYPE_DSM_COLOR_ETH:
|
||
case TYPE_COLOR_ETH:
|
||
case TYPE_COLOR_HASH_ETH:
|
||
break;
|
||
default:
|
||
SCLogError(SC_ERR_UNIMPLEMENTED,
|
||
"Processing of DAG record type: %d not implemented.", dr->type);
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
ewtn->btm += rlen;
|
||
err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p);
|
||
err = ProcessErfDagRecord(ewtn, prec);
|
||
if (err != TM_ECODE_OK) {
|
||
return err;
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
(*pkts_read)++;
|
||
... | ... | |
* \param prec pointer to a DAG record.
|
||
* \param
|
||
*/
|
||
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
|
||
static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec)
|
||
{
|
||
SCEnter();
|
||
int wlen = 0;
|
||
int rlen = 0;
|
||
int hdr_num = 0;
|
||
char hdr_type = 0;
|
||
dag_record_t *dr = (dag_record_t*)prec;
|
||
erf_payload_t *pload;
|
||
Packet *p;
|
||
assert(prec);
|
||
assert(p);
|
||
hdr_type = dr->type;
|
||
wlen = ntohs(dr->wlen);
|
||
rlen = ntohs(dr->rlen);
|
||
if (p == NULL) SCReturnInt(TM_ECODE_OK);
|
||
/* count extension headers */
|
||
while (hdr_type & 0x80) {
|
||
if (rlen < (dag_record_size + (hdr_num * 8))) {
|
||
SCLogError(SC_ERR_UNIMPLEMENTED,
|
||
"Insufficient captured packet length.");
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
hdr_type = prec[(dag_record_size + (hdr_num * 8))];
|
||
hdr_num++;
|
||
}
|
||
/* Only support ethernet at this time. */
|
||
if (dr->type != TYPE_ETH &&
|
||
dr->type != TYPE_DSM_COLOR_ETH &&
|
||
dr->type != TYPE_COLOR_ETH &&
|
||
dr->type != TYPE_COLOR_HASH_ETH) {
|
||
SCLogError(SC_ERR_UNIMPLEMENTED,
|
||
"Processing of DAG record type: %d not implemented.", dr->type);
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
/* Check that the whole frame was captured */
|
||
if (rlen < (dag_record_size + (8 * hdr_num) + 2 + wlen)) {
|
||
SCLogInfo("Incomplete frame captured.");
|
||
SCReturnInt(TM_ECODE_OK);
|
||
}
|
||
wlen = ntohs(dr->wlen);
|
||
/* skip over extension headers */
|
||
pload = (erf_payload_t *)(prec + dag_record_size + (8 * hdr_num));
|
||
pload = &(dr->rec);
|
||
p = PacketGetFromQueueOrAlloc();
|
||
if (p == NULL) {
|
||
SCLogError(SC_ERR_MEM_ALLOC,
|
||
"Failed to allocate a Packet on stream: %d, DAG: %s",
|
||
ewtn->dagstream, ewtn->dagname);
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
SET_PKT_LEN(p, wlen - 4); /* Trim the FCS... */
|
||
p->datalink = LINKTYPE_ETHERNET;
|
||
... | ... | |
/* Take into account for link type Ethernet ETH frame starts
|
||
* after ther ERF header + pad.
|
||
*/
|
||
PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p));
|
||
if (unlikely(PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)))) {
|
||
TmqhOutputPacketpool(ewtn->tv, p);
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
/* Convert ERF time to timeval - from libpcap. */
|
||
uint64_t ts = dr->ts;
|
||
... | ... | |
ewtn->pkts++;
|
||
ewtn->bytes += wlen;
|
||
if (TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p) != TM_ECODE_OK) {
|
||
TmqhOutputPacketpool(ewtn->tv, p);
|
||
SCReturnInt(TM_ECODE_FAILED);
|
||
}
|
||
SCReturnInt(TM_ECODE_OK);
|
||
}
|
||