/*
 * sockagg.cxx
 *
 * Generalised Socket Aggregation functions
 *
 * Portable Windows Library
 *
 * Copyright (C) 2005 Post Increment
 *
 * The contents of this file are subject to the Mozilla Public License
 * Version 1.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * The Original Code is Portable Windows Library.
 *
 * The Initial Developer of the Original Code is Post Increment
 *
 * Portions of this code were written with the financial assistance of 
 * Metreos Corporation (http://www.metros.com).
 *
 * Contributor(s): ______________________________________.
 *
 * $Log: sockagg.cxx,v $
 * Revision 1.6  2006/01/05 11:39:32  rjongbloed
 * Fixed DevStudio warning
 *
 * Revision 1.5  2006/01/03 04:23:32  csoutheren
 * Fixed Unix implementation
 *
 * Revision 1.4  2005/12/23 07:49:27  csoutheren
 * Fixed Unix implementation
 *
 * Revision 1.3  2005/12/23 06:44:31  csoutheren
 * Working implementation
 *
 * Revision 1.2  2005/12/22 07:27:36  csoutheren
 * More implementation
 *
 * Revision 1.1  2005/12/22 03:55:52  csoutheren
 * Added initial version of socket aggregation classes
 *
 */

#ifdef __GNUC__
#pragma implementation "sockagg.h"
#endif


#include <ptlib.h>
#include <ptclib/sockagg.h>

#include <fcntl.h>

////////////////////////////////////////////////////////////////

#if _WIN32

class LocalEvent : public PHandleAggregator::EventBase
{
  public:
    LocalEvent()
    { 
      event = CreateEvent(NULL, TRUE, FALSE,NULL); 
      PAssert(event != NULL, "CreateEvent failed");
    }

    ~LocalEvent()
    { CloseHandle(event); }

    PAggregatorFD::FD GetHandle()
    { return event; }

    void Set()
    { SetEvent(event);  }

    void Reset()
    { ResetEvent(event); }

  protected:
    HANDLE event;
};

PAggregatorFD::PAggregatorFD(SOCKET v)
  : socket(v) 
{ 
  fd = WSACreateEvent(); 
  PAssert(WSAEventSelect(socket, fd, FD_READ | FD_CLOSE) == 0, "WSAEventSelect failed"); 
}

PAggregatorFD::~PAggregatorFD()
{ 
  WSACloseEvent(fd); 
}

bool PAggregatorFD::IsValid()
{ 
  return socket != INVALID_SOCKET; 
}

#else // #if _WIN32

class LocalEvent : public PHandleAggregator::EventBase
{
  public:
    LocalEvent()
    { ::pipe(fds); }

    virtual ~LocalEvent()
    {
      close(fds[0]);
      close(fds[1]);
    }

    PAggregatorFD::FD GetHandle()
    { return fds[0]; }

    void Set()
    { char ch; write(fds[1], &ch, 1); }

    void Reset()
    { char ch; read(fds[0], &ch, 1); }

  protected:
    int fds[2];
};

PAggregatorFD::PAggregatorFD(int v)
  : fd(v) 
{ 
}

PAggregatorFD::~PAggregatorFD()
{
}

bool PAggregatorFD::IsValid()
{ 
  return fd >= 0; 
}

#endif // #endif _WIN32
  

////////////////////////////////////////////////////////////////

class WorkerThread : public PHandleAggregator::WorkerThreadBase
{
  public:
    WorkerThread()
      : WorkerThreadBase(localEvent)
    { }

    void Trigger()
    { localEvent.Set(); }

    LocalEvent localEvent;
};


////////////////////////////////////////////////////////////////

PHandleAggregator::PHandleAggregator(unsigned _max)
  : maxWorkerSize(_max), minWorkerSize(1)
{ 
}

BOOL PHandleAggregator::AddHandle(PAggregatedHandle * handle)
{
  PTRACE(4, "Adding handle to socket aggregator thread");

  // perform the handle init function
  if (!handle->Init())
    return FALSE;

  PWaitAndSignal m(mutex);

  // look for a worker thread to use that has less than the maximum number of handles
  WorkerList_t::iterator r;
  for (r = workers.begin(); r != workers.end(); ++r) {
    WorkerThreadBase & worker = **r;
    PWaitAndSignal m2(worker.mutex);
    if (worker.handleList.size() < maxWorkerSize) {
      worker.handleList.push_back(handle);
      worker.listChanged = TRUE;
      worker.Trigger();
      return TRUE;
    }
  }

  // no worker threads usable, create a new one
  //cout << "New worker thread created" << endl;
  WorkerThread * worker = new WorkerThread;
  worker->handleList.push_back(handle);
  worker->Resume();
  workers.push_back(worker);

  return TRUE;
}

BOOL PHandleAggregator::RemoveHandle(PAggregatedHandle * handle)
{
  PTRACE(4, "Removing handles from socket aggregator thread");

  PWaitAndSignal m(mutex);

  // look for the thread containing the handle we need to delete
  WorkerList_t::iterator r;
  for (r = workers.begin(); r != workers.end(); ++r) {
    WorkerThreadBase * worker = *r;

    // lock the worker
    worker->mutex.Wait();

    HandleContextList_t & hList = worker->handleList;

    // if handle is not in this thread, then continue searching
    HandleContextList_t::iterator s = find(hList.begin(), hList.end(), handle);
    if (s == worker->handleList.end()) {
      worker->mutex.Signal();
      continue;
    }

    // remove the handle from the worker's list of handled
    worker->handleList.erase(s);

    // if the worker thread has enough handles to keep running, trigger it to update
    if (worker->handleList.size() >= minWorkerSize) {
      worker->listChanged = TRUE;
      worker->Trigger();
      worker->mutex.Signal();
      return TRUE;
    }

    // remove the worker thread from the list of workers
    workers.erase(r);

    // add it's handles to other threads
    {
      HandleContextList_t::iterator t;
      for (t = hList.begin(); t != hList.end(); ++t)
        AddHandle(*t);
    }

    // trigger and unlock the worker
    worker->Trigger();
    worker->mutex.Signal();

    // the worker is now finished
    worker->WaitForTermination();
    delete worker;

    return TRUE;
  }

  return FALSE;
}

////////////////////////////////////////////////////////////////

typedef std::vector<PAggregatorFD::FD> fdList_t;
typedef std::vector<PAggregatorFD * > aggregatorFdList_t;
typedef std::map<PAggregatorFD::FD, PAggregatedHandle *> aggregatorFdToHandleMap_t;

#ifdef _WIN32
#define	FDLIST_SIZE	WSA_MAXIMUM_WAIT_EVENTS
#else
#define	FDLIST_SIZE	64
#endif

void PHandleAggregator::WorkerThreadBase::Main()
{
  PTRACE(4, "Socket aggregator thread started");

  fdList_t                  fdList;
  aggregatorFdList_t        aggregatorFdList;
  aggregatorFdToHandleMap_t aggregatorFdToHandleMap;

  for (;;) {

    // create the list of fds to wait on and find minimum timeout
    PTimeInterval timeout(PMaxTimeInterval);
    PAggregatedHandle * timeoutHandle = NULL;
    HandleContextList_t handlesToRemove;

#ifndef _WIN32
    fd_set rfds;
    FD_ZERO(&rfds);
    int maxFd = 0;
#endif

    {
      PWaitAndSignal m(mutex);

      // if no handles, then thread is no longer needed
      if (handleList.size() == 0)
        break;

      // if the list of handles has changed, clear the list of handles
      if (listChanged) {
        aggregatorFdList.erase       (aggregatorFdList.begin(),      aggregatorFdList.end());
        aggregatorFdList.reserve     (FDLIST_SIZE);
        fdList.erase                 (fdList.begin(),                fdList.end());
        fdList.reserve               (FDLIST_SIZE);
        aggregatorFdToHandleMap.erase(aggregatorFdToHandleMap.begin(),         aggregatorFdToHandleMap.end());
      }

      HandleContextList_t::iterator r;
      for (r = handleList.begin(); r != handleList.end(); ++r) {
        PAggregatedHandle * handle = *r;

        if (listChanged) {
          PAggregatorFDList_t fds = handle->GetFDs();
          PAggregatorFDList_t::iterator s;
          for (s = fds.begin(); s != fds.end(); ++s) {
            fdList.push_back          ((*s)->fd);
            aggregatorFdList.push_back((*s));
            aggregatorFdToHandleMap.insert(aggregatorFdToHandleMap_t::value_type((*s)->fd, handle));
          }
        }

        if (!handle->IsPreReadDone()) {
          handle->PreRead();
          handle->SetPreReadDone();
        }

        PTimeInterval t = handle->GetTimeout();
        if (t < timeout) {
          timeout = t;
          timeoutHandle = handle;
        }
      }

      // add in the event fd
      if (listChanged) {
        fdList.push_back(event.GetHandle());
        listChanged = FALSE;
      }

#ifndef _WIN32
      fdList_t::iterator s;
      for (s = fdList.begin(); s != fdList.end(); ++s) {
        FD_SET(*s, &rfds);
        maxFd = PMAX(maxFd, *s);
      }
#endif
    }

#ifdef _WIN32
    PTime wstart;
    DWORD nCount = fdList.size();
    DWORD ret = WSAWaitForMultipleEvents(nCount, &fdList[0], false, timeout.GetInterval(), FALSE);

    if (ret == WAIT_FAILED) {
      DWORD err = GetLastError();
      PTRACE(1, "WaitForMultipleObjects error " << err);
    }

    {
      PWaitAndSignal m(mutex);

      if (ret == WAIT_TIMEOUT) {
        PTime start;
        BOOL closed = !timeoutHandle->OnRead();
        unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
        if (duration > 50) {
          PTRACE(4, "Warning: aggregator read routine was of extended duration = " << duration << " msecs");
        }
        if (!closed)
          timeoutHandle->SetPreReadDone(FALSE);
        else {
          handlesToRemove.push_back(timeoutHandle);
          timeoutHandle->DeInit();
        }
      }

      else if (WAIT_OBJECT_0 <= ret && ret <= (WAIT_OBJECT_0 + nCount - 1)) {
        DWORD index = ret - WAIT_OBJECT_0;

        // if the event was triggered, redo the select
        if (index == nCount-1) {
          event.Reset();
          continue;
        }

        PAggregatorFD * fd = aggregatorFdList[index];
        PAssert(fdList[index] == fd->fd, "Mismatch in fd lists");
        aggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
        if (r != aggregatorFdToHandleMap.end()) {
          PAggregatedHandle * handle = r->second;
          WSANETWORKEVENTS events;
          WSAEnumNetworkEvents(fd->socket, fd->fd, &events);
          if (events.lNetworkEvents != 0) {
            BOOL closed = FALSE;
            if ((events.lNetworkEvents & FD_CLOSE) != 0)
              closed = TRUE;
            else if ((events.lNetworkEvents & FD_READ) != 0) {
              PTime start;
              closed = !handle->OnRead();
              unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
              if (duration > 50) {
                PTRACE(4, "Warning: aggregator read routine was of extended duration = " << duration << " msecs");
              }
            }
            if (!closed)
              handle->SetPreReadDone(FALSE);
            else {
              handle->DeInit();
              handlesToRemove.push_back(handle);
              listChanged = TRUE;
            }
          }
        }
      }

#else

    P_timeval pv = timeout;
    int ret = ::select(maxFd+1, &rfds, NULL, NULL, pv);

    if (ret < 0) {
      PTRACE(1, "Select failed with error " << errno);
    }

    // loop again if nothing was ready
    if (ret <= 0)
      continue;

    {
      PWaitAndSignal m(mutex);

      if (ret == 0) {
        PTime start;
        BOOL closed = !timeoutHandle->OnRead();
        unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
        if (duration > 50) {
          PTRACE(4, "Warning: aggregator read routine was of extended duration = " << duration << " msecs");
        }
        if (!closed)
          timeoutHandle->SetPreReadDone(FALSE);
        else {
          handlesToRemove.push_back(timeoutHandle);
          timeoutHandle->DeInit();
        }
      }

      // check the event first
      else if (FD_ISSET(event.GetHandle(), &rfds)) {
        event.Reset();
        continue;
      }

      else {
        PAggregatorFD * fd = aggregatorFdList[ret];
        PAssert(fdList[ret] == fd->fd, "Mismatch in fd lists");
        aggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
        if (r != aggregatorFdToHandleMap.end()) {
          PAggregatedHandle * handle = r->second;
          PTime start;
          BOOL closed = !handle->OnRead();
          unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
          if (duration > 50) {
            PTRACE(4, "Warning: aggregator read routine was of extended duration = " << duration << " msecs");
          }
          if (!closed)
            handle->SetPreReadDone(FALSE);
          else {
            handle->DeInit();
            handlesToRemove.push_back(handle);
            listChanged = TRUE;
          }
        }
      }

#endif

      // remove handles that are now closed
      while (handlesToRemove.begin() != handlesToRemove.end()) {
        PAggregatedHandle * handle = *handlesToRemove.begin();
        handlesToRemove.erase(handlesToRemove.begin());
        HandleContextList_t::iterator r = find(handleList.begin(), handleList.end(), handle);
        if (r != handleList.end())
          handleList.erase(r);
        if (handle->autoDelete) 
          delete handle;
      }
    }
  }

  PTRACE(4, "Socket aggregator thread finished");
}



syntax highlighted by Code2HTML, v. 0.9.1