package Alzabo::RDBMSRules::PostgreSQL;

use strict;
use vars qw($VERSION);

use Alzabo::Exceptions ( abbr => [ 'recreate_table_exception' ] );
use Alzabo::RDBMSRules;

use Digest::MD5;

use Text::Balanced ();

use base qw(Alzabo::RDBMSRules);

use Params::Validate qw( validate_pos );
Params::Validate::validation_options( on_fail => sub { Alzabo::Exception::Params->throw( error => join '', @_ ) } );

$VERSION = 2.0;

1;

sub new
{
    my $proto = shift;
    my $class = ref $proto || $proto;

    return bless {}, $class;
}

sub validate_schema_name
{
    my $self = shift;
    my $name = shift->name;

    $self->_check_name($name, 'schema');

    Alzabo::Exception::RDBMSRules->throw( error => "Schema name ($name) contains a single quote char (')" )
        if index($name, "'") != -1;
}

sub validate_table_name
{
    my $self = shift;

    $self->_check_name( shift->name, 'table' );
}

sub validate_column_name
{
    my $self = shift;

    $self->_check_name( shift->name, 'column' );
}

sub _check_name
{
    my $self = shift;
    my $name = shift;

    Alzabo::Exception::RDBMSRules->throw( error => "Name ($name) must be at least one character long" )
        unless length $name;
    Alzabo::Exception::RDBMSRules->throw( error => "Name ($name) is too long.  Names must be 31 characters or less." )
        if length $name > 31;
    Alzabo::Exception::RDBMSRules->throw( error => "Name ($name) must start with an alpha or underscore(_) and must contain only alphanumerics and underscores." )
        unless $name =~ /\A[a-zA-Z]\w*\z/;
}

sub validate_column_type
{
    my $self = shift;
    my $type = uc shift;
    my $table = shift;

    if ( $table->primary_key_size > 1 )
    {
        return 'INT4' if $type =~ /^SERIAL4?$/;
        return 'INT8' if $type eq 'BIGSERIAL' or $type eq 'SERIAL8';
    }

    my %simple_types = map { $_ => 1 } qw( ABSTIME
                                           BIT
                                           BIGINT
                                           BIGSERIAL
                                           BOOL
                                           BOOLEAN
                                           BOX
                                           BYTEA
                                           CHAR
                                           CHARACTER
                                           CIDR
                                           CIRCLE
                                           DATE
                                           DECIMAL
                                           FLOAT
                                           FLOAT4
                                           FLOAT8
                                           INET
                                           SMALLINT
                                           INT
                                           INTEGER
                                           INT2
                                           INT4
                                           INT8
                                           INTERVAL
                                           MACADDR
                                           MONEY
                                           NUMERIC
                                           OID
                                           RELTIME
                                           SERIAL
                                           SERIAL4
                                           SERIAL8
                                           TEXT
                                           TIME
                                           TIMESTAMP
                                           TIMESTAMPTZ
                                           TIMETZ
                                           VARBIT
                                           VARCHAR );

    return 'INTEGER' if $type eq 'INT' || $type eq 'INT4';
    return 'SERIAL' if $type eq 'SERIAL4';
    return 'INT8' if $type eq 'BIGINT';

    return $type if $simple_types{$type};

    return $type if $type =~ /BIT\s+VARYING/;

    return $type if $type =~ /CHARACTER\s+VARYING/;

    return $type if $type =~ /\ABOX|CIRCLE|LINE|LSEG|PATH|POINT|POLYGON/;

    Alzabo::Exception::RDBMSRules->throw( error => "Invalid column type: $type" );
}

sub validate_column_length
{
    my $self = shift;
    my $column = shift;

    if ( defined $column->length )
    {
        Alzabo::Exception::RDBMSRules->throw( error => "Length is not supported except for char, varchar, decimal, float, and numeric columns (" . $column->name . " column)" )
            unless $column->type =~ /\A(?:(?:VAR)?CHAR|CHARACTER|DECIMAL|FLOAT|NUMERIC|(?:VAR)?BIT|BIT VARYING)\z/i;
    }

    if ( defined $column->precision )
    {
        Alzabo::Exception::RDBMSRules->throw( error => "Precision is not supported except for decimal, float, and numeric columns" )
            unless $column->type =~ /\A(?:DECIMAL|FLOAT|NUMERIC)\z/i;
    }
}

# placeholder in case we decide to try to do something better later
sub validate_table_attribute { 1 }

sub validate_column_attribute
{
    my $self = shift;
    my %p = @_;

    my $column = $p{column};
    my $type = $column->type;
    my $a = uc $p{attribute};
    $a =~ s/\A\s//;
    $a =~ s/\s\z//;

    return if  $a =~ /\A(?:UNIQUE\z|CHECK|CONSTRAINT|REFERENCES)/i;

    Alzabo::Exception::RDBMSRules->throw( error => "Only column constraints are supported as column attributes" )
}

sub validate_primary_key
{
    my $self = shift;
    my $col = shift;

    my $serial_col = (grep { $_->type =~ /^(?:SERIAL(?:4|8)?|BIGSERIAL)$/ } $col->table->primary_key)[0];
    if ( defined $serial_col &&
         $serial_col->name ne $col->name )
    {
        $serial_col->set_type( $serial_col->type =~ /^SERIAL4?$/
                               ? 'INT4'
                               : 'INT8' );
    }
}

sub validate_sequenced_attribute
{
    my $self = shift;
    my $col = shift;

    Alzabo::Exception::RDBMSRules->throw( error => 'Non-number columns cannot be sequenced' )
        unless $col->is_integer || $col->is_floating_point;
}

sub validate_index
{
    my $self = shift;
    my $index = shift;

    foreach my $c ( $index->columns )
    {
        Alzabo::Exception::RDBMSRules->throw( error => "PostgreSQL does not support index prefixes" )
            if defined $index->prefix($c)
    }

    Alzabo::Exception::RDBMSRules->throw( error => "PostgreSQL does not support fulltext indexes" )
        if $index->fulltext;
}

sub type_is_integer
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type =~ /\A(?:
                             INT(?:2|4|8)?|
                             SMALLINT|
                             INTEGER|
                             OID|
                             SERIAL(?:4|8)?|
                             BIGSERIAL
                            )
                          \z
                         /x;
}

sub type_is_floating_point
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type =~ /\A(?:
                             DECIMAL|
                             FLOAT(?:4|8)?|
                             MONEY|
                             NUMERIC
                            )
                          \z
                         /x;
}

sub type_is_char
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type =~ /(?:CHAR|CHARACTER|TEXT)\z/;
}

sub type_is_date
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type eq 'DATE' || $self->type_is_datetime($col);
}

sub type_is_datetime
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type =~ /^TIMESTAMP/;
}

sub type_is_time
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type eq 'TIME';
}

sub type_is_time_interval
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type eq 'INTERVAL';
}

sub type_is_blob
{
    my $self = shift;
    my $col  = shift;
    my $type = uc $col->type;

    return 1 if $type =~ /\ABYTEA\z/;
}

sub blob_type { return 'BYTEA' }

sub column_types
{
    return ( qw( INTEGER
                 INT2
                 INT8
                 NUMERIC
                 FLOAT
                 FLOAT4

                 CHAR
                 VARCHAR
                 TEXT

                 BYTEA

                 DATE
                 TIME
                 TIMESTAMP
                 INTERVAL

                 SERIAL
                 BIGSERIAL

                 BOOLEAN

                 BIT
               ),
               'BIT VARYING',

             qw( INET
                 CIDR
                 MACADDR ) );
}

my %features = map { $_ => 1 } qw ( extended_column_types
                                    constraints
                                    functional_indexes
                                    allows_raw_default
                                  );
sub feature
{
    shift;
    return $features{+shift};
}

sub quote_identifiers { 1 }

sub quote_identifiers_character { '"' }

sub schema_sql
{
    my $self = shift;

    validate_pos( @_, { isa => 'Alzabo::Schema' } );

    my $schema = shift;

    my @sql = $self->SUPER::schema_sql($schema);

    # This has to come at the end because we don't know which tables
    # reference other tables.
    foreach my $t ( $schema->tables )
    {
        foreach my $con ( grep { /\s*(?:check|constraint)/i } $t->attributes )
        {
            push @sql, $self->table_constraint_sql($t);
        }


        foreach my $fk ( $t->all_foreign_keys )
        {
            push @sql, $self->foreign_key_sql($fk);
        }
    }

    return @sql;
}

sub table_sql
{
    my $self = shift;
    my $table = shift;

    my $create_sequence = shift;

    # Create table sequence by default
    $create_sequence = 1 unless defined $create_sequence;

    my $sql = qq|CREATE TABLE "| . $table->name . qq|" (\n  |;

    $sql .= join ",\n  ", map { $self->column_sql($_) } $table->columns;

    my @att = $table->attributes;

    if (my @pk = $table->primary_key)
    {
        $sql .= ",\n";
        $sql .= '  PRIMARY KEY (';
        $sql .= join ', ', map { '"' . $_->name . '"' } @pk;
        $sql .= ")\n";
    }

    $sql .= ")\n";

    my @sql = ($sql);

    foreach my $i ( $table->indexes )
    {
        push @sql, $self->index_sql($i);
    }

    if ($create_sequence)
    {
        foreach my $c ( grep { $_->sequenced } $table->columns )
        {
            push @sql, $self->_sequence_sql($c);
        }
    }

    if (@att)
    {
        $sql .= ' ';
        $sql .= join ' ', grep { ! /\s*(?:check|constraint)/i } @att;
    }

    $self->{state}{table_sql}{ $table->name } = 1;

    return @sql;
}

sub _sequence_sql
{
    my $self = shift;
    my $col = shift;

    return if $col->type =~ /^(?:SERIAL(?:4|8)?|BIGSERIAL)$/;

    my $seq_name = $self->_sequence_name($col);

    return qq|CREATE SEQUENCE "$seq_name";\n|;
}

sub _sequence_name
{
    my $self = shift;
    my $col = shift;

    return join '___', $col->table->name, $col->name;
}

sub column_sql
{
    my $self = shift;
    my $col = shift;
    my $p = shift;   # hashref for skip_nullable, skip_default, && skip_name

    my @default;
    if ( ! $p->{skip_default} && defined $col->default )
    {
        my $def = $self->_default_for_column($col);

        @default = ( "DEFAULT $def" );
    }

    my $type = $col->type;
    my @length;
    if ( defined $col->length )
    {
        my $length = '(' . $col->length;
        $length .= ', ' . $col->precision if defined $col->precision;
        $length .= ')';
        $type .= $length;
    }

    my @nullable;
    unless ( $p->{skip_nullable} )
    {
        @nullable = $col->nullable ? 'NULL' : 'NOT NULL';
    }

    my @name = $p->{skip_name} ? () : '"' . $col->name . '"';

    my $sql .= join '  ', ( @name,
                            $type,
                            @default,
                            @nullable,
                            $col->attributes );

    return $sql;
}

sub _default_for_column
{
    my $self = shift;
    my $col = shift;

    return unless defined $col->default;

    return $col->default if $col->is_numeric || $col->default_is_raw;

    my $d = $col->default;
    $d =~ s/'/''/g;
    qq|'$d'|;
}

sub foreign_key_sql
{
    my $self = shift;
    my $fk = shift;

    if ( grep { $_->is_primary_key } $fk->columns_from )
    {
        return unless $fk->from_is_dependent;
    }

    return () if $self->{state}{fk_sql}{ $fk->id };

    my $sql = 'ALTER TABLE "';
    $sql .= $fk->table_from->name;
    $sql .= '" ADD CONSTRAINT ';
    $sql .= $self->_fk_name($fk);
    $sql .= ' FOREIGN KEY ( ';
    $sql .= join ', ', map { '"' . $_->name . '"' } $fk->columns_from;
    $sql .= ' ) REFERENCES "';
    $sql .= $fk->table_to->name;
    $sql .= '" (';
    $sql .= join ', ', map { '"' . $_->name . '"' } $fk->columns_to;
    $sql .= ')';
    $sql .= ' ON DELETE ';

    if ( $fk->from_is_dependent )
    {
        $sql .= 'CASCADE';
    }
    else
    {
        my @from = $fk->columns_from;
        unless ( ( grep { $_->nullable } @from ) == @from )
        {
            $sql .= 'SET DEFAULT';
        }
        else
        {
            $sql .= 'SET NULL';
        }
    }

    $self->{state}{fk_sql}{ $fk->id } = 1;

    return $sql;
}

sub _fk_name
{
    my $id = $_[1]->id;

    return
        ( length $id > 63
          ? 'fk_' . Digest::MD5::md5_hex( $_[1]->id )
          : $id
        );
}

sub table_constraint_sql
{
    my $self = shift;
    my $table = shift;

    return map { 'ALTER TABLE "' . $table->name . '" ADD ' . $_ } $table->attributes;
}

sub drop_table_sql
{
    my $self = shift;
    my $table = shift;
    my $is_recreate = shift;

    my @sql;

    if ($is_recreate)
    {
        # We need to drop foreign keys referring to this table before
        # we drop it.
        foreach my $fk ( $table->all_foreign_keys )
        {
            push @sql, $self->drop_foreign_key_sql( $fk->reverse );
        }
    }

    push @sql, $self->SUPER::drop_table_sql($table);

    unless ($is_recreate)
    {
        foreach my $c ( $table->columns )
        {
            push @sql, $self->_drop_sequence_sql($c) if $c->sequenced;
        }
    }

    return @sql;
}

sub _drop_sequence_sql
{
    my $self = shift;
    my $col = shift;

    return if $col->type =~ /^(?:SERIAL(?:4|8)?|BIGSERIAL)$/;

    my $seq_name = $self->_sequence_name($col);

    return qq|DROP SEQUENCE "$seq_name";\n|;
}

sub drop_column_sql
{
    my $self = shift;
    my %p = @_;

    recreate_table_exception();
}

sub recreate_table_sql
{
    my $self = shift;
    my %p = @_;

    # This is a hack to prevent this SQL from being made multiple
    # times (which would be pointless)
    return () if $self->{state}{table_sql}{ $p{new}->name };

    push @{ $self->{state}{deferred_sql} },
        $self->_restore_foreign_key_sql( $p{new} );

    return ( $self->_temp_table_sql( $p{new}, $p{old} ),
             $self->drop_table_sql( $p{old}, 1 ),
             # the 0 param indicates that we should not create sequences
             $self->table_sql( $p{new}, 0 ),
             $self->_restore_table_data_sql( $p{new}, $p{old} ),
             $self->_drop_temp_table( $p{new} ),
           );

}

sub _temp_table_sql
{
    my $self = shift;
    my $new_table = shift;
    my $old_table = shift;

    my $temp_name = "TEMP" . $new_table->name;

    my $sql = "SELECT ";
    $sql .= join ', ', map { '"' . $_->name . '"' } $old_table->columns;
    $sql .= qq|\n INTO TEMPORARY "$temp_name" FROM "| . $old_table->name . '"';

    return $sql;
}

sub _restore_table_data_sql
{
    my $self = shift;
    my $new_table = shift;
    my $old_table = shift;

    my @cols;
    foreach my $column ( $new_table->columns )
    {
        my $old_name =
            defined $column->former_name ? $column->former_name : $column->name;

        push @cols, [ $column->name, $old_name ]
            if $old_table->has_column($old_name);
    }

    my $temp_name = "TEMP" . $new_table->name;

    my $sql = 'INSERT INTO "' . $new_table->name . '" (';
    $sql .= join ', ', map { qq|"$_->[0]"| } @cols;
    $sql .= " ) \n  SELECT ";
    $sql .= join ', ', map { qq|"$_->[1]"| } @cols;
    $sql .= qq| FROM "$temp_name"|;

    return $sql;
}

sub _drop_temp_table
{
    my $self = shift;
    my $table = shift;

    my $temp_name = "TEMP" . $table->name;

    return qq|DROP TABLE "$temp_name"|;
}

sub _restore_foreign_key_sql
{
    my $self = shift;
    my $table = shift;

    my @sql;
    foreach my $fk ( $table->all_foreign_keys )
    {
        push @sql, $self->foreign_key_sql($fk);
        push @sql, $self->foreign_key_sql( $fk->reverse );
    }

    return @sql;
}

sub rename_sequences
{
    my $self = shift;
    my %p = @_;

    return () if $self->{state}{rename_sequence_sql}{ $p{new}->name };

    my @sql;

    for my $old_col ( grep { $_->sequenced } $p{old}->columns )
    {
        my $new_col = $p{new}->column( $old_col->name )
            or next;

        my $old_seq = $self->_sequence_name($old_col);
        my $new_seq = $self->_sequence_name($new_col);

        push @sql,
            qq|ALTER TABLE "$old_seq" RENAME TO "$new_seq";\n|;
    }

    $self->{state}{rename_sequence_sql}{ $p{new}->name } = 1;

    return @sql;
}

sub drop_foreign_key_sql
{
    my $self = shift;
    my $fk = shift;

    if ( grep { $_->is_primary_key } $fk->columns_from )
    {
        return unless $fk->from_is_dependent;
    }

    return () if $self->{state}{drop_fk_sql}{ $fk->id };

    $self->{state}{drop_fk_sql}{ $fk->id } = 1;

    return 'ALTER TABLE "' . $fk->table_from->name . '" DROP CONSTRAINT '
           . $self->_fk_name($fk);
}

sub drop_index_sql
{
    my $self = shift;
    my $index = shift;

    return 'DROP INDEX "' . $index->id . '"';
}

sub column_sql_add
{
    my $self = shift;
    my $col = shift;

    return () if $self->{state}{table_sql}{ $col->table->name };

    # Skip default and not null while adding column
    my @sql = 'ALTER TABLE "' . $col->table->name . '" ADD COLUMN ' . $self->column_sql($col, { skip_default => 1, skip_nullable => 1 });

    my $def = $self->_default_for_column($col);
    if ($def)
    {
        push @sql,
            ( 'ALTER TABLE "' . $col->table->name . '" ALTER COLUMN "' .
              $col->name . qq|" SET DEFAULT $def| );
    }

    if ( ! $col->nullable )
    {
        push @sql,
            ( 'UPDATE "' . $col->table->name
              . '" SET "' . $col->name . qq|" = $def WHERE "|
              . $col->name . '" IS NULL'
            );

        push @sql,
            ( 'ALTER TABLE "' . $col->table->name
              . '" ADD CONSTRAINT "'
              . $col->table->name . '_' . $col->name . '_not_null" CHECK ( "'
              . $col->name . '" IS NOT NULL )'
            );
    }

    return @sql;
}

sub column_sql_diff
{
    my $self = shift;
    my %p = @_;

    return $self->drop_column_sql( new_table => $p{new}->table,
                                   old => $p{old} )
        unless $self->_columns_are_equivalent( $p{new}, $p{old} );

    return;
}

sub _columns_are_equivalent
{
    my $self = shift;
    my $new = shift;
    my $old = shift;

    return 0 unless $self->_types_are_equivalent( $new, $old );

    return 0 unless $self->_defaults_are_equivalent( $new, $old );

    return 0 unless $new->sequenced == $old->sequenced;

    my $new_att = join "\0", sort $new->attributes;
    $new_att ||= '';

    my $old_att = join "\0", sort $old->attributes;
    $old_att ||= '';

    return 0 unless $new_att eq $old_att;

    return 1;
}

{
    my %CanonicalTypes =
        ( BOOL    => 'BOOLEAN',
          INT     => 'INTEGER',
          INT4    => 'INTEGER',
          INT2    => 'SMALLINT',
          INT8    => 'BIGINT',
          VARBIT  => 'BIT VARYING',
          VARCHAR => 'CHARACTER VARYING',
          CHAR    => 'CHARACTER',
          FLOAT   => 'DOUBLE PRECISION',
          FLOAT8  => 'DOUBLE PRECISION',
          FLOAT4  => 'REAL',
          DECIMAL => 'NUMERIC',
        );

    sub _types_are_equivalent
    {
        shift;
        my $col1 = shift;
        my $col2 = shift;

        my $type1 = $col1->type;
        $type1 = $CanonicalTypes{ uc $type1 } if $CanonicalTypes{ uc $type1 };

        my $type2 = $col2->type;
        $type2 = $CanonicalTypes{ uc $type2 } if $CanonicalTypes{ uc $type2 };

        $type1 .= join '-', grep { defined && length } $col1->length, $col1->precision;
        $type2 .= join '-', grep { defined && length } $col1->length, $col1->precision;

        return 1 if $type1 eq $type2;
    }
}

sub _defaults_are_equivalent
{
    my $self = shift;
    my $col1 = shift;
    my $col2 = shift;

    return 1 if ! defined $col1->default && ! defined $col2->default;
    return 0 if defined $col1->default && ! defined $col2->default;
    return 0 if ! defined $col1->default && defined $col2->default;

    if ( $col1->type =~ /^bool/i )
    {
        return 1
            if lc substr( $col1->default, 0, 1 ) eq lc substr( $col2->default, 0, 1 );
        return 0;
    }
    elsif ( $col1->is_date
            && $col1->default_is_raw
            && $col2->default_is_raw )
    {
        my $d1 = $col1->default;
        my $d2 = $col2->default;

        my $re = qr/^(?:current_timestamp|localtime|localtimestamp|now\(\))$/i;
        return 1
            if $col1->default =~ /$re/
            && $col2->default =~ /$re/;
    }

    return 1 if
        $self->_default_for_column($col1) eq $self->_default_for_column($col2);
}

sub alter_primary_key_sql
{
    my $self = shift;
    my %p = @_;

    my @sql;
    push @sql, 'DROP INDEX "' . $p{old}->name . '_pkey"';

    if ( $p{new}->primary_key )
    {
        push @sql, ( 'CREATE UNIQUE INDEX "' . $p{new}->name . '_pkey" ON "' .
                     $p{new}->name . '" (' .
                     ( join ', ',
                       map { '"' . $_->name . '"' } $p{new}->primary_key ) . ')' );
    }

    return @sql;
}

# Actually, Postgres _can_ change table names, but it's inability to
# change most aspects of a column definition make it very difficult to
# properly change a table name and then change its column definitions,
# so its easier just to recreate the table
sub can_alter_table_name
{
    0;
}

# Not sure if this is possible
sub alter_table_attributes_sql
{
    my $self = shift;

    recreate_table_exception();
}

sub alter_column_name_sql
{
    my $self = shift;
    my $column = shift;

    return
        ( 'ALTER TABLE "' . $column->table->name . '" RENAME COLUMN ' .
          $column->former_name . ' TO ' . $column->name
        );
}

sub reverse_engineer
{
    my $self = shift;
    my $schema = shift;

    my $driver = $schema->driver;

    foreach my $table ( $driver->tables )
    {
        $table =~ s/^[^\.]+\.//;
        $table =~ s/^\"|\"$//g;

        print STDERR "Adding table $table to schema\n"
            if Alzabo::Debug::REVERSE_ENGINEER;

        my $t = $schema->make_table( name => $table );

        my $t_oid = $driver->one_row( sql => 'SELECT oid FROM pg_class WHERE relname = ?',
                                      bind => $table );

        my $sql = <<'EOF';
SELECT a.attname, a.attnotnull, t.typname, a.attnum, a.atthasdef, a.atttypmod
FROM pg_attribute a, pg_type t
WHERE a.attrelid = ?
AND a.atttypid = t.oid
AND a.attnum > 0
EOF

        $sql .= ' AND NOT a.attisdropped' if $driver->rdbms_version ge '7.3';

        $sql .= ' ORDER BY attnum';


        my %cols_by_number;
        foreach my $row ( $driver->rows( sql => $sql,
                                         bind => $t_oid ) )
        {
            my %p;

            $p{type} = $row->[2];

            # has default
            if ( $row->[4] )
            {
                $p{default} =
                    $driver->one_row
                        ( sql => 'SELECT adsrc FROM pg_attrdef WHERE adrelid = ? AND adnum = ?',
                          bind => [ $t_oid, $row->[3] ] );

                if ( $p{default} =~ /^nextval\(/ )
                {
                    $p{sequenced} = 1;
                    $p{type} =~ s/(?:int(?:eger)?|numeric)/serial/;
                }
                else
                {
                    # strip quotes (and type!) Postgres added
                    $p{default} =~ s/^'//; #'
                    if ( $driver->rdbms_version ge '7.4' )
                    {
                        # 'grotesque' becomes 'grotesque'::character
                        # varying. See
                        # src/backend/utils/adt/format_type.c

                        # This is from
                        # src/backend/util/adt/format_type.c
                        $p{default} =~ s/'(?:::[^']{3,})?$//;
                        $p{default} =~ s/\('(\w+)$/$1/;
                    }
                    else
                    {
                        $p{'default'} =~ s/'$//;
                    }

                    if ( $p{default} =~ /\([^\)]*\)/
                         || $p{default} =~ /^(?:current_timestamp|localtime|localtimestamp|now)$/i )
                    {
                        $p{default_is_raw} = 1;
                    }

                    $p{default} = 'now()' if $p{default} eq 'now';
                }
            }

            if ( $p{type} =~ /char/i )
            {
                # The real length is the value of: a.atttypmod - ((int32) sizeof(int32))
                #
                # Sure wish I knew how to figure this out in Perl.
                # Its provided as VARHDRSZ in postgres.h but I can't
                # really get at it.  On my linux machine this is 4.  A
                # better way of doing this would be welcome.
                $p{length} = $row->[5] - 4;
            }
            if ( lc $p{type} eq 'numeric' )
            {
                # see comment above.
                my $num = $row->[5] - 4;
                $p{length} = ($num >> 16) & 0xffff;
                $p{precision} = $num & 0xffff;
            }

            $p{type} = 'char' if lc $p{type} eq 'bpchar';

            print STDERR "Adding $row->[0] column to $table\n"
                if Alzabo::Debug::REVERSE_ENGINEER;

            my $col = $t->make_column( name => $row->[0],
                                       nullable => ! $row->[1],
                                       %p
                                     );

            if ( $col->is_integer )
            {
                if ( $self->_re_sequence_exists( $driver, $col ) )
                {
                    $col->set_sequenced(1);
                }
            }

            $cols_by_number{ $row->[3] } = $row->[0];
        }


        $sql = <<'EOF';
SELECT indkey
FROM pg_index
WHERE indisprimary
AND indrelid = ?
EOF

        foreach my $cols ( $driver->column( sql => $sql,
                                           bind => $t_oid ) )
        {
	    my @cols = @cols_by_number{ split ' ', $cols };
	    local $" = ", ";

	    print STDERR "Setting @cols as primary key for $table\n"
 		if Alzabo::Debug::REVERSE_ENGINEER;

	    $t->add_primary_key( $_ ) for $t->columns( @cols );
        }

	my %i;
	if ( $driver->rdbms_version ge '7.4' )
	{
            %i = $self->_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
	}
        else
        {
            %i = $self->_pre_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
 	}

	foreach my $idx (values %i)
 	{
	    my @c = map { { column => $_ } } @{ $idx->{cols} };

	    print STDERR "Adding index "
		. ( defined $idx->{'function'}
		    ? $idx->{'function'}
		    : join(', ', map $_->name, @{$idx->{'cols'}} ) )
		. " to $table\n"
		if Alzabo::Debug::REVERSE_ENGINEER;

 	    $t->make_index( columns  => \@c,
			    unique   => $idx->{unique},
                            function => $idx->{function},
                          );
        }

        $sql = <<'EOF';
SELECT consrc, array_to_string(conkey,' ')
FROM pg_constraint
WHERE conrelid = ?
AND contype = 'c'
EOF

        my @att;

        foreach my $row ( $driver->rows( sql => $sql,
                                         bind => $t_oid ) )
        {
            my ( $con, $cols ) = @$row;

            # this stuff is not needed
            $con =~ s/::(\w+)//g;

	    # If $cols ever covers more than one value then this will fail.
            if ( $cols =~ /^(\d+)$/ )
            {
                my $column = $cols_by_number{$1};

                print STDERR qq|Adding constraint "$con" to $table.$column\n|
                    if Alzabo::Debug::REVERSE_ENGINEER;

                $t->column($column)->add_attribute("CHECK $con");
            }
            else
            {
                print STDERR qq|Adding constraint "$con" to $table\n|
                    if Alzabo::Debug::REVERSE_ENGINEER;

                $t->add_attribute("CHECK $con");
            }
        }

    }

    # Foreign key info is available in PG 7.3.0 and higher (could fake
    # it from pg_triggers with extensive gymnastics in version 7.0 and
    # higher, but that's a little iffy)
    $self->_foreign_keys_to_relationships($schema)
        if $driver->rdbms_version ge '7.3';
}

sub _re_sequence_exists
{
    my $self = shift;
    my $driver = shift;
    my $col = shift;

    my $seq_name = $self->_sequence_name($col);

    my $sql = <<'EOF';
SELECT 1
  FROM pg_class
 WHERE relname = ?
   AND relkind = ?
EOF

    return $driver->one_row( sql  => $sql,
                             bind => [ $seq_name, 'S' ],
                           );
}

sub _74_indexes
{
    my $self   = shift;
    my $driver = shift;
    my $table  = shift;
    my $t_oid  = shift;
    my $cols_by_number  = shift;

    my $sql = <<'EOF';
SELECT indexrelid, indisunique, indkey, indnatts
FROM pg_index
WHERE indrelid = ?
AND NOT indisprimary
EOF

    my %i;
   INDEX:
    foreach my $row ( $driver->rows( sql => $sql,
                                     bind => $t_oid ) )
    {
        my $function;
        my @col_numbers;

        my $spi =
            $driver->one_row
                ( sql => "SELECT COALESCE(indexprs,'') FROM pg_index WHERE indexrelid = ?",
                  bind => $row->[0] );

        if ( $spi )
        {
          SPI_EXPRESSION:
            while ( my $spi_expr =
                    Text::Balanced::extract_bracketed( $spi, '{}', '[^{}]*' ) )
            {
                # A wanton lack of respect for boundaries. 'Parse' the
                # PostgreSQL internal SPI language to find out what
                # columns are being accessed.
                push( @col_numbers,
                      join( ' ',
                            $spi_expr =~ /:varattno (\d+)/g ) );
            }
        }

        if ( scalar( @col_numbers ) > 1 )
        {
            # Index objects are not prepared to handle functional
            # indexes that use more than one function.
            die "Alzabo " . Alzabo->VERSION . " does not support functional"
                . " indexes that are not strictly a single function."
                . "  There are multiple functions on an index on the "
                . $table->name() . " table.\n";
        }
        elsif ( scalar( @col_numbers ) == 1 )
        {
            my $func =
                $driver->one_row
                    ( sql => 'SELECT pg_catalog.pg_get_indexdef( ?, 1, true)',
                      bind => $row->[0] );

            # XXX - not sure if this is a good idea but it makes the
            # rev-eng tests pass
            $func =~ s/\b(\w+)::\w+\b/$1/g;
            my $col_in_func = $1;

            my @function;
            for my $num ( split / +/, $row->[2] )
            {
                if ( $num == 0 )
                {
                    push @function, $func;
                }
                else
                {
                    push @function, $cols_by_number->{$num};
                    push @col_numbers, $num;
                }
            }

            $function = join ', ', @function;
        }
        else
        {
            # A regular index!
            @col_numbers = split / +/, $row->[2];
        }

        push( @{ $i{ $row->[0] }{cols} },
              $table->columns( @{ $cols_by_number }{ @col_numbers } ) );

        $i{ $row->[0] }{function} = $function;
        $i{ $row->[0] }{unique} = $row->[1];
    }

    return %i;
}

sub _pre_74_indexes
{
    my $self   = shift;
    my $driver = shift;
    my $table  = shift;
    my $t_oid  = shift;
    my $cols_by_number  = shift;

    my $sql = <<'EOF';
SELECT c.oid, a.attname, i.indisunique, i.indproc, i.indkey
FROM pg_index i, pg_attribute a, pg_class c
WHERE i.indrelid = ?
AND NOT i.indisprimary
AND i.indexrelid = c.oid
AND c.oid = a.attrelid
AND a.attnum > 0
ORDER BY a.attnum
EOF

    my %i;
    foreach my $row ( $driver->rows( sql => $sql,
                                     bind => $t_oid ) )
    {
        my @col_names = @{ $cols_by_number }{ split ' ', $row->[4] };

        my $function;
        if ( $row->[3] && $row->[3] =~ /\w/ && $row->[3] ne '-' )
        {
            # some function names come out as "pg_catalog.foo"
            $row->[3] =~ s/\w+\.(\w+)/$1/;
            $function = uc $row->[3];
            $function .= '(';

            $function .= join ', ', @col_names;

            $function .= ')';
        }

        push( @{ $i{ $row->[0] }{cols} },
              $table->columns( @col_names ) );

        $i{ $row->[0] }{unique} = $row->[2];
        $i{ $row->[0] }{function} = $function;
    }

    return %i;
}

sub _foreign_keys_to_relationships
{
    my ($self, $schema) = @_;
    my $driver = $schema->driver;

    my $constraint_sql = <<'EOF';
SELECT conrelid, confrelid,
    array_to_string(conkey,' '),
    array_to_string(confkey,' ')
FROM pg_constraint
WHERE contype = 'f'
EOF

    my $table_sql = <<'EOF';
SELECT relname
FROM pg_class
WHERE oid = ?
EOF

    my $column_sql = <<'EOF';
SELECT attname
FROM pg_attribute
WHERE attrelid = ?
  AND attnum = ?
EOF

    foreach my $row ( $driver->rows( sql => $constraint_sql ) )
    {
        my $from_table = $driver->one_row( sql => $table_sql,
                                           bind => $row->[0] );
        my $to_table   = $driver->one_row( sql => $table_sql,
                                           bind => $row->[1] );

	# Column numbers are given as strings like "3 5"
	my @from_cols = split ' ', $row->[2]
 	    or die "Weird column specification $row->[2]";

	my @to_cols   = split ' ', $row->[3]
 	    or die "Weird column specification $row->[3]";

        # Convert column numbers to names
        foreach (@from_cols)
        {
            $_ = $driver->one_row( sql => $column_sql,
                                   bind => [$row->[0], $_] );
        }
        foreach (@to_cols)
        {
            $_ = $driver->one_row( sql => $column_sql,
                                   bind => [$row->[1], $_] );
        }

        print STDERR "Adding $from_table foreign key to $to_table\n"
            if Alzabo::Debug::REVERSE_ENGINEER;

        # Convert to Alzabo objects
        $from_table = $schema->table($from_table);
        $to_table   = $schema->table($to_table);
        @from_cols = map { $from_table->column($_) } @from_cols;
        @to_cols   = map {   $to_table->column($_) } @to_cols;

        # If there's a unique constraint on the "from" columns, treat
        # is as 1-to-1.  Otherwise treat it as n-to-1.
        my $from_unique = 0;

        # Only use PK as determination of uniqueness if the FK is from
        # the _whole_ PK to something else.  If the FK only includes
        # _part_ of the PK then it is not unique.
        $from_unique = 1
            if ( ( @from_cols == grep { $_->is_primary_key } @from_cols )
                 &&
                 ( @from_cols == $from_table->primary_key_size ) );

        $from_unique = 1
            if @from_cols == grep { $_->has_attribute( attribute => 'UNIQUE' ) } @from_cols;

      INDEX:
        foreach my $i ( grep { $_->unique } $from_table->indexes )
        {
            my @i_cols = $i->columns;

            next unless @i_cols == @from_cols;

            for ( my $x = 0; $x < @i_cols; $x++ )
            {
                next INDEX unless $i_cols[$x] eq $from_cols[$x];
            }

            $from_unique = 1;
        }

        my $from_cardinality = $from_unique ? '1' : 'n';

        my $from_is_dependent =
            ( grep { $_->nullable || defined $_->default } @from_cols ) ? 0 : 1;
        my $to_is_dependent =
            ( grep { $_->nullable || $_->is_primary_key } @to_cols ) ? 0 : 1;

        $schema->add_relationship( cardinality => [ $from_cardinality, '1' ],
                                   table_from => $from_table,
                                   table_to   => $to_table,
                                   columns_from => \@from_cols,
                                   columns_to   => \@to_cols,
                                   from_is_dependent => $from_is_dependent,
                                   to_is_dependent => $to_is_dependent,
                                 );
    }
}

sub rules_id
{
    return 'PostgreSQL';
}

__END__

=head1 NAME

Alzabo::RDBMSRules::PostgreSQL - PostgreSQL specific database rules

=head1 SYNOPSIS

  use Alzabo::RDBMSRules::PostgreSQL;

=head1 DESCRIPTION

This module implements all the methods descibed in Alzabo::RDBMSRules
for the PostgreSQL database.  The syntax rules follow those of the 7.0
releases.  Older versions may work but are not supported.

=head1 AUTHOR

Dave Rolsky, <dave@urth.org>

=cut


syntax highlighted by Code2HTML, v. 0.9.1