import json
import torch
import sys
import re
import random

# sys.path.append(r'/home/cv/workspace/tujintao/document_segmentation')
from Utils.read_data import read_data
from tqdm import tqdm
import numpy as np
from pprint import pprint
from Data.paper_combination import generate_paper_math, generate_paper_phy  # required by get_papers / get_paper_for_predict
from torch.utils.data import DataLoader, Dataset
from concurrent.futures import ThreadPoolExecutor


class ListDataset(Dataset):
    def __init__(self, file_path=None, data=None, tokenizer=None, max_len=None, label_list=None, **kwargs):
        self.kwargs = kwargs
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_list = label_list
        if isinstance(file_path, (str, list)):
            self.data = self.load_data(file_path, tokenizer, max_len, label_list)
        elif isinstance(data, list):
            self.data = data
        elif isinstance(data, tuple):
            # Large data volume: format the documents with a pool of workers
            all_data, all_callback_info = [], []
            executor = ThreadPoolExecutor(max_workers=10)  # even a couple of worker threads is slightly faster
            for res in executor.map(self.format_data, zip(data[0], data[1])):
                all_data.append(res[0])
                all_callback_info.append(res[1])
            self.data = (all_data, all_callback_info)
            # single-process alternative:
            # self.data = format_data(data, self.label_list, self.max_len, self.tokenizer)
        else:
            raise ValueError('The input args should be a str/list file_path or a list/tuple dataset')

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

    def format_data(self, doc_seg_labels):
        """
        doc_seg_labels: (doc_list: list, seg_labels: list)
        Convert the already split train/valid/test data into the required format.
        """
        one_d, d_labels = doc_seg_labels[0], doc_seg_labels[1]
        # Re-handle empty sentences in one_d (the earlier check was wrong: the
        # 【start:1】/【end:1】 markers were not stripped before testing for emptiness)
        if any(not sent.strip() for sent in one_d):
            sentences, new_labels = [], []
            for lab in d_labels:
                # print(lab)
                if not one_d[lab[0]].strip():
                    one_d[lab[0] + 1] = "【start:1】" + one_d[lab[0] + 1]
                else:
                    one_d[lab[0]] = "【start:1】" + one_d[lab[0]]
                if not one_d[lab[1] - 1].strip():
                    one_d[lab[1] - 2] += "【end:1】"
                else:
                    one_d[lab[1] - 1] += "【end:1】"
            # print(one_d)
            all_sents = [sent for sent in one_d if sent.replace("【start:1】", "").replace("【end:1】", "").strip()]
            st = 0
            for n, sentence in enumerate(all_sents):
                if sentence.startswith("【start:1】"):
                    sentence = sentence.replace("【start:1】", "")
                    st = n
                if sentence.endswith("【end:1】"):
                    sentence = sentence.replace("【end:1】", "")
                    new_labels.append((st, n + 1))
                    # pprint(all_sents[st:n+1])
                sentences.append(sentence)
            one_d = sentences
            d_labels = new_labels
        # Tokenize and encode the sentences
        inputs = self.tokenizer(one_d, padding='max_length', truncation=True,
                                max_length=self.max_len, return_tensors='pt')
        label = []
        label_dict = {x: [] for x in self.label_list}
        for lab in d_labels:
            label.append([lab[0], lab[1], "TOPIC"])
            label_dict.get("TOPIC", []).append((one_d[lab[0]:lab[1]], lab[0]))
        # label is [[start, end, entity], ...]
        # one_d[start:end] is one topic_item
        return (inputs, label), (one_d, label_dict)

    @staticmethod
    def load_data(file_path, tokenizer, max_len, label_list):
        return file_path


# Dataset for entity (exam-question) recognition
class NerDataset(ListDataset):
    @staticmethod
    def load_data1(filename, tokenizer, max_len, label_list):
        data = []
        callback_info = []  # used to compute evaluation metrics
        with open(filename, encoding='utf-8') as f:
            samples = json.loads(f.read())
        for d in samples:
            text = d['text']
            if len(text) == 0:
                continue
            labels = d['labels']
            tokens = [i for i in text]
            if len(tokens) > max_len - 2:
                tokens = tokens[:max_len - 2]
                text = text[:max_len]
            tokens = ['[CLS]'] + tokens + ['[SEP]']
            token_ids = tokenizer.convert_tokens_to_ids(tokens)
            label = []
            label_dict = {x: [] for x in label_list}
            for lab in labels:
                # +1 for the [CLS] position; lab[3] needs no +1 because it already
                # points one past the entity end
                label.append([lab[2] + 1, lab[3], lab[1]])
                label_dict.get(lab[1], []).append((text[lab[2]:lab[3]], lab[2]))
            data.append((token_ids, label))  # label is [[start, end, entity], ...]
            callback_info.append((text, label_dict))
        return data, callback_info

    @staticmethod
    def load_data(filename):
        """
        label_list: all entity-type labels; this project only has TOPIC.
        """
        # Generate samples
        all_documents, all_labels = read_data(filename)
        all_pointer_labels = []
        for one_d_labels in all_labels:
            new_labels = []
            st = 0  # indices start from 0
            for n, s_label in enumerate(one_d_labels):
                if s_label:
                    new_labels.append((st, n + 1))  # end index is exclusive (+1)
                    st = n + 1
            all_pointer_labels.append(new_labels)
        train_data, valid_data, test_data = split_dataset(all_documents, all_pointer_labels)
        # train_doc, train_seg_labels = train_data
        # valid_doc, valid_seg_labels = valid_data
        # test_doc, test_seg_labels = test_data
        return train_data, valid_data, test_data


def format_data(doc_seg_labels, label_list, max_seq_len, tokenizer):
    """
    doc_seg_labels: doc_list and seg_labels.
    Convert the already split train/valid/test data into the required format.
    """
    data = []
    callback_info = []  # used to compute evaluation metrics
    for one_d, d_labels in zip(doc_seg_labels[0], doc_seg_labels[1]):
        # for sent in one_d:
        #     if not sent.strip():
        #         continue
        inputs = tokenizer(one_d, padding='max_length', truncation=True,
                           max_length=max_seq_len, return_tensors='pt')
        label = []
        label_dict = {x: [] for x in label_list}
        for lab in d_labels:
            label.append([lab[0], lab[1], "TOPIC"])
            label_dict.get("TOPIC", []).append((one_d[lab[0]:lab[1]], lab[0]))
        data.append((inputs, label))  # label is [[start, end, entity], ...]
        callback_info.append((one_d, label_dict))  # one_d[start:end] is one topic_item
    return data, callback_info


def get_papers(paper_num):
    """Generate the specified number of exam-paper samples."""
    def txt_split(content):
        labels = []
        sentences = []
        st = 0
        # all_sents = [i.strip() for i in content.split("\n") if i.strip()]
        all_sents = [i.strip() for i in content.split("\n")
                     if i.replace("【start:1】", "").replace("【end:1】", "").strip()]
        for n, sentence in enumerate(all_sents):
            if sentence.startswith("【start:1】"):
                sentence = sentence.replace("【start:1】", "")
                st = n
            if sentence.endswith("【end:1】"):
                sentence = sentence.replace("【end:1】", "")
                labels.append((st, n + 1))
            sentences.append(sentence)
        # pprint(sentences)
        # pprint(labels)
        # a, b = labels[1]
        # print(sentences[a:b])
        return sentences, labels

    input_txts = []
    segment_labels = []
    for n in range(paper_num):
        print(n)
        paper_content1 = generate_paper_math(max_questions_num=7, min_questions_num=1)
        paper_content2 = generate_paper_phy(max_questions_num=7, min_questions_num=1)
        sentences_1, labels_1 = txt_split(paper_content1)
        input_txts.append(sentences_1)
        segment_labels.append(labels_1)
        sentences_2, labels_2 = txt_split(paper_content2)
        input_txts.append(sentences_2)
        segment_labels.append(labels_2)
    with open("/home/cv/workspace/tujintao/document_segmentation/Data/samples/train_data.json", "w", encoding="utf-8") as f1:
        json.dump({"input_txts": input_txts, "segment_labels": segment_labels}, f1, ensure_ascii=False)


def get_paper_for_predict():
    """Build a sample for prediction testing: randomly generate one paper per call."""
    def txt_split(content):
        labels = []
        sentences = []
        st = 0
        all_sents = [i.strip() for i in content.split("\n")
                     if i.replace("【start:1】", "").replace("【end:1】", "").strip()]
        for n, sentence in enumerate(all_sents):
            if sentence.startswith("【start:1】"):
                sentence = sentence.replace("【start:1】", "")
                st = n
            if sentence.endswith("【end:1】"):
                sentence = sentence.replace("【end:1】", "")
                labels.append((st, n + 1))
            sentences.append(sentence)
        return sentences, labels

    if random.choice([0, 1]) == 0:
        paper_content = generate_paper_math(max_questions_num=7, min_questions_num=1)
    else:
        paper_content = generate_paper_phy(max_questions_num=7, min_questions_num=1)
    pprint(paper_content)
    sentences, labels = txt_split(paper_content)
    return sentences, labels


def load_and_split_dataset(filename, train_ratio=0.7, valid_ratio=0.1):
    # ----------- small sample set for debugging -----------
    # input_texts, all_labels = read_data(filename)
    # segment_labels = []
    # for one_d_labels in all_labels[:1]:
    #     new_labels = []
    #     st = 0  # indices start from 0
    #     for n, s_label in enumerate(one_d_labels):
    #         if s_label:
    #             new_labels.append((st, n+1))  # end index is exclusive (+1)
    #             st = n+1
    #     segment_labels.append(new_labels)
    # print("segment_labels:::", segment_labels)
    # a, b = segment_labels[0][0]
    # print(input_texts[0][a: b])
    # ----------- full-scale sample set -----------
    with open(filename, "r", encoding="utf-8") as f1:
        sample5w = json.load(f1)
    input_texts = sample5w["input_txts"]
    segment_labels = sample5w["segment_labels"]
    # print("input_texts:::", input_texts[:1])
    # json.dump({"input_txts": input_txts, "segment_labels": segment_labels}, f1, ensure_ascii=False)
    # Split the data into Train/Valid/Test sets
    total_samples = len(input_texts)
    train_size = int(total_samples * train_ratio)
    valid_size = int(total_samples * valid_ratio)
    test_size = total_samples - train_size - valid_size
    train_doc = input_texts[:train_size]
    train_seg_labels = segment_labels[:train_size]
    valid_doc = input_texts[train_size:train_size + valid_size]
    valid_seg_labels = segment_labels[train_size:train_size + valid_size]
    test_doc = input_texts[-test_size:]
    test_seg_labels = segment_labels[-test_size:]
    return (train_doc, train_seg_labels), (valid_doc, valid_seg_labels), (test_doc, test_seg_labels)


def split_dataset(input_texts, segment_labels, train_ratio=0.7, valid_ratio=0.1):
    """Split the data into Train/Valid/Test sets."""
    total_samples = len(input_texts)
    train_size = int(total_samples * train_ratio)
    valid_size = int(total_samples * valid_ratio)
    test_size = total_samples - train_size - valid_size
    train_doc = input_texts[:train_size]
    train_seg_labels = segment_labels[:train_size]
    valid_doc = input_texts[train_size:train_size + valid_size]
    valid_seg_labels = segment_labels[train_size:train_size + valid_size]
    test_doc = input_texts[-test_size:]
    test_seg_labels = segment_labels[-test_size:]
    return (train_doc, train_seg_labels), (valid_doc, valid_seg_labels), (test_doc, test_seg_labels)


def convert_list_to_tensor(alist, dtype=torch.long):
    # return torch.tensor(np.array(alist) if isinstance(alist, list) else alist, dtype=dtype)
    return [torch.tensor(np.array(a) if isinstance(a, list) else a, dtype=dtype).squeeze(0) for a in alist]


class NerCollate:
    def __init__(self, max_len, label2id):
        self.maxlen = max_len
        self.label2id = label2id

    def collate_fn(self, batch):
        batch_token_ids = []
        batch_attention_mask = []
        # batch_token_type_ids = []
        batch_start_labels = []
        batch_end_labels = []
        batch_content_labels = []  # 0/1 labels: whether each sentence belongs to a question
        for i, (inputs, sent_labels) in enumerate(batch):
            token_ids = inputs['input_ids']  # .squeeze(0)
            attention_mask = inputs['attention_mask']
            sent_num = token_ids.size()[0]
            start_labels = np.zeros((len(self.label2id), sent_num), dtype=np.int64)
            end_labels = np.zeros((len(self.label2id), sent_num), dtype=np.int64)
            content_labels = np.zeros((len(self.label2id), sent_num), dtype=np.int64)
            # token_type_ids = [0] * self.maxlen
            assert attention_mask.size()[1] == self.maxlen
            # assert len(token_type_ids) == self.maxlen
            assert token_ids.size()[1] == self.maxlen
            batch_token_ids.append(token_ids)  # length already capped during tokenization
            batch_attention_mask.append(attention_mask)
            # batch_token_type_ids.append(token_type_ids)
            # build the pointer start/end labels from the NER spans
            for start, end, label in sent_labels:
                label_id = self.label2id[label]
                start_labels[label_id][start] = 1
                # if end < self.maxlen - 1:
                #     end_labels[label_id][end - 1] = 1
                content_labels[label_id][start:end] = [1] * (end - start)
            batch_start_labels.append(start_labels)
            batch_end_labels.append(end_labels)
            batch_content_labels.append(content_labels)
        # batch_token_ids = convert_list_to_tensor(batch_token_ids)
        # batch_token_type_ids = convert_list_to_tensor(batch_token_type_ids)
        # batch_attention_mask = convert_list_to_tensor(batch_attention_mask)
        batch_start_labels = convert_list_to_tensor(batch_start_labels, dtype=torch.float)
        batch_end_labels = convert_list_to_tensor(batch_end_labels, dtype=torch.float)
        batch_content_labels = convert_list_to_tensor(batch_content_labels, dtype=torch.float)
        # print("batch_end_labels:::", batch_end_labels[0].size())
        res = {
            "input_ids": batch_token_ids,
            # "token_type_ids": batch_token_type_ids,
            "attention_mask": batch_attention_mask,
            "ner_start_labels": batch_start_labels,
            "ner_end_labels": batch_end_labels,
            "ner_content_labels": batch_content_labels,
        }
        return res


if __name__ == "__main__":
    import sys
    sys.path.append(r'/home/cv/workspace/tujintao/document_segmentation')
    # sys.path.append(r'/home/cv/workspace/tujintao/PointerNet_Chinese_Information_Extraction')
    from transformers import BertTokenizer
    from Utils.read_data import read_data
    # model_dir = r'/home/cv/workspace/tujintao/PointerNet_Chinese_Information_Extraction/UIE/model_hub/chinese-bert-wwm-ext/'
    # tokenizer = BertTokenizer.from_pretrained(model_dir)

    # Test
    # max_seq_len = 50
    # label_path = "PointerNet/data/labels.txt"
    # with open(label_path, "r") as fp:
    #     labels = fp.read().strip().split("\n")
    # train_dataset, train_callback = NerDataset(file_path=r"Data/samples",
    #                                            tokenizer=tokenizer,
    #                                            max_len=max_seq_len,
    #                                            label_list=labels)
    # print(train_dataset[0])

    # # Test entity recognition
    # ============================
    # max_seq_len = 150
    # label_path = "PointerNet_Chinese_Information_Extraction/UIE/data/ner/cner/labels.txt"
    # with open(label_path, "r") as fp:
    #     labels = fp.read().strip().split("\n")
    # train_dataset, train_callback = NerDataset(file_path='PointerNet_Chinese_Information_Extraction/UIE/data/ner/cner/train.json',
    #                                            tokenizer=tokenizer,
    #                                            max_len=max_seq_len,
    #                                            label_list=labels)
    # print(train_dataset[1])
    # ([101, 2382, 2456, 5679, 8024, 4511, 8024, 102], [[1, 3, 'NAME']])
    # print(train_callback[1])
    # ('常建良,男,', {'TITLE': [], 'RACE': [], 'CONT': [], 'ORG': [], 'NAME': [('常建良', 0)], 'EDU': [], 'PRO': [], 'LOC': []})
    # ------------------------------------------------------------------
    # id2tag = {}
    # tag2id = {}
    # for i, label in enumerate(labels):
    #     id2tag[i] = label
    #     tag2id[label] = i
    # collate = NerCollate(max_len=max_seq_len, label2id=tag2id)
    # batch_size = 2
    # train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate.collate_fn)
    # for batch in tqdm(train_dataloader):
    #     print(batch)
    #     # print(batch["ner_start_labels"].shape)  # [bs, label_n, maxlen]
    #     # for k, v in batch.items():
    #     #     print(k, v.shape)
    # ============================

    # get_papers(50000)

    # Fetch a generated paper
    a, b = get_paper_for_predict()
    # print(a)
    # print(b)
    # print(a[6:8])
    for i in b:
        print(a[i[0]: i[1]])

    # ------------------------------------------
    # load_and_split_dataset(r"Data/samples/临时样本")
    # load_and_split_dataset("")
    from PointerNet.config import NerArgs
    args = NerArgs()
    # load_and_split_dataset(args.train_path, train_ratio=0.995, valid_ratio=0.003)
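
    # A minimal end-to-end wiring sketch, kept commented out like the tests above. It shows how
    # the generated samples are meant to flow: load_and_split_dataset -> NerDataset (tuple input,
    # so format_data runs per document) -> NerCollate.collate_fn -> DataLoader. The checkpoint
    # name "hfl/chinese-bert-wwm-ext", max_len=50 and the single-label list ["TOPIC"] are
    # assumptions for illustration, not values fixed by this file.
    # tokenizer = BertTokenizer.from_pretrained("hfl/chinese-bert-wwm-ext")
    # train_data, valid_data, test_data = load_and_split_dataset(args.train_path)
    # train_dataset, train_callback = NerDataset(data=train_data,
    #                                            tokenizer=tokenizer,
    #                                            max_len=50,
    #                                            label_list=["TOPIC"])
    # collate = NerCollate(max_len=50, label2id={"TOPIC": 0})
    # loader = DataLoader(train_dataset, batch_size=2, shuffle=False, collate_fn=collate.collate_fn)
    # for batch in loader:
    #     # input_ids is a list of per-document tensors of shape [sent_num, max_len];
    #     # the ner_* entries are per-document label tensors built from the (start, end) spans.
    #     print(batch["input_ids"][0].shape, batch["ner_start_labels"][0].shape)
    #     break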