Browse Source

feat: test process of shadowsocksr

master
dnomd343 2 years ago
parent
commit
9a151cfa41
  1. 8
      Tester/Shadowsocks.py
  2. 82
      Tester/ShadowsocksR.py
  3. 62
      test.py

8
Tester/Shadowsocks.py

@ -136,9 +136,13 @@ def loadTest(serverType: str, clientType: str, method: str) -> dict:
'title': 'Shadowsocks test: {%s <- %s -> %s}' % (serverType, method, clientType),
'client': loadClient(clientType, configName + '_client.json', proxyInfo, socksInfo),
'server': loadServer(serverType, configName + '_server.json', proxyInfo),
'socks': socksInfo, # exposed socks5 interface
'socks': socksInfo, # exposed socks5 address
'interface': {
'addr': proxyInfo['server'],
'port': proxyInfo['port']
}
}
logging.debug('New shadowsocks test connection -> %s' % testInfo)
logging.debug('New shadowsocks test -> %s' % testInfo)
return testInfo

82
Tester/ShadowsocksR.py

@ -0,0 +1,82 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
from Builder import ShadowsocksR
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Functions import genFlag, getAvailablePort
from Basis.Methods import ssrMethods, ssrProtocols, ssrObfuscations
settings = {
'serverBind': '127.0.0.1',
'clientBind': '127.0.0.1', # aka socks5 address
'workDir': '/tmp/ProxyC'
}
def loadServer(configFile: str, proxyInfo: dict) -> Process: # load server process
ssrConfig = {
'server': proxyInfo['server'],
'server_port': proxyInfo['port'], # type -> int
'password': proxyInfo['passwd'],
'method': proxyInfo['method'],
'protocol': proxyInfo['protocol'],
'protocol_param': proxyInfo['protocolParam'],
'obfs': proxyInfo['obfs'],
'obfs_param': proxyInfo['obfsParam']
}
serverFile = os.path.join(settings['workDir'], configFile)
return Process(settings['workDir'], cmd = ['ssr-server', '-vv', '-c', serverFile], file = {
'path': serverFile,
'content': json.dumps(ssrConfig)
}, isStart = False)
def loadClient(configFile: str, proxyInfo: dict, socksInfo: dict) -> Process: # load client process
clientFile = os.path.join(settings['workDir'], configFile)
ssrCommand, ssrConfig, _ = ShadowsocksR.load(proxyInfo, socksInfo, clientFile)
return Process(settings['workDir'], cmd = ssrCommand, file = {
'path': clientFile,
'content': ssrConfig
}, isStart = False)
def loadTest(method: str, protocol: str, obfs: str) -> dict:
proxyInfo = { # connection info
'server': settings['serverBind'],
'port': getAvailablePort(),
'passwd': genFlag(length = 8), # random password
'method': method,
'protocol': protocol,
'protocolParam': '',
'obfs': obfs,
'obfsParam': '',
}
socksInfo = { # socks5 interface for test
'addr': settings['clientBind'],
'port': getAvailablePort()
}
configName = '%s_%s_%s' % (method, protocol, obfs) # prefix of config file name
testInfo = { # release test info
'title': 'ShadowsocksR test: method = %s | protocol = %s | obfs = %s' % (method, protocol, obfs),
'client': loadClient(configName + '_client.json', proxyInfo, socksInfo),
'server': loadServer(configName + '_server.json', proxyInfo),
'socks': socksInfo, # exposed socks5 address
'interface': {
'addr': proxyInfo['server'],
'port': proxyInfo['port']
}
}
logging.debug('New shadowsocksr test -> %s' % testInfo)
return testInfo
def load():
for method in ssrMethods:
yield loadTest(method, 'origin', 'plain')
for protocol in ssrProtocols:
yield loadTest('aes-128-ctr', protocol, 'plain')
for obfs in ssrObfuscations:
yield loadTest('aes-128-ctr', 'origin', obfs)

62
test.py

@ -5,16 +5,29 @@ import time
import requests
from threading import Thread
from Tester import Shadowsocks
from Tester import ShadowsocksR
from Basis.Logger import logging
from Basis.Functions import checkPortStatus
def waitForStart(port: int, times: int = 100, delay: int = 100) -> bool:
for i in range(times):
if not checkPortStatus(port): # port occupied
return True
time.sleep(delay / 1000) # default wait 100ms
return False # timeout
testDelay = 1 # wait 1s before request test
threadNum = 128 # thread number
def test(testObj: dict) -> None:
logging.warning(testObj['title'])
testObj['client'].start()
testObj['server'].start()
time.sleep(testDelay)
if waitForStart(testObj['socks']['port']):
logging.debug('client start complete')
if waitForStart(testObj['interface']['port']):
logging.debug('server start complete')
logging.debug('start test process')
errFlag = False
try:
request = requests.get(
@ -49,21 +62,36 @@ def test(testObj: dict) -> None:
logging.error('\n' + str(testObj['server'].output))
threads = []
def runTest(testIter: iter, threadNum: int):
threads = []
while True: # infinite loop
try:
for thread in threads:
if thread.is_alive(): continue
threads.remove(thread) # remove dead thread
if len(threads) < threadNum:
for i in range(threadNum - len(threads)): # start threads within limit
thread = Thread(target=test, args=(next(testIter),)) # create new thread
thread.start()
threads.append(thread) # record thread info
time.sleep(0.1)
except StopIteration: # traverse completed
break
for thread in threads: # wait until all threads exit
thread.join()
ss = Shadowsocks.load(isExtra = True)
ssr = ShadowsocksR.load()
logging.critical('test start')
while True:
try:
for i in range(threadNum):
thread = Thread(target = test, args = (next(ss),))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
threads.clear()
except StopIteration:
break
for thread in threads:
thread.join()
runTest(ss, 64)
runTest(ssr, 64)
# ssThread = Thread(target=runTest, args=(ss, 64))
# ssrThread = Thread(target=runTest, args=(ssr, 64))
# ssThread.start()
# ssrThread.start()
# ssThread.join()
# ssrThread.join()
logging.critical('test complete')

Loading…
Cancel
Save