From 06a03e4e0e8e7f1fd358b2ed5654f41b6f08056a Mon Sep 17 00:00:00 2001 From: dnomd343 Date: Wed, 7 Sep 2022 21:38:05 +0800 Subject: [PATCH] update: v2rayn url decode --- Decoder/VMess.py | 70 ++++++++++++++++++++---------------------- Filter/V2ray.py | 2 +- Utils/Common/Format.py | 19 ++++++++++++ test.py | 2 ++ 4 files changed, 55 insertions(+), 38 deletions(-) diff --git a/Decoder/VMess.py b/Decoder/VMess.py index 963ee61..d20acc2 100644 --- a/Decoder/VMess.py +++ b/Decoder/VMess.py @@ -4,75 +4,71 @@ import json from Utils.Logger import logger from Utils.Exception import decodeException -from Utils.Common import b64Decode, checkScheme, hostFormat +from Utils.Common import b64Decode, checkScheme +from Utils.Common import hostFormat, splitEdParam def v2rayN(url: str) -> dict: """ FORMAT: vmess://BASE64-ENCODED-JSON-STRING fields => v(=2) / ps / add / port / id / aid / scy / net / type / host / path / tls / sni / alpn """ - config = { - 'type': 'vmess', - 'info': {} - } - info = config['info'] logger.debug('V2rayN url decode -> %s' % url) url = json.loads(b64Decode(checkScheme(url, 'vmess', 'V2rayN'))) logger.debug('V2rayN json format -> %s' % url) if int(url['v']) != 2: logger.warning('V2rayN url with unknown version') - config['name'] = url['ps'] if 'ps' in url else '' # ps -> remark - info = { - 'server': hostFormat(url['add']), - 'port': url['port'], - 'id': url['id'], - 'aid': url['aid'] if 'aid' in url else 0, # default alter id -> 0 - 'method': url['scy'] if 'scy' in url else 'auto', # scy -> method (default = auto) + config = { + 'type': 'vmess', + 'name': url['ps'] if 'ps' in url else '', # ps -> remark + 'info': { + 'server': hostFormat(url['add']), + 'port': url['port'], + 'id': url['id'], + 'aid': url['aid'] if 'aid' in url else 0, # default alter id -> 0 + 'method': url['scy'] if 'scy' in url else 'auto', # scy -> method (default = auto) + } } - stream = { - 'type': url['net'] if 'net' in url else 'tcp' # net -> stream.type (default = tcp) + 'type': url['net'] if 'net' in url else 'tcp' # net -> stream.type (default = tcp) } if stream['type'] == 'tcp': - if 'http' in url and url['type'] == 'http': # type -> none / http + if 'http' in url and url['type'] == 'http': # type -> obfs stream['obfs'] = { 'host': url['host'] if 'host' in url else '', - 'path': url['path'] if 'path' in url else '', + 'path': url['path'] if 'path' in url else '/', } elif stream['type'] == 'kcp': stream['obfs'] = url['type'] if 'type' in url else 'none' # type -> obfs stream['seed'] = url['path'] if 'path' in url else None # path -> seed elif stream['type'] == 'ws': - if 'host' in url: - stream['host'] = url['host'] + stream['host'] = url['host'] if 'host' in url else '' # host -> host if 'path' in url: try: - stream['ed'], stream['path'] = baseFunc.splitEdParam(url['path']) + stream['path'], stream['ed'] = splitEdParam(url['path']) except: stream['path'] = url['path'] elif stream['type'] == 'h2': - if 'host' in url: - stream['host'] = url['host'] - if 'path' in url: - stream['path'] = url['path'] + stream['host'] = url['host'] if 'host' in url else '' # host -> host + stream['path'] = url['path'] if 'path' in url else '/' # path -> path elif stream['type'] == 'quic': - if 'type' in url: - stream['obfs'] = url['type'] - if 'host' in url: - stream['method'] = url['host'] - if 'path' in url: - stream['passwd'] = url['path'] + stream['obfs'] = url['type'] if 'type' in url else 'none' # type -> obfs + stream['method'] = url['host'] if 'host' in url else 'none' # host -> method + stream['passwd'] = url['path'] if 'path' in url else '' # path -> passwd elif stream['type'] == 'grpc': - if 'type' in url and url['type'] == 'multi': - stream['mode'] = 'multi' - if 'path' in url: - stream['service'] = url['path'] + stream['mode'] = 'multi' if 'type' in url and url['type'] == 'multi' else 'gun' # type -> mode + stream['service'] = url['path'] # path -> service else: logger.error('V2rayN url with unknown network type -> %s' % stream['type']) raise decodeException('Unknown v2rayN network type') - info['stream'] = info + secure = None + if 'tls' in url and url['tls'] == 'tls': # enable TLS + secure = { + 'sni': url['sni'] if 'sni' in url else '', # sni option + 'alpn': url['alpn'] if 'alpn' in url and url['alpn'] != '' else None # alpn option + } + stream['secure'] = secure + config['info']['stream'] = stream logger.debug('V2rayN url release -> %s', config) - logger.critical(stream) - logger.critical(info) + return config diff --git a/Filter/V2ray.py b/Filter/V2ray.py index e5768ac..f824307 100644 --- a/Filter/V2ray.py +++ b/Filter/V2ray.py @@ -90,7 +90,7 @@ kcpObject = rulesFilter({ 'optional': True, 'default': 'none', 'type': str, - 'format': lambda s: toStrTidy(s).replace('_', '-'), # TODO: '' => 'none' + 'format': lambda s: 'none' if s == '' else toStrTidy(s).replace('_', '-'), 'filter': lambda s: s in udpObfuscations, 'errMsg': 'Unknown mKCP obfs method' }, diff --git a/Utils/Common/Format.py b/Utils/Common/Format.py index 53eb800..a7707b4 100644 --- a/Utils/Common/Format.py +++ b/Utils/Common/Format.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +import re from IPy import IP from Utils.Logger import logger from Utils.Common.Coding import * @@ -48,3 +49,21 @@ def splitParam(params: str) -> dict: # split params for param in params.split('&'): ret[param.split('=', 1)[0]] = urlDecode(param.split('=', 1)[1]) return ret + + +def splitEdParam(path: str) -> tuple[str, int or None]: # split early-data option + if path.find('?') == -1: + return path, None + ed = None + params = [] + content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path) # ...?... + for field in content[2].split('&'): # ?param_1=...¶m_2=... + if not field.startswith('ed='): + params.append(field) + continue + ed = int(field[3:]) # ed=... + if ed is None: # ed param not found + return path, None + if len(params) == 0: # param -> [] + return content[1], ed + return '%s?%s' % (content[1], '&'.join(params)), ed diff --git a/test.py b/test.py index 76ab81d..e8f15a2 100755 --- a/test.py +++ b/test.py @@ -21,6 +21,8 @@ from Filter import Filter ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ') +# ret = Decoder.v2rayN('vmess://ew0KICAidiI6ICIyIiwNCiAgInBzIjogIummmea4rzLnur8iLA0KICAiYWRkIjogIjExOS4yOC44OC4yMzAiLA0KICAicG9ydCI6ICI0NDMiLA0KICAiaWQiOiAiODRhOGU2ZDItMWFkMy00NjEyLTgzNjItYTdjMjNlOWU5MzEyIiwNCiAgImFpZCI6ICIwIiwNCiAgInNjeSI6ICJhdXRvIiwNCiAgIm5ldCI6ICJ3cyIsDQogICJ0eXBlIjogIm5vbmUiLA0KICAiaG9zdCI6ICJ0ZXN0LnNjdXRyb2JvdC5jb20iLA0KICAicGF0aCI6ICIvdm1lc3M/ZGVtbz10cnVlJmVkPTIwNDgmdGVzdD1vayIsDQogICJ0bHMiOiAidGxzIiwNCiAgInNuaSI6ICJoay5zY3V0cm9ib3QuY29tIiwNCiAgImFscG4iOiAiIg0KfQ==') + # ret['info']['server'] = '[%s]' % ret['info']['server'] ret['info'] = Filter(ret['type'], ret['info'])