diff --git a/db_transfer.py b/db_transfer.py index f31a5d7..de0a481 100644 --- a/db_transfer.py +++ b/db_transfer.py @@ -41,16 +41,14 @@ class TransferBase(object): dt_transfer[id] = [self.last_get_transfer[id][0] - last_transfer[id][0], self.last_get_transfer[id][1] - last_transfer[id][1]] for id in curr_transfer.keys(): + if id in self.force_update_transfer: + continue #算出与上次记录的流量差值,保存于dt_transfer表 if id in last_transfer: if curr_transfer[id][0] + curr_transfer[id][1] - last_transfer[id][0] - last_transfer[id][1] <= 0: continue - if last_transfer[id][0] <= curr_transfer[id][0] and \ - last_transfer[id][1] <= curr_transfer[id][1]: - dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0], - curr_transfer[id][1] - last_transfer[id][1]] - else: - dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]] + dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0], + curr_transfer[id][1] - last_transfer[id][1]] else: if curr_transfer[id][0] + curr_transfer[id][1] <= 0: continue @@ -67,14 +65,16 @@ class TransferBase(object): update_transfer = self.update_all_user(dt_transfer) #返回有更新的表 for id in update_transfer.keys(): #其增量加在此表 - if id in self.force_update_transfer: #但排除在force_update_transfer内的 - if id in self.last_update_transfer: - del self.last_update_transfer[id] - self.force_update_transfer.remove(id) - else: + if id not in self.force_update_transfer: #但排除在force_update_transfer内的 last = self.last_update_transfer.get(id, [0,0]) self.last_update_transfer[id] = [last[0] + update_transfer[id][0], last[1] + update_transfer[id][1]] self.last_get_transfer = curr_transfer + for id in self.force_update_transfer: + if id in self.last_update_transfer: + del self.last_update_transfer[id] + if id in self.last_get_transfer: + del self.last_get_transfer[id] + self.force_update_transfer = set() def del_server_out_of_bound_safe(self, last_rows, rows): #停止超流量的服务 @@ -117,6 +117,7 @@ class TransferBase(object): if not allow: logging.info('db stop server at port [%s]' % (port,)) ServerPool.get_instance().cb_del_server(port) + self.force_update_transfer.add(port) else: cfgchange = False if port in ServerPool.get_instance().tcp_servers_pool: @@ -135,6 +136,7 @@ class TransferBase(object): if cfgchange: logging.info('db stop server at port [%s] reason: config changed: %s' % (port, cfg)) ServerPool.get_instance().cb_del_server(port) + self.force_update_transfer.add(port) new_servers[port] = (passwd, cfg) elif allow and ServerPool.get_instance().server_run_status(port) is False: @@ -146,6 +148,7 @@ class TransferBase(object): else: logging.info('db stop server at port [%s] reason: port not exist' % (row['port'])) ServerPool.get_instance().cb_del_server(row['port']) + self.clear_cache(row['port']) if row['port'] in self.port_uid_table: del self.port_uid_table[row['port']] @@ -156,13 +159,17 @@ class TransferBase(object): passwd, cfg = new_servers[port] self.new_server(port, passwd, cfg) + def clear_cache(self, port): + if port in self.force_update_transfer: del self.force_update_transfer[port] + if port in self.last_get_transfer: del self.last_get_transfer[port] + if port in self.last_update_transfer: del self.last_update_transfer[port] + def new_server(self, port, passwd, cfg): protocol = cfg.get('protocol', ServerPool.get_instance().config.get('protocol', 'origin')) method = cfg.get('method', ServerPool.get_instance().config.get('method', 'None')) obfs = cfg.get('obfs', ServerPool.get_instance().config.get('obfs', 'plain')) logging.info('db start server at port [%s] pass [%s] protocol [%s] method [%s] obfs [%s]' % (port, passwd, protocol, method, obfs)) ServerPool.get_instance().new_server(port, cfg) - self.force_update_transfer.add(port) def cmp(self, val1, val2): if type(val1) is bytes: @@ -230,7 +237,7 @@ class DbTransfer(TransferBase): "user": "ss", "password": "pass", "db": "shadowsocks", - "node_id": 1, + "node_id": 0, "transfer_mul": 1.0, "ssl_enable": 0, "ssl_ca": "", @@ -261,8 +268,9 @@ class DbTransfer(TransferBase): for id in dt_transfer.keys(): transfer = dt_transfer[id] #小于最低更新流量的先不更新 - update_trs = 1024 * max(2048 - self.user_pass.get(id, 0) * 64, 16) - if transfer[0] + transfer[1] < update_trs: + update_trs = 1024 * (2048 - self.user_pass.get(id, 0) * 64) + if transfer[0] + transfer[1] < update_trs and id not in self.force_update_transfer: + self.user_pass[id] = self.user_pass.get(id, 0) + 1 continue if id in self.user_pass: del self.user_pass[id] @@ -372,7 +380,7 @@ class Dbv3Transfer(DbTransfer): transfer = dt_transfer[id] bandwidth_thistime = bandwidth_thistime + transfer[0] + transfer[1] - update_trs = 1024 * max(2048 - self.user_pass.get(id, 0) * 64, 16) + update_trs = 1024 * (2048 - self.user_pass.get(id, 0) * 64) if transfer[0] + transfer[1] < update_trs: self.user_pass[id] = self.user_pass.get(id, 0) + 1 continue