#include #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // #define ATLAS_LOG 1 using namespace Atlas::Objects::Operation; using Atlas::Objects::Root; using Atlas::Objects::Entity::RootEntity; using Atlas::Objects::smart_dynamic_cast; namespace Eris { Connection::Connection(const std::string &cnm, const std::string& host, short port, bool dbg) : BaseConnection(cnm, "game_", this), _host(host), _port(port), m_typeService(new TypeService(this)), m_defaultRouter(NULL), m_lock(0), m_info(host), m_responder(new ResponseTracker) { Poll::instance().Ready.connect(sigc::mem_fun(this, &Connection::gotData)); } Connection::~Connection() { // ensure we emit this before our vtable goes down, since we are the // Bridge on the underlying Atlas codec, and otherwise we might get // a pure virtual method call hardDisconnect(true); } int Connection::connect() { return BaseConnection::connect(_host, _port); } int Connection::disconnect() { if (_status == DISCONNECTING) { warning() << "duplicate disconnect on Connection that's already disconnecting"; return -1; } if (_status == DISCONNECTED) { warning() << "called disconnect on already disconnected Connection"; return -1; } assert(m_lock == 0); // this is a soft disconnect; it will give people a chance to do tear down and so on // in response, people who need to hold the disconnect will lock() the // connection, and unlock when their work is done. A timeout stops // locks from preventing disconnection setStatus(DISCONNECTING); Disconnecting.emit(); if (m_lock == 0) { hardDisconnect(true); return 0; } // fell through, so someone has locked => // start a disconnect timeout _timeout = new Timeout(5000); _timeout->Expired.connect(sigc::mem_fun(this, &Connection::onDisconnectTimeout)); return 0; } void Connection::gotData(PollData &data) { if (!_stream || !data.isReady(_stream)) return; if (_status == DISCONNECTED) { error() << "Got data on a disconnected stream"; return; } BaseConnection::recv(); // now dispatch recieved ops while (!m_opDeque.empty()) { RootOperation op = m_opDeque.front(); m_opDeque.pop_front(); dispatchOp(op); } // finally, clean up any redispatches that fired (aka 'deleteLater') for (unsigned int D=0; D < m_finishedRedispatches.size(); ++D) delete m_finishedRedispatches[D]; m_finishedRedispatches.clear(); } void Connection::send(const Atlas::Objects::Root &obj) { if ((_status != CONNECTED) && (_status != DISCONNECTING)) { error() << "called send on closed connection"; return; } if (_stream->eof() || _stream->fail()) { handleFailure("Connection::send: stream failed"); hardDisconnect(false); return; } #ifdef ATLAS_LOG std::stringstream debugStream; Atlas::Codecs::Bach debugCodec(debugStream, *this /*dummy*/); Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec); debugEncoder.streamObjectsMessage(obj); debugStream << std::flush; std::cout << "sending:" << debugStream.str() << std::endl; #endif _encode->streamObjectsMessage(obj); (*_stream) << std::flush; } void Connection::registerRouterForTo(Router* router, const std::string toId) { m_toRouters[toId] = router; } void Connection::unregisterRouterForTo(Router* router, const std::string toId) { assert(m_toRouters[toId] == router); m_toRouters.erase(toId); } void Connection::registerRouterForFrom(Router* router, const std::string fromId) { m_fromRouters[fromId] = router; } void Connection::unregisterRouterForFrom(Router* router, const std::string fromId) { assert(m_fromRouters[fromId] == router); m_fromRouters.erase(fromId); } void Connection::setDefaultRouter(Router* router) { if (m_defaultRouter || !router) { error() << "setDefaultRouter duplicate set or null argument"; return; } m_defaultRouter = router; } void Connection::clearDefaultRouter() { m_defaultRouter = NULL; } void Connection::lock() { ++m_lock; } void Connection::unlock() { if (m_lock < 1) throw InvalidOperation("Imbalanced lock/unlock calls on Connection"); if (--m_lock == 0) { switch (_status) { case DISCONNECTING: debug() << "Connection unlocked in DISCONNECTING, closing socket"; debug() << "have " << m_opDeque.size() << " ops waiting"; m_opDeque.clear(); hardDisconnect(true); break; default: warning() << "Connection unlocked in spurious state : this may cause a failure later"; } } } void Connection::getServerInfo(ServerInfo& si) const { si = m_info; } void Connection::refreshServerInfo() { if (_status != CONNECTED) { warning() << "called refreshServerInfo while not connected, ignoring"; return; } m_info.setStatus(ServerInfo::QUERYING); Get gt; gt->setSerialno(getNewSerialno()); send(gt); } #pragma mark - void Connection::objectArrived(const Root& obj) { #ifdef ATLAS_LOG std::stringstream debugStream; Atlas::Codecs::Bach debugCodec(debugStream, *this /* dummy */); Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec); debugEncoder.streamObjectsMessage(obj); debugStream << std::flush; std::cout << "recieved:" << debugStream.str() << std::endl; #endif RootOperation op = smart_dynamic_cast(obj); if (op.isValid()) { m_opDeque.push_back(op); } else error() << "Con::objectArrived got non-op"; } void Connection::dispatchOp(const RootOperation& op) { try { Router::RouterResult rr = Router::IGNORED; bool anonymous = op->isDefaultTo(); if (m_responder->handleOp(op)) return; // locate a router based on from if (!op->isDefaultFrom()) { IdRouterMap::const_iterator R = m_fromRouters.find(op->getFrom()); if (R != m_fromRouters.end()) { rr = R->second->handleOperation(op); if ((rr == Router::HANDLED) || (rr == Router::WILL_REDISPATCH)) return; } } // locate a router based on the op's TO value if (!anonymous) { IdRouterMap::const_iterator R = m_toRouters.find(op->getTo()); if (R != m_toRouters.end()) { rr = R->second->handleOperation(op); if ((rr == Router::HANDLED) || (rr == Router::WILL_REDISPATCH)) return; } else if (!m_toRouters.empty()) { warning() << "recived op with TO=" << op->getTo() << ", but no router is registered for that id"; } } // special-case, server info refreshes are handled here directly if (op->instanceOf(INFO_NO) && anonymous) { handleServerInfo(op); return; } // go to the default router if (m_defaultRouter) rr = m_defaultRouter->handleOperation(op); if (rr != Router::HANDLED) warning() << "no-one handled op:" << op; } catch (Atlas::Exception& ae) { error() << "caught Atlas exception: " << ae.getDescription() << " while dispatching op:\n" << op; } } void Connection::setStatus(Status ns) { if (_status != ns) StatusChanged.emit(ns); _status = ns; } void Connection::handleFailure(const std::string &msg) { Failure.emit(msg); // FIXME - reset I think, but ensure this is safe m_lock= 0; } void Connection::handleTimeout(const std::string& msg) { handleFailure(msg); // all the same in the end } void Connection::handleServerInfo(const RootOperation& op) { RootEntity svr = smart_dynamic_cast(op->getArgs().front()); if (!svr.isValid()) { error() << "server INFO argument object is broken"; return; } m_info.processServer(svr); GotServerInfo.emit(); } void Connection::onConnect() { BaseConnection::onConnect(); m_typeService->init(); m_info = ServerInfo(_host); } void Connection::onDisconnectTimeout() { handleTimeout("timed out waiting for disconnection"); hardDisconnect(true); } void Connection::postForDispatch(const Root& obj) { RootOperation op = smart_dynamic_cast(obj); assert(op.isValid()); m_opDeque.push_back(op); #ifdef ATLAS_LOG std::stringstream debugStream; Atlas::Codecs::Bach debugCodec(debugStream, *this /* dummy */); Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec); debugEncoder.streamObjectsMessage(obj); debugStream << std::flush; std::cout << "posted for re-dispatch:" << debugStream.str() << std::endl; #endif } void Connection::cleanupRedispatch(Redispatch* r) { m_finishedRedispatches.push_back(r); } #pragma mark - long getNewSerialno() { static long _nextSerial = 1001; // note this will eventually loop (in theorey), but that's okay // FIXME - using the same intial starting offset is problematic // if the client dies, and quickly reconnects return _nextSerial++; } } // of namespace