整理两个Python工具

整理两个Python工具

仅供自己使用

Log封装工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
# -*- coding: utf-8 -*-
import logging
import datetime, re
import time, random
import os
import ctypes
from logging.handlers import TimedRotatingFileHandler, BaseRotatingHandler
from pathlib import Path
import traceback

# 多进程或者多线程日志模块
try:
import codecs
except ImportError:
codecs = None


# A TimedRotatingFileHandler-like handler that is safe for multiple processes:
# every process recomputes the target file name from the current time on each
# emit, so they all converge on the same file without coordinating.
class MultiProcessHandler(logging.FileHandler):
    def __init__(self, filename, when='D', backupCount=0, encoding='utf-8', delay=False, log_path='logs'):
        """Create a per-interval log file under *log_path*.

        filename    -- log file name prefix (date suffix is appended)
        when        -- rotation interval unit: 'S', 'M', 'H' or 'D'
                       (case-insensitive)
        backupCount -- number of rotated files to keep; 0 keeps all
        delay       -- passed through to FileHandler: True buffers output,
                       False writes straight through to the file
        """
        self.prefix = filename
        self.backupCount = backupCount
        self.when = when.upper()
        # Regex that matches the YYYY-MM-DD date suffix of rotated files.
        # It must be set unconditionally (not only for when == 'D'), otherwise
        # getFilesToDelete() would fail in the other modes.
        self.extMath = r"^\d{4}-\d{2}-\d{2}"

        # Rotation units: S = per second, M = per minute, H = per hour,
        # D = per day.
        self.when_dict = {
            'S': "%Y-%m-%d-%H-%M-%S",
            'M': "%Y-%m-%d-%H-%M",
            'H': "%Y-%m-%d-%H",
            'D': "%Y-%m-%d"
        }

        # BUG FIX: look up the normalized (upper-cased) unit. The original
        # used the raw `when`, so a lowercase unit such as 'd' incorrectly
        # raised ValueError even though 'D' is valid.
        self.suffix = self.when_dict.get(self.when)
        if not self.suffix:
            raise ValueError(u"指定的日期间隔单位无效: %s" % self.when)

        # Format string for the full file path, e.g. logs/app.%Y-%m-%d.log
        self.filefmt = os.path.join(log_path, "%s.%s.log" % (self.prefix, self.suffix))
        # Concrete path for the current moment.
        self.filePath = datetime.datetime.now().strftime(self.filefmt)

        # Create the log directory if it does not exist (best-effort).
        _dir = os.path.dirname(self.filefmt)
        try:
            if not os.path.exists(_dir):
                os.makedirs(_dir)
        except Exception:
            print(u"创建文件夹失败")
            print(u"文件夹路径:" + self.filePath)

        if codecs is None:
            encoding = None
        logging.FileHandler.__init__(self, self.filePath, 'a+', encoding, delay)

    def shouldChangeFileToWrite(self):
        """Return True if the time-derived file name has changed (i.e. the
        rotation boundary has been crossed) and update self.filePath."""
        _filePath = datetime.datetime.now().strftime(self.filefmt)
        if _filePath != self.filePath:
            self.filePath = _filePath
            return True
        return False

    def doChangeFile(self):
        """Switch the output stream to the new file and prune old logs."""
        self.baseFilename = os.path.abspath(self.filePath)
        if self.stream:
            # close() also flushes; the stream must then be reset to None so
            # no further IO hits the closed file object.
            self.stream.close()
            self.stream = None
        if not self.delay:
            # delay == False means unbuffered writes, so reopen immediately.
            self.stream = self._open()

        # Remove files beyond the retention count.
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)

    def getFilesToDelete(self):
        """Return the list of expired log files that should be removed."""
        # os.path.split -> (directory, file name)
        dirName, _ = os.path.split(self.baseFilename)
        fileNames = os.listdir(dirName)
        result = []
        # self.prefix is e.g. 'mylog' in 'mylog.2017-03-19'; the dot makes it
        # easy to slice off the date suffix.
        prefix = self.prefix + '.'
        plen = len(prefix)
        for fileName in fileNames:
            if fileName[:plen] == prefix:
                suffix = fileName[plen:]
                # Only files whose suffix starts with a YYYY-MM-DD date count.
                if re.compile(self.extMath).match(suffix):
                    result.append(os.path.join(dirName, fileName))
        result.sort()

        # Keep the newest backupCount files; everything older is returned.
        if len(result) < self.backupCount:
            result = []
        else:
            result = result[:len(result) - self.backupCount]
        return result

    def emit(self, record):
        """Emit one record, rotating the target file first if needed.

        Overrides FileHandler.emit; called automatically by logging."""
        try:
            if self.shouldChangeFileToWrite():
                self.doChangeFile()
            logging.FileHandler.emit(self, record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)


class MyTimedRotatingFileHandler(TimedRotatingFileHandler):
    """TimedRotatingFileHandler whose first rollover is anchored to the
    creation time of an already-existing log file rather than the moment the
    handler was constructed, so restarts keep the original rotation boundary.
    """

    def __init__(self, filename, when='d', interval=1, backupCount=0,
                 encoding=None, delay=False, utc=False, atTime=None,
                 errors=None):
        super().__init__(filename=filename, when=when, interval=interval,
                         backupCount=backupCount, encoding=encoding,
                         delay=delay, utc=utc, atTime=atTime, errors=errors)

        # Re-derive the next rollover instant from the file's ctime when the
        # file already exists; fall back to "now" otherwise.
        log_file = self.baseFilename
        anchor = os.stat(log_file).st_ctime if os.path.exists(log_file) else int(time.time())
        self.rolloverAt = self.computeRollover(anchor)


class DailyRotatingFileHandler(BaseRotatingHandler):
    """
    Similar to `logging.TimedRotatingFileHandler`, but this handler:
    - works with multiple processes
    - only supports splitting by calendar day
    - does not support UTC yet
    """

    def __init__(self, filename, backupCount=0, encoding=None, delay=False, utc=False, log_path='logs', **kwargs):
        # Number of dated files to keep; 0 disables pruning.
        self.backup_count = backupCount
        self.utc = utc
        # Date suffix appended to each day's file name.
        self.suffix = "%Y-%m-%d"
        self.base_log_path = Path(os.path.join(log_path, filename))
        self.base_filename = self.base_log_path.name
        self.current_filename = self._compute_fn()
        self.current_log_path = self.base_log_path.with_name(self.current_filename)
        # _open() is overridden below, so the stream actually opens
        # current_log_path, not the bare `filename` passed here.
        BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)

    def shouldRollover(self, record):
        """
        Roll over when the file name derived from the current date no longer
        matches the file currently open.
        """
        if self.current_filename != self._compute_fn():
            return True
        return False

    def doRollover(self):
        """
        Perform the rollover: close the old stream, switch to today's file,
        then prune expired files.
        """
        # Close the old log file.
        if self.stream:
            self.stream.close()
            self.stream = None

        # Compute the new log file name.
        self.current_filename = self._compute_fn()
        self.current_log_path = self.base_log_path.with_name(self.current_filename)
        # Open the new log file (unless writes are delayed until first emit).
        if not self.delay:
            self.stream = self._open()

        # Remove expired logs.
        self.delete_expired_files()

    def _compute_fn(self):
        """
        Return the log file name for the current local date.
        """
        return self.base_filename + "." + time.strftime(self.suffix, time.localtime())

    def _open(self):
        """
        Open today's log file and refresh the symlink at base_filename to
        point at it. Updating the symlink does not affect logging itself.
        """
        if self.encoding is None:
            stream = open(str(self.current_log_path), self.mode)
        else:
            stream = codecs.open(str(self.current_log_path), self.mode, self.encoding)

        # Drop a stale symlink (or a regular file occupying its name).
        if self.base_log_path.exists():
            try:
                # Remove it when it is not a symlink or points at the wrong file.
                if not self.base_log_path.is_symlink() or os.readlink(self.base_log_path) != self.current_filename:
                    os.remove(self.base_log_path)
            except OSError:
                pass

        # Create the new symlink (best-effort; e.g. may fail on Windows
        # without privileges).
        try:
            os.symlink(self.current_filename, str(self.base_log_path))
        except OSError:
            pass
        return stream

    def delete_expired_files(self):
        """
        Delete dated log files beyond the retention count.
        """
        if self.backup_count <= 0:
            return

        file_names = os.listdir(str(self.base_log_path.parent))
        result = []
        prefix = self.base_filename + "."
        plen = len(prefix)
        for file_name in file_names:
            if file_name[:plen] == prefix:
                suffix = file_name[plen:]
                # Match 'YYYY-MM-DD' optionally followed by one extra extension.
                if re.match(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", suffix):
                    result.append(file_name)
        # Keep the newest backup_count files (sorted dates ascend lexically).
        if len(result) < self.backup_count:
            result = []
        else:
            result.sort()
            result = result[:len(result) - self.backup_count]

        for file_name in result:
            os.remove(str(self.base_log_path.with_name(file_name)))


class LogPrint:
    """Minimal logger look-alike whose every level just prints to stdout."""

    def __init__(self):
        pass

    def info(self, msg):
        print(msg)

    def error(self, msg):
        # Same sink as info(): plain stdout.
        self.info(msg)

    def debug(self, msg):
        self.info(msg)

    def printErrorLog(self, e):
        """Print the details of exception *e*: message, file, line, args
        and the formatted traceback."""
        print(f"Error Info: {e}")
        print(f"Error File: {e.__traceback__.tb_frame.f_globals['__file__']}")
        print(f"Error Row: {e.__traceback__.tb_lineno}")
        print(e.args)
        print(f'Error Traceback:\n{traceback.format_exc()}')

class LogTool:
    """Builds configured loggers (console + midnight-rotating file handler)
    and offers small helpers for colored console output and log-file links."""

    def __init__(self, log_name='info', name='', log_path='logs', path='', randFlag=False):
        self.LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
        self.DATE_FORMAT = "%Y-%m-%d %H:%M:%S %a"

        # 'name' / 'path' are aliases that override log_name / log_path.
        if name:
            log_name = name
        if path:
            log_path = path

        # Optionally make the logger name unique (ms timestamp + random).
        if randFlag:
            log_name = '{}_{}{}'.format(log_name, int(time.time() * 1000), random.randint(100000, 999999))

        self.log_name = log_name
        self.log_path = log_path
        self.log_level = logging.INFO
        self.logger = logging.getLogger(log_name)
        self.logger.root.setLevel(self.log_level)

        # Windows CMD console color attributes (high nibble = background,
        # low nibble = foreground).
        self.FOREGROUND_WHIT = 0x0f
        self.FOREGROUND_GREEN = 0xf2  # green

    def init_log(self, new_log_flag=False):
        """Return a logger with a console handler and a MIDNIGHT-rotating
        file handler (keeping 3 backups). Handlers are added only once.

        new_log_flag -- use MyTimedRotatingFileHandler instead of the stock
                        TimedRotatingFileHandler.
        """
        try:
            if not os.path.exists(self.log_path):
                os.makedirs(self.log_path)
        except Exception:
            # Best-effort: if the directory cannot be created, handler
            # construction below will raise the real error.
            pass

        if not self.logger.handlers:
            formatter = logging.Formatter(fmt=self.LOG_FORMAT, datefmt=self.DATE_FORMAT)

            # Console output.
            consoleHandler = logging.StreamHandler()
            consoleHandler.setLevel(self.log_level)
            consoleHandler.setFormatter(formatter)
            self.logger.addHandler(consoleHandler)

            # File handler: rotate at midnight (natural-day split), keep 3.
            fname = '%s.log' % self.log_name
            logFile = os.path.join(self.log_path, fname)
            if new_log_flag:
                fileHandler = MyTimedRotatingFileHandler(filename=logFile, when='MIDNIGHT', interval=1, backupCount=3,
                                                         encoding='utf-8')
            else:
                fileHandler = TimedRotatingFileHandler(filename=logFile, when='MIDNIGHT', interval=1, backupCount=3,
                                                       encoding='utf-8')
            fileHandler.setLevel(self.log_level)
            fileHandler.setFormatter(formatter)
            self.logger.addHandler(fileHandler)

        return self.logger

    def init_multi_process_log(self):
        """Return a logger backed by the multi-process-safe handler."""
        try:
            if not os.path.exists(self.log_path):
                os.makedirs(self.log_path)
        except Exception:
            pass

        if not self.logger.handlers:
            formatter = logging.Formatter(fmt=self.LOG_FORMAT, datefmt=self.DATE_FORMAT)

            consoleHandler = logging.StreamHandler()
            consoleHandler.setLevel(self.log_level)
            consoleHandler.setFormatter(formatter)
            self.logger.addHandler(consoleHandler)

            # Daily rotation, 3 backups.
            fileHandler = MultiProcessHandler(self.log_name, 'D', 3, log_path=self.log_path)
            fileHandler.setFormatter(formatter)
            fileHandler.setLevel(self.log_level)
            self.logger.addHandler(fileHandler)
        return self.logger

    def init_mpt_log(self):
        """Return a logger backed by DailyRotatingFileHandler (symlink style)."""
        try:
            if not os.path.exists(self.log_path):
                os.makedirs(self.log_path)
        except Exception:
            pass

        if not self.logger.handlers:
            formatter = logging.Formatter(fmt=self.LOG_FORMAT, datefmt=self.DATE_FORMAT)

            consoleHandler = logging.StreamHandler()
            consoleHandler.setLevel(self.log_level)
            consoleHandler.setFormatter(formatter)
            self.logger.addHandler(consoleHandler)

            fileHandler = DailyRotatingFileHandler(self.log_name, 3, 'utf-8')
            fileHandler.setFormatter(formatter)
            fileHandler.setLevel(self.log_level)
            self.logger.addHandler(fileHandler)
        return self.logger

    def close_log(self):
        """Flush and close all logging handlers (global logging shutdown)."""
        logging.shutdown()

    def printErrorLog(self, e):
        """Log the details of exception *e* at ERROR level."""
        log = self.init_log()
        log.error("Error Info: " + str(e))
        log.error("Error File: " + str(e.__traceback__.tb_frame.f_globals["__file__"]))
        log.error("Error Row: " + str(e.__traceback__.tb_lineno))
        log.error(e.args)
        log.error('Error Traceback:\n%s' % traceback.format_exc())

    def set_cmd_text_color(self, color):
        """Set the Windows console text attribute to *color*.

        Returns the Win32 BOOL result, or False on non-Windows platforms
        (FIX: previously this raised AttributeError because ctypes.windll
        only exists on Windows).
        """
        STD_OUTPUT_HANDLE = -11
        if not hasattr(ctypes, 'windll'):
            return False
        handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        return ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)

    def resetColor(self, color):
        """Reset the console color (thin wrapper over set_cmd_text_color)."""
        self.set_cmd_text_color(color)

    def sqlInfo(self, mess):
        """Log *mess* with a green console color, then restore white."""
        self.resetColor(self.FOREGROUND_GREEN)
        self.init_log().info(str(mess))
        self.resetColor(self.FOREGROUND_WHIT)

    def get_curr_log(self):
        """Return the path of the current log file; when the base file does
        not exist, hard-link today's dated file under a stable name."""
        app_log = os.path.join(self.log_path, self.log_name)

        if os.path.exists(app_log):
            return app_log

        _path = Path(app_log)
        fname = _path.stem
        ext = _path.suffix[1:]
        log_path = _path.parent
        hard_link = ''

        fmt = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d')

        # NOTE(review): with hard_link == '' this yields 'name..ext'
        # (double dot) — kept as-is; confirm against the real link names.
        log_hard_link = '.'.join([fname, hard_link, ext])
        log_name = '.'.join([fname, fmt, ext])

        self.log_data(log_path=log_path, source=log_name, target=log_hard_link)

        return os.path.join(log_path, log_hard_link)

    def log_data(self, source='', target='', log_path=''):
        """Hard-link *source* to *target* inside *log_path*.

        FIX: accept the `log_path` keyword that get_curr_log() already
        passes — previously the call raised TypeError. When log_path is
        empty, self.log_path is used (backward compatible).
        Returns False when the source file does not exist.
        """
        base_path = log_path if log_path else self.log_path
        source_path = os.path.join(base_path, source)

        if not os.path.exists(source_path):
            return False

        target_path = os.path.join(base_path, target)

        if os.path.exists(target_path):
            f_stat = os.stat(target_path)
            # Refresh the link when it is at least 5 seconds stale.
            if time.time() - f_stat.st_mtime >= 5:
                if os.path.exists(target_path):
                    os.unlink(target_path)
                os.link(source_path, target_path)

        if not os.path.exists(target_path):
            os.link(source_path, target_path)


'''
file.seek(offset,whence)
offset -- 开始的偏移量,也就是代表需要移动偏移的字节数
whence:可选,默认值为 0。给offset参数一个定义,表示要从哪个位置开始偏移;0代表从文件开头开始算起,1代表从当前位置开始算起,2代表从文件末尾算起

file.tell()
返回文件的当前位置,即文件指针当前位置
'''


# Return the last n lines of a log file (read from the end in growing blocks).
def show_logs_foot(log_path, n, block=-4096, del_space=False):
    """Return the last *n* lines of the file at *log_path* as a list of
    bytes lines, or '' when the file is missing or empty.

    block     -- negative seek offset from the end; doubled until enough
                 lines are captured
    del_space -- when True, blank lines are filtered out before counting
    """
    if not os.path.exists(log_path):
        return ''
    with open(log_path, 'rb') as f:
        # Move to the end of the file to get its size.
        f.seek(0, 2)
        log_size = f.tell()
        # Empty file -> empty result.
        if log_size < 1:
            return ''

        while True:
            # If the block still fits within the file, read only the tail.
            if log_size >= abs(block):
                # Seek |block| bytes back from the end.
                f.seek(block, 2)
                data = f.readlines()
                if del_space:
                    # data[0] may be a partial line, so it is skipped;
                    # blank lines are dropped.
                    data_t = [t for t in data[1:] if t.strip()]
                    data_t_size = len(data_t)
                    if data_t_size > n:
                        return data_t[-n:]
                    # Not enough non-blank lines yet: widen the window.
                    if abs(block) < log_size:
                        block *= 2
                        continue
                    else:
                        return data_t
                if len(data) > n:
                    return data[-n:]
                else:
                    # Window too small: double it and retry.
                    block *= 2
            else:
                # Window exceeds the file: read the whole file from the start.
                f.seek(0, 0)
                data = f.readlines()
                if del_space:
                    data_t = [t for t in data if t.strip()]
                    return data_t
                return data


# Return the first n lines of a log file.
def show_logs_head(log_path, n, block=4096, del_space=False):
    """Return the first *n* lines of the file at *log_path* as a list of
    bytes lines, or '' when the file is missing or empty.

    block     -- kept for interface compatibility (lines are read one by one)
    del_space -- when True, blank lines are filtered out before counting

    Fixes vs the original:
    - when del_space is True and the file holds fewer than n usable lines,
      the EOF path now returns the filtered lines instead of a stale
      (initially empty) buffer, which silently dropped everything;
    - the result slice is [:n] instead of [:n + 1].
    """
    if not os.path.exists(log_path):
        return ''
    with open(log_path, 'rb') as f:
        # Determine the file size via seek-to-end.
        f.seek(0, 2)
        log_size = f.tell()
        if log_size < 1:
            return ''

        # Back to the beginning for sequential reading.
        f.seek(0, 0)

        data = []
        while True:
            if f.tell() >= log_size:
                # Reached EOF before collecting n usable lines: return what
                # we have (filtered when requested).
                if del_space:
                    return [t for t in data if t.strip()]
                return data

            data.append(f.readline())

            if len(data) >= n:
                if del_space:
                    data_t = [t for t in data if t.strip()]
                    if len(data_t) >= n:
                        return data_t[:n]
                    # Not enough non-blank lines yet; keep reading.
                    continue
                return data


def get_current_log_name(log_path):
    """Return the path of a '<name>._curr_.<ext>' hard link that points at
    the largest existing variant of the given log file.

    Three candidate layouts (produced by different rotating handlers) are
    probed: name.ext.date, name.ext and name.date.ext; the biggest one wins
    and is hard-linked under the stable '_curr_' name. All errors are
    swallowed and the link path is returned regardless.
    """
    _path = Path(log_path)
    fname = _path.stem
    ext = _path.suffix[1:]
    log_path = _path.parent
    hard_link = '_curr_'
    fmt = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d')
    log_hard_link = '.'.join([fname, hard_link, ext])
    # Candidate file-name layouts for today's log.
    log_name1 = '.'.join([fname, ext, fmt])
    log_name2 = '.'.join([fname, ext])
    log_name3 = '.'.join([fname, fmt, ext])
    source_path1 = os.path.join(log_path, log_name1)
    source_path2 = os.path.join(log_path, log_name2)
    source_path3 = os.path.join(log_path, log_name3)
    target_path = os.path.join(log_path, log_hard_link)
    try:
        flag1 = False
        flag2 = False
        flag3 = False
        # Sizes of the three candidates (0 when missing).
        sz_arr = [0, 0, 0]

        if os.path.exists(source_path1):
            sz_arr[0] = os.path.getsize(source_path1)
            flag1 = True

        if os.path.exists(source_path2):
            sz_arr[1] = os.path.getsize(source_path2)
            flag2 = True

        if os.path.exists(source_path3):
            sz_arr[2] = os.path.getsize(source_path3)
            flag3 = True

        # Index of the largest candidate.
        sz_max = sz_arr[0]
        sz_i = 0
        for i in range(1, len(sz_arr)):
            if sz_arr[i] > sz_max:
                sz_max = sz_arr[i]
                sz_i = i
        # Keep only the winning flag. NOTE(review): when no candidate exists,
        # sz_i stays 0 and flag1 is forced True, but the exists() checks
        # below make that a no-op.
        if sz_i == 0:
            flag1 = True
            flag2 = False
            flag3 = False
        elif sz_i == 1:
            flag1 = False
            flag2 = True
            flag3 = False
        elif sz_i == 2:
            flag1 = False
            flag2 = False
            flag3 = True

        # Link the winner under the '_curr_' name; an existing link is
        # refreshed only when it is at least 5 seconds stale.
        # (Layout 2, name.ext, is checked first.)
        if flag2 and os.path.exists(source_path2):
            if os.path.exists(target_path):
                f_stat = os.stat(target_path)
                if time.time() - f_stat.st_mtime >= 5:
                    if os.path.exists(target_path):
                        os.unlink(target_path)
                    os.link(source_path2, target_path)
            if not os.path.exists(target_path):
                os.link(source_path2, target_path)
        elif flag1 and os.path.exists(source_path1):
            if os.path.exists(target_path):
                f_stat = os.stat(target_path)
                if time.time() - f_stat.st_mtime >= 5:
                    if os.path.exists(target_path):
                        os.unlink(target_path)
                    os.link(source_path1, target_path)
            if not os.path.exists(target_path):
                os.link(source_path1, target_path)
        elif flag3 and os.path.exists(source_path3):
            if os.path.exists(target_path):
                f_stat = os.stat(target_path)
                if time.time() - f_stat.st_mtime >= 5:
                    if os.path.exists(target_path):
                        os.unlink(target_path)
                    os.link(source_path3, target_path)
            if not os.path.exists(target_path):
                os.link(source_path3, target_path)
    except:
        # NOTE(review): deliberately best-effort — any filesystem error is
        # ignored and the (possibly missing) link path is still returned.
        pass
    return target_path

def printErrorLog(e, log=None):
    """Report the details of exception *e* — message, source file, line
    number, args tuple and the formatted traceback — through *log* (any
    callable taking one argument) or plain print() when *log* is falsy."""
    emit = log if log else print
    emit(f"Error Info: {e}")
    emit(f"Error File: {e.__traceback__.tb_frame.f_globals['__file__']}")
    emit(f"Error Row: {e.__traceback__.tb_lineno}")
    emit(e.args)
    emit(f'Error Traceback:\n{traceback.format_exc()}')

多线程或者多进程工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
# -*- coding: utf-8 -*-
import os
import sys

import time
import random
from queue import Queue, LifoQueue
from threading import Event, Thread
from multiprocessing.managers import BaseManager, SyncManager
from multiprocessing.pool import ThreadPool
import multiprocessing as mp
import multiprocessing.dummy as mp_dummy
import traceback
import logging
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# 一般如果是计算型任务,线程池的大小设置为cpu_num+1个,如果是IO型任务,设置为2*cpu_num+1个线程

class MyManager(BaseManager):
    # Custom manager used to share an Event and a LIFO queue across processes.
    pass


# NOTE(review): 'Queue' is registered with LifoQueue, so MyManager().Queue()
# is last-in-first-out — presumably intentional for the lifo_flag mode of
# MultiProcessTask; confirm before relying on FIFO order.
MyManager.register('Event', Event)
MyManager.register('Queue', LifoQueue)


def printErrorLog(e, log=None):
    """Emit the details of exception *e* (message, file, line, args and
    traceback) via *log* when given, otherwise via print()."""
    emit = log if log else print
    emit(f"Error Info: {e}")
    emit(f"Error File: {e.__traceback__.tb_frame.f_globals['__file__']}")
    emit(f"Error Row: {e.__traceback__.tb_lineno}")
    emit(e.args)
    emit(f'Error Traceback:\n{traceback.format_exc()}')

class LogPrint:
    """Drop-in stand-in for a logger: info/error/debug all print to stdout."""

    def __init__(self):
        pass

    def info(self, msg):
        print(msg)

    def error(self, msg):
        self.info(msg)

    def debug(self, msg):
        self.info(msg)

def init_process():
    """On Windows, enable multiprocessing freeze support (needed for frozen
    executables); a no-op on every other platform."""
    if sys.platform != 'win32':
        return
    print('...freeze_support...')
    mp.freeze_support()

# Main driver for multi-process or multi-thread task execution.
class MultiProcessTask:
    """Dispatches work items through a shared request queue to a pool of
    worker threads or processes, optionally collecting results from a
    response queue.

    NOTE(review): several methods use mutable default arguments
    (params={}, results=[]) — callers should not rely on defaults being
    fresh per call.
    """

    def __init__(self, pool_count=os.cpu_count(), log=None, app_queue_res_count=1,
                 lifo_flag=False, thread_flag=True, result_flag=False, app_name='MultiProcessTask',
                 app_func=None, show_log_flag=True, app_queue_req_max_size=0,
                 app_queue_res_max_size=0, app_exec_timeout=None, new_impl=False,
                 result_max_cache_size=None):
        # Logger (falls back to a print-based stand-in).
        self.log : logging = log
        if log == None:
            self.log = LogPrint()
        if pool_count == None:
            pool_count = os.cpu_count()
        self.show_log_flag : bool = show_log_flag
        # Pool size (threads or processes).
        self.pool_count = pool_count
        # Application name used in log messages.
        self.app_name = app_name
        # Pick the manager: custom (LIFO queue), thread-dummy, or process.
        app_manager : SyncManager = None
        if lifo_flag:
            manager = MyManager()
            manager.start()
            app_manager = manager
            pass
        elif thread_flag:
            # Thread-backed manager.
            app_manager = mp_dummy.Manager()
        else:
            # Process-backed manager.
            app_manager = mp.Manager()
        # Event used to signal workers to shut down.
        self.app_event : Event = app_manager.Event()
        # Request queue (0 max size = unbounded).
        self.app_queue_req : Queue = app_manager.Queue(app_queue_req_max_size)
        # Response queue — only created when results are wanted.
        self.app_queue_res : Queue = None
        if result_flag:
            self.app_queue_res = app_manager.Queue(app_queue_res_max_size)
        # Worker pool: thread pool or process pool.
        self.pool : ThreadPool
        if thread_flag:
            self.pool = mp_dummy.Pool(self.pool_count)
        else:
            self.pool = mp.Pool(self.pool_count)
        # How many copies of each result to push onto the response queue.
        self.app_queue_res_count = app_queue_res_count
        # Per-request execution timeout (None = unlimited).
        self.app_exec_timeout = app_exec_timeout
        # new_impl switches workers to the kwargs-based dispatch protocol.
        self.new_impl = new_impl
        # Default function used by the new protocol when none is attached.
        self.new_func = None
        # Attach the worker function (required unless new_impl is set).
        if app_func:
            self.add_func(app_func)
        else:
            if not self.new_impl:
                raise Exception('please init app_func!')
            else:
                self.add_func(None)
        # Result cache keyed by request id, plus insertion-order id list.
        self.result_cache_dct = {}
        self.result_cache_id_list = []
        # Cache bound; defaults to pool size x 3.
        if result_max_cache_size:
            self.result_max_cache_size = result_max_cache_size
        else:
            self.result_max_cache_size = self.pool_count * 3
        if thread_flag:
            self.log.info(f'{self.app_name} threads total: {self.pool_count} mpt init...')
        else:
            self.log.info(f'{self.app_name} process total: {self.pool_count} mpt init...')

    # Error callback for async pool failures.
    def err_callback(self, err: Exception, app_name=''):
        self.log.error(f'{app_name} app_func error > {err}')
        printErrorLog(err, self.log.error)
        return True

    # Set the default function used by the new_impl dispatch protocol.
    def init_new_func(self, app_func):
        self.new_func = app_func

    # Attach the target function and a unique request id to the params dict;
    # returns the generated request id.
    def handle_new_func(self, params={}, app_func=None):
        key = '_app_func'
        if key not in params:
            if app_func:
                params['_app_func'] = app_func
            elif self.new_func:
                params['_app_func'] = self.new_func
        # Request id: 100-ns timestamp + random suffix.
        t = int(time.time() * 10000000)
        rand = random.randint(1, 99999999)
        uk_id = f'{t}{rand}'
        params['_app_req_id'] = uk_id
        return uk_id

    # Start one worker loop per pool slot, all sharing the same queues.
    def add_func(self, app_func):
        mptf = MultiProcessTaskFunc(
            log=self.log,
            app_name=self.app_name,
            app_event=self.app_event,
            app_queue_req=self.app_queue_req,
            app_queue_res=self.app_queue_res,
            app_queue_res_count=self.app_queue_res_count,
            app_func=app_func,
            show_log_flag=self.show_log_flag,
            app_exec_timeout=self.app_exec_timeout
        )

        for _ in range(self.pool_count):
            if self.new_impl:
                self.pool.apply_async(func=mptf.init_method, error_callback=self.err_callback)
            else:
                self.pool.apply_async(func=mptf.init_method_old, error_callback=self.err_callback)

    # Wait for outstanding work (unless forced), signal shutdown, wake every
    # worker with a None sentinel, then close the pool.
    def close(self, forceFlag=False):
        if not forceFlag:
            if not self.app_queue_req.empty():
                self.app_queue_req.join()

        if not self.app_event.is_set():
            self.app_event.set()

        # One None per worker so each blocked get() returns and the loop exits.
        for _ in range(self.pool_count):
            self.app_queue_req.put(None)

        self.pool.close()

        if not forceFlag:
            self.pool.join()

    # Submit one request and wait (up to timeout) for one result.
    # Returns {} on timeout, or True when no response queue exists.
    def sendReqAndReceiveRes(self, params={}, timeout=180, app_func=None):
        if self.new_impl:
            self.handle_new_func(params, app_func)
        self.app_queue_req.put(params)
        if self.app_queue_res:
            try:
                res = self.app_queue_res.get(timeout=timeout)
                self.app_queue_res.task_done()
                if res:
                    return res
            except:
                pass
            return {}
        else:
            return True

    # NOTE(review): unlike the camelCase variant, this does not return the
    # result — confirm whether the missing `return` is intentional.
    def send_req_and_receive_res(self, params={}, timeout=180):
        self.sendReqAndReceiveRes(params, timeout)

    # Block until every queued request has been processed.
    def waitReqComplete(self):
        self.app_queue_req.join()

    def wait_req_complete(self):
        self.waitReqComplete()

    # Merge a {request_id: value} result into the bounded cache.
    def cache_result_done_func(self, result: dict):
        if self.result_max_cache_size < len(self.result_cache_id_list):
            # NOTE(review): list.pop() removes the most recently added id,
            # so eviction is LIFO rather than oldest-first — confirm intent.
            for _ in range(len(self.result_cache_id_list) - self.result_max_cache_size):
                result_cache_key = self.result_cache_id_list.pop()
                try:
                    del self.result_cache_dct[result_cache_key]
                except Exception:
                    pass
        try:
            for k, v in result.items():
                self.result_cache_id_list.append(k)
                self.result_cache_dct[k] = v
        except Exception:
            pass

    # Pop one cached result; returns (value, key) or (None, None) on miss.
    def get_cache_result_func(self, key):
        res = None
        key_ = None
        if key in self.result_cache_dct:
            res = self.result_cache_dct[key]
            del self.result_cache_dct[key]
            key_ = key
            try:
                self.result_cache_id_list.remove(key)
            except Exception:
                pass
        return res, key_

    # Block until the result for *key* appears, draining the response queue
    # into the cache while polling every 0.1 s.
    def get_cache_result(self, key, wait_req_done=False):
        while True:
            res = self.get_cache_result_func(key)
            if res[1] is not None:
                return res[0]
            if wait_req_done:
                self.app_queue_req.join()
            while not self.app_queue_res.empty():
                try:
                    data = self.app_queue_res.get_nowait()
                    self.app_queue_res.task_done()
                    self.cache_result_done_func(data)
                except Exception:
                    pass
            time.sleep(0.1)

    def getCacheResult(self, key, waitReqDone=False):
        return self.get_cache_result(key, waitReqDone)

    # Fetch the results for several request ids (blocking per key).
    def batch_get_cache_result(self, keys: list, wait_req_done=False):
        result_arr = []
        for key in keys:
            res = self.get_cache_result(key, wait_req_done)
            result_arr.append(res)
        return result_arr

    def batchGetCacheResult(self, keys: list, waitReqDone=False):
        return self.batch_get_cache_result(keys, waitReqDone)

    # Fetch result value(s) without knowing the request id.
    # wait_req_done=True drains everything and returns a list; otherwise the
    # first value seen is returned.
    def get_cache_result_no_key(self, wait_req_done=False):
        while True:
            if wait_req_done:
                self.app_queue_req.join()
            result_arr = []
            while not self.app_queue_res.empty():
                try:
                    data: dict = self.app_queue_res.get_nowait()
                    self.app_queue_res.task_done()
                    for k, v in data.items():
                        if wait_req_done:
                            result_arr.append(v)
                        else:
                            return v
                except Exception:
                    pass
            if wait_req_done:
                return result_arr
            time.sleep(0.1)

    def getCacheResultNoKey(self, waitReqDone=False):
        return self.get_cache_result_no_key(waitReqDone)

    # Enqueue one request; returns the request id (new_impl only, else None).
    def sendReq(self, params={}, app_func=None):
        app_req_id = None
        if self.new_impl:
            app_req_id = self.handle_new_func(params, app_func)
        self.app_queue_req.put(params)
        return app_req_id

    def send_req(self, params={}, app_func=None):
        return self.sendReq(params, app_func)

    # Enqueue a batch of requests; returns their request ids.
    def sendReqArr(self, paramsArr=[]):
        app_req_id_arr = []
        for params in paramsArr:
            app_req_id = self.sendReq(params)
            app_req_id_arr.append(app_req_id)
        return app_req_id_arr

    def send_req_arr(self, paramsArr=[]):
        return self.sendReqArr(paramsArr)

    # Drain every pending request.
    def clearReq(self):
        while not self.app_queue_req.empty():
            self.app_queue_req.get()
            self.app_queue_req.task_done()
        return True

    def clear_req(self):
        return self.clearReq()

    # Drain both queues.
    def clearReqAndRes(self):
        f1 = self.clearReq()
        f2 = self.clearRes()
        return f1 and f2

    def clear_req_and_res(self):
        return self.clearReqAndRes()

    # True when no requests are pending.
    def reqQueueIsEmpty(self):
        return self.app_queue_req.empty()

    def req_queue_is_empty(self):
        return self.reqQueueIsEmpty()

    # True when no results are pending.
    def resQueueIsEmpty(self):
        return self.app_queue_res.empty()

    def res_queue_is_empty(self):
        return self.resQueueIsEmpty()

    # Receive results: allFlag=True waits for all work and drains the queue
    # into a list; otherwise a single (blocking) result is returned.
    def receiveRes(self, allFlag=False):
        if allFlag:
            self.waitReqComplete()
            data_arr = []
            if self.app_queue_res:
                while not self.app_queue_res.empty():
                    data = self.app_queue_res.get()
                    self.app_queue_res.task_done()
                    data_arr.append(data)
            return data_arr
        else:
            data = {}
            if self.app_queue_res:
                data = self.app_queue_res.get()
                self.app_queue_res.task_done()
            return data

    def receive_res(self, all_flag=False):
        return self.receiveRes(all_flag)

    # Drain every pending result.
    def clearRes(self):
        while not self.app_queue_res.empty():
            self.app_queue_res.get()
            self.app_queue_res.task_done()
        return True

    def clear_res(self):
        return self.clearRes()

    # Enqueue when the backlog is below the pool size; otherwise wait for
    # the backlog to drain first. NOTE(review): when the else branch runs,
    # `params` is never enqueued — confirm whether that is intentional.
    def sendReqNoResult(self, params={}):
        if self.app_queue_req.qsize() < self.pool_count:
            self.app_queue_req.put(params)
        else:
            self.app_queue_req.join()
        return True

    def send_req_no_result(self, params={}):
        return self.sendReqNoResult(params)

    # Throttled submit: when the backlog reaches the pool size, pull up to
    # pool_count results into *results* before enqueueing the new request.
    def sendReqGetResult(self, params={}, results: list = [], timeout=None):
        if not self.app_queue_res:
            return False

        if self.app_queue_req.qsize() >= self.pool_count:
            for i in range(self.pool_count):
                if self.app_queue_req.empty() and self.app_queue_res.empty():
                    break
                res = self.app_queue_res.get(timeout=timeout)
                self.app_queue_res.task_done()
                results.append(res)

        self.app_queue_req.put(params)

    def send_req_get_result(self, params={}, results: list = [], timeout=None):
        return self.sendReqGetResult(params, results, timeout)

    def __enter__(self):
        return self

    # Context-manager exit: log and force-close on error, always close at
    # the end, and swallow the exception by returning True.
    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if not exc_type is None:
                self.log.info('exception_type: {}'.format(exc_type))
                self.log.info('exception_val: {}'.format(exc_val))
                self.log.info('exception_tb: {}'.format(exc_tb))
                self.close(forceFlag=True)
                raise exc_val
            else:
                pass
        except Exception as e:
            # NOTE(review): printErrorLog expects a callable for its second
            # argument but receives the logger object here — verify.
            printErrorLog(e, self.log)
            raise e
        finally:
            self.close()
        return True

class MultiProcessTaskFunc:
    """Worker-side loop for MultiProcessTask: each pool slot runs
    init_method (new protocol) or init_method_old (old protocol), pulling
    requests from the request queue until the shared event is set."""

    def __init__(self, log=None, app_name=None, app_event=None, app_queue_req=None, app_queue_res=None,
                 app_queue_res_count=None, app_func=None, show_log_flag=True, app_exec_timeout=None):
        # Logger-like object for progress/error output.
        self.log : logging = log
        # Suppress the start/end log lines when False.
        self.show_log_flag : bool = show_log_flag
        # Application name used in log prefixes.
        self.app_name : str = app_name
        # Shared shutdown event; when set, the worker loop exits.
        self.app_event : Event = app_event
        # Queue the workers pull requests from.
        self.app_queue_req : Queue = app_queue_req
        # Optional queue results are pushed onto.
        self.app_queue_res : Queue = app_queue_res
        # How many copies of each result to enqueue.
        self.app_queue_res_count : int = app_queue_res_count
        # Default task function (old protocol / new protocol fallback).
        self.app_func = app_func
        # Per-request timeout; None means wait forever.
        self.app_exec_timeout = app_exec_timeout

    # Error reporting helper for failed task executions.
    def err_callback(self, err: Exception, app_name=''):
        self.log.error(f'{app_name} app_func error > {err}')
        printErrorLog(err, self.log.error)

    # Old protocol: call app_func(app_req), optionally under a timeout
    # enforced by a throwaway single-slot thread pool. Returns None on error.
    def handle_exec_app_func_old(self, app_req, task_index):
        if self.app_exec_timeout:
            try:
                pool: ThreadPool = mp_dummy.Pool(1)
                future = pool.apply_async(self.app_func, [app_req])
                return future.get(self.app_exec_timeout)
            except Exception as e:
                self.err_callback(e, f'{self.app_name}_{task_index}')
            finally:
                pool.close()
        else:
            try:
                return self.app_func(app_req)
            except Exception as e:
                self.err_callback(e, f'{self.app_name}_{task_index}')
        return None

    # Old-protocol worker loop: consume requests until the event is set.
    def init_method_old(self):
        # Pseudo-unique id to tell workers apart in the logs.
        task_index = f'{int(time.time())}{random.randint(100,999)}'

        if self.show_log_flag:
            self.log.info(f'{self.app_name}_{task_index} start...')

        while not self.app_event.is_set():
            req = self.app_queue_req.get()

            # None (and other falsy values) are shutdown/skip sentinels.
            if not req:
                self.app_queue_req.task_done()
                continue
            res = self.handle_exec_app_func_old(req, task_index)
            self.app_queue_req.task_done()
            if self.app_queue_res:
                for _ in range(self.app_queue_res_count):
                    try:
                        self.app_queue_res.put(res)
                    except:
                        pass

        if self.show_log_flag:
            self.log.info(f'{self.app_name}_{task_index} end...')

    # New protocol: the request dict may carry '_app_func' (callable to run)
    # and '_app_req_id' (id used to key the result); remaining entries are
    # passed as keyword arguments. Returns {req_id: result} when an id was
    # present, the bare result otherwise, or None on error.
    def handle_exec_app_func(self, app_req, task_index):
        app_func = None
        if '_app_func' in app_req:
            app_func = app_req['_app_func']
            del app_req['_app_func']
        else:
            app_func = self.app_func
        app_req_id = None
        if '_app_req_id' in app_req:
            app_req_id = app_req['_app_req_id']
            del app_req['_app_req_id']
        if self.app_exec_timeout:
            try:
                pool: ThreadPool = mp_dummy.Pool(1)
                future = pool.apply_async(app_func, kwds=app_req)
                res = future.get(self.app_exec_timeout)
                if app_req_id:
                    return {app_req_id: res}
                return res
            except Exception as e:
                self.err_callback(e, f'{self.app_name}_{task_index}')
            finally:
                pool.close()
        else:
            try:
                res = app_func(**app_req)
                if app_req_id:
                    return {app_req_id: res}
                return res
            except Exception as e:
                self.err_callback(e, f'{self.app_name}_{task_index}')
        return None

    # New-protocol worker loop: consume requests until the event is set.
    def init_method(self):
        task_index = f'{int(time.time())}{random.randint(100,999)}'

        if self.show_log_flag:
            self.log.info(f'{self.app_name}_{task_index} start...')

        while not self.app_event.is_set():
            req = self.app_queue_req.get()

            if not req:
                self.app_queue_req.task_done()
                continue
            res = self.handle_exec_app_func(req, task_index)
            self.app_queue_req.task_done()
            if self.app_queue_res:
                for _ in range(self.app_queue_res_count):
                    try:
                        self.app_queue_res.put(res)
                    except:
                        pass

        if self.show_log_flag:
            self.log.info(f'{self.app_name}_{task_index} end...')