/*
 * Copyright (c) 2004, 2005 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: t-wait-0.c,v 1.9 2006/04/02 06:58:36 ca Exp $")

#include "sm/assert.h"
#include "sm/error.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/test.h"
#include "sm/evthr.h"
#include "sm/queue.h"

/*
**  NOTE(review): the header name of this include was lost in the source;
**  <stdio.h> is what the fprintf() calls below require -- confirm.
*/

#include <stdio.h>

/*
2004-10-23
priority: 8
libevthr: test: minthr=1, maxthr=2
start one task twice which
- starts another task that will eventually signal a cv
- returns "async" and waits for a cv (as "sleep" task, 1s each?)

the "other" task should act like DNS lookups:
- create a request
- after a while answer the request
i.e., there is one task which reads requests from a queue and
invokes some callback function with some context;
creating a request means appending a structure to the list,
the struct contains the callback function and the context
(some id? in this case the cv should be sufficient)

2004-10-24
system can hang if "can block" is set for a task because libevthr
skips those when creating more threads and it doesn't "remember"
that another task (non-blocking) wasn't created before?
*/

static int Verbose = 0;		/* incremented once per -V option */
static uint Dbglvl = 0;		/* set by -d, passed to evthr_set_dbglvl() */

/* "server" context */
struct s_ctx_S
{
	pthread_mutex_t		 s_mutex;	/* protects s_head */
	timeval_T		 s_nextt;	/* next scheduled run of fctsrv */
	TAILQ_HEAD(, r_req_S)	 s_head;	/* list of requests */
};
typedef struct s_ctx_S	s_ctx_T, *s_ctx_P;

/* "wait" context */
struct w_ctx_S
{
	s_ctx_P		 w_s_ctx;	/* pointer to "server" context */
	int		 w_count;	/* number of fctw invocations */
	int		 w_nresults;	/* number of results received */
	timeval_T	 w_nextt;	/* next scheduled run of fctw */
};
typedef struct w_ctx_S	w_ctx_T, *w_ctx_P;

/* result context */
struct a_res_S
{
	pthread_mutex_t	 a_mutex;	/* protects a_result */
	pthread_cond_t	 a_cond;	/* signalled when a_result is set */
	int		 a_result;	/* 0: no result yet */
};
typedef struct a_res_S	a_res_T, *a_res_P;

/* request list */
struct r_req_S
{
	a_res_P			 r_a_res;	/* pointer to result context */
	TAILQ_ENTRY(r_req_S)	 r_link;
};
typedef struct r_req_S	r_req_T, *r_req_P;

/*
**  W_NEW -- allocate a zeroed "wait" context
**
**	Parameters:
**		pw_ctx -- (pointer to) wait context (output)
**
**	Returns:
**		SM_SUCCESS, or a temporary ENOMEM error
**		(*pw_ctx is not written on failure).
*/

static sm_ret_T
w_new(w_ctx_P *pw_ctx)
{
	w_ctx_P w_ctx;

	SM_REQUIRE(pw_ctx != NULL);
	w_ctx = (w_ctx_P) sm_zalloc(sizeof(*w_ctx));
	if (w_ctx == NULL)
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);
	*pw_ctx = w_ctx;
	return SM_SUCCESS;
}

/*
**  A_NEW -- allocate a result context and initialize its mutex and cv
**
**	Parameters:
**		pa_res -- (pointer to) result context (output)
**
**	Returns:
**		SM_SUCCESS, a temporary ENOMEM error, or a permanent error
**		carrying the pthread error code
**		(*pa_res is not written on failure, nothing is leaked).
*/

static sm_ret_T
a_new(a_res_P *pa_res)
{
	int r;
	a_res_P a_res;

	SM_REQUIRE(pa_res != NULL);
	a_res = (a_res_P) sm_zalloc(sizeof(*a_res));
	if (a_res == NULL)
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);
	r = pthread_mutex_init(&a_res->a_mutex, NULL);
	SM_TEST(r == 0);
	if (r != 0)
		goto err_free;
	r = pthread_cond_init(&a_res->a_cond, NULL);
	SM_TEST(r == 0);
	if (r != 0)
		goto err_mutex;
	*pa_res = a_res;
	return SM_SUCCESS;

	/* undo in reverse order of acquisition */
  err_mutex:
	(void) pthread_mutex_destroy(&a_res->a_mutex);
  err_free:
	sm_free(a_res);
	return sm_error_perm(SM_EM_EVTHR, r);
}

/*
**  S_NEW -- allocate a "server" context with an empty request list
**
**	Parameters:
**		ps_ctx -- (pointer to) server context (output)
**
**	Returns:
**		SM_SUCCESS, a temporary ENOMEM error, or a permanent error
**		carrying the pthread error code
**		(*ps_ctx is not written on failure, nothing is leaked).
*/

static sm_ret_T
s_new(s_ctx_P *ps_ctx)
{
	int r;
	s_ctx_P s_ctx;

	SM_REQUIRE(ps_ctx != NULL);
	s_ctx = (s_ctx_P) sm_zalloc(sizeof(*s_ctx));
	if (s_ctx == NULL)
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);
	r = pthread_mutex_init(&s_ctx->s_mutex, NULL);
	SM_TEST(r == 0);
	if (r != 0)
	{
		sm_free(s_ctx);
		return sm_error_perm(SM_EM_EVTHR, r);
	}
	TAILQ_INIT(&s_ctx->s_head);
	*ps_ctx = s_ctx;
	return SM_SUCCESS;
}

/*
**  R_NEW -- allocate a zeroed request list entry
**
**	Parameters:
**		pr_req -- (pointer to) request (output)
**
**	Returns:
**		SM_SUCCESS, or a temporary ENOMEM error
**		(*pr_req is not written on failure).
*/

static sm_ret_T
r_new(r_req_P *pr_req)
{
	r_req_P r_req;

	SM_REQUIRE(pr_req != NULL);
	r_req = (r_req_P) sm_zalloc(sizeof(*r_req));
	if (r_req == NULL)
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);
	*pr_req = r_req;
	return SM_SUCCESS;
}

/*
**  NOTIFY -- signal the condition variable of a result context
**
**	Parameters:
**		a_res -- result context
**
**	Returns:
**		SM_SUCCESS, or a permanent error carrying the code of the
**		first pthread call that failed.
*/

static sm_ret_T
notify(a_res_P a_res)
{
	sm_ret_T ret;
	int r;

	SM_REQUIRE(a_res != NULL);
	ret = SM_SUCCESS;
	if (Verbose > 2)
		fprintf(stderr, "func=notify, a_res=%p\n", (void *) a_res);
	r = pthread_mutex_lock(&a_res->a_mutex);
	if (Verbose > 2)
		fprintf(stderr, "func=notify, a_res=%p, lock=%d\n",
			(void *) a_res, r);
	SM_TEST(r == 0);
	if (r != 0)
		return sm_error_perm(SM_EM_EVTHR, r);
	r = pthread_cond_signal(&a_res->a_cond);
	if (Verbose > 0)
		fprintf(stderr, "func=notify, a_res=%p, cond_signal=%d\n",
			(void *) a_res, r);
	SM_TEST(r == 0);
	if (r != 0)
		ret = sm_error_perm(SM_EM_EVTHR, r);
	r = pthread_mutex_unlock(&a_res->a_mutex);
	SM_TEST(r == 0);
	if (r != 0 && ret == SM_SUCCESS)
		ret = sm_error_perm(SM_EM_EVTHR, r);
	return ret;
}

/*
**  GET_RESULT -- block until a result context has a non-zero a_result
**
**	Parameters:
**		a_res -- result context
**
**	Returns:
**		SM_SUCCESS once a_result != 0, or a permanent error carrying
**		the pthread error code.
**
**	The mutex is held for the whole predicate loop and released exactly
**	once afterwards: pthread_cond_wait() must be called with the mutex
**	locked, and it may wake up spuriously, so the predicate has to be
**	re-checked under the lock.
*/

static sm_ret_T
get_result(a_res_P a_res)
{
	int r, ur;

	SM_REQUIRE(a_res != NULL);
	if (Verbose > 2)
		fprintf(stderr, "func=get_result, a_res=%p\n", (void *) a_res);
	r = pthread_mutex_lock(&a_res->a_mutex);
	if (r != 0)
	{
		/* LOG something! */
		return sm_error_perm(SM_EM_EVTHR, r);
	}

	while (a_res->a_result == 0)
	{
		if (Verbose > 1)
			fprintf(stderr,
				"func=get_result, a_res=%p, where=before_cond_wait\n"
				, (void *) a_res);
		r = pthread_cond_wait(&a_res->a_cond, &a_res->a_mutex);
		if (Verbose > 1)
			fprintf(stderr,
				"func=get_result, a_res=%p, wait=%d\n"
				, (void *) a_res, r);

		/* any failure is final: retrying would just spin */
		if (r != 0)
			break;
	}

	ur = pthread_mutex_unlock(&a_res->a_mutex);
	if (r != 0)
		return sm_error_perm(SM_EM_EVTHR, r);
	SM_ASSERT(ur == 0);
	return SM_SUCCESS;
}

/*
**  ADD_REQUEST -- append a request for a result to the server's list
**
**	Parameters:
**		s_ctx -- server context
**		a_res -- result context to be answered by the server
**
**	Returns:
**		SM_SUCCESS, or an error from r_new() / pthread_mutex_lock()
**		(in which case nothing has been queued).
*/

static sm_ret_T
add_request(s_ctx_P s_ctx, a_res_P a_res)
{
	sm_ret_T ret;
	int r;
	r_req_P r_req;

	SM_REQUIRE(s_ctx != NULL);
	SM_REQUIRE(a_res != NULL);
	ret = r_new(&r_req);
	SM_TEST(ret == SM_SUCCESS);
	if (sm_is_err(ret))
		return ret;
	r_req->r_a_res = a_res;
	if (Verbose > 2)
		fprintf(stderr, "func=add_request, s_ctx=%p, a_res=%p\n"
			, (void *) s_ctx, (void *) a_res);
	r = pthread_mutex_lock(&s_ctx->s_mutex);
	SM_TEST(r == 0);
	if (r != 0)
	{
		/* request was never queued: nobody else will free it */
		sm_free(r_req);
		return sm_error_perm(SM_EM_EVTHR, r);
	}
	TAILQ_INSERT_TAIL(&s_ctx->s_head, r_req, r_link);
	r = pthread_mutex_unlock(&s_ctx->s_mutex);
	SM_TEST(r == 0);
	return ret;
}

/*
**  FCTW -- "wait" task: queue a request, then block until it is answered
**
**	Parameters:
**		tsk -- evthr task (its evthr_t_actx is the wait context)
**
**	Returns:
**		EVTHR_OK after a result has been received,
**		EVTHR_SLPQ (with evthr_t_sleep set 20s ahead) from the third
**		invocation on, or an error if the request could not be set up.
**
**	Side Effects:
**		calls exit(0) after the second result has been received.
*/

static sm_ret_T
fctw(sm_evthr_task_P tsk)
{
	int r;
	sm_ret_T ret;
	w_ctx_P w_ctx;
	timeval_T sleept, delay;

	SM_ASSERT(tsk != NULL);
#if 0
	/* may not be true because task is appended to waitq and started again */
	SM_TEST(evthr_got_slp(tsk));
#endif
	w_ctx = (w_ctx_P) tsk->evthr_t_actx;
	SM_ASSERT(w_ctx != NULL);
	++w_ctx->w_count;
	r = gettimeofday(&sleept, NULL);
	SM_TEST(r == 0);
	if (Verbose > 2)
		fprintf(stderr, "func=fctw, now=%ld, w_ctx=%p, count=%d\n"
			, (long) sleept.tv_sec, (void *) w_ctx
			, w_ctx->w_count);
	delay.tv_sec = 0;
	delay.tv_usec = 0;
#if 0
	timeradd(&sleept, &delay, &tsk->evthr_t_sleep);
#endif
	if (w_ctx->w_count < 3)
	{
		a_res_P a_res;

		ret = a_new(&a_res);
		SM_TEST(sm_is_success(ret));
		if (sm_is_err(ret))
			return ret;
		ret = add_request(w_ctx->w_s_ctx, a_res);
		SM_TEST(sm_is_success(ret));
		if (sm_is_err(ret))
		{
			/*
			**  Nothing was queued, so no answer will ever
			**  arrive: do not wait for it; a_res is still
			**  private to this function and can be released.
			*/

			(void) pthread_cond_destroy(&a_res->a_cond);
			(void) pthread_mutex_destroy(&a_res->a_mutex);
			sm_free(a_res);
			return ret;
		}
		if (Verbose > 4)
			fprintf(stderr, "func=fctw, now=%ld, w_ctx=%p"
				", a_res=%p"
				", evthr_t_rqevf=%X"
				", evthr_t_evocc=%X"
				", evthr_t_state=%X"
				", evthr_t_flags=%X"
				", evthr_t_sleep=%ld"
				"\n"
				, (long) sleept.tv_sec
				, (void *) w_ctx
				, (void *) a_res
				, tsk->evthr_t_rqevf
				, tsk->evthr_t_evocc
				, tsk->evthr_t_state
				, tsk->evthr_t_flags
				, (long) tsk->evthr_t_sleep.tv_sec
				);

		/*
		**  NOTE(review): presumably these two calls hand the task
		**  back to the scheduler so it can be started again while
		**  this invocation blocks below -- confirm against libevthr.
		*/

		ret = evthr_waitq_app(tsk);
		SM_TEST(sm_is_success(ret));
		if (Verbose > 2)
			fprintf(stderr,
				"func=fctw, now=%ld, w_ctx=%p, evthr_waitq_app=%X\n"
				, (long) sleept.tv_sec, (void *) w_ctx
				, (unsigned int) ret);
		ret = evthr_new_sl(tsk, sleept, true);
		if (Verbose > 2)
			fprintf(stderr,
				"func=fctw, now=%ld, w_ctx=%p, evthr_new_sl=%X\n"
				, (long) sleept.tv_sec, (void *) w_ctx
				, (unsigned int) ret);
		sleep(10);
		ret = get_result(a_res);
		SM_TEST(sm_is_success(ret));
		++w_ctx->w_nresults;
		r = gettimeofday(&sleept, NULL);
		SM_TEST(r == 0);
		if (Verbose > 0)
			fprintf(stderr,
				"func=fctw, now=%ld, w_ctx=%p, get_result=%X, a_result=%d\n"
				, (long) sleept.tv_sec, (void *) w_ctx
				, (unsigned int) ret, a_res->a_result);

		/*
		**  a_res is deliberately not freed here: fctsrv may still
		**  be inside notify() for it when get_result() returns.
		*/

		if (w_ctx->w_nresults >= 2)
			exit(0);	/* HACK... */
		return EVTHR_OK;
	}

	delay.tv_sec = 20;
	delay.tv_usec = 0;
	timeradd(&sleept, &delay, &tsk->evthr_t_sleep);
	w_ctx->w_nextt = tsk->evthr_t_sleep;
	if (Verbose > 2)
		fprintf(stderr, "func=fctw, now=%ld, w_ctx=%p, next=%lds\n"
			, (long) sleept.tv_sec, (void *) w_ctx
			, (long) delay.tv_sec);
	return EVTHR_SLPQ;
}

/*
**  FCTSRV -- "server" task: answer at most one queued request per run
**
**	Parameters:
**		tsk -- evthr task (its evthr_t_actx is the server context)
**
**	Returns:
**		EVTHR_SLPQ, with evthr_t_sleep set 2s ahead.
*/

static sm_ret_T
fctsrv(sm_evthr_task_P tsk)
{
	int r;
	s_ctx_P s_ctx;
	timeval_T sleept, delay;
	r_req_P r_req;
	static int result = 0;	/* last result handed out; first one is 1 */

	SM_ASSERT(tsk != NULL);
	SM_TEST(evthr_got_slp(tsk));
	s_ctx = (s_ctx_P) tsk->evthr_t_actx;
	SM_ASSERT(s_ctx != NULL);
	r = gettimeofday(&sleept, NULL);
	SM_TEST(r == 0);
	if (Verbose > 2)
		fprintf(stderr, "func=fctsrv, now=%ld, s_ctx=%p\n"
			, (long) sleept.tv_sec, (void *) s_ctx);

	/* dequeue under the lock; always release it, even if list is empty */
	r_req = NULL;
	r = pthread_mutex_lock(&s_ctx->s_mutex);
	SM_TEST(r == 0);
	if (r == 0)
	{
		if (!TAILQ_EMPTY(&s_ctx->s_head))
		{
			r_req = TAILQ_FIRST(&s_ctx->s_head);
			TAILQ_REMOVE(&s_ctx->s_head, r_req, r_link);
		}
		r = pthread_mutex_unlock(&s_ctx->s_mutex);
		SM_TEST(r == 0);
	}

	if (r_req != NULL)
	{
		a_res_P a_res;

		if (Verbose > 1)
			fprintf(stderr,
				"func=fctsrv, now=%ld, s_ctx=%p, r_req=%p\n"
				, (long) sleept.tv_sec, (void *) s_ctx
				, (void *) r_req);
		a_res = r_req->r_a_res;

		/* the list entry has served its purpose */
		sm_free(r_req);

		/* publish the result under the mutex get_result() reads it with */
		r = pthread_mutex_lock(&a_res->a_mutex);
		SM_TEST(r == 0);
		a_res->a_result = ++result;
		if (r == 0)
		{
			r = pthread_mutex_unlock(&a_res->a_mutex);
			SM_TEST(r == 0);
		}
		(void) notify(a_res);
	}

	r = gettimeofday(&sleept, NULL);
	SM_TEST(r == 0);
	delay.tv_sec = 2;
	delay.tv_usec = 0;
	timeradd(&sleept, &delay, &tsk->evthr_t_sleep);
	s_ctx->s_nextt = tsk->evthr_t_sleep;
	return EVTHR_SLPQ;
}

/*
**  FCTSTOP -- emergency stop task: report an error and terminate
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		does not return: calls exit(1).
*/

static sm_ret_T
fctstop(sm_evthr_task_P tsk)
{
	int r;
	timeval_T sleept;

	SM_ASSERT(tsk != NULL);
	SM_ASSERT(evthr_got_slp(tsk));
	r = gettimeofday(&sleept, NULL);
	SM_TEST(r == 0);
	fprintf(stderr, "func=fctstop, now=%ld, state=ERROR\n",
		(long) sleept.tv_sec);
	exit(1);
}

/*
**  TESTWAIT -- set up the server, wait and stop tasks and run the loop
**
**	Parameters:
**		hard -- call evthr_set_max_h(evthr_ctx, 3)?
**		canblock -- set EVTHRT_FL_BLOCK on the wait task?
**
**	Returns:
**		none (failures are recorded via SM_TEST).
*/

static void
testwait(bool hard, bool canblock)
{
	sm_ret_T ret;
	int r;
	sm_evthr_ctx_P evthr_ctx;
	sm_evthr_task_P task1, task2, task3;
	w_ctx_P w_ctx1;
	s_ctx_P s_ctx1;
	timeval_T sleept, nextt, delay;

	/*
	**  The constructors below do not write their output argument on
	**  failure, but it is tested against NULL before ret is checked.
	*/

	evthr_ctx = NULL;
	task1 = task2 = task3 = NULL;
	w_ctx1 = NULL;
	s_ctx1 = NULL;

	ret = thr_init();
	SM_TEST(sm_is_success(ret));
	sm_memzero(&sleept, sizeof(sleept));
	ret = evthr_init(&evthr_ctx, 1, 2, 2);
	SM_TEST(sm_is_success(ret));
	SM_TEST(evthr_ctx != NULL);
	if (sm_is_err(ret))
		goto err2;
	evthr_set_dbglvl(evthr_ctx, Dbglvl);
	if (hard)
	{
		ret = evthr_set_max_h(evthr_ctx, 3);
		SM_TEST(sm_is_success(ret));
	}

	/* "request server" function: sleep 20s before doing something */
	ret = s_new(&s_ctx1);
	SM_TEST(sm_is_success(ret));
	SM_TEST(s_ctx1 != NULL);
	if (sm_is_err(ret))
		goto error;
	r = gettimeofday(&sleept, NULL);
	SM_TEST(r == 0);
	sleept.tv_sec += 20;
	s_ctx1->s_nextt = sleept;
	ret = evthr_task_new(evthr_ctx, &task2, EVTHR_EV_SL, -1, &sleept,
				fctsrv, (void *) s_ctx1);
	SM_TEST(sm_is_success(ret));
	SM_TEST(task2 != NULL);
	if (sm_is_err(ret))
		goto error;

	/* "lock" function: sleep 1ms (1000 usec) before doing something */
	ret = w_new(&w_ctx1);
	SM_TEST(sm_is_success(ret));
	SM_TEST(w_ctx1 != NULL);
	if (sm_is_err(ret))
		goto error;
	w_ctx1->w_s_ctx = s_ctx1;
	r = gettimeofday(&sleept, NULL);
	SM_TEST(r == 0);
	delay.tv_sec = 0;
	delay.tv_usec = 1000;
	timeradd(&sleept, &delay, &nextt);
	w_ctx1->w_nextt = nextt;
	ret = evthr_task_new(evthr_ctx, &task1, EVTHR_EV_SL, -1, &nextt,
				fctw, (void *) w_ctx1);
	SM_TEST(sm_is_success(ret));
	SM_TEST(task1 != NULL);
	if (sm_is_err(ret))
		goto error;
	if (canblock)
	{
		EVTHRT_SET_FLAG(task1, EVTHRT_FL_BLOCK);
		if (Verbose > 2)
			fprintf(stderr, "func=testwait, task=%p, flags=%x\n",
				(void *) task1, task1->evthr_t_flags);
	}

	/* "stop" function: sleep 60s before stopping system (emergency stop) */
	r = gettimeofday(&sleept, NULL);
	SM_TEST(r == 0);
	delay.tv_sec = 60;
	delay.tv_usec = 0;
	timeradd(&sleept, &delay, &nextt);
	ret = evthr_task_new(evthr_ctx, &task3, EVTHR_EV_SL, -1, &nextt,
				fctstop, (void *) NULL);
	SM_TEST(sm_is_success(ret));
	SM_TEST(task3 != NULL);
	if (sm_is_err(ret))
		goto error;

	ret = evthr_loop(evthr_ctx);
	SM_TEST(sm_is_success(ret));
	if (!sm_is_success(ret))
		fprintf(stderr, "evthr_loop=%x\n", (unsigned int) ret);

  error:
	if (evthr_ctx != NULL)
	{
		ret = evthr_stop(evthr_ctx);
		SM_TEST(sm_is_success(ret));
		if (!sm_is_success(ret))
			fprintf(stderr, "evthr_stop=%x\n", (unsigned int) ret);
		evthr_ctx = NULL;
	}
  err2:
	ret = thr_stop();
	SM_TEST(sm_is_success(ret));
}

/*
**  MAIN -- parse options and run the test
**
**	Options:
**		-b	set EVTHRT_FL_BLOCK on the wait task
**		-d n	set evthr debug level
**		-h	call evthr_set_max_h()
**		-V	increase verbosity (may be repeated)
**
**	Returns:
**		result of sm_test_end().
*/

int
main(int argc, char *argv[])
{
	int c;
	bool hard, canblock;

	hard = false;
	canblock = false;
	while ((c = getopt(argc, argv, "bd:hV")) != -1)
	{
		switch (c)
		{
		  case 'b':
			canblock = true;
			break;
		  case 'd':
			Dbglvl = (uint) atoi(optarg);
			break;
		  case 'h':
			hard = true;
			break;
		  case 'V':
			Verbose++;
			break;
		  default:
#if 0
			usage(argv[0]);
			return(1);
#endif /* 0 */
			/* unknown options are ignored */
			break;
		}
	}
	sm_test_begin(argc, argv, "test wait with deadlock avoidance");
	testwait(hard, canblock);
	return sm_test_end();
}