package Alzabo::RDBMSRules::PostgreSQL;
use strict;
use vars qw($VERSION);
use Alzabo::Exceptions ( abbr => [ 'recreate_table_exception' ] );
use Alzabo::RDBMSRules;
use Digest::MD5;
use Text::Balanced ();
use base qw(Alzabo::RDBMSRules);
use Params::Validate qw( validate_pos );
Params::Validate::validation_options( on_fail => sub { Alzabo::Exception::Params->throw( error => join '', @_ ) } );
$VERSION = 2.0;
1;
sub new
{
my $proto = shift;
my $class = ref $proto || $proto;
return bless {}, $class;
}
sub validate_schema_name
{
my $self = shift;
my $name = shift->name;
$self->_check_name($name, 'schema');
Alzabo::Exception::RDBMSRules->throw( error => "Schema name ($name) contains a single quote char (')" )
if index($name, "'") != -1;
}
sub validate_table_name
{
my $self = shift;
$self->_check_name( shift->name, 'table' );
}
sub validate_column_name
{
my $self = shift;
$self->_check_name( shift->name, 'column' );
}
sub _check_name
{
my $self = shift;
my $name = shift;
Alzabo::Exception::RDBMSRules->throw( error => "Name ($name) must be at least one character long" )
unless length $name;
Alzabo::Exception::RDBMSRules->throw( error => "Name ($name) is too long. Names must be 31 characters or less." )
if length $name > 31;
Alzabo::Exception::RDBMSRules->throw( error => "Name ($name) must start with an alpha or underscore(_) and must contain only alphanumerics and underscores." )
unless $name =~ /\A[a-zA-Z]\w*\z/;
}
sub validate_column_type
{
my $self = shift;
my $type = uc shift;
my $table = shift;
if ( $table->primary_key_size > 1 )
{
return 'INT4' if $type =~ /^SERIAL4?$/;
return 'INT8' if $type eq 'BIGSERIAL' or $type eq 'SERIAL8';
}
my %simple_types = map { $_ => 1 } qw( ABSTIME
BIT
BIGINT
BIGSERIAL
BOOL
BOOLEAN
BOX
BYTEA
CHAR
CHARACTER
CIDR
CIRCLE
DATE
DECIMAL
FLOAT
FLOAT4
FLOAT8
INET
SMALLINT
INT
INTEGER
INT2
INT4
INT8
INTERVAL
MACADDR
MONEY
NUMERIC
OID
RELTIME
SERIAL
SERIAL4
SERIAL8
TEXT
TIME
TIMESTAMP
TIMESTAMPTZ
TIMETZ
VARBIT
VARCHAR );
return 'INTEGER' if $type eq 'INT' || $type eq 'INT4';
return 'SERIAL' if $type eq 'SERIAL4';
return 'INT8' if $type eq 'BIGINT';
return $type if $simple_types{$type};
return $type if $type =~ /BIT\s+VARYING/;
return $type if $type =~ /CHARACTER\s+VARYING/;
return $type if $type =~ /\ABOX|CIRCLE|LINE|LSEG|PATH|POINT|POLYGON/;
Alzabo::Exception::RDBMSRules->throw( error => "Invalid column type: $type" );
}
sub validate_column_length
{
my $self = shift;
my $column = shift;
if ( defined $column->length )
{
Alzabo::Exception::RDBMSRules->throw( error => "Length is not supported except for char, varchar, decimal, float, and numeric columns (" . $column->name . " column)" )
unless $column->type =~ /\A(?:(?:VAR)?CHAR|CHARACTER|DECIMAL|FLOAT|NUMERIC|(?:VAR)?BIT|BIT VARYING)\z/i;
}
if ( defined $column->precision )
{
Alzabo::Exception::RDBMSRules->throw( error => "Precision is not supported except for decimal, float, and numeric columns" )
unless $column->type =~ /\A(?:DECIMAL|FLOAT|NUMERIC)\z/i;
}
}
# placeholder in case we decide to try to do something better later
sub validate_table_attribute { 1 }
sub validate_column_attribute
{
my $self = shift;
my %p = @_;
my $column = $p{column};
my $type = $column->type;
my $a = uc $p{attribute};
$a =~ s/\A\s//;
$a =~ s/\s\z//;
return if $a =~ /\A(?:UNIQUE\z|CHECK|CONSTRAINT|REFERENCES)/i;
Alzabo::Exception::RDBMSRules->throw( error => "Only column constraints are supported as column attributes" )
}
sub validate_primary_key
{
my $self = shift;
my $col = shift;
my $serial_col = (grep { $_->type =~ /^(?:SERIAL(?:4|8)?|BIGSERIAL)$/ } $col->table->primary_key)[0];
if ( defined $serial_col &&
$serial_col->name ne $col->name )
{
$serial_col->set_type( $serial_col->type =~ /^SERIAL4?$/
? 'INT4'
: 'INT8' );
}
}
sub validate_sequenced_attribute
{
my $self = shift;
my $col = shift;
Alzabo::Exception::RDBMSRules->throw( error => 'Non-number columns cannot be sequenced' )
unless $col->is_integer || $col->is_floating_point;
}
sub validate_index
{
my $self = shift;
my $index = shift;
foreach my $c ( $index->columns )
{
Alzabo::Exception::RDBMSRules->throw( error => "PostgreSQL does not support index prefixes" )
if defined $index->prefix($c)
}
Alzabo::Exception::RDBMSRules->throw( error => "PostgreSQL does not support fulltext indexes" )
if $index->fulltext;
}
sub type_is_integer
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type =~ /\A(?:
INT(?:2|4|8)?|
SMALLINT|
INTEGER|
OID|
SERIAL(?:4|8)?|
BIGSERIAL
)
\z
/x;
}
sub type_is_floating_point
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type =~ /\A(?:
DECIMAL|
FLOAT(?:4|8)?|
MONEY|
NUMERIC
)
\z
/x;
}
sub type_is_char
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type =~ /(?:CHAR|CHARACTER|TEXT)\z/;
}
sub type_is_date
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type eq 'DATE' || $self->type_is_datetime($col);
}
sub type_is_datetime
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type =~ /^TIMESTAMP/;
}
sub type_is_time
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type eq 'TIME';
}
sub type_is_time_interval
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type eq 'INTERVAL';
}
sub type_is_blob
{
my $self = shift;
my $col = shift;
my $type = uc $col->type;
return 1 if $type =~ /\ABYTEA\z/;
}
sub blob_type { return 'BYTEA' }
sub column_types
{
return ( qw( INTEGER
INT2
INT8
NUMERIC
FLOAT
FLOAT4
CHAR
VARCHAR
TEXT
BYTEA
DATE
TIME
TIMESTAMP
INTERVAL
SERIAL
BIGSERIAL
BOOLEAN
BIT
),
'BIT VARYING',
qw( INET
CIDR
MACADDR ) );
}
my %features = map { $_ => 1 } qw ( extended_column_types
constraints
functional_indexes
allows_raw_default
);
sub feature
{
shift;
return $features{+shift};
}
sub quote_identifiers { 1 }
sub quote_identifiers_character { '"' }
sub schema_sql
{
my $self = shift;
validate_pos( @_, { isa => 'Alzabo::Schema' } );
my $schema = shift;
my @sql = $self->SUPER::schema_sql($schema);
# This has to come at the end because we don't know which tables
# reference other tables.
foreach my $t ( $schema->tables )
{
foreach my $con ( grep { /\s*(?:check|constraint)/i } $t->attributes )
{
push @sql, $self->table_constraint_sql($t);
}
foreach my $fk ( $t->all_foreign_keys )
{
push @sql, $self->foreign_key_sql($fk);
}
}
return @sql;
}
sub table_sql
{
my $self = shift;
my $table = shift;
my $create_sequence = shift;
# Create table sequence by default
$create_sequence = 1 unless defined $create_sequence;
my $sql = qq|CREATE TABLE "| . $table->name . qq|" (\n |;
$sql .= join ",\n ", map { $self->column_sql($_) } $table->columns;
my @att = $table->attributes;
if (my @pk = $table->primary_key)
{
$sql .= ",\n";
$sql .= ' PRIMARY KEY (';
$sql .= join ', ', map { '"' . $_->name . '"' } @pk;
$sql .= ")\n";
}
$sql .= ")\n";
my @sql = ($sql);
foreach my $i ( $table->indexes )
{
push @sql, $self->index_sql($i);
}
if ($create_sequence)
{
foreach my $c ( grep { $_->sequenced } $table->columns )
{
push @sql, $self->_sequence_sql($c);
}
}
if (@att)
{
$sql .= ' ';
$sql .= join ' ', grep { ! /\s*(?:check|constraint)/i } @att;
}
$self->{state}{table_sql}{ $table->name } = 1;
return @sql;
}
sub _sequence_sql
{
my $self = shift;
my $col = shift;
return if $col->type =~ /^(?:SERIAL(?:4|8)?|BIGSERIAL)$/;
my $seq_name = $self->_sequence_name($col);
return qq|CREATE SEQUENCE "$seq_name";\n|;
}
sub _sequence_name
{
my $self = shift;
my $col = shift;
return join '___', $col->table->name, $col->name;
}
sub column_sql
{
my $self = shift;
my $col = shift;
my $p = shift; # hashref for skip_nullable, skip_default, && skip_name
my @default;
if ( ! $p->{skip_default} && defined $col->default )
{
my $def = $self->_default_for_column($col);
@default = ( "DEFAULT $def" );
}
my $type = $col->type;
my @length;
if ( defined $col->length )
{
my $length = '(' . $col->length;
$length .= ', ' . $col->precision if defined $col->precision;
$length .= ')';
$type .= $length;
}
my @nullable;
unless ( $p->{skip_nullable} )
{
@nullable = $col->nullable ? 'NULL' : 'NOT NULL';
}
my @name = $p->{skip_name} ? () : '"' . $col->name . '"';
my $sql .= join ' ', ( @name,
$type,
@default,
@nullable,
$col->attributes );
return $sql;
}
sub _default_for_column
{
my $self = shift;
my $col = shift;
return unless defined $col->default;
return $col->default if $col->is_numeric || $col->default_is_raw;
my $d = $col->default;
$d =~ s/'/''/g;
qq|'$d'|;
}
sub foreign_key_sql
{
my $self = shift;
my $fk = shift;
if ( grep { $_->is_primary_key } $fk->columns_from )
{
return unless $fk->from_is_dependent;
}
return () if $self->{state}{fk_sql}{ $fk->id };
my $sql = 'ALTER TABLE "';
$sql .= $fk->table_from->name;
$sql .= '" ADD CONSTRAINT ';
$sql .= $self->_fk_name($fk);
$sql .= ' FOREIGN KEY ( ';
$sql .= join ', ', map { '"' . $_->name . '"' } $fk->columns_from;
$sql .= ' ) REFERENCES "';
$sql .= $fk->table_to->name;
$sql .= '" (';
$sql .= join ', ', map { '"' . $_->name . '"' } $fk->columns_to;
$sql .= ')';
$sql .= ' ON DELETE ';
if ( $fk->from_is_dependent )
{
$sql .= 'CASCADE';
}
else
{
my @from = $fk->columns_from;
unless ( ( grep { $_->nullable } @from ) == @from )
{
$sql .= 'SET DEFAULT';
}
else
{
$sql .= 'SET NULL';
}
}
$self->{state}{fk_sql}{ $fk->id } = 1;
return $sql;
}
sub _fk_name
{
my $id = $_[1]->id;
return
( length $id > 63
? 'fk_' . Digest::MD5::md5_hex( $_[1]->id )
: $id
);
}
sub table_constraint_sql
{
my $self = shift;
my $table = shift;
return map { 'ALTER TABLE "' . $table->name . '" ADD ' . $_ } $table->attributes;
}
sub drop_table_sql
{
my $self = shift;
my $table = shift;
my $is_recreate = shift;
my @sql;
if ($is_recreate)
{
# We need to drop foreign keys referring to this table before
# we drop it.
foreach my $fk ( $table->all_foreign_keys )
{
push @sql, $self->drop_foreign_key_sql( $fk->reverse );
}
}
push @sql, $self->SUPER::drop_table_sql($table);
unless ($is_recreate)
{
foreach my $c ( $table->columns )
{
push @sql, $self->_drop_sequence_sql($c) if $c->sequenced;
}
}
return @sql;
}
sub _drop_sequence_sql
{
my $self = shift;
my $col = shift;
return if $col->type =~ /^(?:SERIAL(?:4|8)?|BIGSERIAL)$/;
my $seq_name = $self->_sequence_name($col);
return qq|DROP SEQUENCE "$seq_name";\n|;
}
sub drop_column_sql
{
my $self = shift;
my %p = @_;
recreate_table_exception();
}
sub recreate_table_sql
{
my $self = shift;
my %p = @_;
# This is a hack to prevent this SQL from being made multiple
# times (which would be pointless)
return () if $self->{state}{table_sql}{ $p{new}->name };
push @{ $self->{state}{deferred_sql} },
$self->_restore_foreign_key_sql( $p{new} );
return ( $self->_temp_table_sql( $p{new}, $p{old} ),
$self->drop_table_sql( $p{old}, 1 ),
# the 0 param indicates that we should not create sequences
$self->table_sql( $p{new}, 0 ),
$self->_restore_table_data_sql( $p{new}, $p{old} ),
$self->_drop_temp_table( $p{new} ),
);
}
sub _temp_table_sql
{
my $self = shift;
my $new_table = shift;
my $old_table = shift;
my $temp_name = "TEMP" . $new_table->name;
my $sql = "SELECT ";
$sql .= join ', ', map { '"' . $_->name . '"' } $old_table->columns;
$sql .= qq|\n INTO TEMPORARY "$temp_name" FROM "| . $old_table->name . '"';
return $sql;
}
sub _restore_table_data_sql
{
my $self = shift;
my $new_table = shift;
my $old_table = shift;
my @cols;
foreach my $column ( $new_table->columns )
{
my $old_name =
defined $column->former_name ? $column->former_name : $column->name;
push @cols, [ $column->name, $old_name ]
if $old_table->has_column($old_name);
}
my $temp_name = "TEMP" . $new_table->name;
my $sql = 'INSERT INTO "' . $new_table->name . '" (';
$sql .= join ', ', map { qq|"$_->[0]"| } @cols;
$sql .= " ) \n SELECT ";
$sql .= join ', ', map { qq|"$_->[1]"| } @cols;
$sql .= qq| FROM "$temp_name"|;
return $sql;
}
sub _drop_temp_table
{
my $self = shift;
my $table = shift;
my $temp_name = "TEMP" . $table->name;
return qq|DROP TABLE "$temp_name"|;
}
sub _restore_foreign_key_sql
{
my $self = shift;
my $table = shift;
my @sql;
foreach my $fk ( $table->all_foreign_keys )
{
push @sql, $self->foreign_key_sql($fk);
push @sql, $self->foreign_key_sql( $fk->reverse );
}
return @sql;
}
sub rename_sequences
{
my $self = shift;
my %p = @_;
return () if $self->{state}{rename_sequence_sql}{ $p{new}->name };
my @sql;
for my $old_col ( grep { $_->sequenced } $p{old}->columns )
{
my $new_col = $p{new}->column( $old_col->name )
or next;
my $old_seq = $self->_sequence_name($old_col);
my $new_seq = $self->_sequence_name($new_col);
push @sql,
qq|ALTER TABLE "$old_seq" RENAME TO "$new_seq";\n|;
}
$self->{state}{rename_sequence_sql}{ $p{new}->name } = 1;
return @sql;
}
sub drop_foreign_key_sql
{
my $self = shift;
my $fk = shift;
if ( grep { $_->is_primary_key } $fk->columns_from )
{
return unless $fk->from_is_dependent;
}
return () if $self->{state}{drop_fk_sql}{ $fk->id };
$self->{state}{drop_fk_sql}{ $fk->id } = 1;
return 'ALTER TABLE "' . $fk->table_from->name . '" DROP CONSTRAINT '
. $self->_fk_name($fk);
}
sub drop_index_sql
{
my $self = shift;
my $index = shift;
return 'DROP INDEX "' . $index->id . '"';
}
sub column_sql_add
{
my $self = shift;
my $col = shift;
return () if $self->{state}{table_sql}{ $col->table->name };
# Skip default and not null while adding column
my @sql = 'ALTER TABLE "' . $col->table->name . '" ADD COLUMN ' . $self->column_sql($col, { skip_default => 1, skip_nullable => 1 });
my $def = $self->_default_for_column($col);
if ($def)
{
push @sql,
( 'ALTER TABLE "' . $col->table->name . '" ALTER COLUMN "' .
$col->name . qq|" SET DEFAULT $def| );
}
if ( ! $col->nullable )
{
push @sql,
( 'UPDATE "' . $col->table->name
. '" SET "' . $col->name . qq|" = $def WHERE "|
. $col->name . '" IS NULL'
);
push @sql,
( 'ALTER TABLE "' . $col->table->name
. '" ADD CONSTRAINT "'
. $col->table->name . '_' . $col->name . '_not_null" CHECK ( "'
. $col->name . '" IS NOT NULL )'
);
}
return @sql;
}
sub column_sql_diff
{
my $self = shift;
my %p = @_;
return $self->drop_column_sql( new_table => $p{new}->table,
old => $p{old} )
unless $self->_columns_are_equivalent( $p{new}, $p{old} );
return;
}
sub _columns_are_equivalent
{
my $self = shift;
my $new = shift;
my $old = shift;
return 0 unless $self->_types_are_equivalent( $new, $old );
return 0 unless $self->_defaults_are_equivalent( $new, $old );
return 0 unless $new->sequenced == $old->sequenced;
my $new_att = join "\0", sort $new->attributes;
$new_att ||= '';
my $old_att = join "\0", sort $old->attributes;
$old_att ||= '';
return 0 unless $new_att eq $old_att;
return 1;
}
{
my %CanonicalTypes =
( BOOL => 'BOOLEAN',
INT => 'INTEGER',
INT4 => 'INTEGER',
INT2 => 'SMALLINT',
INT8 => 'BIGINT',
VARBIT => 'BIT VARYING',
VARCHAR => 'CHARACTER VARYING',
CHAR => 'CHARACTER',
FLOAT => 'DOUBLE PRECISION',
FLOAT8 => 'DOUBLE PRECISION',
FLOAT4 => 'REAL',
DECIMAL => 'NUMERIC',
);
sub _types_are_equivalent
{
shift;
my $col1 = shift;
my $col2 = shift;
my $type1 = $col1->type;
$type1 = $CanonicalTypes{ uc $type1 } if $CanonicalTypes{ uc $type1 };
my $type2 = $col2->type;
$type2 = $CanonicalTypes{ uc $type2 } if $CanonicalTypes{ uc $type2 };
$type1 .= join '-', grep { defined && length } $col1->length, $col1->precision;
$type2 .= join '-', grep { defined && length } $col1->length, $col1->precision;
return 1 if $type1 eq $type2;
}
}
sub _defaults_are_equivalent
{
my $self = shift;
my $col1 = shift;
my $col2 = shift;
return 1 if ! defined $col1->default && ! defined $col2->default;
return 0 if defined $col1->default && ! defined $col2->default;
return 0 if ! defined $col1->default && defined $col2->default;
if ( $col1->type =~ /^bool/i )
{
return 1
if lc substr( $col1->default, 0, 1 ) eq lc substr( $col2->default, 0, 1 );
return 0;
}
elsif ( $col1->is_date
&& $col1->default_is_raw
&& $col2->default_is_raw )
{
my $d1 = $col1->default;
my $d2 = $col2->default;
my $re = qr/^(?:current_timestamp|localtime|localtimestamp|now\(\))$/i;
return 1
if $col1->default =~ /$re/
&& $col2->default =~ /$re/;
}
return 1 if
$self->_default_for_column($col1) eq $self->_default_for_column($col2);
}
sub alter_primary_key_sql
{
my $self = shift;
my %p = @_;
my @sql;
push @sql, 'DROP INDEX "' . $p{old}->name . '_pkey"';
if ( $p{new}->primary_key )
{
push @sql, ( 'CREATE UNIQUE INDEX "' . $p{new}->name . '_pkey" ON "' .
$p{new}->name . '" (' .
( join ', ',
map { '"' . $_->name . '"' } $p{new}->primary_key ) . ')' );
}
return @sql;
}
# Actually, Postgres _can_ change table names, but it's inability to
# change most aspects of a column definition make it very difficult to
# properly change a table name and then change its column definitions,
# so its easier just to recreate the table
sub can_alter_table_name
{
0;
}
# Not sure if this is possible
sub alter_table_attributes_sql
{
my $self = shift;
recreate_table_exception();
}
sub alter_column_name_sql
{
my $self = shift;
my $column = shift;
return
( 'ALTER TABLE "' . $column->table->name . '" RENAME COLUMN ' .
$column->former_name . ' TO ' . $column->name
);
}
sub reverse_engineer
{
my $self = shift;
my $schema = shift;
my $driver = $schema->driver;
foreach my $table ( $driver->tables )
{
$table =~ s/^[^\.]+\.//;
$table =~ s/^\"|\"$//g;
print STDERR "Adding table $table to schema\n"
if Alzabo::Debug::REVERSE_ENGINEER;
my $t = $schema->make_table( name => $table );
my $t_oid = $driver->one_row( sql => 'SELECT oid FROM pg_class WHERE relname = ?',
bind => $table );
my $sql = <<'EOF';
SELECT a.attname, a.attnotnull, t.typname, a.attnum, a.atthasdef, a.atttypmod
FROM pg_attribute a, pg_type t
WHERE a.attrelid = ?
AND a.atttypid = t.oid
AND a.attnum > 0
EOF
$sql .= ' AND NOT a.attisdropped' if $driver->rdbms_version ge '7.3';
$sql .= ' ORDER BY attnum';
my %cols_by_number;
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my %p;
$p{type} = $row->[2];
# has default
if ( $row->[4] )
{
$p{default} =
$driver->one_row
( sql => 'SELECT adsrc FROM pg_attrdef WHERE adrelid = ? AND adnum = ?',
bind => [ $t_oid, $row->[3] ] );
if ( $p{default} =~ /^nextval\(/ )
{
$p{sequenced} = 1;
$p{type} =~ s/(?:int(?:eger)?|numeric)/serial/;
}
else
{
# strip quotes (and type!) Postgres added
$p{default} =~ s/^'//; #'
if ( $driver->rdbms_version ge '7.4' )
{
# 'grotesque' becomes 'grotesque'::character
# varying. See
# src/backend/utils/adt/format_type.c
# This is from
# src/backend/util/adt/format_type.c
$p{default} =~ s/'(?:::[^']{3,})?$//;
$p{default} =~ s/\('(\w+)$/$1/;
}
else
{
$p{'default'} =~ s/'$//;
}
if ( $p{default} =~ /\([^\)]*\)/
|| $p{default} =~ /^(?:current_timestamp|localtime|localtimestamp|now)$/i )
{
$p{default_is_raw} = 1;
}
$p{default} = 'now()' if $p{default} eq 'now';
}
}
if ( $p{type} =~ /char/i )
{
# The real length is the value of: a.atttypmod - ((int32) sizeof(int32))
#
# Sure wish I knew how to figure this out in Perl.
# Its provided as VARHDRSZ in postgres.h but I can't
# really get at it. On my linux machine this is 4. A
# better way of doing this would be welcome.
$p{length} = $row->[5] - 4;
}
if ( lc $p{type} eq 'numeric' )
{
# see comment above.
my $num = $row->[5] - 4;
$p{length} = ($num >> 16) & 0xffff;
$p{precision} = $num & 0xffff;
}
$p{type} = 'char' if lc $p{type} eq 'bpchar';
print STDERR "Adding $row->[0] column to $table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
my $col = $t->make_column( name => $row->[0],
nullable => ! $row->[1],
%p
);
if ( $col->is_integer )
{
if ( $self->_re_sequence_exists( $driver, $col ) )
{
$col->set_sequenced(1);
}
}
$cols_by_number{ $row->[3] } = $row->[0];
}
$sql = <<'EOF';
SELECT indkey
FROM pg_index
WHERE indisprimary
AND indrelid = ?
EOF
foreach my $cols ( $driver->column( sql => $sql,
bind => $t_oid ) )
{
my @cols = @cols_by_number{ split ' ', $cols };
local $" = ", ";
print STDERR "Setting @cols as primary key for $table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
$t->add_primary_key( $_ ) for $t->columns( @cols );
}
my %i;
if ( $driver->rdbms_version ge '7.4' )
{
%i = $self->_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
}
else
{
%i = $self->_pre_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
}
foreach my $idx (values %i)
{
my @c = map { { column => $_ } } @{ $idx->{cols} };
print STDERR "Adding index "
. ( defined $idx->{'function'}
? $idx->{'function'}
: join(', ', map $_->name, @{$idx->{'cols'}} ) )
. " to $table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
$t->make_index( columns => \@c,
unique => $idx->{unique},
function => $idx->{function},
);
}
$sql = <<'EOF';
SELECT consrc, array_to_string(conkey,' ')
FROM pg_constraint
WHERE conrelid = ?
AND contype = 'c'
EOF
my @att;
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my ( $con, $cols ) = @$row;
# this stuff is not needed
$con =~ s/::(\w+)//g;
# If $cols ever covers more than one value then this will fail.
if ( $cols =~ /^(\d+)$/ )
{
my $column = $cols_by_number{$1};
print STDERR qq|Adding constraint "$con" to $table.$column\n|
if Alzabo::Debug::REVERSE_ENGINEER;
$t->column($column)->add_attribute("CHECK $con");
}
else
{
print STDERR qq|Adding constraint "$con" to $table\n|
if Alzabo::Debug::REVERSE_ENGINEER;
$t->add_attribute("CHECK $con");
}
}
}
# Foreign key info is available in PG 7.3.0 and higher (could fake
# it from pg_triggers with extensive gymnastics in version 7.0 and
# higher, but that's a little iffy)
$self->_foreign_keys_to_relationships($schema)
if $driver->rdbms_version ge '7.3';
}
sub _re_sequence_exists
{
my $self = shift;
my $driver = shift;
my $col = shift;
my $seq_name = $self->_sequence_name($col);
my $sql = <<'EOF';
SELECT 1
FROM pg_class
WHERE relname = ?
AND relkind = ?
EOF
return $driver->one_row( sql => $sql,
bind => [ $seq_name, 'S' ],
);
}
sub _74_indexes
{
my $self = shift;
my $driver = shift;
my $table = shift;
my $t_oid = shift;
my $cols_by_number = shift;
my $sql = <<'EOF';
SELECT indexrelid, indisunique, indkey, indnatts
FROM pg_index
WHERE indrelid = ?
AND NOT indisprimary
EOF
my %i;
INDEX:
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my $function;
my @col_numbers;
my $spi =
$driver->one_row
( sql => "SELECT COALESCE(indexprs,'') FROM pg_index WHERE indexrelid = ?",
bind => $row->[0] );
if ( $spi )
{
SPI_EXPRESSION:
while ( my $spi_expr =
Text::Balanced::extract_bracketed( $spi, '{}', '[^{}]*' ) )
{
# A wanton lack of respect for boundaries. 'Parse' the
# PostgreSQL internal SPI language to find out what
# columns are being accessed.
push( @col_numbers,
join( ' ',
$spi_expr =~ /:varattno (\d+)/g ) );
}
}
if ( scalar( @col_numbers ) > 1 )
{
# Index objects are not prepared to handle functional
# indexes that use more than one function.
die "Alzabo " . Alzabo->VERSION . " does not support functional"
. " indexes that are not strictly a single function."
. " There are multiple functions on an index on the "
. $table->name() . " table.\n";
}
elsif ( scalar( @col_numbers ) == 1 )
{
my $func =
$driver->one_row
( sql => 'SELECT pg_catalog.pg_get_indexdef( ?, 1, true)',
bind => $row->[0] );
# XXX - not sure if this is a good idea but it makes the
# rev-eng tests pass
$func =~ s/\b(\w+)::\w+\b/$1/g;
my $col_in_func = $1;
my @function;
for my $num ( split / +/, $row->[2] )
{
if ( $num == 0 )
{
push @function, $func;
}
else
{
push @function, $cols_by_number->{$num};
push @col_numbers, $num;
}
}
$function = join ', ', @function;
}
else
{
# A regular index!
@col_numbers = split / +/, $row->[2];
}
push( @{ $i{ $row->[0] }{cols} },
$table->columns( @{ $cols_by_number }{ @col_numbers } ) );
$i{ $row->[0] }{function} = $function;
$i{ $row->[0] }{unique} = $row->[1];
}
return %i;
}
sub _pre_74_indexes
{
my $self = shift;
my $driver = shift;
my $table = shift;
my $t_oid = shift;
my $cols_by_number = shift;
my $sql = <<'EOF';
SELECT c.oid, a.attname, i.indisunique, i.indproc, i.indkey
FROM pg_index i, pg_attribute a, pg_class c
WHERE i.indrelid = ?
AND NOT i.indisprimary
AND i.indexrelid = c.oid
AND c.oid = a.attrelid
AND a.attnum > 0
ORDER BY a.attnum
EOF
my %i;
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my @col_names = @{ $cols_by_number }{ split ' ', $row->[4] };
my $function;
if ( $row->[3] && $row->[3] =~ /\w/ && $row->[3] ne '-' )
{
# some function names come out as "pg_catalog.foo"
$row->[3] =~ s/\w+\.(\w+)/$1/;
$function = uc $row->[3];
$function .= '(';
$function .= join ', ', @col_names;
$function .= ')';
}
push( @{ $i{ $row->[0] }{cols} },
$table->columns( @col_names ) );
$i{ $row->[0] }{unique} = $row->[2];
$i{ $row->[0] }{function} = $function;
}
return %i;
}
sub _foreign_keys_to_relationships
{
my ($self, $schema) = @_;
my $driver = $schema->driver;
my $constraint_sql = <<'EOF';
SELECT conrelid, confrelid,
array_to_string(conkey,' '),
array_to_string(confkey,' ')
FROM pg_constraint
WHERE contype = 'f'
EOF
my $table_sql = <<'EOF';
SELECT relname
FROM pg_class
WHERE oid = ?
EOF
my $column_sql = <<'EOF';
SELECT attname
FROM pg_attribute
WHERE attrelid = ?
AND attnum = ?
EOF
foreach my $row ( $driver->rows( sql => $constraint_sql ) )
{
my $from_table = $driver->one_row( sql => $table_sql,
bind => $row->[0] );
my $to_table = $driver->one_row( sql => $table_sql,
bind => $row->[1] );
# Column numbers are given as strings like "3 5"
my @from_cols = split ' ', $row->[2]
or die "Weird column specification $row->[2]";
my @to_cols = split ' ', $row->[3]
or die "Weird column specification $row->[3]";
# Convert column numbers to names
foreach (@from_cols)
{
$_ = $driver->one_row( sql => $column_sql,
bind => [$row->[0], $_] );
}
foreach (@to_cols)
{
$_ = $driver->one_row( sql => $column_sql,
bind => [$row->[1], $_] );
}
print STDERR "Adding $from_table foreign key to $to_table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
# Convert to Alzabo objects
$from_table = $schema->table($from_table);
$to_table = $schema->table($to_table);
@from_cols = map { $from_table->column($_) } @from_cols;
@to_cols = map { $to_table->column($_) } @to_cols;
# If there's a unique constraint on the "from" columns, treat
# is as 1-to-1. Otherwise treat it as n-to-1.
my $from_unique = 0;
# Only use PK as determination of uniqueness if the FK is from
# the _whole_ PK to something else. If the FK only includes
# _part_ of the PK then it is not unique.
$from_unique = 1
if ( ( @from_cols == grep { $_->is_primary_key } @from_cols )
&&
( @from_cols == $from_table->primary_key_size ) );
$from_unique = 1
if @from_cols == grep { $_->has_attribute( attribute => 'UNIQUE' ) } @from_cols;
INDEX:
foreach my $i ( grep { $_->unique } $from_table->indexes )
{
my @i_cols = $i->columns;
next unless @i_cols == @from_cols;
for ( my $x = 0; $x < @i_cols; $x++ )
{
next INDEX unless $i_cols[$x] eq $from_cols[$x];
}
$from_unique = 1;
}
my $from_cardinality = $from_unique ? '1' : 'n';
my $from_is_dependent =
( grep { $_->nullable || defined $_->default } @from_cols ) ? 0 : 1;
my $to_is_dependent =
( grep { $_->nullable || $_->is_primary_key } @to_cols ) ? 0 : 1;
$schema->add_relationship( cardinality => [ $from_cardinality, '1' ],
table_from => $from_table,
table_to => $to_table,
columns_from => \@from_cols,
columns_to => \@to_cols,
from_is_dependent => $from_is_dependent,
to_is_dependent => $to_is_dependent,
);
}
}
sub rules_id
{
return 'PostgreSQL';
}
__END__
=head1 NAME
Alzabo::RDBMSRules::PostgreSQL - PostgreSQL specific database rules
=head1 SYNOPSIS
use Alzabo::RDBMSRules::PostgreSQL;
=head1 DESCRIPTION
This module implements all the methods descibed in Alzabo::RDBMSRules
for the PostgreSQL database. The syntax rules follow those of the 7.0
releases. Older versions may work but are not supported.
=head1 AUTHOR
Dave Rolsky, <dave@urth.org>
=cut
syntax highlighted by Code2HTML, v. 0.9.1