diff --git a/shadowsocks/obfsplugin/plain.py b/shadowsocks/obfsplugin/plain.py index 8c6355c..593a23f 100644 --- a/shadowsocks/obfsplugin/plain.py +++ b/shadowsocks/obfsplugin/plain.py @@ -21,76 +21,81 @@ import os import sys import hashlib import logging +import typing +from ..obfs import (server_info as ServerInfo) from shadowsocks.common import ord + def create_obfs(method): return plain(method) -obfs_map = { - 'plain': (create_obfs,), - 'origin': (create_obfs,), + +obfs_map: typing.Dict[str, tuple] = { + 'plain': (create_obfs,), + 'origin': (create_obfs,), } + class plain(object): - def __init__(self, method): + def __init__(self, method: str): self.method = method - self.server_info = None + self.server_info: ServerInfo = None - def init_data(self): + def init_data(self) -> bytes: return b'' - def get_overhead(self, direction): # direction: true for c->s false for s->c + def get_overhead(self, direction) -> int: # direction: true for c->s false for s->c return 0 - def get_server_info(self): + def get_server_info(self) -> ServerInfo: return self.server_info - def set_server_info(self, server_info): + def set_server_info(self, server_info: ServerInfo): self.server_info = server_info - def client_pre_encrypt(self, buf): + def client_pre_encrypt(self, buf: bytes) -> bytes: return buf - def client_encode(self, buf): + def client_encode(self, buf: bytes) -> bytes: return buf - def client_decode(self, buf): + def client_decode(self, buf: bytes) -> typing.Tuple[bytes, bool]: # (buffer_to_recv, is_need_to_encode_and_send_back) return (buf, False) - def client_post_decrypt(self, buf): + def client_post_decrypt(self, buf: bytes) -> bytes: return buf - def server_pre_encrypt(self, buf): + def server_pre_encrypt(self, buf: bytes) -> bytes: return buf - def server_encode(self, buf): + def server_encode(self, buf: bytes) -> bytes: return buf - def server_decode(self, buf): + def server_decode(self, buf: bytes) -> typing.Tuple[bytes, bool, bool]: # (buffer_to_recv, is_need_decrypt, is_need_to_encode_and_send_back) return (buf, True, False) - def server_post_decrypt(self, buf): + def server_post_decrypt(self, buf: bytes) -> typing.Tuple[bytes, bool]: return (buf, False) - def client_udp_pre_encrypt(self, buf): + def client_udp_pre_encrypt(self, buf: bytes) -> bytes: return buf - def client_udp_post_decrypt(self, buf): + def client_udp_post_decrypt(self, buf: bytes) -> bytes: return buf - def server_udp_pre_encrypt(self, buf, uid): + def server_udp_pre_encrypt(self, buf: bytes, uid) -> bytes: return buf - def server_udp_post_decrypt(self, buf): + def server_udp_post_decrypt(self, buf: bytes) -> typing.Tuple[bytes, any]: return (buf, None) def dispose(self): pass - def get_head_size(self, buf, def_value): + def get_head_size(self, buf: bytes, def_value: int) -> int: if len(buf) < 2: return def_value head_type = ord(buf[0]) & 0x7 @@ -101,4 +106,3 @@ class plain(object): if head_type == 3: return 4 + ord(buf[1]) return def_value -