mirror of https://github.com/dnomd343/ProxyC
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
157 lines
4.7 KiB
157 lines
4.7 KiB
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import re
|
|
import time
|
|
import uuid
|
|
import psutil
|
|
import random
|
|
import hashlib
|
|
from IPy import IP
|
|
from Basis.Logger import logging
|
|
|
|
|
|
def isIpAddr(ipAddr: str) -> bool:
    """Check whether `ipAddr` is a single IPv4 or IPv6 address.

    Anything containing '/' (CIDR notation) and anything with neither '.'
    nor ':' is rejected before parsing; the rest is handed to `IP(...)`.

    :param ipAddr: text to validate.
    :return: True if `IP(ipAddr)` accepts the text, False otherwise
        (including when an ordinary error such as a non-string argument occurs).
    """
    try:
        if '/' in ipAddr:  # filter CIDR
            return False
        if '.' not in ipAddr and ':' not in ipAddr:  # not IPv4 or IPv6
            return False
        IP(ipAddr)  # raises when the text is not an IP address
        return True  # valid IP address
    except Exception:  # not a bare `except:` -> KeyboardInterrupt / SystemExit still propagate
        return False
|
|
|
|
|
|
def isDomain(domain: str) -> bool:
    """Check whether `domain` looks like a dotted domain name.

    The pattern requires 3-255 characters in total, at least two labels
    separated by '.', each label 1-63 characters from [a-zA-Z0-9_-] and not
    starting with '-'.

    :param domain: text to validate.
    :return: True on a full match, False otherwise (including non-string input).
    """
    domainRegex = r'^(?=^.{3,255}$)[a-zA-Z0-9_][a-zA-Z0-9_-]{0,62}(\.[a-zA-Z0-9_][a-zA-Z0-9_-]{0,62})+$'
    try:
        # fullmatch instead of search: `$` alone also matches just before a
        # trailing newline, which let 'example.com\n' pass as a valid domain.
        return re.fullmatch(domainRegex, domain) is not None
    except Exception:  # unexpected error (e.g. non-string input)
        return False
|
|
|
|
|
|
def isHost(host: str) -> bool:
    """Return True when `host` passes `isIpAddr` or, failing that, `isDomain`."""
    if isIpAddr(host):  # IPv4 / IPv6
        return True
    return isDomain(host)  # Domain
|
|
|
|
|
|
def isPort(port: int) -> bool:
    """Return True when `port` is an int in the range 1 ~ 65535.

    The type comparison is exact, so int subclasses such as bool are rejected.
    """
    return type(port) is int and 1 <= port <= 65535
|
|
|
|
|
|
def md5Sum(data: str, encode: str = 'utf-8') -> str:
    """Return the hexadecimal MD5 digest of `data` after encoding it with `encode`."""
    rawBytes = data.encode(encoding=encode)
    digest = hashlib.md5()
    digest.update(rawBytes)
    return digest.hexdigest()  # MD5 hash
|
|
|
|
|
|
def hostFormat(host: str, v6Bracket: bool = False) -> str:
    """Return `host` in the string form produced by `IP(host)`, if it parses.

    :param host: IP address text or any other host string.
    :param v6Bracket: when True, an IPv6 result is wrapped as '[addr]'.
    :return: `str(IP(host))` (bracketed for IPv6 if requested), or `host`
        unchanged when it cannot be handled as an IP address.
    """
    try:
        ip = IP(host)
        if v6Bracket and ip.version() == 6:
            return '[%s]' % str(ip)  # [IPv6]
        return str(ip)  # IPv4 / IPV6
    except Exception:  # not ip address; KeyboardInterrupt / SystemExit are no longer swallowed
        return host
|
|
|
|
|
|
def v6AddBracket(host: str) -> str:  # add bracket for ipv6
    """Shortcut for `hostFormat` with bracket wrapping of IPv6 results enabled."""
    bracketed = hostFormat(host, v6Bracket=True)
    return bracketed
|
|
|
|
|
|
def genFlag(length: int = 12) -> str:  # generate random task flag
    """Build a random string of `length` lowercase hexadecimal characters.

    One `random.randint(0, 15)` draw is made per character; the result is
    written to the debug log before being returned.
    """
    flag = ''.join(
        format(random.randint(0, 15), 'x')  # 0 ~ 9 / a ~ f
        for _ in range(0, length)
    )
    logging.debug('Generate new flag -> ' + flag)
    return flag
|
|
|
|
|
|
def genUUID() -> str:  # generate uuid v5
    """Return a UUIDv5 string derived from a fresh 16-character flag in the DNS namespace."""
    randomName = genFlag(length=16)
    generated = uuid.uuid5(uuid.NAMESPACE_DNS, randomName)
    return str(generated)
|
|
|
|
|
|
def toInt(raw) -> int:
    """Convert `raw` with `int(raw)`.

    :param raw: any value accepted by `int()`.
    :return: the converted integer.
    :raises RuntimeError: when `int(raw)` fails; the original error is
        attached as `__cause__`.
    """
    try:
        return int(raw)
    except Exception as err:  # a bare `except:` would also turn Ctrl-C into RuntimeError
        raise RuntimeError('Unable convert to int') from err
|
|
|
|
|
|
def toStr(raw) -> str:
    """Convert `raw` to text.

    `bytes` are decoded as UTF-8 (a decoding failure propagates as the
    decoder's own exception, since that call sits outside the try block);
    every other value goes through `str(raw)`.

    :param raw: value to convert; must not be None.
    :return: the text form of `raw`.
    :raises RuntimeError: when `raw` is None or `str(raw)` fails; in the
        latter case the original error is attached as `__cause__`.
    """
    if raw is None:
        raise RuntimeError('None could not convert to str')
    if isinstance(raw, bytes):  # bytes -> str
        return str(raw, encoding='utf-8')
    try:
        return str(raw)
    except Exception as err:  # a bare `except:` would also turn Ctrl-C into RuntimeError
        raise RuntimeError('Unable convert to str') from err
|
|
|
|
|
|
def toStrTidy(raw) -> str:
    """Convert `raw` via `toStr`, then strip surrounding whitespace and lower-case it."""
    text = toStr(raw)
    trimmed = text.strip()  # trim
    return trimmed.lower()  # lower
|
|
|
|
|
|
def toBool(raw) -> bool:
    """Convert `raw` to a boolean.

    bool / int / float values use Python truthiness. Anything else is
    converted with `toStr`, trimmed and lower-cased: 'true' / 'false' map
    directly, any other text must parse as an integer and is True when
    that integer is non-zero.

    :param raw: value to convert.
    :return: the boolean interpretation of `raw`.
    :raises RuntimeError: when the value cannot be interpreted; the
        original error is attached as `__cause__`.
    """
    if isinstance(raw, (bool, int, float)):
        return bool(raw)
    try:
        text = toStr(raw).strip().lower()
        if text in ('true', 'false'):
            return text == 'true'
        return int(text) != 0
    except Exception as err:  # a bare `except:` would also turn Ctrl-C into RuntimeError
        raise RuntimeError('Unable convert to bool') from err
|
|
|
|
|
|
def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, waitTime: int = 10) -> int:  # get available port
    """Pick random ports in [rangeStart, rangeEnd] until `checkPortStatus` accepts one.

    :param rangeStart: lowest candidate port (at least 1).
    :param rangeEnd: highest candidate port (at most 65535).
    :param waitTime: pause in milliseconds after each rejected candidate.
    :return: the first candidate for which `checkPortStatus` returned True.
    :raises RuntimeError: when the range is inverted or leaves 1 ~ 65535.
    """
    inverted = rangeStart > rangeEnd
    if inverted or rangeStart < 1 or rangeEnd > 65535:
        raise RuntimeError('Invalid port range')
    while True:
        candidate = random.randint(rangeStart, rangeEnd)  # choose randomly
        if not checkPortStatus(candidate):
            time.sleep(waitTime / 1000)  # ms -> s (default 10ms)
            continue
        logging.debug('Get new port -> %i' % candidate)
        return candidate
|
|
|
|
|
|
def checkPortStatus(port: int) -> bool:  # check if the port is occupied
    """Return True when no entry from `networkStatus()` has `port` as its local port.

    The family and protocol of the matching entry are not considered
    (ipv4-tcp / ipv4-udp / ipv6-tcp / ipv6-udp all count as occupied).
    """
    occupied = any(
        entry['local']['port'] == port
        for entry in networkStatus()  # scan every connections
    )
    if occupied:
        logging.debug('Check port %i -> occupied' % port)
        return False
    logging.debug('Check port %i -> available' % port)
    return True
|
|
|
|
|
|
def networkStatus() -> list:  # get all network connections
    """List the AF_INET / AF_INET6 TCP and UDP entries of `psutil.net_connections()`.

    Each entry is a dict with the keys 'fd', 'family' ('ipv4' / 'ipv6'),
    'type' ('tcp' / 'udp'), 'local' ({'addr', 'port'}), 'remote'
    ({'addr', 'port'} or None when the remote address is empty),
    'status' and 'pid'. The number of entries is written to the debug log.
    """
    protocols = {'SOCK_STREAM': 'tcp', 'SOCK_DGRAM': 'udp'}  # TCP / UDP
    result = []
    for conn in psutil.net_connections():
        familyName = conn.family.name
        if not familyName.startswith('AF_INET'):  # AF_INET / AF_INET6
            continue
        protocol = protocols.get(conn.type.name)
        if protocol is None:  # neither SOCK_STREAM nor SOCK_DGRAM
            continue
        if len(conn.raddr) != 0:  # remote address
            remote = {
                'addr': conn.raddr.ip,
                'port': conn.raddr.port,
            }
        else:
            remote = None
        local = {  # local bind address
            'addr': conn.laddr.ip,
            'port': conn.laddr.port,
        }
        result.append({
            'fd': conn.fd,
            'family': 'ipv6' if familyName.endswith('6') else 'ipv4',  # ip version
            'type': protocol,  # tcp or udp
            'local': local,
            'remote': remote,
            'status': conn.status,
            'pid': conn.pid,  # process id
        })
    logging.debug('Network status -> found %i connections' % len(result))
    return result
|
|
|