# @Author : lightXu
# @File : sheet_server.py
# @Time : 2018/12/19 0019 PM 14:33
"""Service-layer helpers for answer-sheet images.

Covers saving uploaded images, running the region/detail resolvers,
attaching score information and exporting the detected regions as XML.
"""
import json
import os
import shutil
import time
import traceback
import uuid

try:
    import xml.etree.cElementTree as ET
except ImportError:  # cElementTree was removed in Python 3.9
    import xml.etree.ElementTree as ET

import cv2
import numpy as np
from PIL import Image
from django.conf import settings
from segment.sheet_resolve.tools.tf_settings import xml_template_path

import segment.logging_config as logging
from segment.sheet_resolve.analysis.anchor.marker_detection import find_anchor
from segment.sheet_resolve.analysis.resolve import choice, choice_m_row_col
from segment.sheet_resolve.analysis.resolve import cloze
from segment.sheet_resolve.analysis.resolve import exam_number_row_col
from segment.sheet_resolve.analysis.resolve import sheet
from segment.sheet_resolve.analysis.resolve import solve, solve_with_number, cloze_with_number
from segment.sheet_resolve.analysis.sheet.analysis_sheet import box_region_format, question_number_format
from segment.sheet_resolve.analysis.sheet.sheet_points import get_sheet_points
from segment.sheet_resolve.analysis.sheet.sheet_points_total import get_sheet_number_total
from segment.sheet_resolve.tools import utils
from segment.sheet_resolve.tools.brain_api import get_ocr_text_and_coordinate, change_format_baidu_to_google
from segment.sheet_resolve.analysis.sheet.sheet_points_by_nlp import get_sheet_points_by_nlp

logger = logging.getLogger(settings.LOGGING_TYPE)


def _image_path_for_log(xml_save_path):
    """Return the ``.jpg`` path matching ``xml_save_path`` for log messages.

    Only a trailing ``.xml`` extension is swapped; any other path is returned
    unchanged.  (A plain ``str.replace('xml', '.jpg')`` would turn ``a.xml``
    into ``a..jpg`` and also rewrite directory names containing ``xml``.)
    """
    root, ext = os.path.splitext(xml_save_path)
    if ext.lower() == '.xml':
        return root + '.jpg'
    return xml_save_path


def _log_region_failure(xml_save_path, region_label, error):
    """Print the active traceback and log that parsing one region type failed.

    Must be called from inside an ``except`` block so that
    ``traceback.print_exc()`` has an exception to print.
    """
    traceback.print_exc()
    logger.info('试卷:{} {}区域解析失败: {}'.format(
        _image_path_for_log(xml_save_path), region_label, error))


def decide_blank_sheet(image, max_side=800):
    """Decide whether a sheet image looks blank (unanswered / empty).

    The image is converted to grayscale, shrunk so that its longest side is
    at most ``max_side`` pixels, and then only one half is inspected: the
    lower half for portrait images, the right half for landscape ones.

    The half is treated as blank when either
    * the mean of its inverted Otsu binarisation is below a threshold
      (few dark pixels), or
    * the mean of its raw gray values is above a threshold (almost white).

    :param image: ``numpy`` image array, either 2-D gray or 3-channel RGB.
    :param max_side: longest side, in pixels, the image is shrunk to before
        the statistics are computed.
    :return: ``True`` when the inspected half is considered blank.
    """
    if len(image.shape) <= 2:
        gray_image = image
    else:
        gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    height, width = gray_image.shape[0], gray_image.shape[1]
    # Orientation is fixed before resizing so rounding cannot flip it.
    is_portrait = height > width

    longest = max(height, width)
    if longest > max_side:
        # Shrink (never enlarge): scale factor is target / current size.
        scale = max_side / longest
        new_x = max(1, int(width * scale))
        new_y = max(1, int(height * scale))
        gray_image = cv2.resize(gray_image, (new_x, new_y), interpolation=cv2.INTER_AREA)
        # Use the resized dimensions for the crop below.
        height, width = gray_image.shape[0], gray_image.shape[1]

    if is_portrait:
        # Portrait: inspect the lower half.
        half = gray_image[height // 2:, :]
        bin_mean_low = 25.0  # threshold for the binarised image mean
        raw_mean_high = 220  # threshold for the raw gray image mean
    else:
        # Landscape: inspect the right half.
        half = gray_image[:, width // 2:]
        bin_mean_low = 15.0
        raw_mean_high = 250

    bin_img = cv2.threshold(half, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
    bin_img_mean = np.mean(bin_img)
    img_raw_mean = np.mean(half)
    logger.debug('blank sheet check: bin mean {}, raw mean {}'.format(bin_img_mean, img_raw_mean))

    return bool(bin_img_mean < bin_mean_low or img_raw_mean > raw_mean_high)


def convert_pil_to_jpeg(raw_img):
    """Convert a PIL image to a JPEG-compatible RGB image.

    * ``L`` (gray) images are expanded to three identical channels.
    * ``RGB`` images are returned unchanged.
    * ``RGBA`` images are pasted on a white background using their alpha
      channel as mask.
    * Every other mode (``P``, ``CMYK``, ``LA`` ...) is converted with
      ``Image.convert('RGB')`` so that it can be saved as JPEG and yields a
      3-channel array.

    :param raw_img: ``PIL.Image.Image`` instance.
    :return: ``(pil_image, numpy_array)`` for the converted image.
    """
    if raw_img.mode == 'L':
        gray_channel = raw_img.split()[0]
        img = Image.merge("RGB", (gray_channel, gray_channel, gray_channel))
    elif raw_img.mode == 'RGB':
        img = raw_img
    elif raw_img.mode == 'RGBA':
        img = Image.new("RGB", raw_img.size, (255, 255, 255))
        img.paste(raw_img, mask=raw_img.split()[3])  # 3 is the alpha channel
    else:
        img = raw_img.convert('RGB')

    open_cv_image = np.array(img)
    return img, open_cv_image


def handle_uploaded_xml_file(f, save_path):
    """Write an uploaded file to ``save_path`` chunk by chunk.

    :param f: uploaded file object exposing ``chunks()``.
    :param save_path: destination path; an existing file is overwritten.
    """
    with open(save_path, 'wb+') as destination:
        for chunk in f.chunks():
            destination.write(chunk)


def generate_serial_number(time_str, sheet_big_boxes):
    """Generate the next series number for a sheet.

    Series numbers are ``time_str`` followed by a six-digit counter.  When
    the most recently updated record already belongs to ``time_str`` its
    number is incremented, otherwise the counter restarts at ``000001``.

    :param time_str: prefix of the series number (digits only, because the
        previous number is incremented via ``int``).
    :param sheet_big_boxes: model class queried through ``.objects``; its
        records expose ``series_number`` and ``update_time``.
    :return: the new series number as a string.
    """
    first_number = time_str + '000001'

    # exists() avoids loading every row just to test for emptiness.
    if not sheet_big_boxes.objects.exists():
        return first_number

    latest_record = sheet_big_boxes.objects.latest('update_time')
    last_number_in_db = latest_record.series_number
    if str(last_number_in_db).startswith(time_str):
        return str(int(last_number_in_db) + 1)
    return first_number


def save_raw_image_with_paper_id(subject, paper_id, img_file, analysis_type):
    """Save an uploaded image as ``<paper_id>.<original extension>``.

    The file is stored under
    ``MEDIA_ROOT/<analysis_type>/<subject>/<YYYY-MM-DD>/``.

    :param subject: subject name, used as a directory level.
    :param paper_id: identifier used as the file name.
    :param img_file: uploaded image file with a ``name`` attribute.
    :param analysis_type: analysis type, used as a directory level.
    :return: ``(save_path, image_array, url_path)``.
    :raises Exception: whatever ``PIL`` raises when the image cannot be
        opened or saved.
    """
    time_str = time.strftime('%Y-%m-%d')

    # The stored file keeps the uploaded extension but is named by paper id.
    ext = img_file.name.split('.')[-1]
    file_name = '{}.{}'.format(paper_id, ext)

    raw_img = Image.open(img_file)  # read the uploaded image
    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, time_str)
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, file_name)

    pil_img, open_cv_image = convert_pil_to_jpeg(raw_img)
    pil_img.save(save_path)

    url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject,
                            time_str, file_name).replace('\\', '/')
    return save_path, open_cv_image, url_path


def save_raw_image_without_segment(subject, datetime, img_file, analysis_type):
    """Save an uploaded image as JPEG under a random, unique file name.

    The file is stored under
    ``MEDIA_ROOT/<analysis_type>/<subject>/<datetime>/`` as
    ``<original name>_<10 hex chars>.jpg`` and copied next to it with a
    ``_small.jpg`` suffix.

    :param subject: subject name, used as a directory level.
    :param datetime: date string, used as a directory level.
    :param img_file: uploaded image file with a ``name`` attribute.
    :param analysis_type: analysis type, used as a directory level.
    :return: ``(save_path, image_array, url_path)``.
    :raises Exception: whatever ``PIL`` / ``shutil`` raise when the image
        cannot be opened, saved or copied.
    """
    # Build a new random file name from the uploaded one.
    ext = img_file.name.split('.')[-1]
    raw_name = img_file.name[0:-len(ext) - 1]
    file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')

    raw_img = Image.open(img_file)  # read the uploaded image
    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, file_name)

    pil_img, open_cv_image = convert_pil_to_jpeg(raw_img)
    pil_img.save(save_path)
    # Only the final extension is rewritten, so a '.jpg' occurring in a
    # directory or in the original file name is left untouched.
    small_path = os.path.splitext(save_path)[0] + '_small.jpg'
    shutil.copy(save_path, small_path)

    url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject,
                            datetime, file_name).replace('\\', '/')
    return save_path, open_cv_image, url_path


def sheet_big_boxes_resolve(series_number, image, saved_path, subject, sheet_sess, ocr=''):
    """Run the top-level region detection for one sheet.

    Failures are logged and reported through ``status`` instead of raised.

    :return: ``(status, sheets_dict, xml_save_path)`` where ``status`` is
        ``1`` on success and ``0`` on failure; on failure the other two
        values are empty strings.
    """
    status = 1
    conf_thresh_0 = 0.7
    mns_thresh_0 = 0.3

    sheets_dict_0 = ''
    xml_save_path = ''
    try:
        sheets_dict_0, xml_save_path = sheet(series_number, saved_path, image,
                                             conf_thresh_0, mns_thresh_0,
                                             subject, sheet_sess, ocr)
    except Exception as e:
        status = 0
        logger.info('试卷:{} 答题卡区域解析失败: {}'.format(saved_path, e))

    return status, sheets_dict_0, xml_save_path


def sheet_small_boxes_resolve(raw_img, sheet_dict, choice_sess, cloze_sess, xml_save_path):
    """Resolve the detail boxes of each region type into ``sheet_dict``.

    ``sheet_dict['regions']`` is removed and, per region type present, a key
    of the same name (``choice``, ``exam_number``, ``cloze``, ``solve``,
    ``qr_code``, ``bar_code``) is filled in.  A failure in one region type is
    logged and does not prevent the others from being processed.

    :return: the updated ``sheet_dict`` (modified in place).
    """
    conf_thresh_0 = 0.7
    mns_thresh_0 = 0.3

    regions = sheet_dict['regions']
    # NOTE: the class names are joined into one string, so the checks below
    # are substring matches (e.g. 'choice' also matches 'choice_m').
    classes_name = str([ele['class_name'] for ele in regions])

    sheet_dict.pop('regions')
    # NOTE(review): the result is discarded; this call only has an effect
    # when sheet_dict is not JSON-serialisable (it then raises).
    json.dumps(sheet_dict, ensure_ascii=False)

    if 'choice' in classes_name:
        try:
            sheet_dict['choice'] = choice(raw_img, regions, xml_save_path,
                                          conf_thresh_0, mns_thresh_0, choice_sess)
        except Exception as e:
            _log_region_failure(xml_save_path, '选择题', e)

    if 'exam_number' in classes_name:
        try:
            sheet_dict['exam_number'] = exam_number_row_col(raw_img, regions, xml_save_path)
        except Exception as e:
            _log_region_failure(xml_save_path, '考号', e)

    if 'cloze' in classes_name:
        try:
            sheet_dict['cloze'] = cloze(raw_img, regions, xml_save_path,
                                        conf_thresh_0, mns_thresh_0, cloze_sess)
        except Exception as e:
            _log_region_failure(xml_save_path, '填空题', e)

    if 'solve' in classes_name:
        try:
            sheet_dict['solve'] = solve(raw_img, regions, xml_save_path)
        except Exception as e:
            _log_region_failure(xml_save_path, '解答题', e)

    if 'qr_code' in classes_name:
        try:
            # When several regions match, the last one wins.
            for ele in regions:
                if 'qr_code' == ele['class_name']:
                    sheet_dict['qr_code'] = ele['bounding_box']
        except Exception as e:
            _log_region_failure(xml_save_path, '二维码', e)

    if 'bar_code' in classes_name:
        try:
            for ele in regions:
                if 'bar_code' == ele['class_name']:
                    sheet_dict['bar_code'] = ele['bounding_box']
        except Exception as e:
            _log_region_failure(xml_save_path, '条形码', e)

    return sheet_dict


def sheet_row_col_resolve(raw_img, sheet_dict, choice_sess, cloze_sess, xml_save_path):
    """Append row/column detail regions to ``sheet_dict['regions']``.

    The results of the ``choice_m``, ``exam_number`` and ``cloze`` resolvers
    are appended to a copy of the region list, which then replaces
    ``sheet_dict['regions']``.  A failure in one resolver is logged and does
    not prevent the others from running.

    ``choice_sess`` is accepted for interface compatibility and is not used.

    :return: the updated ``sheet_dict`` (modified in place).
    """
    conf_thresh_0 = 0.7
    mns_thresh_0 = 0.3

    regions = sheet_dict['regions']
    # Substring matching on the joined class names, see the checks below.
    classes_name = str([ele['class_name'] for ele in regions])
    region_tmp = regions.copy()

    if 'choice_m' in classes_name:
        try:
            region_tmp.extend(choice_m_row_col(raw_img, regions, xml_save_path))
        except Exception as e:
            _log_region_failure(xml_save_path, '选择题', e)

    if 'exam_number' in classes_name:
        try:
            region_tmp.extend(exam_number_row_col(raw_img, regions, xml_save_path))
        except Exception as e:
            _log_region_failure(xml_save_path, '考号', e)

    if 'cloze' in classes_name:
        try:
            region_tmp.extend(cloze(raw_img, regions, xml_save_path,
                                    conf_thresh_0, mns_thresh_0, cloze_sess))
        except Exception as e:
            _log_region_failure(xml_save_path, '填空题', e)

    sheet_dict.update({'regions': region_tmp})
    return sheet_dict


def sheet_detail_resolve(raw_img, sheet_dict, xml_save_path, shrink=True):
    """Replace coarse regions by their detailed / numbered counterparts.

    * ``choice_m`` regions are replaced by the ``choice_m_row_col`` result.
    * ``exam_number_row_col`` results are appended.
    * regions whose class name contains ``solve`` or ``composition`` are
      replaced by the ``solve_with_number`` result.
    * regions whose class name contains ``cloze`` are replaced by the
      ``cloze_with_number`` result.

    A failure in one step is logged and leaves the regions of that step
    untouched.

    :param shrink: when true, ``all_small_coordinate`` is stripped from the
        ``choice_m`` results.
    :return: the updated ``sheet_dict`` (modified in place).
    """
    regions = sheet_dict['regions']
    classes_names_list = set(ele['class_name'] for ele in regions)
    region_tmp = regions.copy()

    if 'choice_m' in classes_names_list:
        try:
            choice_dict_list = choice_m_row_col(raw_img, regions, xml_save_path)
            if shrink:
                for ele in choice_dict_list:
                    ele.pop('all_small_coordinate', None)
            # Drop the coarse regions that share the class name.
            region_tmp = [ele for ele in region_tmp if ele['class_name'] != 'choice_m']
            region_tmp.extend(choice_dict_list)
        except Exception as e:
            _log_region_failure(xml_save_path, '选择题', e)

    if 'exam_number' in classes_names_list:
        try:
            exam_number_dict_list = exam_number_row_col(raw_img, regions, xml_save_path)
            for ele in exam_number_dict_list:
                # Tolerate a missing key instead of discarding the whole
                # exam-number result through a KeyError.
                ele.pop('all_small_coordinate', None)
            region_tmp.extend(exam_number_dict_list)
        except Exception as e:
            _log_region_failure(xml_save_path, '考号', e)

    # Same substring rule as the filters below ('solve', 'solve0',
    # 'composition', 'composition0', ...).
    has_solve = any('solve' in name or 'composition' in name
                    for name in classes_names_list)
    if has_solve:
        try:
            solve_number = solve_with_number(region_tmp, xml_save_path)
            region_tmp = [ele for ele in region_tmp
                          if 'solve' not in ele['class_name']
                          and 'composition' not in ele['class_name']]
            region_tmp.extend(solve_number)
        except Exception as e:
            _log_region_failure(xml_save_path, '解答题', e)

    if 'cloze' in classes_names_list or 'cloze_s' in classes_names_list:
        try:
            cloze_number = cloze_with_number(region_tmp, xml_save_path)
            region_tmp = [ele for ele in region_tmp if 'cloze' not in ele['class_name']]
            region_tmp.extend(cloze_number)
        except Exception as e:
            _log_region_failure(xml_save_path, '填空题', e)

    sheet_dict.update({'regions': region_tmp})
    return sheet_dict


def sheet_points(sheet_dict_list, image_list, ocr_list, if_ocr=False):
    """Attach score information to every sheet of a paper.

    Two best-effort passes are run; when a pass raises, its traceback is
    printed and the input of that pass is used as its result:

    1. ``get_sheet_points`` on all sheets together.
    2. per sheet, ``get_sheet_number_total`` and, when any region carries a
       ``type_score_ocr`` key, ``get_sheet_points_by_nlp``.

    :param sheet_dict_list: one sheet dict per image.
    :param image_list: image arrays, parallel to ``sheet_dict_list``.
    :param ocr_list: OCR results, parallel to ``sheet_dict_list``.
    :param if_ocr: when true, the OCR result is stored in each returned
        sheet dict under ``sheet_ocr``.
    :return: list of sheet dicts.
    """
    sheet_list = []
    for index, sheet_dict in enumerate(sheet_dict_list):
        ocr_res = ocr_list[index]
        image = image_list[index]
        sheet_list.append({'sheet_dict': sheet_dict,
                           'ocr': ocr_res,
                           'shape': (image.shape[0], image.shape[1]),
                           'raw_image': image})

    try:
        res = get_sheet_points(sheet_list)
        sheet_dict_list = [ele['sheet_dict'] for ele in res]
    except Exception:
        traceback.print_exc()
        sheet_dict_list = [ele['sheet_dict'] for ele in sheet_list]

    try:
        sheet_total_list = []
        for index, ele in enumerate(sheet_dict_list):
            ocr_res = change_format_baidu_to_google(ocr_list[index])
            sheet_dict = get_sheet_number_total(ele, ocr_res, image_list[index])
            if any('type_score_ocr' in region for region in sheet_dict['regions']):
                sheet_dict = get_sheet_points_by_nlp(sheet_dict)
            sheet_total_list.append(sheet_dict)
    except Exception:
        traceback.print_exc()
        sheet_total_list = sheet_dict_list

    if if_ocr:
        for index, ele in enumerate(sheet_total_list):
            ele.update({'sheet_ocr': ocr_list[index]})

    return sheet_total_list


def sheet_format_output(init_number, crt_numbers, sheet_dict, image, subject, shrink):
    """Format regions and question numbers of one sheet for output.

    Applies ``box_region_format`` followed by ``question_number_format``.

    :return: ``(sheet_dict, init_number, crt_numbers)`` as returned by
        ``question_number_format``.
    """
    sheet_dict = box_region_format(sheet_dict, image, subject, shrink)
    sheet_dict, init_number, crt_numbers = question_number_format(init_number, crt_numbers, sheet_dict)
    return sheet_dict, init_number, crt_numbers


def sheet_anchor(image):
    """Return the anchors found in ``image`` by ``find_anchor``."""
    return find_anchor(image)


def gen_xml(sheet_region_dict, xml_path):
    """Write the given regions to ``xml_path`` based on the XML template.

    Every region with a non-empty ``bounding_box`` becomes one object whose
    name is ``<class_name>_<remaining fields as str(dict)>``.

    :param sheet_region_dict: iterable of region dicts with at least
        ``bounding_box`` (``xmin``/``ymin``/``xmax``/``ymax``) and
        ``class_name``.
    :param xml_path: destination path of the XML file.
    """
    tree = ET.parse(xml_template_path)
    for box in sheet_region_dict:
        bounding_box = box['bounding_box']
        if len(bounding_box) > 0:
            class_name = box["class_name"]
            # Everything except the box and the class name goes into the name.
            extra = box.copy()
            extra.pop('bounding_box')
            extra.pop('class_name')
            name = '{}_{}'.format(class_name, str(extra))
            tree = utils.create_xml(name, tree,
                                    bounding_box['xmin'], bounding_box['ymin'],
                                    bounding_box['xmax'], bounding_box['ymax'])
    tree.write(xml_path)