You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

101 lines
4.4 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import copy
from Basis.Logger import logging
from Basis.Functions import genFlag
from Basis.Exception import managerException
class Task(object):
""" Manage global check task.
"""
__tasks = {} # task status -> loaded / running / complete
__unions = {} # one union include multi tasks
__TASK_LOADED = -1
__TASK_RUNNING = 0
__TASK_FINISH = 1
def __init__(self):
logging.info('Manager start')
def listUnion(self) -> list: # get all union ids
return [x for x in self.__unions]
def isUnion(self, unionId: str) -> bool: # check if the union id exist
return unionId in self.__unions
def addUnion(self, union: list) -> str: # add union to manager (include multi tasks)
tasks = {} # temporary task storage
taskIds = [] # task id list for manage union
unionId = genFlag(length = 12) # generate union id (12 bytes)
logging.debug('Manager start to load union [%s]' % unionId)
for task in union:
taskId = genFlag(length = 16) # generate task id (16 bytes)
taskIds.append(taskId)
tasks[taskId] = {
'status': self.__TASK_LOADED, # task status -> loaded
'data': copy.deepcopy(task) # save task info
}
logging.info('Manager add task [%s] -> %s' % (taskId, task))
self.__tasks.update(tasks) # load into task list
self.__unions[unionId] = {
'finish': False,
'items': taskIds # record task items
}
logging.info('Manager add union [%s] -> %s' % (unionId, taskIds))
return unionId
def delUnion(self, unionId) -> None: # remove union
if unionId not in self.__unions:
logging.error('Manager union [%s] not found' % unionId)
raise managerException('Union id not found')
if not self.__unions[unionId]['finish']: # some tasks are still running
raise managerException('Couldn\'t remove working union')
self.__unions.pop(unionId)
def getUnion(self, unionId: str) -> dict: # get union status (remove tasks when all completed)
if unionId not in self.__unions:
logging.error('Manager union [%s] not found' % unionId)
raise managerException('Union id not found')
if self.__unions[unionId]['finish']: # all tasks are finished
return self.__unions[unionId]
tasks = self.__unions[unionId]['items']
finishNum = 0
for taskId in tasks:
if self.__tasks[taskId]['status'] == self.__TASK_FINISH: # get number of completed task
finishNum += 1
logging.info('Manager statistics union [%s] -> %i/%i' % (unionId, finishNum, len(tasks)))
if finishNum < len(tasks): # some tasks are not yet completed
logging.debug('Manager union [%s] still working' % unionId)
return {
'finish': False,
'percent': round(finishNum / len(tasks), 2)
}
self.__unions[unionId]['result'] = []
for taskId in tasks:
task = self.__tasks[taskId]
self.__tasks.pop(taskId) # remove from task list
self.__unions[unionId]['result'].append(task['data'])
self.__unions[unionId]['finish'] = True
self.__unions[unionId].pop('items')
logging.info('Manager release union [%s] -> %s' % (unionId, self.__unions[unionId]['result']))
return self.__unions[unionId]
def popTask(self) -> tuple[str or None, any]: # fetch a loaded task
for taskId, task in self.__tasks.items():
if task['status'] != self.__TASK_LOADED: continue # only get loaded task
task['status'] = self.__TASK_RUNNING # set task status as running
logging.info('Manager pop task [%s] -> %s' % (taskId, task['data']))
return taskId, copy.deepcopy(task['data'])
logging.debug('Manager has no more task')
raise managerException('No more tasks')
def finishTask(self, taskId: str, taskData: dict) -> None: # update task data when completed
if taskId not in self.__tasks:
logging.error('Manager task [%s] not found' % taskId)
raise managerException('Task id not found')
self.__tasks[taskId]['data'] = copy.deepcopy(taskData)
self.__tasks[taskId]['status'] = self.__TASK_FINISH # set task status as completed
Manager = Task() # global task manager