/*
 * DNEWSLINK.C
 *
 * dnewslink -b batchfile -B srcipaddress -h host -t timeout -l log-after 
 * 	-c close-reopen-after -pipe ...other options
 *
 * This program will attempt to run one or more batch files containing
 * article references of the form:
 *
 * Relative-Path <message-id>
 * Relative-Path <message-id>
 *
 * For example:
 *
 * de/comm/chatsystems/911 <4mle9s$mef@nz12.rz.uni-karlsruhe.de>
 * alt/binaries/multimedia/d/3643 <4mme9r$plc@nntp1.best.com>
 *
 * This program also supports multi-article batch files.  Each article in the
 * multi-article batch file must be terminated with a \0 and batch file lines
 * must include and offset and byte count (non-inclusive of the \0):
 *
 * de/comm/chatsystems/911 <4mle9s$mef@nz12.rz.uni-karlsruhe.de> off,size
 * alt/binaries/multimedia/d/3643 <4mme9r$plc@nntp1.best.com> off,size
 *
 * The primary methodology of using this program is to have another
 * program, SPOOLOUT, generate and maintain numerically indexed spool files.
 * That program forks and runs this program to actually process the spool
 * files.
 *
 * This program's job is to process one or more spool files (performing locking
 * and skipping locked spool files), spool the articles in question to the
 * remote hosts, then delete the spool files as process is completed, or
 * rewrite the spool files if a failure occurs.
 *
 * (c)Copyright 1997-1998, Matthew Dillon, All Rights Reserved.  Refer to
 *    the COPYRIGHT file in the base directory of this distribution 
 *    for specific rights granted.
 *
 * Modified 12/4/1997 to include support for compressed data streams.
 * Modifications (c) 1997, Christopher M Sedore, All Rights Reserved.
 * Modifications may be distributed freely as long as this copyright
 * notice is included without modification.
 */

#include "defs.h"
#include <sys/uio.h>

#define NDEBUG
#include <assert.h>

#define T_ACCEPTED	1
#define T_REFUSED	2
#define T_REJECTED	3
#define T_FAILED	4
#define T_STREAMING	5
#define T_FAILEDEXIT	6
#define T_DEFERIT	7

#define INF_HELP        100     /* Help text on way */
#define INF_AUTH        180     /* Authorization capabilities */
#define INF_DEBUG       199     /* Debug output */

#define MAXCLINE	8192
#define MAXFILEDES	32

#define OUR_DELAY	1
#define THEIR_DELAY	2
#define DDTIME		30
#define CACHEFLUSHTIME	300

#define OK_CANPOST      200     /* Hello; you can post */
#define OK_NOPOST       201     /* Hello; you can't post */
#define OK_SLAVE        202     /* Slave status noted */
#define OK_STREAMOK	203	/* Can-do streaming	*/
#define OK_GOODBYE      205     /* Closing connection */
#define OK_COMPRESSOK	207	/* Can-do compression */
#define OK_GROUP        211     /* Group selected */
#define OK_GROUPS       215     /* Newsgroups follow */
#define OK_ARTICLE      220     /* Article (head & body) follows */
#define OK_HEAD         221     /* Head follows */
#define OK_BODY         222     /* Body follows */
#define OK_NOTEXT       223     /* No text sent -- stat, next, last */
#define OK_NEWNEWS      230     /* New articles by message-id follow */
#define OK_NEWGROUPS    231     /* New newsgroups follow */
#define OK_XFERED       235     /* Article transferred successfully */
#define OK_STRMCHECK	238	/* check response / want article	*/
#define OK_STRMTAKE	239	/* takeit response / article received	*/
#define OK_POSTED       240     /* Article posted successfully */
#define OK_MODECMDOK	250	/* general mode command response (diablo specific) */
#define OK_AUTHSYS      280     /* Authorization system ok */
#define OK_AUTH         281     /* Authorization (user/pass) ok */

#define CONT_XFER       335     /* Continue to send article */
#define CONT_POST       340     /* Continue to post article */
#define NEED_AUTHINFO   380     /* authorization is required */
#define NEED_AUTHDATA   381     /* <type> authorization data required */

#define ERR_GOODBYE     400     /* Have to hang up for some reason */
#define ERR_NOGROUP     411     /* No such newsgroup */
#define ERR_NCING       412     /* Not currently in newsgroup */
#define ERR_NOCRNT      420     /* No current article selected */
#define ERR_NONEXT      421     /* No next article in this group */
#define ERR_NOPREV      422     /* No previous article in this group */
#define ERR_NOARTIG     423     /* No such article in this group */
#define ERR_NOART       430     /* No such article at all */
#define ERR_RESEND	431	/* please resend the article	*/
#define ERR_GOTIT       435     /* Already got that article, don't send */
#define ERR_XFERFAIL    436     /* Transfer failed */
#define ERR_XFERRJCT    437     /* ihave, Article rejected, don't resend */
#define ERR_STRMCHECK	438	/* check response / do not want article */
#define ERR_STRMTAKE	439	/* takeit response / article failed	*/
#define ERR_NOPOST      440     /* Posting not allowed */
#define ERR_POSTFAIL    441     /* Posting failed */
#define ERR_NOAUTH      480     /* authorization required for command */
#define ERR_AUTHSYS     481     /* Authorization system invalid */
#define ERR_AUTHREJ     482     /* Authorization data rejected */

#define ERR_COMMAND     500     /* Command not recognized */
#define ERR_CMDSYN      501     /* Command syntax error */
#define ERR_ACCESS      502     /* Access to server denied */
#define ERR_FAULT       503     /* Program fault, command not performed */
#define ERR_AUTHBAD     580     /* Authorization Failed */

#define ERR_UNKNOWN	990

/*
 * Streaming parameters
 */

#define STREAM_OFF	0
#define STREAM_RELOAD	1
#define STREAM_ON	2

#define STATE_EMPTY	0	/* empty slot, must be 0		*/
#define STATE_CHECK	1	/* check transmitted			*/
#define STATE_POSTED	2	/* takethis + article transmitted	*/
#define STATE_RETRY	3	/* retry after connection failure	*/

#define STREAMDRAIN	5	/* when we hit MAXSTREAM, allow it to drop */
				/* by this amount before we push again     */
#define STREAMFRAC	10	/* number of check responses before 	*/
				/* MaxStream is incremented		*/
#define MAXPENDBYTES	(1024 - (MAXSTREAM * 8)) /* maximum pending bytes*/
#define MAXDEFER	100	/* Don't defer more than this many entries
				 * This MUST be larger that 20 due to assumptions
				 * in the code. */

#define MAXREASON       100     /* maximal length of a peer response logged */

#define CMDBUFFSIZE	32768	/* Output buffer for NNTP traffic */

typedef struct Stream {
    int	st_State;		/* state	*/
    int st_DumpRCode;
    char *st_RelPath;		/* path		*/
    char *st_MsgId;		/* message id	*/
    off_t st_Off;		/* file offset	*/
    int32 st_Size;		/* file size	*/
    int32 st_CompSize;		/* compressed file size	*/
} Stream;

int connectTo(const char *hostName, const char *serviceName, int defPort);
int Transact(int cfd, const char *relPath, char *msgId, off_t off, int size, int cSize, int defers, char *stage, char *reason, char *buf, int *sentSize);
int DumpArticle(int cfd, const char *relPath, off_t off, int size, int cSize);
int writeLarge(int cfd, char *buffer, size_t size);
int StreamTransact(int cfd, const char *relPath, char *msgId, off_t off, int size, int cSize, int defers, char *stage, char *reason, char *buf, int *sentSize);
void StreamReload(int cfd);
Stream *LocateStream(const char *msgId, int state);
int RefilePendingStreams(FILE *fo);
int commandResponse(int cfd, char **rptr, const char *ctl, ...);
int commandWrite(int cfd, const void *buf, int bytes, int artdata);
void clearResponseBuf(void);
void readreset(int fd);
void dl_logit(const char *ctl, ...);
int readretry(char *buf, int size);
int readline(int fd, char *buf, int size, time_t t);
void logStats(const char *description);
int ValidMsgId(char *msgid);
void AttemptRemoveRenamedFile(struct stat *st, const char *spoolFile);
void cdinit(void);
char *cdmap(const char *path, off_t off, int *psize, int cSize, int *multiArtFile);
void cdunmap(char *ptr, int bytes, int multiArtFile, int compressed);
int extractFeedLine(const char *buf, char *relPath, char *msgId, off_t *poff, int *psize, int *pheadOnly, time_t *queuedTime, int *cSize);
void cdflush(void);
const char *dtstamp(void);
void clearCommandBuffer(void);
#ifdef NOTDEF
void copyCommandBuffer(FILE *fo);
#endif
int flushCommandBuffer(int cfd);
void flushCompressBuffer(int cfd);
void doArtLog(const char *class, char *msgid, char *message, const char *stage, const char *reason);

#ifdef CREDTIME
void credtime(int whos);
void credreset(void);
#else
#define credtime(whos)
#define credreset()
#endif

int LogAfter = 1000;
int CloseReopenAfter = 1000;
int Timeout = 600;
int DeleteDetectOpt = 0;
int WaitTime = 10;
int DelayTime = 0;
int Port = 119;
int TailOpt = 0;
int TxBufSize = 0;
int NoCheckOpt = 0;
int RxBufSize = 0;
int HeaderOnlyFeed = 0;
int KeepBytes = 0;
int GenLinesHeader = 0;
char ArtLog[100]; /* auto-initialised to "" */
FILE *Logfd = NULL;
struct stat CurSt;

Stream StreamAry[MAXSTREAM];
int MaxStream = MAXSTREAM;	/* Maximum number of streaming slots */
int NumStream = MAXSTREAM;	/* Current number of streaming slots */
int HiWaterMark =  0;
int StreamMode = 0;		/* set by stream negotiation	*/
int StreamPend = 0;		/* pending streaming requests	*/
int StreamRetry = 0;		/* retry after connection failure */
double AveragePend = 0.0;	/* sliding average of pending requests */
int TryStreaming = 1;
int BytesPend = 0;
int PipeOpt = 0;
int RealTimeOpt = 0;
char *NotifyOpt = NULL;
int ArticleStatOpt = 0;
int LogAfterCount = -1;
int CloseReopenCount = -1;
int KillFd = -1;
int WritesLosing = 0;
time_t TimeNow;			/* Very rough estimate of current time */
int TimeCounter = 100000;

/*
 * added to support compression, cmsedore@maxwell.syr.edu 12/4/97
 */


#define COMPRESS_OFF 0
#define COMPRESS_ON  1

int CompressMode = COMPRESS_OFF;
int CompressOn = 0;

#ifdef	USE_ZLIB

int32 CompressedBytes = 0;
int32 RawBytes = 0;
int32 CompressedBytesTotal = 0;
int32 RawBytesTotal = 0;
z_stream Z_Strm;
int CompressBufInPos = 0;
char CompressBufIn[8192]; 
char CompressBufOut[8192]; 
int CompressNextFlush = 0;

#endif	/* USE_ZLIB */

char *HostName = NULL;
char *OutboundIpName = NULL;
char *BatchFileCtl = NULL;
char CurrentBatchFile[1024];
char LastErrBuf[256];
int CBFIndex;
int BatchSeq = -1;
int NumBatches = 1;
int HeaderSize = 0;
int NotifyFd = -1;
int NotifyLockFd = -1;
int IPQoS=0;
time_t LastNotify = 0;
time_t LastFeedData = 0;

int TermFlag;

FeedStats	*Stats;
FeedStats	*HostStats;

int OurMs;
int TheirMs;
int MsCount;

int OurMsTotal;
int TheirMsTotal;
int MsCountTotal;
int WouldHaveRefiled;
int HasStatusLine;

void sigTerm(int sigNo);

void
Usage(char *progname)
{
    printf("Usage: %s -b batchfile -h hostname\n", progname);
    puts(
	    "-A[levels]      - turn on article logging, default all\n"
	    "                  levels is either all or comma-separated list\n"
	    "                  of accept,reject,defer,refuse,error\n"
	    "-B ip           - set source ip address for outbound connections\n"
	    "-b batchfile    - specify batchfile\n"
	    "-b template%d   - template containing %[xx]d\n"
	    "-c #            - close-reopen-after, default 1000\n"
	    "-D              - detect delete-out-from-under\n"
	    "-d[#]           - debug option\n"
	    "-H              - (header only) incl Bytes: hdr, pass body only for control msgs\n"
	    "-h host         - specify remote host\n"
	    "-i              - disable streaming & streaming check\n"
	    "-L              - generate Lines: header\n"
	    "-l #            - log-after-count, default 1000\n"
	    "-M #            - set max concurrent stream transactions\n"
	    "-N #            - number of batchfiles to process, template mode\n"
	    "-o label        - enable feed notification from diablo\n"
	    "-P #            - specify remote port (default: 119)\n"
	    "-p              - input on stdin rather then file\n"
#ifdef IP_TOS
	    "-Q              - set QoS using IP_TOS\n"
#endif
	    "-n[op]          - NOP"
	    "-P #            - specify destination tcp port\n"
	    "-R #            - set receive buffer size\n"
	    "-r              - realtime feed\n"
	    "-S #            - starting sequence #, template mode\n"
	    "-s \"<24chars>\"  - /bin/ps argv space for status\n"
	    "-T #            - set transmit buffer size\n"
	    "-t #            - specify timeout, default 600s\n"
	    "-W #            - delay sending articles from the queue time\n"
	    "-w #            - reconnect-after-failure delay\n"
    );
    exit(1);
}

int
main(int ac, char **av)
{
    int i;
    int eNoBat = 0;

    TimeNow = time(NULL);
    OpenLog("newslink", (DebugOpt > 0? LOG_PERROR: 0) | LOG_NDELAY | LOG_PID);
    LoadDiabloConfig(ac, av);

    for (i = 1; i < ac; ++i) {
	char *ptr = av[i];

	ptr += 2;
	switch(ptr[-1]) {
	case 'A':
            StrnCpyNull(ArtLog, ((*ptr) ? ptr: "all"), sizeof(ArtLog));
            break;
	case 'B':
	    if (*ptr == 0)
		ptr = av[++i];
	    OutboundIpName = strdup(SanitiseAddr(ptr));
	    break;
	case 'b':
	    BatchFileCtl = (*ptr) ? ptr : av[++i];
	    break;
	case 'C':
	    if (*ptr == 0)
		++i;
	    break;
	case 'c':
	    CloseReopenAfter = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'D':
	    DeleteDetectOpt = 1;
	    break;
	case 'd':
	    DebugOpt = (*ptr) ? strtol(ptr, NULL, 0) : 1;
	    break;
	case 'f':
	    TailOpt = 1;	/* sit on tailable file XXX	*/
	    break;
	case 'H':
	    HeaderOnlyFeed = 1;
	    if (*ptr == 'B')
		KeepBytes = 1;
	    break;
	case 'h':
	    HostName = (*ptr) ? ptr : av[++i];
	    break;
	case 'i':
	    TryStreaming = 0;
	    break;
	case 'I':
	    NoCheckOpt = 1;
	    break;
	case 'L':
	    GenLinesHeader = 1;
	    break;
	case 'l':
	    LogAfter = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'M':
	    MaxStream = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    if (MaxStream > MAXSTREAM)
		MaxStream = MAXSTREAM;
	    NumStream = MaxStream;
	    break;
	case 'N':
	    NumBatches = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'n':
	    /* NOP */
	    break;
	case 'o':
	    NotifyOpt = (*ptr) ? ptr : av[++i];
	    break;
	case 'P':
	    Port = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'p':
	    PipeOpt = 1;	/* input from pipe 	*/
	    break;
	case 'Q':
	    IPQoS = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'R':
	    /*
	     * note: must be large enough to hold check responses that
	     * may become pending while we are transmitting an article.
	     */
	    RxBufSize = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    if (RxBufSize < MAXSTREAM * (MAXMSGIDLEN + 64))
	        RxBufSize = MAXSTREAM * (MAXMSGIDLEN + 64);
	    break;
	case 'r':
	    RealTimeOpt = (*ptr) ? strtol(ptr, NULL, 0) : -1;
	    break;
	case 'S':
	    BatchSeq = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 's':
	    ptr = (*ptr) ? ptr : av[++i];
	    SetStatusLine(ptr, strlen(ptr));
	    HasStatusLine = 1;
	    break;
	case 'T':
	    TxBufSize = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    if (TxBufSize < 512)
	        TxBufSize = 512;
	    break;
	case 't':
	    Timeout = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'V':
	    PrintVersion();
	    break;
	case 'W':
	    DelayTime = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'w':
	    WaitTime = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
	    break;
	case 'x':
	    ArticleStatOpt = 1;
	    break;
        case 'Z':
            CompressMode = strtol(((*ptr) ? ptr : av[++i]), NULL, 0);
            break;
	default:
	    Usage(av[0]);
	    break;
	}
    }

    LogAfterCount += LogAfter;
    CloseReopenCount += CloseReopenAfter;

    if (BatchFileCtl == NULL || HostName == NULL || i > ac) {
	Usage(av[0]);
    }

    rsignal(SIGPIPE, SIG_IGN);
    rsignal(SIGHUP, sigTerm);
    rsignal(SIGINT, sigTerm);
    rsignal(SIGTERM, sigTerm);
    rsignal(SIGALRM, sigTerm);

    cdinit();

    Stats = (FeedStats *)malloc(sizeof(FeedStats));
    if (Stats == NULL) {
	logit(LOG_CRIT, "Unable to malloc stats memory");
	exit(1);
    }
    strcpy(Stats->hostname, HostName);
    switch (DOpts.FeederRTStats) {
	case RTSTATS_NONE:
	    HostStats = NULL;
	    break;
	case RTSTATS_LABEL:
	    {
		char buf[255];
		char *p;
		strncpy(buf, BatchFileCtl, sizeof(buf) - 1);
		buf[sizeof(buf) - 1] = '\0';
		if ((p = strstr(buf, ".S%05d")) != NULL)
		    *p = 0;
		HostStats = FeedStatsFindSlot(buf);
	    }
	    break;
	case RTSTATS_HOST:
	    HostStats = FeedStatsFindSlot(HostName);
	    break;
    }

    if (*BatchFileCtl == 0) {
	dl_logit("batchfile is null!");
	exit(0);
    }

    while (--NumBatches >= 0) {
	time_t ddTime;
	time_t ddCount = 0;
	time_t cdflushTime;
	int fd;
	int cfd = -1;

	HeaderSize = 0;

	Stats->SentStats.ConnectTotal = 0;
	Stats->SentStats.OfferedTotal = 0;
	Stats->SentStats.AcceptedTotal = 0;
	Stats->SentStats.RefusedTotal = 0;
	Stats->SentStats.RejectedTotal = 0;
	Stats->SentStats.DeferredTotal = 0;
	Stats->SentStats.DeferredFailTotal = 0;
	Stats->SentStats.RejectedBytesTotal = 0.0;
	Stats->SentStats.AcceptedBytesTotal = 0.0;

	Stats->SentStats.ConnectCnt = 0;
	Stats->SentStats.OfferedCnt = 0;
	Stats->SentStats.AcceptedCnt = 0;
	Stats->SentStats.RefusedCnt = 0;
	Stats->SentStats.RejectedCnt = 0;
	Stats->SentStats.DeferredCnt = 0;
	Stats->SentStats.DeferredFailCnt = 0;
	Stats->SentStats.RejectedBytes = 0.0;
	Stats->SentStats.AcceptedBytes = 0.0;

#ifdef	USE_ZLIB
	CompressedBytes = 0;
	RawBytes = 0;
	CompressedBytesTotal = 0;
	RawBytesTotal = 0;
#endif

	OurMs = 0;
	TheirMs = 0;
	MsCount = 0;

	OurMsTotal = 0;
	TheirMsTotal = 0;
	MsCountTotal = 0;

	snprintf(CurrentBatchFile, sizeof(CurrentBatchFile) - 32, DQueueHomePat, NewsHome);
	strcat(CurrentBatchFile, "/");
	CBFIndex = strlen(CurrentBatchFile);

	if (BatchSeq < 0) {
	    sprintf(CurrentBatchFile + CBFIndex, "%s", BatchFileCtl);
	} else {
	    sprintf(CurrentBatchFile + CBFIndex, BatchFileCtl, BatchSeq);
	    ++BatchSeq;
	}

	/*
	 * Open next batchfile.  If the open fails, we go onto the next
	 * batchfile.  However, if we are doing a realtime feed, we must
	 * deal with a race condition with diablo where the next realtime
	 * queue file may not yet exist.
	 */

	if (PipeOpt) {
	    fd = 0;
	} else {
	    fd = open(CurrentBatchFile, O_RDWR | (RealTimeOpt ? O_CREAT : 0), 0600);
	    if (fd < 0) {
		if (eNoBat == 0) {
		    eNoBat = 1;
		    dl_logit("no batchfile");
		}
		continue;
	    }

	    /*
	     * Lock batchfile
	     */
	    {
		struct stat st;

		if (xflock(fd, XLOCK_EX|XLOCK_NB) != 0) {
		    if (eNoBat == 0) {
			eNoBat = 1;
			dl_logit("batchfile already locked");
		    }
		    close(fd);

		    /*
		     * If we cannot get the lock and RealTimeOpt is
		     * set, we abort - someone else has a lock on the
		     * realtime file.  Otherwise we retry (w/ the next
		     * sequence number)
		     */

		    if (RealTimeOpt) {
			dl_logit("realtime batchfile already locked");
			break;
		    }
		    continue;
		}
		if (fstat(fd, &st) != 0) {
		    dl_logit("fstat failed: %s", strerror(errno));
		    exit(1);
		}
		if (st.st_nlink == 0) {
		    close(fd);
		    continue;
		}
		eNoBat = 0;
	    }
	    fprintf(stderr, "%s\n", CurrentBatchFile);
	} /* pipeopt */

	fstat(fd, &CurSt);
	WouldHaveRefiled = 0;

	Stats->SentStats.TimeStart = Stats->SentStats.DeltaStart =
					ddTime = cdflushTime = time(NULL);
	if (HostStats != NULL && HostStats->SentStats.TimeStart == 0) {
	    LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
	    HostStats->SentStats.TimeStart = Stats->SentStats.TimeStart;
	    LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
	}

	/*
	 * Connect to remote, send news
	 */

	if ((cfd = connectTo(HostName, NULL, Port)) >= 0) {
	    char buf[1024];
	    char relPath[1024];
	    char msgId[1024];
	    char path[1024];
	    off_t fileOff = 0;
	    int fileSize = 0;
	    int sentFileSize = 0;
	    int headOnly = 0;
	    time_t queuedTime = 0;
	    FILE *fo = NULL;
	    int connectCount = 0;
	    int bufInval = 1;
	    int loggedMark = 0;
	    int deferbegin = 0;
	    int deferend = 0;
	    int cSize = 0;
	    /* We kept a list of defer requests (431 response) and procedd
	     * it from time to time.
	     */
    	    char *deferbuf[MAXDEFER];

	    credtime(0);

	    if (PipeOpt)
		sprintf(path, "%s", CurrentBatchFile);
	    else
		sprintf(path, "%s.tmp", CurrentBatchFile);

	    stprintf("%s %s wait/in %d", HostName, CurrentBatchFile + CBFIndex, connectCount);

	    /*
	     * Loop if need to reconnect, there are pending streaming commands,
	     * or we can get a valid buffer.  Note that this allows us to halt
	     * readline() calls when we reach our streaming limit as well as to
	     * do a final-drain when we reach the end of the buffer.
	     */

	    for (;;) {

		if (TimeCounter++ >= 100)
		    TimeNow = time(NULL);

		/*
		 * If there is no active buffer and we have not reached
		 * our streaming limit, get next article to transmit, either
		 * from the defer buffer or read in another article control line.
		 */
		if (bufInval && 
		    StreamPend < NumStream - HiWaterMark && 
		    BytesPend < MAXPENDBYTES
		) {
		    if (DebugOpt > 1)
			printf("clear watermark\n");
		    HiWaterMark = 0;	/* reset WaterMark */
		    if (deferbegin < deferend &&
			(deferbegin || (deferend >= (MAXDEFER - 20)))) {
			/* We are at this moment clearing the defer backlog,
			 * or the defer buffer is full and it has to be
			 * cleared.
			 * We need to flush the defer queue some time before it 
			 * reaches its maximum capacity becase we might receive
			 * several 431 answers from the peer before reaching
			 * this point again.
			 */
			strncpy(buf, deferbuf[deferbegin], sizeof(buf) - 1);
			buf[sizeof(buf) - 1] = '\0';
			zfreeStr(&SysMemPool, &deferbuf[deferbegin++]);
			bufInval = 0;
		    } else {
			/* There are no defers to be processed, so try to get
			 * something from the input file.
			 */
			if (deferbegin > 0)
			    /* We have to reset deferbegin in case we are switching
			     * back from "process from defer queue" mode to "process
			     * from file" mode.
			     */
			    deferbegin = deferend = 0;
			if ((bufInval = readretry(buf, sizeof(buf))) != 0)
			    bufInval = readline(fd, buf, sizeof(buf), TimeNow);
		    }
		}

		if (TimeCounter++ >= 100)
		    TimeNow = time(NULL);


		if (bufInval && StreamPend == 0 && deferbegin == 0 && deferend > 0) {
		    /* Start to begin spooling deferred articles at the end of 
		     * input file. Subsequent articles are retrieved by the 
		     * code above, but this test has to come after trying to
		     * read from file.
		     */
		    if (deferend < 20) {
			/* Sleep in case the defer queue is emptied at the end
			 * of input file, since in that case the defer might
			 * have happened just before. Alo, we don't need to
			 * wait if the defer queue already has a certain size.
			 * Note that infinite defer cycles cannot happen since 
			 * we do not add anything to the defer queue in case it 
			 * is being processed.
			 */
			sleep(5);
		    }
		    strncpy(buf, deferbuf[deferbegin], sizeof(buf) - 1);
		    buf[sizeof(buf) - 1] = '\0';
		    zfreeStr(&SysMemPool, &deferbuf[deferbegin++]);
		    bufInval = 0;
		}

		/*
		 * If there is nothing left to do, break out of the loop
		 */

		if (bufInval && StreamPend == 0)
		    break;

		/*
		 * Go.
		 */

		if (DebugOpt > 1)
		    printf("%s bufInval %d StreamPend %d/%d (%s)\n", dtstamp(),
					bufInval, StreamPend, NumStream,
					(bufInval) ? "?" : buf);

		if (!PipeOpt && (TimeNow - ddTime) > DDTIME) {
		    /*
		     * Check if queue file removed out from under us.  Stop
		     * processing if it has been.
		     */
		    struct stat st;

		    if (fstat(fd, &st) == 0 && st.st_nlink == 0) {
			if (DeleteDetectOpt || ddCount++ > 5) {
			    NumBatches = 0;
			    break;
			}
		    } else
			ddCount = 0;
		    ddTime = TimeNow;
		}

		if ((TimeNow - cdflushTime) > (CACHEFLUSHTIME / 3)) {
		    /*
		     * Flush old file handles
		     */
		    cdflush();
		    cdflushTime = TimeNow;
		}

		if (bufInval || 
		    extractFeedLine(buf, relPath, msgId, &fileOff, &fileSize,
					&headOnly, &queuedTime, &cSize) == 0
		) {
		    int t;
		    char stage[MAXREASON], reason[MAXREASON];

		    /*
		     * If header-only feed line and this is not a header-only
		     * feed, we do not have enough data to pass this on as a
		     * normal article so skip it.
		     */

		    if (bufInval == 0 && headOnly && HeaderOnlyFeed == 0) {
			bufInval = 1;
			continue;
		    }

		    /*
		     * If this is a delayed feed, then wait until we are
		     * allowed to send it.
		     */
		    if (DelayTime && queuedTime) {
			int sleepTime = (queuedTime + DelayTime) - TimeNow;
			if (DebugOpt)
			    printf("sleep %u (%u,%u,%u)\n",
					sleepTime,
					DelayTime, queuedTime, TimeNow);
			if (sleepTime > 0)
			    sleep(sleepTime);
		    }

		    /*
		     * State processing.
		     */

		    stprintf("%s %s process %d", HostName, CurrentBatchFile + CBFIndex, connectCount);

		    if (bufInval)
			t = Transact(cfd, NULL, msgId, 0, 0, 0, (deferbegin > 0),
				     stage, reason, buf, &sentFileSize);
		    else {
			struct stat st;
			if (fileSize == 0) {
			    if (DebugOpt > 1)
				printf("%s instant expired article\n", dtstamp());
			    bufInval = 1;

			    continue;
			} else if (ArticleStatOpt && stat(relPath, &st) == -1) {
			    if (DebugOpt > 1)
				printf("%s expired article\n", dtstamp());
			    bufInval = 1;

			    continue;
			} else 
			t = Transact(cfd, relPath, msgId, fileOff, fileSize,
				     cSize,
				     (deferbegin > 0), stage, reason, buf,
				     &sentFileSize);
		    }

		    if (DebugOpt > 1)
			printf("%s Transaction result: %d %s", dtstamp(), t, (bufInval) ? "<ibuf-empty>\n" : buf);

		    stprintf("%s %s wait/in %d", HostName, CurrentBatchFile + CBFIndex, connectCount);

		    switch(t) {
		    case T_STREAMING:
			/*
			 * operation in progress, will return the real status
			 * later.
			 */
			break;
		    case T_ACCEPTED:
			++connectCount;
			++Stats->SentStats.AcceptedCnt;
			++Stats->SentStats.AcceptedTotal;
			if (HeaderOnlyFeed) {
			    Stats->SentStats.AcceptedBytes += (double)HeaderSize;
			    Stats->SentStats.AcceptedBytesTotal += (double)HeaderSize;
			} else {
			    Stats->SentStats.AcceptedBytes += (double)sentFileSize;
			    Stats->SentStats.AcceptedBytesTotal += (double)sentFileSize;
			}

			if (HostStats != NULL) {
			    LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
			    ++HostStats->SentStats.AcceptedCnt;
			    ++HostStats->SentStats.AcceptedTotal;
			    if (HeaderOnlyFeed) {
				HostStats->SentStats.AcceptedBytes += (double)HeaderSize;
				HostStats->SentStats.AcceptedBytesTotal += (double)HeaderSize;
			    } else {
				HostStats->SentStats.AcceptedBytes += (double)sentFileSize;
				HostStats->SentStats.AcceptedBytesTotal += (double)sentFileSize;
			    }
			    LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
			}

			if (DebugOpt > 1)
			    printf("fileSize = %d, HeaderSize = %d, HeaderOnlyFeed = %d\n",
				fileSize, HeaderSize, HeaderOnlyFeed);
			doArtLog("accept", msgId, "accepted", stage, reason);
			break;
		    case T_REFUSED:
			++connectCount;
			++Stats->SentStats.RefusedCnt;
			++Stats->SentStats.RefusedTotal;
			if (HostStats != NULL) {
			    LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
			    ++HostStats->SentStats.RefusedCnt;
			    ++HostStats->SentStats.RefusedTotal;
			    LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
			}
			doArtLog("refuse", msgId, "refused", stage, reason);
			break;
		    case T_DEFERIT:
			/* We need to store the entry for later
			 * Drop it if the buffer is full (treat like reject)
			 * or we are already handling defers
			 */
			if (deferbegin == 0 && deferend < MAXDEFER) {
			    doArtLog("defer", msgId, "deferred", stage, reason);
			    deferbuf[deferend] = zallocStr(&SysMemPool, buf);
			    if (deferbuf[deferend] != NULL)
				deferend++;
			    ++Stats->SentStats.DeferredCnt;
			    ++Stats->SentStats.DeferredTotal;
			    if (HostStats != NULL) {
				LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
				++HostStats->SentStats.DeferredCnt;
				++HostStats->SentStats.DeferredTotal;
				LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
			    }
			    break;
			} else {
			    doArtLog("error", msgId, deferbegin? "error-doubledefer": 
				"error-deferqueuefull", stage, reason);
			    ++Stats->SentStats.DeferredFailCnt;
			    ++Stats->SentStats.DeferredFailTotal;
			    if (HostStats != NULL) {
				LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
				++HostStats->SentStats.DeferredFailCnt;
				++HostStats->SentStats.DeferredFailTotal;
				LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
			    }
			}
		    case T_REJECTED:
			++connectCount;
			++Stats->SentStats.RejectedCnt;
			++Stats->SentStats.RejectedTotal;
			if (HeaderOnlyFeed) {
			    Stats->SentStats.RejectedBytes += (double)HeaderSize;
			    Stats->SentStats.RejectedBytesTotal += (double)HeaderSize;
			} else {
			    Stats->SentStats.RejectedBytes += (double)sentFileSize;
			    Stats->SentStats.RejectedBytesTotal += (double)sentFileSize;
			}
			if (HostStats != NULL) {
			    LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
			    ++HostStats->SentStats.RejectedCnt;
			    ++HostStats->SentStats.RejectedTotal;
			    if (HeaderOnlyFeed) {
				HostStats->SentStats.RejectedBytes += (double)HeaderSize;
				HostStats->SentStats.RejectedBytesTotal += (double)HeaderSize;
			    } else {
				HostStats->SentStats.RejectedBytes += (double)sentFileSize;
				HostStats->SentStats.RejectedBytesTotal += (double)sentFileSize;
			    }
			    LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
			}
			doArtLog("reject", msgId, "rejected", stage, reason);
			break;
		    case T_FAILED:
		    case T_FAILEDEXIT:
			/*
			 * If failed, attempt to reconnect to remote.
			 * if failed+exit, exit out
			 */
			break;
		    default:
			/* should not occur */
			break;
		    }

		    /*
		     * Many INND's are setup to close a connection after a
		     * certain period of time.  Thus, a failure could be a
		     * normal occurance.  If we have already had at least one
		     * successfull transaction, we attempt to reconnect when
		     * this case occurs.
		     */

		    if (t == T_FAILED || t == T_FAILEDEXIT) {
			if (connectCount > 0 && t != T_FAILEDEXIT && TermFlag == 0) {
			    dl_logit("Remote EOF, attempting to reconnect");
			    logStats("mark");
			    loggedMark = 1;
			    close(cfd);
			    clearCommandBuffer();
			    KillFd = -1;
			    credtime(OUR_DELAY);
			    (void)RefilePendingStreams(NULL); /* retry */
			    if ((cfd = connectTo(HostName, NULL, Port)) >= 0) {
				connectCount = 0;
				if (CloseReopenCount > 0)
				    CloseReopenCount = Stats->SentStats.OfferedTotal +
							CloseReopenAfter;

				credtime(0);
				continue;
			    }
			    credtime(0);
			}

			/*
			 * reopen failed: termination, cleanup
			 */

			NumBatches = 0;

			if (RealTimeOpt == 0) {
			    if (fo == NULL)
				fo = fopen(path, "w");
			    if (fo != NULL) {
				(void)RefilePendingStreams(fo);
				if (!bufInval)
				    fputs(buf, fo);
			    }
			} else {
			    WouldHaveRefiled += RefilePendingStreams(NULL);
			    if (!bufInval)
				++WouldHaveRefiled;
			}
			break; /* break out of for(EVER) */
		    }

		    /*
		     * buffer successfully dealt with, buffer is now invalid
		     */

		    bufInval = 1;

		    /*
		     * Bump transaction count
		     */

		    if (t != T_STREAMING && deferbegin == 0) {
			++Stats->SentStats.OfferedCnt;
			++Stats->SentStats.OfferedTotal;
			if (HostStats != NULL) {
			    LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
			    ++HostStats->SentStats.OfferedCnt;
			    ++HostStats->SentStats.OfferedTotal;
			    LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
			}
		    }

		    /*
		     * Log stats after specified count
		     */

		    if (LogAfterCount > 0 && Stats->SentStats.OfferedTotal >= LogAfterCount) {
			loggedMark = 1;
			logStats("mark");
			LogAfterCount += LogAfter;
		    }

		    /*
		     * Close remote to allow remote logging to occur after
		     * specified count (doesn't work with streaming yet)
		     */

		    if (
			StreamMode == STREAM_OFF &&
			CloseReopenCount > 0 && 
			Stats->SentStats.OfferedTotal >= CloseReopenCount
		    ) {
			char *ptr;

			(void)RefilePendingStreams(NULL); /* retry */
			flushCompressBuffer(cfd);
			flushCommandBuffer(cfd);
			commandResponse(cfd, &ptr, "quit\r\n");
			close(cfd);
			KillFd = -1;
			clearCommandBuffer();
			credtime(OUR_DELAY);
			if ((cfd = connectTo(HostName, NULL, Port)) < 0) {
			    credtime(0);
			    break;
			}
			credtime(0);
			connectCount = 0;
			CloseReopenCount += CloseReopenAfter;
		    }
		} else {
		    if (buf[0] != '#')
			dl_logit("Buffer syntax error: %s", buf);
		    bufInval = 1;
		}

		/*
		 * If a signal occured during the transaction, break out of
		 * our loop.
		 */

		if (TermFlag) {
		    dl_logit("Terminated with signal");
		    break;
		}
	    } /* for(EVER) */

	    /*
	     * Final delta stats, only if we had
	     * previously logged marks, otherwise the
	     * mark stats will be the same as the final
	     * stats and we do not bother logging the mark.
	     */

	    if (loggedMark != 0)
		logStats("mark");

	    /*
	     * Rewrite batchfile, but only if not a realtime dnewslink.
	     */

	    if (RealTimeOpt == 0) {
		if (StreamPend) {
		    if (fo == NULL)
			fo = fopen(path, "w");
		    if (fo) {
			(void)RefilePendingStreams(fo);
		    }
		}

		if (readline(fd, buf, sizeof(buf), TimeNow) == 0 || fo != NULL) {
		    struct stat st;

		    if (fo == NULL)
			fo = fopen(path, "w");
		    if (fo != NULL) {
			do {
			    fputs(buf, fo);
			} while (readline(fd, buf, sizeof(buf), TimeNow) == 0);
			fflush(fo);
			fclose(fo);
			fo = NULL;
		    }
		    if (DeleteDetectOpt && 
			!PipeOpt && 
			!RealTimeOpt &&
			(fstat(fd, &st) != 0 || st.st_nlink == 0)
		    ) {
			/*
			 * If a queue file gets deleted out from under
			 * us, we don't try to rename the rewrite file back.
			 */
			remove(path);
			NumBatches = 0;
		    } else if (PipeOpt == 0 && RealTimeOpt == 0) {
			rename(path, CurrentBatchFile);
		    }
		} else {
		    if (PipeOpt == 0 && RealTimeOpt == 0)
			remove(CurrentBatchFile);
		}
	    } else {
		/*
		 * refiling does not work if we are in realtime mode
		 */
		if (StreamPend)
		    WouldHaveRefiled += RefilePendingStreams(NULL);
		if (readline(fd, buf, sizeof(buf), TimeNow) == 0)
		    WouldHaveRefiled = 1;
	    }

	    /*
	     * Final overall stats
	     */

	    Stats->SentStats.DeltaStart = Stats->SentStats.TimeStart;
	    Stats->SentStats.OfferedCnt = Stats->SentStats.OfferedTotal;
	    Stats->SentStats.AcceptedCnt = Stats->SentStats.AcceptedTotal;
	    Stats->SentStats.RefusedCnt = Stats->SentStats.RefusedTotal;
	    Stats->SentStats.RejectedCnt = Stats->SentStats.RejectedTotal;
	    Stats->SentStats.DeferredCnt = Stats->SentStats.DeferredTotal;
	    Stats->SentStats.DeferredFailCnt = Stats->SentStats.DeferredFailTotal;
	    Stats->SentStats.RejectedBytes = Stats->SentStats.AcceptedBytesTotal;
	    Stats->SentStats.AcceptedBytes = Stats->SentStats.AcceptedBytesTotal;

	    OurMs += OurMsTotal;
	    TheirMs += TheirMsTotal;
	    MsCount += MsCountTotal;

#ifdef	USE_ZLIB
            RawBytes += RawBytesTotal;
            CompressedBytes += CompressedBytesTotal;
#endif

	    /*
	     * be nice
	     */

	    if (cfd >= 0) {
		char *ptr;

		commandResponse(cfd, &ptr, "quit\r\n");
		flushCompressBuffer(cfd);
		flushCommandBuffer(cfd);
		close(cfd);
		clearCommandBuffer();
		KillFd = -1;
		/* normal quit, do not set cfd to -1 	     */
		/* this allows us to go on to the next batch */

#ifdef	USE_ZLIB
		if (CompressOn > 0) {
		    int r;
		    if ((r = deflateEnd(&Z_Strm)) != Z_OK)
			logit(LOG_ERR, "deflateEnd error: %s", Z_Strm.msg);
		}
#endif
	    }

	    logStats("final");

	} /* cfd=connectTo */

	if (fd > 0) {
	    readreset(fd);
	    close(fd);
	}
	if (cfd < 0 || TermFlag)
	    break;
    } /* while, on NumBatches */

    if (Logfd != NULL) fclose(Logfd);
    if (NotifyOpt != NULL) {
	RegisterNotify(NotifyOpt, 0);
	CloseNotify(&NotifyFd, &NotifyLockFd, NotifyOpt);
    }
    exit(0);
}

void
setSocketOptions(int fd)
{
    if (fd >= 0) {
	int on = 1;

	/*
	 * Make sure keepalive is turned on to prevent infinite hangs
	 */
	setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));

	/*
	 * Set the transmit and receive buffer size
	 */
	if (TxBufSize)
	    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&TxBufSize,
							sizeof(TxBufSize));
	if (RxBufSize)
	    setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&RxBufSize,
							sizeof(RxBufSize));

#ifdef IP_TOS
	/* Set TOS */
	if (IPQoS > 0)
	    setsockopt(fd, IPPROTO_IP, IP_TOS, (char *) &IPQoS, sizeof(IPQoS));
#endif
    }

}

/*
 * Figure out the ip address and port.  Try to use our cached lookup
 * info because a host might have multiple IN A's and we want to connect
 * to the same one we succesfully connected to before.
 */

void
getHostAddr(int *fd, const char *hostName, const char *serviceName, int defPort, struct sockaddr **addr, int *addrLen, int useSaved)
{
#ifdef INET6
    static struct addrinfo *res0 = NULL;
    static struct addrinfo *res = NULL;
    if (useSaved && res != NULL) {
	*addr = res->ai_addr;
	*addrLen = res->ai_addrlen;
	*fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    } else {
	struct addrinfo hints;
	char p[10];
	int error;

	/*
	 * Open a wildcard listening socket
	 */
	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	snprintf(p, sizeof(p), "%d", defPort);
	if (res0 == NULL) {
	    error = getaddrinfo(HostName, p, &hints, &res0);
	    if (error == 0) {
		res = res0;
	    } else {
		dl_logit("hostname lookup failure: %s:%s: %s\n",
				HostName, serviceName, gai_strerror(error));
		*fd = -1;
		*addr = NULL;
		return;
	    }
	} else if (res != NULL) {
	    res = res->ai_next;
	}
	if (res == NULL) {
	    *fd = -1;
	    *addr = NULL;
	} else {
	    *fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
	    *addr = res->ai_addr;
	    *addrLen = res->ai_addrlen;
	}
    }

#else
    static struct sockaddr_in sin;

    if (useSaved) {
	*addr = (struct sockaddr *)&sin;
	*addrLen = sizeof(sin);
    } else {
	struct hostent *hent0 = NULL;
	static int hindex = 0;
	char *hent;

	memset(&sin, 0, sizeof(sin));

	if (hent0 == NULL) {
	    hent0 = gethostbyname(hostName);
	    if (hent0 == NULL) {
		dl_logit("hostname lookup failure: %s", strerror(errno));
		*fd = -1;
		*addr = NULL;
		return;
	    }
	    hindex = 0;
	} else {
	    hindex++;
	}
	hent = hent0->h_addr_list[hindex];
	if (hent == NULL) {
	    dl_logit("hostname lookup failure: %s", strerror(errno));
	    *fd = -1;
	    *addr = NULL;
	    hent0 = NULL;
	    return;
	} else {
	    sin.sin_family = hent0->h_addrtype;
	    memmove(&sin.sin_addr, hent, hent0->h_length);
	}
	{
	    struct servent *serv = serviceName ?
				getservbyname(serviceName, "tcp") : NULL;
	    if (serv != NULL) {
		sin.sin_port = serv->s_port;
	    } else {
		sin.sin_port = htons(defPort);
		if (serviceName) {
		    dl_logit("unable to lookup service '%s', using port %d", 
			serviceName, 
			defPort
		    );
		}
	    }
	}
	*addr = (struct sockaddr *)&sin;
	*addrLen = sizeof(sin);
    }

#endif	/* INET6 */

}

void
bindOutboundInterface(int *fd)
{
    if (OutboundIpName != NULL) {
#ifdef INET6
	int error;
	struct addrinfo hints;
	struct addrinfo *bres;

	/*
	* Open a wildcard listening socket
	*/
	memset(&hints, 0, sizeof(hints));
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	error = getaddrinfo(OutboundIpName, 0, &hints, &bres);
	if (error != 0) {
	    dl_logit("local bind address lookup failure: %s",
							gai_strerror(error));
	    close(*fd);
	    *fd = -1;
	}

	if (bind(*fd, bres->ai_addr, bres->ai_addrlen) < 0) {
	    dl_logit("local bind address failed: %s", strerror(errno));
	    close(*fd);
	    *fd = -1;
	}
	freeaddrinfo(bres);
#else	/* INET4 */
	struct hostent *host = gethostbyname(OutboundIpName);
	struct sockaddr_in bsin;

	bzero(&bsin, sizeof(bsin));

	if (host != NULL) {
	    bsin.sin_family = host->h_addrtype;
	    memmove(&bsin.sin_addr, host->h_addr, host->h_length);
	} else if (IsIpAddr(OutboundIpName)) {
	    bsin.sin_family = AF_INET;
	    bsin.sin_addr.s_addr = inet_addr(OutboundIpName);
	} else {
	    dl_logit("local bind address lookup failure: %s", strerror(errno));
	    close(*fd);
	    *fd = -1;
	}
	if (bind(*fd, (struct sockaddr *)&bsin, sizeof(bsin)) < 0) {
	    dl_logit("local bind address failed: %s", strerror(errno));
	    close(*fd);
	    *fd = -1;
	}
#endif	/* INET6 */
    }
}

int
connectTo(const char *hostName, const char *serviceName, int defPort)
{
    static int UseSaved = 0;
    int fd = -1;
    int cfd = -1;
    static int ConnectCount;
    struct sockaddr *addr = NULL;
    int addrLen = 0;
    int connected = 0;
    int connectattempts = 0;

    WritesLosing = 0;

    clearResponseBuf();

    ++ConnectCount;

    stprintf("%s %s connect: %d", HostName, CurrentBatchFile + CBFIndex, ConnectCount);

#ifdef	USE_ZLIB
    if (CompressOn > 0) {
	int r;
	if ((r = deflateEnd(&Z_Strm)) != Z_OK)
	    logit(LOG_ERR, "deflateEnd error: %s (at connect)", Z_Strm.msg);
    }
#endif
    CompressOn = 0;

    ++Stats->SentStats.ConnectCnt;
    ++Stats->SentStats.ConnectTotal;
    if (HostStats != NULL) {
	LockFeedRegion(HostStats, XLOCK_EX, FSTATS_OUT);
	++HostStats->SentStats.ConnectCnt;
	++HostStats->SentStats.ConnectTotal;
	LockFeedRegion(HostStats, XLOCK_UN, FSTATS_OUT);
    }

    stprintf("%s %s connect: %d", HostName, CurrentBatchFile + CBFIndex,
							Stats->SentStats.ConnectCnt);

/*
 * This gets a little messy.
 *
 * The INET6 based host lookups allow us to keep track of the result
 * of an address lookup across multiple getaddrinfo() calls. The INET4
 * lookups don't without making a copy of the result.
 *
 * This means we need to do a socket() and local bind() before we lookup
 * the host for INET4. In the INET6 case, we only want to do the bind()
 * after we have done the lookup, so that we can do a local bind() using
 * the correct protocol.
 *
 * Not sure if this is the right way, but that's how it is done for now.
 *
 */
    while (!TermFlag && !connected && ++connectattempts <= 5) {

#ifndef INET6
	fd = socket(AF_INET, SOCK_STREAM, 0);
	if (addr == NULL && fd != -1) {
	    setSocketOptions(fd);
	    bindOutboundInterface(&fd);
	}
#endif

	getHostAddr(&fd, hostName, serviceName, defPort, &addr, &addrLen, UseSaved);

#ifdef INET6
	if (addr != NULL && fd != -1) {
	    setSocketOptions(fd);
	    bindOutboundInterface(&fd);
	}
#endif
	if (addr == NULL) {
	    close(fd);
	    fd = -1;
	    connected = -1;
	    UseSaved = 0;
	} else if (fd == -1) {
	    connected = 0;
	    UseSaved = 0;
	} else if (connect(fd, addr, addrLen) < 0) {
	    dl_logit("%s %s connect: %s", NetAddrToSt(0, addr, 1, 1, 1),
					CurrentBatchFile + CBFIndex,
					strerror(errno));
	    close(fd);
	    fd = -1;
	    stprintf("%s %s connect: fail", HostName,
					CurrentBatchFile + CBFIndex);
	    connected = 0;
	    UseSaved = 0;
	} else {
	    connected = 1;
	    connectattempts = 0;
	}
	if (connected == -1) {
	    if (RealTimeOpt)
		sleep(WaitTime);
	    else
		connectattempts = 999;
	}
    }

    /*
     * Sometimes setting the transmit and receive buffer sizes prior to
     * the connect does not work, because the connect() overrides the 
     * parameters based on the destination route.  So, we do it here as well.
     */

    setSocketOptions(fd);

#ifdef TCP_NODELAY
    /*
     * Turn on TCP_NODELAY
     */
    if (fd >= 0) {
	int one = 1;
	setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&one, sizeof(one));
    }
#endif

    LastErrBuf[0] = 0;

    /*
     * get initial message from remote
     */

    UseSaved = 0;

    if (fd >= 0) {
	char *ptr;

	clearCommandBuffer();
	cfd = fd;

	/* Set KillFd here to avoid deadlock when the remote server fails
	 * to respond after initial connection
	 */
	KillFd = cfd;

	switch(commandResponse(cfd, &ptr, NULL)) {
	case OK_CANPOST:	/* innd	*/
	case OK_NOPOST:		/* nntpd may still allow news transfers */
	    break;
	default:
	    dl_logit("connect: %s", (ptr ? ptr : "(unknown error)"));
	    stprintf("%s %s connect: err resp", HostName, CurrentBatchFile + CBFIndex);
	    clearCommandBuffer();
	    KillFd = -1;
	    close(cfd);
	    cfd = -1;
	    fd = -1;
	    break;
	}
	credreset();
    }

    /*
     * Unless streaming is disabled, we attempt to turn on streaming.  If
     * it succeeds, we set the StreamMode to STREAM_RELOAD to resend any
     * queued streaming requests from a prior connection.
     */

    if (cfd >= 0) {
	char *ptr = NULL;
	char negotiated[255];

	negotiated[0] = 0;

	if (TryStreaming) {
	    switch(commandResponse(cfd, &ptr, "mode stream\r\n")) {
	    case OK_STREAMOK:
		StreamMode = STREAM_ON;
		strcat(negotiated,"streaming enabled");                
		dl_logit("connect: %s (streaming)", (ptr ? ptr : "<Unexpected EOF>"));
		stprintf("%s %s connect: stream", HostName, CurrentBatchFile + CBFIndex);
		break;
	    default:
		StreamMode = STREAM_OFF;
		strcat(negotiated,"streaming disabled");                
		dl_logit("connect: %s (nostreaming)", (ptr ? ptr : "<Unexpected EOF>"));
		stprintf("%s %s connect: nostream", HostName, CurrentBatchFile + CBFIndex);
		break;
	    }
	} else {
	    strcat(negotiated,"streaming disabled");                
	    dl_logit("connect (streaming disabled)");
	    stprintf("%s %s connect (streaming disabled)", HostName, CurrentBatchFile + CBFIndex);
	    StreamMode = STREAM_OFF;
	}

	/*
	 * added to support compression, cmsedore@maxwell.syr.edu 12/4/97
	 */
#ifdef	USE_ZLIB
	if (CompressMode > 0) {
	    switch(commandResponse(cfd, &ptr, "mode compress\r\n")) {
	    case OK_COMPRESSOK:
		CompressOn = 1;
		strcat(negotiated," compression enabled");
		bzero(&Z_Strm, sizeof(Z_Strm));
		if (deflateInit(&Z_Strm, CompressMode) != Z_OK)
		    logit(LOG_ERR, "deflateInit error: %s", Z_Strm.msg);
		dl_logit("connect: %s (%s)\n", (ptr ? ptr : "<Unexpected EOF>"),
								 negotiated);
		break;
	    default:
		CompressOn = 0;
		strcat(negotiated," compression disabled");
		CompressMode = COMPRESS_OFF;
		dl_logit("connect: (%s)\n", negotiated);
		break;
	    }
	} else {
	    CompressMode = COMPRESS_OFF;
	    CompressOn = 0;
	}
#endif

	stprintf("%s %s connect: %s", HostName, CurrentBatchFile + CBFIndex,
						negotiated);

	if (HeaderOnlyFeed) {
	    switch(commandResponse(cfd, &ptr, "mode headfeed\r\n")) {
	    case OK_MODECMDOK:
		dl_logit("connect: %s (header feed only)", (ptr ? ptr : "<Unexpected EOF>"));
		stprintf("%s %s connect: headfeed", HostName,
					CurrentBatchFile + CBFIndex);
		break;
	    default:
		dl_logit("mode headfeed failed: %s",
					(ptr ? ptr : "<Unexpected EOF>"));
		stprintf("%s %s connect: mode headfeed failed", HostName,
					CurrentBatchFile + CBFIndex);
		clearCommandBuffer();
		KillFd = -1;
		close(cfd);
		cfd = -1;
		sprintf(LastErrBuf, "mode headfeed command failed");
		break;
	    }
	}
    }

    /*
     * clean up
     */

    if (cfd < 0) {
	if (LastErrBuf[0] == 0)
	    sprintf(LastErrBuf, "connect-timeout");
	sleep(WaitTime / 3);
    } else {
	int n;

	n = fcntl(cfd, F_GETFL);
	if (n < 0) {
	    dl_logit("getting socket flags: %s", strerror(errno));
	} else {
	    if (fcntl(cfd, F_SETFL, n | O_NONBLOCK) < 0) {
		dl_logit("setting O_NONBLOCK: %s", strerror(errno));
	    }
	}

	UseSaved = 1;
    }

    KillFd = fd;
    return(cfd);
}

/*
 * Transact() - begin a streaming or non-streaming transaction, or
 *		complete a streaming transaction.
 */

int
Transact(int cfd, const char *relPath, char *msgId, off_t off, int size, int cSize, int defers, char *stage, char *reason, char *buf, int *sentSize)
{
    int r = 0;
    static char *ptr = NULL;

    if (sentSize)
	*sentSize = 0;

    StrnCpyNull(stage, "-", MAXREASON);
    StrnCpyNull(reason, "-", MAXREASON);
    /*
     * Handle streaming.  If r returns zero, we revert to the
     * old operation (this is only occurs if streaming is disabled).
     */

#ifdef NOTDEF
    if (StreamMode == STREAM_RELOAD) {
	StreamReload(cfd);
	StreamMode = STREAM_ON;
    }
#endif
    if (StreamMode == STREAM_ON) {
	r = StreamTransact(cfd, relPath, msgId, off, size, cSize, defers, stage, reason, buf, sentSize);
	if (r)
	    return(r);
    }

    if (relPath == NULL)
	return(T_STREAMING);

    /*
     * This is necessary to guarentee sufficient receive buffer
     * space.
     */
    if (strlen(msgId) > MAXMSGIDLEN) {
	StrnCpyNull(stage, "pre-ihave", MAXREASON);
	StrnCpyNull(reason, "Message-ID too long", MAXREASON);
	return(T_REJECTED);
    }

    switch(commandResponse(cfd, &ptr, "ihave %s\r\n", msgId)) {
    case CONT_XFER:
	break;
    case ERR_XFERRJCT:
	r = T_REJECTED;
	break;
    case ERR_GOTIT:
	r = T_REFUSED;
	break;
    case ERR_ACCESS:
    case ERR_FAULT:
    case ERR_AUTHBAD:
    case ERR_GOODBYE:
	r = T_FAILEDEXIT;
	break;
    case ERR_NOAUTH:
    default:
	r = T_FAILED;
	break;
    }
    StrnCpyNull(stage, "ihave", MAXREASON);
    StrnCpyNull(reason, (ptr ? ptr : "<Unexpected EOF>"), MAXREASON);
    if (r == 0) {
	r = DumpArticle(cfd, relPath, off, size, cSize);
	if (sentSize)
	    *sentSize = size;
	switch(commandResponse(cfd, &ptr, NULL)) {
	case OK_XFERED:
	    if (r == 0)
		r = T_ACCEPTED;
	    break;
	case ERR_GOTIT:
	    if (r == 0)
		r = T_REFUSED;
	    break;
	case ERR_XFERRJCT:
	    if (r == 0)
		r = T_REJECTED;
	    break;
	case ERR_XFERFAIL:
	default:
	    if (r == 0)
		r = T_FAILED;
	    break;
	}

	StrnCpyNull(stage, "ihave-dump", MAXREASON);
	StrnCpyNull(reason, (ptr ? ptr : "<Unexpected EOF>"), MAXREASON);

	/*
	 * If we couldn't send the article, we ignore the
	 * response code because the remote might send the
	 * wrong response to the null article.
	 */
	if (r < 0) {
	    StrnCpyNull(reason, "Error sending article (1)", MAXREASON);
	    r = T_REJECTED;
	}
    }
    return(r);
}

int
DumpArticle(int cfd, const char *relPath, off_t off, int size, int cSize)
{
    static char pathBase[PATH_MAX];
    char *path;
    char *ptr = NULL;
    char *base = NULL;
    int r = -1;
    int multiArtFile = 0;
    int wasControl = 0;
    int haveLines = 0;
    int doBytes = 1;
    int wireFormat = 0;
    int artSize = 0;
    SpoolArtHdr ah;

    HeaderSize = 0;

    if (*relPath == '/') {
	path = (char *)relPath;
    } else {
	path = pathBase;
	sprintf(path, "%s/%s", PatExpand(SpoolHomePat), relPath);
    }

    if (cfd >= 0 &&
	(base = ptr = cdmap(path, off, &size, cSize, &multiArtFile)) != NULL
    ) {
	int i;
	int b;
	int inHeaders = 1;

	r = 0;

	if (DebugOpt > 1) {
	    printf("%s >> (article, %d bytes)\n", dtstamp(), size);
	}

	/*
	 * In terms of credtime, we assume that our reading
	 * of the file is instantanious.  I don't want to
	 * call gettimeofday() for each line!
	 */

	credtime(OUR_DELAY);

	artSize = size;

	bcopy(base, &ah, sizeof(ah));

	if (artSize > 24 && (uint8)ah.Magic1 == (uint8)STORE_MAGIC1 &&
				(uint8)ah.Magic2 == (uint8)STORE_MAGIC2) {
	    artSize -= ah.HeadLen;
	    ptr += ah.HeadLen;
	    wireFormat = 0;
	    if (ah.StoreType & STORETYPE_WIRE)
		wireFormat = 1; 
	} else {
	    wireFormat = 0;
	}

	if (wireFormat && !HeaderOnlyFeed && !GenLinesHeader &&
#ifdef USE_ZLIB
			!CompressOn &&
#endif
			artSize > CMDBUFFSIZE ) { 
	    if (writeLarge(cfd, ptr, artSize) < 0) {
		if (base != NULL)
		    cdunmap(base, size, multiArtFile, cSize > 0);
		return(T_FAILED);
	    } else {
		if (base != NULL)
		    cdunmap(base, size, multiArtFile, cSize > 0);
		return(0);
	    }
	}
	    
	for (i = b = 0; i < artSize; b = i) {
	    /*
	     * if first character is a '.', escape it
	     */

	    if (ptr[i] == '.' && !wireFormat)
		if (commandWrite(cfd, ".", 1, 1) < 0)
		    return(T_FAILED);

	    /*
	     * dump the article (body in a single write if wireformat).
	     */

	    if (wireFormat && !inHeaders) {
		if (commandWrite(cfd, ptr + i, artSize - i, 1) < 0)
		    return(T_FAILED);
		i = artSize;
		continue;
	    }

	    while (i < artSize && ptr[i] != '\n')
		++i;
	    if (wireFormat && i > 0 && ptr[i-1] == '\r')
		--i;
	    if (commandWrite(cfd, ptr + b, i - b, 1) < 0)
		return(T_FAILED);

	    if (inHeaders) {
		HeaderSize += i - b;

		if (i - b > 8 && strncasecmp(ptr + b, "Control:", 8) == 0)
		    wasControl = 1;
		if (i - b > 6 && strncasecmp(ptr + b, "Lines:", 6) == 0)
		    haveLines = 1;
		if (KeepBytes && i - b > 6 && strncasecmp(ptr + b, "Bytes:", 6) == 0)
		    doBytes = 0;
	    }

	    /*
	     * skip newline. if i > fsize, we hit the end of the file without
	     * a terminating LF.  We don't have to do anything
	     * since we've already terminated the last line.
	     */
	    ++i;
	    if (wireFormat && ptr[i] == '\n')
		++i;

	    /*
	     * end of headers ?  We include the blank line of the headers
	     * terminate in the output, but don't push the rest of the 
	     * article.
	     *
	     * If the article is a control message, we push the whole thing
	     */
	    if (inHeaders && (i - b == 1 || (wireFormat && i - b == 2))) {
		/*
		 * add Lines: header, if requested. note that we can only
		 * do so if we are going to otherwise discard the rest of
		 * the article (HeaderOnlyFeed and not control).
		 */
		if(HeaderOnlyFeed && GenLinesHeader &&
		   haveLines == 0 && wasControl == 0) {
		    int lineCount = 0;
		    char tmp[32];

		    while (i < artSize) {
			while (i < artSize && ptr[i] != '\n')
			    ++i;
			if (i < artSize && ptr[i] == '\n')
			    ++lineCount;
			++i;
		    }

		    snprintf(tmp, sizeof(tmp), "Lines: %d\r\n", lineCount);
		    if (commandWrite(cfd, tmp, strlen(tmp), 1) < 0)
			return(T_FAILED);
		    HeaderSize += strlen(tmp);
		}

		/*
		 * add Bytes: header, dreaderd needs it!  header feeds 
		 * *require* it, but non-header-only feeds can calculate it 
		 * themselves.
		 */
		if (HeaderOnlyFeed && doBytes) {
		    char tmp[32];
		    snprintf(tmp, sizeof(tmp), "Bytes: %d\r\n", artSize);
		    if (commandWrite(cfd, tmp, strlen(tmp), 1) < 0)
			return(T_FAILED);
		    HeaderSize += strlen(tmp);
		}
		if (commandWrite(cfd, "\r\n", 2, 1) < 0)
		    return(T_FAILED);
		inHeaders = 0;
		if (HeaderOnlyFeed && wasControl == 0)
		    break;
	    } else {
		if (commandWrite(cfd, "\r\n", 2, 1) < 0)
		    return(T_FAILED);
	    }
	}
#ifdef	USE_ZLIB
	if (CompressOn > 0)
	    flushCompressBuffer(cfd);
#endif
	credtime(THEIR_DELAY);
    } else {
	if (DebugOpt > 1)
	    printf("%s >> (file not found or other error)\n", dtstamp());
    }
    if (base != NULL)
	cdunmap(base, size, multiArtFile, cSize > 0);

    if (!wireFormat || (HeaderOnlyFeed && !wasControl))
	commandResponse(cfd, NULL, ".\r\n");

    return(r);
}
	
#ifdef NOTDEF

/*
 * Handle streaming protocol
 *
 * StreamReload() - retransmit pending stream after disconnect/reconnect
 * StreamTransact() - normal stream state machine
 */

void
StreamReload(int cfd)
{
    int i;

    for (i = 0; i < MaxStream; ++i) {
	Stream *s = &StreamAry[i];

	if (s->st_State != STATE_EMPTY) {
	    commandResponse(cfd, NULL, "check %s\r\n", s->st_MsgId);
	    s->st_State = STATE_CHECK;
	}
    }
}

#endif

int
StreamTransact(int cfd, const char *relPath, char *msgId, off_t off, int size, int cSize, int defers, char *stage, char *reason, char *buf, int *sentSize)
{
    Stream *s;
    int r = T_STREAMING;

    /*
     * Another article to stream?
     */

    if (relPath) {
	if ((s = LocateStream(msgId, 0)) != NULL) {
	    StrnCpyNull(stage, (defers? "resend-stream": "stream"), MAXREASON);
	    StrnCpyNull(reason, "No in stream", MAXREASON);
	    return(T_REFUSED);
	}
	s = &StreamAry[StreamPend];
	s->st_MsgId  = zallocStr(&SysMemPool, msgId);
	s->st_RelPath   = zallocStr(&SysMemPool, relPath);
	s->st_Off = off;
	s->st_Size = size;
	s->st_CompSize = cSize;
	s->st_DumpRCode = 0;
	if (NoCheckOpt) {
	    int r;

	    commandResponse(cfd, NULL, "takethis %s\r\n", s->st_MsgId);
	    r = DumpArticle(cfd, s->st_RelPath, s->st_Off, s->st_Size,
							s->st_CompSize);
	    /* Trap: This r is NOT the r returned by StreamTransact! */
	    s->st_State = STATE_POSTED;
	    s->st_DumpRCode = r;
	    if (msgId && s->st_MsgId)
		StrnCpyNull(msgId, s->st_MsgId, 1024);
	    StrnCpyNull(stage, "takethis-nocheck", MAXREASON);
	} else {
	    commandResponse(cfd, NULL, "check %s\r\n", s->st_MsgId);
	    s->st_State = STATE_CHECK;
	}
	BytesPend += strlen(s->st_MsgId);
	++StreamPend;
	AveragePend = StreamPend * (1 - 0.98) + AveragePend * 0.98;
	if (StreamPend == NumStream) {
	    HiWaterMark = STREAMDRAIN;
	    if (DebugOpt > 1)
		printf("set watermark\n");
	}
    }

    /*
     * Should we wait for a response ?
     *
     * note: code has been written to allow for non-blocking reads in the 
     * future.  For now, we simply enforce the pipeline by ensuring the
     * receive buffer is large enough.
     *
     * check command response:	  OK_STRMCHECK, ERR_STRMCHECK, ERR_RESEND?
     * takethis command response: OK_STRMTAKE,  ERR_STRMTAKE, ERR_RESEND
     * misc:			  ERR_GOODBYE
     */

    if ((relPath == NULL && StreamPend > 0) || 
	StreamPend >= NumStream - HiWaterMark
    ){
	char *ptr = NULL;
	Stream *s = NULL;
	int delMe = 0;

	switch(commandResponse(cfd, &ptr, NULL)) {
	case ERR_RESEND:
	    if ((s = LocateStream(ptr, STATE_POSTED)) != NULL) {
		delMe = 1;
		r = T_REJECTED;	/* XXX */
		StrnCpyNull(msgId, s->st_MsgId, 1024);
		if (stage && !*stage)
		    StrnCpyNull(stage, (defers? "resend-stream": "stream"), MAXREASON);
		StrnCpyNull(reason, ptr, MAXREASON);
		if (s->st_DumpRCode < 0) {
		    StrnCpyNull(reason, "Error sending article (2)", MAXREASON);
		    r = T_REJECTED;
		}
		break;
	    }
	    if (!defers && ((s = LocateStream(ptr, STATE_CHECK)) != NULL)) {
		char cst[12];
		delMe = 1;
		StrnCpyNull(msgId, s->st_MsgId, 1024);
		if (stage && !*stage)
		    StrnCpyNull(stage, "stream", MAXREASON);
		StrnCpyNull(reason, ptr, MAXREASON);
		r = T_DEFERIT;
		if (s->st_CompSize > 0)
		    sprintf(cst, " C%d", s->st_CompSize);
		else
		    cst[0] = 0;
		sprintf(buf,"%s %s %lld,%d %c%s", s->st_RelPath, s->st_MsgId,
			s->st_Off, s->st_Size, HeaderOnlyFeed ? 'H' : ' ',
			cst);
		break;
	    }
	    /*
	     * fall through.  Consider a resend-later response to a check,
	     * which is really a completely illegal response, to be the same
	     * as OK_STRMCHECK if we are already processing the defers
	     * and let the 'takethis' statemachine deal with it.
	     */
	case OK_STRMCHECK:
	    if ((s = LocateStream(ptr, STATE_CHECK)) != NULL) {
		int r;

		commandResponse(cfd, NULL, "takethis %s\r\n", s->st_MsgId);
		r = DumpArticle(cfd, s->st_RelPath, s->st_Off, s->st_Size,
							s->st_CompSize);
		/*
		 * Trap: This r is NOT the r returned by StreamTransact!
		 * Note that we are not waiting for a response here. The
		 * response is handled in the next call of StreamTransact.
		 */
		s->st_State = STATE_POSTED;
		s->st_DumpRCode = r;
		StrnCpyNull(msgId, s->st_MsgId, 1024);
		if (stage && !*stage)
		    StrnCpyNull(stage, (defers? "resend-takethis": "takethis"), MAXREASON);
		StrnCpyNull(reason, ptr, MAXREASON);
	    }
#ifdef NOTDEF
	    if (NumStream < MaxStream && ++NumStreamFrac == STREAMFRAC) {
		++NumStream;
		NumStreamFrac = 0;
	    }
#endif
	    break;
	case ERR_STRMCHECK:
	    if ((s = LocateStream(ptr, STATE_CHECK)) != NULL) {
		delMe = 1;
		StrnCpyNull(msgId, s->st_MsgId, 1024);
		if (stage && !*stage)
		    StrnCpyNull(stage, (defers? "resend-stream": "stream"), MAXREASON);
		StrnCpyNull(reason, ptr, MAXREASON);
		r = T_REFUSED;
	    }
#ifdef NOTDEF
	    if (NumStream < MaxStream && ++NumStreamFrac == STREAMFRAC) {
		++NumStream;
		NumStreamFrac = 0;
	    }
#endif
	    break;
	case OK_STRMTAKE:
	    if ((s = LocateStream(ptr, STATE_POSTED)) != NULL) {
		delMe = 1;
		StrnCpyNull(msgId, s->st_MsgId, 1024);
		if (stage && !*stage)
		    StrnCpyNull(stage, (defers? "resend-stream": "stream"), MAXREASON);
		StrnCpyNull(reason, ptr, MAXREASON);
		r = T_ACCEPTED;
		if (sentSize)
		    *sentSize = s->st_Size;
		if (s->st_DumpRCode < 0) {
		    if (reason)
			StrnCpyNull(reason, "Error sending article (3)", MAXREASON);
		    r = T_REJECTED;
		}
	    }
#ifdef NOTDEF
	    if (NumStream < MaxStream && ++NumStreamFrac == STREAMFRAC) {
		++NumStream;
		NumStreamFrac = 0;
	    }
#endif
	    break;
	case ERR_STRMTAKE:
	case ERR_XFERRJCT:
	    /* 
	     * NOTE: ERR_XFERRJCT is an illegal response to a streaming
	     * takethis, but some news systems return it so...
	     *
	     * But note also that we do not wait for the response of a
	     * takethis, so this may very well a perfectly valid 
	     * post-dump reject.
	     *
	     * !!! How do we get the proper state to ArtLog? We should really
	     * distinguish between a ERR_XFERRJCT as response to a CHECK and
	     * ERR_XFERRJCT as a response to TAKETHIS.
	     */
	    if ((s = LocateStream(ptr, STATE_POSTED)) != NULL) {
		delMe = 1;
		/*r = T_REFUSED; actually, this is a reject */
	        r = T_REJECTED;
		if (sentSize)
		    *sentSize = s->st_Size;
		StrnCpyNull(msgId, s->st_MsgId, 1024);
		if (stage && !*stage)
		    StrnCpyNull(stage, (defers? "resend-stream": "stream"), MAXREASON);
		StrnCpyNull(reason, ptr, MAXREASON);
		if (s->st_DumpRCode < 0) {
		    if (reason)
			StrnCpyNull(reason, "Error sending article (4)", MAXREASON);
		    r = T_REJECTED;
		}
	    }
	    break;
	case ERR_GOODBYE:
	case ERR_ACCESS:
	case ERR_FAULT:
	case ERR_AUTHBAD:
	    if (stage && !*stage)
		StrnCpyNull(stage, (defers? "resend-stream": "stream"), MAXREASON);
	    StrnCpyNull(reason, (ptr ? ptr : "<Unexpected EOF>"), MAXREASON);
	    r = T_FAILEDEXIT;
	    break;
	default:
	    if (stage && !*stage)
		StrnCpyNull(stage, (defers? "resend-stream": "stream"), MAXREASON);
	    StrnCpyNull(reason, (ptr ? ptr : "<Unexpected EOF>"), MAXREASON);
	    r = T_FAILED;
	    break;
	}
	if (delMe) {
	    --StreamPend;
	    BytesPend -= strlen(s->st_MsgId);
	    zfreeStr(&SysMemPool, &s->st_RelPath);
	    zfreeStr(&SysMemPool, &s->st_MsgId);
	    if (s != &StreamAry[StreamPend]) {
		*s = StreamAry[StreamPend];
		memset(&StreamAry[StreamPend], 0, sizeof(Stream));
	    } else {
		memset(s, 0, sizeof(Stream));
	    }
	}
    }
    if (DebugOpt > 1)
	printf("%s StreamTransact: return %d\n", dtstamp(), r);
    return(r);
}

/*
 * Locate id.  Applies only to active id's.  STATE_RETRY id's are
 * not (yet) active.
 */

Stream *
LocateStream(const char *msgId, int state)
{
    int i;
    int idLen;

    if (msgId == NULL)
	return(NULL);
    while (*msgId && *msgId != '<')
	++msgId;
    for (idLen = 0; msgId[idLen] && msgId[idLen] != '>'; ++idLen)
	;
    if (msgId[idLen] == '>')
	++idLen;

    if (DebugOpt > 1)
	printf("%s MsgId(%*.*s,%d):", dtstamp(), idLen, idLen, msgId, state);

    for (i = 0; i < MaxStream; ++i) {
	Stream *s = &StreamAry[i];

	if (s->st_State == STATE_RETRY)
	    continue;

	if ((state == 0 || s->st_State == state) && 
	    s->st_State &&
	    strlen(s->st_MsgId) == idLen &&
	    strncmp(msgId, s->st_MsgId, idLen) == 0
	) {
	    if (DebugOpt > 1)
		printf(" found slot %d\n", i);
	    return(s);
	}
    }
    if (DebugOpt > 1)
	printf(" not found\n");
    return(NULL);
}

/*
 * Refile pending streams.  If we have an output file, the
 * pending streams are refiled to the file.  If we do not,
 * we leave them in the stream array and mark them for RETRY.
 * This will cause them to be regenerated as file input
 * later.
 *
 * It is possible for a previous Refile to mark as retry what
 * we now wish to write to a file.
 */

int
RefilePendingStreams(FILE *fo)
{
    int i;
    int n = 0;

    for (i = 0; i < MaxStream; ++i) {
	Stream *s = &StreamAry[i];

	if (s->st_State != STATE_EMPTY) {
	    /*
	     * If an output file is available, drain pending/retry entries
	     * to it and clear the stream entirely.  Otherwise change all
	     * entries to STREAM_RETRY.
	     */

	    if (fo) {
		if (s->st_Off || s->st_Size) {
		    fprintf(fo, "%s\t%s\t%d,%d\n", 
			s->st_RelPath, 
			s->st_MsgId,
			(int)s->st_Off, 
			(int)s->st_Size
		    );
		} else {
		    fprintf(fo, "%s\t%s\n", s->st_RelPath, s->st_MsgId);
		}
		zfreeStr(&SysMemPool, &s->st_MsgId);
		zfreeStr(&SysMemPool, &s->st_RelPath);
		if (s->st_State != STATE_RETRY) {
		    --StreamPend;
		} else {
		    --StreamRetry;
		}
		memset(s, 0, sizeof(Stream));
	    } else {
		if (s->st_State != STATE_RETRY) {
		    s->st_State = STATE_RETRY;
		    ++StreamRetry;
		    --StreamPend;
		}
	    }
	    ++n;
	}
    }
    if (StreamPend != 0) {
	dl_logit("stream array corrupt");
	StreamPend = 0;
    }
    BytesPend = 0;
    return(n);
}

/*
 * Misc subroutines
 */

void 
dl_logit(const char *ctl, ...)
{
    va_list va;
    char buf[1024];

    sprintf(buf, "%s:%s ", 
	((HostName) ? HostName : "<unknown>"),
	((CurrentBatchFile) ? CurrentBatchFile : "<unknown>")
    );
    va_start(va, ctl);
    vsprintf(buf + strlen(buf), ctl, va);
    va_end(va);
    logit(LOG_NOTICE, "%s", buf);
    if (DebugOpt > 0)
	fprintf(stderr, "%s newslink[%ld] %s\n", LogTime(), (long)getpid(), buf);
    if (DebugOpt > 1)
	printf("%s %s", dtstamp(), buf);
}


void 
logStats(const char *description)
{
    time_t t = time(NULL);
    int secs = t - Stats->SentStats.DeltaStart;

    /*
     * If we got something through, cut off LastErrBuf
     */

    if (Stats->SentStats.OfferedCnt)
	LastErrBuf[9] = 0;

    credtime(OUR_DELAY);

    /*
     * Log!
     */

    dl_logit("%s secs=%-4d acc=%-4d dup=%-4d rej=%-4d tot=%-4d bytes=%-4.0f (%d/min"
#ifdef CREDTIME
    " %d/%d mS" 
#endif
    ") avpend=%-4.1f %s",
	description,
	secs,
	Stats->SentStats.AcceptedCnt,
	Stats->SentStats.RefusedCnt,
	Stats->SentStats.RejectedCnt,
	Stats->SentStats.OfferedCnt,
	Stats->SentStats.AcceptedBytes,
	((secs) ? Stats->SentStats.OfferedCnt * 60 / secs : 0),
#ifdef CREDTIME
	((MsCount) ? OurMs / MsCount : -1),
	((MsCount) ?  TheirMs / MsCount : -1),
#endif
	AveragePend,
	LastErrBuf
    );
    if (Stats->SentStats.DeferredCnt > 0 || Stats->SentStats.DeferredFailCnt > 0)
	dl_logit("%s secs=%-4d defer=%-4d deferfail=%-4d",
	    description,
	    secs,
	    Stats->SentStats.DeferredCnt,
	    Stats->SentStats.DeferredFailCnt
	);
#ifdef	USE_ZLIB
    if (CompressOn > 0)
	dl_logit("%s compbytes=%ld decompbytes=%ld (%.2f%% compression)\n",
	    description,
	    CompressedBytes,
	    RawBytes,
	    100 - (((float)CompressedBytes / (float)RawBytes) * 100)
	);
#endif

    Stats->SentStats.DeltaStart = t;
    Stats->SentStats.OfferedCnt = 0;
    Stats->SentStats.AcceptedCnt = 0;
    Stats->SentStats.RefusedCnt = 0;
    Stats->SentStats.RejectedCnt = 0;
    Stats->SentStats.DeferredCnt = 0;
    Stats->SentStats.DeferredFailCnt = 0;
    Stats->SentStats.RejectedBytes = 0.0;
    Stats->SentStats.AcceptedBytes = 0.0;

#ifdef	USE_ZLIB
    RawBytesTotal+=RawBytes;
    CompressedBytesTotal+=CompressedBytes;
    RawBytes=0;
    CompressedBytes=0;
#endif

    MsCountTotal += MsCount;
    OurMsTotal += OurMs;
    TheirMsTotal += TheirMs;

    MsCount = 0;
    OurMs = 0;
    TheirMs = 0;
}

/*
 * commandResponse() optionally send command, optionally read a response
 */

static char Buf[16384];
static int Bufb;
static int Bufe;
static char Cmd[CMDBUFFSIZE];
static int Cmde;

void
clearResponseBuf(void)
{
    Bufb = Bufe = 0;
}

void
clearCommandBuffer(void)
{
    Cmde = 0;
}

#ifdef NOTDEF

void
copyCommandBuffer(FILE *fo)
{
    if (Cmde) {
	fwrite(Cmd, 1, Cmde, fo);
	Cmde = 0;
    }
}

#endif

void
flushCompressBuffer(int cfd)
{
#ifdef	USE_ZLIB
    if (CompressBufInPos > 0) {
	Z_Strm.next_in = CompressBufIn;
	Z_Strm.avail_in = CompressBufInPos;
	RawBytes += CompressBufInPos;
	while (Z_Strm.avail_in) {
	    Z_Strm.next_out = CompressBufOut;
	    Z_Strm.avail_out = sizeof(CompressBufOut);
	    if (deflate(&Z_Strm, Z_SYNC_FLUSH) != Z_OK)
		logit(LOG_ERR, "deflate error: %s", Z_Strm.msg);
	    commandWrite(cfd, CompressBufOut,
				(sizeof(CompressBufOut) - Z_Strm.avail_out), 2);
	    CompressedBytes += (sizeof(CompressBufOut) - Z_Strm.avail_out);
	}
	CompressBufInPos = 0;
    }   
#endif
}

int
flushCommandBuffer(int cfd)
{
    int b = 0;

    if (WritesLosing) {
	Cmde = 0;
	return(-1);
    }

    while (b != Cmde) {
	int r = 0;

	credtime(OUR_DELAY);

	errno = 0;
	r = write(cfd, Cmd + b, Cmde - b);
	if (r < 0) {
	    if (errno != EAGAIN && errno != EINTR &&
		errno != EWOULDBLOCK && errno != EINPROGRESS) {
		    Cmde = 0;
		    return(-1);
	    }
	    r = 0;
	}
	b += r;

#if USE_POLL
	if (b != Cmde) {
	    struct pollfd pfd = { 0 };
	    struct linger l;

	    pfd.fd = cfd;
	    pfd.events = POLLOUT;
	    if (poll(&pfd, 1, Timeout * 1000) == 0) {
		dl_logit("write timeout");
		WritesLosing = 1;
		l.l_onoff = 1;
		l.l_linger = 0;
		setsockopt(cfd, SOL_SOCKET, SO_LINGER, (void*)&l, sizeof(l));
		TimeCounter = 100000;
		break;
	    }
	}
#endif

	credtime(THEIR_DELAY);
    }

    Cmde = 0;
    return(0);
}

int
writeLarge(int cfd, char *buffer, size_t size)
{
    struct iovec tmpvec[3]; /* structures to store writev info */

    int status;	
    size_t offset, bytes=0;
    int iovleft=0,	/* iov's to handle */
	i;		/* first iov in buffer that has data left */

    if (WritesLosing) {
	Cmde = 0;
	return(-1);
    }

    if (Cmde > 0) { /* if there is something to write out in Cmd buffer */ 
        tmpvec[iovleft].iov_base=Cmd;
        tmpvec[iovleft].iov_len=Cmde;
        iovleft++;
        bytes += Cmde;
	Cmde=0;
    }
    tmpvec[iovleft].iov_base=buffer;
    tmpvec[iovleft].iov_len=size;
    iovleft++;

    bytes += size;

    i = 0;
    do {
        /* Write out what's left and return success if it's all written. */
	credtime(OUR_DELAY);
	errno = 0;
	status = writev(cfd, tmpvec + i, iovleft);
	if (status < 0) {
	    if (errno != EAGAIN && errno != EINTR &&
		errno != EWOULDBLOCK && errno != EINPROGRESS) {
		    return(-1);
	    }
	    status = 0; offset=0;
	} else {
            offset = status;
            bytes -= offset;
        }
#if USE_POLL
	if (bytes > 0) {
	    struct pollfd pfd = { 0 };
	    struct linger l;

	    pfd.fd = cfd;
	    pfd.events = POLLOUT;
	    if (poll(&pfd, 1, Timeout * 1000) == 0) {
		dl_logit("write timeout");
		WritesLosing = 1;
		l.l_onoff = 1;
		l.l_linger = 0;
		setsockopt(cfd, SOL_SOCKET, SO_LINGER, (void*)&l, sizeof(l));
		TimeCounter = 100000;
		break;
	    }
	}
#endif
	credtime(THEIR_DELAY);

        /* Skip full iovecs */
        for (; offset >= (size_t) tmpvec[i].iov_len && iovleft > 0; i++) {
            offset -= tmpvec[i].iov_len;
            iovleft--;
        }
	if (offset > 0) {
	    tmpvec[i].iov_base = (char *) tmpvec[i].iov_base + offset;
	    tmpvec[i].iov_len -= offset;
	}

    } while (iovleft > 0 && (status >= 0 || errno == EINTR));

    assert(bytes == 0);

    return status;
}

int
commandWrite(int cfd, const void *buf, int bytes, int artdata)
{
    if (CompressOn > 0 && artdata != 2) {
#ifdef	USE_ZLIB
	while (bytes > 0) {
	    CompressBufIn[CompressBufInPos++] = *(char *)buf++;
	    bytes--;
	    if (CompressBufInPos > 8000)
		flushCompressBuffer(cfd);
	}   
	if (!artdata)
	    flushCompressBuffer(cfd);
#endif
    } else {
	while (bytes > 0) {
	    int n = sizeof(Cmd) - Cmde;
	    if (n == 0) {
		if (flushCommandBuffer(cfd) < 0)
		    return(-1);
		n = sizeof(Cmd);
	    }
	    if (n > bytes)
		n = bytes;
	    bcopy(buf, Cmd + Cmde, n);
	    Cmde += n;
	    bytes -= n;
	    buf = (const char *)buf + n;
	}
    }
    return(0);
}

int
commandResponse(int cfd, char **rptr, const char *ctl, ...)
{
    va_list va;
    int r = ERR_UNKNOWN;
    int n;

    if (ctl) {
	char tmp[4096];

	va_start(va, ctl);
	vsnprintf(tmp, sizeof(tmp), ctl, va);
	va_end(va);

	/*
	 * copy to outgoing Cmd buffer
	 */

	if (commandWrite(cfd, tmp, strlen(tmp), 0) < 0)
	    return(ERR_FAULT);

	if (DebugOpt > 1) {
	    int i;

	    printf("%s >> ", dtstamp());
	    for (i = 0; tmp[i]; ++i) {
		if (isprint((int)tmp[i]))
		    printf("%c", tmp[i]);
		else
		    printf("[%02x]", tmp[i]);
	    }
	    puts("");
	}
    }
    if (rptr) {
#if USE_POLL == 0
	int alarmSet = 0;
#endif

	/*
	 * flush output that has built up.  It's important to try to
	 * do this in one write() because TCP_NODELAY has been set.
	 */

	flushCompressBuffer(cfd);
	flushCommandBuffer(cfd);

	*rptr = NULL;

	/*
	 * Find next complete line in buffer, read more data if
	 * necessary.
	 */

	n = 0;

	credtime(OUR_DELAY);

	do {
	    int i;

	    credtime(THEIR_DELAY);
	    Bufe += n;
	    for (i = Bufb; i < Bufe; ++i) {
		if (Buf[i] == '\n') {
		    Buf[i] = 0;
		    if (i != Bufb && Buf[i-1] == '\r')
			Buf[i-1] = 0;
		    *rptr = Buf + Bufb;
		    Bufb = i + 1;
		    r = strtol(*rptr, NULL, 10);
		    break;
		}
	    }
	    if (i != Bufe)
		break;
	    /*
	     * We may have run out of space in the
	     * buffer, try to left-justify the data
	     * to make more room.  If we can't, the line
	     * is just too long.
	     */
	    if (Bufe == sizeof(Buf) - 1) {
		if (Bufb == 0)
		    break;
		memcpy(Buf, Buf + Bufb, Bufe - Bufb);
		Bufe -= Bufb;
		Bufb = 0;
	    }
	    credtime(OUR_DELAY);

	    n = 0;

	    if (WritesLosing)
		break;

	    do {
		if (Timeout > 0) {
#if USE_POLL
		    struct pollfd pfd = { 0 };

		    pfd.fd = cfd;
		    pfd.events = POLLIN;
		    if (poll(&pfd, 1, Timeout * 1000) == 0) {
			TimeCounter = 100000;
			break;
		    }
#else
		    /*
		     * set an alarm.  alarm() is really inefficient
		     * syscallwise, so we only do it when we really need
		     * to read().
		     */
		    alarm(Timeout);
		    alarmSet = 1;
#endif
		}
		n = read(cfd, Buf + Bufe, sizeof(Buf) - Bufe - 1);
	    } while (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
	} while (n > 0);

	credtime(THEIR_DELAY);
	++MsCount;

	/*
	 * turn off alarm, only if we previously
	 * turned it on.
	 */
#if USE_POLL == 0
	if (alarmSet)
	    alarm(0);
#endif

	if (DebugOpt > 1)
	    printf("%s << %s\n", dtstamp(), (*rptr) ? *rptr : "<interrupt/error>");

	if (*rptr) {
	    strncpy(LastErrBuf, *rptr, 32);
	    LastErrBuf[32] = 0;
	} else {
	    strcpy(LastErrBuf, "<interrupt/error>");
	}
    } else {
	r = 0;
    }
    return(r);
}

void
sigTerm(int sigNo)
{
    TermFlag = 1;
    if (sigNo == SIGALRM) {
	if (KillFd >= 0)
	    close(KillFd);
    }
}

#ifdef CREDTIME

void
credtime(int whos)
{
    struct timeval tv;
    static struct timeval Tv;

    gettimeofday(&tv, NULL);

    if (whos) {
	int ms = (tv.tv_usec + 1000000 - Tv.tv_usec) / 1000 +
	     (tv.tv_sec - Tv.tv_sec - 1) * 1000;

	switch(whos) {
	case OUR_DELAY:
	    OurMs += ms;
	    break;
	case THEIR_DELAY:
	    TheirMs += ms;
	    break;
	}
    }
    Tv = tv;
}

void
credreset(void)
{
    OurMs = TheirMs = 0;
}

#endif

typedef struct {
    char rb_Buf[MAXCLINE-sizeof(int)*3];
    int  rb_Base;
    int  rb_Index;
    int	 rb_Len;
} RBlock;

RBlock *RBlockAry[MAXFILEDES];
MemPool	*BMemPool;

void
readreset(int fd)
{
    RBlock *rbs;

    if (fd < 0 || fd >= MAXFILEDES) {
	dl_logit("readreset: bad fd: %d", fd);
	return;
    }
    if ((rbs = RBlockAry[fd]) != NULL) {
	zfree(&BMemPool, rbs, sizeof(RBlock));
	RBlockAry[fd] = NULL;
    }
}

/*
 * readretry() - read 
 *
 */

int
readretry(char *buf, int size)
{
    int inval = 1;

    if (StreamRetry) {
	int i;

	for (i = 0; i < MaxStream; ++i) {
	    Stream *s = &StreamAry[i];

	    if (s->st_State == STATE_RETRY) {
		if (s->st_Off || s->st_Size) {
		    sprintf(buf, 
			"%s\t%s\t%lld,%d",
			s->st_RelPath, 
			s->st_MsgId, 
			s->st_Off,
			s->st_Size
		    );
		} else {
		    sprintf(buf, "%s\t%s", s->st_RelPath, s->st_MsgId);
		}
		memset(s, 0, sizeof(Stream));
		inval = 0;
		--StreamRetry;
		if (DebugOpt > 1)
		    printf("%s (retry from stream): %s\n", dtstamp(), buf);
		break;
	    }
	}
    }
    return(inval);
}

/*
 * readline() - read next line from input.  This routine may block if we 
 *		are in realtime mode (-r), but only if StreamPend == 0
 */

int
readline(int fd, char *buf, int size, time_t t)
{
    RBlock *rbs;
    int statCounter = 0;
    int loopCounter = 0;
    static int usleeptime = 100000;

    if (fd < 0 || fd >= MAXFILEDES) {
	dl_logit("readline: bad fd: %d", fd);
	return(-1);
    }
    if ((rbs = RBlockAry[fd]) == NULL) {
	rbs = zalloc(&BMemPool, sizeof(RBlock));
	RBlockAry[fd] = rbs;
    }
    
    /*
     * Ensure there is enough room for a full line
     */

    if (rbs->rb_Base > 0 && rbs->rb_Len > MAXCLINE / 2) {
	memmove(rbs->rb_Buf, rbs->rb_Buf + rbs->rb_Base, rbs->rb_Len - rbs->rb_Base);
	rbs->rb_Len -= rbs->rb_Base;
	rbs->rb_Index -= rbs->rb_Base;
	rbs->rb_Base = 0;
    }

    for (;;) {
	int n;

	/*
	 * Look for newline
	 */

	for (n = rbs->rb_Index; n < rbs->rb_Len; ++n) {
	    if (rbs->rb_Buf[n] == '\n') {
		int s = (++n) - rbs->rb_Base;

		if (s >= size)
		    s = size - 1;
		memmove(buf, rbs->rb_Buf + rbs->rb_Base, s);
		buf[s] = 0;

		if (n == rbs->rb_Len) {
		    rbs->rb_Base = 0;
		    rbs->rb_Index = 0;
		    rbs->rb_Len = 0;
		} else {
		    rbs->rb_Base = n;
		    rbs->rb_Index = n;
		}
		if (*buf != '#')
		    return(0);
	    }
	}
	rbs->rb_Index = n;

	/*
	 * None found, read and loop
	 */

	if (rbs->rb_Len == sizeof(rbs->rb_Buf)) {
	    dl_logit("Line too long in batchfile!");
	    return(-1);
	}
	n = read(fd, rbs->rb_Buf + rbs->rb_Len, sizeof(rbs->rb_Buf) - rbs->rb_Len);
	if (n <= 0) {
	    struct stat st;

	    /*
	     * If a read error occurs or we are not in realtime mode or the
	     * termination flag has been set, break out.  If we ARE in realtime
	     * mode and StreamPend is not 0, breakout.  But if StreamPend
	     * is 0 and we are in realtime mode, we block.
	     */

	    if (n < 0 || RealTimeOpt == 0 || StreamPend != 0 || TermFlag)
		break;

	    /*
	     * If we don't have any articles in realtime mode, flush the
	     * article file cache occasionally to make sure that deleted
	     * files are removed.
	     */
	    if (loopCounter++ > 60)
		cdflush();

	    /*
	     * sleep and retry the read.  Effectively a tail -f.  When the file
	     * is renamed AND a new realtime batchfile exists, we switch to the
	     * new realtime batch and attempt to remove the renamed one that
	     * we still have a lock on.  This only works if the batch file
	     * is named after the label / sequence file.
	     *
	     * If the file is renamed, we stay with the file until a new 
	     * realtime batch is available.  
	     */
	    {
		if (statCounter == 0 &&
				stat(CurrentBatchFile,&st) == 0 &&
				st.st_ino != CurSt.st_ino) {
		    /*
		     * do not attempt to remove the batchfile if we would have
		     * had to refile some of our entries.
		     */
		    if (WouldHaveRefiled == 0)
			AttemptRemoveRenamedFile(&CurSt, BatchFileCtl);
		    break;
		}
	    }

	    /*
	     * RealTimeOpt:  -1 for 'fast sleep' or 'poll' (if supported).
	     *	note: there is a race condition in poll when used on files
	     *  in FreeBSD, so we set the timeout to 1 second.
	     *
	     * RealTimeOpt: > 0 for sleep period in seconds.
	     */
	    if (NotifyOpt != NULL && ListenNotify(&NotifyFd, &NotifyLockFd, NotifyOpt)) {
		fd_set rfds;
		struct timeval tv = { 0, 0 };
		char buf;
		int res;
		int i = 0;

		if (LastNotify + 60 < LastFeedData) {
		    RegisterNotify(NotifyOpt, 1);
		    LastNotify = t;
		}
		FD_ZERO(&rfds);
		FD_SET(NotifyFd, &rfds);
		tv.tv_usec = 300000;
		if ((res = select(NotifyFd + 1, &rfds, NULL, NULL, &tv)) == 0)
		    TimeCounter += 40;
		else
		    TimeCounter += 10;
		while (res > 0 && (res = read(NotifyFd, &buf, 1)) == 1 &&
								i++ < 100) {
		    if (res > 0) {
			LastNotify = t;
		    } else if (res == -1 && errno != EAGAIN) {
			RegisterNotify(NotifyOpt, 0);
			CloseNotify(&NotifyFd, &NotifyLockFd, NotifyOpt);
			LastNotify = 0;
		    }
		}
		statCounter = (statCounter + 1) % 5;
	    } else if (RealTimeOpt < 0) {
#if USE_POLL && defined(POLLEXTEND) && USE_POLLEXTEND
	        struct pollfd pfd = { 0 };
		int r;

		pfd.fd = fd;
		pfd.events = POLLEXTEND;

		errno = 0;
		if ((r = poll(&pfd, 1, 100)) < 0)
		    dl_logit("poll() failed: %s", strerror(errno));
		statCounter = (statCounter + 1) % 50;
		TimeCounter += 10;
#else
#if HAS_USLEEP
		usleep(usleeptime);
		statCounter = (statCounter + 1) % 50;
		TimeCounter += usleeptime / 10000 + 10;
#else
		sleep(1);		/* 1s  */
		statCounter = (statCounter + 1) % 5;
		TimeCounter += 100;
#endif
#endif
	    } else {
		sleep(RealTimeOpt);
		statCounter = (statCounter + 1) % 5;
		TimeCounter += 100;
	    }
	    usleeptime = usleeptime + usleeptime / 2;
	    if (usleeptime > 1000000)
		usleeptime = 1000000;
	} else {
	    LastFeedData = t;
	    rbs->rb_Len += n;
	    usleeptime = usleeptime - usleeptime / 2;
	    if (usleeptime < 100)
		usleeptime = 100;
	}
    }
    return(-1);
}

int
ValidMsgId(char *msgid)
{
    int i;
    int l = strlen(msgid);

    if (l >= MAXMSGIDLEN)
	return(0);
    if (msgid[0] == '<') {
	for (i = 1; i < l; ++i) {
	    if (msgid[i] == ' ' || msgid[i] == '\t')
		break;
	    if (msgid[i] == '>') {
		if (msgid[i+1] == 0)
		    return(1);
		break;
	    }
	}
    }
    return(0);
}

void
AttemptRemoveRenamedFile(struct stat *st, const char *spoolFile)
{
    char buf[256];
    int fd;
    int begSeq = -1;

    snprintf(buf, sizeof(buf), "%s/.%s.seq", PatExpand(DQueueHomePat), spoolFile);
    if ((fd = open(buf, O_RDONLY)) >= 0) {
	int n;
	if ((n = read(fd, buf, sizeof(buf) - 1)) > 0) {
	    buf[n] = 0;
	    sscanf(buf, "%d", &begSeq);
	}
	close(fd);
    }

    /*
     * Attempt to locate the renamed file.  It is no big deal if we can't
     * find it.  We still have our lock on the file, so we can safely remove
     * it.
     */

    if (begSeq >= 0) {
	int n;

	for (n = 3; n >= 0; --n, ++begSeq) {
	    struct stat nst;

	    snprintf(buf, sizeof(buf), "%s/%s.S%05d", PatExpand(DQueueHomePat), spoolFile, begSeq);
	    if (stat(buf, &nst) == 0 && st->st_ino == nst.st_ino) {
		remove(buf);
		break;
	    }
	}
    }
}

/*
 * CDMAP() - map part of a file.  If off & *psize are 0, we map the whole
 *	     file.
 */

#define MAXMCACHE	32

typedef struct XCache {
    char	*mc_Path;
    int		mc_Fd;
    int		mc_Size;	/* only if off/psize are not known */
    time_t	mc_OpenTime;
} XCache;

XCache	XCacheAry[MAXMCACHE];

void
cdinit(void)
{
    int i;
    bzero(&XCacheAry, sizeof(XCacheAry));
    for (i = 0; i < MAXMCACHE; ++i) {
	XCacheAry[i].mc_Path  = NULL;
	XCacheAry[i].mc_Fd  = -1;
    }
}

char *
cdmap(const char *path, off_t off, int *psize, int cSize, int *multiArtFile)
{
    int i;
    XCache *mc;
    char *ptr = NULL;

    *multiArtFile = 0;
    if (*psize || off) {
	*multiArtFile = 1;
    }

    for (i = 0; i < MAXMCACHE; ++i) {
	if (XCacheAry[i].mc_Path == NULL)
	    break;
	if (strcmp(path, XCacheAry[i].mc_Path) == 0)
	    break;
    }
    if (i == MAXMCACHE) {
	i = random() % MAXMCACHE;

	mc = &XCacheAry[i];
	close(mc->mc_Fd);
	zfreeStr(&SysMemPool, &mc->mc_Path);
	mc->mc_Fd = -1;
	mc->mc_Size = 0;
	mc->mc_OpenTime = 0;
    }
    mc = &XCacheAry[i];

    if (mc->mc_Path == NULL) {
	if ((mc->mc_Fd = cdopen(path, O_RDONLY, 0)) >= 0) {
	    struct stat st;

	    st.st_size = 0;
	    fstat(mc->mc_Fd, &st);
	    mc->mc_Size = st.st_size;
	    mc->mc_Path = zallocStr(&SysMemPool, path);
	    mc->mc_OpenTime = time(NULL);
	}
    }

    /*
     * When mapping multi-article files, a \0 terminator must also be mapped.
     * There is no \0 terminator for regular files.
     */

    if (mc->mc_Fd >= 0) {
	if (!*multiArtFile)
	    *psize = mc->mc_Size;

	if (cSize > 0) {
#ifdef USE_ZLIB
	    char *p;
	    gzFile *gzf;
	    SpoolArtHdr tah = { 0 };

	    lseek(mc->mc_Fd, off, 0);
	    if (read(mc->mc_Fd, &tah, sizeof(tah)) != sizeof(tah))
		return(NULL);
	    if ((uint8)tah.Magic1 != (uint8)STORE_MAGIC1 &&
				(uint8)tah.Magic2 != (uint8)STORE_MAGIC2) {
		lseek(mc->mc_Fd, off, 0);
		tah.Magic1 = STORE_MAGIC1;
		tah.Magic2 = STORE_MAGIC2;
		tah.HeadLen = sizeof(tah);
		tah.ArtLen = *psize;
		tah.ArtHdrLen = *psize;
		tah.StoreLen = *psize;
		tah.StoreType = STORETYPE_TEXT;
		*multiArtFile = 1;
	    }
	    gzf = gzdopen(dup(mc->mc_Fd), "r");
	    if (gzf == NULL)
		return(NULL);

	    ptr = (char *)malloc(tah.ArtLen + tah.HeadLen + 2);
	    if (ptr == NULL) {
		logit(LOG_CRIT, "Unable to malloc %d bytes for article (%s)\n",
				tah.ArtLen + tah.HeadLen + 2, strerror(errno));
		gzclose(gzf);
		return(NULL);
	    }
	    p = ptr;
	    bcopy(&tah, p, tah.HeadLen);
	    p += tah.HeadLen;
	    if (gzread(gzf, p, tah.ArtLen) != tah.ArtLen) {
		free(ptr);
		return(NULL);
	    }
	    p[tah.ArtLen] = 0;
	    *psize = tah.ArtLen + tah.HeadLen;
	    gzclose(gzf);
#else
	    logit(LOG_CRIT, "Queue batch file indicates compressed file and compression not enabled");
#endif
	} else {
	    ptr = xmap(NULL, *psize + *multiArtFile, PROT_READ, MAP_SHARED, mc->mc_Fd, off);
	    if (ptr == NULL)
		return(ptr);
	    if (ptr) {
		if (DOpts.FeederPreloadArt)
		    xadvise(ptr, *psize, XADV_WILLNEED);
		xadvise(ptr, *psize, XADV_SEQUENTIAL);
	    }
	    if (*multiArtFile && ptr && ptr[*psize] != 0) {
		logit(LOG_CRIT, "article batch corrupted: %s @ %lld,%ld", path, off, *psize);
		xunmap(ptr, *psize + *multiArtFile);
		ptr = NULL;
	    }
	}
    }
    return(ptr);
}

void
cdunmap(char *ptr, int bytes, int multiArtFile, int compressed)
{
    if (compressed)
	free(ptr);
    else
	xunmap((caddr_t)ptr, bytes + multiArtFile);
}

/*
 * Flush the open article cache every 10 minutes to ensure that we
 * don't keep spool files open too long
 */
void
cdflush(void)
{
    int i;
    time_t t = time(NULL);
    for (i = 0; i < MAXMCACHE; ++i) {
	if (XCacheAry[i].mc_Fd >= 0 &&
			(t - XCacheAry[i].mc_OpenTime) > CACHEFLUSHTIME) {
	close(XCacheAry[i].mc_Fd);
	zfreeStr(&SysMemPool, &XCacheAry[i].mc_Path);
	XCacheAry[i].mc_Fd = -1;
	XCacheAry[i].mc_Size = 0;
	XCacheAry[i].mc_OpenTime = 0;
	}
    }
}

void
doArtLog(const char *class, char *msgid, char *message, const char *stage, const char *reason)
{
    if (strstr(ArtLog, "all") || strstr(ArtLog, class)) {
	/* The strstr is only an approximation, but the classes are
	 * sufficiently distinct for this to work.
	 */
	if (Logfd == NULL) {
	    char logfname[PATH_MAX];
	    sprintf(logfname, "%s/feedlog.%s", PatExpand(LogHomePat), HostName);
	    Logfd = fopen(logfname, "a");
	    if (Logfd == NULL)
	        return;
        }
    	fprintf(Logfd, "%s [%ld]: %s %s %s %s\n",
		LogTime(), (long)getpid(),
		msgid, message, stage? stage: "-", reason? reason: "-");
    	fflush(Logfd);
    }
}

#define ISWHITE(c)	((c)=='\n' || (c) == '\r' || (c) == ' ' || (c) == '\t')

/*
 * Extract a line from the dqueue file
 * Return 0 for success and -1 for failure
 */
int
extractFeedLine(const char *buf, char *relPath, char *msgId, off_t *poff, int *psize, int *pheadOnly, time_t *queuedTime, int *cSize)
{
    const char *s = buf;
    const char *b;
    int l1 = 0;
    int l2 = 0;

    if (*buf == '#')
	return(-1);

    *pheadOnly = 0;

    /*
     * RELPATH
     */
    for (b = s; *s && !ISWHITE(*s); ++s)
	;
    l1 = s - b;
    bcopy(b, relPath, l1);
    relPath[l1] = 0;

    while (ISWHITE(*s))
	++s;

    /*
     * MSGID
     */

    if (*s == '<') {
	for (b = s; *s && *s != '>'; ++s)
	    ;
	if (*s == '>') {
	    ++s;
	    l2 = s - b;
	    bcopy(b, msgId, l2);
	    msgId[l2] = 0;
	}
	while (ISWHITE(*s))
	    ++s;
    }

    /*
     * offset/size [flags]
     */

    b = s;
    if (isdigit((int)*s))
	sscanf(s, "%lld,%d", poff, psize);
    while ((s = strchr(s, ' ')) != NULL) {
	while (ISWHITE(*s))
	    ++s;
	switch (*s++) {
	    case 'D' :
		*queuedTime = atol(s);
		break;
	    case 'C' :
		*cSize = atol(s);
		break;
	    case 'H' :
		*pheadOnly = 1;
		break;
	}
    }

    if (l1 && l2 && ValidMsgId(msgId))
	return(0);
    return(-1);
}

const char *
dtstamp(void)
{
    struct timeval tv;
    static struct timeval stv;
    static char buf[64];
    int dt = 0;

    gettimeofday(&tv, NULL);
    if (stv.tv_sec) {
	dt = (tv.tv_sec - stv.tv_sec - 1) * 1000 + (tv.tv_usec + 1000000 - stv.tv_usec) / 1000;
    }
    stv = tv;
    sprintf(buf, "%4d.%03d ", dt / 1000, dt % 1000);
    return(buf);
}



syntax highlighted by Code2HTML, v. 0.9.1