跳转至

paramiko

SSH工具

import paramiko
import time


class UseSSH:
    def __init__(self):
        # 创建 SSH 客户端对象
        ssh = paramiko.SSHClient()

        # 自动添加主机密钥
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # 使用密钥连接 SSH 服务器
        private_key_path = "C:\\Users\\MIBO\\.ssh\\id_rsa"
        ssh.connect(hostname="10.44.3.66", username="root", key_filename=private_key_path)
        self.objssh = ssh

    def exec_cmd(self, cmd):
        # 普通执行命令
        stdin, stdout, stderr = self.objssh.exec_command(cmd)
        output = stdout.read().decode()
        print(output)

    def exec_invoke_cmd(self, k_andv: dict):
        # 创建交互式 shell
        channel = self.objssh.invoke_shell()

        for cmd, out in k_andv.items():
            channel.send(cmd + '\n')

            output = ""
            # 用于检查 SSH 会话中是否有可读取的数据。该方法会在没有可读取数据时阻塞,直到 SSH 会话中有数据可供读取或发生超时为止。
            while not channel.recv_ready():
                time.sleep(0.5)
                continue
            while channel.recv_ready():
                # 不要慌,多等下输出,有些机器反应慢
                time.sleep(0.5)
                output += channel.recv(4096).decode()

            if out in output:
                print(f"** {cmd} - 执行成功")
            else:
                print(f"!! {cmd} - 执行失败")
            print(output)
            print("=" * 30)
        channel.close()

    def __del__(self):
        print("销毁ssh")
        self.objssh.close()


cc = UseSSH()
cc.exec_cmd('ls')

k_andv = {"ls -al": "mibo", "ifconfig": "RUNNING"}
cc.exec_invoke_cmd(k_andv)

代理转发ssh

请求全部通过 主机 10.11.22.31 端口65157 SOCKS5服务转发到 192.168.124.191

import socks
import socket
import paramiko
socks.set_default_proxy(socks.SOCKS5, "10.11.22.31", 65157)
socket.socket = socks.socksocket
hostname = '192.168.124.191'
username = 'root'
password = 'anaxh2wn'
port=22


ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=hostname,port=port,username=username,password=password)
stdin,stdout,stderr=ssh.exec_command('ifconfig')
print(stdout.read().decode('utf-8'))
ssh.close()