今天正在做新项目的框架,比如下面的gen_task方法是一个很耗时的方法,这样的方法在系统里有很多,于是就写了一个装饰器nuke_long_task用于处理长请求事务,用它去处理一些常规通用的业务,比如记录和更新状态等。
1 2 3 4 5 6 7
| @nuke_long_task() def gen_task(data): print(" start -- " + threading.current_thread().name) time.sleep(15) print(" finish -- " + threading.current_thread().name) return "%s finish" % (threading.current_thread().name)
|
装饰器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| def nuke_long_task(): def out_wrapper(func): @wraps(func) def wrapper(*args, **kwargs):
transaction_id = uuid.uuid4().hex update_transaction(transaction_id, TASK_RUNNING) try: result = func(*args, **kwargs) update_transaction(transaction_id, TASK_DONE) except Exception as exc: traceback.print_exc() update_transaction(transaction_id, TASK_FAILED, error_msg=str(exc)) result = {"error": str(exc)}, 500 return result
return result
wrapper.nuke_long_task_func = True
return wrapper
return out_wrapper
|
遇到的问题是,当想把这个业务提交到线程池之前,想去判断一下这个方法gen_task有没有被正确加上这个装饰器nuke_long_task,找了一圈,没有发现好办法,从gen_task的函数中直接找到与nuke_long_task的相关性,于是换了个思路,在装饰器里给加了一行“wrapper.nuke_long_task_func = True”,就可以解决这个问题了。
提交任务:
1
| submit_jobs(gen_task, "datas", get_result)
|
检查方法:
1 2 3 4 5 6 7
| def submit_jobs(task_func, task_func_args, callback_func=None): if hasattr(task_func, 'nuke_long_task_func'): future = executor.submit(task_func, task_func_args) else: raise Exception("need to add @nuke_long_task() for task_func") if callback_func is not None: future.add_done_callback(callback_func)
|
以上