/*
 * client SMTP program, use sm io.
 *
 * based on proxy.c 
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: smtpc2.c,v 1.85 2007/05/18 03:04:43 ca Exp $")
#include "sm/assert.h"
#include "sm/error.h"
#include "sm/str.h"
#include "sm/test.h"
#include "sm/io.h"
#include "sm/ctype.h"
#if 0
#include "smi-net.h"
#endif
#include "sm/sysexits.h"

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#if HAVE_MATH_H
#include <math.h>
#define SM_MATH_STATS 1
#else
#define SM_MATH_STATS 0
#endif

#include "st.h"
#include "common.h"	/* HACK for debugging, otherwise structs are hidden */

extern sm_stream_T SmStThrIO;

/* size of the shared message body buffer and of a single reply line */
#define SM_IOBUFSIZE (1024*1024)
#define SM_LINE_LEN (4*1024)

/*
**  Shared message body: main() fills it with a small header followed by
**  filler text; smtpdata() also uses it as read buffer for the -D file.
*/

static	uchar databuf[SM_IOBUFSIZE];

/* fallback for systems whose headers do not define the inet_addr() error value */
#ifndef INADDR_NONE
# define INADDR_NONE 0xffffffff
#endif

#define MAXTC	65536L		/* max. number of total connections */
#define SC_MAXADDRS	256		/* max. number of addresses */
/* default for request_timeout (see option -O) */
#define REQUEST_TIMEOUT 5
/* convert seconds to microseconds */
#define SEC2USEC(s) ((s)*1000000LL)

/*
**  Global configuration and state.
**  Most of these are set by main() from command line options; the
**  option letter is noted where the option handling shows it.
*/

static char *prog;			/* Program name   */
static struct sockaddr_in rmt_addr;	/* Remote address */
static unsigned int ta_per_thrd;	/* -T/-c: transactions per thread */
static unsigned int sessions;	/* -s: total sessions; set to 0 to stop on error */
static unsigned int sess_cnt = 0;	/* sessions started so far */
static int debug = 0;		/* -d: debug level */
static int stoponerror = 0;	/* -S: stop on errors */
static unsigned long count = 0;	/* -C: show counter */
static int busy = 0;		/* running request threads; main() waits for 0 */
static int concurrent = 0;
static int waitdata = 0;	/* -w: wait for n seconds after DATA */
static int sequence = -1;	/* -q: send n messages with 1..n as body */
static int seqfirst = 0;	/* -Q: first element in sequence */
static int postfix = 0;		/* -p: identifier used in addresses */
static int rset = 1;		/* cleared by -N: no RSET between transactions */
static int idisplay = 0;	/* interactive display */
static int show_rate = 0;	/* -G: show total rate */
static unsigned int datalen = 0;	/* -l: data length */
static char *cmdfile = NULL;	/* -B: file with commands to send */
static char *datafile = NULL;	/* -D: file with content for DATA */
static int flushdata = 0;	/* -2: flush data before final dot */
static unsigned int rcpts = 0;	/* -n: recipients per transaction */
static unsigned long ta_total = 0;	/* -m: number of messages to send */
static unsigned long total_rate = 0;
static bool total_rate_set = false;
static unsigned long ta_cnt = 0;	/* printed as "total" by main() */
static unsigned long startup = 0;	/* -I: number of `startup' messages */
static unsigned long startup_act = 0;
static unsigned long teardown = 0;	/* -g: number of `teardown' messages */
/* static unsigned long teardown_act = 0; not yet used */

/* -f/-R addresses and -F/-Y domains, with the number of entries of each */
static const char *from[SC_MAXADDRS], *rcpt[SC_MAXADDRS];
static const char *fromdom[SC_MAXADDRS], *rcptdom[SC_MAXADDRS];
static unsigned int fromaddrs = 0;
static unsigned int rcptaddrs = 0;
static unsigned int fromdoms = 0;
static unsigned int rcptdoms = 0;
static unsigned int request_timeout = REQUEST_TIMEOUT;	/* -O: I/O timeout */
static int showerror = 0;	/* -E: show SMTP dialogue errors */
static int usesize = 0;		/* -Z: use SIZE= for MAIL */
static char *mailarg = NULL;	/* -M: argument for MAIL */
static char *ehloarg = "me.local";	/* -e: domain for EHLO */
static time_t firstNmail = 0;
static time_t firstmail = 0;

/* -8/-9 request DRR/RSAD; the use_* flags are tested by smtpcommand() */
static bool try_drr = false;
static bool use_drr = false;
static bool try_rsad = false;
static bool use_rsad = false;

/* # of recipient addresses sent, # of replies received, # of "OK"s */
static unsigned int rcpts_s = 0;
static unsigned int rcpts_r = 0;
static unsigned int rcpts_ok = 0;
static int rcpt_r = 0;	/* RCPT status */
static unsigned int rcpt_i = 0;	/* index of failed RCPT (iff rcpt_r is set) */
static int mail_s = 0;	/* MAIL has been sent? */
static int mail_r = 0;	/* MAIL status */
static int use_pipelining = 0;	/* -P: use PIPELINING if offered */
static sm_str_P reply_str = NULL;	/* copy of a failing reply (PIPELINING only) */

/* filled in by rd_srv_cap() from the EHLO reply */
static unsigned int srv_caps = 0;
static unsigned long srv_max_sz_b = 0;

/* 3 digits reply code plus ' ' or '-' */
#define SMTP_REPLY_OFFSET	4

/* server capabilities */
#define SCSE_CAP_NONE		0x0000	/* guess	*/
#define SCSE_CAP_ESMTP		0x0001	/* ESMTP	*/
#define SCSE_CAP_PIPELINING	0x0010	/* PIPELINING	*/
#define SCSE_CAP_8BITMIME	0x0020	/* 8BITMIME	*/
#define SCSE_CAP_SIZE		0x0040	/* SIZE		*/
#define SCSE_CAP_ENHSTAT	0x0080	/* ENHANCEDSTATUSCODES	*/
#define SCSE_CAP_AUTH		0x0100	/* AUTH		*/
#define SCSE_CAP_STARTTLS	0x0200	/* STARTTLS	*/
#define SCSE_CAP_DRR		0x0400	/* DRR	*/
#define SCSE_CAP_RSAD		0x0800	/* RSAD	*/

/*
**  Set, clear, and test capability bits.
**  The expansions are fully parenthesized so that they bind as one
**  expression when used inside a larger one (e.g., in a comparison).
*/

#define SCSE_SET_CAP(caps, cap)	((caps) |= (cap))
#define SCSE_CLR_CAP(caps, cap)	((caps) &= ~(cap))
#define SCSE_IS_CAP(caps, cap)	(((caps) & (cap)) != 0)


/*
**  Action codes (set via -a stage=code, stored in codes[]).
**  SCE_CODE() extracts the action, SCE_VAL() the lower byte which acts
**  as a counter: an action matches if that byte is 0 or equal to the
**  counter passed to SCE_IS_ABORT_*().
**  NOTE(review): the counter is presumably the current session or
**  transaction number; confirm against the callers.
*/

#define SCE_ST_ABORT_SE	0x0100
#define SCE_ST_ABORT_TA	0x0200

#define SCE_CODE(code)	((code) & 0x7f00)
#define SCE_VAL(code)	((code) & 0x00ff)
#define SCE_IS_ABORT_SE(code, counter)	\
	((SCE_CODE(code) == SCE_ST_ABORT_SE) && \
	 (SCE_VAL(code) == 0 || SCE_VAL(code) == (counter)))
#define SCE_IS_ABORT_TA(code, counter)	\
	((SCE_CODE(code) == SCE_ST_ABORT_TA) && \
	 (SCE_VAL(code) == 0 || SCE_VAL(code) == (counter)))

/* SMTP dialogue stages; used as index into codes[] and waits[] */
#define STAGE_CONNECT	0
#define STAGE_HELO	1
#define STAGE_MAIL	2
#define STAGE_RCPT	3
#define STAGE_DATA	4
#define STAGE_DOT	5
/* #define STAGE_LDAT	6 */
#define STAGES		6

/* may the reply for this stage be deferred? (only MAIL and RCPT, and only with -P) */
#define SC_PIPELINE_OK(stage) (((STAGE_MAIL == (stage)) || (STAGE_RCPT == (stage))) && use_pipelining)

/* per stage: action code (-a stage=code) and delay in seconds (-W stage=delay) */
static int codes[STAGES];
static int waits[STAGES];

static void read_address(const char *str, struct sockaddr_in *sin);
static void *handle_request(void *arg);
static void print_sys_error(const char *msg);

/*
**  timing statistics
*/

/* indices into the per-session timing arrays (sces_wr[], sces_rd[]); 2 is not assigned */
#define ST_SE_CONNECT	0
#define ST_SE_EHLO	1
#define ST_SE_QUIT	3
#define ST_SE_N		4

/* indices into the per-transaction timing arrays (scet_wr[], scet_rd[]) */
#define ST_TA_RSET	0
#define ST_TA_MAIL	1
#define ST_TA_RCPT	2	/* what about multiple recipients??? */
#define ST_TA_DATA	3
#define ST_TA_DOT	4
#define ST_TA_N		5

/*
**  Timing data for one transaction.
**  The arrays are indexed by ST_TA_*; show_stats() prints them as
**  "wr" and "rd".
**  NOTE(review): presumably filled from the t_write/t_read results of
**  smtpcommand(); confirm in handle_request().
*/

typedef struct sce_stt_S		sce_stt_T, *sce_stt_P;
struct sce_stt_S
{
	st_utime_t	 scet_wr[ST_TA_N];
	st_utime_t	 scet_rd[ST_TA_N];
	st_utime_t	 scet_ta;	/* for one transaction */
};

/*
**  Timing data for one session.
**  The arrays are indexed by ST_SE_*; sces_tas points to the
**  ta_per_thrd transaction entries of this session (a slice of one
**  array allocated by main()).
*/

typedef struct sce_sts_S		sce_sts_T, *sce_sts_P;
struct sce_sts_S
{
	st_utime_t	 sces_wr[ST_SE_N];
	st_utime_t	 sces_rd[ST_SE_N];
	st_utime_t	 sces_se;	/* for one session */
	sce_stt_P	 sces_tas;	/* transaction data */
};

/* one entry per session; allocated by main() only if stats (-A) is set */
static sce_sts_P sc_sts = NULL;
static int stats = 0;

/*
**  SCE_USAGE -- print usage message to smioerr
**
**	Parameters:
**		prg -- program name
**
**	Returns:
**		none.
**
**	Note: reads ehloarg, fromdom[0], rcptdom[0], and request_timeout
**		to show the current defaults, hence fromdom[0]/rcptdom[0]
**		must be set before this is called.
*/

static void
sce_usage(const char *prg)
{
	/*
	**  The action codes are printed with "%#06x": flags must precede
	**  the field width in a conversion specification.
	**  The counter is the lower byte only, see SCE_VAL().
	*/

	sm_io_fprintf(smioerr,
		"Usage: %s [options] -r host:port\n"
		"-2          flush data before final dot\n"
		"-8          try Deferred RCPT Reply\n"
		"-9          try RCPT status after dot.\n"
		"-A          measure SMTP dialogue times\n"
		"-a stage=code   set action reply code for stage to code\n"
		"   stage:\n"
		"     c: new session (connect)\n"
		"     h: HELO\n"
		"     m: MAIL\n"
		"     r: RCPT\n"
		"     d: DATA\n"
		"     b: Body\n"
		"   code:    action\n"
		"   %#06x   abort session [drop connection]\n"
		"   %#06x   abort transaction [move to next if there is one]\n"
		"   lower byte (two hex digits) can be used as counter\n"
		"-B filename use content of filename for commands to send\n"
		"-C n        show counter (print if counter %% n == 0)\n"
		"-d n        set debug level\n"
		"-D filename use content of filename for DATA\n"
		"-E          show SMTP dialogue errors\n"
		"-e domain   use domain for EHLO [%s]\n"
		"-F domain   use domain for sender addresses [%s]\n"
		"-f address  from address\n"
		"-G          show total rate\n"
		"-g n        number of `teardown' messages (before total)\n"
		"-I n        number of `startup' messages\n"
		"-i          interactive display of performance data\n"
		"-l n        data length\n"
		"-M arg      set arg for MAIL\n"
		"-m n        number of messages to send\n"
		"-N          do not use RSET between transactions\n"
		"-n n        number of recipients per transaction\n"
		"-P          use PIPELINING if offered\n"
		"-p n        use n in addresses as identifier\n"
		"-O timeout  I/O timeout [%d].\n"
		"-R address  recipient address (can be specified multiple times)\n"
		"-s n        total sessions\n"
		"-S          stop on errors\n"
		"-t n        concurrent threads\n"
		"-T n        transactions per thread\n"
		"-w n        wait for n seconds after DATA\n"
		"-W stage=delay   wait for delay seconds before issuing stage command\n"
		"-q n        send n messages with 1..n as body.\n"
		"-Q m        use m as first element in sequence: m..n.\n"
		"-Y domain   use domain for recipient addresses [%s]\n"
		"-Z          use SIZE= for MAIL\n"
		, prg
		, SCE_ST_ABORT_SE
		, SCE_ST_ABORT_TA
		, ehloarg
		, fromdom[0]
		, request_timeout
		, rcptdom[0]
		);
}

/*
**  GETSTAGE -- map a stage letter (as used by -a and -W) to a STAGE_* value
**
**	Parameters:
**		arg -- stage letter: c, h (or e), m, r, d, b
**
**	Returns:
**		STAGE_* value for arg (always < STAGES).
**		For an unknown letter the usage message is printed and
**		the program exits with EX_USAGE.
*/

static unsigned int
getstage(char arg)
{
	unsigned int stage;

	/* invalid value; overwritten for every letter that returns */
	stage = (unsigned int) -1;

	/* use the parameter, not the global optarg, to select the stage */
	switch (arg)
	{
	  case 'c':
		stage = STAGE_CONNECT;
		break;
	  case 'h':
	  case 'e':
		stage = STAGE_HELO;
		break;
	  case 'm':
		stage = STAGE_MAIL;
		break;
	  case 'r':
		stage = STAGE_RCPT;
		break;
	  case 'd':
		stage = STAGE_DATA;
		break;
	  case 'b':
		stage = STAGE_DOT;
		break;
	  default:
		sce_usage(prog);
		exit(EX_USAGE);
	}
	return stage;
}

#define ST_UTIME_MAX ((st_utime_t)-1)

static st_utime_t
next_val(int se, int ta, int stage, int wr)
{
	sce_stt_P sc_stt;

	if (!stats)
		return ST_UTIME_MAX;
	if (se >= sessions)
		return ST_UTIME_MAX;
	if (ta >= ta_per_thrd)
		return ST_UTIME_MAX;

	if (ta < 0)
	{
		if (stage < 0)
			return ST_UTIME_MAX;
		if (stage >= ST_SE_N)
			return sc_sts[se].sces_se;
		if (wr)
			return sc_sts[se].sces_wr[stage];
		else
			return sc_sts[se].sces_rd[stage];
	}

	sc_stt = sc_sts[se].sces_tas;
	if (stage < 0)
		return ST_UTIME_MAX;
	if (stage >= ST_TA_N)
		return sc_sts[se].sces_se;
	if (wr)
		return sc_stt[ta].scet_wr[stage];
	else
		return sc_stt[ta].scet_rd[stage];

	return ST_UTIME_MAX;
}

static void
some_stats(int se, int ta, int stage, int wr)
{
	uint se_u, ta_u, stage_u, count;
	st_utime_t t_val, t_sum, t_min, t_max;
#if SM_MATH_STATS
	ulong geometric, harmonic;
	double product, inversesum;
#endif

	if (se >= 0 && ta >= 0 && stage >= 0)
		return;

#if SM_MATH_STATS
	product = 1.0;
	inversesum = 0.0;
#endif
	t_val = t_sum = 0;
	t_min = UINT_MAX;
	t_max = 0;
	se_u = ta_u = stage_u = 0;
	if (se >= 0)
		se_u = se;
	if (ta >= 0)
		ta_u = ta;
	if (stage >= 0)
		stage_u = stage;

	count = 0;
	while (t_val >= 0)
	{
		t_val = next_val(se_u, ta_u, stage_u, wr);
		if (ST_UTIME_MAX == t_val)
			break;
		t_sum += t_val;
		if (t_val < t_min)
			t_min = t_val;
		if (t_val > t_max)
			t_max = t_val;
#if SM_MATH_STATS
		product *= (double) t_val;
		inversesum += (double) 1.0 / (double) t_val;
#endif

		if (se < 0)
			++se_u;
		else if (ta < 0)
			++ta_u;
		else if (stage < 0)
			++stage_u;
		++count;
	}
#if SM_MATH_STATS
	geometric = pow(product, 1.0/(double)count);
	harmonic = (double)count / inversesum;
#endif
	sm_io_fprintf(smioerr,
		"%4d:%4d:%d:%u  min=%4lu, max=%4lu, medium=%ld, geometric=%ld, harmonic=%ld\n"
		, se, ta, stage, count
		, (ulong) t_min, (ulong) t_max
		, (ulong) (t_sum / ((count > 0) ? count : 1))
#if SM_MATH_STATS
		, geometric, harmonic
#else
		, -1, -1
#endif
		);
}

static void
show_stats(void)
{
	uint se, ta, stage;
	sce_stt_P sc_stt;

	if (!stats || NULL == sc_sts)
		return;

	for (se = 0; se < sessions; se++)
	{
		sc_stt = sc_sts[se].sces_tas;
		for (ta = 0; ta < ta_per_thrd; ta++)
		{
			for (stage = 0; stage < ST_TA_N; stage++)
			{
				sm_io_fprintf(smioerr,
					"%4u:%4u:%u  wr=%4lu, rd=%4lu\n"
					, se, ta, stage
					, (ulong) sc_stt[ta].scet_wr[stage]
					, (ulong) sc_stt[ta].scet_rd[stage]
					);
			}
			sm_io_fprintf(smioerr,
				"%4u:%4u    ta=%4lu\n"
				, se, ta
				, (ulong) sc_stt[ta].scet_ta
				);
		}
		for (stage = 0; stage < ST_SE_N; stage++)
		{
			sm_io_fprintf(smioerr,
				"%4u:    :%u  wr=%4lu, rd=%4lu\n"
				, se, stage
				, (ulong) sc_sts[se].sces_wr[stage]
				, (ulong) sc_sts[se].sces_rd[stage]
				);
		}
		sm_io_fprintf(smioerr,
			"%4u         se=%4lu\n"
			, se
			, (ulong) sc_sts[se].sces_se
			);
	}
	some_stats(-1, 0, 1, 1);
}

int
main(int argc, char *argv[])
{
	extern char    *optarg;
	int             opt, n, threads;
	int             raddr;
	unsigned int    u, off, stage;
	long int        tc;
	size_t		j;
	ssize_t		bytesread;
	sce_stt_P	sc_stt;

	prog = argv[0];
	raddr = 0;
	sessions = ta_per_thrd = threads = 1;
	rcpt[0] = from[0] = NULL;
	bytesread = 0;
	fromdom[0] = "local.host";
	rcptdom[0] = "local.host";

	/* Parse arguments */
	while ((opt = getopt(argc, argv,
			"289Aa:B:C:c:D:d:Ee:F:f:Gg:hI:il:M:m:Nn:O:o:Pp:R:r:Ss:T:t:W:w:Q:q:Y:Z"))
				!= -1)
	{
		switch (opt)
		{
		  case '2':
			flushdata = 1;
			break;
		  case '8':
			try_drr = true;
			break;
		  case '9':
			try_rsad = true;
			break;
		  case 'A':
			stats = 1;
			break;
		  case 'a':
			if (optarg != NULL && ISALPHA(optarg[0])
			    && optarg[1] == '=')
				off = 2;
			else
			{
				sce_usage(prog);
				break;
			}

			u = (unsigned int) strtoul(optarg + off, NULL, 0);
			stage = getstage(optarg[0]);
			SM_ASSERT(stage < STAGES);
			codes[stage] = u;
			break;
		  case 'B':
			cmdfile = strdup(optarg);
			if (NULL == cmdfile)
			{
				sm_io_fprintf(smioerr, "strdup(%s)=NULL\n"
					, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'C':
			count = (unsigned long) strtoul(optarg, NULL, 0);
			break;
		  case 'T':
		  case 'c':
			ta_per_thrd = atoi(optarg);
			if (ta_per_thrd < 1)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid number of connections: %s\n",
					prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'd':
			debug = atoi(optarg);
			if (debug < 0)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid number for debug: %s\n",
					prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'D':
			datafile = strdup(optarg);
			if (NULL == datafile)
			{
				sm_io_fprintf(smioerr, "strdup(%s)=NULL\n"
					, optarg);
				exit(EX_OSERR);
			}
			break;
		  case 'E':
			++showerror;
			break;
		  case 'e':
			ehloarg = optarg;
			break;
		  case 'f':
			if (fromaddrs >= SC_MAXADDRS - 1)
			{
				sm_io_fprintf(smioerr,
					"%s: too many addresses=%d, max=%d\n",
					prog, fromaddrs, SC_MAXADDRS);
				exit(EX_USAGE);
			}
			from[fromaddrs++] = optarg;
			break;
		  case 'F':
			if (fromdoms >= SC_MAXADDRS - 1)
			{
				sm_io_fprintf(smioerr,
					"%s: too many domains=%d, max=%d\n",
					prog, fromdoms, SC_MAXADDRS);
				exit(EX_USAGE);
			}
			fromdom[fromdoms++] = optarg;
			break;
		  case 'G':
			show_rate = 1;
			break;
		  case 'g':
			teardown = (unsigned long) atol(optarg);
			break;
		  case 'I':
			startup = (unsigned long) atol(optarg);
			break;
		  case 'i':
			idisplay = 1;
			break;
		  case 'M':
			if (NULL == optarg)
			{
				sm_io_fprintf(smioerr,
					"%s: missing arg for -%c\n",
					prog, opt);
				exit(EX_USAGE);
			}
			mailarg = strdup(optarg);
			if (NULL == mailarg)
			{
				sm_io_fprintf(smioerr,
					"%s: strdup(%s) failed\n",
					prog, optarg);
				exit(EX_OSERR);
			}
			break;
		  case 'm':
			ta_total = (unsigned long) atol(optarg);
			break;
		  case 'l':
			datalen = (unsigned int) atoi(optarg);
			break;
		  case 'N':
			rset = 0;
			break;
		  case 'n':
			rcpts = (unsigned int) atoi(optarg);
			if (rcpts < 1)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid number of rcpts: %s\n",
					prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'O':
			request_timeout  = atoi(optarg);
			if (request_timeout <= 0)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid timeout: %s\n",
					prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'P':
			use_pipelining = 1;
			break;
		  case 'p':
			postfix = atoi(optarg);
			break;
		  case 'r':
			read_address(optarg, &rmt_addr);
			if (rmt_addr.sin_addr.s_addr == INADDR_ANY)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid remote address: %s\n",
					prog, optarg);
				exit(EX_USAGE);
			}
			raddr = 1;
			break;
		  case 'R':
			if (rcptaddrs >= SC_MAXADDRS - 1)
			{
				sm_io_fprintf(smioerr,
					"%s: too many addresses=%d, max=%d\n",
					prog, rcptaddrs, SC_MAXADDRS);
				exit(EX_USAGE);
			}
			rcpt[rcptaddrs++] = optarg;
			break;
		  case 's':
			sessions = (unsigned int) atoi(optarg);
			if (sessions < 1)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid number of sessions: %s\n",
					prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'S':
			stoponerror++;
			break;
		  case 't':
			threads = atoi(optarg);
			if (threads < 1)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid number of threads: %s\n",
					prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'q':
			sequence = atoi(optarg);
			if (sequence < 1)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid number for sequence: %s\n"
					, prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'Q':
			seqfirst = atoi(optarg);
			if (seqfirst < 0)
			{
				sm_io_fprintf(smioerr,
					"%s: invalid number for sequence begin: %s\n"
					, prog, optarg);
				exit(EX_USAGE);
			}
			break;
		  case 'W':
			if (optarg != NULL && ISALPHA(optarg[0])
			    && optarg[1] == '=')
				off = 2;
			else
			{
				sce_usage(prog);
				break;
			}

			u = (unsigned int) strtoul(optarg + off, NULL, 0);
			stage = getstage(optarg[0]);
			SM_ASSERT(stage < STAGES);
			waits[stage] = u;
			break;
		  case 'w':
			waitdata = atoi(optarg);
			break;
		  case 'Y':
			if (rcptdoms >= SC_MAXADDRS - 1)
			{
				sm_io_fprintf(smioerr,
					"%s: too many domains=%d, max=%d\n",
					prog, rcptdoms, SC_MAXADDRS);
				exit(EX_USAGE);
			}
			rcptdom[rcptdoms++] = optarg;
			break;
		  case 'Z':
			usesize = 1;
			break;
		  case 'h':
		  case '?':
			sce_usage(prog);
			exit(EX_USAGE);
		}
	}
	if (!raddr)
	{
		sm_io_fprintf(smioerr, "%s: remote address required\n", prog);
		exit(EX_USAGE);
	}

	/* number of recipients not set? */
	if (0 == rcpts)
	{
		if (rcptaddrs > 1)
			rcpts = rcptaddrs;
		else
			rcpts = 1;
	}
	if (0 == fromdoms)
		fromdoms = 1;
	if (0 == rcptdoms)
		rcptdoms = 1;

	if (0 == bytesread)
	{
#if SC_USE_DATE
#define MAIL_HEADER "From: me\r\nTo: you\r\nSubject: test\r\nDate: today\r\n\r\n"
#else
#define MAIL_HEADER "From: me\r\nTo: you\r\nSubject: test\r\n\r\n"
#endif
		j = strlcpy((char *)databuf, MAIL_HEADER, sizeof(databuf));
		for (j = strlen(MAIL_HEADER); j < sizeof(databuf); j++)
			databuf[j] = ' ' + (j % 64);
		for (j = 80; j < sizeof(databuf) - 1; j += 80)
		{
			databuf[j] = '\r';
			databuf[j + 1] = '\n';
		}
	}

	tc = (long) sessions * (long) ta_per_thrd;
	if (debug)
		sm_io_fprintf(smioerr, "%s: starting client [%d]\n", prog,
			threads);

	if (stats)
	{
		j = sessions * sizeof(*sc_sts);
		if (j < sessions || j < sizeof(*sc_sts))
		{
			sm_io_fprintf(smioerr,
				"%s: statistics too large %lu\n",
				prog, j);
			exit(EX_OSERR);
		}
		sc_sts = malloc(j);
		if (NULL == sc_sts)
		{
			sm_io_fprintf(smioerr,
				"%s: cannot allocate statistics %lu\n",
				prog, j);
			exit(EX_OSERR);
		}
		sm_memzero(sc_sts, j);

		j = tc * sizeof(*sc_stt);
		if (j < tc || j < sizeof(*sc_stt))
		{
			sm_io_fprintf(smioerr,
				"%s: statistics too large (TA) %lu\n",
				prog, j);
			exit(EX_OSERR);
		}
		sc_stt = malloc(j);
		if (NULL == sc_stt)
		{
			sm_io_fprintf(smioerr,
				"%s: cannot allocate statistics (TA) %lu\n",
				prog, j);
			exit(EX_OSERR);
		}
		sm_memzero(sc_stt, j);
		off = 0;
		for (u = 0; u < sessions; u++)
		{
			sc_sts[u].sces_tas = &(sc_stt[off]);
			off += ta_per_thrd;
		}
	}

	/* Initialize the ST library */
	if (st_init() < 0)
	{
		print_sys_error("st_init");
		exit(EX_OSERR);
	}
	if (idisplay)
		sm_io_fprintf(smioerr,
				"   msg    time_i   time_t  msgs/s t-msgs/s  busy  conc\n");

	for (n = 0; n < threads; n++)
	{
		if (debug)
			sm_io_fprintf(smioerr, "%s: starting client %d/%d\n",
				prog, n, threads);
		if (st_thread_create(handle_request, (void *) n, 0, 0) == NULL)
		{
			print_sys_error("st_thread_create");
			exit(EX_OSERR);
		}
	}

	/* wait for them... */
	st_sleep(1);
	while (busy > 0)
		st_sleep(1);

	if (idisplay)
		sm_io_fprintf(smioerr, "\n");
	if (show_rate) {
		sm_io_fprintf(smioerr,
			"%s: total=%lu, total_rate=%lu, (should be %lu/%lu)\n",
			prog, ta_cnt, total_rate, tc, ta_total);
	}
	else {
		sm_io_fprintf(smioerr,
			"%s: total=%lu (should be %lu/%lu)\n",
			prog, ta_cnt, tc, ta_total);
	}
	show_stats();
	return 0;
}

/*
**  READ_ADDRESS -- convert "host:port" into an IPv4 socket address
**
**	Parameters:
**		str -- address in the form host:port; host is a dotted
**			quad or a name, an empty host selects INADDR_ANY
**		sin -- socket address (output)
**
**	Returns:
**		none; on any error a message is printed and the
**		program exits.
*/

static void
read_address(const char *str, struct sockaddr_in * sin)
{
	char            host[128], *p;
	struct hostent *hp;
	long            port;

	/* don't continue with a silently truncated address */
	if (strlcpy(host, str, sizeof(host)) >= sizeof(host))
	{
		sm_io_fprintf(smioerr, "%s: invalid address: %s\n", prog, str);
		exit(EX_USAGE);
	}
	if ((p = strchr(host, ':')) == NULL)
	{
		sm_io_fprintf(smioerr, "%s: invalid address: %s\n", prog, host);
		exit(EX_USAGE);
	}
	*p++ = '\0';

	/*
	**  Parse into a long and check the full range 1..65535;
	**  a (signed) short would reject all ports above 32767.
	*/

	port = strtol(p, NULL, 10);
	if (port < 1 || port > 65535)
	{
		sm_io_fprintf(smioerr, "%s: invalid port: %s\n", prog, p);
		exit(EX_USAGE);
	}
	memset(sin, 0, sizeof(struct sockaddr_in));
	sin->sin_family = AF_INET;
	sin->sin_port = htons((unsigned short) port);
	if (host[0] == '\0')
	{
		sin->sin_addr.s_addr = INADDR_ANY;
		return;
	}
	sin->sin_addr.s_addr = inet_addr(host);
	if (INADDR_NONE == sin->sin_addr.s_addr)
	{
		/* not dotted-decimal */
		hp = gethostbyname(host);

		/* only accept an IPv4 address that fits into sin_addr */
		if (NULL == hp || hp->h_addrtype != AF_INET ||
		    hp->h_length < 0 ||
		    (size_t) hp->h_length > sizeof(sin->sin_addr))
		{
			sm_io_fprintf(smioerr, "%s: can't resolve address: %s\n", prog, host);
			exit(EX_OSERR);
		}
		memcpy(&sin->sin_addr, hp->h_addr, (size_t) hp->h_length);
	}
}

/*
**  Result codes of the smtp*() functions, ordered by severity:
**  the classification macros below compare against thresholds.
*/

/* before changing these, check the macros! */
#define SMTP_OK		0
#define SMTP_NONE	1	/* no SMTP reply (PIPELINING) */
#define SMTP_CONT	2	/* 3xy */
#define SMTP_AN		3	/* SMTP reply type isn't 2 or 3 */
#define SMTP_SSD	4	/* 421 */
#define SMTP_RD		10	/* read error */
#define SMTP_WR		20	/* write error */
/* OK, no reply yet, or 3xy */
#define SMTP_IS_OK(r)	((r) <= SMTP_CONT)
/* read or write error */
#define SMTP_IO_ERR(r)	((r) >= SMTP_RD)
/* 421 or I/O error */
#define SMTP_FATAL(r)	((r) >= SMTP_SSD)

/*
**  SMTPWRITE -- write l bytes taken from databuf to fp
**
**	If l exceeds the size of databuf, the buffer content is sent
**	repeatedly until l bytes have been written.
**
**	Parameters:
**		l -- number of bytes to write
**		fp -- filepointer for output
**		tid -- thread id (for error message)
**		i -- transaction counter (for error message)
**
**	Returns:
**		SMTP_OK or SMTP_WR
*/

static int
smtpwrite(int l, sm_file_T *fp, int tid, int i)
{
	int chunk, left;
	sm_ret_T res;
	ssize_t written;

	for (left = l; left > 0; left -= chunk)
	{
		chunk = SM_MIN(left, (int) sizeof(databuf));
		res = sm_io_write(fp, databuf, chunk, &written);
		if (chunk == (int) written && SM_SUCCESS == res)
			continue;

		/* short write or error */
		sm_io_fprintf(smioerr,
			"[%d] data write error i=%d, n=%d, r=%d, ret=%#x\n",
			tid, i, left, (int) written, res);
		return SMTP_WR;
	}
	return SMTP_OK;
}

/*
**  SMTPDATA -- send the message body
**
**	Without a data file (-D) l bytes of the generated body are sent.
**	Otherwise the file is read into databuf piece by piece and each
**	piece is written to fp (and flushed if flushdata is set).
**
**	Parameters:
**		l -- number of bytes to send (ignored if a data file is used)
**		fp -- filepointer for output
**		tid -- thread id
**		i -- transaction counter
**
**	Returns:
**		SMTP_OK, SMTP_WR, or the error code of sm_io_open()
**		or sm_io_read().
*/

static int
smtpdata(int l, sm_file_T *fp, int tid, int i)
{
	int res;
	sm_file_T *src;
	ssize_t nread;

	/* no data file: send the generated body */
	if (NULL == datafile)
		return smtpwrite(l, fp, tid, i);

	res = sm_io_open(SmStStdio, datafile, SM_IO_RDONLY, &src,
		SM_IO_WHAT_END);
	if (res != SM_SUCCESS)
	{
		sm_io_fprintf(smioerr, "sm_io_open(%s)=%r\n", datafile, res);
		return res;
	}

	for (;;)
	{
		res = sm_io_read(src, databuf, sizeof(databuf), &nread);
		if (SM_IO_EOF == res)
		{
			/* end of file is the regular way out */
			res = SMTP_OK;
			break;
		}
		if (sm_is_err(res))
			break;
		res = smtpwrite(nread, fp, tid, i);
		if (flushdata)
			(void) sm_io_flush(fp);
		if (res != SMTP_OK)
			break;
	}
	sm_io_close(src, SM_IO_CF_NONE);
	return res;
}

/*
**  RD_SRV_CAP -- read server capability from one line of an EHLO reply
**
**	Sets the matching SCSE_CAP_* bit in srv_caps and, for
**	"SIZE value", srv_max_sz_b.  Lines which are not "250 " or "250-"
**	replies or which contain an unknown keyword are ignored.
**
**	Parameters:
**		ehlo_reply -- one line of the EHLO reply
**			(the keyword comparison treats the data as
**			'\0' terminated string)
**
**	Returns:
**		SM_SUCCESS
*/

static sm_ret_T
rd_srv_cap(sm_str_P ehlo_reply)
{
	char *opt;

	/*
	**  "250 " + keyword: at least one character must follow the
	**  reply code.  (Requiring more would reject short keywords,
	**  e.g., "250-DRR" has only 7 characters.)
	*/

	if (sm_str_getlen(ehlo_reply) <= SMTP_REPLY_OFFSET)
		return SM_SUCCESS;
	if (sm_str_rd_elem(ehlo_reply, 0) != '2' ||
	    sm_str_rd_elem(ehlo_reply, 1) != '5' ||
	    sm_str_rd_elem(ehlo_reply, 2) != '0' ||
	    (sm_str_rd_elem(ehlo_reply, 3) != ' ' &&
	     sm_str_rd_elem(ehlo_reply, 3) != '-'))
		return SM_SUCCESS;	/* error?? */

/* prefix match (keyword with parameters) and exact match */
#define SMTPC_IS_CAPN(str)	(sm_strcaseeqn(opt, (str), strlen(str)))
#define SMTPC_IS_CAP(str)	(sm_strcaseeq(opt, (str)))

	opt = (char *) sm_str_getdata(ehlo_reply);
	SM_REQUIRE(opt != NULL);
	opt += SMTP_REPLY_OFFSET;

	if (SMTPC_IS_CAP("pipelining"))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_PIPELINING);
	else if (SMTPC_IS_CAP("8bitmime"))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_8BITMIME);
	else if (SMTPC_IS_CAP("size"))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_SIZE);
	else if (SMTPC_IS_CAPN("size "))
	{
		ulonglong_T max_sz_b;	/* off_t? */
		char *endptr;

		errno = 0;

		/* 5 == strlen("size ") */
		max_sz_b = sm_strtoull(opt + 5, &endptr, 10);
		if (!((ULLONG_MAX == max_sz_b && ERANGE == errno) ||
		      endptr == opt + 5))
		{
			SCSE_SET_CAP(srv_caps, SCSE_CAP_SIZE);

			/* srv_max_sz_b may be narrower: clamp, don't wrap */
			if (max_sz_b > (ulonglong_T) ULONG_MAX)
				srv_max_sz_b = ULONG_MAX;
			else
				srv_max_sz_b = (unsigned long) max_sz_b;
		}
		/* else: silently ignore ... */
	}
	else if (SMTPC_IS_CAP("enhancedstatuscodes"))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_ENHSTAT);
	else if (SMTPC_IS_CAPN("auth "))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_AUTH);
	else if (SMTPC_IS_CAP("starttls"))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_STARTTLS);
	else if (SMTPC_IS_CAP("drr"))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_DRR);
	else if (SMTPC_IS_CAP("prdr"))
		SCSE_SET_CAP(srv_caps, SCSE_CAP_RSAD);
	return SM_SUCCESS;
}

/*
**  SMTPCOMMAND -- send one SMTP command and read reply
**
**	Parameters:
**		str -- SMTP command
**		l -- length of str
**		fp -- filepointer for output
**		out -- reply from server (to be filled in)
**		tid -- thread id
**		i -- transaction counter
**		stage -- SMTP stage (STAGE_*, or < 0 for none)
**		t_write -- time for write (output), may be NULL
**		t_read -- time for read (output), may be NULL
**
**	Returns:
**		SMTP_* (see above)
**
**	Side Effects:
**		updates the global reply bookkeeping (mail_s, mail_r,
**		rcpts_s, rcpts_r, rcpts_ok, rcpt_r, rcpt_i, reply_str),
**		reads the server capabilities for STAGE_HELO,
**		sets sessions to 0 on I/O errors if stoponerror is set.
*/

static int
smtpcommand(char *str, int l, sm_file_T *fp, sm_str_P out, int tid, unsigned int i, int stage, st_utime_t *t_write, st_utime_t *t_read)
{
	sm_ret_T        ret;
	ssize_t         b;
	st_utime_t	t0, t1;
	int		used;
	unsigned int rcpts_ad;

	/* avoid bogus "might be used uninitialized" warnings */
	t0 = t1 = 0;

	/* don't compare an unset count if the write fails early */
	b = 0;

	/*
	**  waits[] has STAGES elements, hence stage must be < STAGES.
	**  NOTE(review): sleep() suspends the entire process, not only
	**  this thread; st_sleep() might be intended.
	*/

	if (stage >= 0 && stage < STAGES && waits[stage] > 0)
		sleep(waits[stage]);
	if (debug > 3)
	{
		sm_io_fprintf(smioerr, "[%d] send: ", tid);
		sm_io_write(smioerr, (uchar *) str, l, &b);
		sm_io_flush(smioerr);
	}
	if (stats)
		t0 = st_utime();
	ret = sm_io_write(fp, (uchar *) str, l, &b);
	if (sm_is_error(ret) || b != l)
	{
		sm_io_fprintf(smioerr, "[%d] write error i=%u, n=%d, r=%d, ret=%#x\n",
			      tid, i, l, (int) b, ret);
		if (stoponerror)
			sessions = 0;
		return SMTP_WR;
	}

	/* flush unless the command can be pipelined */
	if (!SC_PIPELINE_OK(stage)
	    || !SCSE_IS_CAP(srv_caps, SCSE_CAP_PIPELINING))
	{
		ret = sm_io_flush(fp);
		if (sm_is_error(ret))
		{
			sm_io_fprintf(smioerr,
				"[%d] flush error i=%u, n=%d, ret=%#x\n",
				tid, i, (int) b, ret);
			if (stoponerror)
				sessions = 0;
			return SMTP_WR;
		}
	}
	if (stats && t_write != NULL)
	{
		t1 = st_utime();
		*t_write = t1 - t0;
		t0 = t1;
	}
	if (STAGE_MAIL == stage)
		mail_s = 1;
	if (STAGE_RCPT == stage)
		++rcpts_s;

	rcpts_ad = 0;

	/* pipelined command and nothing to read yet: reply comes later */
	/* also check how much data has been sent? */
	if (SC_PIPELINE_OK(stage)
	    && SCSE_IS_CAP(srv_caps, SCSE_CAP_PIPELINING)
	    && !sm_io_getinfo(fp, SM_IO_IS_READABLE, NULL))
		return SMTP_NONE;

  again:
	used = 0;

	/* read one (possibly multi-line) reply */
	do
	{
		time_t before, after;

		sm_str_clr(out);
#if DEBUG
		before = st_time();
#else
		before = 0;
#endif
		ret = sm_fgetline0(fp, out);
#if DEBUG
		after = st_time();
#else
		after = 0;
#endif

		if (debug > 3)
		{
			sm_io_fprintf(smioerr, "[%d] rcvd [len=%d, res=%#x]: ", tid,
				      sm_str_getlen(out), ret);
			sm_io_write(smioerr, sm_str_getdata(out),
				sm_str_getlen(out), &b);
			sm_io_putc(smioerr, '\n');
			sm_io_flush(smioerr);
		}
		if (sm_is_error(ret))
		{
			sm_io_fprintf(smioerr,
				"[%d] cmd=%s, error=read, i=%u, n=%d, ret=%#x, after-before=%ld\n",
				tid, str, i, (int) b, ret,
				(long) (after - before));
			if (stoponerror)
				sessions = 0;
			return SMTP_RD;
		}
		if (sm_str_getlen(out) == 0 ||
		    (sm_str_rd_elem(out, 0) != '2' &&
		     sm_str_rd_elem(out, 0) != '3'))
		{
			/*
			**  Negative reply: return it unless outstanding
			**  replies still have to be collected below.
			*/

			ret = SMTP_AN;
			if (use_pipelining &&
			    SCSE_IS_CAP(srv_caps, SCSE_CAP_PIPELINING) &&
			    (STAGE_RCPT == stage || STAGE_DATA == stage))
				break;
			else if (STAGE_DOT == stage && (use_rsad || use_drr))
				break;
			else
				return ret;
		}
		if (STAGE_HELO == stage)
			(void) rd_srv_cap(out);
	} while (!sm_is_error(ret) && sm_str_getlen(out) > 4 &&
		 sm_str_rd_elem(out, 3) == '-');

	if (stats && t_read != NULL)
	{
		t1 = st_utime();
		*t_read = t1 - t0;
	}

	if (sm_str_getlen(out) > 3
	    && sm_str_rd_elem(out, 0) == '4'
	    && sm_str_rd_elem(out, 1) == '2'
	    && sm_str_rd_elem(out, 2) == '1')
		return SMTP_SSD;

	if (sm_str_getlen(out) > 3
	    && sm_str_rd_elem(out, 0) != '2'
	    && sm_str_rd_elem(out, 0) != '3')
		ret = SMTP_AN;
	else if (sm_str_getlen(out) > 3
	    && sm_str_rd_elem(out, 0) == '3')
		ret = SMTP_CONT;
	else
		ret = SMTP_OK;	/* not really correct but good enough */

	/* PIPELINING: assign this reply to the MAIL/RCPT command it answers */
	if (use_pipelining && SCSE_IS_CAP(srv_caps, SCSE_CAP_PIPELINING) &&
	    (STAGE_RCPT == stage || STAGE_DATA == stage
	     /* || STAGE_LDAT == stage */ ))
	{
		if (SMTP_NONE == mail_r)
		{
			mail_r = ret;
			used = 1;
		}
		else if (rcpts_r < rcpts_s)
		{
			if (SMTP_OK == ret)
				++rcpts_ok;
			else
			{
				rcpt_r = ret;
				rcpt_i = rcpts_r;
			}

			++rcpts_r;
			if (rcpts_r == rcpts_s)
				/* done; what to do? */;
			used = 1;
		}

		if (!SMTP_IS_OK(ret))
			sm_str_cpy(reply_str, out) ;

		/* need to collect all replies? */
		if (STAGE_DATA == stage && used)
			goto again;
	}

	/* RSAD: read the per recipient replies after the final dot */
	if (STAGE_DOT == stage && use_rsad)
	{
		if (0 == rcpts_ad)
		{
			if (ret != SMTP_CONT)
				rcpts_ad = rcpts_ok + 1;	/* avoid "looping" */
		}

		/* one argument per conversion: ta is the transaction counter */
		if (!SMTP_IS_OK(ret) && showerror && rcpts_ad <= rcpts_ok)
			sm_io_fprintf(smioerr,
				"RCPT=error, text=%@T, se=%d, ta=%u, rcpt=%u\n",
				out, sessions, i, rcpts_ad);
		++rcpts_ad;
		if (rcpts_ad <= rcpts_ok + 1)
			goto again;
	}

	/* DRR: read one reply per accepted recipient */
	if (STAGE_DOT == stage && use_drr)
	{
		++rcpts_ad;
		if (rcpts_ad <= rcpts_ok)
			goto again;
	}

	if (STAGE_MAIL == stage && SMTP_OK == ret)
		mail_r = ret;

	return ret;
}

/*
**  SMTPCMDS -- send the commands from cmdfile (-B), one per line
**
**	Each line is read into out and sent via smtpcommand(), which
**	stores the reply in out again.  Stops at the first line that
**	cannot be read, or on an I/O error or fatal SMTP reply.
**
**	Parameters:
**		fp -- filepointer for output
**		tid -- thread id
**		i -- transaction counter (currently not used)
**		out -- buffer for command and reply
**
**	Returns:
**		result of the last operation: SMTP_* from smtpcommand()
**		or the error code of sm_io_open() or sm_fgetline().
*/

static int
smtpcmds(sm_file_T *fp, int tid, int i, sm_str_P out)
{
	int res;
	sm_file_T *cmds;

	res = sm_io_open(SmStStdio, cmdfile, SM_IO_RDONLY, &cmds,
			SM_IO_WHAT_END);
	if (res != SM_SUCCESS)
	{
		sm_io_fprintf(smioerr, "sm_io_open(%s)=%r\n", cmdfile, res);
		return res;
	}

	for (;;)
	{
		/* next command */
		sm_str_clr(out);
		res = sm_fgetline(cmds, out);
		if (sm_is_err(res))
			break;

		res = smtpcommand((char *)sm_str_getdata(out),
				sm_str_getlen(out), fp, out, tid, 0, -1,
				NULL, NULL);
		if (showerror && !SMTP_IS_OK(res))
			sm_io_fprintf(smioerr, "text=%@T\n", out);
		if (SMTP_IO_ERR(res) || SMTP_FATAL(res))
			break;
	}
	sm_io_close(cmds, SM_IO_CF_NONE);
	return res;
}

/*
**  HANDLE_REQUEST -- client thread: run SMTP sessions against the server
**
**	Each pass of the outer loop takes one session from the shared counter
**	sess_cnt, connects to rmt_addr, reads the greeting, sends EHLO and
**	then up to ta_per_thrd transactions (MAIL, RCPT, DATA, message,
**	final dot, optional RSET) followed by QUIT.  If cmdfile is set, the
**	commands are read from that file instead (see smtpcmds()) and the
**	thread terminates afterwards.
**
**	Parameters:
**		arg -- thread id, passed as an integer cast to a pointer.
**
**	Returns:
**		NULL (always).
**
**	Side Effects:
**		updates the global counters busy, concurrent, sess_cnt,
**		ta_cnt and sequence; sets sessions to 0 (which stops the
**		session loop of every thread) when an error occurs and
**		stoponerror is set, or when ta_total transactions are done.
*/

static void *
handle_request(void *arg)
{
	st_netfd_t      rmt_nfd;
	int             sock, n, i, tid, r, rok, myseq;
	unsigned int    u, j;
	ssize_t         b;
	sm_file_T      *fp;
	sm_str_P        out;
	sm_ret_T        ret;
	char		*s;
	char            buf[SM_LINE_LEN];
	char            sbuf[64];
	st_utime_t	t0, t1, ts0;
	st_utime_t	*pt0, *pt1;
	sce_sts_P	sc_stse;
	sce_stt_P	sc_stt;
	sce_stt_P	sc_stta;

	++busy;
	i = 0;
	tid = (int) arg;

	/* avoid bogus "might be used uninitialized" warnings */
	t0 = t1 = ts0 = 0;
	pt0 = NULL;
	pt1 = NULL;
	sc_stt = NULL;
	sc_stta = NULL;
	sc_stse = NULL;

	/*
	**  This one is required: if the session loop below is never entered
	**  control falls through to the "fail" label, which tests fp.
	*/

	fp = NULL;

	if (debug)
		sm_io_fprintf(smioerr, "client[%d]: transactions=%d\n",
			tid, ta_per_thrd);

	out = sm_str_new(NULL, SM_LINE_LEN, SM_IOBUFSIZE);
	if (NULL == out)
	{
		sm_snprintf(buf, sizeof(buf), "[%d] str new failed i=%d",
			tid, i);
		print_sys_error(buf);
		if (stoponerror)
			sessions = 0;
		goto done;
	}

	/* record start times used for the rate computation further down */
	if (0 == firstNmail)
		firstNmail = st_time();
	if (0 == firstmail && (startup == 0 || ta_cnt >= startup)) {
		firstmail = st_time();
		startup_act = ta_cnt;
	}

	/* reply_str keeps the text of a failed reply when pipelining */
	if (use_pipelining)
	{
		reply_str = sm_str_new(NULL, SM_LINE_LEN, SM_IOBUFSIZE);
		if (NULL == reply_str)
		{
			sm_snprintf(buf, sizeof(buf), "[%d] str new failed i=%d",
				tid, i);
			print_sys_error(buf);
			if (stoponerror)
				sessions = 0;
			goto done;
		}
	}
	else
		reply_str = NULL;


	/*
	**  Only run a certain number of sessions;
	**  this is global variable but we can manipulate it without locking.
	*/

	while (sess_cnt < sessions)
	{
		/* select the statistics slots for this session */
		if (stats && sc_sts != NULL)
		{
			sc_stse = &(sc_sts[sess_cnt]);
			sc_stt = sc_sts[sess_cnt].sces_tas;
		}
		++sess_cnt;

		if (0 == firstmail && (startup == 0 || ta_cnt >= startup)) {
			firstmail = st_time();
			startup_act = ta_cnt;
		}

		/* take the next sequence number; stop once seqfirst is reached */
		myseq = sequence;
		if (debug)
			sm_io_fprintf(smioerr,
				"client[%d]: myseq=%d, sequence=%d, seqfirst=%d\n"
				, tid, myseq, sequence, seqfirst);
		if (myseq == seqfirst)
			goto done;
		if (sequence > seqfirst)
			--sequence;

		if (stats)
			ts0 = t0 = st_utime();

		/* Connect to remote host */
		if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0)
		{
			sm_snprintf(buf, sizeof(buf), "[%d] socket i=%d",
				tid, i);
			print_sys_error(buf);
			if (stoponerror)
				sessions = 0;
			goto done;
		}
		ret = sm_io_open(&SmStThrIO, (void *) &sock, SM_IO_RDWR, &fp,
				NULL);
		if (ret != SM_SUCCESS)
		{
			sm_snprintf(buf, sizeof(buf),
				"[%d] sm_io_open()=%d, i=%d", tid, ret, i);
			print_sys_error(buf);
			close(sock);
			if (stoponerror)
				sessions = 0;
			goto done;
		}
		sm_io_clrblocking(fp);
		rmt_nfd = (st_netfd_t) f_cookie(*fp);

		r = request_timeout;
		ret = sm_io_setinfo(fp, SM_IO_WHAT_TIMEOUT, &r);
		if (ret != SM_SUCCESS)
		{
			sm_snprintf(buf, sizeof(buf),
				"[%d] set timeout()=%d, i=%d", tid, ret, i);
			print_sys_error(buf);
			sm_io_close(fp, SM_IO_CF_NONE);
			if (stoponerror)
				sessions = 0;
			goto done;
		}
		ret = sm_io_setinfo(fp, SM_IO_DOUBLE, NULL);
		if (ret != SM_SUCCESS)
		{
			sm_snprintf(buf, sizeof(buf),
				"[%d] set double()=%d, i=%d", tid, ret, i);
			print_sys_error(buf);
			sm_io_close(fp, SM_IO_CF_NONE);
			if (stoponerror)
				sessions = 0;
			goto done;
		}
		rmt_nfd = (st_netfd_t) f_cookie(*fp);
		if (st_connect(rmt_nfd, (struct sockaddr *) & rmt_addr,
			       sizeof(rmt_addr), SEC2USEC(request_timeout)) < 0)
		{
			sm_snprintf(buf, sizeof(buf), "[%d] connect i=%d", tid, i);
			print_sys_error(buf);
			sm_io_close(fp, SM_IO_CF_NONE);
			if (stoponerror)
				sessions = 0;
			goto done;
		}

		/* from here on error exits go through "fail" (close, --concurrent) */
		++concurrent;
		if (debug > 2)
			sm_io_fprintf(smioerr,
				"client[%d]: connected, i=%d, ta=%d, conc=%d\n",
				tid, i, ta_per_thrd, concurrent);

		/*
		**  Read the greeting; keep reading as long as the fourth
		**  character of the line is '-' (continuation line).
		*/

		do
		{
			sm_str_clr(out);
			ret = sm_fgetline0(fp, out);

			if (debug > 3)
			{
				sm_io_fprintf(smioerr,
					"[%d] greet [len=%d, res=%#x]: ", tid,
					sm_str_getlen(out), ret);
				sm_io_write(smioerr, sm_str_getdata(out),
					sm_str_getlen(out), &b);
				sm_io_flush(smioerr);
			}
			if (sm_is_error(ret))
			{
				sm_io_fprintf(smioerr,
					"[%d] read greet i=%d, ret=%#x\n",
					tid, i, ret);
				if (stoponerror)
					sessions = 0;
				goto fail;
			}
		} while (!sm_is_error(ret) && sm_str_getlen(out) > 4 &&
			 sm_str_rd_elem(out, 3) == '-');
		if (stats && sc_sts != NULL)
		{
			t1 = st_utime();
			sc_stse->sces_rd[ST_SE_CONNECT] = t1 - t0;
			sc_stse->sces_wr[ST_SE_CONNECT] = 0;
		}

		/* greeting starts with "421" */
		if (!sm_is_error(ret)
		    && sm_str_getlen(out) > 2
		    && sm_str_rd_elem(out, 0) == '4'
		    && sm_str_rd_elem(out, 1) == '2'
		    && sm_str_rd_elem(out, 2) == '1')
		{
			if (showerror)
				sm_io_fprintf(smioerr,
					"greeting=error, text=%@T, sessions=%d\n",
					out, sessions);
			if (stoponerror)
				sessions = 0;
			goto fail;
		}

		/* any other greeting that does not start with '2' */
		if (sm_is_error(ret) || sm_str_getlen(out) <= 0
		    || sm_str_rd_elem(out, 0) != '2')
		{
			if (showerror)
				sm_io_fprintf(smioerr,
					"greeting=error, text=%@T, sessions=%d\n",
					out, sessions);
			if (stoponerror)
				sessions = 0;
			goto fail;
		}

		if (SCE_IS_ABORT_SE(codes[STAGE_CONNECT], sess_cnt))
			goto fail;

		srv_caps = 0;
		srv_max_sz_b = 0;

		/* command file given: play it back, then end this thread */
		if (cmdfile != NULL)
		{
			r = smtpcmds(fp, tid, i, out);
			goto fail;
		}

		n = strlcpy(buf, "EHLO ", sizeof(buf));
		n = strlcat(buf, ehloarg, sizeof(buf));
		n = strlcat(buf, "\r\n", sizeof(buf));
		if (stats && sc_sts != NULL)
		{
			pt0 = &sc_stse->sces_wr[ST_SE_EHLO];
			pt1 = &sc_stse->sces_rd[ST_SE_EHLO];
		}
		r = smtpcommand(buf, n, fp, out, tid, 0, STAGE_HELO, pt0, pt1);
		if (SMTP_SSD == r)
		{
			if (stoponerror)
				sessions = 0;
			goto fail;
		}
		if (!SMTP_IS_OK(r))
		{
			if (showerror)
				sm_io_fprintf(smioerr,
					"EHLO=error, text=%@T, sessions=%d\n",
					out, sessions);
			if (stoponerror)
				sessions = 0;
			goto fail;
		}

		if (SCE_IS_ABORT_SE(codes[STAGE_HELO], sess_cnt))
			goto fail;

		/* reset the (global) pipelining bookkeeping for this session */
		rcpts_s = rcpts_r = rcpts_ok = 0;
		rcpt_r = SMTP_NONE;
		rcpt_i = 0;
		mail_s = 0;
		mail_r = SMTP_NONE;

		/* one iteration per transaction */
		for (u = 0;
		     u < ta_per_thrd && myseq != seqfirst && sessions != 0;
		     u++)
		{
			if (stats && sc_stt != NULL)
			{
				t0 = st_utime();
				sc_stta = &(sc_stt[u]);
			}

			/* sbuf collects optional MAIL parameters (SIZE=, mailarg) */
			sm_memzero(sbuf, sizeof(sbuf));
			s = sbuf;
			if (usesize && datalen > 0)
			{
				/* datalen is unsigned */
				n = sm_snprintf(sbuf, sizeof(sbuf),
					" SIZE=%u", datalen);
			}
			if (mailarg != NULL)
			{
				/* ignore errors for now ... */
				strlcat(sbuf, " ", sizeof(sbuf));
				strlcat(sbuf, mailarg, sizeof(sbuf));
			}

			use_drr = try_drr && SCSE_IS_CAP(srv_caps, SCSE_CAP_DRR);
			use_rsad = try_rsad && SCSE_IS_CAP(srv_caps, SCSE_CAP_RSAD);

#define MAIL_ARG	(use_drr ? " DRR" : use_rsad ? " PRDR" : "")

			if (fromaddrs > 0)
				n = sm_snprintf(buf, sizeof(buf),
					"MAIL FROM:<%s>%s%s\r\n",
					from[u % fromaddrs], s, MAIL_ARG);
			else if (myseq > 0)
				n = sm_snprintf(buf, sizeof(buf),
					"MAIL FROM:<nobody-%d-%d-%d_%d@%s>%s\r\n",
					myseq, i, tid, postfix,
					fromdom[u % fromdoms], MAIL_ARG);
			else
				n = sm_snprintf(buf, sizeof(buf),
					"MAIL FROM:<nobody-%d-%d-%d@%s>%s%s\r\n",
					u, tid, sessions,
					fromdom[u % fromdoms], s, MAIL_ARG);
			if (stats && sc_sts != NULL)
			{
				pt0 = &sc_stta->scet_wr[ST_TA_MAIL];
				pt1 = &sc_stta->scet_rd[ST_TA_MAIL];
			}
			r = smtpcommand(buf, n, fp, out, tid, u, STAGE_MAIL,
					pt0, pt1);
			if (SMTP_SSD == r)
			{
				if (stoponerror)
					sessions = 0;
				goto fail;
			}
			if (!SMTP_IS_OK(r))
			{
				if (showerror)
					sm_io_fprintf(smioerr,
						"MAIL=error, text=%@T, sessions=%d, ta=%d\n",
						out, sessions, u);
				break;
			}
			if (SCE_IS_ABORT_SE(codes[STAGE_MAIL], ta_cnt))
				goto fail;
			if (SCE_IS_ABORT_TA(codes[STAGE_MAIL], ta_cnt))
				break;

			/* rok counts the RCPT commands whose result was ok */
			rok = 0;
			for (j = 0; j < rcpts; j++)
			{
				if (rcptaddrs > 0)
					n = sm_snprintf(buf, sizeof(buf),
						"RCPT TO:<%s>\r\n",
						rcpt[j % rcptaddrs]);
				else if (myseq > 0)
					n = sm_snprintf(buf, sizeof(buf),
						"RCPT To:<nobody-%d-%d-%d-%d@%s>\r\n",
						myseq, i, j, tid,
						rcptdom[j % rcptdoms]);
				else
					n = sm_snprintf(buf, sizeof(buf),
						"RCPT TO:<nobody-%u-%u-%d-%d@%s>\r\n",
						u, j, tid, sessions,
						rcptdom[j % rcptdoms]);

				/*
				**  The timing slots are selected for the first
				**  recipient only; later RCPTs keep using them.
				*/

				if (stats && sc_sts != NULL && 0 == j)
				{
					pt0 = &sc_stta->scet_wr[ST_TA_RCPT];
					pt1 = &sc_stta->scet_rd[ST_TA_RCPT];
				}
				r = smtpcommand(buf, n, fp, out, tid, u,
						STAGE_RCPT, pt0, pt1);
				if (SMTP_SSD == r)
				{
					if (stoponerror)
						sessions = 0;
					goto fail;
				}
				if (SMTP_OK == r && !use_pipelining)
					++rcpts_ok;

				/*
				**  pipelining...
				**  mail_r/rcpt_r hold the replies collected so
				**  far (presumably filled in by smtpcommand());
				**  reply_str has the text of a failed one.
				*/

				if (use_pipelining && !SMTP_IS_OK(mail_r))
				{
					if (showerror)
						sm_io_fprintf(smioerr,
							"MAIL=error, text=%@T, sessions=%d, ta=%d\n",
							reply_str, sessions, u);
					if (stoponerror)
						sessions = 0;
					break;
				}
				if (use_pipelining && !SMTP_IS_OK(rcpt_r))
				{
					if (showerror)
						sm_io_fprintf(smioerr,
							"RCPT=error, text=%@T, se=%d, ta=%u, rcpt=%u\n",
							reply_str, sessions, u, rcpt_i);
					if (stoponerror)
						sessions = 0;
					break;
				}

				if (!SMTP_IS_OK(r))
				{
					if (showerror)
						sm_io_fprintf(smioerr,
							"RCPT=error, text=%@T, se=%d, ta=%u, rcpt=%u\n",
							out, sessions, u, j);
					if (SMTP_IO_ERR(r))
						break;
					if (stoponerror)
						sessions = 0;
				}
				else
					++rok;
				if (SCE_IS_ABORT_SE(codes[STAGE_RCPT], ta_cnt))
					goto fail;
				if (SCE_IS_ABORT_TA(codes[STAGE_RCPT], ta_cnt))
					break;
			}

			/* no usable recipient (or fatal result): end the session */
			if (SMTP_FATAL(r) || 0 == rok)
				break;
			if (SCE_IS_ABORT_TA(codes[STAGE_RCPT], ta_cnt))
				break;

			n = strlcpy(buf, "DATA\r\n", sizeof(buf));
			if (stats && sc_sts != NULL)
			{
				pt0 = &sc_stta->scet_wr[ST_TA_DATA];
				pt1 = &sc_stta->scet_rd[ST_TA_DATA];
			}
			r = smtpcommand(buf, n, fp, out, tid, u, STAGE_DATA,
					pt0, pt1);
			if (SMTP_SSD == r)
			{
				if (stoponerror)
					sessions = 0;
				goto fail;
			}

			/* pipelining... */
			if (use_pipelining && !SMTP_IS_OK(mail_r))
			{
				if (showerror)
					sm_io_fprintf(smioerr,
						"MAIL=error, text=%@T, sessions=%d, ta=%d\n",
						reply_str, sessions, u);
				if (stoponerror)
					sessions = 0;
				break;
			}
			if (use_pipelining && !SMTP_IS_OK(rcpt_r))
			{
				if (showerror)
					sm_io_fprintf(smioerr,
						"RCPT=error, text=%@T, se=%d, ta=%u, rcpt=%u\n",
						reply_str, sessions, u, rcpt_i);
				if (stoponerror)
				{
					sessions = 0;
					break;
				}

				/* continue only if at least one RCPT was accepted */
				if (rcpts_ok <= 0)
					break;
			}

			if (!SMTP_IS_OK(r))
			{
				if (showerror)
					sm_io_fprintf(smioerr,
						"DATA=error, text=%@T, se=%d, ta=%d, r=%d\n",
						out, sessions, u, r);
				if (stoponerror)
					sessions = 0;
				break;
			}
			if (waitdata > 0)
				st_sleep(waitdata);
			if (SCE_IS_ABORT_SE(codes[STAGE_DATA], ta_cnt))
				goto fail;
			if (SCE_IS_ABORT_TA(codes[STAGE_DATA], ta_cnt))
				break;

			/*
			**  Send the message.  In each case buf ends up holding
			**  the final part (including the terminating dot),
			**  which is sent by the STAGE_DOT command below.
			*/

			if (myseq > 0)
			{
				n = sm_snprintf(buf, sizeof(buf),
					"From: me+%d\r\nTo: you+%d\r\nMessage-Id: <%d@smtpc2.domain>\r\nSubject: test+%d\r\n\r\n%d\r\n",
					myseq, myseq, myseq, myseq, myseq);
				ret = sm_io_write(fp, (unsigned char *) buf,
						strlen(buf), &b);
				if ((int) b != strlen(buf) || ret != SM_SUCCESS)
				{
					/* strlen() yields size_t, format wants int */
					sm_io_fprintf(smioerr,
						"[%d] data write error i=%d, n=%d, b=%d, ret=%#x\n"
						, tid, i, (int) strlen(buf), (int) b, ret);
					break;
				}
				if (datalen > 0 || datafile != NULL)
				{
					r = smtpdata(datalen, fp, tid, u);
					if (!SMTP_IS_OK(r))
						break;
				}
				n = strlcpy(buf, "\r\n.\r\n", sizeof(buf));
			}
			else if (0 == datalen && NULL == datafile)
			{
				n = strlcpy(buf,
					"From: me\r\nTo: you\r\nSubject: test\r\n\r\nbody\r\n.\r\n",
					sizeof(buf));
			}
			else
			{
				r = smtpdata(datalen, fp, tid, u);
				if (!SMTP_IS_OK(r))
					break;
				n = strlcpy(buf, "\r\n.\r\n", sizeof(buf));
			}
			if (SCE_IS_ABORT_SE(codes[STAGE_DOT], ta_cnt))
				goto fail;
			if (SCE_IS_ABORT_TA(codes[STAGE_DOT], ta_cnt))
				break;

			if (stats && sc_sts != NULL)
			{
				pt0 = &(sc_stta->scet_wr[ST_TA_DOT]);
				pt1 = &(sc_stta->scet_rd[ST_TA_DOT]);
			}
			r = smtpcommand(buf, n, fp, out, tid, u, STAGE_DOT,
					pt0, pt1);
			if (SMTP_SSD == r)
			{
				if (stoponerror)
					sessions = 0;
				goto fail;
			}
			if (!SMTP_IS_OK(r))
			{
				if (showerror)
					sm_io_fprintf(smioerr,
						"DOT=error, text=%@T, se=%d, ta=%d\n",
						out, sessions, u);
				if (stoponerror)
					sessions = 0;
				break;
			}

			/* another transaction? */
			if (u < ta_per_thrd - 1)
			{
				myseq = sequence;
				if (sequence > seqfirst)
					--sequence;
				if (rset)
				{
					n = strlcpy(buf, "RSET\r\n",
						sizeof(buf));
					if (stats && sc_sts != NULL)
					{
						pt0 = &sc_stta->scet_wr[ST_TA_RSET];
						pt1 = &sc_stta->scet_rd[ST_TA_RSET];
					}
					r = smtpcommand(buf, n, fp, out, tid,
							u, -1, pt0, pt1);
				}
			}
			if (!SMTP_IS_OK(r))
				break;

			if (stats && sc_stt != NULL)
			{
				t1 = st_utime();
				sc_stta->scet_ta = t1 - t0;
			}

			/* transaction done; print progress every count transactions */
			++ta_cnt;
			if (count > 0 && (ta_cnt % count) == 0)
			{
				time_t lastNmail, elapsedN;
				time_t elapsed;
				long tr;

				/* sm_io_fprintf(smioerr, "%ld\r", ta_cnt); */
				lastNmail = st_time();
				elapsedN = lastNmail - firstNmail;
				firstNmail = lastNmail;

				/* tr: transactions per second since firstmail */
				tr = 0;
				if (firstmail > 0) {
					elapsed = lastNmail - firstmail;
					if (elapsed > 0)
						tr = (ta_cnt - startup_act) / elapsed;
				}
				else
					elapsed = 0;

				/* ta_cnt is unsigned long, hence %lu */
				if (idisplay)
					sm_io_fprintf(smioerr,
						"%6lu  %8ld %8ld  %6ld   %6ld  %4d  %4d\r"
						, ta_cnt
						, (long) elapsedN
						, (long) elapsed
						, (elapsedN > 0) ? (long) (count / elapsedN) : -1L
						, tr
						, busy, concurrent
						);
				else
					sm_io_fprintf(smioerr,
						"msg=%6lu, t_i=%2ld, t_t=%4ld, msg/s=%4ld, t-msg/s=%4ld, busy=%3d, conc=%3d\n"
						, ta_cnt
						, (long) elapsedN
						, (long) elapsed
						, (elapsedN > 0) ? (long) (count / elapsedN) : -1L
						, tr
						, busy, concurrent
						);

				/*
				**  Record the rate once, when only the teardown
				**  transactions are left.
				*/

				if (ta_cnt + teardown >= ta_total && !total_rate_set)
				{
					total_rate = tr;
					total_rate_set = true;
					if (teardown > 0 && count >= 10)
						count /= 10;
				}

			}

			/* total number of transactions reached: stop all threads */
			if (ta_total > 0 && ta_cnt >= ta_total)
			{
				sessions = 0;
				break;
			}
		}

		/* end of session: send QUIT (result is ignored) and close */
		if (stats && sc_sts != NULL)
		{
			pt0 = &sc_stse->sces_wr[ST_SE_QUIT];
			pt1 = &sc_stse->sces_rd[ST_SE_QUIT];
		}
		n = strlcpy(buf, "QUIT\r\n", sizeof(buf));
		r = smtpcommand(buf, n, fp, out, tid, u, -1, pt0, pt1);
		/*
		if (!SMTP_IS_OK(r))
			;
		*/

		sm_io_close(fp, SM_IO_CF_NONE);
		fp = NULL;
		--concurrent;
		if (stats && sc_sts != NULL)
			sc_stse->sces_se = st_utime() - ts0;
	}

  fail:
	/* close a connection that is still open (error exit from a session) */
	if (fp != NULL)
	{
		sm_io_close(fp, SM_IO_CF_NONE);
		fp = NULL;
		--concurrent;
	}

  done:
	SM_STR_FREE(out);
	--busy;
	return NULL;
}

/*
**  PRINT_SYS_ERROR -- write "prog: msg: <errno text>" to smioerr
**
**	Parameters:
**		msg -- text describing the operation that failed
**
**	Returns:
**		none.
*/

static void
print_sys_error(const char *msg)
{
	const char *reason;

	/* text for the current value of errno */
	reason = strerror(errno);
	sm_io_fprintf(smioerr, "%s: %s: %s\n", prog, msg, reason);
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */