python实现ping功能

实验: 实现ping命令

1、实验目的:

  • 要求学生掌握Socket编程技术,以及ICMP协议

2、实验内容:

  • 要求学生掌握利用Socket进行编程的技术
  • 不能采用现有的工具,必须自己一步一步,根据协议进行操作
  • 了解ping报文的格式和步骤,要求符合ICMP协议并组建报文
  • 在一秒钟内,如果收到,则为成功,如果收不到,则失败
  • 必须采用图形界面,查看收到回应的结果
  • 可以通过程序,查看子网中有哪些主机可以ping通

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# encoding:utf-8
import time
import struct
import socket
import select
import sys
import netifaces

from IPy import IP
from tkinter import *
from tkinter import scrolledtext, messagebox
from concurrent.futures import ThreadPoolExecutor

s_list = []
num = 0

def chesksum(data):
'''
校验和
:return: answer
'''
n = len(data)
m = n % 2
sum = 0
for i in range(0, n - m, 2):
# 传入data以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
sum += (data[i]) + ((data[i+1]) << 8)
if m == 1:
sum += (data[-1])
# 将高于16位与低16位相加
sum = (sum >> 16) + (sum & 0xffff)
sum += (sum >> 16) # 如果还有高于16位,将继续与低16位相加
# 取反
answer = ~sum & 0xffff
# 主机字节序转网络字节序列(参考小端序转大端序)
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer

def request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body):
'''
请求报文
:return: icmp_packet
'''
# 把字节打包成二进制数据
# >:big-endian
# B:unsigned char
# H:unsigned short
# 32s:char[32]
fmt = '>BBHHH32s'
icmp_packet = struct.pack(fmt, data_type, data_code,
data_checksum, data_ID, data_Sequence, payload_body)
icmp_chesksum = chesksum(icmp_packet) # 获取校验和
# 把校验和传入,再次打包
icmp_packet = struct.pack(fmt, data_type, data_code,
icmp_chesksum, data_ID, data_Sequence, payload_body)
return icmp_packet

def raw_socket(dst_addr, icmp_packet):
'''
创建套接字,并将数据发送到套接字
:return: send_time, rawsocket
'''
# 实例化一个socket对象,ipv4,原套接字,分配协议端口
rawsocket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
# 记录当前请求时间
send_time = time.time()
# 发送数据到网络
try:
rawsocket.sendto(icmp_packet, (dst_addr, 0))
except:
return -1,rawsocket
# 返回数据
return send_time, rawsocket

def reply_ping(send_time, rawsocket, data_Sequence, dst_addr,timeout=1):
'''
应答报文分析,计算时间
:return: received_time - send_time or -1
'''
while True:
# 实例化select对象,可读rawsocket,可写为空,可执行为空,超时时间
what_ready = select.select([rawsocket], [], [], timeout)
# 等待时间
wait_for_time = (time.time() - send_time)
# 没有返回可读的内容,判断超时
if what_ready[0] == []: # Timeout
return -1
# 记录接收时间
received_time = time.time()
# 设置接收的包的字节为1024
received_packet, addr = rawsocket.recvfrom(1024)
# 获取接收包的icmp头
icmpHeader = received_packet[20:28]
# 解包
icmp_type, code, checksum, packet_id, sequence = struct.unpack(">BBHHH", icmpHeader)

if icmp_type == 0 and sequence == data_Sequence and addr[0] == dst_addr:
return received_time - send_time

# 数据包的超时时间判断
if timeout - wait_for_time <= 0:
return -1

def get_host_ip():
'''
查询本机ip地址
:return: ip
'''
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8',0))
ip = s.getsockname()[0]
finally:
s.close()

return ip

def get_netmask():
'''
获取子网掩码
:return: netmask
'''

ip = get_host_ip()
#获取所有本机网络接口
interfaces = netifaces.interfaces()
#查看网络接口参数
for ifc in interfaces:
n = netifaces.ifaddresses(ifc)
if n.get(2):
if n[2][0]['addr'] == ip:
netmask = n[2][0]['netmask']
return netmask

def ping(host):
'''
ping
'''
if len(host) == 0: #如果为空则什么都不做
return
accept, lost = 0, 0
sumtime, shorttime, longtime = 0, 1000, 0
#icmp数据包的构建
data_type = 8 # 类型:ICMP Echo Request——回显请求(Ping请求)
data_code = 0 # 代码:0
data_checksum = 0 # 校验和:先以0代替
data_ID = 0 # Identifier:设置为ping 进程的进程ID
data_Sequence = 1 # Sequence number:每个发送出去的分组递增序列号。
payload_body = b'12345678912345678912345678912' # data:数据部分

# 将主机名转ipv4地址格式,返回以ipv4地址格式的字符串,如果主机名称是ipv4地址,则它将保持不变
try:
dst_addr = socket.gethostbyname(host)
except:
messagebox.showwarning(title='ERROR', message='错误的域名或IP地址')
return
txt1.insert(END,dst_addr)
root.update()
s="正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host, dst_addr)
txt.insert(END,s)
for i in range(0, 4):
# 请求ping数据包的二进制转换
icmp_packet = request_ping(
data_type, data_code, data_checksum, data_ID, data_Sequence + i, payload_body)
# 连接套接字,并将数据发送到套接字
send_time, rawsocket = raw_socket(dst_addr, icmp_packet)
if send_time == -1: #即raw_socke(dst_addr, icmp_packet)里有报错
messagebox.showwarning(title='ERROR', message='请求的地址无效')
s="\n请求的地址无效\n"
txt.insert(END,s)
root.update()
return
# 数据包传输时间
times = reply_ping(send_time,rawsocket, data_Sequence + i,dst_addr)
if times >= 0:
s="\n来自 {0} 的回复: 字节=32 时间={1}ms".format(dst_addr, int(times*1000))
txt.insert(END,s)
root.update()
accept += 1
return_time = int(times * 1000)
sumtime += return_time
if return_time > longtime:
longtime = return_time
if return_time < shorttime:
shorttime = return_time
time.sleep(0.7)
else:
lost += 1
txt.insert(END,"\n请求超时。")
root.update()
if i+1 == 4:
s="\n{0}的Ping统计信息:\n".format(dst_addr)
txt.insert(END,s)
root.update()
s="\t数据包:已发送={0},接收={1},丢失={2}({3}%丢失),\
\n往返行程的估计时间(以毫秒为单位):\
\n\t最短={4}ms,最长={5}ms,平均={6}ms\n".format(
i + 1, accept, lost, lost / (i + 1) * 100,
shorttime, longtime, sumtime/accept if accept != 0 else 'NaN ')
txt.insert(END,s)
txt1.delete(0.0, END)
root.update()

def test(x):
global s_list
global num
#icmp数据包的构建
data_type = 8 # 类型:ICMP Echo Request——回显请求(Ping请求)
data_code = 0 # 代码:0
data_checksum = 0 # 校验和:先以0代替
data_ID = 0 # Identifier:设置为ping 进程的进程ID
data_Sequence = 1 # Sequence number:每个发送出去的分组递增序列号。
payload_body = b'01234567890123456789012345678901' # data:数据部分
# 请求ping数据包的二进制转换
icmp_packet = request_ping(
data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body)
# 连接套接字,并将数据发送到套接字
send_time, rawsocket = raw_socket(x.strNormal(0), icmp_packet)
# 数据包传输时间
times = reply_ping(send_time,rawsocket, data_Sequence, x.strNormal(0))

print(x.strNormal(0),end ="")
if times >= 0:
num += 1
s="\n{0} :字节=32 时间={1}ms".format(x.strNormal(0), int(times*1000))
s_list.append(s)
print(' success',end ="")
print('')


def find():
global s_list
global num
host_ip = get_host_ip()
netmask = get_netmask()
ip = IP(host_ip).make_net(netmask)
s = ip.strNormal(1)
txt1.insert(END,s)
s="正在进行子网设备发现..."
txt.insert(END,s)
root.update()
s_list = []
num = 0
pool = ThreadPoolExecutor(255)#限制最大线程量为255

for x in ip:
if x != ip[0] and x.strNormal(0) != host_ip and x != ip [ip.len()-1]:
pool.submit(test,x)

pool.shutdown(wait=True)

for s in s_list:
txt.insert(END,s)
s="\n完成.共{0}个.\n".format(num)
txt.insert(END,s)
txt1.delete(0.0, END)
root.update()

def clear():
txt.delete(0.0, END)

def exit():
sys.exit(0)

if __name__ == '__main__':
'''
图形化界面及调用
'''
root = Tk()
root.title('PING TOOL')
root.geometry('720x560')

host_lb = Label(root, text='请输入要ping的主机或域名')
host_lb.place(relx=0.1, rely=0.1, relwidth=0.8, relheight=0.1)
inp1 = Entry(root)
inp1.place(relx=0.1, rely=0.2, relwidth=0.6, relheight=0.1)

ping_btn = Button(root, text='PING', command=lambda: ping(str(inp1.get())))
ping_btn.place(relx=0.7, rely=0.2, relwidth=0.2, relheight=0.1)

s = "本机IP地址:"+ get_host_ip()
ip_lb = Label(root, text=s)
ip_lb.place(relx=0.1, rely=0.3, relwidth=0.3, relheight=0.1)
s = "子网掩码:" + get_netmask()
netmask_lb = Label(root, text=s)
netmask_lb.place(relx=0.4, rely=0.3, relwidth=0.3, relheight=0.1)

find_lb = Label(root, text="正在Ping:")
find_lb.place(relx=0, rely=0.5, relwidth=0.2, relheight=0.1)

txt1 = Text(root)
txt1.place(relx=0.2, rely=0.53, relwidth=0.2, relheight=0.05)

find_btn = Button(root, text='FIND DEVICE IN LAN', command=lambda: find())
find_btn.place(relx=0.2, rely=0.4, relwidth=0.3, relheight=0.1)
clear_btn = Button(root, text='CLEAR', command=clear)
clear_btn.place(relx=0.5, rely=0.4, relwidth=0.3, relheight=0.1)
exit_btn = Button(root, text='EXIT', command=exit)
exit_btn.place(relx=0.5, rely=0.5, relwidth=0.3, relheight=0.1)

txt = scrolledtext.ScrolledText(root)
txt.place(rely = 0.6,relwidth=1.0,relheight =0.4)

root.mainloop()