/*
* tlibbe.cxx
*
* Thread library implementation for BeOS
*
* Portable Windows Library
*
* The contents of this file are subject to the Mozilla Public License
* Version 1.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Original Code is Portable Windows Library.
*
* The Initial Developer of the Original Code is Equivalence Pty. Ltd.
*
* Portions are Copyright (c) 1993-1998 Equivalence Pty. Ltd.
*
* Portions are Copyright (C) 1993 Free Software Foundation, Inc.
* All Rights Reserved.
*
* Contributor(s): Yuri Kiryanov, ykiryanov at users.sourceforge.net
*
* $Log: tlibbe.cxx,v $
* Revision 1.33 2004/10/26 18:29:41 ykiryanov
* Added ostream::write and istream::read with 2nd param as streamsize to please
* New toolchain
*
* Revision 1.32 2004/07/11 07:56:36 csoutheren
* Applied jumbo VxWorks patch, thanks to Eize Slange
*
* Revision 1.31 2004/05/30 04:48:15 ykiryanov
* Sync point is better when semaphore based
*
* Revision 1.30 2004/05/24 04:17:16 ykiryanov
* Made PSyncPoint::Wait to return FALSE if called with timeout 0
*
* Revision 1.29 2004/05/23 22:20:37 ykiryanov
* Got rid of 2 housekeeper thread problem
*
* Revision 1.28 2004/05/21 00:49:16 csoutheren
* Added PreShutdown to ~PProcess
*
* Revision 1.27 2004/05/02 18:00:54 ykiryanov
* Renamed unblock pipe to make code compatible with socket code
*
* Revision 1.26 2004/05/02 16:59:09 ykiryanov
* Fixed assert in setting priority to threads
*
* Revision 1.25 2004/04/30 16:10:20 ykiryanov
* Added PMutex code based on BLocker to support recursive locks
*
* Revision 1.24 2004/04/25 21:51:37 ykiryanov
* Cleaned up thread termination act. Very cool
*
* Revision 1.23 2004/04/25 04:32:37 ykiryanov
* Fixed very old bug - no get thread id code in InitialiseProcessThread
*
* Revision 1.22 2004/04/18 00:23:40 ykiryanov
* Rearranged code to be more reliable. We nearly there
*
* Revision 1.21 2004/04/02 03:17:19 ykiryanov
* New version, improved
*
* Revision 1.20 2004/02/23 23:40:42 ykiryanov
* Added missing constructor for PMutex
*
* Revision 1.19 2004/02/23 21:23:09 ykiryanov
* Removed assert line to enable semaphore constructor
*
* Revision 1.18 2004/02/23 20:37:17 ykiryanov
* Changed function definition PXBlockIO to prototype one
*
* Revision 1.17 2004/02/23 18:10:39 ykiryanov
* Added a parameter to semaphore constructor to avoid ambiguity
*
* Revision 1.16 2004/02/23 00:02:20 ykiryanov
* Changed my e-mail to ykiryanov at users.sourceforge.net. Just in case someone wants to collaborate
*
* Revision 1.15 2004/02/22 23:59:28 ykiryanov
* Added missing functions: PProcess::SetMaxHandles(), PThread::GetCurrentThreadId(),
* PThread::PXAbortBlock(), PSyncPoint::Signal(), ::Wait(), ::Wait(timeout), ::WillBlock()
*
* Revision 1.14 2004/02/22 04:35:04 ykiryanov
* Removed PMutex desctructor
*
* Revision 1.13 2003/02/26 01:13:18 robertj
* Fixed race condition where thread can terminatebefore an IsSuspeded() call
* occurs and cause an assert, thanks Sebastian Meyer
*
* Revision 1.12 2001/06/30 06:59:07 yurik
* Jac Goudsmit from Be submit these changes 6/28. Implemented by Yuri Kiryanov
*
* Revision 1.11 2001/03/07 06:57:32 yurik
* Changed email to current one
*
* Revision 1.10 2001/01/16 12:32:06 rogerh
* Remove duplicate SetAutoDelete() function. Submitted by
* Jac Goudsmit <jac_goudsmit@yahoo.com>
*
*
*/
class PThread;
class PProcess;
class PSemaphore;
class PSyncPoint;
class PMutex;
#include <ptlib.h>
#include <ptlib/socket.h>
#ifdef B_ZETA_VERSION
#include <posix/rlimit.h>
#endif // Zeta
// For class BLocker
#include <be/support/Locker.h>
int PX_NewHandle(const char *, int);
#define DEBUG_SEMAPHORES1 1
//////////////////////////////////////////////////////////////////////////////
// Threads
static int const priorities[] = {
1, // Lowest priority is 1. 0 is not
B_LOW_PRIORITY,
B_NORMAL_PRIORITY,
B_DISPLAY_PRIORITY,
B_URGENT_DISPLAY_PRIORITY,
};
int32 PThread::ThreadFunction(void * threadPtr)
{
PThread * thread = (PThread *)PAssertNULL(threadPtr);
PProcess & process = PProcess::Current();
process.threadMutex.Wait();
process.activeThreads.SetAt((unsigned) thread->mId, thread);
process.threadMutex.Signal();
thread->Main();
return 0;
}
PThread::PThread()
: autoDelete(TRUE),
mId(B_BAD_THREAD_ID),
mPriority(B_NORMAL_PRIORITY),
mStackSize(0),
mSuspendCount(0)
{
}
void PThread::InitialiseProcessThread()
{
autoDelete = FALSE;
mId = find_thread(NULL);
mPriority = B_NORMAL_PRIORITY;
mStackSize = 0;
mSuspendCount = 1;
PAssert(::pipe(unblockPipe) == 0, "Pipe creation failed in InitialiseProcessThread!");
PAssertOS(unblockPipe[0]);
PAssertOS(unblockPipe[1]);
((PProcess *)this)->activeThreads.DisallowDeleteObjects();
((PProcess *)this)->activeThreads.SetAt(mId, this);
}
PThread::PThread(PINDEX stackSize,
AutoDeleteFlag deletion,
Priority priorityLevel,
const PString & name)
: mId(B_BAD_THREAD_ID),
mPriority(B_NORMAL_PRIORITY),
mStackSize(0),
mSuspendCount(0)
{
PAssert(stackSize > 0, PInvalidParameter);
autoDelete = deletion == AutoDeleteThread;
mId = ::spawn_thread(ThreadFunction, // Function
(const char*) name, // Name
priorities[priorityLevel], // Priority
(void *) this); // Pass this as cookie
PAssertOS(mId >= B_NO_ERROR);
mSuspendCount = 1;
mStackSize = stackSize;
mPriority = priorities[priorityLevel];
threadName.sprintf(name, mId);
::rename_thread(mId, (const char*) threadName); // real, unique name - with id
PAssert(::pipe(unblockPipe) == 0, "Pipe creation failed in PThread constructor");
PX_NewHandle("Thread unblock pipe", PMAX(unblockPipe[0], unblockPipe[1]));
}
PThread * PThread::Current()
{
PProcess & process = PProcess::Current();
process.threadMutex.Wait();
PThread * thread = process.activeThreads.GetAt((unsigned)find_thread(NULL));
process.threadMutex.Signal();
return thread;
}
PThread::~PThread()
{
// if we are not process, remove this thread from the active thread list
PProcess & process = PProcess::Current();
if(process.GetThreadId() != GetThreadId())
{
process.threadMutex.Wait();
process.activeThreads.RemoveAt((unsigned) mId);
process.threadMutex.Signal();
}
if (!IsTerminated())
Terminate();
::close(unblockPipe[0]);
::close(unblockPipe[1]);
}
void PThread::Restart()
{
if(!IsTerminated())
return;
mId = ::spawn_thread(ThreadFunction, // Function
"PWLT", // Name
mPriority,
(void *) this); // Pass this as cookie
PAssertOS(mId >= B_NO_ERROR);
threadName.sprintf("PWLib Thread %d", mId);
::rename_thread(mId, (const char*) threadName); // real, unique name - with id
}
void PThread::Terminate()
{
if(mStackSize <=0)
return;
if(mId == find_thread(NULL))
{
::exit_thread(0);
return;
}
if(IsTerminated())
return;
PXAbortBlock();
WaitForTermination(20);
if(mId > B_BAD_THREAD_ID)
::kill_thread(0);
}
BOOL PThread::IsTerminated() const
{
return mId == B_BAD_THREAD_ID;
}
void PThread::WaitForTermination() const
{
WaitForTermination(PMaxTimeInterval);
}
BOOL PThread::WaitForTermination(const PTimeInterval & /*maxWait*/) const // Fix timeout
{
status_t result = B_NO_ERROR;
status_t exit_value = B_NO_ERROR;
result = ::wait_for_thread(mId, &exit_value);
if ( result == B_INTERRUPTED ) { // thread was killed.
return TRUE;
}
if ( result == B_OK ) { // thread is dead
#ifdef DEBUG_THREADS
PError << "B_OK" << endl;
#endif
return TRUE;
}
if ( result == B_BAD_THREAD_ID ) { // thread has invalid id
return TRUE;
}
return FALSE;
}
void PThread::Suspend(BOOL susp)
{
PAssert(!IsTerminated(), "Operation on terminated thread");
if (susp)
{
status_t result = ::suspend_thread(mId);
if(B_OK == result)
::atomic_add(&mSuspendCount, 1);
PAssert(result == B_OK, "Thread don't want to be suspended");
}
else
Resume();
}
void PThread::Resume()
{
PAssert(!IsTerminated(), "Operation on terminated thread");
status_t result = ::resume_thread(mId);
if(B_OK == result)
::atomic_add(&mSuspendCount, -1);
PAssert(result == B_NO_ERROR, "Thread doesn't want to resume");
}
BOOL PThread::IsSuspended() const
{
return (mSuspendCount > 0);
}
void PThread::SetAutoDelete(AutoDeleteFlag deletion)
{
PAssert(deletion != AutoDeleteThread || this != &PProcess::Current(), PLogicError);
autoDelete = deletion == AutoDeleteThread;
}
void PThread::SetPriority(Priority priorityLevel)
{
PAssert(!IsTerminated(), "Operation on terminated thread");
mPriority = priorities[priorityLevel];
status_t result = ::set_thread_priority(mId, mPriority );
if(result != B_OK)
PTRACE(0, "Changing thread priority failed, error " << strerror(result) << endl);
}
PThread::Priority PThread::GetPriority() const
{
if(!IsTerminated())
{
switch (mPriority) {
case 0 :
return LowestPriority;
case B_LOW_PRIORITY :
return LowPriority;
case B_NORMAL_PRIORITY :
return NormalPriority;
case B_DISPLAY_PRIORITY :
return HighPriority;
case B_URGENT_DISPLAY_PRIORITY :
return HighestPriority;
}
PAssertAlways(POperatingSystemError);
}
return LowestPriority;
}
void PThread::Yield()
{
// we just sleep for long enough to cause a reschedule (100 microsec)
::snooze(100);
}
void PThread::Sleep( const PTimeInterval & delay ) // Time interval to sleep for.
{
bigtime_t microseconds =
delay == PMaxTimeInterval ? B_INFINITE_TIMEOUT : (delay.GetMilliSeconds() * 1000 );
status_t result = ::snooze( microseconds ) ; // delay in ms, snooze in microsec
PAssert(result == B_OK, "Thread has insomnia");
}
int PThread::PXBlockOnChildTerminate(int pid, const PTimeInterval & /*timeout*/) // Fix timeout
{
status_t result = B_NO_ERROR;
status_t exit_value = B_NO_ERROR;
result = ::wait_for_thread(pid, &exit_value);
if ( result == B_INTERRUPTED )
{
// thread was killed.
#ifdef DEBUG_THREADS
PError << "B_INTERRUPTED" << endl;
#endif
return 1;
}
if ( result == B_OK )
{
// thread is dead
return 1;
}
if ( result == B_BAD_THREAD_ID )
{
// thread has invalid id
return 1;
}
return 0; // ???
}
PThreadIdentifier PThread::GetCurrentThreadId(void)
{
return ::find_thread(NULL);
}
int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
{
PTRACE(7, "PWLib\tPThread::PXBlockOnIO(" << handle << ',' << type << ')');
if ((handle < 0) || (handle >= PProcess::Current().GetMaxHandles())) {
PTRACE(2, "PWLib\tAttempt to use illegal handle in PThread::PXBlockOnIO, handle=" << handle);
errno = EBADF;
return -1;
}
// make sure we flush the buffer before doing a write
P_fd_set read_fds;
P_fd_set write_fds;
P_fd_set exception_fds;
int retval;
do {
switch (type) {
case PChannel::PXReadBlock:
case PChannel::PXAcceptBlock:
read_fds = handle;
write_fds.Zero();
exception_fds.Zero();
break;
case PChannel::PXWriteBlock:
read_fds.Zero();
write_fds = handle;
exception_fds.Zero();
break;
case PChannel::PXConnectBlock:
read_fds.Zero();
write_fds = handle;
exception_fds = handle;
break;
default:
PAssertAlways(PLogicError);
return 0;
}
// include the termination pipe into all blocking I/O functions
read_fds += unblockPipe[0];
P_timeval tval = timeout;
retval = ::select(PMAX(handle, unblockPipe[0])+1,
read_fds, write_fds, exception_fds, tval);
} while (retval < 0 && errno == EINTR);
if ((retval == 1) && read_fds.IsPresent(unblockPipe[0])) {
BYTE ch;
::read(unblockPipe[0], &ch, 1);
errno = EINTR;
retval = -1;
PTRACE(6, "PWLib\tUnblocked I/O");
}
return retval;
}
void PThread::PXAbortBlock(void) const
{
BYTE ch;
::write(unblockPipe[1], &ch, 1);
}
///////////////////////////////////////////////////////////////////////////////
// PProcess
PDECLARE_CLASS(PHouseKeepingThread, PThread)
public:
PHouseKeepingThread()
: PThread(1000, NoAutoDeleteThread, NormalPriority, "Housekeeper")
{ closing = FALSE; Resume(); }
void Main();
void SetClosing() { closing = TRUE; }
protected:
BOOL closing;
};
void PProcess::Construct()
{
maxHandles = FOPEN_MAX;
PTRACE(4, "PWLib\tMaximum per-process file handles is " << maxHandles);
::pipe(timerChangePipe);
// initialise the housekeeping thread
housekeepingThread = NULL;
CommonConstruct();
}
void PHouseKeepingThread::Main()
{
PProcess & process = PProcess::Current();
while (!closing) {
PTimeInterval delay = process.timers.Process();
int fd = process.timerChangePipe[0];
P_fd_set read_fds = fd;
P_timeval tval = delay;
if (::select(fd+1, read_fds, NULL, NULL, tval) == 1) {
BYTE ch;
::read(fd, &ch, 1);
}
process.PXCheckSignals();
}
}
void PProcess::SignalTimerChange()
{
if (housekeepingThread == NULL)
{
housekeepingThread = new PHouseKeepingThread;
}
BYTE ch;
write(timerChangePipe[1], &ch, 1);
}
BOOL PProcess::SetMaxHandles(int newMax)
{
return FALSE;
}
PProcess::~PProcess()
{
PreShutdown();
// Don't wait for housekeeper to stop if Terminate() is called from it.
if (housekeepingThread != NULL && PThread::Current() != housekeepingThread) {
housekeepingThread->SetClosing();
SignalTimerChange();
housekeepingThread->WaitForTermination();
delete housekeepingThread;
}
CommonDestruct();
}
///////////////////////////////////////////////////////////////////////////////
// PSemaphore
PSemaphore::PSemaphore(BOOL fNested) : mfNested(fNested)
{
}
PSemaphore::PSemaphore(unsigned initial, unsigned)
{
Create(initial);
}
void PSemaphore::Create(unsigned initial)
{
mOwner = ::find_thread(NULL);
PAssertOS(mOwner != B_BAD_THREAD_ID);
if(!mfNested)
{
mCount = initial;
semId = ::create_sem(initial, "PWLS");
PAssertOS(semId >= B_NO_ERROR);
#ifdef DEBUG_SEMAPHORES
sem_info info;
get_sem_info(semId, &info);
PError << "::create_sem (PSemaphore()), id: " << semId << ", this: " << this << ", count:" << info.count << endl;
#endif
}
else // Use BLocker
{
semId = (sem_id) new BLocker("PWLN", true); // PWLib use recursive locks. true for benaphore style, false for not
}
}
PSemaphore::~PSemaphore()
{
if(!mfNested)
{
status_t result = B_NO_ERROR;
PAssertOS(semId >= B_NO_ERROR);
// Transmit ownership of the semaphore to our thread
thread_id curThread = ::find_thread(NULL);
if(mOwner != curThread)
{
thread_info tinfo;
::get_thread_info(curThread, &tinfo);
::set_sem_owner(semId, tinfo.team);
mOwner = curThread;
}
#ifdef DEBUG_SEMAPHORES
sem_info info;
get_sem_info(semId, &info);
PError << "::delete_sem, id: " << semId << ", this: " << this << ", name: " << info.name << ", count:" << info.count;
#endif
// Deleting the semaphore id
result = ::delete_sem(semId);
#ifdef DEBUG_SEMAPHORES
if( result != B_NO_ERROR )
PError << "...delete_sem failed, error: " << strerror(result) << endl;
#endif
}
else // Use BLocker
{
delete (BLocker*) semId; // Thanks!
}
}
void PSemaphore::Wait()
{
if(!mfNested)
{
PAssertOS(semId >= B_NO_ERROR);
status_t result = B_NO_ERROR;
#ifdef DEBUG_SEMAPHORES
sem_info info;
get_sem_info(semId, &info);
PError << "::acquire_sem, id: " << semId << ", name: " << info.name << ", count:" << info.count << endl;
#endif
while((B_BAD_THREAD_ID != mOwner)
&& ((result = ::acquire_sem(semId)) == B_INTERRUPTED))
{
}
}
else
{
((BLocker*)semId)->Lock(); // Using class to support recursive locks
}
}
BOOL PSemaphore::Wait(const PTimeInterval & timeout)
{
PInt64 ms = timeout.GetMilliSeconds();
bigtime_t microseconds = ms * 1000;
status_t result = B_NO_ERROR;
if(!mfNested)
{
PAssertOS(semId >= B_NO_ERROR);
PAssertOS(timeout < PMaxTimeInterval);
#ifdef DEBUG_SEMAPHORES
sem_info info;
get_sem_info(semId, &info);
PError << "::acquire_sem_etc " << semId << ",this: " << this << ", name: " << info.name << ", count:" << info.count
<< ", ms: " << microseconds << endl;
#endif
while((B_BAD_THREAD_ID != mOwner)
&& ((result = ::acquire_sem_etc(semId, 1,
B_RELATIVE_TIMEOUT, microseconds)) == B_INTERRUPTED))
{
}
}
else
{
result = ((BLocker*)semId)->LockWithTimeout(microseconds); // Using BLocker class to support recursive locks
}
return ms == 0 ? FALSE : result == B_OK;
}
void PSemaphore::Signal()
{
if(!mfNested)
{
PAssertOS(semId >= B_NO_ERROR);
#ifdef DEBUG_SEMAPHORES
sem_info info;
get_sem_info(semId, &info);
PError << "::release_sem " << semId << ", this: " << this << ", name: " << info.name << ", count:" << info.count << endl;
#endif
::release_sem(semId);
}
else
{
((BLocker*)semId)->Unlock(); // Using BLocker class to support recursive locks
}
}
BOOL PSemaphore::WillBlock() const
{
if(!mfNested)
{
PAssertOS(semId >= B_NO_ERROR);
#ifdef DEBUG_SEMAPHORES
sem_info info;
get_sem_info(semId, &info);
PError << "::acquire_sem_etc (WillBlock) " << semId << ", this: " << this << ", name: " << info.name << ", count:" << info.count << endl;
#endif
status_t result = ::acquire_sem_etc(semId, 0, B_RELATIVE_TIMEOUT, 0);
return result == B_WOULD_BLOCK;
}
else
{
return mOwner == find_thread(NULL); // If we are in our own thread, we won't lock
}
}
///////////////////////////////////////////////////////////////////////////////
// PSyncPoint
PSyncPoint::PSyncPoint()
: PSemaphore(FALSE) // FALSE is semaphore based, TRUE means implemented through BLocker
{
PSemaphore::Create(0);
}
void PSyncPoint::Signal()
{
PSemaphore::Signal();
}
void PSyncPoint::Wait()
{
PSemaphore::Wait();
}
BOOL PSyncPoint::Wait(const PTimeInterval & timeout)
{
return PSemaphore::Wait(timeout);
}
BOOL PSyncPoint::WillBlock() const
{
return PSemaphore::WillBlock();
}
//////////////////////////////////////////////////////////////////////////////
// PMutex, derived from BLightNestedLocker
PMutex::PMutex()
: PSemaphore(TRUE) // TRUE means implemented through BLocker
{
PSemaphore::Create(0);
}
PMutex::PMutex(const PMutex&)
: PSemaphore(TRUE)
{
PAssertAlways("PMutex copy constructor not supported");
}
void PMutex::Signal()
{
PSemaphore::Signal();
}
void PMutex::Wait()
{
PSemaphore::Wait();
}
BOOL PMutex::Wait(const PTimeInterval & timeout)
{
return PSemaphore::Wait(timeout);
}
BOOL PMutex::WillBlock() const
{
return PSemaphore::WillBlock();
}
//////////////////////////////////////////////////////////////////////////////
// Extra functionality not found in BeOS
int seteuid(uid_t uid) { return 0; }
int setegid(gid_t gid) { return 0; }
///////////////////////////////////////////////////////////////////////////////
// Toolchain dependent stuff
#if (__GNUC_MINOR__ > 9)
#warning "Using gcc 2.95.x"
ostream& ostream::write(const char *s, streamsize n) { return write(s, (long) n); };
istream& istream::read(char *s, streamsize n) { return read(s, (long) n); };
#endif // gcc minor > 9
// End Of File ///////////////////////////////////////////////////////////////
syntax highlighted by Code2HTML, v. 0.9.1