Browse Source

refactor: new ProxyBuilder

master
Dnomd343 3 years ago
parent
commit
3f1f3223d7
  1. 3
      .gitignore
  2. 471
      ProxyBuilder/Shadowsocks.py
  3. 114
      ProxyBuilder/ShadowsocksR.py
  4. 102
      ProxyBuilder/builder.py

3
.gitignore

@ -1 +1,2 @@
**/__pycache__
**/__pycache__/
/.idea/

471
ProxyBuilder/Shadowsocks.py

@ -1,185 +1,226 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
"""
# Shadowsocks构建器
## 节点格式
```json
{
"server": "...",
"port": ...,
"method": "...",
"passwd": "...",
"plugin": pluginObject
}
```
+ server (str) -> 必选, 服务器地址 (IPv4 / IPv6 / Domain)
+ port (int) -> 必选, 服务器端口 (1 ~ 65535)
+ method (str) -> 必选, Shadowsocks加密方式 (`aes-128-gcm`,`aes-192-gcm`,`aes-256-gcm`,
`aes-128-ctr`,`aes-192-ctr`,`aes-256-ctr`,
`aes-128-ocb`,`aes-192-ocb`,`aes-256-ocb`,
`aes-128-ofb`,`aes-192-ofb`,`aes-256-ofb`,
`aes-128-cfb`,`aes-192-cfb`,`aes-256-cfb`,
`aes-128-cfb1`,`aes-192-cfb1`,`aes-256-cfb1`,
`aes-128-cfb8`,`aes-192-cfb8`,`aes-256-cfb8`,
`aes-128-cfb128`,`aes-192-cfb128`,`aes-256-cfb128`,
`camellia-128-cfb`,`camellia-192-cfb`,`camellia-256-cfb`,
`camellia-128-cfb128`,`camellia-192-cfb128`,`camellia-256-cfb128`,
`plain`,`none`,`table`,`rc4`,`rc4-md5`,`rc2-cfb`,`bf-cfb`,
`cast5-cfb`,`des-cfb`,`idea-cfb`,`seed-cfb`,`salsa20`,`salsa20-ctr`,
`xchacha20`,`chacha20`,`chacha20-ietf`,`chacha20-poly1305`,
`chacha20-ietf-poly1305`,`xchacha20-ietf-poly1305`)
+ passwd (str) -> 必选, Shadowsocks连接密码
+ plugin (pluginObject) -> 必选, Shadowsocks连接插件
### pluginObject (None / dict)
无插件: None
带插件:
```json
{
"type": "...",
"param": "..."
}
```
+ type (str) -> 插件名称
+ param (str) -> 插件参数
"""
import json
ssMethodList = { # shadowsocks各版本支持的加密方式
"ss-python": [
"aes-128-gcm",
"aes-192-gcm",
"aes-256-gcm",
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-ocb",
"aes-192-ocb",
"aes-256-ocb",
"aes-128-ofb",
"aes-192-ofb",
"aes-256-ofb",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"aes-128-cfb1",
"aes-192-cfb1",
"aes-256-cfb1",
"aes-128-cfb8",
"aes-192-cfb8",
"aes-256-cfb8",
"aes-128-cfb128",
"aes-192-cfb128",
"aes-256-cfb128",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"camellia-128-cfb128",
"camellia-192-cfb128",
"camellia-256-cfb128",
"table",
"rc4",
"rc4-md5",
"rc2-cfb",
"bf-cfb",
"cast5-cfb",
"des-cfb",
"idea-cfb",
"seed-cfb",
"salsa20",
"xchacha20",
"chacha20",
"chacha20-ietf",
"chacha20-poly1305",
"chacha20-ietf-poly1305",
"xchacha20-ietf-poly1305",
ssMethodList = { # Shadowsocks各版本加密方式支持
'ss-python': [
'aes-128-gcm',
'aes-192-gcm',
'aes-256-gcm',
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-ocb',
'aes-192-ocb',
'aes-256-ocb',
'aes-128-ofb',
'aes-192-ofb',
'aes-256-ofb',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'aes-128-cfb1',
'aes-192-cfb1',
'aes-256-cfb1',
'aes-128-cfb8',
'aes-192-cfb8',
'aes-256-cfb8',
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
'table',
'rc4',
'rc4-md5',
'rc2-cfb',
'bf-cfb',
'cast5-cfb',
'des-cfb',
'idea-cfb',
'seed-cfb',
'salsa20',
'xchacha20',
'chacha20',
'chacha20-ietf',
'chacha20-poly1305',
'chacha20-ietf-poly1305',
'xchacha20-ietf-poly1305',
],
"ss-python-legacy": [
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-ofb",
"aes-192-ofb",
"aes-256-ofb",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"aes-128-cfb1",
"aes-192-cfb1",
"aes-256-cfb1",
"aes-128-cfb8",
"aes-192-cfb8",
"aes-256-cfb8",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"table",
"rc4",
"rc4-md5",
"rc2-cfb",
"bf-cfb",
"cast5-cfb",
"des-cfb",
"idea-cfb",
"seed-cfb",
"salsa20",
"salsa20-ctr",
"chacha20",
'ss-python-legacy': [
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-ofb',
'aes-192-ofb',
'aes-256-ofb',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'aes-128-cfb1',
'aes-192-cfb1',
'aes-256-cfb1',
'aes-128-cfb8',
'aes-192-cfb8',
'aes-256-cfb8',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'table',
'rc4',
'rc4-md5',
'rc2-cfb',
'bf-cfb',
'cast5-cfb',
'des-cfb',
'idea-cfb',
'seed-cfb',
'salsa20',
'salsa20-ctr',
'chacha20',
],
"ss-libev": [
"aes-128-gcm",
"aes-192-gcm",
"aes-256-gcm",
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"rc4",
"rc4-md5",
"bf-cfb",
"salsa20",
"chacha20",
"chacha20-ietf",
"chacha20-ietf-poly1305",
"xchacha20-ietf-poly1305",
'ss-libev': [
'aes-128-gcm',
'aes-192-gcm',
'aes-256-gcm',
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'rc4',
'rc4-md5',
'bf-cfb',
'salsa20',
'chacha20',
'chacha20-ietf',
'chacha20-ietf-poly1305',
'xchacha20-ietf-poly1305',
],
"ss-libev-legacy": [
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"table",
"rc4",
"rc4-md5",
"rc2-cfb",
"bf-cfb",
"cast5-cfb",
"des-cfb",
"idea-cfb",
"seed-cfb",
"salsa20",
"chacha20",
"chacha20-ietf",
'ss-libev-legacy': [
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'table',
'rc4',
'rc4-md5',
'rc2-cfb',
'bf-cfb',
'cast5-cfb',
'des-cfb',
'idea-cfb',
'seed-cfb',
'salsa20',
'chacha20',
'chacha20-ietf',
],
"ss-rust": [
"aes-128-gcm",
"aes-256-gcm",
"plain",
"none",
"chacha20-ietf-poly1305",
'ss-rust': [
'aes-128-gcm',
'aes-256-gcm',
'plain',
'none',
'chacha20-ietf-poly1305',
]
}
def __baseJSON(proxyInfo, socksPort): # 生成JSON基本结构
jsonContent = {
def __baseConfig(proxyInfo: dict, socksPort: int) -> dict: # 生成基本配置
config = {
'server': proxyInfo['server'],
'server_port': int(proxyInfo['port']),
'server_port': proxyInfo['port'],
'local_address': '127.0.0.1',
'local_port': int(socksPort),
'password': proxyInfo['password'],
'local_port': socksPort,
'method': proxyInfo['method'],
'password': proxyInfo['passwd'],
}
if proxyInfo['plugin'] != '':
jsonContent['plugin'] = proxyInfo['plugin']
jsonContent['plugin_opts'] = proxyInfo['pluginParam']
return jsonContent
if proxyInfo['plugin'] is not None: # 带插件
config['plugin'] = proxyInfo['plugin']['type']
config['plugin_opts'] = proxyInfo['plugin']['param']
return config
def __pluginUdpCheck(plugin, pluginParam): # 插件是否使用UDP通讯
if plugin == '': # 无插件
return False
noUdpPlugin = [ # 不使用UDP通讯的插件
'obfs-local',
'simple-tls',
'ck-client',
'gq-client',
'mtt-client',
'rabbit-plugin',
'gun-plugin',
]
onlyUdpPlugin = [ # 仅使用UDP通讯的插件
'kcptun-client',
'qtun-client',
]
if plugin in noUdpPlugin:
def __pluginWithUdp(plugin: str, pluginParam: str) -> bool: # 插件是否使用UDP通讯
if plugin in ['obfs-local', 'simple-tls', 'ck-client', 'gq-client',
'mtt-client', 'rabbit-plugin', 'gun-plugin']: # 不使用UDP通讯的插件
return False
if plugin in onlyUdpPlugin:
return True
if plugin == 'v2ray-plugin' or plugin == 'xray-plugin' or plugin == 'gost-plugin':
if not 'mode=quic' in pluginParam.split(';'):
if plugin in ['v2ray-plugin', 'xray-plugin', 'gost-plugin']:
if 'mode=quic' not in pluginParam.split(';'): # 非quic模式不使用UDP通讯
return False
return True # 默认假定占用UDP
def __ssPython(proxyInfo, socksPort, isLegacy = False): # ss-python配置文件生成
jsonContent = __baseJSON(proxyInfo, socksPort)
specialMethods = [
def __ssPython(proxyInfo: dict, socksPort: int, isLegacy: bool = False) -> tuple[dict, str]: # ss-python配置生成
config = __baseConfig(proxyInfo, socksPort)
mbedtlsMethods = [
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
@ -187,46 +228,80 @@ def __ssPython(proxyInfo, socksPort, isLegacy = False): # ss-python配置文件
'camellia-192-cfb128',
'camellia-256-cfb128',
]
if isLegacy == False: # 仅新版本支持
if jsonContent['method'] in specialMethods:
jsonContent['method'] = 'mbedtls:' + jsonContent['method']
if jsonContent['method'] == 'idea-cfb' or jsonContent['method'] == 'seed-cfb':
jsonContent['extra_opts'] = '--libopenssl=libcrypto.so.1.0.0'
if proxyInfo['udp'] != True:
jsonContent['no_udp'] = True
if isLegacy == True:
jsonContent['shadowsocks'] = 'ss-python-legacy-local'
else:
jsonContent['shadowsocks'] = 'ss-python-local'
return jsonContent, 'ss-bootstrap-local'
def __ssLibev(proxyInfo, socksPort, isLegacy = False): # ss-libev配置文件生成
jsonContent = __baseJSON(proxyInfo, socksPort)
if proxyInfo['udp'] == True:
jsonContent['mode'] = 'tcp_and_udp'
if isLegacy == True:
return jsonContent, 'ss-libev-legacy-local'
if not isLegacy: # 新版本特性
if config['method'] in mbedtlsMethods: # mbedtls库流加密
config['method'] = 'mbedtls:' + config['method']
if config['method'] in ['idea-cfb', 'seed-cfb']: # 仅openssl旧版本支持
config['extra_opts'] = '--libopenssl=libcrypto.so.1.0.0'
if not proxyInfo['udp']:
config['no_udp'] = True # 关闭UDP代理
config['shadowsocks'] = 'ss-python-legacy-local' if isLegacy else 'ss-python-local'
return config, 'ss-bootstrap-local'
def __ssLibev(proxyInfo: dict, socksPort: int, isLegacy: bool = False) -> tuple[dict, str]: # ss-libev配置生成
config = __baseConfig(proxyInfo, socksPort)
if proxyInfo['udp']:
config['mode'] = 'tcp_and_udp'
return config, 'ss-libev-legacy-local' if isLegacy else 'ss-libev-local'
def __ssRust(proxyInfo: dict, socksPort: int) -> tuple[dict, str]: # ss-rust配置生成
config = __baseConfig(proxyInfo, socksPort)
if proxyInfo['udp']:
config['mode'] = 'tcp_and_udp'
return config, 'ss-rust-local'
def __ssFormatCheck(proxyInfo: dict) -> bool: # Shadowsocks参数检查
if 'server' not in proxyInfo or not isinstance(proxyInfo['server'], str): # server -> str
return False
if 'port' not in proxyInfo or not isinstance(proxyInfo['port'], int): # port -> int
return False
if 'method' not in proxyInfo or not isinstance(proxyInfo['method'], str): # method -> str
return False
if 'passwd' not in proxyInfo or not isinstance(proxyInfo['passwd'], str): # passwd -> str
return False
if 'plugin' not in proxyInfo:
return False
plugin = proxyInfo['plugin']
if isinstance(plugin, dict): # plugin -> dict / None
if 'type' not in plugin or not isinstance(plugin['type'], str): # plugin.type -> str
return False
if 'param' not in plugin or not isinstance(plugin['param'], str): # plugin.param -> str
return False
elif plugin is not None:
return False
return True
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list or None, str or None, dict or None]:
"""
Shadowsocks配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
节点有误:
return None, None, None
载入成功:
return startCommand, fileContent, envVar
"""
if not __ssFormatCheck(proxyInfo): # 参数有误
return None, None, None
if proxyInfo['plugin'] is None: # 无插件时启用UDP
proxyInfo['udp'] = True
else:
return jsonContent, 'ss-libev-local'
def __ssRust(proxyInfo, socksPort): # ss-rust配置文件生成
jsonContent = __baseJSON(proxyInfo, socksPort)
if proxyInfo['udp'] == True:
jsonContent['mode'] = 'tcp_and_udp'
return jsonContent, 'ss-rust-local'
def load(proxyInfo, socksPort, configFile): # Shadowsocks配置载入
proxyInfo['udp'] = not __pluginUdpCheck(proxyInfo['plugin'], proxyInfo['pluginParam'])
if proxyInfo['method'] in ssMethodList['ss-libev']:
jsonContent, ssFile = __ssLibev(proxyInfo, socksPort)
proxyInfo['udp'] = not __pluginWithUdp( # 获取插件UDP冲突状态
proxyInfo['plugin']['type'], proxyInfo['plugin']['param']
)
if proxyInfo['method'] in ssMethodList['ss-libev']: # 按序匹配客户端
config, ssFile = __ssLibev(proxyInfo, socksPort)
elif proxyInfo['method'] in ssMethodList['ss-libev-legacy']:
jsonContent, ssFile = __ssLibev(proxyInfo, socksPort, isLegacy = True)
config, ssFile = __ssLibev(proxyInfo, socksPort, isLegacy = True)
elif proxyInfo['method'] in ssMethodList['ss-python']:
jsonContent, ssFile = __ssPython(proxyInfo, socksPort)
config, ssFile = __ssPython(proxyInfo, socksPort)
elif proxyInfo['method'] in ssMethodList['ss-python-legacy']:
jsonContent, ssFile = __ssPython(proxyInfo, socksPort, isLegacy = True)
config, ssFile = __ssPython(proxyInfo, socksPort, isLegacy = True)
elif proxyInfo['method'] in ssMethodList['ss-rust']:
jsonContent, ssFile = __ssRust(proxyInfo, socksPort)
config, ssFile = __ssRust(proxyInfo, socksPort)
else:
return None, None # 匹配不到加密方式
return [ ssFile, '-c', configFile ], json.dumps(jsonContent)
return None, None, None # 匹配加密方式
return [ssFile, '-c', configFile], json.dumps(config), {}

114
ProxyBuilder/ShadowsocksR.py

@ -1,9 +1,59 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
"""
# ShadowsocksR构建器
## 节点格式
```json
{
"server": "...",
"port": ...,
"method": "...",
"passwd": "...",
"protocol": "...",
"protocolParam": "...",
"obfs": "...",
"obfsParam": "..."
}
```
+ server (str) -> 必选, 服务器地址 (IPv4 / IPv6 / Domain)
+ port (int) -> 必选, 服务器端口 (1 ~ 65535)
+ method (str) -> 必选, ShadowsocksR加密方式 (`aes-128-cfb`,`aes-192-cfb`,`aes-256-cfb`,
`aes-128-cfb1`,`aes-192-cfb1`,`aes-256-cfb1`,
`aes-128-cfb8`,`aes-192-cfb8`,`aes-256-cfb8`,
`aes-128-ctr`,`aes-192-ctr`,`aes-256-ctr`,
`aes-128-gcm`,`aes-192-gcm`,`aes-256-gcm`,
`aes-128-ofb`,`aes-192-ofb`,`aes-256-ofb`,
`camellia-128-cfb`,`camellia-192-cfb`,`camellia-256-cfb`,
`none`,`table`,`rc4`,`rc4-md5`,`rc4-md5-6`,`bf-cfb`,
`cast5-cfb`,`des-cfb`,`idea-cfb`,`seed-cfb`,`rc2-cfb`,
`salsa20`,`xsalsa20`,`chacha20`,`xchacha20`,`chacha20-ietf`)
+ passwd (str) -> 必选, ShadowsocksR连接密码
+ protocol (str) -> 必选, ShadowsocksR连接协议 (`origin`,`verify_sha1`,`verify_simple`,`verify_deflate`,
`auth_simple`,`auth_sha1`,`auth_sha1_v2`,`auth_sha1_v4`,
`auth_aes128`,`auth_aes128_md5`,`auth_aes128_sha1`,
`auth_chain_a`,`auth_chain_b`,`auth_chain_c`,
`auth_chain_d`,`auth_chain_e`,`auth_chain_f`)
+ protocolParam (str) -> 必选, ShadowsocksR协议参数
+ obfs (str) -> 必选, ShadowsocksR混淆方式 (`plain`,`http_post`,`http_simple`,`tls_simple`,
`tls1.2_ticket_auth`,`tls1.2_ticket_fastauth`,`random_head`)
+ obfsParam (str) -> 必选, ShadowsocksR混淆参数
"""
import json
ssrMethodList = [ # 加密方法
ssrMethodList = [ # ShadowsocksR加密方式
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
@ -43,7 +93,7 @@ ssrMethodList = [ # 加密方法
"chacha20-ietf",
]
ssrProtocolList = [ # 协议
ssrProtocolList = [ # ShadowsocksR协议列表
"origin",
"verify_sha1",
"verify_simple",
@ -63,7 +113,7 @@ ssrProtocolList = [ # 协议
"auth_chain_f",
]
ssrObfsList = [ # 混淆方式
ssrObfsList = [ # ShadowsocksR混淆方式
"plain",
"http_post",
"http_simple",
@ -73,22 +123,56 @@ ssrObfsList = [ # 混淆方式
"random_head",
]
def load(proxyInfo, socksPort, configFile): # ShadowsocksR配置载入
if not proxyInfo['method'] in ssrMethodList:
return None, None # 匹配不到加密方法
if not proxyInfo['protocol'] in ssrProtocolList:
return None, None # 匹配不到协议
if not proxyInfo['obfs'] in ssrObfsList:
return None, None # 匹配不到混淆方式
return [ 'ssr-local', '-c', configFile ], json.dumps({
def __ssrFormatCheck(proxyInfo: dict) -> bool: # ShadowsocksR参数检查
if 'server' not in proxyInfo or not isinstance(proxyInfo['server'], str): # server -> str
return False
if 'port' not in proxyInfo or not isinstance(proxyInfo['port'], int): # port -> int
return False
if 'method' not in proxyInfo or not isinstance(proxyInfo['method'], str): # method -> str
return False
if 'passwd' not in proxyInfo or not isinstance(proxyInfo['passwd'], str): # passwd -> str
return False
if 'protocol' not in proxyInfo or not isinstance(proxyInfo['protocol'], str): # protocol -> str
return False
if 'protocolParam' not in proxyInfo or not isinstance(proxyInfo['protocolParam'], str): # protocolParam -> str
return False
if 'obfs' not in proxyInfo or not isinstance(proxyInfo['obfs'], str): # obfs -> str
return False
if 'obfsParam' not in proxyInfo or not isinstance(proxyInfo['obfsParam'], str): # obfsParam -> str
return False
return True
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list or None, str or None, dict or None]:
"""
ShadowsocksR配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
节点有误:
return None, None, None
载入成功:
return startCommand, fileContent, envVar
"""
if not __ssrFormatCheck(proxyInfo): # 参数有误
return None, None, None
if not proxyInfo['method'] in ssrMethodList: # 匹配不到加密方法
return None, None, None
if not proxyInfo['protocol'] in ssrProtocolList: # 匹配不到协议
return None, None, None
if not proxyInfo['obfs'] in ssrObfsList: # 匹配不到混淆方式
return None, None, None
config = {
'server': proxyInfo['server'],
'server_port': int(proxyInfo['port']),
'server_port': proxyInfo['port'],
'local_address': '127.0.0.1',
'local_port': int(socksPort),
'password': proxyInfo['password'],
'local_port': socksPort,
'password': proxyInfo['passwd'],
'method': proxyInfo['method'],
'protocol': proxyInfo['protocol'],
'protocol_param': proxyInfo['protocolParam'],
'obfs': proxyInfo['obfs'],
'obfs_param': proxyInfo['obfsParam']
})
}
return ['ssr-local', '-c', configFile], json.dumps(config), {}

102
ProxyBuilder/builder.py

@ -13,13 +13,17 @@ from ProxyBuilder import ShadowsocksR
libcPaths = [
'/usr/lib64/libc.so.6', # CentOS
'/lib/libc.musl-x86_64.so.1', # Alpine
'/lib/i386-linux-gnu/libc.so.6', # Debian / Ubuntu
'/lib/x86_64-linux-gnu/libc.so.6',
'/lib/aarch64-linux-gnu/libc.so.6',
'/lib/libc.musl-x86_64.so.1', # Alpine
]
def __checkPortAvailable(port): # 检测端口可用性
def __checkPortAvailable(port: int) -> bool: # 检测端口可用性
ipv4_tcp = None
ipv4_udp = None
ipv6_tcp = None
ipv6_udp = None
try:
ipv4_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ipv4_udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -50,8 +54,8 @@ def __checkPortAvailable(port): # 检测端口可用性
ipv6_udp.close()
except: pass
def __genTaskFlag(length = 16): # 生成任务代号
flag = ""
def __genTaskFlag(length: int = 16) -> str: # 生成任务代号
flag = ''
for i in range(0, length):
tmp = random.randint(0, 15)
if tmp >= 10:
@ -60,8 +64,8 @@ def __genTaskFlag(length = 16): # 生成任务代号
flag += str(tmp) # 0 ~ 9
return flag
def __getAvailablePort(rangeStart, rangeEnd): # 获取一个空闲端口
if rangeStart > rangeEnd:
def __getAvailablePort(rangeStart: int, rangeEnd: int) -> int or None: # 获取一个空闲端口
if rangeStart > rangeEnd or rangeStart < 0 or rangeEnd > 65535:
return None
while True:
port = random.randint(rangeStart, rangeEnd) # 随机选取
@ -69,8 +73,9 @@ def __getAvailablePort(rangeStart, rangeEnd): # 获取一个空闲端口
return port
time.sleep(0.1) # 100ms
def build(proxyInfo, configDir, portRangeStart = 1024, portRangeEnd = 65535):
'''
def build(proxyInfo: dict, configDir: str,
portRangeStart: int = 1024, portRangeEnd: int = 65535) -> tuple[bool or None, str or dict]:
"""
创建代理节点客户端
程序内部错误:
@ -86,45 +91,54 @@ def build(proxyInfo, configDir, portRangeStart = 1024, portRangeEnd = 65535):
'file': configFile,
'process': process
}
'''
"""
taskFlag = __genTaskFlag() # 生成测试标志
socksPort = __getAvailablePort(portRangeStart, portRangeEnd) # 获取Socks5测试端口
if not 'type' in proxyInfo: # 未指定节点类型
if 'type' not in proxyInfo: # 未指定节点类型
return False, 'Proxy type not specified'
proxyType = proxyInfo['type'] # 节点类型
proxyInfo.pop('type')
configFile = configDir + '/' + taskFlag + '.json' # 配置文件路径
if proxyType == 'ss': # Shadowsocks节点
startCommand, fileContent = Shadowsocks.load(proxyInfo, socksPort, configFile)
clientObj = Shadowsocks
elif proxyType == 'ssr': # ShadowsocksR节点
startCommand, fileContent = ShadowsocksR.load(proxyInfo, socksPort, configFile)
clientObj = ShadowsocksR
else: # 未知类型
return False, 'Unknown proxy type'
if startCommand == None: # 格式出错
configFile = configDir + '/' + taskFlag + '.json' # 配置文件路径
startCommand, fileContent, envVar = clientObj.load(proxyInfo, socksPort, configFile) # 载入配置
if startCommand is None: # 格式出错
return False, 'Format error with ' + str(proxyType)
try:
with open(configFile, 'w') as fileObject:
fileObject.write(fileContent) # 保存配置文件
with open(configFile, 'w') as fileObject: # 保存配置文件
fileObject.write(fileContent)
except: # 配置文件写入失败
return None, "Unable write to file " + str(configFile)
process = None
try: # 子进程形式启动
for libcPath in libcPaths:
if os.path.exists(libcPath): # 定位libc.so文件
break
exitWithMe = lambda: ctypes.CDLL(libcPath).prctl(1, 15) # SIGTERM
process = subprocess.Popen(startCommand,
process = subprocess.Popen( # 启动子进程
startCommand,
env = envVar,
stdout = subprocess.DEVNULL,
stderr = subprocess.DEVNULL,
preexec_fn = exitWithMe) # 子进程跟随退出
preexec_fn = lambda: ctypes.CDLL(libcPath).prctl(1, 15) # 子进程跟随退出
)
except:
try:
process = subprocess.Popen(startCommand,
process = subprocess.Popen( # prctl失败 回退正常启动
startCommand,
env = envVar,
stdout = subprocess.DEVNULL,
stderr = subprocess.DEVNULL) # prctl失败 回退正常启动
except: pass
if not 'process' in vars(): # 启动失败
stderr = subprocess.DEVNULL
)
except:
pass
if process is None: # 启动失败
return None, 'Subprocess start failed by `' + ' '.join(startCommand) + '`'
return True, { # 返回连接参数
@ -134,8 +148,8 @@ def build(proxyInfo, configDir, portRangeStart = 1024, portRangeEnd = 65535):
'process': process
}
def check(client):
'''
def check(client: dict) -> bool or None:
"""
检查客户端是否正常运行
检测出错: return None
@ -143,44 +157,34 @@ def check(client):
工作异常: return False
工作正常: return True
'''
if client == None:
return None
"""
try:
if client['process'].poll() != None:
return False # 死亡
else:
return True # 正常
return client['process'].poll() is None
except:
return None # 异常
def destroy(client):
'''
def destroy(client: dict) -> bool:
"""
结束客户端并清理
销毁异常: return None
销毁异常: return False
客户端退出: return True
'''
if client == None:
return None
销毁成功: return True
"""
try:
process = client['process']
if process.poll() == None: # 未死亡
if process.poll() is None: # 未死亡
while process.poll() is None: # 等待退出
process.terminate() # SIGTERM
while process.poll() == None: # 等待退出
time.sleep(1)
process.terminate()
time.sleep(0.2)
except:
return None
return False
try:
file = client['file']
if os.path.exists(file) and os.path.isfile(file):
os.remove(file) # 删除配置文件
else:
return None
return True # 销毁成功
except:
return None
return True # 销毁完成
pass
return False

Loading…
Cancel
Save