接上一篇用Python写一个UDP端口测试工具(一)

需求

最近有个运维需求,需要测试客户端的UDP端口与服务器的连通性。

需求也很简单:客户端测往服务端发UDP包,服务端收到包后响应客户端,当客户端能收到服务端的响应则可断定端口是可达的。但是服务端需要测试的端口有很多,需要一款工具来实现。

思路

虽然nc等工具可以测试端口,但是面对多端口测试场景,就显得捉襟见肘了,因此就想到使用Python的socket编程来自己写一个工具来实现这个功能。

具体的思路如下:

  1. 起一个TCP线程,用于客户端、服务端之间协商需要测试的端口
  2. 起一个UDP线程,用于测试端口是否可达
  3. 由客户端指定需要测试哪些端口,用逗号分开端口号,端口范围使用“-”或“:”连接符指定

show you the code

服务端

#!/usr/bin/env python                                                                                                                                                                                                                                                         
#!/usr/libexec/platform-python
# encoding: utf-8
import os
import signal
import socket
import time
from threading import Thread, Event
import json
try:
    import queue
except ImportError:
    import Queue as queue


class TCPThread(Thread):
    def __init__(self, queue, event):
        Thread.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.bind(('0.0.0.0', 4000))
        sock.listen(1)
        while True:
            conn, addr = sock.accept()
            while True:
                data_recv = conn.recv(65535)
                if data_recv == b'':
                    conn.close()
                    break
                try:
                    data_json = json.loads(data_recv.decode())
                except json.decoder.JSONDecodeError:
                    print("json encode error")
                    continue
                """ 合法数据
                {
                    "proto": "udpping",
                    "port": number,
                    "available": False,
                    "ready": False
                }
                """
                if "proto" in data_json and "port" in data_json:
                    # 判断UDP本地端口是否可用
                    try:
                        us = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
                        us.bind(('0.0.0.0', data_json['port']))
                        us.close()
                        data_json["available"] = True
                        conn.sendall(json.dumps(data_json).encode())
                        self.queue.put(data_json)
                        self.event.wait()  # --> 等待事件
                        self.event.clear()  # -->清除事件,以方便下次读取
                    except socket.error:
                        print("port %s unavailable" % data_json['port'])
                        conn.sendall(json.dumps(data_json).encode())
                        pass
                else:
                    print("proto error")
                    continue


class UDPThread(Thread):
    def __init__(self, queue, event):
        Thread.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):

        while True:
            try:
                data = self.queue.get(timeout=3)
            except queue.Empty:
                continue
            sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
            sock.bind(('0.0.0.0', data['port']))
            sock.setblocking(False)
            timewait = 0
            while True:
                if timewait > 3:
                    break
                try:
                    data_recv, addr = sock.recvfrom(65535)
                    sock.sendto(data_recv, addr)
                    break
                except Exception:
                    time.sleep(0.01)
                    timewait += 0.01
            self.event.set()  # --> 发送通知事件,通知TCP线程开始干活


class Utils(object):
    @staticmethod
    def signal_handler(signal, frame):
        os._exit(0)


if __name__ == '__main__':
    signal.signal(signal.SIGINT, Utils.signal_handler)
    q = queue.Queue()
    event = Event()
    tcpthread = TCPThread(q, event)
    udpthread = UDPThread(q, event)

    tcpthread.start()
    udpthread.start()

客户端

#!/usr/libexec/platform-python
# encoding: utf-8
import re
import signal
import socket
from threading import Thread, Event
import json
import string
import random
import os
import sys
try:
    import queue
except ImportError:
    import Queue as queue


class TCPThread(Thread):
    def __init__(self, queue, tevent):
        Thread.__init__(self)
        self.queue = queue
        self.event = tevent
        self.server = '127.0.0.1'
        self.port = 4000
        self.proto_json = {
                "proto": "udpping",
                "server": "127.0.0.1",
                "port": 4000,
                "available": False,
            }
        self.testportlist = []

    def setserver(self, server, port):
        self.server = server
        self.port = port
        self.proto_json['server'] = server

    def _setport(self, port):
        try:
            port = int(port)
            if 0 < port < 65535:
                self.testportlist.append(int(port))
        except ValueError:
            print("%s not a number, ignore" % port)

    def _setportrange(self, portrange):
        _start, _end = re.split(':|-', portrange)
        try:
            _start = int(_start)
            _end = int(_end)
            if _start < _end and 0 < _start < 65535 and 0 < _end < 65535:
                self.testportlist.extend(list(range(int(_start), int(_end) + 1)))
            else:
                print("%s is a illegal port range, ignore" % portrange)
        except ValueError:
            print("%s or %s not a number, ignore %s" % (_start, _end, portrange))

    def settestports(self, port):
        if len(port.split(',')) > 1:
            for _slice in port.split(','):
                if len(re.split('-|:', _slice)) == 2:
                    self._setportrange(_slice)
                else:
                    self._setport(_slice)
        elif len(re.split('-|:', port)) == 2:
            self._setportrange(port)
        else:
            self._setport(port)

    def run(self):
        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.connect((self.server, self.port))

        while True:
            try:
                testport = self.testportlist.pop(0)
            except IndexError:
                break
            self.proto_json["port"] = testport
            data_send = json.dumps(self.proto_json).encode()
            sock.send(data_send)
            data_recv = sock.recv(65535)
            data_json = json.loads(data_recv.decode())
            if data_json["available"]:
                self.queue.put(data_json)
                self.event.wait()  # --> 等待UDP线程给事件通知
                self.proto_json["port"] += 1
                self.event.clear()  # -->清除事件,以方便下次读取
            else:
                self.proto_json["port"] += 1


class UDPThread(Thread):
    def __init__(self, queue, tevent):
        Thread.__init__(self)
        self.queue = queue
        self.event = tevent

    def run(self):
        while True:
            try:
                data = self.queue.get(timeout=3)
            except queue.Empty:
                # 队列为空,说明TCP线程已经执行完毕
                break
            sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
            payload = Utils.random_string(64)
            sock.sendto(payload.encode(), (data["server"], data["port"]))
            retrans = 0
            timeout = 0.5
            while True:
                if retrans >= 3:
                    print("port %s unreachable" % data["port"])
                    break
                sock.settimeout(timeout)
                try:
                    data_recv, addr = sock.recvfrom(65535)
                    if data_recv == payload.encode() and addr[1] == data["port"]:
                        break
                except socket.timeout:
                    sock.sendto(payload.encode(), (data["server"], data["port"]))
                    retrans += 1
                except Exception as e:
                    continue
            self.event.set()  # --> 发送通知事件,通知TCPThread干活


class Utils(object):
    @staticmethod
    def random_string(length):
        return ''.join(random.choice(string.ascii_letters + string.digits) for m in range(length))

    @staticmethod
    def signal_handler(signal, frame):
        os._exit(0)


def h():
    print(""" usage:""")
    print("""   this_program <dest_ip> <dest_port> <test_ports>""")

    print('')
    print(" examples:")
    print("   ./udpclient.py 192.168.1.1 4000 '5000,6000,7000-8000'")
    print('')


if __name__ == '__main__':
    if len(sys.argv) != 3 and len(sys.argv) != 4:
        h()
        exit()
    signal.signal(signal.SIGINT, Utils.signal_handler)

    server = socket.gethostbyname(sys.argv[1])
    port = int(sys.argv[2])
    testports = sys.argv[3]

    q = queue.Queue()
    event = Event()
    tcpthread = TCPThread(q, event)
    tcpthread.setserver(server, port)
    tcpthread.settestports(testports)
    udpthread = UDPThread(q, event)

    tcpthread.start()
    udpthread.start()
    tcpthread.join()
    udpthread.join()

    print('')

已知问题

  1. 程序一旦执行,不能使用Ctrl+C来停止,因为使用了多线程,不接收Ctrl+C信号,有解决方法。又不是不能用,就这样吧
  2. 服务端缺少重发机制,当服务端发给客户端的回应包丢包,则显示这个端口不可达(客户端有重发机制,当客户端到服务端的包丢包,会重发3次)

标签: none

添加新评论