package IPC::PubSub::Cache; use strict; use warnings; use File::Spec; use Time::HiRes (); #method fetch (Str *@keys --> List of Pair) { ... } #method store (Str $key, Str $val, Num $time, Num $expiry) { ... } #method add_publisher (Str $chan, Str $pub) { ... } #method remove_publisher (Str $chan, Str $pub) { ... } #method get_index (Str $chan, Str $pub --> Int) { ... } #method set_index (Str $chan, Str $pub, Int $index) { ... } #method publisher_indices (Str $chan --> Hash of Int) { ... } sub fetch_data { my $self = shift; my $key = shift; return (($self->fetch("data:$key"))[0] || [])->[-1]; } sub store_data { my $self = shift; my $key = shift; my $val = shift; $self->store("data:$key" => $val, -1, 0); } sub modify { my $self = shift; my $key = shift; return $self->fetch_data($key) unless @_; my $with = shift; if (ref($with) eq 'CODE') { $self->lock("data:$key"); local $_ = $self->fetch_data($key); my $rv = $with->(); $self->store_data($key => $_); $self->unlock("data:$key"); return $rv; } else { $self->store_data($key => $with); return $with; } } sub get { my ($self, $chan, $orig, $curr) = @_; no warnings 'uninitialized'; sort { $a->[0] <=> $b->[0] } $self->fetch( map { my $pub = $_; my $index = $curr->{$pub}; map { "chan:$chan-$pub$_" } (($orig->{$pub}+1) .. $index); } keys(%$curr) ); } sub put { my ($self, $chan, $pub, $index, $msg, $expiry) = @_; $self->store("chan:$chan-$pub$index", $msg, Time::HiRes::time(), $expiry); $self->set_index($chan, $pub, $index); } use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC-PubSub-lock-'); my %locks; sub lock { my ($self, $chan) = @_; for my $i (1..10) { return if mkdir((LOCK . unpack("H*", $chan)), 0777); Time::HiRes::usleep(rand(250000)+250000); } } sub disconnect { } END { rmdir(LOCK . unpack("H*", $_)) for keys %locks; } sub unlock { my ($self, $chan) = @_; rmdir(LOCK . unpack("H*", $chan)); delete $locks{$chan}; } 1;