Browse Source

feat: ProxyBuilder and its demo

master
Dnomd343 3 years ago
parent
commit
c323dfca1a
  1. 238
      ProxyBuilder/Shadowsocks.py
  2. 79
      ProxyBuilder/__init__.py
  3. 42
      demo.py

238
ProxyBuilder/Shadowsocks.py

@ -0,0 +1,238 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
ssMethodList = { # shadowsocks各版本支持的加密方式
"ss-python": [
"aes-128-gcm",
"aes-192-gcm",
"aes-256-gcm",
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-ocb",
"aes-192-ocb",
"aes-256-ocb",
"aes-128-ofb",
"aes-192-ofb",
"aes-256-ofb",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"aes-128-cfb1",
"aes-192-cfb1",
"aes-256-cfb1",
"aes-128-cfb8",
"aes-192-cfb8",
"aes-256-cfb8",
"aes-128-cfb128",
"aes-192-cfb128",
"aes-256-cfb128",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"camellia-128-cfb128",
"camellia-192-cfb128",
"camellia-256-cfb128",
"table",
"rc4",
"rc4-md5",
"rc2-cfb",
"bf-cfb",
"cast5-cfb",
"des-cfb",
"idea-cfb",
"seed-cfb",
"salsa20",
"xchacha20",
"chacha20",
"chacha20-ietf",
"chacha20-poly1305",
"chacha20-ietf-poly1305",
"xchacha20-ietf-poly1305",
],
"ss-python-legacy": [
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-ofb",
"aes-192-ofb",
"aes-256-ofb",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"aes-128-cfb1",
"aes-192-cfb1",
"aes-256-cfb1",
"aes-128-cfb8",
"aes-192-cfb8",
"aes-256-cfb8",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"table",
"rc4",
"rc4-md5",
"rc2-cfb",
"bf-cfb",
"cast5-cfb",
"des-cfb",
"idea-cfb",
"seed-cfb",
"salsa20",
"salsa20-ctr",
"chacha20",
],
"ss-libev": [
"aes-128-gcm",
"aes-192-gcm",
"aes-256-gcm",
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"rc4",
"rc4-md5",
"bf-cfb",
"salsa20",
"chacha20",
"chacha20-ietf",
"chacha20-ietf-poly1305",
"xchacha20-ietf-poly1305",
],
"ss-libev-legacy": [
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"table",
"rc4",
"rc4-md5",
"rc2-cfb",
"bf-cfb",
"cast5-cfb",
"des-cfb",
"idea-cfb",
"seed-cfb",
"salsa20",
"chacha20",
"chacha20-ietf",
],
"ss-rust": [
"aes-128-gcm",
"aes-256-gcm",
"plain",
"none",
"chacha20-ietf-poly1305",
]
}
def __baseJSON(proxyInfo, socksPort): # 生成JSON基本结构
jsonContent = {
'server': proxyInfo['server'],
'server_port': proxyInfo['port'],
'local_address': '127.0.0.1',
'local_port': socksPort,
'password': proxyInfo['password'],
'method': proxyInfo['method'],
}
if proxyInfo['plugin'] != '':
jsonContent['plugin'] = proxyInfo['plugin']
jsonContent['plugin_opts'] = proxyInfo['pluginArg']
return jsonContent
def __pluginUdpCheck(plugin, pluginArg): # 插件是否使用UDP通讯
if plugin == '': # 无插件
return False
noUdpPlugin = [ # 不使用UDP通讯的插件
'obfs-local',
'simple-tls',
'ck-client',
'gq-client',
'mtt-client',
'rabbit-plugin',
'gun-plugin',
]
onlyUdpPlugin = [ # 仅使用UDP通讯的插件
'kcptun-client',
'qtun-client',
]
if plugin in noUdpPlugin:
return False
if plugin in onlyUdpPlugin:
return True
if plugin == 'v2ray-plugin' or plugin == 'xray-plugin' or plugin == 'gost-plugin':
if 'mode=quic' in pluginArg.split(';'):
return True
return False
return True # 默认假定占用UDP
def __ssPython(proxyInfo, socksPort, isLegacy = False): # ss-python配置文件生成
jsonContent = __baseJSON(proxyInfo, socksPort)
specialMethods = [
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
]
if isLegacy == False: # 仅新版本支持
if jsonContent['method'] in specialMethods:
jsonContent['method'] = 'mbedtls:' + jsonContent['method']
if jsonContent['method'] == 'idea-cfb' or jsonContent['method'] == 'seed-cfb':
jsonContent['extra_opts'] = '--libopenssl=libcrypto.so.1.0.0'
if proxyInfo['udp'] != True:
jsonContent['no_udp'] = True
if isLegacy == True:
jsonContent['shadowsocks'] = 'ss-python-legacy-local'
else:
jsonContent['shadowsocks'] = 'ss-python-local'
return jsonContent, 'ss-bootstrap-local'
def __ssLibev(proxyInfo, socksPort, isLegacy = False): # ss-libev配置文件生成
jsonContent = __baseJSON(proxyInfo, socksPort)
if proxyInfo['udp'] == True:
jsonContent['mode'] = 'tcp_and_udp'
if isLegacy == True:
return jsonContent, 'ss-libev-legacy-local'
else:
return jsonContent, 'ss-libev-local'
def __ssRust(proxyInfo, socksPort): # ss-rust配置文件生成
jsonContent = __baseJSON(proxyInfo, socksPort)
if proxyInfo['udp'] == True:
jsonContent['mode'] = 'tcp_and_udp'
return jsonContent, 'ss-rust-local'
def load(proxyInfo, socksPort, configFile): # shadowsocks配置载入
proxyInfo['udp'] = not __pluginUdpCheck(proxyInfo['plugin'], proxyInfo['pluginArg'])
if proxyInfo['method'] in ssMethodList['ss-libev']:
jsonContent, ssFile = __ssLibev(proxyInfo, socksPort)
elif proxyInfo['method'] in ssMethodList['ss-libev-legacy']:
jsonContent, ssFile = __ssLibev(proxyInfo, socksPort, isLegacy = True)
elif proxyInfo['method'] in ssMethodList['ss-python']:
jsonContent, ssFile = __ssPython(proxyInfo, socksPort)
elif proxyInfo['method'] in ssMethodList['ss-python-legacy']:
jsonContent, ssFile = __ssPython(proxyInfo, socksPort, isLegacy = True)
elif proxyInfo['method'] in ssMethodList['ss-rust']:
jsonContent, ssFile = __ssRust(proxyInfo, socksPort)
else:
return None # 匹配不到加密方式
try:
with open(configFile, 'w') as fileObject:
fileObject.write(json.dumps(jsonContent)) # 保存配置文件
except:
return None # 配置文件写入失败
return [ ssFile, '-c', configFile ]

79
ProxyBuilder/__init__.py

@ -0,0 +1,79 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import time
import random
import socket
import subprocess
from ProxyBuilder import Shadowsocks
def __checkPortAvailable(port, host = '127.0.0.1'):
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
s.connect((host, int(port)))
return False
except socket.error:
return True
finally:
if s:
s.close()
def __genTaskFlag(length = 16): # 生成任务代号
flag = ""
for i in range(0, length):
tmp = random.randint(0, 15)
if tmp >= 10:
flag += chr(tmp + 87) # a ~ f
else:
flag += str(tmp) # 0 ~ 9
return flag
def __getAvailablePort(rangeStart, rangeEnd): # 获取一个空闲端口
if rangeStart > rangeEnd:
return None
while True:
port = random.randint(rangeStart, rangeEnd)
if __checkPortAvailable(port):
return port
time.sleep(0.1) # 100ms
def build(proxyInfo, configDir): # 构建代理节点连接
taskFlag = __genTaskFlag()
socksPort = __getAvailablePort(1024, 65535)
if not 'type' in proxyInfo:
return None
proxyType = proxyInfo['type'] # 节点类型
proxyInfo.pop('type')
configFile = configDir + '/' + taskFlag + '.json' # 配置文件路径
if (proxyType == 'shadowsocks'): # Shadowsocks节点
startCommand = Shadowsocks.load(proxyInfo, socksPort, configFile)
else: # 未知类型
return None
process = subprocess.Popen(startCommand, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
return { # 返回连接参数
'flag': taskFlag,
'port': socksPort,
'file': configFile,
'process': process,
'pid': process.pid,
}
def check(taskInfo): # 检查客户端是否正常
process = taskInfo['process']
if process.poll() != None:
return False # 死亡
else:
return True # 正常
def destroy(taskInfo): # 结束客户端并清理
process = taskInfo['process']
if process.poll() == None: # 未死亡
process.terminate() # SIGTERM
while process.poll() == None: # 等待退出
time.sleep(1)
process.terminate()
try:
os.remove(taskInfo.file) # 删除配置文件
except: pass

42
demo.py

@ -0,0 +1,42 @@
import time
import socket
import requests
import ProxyBuilder
def checkSocksPort(port):
try:
r = requests.get('http://gstatic.com/generate_204', proxies = {
'http': 'socks5://127.0.0.1:' + str(port),
'https': 'socks5://127.0.0.1:' + str(port),
})
if r.status_code == 204:
return True
except: pass
return False
testInfo = {
'type': 'shadowsocks',
'server': '127.0.0.1',
'port': 12345,
'password': 'dnomd343',
'method': 'aes-256-ctr',
'plugin': '',
'pluginArg': '',
}
print("start")
print(testInfo)
task = ProxyBuilder.build(testInfo, '/tmp/ProxyC')
print(task)
time.sleep(1)
if ProxyBuilder.check(task) == False:
print("error exit")
else:
print("test with gstatic")
if checkSocksPort(task['port']):
print("ok")
else:
print("error")
ProxyBuilder.destroy(task)
print("stop")
Loading…
Cancel
Save