/*
 * Copyright (c) 2006 Claus Assmann
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the license/LICENSE.3C file which can be found at the
 * top level of this source code distribution.
 */

#include "sm/generic.h"
SM_IDSTR(id, "@(#)$Id: t-aq-perf-0.c,v 1.10 2006/12/30 23:04:56 ca Exp $")

#include "sm/debug.h"
#include "sm/heap.h"
#include "sm/sysexits.h"
#include "sm/test.h"
#include "sm/io.h"
#include "sm/rfc2821.h"
#include "sm/actdb-int.h"
#include "sm/aqrdq.h"
#define QMGR_DEBUG_DEFINE 1
#include "sm/qmgr-int.h"
#include "sm/qmgrdbg.h"
#define QMGR_LOG_DEFINES 1
#include "log.h"

static int Verbose = 0;
static bool Concurrent = false;
static bool Done = false;
static pthread_cond_t cond_added, cond_deleted;
static aq_ctx_P Aq_ctx;
static uint Once_TAs;
static uint Total_TAs;
static uint Fill = 90;
static uint Rate = 0;
static uint Aq_size = 90;

#if SM_HEAP_CHECK
# include "sm/io.h"
extern SM_DEBUG_T SmHeapCheck;
# define HEAP_CHECK (SmHeapCheck > 0)
#else
# define HEAP_CHECK 0
#endif

#define SM_AQ_RCPTS	256
#define RCPT_MAX_LEN	256

sm_cstr_P Tst_CDB_id;
sessta_id_T *ta_ids = NULL;
sm_log_ctx_P Lctx = NULL;
sm_str_P Defaultdomain = NULL;

/*
**  AQ_T_ADD -- add entries to AQ and rdq
*/

static sm_ret_T
aq_t_add(void)
{
	sm_ret_T ret;
	uint32_t added, total;
	int status;
	time_T time_now;

	total = 0;
	ret = SM_FAILURE;
	do {
		time_now = time(NULL);
		status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		if (Verbose > 3)
			sm_io_fprintf(smioerr, "func=aq_t_add, locked=1\n");
		ret = qm_test_fill_aq(Lctx, Aq_ctx, time_now, Total_TAs, Once_TAs, Fill, Rate, Defaultdomain, &added, THR_NO_LOCK);
		if (Verbose > 2)
			sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, todal=%u, ret=%d\n", added, total, ret);
		if (0 == ret) SM_ASSERT(AQ_RDQS_FIRST(Aq_ctx->aq_rdqs) != NULL);
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_ASSERT(status == 0);
		if (Verbose > 3)
			sm_io_fprintf(smioerr, "func=aq_t_add, unlocked=1\n");
		SM_TEST_ERR(ret);
		total += added;
		if (added > 0 && Concurrent) {
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, todal=%u\n", added, total);
			status = pthread_cond_signal(&cond_added);
			SM_TEST_E(0 == status);
		}
		if (0 == added && Concurrent) {
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, ret=%d\n", added, ret);
			if (SM_NOTDONE == ret) {
				Done = true;
				break;
			}
			status = pthread_cond_wait(&cond_deleted, &Aq_ctx->aq_mutex);
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_add, deleted\n");
			SM_TEST_E(0 == status);
		}
		if (SM_NOTDONE == ret) {
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_add, done=full\n");
			return ret;
		}
	} while (Concurrent);
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_add, return=%d\n", ret);
	return ret;

  error:
	sm_io_fprintf(smioerr, "func=aq_t_add, error=%d\n", ret);
	return ret;
}

#if QM_TEST_SCHED
/*
**  AQ_T_SCHED -- take entries from rdq and move them to waitq
*/

static sm_ret_T
aq_t_sched(void)
{
	sm_ret_T ret;
	aq_rcpt_P aq_rcpt;
	aqrdq_ctx_P aqrdq;
	uint todo_entries;

	ret = SM_SUCCESS;
	aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
	todo_entries = aqrdq->aqrdq_entries;
	if (0 == todo_entries)
		return SM_NOTDONE;

	/* ??? move to waitq ??? */
	ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, NULL);

	return ret;
}
#endif /* QM_TEST_SCHED */


/*
**  AQ_T_DEL -- delete entries from AQ
*/

static sm_ret_T
aq_t_del(void)
{
	sm_ret_T ret;
	int i, status;
	uint todo_entries, entries_removed;
	bool done, aq_locked;
	aq_rcpt_P aq_rcpt;
	aq_ta_P aq_ta;
	aqrdq_ctx_P aqrdq;

	entries_removed = 0;
	done = false;
	aq_locked = false;
	do {
		ret = SM_FAILURE;
		SM_ASSERT(!aq_locked);
		status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = true;
		aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
		while (Concurrent && NULL == aqrdq) {
			status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
			SM_TEST_E(0 == status);
			aq_locked = false;
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_del, wait\n");
			status = pthread_cond_wait(&cond_added, &Aq_ctx->aq_mutex);
			SM_TEST_E(0 == status);
			done = Done;
			aq_locked = true;
			aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
			if (Concurrent && NULL == aqrdq) {
				if (Verbose)
					sm_io_fprintf(smioerr, "ar_t_del=signal_but_no_rdq\n");
			}
			else
				break;
		}
		SM_TEST_E(aqrdq != NULL);
		todo_entries = aqrdq->aqrdq_entries;
		if (0 == todo_entries) {
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_del, todo=0\n");
			if (aq_locked) {
				status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
				SM_TEST_E(0 == status);
				aq_locked = false;
			}
			return SM_NOTDONE;
		}

#if HAVE_MONCONTROL
		moncontrol(1);
#endif
		SM_ASSERT(aq_locked);
		for (i = 0; i < todo_entries && !AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts); i++, entries_removed++)
		{
			aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
			SM_TEST_E(aq_rcpt != NULL);
			SM_IS_AQ_RCPT(aq_rcpt);

			/*
			**  Outdated: new code in qm_fr_sc_react() accesses
			**  aq_ta directly via dadb (which isn't used in this
			**  test program at all).
			*/

			ret = aq_ta_find(Aq_ctx, aq_rcpt->aqr_ss_ta_id, false, &aq_ta);
			SM_TEST_ERR(ret);
			ret = aq_rdq_rm(Aq_ctx, aq_rcpt, THR_NO_LOCK, Lctx);
			SM_TEST_ERR(ret);
			ret = aq_rcpt_rm(Aq_ctx, aq_rcpt, AQR_RM_N_RDQ|AQR_RM_I_RDQ);
			SM_TEST_ERR(ret);
			ret = aq_ta_rm(Aq_ctx, aq_ta, false);
			SM_TEST_ERR(ret);
		}
#if HAVE_MONCONTROL
		moncontrol(1);
#endif
		if (Concurrent) {
			status = pthread_cond_signal(&cond_deleted);
			SM_TEST_E(0 == status);
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_del, signal=sent, entries_removed=%u, done=%d\n", entries_removed, done);
			if (done)
				break;
			if (entries_removed >= Total_TAs)
				break;
		}
		if (aq_locked) {
			status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
			SM_TEST_E(0 == status);
			aq_locked = false;
		}
	} while (Concurrent && ret != SM_NOTDONE);
	if (aq_locked) {
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;
	}

	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_del, return=%d\n", ret);
	return ret;

  error:
	if (aq_locked) {
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;
	}
	sm_io_fprintf(smioerr, "func=aq_t_del, error=%d\n", ret);
	return ret;
}

static void *
aq_thr_add(void *arg)
{
	sm_ret_T ret;

	ret = aq_t_add();
	return NULL;
}
static void *
aq_thr_del(void *arg)
{
	sm_ret_T ret;

	ret = aq_t_del();
	return NULL;
}

/*
**  AQ_T_PERF -- add/remove entries to/from AQ
*/

static sm_ret_T
aq_t_perf(void)
{
	sm_ret_T ret, reta;
	uint i;
	int status;
	sessta_id_T ta_id;
	sm_logconfig_P lcfg;
	pthread_t thr1, thr2;

	Aq_ctx = NULL;
	ret = aq_open(NULL /*qmgr_ctx*/, &Aq_ctx, Aq_size, 0);
	SM_TEST_ERR(ret);

	sm_snprintf(ta_id, SMTP_STID_SIZE, SMTPS_STID_FORMAT, (id_count_T)0, 0);
	Tst_CDB_id = sm_cstr_scpyn((uchar *)ta_id, sizeof(ta_id));
	if (NULL == Tst_CDB_id)
		return sm_error_temp(SM_EM_Q, ENOMEM);
	Defaultdomain = sm_str_scpy(NULL, "example.com", 64);
	SM_TEST_E(Defaultdomain != NULL);

	ret = sm_log_create(NULL, &Lctx, &lcfg);
	SM_TEST_ERR(ret);
	ret = sm_log_setfp_fd(Lctx, smioerr, SMIOERR_FILENO);
	SM_TEST_ERR(ret);
	ret = sm_log_setdebuglevel(Lctx, 10);
	SM_TEST_ERR(ret);

	srand(time(0));

	if (Concurrent) {
		void *res;

		status = pthread_cond_init(&cond_added, NULL);
		SM_TEST_E(0 == status);
		status = pthread_cond_init(&cond_deleted, NULL);
		SM_TEST_E(0 == status);

		status = pthread_create(&thr1, NULL, aq_thr_add, NULL);
		SM_TEST_E(0 == status);
		status = pthread_create(&thr2, NULL, aq_thr_del, NULL);
		SM_TEST_E(0 == status);

		status = pthread_join(thr1, &res);
		SM_TEST_E(0 == status);
		if (Verbose)
			sm_io_fprintf(smioout, "aq_thr_add=done\n");
		status = pthread_join(thr2, &res);
		SM_TEST_E(0 == status);
		if (Verbose)
			sm_io_fprintf(smioout, "aq_thr_del=done\n");
		goto done;
	}

	i = 0;
	do {
		reta = aq_t_add();
		SM_TEST_ERR(reta);
		if (SM_NOTDONE == reta) {
			if (Verbose)
				sm_io_fprintf(smioout, "ar_t_add=not_done\n", i);
			break;
		}
#if QM_TEST_SCHED
		ret = aq_t_sched(Aq_ctx, Once_TAs);
		SM_TEST_ERR(ret);
		if (SM_NOTDONE == ret) {
			if (Verbose)
				sm_io_fprintf(smioout, "ar_t_sched=not_done\n", i);
			break;
		}
#endif /* QM_TEST_SCHED */
		ret = aq_t_del();
		SM_TEST_ERR(ret);
		if (SM_NOTDONE == ret) {
			if (Verbose)
				sm_io_fprintf(smioout, "ar_t_del=not_done\n", i);
			break;
		}
		++i;
	} while (reta != SM_NOTDONE && ret != SM_NOTDONE);

	if (Verbose)
		sm_io_fprintf(smioout, "i=%u\n", i);

	if (Concurrent) {
		status = pthread_cond_destroy(&cond_added);
		SM_TEST_E(0 == status);
		status = pthread_cond_destroy(&cond_deleted);
		SM_TEST_E(0 == status);
	}

  done:
	ret = aq_close(Aq_ctx);
	return ret;

  error:
	return ret;
}

static void
usage(const char *prg)
{
	sm_io_fprintf(smioerr, "usage: %s [options]\n", prg);
	sm_io_fprintf(smioerr, "Test AQ insertion, lookups, and removal (domains are random numbers)\n");
	sm_io_fprintf(smioerr, "options:\n");
	sm_io_fprintf(smioerr, "-C    try to perform adding/deleting entries concurrently\n");
	sm_io_fprintf(smioerr, "-e n  specify number of recipients to add each iteration\n");
	sm_io_fprintf(smioerr, "-f n  specify allowed fill of AQ\n");
	sm_io_fprintf(smioerr, "-n n  specify total number of recipients to use\n");
	sm_io_fprintf(smioerr, "-s n  specify size of AQ\n");
	sm_io_fprintf(smioerr, "-r n  fill rate\n");
	sm_io_fprintf(smioerr, "-V    increase verbosity\n");
	exit(EX_USAGE);
}


int
main(int argc, char **argv)
{
	int c;
	sm_ret_T r;
	char *prg;

#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	prg = argv[0];
	Total_TAs = SM_AQ_RCPTS;
	Aq_size = SM_AQ_RCPTS;
	Once_TAs = Total_TAs / 10;
#if SM_HEAP_CHECK
	SmHeapCheck = 0;
#endif
	while ((c = getopt(argc, argv, "Ce:f:H:l:n:r:s:V")) != -1) {
		switch (c) {
		  case 'C':
			Concurrent = true;
			break;
		  case 'e':
			Once_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'f':
			Fill = (uint) strtoul(optarg, NULL, 0);
			break;
#if SM_HEAP_CHECK
		  case 'H':
			SmHeapCheck = atoi(optarg);
			break;
#endif
		  case 'n':
			Total_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'r':
			Rate = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 's':
			Aq_size = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'V':
			++Verbose;
			break;
		  default:
			usage(prg);
			return EX_USAGE;
		}
	}

	sm_test_begin(argc, argv, "test AQ perf 0");

	r = aq_t_perf();

#if SM_HEAP_CHECK
	if (HEAP_CHECK) {
		sm_io_fprintf(smioout, "heap should be empty except for makebuf:\n");
		sm_heap_report(smioout, 3);
	}
#endif
	sm_io_flush(smioout);
	return sm_test_end();
}


syntax highlighted by Code2HTML, v. 0.9.1