/*
* DREADERD/MBUF.C
*
* Non-blocking Queued/Buffered I/O routines. Direct tie-in with
* select() descriptor bitmaps. Embedded memory-concious buffer
* management.
*
* (c)Copyright 1998, Matthew Dillon, All Rights Reserved. Refer to
* the COPYRIGHT file in the base directory of this distribution
* for specific rights granted.
*/
#include "defs.h"
Prototype void MBFlush(Connection *conn, MBufHead *mh);
Prototype void MBFree(MBufHead *mh);
Prototype void MBPoll(MBufHead *mh);
Prototype void MBInit(MBufHead *mh, int fd, MemPool **mpool, MemPool **bpool);
Prototype void MBWrite(MBufHead *mh, const void *data, int len);
Prototype void MBWriteDecode(MBufHead *mh, const char *data, int len);
Prototype void MBCopy(MBufHead *m1, MBufHead *m2);
Prototype void MBPrintf(MBufHead *mh, const char *ctl, ...);
Prototype void MBLogPrintf(Connection *conn, MBufHead *mh, const char *ctl, ...);
Prototype int MBRead(MBufHead *mh, void *data, int len);
Prototype int MBReadLine(MBufHead *mh, char **pptr);
Prototype char *MBNormalize(MBufHead *mh, int *plen);
void DebugData(const char *h, const void *buf, int n);
/*
* MBFlush() - attempt to write output to descriptor, set select bits
* if anything is left after we are through.
*/
void
MBFlush(Connection *conn, MBufHead *mh)
{
MBuf *mbuf;
ForkDesc *desc = conn->co_Desc;
/*
* Try to flush mbuf's, but if a timer gets set we have a delayed-write
* situation and must break out of the loop.
*/
while ((mbuf = mh->mh_MBuf) != NULL && (desc == NULL || desc->d_Timer == NULL)) {
int n = 0;
if (mh->mh_WError == 0 && mh->mh_Fd >= 0) {
n = mbuf->mb_Size - mbuf->mb_Index;
/*
* figure out how much we can write based on rate limiting.
* For the moment just use tv_sec to calculate times. If
* rate limiting is turned on and there is an attempt to
* write too much, we write what is allowed and setup a
* timer to re-enable the select descriptor later when we
* can write more.
*/
if (desc != NULL && conn->co_Auth.dr_ReaderDef != NULL &&
conn->co_ByteCountType < DRBC_XXXX &&
conn->co_Auth.dr_ReaderDef->rd_RateLimit[conn->co_ByteCountType]) {
int rl;
int dt = CurTime.tv_sec - conn->co_RateTv.tv_sec;
if (dt != 0) {
conn->co_RateCounter = 0;
conn->co_RateTv.tv_sec = CurTime.tv_sec;
}
rl = conn->co_Auth.dr_ReaderDef->rd_RateLimit[conn->co_ByteCountType];
if (conn->co_Auth.dr_ReaderDef->rd_RateLimitTax)
rl -= conn->co_Auth.dr_ReaderDef->rd_RateLimitTax *
conn->co_Auth.dr_ConnCount;
if (n > rl - conn->co_RateCounter) {
n = rl - conn->co_RateCounter;
if (n < 0)
n = 0; /* n shouldn't be < zero, but be check anyway */
AddTimer(desc, (1000000 - CurTime.tv_usec) / 1000, TIF_WRITE);
}
}
/*
* And do it
*/
errno = 0;
n = write(mh->mh_Fd, mbuf->mb_Buf + mbuf->mb_Index, n);
if (n < 0) {
if (errno == EINTR)
continue; /* continue while */
if (errno == EWOULDBLOCK || errno == EINPROGRESS)
break; /* break while */
if (errno == ENOTCONN)
break; /* break while */
mh->mh_WError = 1;
} else {
conn->co_RateCounter += n;
conn->co_ClientTotalByteCount += n;
conn->co_ClientGroupByteCount += n;
switch(conn->co_ByteCountType) {
case DRBC_ARTICLE: conn->co_Auth.dr_ByteCountArticle += n;
break;
case DRBC_HEAD: conn->co_Auth.dr_ByteCountHead += n;
break;
case DRBC_BODY: conn->co_Auth.dr_ByteCountBody += n;
break;
case DRBC_LIST: conn->co_Auth.dr_ByteCountList += n;
break;
case DRBC_XOVER: conn->co_Auth.dr_ByteCountXover += n;
break;
case DRBC_XHDR: conn->co_Auth.dr_ByteCountXhdr += n;
break;
case DRBC_NONE: conn->co_Auth.dr_ByteCountOther += n;
break;
}
if (n > 0)
conn->co_LastActiveTime = CurTime.tv_sec;
}
}
if (mh->mh_WError)
n = mbuf->mb_Size - mbuf->mb_Index;
if (DebugOpt > 1) {
DebugData(">>", mbuf->mb_Buf + mbuf->mb_Index, n);
}
mbuf->mb_Index += n;
mh->mh_Bytes -= n;
if (mbuf->mb_Index == mbuf->mb_Size) {
mh->mh_MBuf = mbuf->mb_Next;
if (mbuf->mb_Buf)
zfree(mh->mh_BufPool, mbuf->mb_Buf, mbuf->mb_Max);
zfree(mh->mh_MemPool, mbuf, sizeof(MBuf));
}
}
if (mh->mh_Fd >= 0) {
if (mh->mh_WError || mh->mh_MBuf == NULL || (desc && desc->d_Timer))
FD_CLR(mh->mh_Fd, &WFds);
else
FD_SET(mh->mh_Fd, &WFds);
}
}
void
MBFree(MBufHead *mh)
{
MBuf *mbuf;
while ((mbuf = mh->mh_MBuf) != NULL) {
mh->mh_MBuf = mbuf->mb_Next;
if (mbuf->mb_Buf)
zfree(mh->mh_BufPool, mbuf->mb_Buf, mbuf->mb_Max);
zfree(mh->mh_MemPool, mbuf, sizeof(MBuf));
}
mh->mh_Bytes = 0;
mh->mh_TotalBytes = 0.0;
mh->mh_Wait = 0;
mh->mh_REof = 0;
mh->mh_WEof = 0;
mh->mh_RError = 0;
mh->mh_WError = 0;
}
/*
* MBInit() initialize an MBuf header. MBuf's are allocated on the
* fly as needed.
*/
void
MBInit(MBufHead *mh, int fd, MemPool **mpool, MemPool **bpool)
{
bzero(mh, sizeof(MBufHead));
mh->mh_MemPool = mpool;
mh->mh_BufPool = bpool;
mh->mh_Fd = fd;
}
void
MBWrite(MBufHead *mh, const void *data, int len)
{
/*
* If nothing is queued, attempt to write the data buffer
* directly to the descriptor
*/
if (mh->mh_Fd >= 0)
FD_CLR(mh->mh_Fd, &WFds);
/*
* Queue the remainder and set WFds for the descriptor
*/
while (len) {
MBuf **pmbuf = &mh->mh_MBuf;
MBuf *mbuf;
int n;
while ((mbuf = *pmbuf) != NULL &&
(mbuf->mb_Size == mbuf->mb_Max || mbuf->mb_Next != NULL)
) {
pmbuf = &mbuf->mb_Next;
}
if (mbuf == NULL) {
*pmbuf = mbuf = zalloc(mh->mh_MemPool, sizeof(MBuf));
mbuf->mb_Buf = nzalloc(mh->mh_BufPool, MBUF_SIZE);
mbuf->mb_Max = MBUF_SIZE;
}
n = mbuf->mb_Max - mbuf->mb_Size;
if (n > len)
n = len;
bcopy(data, mbuf->mb_Buf + mbuf->mb_Size, n);
mbuf->mb_Size += n;
mh->mh_Bytes += n;
mh->mh_TotalBytes += n;
len -= n;
data = (char *)data + n;
}
if (mh->mh_Fd >= 0)
FD_SET(mh->mh_Fd, &WFds);
}
/*
* MBWriteDecode() - write buffer out but decode % escapes while
* doing it.
*/
void
MBWriteDecode(MBufHead *mh, const char *data, int len)
{
/*
* If nothing is queued, attempt to write the data buffer
* directly to the descriptor
*/
if (mh->mh_Fd >= 0)
FD_CLR(mh->mh_Fd, &WFds);
/*
* Queue the remainder and set WFds for the descriptor
*/
while (len) {
MBuf **pmbuf = &mh->mh_MBuf;
MBuf *mbuf;
while ((mbuf = *pmbuf) != NULL &&
(mbuf->mb_Size == mbuf->mb_Max || mbuf->mb_Next != NULL)
) {
pmbuf = &mbuf->mb_Next;
}
if (mbuf == NULL) {
*pmbuf = mbuf = zalloc(mh->mh_MemPool, sizeof(MBuf));
mbuf->mb_Buf = nzalloc(mh->mh_BufPool, MBUF_SIZE);
mbuf->mb_Max = MBUF_SIZE;
}
while (len && mbuf->mb_Size < mbuf->mb_Max) {
if (*data == '%' && len >= 3) {
char s[3];
s[0] = data[1];
s[1] = data[2];
s[2] = 0;
mbuf->mb_Buf[mbuf->mb_Size] = strtol(s, NULL, 16);
data += 3;
len -= 3;
} else {
mbuf->mb_Buf[mbuf->mb_Size] = *data;
++data;
--len;
}
++mbuf->mb_Size;
++mh->mh_Bytes;
++mh->mh_TotalBytes;
}
}
if (mh->mh_Fd >= 0)
FD_SET(mh->mh_Fd, &WFds);
}
void
MBCopy(MBufHead *m1, MBufHead *m2)
{
MBuf *mbuf;
MBuf **pmbuf;
if (m1->mh_MemPool != m2->mh_MemPool ||
m1->mh_BufPool != m2->mh_BufPool
) {
logit(LOG_CRIT, "MBCopy: illegal mbuf copy %08lx/%08lx %08lx/%08lx",
(long)m1->mh_MemPool,
(long)m2->mh_MemPool,
(long)m1->mh_BufPool,
(long)m2->mh_BufPool
);
exit(1);
}
for (pmbuf = &m2->mh_MBuf; (mbuf = *pmbuf) != NULL; pmbuf = &mbuf->mb_Next)
;
*pmbuf = m1->mh_MBuf;
m1->mh_MBuf = NULL;
m2->mh_Bytes += m1->mh_Bytes;
m2->mh_TotalBytes += m1->mh_TotalBytes;
m1->mh_Bytes = 0;
m1->mh_TotalBytes = 0.0;
if (m2->mh_Fd >= 0)
FD_SET(m2->mh_Fd, &WFds);
}
void
MBPrintf(MBufHead *mh, const char *ctl, ...)
{
va_list va;
char buf[1024];
va_start(va, ctl);
vsnprintf(buf, sizeof(buf), ctl, va);
va_end(va);
MBWrite(mh, buf, strlen(buf));
}
void
MBLogPrintf(Connection *conn, MBufHead *mh, const char *ctl, ...)
{
va_list va;
char buf[1024];
char *ptr;
va_start(va, ctl);
vsnprintf(buf, sizeof(buf), ctl, va);
va_end(va);
MBWrite(mh, buf, strlen(buf));
if ((ptr = strrchr(buf, '\n'))) {
*ptr = '\0';
}
if ((ptr = strrchr(buf, '\r'))) {
*ptr = '\0';
}
LogCmd(conn, '>', buf);
}
/*
* MBRead() - read up to N bytes. Return -1 on EOF, 0 if no
* data is available at all. If less then requested
* amount of data is returned, RFds for the descriptor
* will be set, else it will be cleared.
*/
int
MBRead(MBufHead *mh, void *data, int len)
{
int r = 0;
if (mh->mh_Fd >= 0)
FD_CLR(mh->mh_Fd, &RFds);
while (len) {
MBuf *mbuf;
if ((mbuf = mh->mh_MBuf) != NULL) {
int n = mbuf->mb_Size - mbuf->mb_Index;
if (n > len)
n = len;
bcopy(mbuf->mb_Buf + mbuf->mb_Index, data, n);
data = (char *)data + n;
len -= n;
mh->mh_Bytes -= n;
mbuf->mb_Index += n;
if (mbuf->mb_NLScan < mbuf->mb_Index)
mbuf->mb_NLScan = mbuf->mb_Index;
r += n;
if (mbuf->mb_Index == mbuf->mb_Size) {
mh->mh_MBuf = mbuf->mb_Next;
if (mbuf->mb_Buf)
zfree(mh->mh_BufPool, mbuf->mb_Buf, mbuf->mb_Max);
zfree(mh->mh_MemPool, mbuf, sizeof(MBuf));
}
} else {
int n = 0;
errno = 0;
if (mh->mh_Fd < 0)
mh->mh_REof = 1;
if (mh->mh_REof == 0)
n = read(mh->mh_Fd, data, len);
if (n < 0) {
if (errno == EINTR)
continue;
if (errno == EWOULDBLOCK ||
errno == EINPROGRESS ||
errno == EAGAIN
) {
FD_SET(mh->mh_Fd, &RFds);
break; /* break while */
}
if (errno == ENOTCONN) {
FD_SET(mh->mh_Fd, &RFds);
break; /* break while */
}
}
if (n == 0) {
mh->mh_REof = 1;
break;
}
if (DebugOpt > 1) {
DebugData("<<", data, n);
}
data = (char *)data + n;
len -= n;
r += n;
}
}
if (r == 0 && mh->mh_REof)
r = -1;
return(r);
}
/*
* MBReadLine() - read one line and return the length of the
* line. Return -1 on EOF, 0 if no line is yet
* available. If a line cannot be returned,
* RFds for the descriptor will be set, else
* it will be cleared.
*
* The length of the line returned includes the
* newline, but the newline is replaced with a \0.
* (the caller can set it back to \n). However,
* if we hit EOF and the last character is not a
* newline, we add a \0 terminator but do NOT
* include it in the length. If the caller is adding
* newlines back in, it must check for this condition.
*/
int
MBReadLine(MBufHead *mh, char **pptr)
{
int r = 0;
int haveit = 0;
int nonl = 0;
MBuf *mbuf = NULL;
if (mh->mh_Fd >= 0)
FD_CLR(mh->mh_Fd, &RFds);
while (haveit == 0) {
MBuf **pmbuf = &mh->mh_MBuf;
r = 0;
nonl = 0;
while (haveit == 0 && (mbuf = *pmbuf) != NULL) {
int i = mbuf->mb_NLScan;
while (i < mbuf->mb_Size) {
++i;
/*
* ??? i != mbuf->mb_Index ?
*/
if (i != mbuf->mb_Index && mbuf->mb_Buf[i - 1] == '\n') {
haveit = 1;
break;
}
}
mbuf->mb_NLScan = i;
r += mbuf->mb_NLScan - mbuf->mb_Index;
/*
* delete mbuf's left over from the last valid MBReadLine()
* if they are marked empty.
*/
if (mbuf->mb_Index == mbuf->mb_Size) {
*pmbuf = mbuf->mb_Next;
if (mbuf->mb_Buf)
zfree(mh->mh_BufPool, mbuf->mb_Buf, mbuf->mb_Max);
zfree(mh->mh_MemPool, mbuf, sizeof(MBuf));
} else {
pmbuf = &mbuf->mb_Next;
}
}
/*
* NOTE! if haveit == 1, pmbuf will not be valid. If haveit == 0
* we have exhausted all mbufs and pmbuf is valid to append
* a new one.
*/
if (haveit == 0) {
/*
* did not find newline in MBuf list, read more. If there
* isn't enough room in this mbuf, allocate a new mbuf. We
* have to do this even if we are at REof or we will not be
* able to append the \0 if the last line is unterminated.
*/
int n = 0;
if (mbuf == NULL || mbuf->mb_Size == mbuf->mb_Max) {
*pmbuf = mbuf = zalloc(mh->mh_MemPool, sizeof(MBuf));
mbuf->mb_Buf = nzalloc(mh->mh_BufPool, MBUF_SIZE);
mbuf->mb_Max = MBUF_SIZE;
}
/*
* attempt to read.
*/
if (mh->mh_Fd < 0)
mh->mh_REof = 1;
if (mh->mh_REof) {
if (mh->mh_Bytes) {
nonl = 1;
haveit = 1;
}
break;
}
{
errno = 0;
n = read(
mh->mh_Fd,
mbuf->mb_Buf + mbuf->mb_Size,
mbuf->mb_Max - mbuf->mb_Size
);
if (n < 0) {
if (errno == EINTR)
continue;
if (errno == EWOULDBLOCK ||
errno == EINPROGRESS ||
errno == EAGAIN
) {
FD_SET(mh->mh_Fd, &RFds);
break; /* break while */
}
if (errno == ENOTCONN) {
FD_SET(mh->mh_Fd, &RFds);
break; /* break while */
}
n = 0;
}
if (n == 0) {
mh->mh_REof = 1;
/*
* leave mb_NLScan == mb_Size. if mh->mh_Bytes != 0,
* the last line at EOF was not newline-terminated.
* we set haveit (and later on do not try to replace
* the last valid character in the line with \0 because
* it will not be a newline).
*/
if (mh->mh_Bytes) {
haveit = 1;
nonl = 1;
}
break;
}
if (DebugOpt > 1) {
DebugData("<<", mbuf->mb_Buf + mbuf->mb_Size, n);
}
mh->mh_Bytes += n;
mh->mh_TotalBytes += n;
mbuf->mb_Size += n;
n = mbuf->mb_NLScan;
while (n < mbuf->mb_Size) {
++n;
if (mbuf->mb_Buf[n - 1] == '\n') {
haveit = 1;
break;
}
}
r += n - mbuf->mb_Index;
mbuf->mb_NLScan = n;
}
}
}
/*
* If we have a full line, we need to (re)construct it and replace the
* newline with a \0. Reconstruction is required if the line runs across
* multiple MBuf's.
*
* The reconstructed buffer is marked as already having been read, but
* remains valid until the next MBRead() or MBReadLine() call.
*/
if (DebugOpt > 2)
logit(LOG_CRIT, "haveit %d %08lx %08lx (%d,%d)", haveit, (long)mbuf,(long)mh->mh_MBuf, r, nonl);
if (haveit) {
/*
* buffer crosses boundry, replace with new combined buffer
*/
if (mbuf != mh->mh_MBuf) {
MBuf *nmbuf = zalloc(mh->mh_MemPool, sizeof(MBuf));
MBuf *scan;
int n = 0;
nmbuf->mb_Buf = nzalloc(mh->mh_BufPool, r + nonl);
nmbuf->mb_Max = r + nonl; /* 'fake' \0 terminator if no newline */
nmbuf->mb_Size = r; /* actual size */
nmbuf->mb_NLScan = r; /* mark as scanned */
while ((scan = mh->mh_MBuf) != NULL) {
mh->mh_MBuf = scan->mb_Next;
scan->mb_Next = (void*)-1; /* catch fault */
bcopy(
scan->mb_Buf + scan->mb_Index,
nmbuf->mb_Buf + n,
scan->mb_NLScan - scan->mb_Index
);
n += scan->mb_NLScan - scan->mb_Index;
scan->mb_Index = scan->mb_NLScan;
if (scan == mbuf) {
scan->mb_Next = mh->mh_MBuf;
mh->mh_MBuf = scan;
break;
}
if (scan->mb_Buf)
zfree(mh->mh_BufPool, scan->mb_Buf, scan->mb_Max);
zfree(mh->mh_MemPool, scan, sizeof(MBuf));
}
if (n != r) {
if (DebugOpt)
logit(LOG_CRIT, "MBReadLine corrupt MBuf list: %d/%d nonl=%d", n, r, nonl);
exit(1);
}
nmbuf->mb_Next = mh->mh_MBuf;
mh->mh_MBuf = nmbuf;
mbuf = nmbuf;
}
/*
* XXX check bounds for nonl case when we do not cross a buffer
* boundry.
*/
if (nonl)
mbuf->mb_Buf[mbuf->mb_NLScan] = 0; /* do not replace last char */
else
mbuf->mb_Buf[mbuf->mb_NLScan-1] = 0;/* replace nl */
*pptr = mbuf->mb_Buf + mbuf->mb_Index;
mh->mh_Bytes -= r;
mbuf->mb_Index = mbuf->mb_NLScan;
if (DebugOpt > 2)
logit(LOG_CRIT, "haveit ok r = %d bytes %d", r, mh->mh_Bytes);
} else {
r = 0;
*pptr = "";
}
if (r == 0 && mh->mh_REof)
r = -1;
return(r);
}
/*
* MBNormalize() - Put all data into a single mbuf and zero-terminate. The
* zero termination is not included in the length of the mbuf.
* The size of the mbuf is returned.
*
* "" is returned if the mbuf is empty
*/
char *
MBNormalize(MBufHead *mh, int *plen)
{
MBuf *mbuf;
MBuf *scan;
int bytes = 0;
for (scan = mh->mh_MBuf; scan; scan = scan->mb_Next) {
bytes += scan->mb_Size - scan->mb_Index;
}
*plen = bytes;
/*
* degenerate case: one mbuf, and zero terminator fits
*/
if ((scan = mh->mh_MBuf) == NULL ||
(scan->mb_Next == NULL && scan->mb_Size < scan->mb_Max)
) {
if (scan) {
scan->mb_Buf[scan->mb_Size] = 0;
return(scan->mb_Buf + scan->mb_Index);
} else {
return("");
}
}
/*
*
*/
mbuf = zalloc(mh->mh_MemPool, sizeof(MBuf));
mbuf->mb_Buf = nzalloc(mh->mh_BufPool, bytes + 1);
mbuf->mb_Max = bytes + 1;
mbuf->mb_Size = bytes;
mbuf->mb_Buf[bytes] = 0;
bytes = 0;
while ((scan = mh->mh_MBuf) != NULL) {
mh->mh_MBuf = scan->mb_Next;
bcopy(
scan->mb_Buf + scan->mb_Index,
mbuf->mb_Buf + bytes,
scan->mb_Size - scan->mb_Index
);
if (scan->mb_Buf)
zfree(mh->mh_BufPool, scan->mb_Buf, scan->mb_Max);
zfree(mh->mh_MemPool, scan, sizeof(MBuf));
bytes += scan->mb_Size - scan->mb_Index;
}
mh->mh_MBuf = mbuf;
return(mbuf->mb_Buf);
}
void
DebugData(const char *h, const void *buf, int n)
{
int i;
int nl = 0;
printf("%s ", h);
for (i = 0; i < n; ++i) {
int c = ((unsigned char *)buf)[i];
if (nl) {
printf("\n%s ", h);
nl = 0;
}
if (isprint(c))
printf("%c", c);
else
printf("<%02x>", c);
if (c == '\n')
nl = 1;
}
printf("\n");
fflush(stdout);
}
syntax highlighted by Code2HTML, v. 0.9.1