# BEGIN BPS TAGGED BLOCK {{{ # COPYRIGHT: # # This software is Copyright (c) 2003-2006 Best Practical Solutions, LLC # # # (Except where explicitly superseded by other copyright notices) # # # LICENSE: # # # This program is free software; you can redistribute it and/or # modify it under the terms of either: # # a) Version 2 of the GNU General Public License. You should have # received a copy of the GNU General Public License along with this # program. If not, write to the Free Software Foundation, Inc., 51 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 or visit # their web page on the internet at # http://www.gnu.org/copyleft/gpl.html. # # b) Version 1 of Perl's "Artistic License". You should have received # a copy of the Artistic License with this package, in the file # named "ARTISTIC". The license is also available at # http://opensource.org/licenses/artistic-license.php. # # This work is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # CONTRIBUTION SUBMISSION POLICY: # # (The following paragraph is not intended to limit the rights granted # to you to modify and distribute this software under the terms of the # GNU General Public License and is only of importance to you if you # choose to contribute your changes and enhancements to the community # by submitting them to Best Practical Solutions, LLC.) # # By intentionally submitting any modifications, corrections or # derivatives to this work, or any other work intended for use with SVK, # to Best Practical Solutions, LLC, you confirm that you are the # copyright holder for those contributions and you grant Best Practical # Solutions, LLC a nonexclusive, worldwide, irrevocable, royalty-free, # perpetual, license to use, copy, create derivative works based on # those contributions, and sublicense and distribute those contributions # and any derivatives thereof. # # END BPS TAGGED BLOCK }}} package SVK::Mirror::Backend::SVNRaPipe; use strict; use base 'Class::Accessor::Fast'; __PACKAGE__->mk_accessors(qw(ra requests fh unsent_buf buf_call current_editors pid)); use POSIX 'EPIPE'; use Socket; use Storable qw(nfreeze thaw); use SVK::Editor::Serialize; use SVK::Util qw(slurp_fh); use SVK::Config; use SVK::I18N; =head1 NAME SVK::Mirror::Backend::SVNRaPipe - Transparent SVN::Ra requests pipelining =head1 SYNOPSIS my @req = (['rev_proplist', 3'], ['replay', 3 0, 1, 'EDITOR']) $generator = sub { shift @req }; $pra = SVK::Mirror::Backend::SVNRaPipe->new($ra, $generator); $pra->rev_proplsit(3); $pra->replay(3, 0, 1, SVK::Editor->new); =head1 DESCRIPTION =cut sub new { my ($class, $ra , $gen) = @_; socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC) or die "socketpair: $!"; my $self = $class->SUPER::new( { ra => $ra, requests => $gen, fh => $c, current_editors => 0, buf_call => [], unsent_buf => '' } ); if (my $pid = fork) { close $p; $self->pid($pid); return $self; } else { die "cannot fork: $!" unless defined $pid; close $c; } $self->fh($p); $File::Temp::KEEP_ALL = 1; # Begin external process for buffered ra requests and send response to parent. my $config = SVK::Config->svnconfig; my $max_editor_in_buf = $config ? $config->{config}->get( 'svk', 'ra-pipeline-buffer', '5' ) : 5; my $pool = SVN::Pool->new_default; local $SIG{INT} = 'IGNORE'; while ( my $req = $gen->() ) { $pool->clear; my ( $cmd, @arg ) = @$req; my $default_threshold = 2 * 1024 * 1024; @arg = map { $_ eq 'EDITOR' ? SVK::Editor::Serialize->new( { textdelta_threshold => $config ? $config->{config}->get( 'svk', 'ra-pipeline-delta-threshold', "$default_threshold" ) : $default_threshold, cb_serialize_entry => sub { $self->_enqueue(@_); $self->try_flush } } ) : $_ } @arg; # Note that we might want to switch to bandwidth based buffering, while ($self->current_editors > $max_editor_in_buf) { $self->try_flush(1); } my $ret = $self->ra->$cmd(@arg); if ($cmd eq 'replay') { # XXX support other requests using editors ++$self->{current_editors}; $self->_enqueue([undef, 'close_edit']); } else { $self->_enqueue([$ret, $cmd]); } $self->try_flush(); } while ($#{$self->buf_call} >= 0) { $self->try_flush($p, 1) ; } exit; } sub _enqueue { my ($self, $entry) = @_; push @{$self->buf_call}, $entry; } sub try_flush { my $self = shift; my $wait = shift; my $max_write = $wait ? -1 : 10; if ($wait) { $self->fh->blocking(1); } else { $self->fh->blocking(0); my $wstate = ''; vec($wstate,fileno($self->fh),1) = 1; select(undef, $wstate, undef, 0);; return unless vec($wstate,fileno($self->fh),1); } my $i = 0; my $buf = $self->buf_call; while ( $#{$buf} >= 0 || length($self->unsent_buf) ) { if (my $len = length $self->unsent_buf) { if (my $ret = syswrite($self->fh, $self->unsent_buf)) { substr($self->{unsent_buf}, 0, $ret, ''); last if $ret != $len; } else { die if $! == EPIPE; return; } } last if $#{$buf} < 0; my $msg = nfreeze($buf->[0]); $msg = pack('N', length($msg)).$msg; if (my $ret = syswrite($self->fh, $msg)) { $self->{unsent_buf} .= substr($msg, $ret) if length($msg) != $ret; if ((shift @$buf)->[1] eq 'close_edit') { --$self->{current_editors} ; } } else { die if $! == EPIPE; # XXX: check $! for fatal last; } } } # Client code reading pipelined responses sub read_msg { my $self = shift; my ($len, $msg); read $self->fh, $len, 4 or Carp::confess $!; $len = unpack ('N', $len); my $rlen = read $self->fh, $msg, $len or die $!; return \$msg; } sub ensure_client_cmd { my ($self, @arg) = @_; # XXX: error message my @exp = @{$self->requests->()}; for (@exp) { my $arg = shift @arg; if ($_ eq 'EDITOR') { die unless UNIVERSAL::isa($arg, 'SVK::Editor'); return $arg; } Carp::confess "pipeline ra error: got $arg but expecting $_" if ($_ cmp $arg); } die join(',',@arg) if @arg; } sub rev_proplist { my $self = shift; $self->ensure_client_cmd('rev_proplist', @_); # read synchronous msg my $data = thaw( ${$self->read_msg} ); die 'inconsistent response' unless $data->[1] eq 'rev_proplist'; return $data->[0]; } sub replay { my $self = shift; my $editor = $self->ensure_client_cmd('replay', @_); my $baton_map = {}; my $baton_pool = {}; eval { while ((my $data = $self->read_msg )) { my ($next, $func, @arg) = @{thaw($$data)}; my $baton_at = SVK::Editor->baton_at($func); my $baton = $arg[$baton_at]; if ($baton_at >= 0) { $arg[$baton_at] = $baton_map->{$baton}; } my $pool = SVN::Pool->new; my $ret = $self->emit_editor_call($editor, $func, $pool, @arg); last if $func eq 'close_edit'; if ($func =~ m/^close/) { Carp::cluck $func unless $baton_map->{$baton}; delete $baton_map->{$baton}; delete $baton_pool->{$baton}; } if ($next) { # if we are keeping this parent baton, set the pool as the # default pool as well. $pool->default if $pool; $baton_pool->{$next} = $pool if $pool; $baton_map->{$next} = $ret } } }; if ($@) { kill 15, $self->pid; waitpid $self->pid, 0; $self->pid(undef); } # destroy the remaining pool that became default pools in order. delete $baton_pool->{$_} for reverse sort keys %$baton_pool; die $@ if $@; } sub emit_editor_call { my ($self, $editor, $func, $pool, @arg) = @_; my $ret; if ($func eq 'apply_textdelta') { my $svndiff = pop @arg; $ret = $editor->apply_textdelta(@arg, $pool); if ($ret && $#$ret > 0) { my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool); if (ref $svndiff) { # inline print $stream $$svndiff; } else { # filename open my $fh, '<', $svndiff or die $!; slurp_fh($fh, $stream); close $fh; unlink $svndiff; } close $stream; } } else { # do not emit the fabricated close_edit, as replay doesn't # give us that. We need that in the stream so the client code # of replay knows the end of response has reached. $ret = $editor->$func(@arg, $pool) unless $func eq 'close_edit'; } return $ret; } sub DESTROY { my $self = shift; return unless $self->pid; wait; } 1;