/*
* Copyright (c) 2002, 2004, 2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: t-evthr-srv.c,v 1.12 2005/01/05 18:16:01 ca Exp $")
#include "sm/assert.h"
#include "sm/ctype.h"
#include "sm/error.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/test.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/unixsock.h"
#include "sm/check.h"
#include <stdio.h>
/*
**  Socket path name.
**  NOTE(review): not referenced anywhere in this file as shown; the socket
**  actually used is taken from the command line -- possibly a leftover.
*/
#define NSOCKET "./sockevthr1"
/*
**  Values for t_ctx_S.what: fct1() returns EVTHR_TERM|EVTHR_DEL for
**  WHAT_TERM and EVTHR_WAITQ for WHAT_CONT (and any other value) when it
**  reaches its final dispatch.
*/
#define WHAT_TERM 0
#define WHAT_CONT 1
/* size in bytes of the per-connection I/O buffer (t_ctx_S.buf) */
#define IOBUFSIZE 64
/* verbosity level; incremented once per -V option, gates stderr tracing */
static int Verbose = 0;
/*
**  T_CTX_S -- application context handed to the task callbacks
**
**	One instance lives on the stack of testev() for the listener task
**	(fcts); one more is allocated by fcts() for every accepted
**	connection and handed to fct1().
*/

struct t_ctx_S
{
	/* event thread context; fcts() passes it to evthr_task_new() */
	sm_evthr_ctx_P	 ctx;

	/* label printed in verbose traces; fct1() prints "<NiL>" if NULL */
	char		*str;

	/* listen socket (fcts task) or accepted connection (fct1 task) */
	int		 fd;

	/*
	**  WHAT_TERM or WHAT_CONT; values greater than 1 additionally
	**  make fct1() sleep for (what - 1) seconds on every call.
	*/

	int		 what;

	/* countdown; fct1() decrements it once per invocation */
	int		 status;

	/* number of times the callback has been invoked */
	int		 called;

	/* number of bytes in buf that fct1() writes back */
	int		 buflen;

	/* data read from / written to the connection */
	char		 buf[IOBUFSIZE];
};

typedef struct t_ctx_S	 t_ctx_T;
typedef struct t_ctx_S	*t_ctx_P;
/*
**  FCT1 -- per-connection task: read data, modify it, write it back
**
**	Read event: up to IOBUFSIZE - 1 bytes are read into the context
**	buffer, every non-NUL byte is incremented by one, and the write
**	event is enabled via evthr_en_wr().  If a 'Q' is found in the
**	input the function returns EVTHR_TERM|EVTHR_DEL immediately.
**
**	Write event: the buffered bytes are written to the connection;
**	if nothing has been buffered yet a single 'Q' is sent instead.
**
**	Parameters:
**		tsk -- current task; its application context (evthr_t_actx)
**			must point to a t_ctx_T
**
**	Returns:
**		EVTHR_TERM|EVTHR_DEL -- 'Q' received, more than 256 calls,
**			or final dispatch with what == WHAT_TERM
**		EVTHR_DEL -- no I/O was done and the status counter (value
**			before this call's decrement) was <= 0
**		EVTHR_WAITQ (possibly or'ed with evthr_r_no(EVTHR_EV_WR))
**			-- otherwise
*/

static sm_ret_T
fct1(sm_evthr_task_P tsk)
{
	t_ctx_P fctx;
	int fd, nbytes, remaining, i;
	sm_ret_T ret;
	char *str;

	SM_ASSERT(tsk != NULL);
	fctx = (t_ctx_P) tsk->evthr_t_actx;
	fd = fctx->fd;

	/* remember the counter value from before this call's decrement */
	remaining = fctx->status--;
	str = (fctx->str == NULL) ? "<NiL>" : fctx->str;
	fctx->called++;
	if (Verbose > 1)
	{
		/* %lx expects an unsigned long argument */
		fprintf(stderr, "fct1: called %lx '",
			(unsigned long) tsk);
		prtbuf(stderr, str, strlen(str));
		fprintf(stderr, "', fd=%d, status=%d, ev=%x\n",
			fd, remaining, evthr_rqevents(tsk));
	}

	/* "what" values above 1 request an artificial delay */
	if (fctx->what > 1)
		sleep(fctx->what - 1);

	/* safety limit: give up after 256 invocations of this task */
	if (fctx->called > 256)
		return EVTHR_TERM|EVTHR_DEL;

	if (fd >= 0)
	{
		if (evthr_got_rd(tsk))
		{
			/*
			**  Zero the buffer and leave room for one byte so
			**  the data is always NUL terminated without
			**  overwriting a byte that was actually received
			**  (and that is echoed back below).
			*/

			sm_memzero(fctx->buf, sizeof(fctx->buf));
			nbytes = read(fd, fctx->buf, sizeof(fctx->buf) - 1);
			if (Verbose > 2)
			{
				fprintf(stderr, "fct1: got r=%d, buf='",
					nbytes);
				if (nbytes > 0)
					prtbuf(stderr, fctx->buf, nbytes);
				fprintf(stderr, "'\n");
			}
			if (nbytes > 0)
			{
				fctx->buflen = nbytes;
				for (i = 0; i < nbytes; i++)
				{
					/* 'Q' in the input ends the test */
					if (fctx->buf[i] == 'Q')
						return EVTHR_TERM|EVTHR_DEL;
					if (fctx->buf[i] != '\0')
						fctx->buf[i]++;
				}

				/* reply on the next write event */
				ret = evthr_en_wr(tsk);
				SM_TEST(ret == SM_SUCCESS);
				return EVTHR_WAITQ;
			}
		}
		if (evthr_got_wr(tsk))
		{
			/* nothing buffered: send a lone 'Q' */
			if (fctx->buflen == 0)
			{
				fctx->buf[0] = 'Q';
				fctx->buflen = 1;
			}
			nbytes = write(fd, fctx->buf, fctx->buflen);
			if (Verbose > 2)
				fprintf(stderr, "fct1: wrote r=%d\n", nbytes);

			/*
			**  NOTE(review): evthr_r_no(EVTHR_EV_WR) presumably
			**  asks the library to stop waiting for write
			**  events -- confirm against the evthr API.
			*/

			if (nbytes > 0)
				return EVTHR_WAITQ|evthr_r_no(EVTHR_EV_WR);
		}
	}

	/* no I/O done (or it failed): decide based on counter and mode */
	if (remaining <= 0)
		return EVTHR_DEL;
	if (fctx->what == WHAT_TERM)
		return EVTHR_TERM|EVTHR_DEL;
	return EVTHR_WAITQ;
}
/*
**  FCTS -- listener task: create a connection task for an accepted fd
**
**	Allocates a fresh t_ctx_T for the accepted connection found in
**	tsk->evthr_t_nc and registers fct1() as a read task for it.
**
**	Parameters:
**		tsk -- listener task; its application context (evthr_t_actx)
**			must point to a t_ctx_T whose ctx member is set
**
**	Returns:
**		EVTHR_WAITQ (always); failures are recorded via SM_TEST().
*/

static sm_ret_T
fcts(sm_evthr_task_P tsk)
{
	t_ctx_P fctx;
	sm_evthr_task_P task;
	t_ctx_T *tctx;
	sm_ret_T ret;
	struct timeval sleept;

	SM_ASSERT(tsk != NULL);
	SM_ASSERT(evthr_got_li(tsk));
	fctx = (t_ctx_P) tsk->evthr_t_actx;
	fctx->called++;
	if (Verbose > 1)
	{
		/* %lx expects an unsigned long argument */
		fprintf(stderr, "fcts: called %lx '%s'\n",
			(unsigned long) tsk, fctx->str);
	}

	/* add the new connection */
	sm_memzero(&sleept, sizeof(sleept));
	SM_ASSERT(tsk->evthr_t_nc != NULL);
	SM_TEST(tsk->evthr_t_nc->evthr_a_fd >= 0);
	if (tsk->evthr_t_nc->evthr_a_fd < 0)
		return EVTHR_WAITQ;

	tctx = (t_ctx_T *) sm_zalloc(sizeof(*tctx));
	SM_TEST(tctx != NULL);
	if (tctx == NULL)
	{
		/*
		**  SM_TEST() only records the failure; do not go on to
		**  dereference the NULL pointer.
		**  NOTE(review): the accepted fd is not closed here because
		**  its ownership is not visible in this file -- confirm.
		*/

		return EVTHR_WAITQ;
	}
	tctx->str = NULL;
	tctx->fd = tsk->evthr_t_nc->evthr_a_fd;
	tctx->what = WHAT_CONT;
	tctx->status = 0;
	tctx->called = 0;
	tctx->buflen = 0;

	/* make sure the check below never reads an indeterminate value */
	task = NULL;
	ret = evthr_task_new(fctx->ctx, &task, EVTHR_EV_RD,
			tctx->fd, &sleept, fct1, (void *) tctx);
	SM_TEST(sm_is_success(ret));
	SM_TEST(task != NULL);

	/*
	**  NOTE(review): tctx is never released in this file, neither
	**  here when evthr_task_new() fails nor when the task ends.
	*/

	return EVTHR_WAITQ;
}
/*
**  TESTEV -- set up the event thread system, listen on a socket, run loop
**
**	Parameters:
**		sockname -- path of the socket to listen on; if NULL no
**			listener task is created
**		what -- stored in the listener context (t_ctx_S.what)
**		loops -- stored in the listener context (t_ctx_S.status)
**		reps -- if > 0: expected number of fcts() invocations
**
**	Returns:
**		nothing; results are recorded via SM_TEST().
*/

static void
testev(char *sockname, int what, int loops, int reps)
{
	int lfd, save_errno;
	sm_ret_T ret;
	sm_evthr_ctx_P evthr_ctx;
	sm_evthr_task_P task3;
	t_ctx_T tctx3;
	char dat3[16];
	struct timeval sleept;

	/*
	**  Give everything a defined value up front so that the checks
	**  below never read indeterminate data if an init call fails.
	*/

	evthr_ctx = NULL;
	task3 = NULL;
	lfd = -1;
	sm_memzero(&sleept, sizeof(sleept));
	sm_memzero(&tctx3, sizeof(tctx3));
	tctx3.called = 0;
	strlcpy(dat3, "Accept\n", sizeof(dat3));

	ret = thr_init();
	SM_TEST(sm_is_success(ret));

	ret = evthr_init(&evthr_ctx, 1, 6, 10);
	SM_TEST(sm_is_success(ret));
	SM_TEST(evthr_ctx != NULL);
	if (!sm_is_success(ret) || evthr_ctx == NULL)
	{
		/* no usable context: there is nothing to run or to stop */
		ret = thr_stop();
		SM_TEST(sm_is_success(ret));
		return;
	}

	if (sockname != NULL)
	{
		if (Verbose > 1)
			fprintf(stderr, "srv: accept\n");
		lfd = unix_server_listen(sockname, 10);

		/* save errno before SM_TEST() gets a chance to change it */
		save_errno = errno;
		SM_TEST(lfd >= 0);
		if (lfd >= 0)
		{
			tctx3.ctx = evthr_ctx;
			tctx3.str = dat3;
			tctx3.fd = lfd;
			tctx3.what = what;
			tctx3.status = loops;
			ret = sm_fd_nonblock(lfd, true);
			SM_TEST(sm_is_success(ret));
			ret = evthr_task_new(evthr_ctx, &task3, EVTHR_EV_LI,
					lfd, &sleept, fcts, (void *) &tctx3);
			SM_TEST(sm_is_success(ret));
			SM_TEST(task3 != NULL);
		}
		else
		{
			fprintf(stderr, "unix_server_listen()=%d, errno=%d\n",
				lfd, save_errno);
		}
	}

	ret = evthr_loop(evthr_ctx);
	SM_TEST(sm_is_success(ret));
	if (!sm_is_success(ret))
		fprintf(stderr, "evthr_loop()=%x\n", ret);

	/*
	**  we should "hold" the system before deleting tasks?
	**  deleting the tasks while they are still in use
	**  will break things.
	*/

	if (lfd >= 0)
		close(lfd);

	/* the listener must have been called; exactly reps times if given */
	SM_TEST(tctx3.called > 0);
	if (reps > 0)
	{
		SM_TEST(tctx3.called == reps);
	}
	if (Verbose > 0)
	{
		fprintf(stderr, "fcts=%d\n", tctx3.called);
	}

	ret = evthr_stop(evthr_ctx);
	SM_TEST(sm_is_success(ret));
	if (!sm_is_success(ret))
		fprintf(stderr, "evthr_stop()=%x\n", ret);
	ret = thr_stop();
	SM_TEST(sm_is_success(ret));
}
/*
**  USAGE -- print a one-line usage message to stderr and exit
**
**	Parameters:
**		prg -- program name shown in the message
**
**	Returns:
**		never; the process exits with status 0.
*/

static void
usage(const char *prg)
{
	(void) fprintf(stderr, "usage: %s [options] socket\n", prg);
	exit(0);
}
/*
**  MAIN -- parse the command line and run the server side of the test
**
**	Options:
**		-l n	value stored as status of the listener context
**			(default 16)
**		-r n	expected number of listener invocations
**			(default -1: not checked)
**		-w n	"what" value of the listener context (default 1)
**		-V	increase verbosity (may be repeated)
**	The first non-option argument is the path of the socket.
**
**	Options that getopt() does not recognize are not rejected here.
**
**	Returns:
**		result of sm_test_end()
*/

int
main(int argc, char *argv[])
{
	int opt, c;
	int what = 1;
	int loops = 16;
	int reps = -1;
	char *sockname = NULL;
	char *prg = argv[0];

	for (;;)
	{
		opt = getopt(argc, argv, "l:r:w:V");
		if (opt == -1)
			break;

		if (opt == 'l')
			loops = atoi(optarg);
		else if (opt == 'r')
			reps = atoi(optarg);
		else if (opt == 'w')
			what = atoi(optarg);
		else if (opt == 'V')
			Verbose++;
	}

	sm_test_begin(argc, argv, "test evthr");

	/* skip the options; a socket name must remain */
	argv += optind;
	argc -= optind;
	if (argc <= 0)
		usage(prg);
	sockname = argv[0];

	/* remove a stale socket; failure (e.g., none there) is ignored */
	if (sockname != NULL)
		(void) unlink(sockname);

	testev(sockname, what, loops, reps);

	/* after the run the socket must exist and be removable */
	if (sockname != NULL)
	{
		c = unlink(sockname);
		SM_TEST(c == 0);
	}
	return sm_test_end();
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */