/*
 * DFA.C      - Direct File Access to articles
 *
 * (c)Copyright 2001, Francois Petillon, All Rights Reserved.  Refer to
 *    the COPYRIGHT file in the base directory of this distribution 
 *    for specific rights granted.
 *
 */

#include "defs.h"

#if USE_AIO
    #if USE_LINUX_KAIO
      #include <linux/aio.h>
    #else
      #include <aio.h>
    #endif
#include <signal.h>
#endif

/* Entry points exported to the rest of the program */
Prototype DirectFileAccess* NewDFA(Connection *conn, int lf, int offset, int size);
Prototype void DelDFA(Connection *conn);
Prototype void NNSendLocalArticle(Connection *conn);

/* Helpers only referenced from this file */
int NNCopyLocalArticle(Connection *conn, int rs);
void DFADataWrite(ServReq *sreq, const void *data, int len);
void NNSLA_Loop(Connection *conn);

#if USE_AIO
Prototype void AIOBlockSignal(void);
Prototype void AIOUnblockSignal(void);

int AIOBlockedSignal = 0;

void AIODataRead(Connection *conn);
void Sig_AIO(int signo, siginfo_t *info, void *context);

/* Handler description for AIOSIG; allocated by the first NewDFA() call */
struct sigaction* sig_aio = NULL;

/* Signal raised when an asynchronous read completes */
#define AIOSIG        SIGUSR1

#endif

/*
 * Free list of unused DFA structures.  DelDFA() pushes at the head and
 * NewDFA() pops from the head, so the most recently released structure
 * is reused first.
 */
DirectFileAccess* dfa_Trash = NULL;

/*
 * NewDFA() - prepare direct file access to one locally stored article
 *
 *	conn	connection the article is fetched for; the new structure is
 *		stored in conn->co_DirectFA
 *	lf	open descriptor on the file holding the article (-1 is rejected)
 *	offset	position of the first byte of the article within that file
 *	size	length of the article in bytes
 *
 * Returns the DirectFileAccess structure, or NULL on failure.  lf is never
 * closed here; once the structure is attached, DelDFA() closes it.
 *
 * Structures are taken from the dfa_Trash free list when possible.  With
 * USE_AIO the first call also installs the AIOSIG handler and leaves the
 * signal blocked.
 */
DirectFileAccess*
NewDFA(Connection *conn, int lf, int offset, int size)
{
    ServReq *sreq = conn->co_SReq;
    DirectFileAccess* el=NULL;
    off_t artend ;
    off_t filend ;

#if USE_AIO
    if (!sig_aio) {
      /* calloc so that no uninitialised sigaction member reaches sigaction() */
      sig_aio = (struct sigaction*) calloc(1, sizeof(struct sigaction)) ;
      if (!sig_aio) {
          logit(LOG_ERR, "NewDFA : cannot allocate sigaction");
          return NULL;
      }
      sigemptyset(&sig_aio->sa_mask);
      sigaddset(&sig_aio->sa_mask, AIOSIG);
      sig_aio->sa_flags = SA_SIGINFO;
      sig_aio->sa_sigaction = Sig_AIO;
      if (sigaction(AIOSIG, sig_aio, NULL)) {
          logit(LOG_ERR, "NewDFA : cannot sigaction");
          exit(1);
      }
      AIOBlockSignal();
    }
#endif

    if (lf==-1) {
      logit(LOG_ERR, "NewDFA : fd is -1");
      return NULL;
    }

    if (offset<0 || size<0) {
      logit(LOG_ERR, "NewDFA : bad article location (offset=%i size=%i)", offset, size);
      return NULL;
    }

    if (dfa_Trash) {
      el = dfa_Trash ;
      dfa_Trash = el->dfa_Next ;
    } else {
      el = (DirectFileAccess*) malloc(sizeof(DirectFileAccess));
      if (!el) {
          logit(LOG_ERR, "NewDFA : cannot allocate dfa");
          return NULL ;
      }
#if USE_AIO
      /* the control block stays attached to the structure while it is recycled */
      el->dfa_AIOcb = (struct aiocb*) calloc(1, sizeof(struct aiocb)) ;
      if (!el->dfa_AIOcb) {
          logit(LOG_ERR, "NewDFA : cannot allocate aiocb");
          free(el) ;
          return NULL ;
      }
#endif
    }

    /*
     * check end of article: lseek() accepts positions beyond end-of-file,
     * so the file length itself is compared with the end of the article.
     */
    artend = (off_t)offset + (off_t)size ;
    filend = lseek(lf, 0, SEEK_END) ;
    if (filend == (off_t)-1 || filend < artend) {
      logit(LOG_ERR, "NewDFA : cannot seek to the end of article");
      el->dfa_Next = dfa_Trash;
      dfa_Trash = el;
      return NULL ;
    }

#if USE_AIO
    /* aio_nbytes is filled in by AIODataRead() before each request */
    el->dfa_AIOcb->aio_fildes = lf ;
    el->dfa_AIOcb->aio_offset = offset ;
    el->dfa_AIOcb->aio_nbytes = 0 ;
    el->dfa_AIOcb->aio_reqprio = 0 ;
    el->dfa_AIOcb->aio_buf = el->dfa_Buffer ;
    el->dfa_AIOcb->aio_sigevent.sigev_notify = SIGEV_SIGNAL ;
    el->dfa_AIOcb->aio_sigevent.sigev_signo = AIOSIG;
    el->dfa_AIOcb->aio_sigevent.sigev_value.sival_ptr = conn ;
#else
    if (lseek(lf, offset, SEEK_SET)!=offset) { /* bring it back home */
      logit(LOG_ERR, "NewDFA : cannot seek to the beginning of article");
      el->dfa_Next = dfa_Trash;
      dfa_Trash = el;
      return NULL ;
    }
    el->dfa_Fd     = lf ;
#endif

    conn->co_DirectFA = el ;

    el->dfa_Size   = size ;
    el->dfa_LF     = 0 ;
    el->dfa_Next   = NULL ;
    /* a recycled structure still carries the FLAG_* bits of its last article */
    el->dfa_Flag   = 0 ;

    /* only a full article is cached: BODY and HEAD requests drop the cache file */
    switch(sreq->sr_CConn->co_ArtMode) {
      case COM_BODY :
      case COM_BODYWVF :
      case COM_BODYNOSTAT :
          el->dfa_InHdr = JMPBDY ;
          if (sreq->sr_Cache) {
              AbortCache(fileno(sreq->sr_Cache), sreq->sr_MsgId, 0) ;
              fclose(sreq->sr_Cache) ;
              sreq->sr_Cache = NULL ;
          }
          break ;
      case COM_HEAD :
          if (sreq->sr_Cache) {
              AbortCache(fileno(sreq->sr_Cache), sreq->sr_MsgId, 0) ;
              fclose(sreq->sr_Cache) ;
              sreq->sr_Cache = NULL ;
          }
          /* fall through: headers are parsed from the first character */
      default :
          el->dfa_InHdr = 0 ;
          break ;
    }

    return el;
}

/*
 * DelDFA() - detach the DFA structure from conn and recycle it
 *
 * Closes the article descriptor if it is still open and pushes the
 * structure on the dfa_Trash free list.  Nothing happens when conn has no
 * DFA attached, so calling it twice is harmless.
 */
void
DelDFA(Connection *conn)
{
    DirectFileAccess* el=conn->co_DirectFA ;

    if (!el)
      return ;

    conn->co_DirectFA = NULL ;
#if USE_AIO
    if (el->dfa_AIOcb->aio_fildes!=-1) {
      close(el->dfa_AIOcb->aio_fildes) ;
      el->dfa_AIOcb->aio_fildes = -1 ;
    }
#else
    if (el->dfa_Fd!=-1) {
      close(el->dfa_Fd) ;
      el->dfa_Fd = -1 ;
    }
#endif
    el->dfa_Next = dfa_Trash;
    dfa_Trash = el;
}

#if USE_AIO
/*
 * Sig_AIO() - AIOSIG handler: an asynchronous read has completed
 *
 * The connection is recovered from the signal value stored in the aiocb
 * by NewDFA().  When the read succeeded the buffer is handed to
 * NNSLA_Loop(); on any error the DFA is released and the server
 * connection terminated.
 *
 * NOTE(review): logit() and the processing done from here are not
 * async-signal-safe; this presumably relies on AIOSIG being unblocked
 * only at known points through AIOUnblockSignal() - confirm in callers.
 */
void Sig_AIO(int signo, siginfo_t *info, void *context)
{
    Connection *conn = (Connection*) info->si_value.sival_ptr ;
    DirectFileAccess *dfa;
    struct aiocb *aio;
    int ret;

    (void)signo;
    (void)context;

    /* the DFA may already be gone when a queued signal is finally delivered */
    if (!conn || !(dfa = conn->co_DirectFA))
      return;
    aio = dfa->dfa_AIOcb;

    /* check IO */
    ret = aio_error(aio);
    if (ret==EINPROGRESS) {
      const struct aiocb *const aiolist[1] = {aio};

      logit(LOG_ERR, "SigAIOPoll : EINPROGRESS");
      if (aio_suspend(aiolist,1,NULL)) {
          logit(LOG_ERR, "SigAIOPoll : aio_suspend failed (errno=%i)", errno);
          DelDFA(conn);
          NNServerTerminate(conn);
          return;
      }
      logit(LOG_NOTICE, "SigAIOPoll : aio_suspend on %i", aio->aio_fildes) ;
      /* the request is over now, but it may still have failed */
      ret = aio_error(aio);
    }
    if (ret) {
      logit(LOG_ERR, "SigAIOPoll : IO error %i", ret);
      DelDFA(conn);
      NNServerTerminate(conn);
      /* the DFA is gone: do not fall into NNSLA_Loop() */
      return;
    }
    /* we can read the buffer */
    NNSLA_Loop(conn);
}

/*
 * AIODataRead() - queue the next asynchronous read of the article
 *
 * Requests at most one buffer (or what is left of the article, whichever
 * is smaller) at the current aio_offset.  Completion is reported through
 * AIOSIG.  When the request cannot be queued the DFA is released and the
 * server connection terminated.
 */
void
AIODataRead(Connection *conn)
{
    DirectFileAccess *dfa = conn->co_DirectFA;
    struct aiocb *cb = dfa->dfa_AIOcb;

    cb->aio_nbytes = (dfa->dfa_Size>sizeof(dfa->dfa_Buffer)) ? sizeof(dfa->dfa_Buffer) : dfa->dfa_Size ;
    if (DebugOpt) {
      /* aio_nbytes is a size_t and aio_offset an off_t: cast both for %i */
      printf("AIODataRead : fd %i sz %i of %i", cb->aio_fildes, (int)cb->aio_nbytes, (int)cb->aio_offset);
    }

    if (aio_read(cb)) {
      logit(LOG_ERR, "AIODataRead : error on reading (errno=%i)", errno);
      DelDFA(conn);
      NNServerTerminate(conn);
    }
}

/*
 * AIOBlockSignal() - hold back delivery of AIOSIG
 *
 * Does nothing until NewDFA() has set up sig_aio, whose mask (containing
 * only AIOSIG) is the set being blocked.
 */
void AIOBlockSignal(void)
{
    if (!sig_aio)
      return ;
    sigprocmask(SIG_BLOCK, &sig_aio->sa_mask, NULL) ;
}

/*
 * AIOUnblockSignal() - allow delivery of AIOSIG again
 *
 * Does nothing until NewDFA() has set up sig_aio, whose mask (containing
 * only AIOSIG) is the set being unblocked.
 */
void AIOUnblockSignal(void)
{
    if (!sig_aio)
      return ;
    sigprocmask(SIG_UNBLOCK, &sig_aio->sa_mask, NULL) ;
}

#endif

/*
 * DFADataWrite() - send len bytes of article data to every consumer
 *
 * The bytes are appended to the article buffer of the client connection
 * when there is one, and to the cache file when there is one.  The result
 * of fwrite() is not examined here.
 */
void
DFADataWrite(ServReq *sreq, const void *data, int len)
{
    if (sreq->sr_CConn != NULL) {
	MBWrite(&sreq->sr_CConn->co_ArtBuf, data, len);
    }

    if (sreq->sr_Cache != NULL) {
	fwrite(data, 1, len, sreq->sr_Cache);
    }
}

/*
 * NNSLA_Abort() - give up on the article being copied
 *
 * Releases the DFA, discards the partially written cache file if any and
 * terminates the server connection.
 */
static void
NNSLA_Abort(Connection *conn)
{
    ServReq *sreq = conn->co_SReq;

    DelDFA(conn);
    if (sreq->sr_Cache) {
      AbortCache(fileno(sreq->sr_Cache), sreq->sr_MsgId, 0);
      fclose(sreq->sr_Cache) ;
      sreq->sr_Cache = NULL ;
    }
    NNServerTerminate(conn);
}

/*
 * NNSLA_Loop() - push article data from the DFA buffer to the client
 *
 * With USE_AIO one completed asynchronous read is collected and filtered,
 * then the next read is queued.  Otherwise the article is read and
 * filtered buffer after buffer in a loop.
 *
 * Once all dfa_Size bytes went through (or NNCopyLocalArticle() reports
 * BLOCK_END) the article buffer is handed to the client, the cache file
 * committed or aborted, the DFA released and the request finished with
 * the ".\r\n" terminator.  A filtering error, a failed read or a file
 * ending before dfa_Size bytes were read terminates the server
 * connection.
 */
void
NNSLA_Loop(Connection *conn)
{
    ServReq *sreq = conn->co_SReq;
    DirectFileAccess *dfa = conn->co_DirectFA;
    int rs ;

    conn->co_Func = NNSendLocalArticle;
    conn->co_State = "getlclart";

#if USE_AIO
      rs = aio_return(dfa->dfa_AIOcb);
      if (rs<0) {
          /* a negative count must never reach the size/offset bookkeeping */
          logit(LOG_ERR, "NNSLA_Loop : asynchronous read failed (errno=%i)", errno);
          NNSLA_Abort(conn);
          return;
      }
#else
    do {
      int sizofbuf = sizeof(dfa->dfa_Buffer) ;
      rs = read(dfa->dfa_Fd, dfa->dfa_Buffer, (dfa->dfa_Size>sizofbuf) ? sizofbuf : dfa->dfa_Size) ;

      if (rs<0) {
          if (errno == EAGAIN) { /* retry later */
              if (DebugOpt)
                  printf("NNGetLocal2: retry later...") ;
              return;
          } else {
              /* ooops, real error */
              /* NOTE(review): unlike BLOCK_ERROR below, the cache file is
               * left untouched here - confirm NNServerTerminate() copes */
              DelDFA(conn) ;
              NNServerTerminate(conn) ;
              return;
          }
      }
#endif

      /*
       * Nothing read although data is still expected: the file is shorter
       * than the article.  Without this test the request would never
       * complete (no progress is ever made on dfa_Size).
       */
      if (rs==0 && dfa->dfa_Size!=0) {
          logit(LOG_ERR, "NNSLA_Loop : unexpected end of file (%i bytes missing)", (int)dfa->dfa_Size);
          NNSLA_Abort(conn);
          return;
      }

      switch (NNCopyLocalArticle(conn, rs)) {
          case BLOCK_OK :
              dfa->dfa_Size -= rs ;
              if (!dfa->dfa_Size) {
                  /* whole article done */
                  if (sreq->sr_CConn)
                          MBCopy(&sreq->sr_CConn->co_ArtBuf, &sreq->sr_CConn->co_TMBuf);
                  if (sreq->sr_Cache) {
                      if (fflush(sreq->sr_Cache) || ferror(sreq->sr_Cache))
                          AbortCache(fileno(sreq->sr_Cache), sreq->sr_MsgId, 0);
                      else
                          CommitCache(conn, 0);
                      fclose(sreq->sr_Cache) ;
                      sreq->sr_Cache = NULL ;
                  }
                  DelDFA(conn) ;
                  NNFinishSReq(conn, ".\r\n", 0) ;
                  return;
              }
              /* more to come: pass on what we have only in fast-copy mode */
              if (FastCopyOpt && sreq->sr_CConn) {
                  MBCopy(&sreq->sr_CConn->co_ArtBuf, &sreq->sr_CConn->co_TMBuf);
              }
              break ;
          case BLOCK_END :
              /* the filter stopped before the end of the article (HEAD) */
              if (sreq->sr_CConn)
                      MBCopy(&sreq->sr_CConn->co_ArtBuf, &sreq->sr_CConn->co_TMBuf);
              if (sreq->sr_Cache) {
                  AbortCache(fileno(sreq->sr_Cache), sreq->sr_MsgId, 0);
                  fclose(sreq->sr_Cache) ;
                  sreq->sr_Cache = NULL ;
              }
              DelDFA(conn) ;
              NNFinishSReq(conn, ".\r\n", 0) ;
              return;
          case BLOCK_ERROR :
          default :
              NNSLA_Abort(conn);
              return;
      }
#if USE_AIO
    /* queue the read of the next chunk; Sig_AIO() brings us back here */
    dfa->dfa_AIOcb->aio_offset += rs;
    AIODataRead(conn);
#else
    } while (rs) ;
#endif
}

/*
 * NNSendLocalArticle() - connection callback delivering a local article
 *
 * Without asynchronous I/O the article is read and copied in the
 * blocking NNSLA_Loop(); with USE_AIO only the first asynchronous read is
 * queued here.
 */
void
NNSendLocalArticle(Connection *conn)
{
#if !USE_AIO
    /* synchronous variant: read and copy in one go */
    NNSLA_Loop(conn);
#else
    /* asynchronous variant: prime the first read */
    AIODataRead(conn);
#endif
}

/*
 * NNCopyLocalArticle() - filter one buffer of article data on its way out
 *
 * Walks the first rs bytes of dfa->dfa_Buffer (dfa = conn->co_DirectFA)
 * and hands the bytes to keep to DFADataWrite().  The parsing position is
 * kept in the DFA structure, so an article can be fed buffer by buffer:
 *
 *   dfa_InHdr	state of the current line: INBODY, JMPBDY, WRDDEL, HDRCPY,
 *		HDRDEL, MAX_HDR_CC, or the number of header-name characters
 *		already stored in dfa_Field (0 = first character of a line)
 *   dfa_LF	what was seen last (0, CR, LF, CRLF); at the start of a
 *		header line it holds HDRCPY/HDRDEL instead, so that a
 *		continuation line resumes the same treatment
 *   dfa_Flag	FLAG_PATH / FLAG_XREF bits, set once that header was written
 *
 * Locals: i is the read cursor, ptr the first byte of the buffer that has
 * not been written out yet.
 *
 * Returns BLOCK_OK when the buffer was consumed, BLOCK_END when the end
 * of the headers is reached in COM_HEAD mode, BLOCK_ERROR on a bad header
 * continuation or an unexpected negative state.
 */
int
NNCopyLocalArticle(Connection *conn, int rs)
{
    ServReq *sreq = conn->co_SReq;
    DirectFileAccess *dfa = conn->co_DirectFA;
    int sizofbuf = sizeof(dfa->dfa_Buffer) ;
    int i=0, ptr=0 ;
    char *vs;

    while (i < rs) {
      switch (dfa->dfa_InHdr) {
          case INBODY : /* body, just regular copy */
              /* Fast read, halt on end of buffer/line; only taken while dfa_LF is 0 */
              while ( (dfa->dfa_Buffer[i] != '\n') && (dfa->dfa_Buffer[i] != '\r') && !dfa->dfa_LF && (i+1<rs) ) {
                  i++ ;
              }
              switch (dfa->dfa_Buffer[i]) {
                  case '\n' :
                      if ( dfa->dfa_LF != CR) { /* if I get a CRLF, I can delay the writing */
                          /*
                           * bare LF: overwrite it with '\r' just long enough to
                           * write the pending bytes plus that '\r', then restore
                           * it and leave ptr on it so the '\n' goes out with the
                           * next write
                           */
                          dfa->dfa_Buffer[i] = '\r' ;
                          DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr+1) ;
                          dfa->dfa_Buffer[i] = '\n' ;
                          ptr = i ;
                      }
                      dfa->dfa_LF = CRLF ;
                      break ;
                  case '\r' :
                      dfa->dfa_LF = CR ;
                      break ;
                  case '.' :
                      if (dfa->dfa_LF == CRLF) { /* we have to duplicate this '.' */
                          /* written once here, and again later since ptr stays on it */
                          DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr+1) ;
                          ptr = i ;
                      }
                      /* fall through */
                  default :
                      dfa->dfa_LF = 0 ;
              }
              break ;
          case JMPBDY : /* COM_BODY, del headers */
              /* fast read, halt on end of buffer/line */
              /*
               * NOTE(review): the headers end only on a '\n' directly after a
               * '\n'; a "\r\n" blank line would not match - confirm that
               * articles are stored with bare LF line ends.
               */
              if (dfa->dfa_LF == LF) {
                  switch (dfa->dfa_Buffer[i]) {
                      case '\n' :
                          dfa->dfa_InHdr = INBODY ;
                          /* fall through */
                      case '\t' :
                          ptr = i + 1 ;
                          break ;
                  } 
              }
              while ( (dfa->dfa_Buffer[i]!='\n') && (i+1<rs)) i++ ;
              if (dfa->dfa_Buffer[i]=='\n') 
                  dfa->dfa_LF = LF ;
              else
                  dfa->dfa_LF = 0 ;
              /* everything scanned here is dropped */
              ptr = i + 1 ;
              break ;
          case WRDDEL : /* header, drop current word & copy til the end of line */
              /* Fast read, halt on end of buffer/word */
              while ( (i<rs) && (dfa->dfa_Buffer[i]!=' ') && (dfa->dfa_Buffer[i]!='\t')
                      && (dfa->dfa_Buffer[i]!='\r') && (dfa->dfa_Buffer[i]!='\n') )
                  i++ ;
              ptr = i ;
              if (i==rs) { /* end of buffer... */
                  /* NOTE(review): the i++ after the outer switch still runs and
                   * leaves i at rs+1 while ptr is rs, so the final write below
                   * emits one byte from dfa_Buffer[rs] - verify */
                  break ;
              }
              dfa->dfa_LF = 0 ;
              /* fall through: the rest of the line is copied */
          case MAX_HDR_CC : /* stop checking, just copy line */
              dfa->dfa_InHdr = HDRCPY ;
              /* fall through */
          case HDRCPY : /* header, no more check on this line */
              /* Fast read, halt on end of buffer/line */
              while ( (dfa->dfa_Buffer[i] != '\n') && (dfa->dfa_Buffer[i] != '\r') && !dfa->dfa_LF && (i+1<rs))
                  i++ ;
              switch (dfa->dfa_Buffer[i]) {
                  case '\n' :
                      if (dfa->dfa_LF != CR) { /* no need to copy when having a CRLF */
                          /* copying & adding a CRLF (same '\r' trick as in INBODY) */
                          dfa->dfa_Buffer[i] = '\r' ;
                          DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr+1) ;
                          dfa->dfa_Buffer[i] = '\n' ;
                          ptr = i ;
                      }
                      /* next character starts a line; remember the mode for a continuation */
                      dfa->dfa_InHdr = 0 ; 
                      dfa->dfa_LF = HDRCPY ;
                      break ;
                  case '\r' :
                      dfa->dfa_LF = CR ;
                      break ;
                  default :
                      dfa->dfa_LF = 0 ;
                      break ;
              }
              break ;
          case HDRDEL : /* header, drop the rest of the current line */ /* unused */
              /* Fast read, halt on end of buffer/line */
              while ((dfa->dfa_Buffer[i] != '\n') && (i+1<rs)) i++ ;
              if (dfa->dfa_Buffer[i] == '\n') {
                  dfa->dfa_InHdr = 0 ;
                  dfa->dfa_LF = HDRDEL ;
              }
              ptr = i+1 ;
              break ;
          case 0 : /* 1st char of line */
              switch (dfa->dfa_Buffer[i]) {
                  case '\t' :
                  case ' ' : /* header continuation... */
                      /* a negative dfa_LF is the mode saved at the end of the
                       * previous header line (presumably HDRCPY/HDRDEL are
                       * negative constants - confirm) */
                      if (dfa->dfa_LF < 0) {
                          dfa->dfa_InHdr = dfa->dfa_LF ;
                      } else { /* this should not happen : we should be in HDRCPY or HDRDEL */
                          logit(LOG_ERR, "NNGetLocal2: bad header continuation (%i)", dfa->dfa_LF) ;
                          return BLOCK_ERROR;
                      }
                      break ;
                  case '\r' :
                      /* Check missing headers, end of last buffer or HEAD mode */
                      /* NOTE(review): sr_CConn is dereferenced unchecked here and
                       * below although DFADataWrite() tests it for NULL - confirm
                       * it cannot be NULL at this point */
                      if ( ((dfa->dfa_Flag&FLAG_NEEDED)==FLAG_NEEDED) || (i+1 == sizofbuf)
                           || (sreq->sr_CConn->co_ArtMode == COM_HEAD)) {
                          /* we "simply" forget this '\r' for laziness reasons */
                          if (i-ptr) DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr) ;
                          ptr = i+1 ;
                          dfa->dfa_LF = 0 ;
                      } else {
                          dfa->dfa_LF = CR ;
                      }
                      /* dfa->dfa_InHdr = 0 ; */
                      break ;
                  case '\n' : /* end of headers */
                      /* add the headers that were not met, when the virtual
                       * server has a cluster name and does not disable them */
                      if (!sreq->sr_CConn->co_Auth.dr_VServerDef->vs_NoReadPath 
                           && (vs=sreq->sr_CConn->co_Auth.dr_VServerDef->vs_ClusterName)
                           && ((dfa->dfa_Flag&FLAG_PATH) != FLAG_PATH)
                         ) { /* Missing Path */
                          if (i-ptr) DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr) ;
                          ptr = i ;
                          DFADataWrite(sreq, "Path: ", 6) ;
                          DFADataWrite(sreq, vs, strlen(vs)) ;
                          DFADataWrite(sreq, "!not-for-mail\r\n", 15);
                          dfa->dfa_Flag = dfa->dfa_Flag | FLAG_PATH ;
                      }
                      if (!sreq->sr_CConn->co_Auth.dr_VServerDef->vs_NoXrefHostUpdate
                           &&(vs=sreq->sr_CConn->co_Auth.dr_VServerDef->vs_ClusterName)
                           && ((dfa->dfa_Flag&FLAG_XREF) != FLAG_XREF) 
                         ) { /* Missing Xref */
                          if (i-ptr) DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr) ;
                          ptr = i ;
                          DFADataWrite(sreq, "XRef: ", 6) ;
                          DFADataWrite(sreq, vs, strlen(vs)) ;
                          DFADataWrite(sreq, " no-xref\r\n", 10) ;
                          dfa->dfa_Flag = dfa->dfa_Flag | FLAG_XREF ;
                      }

                      if (sreq->sr_CConn->co_ArtMode == COM_HEAD) { /* end of header in HEAD mode */
                          /* the blank line itself is not written */
                          if (i-ptr) DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr) ;
                          return BLOCK_END;
                      }
                      if ( dfa->dfa_LF != CR) { /* if I get a CRLF, I can delay the writings */
                          dfa->dfa_Buffer[i] = '\r' ;
                          DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr+1) ;
                          dfa->dfa_Buffer[i] = '\n' ;
                          ptr = i ;
                      }
                      dfa->dfa_LF = CRLF ;
                      dfa->dfa_InHdr = INBODY ;
                      break ;
                  default :
                      /* first character of a header name: start collecting it */
                      dfa->dfa_LF = 0 ;
                      dfa->dfa_Field[(int)dfa->dfa_InHdr] = dfa->dfa_Buffer[i] ;
                      dfa->dfa_InHdr++ ;
                      break ;
              }
              break ;
          case 0x05 : /* 6th char of line */
              /* dfa_Field now holds the first five characters of the line */
              switch (dfa->dfa_Buffer[i]) {
                  case ' ' :
                      if (!sreq->sr_CConn->co_Auth.dr_VServerDef->vs_NoXrefHostUpdate
                           && (vs=sreq->sr_CConn->co_Auth.dr_VServerDef->vs_ClusterName)
                           && (strncasecmp(dfa->dfa_Field, "Xref:",5) == 0)
                         ) {
                          /* write "Xref: ", then our name; WRDDEL drops the original host */
                          DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr+1) ;
                          ptr = i + 1 ;
                          /* We just replace hostname in Xref, maybe we should
                           * replace the full line with overview content ? */
                          DFADataWrite(sreq, vs, strlen(vs)) ;
                          dfa->dfa_InHdr = WRDDEL ;
                          dfa->dfa_Flag = dfa->dfa_Flag | FLAG_XREF ;
                      } else if ( !sreq->sr_CConn->co_Auth.dr_VServerDef->vs_NoReadPath 
                                  && (vs=sreq->sr_CConn->co_Auth.dr_VServerDef->vs_ClusterName)
                                  && (strncasecmp(dfa->dfa_Field, "Path:",5) == 0)  
                                ) {  /* it is possible to "double" path if reader & spooler has 
                                      * the same name but it would be painful to check the first
                                      * path host... */
                          /* write "Path: ", then "<name>!" in front of the stored path */
                          DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr+1) ;
                          ptr = i + 1 ;
                          DFADataWrite(sreq, vs, strlen(vs)) ;
                          DFADataWrite(sreq, "!", 1) ;
                          dfa->dfa_InHdr = HDRCPY ;
                          dfa->dfa_Flag = dfa->dfa_Flag | FLAG_PATH ;
                      } else {
                          dfa->dfa_InHdr = HDRCPY ;
                      }
                      break ;
                  default : /* no more check... */
                      dfa->dfa_InHdr = HDRCPY ;
                      break ;
              }
              break ;
          default : /* Else, we are reading firsts chars on headers lines */
              if (dfa->dfa_InHdr<0) { /* ooops */
                  /* a negative state not handled by any case above */
                  return BLOCK_ERROR;
              }
              switch (dfa->dfa_Buffer[i]) {
                  case ' ' : /* end of "keyword: ", thus go for a HDRCPY */
                      dfa->dfa_InHdr = HDRCPY ;
                      break ;
                  case '\n' :
                      if (dfa->dfa_LF != CR) { /* no need to copy when having a CRLF */
                          /* copying & adding a CRLF */
                          if (i-ptr) DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr) ;
                          DFADataWrite(sreq, "\r\n", 2) ;
                          ptr = i + 1 ;
                      }
                      dfa->dfa_LF = HDRCPY ; /* in case this header continues on the following line */
                      dfa->dfa_InHdr = 0 ;
                      break ;
                  case '\r' :
                      dfa->dfa_LF = CR ;
                      dfa->dfa_InHdr = HDRCPY ;
                      break ;
                  default :
                      /* one more character of the header name */
                      dfa->dfa_LF = 0 ;
                      dfa->dfa_Field[(int)dfa->dfa_InHdr] = dfa->dfa_Buffer[i] ;
                      dfa->dfa_InHdr++ ;
                      break ;
              }
              break ;
      }

      i++ ;
    }

    /* write whatever is still pending at the end of the buffer */
    if (i-ptr) DFADataWrite(sreq, dfa->dfa_Buffer+ptr, i-ptr) ;

    return BLOCK_OK;
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */