package Gearman::Taskset;
use strict;
use Carp ();
use Gearman::Client;
use Gearman::Util;
use Gearman::ResponseParser::Taskset;
use Scalar::Util (); # i thought about weakening taskset's client, but might be too weak.
use Time::HiRes ();
# Constructor: builds a taskset bound to the given Gearman::Client.
#
# May be invoked on a class name, in which case a fresh fields-based
# object is allocated, or on an existing object reference, which is then
# (re)initialised in place and returned.
#
# State set up here:
#   client      - the Gearman::Client passed in
#   waiting     - hashref, server job handle -> arrayref of tasks
#   need_handle - arrayref of submitted tasks still awaiting job_created
#   loaned_sock - hashref, "host:port" -> socket borrowed from the client
#   cancelled   - boolean, set once cancel() has been called
sub new {
    my ($class, $client) = @_;

    # Only allocate when handed a plain class name; an object invocant is
    # reused as-is.
    my $self = ref $class ? $class : fields::new($class);

    $self->{client}      = $client;
    $self->{cancelled}   = 0;
    $self->{waiting}     = {};
    $self->{need_handle} = [];
    $self->{loaned_sock} = {};

    return $self;
}
# Destructor: hands every socket held by this taskset (the default socket
# and all loaned sockets) back to the client through _put_js_sock.
sub DESTROY {
    my Gearman::Taskset $ts = shift;

    # During global cleanup objects may be destroyed out of order, so the
    # client may no longer exist in the taskset (cancel() clears it too).
    my $client = $ts->{client};
    return unless $client;

    my $default = $ts->{default_sock};
    $client->_put_js_sock($ts->{default_sockaddr}, $default) if $default;

    while (my ($hostport, $sock) = each %{ $ts->{loaned_sock} }) {
        $client->_put_js_sock($hostport, $sock);
    }
}
# Invoke the hook registered under $hookname, if any, passing along the
# remaining arguments.  An exception thrown by the hook is trapped and
# reported with warn() rather than propagated to the caller.
#
#   $ts->run_hook($hookname, @args)
sub run_hook {
    my Gearman::Taskset $self = shift;

    my $hookname = shift;
    return unless $hookname;

    my $callback = $self->{hooks}{$hookname};
    return unless $callback;

    # @_ now holds only the hook's own arguments.
    eval { $callback->(@_) };
    warn "Gearman::Taskset hook '$hookname' threw error: $@\n" if $@;
}
# Register or remove a hook.
#
#   $ts->add_hook($hookname, $coderef)   # install $coderef under $hookname
#   $ts->add_hook($hookname)             # remove the hook named $hookname
#
# Does nothing when $hookname is missing or false.
sub add_hook {
    my Gearman::Taskset $self = shift;

    my $hookname = shift;
    return unless $hookname;

    # No further argument means "unregister".
    return delete $self->{hooks}{$hookname} unless @_;

    return $self->{hooks}{$hookname} = shift;
}
# Accessor for the Gearman::Client this taskset belongs to.
#
# This method is part of the "Taskset" interface, which is also
# implemented by Gearman::Client::Async.  Tasksets make no sense there,
# so the Gearman::Client::Async object acts as its own taskset (the
# client tracks all tasks).  Do not change this method without being
# aware of Gearman::Client::Async, and do not reach into $ts->{client}
# directly from outside; always go through this accessor.
sub client {
    my Gearman::Taskset $ts = $_[0];
    return $ts->{client};
}
# Abort the taskset: mark it cancelled, close every socket it holds,
# forget all pending tasks and detach from the client.
#
# After this call wait() stops looping (it checks {cancelled}) and
# DESTROY becomes a no-op because {client} is cleared.
sub cancel {
    my Gearman::Taskset $ts = shift;

    $ts->{cancelled} = 1;

    if ($ts->{default_sock}) {
        close($ts->{default_sock});
        $ts->{default_sock} = undef;
    }

    while (my ($hp, $sock) = each %{ $ts->{loaned_sock} }) {
        # A slot can hold an undefined value when a connection attempt
        # failed; calling ->close on it would die mid-cancel.
        next unless $sock;
        $sock->close;
    }

    # Do not keep closed handles around: they no longer have a fileno and
    # must not be watched or handed out again.
    $ts->{loaned_sock} = {};

    $ts->{waiting}     = {};
    $ts->{need_handle} = [];
    $ts->{client}      = undef;
}
# Return the socket for job server $hostport, borrowing one from the
# client on first use and remembering it in {loaned_sock}.
#
# A remembered socket that is no longer connected is dropped and replaced.
# Returns a false value when the client cannot provide a socket; in that
# case nothing is recorded, so {loaned_sock} only ever holds real sockets.
sub _get_loaned_sock {
    my Gearman::Taskset $ts = shift;
    my $hostport = shift;

    if (my $cached = $ts->{loaned_sock}{$hostport}) {
        return $cached if $cached->connected;
        delete $ts->{loaned_sock}{$hostport};
    }

    my $sock = $ts->{client}->_get_js_sock($hostport);

    # Do not cache a failed connection attempt: an undefined entry buys
    # nothing (it is retried on the next call anyway) and breaks code that
    # iterates {loaned_sock} and calls methods on each value.
    return $sock unless $sock;

    return $ts->{loaned_sock}{$hostport} = $sock;
}
# Event loop for reading in replies.
#
#   $ts->wait(%opts)
#
# Blocks until no tasks remain in {waiting} or the taskset is cancelled,
# reading response packets from the default socket and every loaned
# socket as they become readable.
#
# Options:
#   timeout - number of seconds (may be fractional) to wait overall.  When
#             it elapses the taskset is cancelled and wait() returns.  An
#             undefined value means "no timeout".
#
# Any other option triggers a Carp::carp warning.  Dies with
# "Job server failure: ..." if parsing a socket's data throws.
sub wait {
    my Gearman::Taskset $ts = shift;
    my %opts = @_;

    # Absolute deadline (epoch seconds) or undef for "wait indefinitely".
    my $deadline;
    if (exists $opts{timeout}) {
        my $timeout = delete $opts{timeout};
        # An explicit undef must not turn into an immediate deadline.
        $deadline = $timeout + Time::HiRes::time() if defined $timeout;
    }

    Carp::carp "Unknown options: " . join(',', keys %opts) . " passed to Taskset->wait."
        if keys %opts;

    my %parser;    # fd -> Gearman::ResponseParser::Taskset object
    my %watching;  # fd -> socket
    my ($rin, $rout, $eout) = ('', '', '');

    for my $sock ($ts->{default_sock}, values %{ $ts->{loaned_sock} }) {
        next unless $sock;
        my $fd = $sock->fileno;
        # A closed handle has no descriptor; vec() would treat undef as
        # fd 0 and make us watch STDIN.
        next unless defined $fd;
        vec($rin, $fd, 1) = 1;
        $watching{$fd} = $sock;
    }

    while (!$ts->{cancelled} && keys %{ $ts->{waiting} }) {
        # Without a deadline, wake up every half second to re-check the
        # loop condition.
        my $time_left = defined $deadline ? $deadline - Time::HiRes::time() : 0.5;
        my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left);

        if (defined $deadline && $time_left <= 0) {
            $ts->cancel;
            return;
        }

        next if ! $nfound;

        foreach my $fd (keys %watching) {
            next unless vec($rout, $fd, 1);
            # TODO: deal with error vector

            my $sock = $watching{$fd};
            my $parser = $parser{$fd} ||= Gearman::ResponseParser::Taskset->new(source  => $sock,
                                                                                 taskset => $ts);
            eval { $parser->parse_sock($sock); };

            if ($@) {
                # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
                # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
                die( "Job server failure: $@" );
            }
        }
    }
}
# ->add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hashref>
# opts:
# -- uniq
# -- on_complete
# -- on_fail
# -- on_status
# -- retry_count
# -- fail_after_idle
# -- high_priority
# ->add_task(Gearman::Task)
#
# Submit a task to a job server and block until the server acknowledges
# it with a handle.
#
# Accepts either a ready-made task object or ($func, $arg, $opts), from
# which a Gearman::Task is built; a non-hashref $opts is taken to be the
# task's uniq key.
#
# Returns the task's handle on success.  If the task has no job server
# socket, or the acknowledgement cannot be read, returns the result of
# $task->fail instead.  Dies if the submit packet cannot be written in
# full.
sub add_task {
    my Gearman::Taskset $ts = shift;

    my $task;
    if (ref $_[0]) {
        $task = shift;
    }
    else {
        my $func  = shift;
        my $arg_p = shift;    # scalar or scalarref
        my $opts  = shift;    # $uniq or hashref of opts

        my $argref = ref $arg_p ? $arg_p : \$arg_p;
        unless (ref $opts eq "HASH") {
            $opts = { uniq => $opts };
        }

        $task = Gearman::Task->new($func, $argref, $opts);
    }

    $task->taskset($ts);
    $ts->run_hook('add_task', $ts, $task);

    # Presumably $task->taskset() picked a job server socket for the task;
    # verify against Gearman::Task.  With no reachable server there is
    # nothing to write to, so report a task failure instead of crashing on
    # an undefined socket.
    my $jssock = $task->{jssock};
    return $task->fail unless $jssock;

    my $req = $task->pack_submit_packet;
    my $len = length($req);
    my $written = $jssock->syswrite($req, $len);

    # syswrite returns undef on error; do not compare that numerically.
    die "Wrote " . (defined $written ? $written : "nothing ($!)")
        . " but expected to write $len"
        unless defined $written && $written == $len;

    push @{ $ts->{need_handle} }, $task;
    while (@{ $ts->{need_handle} }) {
        my $rv = $ts->_wait_for_packet($task->{jssock});
        if (! $rv) {
            shift @{ $ts->{need_handle} };    # ditch it, it failed.
            # Per the original note, fail() will resubmit the task if it
            # is eligible for a retry.
            return $task->fail;
        }
    }

    return $task->handle;
}
# Return the taskset's default job server socket, choosing one through
# the client's _get_random_js_sock on first use and caching it (together
# with its address) for later calls.
#
# Returns a false value, and caches nothing, when no socket could be
# obtained.
sub _get_default_sock {
    my Gearman::Taskset $ts = shift;
    return $ts->{default_sock} if $ts->{default_sock};

    # Prefer a socket this taskset already borrowed for the given server;
    # otherwise ask the client for one.
    my $getter = sub {
        my $hostport = shift;
        return
            $ts->{loaned_sock}{$hostport} ||
            $ts->{client}->_get_js_sock($hostport);
    };

    my ($jst, $jss) = $ts->{client}->_get_random_js_sock($getter);

    # Nothing available: do not record an undefined socket (under an
    # undefined key) in {loaned_sock}, where code that calls methods on
    # every loaned socket would trip over it.
    return $jss unless $jss;

    $ts->{loaned_sock}{$jst} ||= $jss;

    $ts->{default_sock}     = $jss;
    $ts->{default_sockaddr} = $jst;
    return $jss;
}
# Pick a job server socket for hash value $hv.
#
# Starts at server index ($hv mod number-of-servers) and walks forward,
# wrapping around, until a server yields a usable socket.  Returns that
# socket, or undef when every server has been tried without success.
sub _get_hashed_sock {
    my Gearman::Taskset $ts = shift;
    my $hv = shift;

    my Gearman::Client $cl = $ts->{client};
    my $server_count = $cl->{js_count};

    foreach my $off (0 .. $server_count - 1) {
        my $idx      = ($hv + $off) % $server_count;
        my $hostport = $cl->{job_servers}[$idx];

        my $sock = $ts->_get_loaned_sock($hostport);
        return $sock if $sock;
    }

    return undef;
}
# Read a single response packet from $sock and process it.
#
# Returns 0 when no packet could be read; otherwise returns whatever
# _process_packet returns for that packet.  The return value is only
# meaningful when a socket to wait on is given.
sub _wait_for_packet {
    my Gearman::Taskset $ts = shift;
    my $sock = shift;    # the one socket to read from

    my $err;             # filled in by read_res_packet; not inspected here
    my $res = Gearman::Util::read_res_packet($sock, \$err);

    return $res ? $ts->_process_packet($res, $sock) : 0;
}
# Plain function (not a method): return the "ip:port" of the peer that
# $sock is connected to, or undef when no socket is given or the peer
# address cannot be determined (e.g. the socket is not connected).
sub _ip_port {
    my ($sock) = @_;
    return undef unless $sock;

    my $peer = getpeername($sock);
    return undef unless $peer;

    my ($port, $iaddr) = Socket::sockaddr_in($peer);
    return join ':', Socket::inet_ntoa($iaddr), $port;
}
# Record the failure of a task identified by its jobserver-specific
# handle: the oldest task waiting on $shandle is marked as failed, and
# the handle's entry is removed from {waiting} once its list is empty.
#
# Dies when the handle is unknown or has no tasks queued.
sub _fail_jshandle {
    my Gearman::Taskset $ts = shift;
    my $shandle = shift;

    my $task_list = $ts->{waiting}{$shandle};
    die "Uhhhh: got work_fail for unknown handle: $shandle\n"
        unless $task_list;

    my Gearman::Task $task = shift @$task_list;
    die "Uhhhh: task_list is empty on work_fail for handle $shandle\n"
        unless $task;

    $task->fail;

    delete $ts->{waiting}{$shandle} unless @$task_list;
}
# Handle one response packet ($res) that was read from $sock.
#
# $res is a hashref with at least {type} and {blobref} (a reference to
# the packet payload).  Recognised types:
#
#   job_created   - payload is the server handle for the oldest task in
#                   {need_handle}; the task moves to {waiting}
#   work_fail     - payload is a handle; its oldest waiting task fails
#   work_complete - payload is "handle\0result"; its oldest waiting task
#                   is completed with the result
#   work_status   - payload is "handle\0numerator\0denominator"; every
#                   task waiting on the handle gets a status update
#
# Returns 1 for every recognised packet.  Dies on an unknown packet type,
# an unexpected job_created, a malformed work_complete, or a handle that
# nobody is waiting on.
sub _process_packet {
    my Gearman::Taskset $ts = shift;
    my ($res, $sock) = @_;

    if ($res->{type} eq "job_created") {
        my Gearman::Task $task = shift @{ $ts->{need_handle} } or
            die "Um, got an unexpected job_created notification";

        my $shandle = ${ $res->{'blobref'} };
        my $ipport  = _ip_port($sock);

        # did sock become disconnected in the meantime?
        if (! $ipport) {
            # The task has not been filed under {waiting} yet, so looking
            # it up by handle would either die or fail an unrelated task
            # that shares the handle.  Fail this task directly instead.
            $task->fail;
            return 1;
        }

        $task->handle("$ipport//$shandle");
        push @{ $ts->{waiting}{$shandle} ||= [] }, $task;
        return 1;
    }

    if ($res->{type} eq "work_fail") {
        my $shandle = ${ $res->{'blobref'} };
        $ts->_fail_jshandle($shandle);
        return 1;
    }

    if ($res->{type} eq "work_complete") {
        # Strip the leading "handle\0" so that only the result remains in
        # the payload.
        ${ $res->{'blobref'} } =~ s/^(.+?)\0//
            or die "Bogus work_complete from server";
        my $shandle = $1;

        my $task_list = $ts->{waiting}{$shandle} or
            die "Uhhhh: got work_complete for unknown handle: $shandle\n";

        my Gearman::Task $task = shift @$task_list or
            die "Uhhhh: task_list is empty on work_complete for handle $shandle\n";

        $task->complete($res->{'blobref'});
        delete $ts->{waiting}{$shandle} unless @$task_list;

        return 1;
    }

    if ($res->{type} eq "work_status") {
        my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });

        my $task_list = $ts->{waiting}{$shandle} or
            die "Uhhhh: got work_status for unknown handle: $shandle\n";

        # FIXME: the server is (probably) sending a work_status packet for each
        # interested client, even if the clients are the same, so probably need
        # to fix the server not to do that.  just put this FIXME here for now,
        # though really it's a server issue.
        foreach my Gearman::Task $task (@$task_list) {
            $task->status($nu, $de);
        }

        return 1;
    }

    die "Unknown/unimplemented packet type: $res->{type} [${$res->{blobref}}]";
}
1;
# syntax highlighted by Code2HTML, v. 0.9.1