package POE::Component::IKC::Responder;

############################################################
# $Id: Responder.pm 168 2006-11-16 19:57:48Z fil $
# Based on tests/refserver.perl
# Contributed by Artur Bergman
# Revised for 0.06 by Rocco Caputo
# Turned into a module by Philp Gwyn
#
# Copyright 1999,2001,2002,2004 Philip Gwyn.  All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
# Contributed portions of IKC may be copyright by their respective
# contributors.

use strict;
use vars qw($VERSION @ISA @EXPORT @EXPORT_OK $ikc);
use Carp;
use Data::Dumper;
use POE qw(Session);
use POE::Component::IKC::Specifier;
use Scalar::Util qw(reftype);

require Exporter;
@ISA     = qw(Exporter);
@EXPORT  = qw(create_ikc_responder $ikc);
$VERSION = '0.1904';

sub DEBUG { 0 }

##############################################################################

#----------------------------------------------------
# Convenience entry point: make sure the single responder exists.
# Simply delegates to spawn() on this package.
sub create_ikc_responder
{
    return __PACKAGE__->spawn();
}

#----------------------------------------------------
# Create the responder session, unless $ikc shows that one already exists.
# Always returns 1.
sub spawn
{
    my($package) = @_;
    return 1 if $ikc;

    # Every event the 'IKC' session answers to; each one is handled by the
    # sub of the same name in $package.
    my @handled = qw(
        _start _stop
        request post call raw_message post2
        remote_error
        register unregister default register_local
        publish retract subscribe unsubscribe published
        monitor inform_monitors shutdown
        do_you_have ping sig_INT
    );

    POE::Session->create(
        package_states => [ $package, \@handled ],
    );
    return 1;
}

#----------------------------------------------------
# Accept POE's standard _start message, and start the responder.
sub _start
{
    my($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
    DEBUG and warn "$$: Responder started.\n";

    # allow the session to be called by name
    $kernel->alias_set('IKC');

    # $kernel->signal(INT=>'sig_INT');  # sig_INT is empty, so don't bother

    # The object does the real work; it is kept both in the exported
    # package variable and in this session's heap.
    my $responder = POE::Component::IKC::Responder::Object->new($kernel, $session);
    $ikc          = $responder;
    $heap->{self} = $responder;
}

# POE's standard _stop message.  Nothing to clean up here.
sub _stop
{
    DEBUG and warn "$$: $_[HEAP] responder _stop\n";
    # use YAML qw(Dump);
    # use Data::Denter;
    # warn Denter $poe_kernel;
}

#----------------------------------------------------
# Shutdown everything IKC related that we know about
sub shutdown
{
    my $responder = $_[HEAP]->{self};
    return $responder->shutdown($_[KERNEL]);
}

#----------------------------------------------------
# Foreign kernel called something here
# ARG0 = the request
sub request
{
    my($heap, $request) = @_[HEAP, ARG0];
    return $heap->{self}->request($request);
}

#----------------------------------------------------
# Register foreign kernels so that we can send states to them
# SENDER is used as the channel; ARG0 = remote kernel ID, ARG1 = alias(es)
sub register
{
    my $responder = $_[HEAP]->{self};
    my($channel, $rid, $aliases) = @_[SENDER, ARG0, ARG1];
    return $responder->register($channel, $rid, $aliases);
}

#----------------------------------------------------
# Register new aliases for local kernel
# ARG0 = alias(es)
sub register_local
{
    my $responder = $_[HEAP]->{self};
    return $responder->register_local($_[ARG0]);
}

#----------------------------------------------------
# Unregister foreign kernels when this disconnect (say)
# SENDER is used as the channel; ARG0 = remote kernel ID, ARG1 = alias(es)
sub unregister
{
    my $responder = $_[HEAP]->{self};
    my($channel, $rid, $aliases) = @_[SENDER, ARG0, ARG1];
    return $responder->unregister($channel, $rid, $aliases);
}

#----------------------------------------------------
# Set a default foreign channel to send messages to
# ARG0 = kernel name or alias
sub default
{
    my $responder = $_[HEAP]->{self};
    return $responder->default($_[ARG0]);
}

##############################################################################
## This state allows sessions to monitor a remote kernel

#----------------------------------------------------
# Watch any activity regarding a foreign kernel
sub monitor
{
    my $responder = $_[HEAP]->{self};
    my($name, $states, $sender) = @_[ARG0, ARG1, SENDER];
    $responder->monitor($sender, $name, $states);
    return;
}

##############################################################################
## These are the 4 states that interact with the foreign kernel

#----------------------------------------------------
# Send a request to the foreign kernel
# ARG0 = destination specifier, ARG1 = parameters; SENDER is passed along
sub post
{
    my $responder = $_[HEAP]->{self};
    my($to, $params, $sender) = @_[ARG0, ARG1, SENDER];
    return $responder->post($to, $params, $sender);
}

#----------------------------------------------------
# Send a request to the foreign kernel
# Same as 'post', but the sender is given explicitly:
# ARG0 = destination specifier, ARG1 = sender, ARG2 = parameters
sub post2
{
    my $responder = $_[HEAP]->{self};
    my($to, $sender, $params) = @_[ARG0, ARG1, ARG2];
    # use Data::Dumper;
    # warn "post2 params=", Dumper $params;
    return $responder->post($to, $params, $sender);
}

#----------------------------------------------------
# Send a request to the foreign kernel and ask it to provide
# the state's return value back
# ARG0 = destination specifier, ARG1 = parameters, ARG2 = rsvp specifier
sub call
{
    my $responder = $_[HEAP]->{self};
    my($sender, $to, $params, $rsvp) = @_[SENDER, ARG0, ARG1, ARG2];
    $responder->call($to, $params, $rsvp, $sender);
    return;
}

#----------------------------------------------------
# Send a raw message over.
# use at your own risk :)
# This is useful for sending errors to remote ClientLite
# ARG0 = the message (hash ref), ARG1 = the session to treat as the sender
sub raw_message
{
    my($heap, $msg, $sender) = @_[HEAP, ARG0, ARG1];
    $heap->{self}->send_msg($msg, $sender);
}

#----------------------------------------------------
# Remote kernel had an error
sub remote_error
{
    my($heap, $msg) = @_[HEAP, ARG0];
    warn "$$: Remote error: $msg\n";
}

##############################################################################
# publish/retract/subscribe mechanism of setting up foreign sessions

#----------------------------------------------------
# ARG0 = session (defaults to SENDER), ARG1 = array ref of state names
sub publish
{
    my($kernel, $heap, $sender, $session, $states)=
                @_[KERNEL, HEAP, SENDER, ARG0, ARG1];
    $session||=$sender;
    $heap->{self}->publish($session, $states);
}

#----------------------------------------------------
# ARG0 = optional session; see the published() method
sub published
{
    my($kernel, $heap, $which)=@_[KERNEL, HEAP, ARG0];
    $heap->{self}->published($which);
}

#----------------------------------------------------
# ARG0 = session (defaults to SENDER), ARG1 = optional array ref of states
sub retract
{
    my($heap, $sender, $session, $states)=
                @_[HEAP, SENDER, ARG0, ARG1];
    $session||=$sender;
    $heap->{self}->retract($session, $states);
}

#----------------------------------------------------
# ARG0 = one specifier or an array ref of specifiers
# ARG1 = callback: a code ref, or the name of a state in the sender session
sub subscribe
{
    my($kernel, $heap, $sender, $sessions, $callback)=
                @_[KERNEL, HEAP, SENDER, ARG0, ARG1];
    $sessions=[$sessions] unless ref $sessions;
    return unless @$sessions;

    # Only the sender's ID is kept.  It is resolved exactly once: the
    # previous code replaced $sender with its ID and then called ->ID on
    # that plain scalar, which died whenever the callback was a state name.
    my $sender_id = ref $sender ? $sender->ID : $sender;

    if($callback and 'CODE' ne ref $callback) {
        # callback is a state name: wrap it so the receipt is posted back
        my $state=$callback;
        $callback=sub {
            DEBUG and
                    warn "Subscription callback to '$state'\n";
            $kernel->post($sender_id, $state, @_);
        };
    }
    $heap->{self}->subscribe($sessions, $callback, $sender_id);
}

# Called by a foreign IKC session
# We respond with the session, or with "NOT $specifier";
sub do_you_have
{
    my($kernel, $heap, $param)=@_[KERNEL, HEAP, ARG0];
    my $ses=specifier_parse($param->[0]);
    die "Bad state $param->[0]\n" unless $ses;

    my $self=$heap->{self};
    DEBUG and
        warn "Wants to subscribe to ", specifier_name($ses), "\n";
    if(exists $self->{'local'}{$ses->{session}} and
       (not $ses->{state} or
        exists $self->{'local'}{$ses->{session}}{$ses->{state}}
       )) {
        $ses->{kernel}||=$kernel->ID;       # make sure we uniquely identify
        DEBUG and warn "Allowed (we are $ses->{kernel})\n";
        return [$ses, $kernel->ID];         # this session
    }
    else {
        DEBUG and
            warn specifier_name($ses), " is not published in this kernel\n";
        return "NOT ".specifier_name($ses);
    }
}

#----------------------------------------------------
# ARG0 = one proxy session or an array ref of them
sub unsubscribe
{
    my($kernel, $heap, $sessions)=@_[KERNEL, HEAP, ARG0];
    $heap->{self}->unsubscribe($sessions);
}

#----------------------------------------------------
sub ping
{
    "PONG";
}

#----------------------------------------------------
# User wants to kill process / kernel
sub sig_INT
{
    my ($heap, $kernel)=@_[HEAP, KERNEL];
    DEBUG && warn "$$: Responder::sig_INT\n";
    $kernel->sig_handled();
    return;
}

#----------------------------------------------------
# Pass an event on to the monitors.  All the arguments (ARG0 onwards) are
# handed to the object's inform_monitors() method unchanged.
sub inform_monitors
{
    my ($heap)=$_[HEAP];
    $heap->{self}->inform_monitors(@_[ARG0..$#_]);
}

##############################################################################
##############################################################################
# Here is the object interface
package POE::Component::IKC::Responder::Object;
use strict;

use Carp;
use POE::Component::IKC::Specifier;
use POE::Component::IKC::Proxy;
use POE::Component::IKC::LocalKernel;
use POE qw(Session);
use Data::Dumper;
use Scalar::Util qw(reftype);   # _true_type() needs it in THIS package

sub DEBUG { 0 }
sub DEBUG2 { 0 }
sub DEBUGM { DEBUG or 0 }

# Build the responder object.
#   $kernel  - the POE kernel, kept in {poe_kernel}
#   $session - currently unused
sub new
{
    my($package, $kernel, $session)=@_;
    my $self=bless
        {
            'local'=>{IKC=>{remote_error=>1, # these states are auto-published
                            do_you_have=>1,
                            ping=>1,
                           },
                     },
            remote=>{},
            rsvp=>{},
            kernel=>{},
            channel=>{},
            # Name of the default remote kernel; '' means "none yet".
            # This must start out false: register() uses ||= to make the
            # first registered kernel the default, and unregister() resets
            # it to ''.  (It used to start as {}, which is always true, so
            # no default was ever recorded.)
            default=>'',
            monitors=>{},
            poe_kernel=>$kernel,
#            myself=>$session->ID,
        }, $package;
    return $self;
}

#----------------------------------------------------
# shutdown
sub shutdown
{
    my($self, $kernel)=@_;
    DEBUG and
        warn "$$: Some one wants us to go away... off we go\n";
    # kill our alias
    $kernel->alias_remove('IKC');

    # tell every channel to shutdown
    while(my($rid, $c)=each %{$self->{channel}}) {
        DEBUG and
            warn "$$: Posting shutdown to $rid (id=$c)\n";
        $kernel->post($c, 'shutdown');
    }

    # tell monitors to shutdown
    $self->inform_monitors('*', 'shutdown');

    # kill pending subscription states
    foreach my $uevent (keys %{$self->{pending_subscription}}) {
        $self->_remove_state($uevent);
    }
    # use YAML qw(Dump);
    # warn Dump $kernel;
}

#----------------------------------------------------
# Foreign kernel called something here
# $request is a hash ref; the keys read here are event, params, rsvp, from,
# wantarray, errors_to and is_error.
sub request
{
    my($self, $request)=@_;
    my($kernel)=@{$self}{qw(poe_kernel)};
    DEBUG2 and warn "IKC request=", Dumper $request;

    # We ignore the kernel for now, but we should really use it to decide
    # whether we should run the request or not
    my $to=specifier_parse($request->{event});

    eval {
        die "$request->{event} isn't a valid specifier" unless $to;
        my $args=$request->{params};
        ### allow proxied states to have multiple ARGs
        if($to->{state} eq 'IKC:proxy') {
            $to->{state}=$args->[0];
            $args=$args->[1];
            DEBUG and
                warn "IKC proxied request for ", specifier_name($to), "\n";
        }
        else {
            DEBUG and
                warn "IKC request for ", specifier_name($to), "\n";
            $args=[$args];
        }
        # this is where we'd catch a disconnect message
        # 2001/07 : eh?

        # find out if the state we want to get at has been published
        if(exists $self->{rsvp}{$to->{session}} and
           exists $self->{rsvp}{$to->{session}}{$to->{state}} and
           $self->{rsvp}{$to->{session}}{$to->{state}}
          ) {
            # a temporarily allowed reply: use up one permission
            $self->{rsvp}{$to->{session}}{$to->{state}}--;
            DEBUG and
                warn "Allow $to->{session}/$to->{state} is now $self->{rsvp}{$to->{session}}{$to->{state}}\n";
        }
        elsif(not exists $self->{'local'}{$to->{session}}) {
            my $p=$self->published;
            die "Session '$to->{session}' is not available for remote kernels:",
                    join "\n", '',
                        map({ " $_=>[" . join(', ', @{$p->{$_}}) . "]"}
                            keys %$p), '';
        }
        elsif(not exists $self->{'local'}{$to->{session}}{$to->{state}}) {
            die "Session '$to->{session}' has not published state '",
                $to->{state}, "'\n";
        }

        # maybe caller specified #arg?  This got into $msg->{rsvp}, which
        # went to the remote side, then came back here as $to
        if(exists $to->{args}) {
            push @$args, $to->{args};       # it goes on the end
        }

        my $session=$kernel->alias_resolve($to->{session});
        die "Unknown session '$to->{session}'\n" unless $session;

        # warn "No FROM" unless $request->{from};
        _thunked_post($request->{rsvp},
                      [$session, $to->{state}, @$args],
                      $request->{from}, $request->{wantarray});
    };

    # Error handling consists of posting a "remote_error" state to
    # the foreign kernel.
    # $request->{errors_to} is set by the local IKC::Channel
    if($@) {
        chomp($@);
        my $err=$@.' ['.specifier_name($to).']';
        $err.=' sent by ['.specifier_name($request->{from}).']'
                if $request->{from};
        warn "$err\n";
        DEBUG && warn "$$: Error in request: $err\n";
        unless($request->{is_error}) {  # don't send an error message back
                                        # if this was an error itself
            $self->send_msg({ event=>$request->{errors_to},
                              params=>$err, is_error=>1,
                            });
        }
        else {
            warn $$, Dumper $request;
        }
    }
}

#----------------------------------------------------
# Register foreign kernels so that we can send states to them
#   $channel - what messages for this kernel are sent to
#   $rid     - the remote kernel's real ID
#   $aliases - one alias or an array ref of aliases
# Returns 1, or nothing if $rid is already registered.
sub register
{
    my($self, $channel, $rid, $aliases)=@_;
    $aliases=[$aliases] if not ref $aliases;
    my($kernel)=@{$self}{qw(poe_kernel)};

    if($self->{channel}{$rid}) {
        warn "$$: Remote kernel '$rid' already exists\n";
        return;
    }
    else {
        DEBUG and
            warn "$$: Registered remote kernel '$rid'\n";
        $self->{channel}{$rid}=$channel;
        $self->{remote}{$rid}=[];           # list of proxy sessions
        $self->{alias}{$rid}=$aliases;
        $self->{default}||=$rid;            # first kernel is the default
    }

    foreach my $name (@$aliases) {
        unless(defined $name) {
            warn "$$: attempt to register undefined remote kernel alias\n";
            next;
        }
        if($self->{kernel}{$name}) {
            DEBUG and
                warn "$$: Remote alias '$name' already exists\n";
            next;
        }
        DEBUG and
            warn "$$: Registered alias '$name'\n";
        $self->{kernel}{$name}=$rid;        # find real remote ID
        $self->{remote}{$name}||=[];        # list of proxy sessions
    }

    $self->inform_monitors($rid, 'register');
    return 1;
}

#----------------------------------------------------
# Register a new alias for the local kernel
#   $aliases - one alias or an array ref of aliases
sub register_local
{
    my($self, $aliases)=@_;
    $aliases=[$aliases] if not ref $aliases;
    my($kernel)=@{$self}{qw(poe_kernel)};

    my $rid=$kernel->ID;
    DEBUG and
        warn "$$: Registering local kernel '$rid'\n";

    # the local kernel's "channel" is created on first use
    $self->{local_channel}||=POE::Component::IKC::LocalKernel->spawn->ID;
    my $channel=$self->{local_channel};

    $self->{channel}{$rid}||=$channel;
    $self->{remote}{$rid}||=[];             # list of proxy sessions
    $self->{alias}{$rid}||=[];

    # use Data::Dumper;
    # die Dumper $aliases;

    foreach my $name (@$aliases) {
        unless(defined $name) {
            DEBUG and
                warn "$$: attempt to register undefined local kernel alias\n";
            next;
        }
        if($self->{kernel}{$name}) {
            DEBUG and
                warn "$$: Local kernel alias '$name' already exists\n";
            next;
        }
        DEBUG and
            warn "$$: Registered local alias '$name'\n";
        $self->{kernel}{$name}=$rid;        # find real remote ID
        $self->{remote}{$name}||=[];        # list of proxy sessions
        push @{$self->{alias}{$rid}}, $name;
    }

    return 1;
}

#----------------------------------------------------
# Choose the kernel that messages without a kernel part are sent to.
#   $name - a registered alias or a registered kernel ID
sub default
{
    my($self, $name) = @_;
    if(exists $self->{kernel}{$name}) {
        $self->{default}=$self->{kernel}{$name};
    }
    elsif(exists $self->{channel}{$name}) {
        $self->{default}=$name;
    }
    else {
        carp "We do not know the kernel $name.\n";
        return;
    }

    DEBUG and warn "Default kernel is on channel $name.\n";
}

#----------------------------------------------------
# Unregister foreign kernels when they disconnect (say)
#   $rid     - a kernel ID or, when it is not a known channel and no
#              aliases are given, a single alias to drop
#   $aliases - optional alias or array ref of aliases
sub unregister
{
    my($self, $channel, $rid, $aliases)=@_;
    my($kernel)=@{$self}{qw(poe_kernel)};
    return unless $rid;

    if(not $aliases) {
        unless($self->{channel}{$rid}) {    # unregister one alias only
            $aliases=[$rid];
            undef $rid;
        }
    }
    elsif(not ref $aliases) {
        $aliases=[$aliases];
    }

    my @todo;
    if($rid) {
        if($self->{channel}{$rid}) {        # this is in fact the real name
            DEBUG and
                warn "Unregistered kernel '$rid'.\n";
            # monitors must be told while the channel is still known
            $self->inform_monitors($rid, 'unregister');
            $self->{'default'}='' if $self->{'default'} eq $rid;

            $kernel->post($self->{channel}{$rid}, 'close');
            delete $self->{channel}{$rid};
            # delete $self->{monitors}{$rid};
            $aliases||=delete $self->{alias}{$rid};
            push @todo, $rid;
        }
        else {
            warn "$rid isn't a channel???\n";
        }
    }

    foreach my $name (@$aliases) {
        next unless defined $name;
        # delete $self->{monitors}{$name};
        if($self->{kernel}{$name}) {
            DEBUG and
                warn "Unregistered kernel alias '$name'.\n";
            delete $self->{kernel}{$name};
            $self->{'default'}='' if $self->{'default'} eq $name;
            push @todo, $name;
        }
        else {
            DEBUG and warn "Already done: $name\n";
            next;
        }
    }

    # tell the proxies they are no longer needed
    foreach my $name (@todo) {
        if($name) {
            foreach my $alias (@{$self->{remote}{$name}}) {
                $self->{poe_kernel}->post($alias, '_delete');
            }
            delete $self->{remote}{$name};
        }
    }
    return 1;
}

#----------------------------------------------------
# Internal function that does all the work of preparing a request to be sent
#   $msg    - hash ref; 'event' is the destination specifier, 'rsvp' (if
#             present) is where the answer goes
#   $sender - optional session the message is sent on behalf of
# Returns the number of channels the message was handed to; 0 when no
# channel matches; nothing when the message is incomplete.
# Dies when $msg->{event} is not a valid specifier.
sub send_msg
{
    my($self, $msg, $sender)=@_;
    my($kernel)=@{$self}{qw(poe_kernel)};

    my $e=$msg->{rsvp} ? 'call' : 'post';

    my $to=specifier_parse($msg->{event});
    unless($to) {
        # NB: a second "unless($to)" that only warned and returned used to
        # follow this one; it could never run and was removed.
        die "Bad state ", Dumper $msg;
    }
    unless($to->{session}) {
        warn "Need a session name in poe://IKC/$e\n";
        return;
    }
    unless($to->{state}) {
        warn "Need a state name in poe://IKC/$e\n", Dumper $to;
        return;
    }

    my $name=$to->{kernel}||$self->{'default'};
    unless($name) {
        warn "Unable to decide which kernel to send state '$to->{state}' to.";
        return;
    }

    DEBUG and
        warn "send_msg poe://IKC/$e to '", specifier_name($to), "'\n";

    # This way the thunk session will proxy a request back to us
    if($sender and not $msg->{from} and
       # Leolo: question.  doesnt .13 require bleadPOE?
       # sungo : i can put the offending code into a conditional
       #         if you want
       $self->{poe_kernel}->can('alias_list')) {

        # Declared unconditionally: "my $x = ... if COND" has undefined
        # behaviour (see perlsyn, Statement Modifiers).
        my $sid=ref $sender ? $sender->ID : $sender;
        foreach my $alias ($self->{poe_kernel}->alias_list($sender)) {
            $sid.=" ($alias)";
            if($self->{'local'}{$alias}) {  # SENDER published something
                $msg->{from}={  kernel=>$self->{poe_kernel}->ID,
                                session=>$alias,
                                state=>'IKC:proxy',
                             };
                last;
            }
        }
        DEBUG2 and do {
            unless($msg->{from}) {
                warn "Session $sid didn't publish anything SENDER isn't set";#, Denter $self->{'local'}, $sender;
            }
            else {
                warn "Session $sid will be thunked";
            }
        };
    }

    # This is where we should recurse $msg->{params} to turn anything
    # extravagant like a subref, $poe_kernel, session etc into a call back to
    # us.
    # $msg->{params}=$self->marshall($msg->{params});

    # Get a list of channels to send the message to
    my @channels=$self->channel_list( $name );
    unless(@channels) {
        warn "$$: MSG TO ", Dumper $to;
        warn (($name eq '*')
                  ? "$$: Not connected to any foreign kernels.\n"
                  : "$$: Unknown kernel '$name'.\n");
        warn "$$: Known kernels: ". $self->channel_names;
        return 0;
    }

    # now send the message over the wire
    # hmmm.... i wonder if this could be stream-lined into a direct call
    my $count=0;
    my $rsvp;
    $rsvp=$msg->{rsvp} if exists $msg->{rsvp};
    foreach my $channel (@channels) {
        # We need to be able to access this state w/out forcing folks
        # to use publish
        if($rsvp) {
            DEBUG and
                warn "Allow $rsvp->{session}/$rsvp->{state} once\n";
            $self->{rsvp}{$rsvp->{session}}{$rsvp->{state}}++;
        }

        DEBUG2 and warn "Sending to '$channel'...";
        if($kernel->call($channel, 'send', $msg)) {
            $count++;
            DEBUG2 and warn " done.\n";
        }
        else {
            DEBUG2 and warn " failed.\n";
            # take back the permission given above
            $self->{rsvp}{$rsvp->{session}}{$rsvp->{state}}-- if $rsvp;
        }
    }

    DEBUG2 and
        warn specifier_name($to), " sent to $count kernel(s).\n";
    DEBUG and do {warn "$$: send_msg failed!\n" unless $count};
    return $count;
}

#----------------------------------------------------
# Underlying type of a reference ('HASH', 'ARRAY', ...), ignoring any
# blessing.  Returns nothing for non-references.  $can is unused.
sub _true_type
{
    my($self, $data, $can)=@_;
    my $r=ref $data;
    return unless $r;
    return reftype( $data );
}

#----------------------------------------------------
# Walk $data (in place) and replace named code refs with a
# "-IKC-CODEREF-<glob>" string.  Returns the data, or nothing for things
# that can't be marshalled.
# NOTE(review): nothing in this file calls this; the call in send_msg() is
# commented out.
sub marshall
{
    my($self, $data)=@_;
    my $r=$self->_true_type($data, 'ikc_marshall');
    return $data unless $r;

    if($r eq 'HASH') {
        # iterate over the KEYS; this used to loop over the values and then
        # use each value as if it were a key
        foreach my $key (keys %$data) {
            $data->{$key}=$self->marshall($data->{$key})
                    if ref $data->{$key};
        }
    }
    elsif($r eq 'ARRAY') {
        foreach my $q (@$data) {            # $q aliases the element
            $q=$self->marshall($q) if ref $q;
        }
    }
    elsif($r eq 'SCALAR') {
        $$data=$self->marshall($$data) if ref $$data;
    }
    elsif($r eq 'CODE') {
        require Devel::Peek;                # not loaded anywhere else
        my $q=Devel::Peek::CvGV($data);
        if($q=~/__ANON__$/) {
            warn "Can't marshall anonymous code ref $q\n";
            return;
        }
        return "-IKC-CODEREF-$q";
    }
    else {
        warn "Marshalling $r wouldn't be meaningful\n";
        return;
    }
    return $data;
}

#----------------------------------------------------
# Reverse of marshall().  Walks $data in place and returns it.
# Turning a "-IKC-CODEREF-..." string back into something callable is not
# implemented: such a string makes this die.
# NOTE(review): the pattern expects "-IKC-CODEREF-<kernel>-<glob>", which
# is not what marshall() produces -- confirm the intended wire format.
sub demarshall
{
    my($self, $data)=@_;
    my $r=$self->_true_type($data, 'ikc_demarshall');
    unless($r) {
        # $data is a plain scalar here.  The match used to be done against
        # $r, which is always false in this branch.
        if(defined $data and $data=~/^-IKC-CODEREF-(.+)-(\*[:\w]+)$/) {
            my $func=$2;
            my $rk=$1;
            die "need to call $func in $rk";
            # not reached; kept as the sketch of the intended behaviour
            $data=sub {$poe_kernel->post(IKC=>'post',
                                         "poe://$rk/IKC/coderef"=>$func)};
        }
        return $data;
    }

    if($r eq 'HASH') {
        foreach my $key (keys %$data) {     # keys, not values: see marshall
            $data->{$key}=$self->demarshall($data->{$key})
                    if ref $data->{$key};
        }
    }
    elsif($r eq 'ARRAY') {
        foreach my $q (@$data) {
            $q=$self->demarshall($q) if ref $q;
        }
    }
    elsif($r eq 'SCALAR') {
        $$data=$self->demarshall($$data) if ref $$data;
    }
    return $data;
}

#----------------------------------------------------
## Turn a kernel name or alias into a list of possible channels
sub channel_list
{
    my($self, $name)=@_;

    if($name eq '*') {                              # all kernels
        return values %{$self->{channel}};
    }
    if(exists $self->{kernel}{$name}) {             # kernel alias
        my $t=$self->{kernel}{$name};
        unless(exists $self->{channel}{$t}) {
            die "What happened to channel $t!";
        }
        return ($self->{channel}{$t})
    }

    if(exists $self->{channel}{$name}) {            # kernel ID
        return ($self->{channel}{$name})
    }
    return ();
}

#----------------------------------------------------
## Get a list of all the channel names (for debugging)
# With a name: returns "name (alias, alias)".  Without one (or with '*'):
# one such string per channel, as a list or joined with ', ' depending on
# context.
sub channel_names
{
    my($self, $name) = @_;

    if( $name and $name ne '*' ) {
        return "$name (".join(', ',
                    grep { $self->{kernel}{$_} eq $name }
                        keys %{ $self->{kernel} }
                ) .")";
    }
    my @ret;
    foreach $name ( keys %{ $self->{channel} } ) {
        push @ret, $self->channel_names( $name );
    }
    return @ret if wantarray;
    return join ', ', @ret;
}

#----------------------------------------------------
# Send a request to the foreign kernel
sub post
{
    my($self, $to, $params, $sender) = @_;

    $to="poe://$to" unless ref $to or $to=~/^poe:/;

    # use Data::Dumper;
    # warn "params=", Dumper $params;
    $self->send_msg({params=>$params, 'event'=>$to}, $sender);
}

#----------------------------------------------------
# Send a request to the foreign kernel and ask it to provide
# the state's return value back
#   $rsvp - specifier of the state that receives the answer; its session
#           defaults to $sender's ID
sub call
{
    my($self, $to, $params, $rsvp, $sender)=@_;

    $to="poe://$to" if $to and not ref $to and $to!~/^poe:/;
    $rsvp="poe://$rsvp" if $rsvp and not ref $rsvp and $rsvp!~/^poe:/;

    unless($rsvp) {
        warn "$$: Missing 'rsvp' parameter in poe://IKC/call\n";
        return;
    }

    my $t=specifier_parse($rsvp);
    unless($t) {
        warn "$$: Bad 'rsvp' parameter '$rsvp' in poe://IKC/call\n";
        return;
    }
    $rsvp=$t;
    unless($rsvp->{state}) {
        DEBUG and warn Dumper $rsvp;
        warn "$$: rsvp state not set in poe://IKC/call\n";
        return;
    }

    # Question : should $rsvp->{session} be forced to be the sender?
    # or will we allow people to point callbacks to other poe:kernel/sessions
    $rsvp->{session}||=$sender->ID if ref $sender;  # maybe a session ID?
    if(not $rsvp->{session}) {                      # no session alias
        die "IKC call requires session IDs, please patch your version of POE\n";
    }
    DEBUG2 and
        warn "RSVP is ", specifier_name($rsvp), "\n";

    # use Data::Dumper;
    # warn "params=", Dumper $params;
    $self->send_msg({params=>$params, 'event'=>$to,
                     rsvp=>$rsvp
                    }, $sender
                   );
}

##############################################################################
# publish/retract/subscribe mechanism of setting up foreign sessions

# Names a session is published under: $session itself when it is not an
# object, otherwise all its aliases, or its ID when it has none.
# This is a plain function, not a method.
sub _aliases
{
    my($kernel, $session)=@_;
    return $session unless ref $session;    # make sure it's an object

    if($kernel->can('alias_list')) {
        # post-0.15 we register as all aliases for session
        my @a=$kernel->alias_list($session->ID);
        return @a if @a;
    }
    # pre-0.15 means that we register as session ID... which is less
    # then useful
    return $session->ID;
}

#----------------------------------------------------
# Make states of a session available to remote kernels.
#   $session - session object or name
#   $states  - array ref of state names
# Returns 1, or 0 if no session was given.
sub publish
{
    my($self, $session, $states)=@_;
    unless($session) {
        carp "You must specify the session that publishes these states";
        return 0;
    }

    my @aliases =_aliases($self->{poe_kernel}, $session);

    foreach my $alias (@aliases) {
        $self->{'local'}{$alias}||={};
        my $p=$self->{'local'}{$alias};

        die "\$states isn't an array ref" unless ref($states) eq 'ARRAY';
        foreach my $q (@$states) {
            DEBUG and
                print STDERR "Published poe:$alias/$q\n";
            $p->{$q}=1;
        }
    }
    return 1;
}

#----------------------------------------------------
# List what has been published.
# With $session (a name or a session object): array ref of the states
# published for it, empty if there are none.
# Without: hash ref of published name => array ref of states.
sub published
{
    my($self, $session)=@_;
    if($session) {
        # Look the session up under the same names publish() files it
        # under.  The entries are only read, never created: the previous
        # "keys %{$self->{local}{$sid}}" autovivified an empty entry for
        # any name asked about, after which do_you_have treated that
        # session as published.
        my %seen;
        foreach my $alias (_aliases($self->{poe_kernel}, $session)) {
            my $states=$self->{'local'}{$alias} or next;
            $seen{$_}=1 foreach keys %$states;
        }
        return [keys %seen];
    }

    my %ret;
    foreach my $sid (keys %{$self->{'local'}}) {
        $ret{$sid}=[keys %{$self->{'local'}{$sid}}];
    }
    return \%ret;
}

#----------------------------------------------------
# Undo publish().
#   $session - session object or name
#   $states  - optional array ref; without it everything the session
#              published is retracted
# Returns 1, or 0 if there was nothing to retract.
sub retract
{
    my($self, $session, $states)=@_;
    unless($session) {
        warn "You must specify the session that publishes these states";
        return 0;
    }

    my @aliases=_aliases($self->{poe_kernel}, $session);
    foreach my $alias (@aliases) {
        unless($self->{'local'}{$alias}) {
            warn "Session '$session' ($alias) didn't publish anything, can't retract";
            return 0;
        }

        if($states) {
            my $p=$self->{'local'}{$alias};
            foreach my $q (@$states) {
                delete $p->{$q};
            }
            delete $self->{'local'}{$alias} unless keys %$p;
        }
        else {
            delete $self->{'local'}{$alias};
        }
    }
    return 1;
}

#----------------------------------------------------
# Subscribing is in two phases
# 1- we call a IKC/do_you_have to the foreign kernels
# 2- the foreign responds with the session-specifier (if it has published it)
#
# We create a unique state for the callback for each subscription request
# from the user session.  It keeps count of how many subscription receipts
# it receives and when they are all subscribed, it localy posts the callback
# event.
#
# If more then one kernel sends a subscription receipt, first one is used.
#
#   $sessions - array ref of session specifiers; each must name a kernel
#   $callback - optional code ref, called with an array ref of the
#               specifiers that were accepted
#   $s_id     - ID of the requesting session; defaults to the caller info
# Returns 1.  Dies if a specifier has no kernel part.
sub subscribe
{
    my($self, $sessions, $callback, $s_id)=@_;
    my($kernel)=@{$self}{qw(poe_kernel)};
    $s_id||=join '-', caller;

    # unique identifier for this request
    $callback||='';
    my $unique="IKC:receipt $s_id $callback";
    my $id=$kernel->ID;

    foreach my $spec (@$sessions) {
        my $ses=specifier_parse($spec);     # Session specifier

        # Check this BEFORE anything is set up.  The check used to come
        # after the receipt state, its timer and the pending entry had been
        # created, so dying here left all three behind.
        unless($ses and $ses->{kernel}) {
            # Bleh.  User shouldn't be that dumb
            die "You can't subscribe to a session within the current kernel.";
        }

        # Create the subscription receipt state
        my $receipt=$unique.$spec;
        $kernel->state($receipt,
                       sub {
                           _subscribe_receipt($self, $unique, $spec, $_[ARG0])
                       } );
        $kernel->delay($receipt, 60);       # timeout
        $self->{pending_subscription}{$receipt}=1;

        my $count=$self->send_msg(
                {event=>{kernel=>$ses->{kernel},
                         session=>'IKC', state=>'do_you_have'
                        },
                 params=>[$ses, $id],
                 from=>{kernel=>$id, session=>'IKC'},
                 rsvp=>{kernel=>$id, session=>'IKC', state=>$receipt},
                },
              );
        # TODO What if this post failed?  Session that posted this would
        # surely want to know

        if($callback) {             # We need to keep some information around
                                    # for when the subscription receipt comes in
            $self->{subscription_callback}{$unique}||=
                        {   callback=>$callback,
                            sessions=>{}, yes=>[], count=>0,
                            states=>{},
                        };
            my $fiddle=$self->{subscription_callback}{$unique};
            $fiddle->{states}{$receipt}=$count;
            $fiddle->{count}+=($count||0);
            $fiddle->{sessions}->{$spec}=1;
            if(not $count) {
                # nothing was sent: trigger the receipt state ourselves, so
                # that the callback still gets called
                $fiddle->{count}++;
                $kernel->yield($receipt);
            }
            else {
                DEBUG and
                    warn "Sent $count subscription requests for [$spec]\n";
            }
        }
    }

    return 1;
}

#----------------------------------------------------
# Subscription receipt
# All foreign kernel's that have published the desired session
# will send back a receipt.
# Others will send a "NOT".
# This will cause problems when the Proxy session creates an alias :(
#
# Callback is called we are "done".  But what is "done"?
# When at least
# one remote kernel has allowed us to subscribe to each session we are
# waiting for.  However, at some point we should give up.
#
# Scenarios :
# one foreign kernel says 'yes', one 'no'.
#   - 'yes' creates a proxy
#   - 'no' decrements wait count
#   ... callback is called with session specifier
# 2 foreign kernels says 'yes'
#   - first 'yes' creates a proxy
#   - 2nd 'yes' should also create a proxy!  alias conflict (for now)
#   ... callback is called with session specifier
# one foreign kernel says 'no', and after, another says no
#   - first 'no' decrements wait count
#   - second 'no' decrements wait count
#   ... Subscription failed!  callback is called with specifier empty
# no answers ever came...
#   - we wait forever :(
#
#   $unique - identifier of the subscription request
#   $spec   - the specifier this receipt is about
#   $resp   - [$specifier, $remote_kernel_id] when accepted; anything else
#             (a "NOT ..." string, or nothing) means refused
sub _subscribe_receipt
{
    my($self, $unique, $spec, $resp)=@_;
    my $accepted=1;

    # Declared unconditionally: "my(...) = ... if COND" has undefined
    # behaviour (see perlsyn, Statement Modifiers).  Only an ARRAY ref is
    # unpacked; the response comes from a foreign kernel, and any other
    # kind of reference used to make @$resp die.
    my($ses, $rid);
    ($ses, $rid)=@$resp if $resp and 'ARRAY' eq ref $resp and @$resp;

    my $del;
    if(not $ses or not ref $ses) {              # REFUSED
        warn "$$: Refused to subscribe to $spec";
        warn "$$: $resp" if $resp;
        $accepted=0;
        $del=$unique.$spec;
    }
    else {                                      # accepted
        $ses=specifier_parse($ses);
        die "Bad state" unless $ses;

        my($kernel)=@{$self}{qw(poe_kernel)};
        DEBUG and
            warn "Create proxy for ", specifier_name($ses), "\n";
        my $proxy=POE::Component::IKC::Proxy->spawn(
                    $ses->{kernel}, $ses->{session},
                    sub { $kernel->post(IKC=>'inform_monitors',
                                        $rid, 'subscribe', $ses)},
                    # 2002/04 monitor_stop is called in _stop, but we can't
                    # post() from _stop, so we call() ourself
                    sub { $kernel->call(IKC=>'inform_monitors',
                                        $rid, 'unsubscribe', $ses)},
                  );

        push @{$self->{remote}{$ses->{kernel}}}, $proxy;
    }

    # cleanup the subscription request
    if(exists $self->{subscription_callback}{$unique}) {
        DEBUG and
            warn "Subscription [$unique] callback... ";
        my $fiddle=$self->{subscription_callback}{$unique};

        if($fiddle->{sessions}->{$spec} and $accepted) {
            delete $fiddle->{sessions}->{$spec};
            push @{$fiddle->{yes}}, $spec;
        }

        $fiddle->{count}-- if $fiddle->{count};
        if(0==$fiddle->{count}) {
            # every expected receipt is in: tell the subscriber
            DEBUG and warn "yes.";
            delete $self->{subscription_callback}{$unique};
            # use Data::Denter;
            # warn "Fiddle =", Denter $fiddle;
            $fiddle->{callback}->($fiddle->{yes});
        }
        else {
            DEBUG and warn "no, $fiddle->{count} left.";
        }

        $fiddle->{states}{$unique.$spec}--;
        if($fiddle->{states}{$unique.$spec}<=0) {
            # this state is no longer needed
            $del=$unique.$spec;
        }
    }
    else {
        # this state is no longer needed
        $del=$unique.$spec;
    }

    $self->_remove_state($del) if $del;
}

# clean-up
# Remove the receipt state $del and its timer, if it is still pending.
sub _remove_state
{
    my($self, $del)=@_;
    return unless $self->{pending_subscription}{$del};
    my $kernel=$self->{poe_kernel};
    $kernel->delay($del);
    $kernel->state($del);
    delete $self->{states}{$del};
    delete $self->{pending_subscription}{$del};
}

#----------------------------------------------------
# Post '_shutdown' to each of the given sessions.
#   $sessions - one session or an array ref of sessions
sub unsubscribe
{
    my($self, $sessions)=@_;
    $sessions=[$sessions] unless ref $sessions;
    return unless @$sessions;
    foreach my $ses (@$sessions) {
        $self->{poe_kernel}->post($ses, '_shutdown');
    }
}

#----------------------------------------------------
sub ping
{
    "PONG";
}

#------------------------------------------------------------------
# Start or stop monitoring a foreign kernel.
#   $sender - the monitoring session (object or ID)
#   $name   - kernel name or alias, or '*' for all of them
#   $states - hash ref of event => state to post to; an empty or missing
#             hash stops the monitoring
sub monitor
{
    my($self, $sender, $name, $states)=@_;

    # dngor : also, if i keep a ref to $_[SENDER], does this mess
    #         up stuff?
    # yeah, it will mess stuff up.  take its ID instead; you can
    #         post to an ID
    $sender=$sender->ID if ref $sender;

    my $spec=$name;
    $spec=specifier_part($spec, 'kernel') unless $spec eq '*';

    undef($states) unless ref $states and keys %$states;

    if($states) {
        $states->{__name}=$name;    # handed back to the monitor as ARG0
        DEBUGM and
            warn "$$: Session $sender is monitoring $spec\n";
        $self->{monitors}{$spec} ||= {};
        $self->{monitors}{$spec}{$sender}=$states;
    }
    else {
        DEBUGM and
            warn "$$: Session $sender is neglecting $spec\n";
        delete $self->{monitors}{$spec}{$sender};
        delete $self->{monitors}{$spec}
                    if 0==keys %{$self->{monitors}{$spec}};
    }
    return;
}

#----------------------------------------------------
# Tell monitors about something in foreign kernel
# $rid == kernel name (in which case we ALSO inform about alises) or alias
#       or * (tell every monitor about something... future use)
# $event == name of event we are informing about
# @params == other stuff
# NB : inform_monitors *MUST* post or call the monitors before exiting
# because unregister will delete {monitors}{$rid} right after
sub inform_monitors
{
    my($self, $rid, $event, @params)=@_;
    my($kernel)=@{$self}{qw(poe_kernel)};

    $rid=specifier_part($rid, 'kernel') unless $rid eq '*';

    croak "$$: No kernel in $_[1]!" unless $rid;

    # True when $rid is a kernel's real name, left undefined for an alias.
    # (Declared unconditionally; see perlsyn on "my $x ... if COND".)
    my $real;
    $real=1 if $self->{channel}{$rid};
    DEBUGM and
        warn "$$: $rid is", ($real ? '' : "n't"), " real\n";

    # got to be a better way of doing this...
    my @todo=($rid);
    push @todo, '*' unless $rid eq '*';

    foreach my $n (@todo) {
        next unless $n;
        my $ms=$self->{monitors}{$n};
        unless($ms and %$ms) {
            DEBUGM and
                warn "$$: No sessions care about $event $n\n";
            next;
        }

        foreach my $sender (keys %$ms) {
            my $states=$ms->{$sender};
            my $e=$states->{$event};
            next unless $e;
            DEBUGM and
                warn "$$: Informing Session $sender/$e about $n/$event\n";
            # ARG0 = what Session called the kernel
            # ARG1 = what kernel calls the kernel
            # ARG2 = true if kernel is name, false if alias
            # ARG3 = $states->{data}
            # ARG4.... = per-message info
            $kernel->post($sender, $e, $states->{__name}, $rid, $real,
                          $states->{data}, @params);
        }
    }

    # $rid might be an alias to something else, inform about those as well
    if($self->{channel}{$rid}) {
        foreach my $ra (@{$self->{alias}{$rid}}) {
            $self->inform_monitors($ra, $event, @params);
        }
    }
}

##############################################################################
# These are Thunks used to post the actual state on behalf of the foreign
# kernel.  Currently, the thunks are used as a "proof of concept" and
# to accur extra over head. :)
#
# The original idea was to make sure that $_[SENDER] would be something
# valid to the posted state.  However, garbage collection becomes a problem
# If we are to allow the poste session to "hold on to" $_[SENDER].
#
# Having the Responder save all Thunks until the related foreign connection
# disapears would be wasteful.  Or, POE could use soft-refs to track
# active sessions and the thunk would "kill" itself in _stop.  This
# would force POE to require 5.005, however.
#
# Export thunk the quick way.
*_thunked_post = \&POE::Component::IKC::Responder::Thunk::thunk;

package POE::Component::IKC::Responder::Thunk;

use strict;
use POE qw(Session);
use Data::Dumper;

# Compile-time debugging switch for the thunk sessions.
sub DEBUG { 0 }

# Every thunk session is handed its own name.  The counter lives in a
# private scope shared only with thunk(); it starts as the package name
# (with the non-word characters removed) followed by eight zeros and is
# string-incremented on every call.
{
    my $serial = __PACKAGE__ . '00000000';
    $serial =~ s/\W//g;

    # Create a one-shot session that delivers a foreign request locally.
    # Arguments are ($rsvp, $args, $from, $wantarray); they are passed on
    # to _start untouched, preceded by the thunk's name.
    sub thunk
    {
        my $this_name = $serial++;
        POE::Session->create(
            package_states =>
                [ __PACKAGE__, [ '_start', '_stop', '_default' ] ],
            args => [ $this_name, @_ ],
        );
    }
}

# Deliver the request.
#   ARG0 = name of this thunk
#   ARG1 = $rsvp : where the return value goes; false means "just post"
#   ARG2 = $args : arrayref handed to $kernel->call() or $kernel->post()
#   ARG3 = $from : where _default sends anything the thunk is posted
#   ARG4 = $wantarray : true if the call is to be made in list context
sub _start
{
    my ($kernel, $heap, $name, $rsvp, $args, $from, $wantarray) =
        @_[KERNEL, HEAP, ARG0, ARG1, ARG2, ARG3, ARG4];

    @{$heap}{qw(from name)} = ($from, $name);
    # warn "no FROM" unless $from;
    DEBUG and warn "$name created\n";

    if ($rsvp) {
        # foreign session wants returned value
        DEBUG and do { warn "Wants an array" if $wantarray };

        # Call in the context the foreign side asked for.
        my @ret = $wantarray ? $kernel->call(@$args)
                             : scalar($kernel->call(@$args));
        # A list reply needs at least one element, a scalar reply must be
        # defined; otherwise nothing is sent back.
        my $yes = $wantarray ? 0 < @ret : defined $ret[0];

        if ($yes) {
            DEBUG and do {
                local $" = ', ';
                warn "Posted response '@ret' to ", Dumper $rsvp;
            };
            # This is the POSTBACK
            my %reply = (
                params => ($wantarray ? \@ret : $ret[0]),
                event  => $rsvp,
            );
            $POE::Component::IKC::Responder::ikc->send_msg(
                \%reply, $args->[0]);
        }
    }
    else {
        DEBUG and do { warn "Posting ", Dumper $args; };
        $kernel->post(@$args);
    }
}

# Nothing to clean up; only reports the thunk going away when debugging.
sub _stop
{
    my $heap = $_[HEAP];
    DEBUG and warn "$heap->{name} delete\n";
}

# Anything posted to the thunk is relayed to the foreign kernel, using the
# {from} specifier saved by _start.  States whose name starts with an
# underscore are ignored.
sub _default
{
    my ($kernel, $heap, $sender, $state, $args) =
        @_[KERNEL, HEAP, SENDER, ARG0, ARG1];

    return if '_' eq substr $state, 0, 1;

    if (!$heap->{from}) {
        warn "$$: Attempt to respond to an anonymous foreign post with '$state'\n";
        return;
    }

    my %relay = (
        params => [ $state, $args ],
        event  => $heap->{from},
    );
    $POE::Component::IKC::Responder::ikc->send_msg(\%relay, $sender);
}

1;
__END__

=head1 NAME

POE::Component::IKC::Responder - POE IKC state handler

=head1 SYNOPSIS

    use POE;
    use POE::Component::IKC::Responder;
    create_ikc_responder();
    ...
    $kernel->post('IKC', 'post', $to_state, $state);

    $ikc->publish('my_name', [qw(state1 state2 state3)]);

=head1 DESCRIPTION

This module implements POE IKC state handling.
The responder handles posting states to foreign kernels and calling states
in the local kernel at the request of foreign kernels.

There are 2 interfaces to the responder: either sending states to the
'IKC' session, or the object interface.  While the latter is faster, the
former is better behaved, because POE is a cooperative system.

=head1 STATES/METHODS

=head2 C<spawn>

    POE::Component::IKC::Responder->spawn();

This function creates the Responder session and object.  Normally,
L<POE::Component::IKC::Client> or L<POE::Component::IKC::Server> does this
for you.  But in some applications you want to make sure that the Responder
is up and running before then.

=head2 C<post>

Sends a state request to a foreign kernel.  Returns logical true if the
state was sent and logical false if it was unable to send the request to the
foreign kernel.  This does not mean that the foreign kernel was able to
post the state, however.  Parameters are as follows:

=over 2

=item C<foreign_state>

Specifier for the foreign state.  See L<POE::Component::IKC::Specifier>.

=item C<parameters>

A reference to anything you want the foreign state to get as ARG0.  If you
want to specify several parameters, use an array ref and have the foreign
state dereference it.

    $kernel->post('IKC', 'post',
        {kernel=>'Syslog', session=>'logger', state=>'log'},
        [$faculty, $priority, $message]);

or

    $ikc->post('poe://Syslog/logger/log', [$faculty, $priority, $message]);

This logs a state with a hypothetical logger.

=back

See the notes on SENDER below.

=head2 C<call>

This is identical to C<post>, except it has a 3rd parameter that describes
what state should receive the return value from the foreign kernel.

    $kernel->post('IKC', 'call',
        'poe://Pulse/timeserver/time', '', 'poe:get_time');

or

    $ikc->call({kernel=>'Pulse', session=>'timeserver', state=>'time'},
        '', 'poe://me/get_time');

This asks the foreign kernel 'Pulse' for the time.  The 'get_time' state in
the current session is posted with whatever the foreign state returned.

You do not have to publish callback messages, because they are temporarily
published.  How temporary?  They can be posted from a remote kernel ONCE only.
This, of course, is a problem because someone else could get in a post
before the callback.  Such is life.

=over 3

=item C<foreign_state>

Identical to the C<post> C<foreign_state> parameter.

=item C<parameters>

Identical to the C<post> C<parameters> parameter.

=item C<rsvp>

Event identification for the callback.  That is, this state is called with
the return value of the foreign state.  Can be a C<foreign_state> specifier
or simply the name of a state in the current session.

=back

    $kernel->call('IKC', 'post',
        {kernel=>'e-comm', session=>'CC', state=>'check'},
        {CC=>$cc, expiry=>$expiry, folder=>$holder},
        'is_valid');
    # or
    $ikc->call('poe://e-comm/CC/check',
        {CC=>$cc, expiry=>$expiry, folder=>$holder},
        'poe://me/is_valid');

This asks the e-comm server to check if a credit card number is "well
formed".  Yes, this would probably be massive overkill.

The C<rsvp> state does not need to be published.  IKC keeps track of the
rsvp state and will allow the foreign kernel to post to it.

See the notes on SENDER below.

=head2 C<default>

Sets the default foreign kernel.  You must be connected to the foreign
kernel first.

Unique parameter is the name of the foreign kernel.

Returns logical true on success.

=head2 C<register>

Registers foreign kernel names with the responder.  This is done during the
negotiation phase of IKC and is normally handled by C<IKC::Channel>.  Will
define the default kernel if no previous default kernel exists.

First parameter is a single kernel name.  Second optional parameter is an
array ref of kernel aliases to be registered.

=head2 C<unregister>

Unregisters one or more foreign kernel names with the responder.  This is
done when the foreign kernel disconnects by L<POE::Component::IKC::Channel>.
If this is the default kernel, there is no more default kernel.

First parameter is either a single kernel name or a kernel alias.  Second
optional parameter is an array ref of kernel aliases to be unregistered.
This second parameter is a tad silly, because if you unregister a remote
kernel, it goes without saying that all its aliases get unregistered also.

=head2 C<register_local>

Registers new aliases for local kernel with the responder.
This is done internally by L and L. Will NOT define the default kernel. First and only parameter is an array ref of kernel aliases to be registered. =head2 C Tell IKC that some states in the current session are available for use by foreign sessions. =over 2 =item C A session alias by which the foreign kernels will call it. The alias must already have been registered with the local kernel. =item C Arrayref of states that foreign kernels may post. $kernel->post('IKC', 'publish', 'me', [qw(foo bar baz)]); # or $ikc->publish('me', [qw(foo bar baz)]); =back =head2 C Tell IKC that some states should no longer be available for use by foreign sessions. You do not have to retract all published states. =over 2 =item C Same as in C =item C Same as in C. If not supplied, *all* published states are retracted. $kernel->post('IKC', 'retract', 'me', [qw(foo mibble dot)]); # or $ikc->retract('me', [qw(foo)]); =back =head2 C $list=$kernel->call(IKC=>'published', $session); Returns a list of all the published states. $hash=$kernel->call(IKC=>'published'); Returns a hashref, keyed on session IDs. Values are arrayref of states published by that session. =over 2 =item C A session alias that you wish the list of states for. =back =head2 C Subscribe to foreign sessions or states. When you have subscribed to a foreign session, a proxy session is created on the local kernel that will allow you to post to it like any other local session. =over 3 =item C An arrayref of the session or state specifiers you wish to subscribe to. While the wildcard '*' kernel may be used, only the first kernel that acknowledges the subscription will be proxied. =item C Either a state (for the state interface) or a coderef (for the object interface) that is posted (or called) when all subscription requests have either been replied to, or have timed out. When called, it has a single parameter, an arrayref of all the specifiers that IKC was able to subscribe to. 
It is up to you to see if you have enough of the foreign sessions or states
to get the job done, or if you should give up.

While C<callback> isn't required, it makes a lot of sense to use it because
it is the only way to find out when the proxy sessions become available.

Example:

    $ikc->subscribe([qw(poe://Pulse/timeserver)],
            sub { $kernel->post('poe://Pulse/timeserver', 'connect') });

(OK, that's a bad example because we don't check if we actually managed
to subscribe or not.)

    $kernel->post('IKC', 'subscribe',
                    [qw(poe://e-comm/CC poe://TouchNet/validation
                        poe://Cantax/JDE poe://Informatrix/JDE)
                    ],
                    'poe:subscribed',
                  );
    # and in state 'subscribed'
    sub subscribed
    {
        my($kernel, $specs)=@_[KERNEL, ARG0];
        if(@$specs != 4) {
            die "Unable to find all the foreign sessions needed";
        }
        $kernel->post('poe://Cantax/JDE', 'write', {...somevalues...});
    }

This is a bit of a mess.  You might want to use the C<subscribe> parameter
to L<POE::Component::IKC::Client> instead.

Subscription receipt timeout is currently set to 120 seconds.

=back

=head2 C<unsubscribe>

Reverse of the C<subscribe> method.  However, it is currently not
documented well.

=head2 C<ping>

Responds with 'PONG'.  This is auto-published, so it can be called from
remote kernels to see if the local kernel is still around.  In fact, I don't
see any other use for this.

    $kernel->post('poe://remote/IKC', 'ping', 'some_state');
    $kernel->delay('some_state', 60);   # timeout

    sub some_state
    {
        my($pong)=$_[ARG0];
        return if $pong;            # all is cool
        # YOW!  Remote kernel timed out.  RUN AROUND SCREAMING!
    }

=head2 C<shutdown>

Hopefully causes IKC and all peripheral sessions to disappear in a puff of
smoke.  At the very least, any sessions left will be either not related to
IKC or barely breathing (that is, only have aliases keeping them from GC).
This should allow you to sanely shut down your process.

=head2 C<monitor>

Allows a session to monitor the state of remote kernels.  Currently, a
session is informed when a remote kernel is registered, unregistered,
subscribed to or unsubscribed from.
One should make sure that the IKC alias exists before trying to monitor.
Do this by calling C<< POE::Component::IKC::Responder->spawn >> or in an
C<on_connect> callback.

    $kernel->post('IKC', 'monitor', $remote_kernel_id, $states);

=over 3

=item C<$remote_kernel_id>

Name or alias or IKC specifier of the remote kernel you wish to monitor.
You can also specify C<*> to monitor ALL remote kernels.  If you do, your
monitor will be called several times for a given kernel.  This is because a
kernel has one name and many aliases.  For example, a remote kernel will
have a unique ID within the local kernel, a name (passed to or generated by
create_ikc_{kernel,client}) and a globally unique ID assigned by the remote
kernel via $kernel->ID.  This surprises some people, but see the short note
after the explanation of the callback parameters.

Note: An effort has been made to ensure that when monitoring C<*>,
C<register> is first called with the remote kernel's unique ID, and
subsequent calls are aliases.  This can't be guaranteed at this time,
however.

=item C<$states>

Hashref that specifies what callback states are called when something
interesting happens.  If $states is empty or undef, the session will no
longer monitor the given remote kernel.

=back

=head2 Callback states

The following states can be monitored:

=over 6

=item C<register>

Called when a remote kernel or alias is registered.  This is equivalent to
when the connection phase is finished.

=item C<unregister>

Called when a remote kernel or alias is unregistered.  This is equivalent to
when the remote kernel disconnects.

=item C<subscribe>

Called when IKC succeeds in subscribing to a remote session.  ARG4 is an
IKC::Specifier of what was subscribed to.  Use this for posting to the proxy
session.

=item C<unsubscribe>

Called when IKC succeeds in unsubscribing from a remote session.

=item C<shutdown>

You are informed whenever someone tries to do a sane shutdown of IKC and all
peripheral sessions.  This will be called only once, after somebody posts an
IKC/shutdown event.

=item C<data>

Little bit of data (can be scalar or reference) that is passed to the
callback.  This allows you to do more magic.

=back

The callback states are called with the following parameters:

=over 6

=item C<ARG0>

Name of the kernel that was passed to poe://*/IKC/monitor

=item C<ARG1>

ID or alias of remote kernel from IKC's point of view.

=item C<ARG2>

A flag.  If this is true, then ARG1 is the remote kernel unique ID, if
false, then ARG1 is an alias.  This is mostly useful when monitoring C<*>
and is in fact a bit bloatful.

=item C<ARG3>

C<$states-E<gt>{data}> ie any data you want.

=item C<ARG4> ... C<ARGN>

Callback-specific parameters.  See above.

=back

Most of the time, ARG0 and ARG1 will be the same.  Exceptions are if you are
monitoring C<*> or if you supplied a full IKC event specifier to
IKC/monitor rather than just a plain kernel name.

=head2 Short note about monitoring all kernels with C<*>

There are 2 circumstances in which you will be monitoring all remote
kernels: names known in advance and names unknown in advance.

If you know kernel names in advance, you might be better off monitoring a
given kernel name.  However, you might prefer doing a case-like compare on
ARG1 (with regexes, say).  This would be useful for clustering, where
various redundant kernels could follow a naming convention like
[application]-[host], so you could compare C<ARG1> with a regex on the
application part to find out if you want to set up specific things for that
kernel.

Not knowing the name of a kernel in advance, you could be doing some sort of
autodiscovery or maybe just monitoring for debugging, logging or
book-keeping purposes.  You obviously don't want to do autodiscovery for
every alias of every kernel, only for the "canonical name", hence the need
for ARG2.

=head2 Short note the second

You are more than allowed (in fact, you are encouraged) to use the same
callback states when monitoring multiple kernels.  In this case, you will
find ARG0 useful for telling them apart.

    $kernel->post('IKC', 'monitor', '*', {
                register=>'remote_register',
                unregister=>'remote_unregister',
                subscribe=>'remote_subscribe',
                unsubscribe=>'remote_unsubscribe',
                data=>'magic box'});

Now remote_{register,unregister,subscribe,unsubscribe} is called for any
remote kernel.

    $kernel->post('IKC', 'monitor', 'Pulse', {register=>'pulse_connected'});

C<pulse_connected> will be called in current session when you succeed in
connecting to a kernel called 'Pulse'.

    $kernel->post('IKC', 'monitor', '*');

Session is no longer monitoring all kernels, only 'Pulse'.

    $kernel->post('IKC', 'monitor', 'Pulse', {});

Now we aren't even interested in 'Pulse'.

=head1 EXPORTED FUNCTIONS

=head2 C<create_ikc_responder>

This function creates the Responder session and object.  However, you don't
need to call this directly, because L<POE::Component::IKC::Client> or
L<POE::Component::IKC::Server> does this for you.

Deprecated, use C<spawn>.

=head1 PROXY SENDER

An attempt is made to provide a sane SENDER param to called or posted
states.  If the calling session has published some states, SENDER is usable
during the called state, but not afterwards.  Don't try keeping a reference
to this session.

This makes callbacks a tad easier.

Further, if you IKC/call a remote state, the SENDER during the callback will
point back to the remote session.

=head1 BUGS

Sending session references and coderefs to a foreign kernel is a bad idea :)
At some point it would be desirable to recurse through the parameters and
turn any session references into state specifiers.

The C<rsvp> state in call is a bit problematic.  IKC allows it to be posted
to once, but doesn't check to see if the foreign kernel is the right one.

C<retract> does not currently tell foreign kernels that have subscribed to a
session/state that it has been retracted.

Using C<call> on a state in a proxied foreign session doesn't work, for
obvious reasons.

=head1 AUTHOR

Philip Gwyn

=head1 SEE ALSO

L<POE>, L<POE::Component::IKC::Server>, L<POE::Component::IKC::Client>,
L<POE::Component::IKC::ClientLite>, L<POE::Component::IKC::Channel>,
L<POE::Component::IKC::Proxy>, L<POE::Component::IKC::Freezer>,
L<POE::Component::IKC::Specifier>.

=cut