Browse Source

update: v2rayn url decode

dev
dnomd343 2 years ago
parent
commit
06a03e4e0e
  1. 70
      Decoder/VMess.py
  2. 2
      Filter/V2ray.py
  3. 19
      Utils/Common/Format.py
  4. 2
      test.py

70
Decoder/VMess.py

@ -4,75 +4,71 @@
import json import json
from Utils.Logger import logger from Utils.Logger import logger
from Utils.Exception import decodeException from Utils.Exception import decodeException
from Utils.Common import b64Decode, checkScheme, hostFormat from Utils.Common import b64Decode, checkScheme
from Utils.Common import hostFormat, splitEdParam
def v2rayN(url: str) -> dict: def v2rayN(url: str) -> dict:
""" """
FORMAT: vmess://BASE64-ENCODED-JSON-STRING FORMAT: vmess://BASE64-ENCODED-JSON-STRING
fields => v(=2) / ps / add / port / id / aid / scy / net / type / host / path / tls / sni / alpn fields => v(=2) / ps / add / port / id / aid / scy / net / type / host / path / tls / sni / alpn
""" """
config = {
'type': 'vmess',
'info': {}
}
info = config['info']
logger.debug('V2rayN url decode -> %s' % url) logger.debug('V2rayN url decode -> %s' % url)
url = json.loads(b64Decode(checkScheme(url, 'vmess', 'V2rayN'))) url = json.loads(b64Decode(checkScheme(url, 'vmess', 'V2rayN')))
logger.debug('V2rayN json format -> %s' % url) logger.debug('V2rayN json format -> %s' % url)
if int(url['v']) != 2: if int(url['v']) != 2:
logger.warning('V2rayN url with unknown version') logger.warning('V2rayN url with unknown version')
config['name'] = url['ps'] if 'ps' in url else '' # ps -> remark config = {
info = { 'type': 'vmess',
'server': hostFormat(url['add']), 'name': url['ps'] if 'ps' in url else '', # ps -> remark
'port': url['port'], 'info': {
'id': url['id'], 'server': hostFormat(url['add']),
'aid': url['aid'] if 'aid' in url else 0, # default alter id -> 0 'port': url['port'],
'method': url['scy'] if 'scy' in url else 'auto', # scy -> method (default = auto) 'id': url['id'],
'aid': url['aid'] if 'aid' in url else 0, # default alter id -> 0
'method': url['scy'] if 'scy' in url else 'auto', # scy -> method (default = auto)
}
} }
stream = { stream = {
'type': url['net'] if 'net' in url else 'tcp' # net -> stream.type (default = tcp) 'type': url['net'] if 'net' in url else 'tcp' # net -> stream.type (default = tcp)
} }
if stream['type'] == 'tcp': if stream['type'] == 'tcp':
if 'http' in url and url['type'] == 'http': # type -> none / http if 'http' in url and url['type'] == 'http': # type -> obfs
stream['obfs'] = { stream['obfs'] = {
'host': url['host'] if 'host' in url else '', 'host': url['host'] if 'host' in url else '',
'path': url['path'] if 'path' in url else '', 'path': url['path'] if 'path' in url else '/',
} }
elif stream['type'] == 'kcp': elif stream['type'] == 'kcp':
stream['obfs'] = url['type'] if 'type' in url else 'none' # type -> obfs stream['obfs'] = url['type'] if 'type' in url else 'none' # type -> obfs
stream['seed'] = url['path'] if 'path' in url else None # path -> seed stream['seed'] = url['path'] if 'path' in url else None # path -> seed
elif stream['type'] == 'ws': elif stream['type'] == 'ws':
if 'host' in url: stream['host'] = url['host'] if 'host' in url else '' # host -> host
stream['host'] = url['host']
if 'path' in url: if 'path' in url:
try: try:
stream['ed'], stream['path'] = baseFunc.splitEdParam(url['path']) stream['path'], stream['ed'] = splitEdParam(url['path'])
except: except:
stream['path'] = url['path'] stream['path'] = url['path']
elif stream['type'] == 'h2': elif stream['type'] == 'h2':
if 'host' in url: stream['host'] = url['host'] if 'host' in url else '' # host -> host
stream['host'] = url['host'] stream['path'] = url['path'] if 'path' in url else '/' # path -> path
if 'path' in url:
stream['path'] = url['path']
elif stream['type'] == 'quic': elif stream['type'] == 'quic':
if 'type' in url: stream['obfs'] = url['type'] if 'type' in url else 'none' # type -> obfs
stream['obfs'] = url['type'] stream['method'] = url['host'] if 'host' in url else 'none' # host -> method
if 'host' in url: stream['passwd'] = url['path'] if 'path' in url else '' # path -> passwd
stream['method'] = url['host']
if 'path' in url:
stream['passwd'] = url['path']
elif stream['type'] == 'grpc': elif stream['type'] == 'grpc':
if 'type' in url and url['type'] == 'multi': stream['mode'] = 'multi' if 'type' in url and url['type'] == 'multi' else 'gun' # type -> mode
stream['mode'] = 'multi' stream['service'] = url['path'] # path -> service
if 'path' in url:
stream['service'] = url['path']
else: else:
logger.error('V2rayN url with unknown network type -> %s' % stream['type']) logger.error('V2rayN url with unknown network type -> %s' % stream['type'])
raise decodeException('Unknown v2rayN network type') raise decodeException('Unknown v2rayN network type')
info['stream'] = info secure = None
if 'tls' in url and url['tls'] == 'tls': # enable TLS
secure = {
'sni': url['sni'] if 'sni' in url else '', # sni option
'alpn': url['alpn'] if 'alpn' in url and url['alpn'] != '' else None # alpn option
}
stream['secure'] = secure
config['info']['stream'] = stream
logger.debug('V2rayN url release -> %s', config) logger.debug('V2rayN url release -> %s', config)
logger.critical(stream) return config
logger.critical(info)

2
Filter/V2ray.py

@ -90,7 +90,7 @@ kcpObject = rulesFilter({
'optional': True, 'optional': True,
'default': 'none', 'default': 'none',
'type': str, 'type': str,
'format': lambda s: toStrTidy(s).replace('_', '-'), # TODO: '' => 'none' 'format': lambda s: 'none' if s == '' else toStrTidy(s).replace('_', '-'),
'filter': lambda s: s in udpObfuscations, 'filter': lambda s: s in udpObfuscations,
'errMsg': 'Unknown mKCP obfs method' 'errMsg': 'Unknown mKCP obfs method'
}, },

19
Utils/Common/Format.py

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import re
from IPy import IP from IPy import IP
from Utils.Logger import logger from Utils.Logger import logger
from Utils.Common.Coding import * from Utils.Common.Coding import *
@ -48,3 +49,21 @@ def splitParam(params: str) -> dict: # split params
for param in params.split('&'): for param in params.split('&'):
ret[param.split('=', 1)[0]] = urlDecode(param.split('=', 1)[1]) ret[param.split('=', 1)[0]] = urlDecode(param.split('=', 1)[1])
return ret return ret
def splitEdParam(path: str) -> tuple[str, int or None]: # split early-data option
if path.find('?') == -1:
return path, None
ed = None
params = []
content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path) # ...?...
for field in content[2].split('&'): # ?param_1=...&param_2=...
if not field.startswith('ed='):
params.append(field)
continue
ed = int(field[3:]) # ed=...
if ed is None: # ed param not found
return path, None
if len(params) == 0: # param -> []
return content[1], ed
return '%s?%s' % (content[1], '&'.join(params)), ed

2
test.py

@ -21,6 +21,8 @@ from Filter import Filter
ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ') ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ')
# ret = Decoder.v2rayN('vmess://ew0KICAidiI6ICIyIiwNCiAgInBzIjogIummmea4rzLnur8iLA0KICAiYWRkIjogIjExOS4yOC44OC4yMzAiLA0KICAicG9ydCI6ICI0NDMiLA0KICAiaWQiOiAiODRhOGU2ZDItMWFkMy00NjEyLTgzNjItYTdjMjNlOWU5MzEyIiwNCiAgImFpZCI6ICIwIiwNCiAgInNjeSI6ICJhdXRvIiwNCiAgIm5ldCI6ICJ3cyIsDQogICJ0eXBlIjogIm5vbmUiLA0KICAiaG9zdCI6ICJ0ZXN0LnNjdXRyb2JvdC5jb20iLA0KICAicGF0aCI6ICIvdm1lc3M/ZGVtbz10cnVlJmVkPTIwNDgmdGVzdD1vayIsDQogICJ0bHMiOiAidGxzIiwNCiAgInNuaSI6ICJoay5zY3V0cm9ib3QuY29tIiwNCiAgImFscG4iOiAiIg0KfQ==')
# ret['info']['server'] = '[%s]' % ret['info']['server'] # ret['info']['server'] = '[%s]' % ret['info']['server']
ret['info'] = Filter(ret['type'], ret['info']) ret['info'] = Filter(ret['type'], ret['info'])

Loading…
Cancel
Save