/*
* tlibvx.cxx
*
* Thread library implementation for VxWorks
*
* Portable Windows Library
*
* The contents of this file are subject to the Mozilla Public License
* Version 1.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Original Code is Portable Windows Library.
*
* The Initial Developer of the Original Code is Equivalence Pty. Ltd.
*
* Portions are Copyright (c) 1993-1998 Equivalence Pty. Ltd.
*
* Portions are Copyright (C) 1993 Free Software Foundation, Inc.
* All Rights Reserved.
*
* Contributor(s): ______________________________________.
*
* $Log: tlibvx.cxx,v $
* Revision 1.4 2004/07/11 07:56:36 csoutheren
* Applied jumbo VxWorks patch, thanks to Eize Slange
*
*
* Revision 1.3 2003/05/21 00:49:16 csoutheren
* Added PreShutdown to ~PProcess
*
* Revision 1.2 2003/02/26 01:14:27 robertj
* Fixed race condition where thread can terminatebefore an IsSuspeded() call
* occurs and cause an assert, thanks Sebastian Meyer
*
* Revision 1.1 2002/11/05 01:43:39 robertj
* Added missing VxWorks files. Thanks Andreas Sikkema
*
* Revision 1.0 ?????????????
*/
class PProcess;
class PSemaphore;
#include <ptlib.h>
#include <ptlib/socket.h>
#include <trclib.h>
#include <usrlib.h>
// Forward to undocumented system call
extern "C" void dbgPrintCall(INSTR * callAdrs, int funcAdrs, int nargs,
UINT32 * pArgs);
#define VX_LOWEST_PRIORITY 250
#define VX_LOW_PRIORITY 200
#define VX_NORMAL_PRIORITY 150
#define VX_DISPLAY_PRIORITY 100
#define VX_URGENT_DISPLAY_PRIORITY 50
///////////////////////////////////////////////////////////////////////////////
// Critical Section Implementation
// -------------------------------
class CCriticalSection {
public:
CCriticalSection();
~CCriticalSection();
void Lock();
void Unlock();
private:
int intLevel;
STATUS taskLocked;
bool locked;
};
CCriticalSection::CCriticalSection()
{
intLevel = 0;
taskLocked = ERROR;
locked = false;
}
CCriticalSection::~CCriticalSection()
{
// Unlock anyway when someone forgot to call Unlock after Lock
if (locked == true)
Unlock();
}
void CCriticalSection::Lock()
{
if (locked == false) {
if (::intContext() == FALSE)
taskLocked = ::taskLock();
else
taskLocked = ERROR;
intLevel = ::intLock();
locked = true;
}
}
void CCriticalSection::Unlock()
{
if (locked == true) {
::intUnlock(intLevel);
if (taskLocked == OK)
::taskUnlock();
locked = false;
}
}
///////////////////////////////////////////////////////////////////////////////
// Threads
static int const priorities[] = {
VX_LOWEST_PRIORITY,
VX_LOW_PRIORITY,
VX_NORMAL_PRIORITY,
VX_DISPLAY_PRIORITY,
VX_URGENT_DISPLAY_PRIORITY
};
int PThread::ThreadFunction(void *threadPtr)
{
PAssertNULL(threadPtr);
PThread * thread = (PThread *)threadPtr;
PProcess & process = PProcess::Current();
process.activeThreadMutex.Wait();
process.activeThreads.SetAt(thread->PX_threadId, thread);
process.activeThreadMutex.Signal();
process.SignalTimerChange();
if (::semTake(thread->syncPoint, WAIT_FOREVER) == OK) {
if (::semDelete(thread->syncPoint) == OK)
thread->syncPoint = NULL;
thread->Main();
}
else
printf("::ThreadFunction> ::semTake failed, errno=0x%X\n",errno);
// And delete from administration
process.activeThreadMutex.Wait();
process.activeThreads.SetAt(thread->PX_threadId, NULL);
process.activeThreadMutex.Signal();
thread->PX_threadId = 0;
return 0;
}
void PThread::Trace(PThreadIdentifer threadId)
{
if (threadId == 0)
threadId = PThread::GetCurrentThreadId();
printf("Task name=%s\n", ::taskName(threadId));
::taskRegsShow(threadId);
REG_SET regSet;
if (::taskRegsGet(threadId, ®Set) != ERROR)
::trcStack(®Set, (FUNCPTR)dbgPrintCall, threadId);
else
printf("::Terminate> ::taskRegsGet failed, errno=0x%X\n", errno);
::taskShow(0, 2);
printf("\n");
::checkStack(0);
}
PThread::PThread()
: PX_threadId(ERROR),
priority(VX_NORMAL_PRIORITY),
originalStackSize(0)
{
}
PThread::PThread(PINDEX stackSize,
AutoDeleteFlag deletion,
Priority priorityLevel,
const PString & name
)
{
PAssert(stackSize > 0, PInvalidParameter);
autoDelete = (deletion == AutoDeleteThread);
originalStackSize = stackSize;
priority = priorities[priorityLevel];
syncPoint = ::semMCreate(SEM_Q_FIFO);
if (syncPoint != NULL) {
if (::semTake(syncPoint, NO_WAIT) == OK) {
STATUS taskLocked;
taskLocked = ::taskLock();
PX_threadId = ::taskSpawn(name, // Name
priority, // Priority
0, // options
stackSize, // stacksize
(FUNCPTR)ThreadFunction, // entrypoint
(int)this,0,0,0,0,0,0,0,0,0); // arg 1 --- arg 10
if (PX_threadId != ERROR) {
// threads are created suspended
Suspend();
::semGive(syncPoint);
if (taskLocked == OK)
::taskUnlock();
if (autoDelete) {
PProcess & process = PProcess::Current();
process.deleteThreadMutex.Wait();
process.autoDeleteThreads.Append(this);
process.deleteThreadMutex.Signal();
}
}
else {
if (taskLocked == OK)
::taskUnlock();
printf("::PThread> ::taskSpawn failed, errno=0x%X\n", errno);
PX_threadId = 0;
::semDelete(syncPoint);
syncPoint = NULL;
}
}
else {
printf("::PThread> ::semTake failed, errno=0x%X\n", errno);
::semDelete(syncPoint);
syncPoint = NULL;
}
}
}
PThread::~PThread()
{
if (originalStackSize <= 0)
return;
if (!IsTerminated())
Terminate();
}
void PThread::Restart()
{
if (IsTerminated()) {
PX_threadId = ::taskSpawn(NULL, // Auto name tn
priority, // Priority
0, // options
originalStackSize, // stacksize
(FUNCPTR)ThreadFunction, // entrypoint
(int)this,0,0,0,0,0,0,0,0,0); // arg 1 --- arg 10
if (PX_threadId == ERROR) {
printf("::Restart> ::taskSpawn failed, errno=0x%X\n", errno);
PX_threadId = 0;
}
}
else
printf("::Restart> Cannot restart running thread\n");
}
void PThread::Terminate()
{
if (originalStackSize <= 0)
return;
if (!IsTerminated()) {
if (::taskDelete(PX_threadId) == ERROR)
printf("::Terminate> ::taskDelete failed, errno=0x%X\n", errno);
// And delete from administration
PProcess & process = PProcess::Current();
process.activeThreadMutex.Wait();
process.activeThreads.SetAt(PX_threadId, NULL);
process.activeThreadMutex.Signal();
PX_threadId = 0;
}
}
BOOL PThread::IsTerminated() const
{
STATUS stat = ERROR;
if (PX_threadId != 0)
stat = ::taskIdVerify(PX_threadId);
return stat == ERROR;
}
void PThread::WaitForTermination() const
{
while (!IsTerminated()) {
Current()->Sleep(100);
}
}
BOOL PThread::WaitForTermination(const PTimeInterval & maxWait) const
{
if (PX_threadId == 0)
return TRUE;
PTimer timeout = maxWait;
while (!IsTerminated()) {
if (timeout == 0)
return FALSE;
Current()->Sleep(100);
}
return TRUE;
}
void PThread::Suspend(BOOL susp)
{
if (!IsTerminated()) {
if (susp) {
if (::taskSuspend(PX_threadId) == ERROR)
printf("::Suspend> Thread doesn't want to suspend, errno=0x%X\n", errno);
}
else {
if (::taskResume(PX_threadId) == ERROR)
printf("::Suspend> Thread doesn't want to resume, errno=0x%X\n", errno);
}
}
else
printf("::Suspend> Operation on terminated thread\n");
}
void PThread::Resume()
{
if (!IsTerminated()) {
if (::taskResume(PX_threadId) == ERROR)
printf("::Resume> Thread doesn't want to resume, errno=0x%X\n", errno);
}
else
printf("::Resume> Operation on terminated thread\n");
}
BOOL PThread::IsSuspended() const
{
BOOL isSuspended = FALSE;
if (!IsTerminated())
isSuspended = ::taskIsSuspended(PX_threadId);
else
printf("::IsSuspended> Operation on terminated thread\n");
return isSuspended;
}
void PThread::SetAutoDelete(AutoDeleteFlag deletion)
{
PAssert(deletion != AutoDeleteThread || this != &PProcess::Current(), PLogicError);
autoDelete = deletion == AutoDeleteThread;
}
void PThread::SetPriority(Priority priorityLevel)
{
if (!IsTerminated()) {
priority = priorities[priorityLevel];
if (::taskPrioritySet(PX_threadId, priority ) == ERROR)
printf("::SetPriority> ::taskPrioritySet failed, errno: 0x%X\n", errno);
}
else
printf("::SetPriority> Operation on terminated thread\n");
}
PThread::Priority PThread::GetPriority() const
{
if (!IsTerminated()) {
int prio;
if (::taskPriorityGet(PX_threadId, &prio) == OK) {
switch (prio) {
case VX_LOWEST_PRIORITY :
return LowestPriority;
case VX_LOW_PRIORITY :
return LowPriority;
case VX_NORMAL_PRIORITY :
return NormalPriority;
case VX_DISPLAY_PRIORITY :
return HighPriority;
case VX_URGENT_DISPLAY_PRIORITY :
return HighestPriority;
default:
printf("::GetPriority> Priority %d not mapped\n", prio);
break;
}
}
else
printf("::GetPriority> ::taskPriorityGet failed, errno=0x%X\n", errno);
}
else
printf("::GetPriority> Operation on terminated thread\n");
return LowestPriority;
}
void PThread::Yield()
{
::taskDelay(NO_WAIT);
}
void PThread::Sleep( const PTimeInterval & delay ) // Time interval to sleep for in microsec.
{
::taskDelay(delay.GetInterval()*sysClkRateGet()/1000);
}
void PThread::InitialiseProcessThread()
{
originalStackSize = 0;
autoDelete = FALSE;
PX_threadId = ::taskIdSelf();
PAssertOS((PX_threadId != ERROR) && (PX_threadId != 0));
((PProcess *)this)->activeThreads.DisallowDeleteObjects();
((PProcess *)this)->activeThreads.SetAt(PX_threadId, this);
}
PThread * PThread::Current()
{
PProcess & process = PProcess::Current();
process.activeThreadMutex.Wait();
PThread * thread = process.activeThreads.GetAt(::taskIdSelf());
process.activeThreadMutex.Signal();
return thread;
}
int PThread::PXBlockOnChildTerminate(int pid, const PTimeInterval & /*timeout*/) // Fix timeout
{
while (!IsTerminated()) {
Current()->Sleep(100);
}
return TRUE;
}
int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
{
// make sure we flush the buffer before doing a write
fd_set tmp_rfd, tmp_wfd, tmp_efd;
fd_set * read_fds = &tmp_rfd;
fd_set * write_fds = &tmp_wfd;
fd_set * exception_fds = &tmp_efd;
FD_ZERO (read_fds);
FD_ZERO (write_fds);
FD_ZERO (exception_fds);
switch (type) {
case PChannel::PXReadBlock:
case PChannel::PXAcceptBlock:
FD_SET (handle, read_fds);
break;
case PChannel::PXWriteBlock:
FD_SET (handle, write_fds);
break;
case PChannel::PXConnectBlock:
FD_SET (handle, write_fds);
FD_SET (handle, exception_fds);
break;
default:
PAssertAlways(PLogicError);
return 0;
}
P_timeval tval = timeout;
return ::select(handle+1, read_fds, write_fds, exception_fds, tval);
}
void PThread::PXAbortBlock() const
{
}
///////////////////////////////////////////////////////////////////////////////
// PProcess::HouseKeepingThread
void PProcess::Construct()
{
// hard coded value, change this to handle more sockets at once with the select call
maxHandles = 1024;
houseKeeper=NULL;
CommonConstruct();
}
PProcess::HouseKeepingThread::HouseKeepingThread()
: PThread(10000, NoAutoDeleteThread, HighPriority, PString("HKeeping"))
{
Resume();
}
void PProcess::HouseKeepingThread::Main()
{
PProcess & process = PProcess::Current();
while(1) {
process.deleteThreadMutex.Wait();
for (PINDEX i = 0; i < process.autoDeleteThreads.GetSize(); i++) {
PThread * pThread = (PThread *) process.autoDeleteThreads.GetAt(i);
if( pThread->IsTerminated() )
process.autoDeleteThreads.RemoveAt(i--);
}
process.deleteThreadMutex.Signal();
PTimeInterval nextTimer = process.timers.Process();
if (nextTimer != PMaxTimeInterval) {
if ( nextTimer.GetInterval() > 10000 )
nextTimer = 10000;
}
breakBlock.Wait( nextTimer );
}
}
void PProcess::SignalTimerChange()
{
if (houseKeeper == NULL) {
// Prevent reentrance before the following assignment is done
// Placed after above if-statement due to efficiency, and so
// requires an another NULL-test.
CCriticalSection section;
section.Lock();
if (houseKeeper == NULL)
houseKeeper = new HouseKeepingThread;
section.Unlock();
}
else
houseKeeper->breakBlock.Signal();
}
///////////////////////////////////////////////////////////////////////////////
// PProcess
PProcess::~PProcess()
{
PreShutdown();
Sleep(100); // Give threads time to die a natural death
delete houseKeeper;
// OK, if there are any left we get really insistent...
activeThreadMutex.Wait();
for (PINDEX i = 0; i < activeThreads.GetSize(); i++) {
PThread & thread = activeThreads.GetDataAt(i);
if (this != &thread && !thread.IsTerminated())
thread.Terminate(); // With extreme prejudice
}
activeThreadMutex.Signal();
deleteThreadMutex.Wait();
autoDeleteThreads.RemoveAll();
deleteThreadMutex.Signal();
delete configFiles;
}
///////////////////////////////////////////////////////////////////////////////
// PSemaphore
PSemaphore::PSemaphore(SEM_ID anId)
{
initialVar = 1;
maxCountVar = UINT_MAX;
semId = anId;
PAssertOS(semId != NULL);
}
PSemaphore::PSemaphore(unsigned initial, unsigned maxCount)
{
if (initial > maxCount)
initial = maxCount;
initialVar = initial;
maxCountVar = maxCount;
semId = ::semCCreate(SEM_Q_FIFO, initial);
PAssertOS(semId != NULL);
}
PSemaphore::PSemaphore(const PSemaphore & sem)
{
initialVar = sem.GetInitial();
maxCountVar = sem.GetMaxCount();
if (initialVar > maxCountVar)
initialVar = maxCountVar;
semId = ::semCCreate(SEM_Q_FIFO, initialVar);
PAssertOS(semId != NULL);
}
PSemaphore::~PSemaphore()
{
if (semId != NULL) {
if (::semDelete(semId) == OK)
semId = NULL;
else
printf("~PSemaphore> Error delete with ID=0x%X with errno: 0x%X\n",
(unsigned int)semId, errno);
}
}
void PSemaphore::Wait()
{
STATUS result = ::semTake(semId, WAIT_FOREVER);
if (result == OK) {
CCriticalSection section;
section.Lock();
initialVar--;
section.Unlock();
}
PAssertOS(result != ERROR);
}
BOOL PSemaphore::Wait(const PTimeInterval & timeout)
{
long wait;
if (timeout == PMaxTimeInterval) {
wait = WAIT_FOREVER;
}
else {
int ticks = sysClkRateGet();
wait = (timeout.GetInterval() * ticks);
wait = wait / 1000;
wait++; // wait at least one tick
}
STATUS result = ::semTake(semId, wait);
if (result == OK) {
CCriticalSection section;
section.Lock();
initialVar--;
section.Unlock();
}
PAssertOS((result != ERROR) || (errno == S_objLib_OBJ_TIMEOUT));
return result != ERROR;
}
void PSemaphore::Signal()
{
CCriticalSection section;
section.Lock();
if (initialVar < maxCountVar) {
section.Unlock();
STATUS result = ::semGive(semId);
section.Lock();
if (result == OK)
initialVar++;
PAssertOS(result != ERROR);
}
section.Unlock();
}
BOOL PSemaphore::WillBlock() const
{
return initialVar == 0;
}
///////////////////////////////////////////////////////////////////////////////
// PMutex
PMutex::PMutex()
: PSemaphore( ::semMCreate(SEM_Q_FIFO) )
{
}
PMutex::PMutex(const PMutex & mutex)
: PSemaphore(::semMCreate(SEM_Q_FIFO))
{
}
PMutex::~PMutex()
{
if (semId != NULL) {
if (::semDelete(semId) == OK)
semId = NULL;
else
printf("~PMutex> Error delete with ID=0x%X with errno: 0x%X\n",
(unsigned int)semId, errno);
}
}
void PMutex::Wait()
{
STATUS result = ::semTake(semId, WAIT_FOREVER);
PAssertOS(result == OK);
}
BOOL PMutex::Wait(const PTimeInterval & timeout)
{
long wait;
if (timeout == PMaxTimeInterval) {
wait = WAIT_FOREVER;
}
else {
int ticks = sysClkRateGet();
wait = (timeout.GetMilliSeconds() * ticks);
wait = wait / 1000;
wait++; // wait at least one tick
}
STATUS result = ::semTake(semId, wait);
PAssertOS((result != ERROR) || (errno == S_objLib_OBJ_TIMEOUT));
return result != ERROR;
}
void PMutex::Signal()
{
::semGive(semId);
}
BOOL PMutex::WillBlock() const
{
STATUS result = ::semTake(semId, NO_WAIT);
PAssertOS((result != ERROR) || (errno == S_objLib_OBJ_UNAVAILABLE));
if (result != ERROR)
::semGive(semId);
return result == ERROR;
}
///////////////////////////////////////////////////////////////////////////////
// PSyncPoint
PSyncPoint::PSyncPoint()
: PSemaphore(0, 1)
{
}
PSyncPoint::PSyncPoint(const PSyncPoint & syncPoint)
: PSemaphore(syncPoint)
{
}
// End Of File ///////////////////////////////////////////////////////////////
syntax highlighted by Code2HTML, v. 0.9.1