Browse Source

update: Tester module

dev
dnomd343 2 years ago
parent
commit
3681e0f7ef
  1. 12
      Tester/Brook.py
  2. 14
      Tester/Hysteria.py
  3. 14
      Tester/Plugin.py
  4. 14
      Tester/Shadowsocks.py
  5. 14
      Tester/ShadowsocksR.py
  6. 16
      Tester/Trojan.py
  7. 14
      Tester/TrojanGo.py
  8. 6
      Tester/V2ray.py
  9. 14
      Tester/VLESS.py
  10. 14
      Tester/VMess.py
  11. 6
      Tester/Xray.py
  12. 42
      Utils/Test.py
  13. 42
      main.py

12
Tester/Brook.py

@ -4,10 +4,10 @@
import copy
import itertools
from Builder import Brook
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Functions import hostFormat, genFlag, getAvailablePort
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Common import hostFormat, genFlag, getAvailablePort
def originStream(isUot: bool) -> dict:
@ -78,7 +78,7 @@ def loadTest(stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New Brook test -> %s' % testInfo)
logger.debug('New Brook test -> %s' % testInfo)
return testInfo
@ -91,4 +91,4 @@ def load():
addStream(wsStream(isRaw, isSecure)) # websocket stream test
for stream in streams:
yield loadTest(stream)
logging.info('Brook test yield complete')
logger.info('Brook test yield complete')

14
Tester/Hysteria.py

@ -5,11 +5,11 @@ import os
import json
import itertools
from Builder import Hysteria
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import hysteriaProtocols
from Basis.Functions import hostFormat, genFlag, getAvailablePort
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Constant import hysteriaProtocols
from Utils.Common import hostFormat, genFlag, getAvailablePort
def loadServer(configFile: str, hysteriaConfig: dict) -> Process:
@ -78,11 +78,11 @@ def loadTest(protocol: str, isObfs: bool, isAuth: bool) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New Hysteria test -> %s' % testInfo)
logger.debug('New Hysteria test -> %s' % testInfo)
return testInfo
def load():
for protocol, isObfs, isAuth in itertools.product(hysteriaProtocols, [False, True], [False, True]):
yield loadTest(protocol, isObfs, isAuth)
logging.info('Hysteria test yield complete')
logger.info('Hysteria test yield complete')

14
Tester/Plugin.py

@ -4,11 +4,11 @@
import os
import re
import json
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import Plugins
from Basis.Functions import genFlag, hostFormat, getAvailablePort
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Constant import Plugins
from Utils.Common import genFlag, hostFormat, getAvailablePort
pluginParams = {}
@ -220,8 +220,8 @@ def cloakLoad() -> None:
pluginParams['CK_PUBLIC'] = re.search(r'\s+(\S+)$', ckKey.split('\n')[0])[1]
pluginParams['CK_PRIVATE'] = re.search(r'\s+(\S+)$', ckKey.split('\n')[1])[1]
pluginParams['CK_UID'] = re.search(r'\s+(\S+)\n', os.popen('ck-server -uid').read())[1] # generate uid for clock
logging.info('generate cloak uid -> %s' % pluginParams['CK_UID'])
logging.info('generate cloak key -> %s (Public) | %s (Private)' % (
logger.info('generate cloak uid -> %s' % pluginParams['CK_UID'])
logger.info('generate cloak key -> %s (Public) | %s (Private)' % (
pluginParams['CK_PUBLIC'], pluginParams['CK_PRIVATE']
))
ckPrefix = 'UID=${CK_UID};PublicKey=${CK_PUBLIC};ServerName=${SITE};' # cloak plugin's basic command

14
Tester/Shadowsocks.py

@ -7,11 +7,11 @@ import base64
import itertools
from Tester import Plugin
from Builder import Shadowsocks
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Functions import md5Sum, genFlag, getAvailablePort
from Basis.Constant import PathEnv, ssMethods, ssAllMethods, mbedtlsMethods
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Common import md5Sum, genFlag, getAvailablePort
from Utils.Constant import PathEnv, ssMethods, ssAllMethods, mbedtlsMethods
def loadConfig(proxyInfo: dict) -> dict: # load basic config option
@ -134,7 +134,7 @@ def loadTest(serverType: str, clientType: str, method: str, plugin: dict or None
}
if plugin is not None:
testInfo['server'] = plugin['inject'](testInfo['server'], plugin)
logging.debug('New Shadowsocks test -> %s' % testInfo)
logger.debug('New Shadowsocks test -> %s' % testInfo)
return testInfo
@ -167,4 +167,4 @@ def load(isExtra: bool = False):
yield next(ssIter)
except StopIteration:
break
logging.info('Shadowsocks test yield complete')
logger.info('Shadowsocks test yield complete')

14
Tester/ShadowsocksR.py

@ -3,12 +3,12 @@
import os
import json
from Basis.Test import Settings
from Builder import ShadowsocksR
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Functions import genFlag, getAvailablePort
from Basis.Constant import ssrMethods, ssrProtocols, ssrObfuscations
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Common import genFlag, getAvailablePort
from Utils.Constant import ssrMethods, ssrProtocols, ssrObfuscations
def loadServer(configFile: str, proxyInfo: dict) -> Process: # load server process
@ -64,7 +64,7 @@ def loadTest(method: str, protocol: str, obfs: str) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New ShadowsocksR test -> %s' % testInfo)
logger.debug('New ShadowsocksR test -> %s' % testInfo)
return testInfo
@ -75,4 +75,4 @@ def load():
yield loadTest('aes-128-ctr', protocol, 'plain')
for obfs in ssrObfuscations:
yield loadTest('aes-128-ctr', 'origin', obfs)
logging.info('ShadowsocksR test yield complete')
logger.info('ShadowsocksR test yield complete')

16
Tester/Trojan.py

@ -5,11 +5,11 @@ import os
import json
from Tester import Xray
from Builder import Trojan
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import xtlsFlows
from Basis.Functions import md5Sum, genFlag, getAvailablePort
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Constant import xtlsFlows
from Utils.Common import md5Sum, genFlag, getAvailablePort
def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: str or None) -> Process:
@ -80,7 +80,7 @@ def loadBasicTest(tcpTlsStream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New trojan test -> %s' % testInfo)
logger.debug('New trojan test -> %s' % testInfo)
return testInfo
@ -110,7 +110,7 @@ def loadTest(stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New Trojan test -> %s' % testInfo)
logger.debug('New Trojan test -> %s' % testInfo)
return testInfo
@ -119,4 +119,4 @@ def load():
yield loadBasicTest(streams[1]) # Trojan basic test -> TCP stream with TLS
for stream in streams: # test all stream cases
yield loadTest(stream)
logging.info('Trojan test yield complete')
logger.info('Trojan test yield complete')

14
Tester/TrojanGo.py

@ -5,11 +5,11 @@ import os
import json
from Tester import Plugin
from Builder import TrojanGo
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import trojanGoMethods
from Basis.Functions import md5Sum, genFlag, getAvailablePort
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Constant import trojanGoMethods
from Utils.Common import md5Sum, genFlag, getAvailablePort
def loadServer(configFile: str, proxyInfo: dict) -> Process:
@ -85,7 +85,7 @@ def loadTest(wsObject: dict or None, ssObject: dict or None, plugin: dict or Non
}
if plugin is not None:
testInfo['server'] = plugin['inject'](testInfo['server'], plugin)
logging.debug('New Trojan-Go test -> %s' % testInfo)
logger.debug('New Trojan-Go test -> %s' % testInfo)
return testInfo
@ -103,4 +103,4 @@ def load():
yield loadTest(wsObject, None if ssObject['method'] == '' else ssObject, None)
for plugin in Plugin.load('trojan-go'): # different plugin for trojan-go
yield loadTest(None, None, plugin)
logging.info('Trojan-Go test yield complete')
logger.info('Trojan-Go test yield complete')

6
Tester/V2ray.py

@ -3,9 +3,9 @@
import copy
import itertools
from Basis.Test import Settings
from Basis.Functions import genFlag
from Basis.Constant import quicMethods, udpObfuscations
from Utils.Test import Settings
from Utils.Common import genFlag
from Utils.Constant import quicMethods, udpObfuscations
httpConfig = {
'version': '1.1',

14
Tester/VLESS.py

@ -5,11 +5,11 @@ import os
import json
from Tester import Xray
from Builder import VLESS
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import xtlsFlows
from Basis.Functions import md5Sum, genUUID, getAvailablePort
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Constant import xtlsFlows
from Utils.Common import md5Sum, genUUID, getAvailablePort
def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: str or None) -> Process:
@ -71,7 +71,7 @@ def loadTest(stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New VLESS test -> %s' % testInfo)
logger.debug('New VLESS test -> %s' % testInfo)
return testInfo
@ -79,4 +79,4 @@ def load():
streams = Xray.loadStream() # load xray-core stream list
for stream in streams: # test all stream cases
yield loadTest(stream)
logging.info('VLESS test yield complete')
logger.info('VLESS test yield complete')

14
Tester/VMess.py

@ -6,11 +6,11 @@ import json
import itertools
from Tester import V2ray
from Builder import VMess
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import PathEnv, vmessMethods
from Basis.Functions import md5Sum, genUUID, getAvailablePort
from Utils.Logger import logger
from Utils.Test import Settings
from Utils.Process import Process
from Utils.Constant import PathEnv, vmessMethods
from Utils.Common import md5Sum, genUUID, getAvailablePort
def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict) -> Process: # load server process
@ -69,7 +69,7 @@ def loadTest(method: str, aid: int, stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New VMess test -> %s' % testInfo)
logger.debug('New VMess test -> %s' % testInfo)
return testInfo
@ -79,4 +79,4 @@ def load():
yield loadTest(method, aid, streams[0])
for stream in streams[1:]: # skip first stream that has benn checked
yield loadTest('auto', 0, stream) # aead with auto security
logging.info('VMess test yield complete')
logger.info('VMess test yield complete')

6
Tester/Xray.py

@ -4,9 +4,9 @@
import copy
import itertools
from Tester import V2ray
from Basis.Test import Settings
from Basis.Functions import genFlag
from Basis.Constant import xtlsFlows, quicMethods, udpObfuscations
from Utils.Test import Settings
from Utils.Common import genFlag
from Utils.Constant import xtlsFlows, quicMethods, udpObfuscations
loadConfig = V2ray.loadConfig
tcpStream = V2ray.tcpStream

42
Basis/Test.py → Utils/Test.py

@ -5,9 +5,9 @@ import os
import time
import requests
from threading import Thread
from Basis.Logger import logging
from Basis.Constant import WorkDir, TestHost, TestSite
from Basis.Functions import md5Sum, genFlag, hostFormat, checkPortStatus
from Utils.Logger import logger
from Utils.Constant import WorkDir, TestHost, TestSite
from Utils.Common import md5Sum, genFlag, hostFormat, isVacantPort
Settings = {
'workDir': WorkDir,
@ -27,7 +27,7 @@ def loadBind(serverV6: bool = False, clientV6: bool = False) -> None:
def waitPort(port: int, times: int = 50, delay: int = 100) -> bool: # wait until port occupied
for i in range(times):
if not checkPortStatus(port): # port occupied
if not isVacantPort(port): # port occupied
return True
time.sleep(delay / 1000) # default wait 100ms * 50 => 5s
return False # timeout
@ -35,24 +35,24 @@ def waitPort(port: int, times: int = 50, delay: int = 100) -> bool: # wait unti
def genCert(host: str, certInfo: dict, remark: str = 'ProxyC') -> None: # generate self-signed certificate
certOrgInfo = ['--organization', remark, '--organizationUnit', remark] # organization info
logging.critical('Load self-signed certificate')
logger.critical('Load self-signed certificate')
os.system('mkdir -p %s' % Settings['workDir']) # make sure that work directory exist
# create CA data at first (by mad)
logging.critical('Creating CA certificate and key...')
logger.critical('Creating CA certificate and key...')
os.system(' '.join( # generate CA certificate and privkey
['mad', 'ca', '--ca', certInfo['caCert'], '--key', certInfo['caKey'], '--commonName', remark] + certOrgInfo
))
# generate private key and sign certificate
logging.critical('Signing certificate...')
logger.critical('Signing certificate...')
os.system(' '.join(['mad', 'cert', '--domain', host] + [ # generate certificate and privkey, then signed by CA
'--ca', certInfo['caCert'], '--ca_key', certInfo['caKey'],
'--cert', certInfo['cert'], '--key', certInfo['key'],
] + certOrgInfo))
# install CA certificate and record self-signed cert info
logging.critical('Installing CA certificate...')
logger.critical('Installing CA certificate...')
os.system('cat %s >> /etc/ssl/certs/ca-certificates.crt' % certInfo['caCert']) # add into system's trust list
@ -70,21 +70,21 @@ def loadCert(host: str = TestHost, certId: str = '') -> None: # load certificat
Settings['host'] = host
Settings['cert'] = certInfo['cert']
Settings['key'] = certInfo['key']
logging.warning('Certificate load complete -> ID = %s' % certId)
logger.warning('Certificate load complete -> ID = %s' % certId)
def httpCheck(socksInfo: dict, url: str, testId: str, timeout: int = 10) -> None:
socksProxy = 'socks5://%s:%i' % (hostFormat(socksInfo['addr'], v6Bracket = True), socksInfo['port'])
try:
logging.debug('[%s] Http request via %s' % (testId, socksProxy))
logger.debug('[%s] Http request via %s' % (testId, socksProxy))
request = requests.get(url, timeout = timeout, proxies = { # http request via socks5
'http': socksProxy,
'https': socksProxy,
})
request.raise_for_status() # throw error when server return 4xx or 5xx (don't actually need)
logging.info('[%s] %s -> ok' % (testId, socksProxy))
logger.info('[%s] %s -> ok' % (testId, socksProxy))
except Exception as exp:
logging.error('[%s] %s -> error\n%s' % (testId, socksProxy, exp)) # show detail of error reason
logger.error('[%s] %s -> error\n%s' % (testId, socksProxy, exp)) # show detail of error reason
raise RuntimeError('Http request via socks5 failed')
@ -93,8 +93,8 @@ def runTest(testInfo: dict, testUrl: str, testSelect: set or None, delay: int =
if testSelect is not None: # testSelect is None -> run all test
if testId not in testSelect: # skip unselected task
return
logging.warning('[%s] %s' % (testId, testInfo['caption'])) # show caption
logging.debug('[%s] Server ID -> %s | Client ID -> %s' % (
logger.warning('[%s] %s' % (testId, testInfo['caption'])) # show caption
logger.debug('[%s] Server ID -> %s | Client ID -> %s' % (
testId, testInfo['server'].id, testInfo['client'].id
))
testInfo['server'].id = testId + '-server'
@ -103,14 +103,14 @@ def runTest(testInfo: dict, testUrl: str, testSelect: set or None, delay: int =
# build server and client and wait them start
testInfo['server'].start() # start test server
if waitPort(testInfo['interface']['port']): # wait for server
logging.debug('[%s] Test server start complete' % testId)
logger.debug('[%s] Test server start complete' % testId)
testInfo['client'].start() # start test client
if waitPort(testInfo['socks']['port']): # wait for client
logging.debug('[%s] Test client start complete' % testId)
logger.debug('[%s] Test client start complete' % testId)
# start test process
try:
logging.debug('[%s] Test process start' % testId)
logger.debug('[%s] Test process start' % testId)
time.sleep(delay) # delay a short time before check
httpCheck(testInfo['socks'], testUrl, testId) # run http request test
testInfo['client'].quit() # clean up client
@ -118,16 +118,16 @@ def runTest(testInfo: dict, testUrl: str, testSelect: set or None, delay: int =
except:
testInfo['client'].quit()
testInfo['server'].quit()
logging.warning('[%s] Client info' % testId)
logging.error('[%(id)s-server]\n▲ CMD => %(cmd)s\n▲ ENV => %(env)s\n▲ FILE => %(file)s\n%(output)s' % {
logger.warning('[%s] Client info' % testId)
logger.error('[%(id)s-server]\n▲ CMD => %(cmd)s\n▲ ENV => %(env)s\n▲ FILE => %(file)s\n%(output)s' % {
'id': testId,
'cmd': testInfo['client'].cmd,
'env': testInfo['client'].env,
'file': testInfo['client'].file,
'output': '-' * 96 + '\n' + testInfo['client'].output + '-' * 96,
})
logging.warning('[%s] Server info' % testId)
logging.error('[%(id)s-client]\n▲ CMD => %(cmd)s\n▲ ENV => %(env)s\n▲ FILE => %(file)s\n%(output)s' % {
logger.warning('[%s] Server info' % testId)
logger.error('[%(id)s-client]\n▲ CMD => %(cmd)s\n▲ ENV => %(env)s\n▲ FILE => %(file)s\n%(output)s' % {
'id': testId,
'cmd': testInfo['server'].cmd,
'env': testInfo['server'].env,

42
main.py

@ -7,8 +7,8 @@ import time
import _thread
import argparse
import compileall
from Basis import Constant
from Basis.Exception import checkException
from Utils import Constant
from Utils.Exception import checkException
def mainArgParse(rawArgs: list) -> argparse.Namespace:
@ -62,18 +62,18 @@ else:
from Tester import testEntry
from Basis.Check import Check
from Basis import Api, DnsProxy
from Basis.Logger import logging
from Basis.Manager import Manager
from Basis.Test import Test, loadBind, loadCert
from Utils.Check import Check
from Utils import Api, DnsProxy
from Utils.Logger import logger
from Utils.Manager import Manager
from Utils.Test import Test, loadBind, loadCert
from concurrent.futures import ThreadPoolExecutor
def pythonCompile(dirRange: str = '/') -> None: # python optimize compile
for optimize in [-1, 1, 2]:
compileall.compile_dir(dirRange, quiet = 1, optimize = optimize)
logging.warning('Python optimize compile -> %s (level = %i)' % (dirRange, optimize))
logger.warning('Python optimize compile -> %s (level = %i)' % (dirRange, optimize))
def runCheck(taskId: str, taskInfo: dict) -> None:
@ -81,13 +81,13 @@ def runCheck(taskId: str, taskInfo: dict) -> None:
checkResult = {}
try:
checkResult = Check(taskId, taskInfo) # check by task info
logging.warning('[%s] Task finish' % taskId)
logger.warning('[%s] Task finish' % taskId)
except checkException as exp:
success = False
logging.error('[%s] Task error -> %s' % (taskId, exp))
logger.error('[%s] Task error -> %s' % (taskId, exp))
except:
success = False
logging.error('[%s] Task error -> Unknown error' % taskId)
logger.error('[%s] Task error -> Unknown error' % taskId)
finally:
if not success: # got some error in check process
taskInfo.pop('check')
@ -99,12 +99,12 @@ def runCheck(taskId: str, taskInfo: dict) -> None:
def loop(threadNum: int) -> None:
logging.warning('Loop check start -> %i threads' % threadNum)
logger.warning('Loop check start -> %i threads' % threadNum)
threadPool = ThreadPoolExecutor(max_workers = threadNum) # init thread pool
while True:
try:
taskId, taskInfo = Manager.popTask() # pop a task
logging.warning('[%s] Load new task' % taskId)
logger.warning('[%s] Load new task' % taskId)
except: # no more task
time.sleep(2)
continue
@ -114,26 +114,26 @@ def loop(threadNum: int) -> None:
if testMode: # test mode
loadBind(serverV6 = testArgs.ipv6, clientV6 = testArgs.ipv6) # ipv4 / ipv6 (127.0.0.1 / ::1)
loadCert(certId = testArgs.cert) # cert config
logging.critical('TEST ITEM: %s' % testArgs.PROTOCOL)
logging.critical('SELECT: ' + str(testArgs.select))
logging.critical('URL: %s' % testArgs.url)
logging.critical('THREAD NUMBER: %i' % testArgs.thread)
logging.critical('-' * 32 + ' TEST START ' + '-' * 32)
logger.critical('TEST ITEM: %s' % testArgs.PROTOCOL)
logger.critical('SELECT: ' + str(testArgs.select))
logger.critical('URL: %s' % testArgs.url)
logger.critical('THREAD NUMBER: %i' % testArgs.thread)
logger.critical('-' * 32 + ' TEST START ' + '-' * 32)
if testArgs.PROTOCOL == 'all': # run all test items
for item in testEntry:
if item == ('ss' if testArgs.all else 'ss-all'): # skip ss / ss-all
continue
logging.critical('TEST ITEM -> ' + item)
logger.critical('TEST ITEM -> ' + item)
Test(testEntry[item], testArgs.thread, testArgs.url, testArgs.select)
else: # run single item
if testArgs.PROTOCOL == 'ss' and testArgs.all: # test shadowsocks extra items
testItem = 'ss-all'
Test(testEntry[testArgs.PROTOCOL], testArgs.thread, testArgs.url, testArgs.select)
logging.critical('-' * 32 + ' TEST COMPLETE ' + '-' * 32)
logger.critical('-' * 32 + ' TEST COMPLETE ' + '-' * 32)
sys.exit(0) # test complete
logging.warning('ProxyC starts running (%s)' % Constant.Version)
logger.warning('ProxyC starts running (%s)' % Constant.Version)
_thread.start_new_thread(pythonCompile, ('/usr',)) # python compile (generate .pyc file)
_thread.start_new_thread(DnsProxy.start, (Constant.DnsServer, 53)) # start dns server
_thread.start_new_thread(loop, (Constant.CheckThread, )) # start check loop

Loading…
Cancel
Save