From 90ae1fdc4034d0394082a056b0fbc002b86aaed8 Mon Sep 17 00:00:00 2001 From: BreakWa11 Date: Mon, 28 Dec 2015 15:05:47 +0800 Subject: [PATCH] refine manyuser thread start & stop --- db_transfer.py | 44 ++++++++++++++++++++++++++++++++------------ server.py | 14 +++++++++++--- server_pool.py | 3 +++ 3 files changed, 46 insertions(+), 15 deletions(-) diff --git a/db_transfer.py b/db_transfer.py index caf60f3..fb8f5d9 100644 --- a/db_transfer.py +++ b/db_transfer.py @@ -15,7 +15,9 @@ class DbTransfer(object): instance = None def __init__(self): + import threading self.last_get_transfer = {} + self.event = threading.Event() @staticmethod def get_instance(): @@ -148,6 +150,15 @@ class DbTransfer(object): logging.info('db stop server at port [%s] reason: port not exist' % (row['port'])) ServerPool.get_instance().cb_del_server(row['port']) + @staticmethod + def del_servers(): + for port in ServerPool.get_instance().tcp_servers_pool.keys(): + if ServerPool.get_instance().server_is_run(port) > 0: + ServerPool.get_instance().cb_del_server(port) + for port in ServerPool.get_instance().tcp_ipv6_servers_pool.keys(): + if ServerPool.get_instance().server_is_run(port) > 0: + ServerPool.get_instance().cb_del_server(port) + @staticmethod def thread_db(): import socket @@ -155,16 +166,25 @@ class DbTransfer(object): timeout = 60 socket.setdefaulttimeout(timeout) last_rows = [] - while True: - try: - DbTransfer.get_instance().push_db_all_user() - rows = DbTransfer.get_instance().pull_db_all_user() - DbTransfer.del_server_out_of_bound_safe(last_rows, rows) - last_rows = rows - except Exception as e: - trace = traceback.format_exc() - logging.error(trace) - #logging.warn('db thread except:%s' % e) - finally: - time.sleep(15) + try: + while True: + try: + DbTransfer.get_instance().push_db_all_user() + rows = DbTransfer.get_instance().pull_db_all_user() + DbTransfer.del_server_out_of_bound_safe(last_rows, rows) + last_rows = rows + except Exception as e: + trace = traceback.format_exc() + logging.error(trace) + #logging.warn('db thread except:%s' % e) + if DbTransfer.get_instance().event.wait(15): + break + except KeyboardInterrupt as e: + pass + DbTransfer.del_servers() + ServerPool.get_instance().stop() + + @staticmethod + def thread_db_stop(): + DbTransfer.get_instance().event.set() diff --git a/server.py b/server.py index ac32a7c..a28f026 100644 --- a/server.py +++ b/server.py @@ -39,15 +39,23 @@ class MainThread(threading.Thread): def run(self): db_transfer.DbTransfer.thread_db() + def stop(self): + db_transfer.DbTransfer.thread_db_stop() + def main(): shell.check_python() - if True: + if False: db_transfer.DbTransfer.thread_db() else: thread = MainThread() thread.start() - while True: - time.sleep(99999) + try: + while True: + time.sleep(99999) + except (KeyboardInterrupt, IOError, OSError) as e: + import traceback + traceback.print_exc() + thread.stop() if __name__ == '__main__': main() diff --git a/server_pool.py b/server_pool.py index 002ddfe..6c751c1 100644 --- a/server_pool.py +++ b/server_pool.py @@ -72,6 +72,9 @@ class ServerPool(object): ServerPool.instance = ServerPool() return ServerPool.instance + def stop(self): + self.loop.stop() + @staticmethod def _loop(loop, dns_resolver, mgr): try: