static pe_ring NQueue; static int StarvePrio = PE_QUEUES - 2; static void boot_queue() { HV *stash = gv_stashpv("Event", 1); PE_RING_INIT(&NQueue, 0); newCONSTSUB(stash, "QUEUES", newSViv(PE_QUEUES)); newCONSTSUB(stash, "PRIO_NORMAL", newSViv(PE_PRIO_NORMAL)); newCONSTSUB(stash, "PRIO_HIGH", newSViv(PE_PRIO_HIGH)); } /*inline*/ static void dequeEvent(pe_event *ev) { assert(ev); PE_RING_DETACH(&ev->que); --ActiveWatchers; } static void db_show_queue() { pe_event *ev; ev = (pe_event*) NQueue.next->self; while (ev) { warn("0x%x : %d\n", ev, ev->prio); ev = (pe_event*) ev->que.next->self; } } static int prepare_event(pe_event *ev, char *forwhat) { /* AVOID DIEING IN HERE!! */ STRLEN n_a; pe_watcher *wa = ev->up; if (!ev->callback) { if (WaPERLCB(wa)) { ev->callback = SvREFCNT_inc(wa->callback); EvPERLCB_on(ev); } else { ev->callback = wa->callback; ev->ext_data = wa->ext_data; EvPERLCB_off(ev); } assert(ev->callback); } assert(!WaSUSPEND(wa)); assert(WaREENTRANT(wa) || !wa->running); if (!WaACTIVE(wa)) { if (!WaRUNNOW(wa)) warn("Event: event for !ACTIVE watcher '%s'", SvPV(wa->desc,n_a)); } else { if (!WaREPEAT(wa)) pe_watcher_stop(wa, 0); else if (WaINVOKE1(wa)) pe_watcher_off(wa); } WaRUNNOW_off(wa); /* race condition? XXX */ if (WaDEBUGx(wa) >= 3) warn("Event: %s '%s' prio=%d\n", forwhat, SvPV(wa->desc,n_a), ev->prio); return 1; } static void queueEvent(pe_event *ev) { /**INVOKE**/ assert(ev->hits); if (!PE_RING_EMPTY(&ev->que)) return; /* clump'd event already queued */ if (!prepare_event(ev, "queue")) return; if (ev->prio < 0) { /* invoke the event immediately! */ ev->prio = 0; pe_event_invoke(ev); return; } if (ev->prio >= PE_QUEUES) ev->prio = PE_QUEUES-1; { /* queue in reverse direction? XXX */ /* warn("-- adding 0x%x/%d\n", ev, prio); db_show_queue();/**/ pe_ring *rg; rg = NQueue.next; while (rg->self && ((pe_event*)rg->self)->prio <= ev->prio) rg = rg->next; PE_RING_ADD_BEFORE(&ev->que, rg); /* warn("=\n"); db_show_queue();/**/ ++ActiveWatchers; } } static int pe_empty_queue(int maxprio) { /**INVOKE**/ pe_event *ev; ev = (pe_event*) NQueue.next->self; if (ev && ev->prio < maxprio) { dequeEvent(ev); pe_event_invoke(ev); return 1; } return 0; } /*inline*/ static void pe_multiplex(double tm) { if (SvIVX(DebugLevel) >= 2) { warn("Event: multiplex %.4fs %s%s\n", tm, PE_RING_EMPTY(&NQueue)?"":"QUEUE", PE_RING_EMPTY(&Idle)?"":"IDLE"); } if (!Estat.on) pe_sys_multiplex(tm); else { void *st = Estat.enter(-1, 0); pe_sys_multiplex(tm); Estat.commit(st, 0); } } static double pe_map_prepare(double tm) { pe_qcallback *qcb = (pe_qcallback*) Prepare.prev->self; while (qcb) { if (qcb->is_perl) { SV *got; double when; dSP; PUSHMARK(SP); PUTBACK; perl_call_sv((SV*)qcb->callback, G_SCALAR); SPAGAIN; got = POPs; PUTBACK; when = SvNOK(got) ? SvNVX(got) : SvNV(got); if (when < tm) tm = when; } else { /* !is_perl */ double got = (* (double(*)(void*)) qcb->callback)(qcb->ext_data); if (got < tm) tm = got; } qcb = (pe_qcallback*) qcb->ring.prev->self; } return tm; } static void pe_queue_pending() { double tm = 0; if (!PE_RING_EMPTY(&Prepare)) tm = pe_map_prepare(tm); pe_multiplex(0); pe_timeables_check(); if (!PE_RING_EMPTY(&Check)) pe_map_check(&Check); pe_signal_asynccheck(); if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck); } static int one_event(double tm) { /**INVOKE**/ /*if (SvIVX(DebugLevel) >= 4) warn("Event: ActiveWatchers=%d\n", ActiveWatchers); /**/ pe_signal_asynccheck(); if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck); if (pe_empty_queue(StarvePrio)) return 1; if (!PE_RING_EMPTY(&NQueue) || !PE_RING_EMPTY(&Idle)) { tm = 0; } else { double t1 = timeTillTimer(); if (t1 < tm) tm = t1; } if (!PE_RING_EMPTY(&Prepare)) tm = pe_map_prepare(tm); pe_multiplex(tm); pe_timeables_check(); if (!PE_RING_EMPTY(&Check)) pe_map_check(&Check); if (tm) { pe_signal_asynccheck(); if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck); } if (pe_empty_queue(PE_QUEUES)) return 1; while (1) { pe_watcher *wa; pe_event *ev; pe_ring *lk; if (PE_RING_EMPTY(&Idle)) return 0; lk = Idle.prev; PE_RING_DETACH(lk); wa = (pe_watcher*) lk->self; /* idle is not an event so CLUMP is never an option but we still need to create an event to pass info to the callback */ ev = pe_event_allocate(wa); if (!prepare_event(ev, "idle")) continue; /* can't queueEvent because we are already missed that */ pe_event_invoke(ev); return 1; } } static void pe_reentry() { pe_watcher *wa; struct pe_cbframe *frp; ENTER; /* for SAVE*() macro (see below) */ if (CurCBFrame < 0) return; frp = CBFrame + CurCBFrame; wa = frp->ev->up; assert(wa->running == frp->run_id); if (Estat.on) Estat.suspend(frp->stats); /* reversed by pe_event_postCB? */ if (WaREPEAT(wa)) { if (WaREENTRANT(wa)) { if (WaACTIVE(wa) && WaINVOKE1(wa)) pe_watcher_on(wa, 1); } else { if (!WaSUSPEND(wa)) { /* temporarily suspend non-reentrant watcher until callback is finished! */ pe_watcher_suspend(wa); SAVEDESTRUCTOR(_resume_watcher, wa); } } } } static int safe_one_event(double maxtm) { int got; pe_check_recovery(); pe_reentry(); got = one_event(maxtm); LEAVE; /* reentry */ return got; } static void pe_unloop(SV *why) { SV *rsv = perl_get_sv("Event::Result", 0); assert(rsv); sv_setsv(rsv, why); if (--ExitLevel < 0) { warn("Event::unloop() to %d", ExitLevel); } } static void pe_unloop_all(SV *why) { SV *rsv = perl_get_sv("Event::TopResult", 0); assert(rsv); sv_setsv(rsv, why); ExitLevel = 0; }