Browse Source

feat: add v2rayn url decode

dev
dnomd343 2 years ago
parent
commit
5da6884c8e
  1. 2
      Basis/Filter.py
  2. 17
      Basis/Functions.py
  3. 78
      Decoder/VMess.py
  4. 1
      Decoder/__init__.py
  5. 2
      Filter/V2ray.py
  6. 20
      Utils/Common/Host.py
  7. 1
      Utils/Common/__init__.py
  8. 5
      test.py

2
Basis/Filter.py

@ -81,12 +81,14 @@ def Filter(raw: dict, rules: dict) -> dict:
else: # key exist
data[key] = raw[key]
# format process (data --[format]--> data)
# TODO: some 'none' value should be format as None
if data[key] is None: # key content is None
if not rule['allowNone']: # key is not allow None
raise filterException('Field `%s` shouldn\'t be None' % key)
continue # skip following process
try:
data[key] = rule['format'](data[key]) # run format
# TODO: format result maybe None, check allowNone again (allow -> skip following process)
except:
raise filterException(rule['errMsg']) # format error
# filter process (data --[type check (& filter check)]--> pass / non-pass)

17
Basis/Functions.py

@ -2,11 +2,10 @@
# -*- coding: utf-8 -*-
import re
import time
import uuid
import random
import hashlib
from IPy import IP
from Utils.Logger import logger
@ -44,20 +43,6 @@ def md5Sum(data: str, encode: str = 'utf-8') -> str:
return hashlib.md5(data.encode(encoding = encode)).hexdigest() # MD5 hash
def hostFormat(host: str, v6Bracket: bool = False) -> str:
try:
ip = IP(host)
if v6Bracket and ip.version() == 6:
return '[%s]' % str(ip) # [IPv6]
return str(ip) # IPv4 / IPV6
except: # not ip address
return host
def v6AddBracket(host: str) -> str: # add bracket for ipv6
return hostFormat(host, v6Bracket = True)
def genFlag(length: int = 12) -> str: # generate random task flag
flag = ''
for i in range(0, length):

78
Decoder/VMess.py

@ -0,0 +1,78 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from Utils.Logger import logger
from Utils.Exception import decodeException
from Utils.Common import base64Decode, checkScheme, hostFormat
def v2rayN(url: str) -> dict:
"""
FORMAT: vmess://BASE64-ENCODED-JSON-STRING
fields => v(=2) / ps / add / port / id / aid / scy / net / type / host / path / tls / sni / alpn
"""
config = {
'type': 'vmess',
'info': {}
}
info = config['info']
logger.debug('V2rayN url decode -> %s' % url)
url = json.loads(base64Decode(checkScheme(url, 'vmess', 'V2rayN')))
logger.debug('V2rayN json format -> %s' % url)
if int(url['v']) != 2:
logger.warning('V2rayN url with unknown version')
config['name'] = url['ps'] if 'ps' in url else '' # ps -> remark
info = {
'server': hostFormat(url['add']),
'port': url['port'],
'id': url['id'],
'aid': url['aid'] if 'aid' in url else 0, # default alter id -> 0
'method': url['scy'] if 'scy' in url else 'auto', # scy -> method (default = auto)
}
stream = {
'type': url['net'] if 'net' in url else 'tcp' # net -> stream.type (default = tcp)
}
if stream['type'] == 'tcp':
if 'http' in url and url['type'] == 'http': # type -> none / http
stream['obfs'] = {
'host': url['host'] if 'host' in url else '',
'path': url['path'] if 'path' in url else '',
}
elif stream['type'] == 'kcp':
stream['obfs'] = url['type'] if 'type' in url else 'none' # type -> obfs
stream['seed'] = url['path'] if 'path' in url else None # path -> seed
elif stream['type'] == 'ws':
if 'host' in url:
stream['host'] = url['host']
if 'path' in url:
try:
stream['ed'], stream['path'] = baseFunc.splitEdParam(url['path'])
except:
stream['path'] = url['path']
elif stream['type'] == 'h2':
if 'host' in url:
stream['host'] = url['host']
if 'path' in url:
stream['path'] = url['path']
elif stream['type'] == 'quic':
if 'type' in url:
stream['obfs'] = url['type']
if 'host' in url:
stream['method'] = url['host']
if 'path' in url:
stream['passwd'] = url['path']
elif stream['type'] == 'grpc':
if 'type' in url and url['type'] == 'multi':
stream['mode'] = 'multi'
if 'path' in url:
stream['service'] = url['path']
else:
logger.error('V2rayN url with unknown network type -> %s' % stream['type'])
raise decodeException('Unknown v2rayN network type')
info['stream'] = info
logger.debug('V2rayN url release -> %s', config)
logger.critical(stream)
logger.critical(info)

1
Decoder/__init__.py

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from Decoder.VMess import v2rayN
from Decoder.ShadowsocksR import ssr
from Decoder.Shadowsocks import ssPlain, ssCommon, sip002

2
Filter/V2ray.py

@ -90,7 +90,7 @@ kcpObject = rulesFilter({
'optional': True,
'default': 'none',
'type': str,
'format': lambda s: toStrTidy(s).replace('_', '-'),
'format': lambda s: toStrTidy(s).replace('_', '-'), # TODO: '' => 'none'
'filter': lambda s: s in udpObfuscations,
'errMsg': 'Unknown mKCP obfs method'
},

20
Utils/Common/Host.py

@ -0,0 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from IPy import IP
def v6AddBracket(host: str) -> str: # add bracket for ipv6
return hostFormat(host, v6Bracket = True)
def hostFormat(host: str, v6Bracket: bool = False) -> str:
try:
if host[:1] == '[' and host[-1:] == ']': # [IPv6]
host = host[1:-1] # remove extra bracket
ip = IP(host)
if v6Bracket and ip.version() == 6:
return '[%s]' % str(ip) # [IPv6]
return str(ip) # IPv4 / IPV6
except: # not ip address
return host

1
Utils/Common/__init__.py

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
from Utils.Common.Coding import urlEncode, urlDecode
from Utils.Common.Host import hostFormat, v6AddBracket
from Utils.Common.Coding import base64Encode, base64Decode
from Utils.Common.Network import isVacantPort, getAvailablePort
from Utils.Common.Coding import checkScheme, splitTag, splitParam

5
test.py

@ -14,7 +14,10 @@ from Filter import Filter
# ret = Decoder.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGsvP29iZnNwYXJhbT1ZMlUzTUdVeE5EY3dOekF1ZFhCa1lYUmxMbTFwWTNKdmMyOW1kQzVqYjIwJnByb3RvcGFyYW09TVRRM01EY3dPa0pGTTIxck9RJnJlbWFya3M9UlZoQlRWQk1SUSZncm91cD1kR1Z6ZEE')
ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGs')
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGsvP3Byb3RvcGFyYW09')
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGs')
ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ')
# ret['info']['server'] = '[%s]' % ret['info']['server']

Loading…
Cancel
Save