1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
import re import time from concurrent.futures import ThreadPoolExecutor import paramiko
def get_device_list(filename): """从文本文件读取设备列表,返回由字典组成的列表。 文本内容格式为:ip,用户名,密码,别名,例如: 1.1.1.1 admin admin sw1 1.1.1.2 admin admin sw2 ...... Args: filename ([str]): 文件名称 """ with open(filename, 'r') as f: device_list = [] for line in f.readlines(): ip, username, password, name = line.strip().split() device_list.append( { "ip": ip, "username": username, "password": password, "name": name, } ) return device_list
class NetworkDevice(object): def __init__(self, ip="", username="", password="'", name="", port=22,): self.conn = None if ip: self.ip = ip.strip() elif name: self.name = name.strip() else: raise ValueError("需要设备连接地址(ip 或 别名)") self.port = int(port) self.username = username self.password = password self._open_ssh() def _open_ssh(self): """初始化 SSH 连接,调起一个模拟终端,会话结束前可以一直执行命令。
Raises: e: 抛出 paramiko 连接失败的任何异常
""" ssh_connect_params = { "hostname": self.ip, "port": self.port, "username": self.username, "password": self.password, "look_for_keys": False, "allow_agent": False, "timeout": 5, } conn = paramiko.SSHClient() conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: conn.connect(**ssh_connect_params) except Exception as e: raise e self.conn = conn.invoke_shell(term="vt100", width=500, height=1000) return ""
def exec_cmd(self, cmd, recv_time=3): """登录设备,执行命令
Args: cmd ([type]): 命令字符串 recv_time (int, optional): 读取回显信息的超时时间. Defaults to 3.
Raises: EOFError: 没有任何信息输出,说明连接失败。
Returns: output: """ cmd = cmd.strip() + "\n" self.conn.sendall("screen disable\n") self.conn.sendall(cmd) time.sleep(int(recv_time)) output = self.conn.recv(1024*1024) if len(output) == 0: raise EOFError("连接可能被关闭,没有任何信息输出") return output.decode('utf-8', 'ignore')
dev = { "ip":"192.168.56.21", "username":"netdevops", "password":"[email protected]", "name": "sw1" }
def parse_interface_drop(output): """把设备的输出队列丢包信息解析成累加值 命令及输出示例如下: # [H3C]dis qos queue-statistics interface outbound | in "^ Drop" # Dropped: 0 packets, 0 bytes """ ptn = re.compile(r"\s(\S+):\s+(\d+)\s+(\S+),\s+(\d+)\s+(\S+)") count = 0 for i in ptn.findall(output): count += int(i[1]) return count
def run(cmd, **conn_parms): """登录单台设备,执行指定命令,解析丢包统计 """ sw = NetworkDevice(**conn_parms) output = sw.exec_cmd(cmd) drop_count = parse_interface_drop(output) return "%s %s %s"%( conn_parms.get("name"), conn_parms.get("ip"), drop_count)
if __name__== "__main__": """获取设备列表,使用多线程登录设备获取信息并返回 """ with ThreadPoolExecutor(10) as pool: futures = [] cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"' dev_info = get_device_list("./iplist.txt") for d in dev_info: future = pool.submit(run, cmd, **d) futures.append(future) with open("./drops/%s.log"%time.strftime("%Y%m%d_%H"),'w') as f: for line in futures: f.write(line.result() + "\n")
|