package Thread::Pool::Simple;
use 5.008;
use strict;
use threads;
use threads::shared;
use warnings;
use Carp;
use Storable qw(nfreeze thaw);
use Thread::Queue;
use Thread::Semaphore;

our $VERSION = '0.23';

sub new {
    my ($class, %arg) = @_;
    my %config : shared
      = (min => ($arg{min} || 1),
         max => ($arg{max} || 10),
         load => ($arg{load} || 20),
         lifespan => ($arg{lifespan} || 50000),
         passid => ($arg{passid} || 0),
        );
    my %handler;
    for (qw(init pre do post)) {
        next unless exists $arg{$_} && ref $arg{$_} eq 'ARRAY';
        $handler{$_} = $arg{$_}
    }
    my $self = &share({});
    $self->{config} = \%config;
    $self->{pending} = Thread::Queue->new();
    $self->{submitted} = &share({});
    $self->{done} = &share({});
    my $state : shared = 0;
    $self->{state} = \$state;
    my $worker : shared = 0;
    $self->{worker} = \$worker;
    $self->{shutdown_lock} = Thread::Semaphore->new();
    bless $self, $class;
    $self->{shutdown_lock}->down();
    async {
        $self->_run(\%handler);
        $self->{shutdown_lock}->up();
    }->detach();
    return $self;
}

sub _run : locked method {
    my ($self, $handler) = @_;
    while (1) {
        last if $self->terminating();
        if ($self->busy()) {
            $self->_increase($handler);
        }
        else {
            sleep 1;
        }
        threads->yield();
    }
    my $worker = $self->{worker};
    {
        lock $$worker;
        cond_wait $$worker while $$worker;
    }
}

sub _increase : locked method {
    my ($self, $handler) = @_;
    my $max = do { lock %{$self->{config}}; $self->{config}{max} };
    my $worker = do { lock ${$self->{worker}}; ${$self->{worker}} };
    return unless $worker < $max;

    $self->_handle_func($handler->{init});

    eval {
        threads->create(\&_handle, $self, $handler)->detach();
        lock ${$self->{worker}};
        ++${$self->{worker}};
    };
    carp "fail to add new thread: $@" if $@;
}

sub _handle {
    my ($self, $handler) = @_;

    $self->_handle_func($handler->{pre});

    my $do = $handler->{do};
    my $func = defined $do ? shift @$do : undef;
    my ($lifespan, $passid)
      = do {
          lock %{$self->{config}};
          @{$self->{config}}{qw(lifespan passid)}
      };
    eval {
        while (!$self->terminating()
               && $lifespan--
              ) {
            my ($id, $job) = unpack 'Na*', $self->{pending}->dequeue();
            $self->_state(-2) && last unless $id;
            $self->_drop($id) && next unless $self->job_exists($id);

            my $arg = thaw($job);
            my @ret;
            if ($id % 3 == 2) {  # void context
                if (defined $func) {
                    eval {
                        no strict 'refs';
                        scalar $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
                    };
                    $self->_drop($id);
                    next;
                }
            }
            elsif ($id % 3 == 1) { # list context
                if (defined $func) {
                    @ret = eval {
                        no strict 'refs';
                        $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
                    };
                }
            }
            else { # scalar context
                if (defined $func) {
                    $ret[0] = eval {
                        no strict 'refs';
                        $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
                    };
                }
            }

            $self->_drop($id) && next unless $self->job_exists($id);

            if ($@) {
                @ret = ('e', $@);
            }
            else {
                unshift @ret, 'n';
            }
            my $ret = nfreeze(\@ret);
            {
                lock %{$self->{done}};
                $self->{done}{$id} = $ret;
                cond_signal %{$self->{done}};
            }
        }
        continue {
            threads->yield();
        }
    };
    carp "job handling error: $@" if $@;

    $self->_handle_func($handler->{post});

    my $worker = $self->{worker};
    lock $$worker;
    --$$worker;
    cond_signal $$worker;
}

sub _handle_func {
    my ($self, $handler) = @_;
    return unless defined $handler;
    my @arg = @$handler;
    my $func = shift @arg;
    if (defined $func) {
        eval {
            no strict 'refs';
            $func->(@arg);
        };
        carp $@ if $@;
    }
}

sub _state : locked method {
    my $self = shift;
    my $state = $self->{state};
    lock $$state;
    return $$state unless @_;
    my $s = shift;
    $$state = $s;
    return $s;
}

sub join : locked method {
    my ($self, $nb) = @_;
    $self->_state(-1);
    my $max = do { lock %{$self->{config}}; $self->{config}{max} };
    $self->{pending}->enqueue((pack('Na*', 0, '')) x $max);
    return if $nb;
    $self->{shutdown_lock}->down();
    sleep 1; # cool down, otherwise may coredump while run tests
}

sub detach : locked method {
    my ($self) = @_;
    $self->join(1);
}

sub busy : locked method {
    my ($self) = @_;
    my $worker = do { lock ${$self->{worker}}; ${$self->{worker}} };
    my ($min, $max, $load) = do { lock %{$self->{config}}; @{$self->{config}}{'min', 'max', 'load'} };
    my $pending = $self->{pending}->pending();

    # do not count the fake job added after join()
    $pending -= $max if $self->_state() == -1;
    return $worker < $min || $pending > $worker * $load;
}

sub terminating : locked method {
    my ($self) = @_;
    my $state = $self->_state();
    my $job = do { lock %{$self->{submitted}}; keys %{$self->{submitted}} };
    return 1 if $state == -1 && !$job;
    return 1 if $state == -2;
    return;
}

sub config : locked method {
    my $self = shift;
    my $config = $self->{config};
    lock %$config;
    return %$config unless @_;
    %$config = (%$config, @_);
    return %$config;
}

sub add : locked method {
    my $self = shift;
    my $context = wantarray;
    $context = 2 unless defined $context; # void context = 2
    my $arg = nfreeze(\@_);
    my $id;
    while (1) {
        $id = int(rand(time()));
        next unless $id;
        ++$id unless $context == $id % 3;
        ++$id unless $context == $id % 3;
        {
            lock %{$self->{submitted}};
            next if $self->job_exists($id);
            {
                # this is necessary as some cancelled jobs may slip in
                lock %{$self->{done}};
                delete $self->{done}{$id};
            }
            $self->{pending}->enqueue(pack('Na*', $id, $arg));
            $self->{submitted}{$id} = 1;
        }
        last;
    }
    return $id;
}

sub job_exists : locked method {
    my ($self, $id) = @_;
    lock %{$self->{submitted}};
    return $self->{submitted}{$id};
}

sub job_done : locked method {
    my ($self, $id) = @_;
    lock %{$self->{done}};
    return $self->{done}{$id};
}

sub _drop : locked method {
    my ($self, $id) = @_;
    lock %{$self->{submitted}};
    delete $self->{submitted}{$id};
}

sub _remove : locked method {
    my ($self, $id, $nb) = @_;
    return if $id % 3 == 2;
    return unless $self->job_exists($id);
    my ($exist, $ret);
    {
        lock %{$self->{done}};
        if (!$nb) {
            cond_wait %{$self->{done}} until exists $self->{done}{$id};
            cond_signal %{$self->{done}} if 1 < keys %{$self->{done}};
        }
        $exist = ($ret) = delete $self->{done}{$id};
    }
    $self->_drop($id) if $exist;
    return $exist unless defined $ret;
    $ret = thaw($ret);
    my $err = shift @$ret;
    croak $ret->[0] if $err eq 'e';
    return ($exist, @$ret) if $id % 3 == 1;
    return ($exist, $ret->[0]);
}

sub remove : locked method {
    my ($self, $id) = @_;
    my ($exist, @ret) = $self->_remove($id);
    return @ret;
}


sub remove_nb : locked method {
    my ($self, $id) = @_;
    return $self->_remove($id, 1);
}

sub cancel : locked method {
    my ($self, $id) = @_;
    my ($exist) = eval { $self->remove_nb($id) };
    if (!$exist) {
        lock %{$self->{submitted}};
        $self->{submitted}{$id} = 0;
    }
}

sub cancel_all : locked method {
    my ($self) = @_;
    my @id = do { lock %{$self->{submitted}}; keys %{$self->{submitted}} };
    for (@id) {
        $self->cancel($_);
    }
}

1;
__END__

=head1 NAME

Thread::Pool::Simple - A simple thread-pool implementation

=head1 SYNOPSIS

  use Thread::Pool::Simple;

  my $pool = Thread::Pool::Simple->new(
                 min => 3,           # at least 3 workers
                 max => 5,           # at most 5 workers
                 load => 10,         # increase worker if on average every worker has 10 jobs waiting
                 init => [\&init_handle, $arg1, $arg2, ...]   # run before creating worker thread
                 pre => [\&pre_handle, $arg1, $arg2, ...]   # run after creating worker thread
                 do => [\&do_handle, $arg1, $arg2, ...]     # job handler for each worker
                 post => [\&post_handle, $arg1, $arg2, ...] # run before worker threads end
                 passid => 1,        # whether to pass the job id as the first argument to the &do_handle
                 lifespan => 10000,  # total jobs handled by each worker
               );

  my ($id1) = $pool->add(@arg1); # call in list context
  my $id2 = $pool->add(@arg2);   # call in scalar conetxt
  $pool->add(@arg3)              # call in void context

  my @ret = $pool->remove($id1); # get result (block)
  my $ret = $pool->remove_nb($id2); # get result (no block)

  $pool->cancel($id1);           # cancel the job
  $pool->cancel_all();           # cancel all jobs

  $pool->join();                 # wait till all jobs are done
  $pool->detach();               # don't wait.

=head1 DESCRIPTION

C<Thread::Pool::Simple> provides a simple thread-pool implementaion
without external dependencies outside core modules.

Jobs can be submitted to and handled by multi-threaded `workers'
managed by the pool.

=head1 AUTHOR

Jianyuan Wu, E<lt>jwu@cpan.orgE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright 2007 by Jianyuan Wu

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut




syntax highlighted by Code2HTML, v. 0.9.1