Project

General

Profile

Feature #431 » 0001-Apply-changes-recommended-by-Stephen-Donnely-of-Enda.patch

Jason Ish, 04/03/2012 05:33 PM

View differences:

src/source-erf-dag.c
#else /* Implied we do have DAG support */
#define DAG_MAX_READ_PKTS 256
#include "source-erf-dag.h"
#include <dagapi.h>
......
int dagfd;
int dagstream;
char dagname[DAGNAME_BUFSIZE];
uint32_t dag_max_read_packets;
struct timeval maxwait, poll; /* Could possibly be made static */
uint32_t pkts;
uint64_t bytes;
/* Track current location in the DAG stream input buffer
/* Current location in the DAG stream input buffer.
*/
uint8_t* top; /* We track top as well so we don't have to
call dag_advance_stream again if there
are still pkts to process.
JNM: Currently not used.
*/
uint8_t* btm;
uint8_t *top;
uint8_t *btm;
} ErfDagThreadVars;
static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
uint32_t *pkts_read);
static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec);
TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top,
uint32_t *pkts_read);
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
......
memset(ewtn, 0, sizeof(*ewtn));
/* Use max_pending_packets as our maximum number of packets read
from the DAG buffer.
*/
ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ?
DAG_MAX_READ_PKTS : max_pending_packets;
/* dag_parse_name will return a DAG device name and stream number
* to open for this thread.
*/
......
* Initialise DAG Polling parameters.
*/
timerclear(&ewtn->maxwait);
ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */
ewtn->maxwait.tv_usec = 20 * 1000; /* 20ms timeout */
timerclear(&ewtn->poll);
ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */
ewtn->poll.tv_usec = 1 * 1000; /* 1ms poll interval */
/* 32kB minimum data to return -- we still restrict the number of
* pkts that are processed to a maximum of dag_max_read_packets.
......
ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
dtv->slot = s->slot_next;
SCEnter();
uint16_t packet_q_len = 0;
uint32_t diff = 0;
int err;
uint8_t *top = NULL;
uint32_t pkts_read = 0;
SCEnter();
while (1)
{
if (suricata_ctl_flags & SURICATA_STOP ||
......
SCReturnInt(TM_ECODE_FAILED);
}
/* Make sure we have at least one packet in the packet pool,
* to prevent us from alloc'ing packets at line rate. */
do {
packet_q_len = PacketPoolSize();
if (unlikely(packet_q_len == 0)) {
PacketPoolWait();
}
} while (packet_q_len == 0);
/* NOTE/JNM: This might not work well if we start restricting the
* number of ERF records processed per call to a small number as
* the overhead required here could exceed the time it takes to
* process a small number of ERF records.
*
* XXX/JNM: Possibly process the DAG stream buffer first if there
* are ERF packets or else call dag_advance_stream and then process
* the DAG stream buffer.
*/
top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
if (NULL == top)
{
if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) {
usleep(10 * 1000);
dtv->btm = dtv->top;
if (top == NULL) {
if (errno == EAGAIN) {
if (dtv->dagstream & 0x1) {
usleep(10 * 1000);
dtv->btm = dtv->top;
}
continue;
}
else {
......
}
diff = top - dtv->btm;
if (diff == 0)
{
if (diff == 0) {
continue;
}
......
ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
SCReturnInt(err);
}
}
SCLogDebug("Read %d records from stream: %d, DAG: %s",
pkts_read, dtv->dagstream, dtv->dagname);
SCLogDebug("Read %d records from stream: %d, DAG: %s",
pkts_read, dtv->dagstream, dtv->dagname);
}
if (suricata_ctl_flags != 0) {
SCReturnInt(TM_ECODE_FAILED);
......
* This function takes a pointer to a buffer read from the DAG interface
* and processes its individual records.
*/
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
uint8_t* top,
uint32_t *pkts_read)
static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
uint32_t *pkts_read)
{
SCEnter();
Packet *p;
int err = 0;
dag_record_t* dr = NULL;
char *prec = NULL;
int rlen;
int err = 0;
dag_record_t *dr = NULL;
char *prec = NULL;
int rlen;
char hdr_type = 0;
int processed = 0;
int packet_q_len = 0;
*pkts_read = 0;
while(((top-(ewtn->btm))>=dag_record_size) &&
((*pkts_read)<(ewtn->dag_max_read_packets)))
{
prec = (char*)ewtn->btm;
dr = (dag_record_t*)prec;
while (((top - ewtn->btm) >= dag_record_size) &&
((processed + dag_record_size) < 4*1024*1024)) {
rlen = ntohs(dr->rlen);
/* Make sure we have at least one packet in the packet pool,
* to prevent us from alloc'ing packets at line rate. */
do {
packet_q_len = PacketPoolSize();
if (unlikely(packet_q_len == 0)) {
PacketPoolWait();
}
} while (packet_q_len == 0);
if (rlen == 20) {
rlen = 28;
SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED,
"Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
}
prec = (char *)ewtn->btm;
dr = (dag_record_t*)prec;
rlen = ntohs(dr->rlen);
hdr_type = dr->type;
/* If we don't have enough data to finish processing this ERF record
* return and maybe next time we will.
*/
if ((top-(ewtn->btm)) < rlen)
if ((top - ewtn->btm) < rlen)
SCReturnInt(TM_ECODE_OK);
p = PacketGetFromQueueOrAlloc();
if (p == NULL) {
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate a Packet on stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCReturnInt(TM_ECODE_FAILED);
}
err = ProcessErfDagRecord(ewtn, prec, p);
ewtn->btm += rlen;
processed += rlen;
if (err != TM_ECODE_OK) {
TmqhOutputPacketpool(ewtn->tv, p);
SCReturnInt(err);
/* Only support ethernet at this time. */
switch (hdr_type & 0x7f) {
case TYPE_PAD:
/* Skip. */
continue;
case TYPE_ETH:
case TYPE_DSM_COLOR_ETH:
case TYPE_COLOR_ETH:
case TYPE_COLOR_HASH_ETH:
break;
default:
SCLogError(SC_ERR_UNIMPLEMENTED,
"Processing of DAG record type: %d not implemented.", dr->type);
SCReturnInt(TM_ECODE_FAILED);
}
ewtn->btm += rlen;
err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p);
err = ProcessErfDagRecord(ewtn, prec);
if (err != TM_ECODE_OK) {
return err;
SCReturnInt(TM_ECODE_FAILED);
}
(*pkts_read)++;
......
* \param prec pointer to a DAG record.
* \param
*/
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec)
{
SCEnter();
int wlen = 0;
int rlen = 0;
int hdr_num = 0;
char hdr_type = 0;
dag_record_t *dr = (dag_record_t*)prec;
erf_payload_t *pload;
Packet *p;
assert(prec);
assert(p);
hdr_type = dr->type;
wlen = ntohs(dr->wlen);
rlen = ntohs(dr->rlen);
if (p == NULL) SCReturnInt(TM_ECODE_OK);
/* count extension headers */
while (hdr_type & 0x80) {
if (rlen < (dag_record_size + (hdr_num * 8))) {
SCLogError(SC_ERR_UNIMPLEMENTED,
"Insufficient captured packet length.");
SCReturnInt(TM_ECODE_FAILED);
}
hdr_type = prec[(dag_record_size + (hdr_num * 8))];
hdr_num++;
}
/* Only support ethernet at this time. */
if (dr->type != TYPE_ETH &&
dr->type != TYPE_DSM_COLOR_ETH &&
dr->type != TYPE_COLOR_ETH &&
dr->type != TYPE_COLOR_HASH_ETH) {
SCLogError(SC_ERR_UNIMPLEMENTED,
"Processing of DAG record type: %d not implemented.", dr->type);
SCReturnInt(TM_ECODE_FAILED);
/* Check that the whole frame was captured */
if (rlen < (dag_record_size + (8 * hdr_num) + 2 + wlen)) {
SCLogInfo("Incomplete frame captured.");
SCReturnInt(TM_ECODE_OK);
}
wlen = ntohs(dr->wlen);
/* skip over extension headers */
pload = (erf_payload_t *)(prec + dag_record_size + (8 * hdr_num));
pload = &(dr->rec);
p = PacketGetFromQueueOrAlloc();
if (p == NULL) {
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate a Packet on stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCReturnInt(TM_ECODE_FAILED);
}
SET_PKT_LEN(p, wlen - 4); /* Trim the FCS... */
p->datalink = LINKTYPE_ETHERNET;
......
/* For link type Ethernet, the ETH frame starts
* after the ERF header + pad.
*/
PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p));
if (unlikely(PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)))) {
TmqhOutputPacketpool(ewtn->tv, p);
SCReturnInt(TM_ECODE_FAILED);
}
/* Convert ERF time to timeval - from libpcap. */
uint64_t ts = dr->ts;
......
ewtn->pkts++;
ewtn->bytes += wlen;
if (TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p) != TM_ECODE_OK) {
TmqhOutputPacketpool(ewtn->tv, p);
SCReturnInt(TM_ECODE_FAILED);
}
SCReturnInt(TM_ECODE_OK);
}
(1-1/2)