#include <skstream/skstream.h>

#ifdef HAVE_CONFIG_H
    #include "config.h"
#endif

#include <Eris/Connection.h>

#include <Eris/Timeout.h>
#include <Eris/TypeInfo.h>
#include <Eris/Poll.h>
#include <Eris/Log.h>
#include <Eris/Exceptions.h>
#include <Eris/Router.h>
#include <Eris/Redispatch.h>
#include <Eris/Response.h>

#include <Atlas/Objects/Encoder.h>
#include <Atlas/Objects/Operation.h>
#include <Atlas/Objects/objectFactory.h>
#include <Atlas/Objects/Entity.h>
#include <sigc++/bind.h>

#include <Atlas/Codecs/Bach.h>

#include <cassert>
#include <algorithm>

// #define ATLAS_LOG 1

using namespace Atlas::Objects::Operation;
using Atlas::Objects::Root;
using Atlas::Objects::Entity::RootEntity;
using Atlas::Objects::smart_dynamic_cast;

namespace Eris {

// Set up a connection object for one server. Nothing is opened here; the
// host and port are only remembered for a later connect().
//   cnm  - client name; forwarded to BaseConnection together with the
//          literal "game_" and this object
//   host - server host name, also used to seed the cached ServerInfo
//   port - server port
//   dbg  - not referenced anywhere in this constructor
Connection::Connection(const std::string &cnm, const std::string& host, short port, bool dbg) :
    BaseConnection(cnm, "game_", this),
    _host(host),
    _port(port),
    m_typeService(new TypeService(this)),
    m_defaultRouter(NULL),
    m_lock(0),
    m_info(host),
    m_responder(new ResponseTracker)
{
    // subscribe to the poller so gotData() runs whenever it reports readiness
    Poll& poller = Poll::instance();
    poller.Ready.connect(sigc::mem_fun(this, &Connection::gotData));
}
	
// Tear the link down from the most-derived destructor.
Connection::~Connection()
{
    // This object acts as the Bridge for the underlying Atlas codec, so the
    // disconnect has to be issued while our vtable is still in place;
    // leaving it to a base class could end in a pure virtual method call.
    this->hardDisconnect(true);
}

// Open the connection to the host and port given at construction time.
// Returns whatever BaseConnection::connect() reports.
int Connection::connect()
{
    const int outcome = BaseConnection::connect(_host, _port);
    return outcome;
}

// Begin a soft disconnect.
// Returns -1 (after logging a warning) when the connection is already
// disconnecting or disconnected, and 0 otherwise.
int Connection::disconnect()
{
    // refuse to start a second shutdown, or to shut down a closed connection
    switch (_status) {
    case DISCONNECTING:
        warning() << "duplicate disconnect on Connection that's already disconnecting";
        return -1;

    case DISCONNECTED:
        warning() << "called disconnect on already disconnected Connection";
        return -1;

    default:
        break;
    }

    assert(m_lock == 0);

    // Soft disconnect: listeners on Disconnecting get a chance to do their
    // tear-down. Anyone who needs to delay the real close calls lock() from
    // the signal handler and unlock() once finished.
    setStatus(DISCONNECTING);
    Disconnecting.emit();

    const bool heldOpen = (m_lock != 0);
    if (heldOpen) {
        // somebody took a lock during the signal; arm a timeout so that a
        // lock which is never released cannot block disconnection
        _timeout = new Timeout(5000);
        _timeout->Expired.connect(sigc::mem_fun(this, &Connection::onDisconnectTimeout));
    } else {
        // nobody objected, close straight away
        hardDisconnect(true);
    }

    return 0;
}
   
// Poll callback: read what is available on our stream, dispatch every
// queued operation, then free the redispatches that finished meanwhile.
void Connection::gotData(PollData &data)
{
    // ignore poll results that are not about our own stream
    const bool streamReady = _stream && data.isReady(_stream);
    if (!streamReady)
        return;

    if (_status == DISCONNECTED) {
        error() << "Got data on a disconnected stream";
        return;
    }

    BaseConnection::recv();

    // dispatch the received ops in arrival order; each op is copied out and
    // removed from the queue before it is handed to dispatchOp()
    while (!m_opDeque.empty()) {
        RootOperation pending = m_opDeque.front();
        m_opDeque.pop_front();
        dispatchOp(pending);
    }

    // deferred deletion ('deleteLater') of the redispatches that have fired
    unsigned int idx = 0;
    while (idx < m_finishedRedispatches.size()) {
        delete m_finishedRedispatches[idx];
        ++idx;
    }

    m_finishedRedispatches.clear();
}

// Encode one Atlas object onto the stream and flush it.
// Logs an error and sends nothing unless the status is CONNECTED or
// DISCONNECTING. A stream already in eof/fail state is reported through
// handleFailure() and then hard-disconnected.
void Connection::send(const Atlas::Objects::Root &obj)
{
    const bool usable = (_status == CONNECTED) || (_status == DISCONNECTING);
    if (!usable) {
        error() << "called send on closed connection";
        return;
    }

    const bool streamBroken = _stream->eof() || _stream->fail();
    if (streamBroken) {
        handleFailure("Connection::send: stream failed");
        hardDisconnect(false);
        return;
    }

#ifdef ATLAS_LOG
    // dump a Bach-encoded copy of the outgoing object to stdout
    std::stringstream debugStream;

    Atlas::Codecs::Bach debugCodec(debugStream, *this /*dummy*/);
    Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
    debugEncoder.streamObjectsMessage(obj);
    debugStream << std::flush;

    std::cout << "sending:" << debugStream.str() << std::endl;
#endif

    _encode->streamObjectsMessage(obj);
    *_stream << std::flush;
}

// Associate a router with a TO id. Any router previously stored under the
// same id is overwritten.
void Connection::registerRouterForTo(Router* router, const std::string toId)
{
    Router*& slot = m_toRouters[toId];
    slot = router;
}

// Remove the router registered for a TO id.
//   router - the router the caller believes is registered; only checked
//            in builds where assert() is active
//   toId   - the id whose registration is dropped
// An id with no registration is left alone (and trips the assertion in
// debug builds).
void Connection::unregisterRouterForTo(Router* router, const std::string toId)
{
    // Look the id up with find() rather than operator[]: indexing inside an
    // assert() would insert a NULL entry for an unknown id, which makes the
    // check pass vacuously for a NULL router and makes debug and release
    // builds take different paths.
    IdRouterMap::iterator pos = m_toRouters.find(toId);
    assert((pos != m_toRouters.end()) && (pos->second == router));

    if (pos != m_toRouters.end())
        m_toRouters.erase(pos);
}
        
// Associate a router with a FROM id. Any router previously stored under the
// same id is overwritten.
void Connection::registerRouterForFrom(Router* router, const std::string fromId)
{
    Router*& slot = m_fromRouters[fromId];
    slot = router;
}

// Remove the router registered for a FROM id.
//   router - the router the caller believes is registered; only checked
//            in builds where assert() is active
//   fromId - the id whose registration is dropped
// An id with no registration is left alone (and trips the assertion in
// debug builds).
void Connection::unregisterRouterForFrom(Router* router, const std::string fromId)
{
    // Look the id up with find() rather than operator[]: indexing inside an
    // assert() would insert a NULL entry for an unknown id, which makes the
    // check pass vacuously for a NULL router and makes debug and release
    // builds take different paths.
    IdRouterMap::iterator pos = m_fromRouters.find(fromId);
    assert((pos != m_fromRouters.end()) && (pos->second == router));

    if (pos != m_fromRouters.end())
        m_fromRouters.erase(pos);
}
        
// Install the fallback router used by dispatchOp() when nothing else has
// taken an op. Only accepted while no default router is installed and the
// argument is non-null; otherwise an error is logged and nothing changes.
void Connection::setDefaultRouter(Router* router)
{
    const bool acceptable = !m_defaultRouter && router;
    if (acceptable) {
        m_defaultRouter = router;
        return;
    }

    error() << "setDefaultRouter duplicate set or null argument";
}

// Forget the default router, so that setDefaultRouter() will accept a new one.
void Connection::clearDefaultRouter()
{
    m_defaultRouter = 0;
}
    
// Take one hold on the connection. Each call must be balanced by unlock().
void Connection::lock()
{
    m_lock += 1;
}

// Release one hold taken with lock().
// Throws InvalidOperation when there is no outstanding hold. When the last
// hold goes away during DISCONNECTING, queued ops are discarded and the
// socket is closed; in any other state only a warning is logged.
void Connection::unlock()
{
    if (m_lock < 1)
        throw InvalidOperation("Imbalanced lock/unlock calls on Connection");

    --m_lock;
    if (m_lock != 0)
        return; // other holders remain

    if (_status == DISCONNECTING) {
        debug() << "Connection unlocked in DISCONNECTING, closing socket";
        debug() << "have " << m_opDeque.size() << " ops waiting";
        m_opDeque.clear();
        hardDisconnect(true);
    } else {
        warning() << "Connection unlocked in spurious state : this may cause a failure later";
    }
}

// Copy the cached server information into the caller's object.
//   si - receives a copy of m_info
void Connection::getServerInfo(ServerInfo& si) const
{
    const ServerInfo& cached = m_info;
    si = cached;
}

void Connection::refreshServerInfo()
{
    if (_status != CONNECTED) {
        warning() << "called refreshServerInfo while not connected, ignoring";
        return;
    }
    
    m_info.setStatus(ServerInfo::QUERYING);
    Get gt;
    gt->setSerialno(getNewSerialno());
    send(gt);
}

#pragma mark -

void Connection::objectArrived(const Root& obj)
{
#ifdef ATLAS_LOG
    std::stringstream debugStream;
    Atlas::Codecs::Bach debugCodec(debugStream, *this /* dummy */);
    Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
    debugEncoder.streamObjectsMessage(obj);
    debugStream << std::flush;

    std::cout << "recieved:" << debugStream.str() << std::endl;
#endif    
    RootOperation op = smart_dynamic_cast<RootOperation>(obj);
    if (op.isValid()) {
        m_opDeque.push_back(op);
    } else
        error() << "Con::objectArrived got non-op";
}

// Hand one received operation to the first handler that claims it.
// Handlers are tried in this fixed order:
//   1. the response tracker (m_responder)
//   2. the router registered for the op's FROM id, when FROM is set
//   3. the router registered for the op's TO id, when TO is set
//   4. built-in server-info handling, for INFO ops whose TO is unset
//   5. the default router, when one is installed
// A router claims the op by returning HANDLED or WILL_REDISPATCH; any other
// result lets the op fall through to the next stage.
// An Atlas::Exception raised by any stage is caught and logged here; other
// exception types are not caught by this function.
void Connection::dispatchOp(const RootOperation& op)
{    
    try {
        // most recent router verdict; stays IGNORED if no router is consulted
        Router::RouterResult rr = Router::IGNORED;
        // sampled once, before any handler has seen the op
        bool anonymous = op->isDefaultTo();
        
        // stage 1: the response tracker gets first refusal
        if (m_responder->handleOp(op)) return;
        
    // stage 2: locate a router based on the op's FROM value
        if (!op->isDefaultFrom()) {
            IdRouterMap::const_iterator R = m_fromRouters.find(op->getFrom());
            if (R != m_fromRouters.end()) {
                rr = R->second->handleOperation(op);
                if ((rr == Router::HANDLED) || (rr == Router::WILL_REDISPATCH)) return;
            }
        }
        
    // stage 3: locate a router based on the op's TO value
        if (!anonymous) {
            IdRouterMap::const_iterator R = m_toRouters.find(op->getTo());
            if (R != m_toRouters.end()) {
                rr = R->second->handleOperation(op);
                if ((rr == Router::HANDLED) || (rr == Router::WILL_REDISPATCH)) return;
            } else if (!m_toRouters.empty()) {
                // only warn once at least one TO router has been registered
                warning() << "recived op with TO=" << op->getTo() << ", but no router is registered for that id";
            }
        }
        
    // stage 4: special case, server info refreshes are handled here directly
        if (op->instanceOf(INFO_NO) && anonymous) {
            handleServerInfo(op);
            return;
        }
                
    // stage 5: go to the default router
        if (m_defaultRouter) rr = m_defaultRouter->handleOperation(op);
        // note: any result other than HANDLED is reported here, including a
        // WILL_REDISPATCH from the default router and the case where no
        // default router is installed
        if (rr != Router::HANDLED) warning() << "no-one handled op:" << op;
    } catch (Atlas::Exception& ae) {
        error() << "caught Atlas exception: " << ae.getDescription() <<
            " while dispatching op:\n" << op;
    }
}


// Move to a new status. StatusChanged is emitted only on a real change, and
// it is emitted before _status is updated, so handlers that read _status
// during the signal still observe the previous value.
void Connection::setStatus(Status ns)
{
	const bool changed = (_status != ns);
	if (changed)
		StatusChanged.emit(ns);

	_status = ns;
}

// Report a failure to listeners through the Failure signal.
//   msg - description passed on to the signal handlers
void Connection::handleFailure(const std::string &msg)
{
	Failure.emit(msg);

	// FIXME - dropping every outstanding lock here is believed to be right,
	// but it has not been confirmed to be safe
	m_lock = 0;
}

void Connection::handleTimeout(const std::string& msg)
{
    handleFailure(msg); // all the same in the end
}

void Connection::handleServerInfo(const RootOperation& op)
{
    RootEntity svr = smart_dynamic_cast<RootEntity>(op->getArgs().front());	
    if (!svr.isValid()) {
        error() << "server INFO argument object is broken";
        return;
    }
    
    m_info.processServer(svr);
    GotServerInfo.emit();
}

// Connection-established hook: run the base-class handling first, then
// initialise the type service and start again with a fresh ServerInfo for
// our host.
void Connection::onConnect()
{
    BaseConnection::onConnect();

    m_typeService->init();

    // discard whatever was cached from an earlier session
    m_info = ServerInfo(_host);
}

void Connection::onDisconnectTimeout()
{
    handleTimeout("timed out waiting for disconnection");
    hardDisconnect(true);
}

void Connection::postForDispatch(const Root& obj)
{
    RootOperation op = smart_dynamic_cast<RootOperation>(obj);
    assert(op.isValid());
    m_opDeque.push_back(op);
    
#ifdef ATLAS_LOG
    std::stringstream debugStream;
    Atlas::Codecs::Bach debugCodec(debugStream, *this /* dummy */);
    Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
    debugEncoder.streamObjectsMessage(obj);
    debugStream << std::flush;

    std::cout << "posted for re-dispatch:" << debugStream.str() << std::endl;
#endif
}

// Schedule a finished redispatch for deletion. The object is not freed
// here; gotData() deletes everything on this list after dispatching.
void Connection::cleanupRedispatch(Redispatch* r)
{
    Redispatch* const finished = r;
    m_finishedRedispatches.push_back(finished);
}

#pragma mark -

// Hand out the next serial number. The first call returns 1001 and every
// later call returns one more than the previous.
long getNewSerialno()
{
	// The counter would eventually wrap (in theory), which is considered
	// acceptable.
	// FIXME - always starting from the same initial offset is problematic
	// if the client dies and quickly reconnects
	static long s_serialCounter = 1001;

	const long issued = s_serialCounter;
	s_serialCounter = issued + 1;
	return issued;
}

} // of namespace


// syntax highlighted by Code2HTML, v. 0.9.1