# -*- test-case-name: twisted.test.test_process -*-
# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""UNIX Process management.
Do NOT use this module directly - use reactor.spawnProcess() instead.
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
"""
# System Imports
import os, sys, traceback, select, errno, struct, cStringIO, types, signal
# Optional platform modules.  Each name is bound to None when its module
# cannot be imported, so later code can test for availability instead of
# failing with a NameError.
try:
    import pty
except ImportError:
    pty = None
try:
    import fcntl, termios
except ImportError:
    # PTYProcess refers to termios as well as fcntl, so both names must
    # exist even when the import fails.
    fcntl = None
    termios = None
from twisted.persisted import styles
from twisted.python import log, failure
from twisted.python.util import switchUID
from twisted.internet import protocol
# Sibling Imports
import abstract, main, fdesc, error
from main import CONNECTION_LOST, CONNECTION_DONE
# Registry of child processes that still have to be reaped.  Keys are pids,
# values are the objects whose reapProcess() method is called for that pid.
reapProcessHandlers = {}


def reapAllProcesses():
    """Reap all registered processes.

    Calls C{reapProcess()} on every handler currently stored in
    C{reapProcessHandlers}.
    """
    # Walk a snapshot of the registry: a handler that succeeds in reaping
    # its child removes itself from reapProcessHandlers while we iterate.
    for process in list(reapProcessHandlers.values()):
        process.reapProcess()
def registerReapProcessHandler(pid, process):
    """Register C{process} as the handler to notify when child C{pid} exits.

    The child is polled once (without blocking) right away.  If it has
    already exited, C{process.processEnded(status)} is called immediately and
    nothing is registered; otherwise C{process} is stored in
    C{reapProcessHandlers} under C{pid}.

    @param pid: the process id of the child.
    @param process: an object providing C{processEnded(status)}.

    @raise RuntimeError: if a handler is already registered for C{pid}.
    """
    if pid in reapProcessHandlers:
        raise RuntimeError(
            "a reap handler is already registered for pid %r" % (pid,))
    try:
        aux_pid, status = os.waitpid(pid, os.WNOHANG)
    except:
        # Deliberately broad: a failed poll is logged and the handler is
        # registered anyway, so a later reap attempt can still happen.
        log.msg('Failed to reap %d:' % pid)
        log.err()
        aux_pid = None
    if aux_pid:
        process.processEnded(status)
    else:
        reapProcessHandlers[pid] = process
def unregisterReapProcessHandler(pid, process):
    """Remove the handler registered for child C{pid}.

    @param pid: the process id the handler was registered under.
    @param process: the handler expected to be registered for C{pid}.

    @raise RuntimeError: if nothing is registered for C{pid}, or if the
        registered handler does not compare equal to C{process}.
    """
    registered = (pid in reapProcessHandlers
                  and reapProcessHandlers[pid] == process)
    if not registered:
        raise RuntimeError(
            "no matching reap handler is registered for pid %r" % (pid,))
    del reapProcessHandlers[pid]
class ProcessWriter(abstract.FileDescriptor):
    """(Internal) Selectable writer for one of a child's input pipes.

    I wrap the parent's end of a pipe that the child reads from (stdin, for
    instance) so that it can be written to asynchronously.
    """

    connected = 1
    ic = 0

    def __init__(self, reactor, proc, name, fileno):
        """Remember the owning Process, my name in it and my descriptor.

        @param proc: the object told (via C{childConnectionLost}) when this
            pipe goes away.
        @param name: the value handed back to C{proc} to identify this pipe.
        @param fileno: the parent-side file descriptor to write to.
        """
        abstract.FileDescriptor.__init__(self, reactor)
        self.fd = fileno
        self.name = name
        self.proc = proc

    def fileno(self):
        """Return the file descriptor I write to."""
        return self.fd

    # Copy relevant parts of the protocol
    def writeSomeData(self, data):
        """Write as much of C{data} as the pipe accepts right now.

        @return: the number of bytes written, 0 if the pipe would block, or
            CONNECTION_LOST if the pipe is broken.
        """
        try:
            written = os.write(self.fd, data)
            if written == len(data):
                # everything went out; go back to watching for EOF
                self.startReading()
            return written
        except IOError:
            ioError = sys.exc_info()[1]
            if ioError.args[0] == errno.EAGAIN:
                return 0
            return CONNECTION_LOST
        except OSError:
            osError = sys.exc_info()[1]
            if osError.errno == errno.EPIPE:
                return CONNECTION_LOST
            if osError.errno == errno.EAGAIN:  # MacOS-X does this
                return 0
            raise

    def write(self, data):
        """Queue C{data} for writing, suspending the EOF watch meanwhile."""
        self.stopReading()
        abstract.FileDescriptor.write(self, data)

    def doRead(self):
        """Detect that the child has closed its end of the pipe.

        The only way this pipe can become readable is at EOF, because the
        child has closed it.
        """
        descriptor = self.fd
        readable, writable, _ = select.select(
            [descriptor], [descriptor], [], 0)
        if readable and writable:
            return CONNECTION_LOST

    def connectionLost(self, reason):
        """Close my descriptor and tell the owning Process I am gone.

        See abstract.FileDescriptor.connectionLost.
        """
        abstract.FileDescriptor.connectionLost(self, reason)
        os.close(self.fd)
        self.proc.childConnectionLost(self.name)
class ProcessReader(abstract.FileDescriptor):
    """(Internal) Selectable reader for one of a child's output pipes.

    I wrap the parent's end of a pipe that the child writes to, such as its
    stdout or stderr.  Data I receive is delivered to C{self.proto}, an
    attribute that must be set on me before any data arrives.
    """

    def __init__(self, reactor, proc, name, fileno):
        """Remember the owning Process, my name in it and my descriptor.

        @param proc: the object told (via C{childConnectionLost}) when this
            pipe goes away.
        @param name: the value reported alongside received data and pipe
            loss to identify this pipe.
        @param fileno: the parent-side file descriptor to read from.
        """
        abstract.FileDescriptor.__init__(self, reactor)
        self.fd = fileno
        self.name = name
        self.proc = proc

    def fileno(self):
        """Return the file descriptor I read from."""
        return self.fd

    def writeSomeData(self, data):
        """Refuse to write; I exist only so that loseConnection works.

        The only time this is actually called is after .loseConnection.  Any
        actual write attempt would fail, so we must avoid that.  This hack
        allows us to use .loseConnection on both readers and writers.
        """
        assert data == ""
        return CONNECTION_LOST

    def doRead(self):
        """Read from the pipe now that it is readable."""
        deliver = self.dataReceived
        return fdesc.readFromFD(self.fd, deliver)

    def dataReceived(self, data):
        """Pass C{data}, tagged with my name, on to the protocol."""
        self.proto.childDataReceived(self.name, data)

    def connectionLost(self, reason):
        """Close my end of the pipe and signal the Process (which signals
        the ProcessProtocol).
        """
        abstract.FileDescriptor.connectionLost(self, reason)
        os.close(self.fd)
        self.proc.childConnectionLost(self.name)
class ProcessExitedAlready(Exception):
    """The process has already exited, and the operation requested can no
    longer be performed.
    """
class Process(styles.Ephemeral):
    """An operating-system Process.

    This represents an operating-system process with arbitrary input/output
    pipes connected to it.  Those pipes may represent standard input,
    standard output, and standard error, or any other file descriptor.

    On UNIX, this is implemented using fork(), exec(), pipe()
    and fcntl().  These calls may not exist elsewhere so this
    code is not cross-platform.  (also, windows can only select
    on sockets...)
    """

    # When true, __init__ prints its descriptor bookkeeping to stdout.
    debug = False
    # When true, _setupChild (which runs in the child) traces to sys.stderr.
    debug_child = False

    def __init__(self, reactor, command, args, environment, path, proto,
                 uid=None, gid=None, childFDs=None):
        """Spawn an operating-system process.

        This is where the hard work of disconnecting all currently open
        files / forking / executing the new process happens.  (This is
        executed automatically when a Process is instantiated.)

        This will also run the subprocess as a given user ID and group ID, if
        specified.  (Implementation Note: this doesn't support all the arcane
        nuances of setXXuid on UNIX: it will assume that either your effective
        or real UID is 0.)

        @param childFDs: a dictionary mapping
            fd_in_child -> current_fd_in_parent/'r'/'w'

            If the value is a number, it specifies one of the parent's fds
            that will be remapped to the child's fd.  This is useful for
            things like inetd and shell-like file redirection.

            If it is the string 'r', a pipe will be created and attached to
            the child at that fd number, and the parent will be able to
            read from the pipe.  This is useful for the child's stdout and
            stderr.

            If it is the string 'w', a pipe will be created and attached,
            and the parent will be able to write into that pipe.  This is
            useful for the child's stdin.

            If childFDs is not passed, the default behaviour is to use a
            mapping that opens the usual stdin/stdout/stderr pipes.
        """
        # Resolve the default mapping first: the sanity check below needs a
        # real dictionary, not None.
        if childFDs is None:
            childFDs = {0: "w",  # we write to the child's stdin
                        1: "r",  # we read from their stdout
                        2: "r",  # and we read from their stderr
                        }
        if not proto:
            # without a protocol nobody could serve parent-side pipes
            assert ('r' not in childFDs.values()
                    and 'w' not in childFDs.values())

        self.lostProcess = False

        settingUID = (uid is not None) or (gid is not None)
        if settingUID:
            curegid = os.getegid()
            currgid = os.getgid()
            cureuid = os.geteuid()
            curruid = os.getuid()
            if uid is None:
                uid = cureuid
            if gid is None:
                gid = curegid
            # prepare to change UID in subprocess
            os.setuid(0)
            os.setgid(0)

        # keys are childFDs, we can sense them closing
        # values are ProcessReader/ProcessWriters
        self.pipes = {}

        # keys are childFDs
        # values are parentFDs
        helpers = {}

        debug = self.debug
        if debug:
            sys.stdout.write("childFDs %s\n" % (childFDs,))

        # fdmap.keys() are filenos of pipes that are used by the child.
        fdmap = {}  # maps childFD to parentFD
        for childFD, target in childFDs.items():
            if debug:
                sys.stdout.write("[%d] %s\n" % (childFD, target))
            if target == "r":
                # we need a pipe that the parent can read from
                readFD, writeFD = os.pipe()
                if debug:
                    sys.stdout.write("readFD=%d, writeFD=%d\n"
                                     % (readFD, writeFD))
                fdmap[childFD] = writeFD     # child writes to this
                helpers[childFD] = readFD    # parent reads from this
                fdesc.setNonBlocking(readFD)
            elif target == "w":
                # we need a pipe that the parent can write to
                readFD, writeFD = os.pipe()
                if debug:
                    sys.stdout.write("readFD=%d, writeFD=%d\n"
                                     % (readFD, writeFD))
                fdmap[childFD] = readFD      # child reads from this
                helpers[childFD] = writeFD   # parent writes to this
                fdesc.setNonBlocking(writeFD)
            else:
                assert isinstance(target, int), \
                    '%r should be an int' % (target,)
                fdmap[childFD] = target      # parent ignores this
        if debug:
            sys.stdout.write("fdmap %s\n" % (fdmap,))
            sys.stdout.write("helpers %s\n" % (helpers,))

        # the child only cares about fdmap.values()

        self.pid = os.fork()
        if self.pid == 0:  # pid is 0 in the child process
            # do not put *ANY* code outside the try block. The child process
            # must either exec or _exit. If it gets outside this block (due
            # to an exception that is not handled here, but which might be
            # handled higher up), there will be two copies of the parent
            # running in parallel, doing all kinds of damage.

            # After each change to this code, review it to make sure there
            # are no exit paths.
            try:
                # stop debugging, if I am! I don't care anymore!
                sys.settrace(None)
                # close all parent-side pipes
                self._setupChild(fdmap)
                self._execChild(path, settingUID, uid, gid,
                                command, args, environment)
            except:
                # If there are errors, bail and try to write something
                # descriptive to stderr.
                # XXX: The parent's stderr isn't necessarily fd 2 anymore, or
                #      even still available
                # XXXX: however even libc assumes write(2,err) is a useful
                #       thing to attempt
                try:
                    stderr = os.fdopen(2, 'w')
                    stderr.write("Upon execvpe %s %s in environment %s\n:" %
                                 (command, str(args),
                                  "id %s" % id(environment)))
                    traceback.print_exc(file=stderr)
                    stderr.flush()
                    for fd in range(3):
                        os.close(fd)
                except:
                    pass  # make *sure* the child terminates
            # Did you read the comment about not adding code here?
            os._exit(1)

        # we are the parent

        if settingUID:
            os.setregid(currgid, curegid)
            os.setreuid(curruid, cureuid)
        self.status = -1  # this records the exit status of the child

        # arrange for the parent-side pipes to be read and written
        for childFD, parentFD in helpers.items():
            # the child's end of the pipe is of no use to the parent
            os.close(fdmap[childFD])

            if childFDs[childFD] == "r":
                reader = ProcessReader(reactor, self, childFD, parentFD)
                reader.proto = proto
                self.pipes[childFD] = reader
                reader.startReading()

            if childFDs[childFD] == "w":
                writer = ProcessWriter(reactor, self, childFD, parentFD)
                writer.proto = proto
                self.pipes[childFD] = writer
                # we do startReading here to watch for EOF. We won't do an
                # actual .startWriting until some data has been written to
                # the transmit buffer.
                writer.startReading()

        self.proto = proto
        try:
            # the 'transport' is used for some compatibility methods
            if self.proto is not None:
                self.proto.makeConnection(self)
        except:
            log.err()
        registerReapProcessHandler(self.pid, self)

    def _setupChild(self, fdmap):
        """Rearrange the child's file descriptors according to C{fdmap}.

        fdmap[childFD] = parentFD

        The child wants to end up with 'childFD' attached to what used to be
        the parent's parentFD.  As an example, a bash command run like
        'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}.
        'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}.

        Step 1: close all file descriptors that aren't values of fdmap.
        This means 0 .. maxfds.

        Step 2: for each childFD, in ascending order:
         - if fdmap[childFD] == childFD, the descriptor is already in place.
           Make sure the CLOEXEC flag is not set.
         - otherwise, if childFD is in fdmap.values(), then the target
           descriptor is busy.  Use os.dup() to move it elsewhere, update
           all fdmap[childFD] items that point to it, then close the
           original.  Then use os.dup2() to put fdmap[childFD] in place.

        Step 3: close every descriptor in fdmap.values() that is not also
        one of the child's descriptors (a key of fdmap).

        C{fdmap} is modified in place.  This runs in the forked child only.
        """
        debug = self.debug_child
        if debug:
            errfd = sys.stderr
            errfd.write("starting _setupChild\n")

        destList = list(fdmap.values())
        try:
            import resource
            maxfds = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + 1
            # OS-X reports 9223372036854775808. That's a lot of fds to close.
            # An unlimited hard limit may also be reported as -1, which
            # would leave maxfds at 0 and close nothing at all.
            if maxfds > 1024 or maxfds <= 0:
                maxfds = 1024
        except:
            maxfds = 256

        for fd in range(maxfds):
            if fd in destList:
                continue
            if debug and fd == errfd.fileno():
                continue
            try:
                os.close(fd)
            except:
                pass  # most of these descriptors were never open

        # at this point, the only fds still open are the ones that need to
        # be moved to their appropriate positions in the child (the targets
        # of fdmap, i.e. fdmap.values() )

        if debug:
            errfd.write("fdmap %s\n" % (fdmap,))
        childlist = list(fdmap.keys())
        childlist.sort()

        for child in childlist:
            target = fdmap[child]
            if target == child:
                # fd is already in place
                if debug:
                    errfd.write("%d already in place\n" % target)
                if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
                    old = fcntl.fcntl(child, fcntl.F_GETFD)
                    fcntl.fcntl(child,
                                fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
            else:
                if child in fdmap.values():
                    # we can't replace child-fd yet, as some other mapping
                    # still needs the fd it wants to target. We must preserve
                    # that old fd by duping it to a new home.
                    newtarget = os.dup(child)  # give it a safe home
                    if debug:
                        errfd.write("os.dup(%d) -> %d\n"
                                    % (child, newtarget))
                    os.close(child)  # close the original
                    for c, p in list(fdmap.items()):
                        if p == child:
                            fdmap[c] = newtarget  # update all pointers
                # now it should be available
                if debug:
                    errfd.write("os.dup2(%d,%d)\n" % (target, child))
                os.dup2(target, child)

        # At this point, the child has everything it needs. We want to close
        # everything that isn't going to be used by the child, i.e.
        # everything not in fdmap.keys(). The only remaining fds open are
        # those in fdmap.values().

        # Any given fd may appear in fdmap.values() multiple times, so we
        # need to remove duplicates first.

        old = []
        for fd in fdmap.values():
            if fd not in old and fd not in fdmap:
                old.append(fd)
        if debug:
            errfd.write("old %s\n" % (old,))
        for fd in old:
            os.close(fd)

    def _execChild(self, path, settingUID, uid, gid,
                   command, args, environment):
        """Replace the current (child) process image with C{command}.

        Changes directory to C{path} if it is true, switches to C{uid} and
        C{gid} if C{settingUID} is true, then calls os.execvpe.
        """
        if path:
            os.chdir(path)
        # set the UID before I actually exec the process
        if settingUID:
            switchUID(uid, gid)
        os.execvpe(command, args, environment)

    def reapProcess(self):
        """Try to reap a process (without blocking) via waitpid.

        This is called when sigchild is caught or a Process object loses its
        "connection" (stdout is closed) This ought to result in reaping all
        zombie processes, since it will be called twice as often as it needs
        to be.

        (Unfortunately, this is a slightly experimental approach, since
        UNIX has no way to be really sure that your process is going to
        go away w/o blocking.  I don't want to block.)
        """
        try:
            pid, status = os.waitpid(self.pid, os.WNOHANG)
        except:
            log.msg('Failed to reap %d:' % self.pid)
            log.err()
            pid = None
        if pid:
            self.processEnded(status)
            # processEnded clears self.pid, so use the pid waitpid returned
            unregisterReapProcessHandler(pid, self)

    def writeToChild(self, childFD, data):
        """Write C{data} to the pipe attached to the child's C{childFD}."""
        self.pipes[childFD].write(data)

    def closeChildFD(self, childFD):
        """Close the parent's pipe attached to the child's C{childFD}.

        For writer pipes, loseConnection tries to write the remaining data
        out to the pipe before closing it.  If childFD is not in the list of
        pipes, assume that it is already closed.
        """
        if childFD in self.pipes:
            self.pipes[childFD].loseConnection()

    def pauseProducing(self):
        """Stop reading from all of the child's output pipes."""
        for p in self.pipes.values():
            if isinstance(p, ProcessReader):
                p.stopReading()

    def resumeProducing(self):
        """Resume reading from all of the child's output pipes."""
        for p in self.pipes.values():
            if isinstance(p, ProcessReader):
                p.startReading()

    # compatibility
    def closeStdin(self):
        """Call this to close standard input on this process."""
        self.closeChildFD(0)

    def closeStdout(self):
        """Call this to close standard output on this process."""
        self.closeChildFD(1)

    def closeStderr(self):
        """Call this to close standard error on this process."""
        self.closeChildFD(2)

    def loseConnection(self):
        """Close standard input, standard error and standard output."""
        self.closeStdin()
        self.closeStderr()
        self.closeStdout()

    def write(self, data):
        """Call this to write to standard input on this process."""
        self.pipes[0].write(data)

    def signalProcess(self, signalID):
        """Send a signal to the child.

        @param signalID: a signal number, or one of the strings 'HUP',
            'STOP', 'INT', 'KILL' or 'TERM'.

        @raise ProcessExitedAlready: if the child has already been reaped.
        """
        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
            signalID = getattr(signal, 'SIG' + signalID)
        if self.pid is None:
            raise ProcessExitedAlready
        os.kill(self.pid, signalID)

    def processEnded(self, status):
        """Record that the child has terminated with C{status}.

        This is called when the child terminates (SIGCHLD).  The pipes are
        left alone here; each one reports its own closing through
        childConnectionLost.
        """
        self.status = status
        self.lostProcess = True
        self.pid = None
        self.maybeCallProcessEnded()

    def childConnectionLost(self, childFD):
        """Forget the pipe for C{childFD} and tell the protocol about it.

        This is called when one of the helpers (ProcessReader or
        ProcessWriter) notices their pipe has been closed.
        """
        del self.pipes[childFD]
        try:
            self.proto.childConnectionLost(childFD)
        except:
            log.err()
        self.maybeCallProcessEnded()

    def maybeCallProcessEnded(self):
        """Notify the protocol that the process ended, once that is safe.

        We don't call ProcessProtocol.processEnded until:
         - the child has terminated, AND
         - all writers have indicated an error status, AND
         - all readers have indicated EOF
        This insures that we've gathered all output from the process.
        """
        if self.pipes:
            return
        if not self.lostProcess:
            # every pipe is gone but the child has not been reaped yet
            self.reapProcess()
            return
        try:
            exitCode = sig = None
            if self.status != -1:
                if os.WIFEXITED(self.status):
                    exitCode = os.WEXITSTATUS(self.status)
                else:
                    sig = os.WTERMSIG(self.status)
            else:
                pass  # don't think this can happen
            if exitCode or sig:
                e = error.ProcessTerminated(exitCode, sig, self.status)
            else:
                e = error.ProcessDone(self.status)
            if self.proto is not None:
                self.proto.processEnded(failure.Failure(e))
                self.proto = None
        except:
            log.err()
class PTYProcess(abstract.FileDescriptor, styles.Ephemeral):
    """An operating-system Process that uses PTY support."""

    def __init__(self, reactor, command, args, environment, path, proto,
                 uid=None, gid=None, usePTY=None):
        """Spawn an operating-system process.

        This is where the hard work of disconnecting all currently open
        files / forking / executing the new process happens.  (This is
        executed automatically when a Process is instantiated.)

        This will also run the subprocess as a given user ID and group ID, if
        specified.  (Implementation Note: this doesn't support all the arcane
        nuances of setXXuid on UNIX: it will assume that either your effective
        or real UID is 0.)

        @param usePTY: if a list or tuple, it is unpacked as
            (masterfd, slavefd, ttyname) and used as the pty; otherwise a
            new pty is opened with the pty module.

        @raise NotImplementedError: if the pty module is unavailable and no
            pty was supplied through C{usePTY}.
        """
        ptySupplied = isinstance(usePTY, (tuple, list))
        if not pty and not ptySupplied:
            # no pty module and we didn't get a pty to use
            raise NotImplementedError(
                "cannot use PTYProcess on platforms without the pty module.")
        abstract.FileDescriptor.__init__(self, reactor)
        settingUID = (uid is not None) or (gid is not None)
        if settingUID:
            curegid = os.getegid()
            currgid = os.getgid()
            cureuid = os.geteuid()
            curruid = os.getuid()
            if uid is None:
                uid = cureuid
            if gid is None:
                gid = curegid
            # prepare to change UID in subprocess
            os.setuid(0)
            os.setgid(0)
        if ptySupplied:
            masterfd, slavefd, ttyname = usePTY
        else:
            masterfd, slavefd = pty.openpty()
            ttyname = os.ttyname(slavefd)
        pid = os.fork()
        self.pid = pid
        if pid == 0:  # pid is 0 in the child process
            # The child must either exec or _exit; nothing in here may let
            # an exception escape, or a second copy of the parent would keep
            # running.
            try:
                sys.settrace(None)
                os.close(masterfd)
                os.setsid()
                # fcntl and termios are imported together; checking fcntl
                # first avoids touching termios when neither is available.
                if fcntl is not None and hasattr(termios, 'TIOCSCTTY'):
                    fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')
                else:
                    for fd in range(3):
                        if fd != slavefd:
                            os.close(fd)
                    fd = os.open(ttyname, os.O_RDWR)
                    os.close(fd)

                os.dup2(slavefd, 0)  # stdin
                os.dup2(slavefd, 1)  # stdout
                os.dup2(slavefd, 2)  # stderr

                if path:
                    os.chdir(path)
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except:
                        pass  # most of these descriptors were never open

                # set the UID before I actually exec the process
                if settingUID:
                    switchUID(uid, gid)
                os.execvpe(command, args, environment)
            except:
                # Best-effort error report.  It is itself guarded so that a
                # failure here cannot skip the os._exit below.
                try:
                    stderr = os.fdopen(1, 'w')
                    stderr.write("Upon execvpe %s %s in environment %s:\n" %
                                 (command, str(args),
                                  "id %s" % id(environment)))
                    traceback.print_exc(file=stderr)
                    stderr.flush()
                except:
                    pass  # make *sure* the child terminates
            os._exit(1)
        assert pid != 0
        # we are the parent: drop back to the ids we had before the fork
        if settingUID:
            os.setregid(currgid, curegid)
            os.setreuid(curruid, cureuid)
        os.close(slavefd)
        fdesc.setNonBlocking(masterfd)
        self.fd = masterfd
        self.startReading()
        self.connected = 1
        self.proto = proto
        # counts the two events awaited by maybeCallProcessEnded
        self.lostProcess = 0
        self.status = -1
        try:
            self.proto.makeConnection(self)
        except:
            log.err()
        registerReapProcessHandler(self.pid, self)

    def reapProcess(self):
        """Try to reap a process (without blocking) via waitpid.

        This is called when sigchild is caught or a Process object loses its
        "connection" (stdout is closed) This ought to result in reaping all
        zombie processes, since it will be called twice as often as it needs
        to be.

        (Unfortunately, this is a slightly experimental approach, since
        UNIX has no way to be really sure that your process is going to
        go away w/o blocking.  I don't want to block.)
        """
        try:
            pid, status = os.waitpid(self.pid, os.WNOHANG)
        except OSError:
            if sys.exc_info()[1].errno == errno.ECHILD:  # no child process
                pid = None
            else:
                raise
        except:
            log.err()
            pid = None
        if pid:
            self.processEnded(status)
            unregisterReapProcessHandler(self.pid, self)

    # PTYs do not have stdin/stdout/stderr. They only have in and out, just
    # like sockets. You cannot close one without closing off the entire PTY.
    def closeStdin(self):
        """Do nothing; a PTY has no separate standard input to close."""
        pass

    def closeStdout(self):
        """Do nothing; a PTY has no separate standard output to close."""
        pass

    def closeStderr(self):
        """Do nothing; a PTY has no separate standard error to close."""
        pass

    def signalProcess(self, signalID):
        """Send a signal to the child.

        @param signalID: a signal number, or one of the strings 'HUP',
            'STOP', 'INT', 'KILL' or 'TERM'.
        """
        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
            signalID = getattr(signal, 'SIG' + signalID)
        os.kill(self.pid, signalID)

    def processEnded(self, status):
        """Record that the child has been reaped with exit C{status}."""
        self.status = status
        self.lostProcess += 1
        self.maybeCallProcessEnded()

    def doRead(self):
        """Called when my standard output stream is ready for reading."""
        try:
            return fdesc.readFromFD(self.fd, self.proto.outReceived)
        except OSError:
            return CONNECTION_LOST

    def fileno(self):
        """This returns the file number of standard output on this process.
        """
        return self.fd

    def maybeCallProcessEnded(self):
        """Notify the protocol once the child is both dead and disconnected.

        Two things must happen before we call the ProcessProtocol's
        processEnded method.  1: the child process must die and be reaped
        (which calls our own processEnded method).  2: the child must close
        their stdin/stdout/stderr fds, causing the pty to close, causing
        our connectionLost method to be called.  #2 can also be triggered
        by calling .loseConnection().
        """
        if self.lostProcess == 2:
            try:
                exitCode = sig = None
                if self.status != -1:
                    if os.WIFEXITED(self.status):
                        exitCode = os.WEXITSTATUS(self.status)
                    else:
                        sig = os.WTERMSIG(self.status)
                else:
                    pass  # wonder when this happens
                if exitCode or sig:
                    e = error.ProcessTerminated(exitCode, sig, self.status)
                else:
                    e = error.ProcessDone(self.status)
                self.proto.processEnded(failure.Failure(e))
                self.proto = None
            except:
                log.err()

    def connectionLost(self, reason):
        """I call this to clean up when one or all of my connections has
        died.
        """
        abstract.FileDescriptor.connectionLost(self, reason)
        os.close(self.fd)
        self.lostProcess += 1
        self.maybeCallProcessEnded()

    def writeSomeData(self, data):
        """Write some data to the open process.

        @return: the number of bytes written, 0 if the pty would block, or
            CONNECTION_LOST if it can no longer be written to.
        """
        try:
            return os.write(self.fd, data)
        except IOError:
            ioError = sys.exc_info()[1]
            if ioError.args[0] == errno.EAGAIN:
                return 0
            return CONNECTION_LOST
        except OSError:
            # os.write reports failures as OSError; treat them the same way
            # ProcessWriter.writeSomeData does.
            osError = sys.exc_info()[1]
            if osError.errno == errno.EAGAIN:
                return 0
            if osError.errno == errno.EPIPE:
                return CONNECTION_LOST
            raise
# syntax highlighted by Code2HTML, v. 0.9.1