package Gearman::Taskset;

use strict;
use Carp ();
use Gearman::Client;
use Gearman::Util;
use Gearman::ResponseParser::Taskset;
use Scalar::Util ();  # i thought about weakening taskset's client, but might be too weak.
use Time::HiRes ();

sub new {
    my $class = shift;
    my Gearman::Client $client = shift;

    my $self = $class;
    $self = fields::new($class) unless ref $self;

    $self->{waiting}     = {};
    $self->{need_handle} = [];
    $self->{client}      = $client;
    $self->{loaned_sock} = {};
    $self->{cancelled}   = 0;

    return $self;
}

sub DESTROY {
    my Gearman::Taskset $ts = shift;

    # During global cleanup this may be called out of order, and the client my not exist in the taskset.
    return unless $ts->{client};

    if ($ts->{default_sock}) {
        $ts->{client}->_put_js_sock($ts->{default_sockaddr}, $ts->{default_sock});
    }

    while (my ($hp, $sock) = each %{ $ts->{loaned_sock} }) {
        $ts->{client}->_put_js_sock($hp, $sock);
    }
}

sub run_hook {
    my Gearman::Taskset $self = shift;
    my $hookname = shift || return;

    my $hook = $self->{hooks}->{$hookname};
    return unless $hook;

    eval { $hook->(@_) };

    warn "Gearman::Taskset hook '$hookname' threw error: $@\n" if $@;
}

sub add_hook {
    my Gearman::Taskset $self = shift;
    my $hookname = shift || return;

    if (@_) {
        $self->{hooks}->{$hookname} = shift;
    } else {
        delete $self->{hooks}->{$hookname};
    }
}

# this method is part of the "Taskset" interface, also implemented by
# Gearman::Client::Async, where no tasksets make sense, so instead the
# Gearman::Client::Async object itself is also its taskset.  (the
# client tracks all tasks).  so don't change this, without being aware
# of Gearman::Client::Async.  similarly, don't access $ts->{client} without
# going via this accessor.
sub client {
    my Gearman::Taskset $ts = shift;
    return $ts->{client};
}

sub cancel {
    my Gearman::Taskset $ts = shift;

    $ts->{cancelled} = 1;

    if ($ts->{default_sock}) {
        close($ts->{default_sock});
        $ts->{default_sock} = undef;
    }

    while (my ($hp, $sock) = each %{ $ts->{loaned_sock} }) {
        $sock->close;
    }

    $ts->{waiting}     = {};
    $ts->{need_handle} = [];
    $ts->{client}      = undef;
}

sub _get_loaned_sock {
    my Gearman::Taskset $ts = shift;
    my $hostport = shift;
    if (my $sock = $ts->{loaned_sock}{$hostport}) {
        return $sock if $sock->connected;
        delete $ts->{loaned_sock}{$hostport};
    }

    my $sock = $ts->{client}->_get_js_sock($hostport);
    return $ts->{loaned_sock}{$hostport} = $sock;
}

# event loop for reading in replies
sub wait {
    my Gearman::Taskset $ts = shift;

    my %opts = @_;

    my $timeout;
    if (exists $opts{timeout}) {
        $timeout = delete $opts{timeout};
        $timeout += Time::HiRes::time();
    }

    Carp::carp "Unknown options: " . join(',', keys %opts) . " passed to Taskset->wait."
        if keys %opts;

    my %parser;  # fd -> Gearman::ResponseParser object

    my ($rin, $rout, $eout) = ('', '', '');
    my %watching;

    for my $sock ($ts->{default_sock}, values %{ $ts->{loaned_sock} }) {
        next unless $sock;
        my $fd = $sock->fileno;
        vec($rin, $fd, 1) = 1;
        $watching{$fd} = $sock;
    }

    my $tries = 0;
    while (!$ts->{cancelled} && keys %{$ts->{waiting}}) {
        $tries++;

        my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
        my $nfound = select($rout=$rin, undef, $eout=$rin, $time_left);
        if ($timeout && $time_left <= 0) {
            $ts->cancel;
            return;
        }
        next if ! $nfound;

        foreach my $fd (keys %watching) {
            next unless vec($rout, $fd, 1);
            # TODO: deal with error vector

            my $sock   = $watching{$fd};
            my $parser = $parser{$fd} ||= Gearman::ResponseParser::Taskset->new(source  => $sock,
                                                                                taskset => $ts);
            eval { $parser->parse_sock($sock); };

            if ($@) {
                # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
                # We're not in an accessable place here, so if all job servers fail we must die to prevent hanging.
                die( "Job server failure: $@" );
            }
        }

    }
}

# ->add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hashref>
#      opts:
#        -- uniq
#        -- on_complete
#        -- on_fail
#        -- on_status
#        -- retry_count
#        -- fail_after_idle
#        -- high_priority
# ->add_task(Gearman::Task)
#

sub add_task {
    my Gearman::Taskset $ts = shift;
    my $task;

    if (ref $_[0]) {
        $task = shift;
    } else {
        my $func = shift;
        my $arg_p = shift;   # scalar or scalarref
        my $opts = shift;    # $uniq or hashref of opts

        my $argref = ref $arg_p ? $arg_p : \$arg_p;
        unless (ref $opts eq "HASH") {
            $opts = { uniq => $opts };
        }

        $task = Gearman::Task->new($func, $argref, $opts);
    }
    $task->taskset($ts);

    $ts->run_hook('add_task', $ts, $task);

    my $req = $task->pack_submit_packet;
    my $len = length($req);
    my $rv = $task->{jssock}->syswrite($req, $len);
    die "Wrote $rv but expected to write $len" unless $rv == $len;

    push @{ $ts->{need_handle} }, $task;
    while (@{ $ts->{need_handle} }) {
        my $rv = $ts->_wait_for_packet($task->{jssock});
        if (! $rv) {
            shift @{ $ts->{need_handle} };  # ditch it, it failed.
            # this will resubmit it if it failed.
            print " INITIAL SUBMIT FAILED\n";
            return $task->fail;
        }
    }

    return $task->handle;
}

sub _get_default_sock {
    my Gearman::Taskset $ts = shift;
    return $ts->{default_sock} if $ts->{default_sock};

    my $getter = sub {
        my $hostport = shift;
        return
            $ts->{loaned_sock}{$hostport} ||
            $ts->{client}->_get_js_sock($hostport);
    };

    my ($jst, $jss) = $ts->{client}->_get_random_js_sock($getter);
    $ts->{loaned_sock}{$jst} ||= $jss;

    $ts->{default_sock} = $jss;
    $ts->{default_sockaddr} = $jst;
    return $jss;
}

sub _get_hashed_sock {
    my Gearman::Taskset $ts = shift;
    my $hv = shift;

    my Gearman::Client $cl = $ts->{client};

    for (my $off = 0; $off < $cl->{js_count}; $off++) {
        my $idx = ($hv + $off) % ($cl->{js_count});
        my $sock = $ts->_get_loaned_sock($cl->{job_servers}[$idx]);
        return $sock if $sock;
    }

    return undef;
}

# returns boolean when given a sock to wait on.
# otherwise, return value is undefined.
sub _wait_for_packet {
    my Gearman::Taskset $ts = shift;
    my $sock = shift;  # socket to singularly read from

    my ($res, $err);
    $res = Gearman::Util::read_res_packet($sock, \$err);
    return 0 unless $res;
    return $ts->_process_packet($res, $sock);
}

sub _ip_port {
    my $sock = shift;
    return undef unless $sock;
    my $pn = getpeername($sock) or return undef;
    my ($port, $iaddr) = Socket::sockaddr_in($pn);
    return Socket::inet_ntoa($iaddr) . ":$port";
}

# note the failure of a task given by its jobserver-specific handle
sub _fail_jshandle {
    my Gearman::Taskset $ts = shift;
    my $shandle = shift;

    my $task_list = $ts->{waiting}{$shandle} or
        die "Uhhhh:  got work_fail for unknown handle: $shandle\n";

    my Gearman::Task $task = shift @$task_list or
        die "Uhhhh:  task_list is empty on work_fail for handle $shandle\n";

    $task->fail;
    delete $ts->{waiting}{$shandle} unless @$task_list;
}

sub _process_packet {
    my Gearman::Taskset $ts = shift;
    my ($res, $sock) = @_;

    if ($res->{type} eq "job_created") {
        my Gearman::Task $task = shift @{ $ts->{need_handle} } or
            die "Um, got an unexpected job_created notification";

        my $shandle = ${ $res->{'blobref'} };
        my $ipport = _ip_port($sock);

        # did sock become disconnected in the meantime?
        if (! $ipport) {
            $ts->_fail_jshandle($shandle);
            return 1;
        }

        $task->handle("$ipport//$shandle");
        push @{ $ts->{waiting}{$shandle} ||= [] }, $task;
        return 1;
    }

    if ($res->{type} eq "work_fail") {
        my $shandle = ${ $res->{'blobref'} };
        $ts->_fail_jshandle($shandle);
        return 1;
    }

    if ($res->{type} eq "work_complete") {
        ${ $res->{'blobref'} } =~ s/^(.+?)\0//
            or die "Bogus work_complete from server";
        my $shandle = $1;

        my $task_list = $ts->{waiting}{$shandle} or
            die "Uhhhh:  got work_complete for unknown handle: $shandle\n";

        my Gearman::Task $task = shift @$task_list or
            die "Uhhhh:  task_list is empty on work_complete for handle $shandle\n";

        $task->complete($res->{'blobref'});
        delete $ts->{waiting}{$shandle} unless @$task_list;

        return 1;
    }

    if ($res->{type} eq "work_status") {
        my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });

        my $task_list = $ts->{waiting}{$shandle} or
            die "Uhhhh:  got work_status for unknown handle: $shandle\n";

        # FIXME: the server is (probably) sending a work_status packet for each
        # interested client, even if the clients are the same, so probably need
        # to fix the server not to do that.  just put this FIXME here for now,
        # though really it's a server issue.
        foreach my Gearman::Task $task (@$task_list) {
            $task->status($nu, $de);
        }

        return 1;
    }

    die "Unknown/unimplemented packet type: $res->{type} [${$res->{blobref}}]";

}

1;


syntax highlighted by Code2HTML, v. 0.9.1