# -*- test-case-name: twisted.test.test_process -*-
# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""UNIX Process management.
Do NOT use this module directly - use reactor.spawnProcess() instead.
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
"""
# System Imports
import os, sys, traceback, select, errno, struct, cStringIO, types, signal
try:
import pty
import fcntl, termios
except:
pty = None
try:
import pwd, grp
from os import setgroups
def initgroups(uid, primaryGid):
username = pwd.getpwuid(uid)[0]
l=[primaryGid]
for groupname, password, gid, userlist in grp.getgrall():
if username in userlist:
l.append(gid)
setgroups(l)
except:
def initgroups(uid, primaryGid):
pass
def switch_uid(uid, gid):
os.setgid(gid)
initgroups(uid, gid)
os.setuid(uid)
from twisted.persisted import styles
from twisted.python import log, failure
from twisted.internet import protocol
# Sibling Imports
import abstract, main, fdesc, error
from main import CONNECTION_LOST, CONNECTION_DONE
reapProcessHandlers = {}
def reapAllProcesses():
"""Reap all registered processes.
"""
for process in reapProcessHandlers.values():
process.reapProcess()
def registerReapProcessHandler(pid, process):
if reapProcessHandlers.has_key(pid):
raise RuntimeError
try:
aux_pid, status = os.waitpid(pid, os.WNOHANG)
except:
log.deferr()
aux_pid = None
if aux_pid:
process.processEnded(status)
else:
reapProcessHandlers[pid] = process
def unregisterReapProcessHandler(pid, process):
if not (reapProcessHandlers.has_key(pid)
and reapProcessHandlers[pid] == process):
raise RuntimeError
del reapProcessHandlers[pid]
from twisted.internet.abstract import _BufferBufferMixin
class ProcessWriter(_BufferBufferMixin, abstract.FileDescriptor, styles.Ephemeral):
"""(Internal) Helper class to write to Process's stdin.
I am a helper which describes a selectable asynchronous writer to a
process's stdin.
"""
connected = 1
ic = 0
def __init__(self, proc):
"""Initialize, specifying a Process instance to connect to.
"""
abstract.FileDescriptor.__init__(self)
self.proc = proc
def write(self, data):
self.stopReading()
abstract.FileDescriptor.write(self, data)
def writeSomeData(self, data):
"""Write some data to the open process.
"""
try:
rv = os.write(self.proc.stdin, data)
if rv == len(data):
self.startReading()
return rv
except IOError, io:
if io.args[0] == errno.EAGAIN:
return 0
return CONNECTION_LOST
except OSError, ose:
if ose.errno == errno.EPIPE:
return CONNECTION_LOST
if ose.errno == errno.EAGAIN: # MacOS-X does this
return 0
raise
def doRead(self):
"""This does nothing.
"""
fd = self.fileno()
r, w, x = select.select([fd], [fd], [], 0)
if r and w:
return CONNECTION_LOST
def connectionLost(self, reason):
"""See abstract.FileDescriptor.connectionLost.
"""
abstract.FileDescriptor.connectionLost(self, reason)
os.close(self.proc.stdin)
self.proc.inConnectionLost()
def fileno(self):
"""Return the fileno() of my process's stdin.
"""
return self.proc.stdin
class ProcessError(abstract.FileDescriptor):
"""ProcessError
I am a selectable representation of a process's stderr.
"""
def __init__(self, proc):
"""Initialize, specifying a process to connect to.
"""
abstract.FileDescriptor.__init__(self)
self.proc = proc
def fileno(self):
"""Return the fileno() of my process's stderr.
"""
return self.proc.stderr
def doRead(self):
"""Call back to my process's doError.
"""
return self.proc.doError()
def connectionLost(self, reason):
"""I close my process's stderr.
"""
abstract.FileDescriptor.connectionLost(self, reason)
os.close(self.proc.stderr)
self.proc.errConnectionLost()
class ProcessExitedAlready(Exception):
"""The process has already excited, and the operation requested can no longer be performed."""
pass
class Process(abstract.FileDescriptor, styles.Ephemeral):
"""An operating-system Process.
This represents an operating-system process with standard input,
standard output, and standard error streams connected to it.
On UNIX, this is implemented using fork(), exec(), pipe()
and fcntl(). These calls may not exist elsewhere so this
code is not cross-platform. (also, windows can only select
on sockets...)
"""
def __init__(self, reactor, command, args, environment, path, proto,
uid=None, gid=None):
"""Spawn an operating-system process.
This is where the hard work of disconnecting all currently open
files / forking / executing the new process happens. (This is
executed automatically when a Process is instantiated.)
This will also run the subprocess as a given user ID and group ID, if
specified. (Implementation Note: this doesn't support all the arcane
nuances of setXXuid on UNIX: it will assume that either your effective
or real UID is 0.)
"""
abstract.FileDescriptor.__init__(self, reactor)
settingUID = (uid is not None) or (gid is not None)
if settingUID:
curegid = os.getegid()
currgid = os.getgid()
cureuid = os.geteuid()
curruid = os.getuid()
if uid is None:
uid = cureuid
if gid is None:
gid = curegid
# prepare to change UID in subprocess
os.setuid(0)
os.setgid(0)
stdout_read, stdout_write = os.pipe()
stderr_read, stderr_write = os.pipe()
stdin_read, stdin_write = os.pipe()
self.pid = os.fork()
if self.pid == 0: # pid is 0 in the child process
# stop debugging, if I am! I don't care anymore!
sys.settrace(None)
# Destroy my stdin / stdout / stderr (in that order)
try:
os.dup2(stdin_read, 0)
os.dup2(stdout_write, 1)
os.dup2(stderr_write, 2)
# XXX TODO FIXME: 256 is a magic number here; really we need a
# way of saying "close all open FDs except 0, 1, 2". This will
# fail in a surprising and subtle way if the current process
# has more than 256 FDs open. On linux this would be
# "[os.close(int(fd)) for fd in os.listdir('/proc/self/fd')]"
# but I seriously doubt that's portable.
for fd in range(3, 256):
try: os.close(fd)
except: pass
if path:
os.chdir(path)
# set the UID before I actually exec the process
if settingUID:
switch_uid(uid, gid)
os.execvpe(command, args, environment)
except:
# If there are errors, bail and try to write something
# descriptive to stderr.
try:
stderr = os.fdopen(2,'w')
stderr.write("Upon execvpe %s %s in environment %s\n:" %
(command, str(args),
"id %s" % id(environment)))
traceback.print_exc(file=stderr)
stderr.flush()
for fd in range(3):
os.close(fd)
except:
pass # make *sure* the child terminates
os._exit(1)
if settingUID:
os.setregid(currgid, curegid)
os.setreuid(curruid, cureuid)
self.status = -1
for fd in stdout_write, stderr_write, stdin_read:
os.close(fd)
for fd in (stdout_read, stderr_read, stdin_write):
fdesc.setNonBlocking(fd)
self.stdout = stdout_read # os.fdopen(stdout_read, 'r')
self.stderr = stderr_read # os.fdopen(stderr_read, 'r')
self.stdin = stdin_write
# ok now I really have a fileno()
self.writer = ProcessWriter(self)
self.writer.startReading()
self.err = ProcessError(self)
self.err.startReading()
self.startReading()
self.connected = 1
self.proto = proto
try:
self.proto.makeConnection(self)
except:
log.deferr()
registerReapProcessHandler(self.pid, self)
def reapProcess(self):
"""Try to reap a process (without blocking) via waitpid.
This is called when sigchild is caught or a Process object loses its
"connection" (stdout is closed) This ought to result in reaping all
zombie processes, since it will be called twice as often as it needs
to be.
(Unfortunately, this is a slightly experimental approach, since
UNIX has no way to be really sure that your process is going to
go away w/o blocking. I don't want to block.)
"""
try:
pid, status = os.waitpid(self.pid, os.WNOHANG)
except:
log.deferr()
pid = None
if pid:
self.processEnded(status)
del reapProcessHandlers[pid]
def closeStdin(self):
"""Call this to close standard input on this process.
"""
if hasattr(self, "writer"):
self.writer.loseConnection()
def closeStderr(self):
"""Close stderr."""
if hasattr(self, "err"):
self.err.stopReading()
self.err.connectionLost(None)
def closeStdout(self):
"""Close stdout."""
if not self.lostOutConnection:
self.stopReading()
self.connectionLost(None)
def loseConnection(self):
self.closeStdin()
self.closeStderr()
self.closeStdout()
def signalProcess(self, signalID):
if signalID in ('HUP', 'STOP', 'INT', 'KILL'):
signalID = getattr(signal, 'SIG'+signalID)
if self.pid is None:
raise ProcessExitedAlready
os.kill(self.pid, signalID)
def doError(self):
"""Called when my standard error stream is ready for reading.
"""
return fdesc.readFromFD(self.stderr, self.proto.errReceived)
def doRead(self):
"""Called when my standard output stream is ready for reading.
"""
return fdesc.readFromFD(self.stdout, self.proto.outReceived)
def doWrite(self):
"""Called when my standard output stream is ready for writing.
This will only happen in the case where the pipe to write to is
broken.
"""
return CONNECTION_DONE
def write(self,data):
"""Call this to write to standard input on this process.
"""
self.writer.write(data)
def fileno(self):
"""This returns the file number of standard output on this process.
"""
return self.stdout
lostErrorConnection = 0
lostOutConnection = 0
lostInConnection = 0
lostProcess = 0
def maybeCallProcessEnded(self):
if (self.lostErrorConnection and
self.lostOutConnection and
self.lostInConnection):
if self.lostProcess:
try:
exitCode = sig = None
if self.status != -1:
if os.WIFEXITED(self.status):
exitCode = os.WEXITSTATUS(self.status)
else:
sig = os.WTERMSIG(self.status)
else:
pass # wonder when this happens
if exitCode or sig:
e = error.ProcessTerminated(exitCode, sig, self.status)
else:
e = error.ProcessDone(self.status)
self.proto.processEnded(failure.Failure(e))
except:
log.deferr()
else:
self.reapProcess()
def processEnded(self, status):
self.status = status
self.lostProcess = 1
self.pid = None
self.closeStdin()
self.maybeCallProcessEnded()
def inConnectionLost(self):
try:
self.proto.inConnectionLost()
except:
log.deferr()
del self.writer
self.lostInConnection = 1
self.maybeCallProcessEnded()
def errConnectionLost(self):
self.lostErrorConnection = 1
del self.err
try:
self.proto.errConnectionLost()
except:
log.deferr()
self.maybeCallProcessEnded()
def connectionLost(self, reason):
"""stdout closed.
"""
self.lostOutConnection = 1
abstract.FileDescriptor.connectionLost(self, reason)
os.close(self.stdout)
try:
self.proto.outConnectionLost()
except:
log.deferr()
self.maybeCallProcessEnded()
class PTYProcess(abstract.FileDescriptor, styles.Ephemeral):
"""An operating-system Process that uses PTY support."""
def __init__(self, reactor, command, args, environment, path, proto,
uid=None, gid=None, usePTY=None):
"""Spawn an operating-system process.
This is where the hard work of disconnecting all currently open
files / forking / executing the new process happens. (This is
executed automatically when a Process is instantiated.)
This will also run the subprocess as a given user ID and group ID, if
specified. (Implementation Note: this doesn't support all the arcane
nuances of setXXuid on UNIX: it will assume that either your effective
or real UID is 0.)
"""
if not pty and type(usePTY) not in (types.ListType, types.TupleType):
# no pty module and we didn't get a pty to use
raise NotImplementedError, "cannot use PTYProcess on platforms without the pty module."
abstract.FileDescriptor.__init__(self, reactor)
settingUID = (uid is not None) or (gid is not None)
if settingUID:
curegid = os.getegid()
currgid = os.getgid()
cureuid = os.geteuid()
curruid = os.getuid()
if uid is None:
uid = cureuid
if gid is None:
gid = curegid
# prepare to change UID in subprocess
os.setuid(0)
os.setgid(0)
if type(usePTY) in (types.TupleType, types.ListType):
masterfd, slavefd, ttyname = usePTY
else:
masterfd, slavefd = pty.openpty()
ttyname = os.ttyname(slavefd)
pid = os.fork()
self.pid = pid
if pid == 0: # pid is 0 in the child process
try:
sys.settrace(None)
os.close(masterfd)
os.setsid()
if hasattr(termios, 'TIOCSCTTY'):
fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')
else:
for fd in range(3):
if fd != slavefd:
os.close(fd)
fd = os.open(ttyname, os.O_RDWR)
os.close(fd)
os.dup2(slavefd, 0) # stdin
os.dup2(slavefd, 1) # stdout
os.dup2(slavefd, 2) # stderr
if path:
os.chdir(path)
for fd in range(3, 256):
try: os.close(fd)
except: pass
# set the UID before I actually exec the process
if settingUID:
switch_uid(uid, gid)
os.execvpe(command, args, environment)
except:
stderr = os.fdopen(1, 'w')
stderr.write("Upon execvpe %s %s in environment %s:\n" %
(command, str(args),
"id %s" % id(environment)))
traceback.print_exc(file=stderr)
stderr.flush()
os._exit(1)
assert pid!=0
os.close(slavefd)
fdesc.setNonBlocking(masterfd)
self.fd=masterfd
self.startReading()
self.connected = 1
self.proto = proto
self.lostProcess = 0
self.status = -1
try:
self.proto.makeConnection(self)
except:
log.deferr()
registerReapProcessHandler(self.pid, self)
def reapProcess(self):
"""Try to reap a process (without blocking) via waitpid.
This is called when sigchild is caught or a Process object loses its
"connection" (stdout is closed) This ought to result in reaping all
zombie processes, since it will be called twice as often as it needs
to be.
(Unfortunately, this is a slightly experimental approach, since
UNIX has no way to be really sure that your process is going to
go away w/o blocking. I don't want to block.)
"""
try:
pid, status = os.waitpid(self.pid, os.WNOHANG)
except OSError, e:
if e.errno == 10: # no child process
pid = None
else:
raise
except:
log.deferr()
pid = None
if pid:
self.processEnded(status)
del reapProcessHandlers[pid]
# PTYs do not have stdin/stdout/stderr. They only have in and out, just
# like sockets. You cannot close one without closing off the entire PTY.
def closeStdin(self):
pass
def closeStdout(self):
pass
def closeStderr(self):
pass
def signalProcess(self, signalID):
if signalID in ('HUP', 'STOP', 'INT', 'KILL'):
signalID = getattr(signal, 'SIG'+signalID)
os.kill(self.pid, signalID)
def processEnded(self, status):
self.status = status
self.lostProcess += 1
self.maybeCallProcessEnded()
def doRead(self):
"""Called when my standard output stream is ready for reading.
"""
try:
return fdesc.readFromFD(self.fd, self.proto.outReceived)
except OSError:
return CONNECTION_LOST
def fileno(self):
"""This returns the file number of standard output on this process.
"""
return self.fd
def maybeCallProcessEnded(self):
# two things must happen before we call the ProcessProtocol's
# processEnded method. 1: the child process must die and be reaped
# (which calls our own processEnded method). 2: the child must close
# their stdin/stdout/stderr fds, causing the pty to close, causing
# our connectionLost method to be called. #2 can also be triggered
# by calling .loseConnection().
if self.lostProcess == 2:
try:
exitCode = sig = None
if self.status != -1:
if os.WIFEXITED(self.status):
exitCode = os.WEXITSTATUS(self.status)
else:
sig = os.WTERMSIG(self.status)
else:
pass # wonder when this happens
if exitCode or sig:
e = error.ProcessTerminated(exitCode, sig, self.status)
else:
e = error.ProcessDone(self.status)
self.proto.processEnded(failure.Failure(e))
except:
log.deferr()
def connectionLost(self, reason):
"""I call this to clean up when one or all of my connections has died.
"""
abstract.FileDescriptor.connectionLost(self, reason)
os.close(self.fd)
self.lostProcess +=1
self.maybeCallProcessEnded()
def writeSomeData(self, data):
"""Write some data to the open process.
"""
try:
return os.write(self.fd, data)
except IOError,io:
if io.args[0] == errno.EAGAIN:
return 0
return CONNECTION_LOST
syntax highlighted by Code2HTML, v. 0.9.1