From 8d7c648cab59b8339afc49ba6b2812d6e9f5f123 Mon Sep 17 00:00:00 2001 From: dnomd343 Date: Tue, 6 Sep 2022 18:00:55 +0800 Subject: [PATCH] feat: network module --- Basis/Functions.py | 46 ---------------------------------- Utils/Common/Network.py | 53 ++++++++++++++++++++++++++++++++++++++++ Utils/Common/__init__.py | 1 + 3 files changed, 54 insertions(+), 46 deletions(-) create mode 100644 Utils/Common/Network.py diff --git a/Basis/Functions.py b/Basis/Functions.py index 7297183..54b68e9 100644 --- a/Basis/Functions.py +++ b/Basis/Functions.py @@ -4,7 +4,6 @@ import re import time import uuid -import psutil import random import hashlib from IPy import IP @@ -110,48 +109,3 @@ def toBool(raw) -> bool: except: raise RuntimeError('Unable convert to bool') - -def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, waitTime: int = 10) -> int: # get available port - if rangeStart > rangeEnd or rangeStart < 1 or rangeEnd > 65535: - raise RuntimeError('Invalid port range') - while True: - port = random.randint(rangeStart, rangeEnd) # choose randomly - if checkPortStatus(port): - logging.debug('Get new port -> %i' % port) - return port - time.sleep(waitTime / 1000) # ms -> s (default 10ms) - - -def checkPortStatus(port: int) -> bool: # check if the port is occupied - for connection in networkStatus(): # scan every connections - if connection['local']['port'] == port: # port occupied (whatever ipv4-tcp / ipv4-udp / ipv6-tcp / ipv6-udp) - logging.debug('Check port %i -> occupied' % port) - return False - logging.debug('Check port %i -> available' % port) - return True - - -def networkStatus() -> list: # get all network connections - result = [] - for connection in psutil.net_connections(): - if not connection.family.name.startswith('AF_INET'): # AF_INET / AF_INET6 - continue - if connection.type.name not in ['SOCK_STREAM', 'SOCK_DGRAM']: # TCP / UDP - continue - result.append({ - 'fd': connection.fd, - 'family': 'ipv6' if connection.family.name[-1] == '6' else 'ipv4', # ip version - 'type': 'tcp' if connection.type.name == 'SOCK_STREAM' else 'udp', # tcp or udp - 'local': { # local bind address - 'addr': connection.laddr.ip, - 'port': connection.laddr.port, - }, - 'remote': { # remote address - 'addr': connection.raddr.ip, - 'port': connection.raddr.port, - } if len(connection.raddr) != 0 else None, - 'status': connection.status, - 'pid': connection.pid, # process id - }) - logging.debug('Network status -> found %i connections' % len(result)) - return result diff --git a/Utils/Common/Network.py b/Utils/Common/Network.py new file mode 100644 index 0000000..f786705 --- /dev/null +++ b/Utils/Common/Network.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import psutil +import random +from Basis.Logger import logging + + +def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, msWait: int = 10) -> int: # found a available port + if rangeStart > rangeEnd or rangeStart < 1 or rangeEnd > 65535: # port range check + raise RuntimeError('Invalid port range') + while True: + port = random.randint(rangeStart, rangeEnd) # choose randomly + if isVacantPort(port): + logging.debug('Found new available port -> %i' % port) + return port + time.sleep(msWait / 1000) # ms -> s (default 10ms) + + +def isVacantPort(port: int) -> bool: # whether the port is occupied + for connection in networkStatus(): # scan every connections + if connection['local']['port'] == port: # port occupied (ipv4-tcp / ipv4-udp / ipv6-tcp / ipv6-udp) + logging.debug('Check port %i -> occupied' % port) + return False + logging.debug('Check port %i -> available' % port) # vacant port + return True + + +def networkStatus() -> list: # get all network connections + result = [] + for connection in psutil.net_connections(): + if not connection.family.name.startswith('AF_INET'): # AF_INET / AF_INET6 + continue + if connection.type.name not in ['SOCK_STREAM', 'SOCK_DGRAM']: # TCP / UDP + continue + result.append({ + 'fd': connection.fd, + 'family': 'ipv6' if connection.family.name[-1] == '6' else 'ipv4', # ip version + 'type': 'tcp' if connection.type.name == 'SOCK_STREAM' else 'udp', # tcp or udp + 'local': { # local bind address + 'addr': connection.laddr.ip, + 'port': connection.laddr.port, + }, + 'remote': { # remote address + 'addr': connection.raddr.ip, + 'port': connection.raddr.port, + } if len(connection.raddr) != 0 else None, + 'status': connection.status, + 'pid': connection.pid, # process id + }) + logging.debug('Network status -> found %i connections' % len(result)) + return result diff --git a/Utils/Common/__init__.py b/Utils/Common/__init__.py index 3730331..232dfbd 100644 --- a/Utils/Common/__init__.py +++ b/Utils/Common/__init__.py @@ -3,3 +3,4 @@ from Utils.Common.Coding import urlEncode, urlDecode from Utils.Common.Coding import base64Encode, base64Decode +from Utils.Common.Network import isVacantPort, getAvailablePort