123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- from gevent import monkey; monkey.patch_all()
- from flask import Flask, request, jsonify
- from flask_apscheduler import APScheduler
- from gevent.pywsgi import WSGIServer
- import requests
- from concurrent.futures import ThreadPoolExecutor
- import config
- from hnsw_model import HNSW
- from restart_server import restart_math_dup_app
# Flask application exposing the HNSW retrieve/update endpoints.
app = Flask(__name__)


class APS_Config:
    """Configuration object loaded into ``app.config`` for flask_apscheduler."""

    # Turn on flask_apscheduler's built-in scheduler REST API.
    SCHEDULER_API_ENABLED = True


# Shared scheduler; jobs attach to it through the ``@scheduler.task`` decorator.
scheduler = APScheduler()
# Scheduled job: every day at 01:10 (Asia/Shanghai) persist accumulated HNSW
# add/modify/delete changes.
@scheduler.task('cron', id='hnsw_update', day='*', hour='01', minute='10', second='00', timezone='Asia/Shanghai')
def hnsw_update_schedule():
    """Save pending HNSW changes, restart math_dup_app, and log completion.

    Does nothing unless ``sum(hnsw.hnsw_update_flag_list)`` is positive.
    Uses the module-level ``hnsw`` and ``hnsw_retrieve_logger`` that are
    created in the ``__main__`` block.
    """
    pending_changes = sum(hnsw.hnsw_update_flag_list)
    if pending_changes <= 0:
        return

    hnsw.save_hnsw()
    # NOTE(review): presumably restarted so math_dup_app reloads the saved model.
    restart_math_dup_app()

    completion_msg = config.log_msg.format(id="数据更新",
                                           type="HNSW Update",
                                           message="HNSW模型定时更新完毕")
    hnsw_retrieve_logger.info(completion_msg)
# Manual HNSW update: persist pending changes on demand.
@app.route('/hnsw/update', methods=['GET', 'POST'])
def hnsw_update():
    """Save pending HNSW changes when explicitly requested via POST.

    The POST body must be the JSON string ``"save"``, and the model is saved
    only when ``sum(hnsw.hnsw_update_flag_list)`` is positive.

    Returns:
        A JSON string describing the outcome. Non-POST requests get HTTP 405
        (previously the view returned ``None``, which Flask turns into a 500).
    """
    if request.method != 'POST':
        return jsonify("仅支持POST请求"), 405

    update_command = request.get_json()
    # Single guard instead of the old if/elif pair, so no path returns None.
    if update_command != "save" or sum(hnsw.hnsw_update_flag_list) <= 0:
        return jsonify("不符合手动更新条件")

    hnsw.save_hnsw()
    # NOTE(review): unlike hnsw_update_schedule, this path does not call
    # restart_math_dup_app() -- confirm that is intended.
    hnsw_retrieve_logger.info(config.log_msg.format(id="数据更新",
                                                    type="hnsw/update",
                                                    message="手动HNSW数据更新完毕"))
    return jsonify("手动HNSW数据更新完毕")
# HNSW retrieval endpoint.
@app.route('/retrieve', methods=['GET', 'POST'])
def retrieve():
    """Run an HNSW query and return its result as JSON.

    The POST body must be a JSON object with ``query_vec`` and ``hnsw_index``;
    both are passed unchanged to ``hnsw.retrieve``.

    Returns:
        ``jsonify(hnsw.retrieve(...))`` on success, HTTP 400 when the body or a
        required key is missing, HTTP 405 for non-POST requests.
    """
    if request.method != 'POST':
        return jsonify("仅支持POST请求"), 405

    # silent=True: malformed/non-JSON bodies yield None and are rejected below
    # with a clear 400 instead of a KeyError/TypeError 500.
    query_dict = request.get_json(silent=True)
    required_keys = ("query_vec", "hnsw_index")
    if not isinstance(query_dict, dict) or not all(key in query_dict for key in required_keys):
        return jsonify("缺少query_vec或hnsw_index参数"), 400

    query_labels = hnsw.retrieve(query_dict["query_vec"], query_dict["hnsw_index"])
    return jsonify(query_labels)
# HNSW incremental update endpoint.
@app.route('/update', methods=['GET', 'POST'])
def update():
    """Apply one incremental change to the in-memory HNSW model.

    The POST body must be a JSON object with ``id`` and ``hnsw_index``, which
    are passed to ``hnsw.update``. On success the request dict is appended (as
    its Python ``str`` form, one per line) to ``hnsw_update_data.txt``, where it
    waits for the scheduled or manual save.

    Returns:
        ``"数据更新完毕"`` on success, HTTP 400 when the body or a required key
        is missing, HTTP 405 for non-POST requests.
    """
    if request.method != 'POST':
        return jsonify("仅支持POST请求"), 405

    update_dict = request.get_json(silent=True)
    required_keys = ("id", "hnsw_index")
    if not isinstance(update_dict, dict) or not all(key in update_dict for key in required_keys):
        return jsonify("缺少id或hnsw_index参数"), 400

    hnsw.update(update_dict["id"], update_dict["hnsw_index"])
    # NOTE(review): relative path, resolved against the process working
    # directory. The str() (not JSON) format is kept because whatever reads
    # this file back may depend on it.
    with open("hnsw_update_data.txt", 'a', encoding='utf8') as f:
        f.write(str(update_dict) + "\n")
    return jsonify("数据更新完毕")
# Duplicate-check relay: forward the request body to /chc/process asynchronously.
@app.route('/chc/transfer', methods=['GET', 'POST'])
def chc_transfer():
    """Forward the POSTed JSON to ``http://localhost:8855/chc/process`` in the background.

    Responds immediately with an empty JSON string. The forward runs on a
    one-shot worker thread; any failure is logged rather than silently kept in
    a Future nobody inspects. Non-POST requests get HTTP 405.
    """
    if request.method != 'POST':
        return jsonify("仅支持POST请求"), 405

    topics_dict = request.get_json()

    def async_chc(payload):
        """Post ``payload`` to the local duplicate-check service; log on failure."""
        try:
            requests.post(r"http://localhost:8855/chc/process", json=payload).json()
        except Exception as exc:  # thread boundary: nothing else would report this
            hnsw_retrieve_logger.error(config.log_msg.format(id="查重转发",
                                                             type="chc/transfer",
                                                             message="转发/chc/process失败: {}".format(exc)))

    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(async_chc, topics_dict)
    # shutdown(wait=False) from the request side lets the submitted task finish
    # and then retires the worker thread. The old shutdown() inside the worker
    # tried to join that same worker thread.
    executor.shutdown(wait=False)
    return jsonify("")
if __name__ == '__main__':
    # Logger shared by the request handlers and the scheduled job.
    hnsw_retrieve_logger = config.LogConfig(config.math_dup_path, "hnsw_retrieve").get_log()

    # Initialize the module-level HNSW model that the route handlers use.
    hnsw = HNSW(hnsw_retrieve_logger)

    # Configure and start the scheduler that runs hnsw_update_schedule.
    app.config.from_object(APS_Config())
    scheduler.init_app(app)
    scheduler.start()

    # Serve with gevent's WSGI server (the stdlib is monkey-patched at import).
    WSGIServer(('0.0.0.0', 8858), app).serve_forever()
|