<previous | contents | next> | Pyro Manual |
In various situations it is needed that the servers and the clients
are decoupled. In abstract terms this means that information producers
do not know nor care about the parties that are interested in the
information, and the information consumers do not know nor care about
the source or sources of the information. All they know is that they
produce or consume information on a certain subject.
Here does the Event Server fit in nicely. It is a third party that
controls the flow of information about certain subjects ("events"). A publisher
uses the Event Server to publish a message on a specific subject. A subscriber
uses the Event Server to subscribe itself to specific subjects, or to a
pattern that matches certain subjects. As soon as new information on a
subject is produced (an "event" occurs) all subscribers for this subject
receive the information. Nobody knows (and cares) about anybody else.
It is important to rembember that all events processed by the ES are
transient, which means they are not stored. If there is no listener, all
events disappear in the void. The store-and-forward programming model is
part of a messaging service, which is not what the ES is meant to do. It
is also important to know that all subscription data is transient. Once
the ES is stopped, all subscriptions are lost. The clients that are
subscribed are not notified of this! If no care is taken, they keep on
waiting forever for events to occur, because the ES doesn't know about
them anymore!
Usually your subscribers will receive the events in the order they are published. However, this is not guaranteed. If you rely on the exact order of receiving events, you must add some logic to check this (possibly by examining the event's timestamps). The chance of events not arriving in the order they were published is very, very small in a high-performance LAN. Only on very high server load, high network traffic, or a high-latency (WAN?) connection it is likely to occur.
Another thing to pay attention to is that the ES does not guarantee delivery of events. As mentioned above, the ES does not have a store-and-forward mechanism, but even if everything is up and running, the ES does not enhance Pyro's way of transporting messages. This means that it's still possible (perhaps due to a network error) that an event gets lost. For reliable, guaranteed, asynchronous message delivery you'll have to look somewhere else, sorry ;-)
The ES is a multithreaded server and will not work if your Python
installation doesn't have thread support. Publications are dispatched to
the subscribers in different threads, so they don't block eachother.
Please note that events may arrive at your listener in multithreaded
fashion! Pyro itself starts another thread in your listener to handle
the new event, possibly while the previous one is still being handled. Theevent
method may be called concurrently from several threads. If you
can't handle this, you have to use some form of thread locking in your
client! (see the threading
module on Semaphore
),
or Pyro.util.getLockObject
. Keep in mind that one of the
things that requires a thread lock is calling Pyro methods on a
Pyro object, because the proxy cannot be shared among different
threads!
To summarize:
es
command from the bin
directory (use es.bat
on windows). You can specify the
following arguments:
essvc
(Windows-only Event Server 'NT-service' control scripts)
essvc.bat
script to register it as a service.
Make sure you have Pyro properly installed in your Python's site-packages.
Or make sure to register the service using an account with the correct PYTHONPATH setting, so that Pyro can be located.
The ES service logs to C:\Pyro_ES_svc.log
where C: is your system drive.
HKLM\System\CurrentControlSet\Services\PyroES
, and
the value under that key is: PyroServiceArguments
(REG_SZ, it will be asked and created
for you when doing a essvc.bat install
from a command prompt).
Like the Name Server, if you want to start the Event Server from
within your own program, you can ofcourse start it by executing the
start script mentioned above. You could also use the EventServiceStarter
class from the Pyro.EventService.Server
module to start it
directly (this is what the script also does). Be sure to start it
in a separate process or thread because it will run in its own endless
loop. Have a look at the "AllInOne" example to see how you can start the
Event Server using the EventServiceStarter
class.
You probably have to wait until the ES has been fully started, call the waitUntilStarted()
method on
the starter object. It returns true if the ES has been started, false if it is not yet ready. You can provide a timeout
argument (in seconds).
To start the ES you will first have to start the Name Server because
the ES needs that to register itself. After starting the ES you will
then see something like this:
*** Pyro Event Server ***
Pyro Server Initialized. Using Pyro V3.2
URI= PYRO://192.168.1.40:7766/c0a8012804bc0c96774244d7d79d5db3
Event Server started.
PYRO_ES_QUEUESIZE
and PYRO_ES_BLOCKQUEUE
. Read about them in the Installation and Configuration chapter. By
default, the ES will allocate moderately sized queues for subscribers,
and publishers will block if such a queue becomes full (so no events get
lost). You might want to change this behavior. Every subscriber has its
own queue. So if the queue of a slow subscriber fills up, other
subscribers are still serviced nicely. By setting PYRO_NS_BLOCKQUEUE
to 0
, new messages for full queues are lost. This may be a
way to allow slow subscribers to catch up, because new messages are put
in the queue when there is room again.
Pyro.constants.EVENTSERVER_NAME
.
All subjects are case insensitive, so if you publish something on the
"stockquotes" channel it is the same as if you published it on the
"STOCKQuotes" channel.To publish an event on a certain topic, you need to have a Pyro
proxy object for the ES, and then call the publish
method:
publish(subjects, message)
where subjects
is a
subject name or a sequence of one or more subject names (strings), and message
is the actual message. The message can be any Python object (as long as
it can be pickled):
import Pyro.core
import Pyro.constants
Pyro.core.initClient()
es = Pyro.core.getProxyForURI("PYRONAME://"+Pyro.constants.EVENTSERVER_NAME)
es.publish("StockQuotes",( "SUN", 22.44 ) )
If you think this is too much work, or if you want to abstract from
the Pyro details, you can use the Publisher
base class
that is provided in Pyro.EventService.Clients.
Subclass
your event publishers from this class. The init takes care of locating
the ES, and you can just call the publish(subjects, message)
method of the base class. No ES proxy code needed:
import Pyro.EventService.Clients class StockPublisher(Pyro.EventService.Clients.Publisher): def __init__(self): Pyro.EventService.Clients.Publisher.__init__(self) def publishQuote(self, symbol, quote): self.publish("StockQuotes", ( symbol, quote) ) sp = StockPublisher() sp.publishQuote("SUN", 22.44)
__init__
of both the Publisher and the Subscriber
takes an optional ident
argument. Use this to specify the
authentication passphrase that will be used to connect to the ES (and
also to connect to the Name Server).
Pyro.constants.EVENTSERVER_NAME
. All subjects are case
insensitive, so if you publish something on the "stockquotes" channel it
is the same as if you published it on the "STOCKQuotes" channel.Event subscribers are a little more involved that event publishers.
This is becaue they are full-blown Pyro server objects that receive
calls from the ES when an event is published on one of the topics
you've subscribed to! Therefore, your clients (subscribers) need to call
the Pyro daemon's handleRequests
or requestLoop
(just like a Pyro server). They also have to call Pyro.core.initServer()
because
they also act as a Pyro server. Furthermore, they usually have to run
as a multithreaded server, because the ES may call it as soon as a new
event arrives and you are not done processing the previous event.
Single-threaded servers will build up a backlog of undelivered events
if this happens. You still get all events (with the original timestamp
- so you could skip events that "have expired" to catch up). You can
change this behavior by changing the before mentioned config items.
subscribe(subjects, subscriber) |
Subscribe to events. subjects is a subject name
or a sequence of one or more subject names (strings), and subscriber
is a proxy for your subscriber object |
subscribeMatch(subjectPatterns, subscriber) |
Subscribe to events based on patterns. subjectPatterns
is a subject pattern or a
sequence of one or more subject patterns (strings), and subscriber
is a proxy for your subscriber object |
unsubscribe(subjects, subscriber) |
Unsubscribe from subjects. subjects is a subject
or subject pattern or a
sequence thereof, and subscriber is a proxy for
your subscriber object |
But first, create a subscriber object,
which must be a Pyro object (or use delegation). The subscriber object
should have an event(self, event)
method. This method is
called by the ES if a new event arrives on a channel you subscribed to. event
is a Pyro.EventService.Event
object, which has the
following attributes:
msg |
the actual message that was published. Can be any Python object. |
subject |
the subject (string) on which the message was published.
(topic name) |
time |
the event's timestamp (from the server - synchronised for all
subscribers). A float, taken from time.time() |
To subscribe, call the subscribe
method of the ES with
the desired subject(s) and a proxy for your subscriber object. If you
want to subscribe to multiple subjects based on pattern
matching, call the subscribeMatch
method instead
with the desired subject pattern(s) and a proxy for your subscriber
object. The patterns are standard re
-style regex
expressions. See the standard re
module for more
information. The pattern '^STOCKQUOTE\\.S.*$'
matches
STOCKQUOTE.SUN, STOCKQUOTE.SAP but not STOCKQUOTE.IBM,
NYSE.STOCKQUOTE.SUN etcetera. Once more: the subjects are case
insensitive. The patterns are matched case insensitive too.
To unsubscribe, call the unsubscribe
method with the
subject(s) or pattern(s) you want to unsubscribe from, and a proxy for
the subscriber object that has been previously subscribed. This will
remove the subscriber from the subscription list and also from the
pattern match list if the subject occurs as a pattern there. The ES
(actually, Pyro) is smart enough to see if multiple (different) proxy
objects point to the same subscriber object and will act correctly.
Subscriber
base
class provided in Pyro.EventService.Clients.
Subclass your event listeners (subscribers) from this class. The init
takes care of locating the ES, and you can just call the subscribe(subjects)
,
subscribeMatch(subjectPatterns)
and unsubscribe(subjects)
methods on the object itself. No ES proxy code needed. This base class
also starts a Pyro daemon and by calling listen()
, your
code starts listening on incoming events. When you want to abort the
event loop, you have to call self.abort()
from within the
event handler method.
The multithreading of the event
method can be
controlled using the setThreading(threading)
method. If you threading=
0,
the threading will be switched off (it is on by default unless
otherwise configured). Your events will then arrive purely
sequentially, after processing each event. Call this method before
entering the requestLoop
or handleRequests
or listen.
A minimalistic event listener that prints the stockquote events
published by the example code above:
from Pyro.EventService.Clients import Subscriber
class StockSubscriber(Subscriber):
def __init__(self):
Subscriber.__init__(self)
self.subscribe("StockQuotes")
def event(self, event):
print "Got a stockquote: %s=%f" % (event.msg)
sub = StockSubscriber()
sub.listen()
The __init__
of both the Publisher and the Subscriber
takes an optional ident
argument. Use this to specify the
authentication passphrase that will be used to connect to the ES (and
also to connect to the Name Server).
setThreading(threading)
method of the Subscriber
base class
to control the threading. If you set threading=0, the threading will be switched off (it is on by default).
But a better way to process events sequentially is to harness the power of Python's Queue
module:
you create a Queue in your subscriber process that is filled with arriving events,
and you have a single event consumer process that takes events out of the queue one-by-one:
Pyro Event Server | multithreaded |
↓ | |
Subscriber(s) | multithreaded |
↓ | |
Queue.Queue | |
↓ | |
Consumer/Worker | singlethreaded |
<previous | contents | next> | Pyro Manual |