# -*- test-case-name: twisted.test.test_process -*- # Twisted, the Framework of Your Internet # Copyright (C) 2001 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """UNIX Process management. Do NOT use this module directly - use reactor.spawnProcess() instead. Maintainer: U{Itamar Shtull-Trauring} """ # System Imports import os, sys, traceback, select, errno, struct, cStringIO, types, signal try: import pty except ImportError: pty = None try: import fcntl, termios except ImportError: fcntl = None from twisted.persisted import styles from twisted.python import log, failure from twisted.python.util import switchUID from twisted.internet import protocol # Sibling Imports import abstract, main, fdesc, error from main import CONNECTION_LOST, CONNECTION_DONE reapProcessHandlers = {} def reapAllProcesses(): """Reap all registered processes. """ for process in reapProcessHandlers.values(): process.reapProcess() def registerReapProcessHandler(pid, process): if reapProcessHandlers.has_key(pid): raise RuntimeError try: aux_pid, status = os.waitpid(pid, os.WNOHANG) except: log.msg('Failed to reap %d:' % pid) log.err() aux_pid = None if aux_pid: process.processEnded(status) else: reapProcessHandlers[pid] = process def unregisterReapProcessHandler(pid, process): if not (reapProcessHandlers.has_key(pid) and reapProcessHandlers[pid] == process): raise RuntimeError del reapProcessHandlers[pid] class ProcessWriter(abstract.FileDescriptor): """(Internal) Helper class to write into a Process's input pipe. I am a helper which describes a selectable asynchronous writer to a process's input pipe, including stdin. """ connected = 1 ic = 0 def __init__(self, reactor, proc, name, fileno): """Initialize, specifying a Process instance to connect to. """ abstract.FileDescriptor.__init__(self, reactor) self.proc = proc self.name = name self.fd = fileno def fileno(self): """Return the fileno() of my process's stdin. """ return self.fd # Copy relevant parts of the protocol def writeSomeData(self, data): """Write some data to the open process. """ try: rv = os.write(self.fd, data) if rv == len(data): self.startReading() return rv except IOError, io: if io.args[0] == errno.EAGAIN: return 0 return CONNECTION_LOST except OSError, ose: if ose.errno == errno.EPIPE: return CONNECTION_LOST if ose.errno == errno.EAGAIN: # MacOS-X does this return 0 raise def write(self, data): self.stopReading() abstract.FileDescriptor.write(self, data) def doRead(self): """The only way this pipe can become readable is at EOF, because the child has closed it. """ fd = self.fd r, w, x = select.select([fd], [fd], [], 0) if r and w: return CONNECTION_LOST def connectionLost(self, reason): """See abstract.FileDescriptor.connectionLost. """ abstract.FileDescriptor.connectionLost(self, reason) os.close(self.fd) self.proc.childConnectionLost(self.name) class ProcessReader(abstract.FileDescriptor): """ProcessReader I am a selectable representation of a process's output pipe, such as stdout and stderr. """ def __init__(self, reactor, proc, name, fileno): """Initialize, specifying a process to connect to. """ abstract.FileDescriptor.__init__(self, reactor) self.proc = proc self.name = name self.fd = fileno def fileno(self): """Return the fileno() of my process's stderr. """ return self.fd def writeSomeData(self, data): # the only time this is actually called is after .loseConnection Any # actual write attempt would fail, so we must avoid that. This hack # allows us to use .loseConnection on both readers and writers. assert data == "" return CONNECTION_LOST def doRead(self): """This is called when the pipe becomes readable. """ return fdesc.readFromFD(self.fd, self.dataReceived) def dataReceived(self, data): self.proto.childDataReceived(self.name, data) def connectionLost(self, reason): """Close my end of the pipe, signal the Process (which signals the ProcessProtocol). """ abstract.FileDescriptor.connectionLost(self, reason) os.close(self.fd) self.proc.childConnectionLost(self.name) class ProcessExitedAlready(Exception): """The process has already excited, and the operation requested can no longer be performed.""" pass class Process(styles.Ephemeral): """An operating-system Process. This represents an operating-system process with arbitrary input/output pipes connected to it. Those pipes may represent standard input, standard output, and standard error, or any other file descriptor. On UNIX, this is implemented using fork(), exec(), pipe() and fcntl(). These calls may not exist elsewhere so this code is not cross-platform. (also, windows can only select on sockets...) """ debug = False debug_child = False def __init__(self, reactor, command, args, environment, path, proto, uid=None, gid=None, childFDs=None): """Spawn an operating-system process. This is where the hard work of disconnecting all currently open files / forking / executing the new process happens. (This is executed automatically when a Process is instantiated.) This will also run the subprocess as a given user ID and group ID, if specified. (Implementation Note: this doesn't support all the arcane nuances of setXXuid on UNIX: it will assume that either your effective or real UID is 0.) @param childFDs: a dictionary mapping fd_in_child -> current_fd_in_parent/'r'/'w' If the value is a number, it specifies one of the parent's fds that will be remapped to the child's fd. This is useful for things like inetd and shell-like file redirection. If it is the string 'r', a pipe will be created and attached to the child at that fd number, and the parent will be able to read from the pipe. This is useful for the child's stdout and stderr. If it is the string 'w', a pipe will be created and attached, and the parent will be able to write into that pipe. This is useful for the child's stdin. If childFDs is not passed, the default behaviour is to use a mapping that opens the usual stdin/stdout/stderr pipes. """ if not proto: assert 'r' not in childFDs.values() and 'w' not in childFDs.values() self.lostProcess = False settingUID = (uid is not None) or (gid is not None) if settingUID: curegid = os.getegid() currgid = os.getgid() cureuid = os.geteuid() curruid = os.getuid() if uid is None: uid = cureuid if gid is None: gid = curegid # prepare to change UID in subprocess os.setuid(0) os.setgid(0) self.pipes = {} # keys are childFDs, we can sense them closing # values are ProcessReader/ProcessWriters helpers = {} # keys are childFDs # values are parentFDs if childFDs is None: childFDs = {0: "w", # we write to the child's stdin 1: "r", # we read from their stdout 2: "r", # and we read from their stderr } debug = self.debug if debug: print "childFDs", childFDs # fdmap.keys() are filenos of pipes that are used by the child. fdmap = {} # maps childFD to parentFD for childFD, target in childFDs.items(): if debug: print "[%d]" % childFD, target if target == "r": # we need a pipe that the parent can read from readFD, writeFD = os.pipe() if debug: print "readFD=%d, writeFD%d" % (readFD, writeFD) fdmap[childFD] = writeFD # child writes to this helpers[childFD] = readFD # parent reads from this fdesc.setNonBlocking(readFD) elif target == "w": # we need a pipe that the parent can write to readFD, writeFD = os.pipe() if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD) fdmap[childFD] = readFD # child reads from this helpers[childFD] = writeFD # parent writes to this fdesc.setNonBlocking(writeFD) else: assert type(target) == int, '%r should be an int' % (target,) fdmap[childFD] = target # parent ignores this if debug: print "fdmap", fdmap if debug: print "helpers", helpers # the child only cares about fdmap.values() self.pid = os.fork() if self.pid == 0: # pid is 0 in the child process # do not put *ANY* code outside the try block. The child process # must either exec or _exit. If it gets outside this block (due # to an exception that is not handled here, but which might be # handled higher up), there will be two copies of the parent # running in parallel, doing all kinds of damage. # After each change to this code, review it to make sure there # are no exit paths. try: # stop debugging, if I am! I don't care anymore! sys.settrace(None) # close all parent-side pipes self._setupChild(fdmap) self._execChild(path, settingUID, uid, gid, command, args, environment) except: # If there are errors, bail and try to write something # descriptive to stderr. # XXX: The parent's stderr isn't necessarily fd 2 anymore, or # even still available # XXXX: however even libc assumes write(2,err) is a useful # thing to attempt try: stderr = os.fdopen(2,'w') stderr.write("Upon execvpe %s %s in environment %s\n:" % (command, str(args), "id %s" % id(environment))) traceback.print_exc(file=stderr) stderr.flush() for fd in range(3): os.close(fd) except: pass # make *sure* the child terminates # Did you read the comment about not adding code here? os._exit(1) # we are the parent if settingUID: os.setregid(currgid, curegid) os.setreuid(curruid, cureuid) self.status = -1 # this records the exit status of the child # arrange for the parent-side pipes to be read and written for childFD, parentFD in helpers.items(): os.close(fdmap[childFD]) if childFDs[childFD] == "r": reader = ProcessReader(reactor, self, childFD, parentFD) reader.proto = proto self.pipes[childFD] = reader reader.startReading() if childFDs[childFD] == "w": writer = ProcessWriter(reactor, self, childFD, parentFD) writer.proto = proto self.pipes[childFD] = writer # we do startReading here to watch for EOF. We won't do an # actual .startWriting until some data has been written to # the transmit buffer. writer.startReading() self.proto = proto try: # the 'transport' is used for some compatibility methods if self.proto is not None: self.proto.makeConnection(self) except: log.err() registerReapProcessHandler(self.pid, self) def _setupChild(self, fdmap): """ fdmap[childFD] = parentFD The child wants to end up with 'childFD' attached to what used to be the parent's parentFD. As an example, a bash command run like 'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}. 'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}. Step 1: close all file descriptors that aren't values of fdmap. This means 0 .. maxfds. Step 2: for each childFD: if fdmap[childFD] == childFD, the descriptor is already in place. Make sure the CLOEXEC flag is not set, then delete the entry from fdmap. if childFD is in fdmap.values(), then the target descriptor is busy. Use os.dup() to move it elsewhere, update all fdmap[childFD] items that point to it, then close the original. Then fall through to the next case. now fdmap[childFD] is not in fdmap.values(), and is free. Use os.dup2() to move it to the right place, then close the original. """ debug = self.debug_child if debug: #errfd = open("/tmp/p.err", "a", 0) errfd = sys.stderr print >>errfd, "starting _setupChild" destList = fdmap.values() try: import resource maxfds = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + 1 # OS-X reports 9223372036854775808. That's a lot of fds to close if maxfds > 1024: maxfds = 1024 except: maxfds = 256 for fd in range(maxfds): if fd in destList: continue if debug and fd == errfd.fileno(): continue try: os.close(fd) except: pass # at this point, the only fds still open are the ones that need to # be moved to their appropriate positions in the child (the targets # of fdmap, i.e. fdmap.values() ) if debug: print >>errfd, "fdmap", fdmap childlist = fdmap.keys() childlist.sort() for child in childlist: target = fdmap[child] if target == child: # fd is already in place if debug: print >>errfd, "%d already in place" % target if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): old = fcntl.fcntl(child, fcntl.F_GETFD) fcntl.fcntl(child, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) else: if child in fdmap.values(): # we can't replace child-fd yet, as some other mapping # still needs the fd it wants to target. We must preserve # that old fd by duping it to a new home. newtarget = os.dup(child) # give it a safe home if debug: print >>errfd, "os.dup(%d) -> %d" % (child, newtarget) os.close(child) # close the original for c,p in fdmap.items(): if p == child: fdmap[c] = newtarget # update all pointers # now it should be available if debug: print >>errfd, "os.dup2(%d,%d)" % (target, child) os.dup2(target, child) # At this point, the child has everything it needs. We want to close # everything that isn't going to be used by the child, i.e. # everything not in fdmap.keys(). The only remaining fds open are # those in fdmap.values(). # Any given fd may appear in fdmap.values() multiple times, so we # need to remove duplicates first. old = [] for fd in fdmap.values(): if not fd in old: if not fd in fdmap.keys(): old.append(fd) if debug: print >>errfd, "old", old for fd in old: os.close(fd) def _execChild(self, path, settingUID, uid, gid, command, args, environment): if path: os.chdir(path) # set the UID before I actually exec the process if settingUID: switchUID(uid, gid) os.execvpe(command, args, environment) def reapProcess(self): """Try to reap a process (without blocking) via waitpid. This is called when sigchild is caught or a Process object loses its "connection" (stdout is closed) This ought to result in reaping all zombie processes, since it will be called twice as often as it needs to be. (Unfortunately, this is a slightly experimental approach, since UNIX has no way to be really sure that your process is going to go away w/o blocking. I don't want to block.) """ try: pid, status = os.waitpid(self.pid, os.WNOHANG) except: log.msg('Failed to reap %d:' % self.pid) log.err() pid = None if pid: self.processEnded(status) unregisterReapProcessHandler(pid, self) def writeToChild(self, childFD, data): self.pipes[childFD].write(data) def closeChildFD(self, childFD): # for writer pipes, loseConnection tries to write the remaining data # out to the pipe before closing it # if childFD is not in the list of pipes, assume that it is already # closed if self.pipes.has_key(childFD): self.pipes[childFD].loseConnection() def pauseProducing(self): for p in self.pipes.itervalues(): if isinstance(p, ProcessReader): p.stopReading() def resumeProducing(self): for p in self.pipes.itervalues(): if isinstance(p, ProcessReader): p.startReading() # compatibility def closeStdin(self): """Call this to close standard input on this process. """ self.closeChildFD(0) def closeStdout(self): self.closeChildFD(1) def closeStderr(self): self.closeChildFD(2) def loseConnection(self): self.closeStdin() self.closeStderr() self.closeStdout() def write(self,data): """Call this to write to standard input on this process. """ self.pipes[0].write(data) def signalProcess(self, signalID): if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'): signalID = getattr(signal, 'SIG'+signalID) if self.pid is None: raise ProcessExitedAlready os.kill(self.pid, signalID) def processEnded(self, status): # this is called when the child terminates (SIGCHLD) self.status = status self.lostProcess = True self.pid = None #for fd, helper in self.pipes.items(): # helper.connectionLost(None) ##self.closeStdin() self.maybeCallProcessEnded() def childConnectionLost(self, childFD): # this is called when one of the helpers (ProcessReader or # ProcessWriter) notices their pipe has been closed del self.pipes[childFD] try: self.proto.childConnectionLost(childFD) except: log.err() self.maybeCallProcessEnded() def maybeCallProcessEnded(self): # we don't call ProcessProtocol.processEnded until: # the child has terminated, AND # all writers have indicated an error status, AND # all readers have indicated EOF # This insures that we've gathered all output from the process. if self.pipes: #print "maybe, but pipes still", self.pipes.keys() return if not self.lostProcess: #print "maybe, but haven't .lostProcess yet" self.reapProcess() return try: exitCode = sig = None if self.status != -1: if os.WIFEXITED(self.status): exitCode = os.WEXITSTATUS(self.status) else: sig = os.WTERMSIG(self.status) else: pass # don't think this can happen if exitCode or sig: e = error.ProcessTerminated(exitCode, sig, self.status) else: e = error.ProcessDone(self.status) if self.proto is not None: self.proto.processEnded(failure.Failure(e)) self.proto = None except: log.err() class PTYProcess(abstract.FileDescriptor, styles.Ephemeral): """An operating-system Process that uses PTY support.""" def __init__(self, reactor, command, args, environment, path, proto, uid=None, gid=None, usePTY=None): """Spawn an operating-system process. This is where the hard work of disconnecting all currently open files / forking / executing the new process happens. (This is executed automatically when a Process is instantiated.) This will also run the subprocess as a given user ID and group ID, if specified. (Implementation Note: this doesn't support all the arcane nuances of setXXuid on UNIX: it will assume that either your effective or real UID is 0.) """ if not pty and type(usePTY) not in (types.ListType, types.TupleType): # no pty module and we didn't get a pty to use raise NotImplementedError, "cannot use PTYProcess on platforms without the pty module." abstract.FileDescriptor.__init__(self, reactor) settingUID = (uid is not None) or (gid is not None) if settingUID: curegid = os.getegid() currgid = os.getgid() cureuid = os.geteuid() curruid = os.getuid() if uid is None: uid = cureuid if gid is None: gid = curegid # prepare to change UID in subprocess os.setuid(0) os.setgid(0) if type(usePTY) in (types.TupleType, types.ListType): masterfd, slavefd, ttyname = usePTY else: masterfd, slavefd = pty.openpty() ttyname = os.ttyname(slavefd) pid = os.fork() self.pid = pid if pid == 0: # pid is 0 in the child process try: sys.settrace(None) os.close(masterfd) os.setsid() if hasattr(termios, 'TIOCSCTTY'): fcntl.ioctl(slavefd, termios.TIOCSCTTY, '') else: for fd in range(3): if fd != slavefd: os.close(fd) fd = os.open(ttyname, os.O_RDWR) os.close(fd) os.dup2(slavefd, 0) # stdin os.dup2(slavefd, 1) # stdout os.dup2(slavefd, 2) # stderr if path: os.chdir(path) for fd in range(3, 256): try: os.close(fd) except: pass # set the UID before I actually exec the process if settingUID: switchUID(uid, gid) os.execvpe(command, args, environment) except: stderr = os.fdopen(1, 'w') stderr.write("Upon execvpe %s %s in environment %s:\n" % (command, str(args), "id %s" % id(environment))) traceback.print_exc(file=stderr) stderr.flush() os._exit(1) assert pid!=0 os.close(slavefd) fdesc.setNonBlocking(masterfd) self.fd=masterfd self.startReading() self.connected = 1 self.proto = proto self.lostProcess = 0 self.status = -1 try: self.proto.makeConnection(self) except: log.err() registerReapProcessHandler(self.pid, self) def reapProcess(self): """Try to reap a process (without blocking) via waitpid. This is called when sigchild is caught or a Process object loses its "connection" (stdout is closed) This ought to result in reaping all zombie processes, since it will be called twice as often as it needs to be. (Unfortunately, this is a slightly experimental approach, since UNIX has no way to be really sure that your process is going to go away w/o blocking. I don't want to block.) """ try: pid, status = os.waitpid(self.pid, os.WNOHANG) except OSError, e: if e.errno == 10: # no child process pid = None else: raise except: log.err() pid = None if pid: self.processEnded(status) unregisterReapProcessHandler(self.pid, self) # PTYs do not have stdin/stdout/stderr. They only have in and out, just # like sockets. You cannot close one without closing off the entire PTY. def closeStdin(self): pass def closeStdout(self): pass def closeStderr(self): pass def signalProcess(self, signalID): if signalID in ('HUP', 'STOP', 'INT', 'KILL'): signalID = getattr(signal, 'SIG'+signalID) os.kill(self.pid, signalID) def processEnded(self, status): self.status = status self.lostProcess += 1 self.maybeCallProcessEnded() def doRead(self): """Called when my standard output stream is ready for reading. """ try: return fdesc.readFromFD(self.fd, self.proto.outReceived) except OSError: return CONNECTION_LOST def fileno(self): """This returns the file number of standard output on this process. """ return self.fd def maybeCallProcessEnded(self): # two things must happen before we call the ProcessProtocol's # processEnded method. 1: the child process must die and be reaped # (which calls our own processEnded method). 2: the child must close # their stdin/stdout/stderr fds, causing the pty to close, causing # our connectionLost method to be called. #2 can also be triggered # by calling .loseConnection(). if self.lostProcess == 2: try: exitCode = sig = None if self.status != -1: if os.WIFEXITED(self.status): exitCode = os.WEXITSTATUS(self.status) else: sig = os.WTERMSIG(self.status) else: pass # wonder when this happens if exitCode or sig: e = error.ProcessTerminated(exitCode, sig, self.status) else: e = error.ProcessDone(self.status) self.proto.processEnded(failure.Failure(e)) self.proto = None except: log.err() def connectionLost(self, reason): """I call this to clean up when one or all of my connections has died. """ abstract.FileDescriptor.connectionLost(self, reason) os.close(self.fd) self.lostProcess +=1 self.maybeCallProcessEnded() def writeSomeData(self, data): """Write some data to the open process. """ try: return os.write(self.fd, data) except IOError,io: if io.args[0] == errno.EAGAIN: return 0 return CONNECTION_LOST