368 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			368 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
| """This class extends pexpect.spawn to specialize setting up SSH connections.
 | |
| This adds methods for login, logout, and expecting the shell prompt.
 | |
| 
 | |
| $Id: pxssh.py 513 2008-02-09 18:26:13Z noah $
 | |
| """
 | |
| 
 | |
| from pexpect import *
 | |
| import pexpect
 | |
| import time
 | |
| 
 | |
| __all__ = ['ExceptionPxssh', 'pxssh']
 | |
| 
 | |
| # Exception classes used by this module.
 | |
| 
 | |
| 
 | |
| class ExceptionPxssh(ExceptionPexpect):
 | |
|     """Raised for pxssh exceptions.
 | |
|     """
 | |
| 
 | |
| 
 | |
| class pxssh (spawn):
 | |
| 
 | |
|     """This class extends pexpect.spawn to specialize setting up SSH
 | |
|     connections. This adds methods for login, logout, and expecting the shell
 | |
|     prompt. It does various tricky things to handle many situations in the SSH
 | |
|     login process. For example, if the session is your first login, then pxssh
 | |
|     automatically accepts the remote certificate; or if you have public key
 | |
|     authentication setup then pxssh won't wait for the password prompt.
 | |
| 
 | |
|     pxssh uses the shell prompt to synchronize output from the remote host. In
 | |
|     order to make this more robust it sets the shell prompt to something more
 | |
|     unique than just $ or #. This should work on most Borne/Bash or Csh style
 | |
|     shells.
 | |
| 
 | |
|     Example that runs a few commands on a remote server and prints the result::
 | |
| 
 | |
|         import pxssh
 | |
|         import getpass
 | |
|         try:
 | |
|             s = pxssh.pxssh()
 | |
|             hostname = raw_input('hostname: ')
 | |
|             username = raw_input('username: ')
 | |
|             password = getpass.getpass('password: ')
 | |
|             s.login (hostname, username, password)
 | |
|             s.sendline ('uptime')  # run a command
 | |
|             s.prompt()             # match the prompt
 | |
|             print s.before         # print everything before the prompt.
 | |
|             s.sendline ('ls -l')
 | |
|             s.prompt()
 | |
|             print s.before
 | |
|             s.sendline ('df')
 | |
|             s.prompt()
 | |
|             print s.before
 | |
|             s.logout()
 | |
|         except pxssh.ExceptionPxssh, e:
 | |
|             print "pxssh failed on login."
 | |
|             print str(e)
 | |
| 
 | |
|     Note that if you have ssh-agent running while doing development with pxssh
 | |
|     then this can lead to a lot of confusion. Many X display managers (xdm,
 | |
|     gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
 | |
|     dialog box popup asking for a password during development. You should turn
 | |
|     off any key agents during testing. The 'force_password' attribute will turn
 | |
|     off public key authentication. This will only work if the remote SSH server
 | |
|     is configured to allow password logins. Example of using 'force_password'
 | |
|     attribute::
 | |
| 
 | |
|             s = pxssh.pxssh()
 | |
|             s.force_password = True
 | |
|             hostname = raw_input('hostname: ')
 | |
|             username = raw_input('username: ')
 | |
|             password = getpass.getpass('password: ')
 | |
|             s.login (hostname, username, password)
 | |
|     """
 | |
| 
 | |
|     def __init__(
 | |
|             self,
 | |
|             timeout=30,
 | |
|             maxread=2000,
 | |
|             searchwindowsize=None,
 | |
|             logfile=None,
 | |
|             cwd=None,
 | |
|             env=None):
 | |
|         spawn.__init__(
 | |
|             self,
 | |
|             None,
 | |
|             timeout=timeout,
 | |
|             maxread=maxread,
 | |
|             searchwindowsize=searchwindowsize,
 | |
|             logfile=logfile,
 | |
|             cwd=cwd,
 | |
|             env=env)
 | |
| 
 | |
|         self.name = '<pxssh>'
 | |
| 
 | |
|         # SUBTLE HACK ALERT! Note that the command to set the prompt uses a
 | |
|         # slightly different string than the regular expression to match it. This
 | |
|         # is because when you set the prompt the command will echo back, but we
 | |
|         # don't want to match the echoed command. So if we make the set command
 | |
|         # slightly different than the regex we eliminate the problem. To make the
 | |
|         # set command different we add a backslash in front of $. The $ doesn't
 | |
|         # need to be escaped, but it doesn't hurt and serves to make the set
 | |
|         # prompt command different than the regex.
 | |
| 
 | |
|         # used to match the command-line prompt
 | |
|         self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
 | |
|         self.PROMPT = self.UNIQUE_PROMPT
 | |
| 
 | |
|         # used to set shell command-line prompt to UNIQUE_PROMPT.
 | |
|         self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
 | |
|         self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
 | |
|         self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
 | |
|         # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
 | |
|         # displaying a GUI password dialog. I have not figured out how to
 | |
|         # disable only SSH_ASKPASS without also disabling X11 forwarding.
 | |
|         # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
 | |
|         #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
 | |
|         self.force_password = False
 | |
|         self.auto_prompt_reset = True
 | |
| 
 | |
|     def levenshtein_distance(self, a, b):
 | |
|         """This calculates the Levenshtein distance between a and b.
 | |
|         """
 | |
| 
 | |
|         n, m = len(a), len(b)
 | |
|         if n > m:
 | |
|             a, b = b, a
 | |
|             n, m = m, n
 | |
|         current = range(n + 1)
 | |
|         for i in range(1, m + 1):
 | |
|             previous, current = current, [i] + [0] * n
 | |
|             for j in range(1, n + 1):
 | |
|                 add, delete = previous[j] + 1, current[j - 1] + 1
 | |
|                 change = previous[j - 1]
 | |
|                 if a[j - 1] != b[i - 1]:
 | |
|                     change = change + 1
 | |
|                 current[j] = min(add, delete, change)
 | |
|         return current[n]
 | |
| 
 | |
|     def sync_original_prompt(self):
 | |
|         """This attempts to find the prompt. Basically, press enter and record
 | |
|         the response; press enter again and record the response; if the two
 | |
|         responses are similar then assume we are at the original prompt. This
 | |
|         is a slow function. It can take over 10 seconds. """
 | |
| 
 | |
|         # All of these timing pace values are magic.
 | |
|         # I came up with these based on what seemed reliable for
 | |
|         # connecting to a heavily loaded machine I have.
 | |
|         # If latency is worse than these values then this will fail.
 | |
| 
 | |
|         try:
 | |
|             # GAS: Clear out the cache before getting the prompt
 | |
|             self.read_nonblocking(size=10000, timeout=1)
 | |
|         except TIMEOUT:
 | |
|             pass
 | |
|         time.sleep(0.1)
 | |
|         self.sendline()
 | |
|         time.sleep(0.5)
 | |
|         x = self.read_nonblocking(size=1000, timeout=1)
 | |
|         time.sleep(0.1)
 | |
|         self.sendline()
 | |
|         time.sleep(0.5)
 | |
|         a = self.read_nonblocking(size=1000, timeout=1)
 | |
|         time.sleep(0.1)
 | |
|         self.sendline()
 | |
|         time.sleep(0.5)
 | |
|         b = self.read_nonblocking(size=1000, timeout=1)
 | |
|         ld = self.levenshtein_distance(a, b)
 | |
|         len_a = len(a)
 | |
|         if len_a == 0:
 | |
|             return False
 | |
|         if float(ld) / len_a < 0.4:
 | |
|             return True
 | |
|         return False
 | |
| 
 | |
|     # TODO: This is getting messy and I'm pretty sure this isn't perfect.
 | |
|     # TODO: I need to draw a flow chart for this.
 | |
|     def login(
 | |
|             self,
 | |
|             server,
 | |
|             username,
 | |
|             password='',
 | |
|             terminal_type='ansi',
 | |
|             original_prompt=r"[#$]",
 | |
|             login_timeout=10,
 | |
|             port=None,
 | |
|             auto_prompt_reset=True):
 | |
|         """This logs the user into the given server. It uses the
 | |
|         'original_prompt' to try to find the prompt right after login. When it
 | |
|         finds the prompt it immediately tries to reset the prompt to something
 | |
|         more easily matched. The default 'original_prompt' is very optimistic
 | |
|         and is easily fooled. It's more reliable to try to match the original
 | |
|         prompt as exactly as possible to prevent false matches by server
 | |
|         strings such as the "Message Of The Day". On many systems you can
 | |
|         disable the MOTD on the remote server by creating a zero-length file
 | |
|         called "~/.hushlogin" on the remote server. If a prompt cannot be found
 | |
|         then this will not necessarily cause the login to fail. In the case of
 | |
|         a timeout when looking for the prompt we assume that the original
 | |
|         prompt was so weird that we could not match it, so we use a few tricks
 | |
|         to guess when we have reached the prompt. Then we hope for the best and
 | |
|         blindly try to reset the prompt to something more unique. If that fails
 | |
|         then login() raises an ExceptionPxssh exception.
 | |
| 
 | |
|         In some situations it is not possible or desirable to reset the
 | |
|         original prompt. In this case, set 'auto_prompt_reset' to False to
 | |
|         inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
 | |
|         uses a unique prompt in the prompt() method. If the original prompt is
 | |
|         not reset then this will disable the prompt() method unless you
 | |
|         manually set the PROMPT attribute. """
 | |
| 
 | |
|         ssh_options = '-q'
 | |
|         if self.force_password:
 | |
|             ssh_options = ssh_options + ' ' + self.SSH_OPTS
 | |
|         if port is not None:
 | |
|             ssh_options = ssh_options + ' -p %s' % (str(port))
 | |
|         cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
 | |
| 
 | |
|         # This does not distinguish between a remote server 'password' prompt
 | |
|         # and a local ssh 'passphrase' prompt (for unlocking a private key).
 | |
|         spawn._spawn(self, cmd)
 | |
|         i = self.expect(
 | |
|             [
 | |
|                 "(?i)are you sure you want to continue connecting",
 | |
|                 original_prompt,
 | |
|                 "(?i)(?:password)|(?:passphrase for key)",
 | |
|                 "(?i)permission denied",
 | |
|                 "(?i)terminal type",
 | |
|                 TIMEOUT,
 | |
|                 "(?i)connection closed by remote host"],
 | |
|             timeout=login_timeout)
 | |
| 
 | |
|         # First phase
 | |
|         if i == 0:
 | |
|             # New certificate -- always accept it.
 | |
|             # This is what you get if SSH does not have the remote host's
 | |
|             # public key stored in the 'known_hosts' cache.
 | |
|             self.sendline("yes")
 | |
|             i = self.expect(
 | |
|                 [
 | |
|                     "(?i)are you sure you want to continue connecting",
 | |
|                     original_prompt,
 | |
|                     "(?i)(?:password)|(?:passphrase for key)",
 | |
|                     "(?i)permission denied",
 | |
|                     "(?i)terminal type",
 | |
|                     TIMEOUT])
 | |
|         if i == 2:  # password or passphrase
 | |
|             self.sendline(password)
 | |
|             i = self.expect(
 | |
|                 [
 | |
|                     "(?i)are you sure you want to continue connecting",
 | |
|                     original_prompt,
 | |
|                     "(?i)(?:password)|(?:passphrase for key)",
 | |
|                     "(?i)permission denied",
 | |
|                     "(?i)terminal type",
 | |
|                     TIMEOUT])
 | |
|         if i == 4:
 | |
|             self.sendline(terminal_type)
 | |
|             i = self.expect(
 | |
|                 [
 | |
|                     "(?i)are you sure you want to continue connecting",
 | |
|                     original_prompt,
 | |
|                     "(?i)(?:password)|(?:passphrase for key)",
 | |
|                     "(?i)permission denied",
 | |
|                     "(?i)terminal type",
 | |
|                     TIMEOUT])
 | |
| 
 | |
|         # Second phase
 | |
|         if i == 0:
 | |
|             # This is weird. This should not happen twice in a row.
 | |
|             self.close()
 | |
|             raise ExceptionPxssh(
 | |
|                 'Weird error. Got "are you sure" prompt twice.')
 | |
|         elif i == 1:  # can occur if you have a public key pair set to authenticate.
 | |
|             # TODO: May NOT be OK if expect() got tricked and matched a false
 | |
|             # prompt.
 | |
|             pass
 | |
|         elif i == 2:  # password prompt again
 | |
|             # For incorrect passwords, some ssh servers will
 | |
|             # ask for the password again, others return 'denied' right away.
 | |
|             # If we get the password prompt again then this means
 | |
|             # we didn't get the password right the first time.
 | |
|             self.close()
 | |
|             raise ExceptionPxssh('password refused')
 | |
|         elif i == 3:  # permission denied -- password was bad.
 | |
|             self.close()
 | |
|             raise ExceptionPxssh('permission denied')
 | |
|         elif i == 4:  # terminal type again? WTF?
 | |
|             self.close()
 | |
|             raise ExceptionPxssh(
 | |
|                 'Weird error. Got "terminal type" prompt twice.')
 | |
|         elif i == 5:  # Timeout
 | |
|             # This is tricky... I presume that we are at the command-line prompt.
 | |
|             # It may be that the shell prompt was so weird that we couldn't match
 | |
|             # it. Or it may be that we couldn't log in for some other reason. I
 | |
|             # can't be sure, but it's safe to guess that we did login because if
 | |
|             # I presume wrong and we are not logged in then this should be caught
 | |
|             # later when I try to set the shell prompt.
 | |
|             pass
 | |
|         elif i == 6:  # Connection closed by remote host
 | |
|             self.close()
 | |
|             raise ExceptionPxssh('connection closed')
 | |
|         else:  # Unexpected
 | |
|             self.close()
 | |
|             raise ExceptionPxssh('unexpected login response')
 | |
|         if not self.sync_original_prompt():
 | |
|             self.close()
 | |
|             raise ExceptionPxssh('could not synchronize with original prompt')
 | |
|         # We appear to be in.
 | |
|         # set shell prompt to something unique.
 | |
|         if auto_prompt_reset:
 | |
|             if not self.set_unique_prompt():
 | |
|                 self.close()
 | |
|                 raise ExceptionPxssh(
 | |
|                     'could not set shell prompt\n' + self.before)
 | |
|         return True
 | |
| 
 | |
|     def logout(self):
 | |
|         """This sends exit to the remote shell. If there are stopped jobs then
 | |
|         this automatically sends exit twice. """
 | |
| 
 | |
|         self.sendline("exit")
 | |
|         index = self.expect([EOF, "(?i)there are stopped jobs"])
 | |
|         if index == 1:
 | |
|             self.sendline("exit")
 | |
|             self.expect(EOF)
 | |
|         self.close()
 | |
| 
 | |
|     def prompt(self, timeout=20):
 | |
|         """This matches the shell prompt. This is little more than a short-cut
 | |
|         to the expect() method. This returns True if the shell prompt was
 | |
|         matched. This returns False if there was a timeout. Note that if you
 | |
|         called login() with auto_prompt_reset set to False then you should have
 | |
|         manually set the PROMPT attribute to a regex pattern for matching the
 | |
|         prompt. """
 | |
| 
 | |
|         i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
 | |
|         if i == 1:
 | |
|             return False
 | |
|         return True
 | |
| 
 | |
|     def set_unique_prompt(self):
 | |
|         """This sets the remote prompt to something more unique than # or $.
 | |
|         This makes it easier for the prompt() method to match the shell prompt
 | |
|         unambiguously. This method is called automatically by the login()
 | |
|         method, but you may want to call it manually if you somehow reset the
 | |
|         shell prompt. For example, if you 'su' to a different user then you
 | |
|         will need to manually reset the prompt. This sends shell commands to
 | |
|         the remote host to set the prompt, so this assumes the remote host is
 | |
|         ready to receive commands.
 | |
| 
 | |
|         Alternatively, you may use your own prompt pattern. Just set the PROMPT
 | |
|         attribute to a regular expression that matches it. In this case you
 | |
|         should call login() with auto_prompt_reset=False; then set the PROMPT
 | |
|         attribute. After that the prompt() method will try to match your prompt
 | |
|         pattern."""
 | |
| 
 | |
|         self.sendline("unset PROMPT_COMMAND")
 | |
|         self.sendline(self.PROMPT_SET_SH)  # sh-style
 | |
|         i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
 | |
|         if i == 0:  # csh-style
 | |
|             self.sendline(self.PROMPT_SET_CSH)
 | |
|             i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
 | |
|             if i == 0:
 | |
|                 return False
 | |
|         return True
 | |
| 
 | |
| # vi:ts=4:sw=4:expandtab:ft=python:
 |