Viewing file: manhole_tap.py (5.24 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
""" TAP plugin for creating telnet- and ssh-accessible manhole servers.
@author: Jp Calderone """
from zope.interface import implementer
from twisted.internet import protocol from twisted.application import service, strports from twisted.cred import portal, checkers from twisted.python import usage, filepath
from twisted.conch import manhole, manhole_ssh, telnet from twisted.conch.insults import insults from twisted.conch.ssh import keys
class makeTelnetProtocol: def __init__(self, portal): self.portal = portal
def __call__(self): auth = telnet.AuthenticatingTelnetProtocol args = (self.portal,) return telnet.TelnetTransport(auth, *args)
class chainedProtocolFactory: def __init__(self, namespace): self.namespace = namespace
def __call__(self): return insults.ServerProtocol(manhole.ColoredManhole, self.namespace)
@implementer(portal.IRealm) class _StupidRealm: def __init__(self, proto, *a, **kw): self.protocolFactory = proto self.protocolArgs = a self.protocolKwArgs = kw
def requestAvatar(self, avatarId, *interfaces): if telnet.ITelnetProtocol in interfaces: return (telnet.ITelnetProtocol, self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs), lambda: None) raise NotImplementedError()
class Options(usage.Options): optParameters = [ ["telnetPort", "t", None, ("strports description of the address on which to listen for telnet " "connections")], ["sshPort", "s", None, ("strports description of the address on which to listen for ssh " "connections")], ["passwd", "p", "/etc/passwd", "name of a passwd(5)-format username/password file"], ["sshKeyDir", None, "<USER DATA DIR>", "Directory where the autogenerated SSH key is kept."], ["sshKeyName", None, "server.key", "Filename of the autogenerated SSH key."], ["sshKeySize", None, 4096, "Size of the automatically generated SSH key."], ]
def __init__(self): usage.Options.__init__(self) self['namespace'] = None
def postOptions(self): if self['telnetPort'] is None and self['sshPort'] is None: raise usage.UsageError( "At least one of --telnetPort and --sshPort must be specified")
def makeService(options): """ Create a manhole server service.
@type options: L{dict} @param options: A mapping describing the configuration of the desired service. Recognized key/value pairs are::
"telnetPort": strports description of the address on which to listen for telnet connections. If None, no telnet service will be started.
"sshPort": strports description of the address on which to listen for ssh connections. If None, no ssh service will be started.
"namespace": dictionary containing desired initial locals for manhole connections. If None, an empty dictionary will be used.
"passwd": Name of a passwd(5)-format username/password file.
"sshKeyDir": The folder that the SSH server key will be kept in.
"sshKeyName": The filename of the key.
"sshKeySize": The size of the key, in bits. Default is 4096.
@rtype: L{twisted.application.service.IService} @return: A manhole service. """ svc = service.MultiService()
namespace = options['namespace'] if namespace is None: namespace = {}
checker = checkers.FilePasswordDB(options['passwd'])
if options['telnetPort']: telnetRealm = _StupidRealm(telnet.TelnetBootstrapProtocol, insults.ServerProtocol, manhole.ColoredManhole, namespace)
telnetPortal = portal.Portal(telnetRealm, [checker])
telnetFactory = protocol.ServerFactory() telnetFactory.protocol = makeTelnetProtocol(telnetPortal) telnetService = strports.service(options['telnetPort'], telnetFactory) telnetService.setServiceParent(svc)
if options['sshPort']: sshRealm = manhole_ssh.TerminalRealm() sshRealm.chainedProtocolFactory = chainedProtocolFactory(namespace)
sshPortal = portal.Portal(sshRealm, [checker]) sshFactory = manhole_ssh.ConchFactory(sshPortal)
if options['sshKeyDir'] != "<USER DATA DIR>": keyDir = options['sshKeyDir'] else: from twisted.python._appdirs import getDataDirectory keyDir = getDataDirectory()
keyLocation = filepath.FilePath(keyDir).child(options['sshKeyName'])
sshKey = keys._getPersistentRSAKey(keyLocation, int(options['sshKeySize'])) sshFactory.publicKeys[b"ssh-rsa"] = sshKey sshFactory.privateKeys[b"ssh-rsa"] = sshKey
sshService = strports.service(options['sshPort'], sshFactory) sshService.setServiceParent(svc)
return svc
|