You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

150 lines
5.9 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import requests
from threading import Thread
from Basis.Logger import logging
from Tester.Settings import Settings
from Basis.Functions import md5Sum, genFlag, hostFormat, checkPortStatus
from Tester import Brook
from Tester import VMess
from Tester import VLESS
from Tester import Trojan
from Tester import TrojanGo
from Tester import Hysteria
from Tester import Shadowsocks
from Tester import ShadowsocksR
entry = {
'ss': Shadowsocks.load(),
'ss-all': Shadowsocks.load(isExtra = True),
'ssr': ShadowsocksR.load(),
'vmess': VMess.load(),
'vless': VLESS.load(),
'trojan': Trojan.load(),
'trojan-go': TrojanGo.load(),
'brook': Brook.load(),
'hysteria': Hysteria.load(),
}
def waitPort(port: int, times: int = 100, delay: int = 100) -> bool: # wait until port occupied
for i in range(times):
if not checkPortStatus(port): # port occupied
return True
time.sleep(delay / 1000) # default wait 100ms
return False # timeout
def httpCheck(socksInfo: dict, url: str, testId: str, timeout: int = 10) -> None:
socksProxy = 'socks5://%s:%i' % (hostFormat(socksInfo['addr'], v6Bracket = True), socksInfo['port'])
try:
proxy = {
'http': socksProxy,
'https': socksProxy,
}
request = requests.get(url, timeout = timeout, proxies = proxy)
request.raise_for_status()
logging.info('[%s] %s -> ok' % (testId, socksProxy))
except Exception as exp:
logging.error('[%s] %s -> error' % (testId, socksProxy))
logging.error('requests exception\n' + str(exp))
raise RuntimeError('socks5 test failed')
def runTest(testInfo: dict, testUrl: str, testFilter: set or None, delay: int = 1) -> None:
testInfo['hash'] = md5Sum(testInfo['caption'])[:12]
if testFilter is not None and testInfo['hash'] not in testFilter: return
logging.warning('[%s] %s' % (testInfo['hash'], testInfo['caption']))
testInfo['server'].start() # start test server
if waitPort(testInfo['interface']['port']): # wait for server
logging.debug('server start complete')
testInfo['client'].start() # start test client
if waitPort(testInfo['socks']['port']): # wait for client
logging.debug('client start complete')
try:
logging.debug('start test process')
time.sleep(delay)
httpCheck(testInfo['socks'], testUrl, testInfo['hash'])
testInfo['client'].quit()
testInfo['server'].quit()
except:
# client debug info
testInfo['client'].quit()
logging.warning('[%s] client info' % testInfo['hash'])
logging.error('command -> %s' % testInfo['client'].cmd)
logging.error('envVar -> %s' % testInfo['client'].env)
logging.error('file -> %s' % testInfo['client'].file)
logging.warning('[%s] client capture output' % testInfo['hash'])
logging.error('\n%s' % testInfo['client'].output)
# server debug info
testInfo['server'].quit()
logging.warning('[%s] server info' % testInfo['hash'])
logging.error('command -> %s' % testInfo['server'].cmd)
logging.error('envVar -> %s' % testInfo['server'].env)
logging.error('file -> %s' % testInfo['server'].file)
logging.warning('[%s] server capture output' % testInfo['hash'])
logging.error('\n%s' % testInfo['server'].output)
def test(testIter: iter, threadNum: int, testUrl: str, testFilter: set or None = None) -> None:
threads = []
while True: # infinite loop
try:
for thread in threads:
if thread.is_alive(): continue
threads.remove(thread) # remove dead thread
if len(threads) < threadNum:
for i in range(threadNum - len(threads)): # start threads within limit
thread = Thread( # create new thread
target = runTest,
args = (next(testIter), testUrl, testFilter)
)
thread.start()
threads.append(thread) # record thread info
time.sleep(0.1)
except StopIteration: # traverse completed
break
for thread in threads: # wait until all threads exit
thread.join()
def loadBind(serverV6: bool = False, clientV6: bool = False) -> None:
Settings['serverBind'] = '::1' if serverV6 else '127.0.0.1'
Settings['clientBind'] = '::1' if clientV6 else '127.0.0.1'
def loadCert(host: str = 'proxyc.net', remark: str = 'ProxyC') -> None:
loadPath = lambda x: os.path.join(Settings['workDir'], x)
certFlag = genFlag(length = 8)
caCert = loadPath('proxyc_%s_ca.pem' % certFlag)
caKey = loadPath('proxyc_%s_ca_key.pem' % certFlag)
cert = loadPath('proxyc_%s_cert.pem' % certFlag)
key = loadPath('proxyc_%s_cert_key.pem' % certFlag)
logging.critical('Load self-signed certificate')
os.system('mkdir -p %s' % Settings['workDir']) # create work directory
logging.critical('Creating CA certificate and key...')
os.system(' '.join(['mad', 'ca'] + [ # generate CA certificate and privkey
'--ca', caCert, '--key', caKey,
'--commonName', remark,
'--organization', remark,
'--organizationUnit', remark,
]))
logging.critical('Signing certificate...')
os.system(' '.join(['mad', 'cert'] + [ # generate certificate and privkey, then signed by CA
'--ca', caCert, '--ca_key', caKey,
'--cert', cert, '--key', key,
'--domain', host,
'--organization', remark,
'--organizationUnit', remark,
]))
logging.critical('Installing CA certificate...')
os.system('cat %s >> /etc/ssl/certs/ca-certificates.crt' % caCert) # add into system's trust list
Settings['host'] = host
Settings['cert'] = cert
Settings['key'] = key
logging.warning('Certificate loading complete')