123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# NOTE: the original shebang was "#!/usr/bin/env/python", which is not a
# valid interpreter path; fixed to the conventional "env python" form.
import datetime
import json
import logging
import logging.handlers
import multiprocessing
import os
import re
import sys
import threading
import time

import hanlp
import jieba
import requests

# Warm up jieba so its dictionary is loaded once at import time rather
# than on the first real request.
jieba.lcut("load_jieba")
class myLog(object):
    """Wrapper around ``logging`` that writes JSON-formatted records to a
    size-rotated file under the module-level ``parse_log_dir``.

    :param logger: logger name passed to ``logging.getLogger``.
    :param log_cate: base name of the log file (``<log_cate>.log``).
    """

    def __init__(self, logger=None, log_cate='my_log'):
        self.logger = logging.getLogger(logger)
        self.logger.setLevel(logging.INFO)  # DEBUG
        # Full path of the log file inside parse_log_dir.
        self.log_name = os.path.join(parse_log_dir, '{}.log'.format(log_cate))
        # Purge an oversized temp.log left behind by other components.
        temp_log = os.path.join(parse_log_dir, 'temp.log')
        if os.path.exists(temp_log):
            if os.path.getsize(temp_log) > 110000000:  # 110M
                os.remove(temp_log)
        # FIX: ``logging.getLogger(name)`` returns a shared singleton, so
        # each re-instantiation of this class used to stack one more file
        # handler onto the same logger and every record was written multiple
        # times. Attach the handler only once.
        if not self.logger.handlers:
            fh = logging.handlers.RotatingFileHandler(
                self.log_name, maxBytes=120000000, backupCount=5,
                mode='a', encoding='utf-8', delay=True)
            fh.setLevel(logging.INFO)
            # One JSON object per record: host ip, message, source location.
            formatter_dict = {
                "host-ip": "{}".format(external_ip),
                "log-msg": "%(message)s",
                "other-msg": "%(filename)s-%(lineno)s-%(levelname)s-%(asctime)s"
            }
            formatter = logging.Formatter(json.dumps(formatter_dict, ensure_ascii=False))
            fh.setFormatter(formatter)
            self.logger.addHandler(fh)
            # No fh.close() here: with delay=True no file is open yet, and
            # closing a freshly added handler served no purpose.

    def getlog(self):
        """Return the configured ``logging.Logger`` instance."""
        return self.logger
class simpLog(object):
    """Message-only logger: writes bare ``%(message)s`` lines to
    ``<log_cate>.log`` under the module-level ``parse_log_dir``.

    When the file grows past 20M it is archived as
    ``<log_cate>_<mm_dd>.log`` and a fresh file is started.

    :param logger: logger name passed to ``logging.getLogger``.
    :param log_cate: base name of the log file.
    """

    def __init__(self, logger=None, log_cate='simp_log'):
        self.logger = logging.getLogger(logger)
        self.logger.setLevel(logging.INFO)  # DEBUG
        self.log_name = os.path.join(parse_log_dir, '{}.log'.format(log_cate))
        # Manual size-based rotation. FIX: use ``os.replace`` instead of
        # ``os.rename`` — when the same-day archive already exists,
        # ``os.rename`` raises on Windows while ``os.replace`` overwrites
        # consistently on every platform.
        if os.path.exists(self.log_name):
            if os.path.getsize(self.log_name) > 20000000:  # 20M
                archive = os.path.join(parse_log_dir, '{}_{}.log'.format(
                    log_cate, datetime.datetime.now().strftime('%m_%d')))
                os.replace(self.log_name, archive)
        # FIX: ``logging.getLogger(name)`` returns a shared singleton, so
        # re-instantiating this class used to add duplicate handlers and
        # emit every record multiple times. Attach the handler only once.
        if not self.logger.handlers:
            fh = logging.FileHandler(self.log_name, mode='a', encoding='utf-8', delay=True)
            fh.setLevel(logging.INFO)
            fh.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(fh)

    def getlog(self):
        """Return the configured ``logging.Logger`` instance."""
        return self.logger
# Process subclass that hands its target's return value back to the parent.
# FIX: the original set ``self.result`` inside run(), but run() executes in
# the CHILD process, so the attribute never existed in the parent and
# get_result() always returned None. A one-way Pipe ships the value back.
class MyProcess(multiprocessing.Process):
    def __init__(self, func, args=(), kwargs=None):
        """
        :param func: callable executed in the child process.
        :param args: positional arguments for ``func``.
        :param kwargs: optional dict; when given, ``func`` is called as
            ``func(kwargs["arg1"], kwargs["arg2"], kwargs["arg3"])``
            (legacy calling convention kept for existing callers).
        """
        super(MyProcess, self).__init__()
        self.func = func
        self.args = args
        self.kwargs = kwargs
        # Read end stays with the parent, write end is used by the child.
        self._recv_end, self._send_end = multiprocessing.Pipe(duplex=False)

    def run(self):
        # Overridden run(); note a plain ``return value`` here would be
        # lost, which is why the result goes through the pipe.
        if self.kwargs:
            result = self.func(self.kwargs["arg1"], self.kwargs["arg2"], self.kwargs["arg3"])
        else:
            result = self.func(*self.args)
        self.result = result
        try:
            self._send_end.send(result)
        except Exception:
            # Result is not picklable -- the parent will see None.
            pass

    def get_result(self):
        """Block until the child finishes, then return its result
        (``None`` when the child produced no retrievable value)."""
        # Must wait for the process to finish before reading the result.
        multiprocessing.Process.join(self)
        try:
            if self._recv_end.poll():
                self.result = self._recv_end.recv()
            return self.result
        except Exception:
            return None
# Fetch a Baidu Cloud OAuth access_token.
class BaiduApi:
    def __init__(self):
        # SECURITY: client_id / client_secret are hard-coded into the URL.
        # They should be moved to environment variables or a secrets store.
        self.host = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=OuO8mVZNIkDCP9eDQgF8txER&client_secret=2Wv28a9WBDKvkoXvcHMINNWZt8QdOzZg'
        self.expires_in = 0       # token lifetime in seconds (0 = unknown)
        self.access_token = ""    # empty string means "no valid token"

    def update_access_token(self):
        """Request a fresh access_token from the Baidu OAuth endpoint.

        ``expires_in`` defaults to 2592000 s (30 days) on Baidu's side;
        callers are expected to refresh at roughly half that interval
        (one day is 86400 s).

        :return: tuple ``(access_token, expires_in)``. On any failure the
            token is ``""`` and ``expires_in`` keeps its previous value.
        """
        try:
            print(self.host)
            # FIX: a timeout is mandatory — without one a stalled
            # connection blocks this call forever.
            response = requests.get(self.host, timeout=30)
            if response.status_code == 200:
                result = response.json()
                self.access_token = result["access_token"]
                self.expires_in = int(result["expires_in"])
                print(f"开启access_token: {self.access_token} expires_in: {self.expires_in}")
            else:
                print("获取Access Token失败")
                self.access_token = ""
        except Exception as ex:
            print("获取Access Token异常", ex)
            self.access_token = ""
        return self.access_token, self.expires_in
- #################################################################
class LocalCfg:
    """Configuration for local development."""
    internal_ip = "192.168.1.65"
    external_ip = "192.168.1.65"
    server_port = 10116
    # Tencent COS storage settings.
    public_bucket = "zxhx-1302712961"                                   # bucket name
    region = "ap-shanghai"                                              # bucket region
    public_bucket_addr = "zxhx-1302712961.cos.ap-shanghai.myqcloud.com"
    # Inference backend endpoint template.
    PyTorch_REST_API_URL = "http://192.168.1.209:7015/{0}/{1}"
    correct_mod = "book"
class TestingCfg:
    """Configuration for the testing environment."""
    internal_ip = "192.168.1.192"
    external_ip = "192.168.1.192"
    server_port = 10116
    # Tencent COS storage settings.
    public_bucket = "zxhx-1302712961"                                   # bucket name
    region = "ap-shanghai"                                              # bucket region
    public_bucket_addr = "zxhx-1302712961.cos.ap-shanghai.myqcloud.com"
    # Inference backend endpoint template.
    PyTorch_REST_API_URL = "http://192.168.1.209:7015/{0}/{1}"
    correct_mod = "general"
class ProductionCfg:
    """Configuration for production deployments."""
    internal_ip = "0.0.0.0"
    # Two production hosts; the trailing command-line token selects which
    # public address this instance advertises.
    if sys.argv[-1] == '198':
        external_ip = '49.232.72.198'
    else:
        external_ip = '82.156.64.176'
    server_port = 10116
    # Tencent COS storage settings.
    public_bucket = "zxhx-pro-1302712961"                                  # bucket name
    region = "ap-beijing"                                                  # bucket region
    public_bucket_addr = "zxhx-pro-1302712961.cos.ap-beijing.myqcloud.com" # bucket address
    # Inference backend endpoint template.
    PyTorch_REST_API_URL = "http://api.cv.zxhx.com/{0}/{1}"
    correct_mod = "general"
# Select the runtime configuration from the command line;
# with no argument we default to the local/testing setup.
config_class = LocalCfg
if len(sys.argv) > 1:
    print(sys.argv, sys.argv[0])
    _cfg_by_arg = {'test': TestingCfg, 'product': ProductionCfg}
    if sys.argv[1] in _cfg_by_arg:
        config_class = _cfg_by_arg[sys.argv[1]]
    else:
        print('cmd should be: python server.py test')
        print('or: python server.py product')
        raise ValueError("命令不正确")

# Flatten the chosen config class into module-level globals used below.
server_ip = config_class.internal_ip
external_ip = config_class.external_ip
server_port = config_class.server_port
correct_mod = config_class.correct_mod
region = config_class.region
public_bucket = config_class.public_bucket
PyTorch_REST_API_URL = config_class.PyTorch_REST_API_URL

# Log directory lives under the current working directory.
dirpath = os.getcwd()
print("dirpath:", dirpath)
parse_log_dir = os.path.join(dirpath, "logs")
os.makedirs(parse_log_dir, exist_ok=True)
# Paths of the sentence-embedding models, keyed by language.
LANG_EMB_MODEL = {
    "eng": os.path.join(dirpath, "Bert_Base/bert-base-uncased"),
    "cn": os.path.join(dirpath, "Bert_Base/bert-base-chinese"),
    "all": os.path.join(dirpath, "Bert_Base/all-MiniLM-L6-v2")
}
if config_class == ProductionCfg:
    # Production hosts keep the multilingual model at a fixed location.
    LANG_EMB_MODEL["all"] = "/home/zhengwenjuan/Models/all-MiniLM-L6-v2"

# SECURITY: cloud credentials are hard-coded in source; they should be
# moved to environment variables or a secrets store.
secret_id = "AKIDC9pETRbZfWBbmhoglkT4PUJGzRjmj3Ia"  # cloud API SecretId
secret_key = "C6jlX4LKfleGdmfQvGNgj74lESRpBIEJ"  # cloud API SecretKey
# Temporary-credential token; None when not using temporary keys.
token = None


def _load_json(abs_path):
    """Load a UTF-8 JSON file and close the handle.

    FIX: the previous ``json.loads(open(path).read())`` pattern never
    closed the file objects, leaking five handles at import time.
    """
    with open(abs_path, encoding="utf8") as f:
        return json.load(f)


# English<->Chinese dictionaries used by the translation pipeline.
path_phrase_yhk = os.path.join(dirpath, "files/main/en-ch_phrase_dict_yhk.json")
path_txt = os.path.join(dirpath, "files/main/en-ch_dict_from_txt.json")
path_word_yhk = os.path.join(dirpath, "files/main/en-ch_dict_from_3_website.json")
path_word_bd = os.path.join(dirpath, "files/main/en-ch_dict_bd.json")
phrase_dict_yhk = _load_json(path_phrase_yhk)
word_dict_from_txt = _load_json(path_txt)
word_dict_yhk = _load_json(path_word_yhk)
word_dict_bd = _load_json(path_word_bd)
path1 = os.path.join(dirpath, "files/main/dict_ch2en_merge.json")
dict_ch2en = _load_json(path1)
# hanlp pretrained models: semantic textual similarity (STS) and POS tagging.
sts = hanlp.load(hanlp.pretrained.sts.STS_ELECTRA_BASE_ZH)
pos = hanlp.load(hanlp.pretrained.pos.PKU_POS_ELECTRA_SMALL)
# Custom tag overrides for the POS tagger (word -> forced tag(s)).
dict_tags = {'明白': ["v", "n"], "意识": "n"}
pos.dict_tags = dict_tags
# Fetch the Baidu Cloud access_token (currently disabled).
# bce_access_token, bce_expires_in = BaiduApi().update_access_token()
# last_token_update_time = time.time()
# print("开启时间:", last_token_update_time)
|