Browse Source

style: CRLF to LF

master
dnomd343 2 years ago
parent
commit
a7dc63bf9d
  1. 130
      Basis/Functions.py
  2. 300
      ProxyTester/VMess.py

130
Basis/Functions.py

@ -1,65 +1,65 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import psutil
import random
from Basis.Logger import logging
def genFlag(length: int = 12) -> str: # generate random task flag
flag = ''
for i in range(0, length):
tmp = random.randint(0, 15)
if tmp >= 10:
flag += chr(tmp + 87) # a ~ f
else:
flag += str(tmp) # 0 ~ 9
logging.debug('generate new flag -> ' + flag)
return flag
def getAvailablePort(rangeStart: int = 41952, rangeEnd: int = 65535) -> int: # get a available port
if rangeStart > rangeEnd or rangeStart < 1 or rangeEnd > 65535:
raise RuntimeError('invalid port range')
while True:
port = random.randint(rangeStart, rangeEnd) # choose randomly
if checkPortStatus(port):
logging.debug('get new port -> %i' % port)
return port
time.sleep(0.1) # wait for 100ms
def checkPortStatus(port: int) -> bool: # check if the port is occupied
logging.debug('check status of port %i -> available' % port)
for connection in networkStatus(): # scan every connections
if connection['local']['port'] == port: # port occupied (whatever ipv4-tcp / ipv4-udp / ipv6-tcp / ipv6-udp)
logging.debug('check status of port %i -> occupied' % port)
return False
return True
def networkStatus() -> list: # get all network connections
result = []
for connection in psutil.net_connections():
if not connection.family.name.startswith('AF_INET'): # AF_INET / AF_INET6
continue
if connection.type.name not in ['SOCK_STREAM', 'SOCK_DGRAM']: # TCP / UDP
continue
result.append({
'fd': connection.fd,
'family': 'ipv6' if connection.family.name[-1] == '6' else 'ipv4', # ip version
'type': 'tcp' if connection.type.name == 'SOCK_STREAM' else 'udp', # tcp or udp
'local': { # local bind address
'addr': connection.laddr.ip,
'port': connection.laddr.port,
},
'remote': { # remote address
'addr': connection.raddr.ip,
'port': connection.raddr.port,
} if len(connection.raddr) != 0 else None,
'status': connection.status,
'pid': connection.pid, # process id
})
logging.debug('get network status -> found %i connections' % len(result))
return result
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import psutil
import random
from Basis.Logger import logging
def genFlag(length: int = 12) -> str: # generate random task flag
flag = ''
for i in range(0, length):
tmp = random.randint(0, 15)
if tmp >= 10:
flag += chr(tmp + 87) # a ~ f
else:
flag += str(tmp) # 0 ~ 9
logging.debug('generate new flag -> ' + flag)
return flag
def getAvailablePort(rangeStart: int = 41952, rangeEnd: int = 65535) -> int: # get a available port
if rangeStart > rangeEnd or rangeStart < 1 or rangeEnd > 65535:
raise RuntimeError('invalid port range')
while True:
port = random.randint(rangeStart, rangeEnd) # choose randomly
if checkPortStatus(port):
logging.debug('get new port -> %i' % port)
return port
time.sleep(0.1) # wait for 100ms
def checkPortStatus(port: int) -> bool: # check if the port is occupied
logging.debug('check status of port %i -> available' % port)
for connection in networkStatus(): # scan every connections
if connection['local']['port'] == port: # port occupied (whatever ipv4-tcp / ipv4-udp / ipv6-tcp / ipv6-udp)
logging.debug('check status of port %i -> occupied' % port)
return False
return True
def networkStatus() -> list: # get all network connections
result = []
for connection in psutil.net_connections():
if not connection.family.name.startswith('AF_INET'): # AF_INET / AF_INET6
continue
if connection.type.name not in ['SOCK_STREAM', 'SOCK_DGRAM']: # TCP / UDP
continue
result.append({
'fd': connection.fd,
'family': 'ipv6' if connection.family.name[-1] == '6' else 'ipv4', # ip version
'type': 'tcp' if connection.type.name == 'SOCK_STREAM' else 'udp', # tcp or udp
'local': { # local bind address
'addr': connection.laddr.ip,
'port': connection.laddr.port,
},
'remote': { # remote address
'addr': connection.raddr.ip,
'port': connection.raddr.port,
} if len(connection.raddr) != 0 else None,
'status': connection.status,
'pid': connection.pid, # process id
})
logging.debug('get network status -> found %i connections' % len(result))
return result

300
ProxyTester/VMess.py

@ -1,150 +1,150 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyTester import V2ray
testConfig = {}
vmessMethodList = [
'aes-128-gcm',
'chacha20-poly1305',
'auto',
'none',
'zero',
]
def vmessBasicTest(method: str, alterId: int) -> dict:
inboundConfig = {
'protocol': 'vmess',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'id': testConfig['id'],
'alterId': alterId
}
]
}
}
caption = 'VMess method ' + method
if alterId == 0:
envVar = {}
caption += ' (AEAD)'
else:
envVar = {
'v2ray.vmess.aead.forced': 'false'
}
caption += ' (alterId ' + str(alterId) + ')'
return {
'caption': caption,
'proxy': {
'type': 'vmess',
'server': testConfig['addr'],
'port': testConfig['port'],
'method': method,
'id': testConfig['id'],
'aid': alterId
},
'server': {
'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': envVar
},
'aider': None
}
def loadVmessStream(streamInfo: dict) -> dict:
proxyInfo = {
'type': 'vmess',
'server': testConfig['addr'],
'port': testConfig['port'],
'id': testConfig['id'],
'stream': streamInfo['client']
}
inboundConfig = {
'protocol': 'vmess',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'id': testConfig['id']
}
]
},
'streamSettings': streamInfo['server']
}
return {
'caption': 'VMess network ' + streamInfo['caption'],
'proxy': proxyInfo,
'server': {
'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': {}
},
'aider': None
}
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = []
# Basic test
for method in vmessMethodList: # methods and AEAD/MD5+AES test
testList.append(vmessBasicTest(method, 0))
testList.append(vmessBasicTest(method, 64))
# TCP stream
streamInfo = V2ray.loadTcpStream(False, '', '')
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadTcpStream(True, config['host'], '/')
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# mKCP stream
for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadKcpStream(config['passwd'], obfs)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# WebSocket stream
streamInfo = V2ray.loadWsStream(config['host'], config['path'], False)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadWsStream(config['host'], config['path'], True)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# HTTP/2 stream
streamInfo = V2ray.loadH2Stream(config['host'], config['path'])
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# QUIC stream
for method in V2ray.quicMethodList:
for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# GRPC stream
streamInfo = V2ray.loadGrpcStream(config['service'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
return testList
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyTester import V2ray
testConfig = {}
vmessMethodList = [
'aes-128-gcm',
'chacha20-poly1305',
'auto',
'none',
'zero',
]
def vmessBasicTest(method: str, alterId: int) -> dict:
inboundConfig = {
'protocol': 'vmess',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'id': testConfig['id'],
'alterId': alterId
}
]
}
}
caption = 'VMess method ' + method
if alterId == 0:
envVar = {}
caption += ' (AEAD)'
else:
envVar = {
'v2ray.vmess.aead.forced': 'false'
}
caption += ' (alterId ' + str(alterId) + ')'
return {
'caption': caption,
'proxy': {
'type': 'vmess',
'server': testConfig['addr'],
'port': testConfig['port'],
'method': method,
'id': testConfig['id'],
'aid': alterId
},
'server': {
'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': envVar
},
'aider': None
}
def loadVmessStream(streamInfo: dict) -> dict:
proxyInfo = {
'type': 'vmess',
'server': testConfig['addr'],
'port': testConfig['port'],
'id': testConfig['id'],
'stream': streamInfo['client']
}
inboundConfig = {
'protocol': 'vmess',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'id': testConfig['id']
}
]
},
'streamSettings': streamInfo['server']
}
return {
'caption': 'VMess network ' + streamInfo['caption'],
'proxy': proxyInfo,
'server': {
'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': {}
},
'aider': None
}
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = []
# Basic test
for method in vmessMethodList: # methods and AEAD/MD5+AES test
testList.append(vmessBasicTest(method, 0))
testList.append(vmessBasicTest(method, 64))
# TCP stream
streamInfo = V2ray.loadTcpStream(False, '', '')
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadTcpStream(True, config['host'], '/')
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# mKCP stream
for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadKcpStream(config['passwd'], obfs)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# WebSocket stream
streamInfo = V2ray.loadWsStream(config['host'], config['path'], False)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadWsStream(config['host'], config['path'], True)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# HTTP/2 stream
streamInfo = V2ray.loadH2Stream(config['host'], config['path'])
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# QUIC stream
for method in V2ray.quicMethodList:
for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# GRPC stream
streamInfo = V2ray.loadGrpcStream(config['service'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
return testList

Loading…
Cancel
Save