2020年3月

接上一篇用Python写一个UDP端口测试工具(一)

需求

最近有个运维需求,需要测试客户端的UDP端口与服务器的连通性。

需求也很简单:客户端测往服务端发UDP包,服务端收到包后响应客户端,当客户端能收到服务端的响应则可断定端口是可达的。但是服务端需要测试的端口有很多,需要一款工具来实现。

思路

虽然nc等工具可以测试端口,但是面对多端口测试场景,就显得捉襟见肘了,因此就想到使用Python的socket编程来自己写一个工具来实现这个功能。

具体的思路如下:

  1. 起一个TCP线程,用于客户端、服务端之间协商需要测试的端口
  2. 起一个UDP线程,用于测试端口是否可达
  3. 由客户端指定需要测试哪些端口,用逗号分开端口号,端口范围使用“-”或“:”连接符指定

show you the code

服务端

#!/usr/bin/env python                                                                                                                                                                                                                                                         
#!/usr/libexec/platform-python
# encoding: utf-8
import os
import signal
import socket
import time
from threading import Thread, Event
import json
try:
    import queue
except ImportError:
    import Queue as queue


class TCPThread(Thread):
    def __init__(self, queue, event):
        Thread.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.bind(('0.0.0.0', 4000))
        sock.listen(1)
        while True:
            conn, addr = sock.accept()
            while True:
                data_recv = conn.recv(65535)
                if data_recv == b'':
                    conn.close()
                    break
                try:
                    data_json = json.loads(data_recv.decode())
                except json.decoder.JSONDecodeError:
                    print("json encode error")
                    continue
                """ 合法数据
                {
                    "proto": "udpping",
                    "port": number,
                    "available": False,
                    "ready": False
                }
                """
                if "proto" in data_json and "port" in data_json:
                    # 判断UDP本地端口是否可用
                    try:
                        us = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
                        us.bind(('0.0.0.0', data_json['port']))
                        us.close()
                        data_json["available"] = True
                        conn.sendall(json.dumps(data_json).encode())
                        self.queue.put(data_json)
                        self.event.wait()  # --> 等待事件
                        self.event.clear()  # -->清除事件,以方便下次读取
                    except socket.error:
                        print("port %s unavailable" % data_json['port'])
                        conn.sendall(json.dumps(data_json).encode())
                        pass
                else:
                    print("proto error")
                    continue


class UDPThread(Thread):
    def __init__(self, queue, event):
        Thread.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):

        while True:
            try:
                data = self.queue.get(timeout=3)
            except queue.Empty:
                continue
            sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
            sock.bind(('0.0.0.0', data['port']))
            sock.setblocking(False)
            timewait = 0
            while True:
                if timewait > 3:
                    break
                try:
                    data_recv, addr = sock.recvfrom(65535)
                    sock.sendto(data_recv, addr)
                    break
                except Exception:
                    time.sleep(0.01)
                    timewait += 0.01
            self.event.set()  # --> 发送通知事件,通知TCP线程开始干活


class Utils(object):
    @staticmethod
    def signal_handler(signal, frame):
        os._exit(0)


if __name__ == '__main__':
    signal.signal(signal.SIGINT, Utils.signal_handler)
    q = queue.Queue()
    event = Event()
    tcpthread = TCPThread(q, event)
    udpthread = UDPThread(q, event)

    tcpthread.start()
    udpthread.start()

客户端

#!/usr/libexec/platform-python
# encoding: utf-8
import re
import signal
import socket
from threading import Thread, Event
import json
import string
import random
import os
import sys
try:
    import queue
except ImportError:
    import Queue as queue


class TCPThread(Thread):
    def __init__(self, queue, tevent):
        Thread.__init__(self)
        self.queue = queue
        self.event = tevent
        self.server = '127.0.0.1'
        self.port = 4000
        self.proto_json = {
                "proto": "udpping",
                "server": "127.0.0.1",
                "port": 4000,
                "available": False,
            }
        self.testportlist = []

    def setserver(self, server, port):
        self.server = server
        self.port = port
        self.proto_json['server'] = server

    def _setport(self, port):
        try:
            port = int(port)
            if 0 < port < 65535:
                self.testportlist.append(int(port))
        except ValueError:
            print("%s not a number, ignore" % port)

    def _setportrange(self, portrange):
        _start, _end = re.split(':|-', portrange)
        try:
            _start = int(_start)
            _end = int(_end)
            if _start < _end and 0 < _start < 65535 and 0 < _end < 65535:
                self.testportlist.extend(list(range(int(_start), int(_end) + 1)))
            else:
                print("%s is a illegal port range, ignore" % portrange)
        except ValueError:
            print("%s or %s not a number, ignore %s" % (_start, _end, portrange))

    def settestports(self, port):
        if len(port.split(',')) > 1:
            for _slice in port.split(','):
                if len(re.split('-|:', _slice)) == 2:
                    self._setportrange(_slice)
                else:
                    self._setport(_slice)
        elif len(re.split('-|:', port)) == 2:
            self._setportrange(port)
        else:
            self._setport(port)

    def run(self):
        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.connect((self.server, self.port))

        while True:
            try:
                testport = self.testportlist.pop(0)
            except IndexError:
                break
            self.proto_json["port"] = testport
            data_send = json.dumps(self.proto_json).encode()
            sock.send(data_send)
            data_recv = sock.recv(65535)
            data_json = json.loads(data_recv.decode())
            if data_json["available"]:
                self.queue.put(data_json)
                self.event.wait()  # --> 等待UDP线程给事件通知
                self.proto_json["port"] += 1
                self.event.clear()  # -->清除事件,以方便下次读取
            else:
                self.proto_json["port"] += 1


class UDPThread(Thread):
    def __init__(self, queue, tevent):
        Thread.__init__(self)
        self.queue = queue
        self.event = tevent

    def run(self):
        while True:
            try:
                data = self.queue.get(timeout=3)
            except queue.Empty:
                # 队列为空,说明TCP线程已经执行完毕
                break
            sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
            payload = Utils.random_string(64)
            sock.sendto(payload.encode(), (data["server"], data["port"]))
            retrans = 0
            timeout = 0.5
            while True:
                if retrans >= 3:
                    print("port %s unreachable" % data["port"])
                    break
                sock.settimeout(timeout)
                try:
                    data_recv, addr = sock.recvfrom(65535)
                    if data_recv == payload.encode() and addr[1] == data["port"]:
                        break
                except socket.timeout:
                    sock.sendto(payload.encode(), (data["server"], data["port"]))
                    retrans += 1
                except Exception as e:
                    continue
            self.event.set()  # --> 发送通知事件,通知TCPThread干活


class Utils(object):
    @staticmethod
    def random_string(length):
        return ''.join(random.choice(string.ascii_letters + string.digits) for m in range(length))

    @staticmethod
    def signal_handler(signal, frame):
        os._exit(0)


def h():
    print(""" usage:""")
    print("""   this_program <dest_ip> <dest_port> <test_ports>""")

    print('')
    print(" examples:")
    print("   ./udpclient.py 192.168.1.1 4000 '5000,6000,7000-8000'")
    print('')


if __name__ == '__main__':
    if len(sys.argv) != 3 and len(sys.argv) != 4:
        h()
        exit()
    signal.signal(signal.SIGINT, Utils.signal_handler)

    server = socket.gethostbyname(sys.argv[1])
    port = int(sys.argv[2])
    testports = sys.argv[3]

    q = queue.Queue()
    event = Event()
    tcpthread = TCPThread(q, event)
    tcpthread.setserver(server, port)
    tcpthread.settestports(testports)
    udpthread = UDPThread(q, event)

    tcpthread.start()
    udpthread.start()
    tcpthread.join()
    udpthread.join()

    print('')

已知问题

  1. 程序一旦执行,不能使用Ctrl+C来停止,因为使用了多线程,不接收Ctrl+C信号,有解决方法。又不是不能用,就这样吧
  2. 服务端缺少重发机制,当服务端发给客户端的回应包丢包,则显示这个端口不可达(客户端有重发机制,当客户端到服务端的包丢包,会重发3次)

需求

最近有个运维需求,需要测试客户端的UDP端口与服务器的连通性。

需求也很简单:客户端测往服务端发UDP包,服务端收到包后响应客户端,当客户端能收到服务端的响应则可断定端口是可达的。但是客户端需要测试的端口范有很多,是个很大的端口范围。

思路

虽然nc等工具可以测试端口,但是面对多端口测试场景,就显得捉襟见肘了,因此就想到使用Python的socket编程来自己写一个工具来实现这个功能。

具体思路如下:

  1. 由客户端指定本地端口范围,也可以不指定,不指定则交由系统使用随机端口
  2. 当指定了本地端口,循环结束后停止运行,并输出统计结果
  3. 服务端为固定端口
  4. 如果需要服务端也需要变更为端口范围,需要有一个额外的线程来控制协商端口号,因为涉及到系统可能会存在端口被占用、端口不通等异常情况需要处理,因此简单的一个循环不能解决问题,可以参考下一篇用Python写一个UDP端口测试工具(二)

show you the code

服务端

#!/usr/bin/env python                                                                                                                                                                                                                                                         

from __future__ import print_function

import socket
import sys
import signal
import os

def h():
    print(""" usage:""")
    print("""   this_program <listen_port>""")
    print("""   listen_port is not specified, default is 4000""")

    print()
    print(" examples:")
    print("   ./udpserver.py 4000")
    print()

def signal_handler(signal, frame):
    os._exit(0)

signal.signal(signal.SIGINT, signal_handler)

if len(sys.argv)>1:
    try:
        port = int(sys.argv[1])
    except:
        h()
        exit()
else:
    port = 4000

sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', port))

while True:
    recv_data,addr = sock.recvfrom(65536)
    sock.sendto(recv_data, addr)

客户端

#!/usr/bin/env python

from __future__ import print_function

import socket
import sys
import time
import string
import random
import signal
import sys
import os

INTERVAL = 1000  #unit ms
LEN =64
IP=""
PORT=0
SRC_PORT_RANGE=False

count=0
count_of_received=0
rtt_sum=0.0
rtt_min=99999999.0
rtt_max=0.0

def signal_handler(signal, frame):
    if count!=0 and count_of_received!=0:
        print('')
        print('--- ping statistics ---')
    if count!=0:
        print('%d packets transmitted, %d received, %.2f%% packet loss'%(count,count_of_received, (count-count_of_received)*100.0/count))
    if count_of_received!=0:
        print('rtt min/avg/max = %.2f/%.2f/%.2f ms'%(rtt_min,rtt_sum/count_of_received,rtt_max))
    os._exit(0)

def random_string(length):
    return ''.join(random.choice(string.ascii_letters+ string.digits ) for m in range(length))

def h():
    print(""" usage:""")
    print("""   this_program <dest_ip> <dest_port>""")
    print("""   this_program <dest_ip> <dest_port> "<options>" """)

    print()
    print(""" options:""")
    print("""   LEN             the length of payload, unit:byte""")
    print("""   INTERVAL        the seconds waited between sending each packet, as well as the timeout for reply packet, unit: ms""")
    print("""   SRC_PORT_RANGE  the source port range, will be stoped when end of loop""")

    print()
    print(" examples:")
    print("   ./udpclient.py 44.55.66.77 4000")
    print('   ./udpclient.py 44.55.66.77 4000 "LEN=400;INTERVAL=2000;SRC_PORT_RANGE=\'20000:30000\'"')
    print("   ./udpclient.py fe80::5400:ff:aabb:ccdd 4000")
    print()

if len(sys.argv) != 3 and len(sys.argv)!=4 :
    h()
    exit()

IP=socket.gethostbyname(sys.argv[1])
PORT=int(sys.argv[2])

is_ipv6=0;

if IP.find(":")!=-1:
    is_ipv6=1;

if len(sys.argv)==4:
    try:
        exec(sys.argv[3])
    except:
        h()
        exit()

signal.signal(signal.SIGINT, signal_handler)

print("UDPping %s via port %d with %d bytes of payload"% (IP,PORT,LEN))
sys.stdout.flush()

if SRC_PORT_RANGE:
    start, end = SRC_PORT_RANGE.split(':')
    start = int(start)
    end = int(end)

while True:
    sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
    if SRC_PORT_RANGE:
        if start > end:
            signal_handler(None,None)
            break
        try:
            sock.bind(('0.0.0.0', start))
        except:
            print("%s already in use" % start)
            start += 1
            continue
        start += 1
    payload= random_string(LEN)
    sock.sendto(payload.encode(), (IP, PORT))
    time_of_send=time.time()
    deadline = time.time() + INTERVAL/1000.0
    received=0
    rtt=0.0
    retrans=0
    timeout=0.5

    while True:
        if retrans >= 3:
            print("%s packet loss, retrans %s times, " % (time.strftime("%Y-%m-%d %H:%M:%S"), retrans),"%s:%s" % sock.getsockname(), "-> %s:%s" % (IP, PORT))
            sys.stdout.flush()
            break

        sock.settimeout(timeout)
        
        try:
            recv_data,addr = sock.recvfrom(65536)
            if recv_data== payload.encode() and addr[1]==PORT:
                rtt=((time.time()-time_of_send)*1000)
                print("%s:%s"%sock.getsockname(),"-> %s:%s"%(IP, PORT),"Reply from","%s:%s"%(IP,PORT),"seq=%d"%count, "time=%.2f"%(rtt),"ms")
                sys.stdout.flush()
                received=1
                break
        except socket.timeout:
            sock.sendto(payload.encode(), (IP, PORT))
            retrans+=1
            timeout=timeout*2
        except Exception as e:
            pass
    count+=    1
    if received==1:
        count_of_received+=1
        rtt_sum+=rtt
        rtt_max=max(rtt_max,rtt)
        rtt_min=min(rtt_min,rtt)
    else:
        #print("Request timed out")
        sys.stdout.flush()

    time_remaining=deadline-time.time()
    if(time_remaining>0):
        time.sleep(time_remaining)

参考项目:https://github.com/wangyu-/UDPping/

简介

抓包是分析网络协议、问题排查利器,tcpdump是Linux下的一款抓包工具。

分片

首先了解一个概念:MTU(最大传输单元),MTU是定义网络中报文的最大尺寸。如果报文大小超过MTU,则网络栈/网卡会自动将包拆分成多个分片进行发送,保证每个分片都小于MTU。

TCP协议会在三次握手时协商确认MSS(最大报文段长度),MSS选项是TCP协议定义的一个选项,MSS选项用于在TCP连接建立时,收发双方协商通信时每一个报文段所能承载的最大数据长度。因此TCP报文通常不会被分片。

UDP协议面向无连接,没有协商等机制,依赖MTU来进行分片传输。当发送的数据大于MTU时,由网络栈/网卡会自动将包拆分成多个分片进行发送。

- 阅读剩余部分 -