"""HTTP front end for the HNSW retrieval service.

Exposes endpoints to query and update the in-memory HNSW model, to persist
pending changes (on a nightly cron trigger or on demand), and to forward
duplicate-check requests to the local ``chc/process`` service.
"""
# gevent has to patch the standard library before any other module is
# imported, so these two statements must stay at the very top of the file.
from gevent import monkey
monkey.patch_all()

from concurrent.futures import ThreadPoolExecutor

import requests
from flask import Flask, abort, jsonify, request
from flask_apscheduler import APScheduler
from gevent.pywsgi import WSGIServer

import config
from hnsw_model import HNSW
from restart_server import restart_math_dup_app

app = Flask(__name__)


class APS_Config(object):
    """Settings object loaded into ``app.config`` before the scheduler starts."""

    SCHEDULER_API_ENABLED = True


scheduler = APScheduler()


# Scheduled persistence of HNSW additions / modifications / deletions.
@scheduler.task('cron', id='hnsw_update', day='*', hour='01', minute='10',
                second='00', timezone='Asia/Shanghai')
def hnsw_update_schedule():
    """Save pending HNSW changes every day at 01:10:00 (Asia/Shanghai).

    Does nothing when no update flag is set in ``hnsw.hnsw_update_flag_list``.
    """
    if sum(hnsw.hnsw_update_flag_list) > 0:
        # Persist the HNSW model.
        hnsw.save_hnsw()
        # NOTE(review): the restart and the log entry are assumed to belong to
        # the "changes pending" branch - confirm against the deployed version.
        # Restart the math_dup_app service.
        restart_math_dup_app()
        # Record the completed update in the log.
        hnsw_retrieve_logger.info(config.log_msg.format(id="数据更新", type="HNSW Update", message="HNSW模型定时更新完毕"))


# Manual HNSW data update.
@app.route('/hnsw/update', methods=['GET', 'POST'])
def hnsw_update():
    """Persist pending HNSW changes on demand.

    Expects a POST whose JSON body is the string ``"save"``. The model is
    saved only when at least one update flag is set; otherwise a "conditions
    not met" message is returned.

    Returns:
        A JSON string describing the outcome.

    Raises:
        werkzeug.exceptions.MethodNotAllowed: for any non-POST request.
    """
    if request.method != 'POST':
        # A view that falls through returns None, which Flask turns into a
        # 500 error; answer with an explicit 405 instead.
        abort(405)

    update_command = request.get_json()
    if update_command == "save" and sum(hnsw.hnsw_update_flag_list) > 0:
        # Persist the HNSW model.
        hnsw.save_hnsw()
        # Record the completed update in the log.
        hnsw_retrieve_logger.info(config.log_msg.format(id="数据更新", type="hnsw/update", message="手动HNSW数据更新完毕"))
        return jsonify("手动HNSW数据更新完毕")
    return jsonify("不符合手动更新条件")


# HNSW model retrieval.
@app.route('/retrieve', methods=['GET', 'POST'])
def retrieve():
    """Query the HNSW model.

    Expects a POST with a JSON object containing ``query_vec`` and
    ``hnsw_index``; both are passed unchanged to ``hnsw.retrieve``.

    Returns:
        The value returned by ``hnsw.retrieve``, serialised as JSON.

    Raises:
        werkzeug.exceptions.MethodNotAllowed: for any non-POST request.
    """
    if request.method != 'POST':
        abort(405)

    query_dict = request.get_json()
    query_labels = hnsw.retrieve(query_dict["query_vec"], query_dict["hnsw_index"])
    return jsonify(query_labels)


# HNSW model data update.
@app.route('/update', methods=['GET', 'POST'])
def update():
    """Apply one update to the in-memory HNSW model and journal it.

    Expects a POST with a JSON object containing ``id`` and ``hnsw_index``.
    After the model is updated, the raw payload is appended to
    ``hnsw_update_data.txt`` so it is available to the scheduled or manual
    save.

    Returns:
        A JSON string confirming the update.

    Raises:
        werkzeug.exceptions.MethodNotAllowed: for any non-POST request.
    """
    if request.method != 'POST':
        abort(405)

    update_dict = request.get_json()
    # Update the HNSW model.
    hnsw.update(update_dict["id"], update_dict["hnsw_index"])
    # Append the update to hnsw_update_data.txt until the next save.
    with open("hnsw_update_data.txt", 'a', encoding='utf8') as f:
        f.write(str(update_dict) + "\n")
    return jsonify("数据更新完毕")


# Asynchronous hand-off of duplicate-check requests.
@app.route('/chc/transfer', methods=['GET', 'POST'])
def chc_transfer():
    """Forward the posted JSON to the local ``chc/process`` service.

    The forwarding request is sent from a background worker so this endpoint
    answers immediately with an empty JSON string. The outcome of the
    forwarded request is not reported back to the caller.

    Raises:
        werkzeug.exceptions.MethodNotAllowed: for any non-POST request.
    """
    if request.method != 'POST':
        abort(405)

    topics_dict = request.get_json()

    def async_chc():
        # Hand the payload over to the duplicate-check service.
        requests.post(r"http://localhost:8855/chc/process", json=topics_dict)

    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(async_chc)
    # Release the executor without blocking the request: the queued task
    # still runs to completion and the worker exits afterwards. Shutting
    # down from inside the worker (with wait=True) would try to join the
    # worker's own thread and raise RuntimeError.
    executor.shutdown(wait=False)
    return jsonify("")


if __name__ == '__main__':
    # Initialise logging.
    hnsw_retrieve_LogConfig = config.LogConfig(config.math_dup_path, "hnsw_retrieve")
    hnsw_retrieve_logger = hnsw_retrieve_LogConfig.get_log()
    # Initialise the HNSW model.
    hnsw = HNSW(hnsw_retrieve_logger)
    # Start the scheduler that persists HNSW changes.
    app.config.from_object(APS_Config())
    scheduler.init_app(app)
    scheduler.start()
    # Serve through gevent's WSGI server on all interfaces, port 8858.
    server = WSGIServer(('0.0.0.0', 8858), app)
    server.serve_forever()