/* * Twisted, the Framework of Your Internet * Copyright (C) 2001-2002 Matthew W. Lefkowitz * * This library is free software; you can redistribute it and/or * modify it under the terms of version 2.1 of the GNU Lesser General Public * License as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ /* cReactor.c - Implementation of the IReactorCore. */ /* includes */ #include "cReactor.h" #include #include #include #include #include #include #include #include /* Forward declare the cReactor type object. */ staticforward PyTypeObject cReactorType; /* Available methods on the cReactor. */ static PyMethodDef cReactor_methods[] = { /* IReactorCore */ { "resolve", (PyCFunction)cReactor_resolve, (METH_VARARGS | METH_KEYWORDS), "resolve" }, { "run", cReactor_run, METH_VARARGS, "run" }, { "stop", cReactor_stop, METH_VARARGS, "stop" }, { "crash", cReactor_crash, METH_VARARGS, "crash" }, { "iterate", (PyCFunction)cReactor_iterate, (METH_VARARGS | METH_KEYWORDS), "iterate" }, { "fireSystemEvent", cReactor_fireSystemEvent, METH_VARARGS, "fireSystemEvent" }, { "addSystemEventTrigger", (PyCFunction)cReactor_addSystemEventTrigger, (METH_VARARGS | METH_KEYWORDS), "addSystemEventTrigger" }, { "removeSystemEventTrigger", cReactor_removeSystemEventTrigger, METH_VARARGS, "removeSystemEventTrigger" }, /* IReactorTime */ { "callLater", (PyCFunction)cReactorTime_callLater, (METH_VARARGS | METH_KEYWORDS), "callLater" }, { "getDelayedCalls", cReactorTime_getDelayedCalls, METH_VARARGS, "getDelayedCalls" }, { "cancelCallLater", cReactorTime_cancelCallLater, METH_KEYWORDS, "cancelCallLater" }, /* IReactorTCP */ { "listenTCP", (PyCFunction)cReactorTCP_listenTCP, (METH_VARARGS | METH_KEYWORDS), "listenTCP" }, { "connectTCP", cReactorTCP_connectTCP, METH_VARARGS, "connectTCP" }, /* IReactorThread */ { "callFromThread", (PyCFunction)cReactorThread_callFromThread, (METH_VARARGS | METH_KEYWORDS), "callFromThread" }, { "callInThread", (PyCFunction)cReactorThread_callInThread, (METH_VARARGS | METH_KEYWORDS), "callInThread" }, { "suggestThreadPoolSize", cReactorThread_suggestThreadPoolSize, METH_VARARGS, "suggestThreadPoolSize" }, { "wakeUp", cReactorThread_wakeUp, METH_VARARGS, "wakeUp" }, /* Custom addition to IReactorThread */ { "initThreading", cReactorThread_initThreading, METH_VARARGS, "initThreading" }, { NULL, NULL, METH_VARARGS, NULL }, }; PyObject * cReactor_not_implemented(PyObject *self, PyObject *args, const char *text) { UNUSED(self); UNUSED(args); PyErr_SetString(PyExc_NotImplementedError, text); return NULL; } /* TODO: This blocks and I don't have a async resolver library. However, the * implementation in base.py also blocks :) */ PyObject * cReactor_resolve(PyObject *self, PyObject *args, PyObject *kw) { cReactor *reactor; const char *name; struct hostent *host; PyObject *defer; PyObject *defer_args; PyObject *callback; PyObject *errback; struct in_addr addr; int type = 1; int timeout = 10; static char *kwlist[] = { "name", "type", "timeout", NULL }; reactor = (cReactor *)self; /* Args */ if (!PyArg_ParseTupleAndKeywords(args, kw, "s|ii:resolve", kwlist, &name, &type, &timeout)) { return NULL; } /* Create a Deferred. */ defer = cReactorUtil_CreateDeferred(); if (!defer) { return NULL; } /* Get the err and callback methods. */ errback = PyObject_GetAttrString(defer, "errback"); if (!errback) { Py_DECREF(defer); return NULL; } callback = PyObject_GetAttrString(defer, "callback"); if (!callback) { Py_DECREF(defer); Py_DECREF(errback); return NULL; } /* Only type 1 is supported. TODO: What is type 1? */ if (type == 1) { /* Attempt the lookup. */ host = gethostbyname(name); /* Schedule a method to call the "callback" or "errback" method on the * derferred whether or not we resolved the name. */ if (host) { /* Verify the address length. */ if (host->h_length == sizeof(addr)) { memcpy(&addr, host->h_addr_list[0], host->h_length); defer_args = Py_BuildValue("(s)", inet_ntoa(addr)); cReactorUtil_AddDelayedCall(reactor, 0, callback, defer_args, NULL); } else { defer_args = Py_BuildValue("(s)", "h_length != sizeof(addr)"); cReactorUtil_AddDelayedCall(reactor, 0, errback, defer_args, NULL); } Py_DECREF(defer_args); } else { defer_args = Py_BuildValue("(s)", hstrerror(h_errno)); cReactorUtil_AddDelayedCall(reactor, 0, errback, defer_args, NULL); Py_DECREF(defer_args); } } else { /* Type was not 1, schedule an errback call. */ defer_args = Py_BuildValue("(s)", "only type 1 is supported"); cReactorUtil_AddDelayedCall(reactor, 0, errback, defer_args, NULL); Py_DECREF(defer_args); } Py_DECREF(errback); Py_DECREF(callback); return defer; } void cReactor_stop_finish(cReactor *reactor) { /* called when shutdown triggers have completed */ reactor->state = CREACTOR_STATE_STOPPED; } static void stop_internal(cReactor *reactor) { /* Change state and fire system event. */ reactor->state = CREACTOR_STATE_STOPPING; fireSystemEvent_internal(reactor, "shutdown"); /* state will move to STOPPED after all shutdown triggers have run. */ } PyObject * cReactor_stop(PyObject *self, PyObject *args) { /* No args. */ if (!PyArg_ParseTuple(args, ":stop")) { return NULL; } stop_internal((cReactor *)self); Py_INCREF(Py_None); return Py_None; } static volatile int received_signal; static void cReactor_sighandler(int sig) { /* Record the signal. */ received_signal = sig; } static void iterate_rebuild_pollfd_arrray(cReactor *reactor) { unsigned int num_transports; struct pollfd *pfd; cReactorTransport *transport; cReactorTransport *shadow; cReactorTransport *target; /* Make sure we have enough space to hold everything. */ if (reactor->pollfd_size < reactor->num_transports) { if (reactor->pollfd_array) { free(reactor->pollfd_array); } reactor->pollfd_size = reactor->num_transports * 2; reactor->pollfd_array = (struct pollfd *)malloc(sizeof(struct pollfd) * reactor->pollfd_size); } /* Fill in the pollfd event struct using the transport info. */ num_transports = 0; pfd = reactor->pollfd_array; transport = reactor->transports; shadow = NULL; while (transport) { /* Check for transports that are closed. */ if (transport->state == CREACTOR_TRANSPORT_STATE_CLOSED) { target = transport; transport = transport->next; /* Remove the target node from the linked list. */ if (shadow) { shadow->next = transport; } else { reactor->transports = transport; } /* Call the close function. */ cReactorTransport_Close(target); Py_DECREF((PyObject *)target); } else { /* The transport is still valid, so fill in a pollfd struct. */ pfd->fd = transport->fd; pfd->events = 0; /* If they are active and have a do_read function add the POLLIN * event. */ if ( (transport->state == CREACTOR_TRANSPORT_STATE_ACTIVE) && transport->do_read) { pfd->events |= POLLIN; } /* If they have a do_write function and there is data in the write * buffer or they have a producer, add in the POLLOUT event. */ if ( transport->do_write && ( (cReactorBuffer_DataAvailable(transport->out_buf) > 0) || transport->producer)) { pfd->events |= POLLOUT; } /* Update the transport's pointer to the events mask */ transport->event_mask = &pfd->events; ++pfd; ++num_transports; shadow = transport; transport = transport->next; } } /* Update the number of active transports. */ reactor->num_transports = num_transports; /* No longer stale. */ reactor->pollfd_stale = 0; } static void iterate_process_pollfd_array(cReactor *reactor) { struct pollfd *pfd; cReactorTransport *transport; /* Iterate over the results. */ for (pfd = reactor->pollfd_array, transport = reactor->transports; transport; ++pfd, transport = transport->next) { /* Verify */ if (pfd->fd != transport->fd) { kill(0, SIGTRAP); } /* Check for any flags. */ if (! pfd->revents) { continue; } if (pfd->revents & POLLIN) { cReactorTransport_Read(transport); } if (pfd->revents & POLLOUT) { cReactorTransport_Write(transport); } if (pfd->revents & (~(POLLIN | POLLOUT))) { /* TODO: Handle errors. */ /* printf("fd=%d revents=0x%x\n", transport->fd, pfd->revents); */ transport->state = CREACTOR_TRANSPORT_STATE_CLOSED; reactor->pollfd_stale = 1; } } } static void ctrl_pipe_do_read(cReactorTransport *transport) { char buf[16]; read(transport->fd, buf, sizeof(buf)); } static int iterate_internal_init(cReactor *reactor) { cReactorTransport *transport; int ctrl_pipes[2]; /* Clear the received signal. */ received_signal = 0; /* Install signal handlers. */ signal(SIGINT, cReactor_sighandler); signal(SIGTERM, cReactor_sighandler); /* Create the control pipe. */ if (pipe(ctrl_pipes) < 0) { PyErr_SetFromErrno(PyExc_RuntimeError); return -1; } /* Make the read descriptor non-blocking. */ if (fcntl(ctrl_pipes[0], F_SETFL, O_NONBLOCK) < 0) { close(ctrl_pipes[0]); close(ctrl_pipes[1]); PyErr_SetFromErrno(PyExc_RuntimeError); return -1; } /* Save the write descriptor. */ reactor->ctrl_pipe = ctrl_pipes[1]; /* Create a control transport for reading. */ transport = cReactorTransport_New(reactor, ctrl_pipes[0], ctrl_pipe_do_read, NULL, NULL); cReactor_AddTransport(reactor, transport); return 0; } static int iterate_internal(cReactor *reactor, int delay) { int method_delay; int sleep_delay; PyObject *result; cReactorJob *job; int poll_res; PyThreadState *thread_state = NULL; /* Figure out the method delay. */ method_delay = cReactorUtil_NextMethodDelay(reactor); if (method_delay < 0) { /* No methods to run. Sleep for the specified delay time. */ sleep_delay = delay; } else if (delay >= 0) { /* Sleep until the next method or (at max) the given delay. */ sleep_delay = (method_delay < delay) ? method_delay : delay; } else { /* Sleep until the next method. */ sleep_delay = method_delay; } /* Refresh the pollfd list (if needed). */ if (reactor->pollfd_stale) { iterate_rebuild_pollfd_arrray(reactor); } /* If in threaded mode release the global interpreter lock. */ if (reactor->multithreaded) { thread_state = PyThreadState_Swap(NULL); PyEval_ReleaseLock(); } /* Look for activity. */ poll_res = poll(reactor->pollfd_array, reactor->num_transports, sleep_delay); /* Acquire the lock if we are using threads. */ if (reactor->multithreaded) { PyEval_AcquireLock(); PyThreadState_Swap(thread_state); } /* Check the poll() result. */ if (poll_res < 0) { /* Anything other an EINTR raises an exception. */ if (errno != EINTR) { PyErr_SetFromErrno(PyExc_RuntimeError); return -1; } } else { iterate_process_pollfd_array(reactor); } /* Run all the methods that need to run. */ cReactorUtil_RunDelayedCalls(reactor); /* Check our job queue -- if there is one. */ if (reactor->main_queue) { /* Run all scheduled jobs. This might not be the safest idea. */ for ( ; ; ) { job = cReactorJobQueue_Pop(reactor->main_queue); if (! job) { break; } switch (job->type) { case CREACTOR_JOB_APPLY: /* Run the callable. */ result = PyEval_CallObjectWithKeywords(job->u.apply.callable, job->u.apply.args, job->u.apply.kw); Py_XDECREF(result); if (! result) { PyErr_Print(); } break; case CREACTOR_JOB_EXIT: /* No one can tell the reactor's main thread to quit! */ break; } cReactorJob_Destroy(job); } } /* Lame signal handling for now. */ if (received_signal) { if (reactor->state == CREACTOR_STATE_RUNNING) { /* Stop. */ stop_internal(reactor); } } return 0; } PyObject * cReactor_iterate(PyObject *self, PyObject *args, PyObject *kw) { cReactor *reactor; PyObject *delay_obj = NULL; int delay = 0; static char *kwlist[] = { "delay", NULL }; reactor = (cReactor *)self; /* Args. */ if (!PyArg_ParseTupleAndKeywords(args, kw, "|O:delay", kwlist, &delay_obj)) { return NULL; } if (delay_obj) { delay = cReactorUtil_ConvertDelay(delay_obj); if (delay < 0) { return NULL; } } /* Run once. */ if (iterate_internal(reactor, delay) < 0) { return NULL; } Py_INCREF(Py_None); return Py_None; } PyObject * cReactor_run(PyObject *self, PyObject *args) { cReactor *reactor; reactor = (cReactor *)self; /* We shouldn't get any args. */ if (!PyArg_ParseTuple(args, ":run")) { return NULL; } if (reactor->state != CREACTOR_STATE_STOPPED) { /* _RUNNING means they tried to nest reactor.run() calls, and we don't allow that. _STOPPING means reactor.run() hasn't finished yet (XXX:??) */ if (reactor->state == CREACTOR_STATE_RUNNING) PyErr_SetString(PyExc_RuntimeError, "the reactor was already running!"); else PyErr_SetString(PyExc_RuntimeError, "the reactor was trying to stop!"); return NULL; } /* Change our state to running. */ reactor->state = CREACTOR_STATE_RUNNING; /* Fire the the startup system event. */ fireSystemEvent_internal(reactor, "startup"); /* "Begin at the beginning", the King said, very gravely, "and go on till you come to the end: then stop." */ while (reactor->state != CREACTOR_STATE_STOPPED) { if (iterate_internal(reactor, -1) < 0) { return NULL; } } /* do cleanup when we stop running */ cReactorThread_freeThreadpool(reactor); Py_INCREF(Py_None); return Py_None; } PyObject * cReactor_crash(PyObject *self, PyObject *args) { cReactor *reactor; reactor = (cReactor *)self; /* Args */ if (!PyArg_ParseTuple(args, ":crash")) { return NULL; } /* Move the state to done. */ reactor->state = CREACTOR_STATE_STOPPED; Py_INCREF(Py_None); return Py_None; } void cReactor_AddTransport(cReactor *reactor, cReactorTransport *transport) { /* Add the new transport into the list. This steals a reference. */ transport->next = reactor->transports; reactor->transports = transport; ++(reactor->num_transports); /* PollFD array is now stale */ reactor->pollfd_stale = 1; } static int cReactor_init(cReactor *reactor) { PyObject *when_threaded; PyObject *init_threading; PyObject *obj; static const char * interfaces[] = { "IReactorCore", "IReactorTime", "IReactorTCP", "IReactorThreads", }; /* Create the __implements__ attribute. */ obj = cReactorUtil_MakeImplements(interfaces, sizeof(interfaces) / sizeof(interfaces[0])); if (!obj) { return -1; } /* Add the tuple into the attr dict. */ if (PyDict_SetItemString(reactor->attr_dict, "__implements__", obj) != 0) { Py_DECREF(obj); return -1; } /* Add an attribute named __class__. We will use our type object. */ obj = (PyObject *)reactor->ob_type; if (PyDict_SetItemString(reactor->attr_dict, "__class__", obj) != 0) { return -1; } /* Set our state. */ reactor->state = CREACTOR_STATE_STOPPED; /* We need to know when threading has begun. */ when_threaded = cReactorUtil_FromImport("twisted.python.threadable", "whenThreaded"); if (! when_threaded) { return -1; } /* Get our initThreading method. */ init_threading = Py_FindMethod(cReactor_methods, (PyObject *)reactor, "initThreading"); if (! init_threading) { Py_DECREF(when_threaded); return -1; } /* Register a callback. */ obj = PyObject_CallFunction(when_threaded, "(O)", init_threading); Py_DECREF(when_threaded); Py_DECREF(init_threading); Py_XDECREF(obj); if (! obj) { return -1; } /* initialize signal handlers, signal-delivering pipes, */ if (iterate_internal_init(reactor)) return -1; return 0; } PyObject * cReactor_New(void) { cReactor *reactor; /* Create a new object. */ cReactorType.ob_type = &PyType_Type; reactor = PyObject_New(cReactor, &cReactorType); /* No control pipe descriptors. */ reactor->ctrl_pipe = -1; /* The object's attribute dictionary. */ reactor->attr_dict = PyDict_New(); /* List of timed methods. */ reactor->timed_methods = NULL; /* Event triggers and the deferred list. */ reactor->event_triggers = NULL; /* List of transports */ reactor->transports = NULL; reactor->num_transports = 0; /* Array of pollfd structs. */ reactor->pollfd_array = NULL; reactor->pollfd_size = 0; reactor->pollfd_stale = 0; /* No thread job queue, or thread pool to start with. */ reactor->multithreaded = 0; reactor->main_queue = NULL; reactor->thread_pool = NULL; reactor->worker_queue = NULL; reactor->req_thread_pool_size = 3; /* Attempt to initialize it. */ if ( (! reactor->attr_dict) || (cReactor_init(reactor) < 0)) { Py_DECREF((PyObject *)reactor); return NULL; } return (PyObject *)reactor; } static void cReactor_dealloc(PyObject *self) { cReactor *reactor; cReactorTransport *transport; cReactorTransport *target; reactor = (cReactor *)self; Py_DECREF(reactor->attr_dict); reactor->attr_dict = NULL; cReactorUtil_DestroyDelayedCalls(reactor); reactor->timed_methods = NULL; cSystemEvent_FreeTriggers(reactor->event_triggers); reactor->event_triggers = NULL; transport = reactor->transports; while (transport) { target = transport; transport = transport->next; Py_DECREF(target); } reactor->transports = NULL; free(reactor->pollfd_array); reactor->pollfd_array = NULL; PyObject_Del(self); } static PyObject * cReactor_getattr(PyObject *self, char *attr_name) { cReactor *reactor; PyObject *obj; reactor = (cReactor *)self; /* First check for a method with the given name. */ obj = Py_FindMethod(cReactor_methods, self, attr_name); if (obj) { return obj; } /* Py_FindMethod raises an exception if it does not find the mthod */ PyErr_Clear(); /* Special case! Woo */ if (!strcmp("__dict__", attr_name)) { return reactor->attr_dict; } /* Now check the attribute dictionary. */ obj = PyDict_GetItemString(reactor->attr_dict, attr_name); /* If we didn't find anything raise PyExc_AttributeError. */ if (!obj) { PyErr_SetString(PyExc_AttributeError, attr_name); return NULL; } /* PyDict_GetItemString returns a borrowed reference so we need to incref * it before returning it. */ Py_INCREF(obj); return obj; } static PyObject * cReactor_repr(PyObject *self) { char buf[100]; snprintf(buf, sizeof(buf) - 1, "", self); buf[sizeof(buf) - 1] = 0x00; return PyString_FromString(buf); } /* The cReactor type. */ static PyTypeObject cReactorType = { PyObject_HEAD_INIT(NULL) 0, "cReactor", /* tp_name */ sizeof(cReactor), /* tp_basicsize */ 0, /* tp_itemsize */ cReactor_dealloc, /* tp_dealloc */ NULL, /* tp_print */ cReactor_getattr, /* tp_getattr */ NULL, /* tp_setattr */ NULL, /* tp_compare */ cReactor_repr, /* tp_repr */ NULL, /* tp_as_number */ NULL, /* tp_as_sequence */ NULL, /* tp_as_mapping */ NULL, /* tp_hash */ NULL, /* tp_call */ NULL, /* tp_str */ NULL, /* tp_getattro */ NULL, /* tp_setattro */ NULL, /* tp_as_buffer */ 0, /* tp_flags */ NULL, /* tp_doc */ NULL, /* tp_traverse */ NULL, /* tp_clear */ NULL, /* tp_richcompare */ 0, /* tp_weaklistoffset */ }; /* vim: set sts=4 sw=4: */