123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- # @Author : lightXu
- # @File : sheet_server.py
- # @Time : 2018/12/19 0019 下午 14:33
- import json
- import os
- import shutil
- import time
- import traceback
- import uuid
- import xml.etree.cElementTree as ET
- import cv2
- import numpy as np
- from PIL import Image
- from django.conf import settings
- from segment.sheet_resolve.tools.tf_settings import xml_template_path
- import segment.logging_config as logging
- from segment.sheet_resolve.analysis.anchor.marker_detection import find_anchor
- from segment.sheet_resolve.analysis.resolve import choice, choice_m_row_col
- from segment.sheet_resolve.analysis.resolve import cloze
- from segment.sheet_resolve.analysis.resolve import exam_number_row_col
- from segment.sheet_resolve.analysis.resolve import sheet
- from segment.sheet_resolve.analysis.resolve import solve, solve_with_number, cloze_with_number
- from segment.sheet_resolve.analysis.sheet.analysis_sheet import box_region_format, question_number_format
- from segment.sheet_resolve.analysis.sheet.sheet_points import get_sheet_points
- from segment.sheet_resolve.analysis.sheet.sheet_points_total import get_sheet_number_total
- from segment.sheet_resolve.tools import utils
- from segment.sheet_resolve.tools.brain_api import get_ocr_text_and_coordinate, change_format_baidu_to_google
- logger = logging.getLogger(settings.LOGGING_TYPE)
def decide_blank_sheet(image):
    """Decide whether a sheet image looks blank.

    The image is reduced to a single channel (``cv2.COLOR_RGB2GRAY`` is used
    when it has a channel axis), shrunk so that its longer side is at most
    800 pixels, and then only one half of it is inspected: the bottom half
    for portrait images, the right half for landscape images.  That half is
    reported blank when the mean of its inverted Otsu binarisation is below
    a threshold or when its raw mean intensity is above a threshold.

    Args:
        image: image array; 2-D arrays are used as-is, anything with more
            axes is converted to grayscale first.

    Returns:
        A truthy value when the inspected half looks blank, falsy otherwise.
    """
    max_side = 800  # longest side, in pixels, of the image that is analysed

    if len(image.shape) <= 2:
        gray_image = image
    else:
        gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    height, width = gray_image.shape[0], gray_image.shape[1]
    # Orientation is decided on the original size so that rounding during
    # the resize below cannot flip it.
    is_portrait = height > width

    longest = max(height, width)
    if longest > max_side:
        # Shrink (factor < 1).  The previous factor, longest / 800, was > 1
        # and enlarged the image instead.
        scale = max_side / longest
        # Keep height/width in sync with the resized image; the slices below
        # depend on them.
        width = max(1, int(width * scale))
        height = max(1, int(height * scale))
        gray_image = cv2.resize(gray_image, (width, height), interpolation=cv2.INTER_AREA)

    if is_portrait:
        half = gray_image[height // 2:, :]
        pix_value_low = 25.0  # threshold for the binarised image mean
        pix_value_high = 220  # threshold for the raw image mean
    else:
        half = gray_image[:, width // 2:]
        pix_value_low = 15.0
        pix_value_high = 250

    bin_img = cv2.threshold(half, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
    bin_img_mean = np.mean(bin_img)
    img_raw_mean = np.mean(half)
    # Diagnostic output kept from the original implementation.
    print(bin_img_mean, img_raw_mean)
    return bin_img_mean < pix_value_low or img_raw_mean > pix_value_high
def convert_pil_to_jpeg(raw_img):
    """Normalise a PIL image to RGB and build the matching OpenCV array.

    Args:
        raw_img: a PIL image in any mode.

    Returns:
        A ``(img, open_cv_image)`` tuple: ``img`` is the RGB PIL image and
        ``open_cv_image`` is the same picture as an array converted with
        ``cv2.COLOR_RGB2BGR``.
    """
    mode = raw_img.mode
    if mode == 'RGB':
        img = raw_img
    elif mode == 'L':
        # Replicate the single gray channel into the three RGB channels.
        gray = raw_img.split()[0]
        img = Image.merge("RGB", (gray, gray, gray))
    elif mode == 'RGBA':
        # Flatten transparency onto a white background.
        img = Image.new("RGB", raw_img.size, (255, 255, 255))
        img.paste(raw_img, mask=raw_img.split()[3])  # 3 is the alpha channel
    else:
        # Any other mode was previously passed through untouched, which
        # handed a non-RGB array to the RGB->BGR conversion below.
        img = raw_img.convert('RGB')

    open_cv_image = cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
    return img, open_cv_image
def handle_uploaded_xml_file(f, save_path):
    """Write every chunk yielded by ``f.chunks()`` to ``save_path``.

    Args:
        f: object exposing a ``chunks()`` method that yields bytes-like
            pieces of the upload.
        save_path: destination path; the file is opened in ``'wb+'`` mode,
            so existing content is truncated.
    """
    with open(save_path, 'wb+') as out_file:
        out_file.writelines(f.chunks())
def generate_serial_number(time_str, sheet_big_boxes):
    """Return the next serial number for the period identified by ``time_str``.

    The most recently updated row (by ``update_time``) supplies the last
    serial number.  When ``time_str`` occurs within the first nine characters
    of that number, the number is incremented by one; otherwise, or when the
    table is empty, numbering restarts at ``time_str + '000001'``.

    Args:
        time_str: string prefix of the serial number (presumably a compact
            date string; the exact format is decided by the caller).
        sheet_big_boxes: model class whose ``objects`` manager is queried and
            whose rows expose a string ``series_number``.

    Returns:
        The generated serial number as a string.
    """
    first_of_period = time_str + '000001'

    # exists() asks the database for a yes/no answer instead of loading every
    # row just to count them.
    if not sheet_big_boxes.objects.exists():
        return first_of_period

    last_number_in_db = sheet_big_boxes.objects.latest('update_time').series_number
    if time_str in last_number_in_db[0:9]:
        return str(int(last_number_in_db) + 1)
    return first_of_period
def save_raw_image_with_paper_id(subject, paper_id, img_file, analysis_type):
    """Save an uploaded image under ``MEDIA_ROOT`` using the paper id as name.

    The file is stored as
    ``<MEDIA_ROOT>/<analysis_type>/<subject>/<YYYY-MM-DD>/<paper_id>.<ext>``
    where ``ext`` is whatever follows the last dot of ``img_file.name`` and
    the date is today's local date.

    Args:
        subject: sub-directory name under ``analysis_type``.
        paper_id: value used as the file name (without extension).
        img_file: uploaded file object; must expose ``name`` and be readable
            by ``PIL.Image.open``.
        analysis_type: first sub-directory under the media root.

    Returns:
        A ``(save_path, open_cv_image, url_path)`` tuple: the path on disk,
        the image array produced by ``convert_pil_to_jpeg`` and the
        ``MEDIA_URL``-based path with forward slashes.

    Raises:
        Any exception raised while opening or saving the image propagates
        unchanged.
    """
    time_str = time.strftime('%Y-%m-%d', time.localtime(time.time()))
    ext = img_file.name.split('.')[-1]
    file_name = '{}.{}'.format(paper_id, ext)

    raw_img = Image.open(img_file)  # read the uploaded image

    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, time_str)
    # exist_ok avoids the check-then-create race when two uploads arrive for
    # the same directory at the same time.
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, file_name)

    pil_img, open_cv_image = convert_pil_to_jpeg(raw_img)
    pil_img.save(save_path)

    url_path = os.path.join(
        settings.MEDIA_URL, analysis_type, subject, time_str, file_name
    ).replace('\\', '/')
    return save_path, open_cv_image, url_path
def save_raw_image_without_segment(subject, datetime, img_file, analysis_type):
    """Save an uploaded image as a uniquely named JPEG plus a ``_small`` copy.

    The file is stored as
    ``<MEDIA_ROOT>/<analysis_type>/<subject>/<datetime>/<raw>_<10 hex>.jpg``
    where ``raw`` is the uploaded name without its extension.  A byte-for-byte
    copy named ``..._small.jpg`` is written next to it.

    Args:
        subject: sub-directory name under ``analysis_type``.
        datetime: directory name used below ``subject`` (a string supplied by
            the caller, not the ``datetime`` module).
        img_file: uploaded file object; must expose ``name`` and be readable
            by ``PIL.Image.open``.
        analysis_type: first sub-directory under the media root.

    Returns:
        A ``(save_path, open_cv_image, url_path)`` tuple: the path on disk,
        the image array produced by ``convert_pil_to_jpeg`` and the
        ``MEDIA_URL``-based path with forward slashes.

    Raises:
        Any exception raised while opening, saving or copying the image
        propagates unchanged.
    """
    # splitext keeps the whole name when it contains no dot; slicing by the
    # length of the "extension" produced an empty name in that case.
    raw_name = os.path.splitext(img_file.name)[0]
    # A random suffix keeps repeated uploads of the same file apart.
    file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')

    raw_img = Image.open(img_file)  # read the uploaded image

    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, file_name)

    pil_img, open_cv_image = convert_pil_to_jpeg(raw_img)
    pil_img.save(save_path)

    # Only the final extension gets the "_small" marker; str.replace would
    # also rewrite any other ".jpg" occurring earlier in the path.
    stem, suffix = os.path.splitext(save_path)
    shutil.copy(save_path, stem + '_small' + suffix)

    url_path = os.path.join(
        settings.MEDIA_URL, analysis_type, subject, datetime, file_name
    ).replace('\\', '/')
    return save_path, open_cv_image, url_path
def sheet_big_boxes_resolve(series_number, image, saved_path, subject, sheet_sess, ocr=''):
    """Run the large-region detection (``sheet``) and report success.

    Args:
        series_number, saved_path, image, subject, sheet_sess, ocr: forwarded
            to ``sheet`` together with the fixed thresholds 0.7 and 0.3.

    Returns:
        ``(status, sheets_dict, xml_save_path)``.  ``status`` is 1 on success;
        on any exception it is 0, the failure is logged and the other two
        values are empty strings.
    """
    confidence_threshold = 0.7
    nms_threshold = 0.3
    try:
        detected, xml_path = sheet(series_number, saved_path, image,
                                   confidence_threshold, nms_threshold,
                                   subject, sheet_sess, ocr)
    except Exception as e:
        logger.info('试卷:{} 答题卡区域解析失败: {}'.format(saved_path, e))
        return 0, '', ''
    return 1, detected, xml_path
def sheet_small_boxes_resolve(raw_img, sheet_dict, choice_sess, cloze_sess, xml_save_path):
    """Resolve the detail of each detected region type into ``sheet_dict``.

    ``sheet_dict['regions']`` is removed from the dict and, for every region
    type found in it, the matching resolver result is stored under that
    type's own key (``choice``, ``exam_number``, ``cloze``, ``solve``,
    ``qr_code``, ``bar_code``).  A failing resolver is logged and skipped so
    that the remaining types are still processed.

    Args:
        raw_img: image forwarded to the resolvers.
        sheet_dict: dict containing a ``regions`` list whose items have
            ``class_name`` and ``bounding_box`` entries; modified in place.
        choice_sess: session object forwarded to ``choice``.
        cloze_sess: session object forwarded to ``cloze``.
        xml_save_path: path forwarded to the resolvers; also used to name the
            image in failure log messages.

    Returns:
        The same ``sheet_dict`` object.
    """
    conf_thresh = 0.7
    nms_thresh = 0.3

    regions = sheet_dict['regions']
    # The class names are joined into one string on purpose: the checks below
    # are substring tests, so 'choice' also matches e.g. 'choice_m'.
    classes_name = str([ele['class_name'] for ele in regions])
    sheet_dict.pop('regions')
    # The result is discarded; the call only matters in that it raises when
    # the remaining dict is not JSON-serialisable.
    json.dumps(sheet_dict, ensure_ascii=False)

    def _log_failure(label, error):
        """Print the active traceback and log which region type failed."""
        traceback.print_exc()
        # Swap only the extension; replace('xml', '.jpg') produced "name..jpg"
        # and rewrote every other "xml" in the path.
        image_path = os.path.splitext(xml_save_path)[0] + '.jpg'
        logger.info('试卷:{} {}区域解析失败: {}'.format(image_path, label, error))

    # (key / substring to look for, label used in the log, resolver call)
    resolvers = (
        ('choice', '选择题',
         lambda: choice(raw_img, regions, xml_save_path, conf_thresh, nms_thresh, choice_sess)),
        ('exam_number', '考号',
         lambda: exam_number_row_col(raw_img, regions, xml_save_path)),
        ('cloze', '填空题',
         lambda: cloze(raw_img, regions, xml_save_path, conf_thresh, nms_thresh, cloze_sess)),
        ('solve', '解答题',
         lambda: solve(raw_img, regions, xml_save_path)),
    )
    for key, label, resolve in resolvers:
        if key not in classes_name:
            continue
        try:
            sheet_dict[key] = resolve()
        except Exception as e:
            _log_failure(label, e)

    # Code regions need no resolver: their bounding box is copied as-is.  When
    # several regions share the class, the last one wins.
    for key, label in (('qr_code', '二维码'), ('bar_code', '条形码')):
        if key not in classes_name:
            continue
        try:
            for ele in regions:
                if ele['class_name'] == key:
                    sheet_dict[key] = ele['bounding_box']
        except Exception as e:
            _log_failure(label, e)

    return sheet_dict
def sheet_row_col_resolve(raw_img, sheet_dict, choice_sess, cloze_sess, xml_save_path):
    """Append row/column detail regions to ``sheet_dict['regions']``.

    For each of ``choice_m``, ``exam_number`` and ``cloze`` found among the
    region class names, the matching resolver is called and its non-empty
    result list is appended to a copy of the region list.  A failing resolver
    is logged and skipped.  The copy then replaces ``sheet_dict['regions']``.

    Args:
        raw_img: image forwarded to the resolvers.
        sheet_dict: dict containing a ``regions`` list whose items have a
            ``class_name`` entry; its ``regions`` entry is replaced.
        choice_sess: accepted for interface compatibility; not used here.
        cloze_sess: session object forwarded to ``cloze``.
        xml_save_path: path forwarded to the resolvers; also used to name the
            image in failure log messages.

    Returns:
        The same ``sheet_dict`` object.
    """
    conf_thresh = 0.7
    nms_thresh = 0.3

    regions = sheet_dict['regions']
    # The class names are joined into one string on purpose: the checks below
    # are substring tests, so 'cloze' also matches e.g. 'cloze_s'.
    classes_name = str([ele['class_name'] for ele in regions])
    region_tmp = regions.copy()

    def _log_failure(label, error):
        """Print the active traceback and log which region type failed."""
        traceback.print_exc()
        # Swap only the extension; replace('xml', '.jpg') produced "name..jpg"
        # and rewrote every other "xml" in the path.
        image_path = os.path.splitext(xml_save_path)[0] + '.jpg'
        logger.info('试卷:{} {}区域解析失败: {}'.format(image_path, label, error))

    # (substring to look for, label used in the log, resolver call)
    resolvers = (
        ('choice_m', '选择题',
         lambda: choice_m_row_col(raw_img, regions, xml_save_path)),
        ('exam_number', '考号',
         lambda: exam_number_row_col(raw_img, regions, xml_save_path)),
        ('cloze', '填空题',
         lambda: cloze(raw_img, regions, xml_save_path, conf_thresh, nms_thresh, cloze_sess)),
    )
    for key, label, resolve in resolvers:
        if key not in classes_name:
            continue
        try:
            resolved = resolve()
            if len(resolved) > 0:
                region_tmp.extend(resolved)
        except Exception as e:
            _log_failure(label, e)

    sheet_dict.update({'regions': region_tmp})
    return sheet_dict
def sheet_detail_resolve(raw_img, sheet_dict, xml_save_path, shrink=True):
    """Replace coarse regions in ``sheet_dict`` with their resolved detail.

    Works on a copy of ``sheet_dict['regions']``:

    * ``choice_m`` regions are replaced by the ``choice_m_row_col`` result;
    * the ``exam_number_row_col`` result is appended;
    * regions whose class name contains ``solve`` or ``composition`` are
      replaced by the ``solve_with_number`` result;
    * regions whose class name contains ``cloze`` are replaced by the
      ``cloze_with_number`` result.

    Each step runs only when a matching class is present; a failing step is
    logged and skipped.  The copy then replaces ``sheet_dict['regions']``.

    Args:
        raw_img: image forwarded to the row/column resolvers.
        sheet_dict: dict containing a ``regions`` list whose items have a
            string ``class_name`` entry; its ``regions`` entry is replaced.
        xml_save_path: path forwarded to the resolvers; also used to name the
            image in failure log messages.
        shrink: when true, ``all_small_coordinate`` is dropped from the
            resolved ``choice_m`` items.

    Returns:
        The same ``sheet_dict`` object.
    """
    regions = sheet_dict['regions']
    class_names = {ele['class_name'] for ele in regions}
    region_tmp = regions.copy()

    def _log_failure(label, error):
        """Print the active traceback and log which region type failed."""
        traceback.print_exc()
        # Swap only the extension; replace('xml', '.jpg') produced "name..jpg"
        # and rewrote every other "xml" in the path.
        image_path = os.path.splitext(xml_save_path)[0] + '.jpg'
        logger.info('试卷:{} {}区域解析失败: {}'.format(image_path, label, error))

    if 'choice_m' in class_names:
        try:
            choice_dict_list = choice_m_row_col(raw_img, regions, xml_save_path)
            if shrink:
                for ele in choice_dict_list:
                    ele.pop('all_small_coordinate', None)
            # Drop the coarse regions so they are not duplicated by the
            # resolved ones carrying the same class name.
            region_tmp = [ele for ele in region_tmp if ele['class_name'] != 'choice_m']
            if len(choice_dict_list) > 0:
                region_tmp.extend(choice_dict_list)
        except Exception as e:
            _log_failure('选择题', e)

    if 'exam_number' in class_names:
        try:
            exam_number_dict_list = exam_number_row_col(raw_img, regions, xml_save_path)
            for ele in exam_number_dict_list:
                # A default keeps one item lacking the key from discarding
                # every resolved exam-number region via KeyError.
                ele.pop('all_small_coordinate', None)
            if len(exam_number_dict_list) > 0:
                region_tmp.extend(exam_number_dict_list)
        except Exception as e:
            _log_failure('考号', e)

    # The previous test, `'solve' or 'solve0' or ... in names`, was always
    # true because a non-empty string literal is truthy.  The substring match
    # mirrors the filters inside the branch, so the branch runs exactly when
    # there is something for it to replace.
    if any('solve' in name or 'composition' in name for name in class_names):
        try:
            solve_number = solve_with_number(region_tmp, xml_save_path)
            region_tmp = [ele for ele in region_tmp if 'solve' not in ele['class_name']]
            region_tmp = [ele for ele in region_tmp if 'composition' not in ele['class_name']]
            if len(solve_number) > 0:
                region_tmp.extend(solve_number)
        except Exception as e:
            _log_failure('解答题', e)

    if 'cloze' in class_names or 'cloze_s' in class_names:
        try:
            cloze_number = cloze_with_number(region_tmp, xml_save_path)
            region_tmp = [ele for ele in region_tmp if 'cloze' not in ele['class_name']]
            if len(cloze_number) > 0:
                region_tmp.extend(cloze_number)
        except Exception as e:
            # Logged with the cloze label; it used to reuse the solve label.
            _log_failure('填空题', e)

    sheet_dict.update({'regions': region_tmp})
    return sheet_dict
def sheet_points(sheet_dict_list, image_list, ocr_list, if_ocr=False):
    """Run the two scoring passes over a list of sheets.

    The three input lists are parallel (same index = same sheet).  First
    ``get_sheet_points`` is applied to all sheets at once, then
    ``get_sheet_number_total`` to each sheet.  Either pass is best-effort:
    if it raises, the traceback is printed and the data from before that
    pass is used instead.

    Args:
        sheet_dict_list: per-sheet dicts.
        image_list: per-sheet image arrays (only ``shape[0]`` and
            ``shape[1]`` are read here).
        ocr_list: per-sheet OCR results.
        if_ocr: when true, each returned item gets a ``sheet_ocr`` entry
            holding the matching element of ``ocr_list``.

    Returns:
        The list of per-sheet results.
    """
    bundles = []
    for idx, page_dict in enumerate(sheet_dict_list):
        page_ocr = ocr_list[idx]
        page_img = image_list[idx]
        bundles.append({
            'sheet_dict': page_dict,
            'ocr': page_ocr,
            'shape': (page_img.shape[0], page_img.shape[1]),
            'raw_image': page_img,
        })

    # Pass 1: all sheets together.
    try:
        scored = get_sheet_points(bundles)
        sheet_dict_list = [item['sheet_dict'] for item in scored]
    except Exception:
        traceback.print_exc()
        sheet_dict_list = [item['sheet_dict'] for item in bundles]

    # Pass 2: one sheet at a time; any failure discards the whole pass.
    try:
        sheet_total_list = [
            get_sheet_number_total(
                page_dict,
                change_format_baidu_to_google(ocr_list[idx]),
                image_list[idx],
            )
            for idx, page_dict in enumerate(sheet_dict_list)
        ]
    except Exception:
        traceback.print_exc()
        sheet_total_list = sheet_dict_list

    if if_ocr:
        for idx, page_dict in enumerate(sheet_total_list):
            page_dict.update(sheet_ocr=ocr_list[idx])
    return sheet_total_list
def sheet_format_output(init_number, crt_numbers, sheet_dict, image, subject, shrink):
    """Format the regions of a sheet, then its question numbers.

    ``box_region_format`` is applied first and its result is handed to
    ``question_number_format`` together with the two numbering arguments.

    Returns:
        The three values produced by ``question_number_format``, as a tuple
        ``(sheet_dict, init_number, crt_numbers)``.
    """
    formatted = box_region_format(sheet_dict, image, subject, shrink)
    numbered, next_init_number, next_crt_numbers = question_number_format(
        init_number, crt_numbers, formatted
    )
    return numbered, next_init_number, next_crt_numbers
def sheet_anchor(image):
    """Return whatever ``find_anchor`` reports for ``image``."""
    return find_anchor(image)
def gen_xml(sheet_region_dict, xml_path):
    """Write the given regions into an XML file based on the template.

    Starting from the tree parsed from ``xml_template_path``, one entry is
    added through ``utils.create_xml`` for every region with a non-empty
    ``bounding_box``.  The entry name is the region's ``class_name`` followed
    by ``_`` and the string form of its remaining fields.

    Args:
        sheet_region_dict: iterable of region dicts with ``class_name`` and
            ``bounding_box`` (``xmin``/``ymin``/``xmax``/``ymax``) entries.
        xml_path: destination path for the resulting tree.
    """
    tree = ET.parse(xml_template_path)
    for region in sheet_region_dict:
        bbox = region['bounding_box']
        if len(bbox) == 0:
            # Regions without coordinates are not written.
            continue

        class_name = region["class_name"]
        # Everything except the box and the class name goes into the label.
        extras = region.copy()
        del extras['bounding_box']
        del extras['class_name']
        label = '{}_{}'.format(class_name, str(extras))

        tree = utils.create_xml(label, tree,
                                bbox['xmin'], bbox['ymin'],
                                bbox['xmax'], bbox['ymax'])
    tree.write(xml_path)
|