#!/usr/bin/perl # # This file is part of the exilog suite. # # http://duncanthrax.net/exilog/ # # (c) Tom Kistner 2004 # # See LICENSE for licensing information. # package exilog_sql; use strict; use DBI; use exilog_config; use exilog_util; use Data::Dumper; BEGIN { use Exporter; use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS); # set the version for version checking $VERSION = 0.1; @ISA = qw(Exporter); @EXPORT = qw( &reconnect &sql_select &sql_delete &sql_optimize &sql_count &sql_queue_add &sql_queue_update &sql_queue_delete &sql_queue_set_action &sql_queue_clear_action &write_message ); %EXPORT_TAGS = (); # your exported package globals go here, # as well as any optionally exported functions @EXPORT_OK = qw(); } # open DB connection my $dbh = DBI->connect($config->{sql}->{DBI}, $config->{sql}->{user}, $config->{sql}->{pass}); unless (defined($dbh) && $dbh) { print STDERR "[exilog_sql] Can't open exilog database.\n"; exit(255); }; sub reconnect { my $conditional = shift || 0; if ($conditional) { return 1 if ($dbh->ping); }; eval { $dbh->disconnect() if (defined($dbh)); }; $dbh = 0; $dbh = DBI->connect($config->{sql}->{DBI}, $config->{sql}->{user}, $config->{sql}->{pass}); unless (defined($dbh) && $dbh) { print STDERR "[exilog_sql] Can't open exilog database.\n"; return 0; }; return 1; }; # -------------------------------------------------------------------------- # Generic Stubs, these are just frontends that call the backend-specific # SQL subroutines for each database type. sub write_message { no strict "refs"; return &{ "_".$config->{sql}->{type}."_write_message" }(@_); }; sub sql_select { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_select" }(@_); }; sub sql_delete { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_delete" }(@_); }; sub sql_optimize { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_optimize" }(@_); }; sub sql_queue_add { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_queue_add" }(@_); }; sub sql_queue_update { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_queue_update" }(@_); }; sub sql_queue_delete { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_queue_delete" }(@_); }; sub sql_queue_set_action { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_queue_set_action" }(@_); }; sub sql_queue_clear_action { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_queue_clear_action" }(@_); }; sub sql_count { no strict "refs"; return &{ "_".$config->{sql}->{type}."_sql_count" }(@_); }; # -------------------------------------------------------------------------- # -------------------------------------------------------------------------- # PostgreSQL functions sub _pgsql_sql_count { my $where = shift; my $criteria = shift || {}; my $sql = "SELECT ". "COUNT(*) ". "FROM ".$where. ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); my $sh = $dbh->prepare($sql); $sh->execute; my $tmp = $sh->fetchrow_arrayref(); return @{$tmp}[0]; }; sub _pgsql_sql_queue_delete { my $spool_path = shift; $dbh->do("DELETE FROM queue WHERE spool_path='$spool_path'"); }; sub _pgsql_sql_queue_update { my $hdr = shift; return unless (ref($hdr) eq 'HASH'); my $server = $hdr->{server}; my $message_id = $hdr->{message_id}; delete $hdr->{server}; delete $hdr->{message_id}; # PostgreSQL is case sensitive by default. Nice feature, # but it complicates our life tremendously. # Since we want to keep indexes working, the columns in # this list are lowercased before they are inserted. Sigh. my @lowercase = ( 'mailfrom', 'recipients_delivered', 'recipients_pending' ); foreach my $col (@lowercase) { $hdr->{$col} = lc($hdr->{$col}) if (edt($hdr,$col)); }; my @tmp; foreach my $item (keys %{ $hdr }) { my $value = $hdr->{$item}; $value =~ s/\'/\'\'/g; $value =~ s/\n/\\n/g; push @tmp, $item.'='."'".$value."'"; }; $dbh->do("UPDATE queue SET ".join(",",@tmp)." WHERE message_id='".$message_id."' AND server='".$server."'"); }; sub _pgsql_sql_queue_add { my $hdr = shift; return unless (ref($hdr) eq 'HASH'); # PostgreSQL is case sensitive by default. Nice feature, # but it complicates our life tremendously. # Since we want to keep indexes working, the columns in # this list are lowercased before they are inserted. Sigh. my @lowercase = ( 'mailfrom', 'recipients_delivered', 'recipients_pending' ); foreach my $col (@lowercase) { $hdr->{$col} = lc($hdr->{$col}) if (edt($hdr,$col)); }; my @fields = sort {$a cmp $b} keys(%{$hdr}); my @vals = (); foreach (@fields) { my $val = $hdr->{$_}; $val =~ s/\'/\'\'/g; $val =~ s/\n/\\n/g; push @vals, "'".$val."'"; }; $dbh->do("INSERT INTO queue (".join(',',@fields).") VALUES(".join(',',@vals).")"); }; sub _pgsql_sql_optimize { my $where = shift || "nothing"; my $sql = "VACUUM ANALYZE ".$where; my $sh = $dbh->prepare($sql); $sh->execute; $sh->finish; return 1; }; sub _pgsql_sql_delete { my $where = shift || "nothing"; my $criteria = shift || {}; my $sql = "DELETE FROM ".$where. ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); my $sh = $dbh->prepare($sql); my $num = $sh->execute; $sh->finish; return (($num eq '0E0') ? 0 : $num); }; sub _pgsql_sql_select { my $where = shift; my @what = @{ (shift || [ "*" ]) }; my $criteria = shift || {}; my $order_by = shift || ""; my $order_direction = shift || "DESC"; my $limit_min = shift; my $limit_max = shift; my $distinct = shift; my $sql = "SELECT ". (defined($distinct) ? "DISTINCT " : ""). join(", ", @what). " FROM ".$where. ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ). ($order_by ? " ORDER BY ".$order_by." ".$order_direction : ""). (defined($limit_min) ? " LIMIT ".$limit_min : ""). (defined($limit_max) ? ",".$limit_max : ""); return _fetch_multirow($where, $sql); }; sub _pgsql_write_message { my $server = shift || 'default'; my $h = shift; my $rc = 0; # PostgreSQL is case sensitive by default. Nice feature, # but it complicates our life tremendously. # Since we want to keep indexes working, the columns in # this list are lowercased before they are inserted. Sigh. my @lowercase = ( 'mailfrom', 'rcpt', 'rcpt_final', 'host_dns', 'host_helo', 'host_rdns' ); foreach my $col (@lowercase) { $h->{data}->{$col} = lc($h->{data}->{$col}) if (edt($h->{data},$col)); }; # Special case: we only need to UPDATE the 'completed' field # in the messages table. if ( ($h->{table} eq 'messages') && (exists($h->{data}->{completed})) ) { my $rc = $dbh->do("UPDATE messages SET completed='".$h->{data}->{completed}."' WHERE message_id='".$h->{data}->{message_id}."' AND server='".$server."'"); if (defined($rc)) { return 1; } else { # error return 0; }; } else { my @fields = sort {$a cmp $b} keys(%{$h->{data}}); my @vals = ( "'".$server."'" ); foreach (@fields) { my $val = $h->{data}->{$_}; $val =~ s/\'/\'\'/g; # shorten $val to limit and remove eventual # trailing quote and backslash characters. $val = substr($val,0,255); $val =~ s/[\\']+$//; push @vals, "'".$val."'"; }; unshift @fields, 'server'; my $sql = "INSERT INTO ".$h->{table}.' ("'.join('","',@fields).'") VALUES('.join(',',@vals).")"; my $rc = $dbh->do($sql); if (defined($rc)) { return 1; } else { return 2 if ($dbh->errstr =~ /duplicate/i); print STDERR "SQL Error (code ".$dbh->err.") on '$h->{table}' with query: $sql\n"; return 0; }; }; }; # -------------------------------------------------------------------------- # MySQL functions sub _mysql_sql_count { my $where = shift; my $criteria = shift || {}; my $sql = "SELECT ". "COUNT(*) ". "FROM ".$where. ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); my $sh = $dbh->prepare($sql); $sh->execute; my $tmp = $sh->fetchrow_arrayref(); return @{$tmp}[0]; }; sub _mysql_sql_queue_delete { my $spool_path = shift; $dbh->do("DELETE FROM queue WHERE spool_path='$spool_path'"); }; sub _mysql_sql_queue_update { my $hdr = shift; return unless (ref($hdr) eq 'HASH'); my $server = $hdr->{server}; my $message_id = $hdr->{message_id}; delete $hdr->{server}; delete $hdr->{message_id}; my @tmp; foreach my $item (keys %{ $hdr }) { my $value = $hdr->{$item}; $value =~ s/\'/\'\'/g; $value =~ s/\n/\\n/g; push @tmp, $item.'='."'".$value."'"; }; $dbh->do("UPDATE queue SET ".join(",",@tmp)." WHERE message_id='".$message_id."' AND server='".$server."'"); }; sub _mysql_sql_queue_add { my $hdr = shift; return unless (ref($hdr) eq 'HASH'); my @fields = sort {$a cmp $b} keys(%{$hdr}); my @vals = (); foreach (@fields) { my $val = $hdr->{$_}; $val =~ s/\'/\'\'/g; $val =~ s/\n/\\n/g; push @vals, "'".$val."'"; }; $dbh->do("INSERT INTO queue (".join(',',@fields).") VALUES(".join(',',@vals).")"); }; sub _mysql_sql_queue_set_action { my $server = shift; my $message_id = shift; my $action = shift; $dbh->do("UPDATE queue SET action='$action' WHERE server='$server' AND message_id='$message_id'"); }; sub _mysql_sql_queue_clear_action { my $server = shift; my $message_id = shift; $dbh->do("UPDATE queue SET action=NULL WHERE server='$server' AND message_id='$message_id'"); }; sub _mysql_sql_optimize { my $where = shift || "nothing"; my $sql = "OPTIMIZE TABLE ".$where; my $sh = $dbh->prepare($sql); $sh->execute; $sh->finish; return 1; }; sub _mysql_sql_delete { my $where = shift || "nothing"; my $criteria = shift || {}; my $sql = "DELETE FROM ".$where. ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); my $sh = $dbh->prepare($sql); my $num = $sh->execute; $sh->finish; return (($num eq '0E0') ? 0 : $num); }; sub _mysql_sql_select { my $where = shift; my @what = @{ (shift || [ "*" ]) }; my $criteria = shift || {}; my $order_by = shift || ""; my $order_direction = shift || "DESC"; my $limit_min = shift; my $limit_max = shift; my $distinct = shift; my $sql = "SELECT ". (defined($distinct) ? "DISTINCT " : ""). join(", ", @what). " FROM ".$where. ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ). ($order_by ? " ORDER BY ".$order_by." ".$order_direction : ""). (defined($limit_min) ? " LIMIT ".$limit_min : ""). (defined($limit_max) ? ",".$limit_max : ""); return _fetch_multirow($where, $sql); }; sub _mysql_write_message { my $server = shift || 'default'; my $h = shift; my $rc = 0; # Special case: we only need to UPDATE the 'completed' field # in the messages table. if ( ($h->{table} eq 'messages') && (exists($h->{data}->{completed})) ) { my $rc = $dbh->do("UPDATE messages SET completed='".$h->{data}->{completed}."' WHERE message_id='".$h->{data}->{message_id}."' AND server='".$server."'"); if (defined($rc)) { return 1; } else { # error return 0; }; } else { my @fields = sort {$a cmp $b} keys(%{$h->{data}}); my @vals = ( "'".$server."'" ); foreach (@fields) { my $val = $h->{data}->{$_}; $val =~ s/\'/\'\'/g; # shorten $val to limit and remove eventual # trailing quote and backslash characters. $val = substr($val,0,255); $val =~ s/[\\']+$//; push @vals, "'".$val."'"; }; unshift @fields, 'server'; my $sql = "INSERT INTO ".$h->{table}." (".join(',',@fields).") VALUES(".join(',',@vals).")"; my $rc = $dbh->do($sql); if (defined($rc)) { return 1; } else { # error 1062 means "Duplicate key". return 2 if ($dbh->err == 1062); print STDERR "SQL Error (code ".$dbh->err.") on '$h->{table}' with query: $sql\n"; return 0; }; }; }; # -------------------------------------------------------------------------- # misc subroutines used across several DB types sub _fetch_multirow { my $table = shift; my $sql = shift; my $limit = shift || 0; my $a = []; my $sh = $dbh->prepare($sql); $sh->execute; while (my $tmp = $sh->fetchrow_hashref) { push @{ $a }, $tmp; $limit--; last if ($limit == 0); }; $sh->finish; return $a; }; sub _build_WHERE { my $criteria = shift || {}; my @set = (); foreach my $col (keys %{ $criteria }) { next unless(defined($criteria->{$col})); if ( ($col eq "timestamp") || ($col eq "completed") || ($col eq "frozen") || ($col eq "size") ) { # integer column my ($min,$max) = split / /,$criteria->{$col}; if (defined($min)) { # greater than X push @set, $col." > ".$min; } if (defined($max)) { # smaller than X push @set, $col." < ".$max; } } elsif (ref($criteria->{$col}) eq 'ARRAY') { # array ref, use exact string match with OR my $str = "( "; foreach my $entry (@{ $criteria->{$col} }) { $str .= " ".$col." = '".$entry."' OR"; }; chop($str);chop($str); $str .= " )"; push @set, $str; } else { # string column if (($criteria->{$col} =~ /\%/) || ($criteria->{$col} =~ /\_/)) { # use ILIKE for PGSQL if ($config->{sql}->{type} eq 'pgsql') { push @set, $col." ILIKE '".$criteria->{$col}."'"; } else { push @set, $col." LIKE '".$criteria->{$col}."'"; }; } else { push @set, $col." = '".$criteria->{$col}."'"; }; }; }; return " WHERE ".join(" AND ", @set); }; 1;