#include <9pm/u.h> #include <9pm/libc.h> #include <9pm/ns.h> #include <9pm/thread.h> #include <9pm/threadimpl.h> static Lock chanlock; // Central channel access lock static int emptyentry(Channel *c) { int i, extra; assert((c->nentry == 0 && c->qentry == nil) || (c->nentry && c->qentry)); for (i = 0; i < c->nentry; i++) if (c->qentry[i] == nil) return i; if (i == 0) extra = 1; else{ extra = i; if (extra > 16) extra = 16; } c->nentry += extra; c->qentry = realloc(c->qentry, c->nentry*sizeof(c->qentry[0])); if (c->qentry == nil) sysfatal("realloc channel entries: %r"); memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0])); return i; } void chanfree(Channel *c) { int i, inuse; lock(&chanlock); inuse = 0; for (i = 0; i < c->nentry; i++) if (c->qentry[i]) inuse = 1; if (inuse) c->freed = 1; else { if (c->qentry) free(c->qentry); free(c); } unlock(&chanlock); } int chaninit(Channel *c, int elemsize, int elemcnt) { if(elemcnt < 0 || elemsize <= 0 || c == nil) return -1; c->f = 0; c->n = 0; c->freed = 0; c->s = elemcnt; c->e = elemsize; _threaddebug(DBGCHAN, "chaninit %lux", c); return 1; } Channel * chancreate(int elemsize, int elemcnt) { Channel *c; if(elemcnt < 0 || elemsize <= 0) return nil; c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize, 1); c->s = elemcnt; c->e = elemsize; _threaddebug(DBGCHAN, "chancreate %lux", c); return c; } int alt(Alt *alts) { Alt *a, *xa; Channel *c; uchar *v; int i, n, entry; Thread *t; lock(&chanlock); t = _threadgetthr(); t->alt = alts; t->call = Callalt; repeat: // Test which channels can proceed n = 1; a = nil; entry = -1; for (xa = alts; xa->op; xa++) { xa->entryno = -1; if (xa->op == CHANNOP) continue; if (xa->op == CHANNOBLK) { if (a == nil) { t->call = Callnil; unlock(&chanlock); return xa - alts; } else break; } c = xa->c; if (c == nil) sysfatal("alt: nil channel in entry %ld\n", xa - alts); if ((xa->op == CHANSND && c->n < c->s) || (xa->op == CHANRCV && c->n)) { // There's room to send in the channel if (nrand(n) == 0) { a = xa; entry = -1; } n++; } else { // Test for blocked senders or receivers for (i = 0; i < c->nentry; i++) { // Is it claimed? if ( c->qentry[i] && xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op // complementary op && *c->qentry[i]->tag == nil ) { // No if (nrand(n) == 0) { a = xa; entry = i; } n++; break; } } } } if (a == nil) { // Nothing can proceed, enqueue on all channels c = nil; for (a = alts; a->op; a++) { Channel *cp; if (a->op == CHANNOP || a->op == CHANNOBLK) continue; cp = a->c; a->tag = &c; i = emptyentry(cp); cp->qentry[i] = a; a->entryno = i; } // And wait for the rendez vous unlock(&chanlock); if (_threadrendezvous((ulong)&c, 0) == ~0) { t->call = Callnil; return -1; } lock(&chanlock); /* We rendezed-vous on channel c, dequeue from all channels * and find the Alt struct to which c belongs */ a = nil; for (xa = alts; xa->op; xa++) { Channel *xc; if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue; xc = xa->c; threadassert(xa->entryno >= 0 && xa->entryno < xc->nentry && xc->qentry[xa->entryno]); xc->qentry[xa->entryno] = nil; xa->entryno = -1; if (xc == c) a = xa; } if (c->s) { // Buffered channel, try again sleep(0); goto repeat; } unlock(&chanlock); if (c->freed) chanfree(c); if (t->exiting) threadexits(nil); t->call = Callnil; return a - alts; } c = a->c; // Channel c can proceed if (c->s) { // Send or receive via the buffered channel if (a->op == CHANSND) { v = c->v + ((c->f + c->n) % c->s) * c->e; if (a->v) memmove(v, a->v, c->e); else memset(v, 0, c->e); c->n++; } else { if (a->v) { v = c->v + (c->f % c->s) * c->e; memmove(a->v, v, c->e); } c->n--; c->f++; } } if (entry < 0) for (i = 0; i < c->nentry; i++) { if ( (xa = c->qentry[i]) && xa ->op == (CHANSND+CHANRCV) - a->op && *xa ->tag == nil ) { // Unblock peer process *xa->tag = c; unlock(&chanlock); if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { t->call = Callnil; return -1; } t->call = Callnil; return a - alts; } } if (entry >= 0) { xa = c->qentry[entry]; if (a->op == CHANSND) { if (xa->v) { if (a->v) memmove(xa->v, a->v, c->e); else memset(xa->v, 0, c->e); } } else { if (a->v) { if (xa->v) memmove(a->v, xa->v, c->e); else memset(a->v, 0, c->e); } } *xa->tag = c; unlock(&chanlock); if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { t->call = Callnil; return -1; } t->call = Callnil; return a - alts; } unlock(&chanlock); yield(); t->call = Callnil; return a - alts; } static int recvcommon(Channel *c, void *v) { Alt *a; int i; lock(&chanlock); for (i = 0; i < c->nentry; i++) { if ( (a = c->qentry[i]) && a->op == CHANSND && *a->tag == nil ) { *a->tag = c; if (c->n) { // There's an item to receive in the buffered channel if (v) memmove(v, c->v + (c->f % c->s) * c->e, c->e); c->n--; c->f++; } else { if (v) { if (a->v) memmove(v, a->v, c->e); else memset(v, 0, c->e); } } unlock(&chanlock); if (_threadrendezvous((ulong)a->tag, 0) == ~0) return -1; return 1; } } if (c->n) { // There's an item to receive in the buffered channel if (v) memmove(v, c->v + (c->f % c->s) * c->e, c->e); c->n--; c->f++; unlock(&chanlock); return 1; } return 0; } int nbrecv(Channel *c, void *v) { int r; r = recvcommon(c, v); if (r == 0) unlock(&chanlock); return r; } int recv(Channel *c, void *v) { Alt a; Channel *tag; int i; Thread *t; retry: if (i = recvcommon(c, v)) // chanlock has been released return i; // chanlock is still held tag = nil; a.c = c; a.v = v; a.tag = &tag; a.op = CHANRCV; t = _threadgetthr(); t->alt = &a; t->call = Callrcv; // enqueue on the channel i = emptyentry(c); c->qentry[i] = &a; a.entryno = i; unlock(&chanlock); if (_threadrendezvous((ulong)&tag, 0) == ~0) { t->call = Callnil; return -1; } lock(&chanlock); // dequeue from the channel threadassert(a.entryno >= 0 && a.entryno < c->nentry && c->qentry[a.entryno]); c->qentry[a.entryno] = nil; unlock(&chanlock); if (c->s) goto retry; // Buffered channels: try the queue again if (c->freed) chanfree(c); t->call = Callnil; if (t->exiting) threadexits(nil); return 1; } static int sendcommon(Channel *c, void *v) { Alt *a; int i; lock(&chanlock); for (i = 0; i < c->nentry; i++) { if ( (a = c->qentry[i]) && a->op == CHANRCV && *a->tag == nil ) { *a->tag = c; if (c->n < c->s) { // There's room to send in the buffered channel if (v) memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); else memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); c->n++; } else { if (a->v) { if (v) memmove(a->v, v, c->e); else memset(a->v, 0, c->e); } } unlock(&chanlock); if (_threadrendezvous((ulong)a->tag, 0) == ~0) return -1; return 1; } } if (c->n < c->s) { // There's room to send in the buffered channel if (v) memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); else memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); c->n++; unlock(&chanlock); yield(); return 1; } return 0; } int nbsend(Channel *c, void *v) { int r; r = sendcommon(c, v); if (r == 0) unlock(&chanlock); return r; } int send(Channel *c, void *v) { Alt a; Channel *tag; int i; Proc *p; Thread *t; retry: if (i = sendcommon(c, v)){ // chanlock has been released return i; } // chanlock is still held tag = nil; a.c = c; a.v = v; a.tag = &tag; a.op = CHANSND; t = _threadgetthr(); t->alt = &a; t->call = Callsnd; // enqueue on the channel i = emptyentry(c); c->qentry[i] = &a; a.entryno = i; unlock(&chanlock); if (_threadrendezvous((ulong)&tag, 0) == ~0) { t->call = Callnil; return -1; } lock(&chanlock); // dequeue from the channel threadassert(a.entryno >= 0 && a.entryno < c->nentry && c->qentry[a.entryno]); c->qentry[a.entryno] = nil; unlock(&chanlock); if (c->s) goto retry; // Buffered channels: try the queue again // Unbuffered channels: data is already transferred if (c->freed) chanfree(c); t->call = Callnil; if (t->exiting) threadexits(nil); return 1; } int sendul(Channel *c, ulong v) { threadassert(c->e == sizeof(ulong)); return send(c, &v); } ulong recvul(Channel *c) { ulong v; threadassert(c->e == sizeof(ulong)); if (recv(c, &v) < 0) return ~0; return v; } int sendp(Channel *c, void *v) { threadassert(c->e == sizeof(void *)); return send(c, &v); } void * recvp(Channel *c) { void * v; threadassert(c->e == sizeof(void *)); if (recv(c, &v) < 0) return nil; return v; } int nbsendul(Channel *c, ulong v) { threadassert(c->e == sizeof(ulong)); return nbsend(c, &v); } ulong nbrecvul(Channel *c) { ulong v; threadassert(c->e == sizeof(ulong)); if (nbrecv(c, &v) == 0) return 0; return v; } int nbsendp(Channel *c, void *v) { threadassert(c->e == sizeof(void *)); return nbsend(c, &v); } void * nbrecvp(Channel *c) { void * v; threadassert(c->e == sizeof(void *)); if (nbrecv(c, &v) == 0) return nil; return v; }