Browse Source

update: remove legacy Builder

master
dnomd343 2 years ago
parent
commit
cd767a8474
  1. 60
      ProxyBuilder/Brook.py
  2. 44
      ProxyBuilder/Hysteria.py
  3. 225
      ProxyBuilder/Shadowsocks.py
  4. 27
      ProxyBuilder/ShadowsocksR.py
  5. 45
      ProxyBuilder/Trojan.py
  6. 76
      ProxyBuilder/TrojanGo.py
  7. 161
      ProxyBuilder/V2ray.py
  8. 46
      ProxyBuilder/VLESS.py
  9. 36
      ProxyBuilder/VMess.py
  10. 63
      ProxyBuilder/Xray.py
  11. 6
      ProxyBuilder/__init__.py
  12. 219
      ProxyBuilder/builder.py

60
ProxyBuilder/Brook.py

@ -1,60 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
def __originConfig(proxyInfo: dict) -> list:
return [
'client',
'--server', proxyInfo['server'] + ':' + str(proxyInfo['port']),
'--password', proxyInfo['passwd']
]
def __wsConfig(proxyInfo: dict) -> list:
return [
'wsclient',
'--wsserver', 'ws://' + proxyInfo['ws']['host'] + ':' + str(proxyInfo['port']) + proxyInfo['ws']['path'],
'--address', proxyInfo['server'] + ':' + str(proxyInfo['port']),
'--password', proxyInfo['passwd']
]
def __wssConfig(proxyInfo: dict) -> list:
wssConfig = [
'wssclient',
'--wssserver', 'wss://' + proxyInfo['ws']['host'] + ':' + str(proxyInfo['port']) + proxyInfo['ws']['path'],
'--address', proxyInfo['server'] + ':' + str(proxyInfo['port']),
'--password', proxyInfo['passwd']
]
if not proxyInfo['ws']['secure']['verify']:
wssConfig += ['--insecure']
return wssConfig
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list, str or None, dict]:
"""
Brook配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
proxyInfo = copy.deepcopy(proxyInfo)
if proxyInfo['server'].find(':') >= 0:
proxyInfo['server'] = '[' + proxyInfo['server'] + ']' # IPv6
if proxyInfo['ws'] is not None and proxyInfo['ws']['host'].find(':') >= 0:
proxyInfo['ws']['host'] = '[' + proxyInfo['ws']['host'] + ']' # IPv6
command = [
'brook',
'--debug', '--listen', 'skip success', # debug on
]
if proxyInfo['ws'] is None:
command += __originConfig(proxyInfo) # original mode
elif proxyInfo['ws']['secure'] is None:
command += __wsConfig(proxyInfo) # ws mode
else:
command += __wssConfig(proxyInfo) # wss mode
command += [
'--socks5', '127.0.0.1:' + str(socksPort)
]
return command, None, {}

44
ProxyBuilder/Hysteria.py

@ -1,44 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
import json
defaultUpSpeed = 10000
defaultDownSpeed = 10000
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list, str or None, dict]:
"""
Hysteria配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
proxyInfo = copy.deepcopy(proxyInfo)
if proxyInfo['server'].find(':') >= 0:
proxyInfo['server'] = '[' + proxyInfo['server'] + ']' # IPv6
config = {
'server': proxyInfo['server'] + ':' + str(proxyInfo['port']),
'protocol': proxyInfo['protocol'],
'up_mbps': defaultUpSpeed,
'down_mbps': defaultDownSpeed,
'socks5': {
'listen': '127.0.0.1:' + str(socksPort)
}
}
if proxyInfo['obfs'] is not None:
config['obfs'] = proxyInfo['obfs']
if proxyInfo['auth'] is not None:
config['auth_str'] = proxyInfo['auth']
if proxyInfo['sni'] != '':
config['server_name'] = proxyInfo['sni']
if proxyInfo['alpn'] is not None:
config['alpn'] = proxyInfo['alpn']
if not proxyInfo['verify']:
config['insecure'] = True
return ['hysteria', '-c', configFile, 'client'], json.dumps(config), {}

225
ProxyBuilder/Shadowsocks.py

@ -1,225 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
ssMethodList = { # Shadowsocks各版本加密方式支持
'ss-python': [
'aes-128-gcm',
'aes-192-gcm',
'aes-256-gcm',
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-ocb',
'aes-192-ocb',
'aes-256-ocb',
'aes-128-ofb',
'aes-192-ofb',
'aes-256-ofb',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'aes-128-cfb1',
'aes-192-cfb1',
'aes-256-cfb1',
'aes-128-cfb8',
'aes-192-cfb8',
'aes-256-cfb8',
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
'table',
'rc4',
'rc4-md5',
'rc2-cfb',
'bf-cfb',
'cast5-cfb',
'des-cfb',
'idea-cfb',
'seed-cfb',
'salsa20',
'xchacha20',
'chacha20',
'chacha20-ietf',
'chacha20-poly1305',
'chacha20-ietf-poly1305',
'xchacha20-ietf-poly1305',
],
'ss-python-legacy': [
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-ofb',
'aes-192-ofb',
'aes-256-ofb',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'aes-128-cfb1',
'aes-192-cfb1',
'aes-256-cfb1',
'aes-128-cfb8',
'aes-192-cfb8',
'aes-256-cfb8',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'table',
'rc4',
'rc4-md5',
'rc2-cfb',
'bf-cfb',
'cast5-cfb',
'des-cfb',
'idea-cfb',
'seed-cfb',
'salsa20',
'salsa20-ctr',
'chacha20',
],
'ss-libev': [
'aes-128-gcm',
'aes-192-gcm',
'aes-256-gcm',
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'rc4',
'rc4-md5',
'bf-cfb',
'salsa20',
'chacha20',
'chacha20-ietf',
'chacha20-ietf-poly1305',
'xchacha20-ietf-poly1305',
],
'ss-libev-legacy': [
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'table',
'rc4',
'rc4-md5',
'rc2-cfb',
'bf-cfb',
'cast5-cfb',
'des-cfb',
'idea-cfb',
'seed-cfb',
'salsa20',
'chacha20',
'chacha20-ietf',
],
'ss-rust': [
'aes-128-gcm',
'aes-256-gcm',
'plain',
'none',
'chacha20-ietf-poly1305',
]
}
def __baseConfig(proxyInfo: dict, socksPort: int) -> dict: # 生成基本配置
config = {
'server': proxyInfo['server'],
'server_port': proxyInfo['port'],
'local_address': '127.0.0.1',
'local_port': socksPort,
'method': proxyInfo['method'],
'password': proxyInfo['passwd'],
}
if proxyInfo['plugin'] is not None: # 带插件
config['plugin'] = proxyInfo['plugin']['type']
config['plugin_opts'] = proxyInfo['plugin']['param']
return config
def __pluginWithUdp(plugin: str, pluginParam: str) -> bool: # 插件是否使用UDP通讯
if plugin in ['obfs-local', 'simple-tls', 'ck-client',
'gq-client', 'mtt-client', 'rabbit-plugin']: # 不使用UDP通讯的插件
return False
if plugin in ['v2ray-plugin', 'xray-plugin', 'gost-plugin']:
if 'mode=quic' not in pluginParam.split(';'): # 非quic模式不使用UDP通讯
return False
return True # 默认假定占用UDP
def __ssPython(proxyInfo: dict, socksPort: int,
isUdp: bool, isLegacy: bool = False) -> tuple[dict, str]: # ss-python配置生成
config = __baseConfig(proxyInfo, socksPort)
mbedtlsMethods = [
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
]
if not isLegacy: # 新版本特性
if config['method'] in mbedtlsMethods: # mbedtls库流加密
config['method'] = 'mbedtls:' + config['method']
if config['method'] in ['idea-cfb', 'seed-cfb']: # 仅openssl旧版本支持
config['extra_opts'] = '--libopenssl=libcrypto.so.1.0.0'
if not isUdp:
config['no_udp'] = True # 关闭UDP代理
config['shadowsocks'] = 'ss-python-legacy-local' if isLegacy else 'ss-python-local'
return config, 'ss-bootstrap-local'
def __ssLibev(proxyInfo: dict, socksPort: int,
isUdp: bool, isLegacy: bool = False) -> tuple[dict, str]: # ss-libev配置生成
config = __baseConfig(proxyInfo, socksPort)
if isUdp:
config['mode'] = 'tcp_and_udp'
return config, 'ss-libev-legacy-local' if isLegacy else 'ss-libev-local'
def __ssRust(proxyInfo: dict, socksPort: int, isUdp: bool) -> tuple[dict, str]: # ss-rust配置生成
config = __baseConfig(proxyInfo, socksPort)
if isUdp:
config['mode'] = 'tcp_and_udp'
return config, 'ss-rust-local'
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list or None, str or None, dict or None]:
"""
Shadowsocks配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
if proxyInfo['plugin'] is None: # 无插件时启用UDP
isUdp = True
else:
isUdp = not __pluginWithUdp( # 获取插件UDP冲突状态
proxyInfo['plugin']['type'], proxyInfo['plugin']['param']
)
if proxyInfo['method'] in ssMethodList['ss-libev']: # 按序匹配客户端
config, ssFile = __ssLibev(proxyInfo, socksPort, isUdp)
elif proxyInfo['method'] in ssMethodList['ss-libev-legacy']:
config, ssFile = __ssLibev(proxyInfo, socksPort, isUdp, isLegacy = True)
elif proxyInfo['method'] in ssMethodList['ss-python']:
config, ssFile = __ssPython(proxyInfo, socksPort, isUdp)
elif proxyInfo['method'] in ssMethodList['ss-python-legacy']:
config, ssFile = __ssPython(proxyInfo, socksPort, isUdp, isLegacy = True)
elif proxyInfo['method'] in ssMethodList['ss-rust']:
config, ssFile = __ssRust(proxyInfo, socksPort, isUdp)
else:
raise Exception('Unknown Shadowsocks method') # 无匹配加密方式
return [ssFile, '-c', configFile], json.dumps(config), {}

27
ProxyBuilder/ShadowsocksR.py

@ -1,27 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list or None, str or None, dict or None]:
"""
ShadowsocksR配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
config = {
'server': proxyInfo['server'],
'server_port': proxyInfo['port'],
'local_address': '127.0.0.1',
'local_port': socksPort,
'password': proxyInfo['passwd'],
'method': proxyInfo['method'],
'protocol': proxyInfo['protocol'],
'protocol_param': proxyInfo['protocolParam'],
'obfs': proxyInfo['obfs'],
'obfs_param': proxyInfo['obfsParam']
}
return ['ssr-local', '-c', configFile], json.dumps(config), {}

45
ProxyBuilder/Trojan.py

@ -1,45 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
from ProxyBuilder import Xray
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list, str, dict]:
"""
Trojan配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
flowType = None
if proxyInfo['stream']['secure'] is not None and proxyInfo['stream']['secure']['type'] == 'xtls':
flowType = proxyInfo['stream']['secure']['flow']
if flowType == 'xtls-origin':
flowType = 'xtls-rprx-origin'
elif flowType == 'xtls-direct':
flowType = 'xtls-rprx-direct'
elif flowType == 'xtls-splice':
flowType = 'xtls-rprx-splice'
else:
raise Exception('Unknown XTLS flow')
if proxyInfo['stream']['secure']['udp443']:
flowType += '-udp443'
outboundConfig = {
'protocol': 'trojan',
'settings': {
'servers': [
{
'address': proxyInfo['server'],
'port': proxyInfo['port'],
'password': proxyInfo['passwd'],
}
]
},
'streamSettings': Xray.xrayStreamConfig(proxyInfo['stream'])
}
if flowType is not None: # 添加XTLS流控类型
outboundConfig['settings']['servers'][0]['flow'] = flowType
config = Xray.baseConfig(socksPort, outboundConfig) # Trojan节点配置
return ['xray', '-c', configFile], json.dumps(config), {}

76
ProxyBuilder/TrojanGo.py

@ -1,76 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
def __tlsConfig(proxyInfo: dict) -> dict:
tlsConfig = {
'verify': proxyInfo['verify']
}
if proxyInfo['sni'] != '':
tlsConfig['sni'] = proxyInfo['sni']
if proxyInfo['alpn'] is not None:
tlsConfig['alpn'] = proxyInfo['alpn'].split(',')
return tlsConfig
def __wsConfig(proxyInfo: dict) -> dict:
if proxyInfo['ws'] is None:
return {
'enabled': False
}
wsConfig = {
'enabled': True,
'path': proxyInfo['ws']['path']
}
if proxyInfo['ws']['host'] != '':
wsConfig['host'] = proxyInfo['ws']['host']
return wsConfig
def __ssConfig(proxyInfo: dict) -> dict:
if proxyInfo['ss'] is None:
return {
'enabled': False
}
return {
'enabled': True,
'method': proxyInfo['ss']['method'],
'password': proxyInfo['ss']['passwd']
}
def __pluginConfig(proxyInfo: dict) -> dict:
if proxyInfo['plugin'] is None:
return {
'enabled': False
}
return {
'enabled': True,
'type': 'shadowsocks',
'command': proxyInfo['plugin']['type'],
'option': proxyInfo['plugin']['param']
}
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list, str, dict]:
"""
Trojan-Go配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
config = {
'run_type': 'client',
'local_addr': '127.0.0.1',
'local_port': socksPort,
'remote_addr': proxyInfo['server'],
'remote_port': proxyInfo['port'],
'password': [
proxyInfo['passwd']
],
'log_level': 0,
'ssl': __tlsConfig(proxyInfo),
'websocket': __wsConfig(proxyInfo),
'shadowsocks': __ssConfig(proxyInfo),
'transport_plugin': __pluginConfig(proxyInfo)
}
return ['trojan-go', '-config', configFile], json.dumps(config), {'PATH': '/usr/bin'}

161
ProxyBuilder/V2ray.py

@ -1,161 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
logLevel = 'warning'
httpHeader = {
'type': 'http',
'request': {
'version': '1.1',
'method': 'GET',
'path': [],
'headers': {
'Host': [],
'User-Agent': [
'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.75 Safari/537.36',
'Mozilla/5.0 (iPhone; CPU iPhone OS 10_0_2 like Mac OS X) AppleWebKit/601.1 (KHTML, like Gecko) CriOS/53.0.2785.109 Mobile/14A456 Safari/601.1.46'
],
'Accept-Encoding': [
'gzip, deflate'
],
'Connection': [
'keep-alive'
],
'Pragma': 'no-cache'
}
}
}
kcpSettings = {
'mtu': 1350,
'tti': 50,
'uplinkCapacity': 12,
'downlinkCapacity': 100,
'congestion': False,
'readBufferSize': 2,
'writeBufferSize': 2,
'header': {}
}
def __secureConfig(secureInfo: dict or None) -> dict: # TLS加密传输配置
if secureInfo is None:
return {}
tlsObject = {
'allowInsecure': not secureInfo['verify']
}
if secureInfo['alpn'] is not None:
tlsObject['alpn'] = secureInfo['alpn'].split(',')
if secureInfo['sni'] != '':
tlsObject['serverName'] = secureInfo['sni']
return {
'security': 'tls',
'tlsSettings': tlsObject
}
def tcpConfig(streamInfo: dict, secureFunc) -> dict: # TCP传输方式配置
tcpObject = {}
if streamInfo['obfs'] is not None:
httpHeader['request']['path'].append(streamInfo['obfs']['path'])
httpHeader['request']['headers']['Host'] = streamInfo['obfs']['host'].split(',')
tcpObject['header'] = httpHeader
return {**{
'network': 'tcp',
'tcpSettings': tcpObject
}, **secureFunc(streamInfo['secure'])}
def kcpConfig(streamInfo: dict, secureFunc) -> dict: # mKCP传输方式配置
kcpObject = kcpSettings
kcpObject['header']['type'] = streamInfo['obfs']
if streamInfo['seed'] is not None:
kcpObject['seed'] = streamInfo['seed']
return {**{
'network': 'kcp',
'kcpSettings': kcpObject
}, **secureFunc(streamInfo['secure'])}
def wsConfig(streamInfo: dict, secureFunc) -> dict: # WebSocket传输方式配置
wsObject = {
'path': streamInfo['path']
}
if streamInfo['host'] != '':
wsObject['headers'] = {}
wsObject['headers']['Host'] = streamInfo['host']
if streamInfo['ed'] is not None:
wsObject['maxEarlyData'] = streamInfo['ed']
wsObject['earlyDataHeaderName'] = 'Sec-WebSocket-Protocol'
return {**{
'network': 'ws',
'wsSettings': wsObject
}, **secureFunc(streamInfo['secure'])}
def h2Config(streamInfo: dict, secureFunc) -> dict: # HTTP/2传输方式配置
h2Object = {
'path': streamInfo['path']
}
if streamInfo['host'] != '':
h2Object['host'] = streamInfo['host'].split(',')
return {**{
'network': 'http',
'httpSettings': h2Object
}, **secureFunc(streamInfo['secure'])}
def quicConfig(streamInfo: dict, secureFunc) -> dict: # QUIC传输方式配置
return {**{
'network': 'quic',
'quicSettings': {
'security': streamInfo['method'],
'key': streamInfo['passwd'],
'header': {
'type': streamInfo['obfs']
}
}
}, **secureFunc(streamInfo['secure'])}
def grpcConfig(streamInfo: dict, secureFunc) -> dict: # gRPC传输方式配置
grpcObject = {
'serviceName': streamInfo['service']
}
if streamInfo['mode'] == 'multi': # gRPC multi-mode not work in v2fly-core
grpcObject['multiMode'] = True
return {**{
'network': 'grpc',
'grpcSettings': grpcObject
}, **secureFunc(streamInfo['secure'])}
def v2rayStreamConfig(streamInfo: dict) -> dict: # 生成v2ray传输方式配置
streamType = streamInfo['type']
if streamType == 'tcp':
return tcpConfig(streamInfo, __secureConfig)
elif streamType == 'kcp':
return kcpConfig(streamInfo, __secureConfig)
elif streamType == 'ws':
return wsConfig(streamInfo, __secureConfig)
elif streamType == 'h2':
return h2Config(streamInfo, __secureConfig)
elif streamType == 'quic':
return quicConfig(streamInfo, __secureConfig)
elif streamType == 'grpc':
return grpcConfig(streamInfo, __secureConfig)
else:
raise Exception('Unknown stream type')
def baseConfig(socksPort: int, outboundObject: dict) -> dict: # 基础配置生成
return {
'log': {
'loglevel': logLevel
},
'inbounds': [
{
'port': socksPort,
'listen': '127.0.0.1',
'protocol': 'socks',
'settings': {
'udp': True,
'auth': 'noauth'
}
}
],
'outbounds': [
outboundObject
]
}

46
ProxyBuilder/VLESS.py

@ -1,46 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
from ProxyBuilder import Xray
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list, str, dict]:
"""
VLESS配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
user = {
'id': proxyInfo['id'],
'encryption': proxyInfo['method']
}
if proxyInfo['stream']['secure'] is not None and proxyInfo['stream']['secure']['type'] == 'xtls':
flowType = proxyInfo['stream']['secure']['flow']
if flowType == 'xtls-origin':
user['flow'] = 'xtls-rprx-origin'
elif flowType == 'xtls-direct':
user['flow'] = 'xtls-rprx-direct'
elif flowType == 'xtls-splice':
user['flow'] = 'xtls-rprx-splice'
else:
raise Exception('Unknown XTLS flow')
if proxyInfo['stream']['secure']['udp443']:
user['flow'] += '-udp443'
outboundConfig = {
'protocol': 'vless',
'settings': {
'vnext': [
{
'address': proxyInfo['server'],
'port': proxyInfo['port'],
'users': [user]
}
]
},
'streamSettings': Xray.xrayStreamConfig(proxyInfo['stream'])
}
config = Xray.baseConfig(socksPort, outboundConfig) # VLESS节点配置
return ['xray', '-c', configFile], json.dumps(config), {}

36
ProxyBuilder/VMess.py

@ -1,36 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
from ProxyBuilder import V2ray
def load(proxyInfo: dict, socksPort: int, configFile: str) -> tuple[list, str, dict]:
"""
VMess配置载入
proxyInfo: 节点信息
socksPort: 本地通讯端口
configFile: 配置文件路径
return startCommand, fileContent, envVar
"""
outboundConfig = {
'protocol': 'vmess',
'settings': {
'vnext': [
{
'address': proxyInfo['server'],
'port': proxyInfo['port'],
'users': [
{
'id': proxyInfo['id'],
'alterId': proxyInfo['aid'],
'security': proxyInfo['method']
}
]
}
]
},
'streamSettings': V2ray.v2rayStreamConfig(proxyInfo['stream'])
}
config = V2ray.baseConfig(socksPort, outboundConfig) # VMess节点配置
return ['v2ray', '-c', configFile], json.dumps(config), {}

63
ProxyBuilder/Xray.py

@ -1,63 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyBuilder import V2ray
baseConfig = V2ray.baseConfig
def __secureConfig(secureInfo: dict or None) -> dict: # TLS/XTLS加密传输配置
if secureInfo is None:
return {}
secureObject = {
'allowInsecure': not secureInfo['verify']
}
if secureInfo['alpn'] is not None:
secureObject['alpn'] = secureInfo['alpn'].split(',')
if secureInfo['sni'] != '':
secureObject['serverName'] = secureInfo['sni']
if secureInfo['type'] == 'tls':
return {
'security': 'tls',
'tlsSettings': secureObject
}
elif secureInfo['type'] == 'xtls':
return {
'security': 'xtls',
'xtlsSettings': secureObject
}
else:
raise Exception('Unknown secure type')
def wsConfig(streamInfo: dict, secureFunc) -> dict: # WebSocket传输方式配置
wsObject = {
'path': streamInfo['path']
}
if streamInfo['host'] != '':
wsObject['headers'] = {}
wsObject['headers']['Host'] = streamInfo['host']
if streamInfo['ed'] is not None: # ed参数写入路径 -> /...?ed=xxx
if wsObject['path'].find('?') == -1: # 原路径不带参数
wsObject['path'] += '?ed=' + str(streamInfo['ed'])
else:
wsObject['path'] += '&ed=' + str(streamInfo['ed'])
return {**{
'network': 'ws',
'wsSettings': wsObject
}, **secureFunc(streamInfo['secure'])}
def xrayStreamConfig(streamInfo: dict) -> dict: # 生成xray传输方式配置
streamType = streamInfo['type']
if streamType == 'tcp':
return V2ray.tcpConfig(streamInfo, __secureConfig)
elif streamType == 'kcp':
return V2ray.kcpConfig(streamInfo, __secureConfig)
elif streamType == 'ws':
return wsConfig(streamInfo, __secureConfig)
elif streamType == 'h2':
return V2ray.h2Config(streamInfo, __secureConfig)
elif streamType == 'quic':
return V2ray.quicConfig(streamInfo, __secureConfig)
elif streamType == 'grpc':
return V2ray.grpcConfig(streamInfo, __secureConfig)
else:
raise Exception('Unknown stream type')

6
ProxyBuilder/__init__.py

@ -1,6 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyBuilder.builder import *
__all__ = ['build', 'check', 'destroy']

219
ProxyBuilder/builder.py

@ -1,219 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import os
import time
import ctypes
import random
import socket
import signal
import subprocess
from ProxyBuilder import Shadowsocks
from ProxyBuilder import ShadowsocksR
from ProxyBuilder import VMess
from ProxyBuilder import VLESS
from ProxyBuilder import Trojan
from ProxyBuilder import TrojanGo
from ProxyBuilder import Brook
from ProxyBuilder import Hysteria
libcPaths = [
'/usr/lib/libc.so.6', # CentOS
'/usr/lib64/libc.so.6',
'/lib/libc.musl-i386.so.1', # Alpine
'/lib/libc.musl-x86_64.so.1',
'/lib/libc.musl-aarch64.so.1',
'/lib/i386-linux-gnu/libc.so.6', # Debian / Ubuntu
'/lib/x86_64-linux-gnu/libc.so.6',
'/lib/aarch64-linux-gnu/libc.so.6',
]
def __preExec(libcPath) -> None:
ctypes.CDLL(libcPath).prctl(1, signal.SIGTERM) # 子进程跟随退出
os.setpgrp() # 新进程组
def __checkPortAvailable(port: int) -> bool: # 检测端口可用性
ipv4_tcp = None
ipv4_udp = None
ipv6_tcp = None
ipv6_udp = None
try:
ipv4_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ipv4_udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ipv4_tcp.bind(('0.0.0.0', port))
ipv4_udp.bind(('0.0.0.0', port))
ipv4_tcp.close()
ipv4_udp.close()
ipv6_tcp = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
ipv6_udp = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
ipv6_tcp.bind(('::', port))
ipv6_udp.bind(('::', port))
ipv6_tcp.close()
ipv6_udp.close()
return True # IPv4 TCP / IPv4 UDP / IPv6 TCP / IPv6 UDP 均无占用
except:
return False
finally: # 关闭socket
try:
ipv4_tcp.close()
except: pass
try:
ipv4_udp.close()
except: pass
try:
ipv6_tcp.close()
except: pass
try:
ipv6_udp.close()
except: pass
def __genTaskFlag(length: int = 16) -> str: # 生成任务标志
flag = ''
for i in range(0, length):
tmp = random.randint(0, 15)
if tmp >= 10:
flag += chr(tmp + 87) # a ~ f
else:
flag += str(tmp) # 0 ~ 9
return flag
def __getAvailablePort(rangeStart: int = 41952, rangeEnd: int = 65535) -> int or None: # 获取一个空闲端口
if rangeStart > rangeEnd or rangeStart < 1 or rangeEnd > 65535:
return None
while True:
port = random.randint(rangeStart, rangeEnd) # 随机选取
if __checkPortAvailable(port):
return port
time.sleep(0.1) # wait for 100ms
def build(proxyInfo: dict, configDir: str,
portRangeStart: int = 1024, portRangeEnd: int = 65535) -> tuple[bool, str or dict]:
"""
创建代理节点客户端
代理节点无效:
return False, {reason}
代理工作正常:
return True, {
'flag': taskFlag,
'port': socksPort,
'file': configFile,
'process': process
}
"""
taskFlag = __genTaskFlag() # 生成测试标志
socksPort = __getAvailablePort(portRangeStart, portRangeEnd) # 获取Socks5测试端口
if 'type' not in proxyInfo: # 未指定节点类型
return False, 'Proxy type not specified'
if proxyInfo['type'] == 'ss': # Shadowsocks节点
clientObj = Shadowsocks
elif proxyInfo['type'] == 'ssr': # ShadowsocksR节点
clientObj = ShadowsocksR
elif proxyInfo['type'] == 'vmess': # VMess节点
clientObj = VMess
elif proxyInfo['type'] == 'vless': # VLESS节点
clientObj = VLESS
elif proxyInfo['type'] == 'trojan': # Trojan节点
clientObj = Trojan
elif proxyInfo['type'] == 'trojan-go': # Trojan-Go节点
clientObj = TrojanGo
elif proxyInfo['type'] == 'brook': # Brook节点
clientObj = Brook
elif proxyInfo['type'] == 'hysteria': # Hysteria节点
clientObj = Hysteria
else: # 未知类型
return False, 'Unknown proxy type'
configFile = configDir + '/' + taskFlag + '.json' # 配置文件路径
try:
startCommand, fileContent, envVar = clientObj.load(proxyInfo, socksPort, configFile) # 载入配置
except: # 格式出错
return False, 'Format error with ' + str(proxyInfo['type'])
try:
if fileContent is not None:
with open(configFile, 'w') as fileObject: # 保存配置文件
fileObject.write(fileContent)
except: # 配置文件写入失败
raise Exception('Unable write to file ' + str(configFile))
try: # 子进程形式启动
for libcPath in libcPaths:
if os.path.exists(libcPath): # 定位libc.so文件
break
process = subprocess.Popen( # 启动子进程
startCommand,
env = envVar,
stdout = subprocess.DEVNULL,
stderr = subprocess.DEVNULL,
preexec_fn = lambda: __preExec(libcPath)
)
except:
process = subprocess.Popen( # prctl失败 回退正常启动
startCommand,
env = envVar,
stdout = subprocess.DEVNULL,
stderr = subprocess.DEVNULL
)
if process is None: # 启动失败
raise Exception('Subprocess start failed by `' + ' '.join(startCommand) + '`')
return True, { # 返回连接参数
'flag': taskFlag,
'port': socksPort,
'file': configFile if fileContent is not None else None,
'process': process
}
def check(client: dict) -> bool or None:
"""
检查客户端是否正常运行
工作异常: return False
工作正常: return True
"""
return client['process'].poll() is None
def destroy(client: dict) -> bool:
"""
结束客户端并清理
销毁异常: return False
销毁成功: return True
"""
try:
maxTermTime = 100 # SIGTERM -> SIGKILL
process = client['process']
os.killpg(os.getpgid(process.pid), signal.SIGTERM) # 杀死子进程组
time.sleep(0.2)
while process.poll() is None: # 等待退出
maxTermTime -= 1
if maxTermTime < 0:
process.kill() # SIGKILL -> force kill
else:
process.terminate() # SIGTERM -> soft kill
time.sleep(0.2)
except:
return False
try:
file = client['file']
if file is None: # 无配置文件
return True
if os.path.exists(file) and os.path.isfile(file):
os.remove(file) # 删除配置文件
return True # 销毁成功
except:
pass
return False
Loading…
Cancel
Save