使用多线程操作 paramiko 执行命令

背景

最近公司内网设备版本有些问题,会出现队列丢包现象,由于设备较多,写个脚本定期收集会方便一点,不过内网只有 paramiko,所以简单造了一个轮子,实现多线程并发登录设备执行命令并对结果进行解析。

用这个脚本可以在网络设备上执行任何命令,实现各种功能,而且多线程并发很 Nice!

本次用到的命令及输出示例如下:
# [H3C]dis qos queue-statistics interface outbound | in "^ Drop"
#  Dropped: 0 packets, 0 bytes

这个命令用来收集当前设备上所有接口的所有队列的转发信息累计值(bytes 和 packages)。

主要思路

代码比较简单,主要还是 paramiko 的 SSH 基本用法,这里说一下大概思路:

  1. 把所有设备信息写到一个文本文档里面,简单起见,用的是 txt,把登录信息构建成字典
  2. 写一个类,具有两个功能:初始化 SSH 连接、执行命令
  3. 对本次需求指定的命令及输出结果进行解析,并将结果存入文件
  4. 加入多线程执行,提高效率
  5. 添加 Linux 的 crontab,每小时收集一次信息 (在服务器上做的配置)

代码

代码地址:在线查看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from concurrent.futures import ThreadPoolExecutor
import paramiko


def get_device_list(filename):
"""从文本文件读取设备列表,返回由字典组成的列表。
文本内容格式为:ip,用户名,密码,别名,例如:
1.1.1.1 admin admin sw1
1.1.1.2 admin admin sw2
......

Args:
filename ([str]): 文件名称
"""
with open(filename, 'r') as f:
device_list = []
for line in f.readlines():
ip, username, password, name = line.strip().split()
device_list.append(
{
"ip": ip,
"username": username,
"password": password,
"name": name,
}
)
return device_list

class NetworkDevice(object):
def __init__(self, ip="", username="", password="'", name="", port=22,):
self.conn = None
if ip:
self.ip = ip.strip()
elif name:
self.name = name.strip()
else:
raise ValueError("需要设备连接地址(ip 或 别名)")
self.port = int(port)
self.username = username
self.password = password
self._open_ssh()

def _open_ssh(self):
"""初始化 SSH 连接,调起一个模拟终端,会话结束前可以一直执行命令。

Raises:
e: 抛出 paramiko 连接失败的任何异常

"""
ssh_connect_params = {
"hostname": self.ip,
"port": self.port,
"username": self.username,
"password": self.password,
"look_for_keys": False,
"allow_agent": False,
"timeout": 5, # TCP 连接超时时间
}
conn = paramiko.SSHClient()
conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
conn.connect(**ssh_connect_params)
except Exception as e:
raise e
self.conn = conn.invoke_shell(term="vt100", width=500, height=1000)
return ""

def exec_cmd(self, cmd, recv_time=3):
"""登录设备,执行命令

Args:
cmd ([type]): 命令字符串
recv_time (int, optional): 读取回显信息的超时时间. Defaults to 3.

Raises:
EOFError: 没有任何信息输出,说明连接失败。

Returns:
output:
"""
cmd = cmd.strip() + "\n"
self.conn.sendall("screen disable\n")
self.conn.sendall(cmd)
time.sleep(int(recv_time))
output = self.conn.recv(1024*1024)
if len(output) == 0:
raise EOFError("连接可能被关闭,没有任何信息输出")
return output.decode('utf-8', 'ignore')


dev = {
"ip":"192.168.56.21",
"username":"netdevops",
"password":"[email protected]",
"name": "sw1"
}
# sw1 = NetworkDevice(**dev)
# ret = sw1.exec_cmd("dis version")
# print(ret)

def parse_interface_drop(output):
"""把设备的输出队列丢包信息解析成累加值
命令及输出示例如下:
# [H3C]dis qos queue-statistics interface outbound | in "^ Drop"
# Dropped: 0 packets, 0 bytes
"""
ptn = re.compile(r"\s(\S+):\s+(\d+)\s+(\S+),\s+(\d+)\s+(\S+)")
count = 0
for i in ptn.findall(output):
count += int(i[1])
return count

def run(cmd, **conn_parms):
"""登录单台设备,执行指定命令,解析丢包统计
"""
sw = NetworkDevice(**conn_parms)
output = sw.exec_cmd(cmd)
drop_count = parse_interface_drop(output)
return "%s %s %s"%(
conn_parms.get("name"),
conn_parms.get("ip"),
drop_count)

# cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
# ret = run(cmd,**dev)
# print(ret)

if __name__== "__main__":
"""获取设备列表,使用多线程登录设备获取信息并返回
"""
with ThreadPoolExecutor(10) as pool:
futures = []
cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
dev_info = get_device_list("./iplist.txt")
for d in dev_info:
future = pool.submit(run, cmd, **d)
futures.append(future)
# for f in futures:
# print(f.result())
# 根据执行时间把结果写入文件,精确到小时
with open("./drops/%s.log"%time.strftime("%Y%m%d_%H"),'w') as f:
for line in futures:
f.write(line.result() + "\n")