/*
* Copyright (c) 2002, 2004, 2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: t-ev-rcb-clt.c,v 1.14 2005/05/31 21:00:28 ca Exp $")
#include "sm/assert.h"
#include "sm/ctype.h"
#include "sm/error.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/test.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/unixsock.h"
#include "sm/rcb.h"
#include "sm/check.h"
#include "t-rcb.h"
#include <stdio.h>
/*
**  NOTE(review): NSOCKET is not referenced in the visible part of this
**  file (the socket path is taken from the command line); presumably
**  a default socket name -- confirm whether it is still needed.
*/
#define NSOCKET "./sockevthr1"
/*
**  Values for the "what" setting (option -w), evaluated by fct1()
**  when an invocation did not return from its write handling:
**  WHAT_TERM: return EVTHR_TERM|EVTHR_DEL,
**  WHAT_CONT (and any other value): return EVTHR_WAITQ.
*/
#define WHAT_TERM 0
#define WHAT_CONT 1
/* fct1() terminates the task once it has been called more often than this */
#define MAX_CALLS 256
/*
**  NOTE(review): IOBUFSIZE is not referenced in the visible part of
**  this file; it may be used by the included t-rcb-sr.c -- confirm.
*/
#define IOBUFSIZE 64
/*
**  Size passed (twice) to sm_rcb_new() for both RCBs in testev();
**  inputrcb() passes RCBSIZE - 4 to sm_rcb_open_enc().
*/
#define RCBSIZE 256
/*
**  Verbosity level, incremented once per -V option:
**  > 0: testev() prints the number of fct1() calls,
**  > 1: fct1() additionally traces its invocations and results.
*/
int Verbose = 0;
/*
**  Test context: filled in by testev() and handed to evthr_task_new()
**  as the context argument; fct1() retrieves it from the task via
**  evthr_t_actx.
*/
struct t_ctx_S
{
/* event thread context created by evthr_init() in testev() */
sm_evthr_ctx_P ctx;
/* socket connected by unix_client_connect() */
int fd;
/* WHAT_* selector; values > 1 also make fct1() sleep (what - 1) seconds */
int what;
/*
**  countdown: starts at "loops", decremented on every fct1() call;
**  fct1() terminates the task when the value before the decrement
**  is <= 0 (unless it already returned from its write handling)
*/
int status;
/* number of times fct1() has been invoked */
int called;
/* RCB passed to rcvrcb() (receive side) */
sm_rcb_P rcbr;
/* RCB filled by inputrcb() and passed to sm_rcb_snd() (send side) */
sm_rcb_P rcbw;
};
/* t_ctx_T: the context structure itself, t_ctx_P: pointer to it */
typedef struct t_ctx_S t_ctx_T, *t_ctx_P;
#define TMO
#include "t-rcb-sr.c"
/*
**  INPUTRCB -- build the content of an RCB from commands read on stdin
**
**	The RCB is opened for encoding and a 32 bit placeholder (0)
**	is stored for the size.  Then stdin is read one character at
**	a time:
**		'I'	call addint() on the RCB
**		'S'	call addstr() on the RCB
**		other	close the encoding and stop; if the character is
**			'W' (and closing succeeded) one more character is
**			consumed from stdin and WAITFORREPLY is returned.
**	addint() and addstr() are not defined in this part of the file
**	(presumably they come from t-rcb-sr.c).
**
**	Parameters:
**		rcb -- RCB to fill
**
**	Returns:
**		WAITFORREPLY: input ended with 'W'
**		EOF: end of input reached without an error
**		otherwise: result of the last RCB operation
**
**	NOTE(review): if the character consumed after 'W' is EOF, the
**	encoding is closed a second time and EOF is returned instead of
**	WAITFORREPLY -- confirm that this is intended.
*/

#define WAITFORREPLY 1

sm_ret_T
inputrcb(sm_rcb_P rcb)
{
	sm_ret_T ret;
	int ch;

	ret = sm_rcb_open_enc(rcb, RCBSIZE - 4);
	SM_TEST(sm_is_success(ret));

	/* reserve room for the size; only a dummy value is stored here */
	ret = sm_rcb_putuint32(rcb, 0);
	SM_TEST(sm_is_success(ret));

	while ((ch = getchar()) != EOF)
	{
		if (ch != 'I' && ch != 'S')
		{
			/* every other character terminates this RCB */
			ret = sm_rcb_close_enc(rcb);
			SM_TEST(ret == SM_SUCCESS);
			if (ch == 'W' && sm_is_success(ret))
			{
				/* skip the character following the 'W' */
				ch = getchar();
				ret = WAITFORREPLY;
			}
			break;
		}

		ret = (ch == 'I') ? addint(rcb) : addstr(rcb);
		if (sm_is_err(ret))
		{
			/* give up on this RCB, keep the error for the caller */
			(void) sm_rcb_close_enc(rcb);
			break;
		}
	}

	/* input exhausted without an error: tell the caller via EOF */
	if (ch == EOF && sm_is_success(ret))
	{
		(void) sm_rcb_close_enc(rcb);
		ret = EOF;
	}
	return ret;
}
/*
**  FCT1 -- task function: receive and send RCBs over the socket
**
**	Every invocation decrements the status countdown and increments
**	the call counter of the task context.  If a read event is
**	reported, rcvrcb() is called for the receive RCB; if a write
**	event is reported, the send RCB is written with sm_rcb_snd()
**	and, once that returns 0, refilled from stdin by inputrcb().
**
**	Parameters:
**		tsk -- event thread task; its evthr_t_actx must be a t_ctx_P
**
**	Returns:
**		flags for the event thread system (not an sm_error code):
**		EVTHR_TERM|EVTHR_DEL: too many calls, I/O error, countdown
**			expired, or what == WHAT_TERM
**		EVTHR_WAITQ: rcvrcb()/sm_rcb_snd() returned a value > 0
**			(presumably: operation not yet complete)
**		EVTHR_WAITQ|requests: otherwise, where the requests are
**			built with evthr_r_yes()/evthr_r_no() (presumably
**			they turn events of interest on/off -- confirm)
*/

sm_ret_T
fct1(sm_evthr_task_P tsk)
{
	t_ctx_P tctx;
	int sock, mode, countdown;
	sm_ret_T ret, evreq;

	SM_ASSERT(tsk != NULL);
	tctx = (t_ctx_P) tsk->evthr_t_actx;
	sock = tctx->fd;
	mode = tctx->what;

	/* keep the value from before this call, then count down */
	countdown = tctx->status;
	tctx->status = countdown - 1;
	tctx->called += 1;

	if (Verbose > 1)
	{
		fprintf(stderr,
			"fct1: called %lx, fd=%d, status=%d, what=%d, ev=%x\n",
			(long) tsk, sock, countdown, mode, evthr_rqevents(tsk));
	}

	/* what > 1: slow down this invocation */
	if (mode > 1)
		sleep(mode - 1);

	/* safety net against endless runs */
	if (tctx->called > MAX_CALLS)
		return EVTHR_TERM|EVTHR_DEL;

	evreq = 0;

	/* read event: receive (part of) an RCB */
	if (sock >= 0 && evthr_got_rd(tsk))
	{
		ret = rcvrcb(tctx->rcbr, sock, false);
		if (Verbose > 1)
			fprintf(stderr, "rcvrcb=%x\n", ret);
		if (ret < 0)
			return EVTHR_TERM|EVTHR_DEL;
		if (ret > 0)
			return EVTHR_WAITQ;

		/* ret == 0: reopen the RCB for receiving, ask for write */
		ret = sm_rcb_open_rcv(tctx->rcbr);
		SM_TEST(sm_is_success(ret));
		evreq |= evthr_r_yes(EVTHR_EV_WR);
	}

	/* write event: send the pending RCB, then prepare the next one */
	if (sock >= 0 && evthr_got_wr(tsk))
	{
		ret = sm_rcb_snd(sock, tctx->rcbw);
		if (Verbose > 1)
			fprintf(stderr, "sm_rcb_snd=%x\n", ret);
		SM_TEST(ret >= 0);
		if (ret < 0)
			return EVTHR_TERM|EVTHR_DEL;
		if (ret > 0)
			return EVTHR_WAITQ;

		/* ret == 0; the result of closing is not examined */
		ret = sm_rcb_close_snd(tctx->rcbw);

		ret = inputrcb(tctx->rcbw);
		if (Verbose > 1)
			fprintf(stderr, "inputrcb=%x\n", ret);
		switch (ret)
		{
		  case WAITFORREPLY:
			/* input asked to wait for a reply: read, don't write */
			evreq |= evthr_r_yes(EVTHR_EV_RD);
			evreq |= evthr_r_no(EVTHR_EV_WR);
			break;

		  case EOF:
			/* no more input: terminate soon */
			tctx->status = 2;
			evreq |= evthr_r_no(EVTHR_EV_WR);
			break;

		  default:
			evreq |= evthr_r_yes(EVTHR_EV_RD|EVTHR_EV_WR);
			break;
		}

		ret = sm_rcb_open_snd(tctx->rcbw);
		SM_TEST(sm_is_success(ret));
		return EVTHR_WAITQ|evreq;
	}

	/* countdown expired or caller asked for termination */
	if (countdown <= 0 || mode == WHAT_TERM)
		return EVTHR_TERM|EVTHR_DEL;

	/* WHAT_CONT and everything else: wait for the next event */
	return EVTHR_WAITQ|evreq;
}
void
testev(char *sockname, int what, int loops, int reps)
{
int fd;
sm_ret_T ret;
sm_evthr_ctx_P ctx;
sm_evthr_task_P task;
t_ctx_T tctx;
struct timeval sleept;
ret = thr_init();
SM_TEST(sm_is_success(ret));
sm_memzero(&sleept, sizeof(sleept));
ret = evthr_init(&ctx, 1, 6, 10);
SM_TEST(sm_is_success(ret));
SM_TEST(ctx != NULL);
fd = -1;
tctx.called = 0;
if (sockname != NULL)
{
(void) unix_client_connect(sockname, &fd);
SM_TEST(fd >= 0);
if (fd >= 0)
{
tctx.ctx = ctx;
tctx.fd = fd;
tctx.what = what;
tctx.status = loops;
tctx.rcbw = sm_rcb_new(NULL, RCBSIZE, RCBSIZE);
SM_TEST(tctx.rcbw != NULL);
if (tctx.rcbw == NULL)
goto error;
tctx.rcbr = sm_rcb_new(NULL, RCBSIZE, RCBSIZE);
SM_TEST(tctx.rcbr != NULL);
if (tctx.rcbr == NULL)
goto error;
ret = sm_rcb_open_rcv(tctx.rcbr);
SM_TEST(sm_is_success(ret));
ret = sm_fd_nonblock(fd, true);
SM_TEST(sm_is_success(ret));
ret = inputrcb(tctx.rcbw);
SM_TEST(sm_is_success(ret));
ret = sm_rcb_open_snd(tctx.rcbw);
SM_TEST(sm_is_success(ret));
ret = evthr_task_new(ctx, &task, EVTHR_EV_WR, fd,
&sleept, fct1, (void *) &tctx);
SM_TEST(sm_is_success(ret));
SM_TEST(task != NULL);
}
else
fprintf(stderr, "unix_server_connect()=%d, errno=%d\n",
fd, errno);
}
ret = evthr_loop(ctx);
SM_TEST(sm_is_success(ret));
if (!sm_is_success(ret))
fprintf(stderr, "evthr_loop()=%x\n", ret);
/*
** we should "hold" the system before deleting tasks?
** deleting the tasks while they are still in use
** will break things.
*/
error:
if (fd >= 0)
close(fd);
SM_TEST(tctx.called > 0);
if (reps > 0)
SM_TEST(tctx.called == reps);
if (Verbose > 0)
{
fprintf(stderr, "fcts=%d\n", tctx.called);
}
ret = evthr_stop(ctx);
SM_TEST(sm_is_success(ret));
if (!sm_is_success(ret))
fprintf(stderr, "evthr_stop()=%x\n", ret);
ret = thr_stop();
SM_TEST(sm_is_success(ret));
}
/*
**  USAGE -- print a usage message on stderr and terminate the program
**
**	Parameters:
**		prg -- program name to show in the message
**
**	Returns:
**		never; the program exits with a non-zero status so that
**		a wrong invocation is not mistaken for a successful test.
*/

void
usage(const char *prg)
{
	fprintf(stderr,
		"usage: %s [options] socket\n"
		"options:\n"
		"-l n\tinitial value of the task countdown [16]\n"
		"-r n\texpected number of task invocations (checked if > 0)\n"
		"-w n\t0: terminate, 1: continue, > 1: also sleep n-1 seconds per call [1]\n"
		"-V\tincrease verbosity\n",
		prg);
	exit(1);
}
/*
**  MAIN -- parse options, then run the event thread RCB client test
**
**	Options:
**		-l n	loops: initial countdown for the task (default: 16)
**		-r n	reps: expected number of task invocations
**			(default: -1, i.e., not checked)
**		-w n	what: WHAT_* selector (default: WHAT_CONT)
**		-V	increase verbosity
**	Any other option character is ignored.  The first non-option
**	argument is the name of the socket and is required.
**
**	Parameters:
**		argc -- argument count
**		argv -- argument vector
**
**	Returns:
**		result of sm_test_end()
*/

int
main(int argc, char *argv[])
{
	int opt;
	int what = WHAT_CONT;
	int loops = 16;
	int reps = -1;
	char *prg = argv[0];

	for (;;)
	{
		opt = getopt(argc, argv, "l:r:w:V");
		if (opt == -1)
			break;

		if (opt == 'l')
			loops = atoi(optarg);
		else if (opt == 'r')
			reps = atoi(optarg);
		else if (opt == 'w')
			what = atoi(optarg);
		else if (opt == 'V')
			Verbose++;

		/* everything else (including '?') is deliberately ignored */
	}

	/* note: called with the original, not yet shifted, arguments */
	sm_test_begin(argc, argv, "test evthr");

	argc -= optind;
	argv += optind;
	if (argc <= 0)
		usage(prg);

	testev(argv[0], what, loops, reps);
	return sm_test_end();
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */