import re
import random
import numpy as np
import pickle
from copy import deepcopy
from bson.binary import Binary
from concurrent.futures import ThreadPoolExecutor
from sentence_transformers import SentenceTransformer
import config
from main_clear.sci_clear import get_maplef_items
# 按数据对应顺序随机打乱数据
def shuffle_data_pair(idx_list, vec_list):
zip_list = list(zip(idx_list, vec_list))
random.shuffle(zip_list)
idx_list, vec_list = zip(*zip_list)
return idx_list, vec_list
# 通用公有变量
public_id = 0
# 数据预处理
class DataPreProcessing():
def __init__(self, mongo_coll=None, logger=None, is_train=False):
# 配置初始数据
self.mongo_coll = mongo_coll
self.sbert_model = SentenceTransformer(config.sbert_path)
self.is_train = is_train
# 日志采集
self.logger = logger
self.log_msg = config.log_msg
# 主函数
def __call__(self, origin_dataset, is_retrieve=False):
# 句向量存储列表
sent_vec_list = []
# 批量处理数据字典
bp_dict = deepcopy(config.batch_processing_dict)
# 批量数据清洗
if self.is_train is False:
with ThreadPoolExecutor(max_workers=5) as executor:
executor_list = list(executor.map(self.content_clear_process, origin_dataset))
cont_clear_tuple, cont_cut_tuple = zip(*executor_list)
for data_idx, data in enumerate(origin_dataset):
# 通用公有变量
global public_id
# 记录id
public_id = data["id"] if "id" in data else data_idx + 1
print(public_id) if self.logger is None else None
if self.is_train is True:
content_clear, content_cut_list = self.content_clear_process(data)
# 根据self.is_train赋值content_clear, content_cut_list
content_clear = content_clear if self.is_train else cont_clear_tuple[data_idx]
content_cut_list = content_cut_list if self.is_train else cont_cut_tuple[data_idx]
# 日志采集
self.logger.info(self.log_msg.format(
id=public_id,
type="数据清洗结果",
message=content_clear)) if self.logger and is_retrieve else None
print(content_clear) if self.logger is None else None
bp_dict["id_list"].append(data["id"]) if is_retrieve is False else None
bp_dict["cont_clear_list"].append(content_clear)
# 将所有截断数据融合进行一次句向量计算
bp_dict["cont_cut_list"].extend(content_cut_list)
# 获取每条数据的截断长度
bp_dict["cut_idx_list"].append(bp_dict["cut_idx_list"][-1] + len(content_cut_list))
# 设置批量处理长度,若满足条件则进行批量处理
if (data_idx+1) % 5000 == 0:
sent_vec_list = self.batch_processing(sent_vec_list, bp_dict, is_retrieve)
# 数据满足条件处理完毕后,则重置数据结构
bp_dict = deepcopy(config.batch_processing_dict)
if len(bp_dict["cont_clear_list"]) > 0:
sent_vec_list = self.batch_processing(sent_vec_list, bp_dict, is_retrieve)
return sent_vec_list, bp_dict["cont_clear_list"]
# 数据批量处理计算句向量
def batch_processing(self, sent_vec_list, bp_dict, is_retrieve):
vec_list = self.sbert_model.encode(bp_dict["cont_cut_list"])
# 计算题目中每个句子的完整句向量
sent_length = len(bp_dict["cut_idx_list"]) - 1
for i in range(sent_length):
sentence_vec = np.array([np.nan])
if bp_dict["cont_clear_list"][i] != '':
# 平均池化
sentence_vec = np.sum(vec_list[bp_dict["cut_idx_list"][i]:bp_dict["cut_idx_list"][i+1]], axis=0) \
/(bp_dict["cut_idx_list"][i+1]-bp_dict["cut_idx_list"][i])
sent_vec_list.append(sentence_vec) if self.is_train is False else None
# 将结果存入数据库
if is_retrieve is False:
condition = {"id": bp_dict["id_list"][i]}
# 用二进制存储句向量以节约存储空间
sentence_vec_byte = Binary(pickle.dumps(sentence_vec, protocol=-1), subtype=128)
# 需要新增train_flag,防止机器奔溃重复训练
update_elements = {"$set": {"content_clear": bp_dict["cont_clear_list"][i],
"sentence_vec": sentence_vec_byte,
"sent_train_flag": config.sent_train_flag}}
self.mongo_coll.update_one(condition, update_elements)
return sent_vec_list
# 清洗函数
def clear_func(self, content):
if content in {'', None}:
return ''
# 将content字符串化,防止content是int/float型
if isinstance(content, str) is False:
if isinstance(content, int) or isinstance(content, float):
return str(content)
try:
# 进行文本清洗
content_clear = get_maplef_items(content)
except Exception as e:
# 通用公有变量
global public_id
# 日志采集
print(self.log_msg.format(id=public_id,
type="清洗错误: "+str(e),
message=str(content))) if self.logger is None else None
self.logger.error(self.log_msg.format(id=public_id,
type="清洗错误: "+str(e),
message=str(content))) if self.logger is not None else None
# 对于无法清洗的文本通过正则表达式直接获取文本中的中文字符
content_clear = re.sub(r'[^\u4e00-\u9fa5]', '', content)
return content_clear
# 重叠截取长文本进行Sentence-Bert训练
def truncate_func(self, content):
# 设置长文本截断长度
cut_length = 150
# 设置截断重叠长度
overlap = 10
content_cut_list = []
# 若文本长度小于等于截断长度,则取消截取直接返回
cont_length = len(content)
if cont_length <= cut_length:
content_cut_list = [content]
return content_cut_list
# 若文本长度大于截断长度,则进行重叠截断
# 设定文本截断尾部合并阈值(针对尾部文本根据长度进行合并)
# 防止截断后出现极短文本影响模型效果
tail_merge_value = 0.5 * cut_length
for i in range(0,cont_length,cut_length-overlap):
tail_idx = i + cut_length
cut_content = content[i:tail_idx]
# 保留单词完整性
# 判断尾部字符
if cont_length - tail_idx > tail_merge_value:
for j in range(len(cut_content)-1,-1,-1):
# 判断当前字符是否为字母或者数字
# 若不是字母或者数字则截取成功
if re.search('[A-Za-z]', cut_content[j]) is None:
cut_content = cut_content[:j+1]
break
else:
cut_content = content[i:]
# 判断头部字符
if i != 0:
for k in range(len(cut_content)):
# 判断当前字符是否为字母或者数字
# 若不是字母或者数字则截取成功
if re.search('[A-Za-z]', cut_content[k]) is None:
cut_content = cut_content[k+1:]
break
# 将头部和尾部都处理好的截断文本存入content_cut_list
content_cut_list.append(cut_content)
# 针对尾部文本截断长度为140-150以及满足尾部合并阈值的文本
# 进行重叠截断进行特殊处理
if cont_length - tail_idx <= tail_merge_value:
break
return content_cut_list
# 全文本数据清洗
def content_clear_func(self, content):
# 文本清洗
content_clear = self.clear_func(content)
# 去除文本中的空格以及空字符串
content_clear = re.sub(r',+', ',', re.sub(r'[\s_]', '', content_clear))
# 去除题目开头"【题文】(多选)/(..分)"
content_clear = re.sub(r'\[题文\]', '', content_clear)
content_clear = re.sub(r'(\([单多]选\)|\[[单多]选\])', '', content_clear)
content_clear = re.sub(r'(\(\d{1,2}分\)|\[\d{1,2}分\])', '', content_clear)
# # 将文本中的选项"A.B.C.D."改为";"
# content_clear = re.sub(r'[ABCD]\.', ';', content_clear)
# # 若选项中只有1,2,3或(1)(2)(3),或者只有标点符号,则过滤该选项
# content_clear = re.sub(r'(\(\d\)[、,;\.]?)+\(\d\)|\d[、,;]+\d', '', content_clear)
# 去除题目开头(...年...[中模月]考)文本
head_search = re.search(r'^(\(.*?[\)\]]?\)|\[.*?[\)\]]?\])', content_clear)
if head_search is not None and 5 < head_search.span(0)[1] < 40:
head_value = content_clear[head_search.span(0)[0]+1:head_search.span(0)[1]-1]
if re.search(r'.*?(\d{2}|[模检测训练考试验期省市县外第初高中学]).*?[模检测训练考试验期省市县外第初高中学].*?', head_value):
content_clear = content_clear[head_search.span(0)[1]:].lstrip()
# 对于只有图片格式以及标点符号的信息进行特殊处理(去除标点符号/空格/连接符)
if re.sub(r'[\.、。,;\:\?!#\-> ]+', '', content_clear) == '':
content_clear = ''
return content_clear
# 数据清洗与长文本重叠截取处理
def content_clear_process(self, data):
# 初始化content_clear
content_clear = ''
# 全文本数据清洗
if "quesBody" in data:
content_clear = self.content_clear_func(data["quesBody"])
elif "stem" in data:
content_clear = self.content_clear_func(data["stem"])
# 重叠截取长文本用于进行Sentence-Bert训练
content_cut_list = self.truncate_func(content_clear)
return content_clear, content_cut_list
if __name__ == "__main__":
# 获取mongodb数据
mongo_coll = config.mongo_coll
test_data = {
'quesBody': """【题文】某同学设计了如下的电路测量电压表内阻,R为能够满足实验条件的滑动变阻器,为电阻箱,电压表量程合适。实验的粗略步骤如下:
①闭合开关S1、S2,调节滑动变阻器R,使电压表指针指向满刻度的处;
②断开开关S2,调节某些仪器,使电压表指针指向满刻度的处;
③读出电阻箱的阻值,该阻值即为电压表内阻的测量值;
④断开开关S1、S2拆下实验仪器,整理器材。
(1)上述实验步骤②中,调节某些仪器时,正确的操作是__________
A.保持电阻箱阻值不变,调节滑动变阻器的滑片,使电压表指针指向满刻度的处
B.保持滑动变阻器的滑片位置不变,调节电阻箱阻值,使电压表指针指向满刻度的处
C.同时调节滑动变阻器和电阻箱的阻值,使电压表指针指向满刻度的处
(2)此实验电压表内阻的测量值与真实值相比________(选填“偏大”“偏小”或“相等”);
(3)如实验测得该电压表内阻为8500Ω,要将其量程扩大为原来的倍,需串联_____Ω的电阻。""",
'option': ['$\\left\\{-2,0\\right\\}$', '$\\left\\{-2,0,2\\right\\}$', '$\\left\\{-1,1,2\\right\\}$', '$\\left\\{-1,0,2\\right\\}$']}
dpp = DataPreProcessing(mongo_coll)
string = """如图三角形的每个顶点均在格点上,且每个小正方形的边长为1.
(1)________;
(2)求的面积."""
string = """已知c水=4.2×103J/(kg·℃),求"""
res = dpp.clear_func(string)
print(res)
# res = dpp.content_clear_process(test_data)
# print(res[0])
# print(dpp.content_clear_process(mongo_coll.find_one({}))[0])
# print(dpp(test_data,is_retrieve=True))