12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- import os
- import time
- import shutil
- import sqlite3
- from config import sqlite_path, sqlite_copy_path, mongo_coll
- from word_segment import Word_Segment
- # 建立倒排索引
- class Inverted_Index_Establish():
- def __init__(self, n_grams_flag=False):
- # 获取mongodb数据
- self.mongo_coll = mongo_coll
- # 获取数据库地址
- self.sqlite_path = sqlite_path
- self.sqlite_copy_path = sqlite_copy_path
- # 分词算法
- self.word_seg = Word_Segment(n_grams_flag=n_grams_flag)
- def __call__(self):
- # 从mongodb读取数据集
- origin_dataset = self.mongo_coll.find(no_cursor_timeout=True, batch_size=5)
- # 将数据构建倒排索引
- inverted_index = self.inverted_index_compute(origin_dataset)
- # 将倒排索引数据写入sqlite
- self.sqlite_write(inverted_index)
- # 备份sqlite数据库,防止出现损坏
- shutil.copy(self.sqlite_path, self.sqlite_copy_path)
-
- # 读取数据构建倒排索引
- def inverted_index_compute(self, origin_dataset):
- start = time.time()
- # 倒排索引词典
- inverted_index = dict()
- # 计算文档总长度,用于计算平均长度
- all_doc_length = 0
- for i,data in enumerate(origin_dataset):
- if "content_clear" not in data:
- continue
- seg_list, _ = self.word_seg(data["content_clear"])
- # 计算每篇文档长度和总文档长度
- doc_length = len(seg_list)
- all_doc_length += doc_length
-
- # 存储每个文档中每个词的出现次数
- tf_dict = dict()
- for word in seg_list:
- tf_dict[word] = tf_dict.get(word, 0) + 1
- # 将每个term对应的文档按固定格式存入倒排索引词典
- for term,tf in tf_dict.items():
- # 文档间通过"\n"分隔,文档内通过"\t"分隔(term_freq, doc_length)
- doc_info = "{}\t{}".format(tf, doc_length)
- if term not in inverted_index:
- inverted_index[term] = [str(data["id"]), doc_info]
- else:
- inverted_index[term][0] += "\n" + str(data["id"])
- inverted_index[term][1] += "\n" + doc_info
- # 获取文档总数与文档总长度存入倒排索引词典
- inverted_index["doc_data_statistics"] = [i + 1, str(all_doc_length)]
- print(time.time()-start)
- print("倒排索引计算完毕")
- return inverted_index
-
- # 将倒排索引数据写入sqlite
- def sqlite_write(self, inverted_index):
- # 若已存在sqlite数据库,则删除数据库重建
- if os.path.exists(self.sqlite_path):
- os.remove(self.sqlite_path)
- # 建立sqlite数据库链接
- sqlite_conn = sqlite3.connect(self.sqlite_path)
- # 创建游标对象cursor
- cursor = sqlite_conn.cursor()
- # 创建数据表
- cursor.execute("DROP TABLE IF EXISTS physics")
- cursor.execute("CREATE TABLE physics (term TEXT PRIMARY KEY, doc_freq TEXT, docs TEXT)")
-
- for key, value in inverted_index.items():
- t = (key, value[0], value[1])
- cursor.execute("INSERT INTO physics VALUES (?, ?, ?)", t)
- # 提交事务
- sqlite_conn.commit()
- # 关闭数据库连接
- cursor.close()
- sqlite_conn.close()
- if __name__ == "__main__":
- # 初始化倒排索引建立(n-grams为True)
- sql_etl = Inverted_Index_Establish(True)
- sql_etl()
|