/* $Id: pending.c,v 1.81.2.1 2006/09/04 22:05:59 manu Exp $ */
/*
* Copyright (c) 2004 Emmanuel Dreyfus
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. All advertising materials mentioning features or use of this software
* must display the following acknowledgement:
* This product includes software developed by Emmanuel Dreyfus
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#ifdef HAVE_SYS_CDEFS_H
#include <sys/cdefs.h>
#ifdef __RCSID
__RCSID("$Id: pending.c,v 1.81.2.1 2006/09/04 22:05:59 manu Exp $");
#endif
#endif
#ifdef HAVE_OLD_QUEUE_H
#include "queue.h"
#else
#include <sys/queue.h>
#endif
#include <stdlib.h>
#include <stdio.h>
#include <ctype.h>
#include <string.h>
#include <strings.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <sysexits.h>
#include <syslog.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "sync.h"
#include "dump.h"
#include "conf.h"
#include "pending.h"
#include "autowhite.h"
#include "milter-greylist.h"
/* Queue of all pending (greylisted) entries, linked through p_list. */
struct pendinglist pending_head;
/*
 * Array of PENDING_BUCKETS hash buckets, allocated by pending_init().
 * Entries are linked into their bucket through pb_list.
 */
struct pending_bucket *pending_buckets;
pthread_rwlock_t pending_lock; /* protects pending_head and dump_dirty */
/*
 * Held by pending_get() and pending_put() while an entry is linked or
 * unlinked, and by pending_textdump() while it walks pending_head.
 */
pthread_mutex_t pending_change_lock;
/*
 * protects pending->p_refcnt
 * since we hold many pending entries, the memory required for a lock
 * object is too large to have one in each pending entry. so,
 * we use just one lock object for all pending entries.
 */
static pthread_rwlock_t refcnt_lock;
void
pending_init(void) {
int error;
int i;
TAILQ_INIT(&pending_head);
if ((pending_buckets = calloc(PENDING_BUCKETS,
sizeof(struct pending_bucket))) == NULL) {
mg_log(LOG_ERR,
"Unable to allocate pending list buckets: %s",
strerror(errno));
exit(EX_OSERR);
}
if ((error = pthread_rwlock_init(&pending_lock, NULL)) != 0 ||
(error = pthread_rwlock_init(&refcnt_lock, NULL)) != 0 ||
(error = pthread_mutex_init(&pending_change_lock, NULL)) != 0) {
mg_log(LOG_ERR,
"pthread_rwlock_init failed: %s", strerror(error));
exit(EX_OSERR);
}
for(i = 0; i < PENDING_BUCKETS; i++) {
TAILQ_INIT(&pending_buckets[i].b_pending_head);
if ((error = pthread_mutex_init(&pending_buckets[i].bucket_mtx,
NULL)) != 0) {
mg_log(LOG_ERR,
"pthread_mutex_init failed: %s", strerror(error));
exit(EX_OSERR);
}
}
return;
}
/*
 * Create a new pending entry for the (sa, from, rcpt) tuple and link it
 * at the tail of both the global queue and its hash bucket.
 *
 * sa, salen	peer address; a private copy is stored in the entry
 * from, rcpt	envelope sender and recipient; both are duplicated
 * date		stored as p_tv.tv_sec
 *
 * Returns the new entry (reference count 1), or NULL if memory could not
 * be allocated or the address could not be converted to text.
 *
 * pending_lock must be write-locked & bucket must be locked
 * NOTE(review): pending_check() calls this holding only the read lock plus
 * the bucket mutex; the list insertion itself is serialised here by
 * pending_change_lock.
 */
struct pending *
pending_get(sa, salen, from, rcpt, date)
	struct sockaddr *sa;
	socklen_t salen;
	char *from;
	char *rcpt;
	time_t date;
{
	struct pending *pending;
	struct timeval tv;
	char addr[IPADDRSTRLEN];

	/*
	 * calloc() clears the whole structure.  The previous
	 * bzero(pending, sizeof(pending)) only cleared pointer-size bytes,
	 * leaving the remaining fields uninitialised.
	 */
	if ((pending = calloc(1, sizeof(*pending))) == NULL)
		return NULL;

	pending->p_tv.tv_sec = date;

	if ((pending->p_sa = malloc(salen)) == NULL)
		goto fail;
	memcpy(pending->p_sa, sa, salen);
	pending->p_salen = salen;

	if (!iptostring(sa, salen, addr, sizeof(addr)) ||
	    (pending->p_addr = strdup(addr)) == NULL)
		goto fail;

	if ((pending->p_from = strdup(from)) == NULL)
		goto fail;

	if ((pending->p_rcpt = strdup(rcpt)) == NULL)
		goto fail;

	pending->p_refcnt = 1;

	pthread_mutex_lock(&pending_change_lock);
	TAILQ_INSERT_TAIL(&pending_head, pending, p_list);
	TAILQ_INSERT_TAIL(&pending_buckets[BUCKET_HASH(pending->p_sa,
	    from, rcpt, PENDING_BUCKETS)].b_pending_head, pending, pb_list);

	dump_dirty++;

	if (conf.c_debug) {
		/* The current time is only needed for the debug message. */
		(void)gettimeofday(&tv, NULL);
		mg_log(LOG_DEBUG, "created: %s from %s to %s delayed for %lds",
		    pending->p_addr, pending->p_from, pending->p_rcpt,
		    (long)(pending->p_tv.tv_sec - tv.tv_sec));
	}
	pthread_mutex_unlock(&pending_change_lock);

	return pending;

fail:
	/* Fields not yet set are still NULL, and free(NULL) is a no-op. */
	free(pending->p_rcpt);
	free(pending->p_from);
	free(pending->p_addr);
	free(pending->p_sa);
	free(pending);
	return NULL;
}
/*
 * Unlink an entry from its hash bucket and from the global queue, then
 * release the reference held on it through pending_free().  dump_dirty is
 * bumped so the change is noticed by the dump code.
 *
 * pending list should be write-locked & bucket must be locked
 */
void
pending_put(pending)
	struct pending *pending;
{
	if (conf.c_debug)
		mg_log(LOG_DEBUG, "removed: %s from %s to %s",
		    pending->p_addr, pending->p_from, pending->p_rcpt);

	pthread_mutex_lock(&pending_change_lock);

	/* The two linkages are independent of each other. */
	TAILQ_REMOVE(&pending_buckets[BUCKET_HASH(pending->p_sa,
	    pending->p_from, pending->p_rcpt,
	    PENDING_BUCKETS)].b_pending_head, pending, pb_list);
	TAILQ_REMOVE(&pending_head, pending, p_list);

	pending_free(pending);
	dump_dirty++;

	pthread_mutex_unlock(&pending_change_lock);
}
/*
 * Remove expired entries from the front of the pending queue.
 *
 * Entries are examined from the head; the scan stops at the first entry
 * whose age does not exceed conf.c_timeout.
 *
 * Returns 1 if at least one entry was removed, 0 otherwise.
 */
int
pending_timeout(void)
{
	struct pending *head;
	struct timeval now;
	int expired = 0;

	gettimeofday(&now, NULL);

	PENDING_WRLOCK;
	while ((head = TAILQ_FIRST(&pending_head)) != NULL) {
		/* The rest of the entries are OK */
		if (now.tv_sec - head->p_tv.tv_sec <= conf.c_timeout)
			break;

		if (conf.c_debug || conf.c_logexpired) {
			mg_log(LOG_DEBUG,
			    "(local): %s from %s to %s: greylisted "
			    "entry timed out",
			    head->p_addr, head->p_from,
			    head->p_rcpt);
		}

		expired = 1;
		pending_put(head);
	}
	PENDING_UNLOCK;

	return expired;
}
/*
 * Remove the pending entry that matches the given address text, sender,
 * recipient and p_tv.tv_sec value, if there is one, then run
 * pending_timeout() to purge expired entries.
 *
 * Only the hash bucket selected by (sa, from, rcpt) is searched.  Nothing
 * is done if the address cannot be converted to text.
 */
void
pending_del(sa, salen, from, rcpt, time)
	struct sockaddr *sa;
	socklen_t salen;
	char *from;
	char *rcpt;
	time_t time;
{
	char addr[IPADDRSTRLEN];
	struct pending *pending;
	struct pending_bucket *b;

	if (!iptostring(sa, salen, addr, sizeof(addr)))
		return;

	PENDING_RDLOCK;
	b = &pending_buckets[BUCKET_HASH(sa, from, rcpt, PENDING_BUCKETS)];
	pthread_mutex_lock(&b->bucket_mtx);

	/*
	 * Look for our entry.  Plain iteration is safe here because the
	 * loop is left immediately after the entry is removed.
	 */
	TAILQ_FOREACH(pending, &b->b_pending_head, pb_list) {
		if ((strncmp(addr, pending->p_addr, sizeof(addr)) == 0) &&
		    (strcmp(from, pending->p_from) == 0) &&
		    (strcmp(rcpt, pending->p_rcpt) == 0) &&
		    (pending->p_tv.tv_sec == time)) {
			pending_put(pending);
			break;
		}
	}

	pthread_mutex_unlock(&b->bucket_mtx);
	PENDING_UNLOCK;

	pending_timeout();
}
/*
 * Look up the (sa, from, rcpt) tuple in the pending list.
 *
 * - If a matching entry exists and its acceptance time has been reached,
 *   the entry is removed, peers are told through peer_delete() and the
 *   tuple is added to the autowhite list, valid until now + aw.
 * - If a matching entry exists but its time has not come, nothing changes.
 * - If no entry matches, one is created with acceptance time now + delay
 *   and announced through peer_create().
 *
 * remaining	if not NULL, receives the number of seconds left to wait
 * elapsed	if not NULL, receives now - (acceptance time - delay)
 * queueid	passed on to autowhite_add()
 *
 * Returns 1 when the remaining delay is <= 0, or when the address cannot
 * be converted to text; returns 0 otherwise.
 */
int
pending_check(sa, salen, from, rcpt, remaining, elapsed, queueid, delay, aw)
	struct sockaddr *sa;
	socklen_t salen;
	char *from;
	char *rcpt;
	time_t *remaining;
	time_t *elapsed;
	char *queueid;
	time_t delay;
	time_t aw;
{
	char addr[IPADDRSTRLEN];
	struct pending *pending;
	struct pending *next;
	time_t now;
	time_t rest = -1;
	time_t accepted = -1;
	int dirty = 0;
	struct pending_bucket *b;
	ipaddr *mask = NULL;
	time_t date;

	now = time(NULL);
	if (!iptostring(sa, salen, addr, sizeof(addr)))
		return 1;

	dirty = pending_timeout();

	PENDING_RDLOCK;
	b = &pending_buckets[BUCKET_HASH(sa, from, rcpt, PENDING_BUCKETS)];
	pthread_mutex_lock(&b->bucket_mtx);
	for (pending = TAILQ_FIRST(&b->b_pending_head);
	    pending; pending = next) {
		next = TAILQ_NEXT(pending, pb_list);

		/*
		 * Pick the match mask for the stored address family.
		 * ip_match() rejects other families before using the mask.
		 */
		switch (pending->p_sa->sa_family) {
		case AF_INET:
			mask = (ipaddr *)&conf.c_match_mask;
			break;
#ifdef AF_INET6
		case AF_INET6:
			mask = (ipaddr *)&conf.c_match_mask6;
			break;
#endif
		default:
			break;
		}

		/*
		 * Look for our entry.
		 */
		if (ip_match(sa, pending->p_sa, mask) &&
		    (strcmp(from, pending->p_from) == 0) &&
		    (strcmp(rcpt, pending->p_rcpt) == 0)) {
			/*
			 * The time the entry shall be accepted
			 */
			accepted = pending->p_tv.tv_sec;
			rest = accepted - now;

			if (rest <= 0) {
				date = now + aw;
				peer_delete(pending, date);
				pending_put(pending);
				autowhite_add(sa, salen, from, rcpt,
				    &date, queueid);
				rest = 0;
				dirty = 1;
			}

			goto out;
		}
	}

	/*
	 * It was not found. Create it and propagate it to peers.
	 * The acceptance time of the new entry is recorded so that *elapsed
	 * is not derived from an unrelated entry of the same bucket.
	 */
	date = now + delay;
	accepted = date;
	pending = pending_get(sa, salen, from, rcpt, date);
	if (pending != NULL) {
		peer_create(pending);
		rest = pending->p_tv.tv_sec - now;
		dirty = 1;
	} else {
		/*
		 * Allocation failed: there is nothing to announce to peers
		 * and no entry to read.  Report the full delay so the
		 * caller behaves as if the entry had been created.
		 */
		rest = delay;
	}

out:
	pthread_mutex_unlock(&b->bucket_mtx);
	PENDING_UNLOCK;

	if (remaining != NULL)
		*remaining = rest;

	if (elapsed != NULL)
		*elapsed = now - (accepted - delay);

	if (dirty)
		dump_flush();

	if (rest <= 0)
		return 1;
	else
		return 0;
}
/*
 * Write every pending entry to stream as one tab-separated line:
 * address, sender, recipient and p_tv.tv_sec.  Unless
 * conf.c_dump_no_time_translation is set, the time is also appended as a
 * "# YYYY-MM-DD HH:MM:SS" comment in local time.
 *
 * Returns the number of entries written.
 */
int
pending_textdump(stream)
	FILE *stream;
{
	struct pending *pending;
	int done = 0;
	char textdate[DATELEN + 1];
	struct tm tm;
	time_t when;

	fprintf(stream, "\n\n#\n# greylisted tuples\n#\n");
	fprintf(stream, "# Sender IP\t%s\t%s\tTime accepted\n",
	    "Sender e-mail", "Recipient e-mail");

	PENDING_RDLOCK;
	pthread_mutex_lock(&pending_change_lock);
	TAILQ_FOREACH(pending, &pending_head, p_list) {
		if (conf.c_dump_no_time_translation) {
			fprintf(stream, "%s\t%s\t%s\t%ld\n",
			    pending->p_addr, pending->p_from,
			    pending->p_rcpt, (long)pending->p_tv.tv_sec);
		} else {
			/*
			 * Copy the seconds into a real time_t instead of
			 * casting the field's address: tv_sec is not
			 * guaranteed to have the same type as time_t.
			 */
			when = (time_t)pending->p_tv.tv_sec;

			/*
			 * If conversion or formatting fails, the buffer
			 * content is unspecified; print an empty date.
			 */
			if (localtime_r(&when, &tm) == NULL ||
			    strftime(textdate, DATELEN,
			    "%Y-%m-%d %T", &tm) == 0)
				textdate[0] = '\0';

			fprintf(stream, "%s\t%s\t%s\t%ld # %s\n",
			    pending->p_addr, pending->p_from,
			    pending->p_rcpt,
			    (long)pending->p_tv.tv_sec, textdate);
		}
		done++;
	}
	pthread_mutex_unlock(&pending_change_lock);
	PENDING_UNLOCK;

	return done;
}
/*
 * Take one more reference on a pending entry.  The counter is updated
 * under refcnt_lock.  Returns the entry that was passed in.
 */
struct pending *
pending_ref(pending)
	struct pending *pending;
{
	WRLOCK(refcnt_lock);
	pending->p_refcnt += 1;
	UNLOCK(refcnt_lock);

	return pending;
}
/*
 * Drop one reference on a pending entry.  When the counter is no longer
 * positive, the entry and the address and strings it owns are freed.
 * The counter is updated under refcnt_lock.
 */
void
pending_free(pending)
	struct pending *pending;
{
	int still_referenced;

	WRLOCK(refcnt_lock);
	pending->p_refcnt--;
	still_referenced = (pending->p_refcnt > 0);
	UNLOCK(refcnt_lock);

	if (still_referenced)
		return;

	/* Last reference gone: release the members, then the entry. */
	free(pending->p_sa);
	free(pending->p_addr);
	free(pending->p_from);
	free(pending->p_rcpt);
	free(pending);
}
/*
 * Compare two socket addresses under a mask.
 *
 * sa	address to test
 * pat	address to compare against
 * mask	bits that take part in the comparison (in4 member for AF_INET,
 *	in6 member for AF_INET6)
 *
 * Returns 1 when both addresses have the same family and their masked
 * address bits are equal, 0 otherwise.  Families other than AF_INET and
 * AF_INET6 never match.  When scope ids are supported, a non-zero scope
 * id in pat must also equal the one in sa.
 */
int
ip_match(sa, pat, mask)
	struct sockaddr *sa;
	struct sockaddr *pat;
	ipaddr *mask;
{
#ifdef AF_INET6
	int i;
#endif

	if (sa->sa_family != pat->sa_family)
		return 0;

	switch (sa->sa_family) {
	case AF_INET:
		if ((SADDR4(sa)->s_addr & mask->in4.s_addr) !=
		    (SADDR4(pat)->s_addr & mask->in4.s_addr))
			return 0;
		break;
#ifdef AF_INET6
	case AF_INET6:
#if defined(HAVE_SIN6_SCOPE_ID) && defined(HAVE_GETADDRINFO)
		if (SA6(pat)->sin6_scope_id != 0 &&
		    SA6(sa)->sin6_scope_id != SA6(pat)->sin6_scope_id)
			return 0;
#endif
		/*
		 * Compare byte by byte.  Reading the s6_addr byte array
		 * through uint32_t pointers breaks the C aliasing rules
		 * and may be misaligned; the masked byte-wise comparison
		 * gives the same answer.
		 */
		for (i = 0; i < (int)sizeof(struct in6_addr); i++) {
			if ((SADDR6(sa)->s6_addr[i] &
			    mask->in6.s6_addr[i]) !=
			    (SADDR6(pat)->s6_addr[i] &
			    mask->in6.s6_addr[i]))
				return 0;
		}
		break;
#endif
	default:
		return 0;
	}

	return 1;
}
/*
 * Tell whether two socket addresses carry the same IP address.
 *
 * Returns 1 when the families are equal and the addresses are identical,
 * 0 otherwise.  Families other than AF_INET and AF_INET6 never compare
 * equal.  When scope ids are supported, a non-zero scope id in pat must
 * also equal the one in sa.
 */
int
ip_equal(sa, pat)
	struct sockaddr *sa;
	struct sockaddr *pat;
{
	if (sa->sa_family != pat->sa_family)
		return 0;

	switch (sa->sa_family) {
	case AF_INET:
		return SADDR4(sa)->s_addr == SADDR4(pat)->s_addr;
#ifdef AF_INET6
	case AF_INET6:
#if defined(HAVE_SIN6_SCOPE_ID) && defined(HAVE_GETADDRINFO)
		if (SA6(pat)->sin6_scope_id != 0 &&
		    SA6(sa)->sin6_scope_id != SA6(pat)->sin6_scope_id)
			return 0;
#endif
		return memcmp(SADDR6(sa), SADDR6(pat),
		    sizeof(struct in6_addr)) == 0;
#endif
	default:
		return 0;
	}
}
/*
 * Format the IP address held in sa as text into buf (buflen bytes).
 *
 * Returns buf on success, NULL when the address family is not AF_INET or
 * AF_INET6 or when inet_ntop() fails.  salen is only used by the disabled
 * getnameinfo() variant.
 */
char *
iptostring(sa, salen, buf, buflen)
	struct sockaddr *sa;
	socklen_t salen;
	char *buf;
	size_t buflen;
{
	/*
	 * This was ifdef HAVE_GETNAMEINFO, but we hit an ABI clash on some
	 * systems.
	 * From <netdb.h> of libbind:
	 *	#define NI_NOFQDN	0x00000001
	 *	#define NI_NUMERICHOST	0x0000000
	 * From <netdb.h> of glibc:
	 *	# define NI_NUMERICHOST	1
	 *	# define NI_NUMERICSERV	2
	 * The result is that on Linux, when linking with libspf_alt (which is
	 * linked with libbind), we get NI_NOFQDN where we expect
	 * NI_NUMERICHOST.
	 * Symptom: the dump file gets hostnames instead of IP addresses.
	 *
	 * Disabling the use of getnameinfo() breaks IPv6 Scope-Id, which is
	 * not handled by Sendmail anyway.
	 */
#if 0
	if (getnameinfo(sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0)
		return buf;
	return NULL;
#else
	const void *raw;

	/* Locate the raw address bytes for the family at hand. */
	if (sa->sa_family == AF_INET)
		raw = (const void *)SADDR4(sa);
#ifdef AF_INET6
	else if (sa->sa_family == AF_INET6)
		raw = (const void *)SADDR6(sa);
#endif
	else
		return NULL;

	return (inet_ntop(sa->sa_family, raw, buf, buflen) != NULL) ?
	    buf : NULL;
#endif
}
/*
 * Parse the textual address str into the socket address sa.
 *
 * salen	in: size of the buffer behind sa; out: size of the address
 *		that was stored
 * family	AF_INET, AF_INET6, or AF_UNSPEC to accept either
 *
 * Returns 1 on success, 0 when str cannot be converted, and -1 when the
 * buffer behind sa is too small for the resulting address.
 */
int
ipfromstring(str, sa, salen, family)
	char *str;
	struct sockaddr *sa;
	socklen_t *salen;
	sa_family_t family;
{
#ifdef HAVE_GETADDRINFO
	struct addrinfo hints, *res;
	int status;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = family;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_PASSIVE;

	if (getaddrinfo(str, "0", &hints, &res) != 0)
		return 0;

	/* Only the first result is used. */
	if (*salen >= res->ai_addrlen) {
		memcpy(sa, res->ai_addr, res->ai_addrlen);
		*salen = res->ai_addrlen;
		status = 1;
	} else {
		status = -1;
	}

	freeaddrinfo(res);
	return status;
#else
	struct in_addr addr;
#ifdef AF_INET6
	struct in6_addr addr6;
#endif

	/* Try IPv4 first when the caller allows it. */
	if ((family == AF_UNSPEC || family == AF_INET) &&
	    inet_pton(AF_INET, str, (void *)&addr) == 1) {
		if (*salen < sizeof(struct sockaddr_in))
			return -1;

		memset(sa, 0, *salen);
		SA4(sa)->sin_family = AF_INET;
#ifdef HAVE_SA_LEN
		SA4(sa)->sin_len = sizeof(struct sockaddr_in);
#endif
		memcpy(SADDR4(sa), &addr, sizeof(struct in_addr));
		*salen = sizeof(struct sockaddr_in);
		return 1;
	}

#ifdef AF_INET6
	/* Then IPv6. */
	if ((family == AF_UNSPEC || family == AF_INET6) &&
	    inet_pton(AF_INET6, str, (void *)&addr6) == 1) {
		if (*salen < sizeof(struct sockaddr_in6))
			return -1;

		memset(sa, 0, *salen);
		SA6(sa)->sin6_family = AF_INET6;
#ifdef SIN6_LEN
		SA6(sa)->sin6_len = sizeof(struct sockaddr_in6);
#endif
		memcpy(SADDR6(sa), &addr6, sizeof(struct in6_addr));
		*salen = sizeof(struct sockaddr_in6);
		return 1;
	}
#endif

	return 0;
#endif
}
/*
 * Remove every pending entry whose address text equals that of sa, then
 * remove the same address from the autowhite list.
 *
 * peer_flush() is called once, for the first matching entry.  When
 * queueid is not NULL, a summary is logged; it mentions the ACL line
 * when acl_line is non-zero.  Nothing is done if the address cannot be
 * converted to text.
 */
void
pending_del_addr(sa, salen, queueid, acl_line)
	struct sockaddr *sa;
	socklen_t salen;
	char *queueid;
	int acl_line;
{
	char addr[IPADDRSTRLEN];
	char aclstr[16];
	struct pending *cur;
	struct pending *following;
	int peers_notified = 0;
	int grey_removed = 0;
	int white_removed = 0;

	if (!iptostring(sa, salen, addr, sizeof(addr)))
		return;

	PENDING_RDLOCK;
	cur = TAILQ_FIRST(&pending_head);
	while (cur != NULL) {
		/* Fetch the successor first: cur may be removed below. */
		following = TAILQ_NEXT(cur, p_list);

		if (strncmp(addr, cur->p_addr, sizeof(addr)) == 0) {
			if (!peers_notified) {
				peer_flush(cur);
				peers_notified = 1;
			}
			pending_put(cur);
			grey_removed++;
		}

		cur = following;
	}
	PENDING_UNLOCK;

	/* And flush autowhite list as well... */
	white_removed = autowhite_del_addr(sa, salen);

	if (queueid == NULL)
		return;

	aclstr[0] = '\0';
	if (acl_line != 0)
		snprintf(aclstr, sizeof(aclstr), " (ACL %d)", acl_line);

	mg_log(LOG_INFO, "%s: addr %s flushed, removed %d grey and %d autowhite%s",
	    queueid, addr, grey_removed, white_removed, aclstr);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */