import os, base64, binascii
try:
import pwd
except ImportError:
pwd = None
else:
import crypt
try:
# get these from http://www.twistedmatrix.com/users/z3p/files/pyshadow-0.1.tar.gz
import md5_crypt
import shadow
except:
md5_crypt = None
shadow = None
try:
import pamauth
except ImportError:
pamauth = None
from twisted.conch import error
from twisted.conch.ssh import keys
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.credentials import IUsernamePassword
from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
from twisted.internet import defer
from twisted.python import components, failure, reflect
from credentials import ISSHPrivateKey, IPluggableAuthenticationModules
def verifyCryptedPassword(crypted, pw):
    """Check a plaintext password against a hashed password entry.

    crypted is the hashed password string (as read from a passwd or shadow
    entry) and pw is the plaintext password to test.

    Returns a true value when hashing pw reproduces crypted.  Returns a
    false value when it does not, when crypted is empty or is a
    '$'-prefixed value with no salt field, or when the module needed to
    compute the hash is not available.
    """
    if not crypted:
        # Nothing to compare against.  Indexing an empty string used to
        # raise IndexError here; refuse the password instead of crashing.
        return 0
    if crypted.startswith('$'): # md5_crypt encrypted
        fields = crypted.split('$')
        if len(fields) < 3:
            # Expected layout is '$<id>$<salt>$<hash>'; without a salt
            # field there is nothing to hash with.
            return 0
        if not md5_crypt:
            return 0
        return md5_crypt.md5_crypt(pw, fields[2]) == crypted
    if not pwd:
        # crypt is only imported by this module when pwd is importable.
        return 0
    # Traditional crypt: the first two characters of the hash are the salt.
    return crypt.crypt(pw, crypted[:2]) == crypted
class UNIXPasswordDatabase:
    """Credentials checker for username/password pairs.

    The password is checked against the entry returned by pwd.getpwnam
    and, when the optional shadow module is importable, against the entry
    returned by shadow.getspnam.
    """
    credentialInterfaces = IUsernamePassword,
    __implements__ = ICredentialsChecker

    def requestAvatarId(self, credentials):
        """Verify credentials.username / credentials.password.

        Returns a Deferred that succeeds with the username when either the
        passwd or the shadow hash matches, and fails with UnauthorizedLogin
        otherwise (including when the user is unknown).
        """
        if pwd:
            try:
                cryptedPass = pwd.getpwnam(credentials.username)[1]
            except KeyError:
                return defer.fail(UnauthorizedLogin())
            else:
                # '*' and 'x' are skipped rather than verified -- presumably
                # placeholders meaning the real hash is stored elsewhere.
                if cryptedPass not in ['*', 'x'] and \
                   verifyCryptedPassword(cryptedPass, credentials.password):
                    return defer.succeed(credentials.username)
        if shadow:
            gid = os.getegid()
            uid = os.geteuid()
            try:
                # Switch effective ids to 0 for the shadow lookup.
                os.setegid(0)
                os.seteuid(0)
                try:
                    shadowPass = shadow.getspnam(credentials.username)[1]
                except KeyError:
                    return defer.fail(UnauthorizedLogin())
            finally:
                # Always drop back to the original effective ids, whatever
                # the lookup raised; previously only KeyError was covered.
                os.setegid(gid)
                os.seteuid(uid)
            if verifyCryptedPassword(shadowPass, credentials.password):
                return defer.succeed(credentials.username)
            return defer.fail(UnauthorizedLogin())
        return defer.fail(UnauthorizedLogin())
class SSHPublicKeyDatabase:
    """Credentials checker for SSH public-key authentication.

    A key is accepted when its blob appears in the user's
    ~/.ssh/authorized_keys2 or ~/.ssh/authorized_keys file and the
    supplied signature verifies against it.
    """
    credentialInterfaces = ISSHPrivateKey,
    __implements__ = ICredentialsChecker

    def requestAvatarId(self, credentials):
        """Authenticate credentials by public key.

        Returns a Deferred that:
          - fails with UnauthorizedLogin if checkKey rejects the key,
          - fails with error.ValidPublicKey if the key is acceptable but no
            signature was supplied,
          - succeeds with credentials.username if the signature verifies,
          - fails with UnauthorizedLogin if verification fails or raises.
        """
        if not self.checkKey(credentials):
            return defer.fail(UnauthorizedLogin())
        if not credentials.signature:
            return defer.fail(error.ValidPublicKey())
        try:
            pubKey = keys.getPublicKeyObject(data=credentials.blob)
            verified = keys.verifySignature(pubKey, credentials.signature,
                                            credentials.sigData)
        except Exception:
            # A key or signature that cannot even be parsed is treated as a
            # failed verification rather than an error.
            verified = 0
        if verified:
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())

    def checkKey(self, credentials):
        """Return 1 if credentials.blob is listed in one of the user's
        authorized-keys files, 0 otherwise.
        """
        sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
        if sshDir.startswith('~'): # didn't expand
            return 0
        if not pwd:
            return 0
        try:
            ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
        except KeyError:
            return 0
        for name in ['authorized_keys2', 'authorized_keys']:
            for line in self._readLinesAsUser(sshDir + name, ouid, ogid):
                fields = line.split()
                if len(fields) < 2:
                    continue
                try:
                    # base64.decodestring is a thin wrapper around this call.
                    if binascii.a2b_base64(fields[1]) == credentials.blob:
                        return 1
                except (binascii.Error, ValueError):
                    continue
        return 0

    def _readLinesAsUser(self, path, ouid, ogid):
        """Read path with effective ids ouid/ogid and return its lines.

        Returns an empty list when the file cannot be opened.  The caller's
        effective uid/gid are restored before returning, on every path.
        """
        uid, gid = os.geteuid(), os.getegid()
        try:
            # Go through id 0 first, then take on the target user's ids.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            try:
                keyFile = open(path)
            except (IOError, OSError):
                return []
            try:
                return keyFile.readlines()
            finally:
                keyFile.close()
        finally:
            # Same restore sequence the original used, but now guaranteed
            # to run even when the file is missing or reading fails.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
class PluggableAuthenticationModulesChecker:
    """Credentials checker that delegates to the optional pamauth module."""
    __implements__ = ICredentialsChecker
    credentialInterfaces = IPluggableAuthenticationModules,

    def requestAvatarId(self, credentials):
        """Authenticate credentials through pamauth.pamAuthenticate.

        Fails with UnauthorizedLogin when pamauth could not be imported.
        Otherwise returns the object produced by pamAuthenticate with a
        callback attached that replaces its result with the username.
        """
        if pamauth:
            def _usernameFor(ignored):
                # The authentication result itself is discarded; the
                # avatar id is the username.
                return credentials.username

            authenticating = pamauth.pamAuthenticate(
                'ssh', credentials.username, credentials.pamConversion)
            authenticating.addCallback(_usernameFor)
            return authenticating
        return defer.fail(UnauthorizedLogin())
class SSHProtocolChecker:
    """Credentials checker that dispatches to other registered checkers.

    Checkers are registered per credentials interface.  Successful
    authentications are accumulated per avatar id until areDone() reports
    that enough of them have been collected.
    """
    __implements__ = ICredentialsChecker

    # Class-level fallbacks, kept for subclasses that define their own
    # __init__ without calling this one.
    checkers = {}
    successfulCredentials = {}

    def __init__(self):
        # Give every instance its own registries; the class-level dicts
        # would otherwise be shared (and mutated) by all instances.
        self.checkers = {}
        self.successfulCredentials = {}

    def get_credentialInterfaces(self):
        """Return the interfaces for which a checker is registered."""
        return self.checkers.keys()

    credentialInterfaces = property(get_credentialInterfaces)

    def registerChecker(self, checker, *credentialInterfaces):
        """Register checker for the given credentials interfaces.

        When no interfaces are passed, the checker's own
        credentialInterfaces attribute is used.
        """
        if not credentialInterfaces:
            credentialInterfaces = checker.credentialInterfaces
        for credentialInterface in credentialInterfaces:
            self.checkers[credentialInterface] = checker

    def requestAvatarId(self, credentials):
        """Hand credentials to the first registered checker that matches
        one of the interfaces reported by components.getInterfaces.

        Returns that checker's result with _cbGoodAuthentication attached,
        or a Deferred failing with UnhandledCredentials when no checker
        matches.
        """
        ifac = components.getInterfaces(credentials)
        for i in ifac:
            c = self.checkers.get(i)
            if c is not None:
                return c.requestAvatarId(credentials).addCallback(
                    self._cbGoodAuthentication, credentials)
        # reflect.qual (previously misspelled 'qal') names each interface.
        return defer.fail(UnhandledCredentials(
            "No checker for %s" % ', '.join(map(reflect.qual, ifac))))

    def _cbGoodAuthentication(self, avatarId, credentials):
        """Record a successful authentication for avatarId.

        Returns avatarId once areDone() is satisfied (clearing the recorded
        credentials); otherwise raises error.NotEnoughAuthentication.
        """
        self.successfulCredentials.setdefault(avatarId, []).append(
            credentials)
        if self.areDone(avatarId):
            del self.successfulCredentials[avatarId]
            return avatarId
        else:
            raise error.NotEnoughAuthentication()

    def areDone(self, avatarId):
        """Override to determine if the authentication is finished for a given
        avatarId.
        """
        return 1
# syntax highlighted by Code2HTML, v. 0.9.1