/*
* SERVER.C - /news/dserver.hosts management
*
* (c)Copyright 1998, Matthew Dillon, All Rights Reserved. Refer to
* the COPYRIGHT file in the base directory of this distribution
* for specific rights granted.
*/
#include "defs.h"
/*
 * Declarations exported from this file.
 * NOTE(review): "Prototype" is presumably a project macro consumed by a
 * header generator - confirm against defs.h.
 */
Prototype void CheckServerConfig(time_t t, int force);
Prototype void LogServerInfo(Connection *conn, int fd);
Prototype void NNArticleRetrieveByMessageId(Connection *conn, const char *msgid, int grouphint, int TimeRcvd, int grpIter, int endNo);
Prototype void NNServerIdle(Connection *conn);
Prototype void NNServerTerminate(Connection *conn);
Prototype void NNFinishSReq(Connection *conn, const char *ctl, int requeue);
Prototype void NNServerRequest(Connection *conn, const char *grp, const char *msgid, int req, int TimeRcvd, int grpIter, int endNo);
Prototype int ServersTerminated;
Prototype int NReadServers;
Prototype int NReadServAct;
Prototype int NWriteServers;
Prototype int NWriteServAct;
/*
 * Forward declarations for functions defined later in this file.
 */
void NNServerStart(Connection *conn);
void QueueServerRequests(void);
int queueServerRequest(ServReq *sreq, int type, int maxq);
void queueDesc(ServReq *sreq, ForkDesc *desc);
int requeueServerRequest(Connection *conn, ServReq *sreq, int type, int maxq);
int AddServer(char *bindinfo, int txbufsize, int rxbufsize, const char *host, int type, int port, int flags, int pri, char *groups, const char *localspool, HashFeed *hashFeed, int cache, int cachemin, int cachemax, int cacheabletime, double rratio, double cratio, char *login, char *password);
void NNServerPrime1(Connection *conn);
void NNServerPrime2(Connection *conn);
void NNServerPrime3(Connection *conn);
void NNServerPrime4(Connection *conn);
void NNServerPrimeAuth1(Connection *conn);
void NNServerPrimeAuth2(Connection *conn);
void NNServerConnect(Connection *conn);
/*
 * Requests not yet assigned to a server.  SReadBase/SWriteBase are the
 * list heads (retrieve and post requests respectively); PSRead/PSWrite
 * point at the link where the next request is appended.
 */
ServReq *SReadBase;
ServReq **PSRead = &SReadBase;
ServReq *SWriteBase;
ServReq **PSWrite = &SWriteBase;
/*
 * Server counters.  N*Servers is bumped when AddServer() succeeds and
 * dropped in NNServerTerminate().  N*ServAct is bumped when a server is
 * added and for each request queued to a server, and dropped again when
 * startup completes or a request is dequeued.
 */
int NReadServers;
int NReadServAct;
int NWriteServers;
int NWriteServAct;
/*
 * Count of failed or terminated servers; reset by CheckServerConfig()
 * each time the configuration file is re-read.
 */
int ServersTerminated;
/*
* CheckServerConfig() - determine if configuration file has changed and
* resynchronize with servers list.
*
* NOTE: a server cannot be destroyed until its current
* pending request completes, but no new requests will
* be queued to it during that time.
*/
/*
 * setServerMaybeCloseFlag() - thread-scan callback used while re-reading
 * the server configuration.  Tags the connection as a candidate for
 * closing; a server currently parked in NNServerIdle has its descriptor
 * added to the read set so that it wakes up.
 */
void
setServerMaybeCloseFlag(ForkDesc *desc)
{
    Connection *srv = desc->d_Data;
    int isIdle = (srv->co_Func == NNServerIdle);

    srv->co_Flags |= COF_MAYCLOSESRV;
    if (isIdle)
        FD_SET(desc->d_Fd, &RFds);
}
/*
 * setServerClosedRemovals() - thread-scan callback run after the
 * configuration file has been parsed.  Any connection still carrying
 * COF_MAYCLOSESRV was not re-confirmed by the file and is terminated.
 */
void
setServerClosedRemovals(ForkDesc *desc)
{
    Connection *srv = desc->d_Data;

    if (srv == NULL)
        return;
    if ((srv->co_Flags & COF_MAYCLOSESRV) == 0)
        return;
    NNServerTerminate(srv);
}
void
CheckServerConfig(time_t t, int force)
{
static struct stat St = { 0 };
struct stat st;
const char *path = PatLibExpand(DServerHostsPat);
/*
* Assuming we can stat the file, if we haven't checked the server config
* before or the server config time is different from when we last checked
* AND the server config time is not now (so we don't read the file while
* someone is writing it), then read the configuration file (again).
* I've also got some reverse-time-index protection in there.
*
* NOTE: path only valid until next Pat*Expand() call, so be careful
* with it's use.
*/
if (force || (stat(path, &st) == 0 &&
(St.st_mode == 0 || st.st_mtime != St.st_mtime) &&
((long)(t - st.st_mtime) > 2 || (long)(t - st.st_mtime) < 0))
) {
FILE *fi;
if (force)
stat(path, &st);
memcpy(&St, &st, sizeof(St));
if ((fi = fopen(path, "r")) != NULL) {
char buf[PATH_MAX];
ServersTerminated = 0;
ScanThreads(THREAD_SPOOL, setServerMaybeCloseFlag);
ScanThreads(THREAD_POST, setServerMaybeCloseFlag);
while (fgets(buf, sizeof(buf), fi) != NULL) {
char *host = strtok(buf, " \t\n");
char *flags = "";
char c;
int addAsServer = 0;
int addAsPoster = 0;
int port = 119;
int nflags = 0;
int npri = 0;
char *option;
char *bindinfo = NULL;
char *groups = NULL;
char *localspool = NULL;
HashFeed hash = { 0 };
int txbufsize = 0;
int rxbufsize = 0;
char *login = NULL;
char *password = NULL;
int cache = CACHE_OFF;
int cachemin = 0;
int cachemax = 0;
int cacheabletime = 0;
double rratio = 4;
double cratio = -1;
if (DOpts.ReaderCacheMode)
cache = CACHE_ON;
if (host == NULL || host[0] == '#')
continue;
if ((flags = strtok(NULL, " \t\n")) == NULL)
flags = "";
while ((c = *flags) != 0) {
ForkDesc *desc = NULL;
switch(c) {
case 'p': /* port */
port = strtol(flags + 1, NULL, 0);
if (port == 0)
port = 119;
break;
case 's': /* spool */
npri = strtol(flags + 1, NULL, 10);
if ((desc = FindThreadId(THREAD_SPOOL, host)) != NULL) {
Connection *conn = desc->d_Data;
conn->co_Flags &= ~COF_MAYCLOSESRV;
desc->d_Pri = npri;
} else {
addAsServer = 1;
}
break;
case 'M':
nflags |= COF_MODEREADER;
break;
case 'R':
nflags |= COF_READONLY;
break;
case 'o': /* outgoing (post) */
npri = strtol(flags + 1, NULL, 10);
if ((desc = FindThreadId(THREAD_POST, host)) != NULL) {
Connection *conn = desc->d_Data;
conn->co_Flags &= ~COF_MAYCLOSESRV;
desc->d_Pri = npri;
} else {
addAsPoster = 1;
}
break;
default:
/*
* ignore unknown flag or number
*/
break;
}
++flags;
}
while ((option = strtok(NULL, " \t\n")) != NULL) {
if (strncmp(option, "bind=", 5) == 0) {
bindinfo = option + 5;
} else if (strncmp(option, "txbufsize=", 10) == 0) {
txbufsize = strtol(option + 10, NULL, 0);
} else if (strncmp(option, "rxbufsize=", 10) == 0) {
rxbufsize = strtol(option + 10, NULL, 0);
} else if (strncmp(option, "groups=", 7) == 0) {
groups = option + 7;
} else if (strncmp(option, "localspool=", 11) == 0) {
localspool = option + 11;
} else if (strncmp(option, "hash=", 5) == 0) {
char *p;
option += 5;
if ((p = strchr(option, '-')) != NULL) {
hash.hf_Begin = strtol(option, NULL, 0);
hash.hf_End = strtol(++p, NULL, 0);
if ((p = strchr(option, '/')) != NULL)
hash.hf_Mod = strtol(++p, NULL, 0);
} else {
hash.hf_Begin = hash.hf_End = strtol(option, NULL, 0);
if ((p = strchr(option, '/')) != NULL)
hash.hf_Mod = strtol(++p, NULL, 0);
}
} else if (strncmp(option, "login=", 6) == 0) {
login = option + 6;
nflags |= COF_LOGIN;
} else if (strncmp(option, "password=", 9) == 0) {
password = option + 9;
} else if (strncmp(option, "cache=", 6) == 0) {
if (strncmp(option+6,"off", 3) == 0) {
cache = CACHE_OFF;
} else if (strncmp(option+6,"on", 2) == 0) {
if (DOpts.ReaderCacheMode) {
cache = CACHE_ON;
} else {
logit(LOG_ERR, "Ignoring cache option as diablo.conf cache option is disabled");
}
} else if (strncmp(option+6,"lazy", 4) == 0) {
if (DOpts.ReaderCacheMode) {
cache = CACHE_LAZY;
} else {
logit(LOG_ERR, "Ignoring cache option as diablo.conf cache option is disabled");
}
} else if (strncmp(option+6,"scoring(", 8) == 0) {
char *p;
option += 14;
if (strchr(option, ')') == NULL) {
logit(LOG_ERR, "Syntax error on scoring option");
} else {
if (*option == ',') {
rratio = -1;
cratio = strtod(++option, NULL);
} else if ((p = strchr(option, ',')) != NULL) {
rratio = strtod(option, NULL);
cratio = strtod(++p, NULL);
} else {
rratio = strtod(option, NULL);
cratio = -1;
}
if (DOpts.ReaderCacheMode) {
cache = CACHE_SCOREBOARD;
OpenCacheHits();
} else {
logit(LOG_ERR, "Ignoring cache option as diablo.conf cache option is disabled");
}
}
} else if (strncmp(option+6,"scoring", 7) == 0) {
if (DOpts.ReaderCacheMode) {
cache = CACHE_SCOREBOARD;
OpenCacheHits();
} else {
logit(LOG_ERR, "Ignoring cache option as diablo.conf cache option is disabled");
}
} else {
logit(LOG_ERR, "Unknown option '%s' - ignoring", option);
}
} else if (strncmp(option, "cachemin=", 9) == 0) {
cachemin = strtol(option + 9, NULL, 0);
} else if (strncmp(option, "cachemax=", 9) == 0) {
cachemax = strtol(option + 9, NULL, 0);
} else if (strncmp(option, "cacheabletime=", 14) == 0) {
cacheabletime = strtol(option + 14, NULL, 0);
} else {
logit(LOG_ERR, "Unknown option '%s' - ignoring", option);
}
}
if (addAsServer) {
if (AddServer(bindinfo, txbufsize, rxbufsize, host,
THREAD_SPOOL, port, nflags, npri,
groups, localspool, &hash, cache,
cachemin, cachemax, cacheabletime,
rratio, cratio, login, password) == 0) {
++NReadServers;
++NReadServAct;
} else {
++ServersTerminated;
}
}
if (addAsPoster) {
if (AddServer(bindinfo, txbufsize, rxbufsize, host,
THREAD_POST, port, nflags, npri,
groups, localspool, &hash, cache,
cachemin, cachemax, cacheabletime,
rratio, cratio, login, password) == 0) {
++NWriteServers;
++NWriteServAct;
} else {
++ServersTerminated;
}
}
}
fclose(fi);
ScanThreads(THREAD_SPOOL, setServerClosedRemovals);
ScanThreads(THREAD_POST, setServerClosedRemovals);
}
}
}
/*
 * LogServerInfo() - log spool server statistics and forward them on 'fd'.
 *
 * Nothing is reported until at least 300 seconds have passed since the
 * previous report (or since the first call) and some bytes have been
 * counted.  After reporting, all per-server counters are reset.
 *
 *      conn    server connection whose counters are reported
 *      fd      descriptor passed to SendMsg() for the DR_SERVER_STATS
 *              message
 */
void
LogServerInfo(Connection *conn, int fd)
{
    time_t now = time(NULL);

    if (!conn->co_LastServerLog)
        conn->co_LastServerLog = now;
    if (!conn->co_ServerByteCount ||
        conn->co_LastServerLog + 300 > now)
        return;

    logit(LOG_INFO, "info server %s articles=%ld bytes=%ld req=%ld notfound=%ld err=%ld",
        (conn->co_Desc->d_Id ? conn->co_Desc->d_Id : "UNKNOWN"),
        conn->co_ServerArticleCount,
        conn->co_ServerByteCount,
        conn->co_ServerArticleRequestedCount,
        conn->co_ServerArticleNotFoundErrorCount,
        conn->co_ServerArticleMiscErrorCount
    );
    {
        DnsRes dr;

        /*
         * Clear the whole message first: only three fields are filled
         * in below and we must not ship indeterminate stack contents.
         */
        memset(&dr, 0, sizeof(dr));
        dr.dr_ResultFlags = DR_SERVER_STATS;
        dr.dr_ArtCount = conn->co_ServerArticleCount;
        dr.dr_ByteCount = conn->co_ServerByteCount;
        SendMsg(fd, -1, &dr);
    }
    conn->co_ServerByteCount = 0;
    conn->co_ServerArticleCount = 0;
    conn->co_ServerArticleRequestedCount = 0;
    conn->co_ServerArticleNotFoundErrorCount = 0;
    conn->co_ServerArticleMiscErrorCount = 0;
    conn->co_LastServerLog = now;
}
/*
 * makeGroupList() - build a linked list of group patterns from a
 *                   comma-separated string.
 *
 *      conn    connection whose memory pool the list is allocated from
 *      groups  comma-separated list, or NULL
 *
 * Returns the head of the list, or NULL if 'groups' is NULL or contains
 * no non-empty element.  Empty elements (",,") are skipped.
 *
 * The string is left unmodified on return: each delimiter is replaced by
 * a NUL only while the element is being copied and is then restored.
 * This matters because the caller may hand the same string to more than
 * one AddServer() call (host listed as both spool and post server);
 * splitting it permanently with strtok() made the second call see only
 * the first group.
 */
GroupList *
makeGroupList(Connection *conn, char *groups)
{
    GroupList *head = NULL;
    GroupList **plink = &head;
    char *p;

    if (groups == NULL)
        return(NULL);

    p = groups;
    for (;;) {
        GroupList *g;
        size_t len;
        char saved;

        p += strspn(p, ",");            /* skip delimiters */
        if (*p == 0)
            break;
        len = strcspn(p, ",");          /* length of this element */

        saved = p[len];
        p[len] = 0;                     /* terminate temporarily */
        g = zalloc(&conn->co_MemPool, sizeof(GroupList));
        g->group = zallocStr(&conn->co_MemPool, p);
        g->next = NULL;
        p[len] = saved;                 /* restore delimiter */

        *plink = g;
        plink = &g->next;
        p += len;
    }
    return(head);
}
/*
 * AddServer() - open a non-blocking connection to a remote server and
 *               register it as a server thread of the given type.
 *
 *      bindinfo        local address/host to bind to, or NULL
 *      txbufsize       SO_SNDBUF value, applied if > 0
 *      rxbufsize       SO_RCVBUF value, applied if > 0
 *      host            remote host name or numeric address
 *      type            thread type (THREAD_SPOOL or THREAD_POST)
 *      port            remote TCP port
 *      flags           COF_* flags or'd into the connection
 *      pri             priority passed to AddThread()
 *      groups          comma-separated group list, or NULL
 *      localspool      copied into d_LocalSpool if non-NULL
 *      hashFeed        copied into co_RequestHash
 *      cache..cratio   stored in the corresponding ForkDesc cache fields
 *      login/password  credentials used when COF_LOGIN is set; login is
 *                      dropped (with a log message) if no password given
 *
 * Returns 0 on success, -1 on failure.  On failure no descriptor is left
 * open.
 */
int
AddServer(char *bindinfo, int txbufsize, int rxbufsize, const char *host, int type, int port, int flags, int pri, char *groups, const char *localspool, HashFeed *hashFeed, int cache, int cachemin, int cachemax, int cacheabletime, double rratio, double cratio, char *login, char *password)
{
    ForkDesc *desc;
    int fd;
    struct sockaddr_in sin;
    Connection *conn;

    /*
     * connect() to the host (use asynchronous connect())
     */
    bzero(&sin, sizeof(sin));
    {
        struct hostent *he = gethostbyname(host);

        if (he != NULL) {
            sin.sin_family = he->h_addrtype;
            memmove(&sin.sin_addr, he->h_addr, he->h_length);
        } else if (strtol(host, NULL, 0) != 0) {
            /* lookup failed but host starts with a number: treat as dotted quad */
            sin.sin_family = AF_INET;
            sin.sin_addr.s_addr = inet_addr(host);
        } else {
            logit(LOG_ERR, "hostname lookup failure: %s\n", host);
            return(-1);
        }
    }
    sin.sin_port = htons(port);

    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        logit(LOG_ERR, "socket() call failed on host %s\n", host);
        return(-1);
    }

    /*
     * Optional source address binding.  Every failure path below must
     * close the socket we just created.
     */
    if (bindinfo != NULL) {
        struct hostent *he;
        struct sockaddr_in lsin;

        bzero(&lsin, sizeof(lsin));
        if ((he = gethostbyname(bindinfo)) != NULL) {
            lsin.sin_addr = *(struct in_addr *)he->h_addr;
        } else {
            lsin.sin_addr.s_addr = inet_addr(bindinfo);
            if (lsin.sin_addr.s_addr == INADDR_NONE) {
                logit(LOG_ERR, "Unknown host for bindhost option: %s\n",
                    bindinfo);
                close(fd);
                return(-1);
            }
        }
        lsin.sin_family = AF_INET;
        lsin.sin_port = 0;
        if (bind(fd, (struct sockaddr *) &lsin, sizeof(lsin)) < 0) {
            logit(LOG_ERR, "failed to bind source address %s (%s)",
                bindinfo, strerror(errno));
            close(fd);
            return(-1);
        }
    }

    {
        int on = 1;

        setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
        if (txbufsize > 0)
            setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&txbufsize, sizeof(int));
        if (rxbufsize > 0)
            setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&rxbufsize, sizeof(int));
    }
    fcntl(fd, F_SETFL, O_NONBLOCK);     /* asynchronous connect() */
    errno = 0;
    if (connect(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        if (errno != EINPROGRESS) {
            /* save errno: close() may overwrite it before we log */
            int serrno = errno;

            close(fd);
            logit(LOG_ERR, "connect() call failed on host %s:%d: %s\n",
                host, port, strerror(serrno));
            return(-1);
        }
    }

    /*
     * add thread.  Preset d_Count to LIMSIZE to prevent the server
     * from being allocated for client requests until we know
     * we have a good connection.
     */
    desc = AddThread(host, fd, -1, type, -1, pri);
    FD_SET(desc->d_Fd, &RFds);
    conn = InitConnection(desc, NULL);

    if (flags & COF_LOGIN) {
        if (password) {
            snprintf(conn->co_Auth.dr_AuthUser, sizeof(conn->co_Auth.dr_AuthUser), "%s", login);
            snprintf(conn->co_Auth.dr_AuthPass, sizeof(conn->co_Auth.dr_AuthPass), "%s", password);
        } else {
            logit(LOG_ERR, "no password specified with login: %s\n", login);
            flags &= ~COF_LOGIN;
        }
    }
    conn->co_Flags |= flags;
    conn->co_Flags |= COF_INPROGRESS | COF_ININIT;
    desc->d_Count = THREAD_LIMSIZE;
    conn->co_Auth.dr_ReaderDef = NULL;
    conn->co_ListCacheGroups = makeGroupList(conn, groups);
    bcopy(hashFeed, &conn->co_RequestHash, sizeof(HashFeed));
    if (localspool != NULL)
        desc->d_LocalSpool = zallocStr(&conn->co_MemPool, localspool);
    desc->d_Cache = cache;
    desc->d_CacheMin = cachemin;
    desc->d_CacheMax = cachemax;
    desc->d_CacheableTime = cacheabletime;
    desc->d_ReadNewRatio = rratio;
    desc->d_CacheReadRatio = cratio;

    /* start the connect/greeting state machine */
    NNServerConnect(conn);
    if (DebugOpt)
        printf("Added Server (type=%d fd=%d pid=%d)\n", type, fd, (int)getpid());
    return(0);
}
/*
 * drainPendingList() - hand requests from one pending list to servers.
 *
 * Requests are popped from the head of the list at *pbase and offered to
 * queueServerRequest().  The first request that cannot be queued is
 * re-appended at the list tail (*ptail) and processing of that list
 * stops.
 */
static void
drainPendingList(ServReq **pbase, ServReq ***ptail, int type, int maxq)
{
    for (;;) {
        ServReq *req = *pbase;

        if (req == NULL)
            return;

        /* unlink from the head; fix the tail pointer if list is now empty */
        *pbase = req->sr_Next;
        if (*pbase == NULL)
            *ptail = pbase;
        req->sr_Next = NULL;

        if (queueServerRequest(req, type, maxq) < 0) {
            **ptail = req;
            *ptail = &req->sr_Next;
            return;
        }
    }
}

/*
 * QueueServerRequests() - move requests to the appropriate server.
 *
 *      New article fetch and post requests are placed on the SRead/SWrite
 *      lists.  This function moves as many of those requests as possible
 *      to actual spool & post server queues.
 *
 *      Even though a server can only handle one request at a time, we
 *      allow up to N requests (THREAD_QSIZE/PSIZE) to be queued to an
 *      actual server in order to judge load.  If the load exceeds the
 *      queue limit, FindLeastUsedThread will start queueing to higher
 *      priority servers.
 */
void
QueueServerRequests(void)
{
    drainPendingList(&SReadBase, &PSRead, THREAD_SPOOL, THREAD_QSIZE);
    drainPendingList(&SWriteBase, &PSWrite, THREAD_POST, THREAD_PSIZE);
}
/*
 * cbQueueReqFindThread() - FindLeastUsedThread() callback deciding whether
 *                          a server may take a request.
 *
 *      cbData  the ServReq being queued
 *      data    the candidate server Connection
 *
 * Returns 0 if the server has a request hash configured and the
 * message-id does not fall in it.  Otherwise returns 1 when either the
 * request has no group or the server has no group list, else the result
 * of matching the group against the server's list.
 */
int
cbQueueReqFindThread(void *cbData, void *data)
{
    ServReq *req = cbData;
    Connection *srv = data;

    if (srv->co_RequestHash.hf_Mod != 0) {
        if (!HashFeedMatch(&srv->co_RequestHash, quickhash(req->sr_MsgId)))
            return(0);
    }
    if (req->sr_Group != NULL && srv->co_ListCacheGroups != NULL) {
        int matched = GroupFindWild(req->sr_Group, srv->co_ListCacheGroups);

        return(matched);
    }
    return(1);
}
/*
 * queueServerRequest() - pick a server for a new request and queue it.
 *
 *      sreq    request to queue
 *      type    thread type to search (THREAD_SPOOL / THREAD_POST)
 *      maxq    queue limit passed to FindLeastUsedThread()
 *
 * Returns 0 once the request has been queued, or -1 (after logging) if
 * no server could be selected.
 */
int
queueServerRequest(ServReq *sreq, int type, int maxq)
{
    static int Randex1 = -1;
    ForkDesc *target;

    target = FindLeastUsedThread(type, maxq, 0, &Randex1, -1,
                                 cbQueueReqFindThread, sreq);
    if (target == NULL) {
        logit(LOG_ERR, "Unable to find any spools (or spool threads too busy) for request %s ", sreq->sr_MsgId);
        return(-1);
    }
    sreq->sr_NoPass = target->d_Fd;
    queueDesc(sreq, target);
    return(0);
}
/*
 * queueDesc() - append a request to a specific server's queue.
 *
 * Bumps the server's d_Count and the matching active counter, records
 * the server in the request, and starts the server if its queue was
 * empty before the append.
 */
void
queueDesc(ServReq *sreq, ForkDesc *desc)
{
    Connection *srv = desc->d_Data;
    ServReq **tail;
    int wasEmpty;

    /* locate the terminating link of the server's request list */
    for (tail = &srv->co_SReq; *tail != NULL; tail = &(*tail)->sr_Next)
        ;
    wasEmpty = (tail == &srv->co_SReq);

    *tail = sreq;
    sreq->sr_SConn = srv;
    desc->d_Count += 1;
    if (desc->d_Type == THREAD_SPOOL)
        NReadServAct += 1;
    else
        NWriteServAct += 1;
    sreq->sr_Rolodex = desc->d_Fd;

    if (DebugOpt)
        printf("Request %s queued to fd %d\n", sreq->sr_MsgId, desc->d_Fd);

    /*
     * If server was idle, kick it.
     */
    if (wasEmpty)
        NNServerIdle(srv);
}
/*
 * requeueServerRequest() - if request failed with previous server,
 *                          requeue for next server.
 *
 *      This function requeues a request to another server given the
 *      previous server.  It will attempt to send the request to all
 *      servers at the current priority and then will go to the next
 *      priority level, and so on.  Returns 0 if queued, -1 if the
 *      request could not be queued.
 *
 *      When requeueing a request, queue limits are relaxed (doubled) in
 *      order to ensure that the request does not skip to higher priority
 *      queues due to high load.
 */
int
requeueServerRequest(Connection *conn, ServReq *sreq, int type, int maxq)
{
    static int Randex2;
    ForkDesc *next;

    /*
     * First choice: another server at the same priority, continuing
     * from the request's rolodex position.
     */
    next = FindLeastUsedThread(type, maxq * 2, conn->co_Desc->d_Pri,
                               &sreq->sr_Rolodex, sreq->sr_NoPass,
                               cbQueueReqFindThread, sreq);
    if (next != NULL) {
        if (DebugOpt)
            printf("Requeue %s to priority %d\n", sreq->sr_MsgId, next->d_Pri);
    } else {
        /*
         * Second choice: move on to the next priority level.
         */
        next = FindLeastUsedThread(type, maxq * 2, conn->co_Desc->d_Pri + 1,
                                   &Randex2, -1,
                                   cbQueueReqFindThread, sreq);
        if (next != NULL) {
            sreq->sr_NoPass = next->d_Fd;
            if (DebugOpt)
                printf("Requeue %s to nextpri %d\n", sreq->sr_MsgId, next->d_Pri);
        } else if (DebugOpt) {
            printf("Requeue %s failed\n", sreq->sr_MsgId);
        }
    }

    if (next == NULL)
        return(-1);
    queueDesc(sreq, next);
    return(0);
}
/*
 * FreeSReq() - release a server request and everything it owns.
 */
void
FreeSReq(ServReq *sreq)
{
    FILE *cache = sreq->sr_Cache;

    /*
     * If cache file write in progress, abort it.  allow the
     * fclose() to release the lock after the truncation.
     *
     * We NULL the FILE * out even though we free the structure
     * so potential memory corruption doesn't mess with random
     * (future) files.
     */
    if (cache != NULL) {
        fflush(cache);
        AbortCache(fileno(cache), sreq->sr_MsgId, 0);
        fclose(cache);
        sreq->sr_Cache = NULL;
    }

    zfreeStr(&SysMemPool, &sreq->sr_Group);
    zfreeStr(&SysMemPool, &sreq->sr_MsgId);
    zfree(&SysMemPool, sreq, sizeof(ServReq));
}
/*
 * NNServerRequest() - create a request on behalf of a client and put it
 *                     on the pending retrieve or post list.
 *
 * NOTE: don't get confused by co_SReq, it serves two functions.  It
 * placeholds a single request from a client in client Connection
 * structures, which this call handles, and placeholds MULTIPLE client
 * requests in server Connection structures.
 *
 * The client's descriptor is removed from the read set while the request
 * is outstanding.
 */
void
NNServerRequest(Connection *conn, const char *grp, const char *msgid, int req, int TimeRcvd, int grpIter, int endNo)
{
    ServReq *request = zalloc(&SysMemPool, sizeof(ServReq));
    ServReq ***ptail = NULL;

    request->sr_CConn = conn;
    request->sr_SConn = NULL;   /* for clarity: not assigned to server yet */
    request->sr_Time = time(NULL);
    if (grp)
        request->sr_Group = zallocStr(&SysMemPool, grp);
    else
        request->sr_Group = NULL;
    if (msgid)
        request->sr_MsgId = zallocStr(&SysMemPool, msgid);
    else
        request->sr_MsgId = NULL;
    request->sr_Cache = NULL;
    request->sr_TimeRcvd = TimeRcvd;
    request->sr_GrpIter = grpIter;
    request->sr_endNo = endNo;

    conn->co_SReq = request;    /* client has active sreq */
    FD_CLR(conn->co_Desc->d_Fd, &RFds);

    /* choose the pending list matching the request kind */
    if (req == SREQ_RETRIEVE)
        ptail = &PSRead;
    else if (req == SREQ_POST)
        ptail = &PSWrite;

    if (ptail != NULL) {
        **ptail = request;
        *ptail = &request->sr_Next;
    }
    QueueServerRequests();
}
/*
 * NNFinishSReq() - finish up an SReq, but requeue to a new server if
 *                  requested (i.e. article not found on old server).
 *
 *      conn    server connection whose head request is finished
 *      ctl     response line to hand to the client, or NULL
 *      requeue non-zero to try another server before giving up
 *
 * If the request is not (or cannot be) requeued, the response is written
 * to the client, the client is returned to command processing and the
 * request is freed.  The server is returned to the idle state in all
 * cases.
 */
void
NNFinishSReq(Connection *conn, const char *ctl, int requeue)
{
    ServReq *sreq;

    if ((sreq = conn->co_SReq)) {
        /*
         * dequeue request from server side
         */
        conn->co_SReq = sreq->sr_Next;
        sreq->sr_Next = NULL;
        if (conn->co_Desc) {
            --conn->co_Desc->d_Count;
            if (conn->co_Desc->d_Type == THREAD_POST)
                --NWriteServAct;
            else
                --NReadServAct;
        }
        conn->co_Flags &= ~COF_INPROGRESS;

        /*
         * if requeue requested, attempt to requeue.  requeueServerRequest
         * returns 0 on success, -1 on failure, so we have to reverse the
         * sense of the return value.
         *
         * NOTE(review): co_Desc is dereferenced here without the NULL
         * check used above - confirm it cannot be NULL on this path.
         */
        if (requeue) {
            if (conn->co_Desc->d_Type == THREAD_POST)
                requeue = requeueServerRequest(conn, sreq, THREAD_POST, THREAD_PSIZE);
            else
                requeue = requeueServerRequest(conn, sreq, THREAD_SPOOL, THREAD_QSIZE);
            requeue = !requeue;
        }

        /*
         * If no requeue, closeout the request.
         */
        if (requeue == 0) {
            /*
             * can be NULL if client was terminated
             */
            if (sreq->sr_CConn) {
                if (ctl) {
                    /*
                     * Test ctl[0] first so that an empty string never
                     * has the byte past its terminator examined.
                     */
                    if (ctl[0] == '4' || (ctl[0] != 0 && ctl[1] == '4')) {
                        /*
                         * x4x codes are POST results of some sort -
                         * this result only appears here via NNPostResponseX.
                         * we probably want to log this response to the
                         * client as it may be an informative error message
                         * unlike the average spool server transaction
                         * which we probably don't care about.
                         *
                         * 4xx codes may be spool failures - these should
                         * also be of interest
                         */
                        MBLogPrintf(sreq->sr_CConn, &sreq->sr_CConn->co_TMBuf, "%s", ctl);
                    } else {
                        MBWrite(&sreq->sr_CConn->co_TMBuf, ctl, strlen(ctl));
                    }
                }
                /*
                 * set FCounter to 1 to prevent further recursion, which might
                 * feed back and screw up the state machine for this
                 * connection.
                 */
                sreq->sr_CConn->co_FCounter = 1;
                sreq->sr_CConn->co_SReq = NULL;
                NNCommand(sreq->sr_CConn);
            }
            FreeSReq(sreq);
        }
    }

    /*
     * XXX what if NNCommand does something which hits a server which
     * kills the server?  Boom, conn would then be invalid here.  XXX
     */
    NNServerIdle(conn);
}
/*
 * NNServerPrime1() - send the first command after the server greeting
 *                    and move to the state that waits for its reply.
 *
 * we have to send a garbage command to prevent INN's nnrpd from timing
 * out in 60 seconds upon initial connect.  The command sent depends on
 * the connection flags, checked in this order: login, mode reader,
 * mode readonly, and finally a deliberately bogus mode command.
 */
void
NNServerPrime1(Connection *conn)
{
    if (conn->co_Flags & COF_LOGIN) {
        MBPrintf(&conn->co_TMBuf, "authinfo user %s\r\n", conn->co_Auth.dr_AuthUser);
        NNServerPrimeAuth1(conn);
        return;
    }
    if (conn->co_Flags & COF_MODEREADER) {
        MBPrintf(&conn->co_TMBuf, "mode reader\r\n");
        NNServerPrime2(conn);
        return;
    }
    if (conn->co_Flags & COF_READONLY) {
        MBPrintf(&conn->co_TMBuf, "mode readonly\r\n");
        NNServerPrime4(conn);
        return;
    }
    MBPrintf(&conn->co_TMBuf, "mode thisbetterfail\r\n");
    NNServerPrime3(conn);
}
/*
 * NNServerPrime2() - wait for the reply to "mode reader".  A 2xx reply
 * starts normal operation; anything else terminates the server.
 */
void
NNServerPrime2(Connection *conn)
{
    char *line;
    int code;

    conn->co_Func = NNServerPrime2;
    conn->co_State = "prime2";

    /* no complete reply line yet */
    if (MBReadLine(&conn->co_RMBuf, &line) == 0)
        return;

    code = strtol(line, NULL, 10);
    if (code < 200 || code > 299) {
        logit(LOG_ERR, "mode-reader(%s) failed: %s", conn->co_Desc->d_Id, line);
        NNServerTerminate(conn);
        return;
    }
    logit(LOG_INFO, "mode-reader(%s) %s", conn->co_Desc->d_Id, line);
    NNServerStart(conn);
}
/*
 * NNServerPrime3() - wait for the reply to the bogus "mode" command sent
 * by NNServerPrime1().  The content of the reply is irrelevant; any
 * reply starts normal operation.
 */
void
NNServerPrime3(Connection *conn)
{
    int len;
    char *buf;

    conn->co_Func = NNServerPrime3;
    /* was "prime2": copy/paste from NNServerPrime2, mislabelling this state */
    conn->co_State = "prime3";

    /*
     * any return code or EOF.  0 means we haven't gotten the
     * return code yet.
     */
    if ((len = MBReadLine(&conn->co_RMBuf, &buf)) != 0) {
        NNServerStart(conn);
    }
}
/*
 * NNServerPrime4() - wait for the reply to "mode readonly".  The reply
 * is logged (as info for 2xx, as an error otherwise) and normal
 * operation starts either way.
 */
void
NNServerPrime4(Connection *conn)
{
    char *line;
    int code;

    conn->co_Func = NNServerPrime4;
    conn->co_State = "prime4";

    /* no complete reply line yet */
    if (MBReadLine(&conn->co_RMBuf, &line) == 0)
        return;

    code = strtol(line, NULL, 10);
    if (code < 200 || code > 299)
        logit(LOG_ERR, "mode-readonly(%s) failed: %s", conn->co_Desc->d_Id, line);
    else
        logit(LOG_INFO, "mode-readonly(%s) %s", conn->co_Desc->d_Id, line);
    NNServerStart(conn);
}
/*
 * NNServerPrimeAuth1() - wait for the reply to "authinfo user".  A 3xx
 * reply is answered with "authinfo pass"; anything else terminates the
 * server.
 */
void
NNServerPrimeAuth1(Connection *conn)
{
    char *line;
    int code;

    conn->co_Func = NNServerPrimeAuth1;
    conn->co_State = "primeauth1";

    /* no complete reply line yet */
    if (MBReadLine(&conn->co_RMBuf, &line) == 0)
        return;

    code = strtol(line, NULL, 10);
    if (code < 300 || code > 399) {
        logit(LOG_ERR, "authinfo-user(%s) failed: %s", conn->co_Desc->d_Id, line);
        NNServerTerminate(conn);
        return;
    }
    MBPrintf(&conn->co_TMBuf, "authinfo pass %s\r\n", conn->co_Auth.dr_AuthPass);
    NNServerPrimeAuth2(conn);
}
/*
 * NNServerPrimeAuth2() - wait for the reply to "authinfo pass".  A 2xx
 * reply starts normal operation; anything else terminates the server.
 */
void
NNServerPrimeAuth2(Connection *conn)
{
    char *line;
    int code;

    conn->co_Func = NNServerPrimeAuth2;
    conn->co_State = "primeauth2";

    /* no complete reply line yet */
    if (MBReadLine(&conn->co_RMBuf, &line) == 0)
        return;

    code = strtol(line, NULL, 10);
    if (code < 200 || code > 299) {
        logit(LOG_ERR, "authinfo-pass(%s) failed: %s", conn->co_Desc->d_Id, line);
        NNServerTerminate(conn);
        return;
    }
    logit(LOG_INFO, "authinfo-pass(%s) %s", conn->co_Desc->d_Id, line);
    NNServerStart(conn);
}
/*
 * NNServerStart() - start normal server operations.  Clean up from the
 *                   connect code and make the server available to
 *                   clients: d_Count is reset to 0, the startup flags
 *                   are cleared and the active count bumped for startup
 *                   is dropped again.
 */
void
NNServerStart(Connection *conn)
{
    ForkDesc *desc = conn->co_Desc;

    desc->d_Count = 0;
    conn->co_Flags &= ~(COF_INPROGRESS|COF_ININIT);

    if (desc->d_Type == THREAD_SPOOL)
        NReadServAct -= 1;
    else
        NWriteServAct -= 1;

    NNServerIdle(conn);
}
/*
 * NNServerIdle() - server idle, wait for EOF or start next queued
 *                  request.  If no more queued requests, try to requeue
 *                  from unqueued requests.
 */
void
NNServerIdle(Connection *conn)
{
    conn->co_Func = NNServerIdle;
    conn->co_State = "sidle";

    /*
     * Ooops, not really idle.  This can happen due to the recursive
     * nature of much of the code.
     */
    if ((conn->co_Flags & (COF_INPROGRESS|COF_CLOSESERVER)) == COF_INPROGRESS)
        return;

    /*
     * Check for an unexpected condition on server, i.e. data or
     * EOF on the input where we didn't expect any.
     */
    if (!(conn->co_Flags & COF_CLOSESERVER)) {
        char *line;
        int n = MBReadLine(&conn->co_RMBuf, &line);

        if (n != 0) {
            if (n > 1) {
                /* cut the line two bytes short before logging it */
                line[n - 2] = 0;
                logit(LOG_ERR, "Server closed connection: %s: %s",
                    conn->co_Desc->d_Id, line);
            } else {
                logit(LOG_ERR, "Server closed connection: %s",
                    conn->co_Desc->d_Id);
            }
            conn->co_Flags |= COF_CLOSESERVER;
        }
    }

    /* server is going away: block further queueing and tear it down */
    if (conn->co_Flags & COF_CLOSESERVER) {
        conn->co_Desc->d_Count = THREAD_LIMSIZE;
        NNServerTerminate(conn);
        return;
    }

    /* nothing queued here: see whether pending requests can be placed */
    if (conn->co_SReq == NULL) {
        QueueServerRequests();
        return;
    }

    /* start the request at the head of our queue */
    conn->co_Flags |= COF_INPROGRESS;
    if (conn->co_Desc->d_Type == THREAD_POST)
        NNPostCommand1(conn);
    else
        NNSpoolCommand1(conn);
}
/*
 * NNServerTerminate() - terminate a server connection.  Usually occurs
 *                       if the server goes down or the related process
 *                       on the server is killed.  Any client request
 *                       queued to the server is requeued to another
 *                       server.
 */
void
NNServerTerminate(Connection *conn)
{
    ForkDesc *desc = conn->co_Desc;
    int isPost = (desc->d_Type == THREAD_POST);
    ServReq ***ptail = isPost ? &PSWrite : &PSRead;
    int *active = isPost ? &NWriteServAct : &NReadServAct;
    int *servers = isPost ? &NWriteServers : &NReadServers;
    ServReq *req;

    /*
     * Move every request still queued here back onto the matching
     * pending list, undoing the per-request accounting as we go.
     */
    for (req = conn->co_SReq; req != NULL; req = conn->co_SReq) {
        conn->co_SReq = req->sr_Next;
        --desc->d_Count;
        req->sr_Next = NULL;
        req->sr_SConn = NULL;
        req->sr_Time = time(NULL);

        **ptail = req;
        *ptail = &req->sr_Next;

        --*active;
    }

    /*
     * If we bumped the active count for the duration of the startup and
     * terminated prior to the startup completing, we have to fix it here.
     */
    if (conn->co_Flags & COF_ININIT) {
        conn->co_Flags &= ~COF_ININIT;
        --*active;
    }

    /*
     * closeup the server.  If d_Count is non-zero we have
     * to cleanup our reference counts, but then we set d_Count
     * to ensure nothing else gets queued to the server
     * between calling NNTerminate() and the actual termination.
     */
    conn->co_Flags |= COF_CLOSESERVER;
    --*servers;
    ++ServersTerminated;                /* flag for rescan/reopen */
    desc->d_Count = THREAD_LIMSIZE;
    conn->co_Flags |= COF_INPROGRESS;

    QueueServerRequests();              /* try to requeue requests */
    NNTerminate(conn);
}
/*
 * NNServerConnect() - initial connection, get startup message.
 *
 * The server is not made available to clients until the startup message
 * has been received.  (The original note here refers to co_UCounter
 * starting at 1; NOTE(review): the code visible in this file gates
 * availability via d_Count instead - confirm which is current.)
 *
 * A read error, or no greeting within 60 seconds of co_SessionStartTime,
 * terminates the server.  A 200 or 201 greeting proceeds to
 * NNServerPrime1(); any other greeting terminates the server.
 */
void
NNServerConnect(Connection *conn)
{
    char *buf;
    int len;
    int code;

    conn->co_Func = NNServerConnect;
    conn->co_State = "sconn";

    if ((len = MBReadLine(&conn->co_RMBuf, &buf)) <= 0) {
        if (len < 0) {
            logit(LOG_ERR, "connect(%s) failed", conn->co_Desc->d_Id);
            NNServerTerminate(conn);
            /*
             * Must not fall through: the connection has been handed to
             * NNTerminate() and a second NNServerTerminate() from the
             * timeout check would adjust the server counters twice.
             */
            return;
        }
        if ((conn->co_SessionStartTime + 60) < CurTime.tv_sec) {
            logit(LOG_ERR, "connect(%s) timed out", conn->co_Desc->d_Id);
            NNServerTerminate(conn);
        }
        return;
    }

    code = strtol(buf, NULL, 10);
    if (code == 200 || code == 201) {
        /* both greetings are accepted for spool and post servers alike */
        logit(LOG_INFO, "connect(%s) %s", conn->co_Desc->d_Id, buf);
    } else {
        logit(LOG_ERR, "connect(%s) unrecognized banner: %s", conn->co_Desc->d_Id, buf);
        NNServerTerminate(conn);
        return;
    }
    NNServerPrime1(conn);
}
/*
 * replyArticleUnavailable() - write the "article not available" reply
 * appropriate to the client's current article mode and request flags.
 */
static void
replyArticleUnavailable(Connection *conn)
{
    if (conn->co_ArtMode == COM_BODYNOSTAT) {
        MBLogPrintf(conn, &conn->co_TMBuf, "(article not available)\r\n.\r\n");
        return;
    }
    if (conn->co_RequestFlags == ARTFETCH_ARTNO) {
        MBLogPrintf(conn, &conn->co_TMBuf, "423 No such article number in this group\r\n");
        return;
    }
    MBLogPrintf(conn, &conn->co_TMBuf, "430 No such article\r\n");
}

/*
 * NNArticleRetrieveByMessageId() - retrieve article by message-id
 *
 *      Retrieve an article by its message id and write the article
 *      to the specified connection.
 *
 *      (a) attempt to fetch the article from cache (positive or negative
 *          hit)
 *
 *      (b) initiate the state machine to attempt to fetch the article
 *          from a remote server.
 *
 *      (c) on remote server fetch completion, cache the article locally
 *
 *      (d) place article in transmission buffer for connection,
 *          transmitting it, then return to the command state.
 *
 *      COM_ARTICLEWVF  retrieve article from remote by message-id but
 *                      retrieve headers by co_ArtNo.
 *
 *      COM_ARTICLE     retrieve entire article from remote by message-id
 *
 *      COM_...
 */
void
NNArticleRetrieveByMessageId(Connection *conn, const char *msgid, int grouphint, int TimeRcvd, int grpIter, int endNo)
{
    /*
     * (a) retrieve from cache if caching is enabled
     */
    if (DOpts.ReaderCacheMode) {
        int cfd;
        int size;
        int valid = OpenCache(msgid, &cfd, &size);

        if (valid < 0) {
            /*
             * negatively cached
             */
            if (DebugOpt)
                printf("neg cache\n");
            replyArticleUnavailable(conn);
            NNCommand(conn);
            return;
        }

        if (valid > 0) {
            /*
             * good cache
             */
            const char *map;

            if (DebugOpt)
                printf("good cache\n");

            map = xmap(NULL, size, PROT_READ, MAP_SHARED, cfd, 0);
            if (map == NULL) {
                /* could not map the cache file: report as unavailable */
                replyArticleUnavailable(conn);
            } else {
                xadvise(map, size, XADV_WILLNEED);
                if (conn->co_ArtMode != COM_BODYNOSTAT) {
                    MBLogPrintf(conn, &conn->co_TMBuf, "%03d %d %s %s\r\n",
                        GoodRC(conn),
                        ((conn->co_ArtMode==COM_ARTICLEWVF)?conn->co_ArtNo:0),
                        msgid,
                        GoodResId(conn)
                    );
                }
                if (conn->co_ArtMode != COM_STAT) {
                    DumpArticleFromCache(conn, map, size, grpIter, endNo);
                    MBPrintf(&conn->co_TMBuf, ".\r\n");
                }
                xunmap((void *)map, size);
            }
            close(cfd);
            NNCommand(conn);
            return;
        }
    }

    /*
     * (b)(c)(d)
     */
    if (DebugOpt)
        printf("bad cache\n");
    NNServerRequest(conn, grouphint ? conn->co_GroupName : NULL, msgid, SREQ_RETRIEVE, TimeRcvd, grpIter, endNo);
    NNWaitThread(conn);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */