# -*- coding: utf-8 -*- import logging import datetime, re import time, random import os import ctypes from logging.handlers import TimedRotatingFileHandler, BaseRotatingHandler from pathlib import Path import traceback
# Return up to the last n lines of a log file.
def show_logs_foot(log_path, n, block=-4096, del_space=False):
    """Read the tail of a (possibly large) log file without loading all of it.

    :param log_path: path to the log file.
    :param n: number of lines wanted from the end of the file.
    :param block: negative byte offset probed from the end; doubled each
        retry until enough lines are collected.
    :param del_space: when True, blank lines are dropped before counting.
    :return: list of raw byte lines, or '' when the file is missing or empty.
        NOTE(review): the missing/empty cases return a str ('') while the
        normal case returns a list — callers must handle both types.
    """
    if not os.path.exists(log_path):
        return ''
    with open(log_path, 'rb') as f:
        # Seek to the end once to learn the file size.
        f.seek(0, 2)
        log_size = f.tell()
        # print('fileSize: {}'.format(log_size))
        # Empty file: nothing to read.
        if log_size < 1:
            return ''
        while True:
            # The probe window still fits inside the file: read only that tail.
            if log_size >= abs(block):
                # Position the pointer |block| bytes before the end.
                f.seek(block, 2)
                # Read everything from there to EOF.
                data = f.readlines()
                # Drop blank lines when requested.
                if del_space:
                    # data[0] is assumed to be a partial line (the seek
                    # landed mid-line) and is discarded.
                    # NOTE(review): when abs(block) == log_size the first
                    # element is a FULL line, so one real line is lost here
                    # — confirm whether that edge case matters to callers.
                    data_t = [t for t in data[1:] if t.strip()]
                    data_t_size = len(data_t)
                    if data_t_size > n:
                        return data_t[-n:]
                    # Not enough non-blank lines yet: widen the window and retry.
                    if abs(block) < log_size:
                        block *= 2
                        continue
                    else:
                        return data_t
                if len(data) > n:
                    # Keep only the last n lines of the window.
                    return data[-n:]
                else:
                    # Window too small: double it and loop again.
                    block *= 2
            else:
                # Window exceeds the file: move to the start and read it all.
                f.seek(0, 0)
                # Read the whole file.
                data = f.readlines()
                # Drop blank lines when requested.
                if del_space:
                    data_t = [t for t in data if t.strip()]
                    return data_t
                return data
# Return up to the first n lines of a log file.
def show_logs_head(log_path, n, block=4096, del_space=False):
    """Read the first *n* lines of a log file sequentially.

    :param log_path: path to the log file.
    :param n: number of lines wanted from the start of the file.
    :param block: unused; kept for signature symmetry with show_logs_foot.
    :param del_space: when True, blank lines are skipped before counting.
    :return: list of raw byte lines, or '' when the file is missing or empty
        (str sentinel kept for consistency with show_logs_foot).
    """
    if not os.path.exists(log_path):
        return ''
    with open(log_path, 'rb') as f:
        # Seek to the end once to learn the file size.
        f.seek(0, 2)
        log_size = f.tell()
        # Empty file: nothing to read.
        if log_size < 1:
            return ''
        # Back to the start for sequential line reads.
        f.seek(0, 0)
        data = []
        while True:
            if f.tell() >= log_size:
                # EOF reached before n usable lines existed.
                # BUGFIX: the original returned a stale (possibly empty)
                # filtered list here; filter what was actually read instead.
                if del_space:
                    return [t for t in data if t.strip()]
                return data
            data.append(f.readline())
            if len(data) >= n:
                if del_space:
                    # Recount with blank lines removed; keep reading while
                    # the filtered list is still short of n.
                    data_t = [t for t in data if t.strip()]
                    if len(data_t) >= n:
                        # BUGFIX: was data_t[:n + 1], which returned n+1 lines.
                        return data_t[:n]
                    continue
                return data
import time import random from queue import Queue, LifoQueue from threading import Event, Thread from multiprocessing.managers import BaseManager, SyncManager from multiprocessing.pool import ThreadPool import multiprocessing as mp import multiprocessing.dummy as mp_dummy import traceback import logging from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Cache returned results.
def cache_result_done_func(self, result: dict):
    """Merge *result* into the instance result cache, evicting oldest first.

    :param result: mapping of request-id -> result value to cache.
    Side effects: trims ``self.result_cache_id_list`` /
    ``self.result_cache_dct`` down to ``self.result_max_cache_size``
    before inserting the new entries.
    """
    # Trim the cache before inserting the new results.
    if self.result_max_cache_size < len(self.result_cache_id_list):
        for _ in range(len(self.result_cache_id_list) - self.result_max_cache_size):
            # BUGFIX: evict the OLDEST key (front of the append-ordered
            # list). The original used .pop(), which removed the NEWEST
            # key and deleted its freshly cached value.
            result_cache_key = self.result_cache_id_list.pop(0)
            try:
                del self.result_cache_dct[result_cache_key]
            except Exception:
                # Key already absent from the dict; ignore.
                pass
    try:
        for k, v in result.items():
            self.result_cache_id_list.append(k)
            self.result_cache_dct[k] = v
        # self.log.info(f'result_cache_id_list ==> {self.result_cache_id_list}')
        # self.log.info(f'result_cache_dct ==> {self.result_cache_dct}')
    except Exception:
        # Best-effort: a malformed result (e.g. not a dict) is ignored.
        pass
# Implementation helper: pop one cached result.
def get_cache_result_func(self, key):
    """Remove and return the cached result stored under *key*.

    :param key: request id to look up in ``self.result_cache_dct``.
    :return: ``(value, key)`` when found (the entry is removed from both
        the dict and the id list), otherwise ``(None, None)``.
    """
    if key not in self.result_cache_dct:
        return None, None
    value = self.result_cache_dct.pop(key)
    try:
        self.result_cache_id_list.remove(key)
    except Exception:
        # Id list and dict can drift apart; a missing id is harmless.
        pass
    return value, key
# Fetch a result, polling until it appears in the cache.
def get_cache_result(self, key, wait_req_done=False):
    """Block until the result cached under *key* is available, then return it.

    :param key: request id to wait for.
    :param wait_req_done: when True, join the request queue each round
        before draining responses.
    :return: the cached value for *key* (removed from the cache).
    """
    while True:
        value, found_key = self.get_cache_result_func(key)
        if found_key is not None:
            return value
        if wait_req_done:
            self.app_queue_req.join()
        # Drain everything currently sitting on the response queue into the cache.
        while not self.app_queue_res.empty():
            try:
                item = self.app_queue_res.get_nowait()
                self.app_queue_res.task_done()
                self.cache_result_done_func(item)
            except Exception:
                # Queue raced empty or caching failed; retry next round.
                pass
        time.sleep(0.1)
def sendReqGetResult(self, params=None, results: list = None, timeout=None):
    """Enqueue one request, draining pending responses first when backlogged.

    :param params: request payload put on ``app_queue_req``; defaults to {}.
        BUGFIX: was a mutable default argument ``{}`` shared across calls.
    :param results: out-parameter list that drained responses are appended to.
        BUGFIX: was a mutable default ``[]``, so responses silently
        accumulated across calls that relied on the default.
    :param timeout: passed to ``Queue.get`` while draining; None blocks.
    :return: False when no response queue is configured, otherwise None.
    """
    if not self.app_queue_res:
        return False
    if params is None:
        params = {}
    if results is None:
        results = []
    # Backlog at capacity: pull finished responses before adding more work.
    if self.app_queue_req.qsize() >= self.pool_count:
        for _ in range(self.pool_count):
            if self.app_queue_req.empty() and self.app_queue_res.empty():
                break
            res = self.app_queue_res.get(timeout=timeout)
            self.app_queue_res.task_done()
            results.append(res)
    self.app_queue_req.put(params)

# snake_case alias kept for API compatibility with sendReqGetResult.
def send_req_get_result(self, params=None, results: list = None, timeout=None):
    return self.sendReqGetResult(params, results, timeout)