/* $Id: sync.c,v 1.66.2.5 2006/11/07 05:12:12 manu Exp $ */

/*
 * Copyright (c) 2004 Emmanuel Dreyfus
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *        This product includes software developed by Emmanuel Dreyfus
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,  
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"

#ifdef HAVE_SYS_CDEFS_H
#include <sys/cdefs.h>
#ifdef __RCSID
__RCSID("$Id: sync.c,v 1.66.2.5 2006/11/07 05:12:12 manu Exp $");
#endif
#endif

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <strings.h>
#include <pthread.h>
#include <syslog.h>
#include <sysexits.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "pending.h"
#include "sync.h"
#include "conf.h"
#include "autowhite.h"
#include "milter-greylist.h"

#define SYNC_PROTO_CURRENT 3

struct sync_master_sock {
	int runs;
	int sock;
};

struct sync_master_sock sync_master4 = { 0, -1 };
struct sync_master_sock sync_master6 = { 0, -1 };
struct peerlist peer_head;
pthread_rwlock_t peer_lock; /* For the peer list */
pthread_rwlock_t sync_lock; /* For all peer's sync queue */
pthread_cond_t sync_sleepflag;

static void sync_listen(char *, char *, struct sync_master_sock *);
static int local_addr(const struct sockaddr *sa, const socklen_t salen);
static int sync_queue_poke(struct peer *, struct sync *);
static struct sync * sync_queue_peek(struct peer *);
static int select_protocol(struct peer *, int, FILE *);
static void sync_vers(FILE *, int);

void
peer_init(void) {
	int error;

	LIST_INIT(&peer_head);
	if ((error = pthread_rwlock_init(&peer_lock, NULL)) != 0) {
		mg_log(LOG_ERR, 
		    "pthread_rwlock_init failed: %s", strerror(error));
		exit(EX_OSERR);
	}

	if ((error = pthread_rwlock_init(&sync_lock, NULL)) != 0) {
		mg_log(LOG_ERR, 
		    "pthread_rwlock_init failed: %s", strerror(error));
		exit(EX_OSERR);
	}

	if ((error = pthread_cond_init(&sync_sleepflag, NULL)) != 0) {
		mg_log(LOG_ERR, 
		    "pthread_cond_init failed: %s", strerror(error));
		exit(EX_OSERR);
	}

	return;
}

void 
peer_clear(void) {
	struct peer *peer;
	struct sync *sync;

	PEER_WRLOCK;

	while(!LIST_EMPTY(&peer_head)) {
		peer = LIST_FIRST(&peer_head);

		while((sync = sync_queue_peek(peer)) != NULL)
			sync_free(sync);
			
		if (peer->p_stream != NULL)
			fclose(peer->p_stream);

		LIST_REMOVE(peer, p_list);
		free(peer->p_name);
		free(peer);
	}

	PEER_UNLOCK;

	return;	
}

void 
peer_add(peername)
	char *peername;
{
	struct peer *peer;

	if ((peer = malloc(sizeof(*peer))) == NULL ||
	    (peer->p_name = strdup(peername)) == NULL) {
		mg_log(LOG_ERR, "cannot add peer: %s", strerror(errno));
		exit(EX_OSERR);
	}

	peer->p_qlen = 0;
	peer->p_stream = NULL;
	peer->p_flags = 0;
	TAILQ_INIT(&peer->p_deferred);

	PEER_WRLOCK;
	LIST_INSERT_HEAD(&peer_head, peer, p_list);
	PEER_UNLOCK;

	if (conf.c_debug)
		mg_log(LOG_DEBUG, "load peer %s", peer->p_name);

	return;
}

void
peer_create(pending)
	struct pending *pending;
{
	struct peer *peer;

	PEER_RDLOCK;
	if (LIST_EMPTY(&peer_head))
		goto out;

	LIST_FOREACH(peer, &peer_head, p_list) 
		sync_queue(peer, PS_CREATE, pending, -1); /* -1: unused */

out:
	PEER_UNLOCK;
	
	return;
}

void
peer_delete(pending, autowhite)
	struct pending *pending;
	time_t autowhite;
{
	struct peer *peer;

	PEER_RDLOCK;
	if (LIST_EMPTY(&peer_head))
		goto out;

	LIST_FOREACH(peer, &peer_head, p_list) 
		sync_queue(peer, PS_DELETE, pending, autowhite);

out:
	PEER_UNLOCK;
	
	return;
}

static int
sync_queue_poke(peer, sync)
	struct peer *peer;
	struct sync *sync;
{
	SYNC_WRLOCK;
	if (peer->p_qlen < SYNC_MAXQLEN) {
		TAILQ_INSERT_HEAD(&peer->p_deferred, sync, s_list);
		peer->p_qlen++;
		SYNC_UNLOCK;
		return 1;
	} else {
		SYNC_UNLOCK;
		return 0;
	}
}

static struct sync *
sync_queue_peek(peer)
	struct peer *peer;
{
	struct sync *sync;

	SYNC_WRLOCK;
	sync = TAILQ_FIRST(&peer->p_deferred);
	if (!TAILQ_EMPTY(&peer->p_deferred)) {
		TAILQ_REMOVE(&peer->p_deferred, sync, s_list);
		peer->p_qlen--;
	}
	SYNC_UNLOCK;
	return sync;
}

int
sync_send(peer, type, pending, autowhite) /* peer list is read-locked */
	struct peer *peer;
	peer_sync_t type;
	struct pending *pending;
	time_t autowhite;
{
	char sep[] = " \n\t\r";
	char *replystr;
	int replycode;
	char line[LINELEN + 1];
	char *cookie = NULL;
	char *keyw;
	char awstr[LINELEN + 1];
	int bw;

	if ((peer->p_stream == NULL) && (peer_connect(peer) != 0))
		return -1;

	*line = '\0';
	switch(type) {
	case PS_FLUSH:
		bw = snprintf(line, LINELEN, "flush addr %s\r\n",
		    pending->p_addr);
		break;
	case PS_CREATE:
		bw = snprintf(line, LINELEN, "add addr %s from %s "
		    "rcpt %s date %ld\r\n", pending->p_addr, 
			pending->p_from, pending->p_rcpt, 
			(long)pending->p_tv.tv_sec);
		break;
	default:
		if (peer->p_vers >= 2) {
			keyw = "del2";
			snprintf(awstr, LINELEN, " aw %ld", (long)autowhite);
		} else {
			keyw = "del";
			awstr[0] = '\0';
		}
		bw = snprintf(line, LINELEN, "%s addr %s from %s "
		    "rcpt %s date %ld%s\r\n", keyw, pending->p_addr, 
			pending->p_from, pending->p_rcpt, 
			(long)pending->p_tv.tv_sec, awstr);
		break;
	}

	if (bw > LINELEN) {
		mg_log(LOG_ERR, "closing connexion with peer %s: "
		    "send buffer would overflow (%d entries queued)", 
		    peer->p_name, peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	bw = fprintf(peer->p_stream, "%s", line);
	if (bw != strlen(line)) {
		mg_log(LOG_ERR, "closing connexion with peer %s: "
		    "%s (%d entries queued) - I was unable to send "
		    "complete line \"%s\" - bytes written: %i", 
		    peer->p_name, strerror(errno), peer->p_qlen, 
		    line, bw);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	fflush(peer->p_stream);

	/* 
	 * Check the return code 
	 */
	get_more:
	sync_waitdata(peer->p_socket);
	if (fgets(line, LINELEN, peer->p_stream) == NULL) {
		if (errno == EAGAIN) {
			if ( feof(peer->p_stream) ) {
				mg_log(LOG_ERR, "lost connexion with peer %s: "
		  		  "%s (%d entries queued)", 
				    peer->p_name, strerror(errno), peer->p_qlen);
				fclose(peer->p_stream);
				peer->p_stream = NULL;
				return -1;
			}
			goto get_more;
		}
		mg_log(LOG_ERR, "lost connexion with peer %s: "
		    "%s (%d entries queued)", 
		    peer->p_name, strerror(errno), peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	/*
	 * On some systems, opening a stream on a socket introduce
	 * weird behavior: the in and out buffers get mixed up. 
	 * By calling fflush() after each read operation, we fix that
	 */
	fflush(peer->p_stream);

	if ((replystr = strtok_r(line, sep, &cookie)) == NULL) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from %s, "
		    "closing connexion (%d entries queued)", 
		    line, peer->p_name, peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	replycode = atoi(replystr);
	if (replycode != 201) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from %s, "
		    "closing connexion (%d entries queued)", 
		    line, peer->p_name, peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	if (conf.c_debug)
		mg_log(LOG_DEBUG, "sync one entry with %s", peer->p_name);

	return 0;
}

int
peer_connect(peer)	/* peer list is read-locked */
	struct peer *peer;
{
	struct servent *se;
#ifdef HAVE_GETADDRINFO
	struct addrinfo hints, *res0, *res;
	int err;
#else
	struct protoent *pe;
	int proto;
	sockaddr_t raddr;
	socklen_t raddrlen;
#endif
	sockaddr_t laddr;
	socklen_t laddrlen;
	char *laddrstr;
	int service;
	int s = -1;
	char *replystr;
	int replycode;
	FILE *stream;
	char sep[] = " \n\t\r";
	char line[LINELEN + 1];
	int param;
	char *cookie = NULL;

	if (peer->p_stream != NULL)
		mg_log(LOG_ERR, "peer_connect called and peer->p_stream != 0");

	if (conf.c_syncport != NULL) {
		service = htons(atoi(conf.c_syncport));
	} else {
		if ((se = getservbyname(MXGLSYNC_NAME, "tcp")) == NULL)
		    service = htons(atoi(MXGLSYNC_PORT));
		else
		    service = se->s_port;
	}

#ifdef HAVE_GETADDRINFO
	bzero(&hints, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	if ((err = getaddrinfo(peer->p_name, "0", &hints, &res0)) != 0) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "getaddrinfo failed: %s (%d entries queued)",
		    peer->p_name, gai_strerror(err), peer->p_qlen);
		return -1;
	}

	for (res = res0; res; res = res->ai_next) {
		/*We only test an address family which kernel supports. */
		s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (s == -1)
			continue;
		close(s);

		if (local_addr(res->ai_addr, res->ai_addrlen)) {
			peer->p_flags |= P_LOCAL;
			freeaddrinfo(res0);
			return -1;
		}
	}

	for (res = res0; res; res = res->ai_next) {
		s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (s == -1)
			continue;

		switch (res->ai_family) {
		case AF_INET:
			SA4(res->ai_addr)->sin_port = service;
			if (conf.c_syncsrcaddr != NULL) {
				laddrstr = conf.c_syncsrcaddr;
			} else {
				laddrstr = "0.0.0.0";
				}
			break;
#ifdef AF_INET6
		case AF_INET6:
			SA6(res->ai_addr)->sin6_port = service;
			laddrstr = "::";
			break;
#endif
		default:
			mg_log(LOG_ERR, "cannot sync, unknown address family");
			close(s);
			s = -1;
			continue;
		}
		laddrlen = sizeof(laddr);
		if (ipfromstring(laddrstr, SA(&laddr), &laddrlen,
		    res->ai_family) == 1 &&
		    bind(s, SA(&laddr), laddrlen) == 0 &&
		    connect(s, res->ai_addr, res->ai_addrlen) == 0)
			break;
		close(s);
		s = -1;
	}
	freeaddrinfo(res0);
	if (s < 0) {
		mg_log(LOG_ERR,
		    "cannot sync with peer %s: %s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		return -1;
	}
#else
	raddrlen = sizeof(raddr);
	if (ipfromstring(peer->p_name, SA(&raddr), &raddrlen,
	    AF_UNSPEC) != 1) {
		mg_log(LOG_ERR, "cannot sync, invalid address");
		return -1;
	}

	if (local_addr(SA(&raddr), raddrlen)) {
		peer->p_flags |= P_LOCAL;
		return -1;
	}

	switch (SA(&raddr)->sa_family) {
	case AF_INET:
		SA4(&raddr)->sin_port = service;
		if (conf.c_syncsrcaddr != NULL) {
			laddrstr = conf.c_syncsrcaddr;
		} else {
			laddrstr = "0.0.0.0";
			}
		break;
#ifdef AF_INET6
	case AF_INET6:
		SA6(&raddr)->sin6_port = service;
		laddrstr = "::";
		break;
#endif
	default:
		mg_log(LOG_ERR, "cannot sync, unknown address family");
		return -1;
	}

	if ((pe = getprotobyname("tcp")) == NULL)
		proto = 6;
	else
		proto = pe->p_proto;

	if ((s = socket(SA(&raddr)->sa_family, SOCK_STREAM, proto)) == -1) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "socket failed: %s (%d entries queued)", 
		    peer->p_name, strerror(errno), peer->p_qlen);
		return -1;
	}

	laddrlen = sizeof(laddr);
	if (ipfromstring(laddrstr, SA(&laddr), &laddrlen,
	    SA(&raddr)->sa_family) != 1) {
		mg_log(LOG_ERR, "cannot sync, invalid address");
		close(s);
		return -1;
	}

	if (bind(s, SA(&laddr), laddrlen) != 0) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "bind failed: %s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		close(s);
		return -1;
	}

	if (connect(s, SA(&raddr), raddrlen) != 0) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "connect failed: %s (%d entries queued)", 
		    peer->p_name, strerror(errno), peer->p_qlen);
		close(s);
		return -1;
	}
#endif

	param = O_NONBLOCK;
	if (fcntl(s, F_SETFL, param) != 0) {
		mg_log(LOG_ERR, "cannot set non blocking I/O with %s: %s",
		    peer->p_name, strerror(errno));
	}

	if ((stream = fdopen(s, "w+")) == NULL) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "fdopen failed: %s (%d entries queued)", 
		    peer->p_name, strerror(errno), peer->p_qlen);
		close(s);
		return -1;
	}

	if (setvbuf(stream, NULL, _IOLBF, 0) != 0)
		mg_log(LOG_ERR, "cannot set line buffering with peer %s: %s", 
		    peer->p_name, strerror(errno));

	sync_waitdata(s);	
	if (fgets(line, LINELEN, stream) == NULL) {
		mg_log(LOG_ERR, "Lost connexion with peer %s: "
		    "%s (%d entries queued)", 
		    peer->p_name, strerror(errno), peer->p_qlen);
		goto bad;
	}

	/*
	 * On some systems, opening a stream on a socket introduce
	 * weird behavior: the in and out buffers get mixed up. 
	 * By calling fflush() after each read operation, we fix that
	 */
	fflush(stream);

	if ((replystr = strtok_r(line, sep, &cookie)) == NULL) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from peer %s "
		    "closing connexion (%d entries queued)", 
		    line, peer->p_name, peer->p_qlen);
		goto bad;
	}

	replycode = atoi(replystr);
	if (replycode != 200) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from peer %s "
		    "closing connexion (%d entries queued)", 
		    line, peer->p_name, peer->p_qlen);
		goto bad;
	}

	if ((peer->p_vers = select_protocol(peer, s, stream)) == 0)
		goto bad;	

	mg_log(LOG_INFO, "Connection to %s established, protocol version %d", 
	    peer->p_name, peer->p_vers);
	peer->p_stream = stream;
	peer->p_socket = s;

	return 0;

bad:
	fclose(stream);
	peer->p_stream = NULL;

	return -1;
}

void
sync_master_restart(void) {
	pthread_t tid;
	int empty;
	int error;

	PEER_RDLOCK;
	empty = LIST_EMPTY(&peer_head);
	PEER_UNLOCK;

	if (empty || sync_master4.runs || sync_master6.runs)
		return;

	if (conf.c_syncaddr != NULL) {
		if (strchr(conf.c_syncaddr, ':'))
		    sync_listen(conf.c_syncaddr, conf.c_syncport,
				&sync_master6);
		else
		    sync_listen(conf.c_syncaddr, conf.c_syncport,
				&sync_master4);
	} else {

#ifdef AF_INET6
		sync_listen("::", conf.c_syncport, &sync_master6);
#endif
		sync_listen("0.0.0.0", conf.c_syncport, &sync_master4);
	}


	if (!sync_master4.runs && !sync_master6.runs) {
		mg_log(LOG_ERR, "cannot start MX sync, socket failed: %s",
		    strerror(errno));
		exit(EX_OSERR);
	}
	if (sync_master6.runs) {
		if ((error = pthread_create(&tid, NULL, sync_master,
		    (void *)&sync_master6)) != 0) {
			mg_log(LOG_ERR, 
			    "Cannot run MX sync thread for IPv6: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
		if ((error = pthread_detach(tid)) != 0) {
			mg_log(LOG_ERR, 
			    "pthread_detach failed for IPv6 MX sync: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
	}
	if (sync_master4.runs) {
		if ((error = pthread_create(&tid, NULL, sync_master,
		    (void *)&sync_master4)) != 0) {
			mg_log(LOG_ERR, 
			    "Cannot run MX sync thread for IPv4: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
		if ((error = pthread_detach(tid)) != 0) {
			mg_log(LOG_ERR, 
			    "pthread_detach failed for IPv4 MX sync: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
	}
}

void *
sync_master(arg)
	void *arg;
{
	struct sync_master_sock *sms = arg;

	for (;;) {
		sockaddr_t raddr;
		socklen_t raddrlen;
		int fd;
		FILE *stream;
		pthread_t tid;
		struct peer *peer;
		char peerstr[IPADDRSTRLEN];
		int error;

		bzero((void *)&raddr, sizeof(raddr));
		raddrlen = sizeof(raddr);
		if ((fd = accept(sms->sock, SA(&raddr), &raddrlen)) == -1) {
			mg_log(LOG_ERR, "incoming connexion "
			    "failed: %s", strerror(errno));

			if (errno != ECONNABORTED)
				exit(EX_OSERR);
			continue;
		}
		unmappedaddr(SA(&raddr), &raddrlen);

		iptostring(SA(&raddr), raddrlen, peerstr, sizeof(peerstr));
		mg_log(LOG_INFO, "Incoming MX sync connexion from %s", 
		    peerstr);

		if ((stream = fdopen(fd, "w+")) == NULL) {
			mg_log(LOG_ERR, 
			    "incoming connexion from %s failed, "
			    "fdopen fail: %s", peerstr, strerror(errno));
			close(fd);
			exit(EX_OSERR);
		}

		if (setvbuf(stream, NULL, _IOLBF, 0) != 0)
			mg_log(LOG_ERR, "cannot set line buffering: %s", 
			    strerror(errno));	
		
		/*
		 * Check that the orginator IP is one of our peers
		 */
		PEER_RDLOCK;

		if (LIST_EMPTY(&peer_head)) {
			fprintf(stream, "105 No more peers, shutting down!\n");

			PEER_UNLOCK;
			fclose(stream);
			close(sms->sock);
			sms->sock = -1;
			sms->runs = 0;
			return NULL;
		}
			
		LIST_FOREACH(peer, &peer_head, p_list) {
#ifdef HAVE_GETADDRINFO
			struct addrinfo hints, *res0, *res;
			int err;
			int match = 0;

			bzero(&hints, sizeof(hints));
			hints.ai_flags = AI_PASSIVE;
			hints.ai_family = AF_UNSPEC;
			hints.ai_socktype = SOCK_STREAM;
			err = getaddrinfo(peer->p_name, "0", &hints, &res0);
			if (err != 0) {
				mg_log(LOG_ERR, "cannot resolve %s: %s",
				    peer->p_name, gai_strerror(err));
				continue;
			}
			for (res = res0; res; res = res->ai_next) {
				if (ip_equal(SA(&raddr), res->ai_addr)) {
					match = 1;
					break;
				}
			}
			freeaddrinfo(res0);
			if (match)
				break;
#else
			sockaddr_t addr;
			socklen_t addrlen;

			addrlen = sizeof(addr);
			if (ipfromstring(peer->p_name, SA(&addr), &addrlen,
			     AF_UNSPEC) != 1) {
				mg_log(LOG_ERR, "cannot resolve %s",
				    peer->p_name);
				continue;
			}
			if (ip_equal(SA(&raddr), SA(&addr)))
				break;
#endif
		}

		PEER_UNLOCK;

		if (peer == NULL) {
			mg_log(LOG_INFO, "Remote host %s is not a peer MX", 
			    peerstr);
			fprintf(stream, 
			    "106 You have no permission to talk, go away!\n");
			fclose(stream);
			continue;
		}

		if ((error = pthread_create(&tid, NULL, 
		    (void *(*)(void *))sync_server, (void *)stream)) != 0) {
			mg_log(LOG_ERR, "incoming connexion from %s failed, "
			    "pthread_create failed: %s", 
			    peerstr, strerror(error));
			fclose(stream);
			continue;
		}
		if ((error = pthread_detach(tid)) != 0) {
			mg_log(LOG_ERR, "incoming connexion from %s failed, "
			    "pthread_detach failed: %s",
			    peerstr, strerror(error));
			exit(EX_OSERR);
		}
	}

	/* NOTREACHED */
	mg_log(LOG_ERR, "sync_master quitted unexpectedly");
	return NULL;
}

static void
sync_listen(addr, port, sms)
        char *addr, *port;
	struct sync_master_sock *sms;
{
	struct protoent *pe;
	struct servent *se;
	int proto;
	sockaddr_t laddr;
	socklen_t laddrlen;
	int service;
	int optval;
	int s;

	sms->runs = 1;
	laddrlen = sizeof(laddr);
	if (ipfromstring(addr, SA(&laddr), &laddrlen, AF_UNSPEC) != 1) {
		sms->runs = 0;
		return;
	}

	if ((pe = getprotobyname("tcp")) == NULL)
		proto = 6;
	else
		proto = pe->p_proto;

	if (port != NULL)
		service = htons(atoi(port));
	else {
		if ((se = getservbyname(MXGLSYNC_NAME, "tcp")) == NULL)
		    service = htons(atoi(MXGLSYNC_PORT));
		else
		    service = se->s_port;
	}

	switch (SA(&laddr)->sa_family) {
	case AF_INET:
		SA4(&laddr)->sin_port = service;
		break;
#ifdef AF_INET6
	case AF_INET6:
		SA6(&laddr)->sin6_port = service;
		break;
#endif
	default:
		sms->runs = 0;
		return;
	}

	if ((s = socket(SA(&laddr)->sa_family, SOCK_STREAM, proto)) == -1) {
		sms->runs = 0;
		return;
	}

	optval = 1;
	if ((setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
	    &optval, sizeof(optval))) != 0) {
		mg_log(LOG_ERR, "cannot set SO_REUSEADDR: %s",
		    strerror(errno));
	}

	optval = 1;
	if ((setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
	    &optval, sizeof(optval))) != 0) {
		mg_log(LOG_ERR, "cannot set SO_KEEPALIVE: %s",
		    strerror(errno));
	}

#ifdef IPV6_V6ONLY
	if (SA(&laddr)->sa_family == AF_INET6) {
		optval = 1;
		if ((setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
		    &optval, sizeof(optval))) != 0) {
			mg_log(LOG_ERR, "cannot set IPV6_V6ONLY: %s",
			    strerror(errno));
		}
	}
#endif

	if (bind(s, SA(&laddr), laddrlen) != 0) {
		mg_log(LOG_ERR, "cannot start MX sync, bind failed: %s",
		    strerror(errno));
		sms->runs = 0;
		close(s);
		return;
	}

	if (listen(s, MXGLSYNC_BACKLOG) != 0) {
		mg_log(LOG_ERR, "cannot start MX sync, listen failed: %s",
		    strerror(errno));
		sms->runs = 0;
		close(s);
		return;
	}

	sms->sock = s;
	return;
}

void
sync_server(arg) 
	void *arg;
{
	FILE *stream = arg;
	char sep[] = " \n\t\r";
	char *cmd;
	char *keyword;
	char *addrstr;
	char *from;
	char *rcpt;
	char from_clean[ADDRLEN + 1];
	char rcpt_clean[ADDRLEN + 1];
	char *datestr;
	char *awstr;
	char *cookie;
	char line[LINELEN + 1];
	peer_sync_t action;
	sockaddr_t addr;
	socklen_t addrlen;
	time_t date;
	time_t aw = time(NULL) + conf.c_autowhite_validity;

	fprintf(stream, "200 Yeah, what do you want?\n");
	fflush(stream);

	for (;;) {
		if ((fgets(line, LINELEN, stream)) == NULL)
			break;

		/*
		 * On some systems, opening a stream on a socket introduce
		 * weird behavior: the in and out buffers get mixed up. 
		 * By calling fflush() after each read operation, we fix that
		 */
		fflush(stream);

		/*
		 * Get the command { quit | help | add | del | del2 | flush }
		 */
		cookie = NULL;
		if ((cmd = strtok_r(line, sep, &cookie)) == NULL) {
			fprintf(stream, "101 No command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(cmd, "quit", CMDLEN) == 0) {
			break;
		} else if ((strncmp(cmd, "help", CMDLEN)) == 0) {
			sync_help(stream);
			continue;
		} else if ((strncmp(cmd, "vers3", CMDLEN)) == 0) {
			sync_vers(stream, 3);
			continue;
		} else if ((strncmp(cmd, "vers2", CMDLEN)) == 0) {
			sync_vers(stream, 2);
			continue;
		} else if ((strncmp(cmd, "add", CMDLEN)) == 0) {
			action = PS_CREATE;
		} else if ((strncmp(cmd, "del2", CMDLEN)) == 0) {
			action = PS_DELETE;
			aw = -1;
		} else if ((strncmp(cmd, "del", CMDLEN)) == 0) {
			action = PS_DELETE;
		} else if ((strncmp(cmd, "flush", CMDLEN)) == 0) {
			action = PS_FLUSH;
		} else {
			fprintf(stream, "102 Invalid command \"%s\"\n", cmd);
			fflush(stream);
			continue;
		}

		/*
		 * get { "addr" ip_address }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "addr", CMDLEN) != 0) {
			fprintf(stream, 
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((addrstr = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}
			
		addrlen = sizeof(addr);
		if (ipfromstring(addrstr, SA(&addr), &addrlen,
		    AF_UNSPEC) != 1) {
			fprintf(stream, "107 Invalid IP address\n");
			fflush(stream);
			continue;
		}	

		if (action == PS_FLUSH) {
			from = NULL;
			rcpt = NULL;
			date = 0;
			goto eol;
		}

		/*
		 * get { "from" email_address }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "from", CMDLEN) != 0) {
			fprintf(stream, 
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((from = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}
		(void)strncpy_rmsp(from_clean, from, ADDRLEN);
		from = from_clean;

		/*
		 * get { "rcpt" email_address }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "rcpt", CMDLEN) != 0) {
			fprintf(stream, 
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((rcpt = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}
		(void)strncpy_rmsp(rcpt_clean, rcpt, ADDRLEN);
		rcpt = rcpt_clean;

		/*
		 * get { "date" valid_date }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "date", CMDLEN) != 0) {
			fprintf(stream, 
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((datestr = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		date = atoi(datestr);

		if (aw == -1) {
			/*
			 * get { "aw" valid_date }
			 */
			if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
				fprintf(stream, "103 Incomplete command\n");
				fflush(stream);
				continue;
			}

			if (strncmp(keyword, "aw", CMDLEN) != 0) {
				fprintf(stream, 
				    "104 Unexpected keyword \"%s\"\n", keyword);
				fflush(stream);
				continue;
			}

			if ((awstr = strtok_r(NULL, sep, &cookie)) == NULL) {
				fprintf(stream, "103 Incomplete command\n");
				fflush(stream);
				continue;
			}

			aw = atoi(awstr);
		}

		/* 
		 * Check nothing remains
		 */
eol:
		if ((keyword = strtok_r(NULL, sep, &cookie)) != NULL) {	
			fprintf(stream, 
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		fprintf(stream, "201 All right, I'll do that\n");
		fflush(stream);

		if (action == PS_CREATE) {
			PENDING_WRLOCK;
			/* delay = -1 means unused: we supply the date */
			pending_get(SA(&addr), addrlen, from, rcpt, date);
			PENDING_UNLOCK;
		}
		if (action == PS_DELETE) {
			pending_del(SA(&addr), addrlen, from, rcpt, date);
			autowhite_add(SA(&addr), addrlen, from, 
			    rcpt, &aw, "(mxsync)");
		}
		if (action == PS_FLUSH) {
			pending_del_addr(SA(&addr), addrlen, NULL, 0);
			autowhite_del_addr(SA(&addr), addrlen);
		}

		/* Flush modifications to disk */
		dump_flush();
	}

	fprintf(stream, "202 Good bye\n");
	fclose(stream);

	return;

}

static void
sync_vers(stream, vers)
	FILE *stream;
	int vers;
{
	if (vers <= SYNC_PROTO_CURRENT) {
		fprintf(stream, 
		    "%d Yes, I speak version %d, what do you think?\n",
		    800 + vers, vers);
	} else {
		fprintf(stream, "108 Invalid vers%d command\n", vers);
	}
	fflush(stream);
	return;
}

void
sync_help(stream)
	FILE *stream;
{
	fprintf(stream, "203 Help? Sure, we have help here:\n");
	fprintf(stream, "203 \n");
	fprintf(stream, "203 Available commands are:\n");
	fprintf(stream, "203 help  -- displays this message\n");
	fprintf(stream, "203 quit  -- terminate connexion\n");
	fprintf(stream, "203 vers2 -- speak version 2 protocol\n");
	fprintf(stream, "203 vers3 -- speak version 3 protocol\n");
	fprintf(stream, 
	    "203 add addr <ip> from <email> rcpt <email> date <time>  "
	    "-- add en entry\n");
	fprintf(stream, 
	    "203 del addr <ip> from <email> rcpt <email> date <time>  "
	    "-- remove en entry\n");
	fprintf(stream, 
	    "203 del2 addr <ip> from <email> rcpt <email> date <time> "
	    "aw <time> -- remove en entry, adding it to autowhite with "
	    "given delay (version 2 only)\n");
	fprintf(stream, 
	    "203 flush addr <ip> -- remove anything about an ip "
	    " (version 3 only)\n");
	fflush(stream);

	return;
}

#define COM_TIMEOUT 3
int
sync_waitdata(fd)
	int fd;
{
	fd_set fdr, fde;
	struct timeval timeout;
	int retval;

	FD_ZERO(&fdr);
	FD_SET(fd, &fdr);

	FD_ZERO(&fde);
	FD_SET(fd, &fde);

	timeout.tv_sec = COM_TIMEOUT;
	timeout.tv_usec = 0;

	retval = select(fd + 1, &fdr, NULL, &fde, &timeout); 

	return retval;
}


void
sync_queue(peer, type, pending, autowhite)/* peer list must be read-locked */
	struct peer *peer;
	peer_sync_t type;
	struct pending *pending;
	time_t autowhite;
{
	int error;
	struct sync *sync;

	if (peer->p_flags & P_LOCAL)
		return;

	if ((sync = malloc(sizeof(*sync))) == NULL) {
		mg_log(LOG_ERR, "cannot allocate memory: %s", 
		    strerror(errno)); 
		exit(EX_OSERR);
	}

	sync->s_peer = peer;
	sync->s_type = type;
	sync->s_autowhite = autowhite;
	sync->s_pending = pending_ref(pending);

	/*
	 * If the queue has overflown, try to wakeup sync_sender to
	 * void it, but do not accept new entries anymore.
	 */
	if (!sync_queue_poke(peer, sync)) {
		mg_log(LOG_ERR, "peer %s queue overflow (%d entries), "
		    "discarding new entry", peer->p_name, peer->p_qlen);
		sync_free(sync);
	}

	if ((error = pthread_cond_signal(&sync_sleepflag)) != 0) {
		mg_log(LOG_ERR, 
		    "cannot wakeup sync_sender: %s", strerror(error));
		exit(EX_SOFTWARE);
	}
	return;
}

void
sync_free(sync)
	struct sync *sync;
{
	pending_free(sync->s_pending);
	free(sync);
}

void
sync_sender_start(void) {
	pthread_t tid;
	int error;

	if ((error = pthread_create(&tid, NULL, 
	    (void *(*)(void *))sync_sender, NULL)) != 0) {
		mg_log(LOG_ERR, "pthread_create failed: %s", strerror(error));
		exit(EX_OSERR);
	}
	if ((error = pthread_detach(tid)) != 0) {
		mg_log(LOG_ERR, "pthread_detach failed: %s", strerror(error));
		exit(EX_OSERR);
	}
	return;
}

/* ARGSUSED0 */
void
sync_sender(dontcare)
	void *dontcare;
{
	int done = 0;
	struct peer *peer;
	struct sync *sync;
	pthread_mutex_t mutex;
	struct timeval tv1, tv2, tv3;
	int error;

	if ((error = pthread_mutex_init(&mutex, NULL)) != 0) {
		mg_log(LOG_ERR, "pthread_mutex_init failed: %s", 
		    strerror(error));
		exit(EX_OSERR);
	}

	if ((error = pthread_mutex_lock(&mutex)) != 0) {
		mg_log(LOG_ERR, "pthread_mutex_lock failed: %s", 
		    strerror(error));
		exit(EX_OSERR);
	}

	for (;;) {
		if ((error = pthread_cond_wait(&sync_sleepflag, &mutex)) != 0)
			mg_log(LOG_ERR, "pthread_cond_wait failed: %s", 
			    strerror(error));
		if (conf.c_debug) {
			mg_log(LOG_DEBUG, "sync_sender running");
			gettimeofday(&tv1, NULL);
		}
		done = 0;

		PEER_RDLOCK;
		if (LIST_EMPTY(&peer_head))
			goto out;
			
		LIST_FOREACH(peer, &peer_head, p_list) {
			/* Don't try to sync with ourselves */
			if (peer->p_flags & P_LOCAL)
				continue;

			while ((sync = sync_queue_peek(peer)) != NULL ) {

				if (sync_send(sync->s_peer, sync->s_type, 
				    sync->s_pending, sync->s_autowhite) != 0) {
					if (!sync_queue_poke(peer, sync))
						sync_free(sync);

					break;
				}

				sync_free(sync);

				done++;
			}
		}
out:
		PEER_UNLOCK;

		if (conf.c_debug) {
			gettimeofday(&tv2, NULL);
			timersub(&tv2, &tv1, &tv3);
			mg_log(LOG_DEBUG, "sync_sender sleeping, "
			    "done %d entries in %ld.%06lds", done,
			    tv3.tv_sec, tv3.tv_usec);
		}
	}
}


static int
local_addr(sa, salen)
	const struct sockaddr *sa;
	const socklen_t salen;
{
	sockaddr_t addr;
	int	sfd, islocal;

	memcpy(&addr, sa, salen);
	switch(sa->sa_family) {
	case AF_INET:
		SA4(&addr)->sin_port = 0;
		break;

#ifdef AF_INET6
	case AF_INET6:
		SA6(&addr)->sin6_port = 0;
		break;
#endif

	default:
		mg_log(LOG_ERR, "local_addr: unsupported AF %d",
		    sa->sa_family);
		return -1;
		break;
	}

	if ((sfd = socket(sa->sa_family, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
		mg_log(LOG_ERR, "local_addr: socket failed: %s",
		    strerror(errno));
		return -1;
	}

	if (bind(sfd, sa, salen) == -1) {
		if (errno != EADDRNOTAVAIL && 
#ifdef __FreeBSD__
		    errno != EINVAL &&
#endif
		    1) {
			mg_log(LOG_ERR, "local_addr: bind failed: %s",
			    strerror(errno));
			islocal = -1;
		} else {
			islocal = 0;
		}
	} else {
		islocal = 1;
	}

	close(sfd);

	return islocal;
}

static int 
select_protocol(peer, s, stream)
	struct peer *peer;
	int s;
	FILE *stream;
{
	char line[LINELEN + 1];
	int vers;
	char *replystr;
	int replycode;
	char sep[] = " \n\t\r";
	char *cookie = NULL;

	for (vers = SYNC_PROTO_CURRENT; vers > 1; vers--) {
		fprintf(stream, "vers%d\n", vers);

		fflush(stream);

		sync_waitdata(s);	
		if (fgets(line, LINELEN, stream) == NULL) {
			mg_log(LOG_ERR, "Lost connexion with peer %s: "
			    "%s (%d entries queued)", 
			    peer->p_name, strerror(errno), peer->p_qlen);
			return 0;
		}

		if ((replystr = strtok_r(line, sep, &cookie)) == NULL) {
			mg_log(LOG_ERR, "Unexpected reply \"%s\" from peer %s "
			    "closing connexion (%d entries queued)", 
			    line, peer->p_name, peer->p_qlen);
			return 0;
		}

		fflush(stream);

		replycode = atoi(replystr);
		if (replycode != 800 + vers) {
			mg_log(LOG_DEBUG, 
			    "peer %s answered code %d to command vers%d",
			    peer->p_name, replycode, vers);
		} else {
			return vers;
		}
	}

	return 1;
}

void
peer_flush(pending)
	struct pending *pending;
{
	struct peer *peer;

	PEER_RDLOCK;
	if (LIST_EMPTY(&peer_head))
		goto out;

	LIST_FOREACH(peer, &peer_head, p_list) {
		/* Unsupported before verseion 3 */
		if (peer->p_vers < 3)
			continue;
		sync_queue(peer, PS_FLUSH, pending, -1); /* -1: unused */
	}

out:
	PEER_UNLOCK;
	
	return;
}


syntax highlighted by Code2HTML, v. 0.9.1