/*
 * Copyright (c) 1996, 1998, 1999 University of Utah and the Flux Group.
 * All rights reserved.
 *
 * This file is part of the Flux OSKit.  The OSKit is free software, also known
 * as "open source;" you can redistribute it and/or modify it under the terms
 * of the GNU General Public License (GPL), version 2, as published by the Free
 * Software Foundation (FSF).  To explore alternate licensing terms, contact
 * the University of Utah at csl-dist@cs.utah.edu or +1-801-585-3271.
 *
 * The OSKit is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GPL for more details.  You should have
 * received a copy of the GPL along with the OSKit; see the file COPYING.  If
 * not, write to the FSF, 59 Temple Place #330, Boston, MA 02111-1307, USA.
 */

#ifdef CPU_INHERIT
/*
 * Simplified Stride scheduler. Can be preemptive or not.
 */

/*
 * NOTE(review): the header names of the nine directives below are missing
 * from this copy of the file (only the bare "#include" survived). They must
 * be restored from the original source before this file can be built.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "hash.h"

/*
 * The scheduler structures are per-scheduler instantiation.
 *
 * The runq is a single, doubly-linked queue.
 */
typedef struct stridesched {
	queue_head_t	runq;		/* Head of the run queue */
	int		runq_count;	/* Number of entries on runq */
	int		tickets;	/* Global tickets */
	int		pass;		/* Global pass */
	int		stride;		/* Global stride */
	int		last_update;	/* Time of last global pass update */
	short		ready;		/* Set by the scheduler loop once it
					   is set up; polled by the creator */
	short		preemptible;	/* Non-zero: donate with a timeout */
	pthread_t	tid;		/* Back pointer to thread */
	hash_table_t   *hashtable;	/* Hash TID to pstate */
} stridesched_t;

/* Not referenced anywhere in the visible part of this file. */
#define BASE_TICKETS	(1024 * 4)
/* Numerator of every stride computation: stride = STRIDE1 / tickets. */
#define STRIDE1		(1 << 15)
/* Divisor that scales elapsed time into pass increments. */
#define QUANTUM		((PIT_HZ/100) / 1000)

/*
 * Per-thread scheduler state structure.
*/ typedef struct stride_state { queue_chain_t runq; /* Queueing element */ pthread_t tid; /* TID of thread */ int tickets; int stride; int pass; int remain; int start; } stride_state_t; #define NULL_STRIDESTATE ((stride_state_t *) 0) #define TICKETS(p) (p->tickets) #define STRIDE(p) (p->stride) #define PASS(p) (p->pass) #define REMAIN(p) (p->remain) #define START(p) (p->start) extern int ffs(); void *stride_schedloop(void *arg); static void stride_canceled(void *arg); static int stride_pedantic = 1; static void client_init(stride_state_t *pstate, int tickets); /* * Map TID to pstate structure with a hash table. */ static void stride_setstate(stridesched_t *psched, pthread_t tid, stride_state_t *pstate) { if (tidhash_add(psched->hashtable, (void *) pstate, tid)) panic("stride_setstate: " "hashtable add failed: tid:%d pstate:0x%x", tid, (int) pstate); } static stride_state_t * stride_getstate(stridesched_t *psched, pthread_t tid) { stride_state_t *pstate; if ((pstate = (stride_state_t *) tidhash_lookup(psched->hashtable, tid)) == NULL) panic("stride_getstate: " "hashtable lookup failed: tid:%d", tid); return pstate; } static void stride_remstate(stridesched_t *psched, pthread_t tid) { tidhash_rem(psched->hashtable, tid); } /* * Create a Stride scheduler. This creates the thread and makes sure it * gets run so that it exists and is ready to handle scheduling messages. */ int create_stride_scheduler(pthread_t *tid, const pthread_attr_t *attr, int preemptible) { stridesched_t *psched; if ((psched = (stridesched_t *) calloc(1, sizeof(stridesched_t))) == NULL) panic("create_stride_scheduler: No more memory"); if (tidhash_create(&psched->hashtable, 0)) panic("allocate_stride_scheduler: Hash Table allocation"); psched->preemptible = preemptible; queue_init(&psched->runq); pthread_create(tid, attr, stride_schedloop, (void *) psched); /* * The scheduler has to run. */ while (! psched->ready) oskit_pthread_sleep(1); /* * Back in this thread. Just return. 
*/ return 0; } /* * Initialize the per-thread scheduler state structure. This is called * when a new thread is created, and is stashed in the parent scheduler * thread structure. */ stride_state_t * stride_thread_init(stridesched_t *psched, pthread_t tid, int tickets) { stride_state_t *pstate; if ((pstate = (stride_state_t *) calloc(1, sizeof(stride_state_t))) == NULL) panic("stride_thread_init: No more memory"); pstate->tid = tid; stride_setstate(psched, tid, pstate); client_init(pstate, tickets); return pstate; } /* * Runq stuff */ /* * Are there any threads on the runq? */ static inline int runq_empty(stridesched_t *psched) { return (psched->runq_count == 0); } /* * Determine if a pthread is on the runq. Use a separate field * since using the flags would require locking the thread. Use the * queue chain pointer instead, setting it to zero when a thread is * removed from the queue. */ static inline int runq_onrunq(stridesched_t *psched, stride_state_t *pstate) { return (int) pstate->runq.next; } /* * Add and remove threads from the runq. */ /* * Insert into the runq. */ static void runq_insert(stridesched_t *psched, stride_state_t *pstate) { queue_head_t *phdr = &(psched->runq); stride_state_t *ptmp; if (queue_empty(phdr)) { queue_enter(phdr, pstate, stride_state_t *, runq); goto done; } queue_iterate(&(psched->runq), ptmp, stride_state_t *, runq) { if (PASS(pstate) <= PASS(ptmp)) { queue_enter_before(phdr, ptmp, pstate, stride_state_t *, runq); goto done; } } queue_enter(phdr, pstate, stride_state_t *, runq); done: psched->runq_count++; } /* * Dequeue the highest priority thread, which is the first thread since * the list ordered by PASS. 
*/ static stride_state_t * runq_dequeue(stridesched_t *psched) { queue_head_t *phdr = &(psched->runq); stride_state_t *pnext; queue_remove_first(phdr, pnext, stride_state_t *, runq); pnext->runq.next = (queue_entry_t) 0; psched->runq_count--; START(pnext) = oskit_pthread_childtime(pnext->tid); return pnext; } /* * Remove an arbitrary thread from the runq. */ static inline void runq_remove(stridesched_t *psched, stride_state_t *pstate) { queue_head_t *phdr = &(psched->runq); queue_remove(phdr, pstate, stride_state_t *, runq); pstate->runq.next = (queue_entry_t) 0; psched->runq_count--; } /* * Debug the runq. */ static void runq_check(stridesched_t *psched) { int count, tickets; stride_state_t *pstate; tickets = count = 0; queue_iterate(&(psched->runq), pstate, stride_state_t *, runq) { count++; tickets += TICKETS(pstate); if (count > psched->runq_count) panic("stride scheduler: Bad runq(%d): 0x%x\n", pthread_self(), psched); } if (tickets != psched->tickets) panic("stride scheduler: Bad ticket count(%d): 0x%x\n", pthread_self(), psched); } /* * Stride scheduler specific routines. */ /* * Update global pass based on elapsed real time. */ static void global_pass_update(stridesched_t *psched) { int elapsed = oskit_pthread_childtime(psched->tid) - psched->last_update; psched->pass += (psched->stride * elapsed) / QUANTUM; psched->last_update += elapsed; } static void thread_pass_update(stridesched_t *psched, stride_state_t *pstate) { int elapsed; elapsed = oskit_pthread_childtime(pstate->tid); elapsed -= START(pstate); PASS(pstate) += (STRIDE(pstate) * elapsed) / QUANTUM; } /* * Update global tickets and stride to reflect change in runq. */ static void global_tickets_update(stridesched_t *psched, int delta) { psched->tickets += delta; /* * XXX: Is this the correct thing to do when the last thread * leaves? The paper says nothing about it. 
*/ if (psched->tickets) psched->stride = STRIDE1 / psched->tickets; else psched->stride = STRIDE1; } /* * Initialize the tickets for a new thread to some reasonable value. */ static void client_init(stride_state_t *pstate, int tickets) { TICKETS(pstate) = tickets; STRIDE(pstate) = STRIDE1 / tickets; REMAIN(pstate) = STRIDE(pstate); } /* * Join the party. */ static void client_join(stridesched_t *psched, stride_state_t *pstate) { global_pass_update(psched); PASS(pstate) = psched->pass + REMAIN(pstate); global_tickets_update(psched, TICKETS(pstate)); runq_insert(psched, pstate); } /* * Leave the party */ static void client_leave(stridesched_t *psched, stride_state_t *pstate) { global_pass_update(psched); REMAIN(pstate) = PASS(pstate) - psched->pass; global_tickets_update(psched, -TICKETS(pstate)); } /* * Change priority */ static void client_modify(stridesched_t *psched, stride_state_t *pstate, int current, int tickets) { int remain, stride, queued; if ((queued = runq_onrunq(psched, pstate))) { runq_remove(psched, pstate); client_leave(psched, pstate); } else if (current) global_tickets_update(psched, -TICKETS(pstate)); stride = STRIDE1 / tickets; remain = (REMAIN(pstate) * stride) / STRIDE(pstate); TICKETS(pstate) = tickets; STRIDE(pstate) = stride; REMAIN(pstate) = remain; if (queued) client_join(psched, pstate); else if (current) global_tickets_update(psched, TICKETS(pstate)); } /* * Debug */ static void stride_debug(stridesched_t *psched) { stride_state_t *pstate; printf("stride(%d): GT %d GS %d GP %d LU %d\n", (int) pthread_self(), psched->tickets, psched->stride, psched->pass, psched->last_update); queue_iterate(&(psched->runq), pstate, stride_state_t *, runq) { printf("0x%x(%d) T %d S %d P %d R %d S %d\n", (int) pstate, (int) pstate->tid, TICKETS(pstate), STRIDE(pstate), PASS(pstate), REMAIN(pstate), START(pstate)); } } /* * This is the scheduler loop. 
*/ void * stride_schedloop(void *arg) { stridesched_t *psched = (stridesched_t *) arg; schedmsg_t msg; stride_state_t *current_thread = 0, *pstate = 0; sched_wakecond_t wakeup_cond = 0; int pedantic, rc; oskit_s32_t timeout; psched->tid = pthread_self(); /* * Must tell the main scheduling code ... */ pthread_sched_become_scheduler(); /* * Preemption means donate with non-zero timeout. */ if (psched->preemptible) timeout = PTHREAD_TICK; else timeout = 0; /* * Cancelation cleanup handler to cleanup resources at exit. */ pthread_cleanup_push(stride_canceled, (void *) psched); pedantic = stride_pedantic; psched->ready = 1; CPUDEBUG(STRIDE, "stride_schedloop(%d): ready - psched:0x%x\n", psched->tid, (int) psched); while (1) { pthread_testcancel(); CPUDEBUG(STRIDE, "stride_schedloop(%d): qcount(%d)\n", psched->tid, psched->runq_count); /* * Consume any pending messages until there are no more. */ if (! pthread_sched_message_recv(&msg, 0)) goto consume; runq_check(psched); if (CPUDEBUG_ISSET(STRIDE)) stride_debug(psched); /* * Find a thread to run. */ if (runq_empty(psched)) current_thread = NULL_STRIDESTATE; else current_thread = runq_dequeue(psched); /* * If we found a thread, switch into it and wait for * a message. Otherwise, wait for messages to arrive * that indicate something has changed. */ if (current_thread) { CPUDEBUG(STRIDE, "stride_schedloop(%d): pstate 0x%x(%d)\n", psched->tid, (int) current_thread, current_thread->tid); if (pedantic || !runq_empty(psched)) wakeup_cond = WAKEUP_ON_BLOCK; else wakeup_cond = WAKEUP_ON_SWITCH; /* * Donate and check the return condition. */ rc = pthread_sched_donate_wait_recv(current_thread->tid, wakeup_cond, &msg, timeout); CPUDEBUG(STRIDE, "stride_schedloop(%d): Donated: %d %s\n", psched->tid, current_thread->tid, msg_sched_rcodes[rc & ~SCHEDULE_MSGRECV]); switch (rc & ~SCHEDULE_MSGRECV) { case SCHEDULE_NOTREADY: /* * Thread was not ready to recv donation. * Forget about this thread. 
*/ break; case SCHEDULE_BLOCKED: /* Thread blocked, so forget about it. */ client_leave(psched, current_thread); break; case SCHEDULE_PREEMPTED: case SCHEDULE_YIELDED: case SCHEDULE_TIMEDOUT: assert(! runq_onrunq(psched, current_thread)); thread_pass_update(psched, current_thread); runq_insert(psched, current_thread); break; default: if (rc == SCHEDULE_MSGRECV) { thread_pass_update(psched, current_thread); runq_insert(psched, current_thread); } else panic("stride_schedloop: " "Bad return code:%d", rc); break; } /* Back to the top to look for more messages */ if (! (rc & SCHEDULE_MSGRECV)) continue; } else { /* * No threads to run so block waiting for a message. */ CPUDEBUG(STRIDE, "stride_schedloop(%d): Recv\n", psched->tid); rc = pthread_sched_message_recv(&msg, -1); if (rc == OSKIT_ECANCELED) pthread_exit((void *) PTHREAD_CANCELED); } /* * Process messages. */ consume: /* * Map tid in message to thread state structure. Avoid * lookup if possible. */ assert(msg.tid); if (msg.type != MSG_SCHED_NEWTHREAD) { if (current_thread && msg.tid == current_thread->tid) pstate = current_thread; else pstate = stride_getstate(psched, msg.tid); } CPUDEBUG(STRIDE, "stride_schedloop(%d): Message: %s 0x%x(%d)\n", psched->tid, msg_sched_typenames[msg.type], (int) pstate, (pstate ? pstate->tid : 0)); switch (msg.type) { case MSG_SCHED_NEWTHREAD: /* * New thread has joined us. Create a state structure * and add it to the runq. */ pstate = stride_thread_init(psched, msg.tid, msg.opaque); /* and add it to the runq */ client_join(psched, pstate); break; case MSG_SCHED_UNBLOCK: /* * A thread wants to be rescheduled. Might be the * first time this thread was ever seen. */ if (! runq_onrunq(psched, pstate)) client_join(psched, pstate); break; case MSG_SCHED_SETSTATE: /* * Thread parameters have been changed. The opaque * value is is the new tickets. */ client_modify(psched, pstate, pstate == current_thread, (int) msg.opaque); break; case MSG_SCHED_EXITED: /* * The thread has exited. 
*/ CPUDEBUG(STRIDE, "stride_schedloop(%d): " "Exit: Thread 0x%x(%d)\n", pthread_self(), (int) pstate, pstate->tid); stride_remstate(psched, pstate->tid); free(pstate); break; default: panic("stride_schedloop: Bad message: %d 0x%x\n", msg.type, pstate); break; } } /* * Never reached. */ pthread_cleanup_pop(1); } /* * Handle async cancel of the scheduler. Cleanup resources before the thread * disappears completely. * * XXX NOT CLEANING UP THREADS! */ void stride_canceled(void *arg) { stridesched_t *psched = (stridesched_t *) arg; CPUDEBUG(STRIDE, "stride_terminate: Scheduler exiting:%d\n", pthread_self()); free(psched); } #endif CPU_INHERIT