/*
* Twisted, the Framework of Your Internet
* Copyright (C) 2001-2002 Matthew W. Lefkowitz
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of version 2.1 of the GNU Lesser General Public
* License as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
/* cReactor.c - Implementation of the IReactorCore. */
/* includes */
#include "cReactor.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <netdb.h>
/* Forward declare the cReactor type object. */
staticforward PyTypeObject cReactorType;
/* Available methods on the cReactor. */
static PyMethodDef cReactor_methods[] =
{
/* IReactorCore */
{ "resolve", (PyCFunction)cReactor_resolve,
(METH_VARARGS | METH_KEYWORDS), "resolve" },
{ "run", cReactor_run,
METH_VARARGS, "run" },
{ "stop", cReactor_stop,
METH_VARARGS, "stop" },
{ "crash", cReactor_crash,
METH_VARARGS, "crash" },
{ "iterate", (PyCFunction)cReactor_iterate,
(METH_VARARGS | METH_KEYWORDS), "iterate" },
{ "fireSystemEvent", cReactor_fireSystemEvent,
METH_VARARGS, "fireSystemEvent" },
{ "addSystemEventTrigger", (PyCFunction)cReactor_addSystemEventTrigger,
(METH_VARARGS | METH_KEYWORDS), "addSystemEventTrigger" },
{ "removeSystemEventTrigger", cReactor_removeSystemEventTrigger,
METH_VARARGS, "removeSystemEventTrigger" },
/* IReactorTime */
{ "callLater", (PyCFunction)cReactorTime_callLater,
(METH_VARARGS | METH_KEYWORDS), "callLater" },
{ "getDelayedCalls", cReactorTime_getDelayedCalls,
METH_VARARGS, "getDelayedCalls" },
{ "cancelCallLater", cReactorTime_cancelCallLater,
METH_KEYWORDS, "cancelCallLater" },
/* IReactorTCP */
{ "listenTCP", (PyCFunction)cReactorTCP_listenTCP,
(METH_VARARGS | METH_KEYWORDS), "listenTCP" },
{ "connectTCP", cReactorTCP_connectTCP,
METH_VARARGS, "connectTCP" },
/* IReactorThread */
{ "callFromThread", (PyCFunction)cReactorThread_callFromThread,
(METH_VARARGS | METH_KEYWORDS), "callFromThread" },
{ "callInThread", (PyCFunction)cReactorThread_callInThread,
(METH_VARARGS | METH_KEYWORDS), "callInThread" },
{ "suggestThreadPoolSize", cReactorThread_suggestThreadPoolSize,
METH_VARARGS, "suggestThreadPoolSize" },
{ "wakeUp", cReactorThread_wakeUp,
METH_VARARGS, "wakeUp" },
/* Custom addition to IReactorThread */
{ "initThreading", cReactorThread_initThreading,
METH_VARARGS, "initThreading" },
{ NULL, NULL, METH_VARARGS, NULL },
};
PyObject *
cReactor_not_implemented(PyObject *self,
PyObject *args,
const char *text)
{
UNUSED(self);
UNUSED(args);
PyErr_SetString(PyExc_NotImplementedError, text);
return NULL;
}
/* TODO: This blocks and I don't have a async resolver library. However, the
* implementation in base.py also blocks :)
*/
PyObject *
cReactor_resolve(PyObject *self, PyObject *args, PyObject *kw)
{
cReactor *reactor;
const char *name;
struct hostent *host;
PyObject *defer;
PyObject *defer_args;
PyObject *callback;
PyObject *errback;
struct in_addr addr;
int type = 1;
int timeout = 10;
static char *kwlist[] = { "name", "type", "timeout", NULL };
reactor = (cReactor *)self;
/* Args */
if (!PyArg_ParseTupleAndKeywords(args, kw, "s|ii:resolve", kwlist,
&name, &type, &timeout))
{
return NULL;
}
/* Create a Deferred. */
defer = cReactorUtil_CreateDeferred();
if (!defer)
{
return NULL;
}
/* Get the err and callback methods. */
errback = PyObject_GetAttrString(defer, "errback");
if (!errback)
{
Py_DECREF(defer);
return NULL;
}
callback = PyObject_GetAttrString(defer, "callback");
if (!callback)
{
Py_DECREF(defer);
Py_DECREF(errback);
return NULL;
}
/* Only type 1 is supported. TODO: What is type 1? */
if (type == 1)
{
/* Attempt the lookup. */
host = gethostbyname(name);
/* Schedule a method to call the "callback" or "errback" method on the
* derferred whether or not we resolved the name.
*/
if (host)
{
/* Verify the address length. */
if (host->h_length == sizeof(addr))
{
memcpy(&addr, host->h_addr_list[0], host->h_length);
defer_args = Py_BuildValue("(s)", inet_ntoa(addr));
cReactorUtil_AddDelayedCall(reactor, 0, callback, defer_args, NULL);
}
else
{
defer_args = Py_BuildValue("(s)", "h_length != sizeof(addr)");
cReactorUtil_AddDelayedCall(reactor, 0, errback, defer_args, NULL);
}
Py_DECREF(defer_args);
}
else
{
defer_args = Py_BuildValue("(s)", hstrerror(h_errno));
cReactorUtil_AddDelayedCall(reactor, 0, errback, defer_args, NULL);
Py_DECREF(defer_args);
}
}
else
{
/* Type was not 1, schedule an errback call. */
defer_args = Py_BuildValue("(s)", "only type 1 is supported");
cReactorUtil_AddDelayedCall(reactor, 0, errback, defer_args, NULL);
Py_DECREF(defer_args);
}
Py_DECREF(errback);
Py_DECREF(callback);
return defer;
}
void
cReactor_stop_finish(cReactor *reactor)
{
/* called when shutdown triggers have completed */
reactor->state = CREACTOR_STATE_STOPPED;
}
static void
stop_internal(cReactor *reactor)
{
/* Change state and fire system event. */
reactor->state = CREACTOR_STATE_STOPPING;
fireSystemEvent_internal(reactor, "shutdown");
/* state will move to STOPPED after all shutdown triggers have run. */
}
PyObject *
cReactor_stop(PyObject *self, PyObject *args)
{
/* No args. */
if (!PyArg_ParseTuple(args, ":stop"))
{
return NULL;
}
stop_internal((cReactor *)self);
Py_INCREF(Py_None);
return Py_None;
}
static volatile int received_signal;
static void
cReactor_sighandler(int sig)
{
/* Record the signal. */
received_signal = sig;
}
static void
iterate_rebuild_pollfd_arrray(cReactor *reactor)
{
unsigned int num_transports;
struct pollfd *pfd;
cReactorTransport *transport;
cReactorTransport *shadow;
cReactorTransport *target;
/* Make sure we have enough space to hold everything. */
if (reactor->pollfd_size < reactor->num_transports)
{
if (reactor->pollfd_array)
{
free(reactor->pollfd_array);
}
reactor->pollfd_size = reactor->num_transports * 2;
reactor->pollfd_array = (struct pollfd *)malloc(sizeof(struct pollfd) * reactor->pollfd_size);
}
/* Fill in the pollfd event struct using the transport info. */
num_transports = 0;
pfd = reactor->pollfd_array;
transport = reactor->transports;
shadow = NULL;
while (transport)
{
/* Check for transports that are closed. */
if (transport->state == CREACTOR_TRANSPORT_STATE_CLOSED)
{
target = transport;
transport = transport->next;
/* Remove the target node from the linked list. */
if (shadow)
{
shadow->next = transport;
}
else
{
reactor->transports = transport;
}
/* Call the close function. */
cReactorTransport_Close(target);
Py_DECREF((PyObject *)target);
}
else
{
/* The transport is still valid, so fill in a pollfd struct. */
pfd->fd = transport->fd;
pfd->events = 0;
/* If they are active and have a do_read function add the POLLIN
* event. */
if ( (transport->state == CREACTOR_TRANSPORT_STATE_ACTIVE)
&& transport->do_read)
{
pfd->events |= POLLIN;
}
/* If they have a do_write function and there is data in the write
* buffer or they have a producer, add in the POLLOUT event. */
if ( transport->do_write
&& ( (cReactorBuffer_DataAvailable(transport->out_buf) > 0)
|| transport->producer))
{
pfd->events |= POLLOUT;
}
/* Update the transport's pointer to the events mask */
transport->event_mask = &pfd->events;
++pfd;
++num_transports;
shadow = transport;
transport = transport->next;
}
}
/* Update the number of active transports. */
reactor->num_transports = num_transports;
/* No longer stale. */
reactor->pollfd_stale = 0;
}
static void
iterate_process_pollfd_array(cReactor *reactor)
{
struct pollfd *pfd;
cReactorTransport *transport;
/* Iterate over the results. */
for (pfd = reactor->pollfd_array, transport = reactor->transports;
transport;
++pfd, transport = transport->next)
{
/* Verify */
if (pfd->fd != transport->fd)
{
kill(0, SIGTRAP);
}
/* Check for any flags. */
if (! pfd->revents)
{
continue;
}
if (pfd->revents & POLLIN)
{
cReactorTransport_Read(transport);
}
if (pfd->revents & POLLOUT)
{
cReactorTransport_Write(transport);
}
if (pfd->revents & (~(POLLIN | POLLOUT)))
{
/* TODO: Handle errors. */
/* printf("fd=%d revents=0x%x\n", transport->fd, pfd->revents); */
transport->state = CREACTOR_TRANSPORT_STATE_CLOSED;
reactor->pollfd_stale = 1;
}
}
}
static void
ctrl_pipe_do_read(cReactorTransport *transport)
{
char buf[16];
read(transport->fd, buf, sizeof(buf));
}
static int
iterate_internal_init(cReactor *reactor)
{
cReactorTransport *transport;
int ctrl_pipes[2];
/* Clear the received signal. */
received_signal = 0;
/* Install signal handlers. */
signal(SIGINT, cReactor_sighandler);
signal(SIGTERM, cReactor_sighandler);
/* Create the control pipe. */
if (pipe(ctrl_pipes) < 0)
{
PyErr_SetFromErrno(PyExc_RuntimeError);
return -1;
}
/* Make the read descriptor non-blocking. */
if (fcntl(ctrl_pipes[0], F_SETFL, O_NONBLOCK) < 0)
{
close(ctrl_pipes[0]);
close(ctrl_pipes[1]);
PyErr_SetFromErrno(PyExc_RuntimeError);
return -1;
}
/* Save the write descriptor. */
reactor->ctrl_pipe = ctrl_pipes[1];
/* Create a control transport for reading. */
transport = cReactorTransport_New(reactor,
ctrl_pipes[0],
ctrl_pipe_do_read,
NULL,
NULL);
cReactor_AddTransport(reactor, transport);
return 0;
}
static int
iterate_internal(cReactor *reactor, int delay)
{
int method_delay;
int sleep_delay;
PyObject *result;
cReactorJob *job;
int poll_res;
PyThreadState *thread_state = NULL;
/* Figure out the method delay. */
method_delay = cReactorUtil_NextMethodDelay(reactor);
if (method_delay < 0)
{
/* No methods to run. Sleep for the specified delay time. */
sleep_delay = delay;
}
else if (delay >= 0)
{
/* Sleep until the next method or (at max) the given delay. */
sleep_delay = (method_delay < delay) ? method_delay : delay;
}
else
{
/* Sleep until the next method. */
sleep_delay = method_delay;
}
/* Refresh the pollfd list (if needed). */
if (reactor->pollfd_stale)
{
iterate_rebuild_pollfd_arrray(reactor);
}
/* If in threaded mode release the global interpreter lock. */
if (reactor->multithreaded)
{
thread_state = PyThreadState_Swap(NULL);
PyEval_ReleaseLock();
}
/* Look for activity. */
poll_res = poll(reactor->pollfd_array,
reactor->num_transports,
sleep_delay);
/* Acquire the lock if we are using threads. */
if (reactor->multithreaded)
{
PyEval_AcquireLock();
PyThreadState_Swap(thread_state);
}
/* Check the poll() result. */
if (poll_res < 0)
{
/* Anything other an EINTR raises an exception. */
if (errno != EINTR)
{
PyErr_SetFromErrno(PyExc_RuntimeError);
return -1;
}
}
else
{
iterate_process_pollfd_array(reactor);
}
/* Run all the methods that need to run. */
cReactorUtil_RunDelayedCalls(reactor);
/* Check our job queue -- if there is one. */
if (reactor->main_queue)
{
/* Run all scheduled jobs. This might not be the safest idea. */
for ( ; ; )
{
job = cReactorJobQueue_Pop(reactor->main_queue);
if (! job)
{
break;
}
switch (job->type)
{
case CREACTOR_JOB_APPLY:
/* Run the callable. */
result = PyEval_CallObjectWithKeywords(job->u.apply.callable,
job->u.apply.args,
job->u.apply.kw);
Py_XDECREF(result);
if (! result)
{
PyErr_Print();
}
break;
case CREACTOR_JOB_EXIT:
/* No one can tell the reactor's main thread to quit! */
break;
}
cReactorJob_Destroy(job);
}
}
/* Lame signal handling for now. */
if (received_signal)
{
if (reactor->state == CREACTOR_STATE_RUNNING)
{
/* Stop. */
stop_internal(reactor);
}
}
return 0;
}
PyObject *
cReactor_iterate(PyObject *self, PyObject *args, PyObject *kw)
{
cReactor *reactor;
PyObject *delay_obj = NULL;
int delay = 0;
static char *kwlist[] = { "delay", NULL };
reactor = (cReactor *)self;
/* Args. */
if (!PyArg_ParseTupleAndKeywords(args, kw, "|O:delay", kwlist, &delay_obj))
{
return NULL;
}
if (delay_obj)
{
delay = cReactorUtil_ConvertDelay(delay_obj);
if (delay < 0)
{
return NULL;
}
}
/* Run once. */
if (iterate_internal(reactor, delay) < 0)
{
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
PyObject *
cReactor_run(PyObject *self, PyObject *args)
{
cReactor *reactor;
reactor = (cReactor *)self;
/* We shouldn't get any args. */
if (!PyArg_ParseTuple(args, ":run"))
{
return NULL;
}
if (reactor->state != CREACTOR_STATE_STOPPED)
{
/* _RUNNING means they tried to nest reactor.run() calls, and we
don't allow that. _STOPPING means reactor.run() hasn't finished yet
(XXX:??) */
if (reactor->state == CREACTOR_STATE_RUNNING)
PyErr_SetString(PyExc_RuntimeError,
"the reactor was already running!");
else
PyErr_SetString(PyExc_RuntimeError,
"the reactor was trying to stop!");
return NULL;
}
/* Change our state to running. */
reactor->state = CREACTOR_STATE_RUNNING;
/* Fire the the startup system event. */
fireSystemEvent_internal(reactor, "startup");
/* "Begin at the beginning", the King said, very gravely, "and go on
till you come to the end: then stop." */
while (reactor->state != CREACTOR_STATE_STOPPED)
{
if (iterate_internal(reactor, -1) < 0)
{
return NULL;
}
}
/* do cleanup when we stop running */
cReactorThread_freeThreadpool(reactor);
Py_INCREF(Py_None);
return Py_None;
}
PyObject *
cReactor_crash(PyObject *self, PyObject *args)
{
cReactor *reactor;
reactor = (cReactor *)self;
/* Args */
if (!PyArg_ParseTuple(args, ":crash"))
{
return NULL;
}
/* Move the state to done. */
reactor->state = CREACTOR_STATE_STOPPED;
Py_INCREF(Py_None);
return Py_None;
}
void
cReactor_AddTransport(cReactor *reactor, cReactorTransport *transport)
{
/* Add the new transport into the list. This steals a reference. */
transport->next = reactor->transports;
reactor->transports = transport;
++(reactor->num_transports);
/* PollFD array is now stale */
reactor->pollfd_stale = 1;
}
static int
cReactor_init(cReactor *reactor)
{
PyObject *when_threaded;
PyObject *init_threading;
PyObject *obj;
static const char * interfaces[] =
{
"IReactorCore",
"IReactorTime",
"IReactorTCP",
"IReactorThreads",
};
/* Create the __implements__ attribute. */
obj = cReactorUtil_MakeImplements(interfaces, sizeof(interfaces) / sizeof(interfaces[0]));
if (!obj)
{
return -1;
}
/* Add the tuple into the attr dict. */
if (PyDict_SetItemString(reactor->attr_dict, "__implements__", obj) != 0)
{
Py_DECREF(obj);
return -1;
}
/* Add an attribute named __class__. We will use our type object. */
obj = (PyObject *)reactor->ob_type;
if (PyDict_SetItemString(reactor->attr_dict, "__class__", obj) != 0)
{
return -1;
}
/* Set our state. */
reactor->state = CREACTOR_STATE_STOPPED;
/* We need to know when threading has begun. */
when_threaded = cReactorUtil_FromImport("twisted.python.threadable",
"whenThreaded");
if (! when_threaded)
{
return -1;
}
/* Get our initThreading method. */
init_threading = Py_FindMethod(cReactor_methods,
(PyObject *)reactor,
"initThreading");
if (! init_threading)
{
Py_DECREF(when_threaded);
return -1;
}
/* Register a callback. */
obj = PyObject_CallFunction(when_threaded, "(O)", init_threading);
Py_DECREF(when_threaded);
Py_DECREF(init_threading);
Py_XDECREF(obj);
if (! obj)
{
return -1;
}
/* initialize signal handlers, signal-delivering pipes, */
if (iterate_internal_init(reactor))
return -1;
return 0;
}
PyObject *
cReactor_New(void)
{
cReactor *reactor;
/* Create a new object. */
cReactorType.ob_type = &PyType_Type;
reactor = PyObject_New(cReactor, &cReactorType);
/* No control pipe descriptors. */
reactor->ctrl_pipe = -1;
/* The object's attribute dictionary. */
reactor->attr_dict = PyDict_New();
/* List of timed methods. */
reactor->timed_methods = NULL;
/* Event triggers and the deferred list. */
reactor->event_triggers = NULL;
/* List of transports */
reactor->transports = NULL;
reactor->num_transports = 0;
/* Array of pollfd structs. */
reactor->pollfd_array = NULL;
reactor->pollfd_size = 0;
reactor->pollfd_stale = 0;
/* No thread job queue, or thread pool to start with. */
reactor->multithreaded = 0;
reactor->main_queue = NULL;
reactor->thread_pool = NULL;
reactor->worker_queue = NULL;
reactor->req_thread_pool_size = 3;
/* Attempt to initialize it. */
if ( (! reactor->attr_dict)
|| (cReactor_init(reactor) < 0))
{
Py_DECREF((PyObject *)reactor);
return NULL;
}
return (PyObject *)reactor;
}
static void
cReactor_dealloc(PyObject *self)
{
cReactor *reactor;
cReactorTransport *transport;
cReactorTransport *target;
reactor = (cReactor *)self;
Py_DECREF(reactor->attr_dict);
reactor->attr_dict = NULL;
cReactorUtil_DestroyDelayedCalls(reactor);
reactor->timed_methods = NULL;
cSystemEvent_FreeTriggers(reactor->event_triggers);
reactor->event_triggers = NULL;
transport = reactor->transports;
while (transport)
{
target = transport;
transport = transport->next;
Py_DECREF(target);
}
reactor->transports = NULL;
free(reactor->pollfd_array);
reactor->pollfd_array = NULL;
PyObject_Del(self);
}
static PyObject *
cReactor_getattr(PyObject *self, char *attr_name)
{
cReactor *reactor;
PyObject *obj;
reactor = (cReactor *)self;
/* First check for a method with the given name.
*/
obj = Py_FindMethod(cReactor_methods, self, attr_name);
if (obj)
{
return obj;
}
/* Py_FindMethod raises an exception if it does not find the mthod */
PyErr_Clear();
/* Special case! Woo */
if (!strcmp("__dict__", attr_name))
{
return reactor->attr_dict;
}
/* Now check the attribute dictionary. */
obj = PyDict_GetItemString(reactor->attr_dict, attr_name);
/* If we didn't find anything raise PyExc_AttributeError. */
if (!obj)
{
PyErr_SetString(PyExc_AttributeError, attr_name);
return NULL;
}
/* PyDict_GetItemString returns a borrowed reference so we need to incref
* it before returning it.
*/
Py_INCREF(obj);
return obj;
}
static PyObject *
cReactor_repr(PyObject *self)
{
char buf[100];
snprintf(buf, sizeof(buf) - 1, "<cReactor instance %p>", self);
buf[sizeof(buf) - 1] = 0x00;
return PyString_FromString(buf);
}
/* The cReactor type. */
static PyTypeObject cReactorType =
{
PyObject_HEAD_INIT(NULL)
0,
"cReactor", /* tp_name */
sizeof(cReactor), /* tp_basicsize */
0, /* tp_itemsize */
cReactor_dealloc, /* tp_dealloc */
NULL, /* tp_print */
cReactor_getattr, /* tp_getattr */
NULL, /* tp_setattr */
NULL, /* tp_compare */
cReactor_repr, /* tp_repr */
NULL, /* tp_as_number */
NULL, /* tp_as_sequence */
NULL, /* tp_as_mapping */
NULL, /* tp_hash */
NULL, /* tp_call */
NULL, /* tp_str */
NULL, /* tp_getattro */
NULL, /* tp_setattro */
NULL, /* tp_as_buffer */
0, /* tp_flags */
NULL, /* tp_doc */
NULL, /* tp_traverse */
NULL, /* tp_clear */
NULL, /* tp_richcompare */
0, /* tp_weaklistoffset */
};
/* vim: set sts=4 sw=4: */
syntax highlighted by Code2HTML, v. 0.9.1