312 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			312 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
| """This class extends pexpect.spawn to specialize setting up SSH connections.
 | |
| This adds methods for login, logout, and expecting the shell prompt.
 | |
| 
 | |
| $Id: pxssh.py 513 2008-02-09 18:26:13Z noah $
 | |
| """
 | |
| 
 | |
| from pexpect import *
 | |
| import pexpect
 | |
| import time
 | |
| 
 | |
| __all__ = ['ExceptionPxssh', 'pxssh']
 | |
| 
 | |
| # Exception classes used by this module.
 | |
| class ExceptionPxssh(ExceptionPexpect):
 | |
|     """Raised for pxssh exceptions.
 | |
|     """
 | |
| 
 | |
| class pxssh (spawn):
 | |
| 
 | |
|     """This class extends pexpect.spawn to specialize setting up SSH
 | |
|     connections. This adds methods for login, logout, and expecting the shell
 | |
|     prompt. It does various tricky things to handle many situations in the SSH
 | |
|     login process. For example, if the session is your first login, then pxssh
 | |
|     automatically accepts the remote certificate; or if you have public key
 | |
|     authentication setup then pxssh won't wait for the password prompt.
 | |
| 
 | |
|     pxssh uses the shell prompt to synchronize output from the remote host. In
 | |
|     order to make this more robust it sets the shell prompt to something more
 | |
|     unique than just $ or #. This should work on most Borne/Bash or Csh style
 | |
|     shells.
 | |
| 
 | |
|     Example that runs a few commands on a remote server and prints the result::
 | |
|         
 | |
|         import pxssh
 | |
|         import getpass
 | |
|         try:                                                            
 | |
|             s = pxssh.pxssh()
 | |
|             hostname = raw_input('hostname: ')
 | |
|             username = raw_input('username: ')
 | |
|             password = getpass.getpass('password: ')
 | |
|             s.login (hostname, username, password)
 | |
|             s.sendline ('uptime')  # run a command
 | |
|             s.prompt()             # match the prompt
 | |
|             print s.before         # print everything before the prompt.
 | |
|             s.sendline ('ls -l')
 | |
|             s.prompt()
 | |
|             print s.before
 | |
|             s.sendline ('df')
 | |
|             s.prompt()
 | |
|             print s.before
 | |
|             s.logout()
 | |
|         except pxssh.ExceptionPxssh, e:
 | |
|             print "pxssh failed on login."
 | |
|             print str(e)
 | |
| 
 | |
|     Note that if you have ssh-agent running while doing development with pxssh
 | |
|     then this can lead to a lot of confusion. Many X display managers (xdm,
 | |
|     gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
 | |
|     dialog box popup asking for a password during development. You should turn
 | |
|     off any key agents during testing. The 'force_password' attribute will turn
 | |
|     off public key authentication. This will only work if the remote SSH server
 | |
|     is configured to allow password logins. Example of using 'force_password'
 | |
|     attribute::
 | |
| 
 | |
|             s = pxssh.pxssh()
 | |
|             s.force_password = True
 | |
|             hostname = raw_input('hostname: ')
 | |
|             username = raw_input('username: ')
 | |
|             password = getpass.getpass('password: ')
 | |
|             s.login (hostname, username, password)
 | |
|     """
 | |
| 
 | |
|     def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
 | |
|         spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
 | |
| 
 | |
|         self.name = '<pxssh>'
 | |
|         
 | |
|         #SUBTLE HACK ALERT! Note that the command to set the prompt uses a
 | |
|         #slightly different string than the regular expression to match it. This
 | |
|         #is because when you set the prompt the command will echo back, but we
 | |
|         #don't want to match the echoed command. So if we make the set command
 | |
|         #slightly different than the regex we eliminate the problem. To make the
 | |
|         #set command different we add a backslash in front of $. The $ doesn't
 | |
|         #need to be escaped, but it doesn't hurt and serves to make the set
 | |
|         #prompt command different than the regex.
 | |
| 
 | |
|         # used to match the command-line prompt
 | |
|         self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
 | |
|         self.PROMPT = self.UNIQUE_PROMPT
 | |
| 
 | |
|         # used to set shell command-line prompt to UNIQUE_PROMPT.
 | |
|         self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
 | |
|         self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
 | |
|         self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
 | |
|         # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
 | |
|         # displaying a GUI password dialog. I have not figured out how to
 | |
|         # disable only SSH_ASKPASS without also disabling X11 forwarding.
 | |
|         # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
 | |
|         #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
 | |
|         self.force_password = False
 | |
|         self.auto_prompt_reset = True 
 | |
| 
 | |
|     def levenshtein_distance(self, a,b):
 | |
| 
 | |
|         """This calculates the Levenshtein distance between a and b.
 | |
|         """
 | |
| 
 | |
|         n, m = len(a), len(b)
 | |
|         if n > m:
 | |
|             a,b = b,a
 | |
|             n,m = m,n
 | |
|         current = range(n+1)
 | |
|         for i in range(1,m+1):
 | |
|             previous, current = current, [i]+[0]*n
 | |
|             for j in range(1,n+1):
 | |
|                 add, delete = previous[j]+1, current[j-1]+1
 | |
|                 change = previous[j-1]
 | |
|                 if a[j-1] != b[i-1]:
 | |
|                     change = change + 1
 | |
|                 current[j] = min(add, delete, change)
 | |
|         return current[n]
 | |
| 
 | |
|     def sync_original_prompt (self):
 | |
| 
 | |
|         """This attempts to find the prompt. Basically, press enter and record
 | |
|         the response; press enter again and record the response; if the two
 | |
|         responses are similar then assume we are at the original prompt. This
 | |
|         is a slow function. It can take over 10 seconds. """
 | |
| 
 | |
|         # All of these timing pace values are magic.
 | |
|         # I came up with these based on what seemed reliable for
 | |
|         # connecting to a heavily loaded machine I have.
 | |
|         # If latency is worse than these values then this will fail.
 | |
| 
 | |
|         try:
 | |
|             self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt
 | |
|         except TIMEOUT:
 | |
|             pass
 | |
|         time.sleep(0.1)
 | |
|         self.sendline()
 | |
|         time.sleep(0.5)
 | |
|         x = self.read_nonblocking(size=1000,timeout=1)
 | |
|         time.sleep(0.1)
 | |
|         self.sendline()
 | |
|         time.sleep(0.5)
 | |
|         a = self.read_nonblocking(size=1000,timeout=1)
 | |
|         time.sleep(0.1)
 | |
|         self.sendline()
 | |
|         time.sleep(0.5)
 | |
|         b = self.read_nonblocking(size=1000,timeout=1)
 | |
|         ld = self.levenshtein_distance(a,b)
 | |
|         len_a = len(a)
 | |
|         if len_a == 0:
 | |
|             return False
 | |
|         if float(ld)/len_a < 0.4:
 | |
|             return True
 | |
|         return False
 | |
| 
 | |
|     ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
 | |
|     ### TODO: I need to draw a flow chart for this.
 | |
|     def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True):
 | |
| 
 | |
|         """This logs the user into the given server. It uses the
 | |
|         'original_prompt' to try to find the prompt right after login. When it
 | |
|         finds the prompt it immediately tries to reset the prompt to something
 | |
|         more easily matched. The default 'original_prompt' is very optimistic
 | |
|         and is easily fooled. It's more reliable to try to match the original
 | |
|         prompt as exactly as possible to prevent false matches by server
 | |
|         strings such as the "Message Of The Day". On many systems you can
 | |
|         disable the MOTD on the remote server by creating a zero-length file
 | |
|         called "~/.hushlogin" on the remote server. If a prompt cannot be found
 | |
|         then this will not necessarily cause the login to fail. In the case of
 | |
|         a timeout when looking for the prompt we assume that the original
 | |
|         prompt was so weird that we could not match it, so we use a few tricks
 | |
|         to guess when we have reached the prompt. Then we hope for the best and
 | |
|         blindly try to reset the prompt to something more unique. If that fails
 | |
|         then login() raises an ExceptionPxssh exception.
 | |
|         
 | |
|         In some situations it is not possible or desirable to reset the
 | |
|         original prompt. In this case, set 'auto_prompt_reset' to False to
 | |
|         inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
 | |
|         uses a unique prompt in the prompt() method. If the original prompt is
 | |
|         not reset then this will disable the prompt() method unless you
 | |
|         manually set the PROMPT attribute. """
 | |
| 
 | |
|         ssh_options = '-q'
 | |
|         if self.force_password:
 | |
|             ssh_options = ssh_options + ' ' + self.SSH_OPTS
 | |
|         if port is not None:
 | |
|             ssh_options = ssh_options + ' -p %s'%(str(port))
 | |
|         cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
 | |
| 
 | |
|         # This does not distinguish between a remote server 'password' prompt
 | |
|         # and a local ssh 'passphrase' prompt (for unlocking a private key).
 | |
|         spawn._spawn(self, cmd)
 | |
|         i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout)
 | |
| 
 | |
|         # First phase
 | |
|         if i==0: 
 | |
|             # New certificate -- always accept it.
 | |
|             # This is what you get if SSH does not have the remote host's
 | |
|             # public key stored in the 'known_hosts' cache.
 | |
|             self.sendline("yes")
 | |
|             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
 | |
|         if i==2: # password or passphrase
 | |
|             self.sendline(password)
 | |
|             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
 | |
|         if i==4:
 | |
|             self.sendline(terminal_type)
 | |
|             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
 | |
| 
 | |
|         # Second phase
 | |
|         if i==0:
 | |
|             # This is weird. This should not happen twice in a row.
 | |
|             self.close()
 | |
|             raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
 | |
|         elif i==1: # can occur if you have a public key pair set to authenticate. 
 | |
|             ### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
 | |
|             pass
 | |
|         elif i==2: # password prompt again
 | |
|             # For incorrect passwords, some ssh servers will
 | |
|             # ask for the password again, others return 'denied' right away.
 | |
|             # If we get the password prompt again then this means
 | |
|             # we didn't get the password right the first time. 
 | |
|             self.close()
 | |
|             raise ExceptionPxssh ('password refused')
 | |
|         elif i==3: # permission denied -- password was bad.
 | |
|             self.close()
 | |
|             raise ExceptionPxssh ('permission denied')
 | |
|         elif i==4: # terminal type again? WTF?
 | |
|             self.close()
 | |
|             raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
 | |
|         elif i==5: # Timeout
 | |
|             #This is tricky... I presume that we are at the command-line prompt.
 | |
|             #It may be that the shell prompt was so weird that we couldn't match
 | |
|             #it. Or it may be that we couldn't log in for some other reason. I
 | |
|             #can't be sure, but it's safe to guess that we did login because if
 | |
|             #I presume wrong and we are not logged in then this should be caught
 | |
|             #later when I try to set the shell prompt.
 | |
|             pass
 | |
|         elif i==6: # Connection closed by remote host
 | |
|             self.close()
 | |
|             raise ExceptionPxssh ('connection closed')
 | |
|         else: # Unexpected 
 | |
|             self.close()
 | |
|             raise ExceptionPxssh ('unexpected login response')
 | |
|         if not self.sync_original_prompt():
 | |
|             self.close()
 | |
|             raise ExceptionPxssh ('could not synchronize with original prompt')
 | |
|         # We appear to be in.
 | |
|         # set shell prompt to something unique.
 | |
|         if auto_prompt_reset:
 | |
|             if not self.set_unique_prompt():
 | |
|                 self.close()
 | |
|                 raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
 | |
|         return True
 | |
| 
 | |
|     def logout (self):
 | |
| 
 | |
|         """This sends exit to the remote shell. If there are stopped jobs then
 | |
|         this automatically sends exit twice. """
 | |
| 
 | |
|         self.sendline("exit")
 | |
|         index = self.expect([EOF, "(?i)there are stopped jobs"])
 | |
|         if index==1:
 | |
|             self.sendline("exit")
 | |
|             self.expect(EOF)
 | |
|         self.close()
 | |
| 
 | |
|     def prompt (self, timeout=20):
 | |
| 
 | |
|         """This matches the shell prompt. This is little more than a short-cut
 | |
|         to the expect() method. This returns True if the shell prompt was
 | |
|         matched. This returns False if there was a timeout. Note that if you
 | |
|         called login() with auto_prompt_reset set to False then you should have
 | |
|         manually set the PROMPT attribute to a regex pattern for matching the
 | |
|         prompt. """
 | |
| 
 | |
|         i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
 | |
|         if i==1:
 | |
|             return False
 | |
|         return True
 | |
|         
 | |
|     def set_unique_prompt (self):
 | |
| 
 | |
|         """This sets the remote prompt to something more unique than # or $.
 | |
|         This makes it easier for the prompt() method to match the shell prompt
 | |
|         unambiguously. This method is called automatically by the login()
 | |
|         method, but you may want to call it manually if you somehow reset the
 | |
|         shell prompt. For example, if you 'su' to a different user then you
 | |
|         will need to manually reset the prompt. This sends shell commands to
 | |
|         the remote host to set the prompt, so this assumes the remote host is
 | |
|         ready to receive commands.
 | |
| 
 | |
|         Alternatively, you may use your own prompt pattern. Just set the PROMPT
 | |
|         attribute to a regular expression that matches it. In this case you
 | |
|         should call login() with auto_prompt_reset=False; then set the PROMPT
 | |
|         attribute. After that the prompt() method will try to match your prompt
 | |
|         pattern."""
 | |
| 
 | |
|         self.sendline ("unset PROMPT_COMMAND")
 | |
|         self.sendline (self.PROMPT_SET_SH) # sh-style
 | |
|         i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
 | |
|         if i == 0: # csh-style
 | |
|             self.sendline (self.PROMPT_SET_CSH)
 | |
|             i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
 | |
|             if i == 0:
 | |
|                 return False
 | |
|         return True
 | |
| 
 | |
| # vi:ts=4:sw=4:expandtab:ft=python:
 |