import base64
import glob
import json
import os
import time
import uuid
import shutil
import xml.etree.ElementTree as ET
from urllib import parse, request
from io import BytesIO

import cv2
import numpy as np
import pypinyin
import requests
from PIL import Image
from django.conf import settings
from pdf2image import convert_from_path

import segment.logging_config as logging
from segment.image_operation.exam_segment import get_page_text
from segment.image_operation.pre_segment import segment2parts
from segment.image_operation.segment import joint_image
from segment.image_operation.split_lines import line_split
from segment.image_operation.utils import create_xml, resize_by_percent
from segment.image_operation.utils import write_single_img
from segment.models import OcrToken
from segment.ocr.group_pictures import group_pictures
from segment.ocr.group_text import group_text
from segment.ocr.penguin_ocr import get_ocr_english_text

logger = logging.getLogger(settings.LOGGING_TYPE)


def convert_pil_to_jpeg(raw_img):
    """Normalize a PIL image to RGB and return it together with its ndarray view."""
    if raw_img.mode == 'L':
        channels = raw_img.split()
        img = Image.merge("RGB", (channels[0], channels[0], channels[0]))
    elif raw_img.mode == 'RGB':
        img = raw_img
    elif raw_img.mode == 'RGBA':
        img = Image.new("RGB", raw_img.size, (255, 255, 255))
        img.paste(raw_img, mask=raw_img.split()[3])  # 3 is the alpha channel
    else:
        img = raw_img
    open_cv_image = np.array(img)
    return img, open_cv_image


def opencv2base64(image, to_pil=True):
    """Encode an image as base64 JPEG bytes.

    With to_pil=True the input is an OpenCV BGR ndarray; with to_pil=False it
    must already be a PIL image.
    """
    # image = cv2.imencode('.jpg', img)[1]
    # base64_data = str(base64.b64encode(image))[2:-1]
    if to_pil:
        image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    output_buffer = BytesIO()
    image.save(output_buffer, format='JPEG')
    byte_data = output_buffer.getvalue()
    base64_data = base64.b64encode(byte_data)
    return base64_data


def get_dir_next_index_name(path, file_type):
    """Return the next numeric file index in ``path`` for files ending in ``file_type``."""
    files_list = os.listdir(path)
    imgs_list = [file.replace(file_type, '') for file in files_list if file.endswith(file_type)]
    if len(imgs_list) == 0:
        return 1
    # Compare numerically, not lexicographically, so '10' sorts after '9'.
    return max(int(name) for name in imgs_list) + 1


def save_raw_image(subject, datetime, img_file, analysis_type):
    # Generate a random image name and build the save path.
    ext = img_file.name.split('.')[-1]
    raw_name = img_file.name[0:-len(ext) - 1]
    file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
    raw_img = Image.open(img_file)  # read the uploaded image
    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    save_path = os.path.join(save_dir, file_name)
    channels = raw_img.split()
    if len(channels) >= 3:
        img = Image.merge("RGB", (channels[0], channels[1], channels[2]))
        open_cv_image = np.array(img)
        img_reload = open_cv_image[:, :, ::-1].copy()
        parts_list = segment2parts(img_reload, save_path)
    else:
        img = raw_img
        open_cv_image = np.array(img)
        parts_list = segment2parts(open_cv_image, save_path)
    # for part in parts_list:
    #     with open(part['img_part'], 'rb') as f:
    #         bin_img = f.read()
    #     part['img_part'] = bin_img
    try:
        img.save(save_path)
    except Exception as e:
        raise e
    url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
    return save_path, parts_list, url_path

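# Illustrative sketch (not called anywhere in this module): how the conversion
# helpers above fit together. It builds a synthetic RGBA image in memory, so it
# has no dependency on uploaded files or Django settings.
def _demo_image_conversion():
    rgba = Image.new("RGBA", (32, 32), (255, 0, 0, 128))  # semi-transparent red square
    pil_img, cv_img = convert_pil_to_jpeg(rgba)           # flattened onto a white background
    assert pil_img.mode == 'RGB' and cv_img.shape == (32, 32, 3)
    # opencv2base64 expects a BGR ndarray by default; np.array(pil_img) is RGB,
    # so flip the channel order first (same trick as save_raw_image above).
    b64 = opencv2base64(cv_img[:, :, ::-1].copy())
    return b64  # base64-encoded JPEG bytes, ready for the OCR endpoints below
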
def save_raw_image_without_segment(subject, datetime, img_file, analysis_type):
    # Generate a random image name and build the save path.
    ext = img_file.name.split('.')[-1]
    raw_name = img_file.name[0:-len(ext) - 1]
    file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
    raw_img = Image.open(img_file)  # read the uploaded image
    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    save_path = os.path.join(save_dir, file_name)
    pil_img, open_cv_image = convert_pil_to_jpeg(raw_img)
    try:
        pil_img.save(save_path)
        shutil.copy(save_path, save_path.replace('.jpg', '_small.jpg'))
    except Exception as e:
        raise e
    url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
    return save_path, open_cv_image, url_path


def save_raw_image_without_segment_formula(subject, datetime, img_file, analysis_type):
    # Generate a random image name and build the save path.
    ext = img_file.name.split('.')[-1]
    raw_name = img_file.name[0:-len(ext) - 1]
    file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], ext)
    raw_img = Image.open(img_file)  # read the uploaded image
    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    save_path = os.path.join(save_dir, file_name)
    channels = raw_img.split()
    # if ext == 'png' and len(channels) >= 3:  # formula OCR: transparent PNG from segmentation
    #     img = Image.merge("RGB", (channels[0], channels[1], channels[2]))
    #     open_cv_image = np.array(img)
    #     resize_img = resize_by_percent(open_cv_image, 0.5)
    # else:
    #     img = raw_img
    #     open_cv_image = np.array(img)
    #     resize_img = resize_by_percent(open_cv_image, 0.5)
    try:
        raw_img.save(save_path)
        # write_single_img(resize_img, save_path.replace('.jpg', '_small.jpg'))
    except Exception as e:
        raise e
    url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
    return save_path, url_path, raw_img


def save_raw_image_in_jpeg(subject, datetime, img_file, analysis_type):
    # Generate a random image name and build the save path.
    ext = img_file.name.split('.')[-1]
    raw_name = img_file.name[0:-len(ext) - 1]
    file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
    raw_img = Image.open(img_file)  # read the uploaded image
    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    save_path = os.path.join(save_dir, file_name)
    if raw_img.mode == 'L':
        channels = raw_img.split()
        img = Image.merge("RGB", (channels[0], channels[0], channels[0]))
    elif raw_img.mode == 'RGB':
        img = raw_img
    elif raw_img.mode == 'RGBA':
        img = Image.new("RGB", raw_img.size, (255, 255, 255))
        img.paste(raw_img, mask=raw_img.split()[3])  # 3 is the alpha channel
    else:
        img = raw_img
    open_cv_image = np.array(img)
    # resize_img = resize_by_percent(open_cv_image, 0.5)
    try:
        img.save(save_path)
        # write_single_img(resize_img, save_path.replace('.jpg', '_small.jpg'))
    except Exception as e:
        raise e
    url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
    return save_path, url_path, open_cv_image


def ocr_login():
    """Return a valid Baidu OCR access token, refreshing and persisting it when the cached one has expired."""
    def login():
        grant_type = 'client_credentials'
        client_id = settings.OCR_CLIENT_ID
        client_secret = settings.OCR_CLIENT_SECRET
        textmod = {'grant_type': grant_type, 'client_id': client_id, 'client_secret': client_secret}
        textmod = parse.urlencode(textmod)  # e.g. 'grant_type=client_credentials&client_id=...&client_secret=...'
        header_dict = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko'}
        url = 'https://aip.baidubce.com/oauth/2.0/token'
        req = request.Request(url='{}{}{}'.format(url, '?', textmod), headers=header_dict)
        res = request.urlopen(req).read()
        token = json.loads(res.decode(encoding='utf-8'))['access_token']
        latest_access_token = OcrToken(access_token=token)
        latest_access_token.save()
        return token

    objects = OcrToken.objects.latest('update_time')
    latest_access_token_db = objects.access_token
    latest_date = objects.update_time
    ans_time = time.mktime(latest_date.timetuple())
    update_date = settings.OCR_TOKEN_UPDATE_DATE
    current_time = time.time()
    if (ans_time + update_date * 24 * 60 * 60) > current_time:
        return latest_access_token_db
    else:
        return login()

def get_exam_bbox_by_tesseract(img_raw_name, img_path, subject):
    error_info = ''
    status = 1
    text = []
    lines_save_dir = img_path.replace('.jpg', '_lines')
    img_path = os.path.abspath(img_path)
    lines_save_dir = os.path.abspath(lines_save_dir)
    if not os.path.exists(lines_save_dir):
        os.makedirs(lines_save_dir)
    start_time = time.time()
    try:
        bbox, lines_abs_path_list = line_split(img_path, lines_save_dir, settings.TOLERANCE_PIX_NUMBER)  # split into lines
    except Exception as e:
        logger.error('line_split failed: {}'.format(e), exc_info=True)
        status = 0
        error_info = str(e)
        info = {'is_success': status, 'img_name': img_raw_name, 'coordinate': text, 'error': error_info}
        # Keep the (status, info) contract used by the success path.
        return status, info
    time1 = time.time()
    logger.info('lines_segment, cost: {}'.format(time1 - start_time))
    exam_group = []
    try:
        _, exam_group = group_pictures(lines_abs_path_list, subject)
        logger.info('exam_group info : {}'.format(exam_group))
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception as e:
        logger.error('ocr failed: {}'.format(e), exc_info=True)
        status = 0
        error_info = error_info + str(e)
    time2 = time.time()
    logger.info('exam_grouped, cost: {}'.format(time2 - time1))
    try:
        text = joint_image(img_path, bbox, exam_group)
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception as e:
        logger.error('generate coordinate info failed: {}'.format(e), exc_info=True)
        status = 0
        error_info = error_info + str(e)
    info = {'img_name': img_raw_name, 'coordinate': text}
    if error_info:
        info = {'img_name': img_raw_name, 'coordinate': text, 'error': error_info}
    logger.info('{} done'.format(img_raw_name))
    return status, info


def get_ocr_text(access_token, img, subject=None):
    """Run plain-text OCR on a base64-encoded image and return the recognized lines."""
    textmod = {'access_token': access_token}
    textmod = parse.urlencode(textmod)
    url = '{}{}{}{}{}'.format(settings.OCR_URL, settings.OCR_ACCURACY, '_basic', '?', textmod)
    url_general = '{}{}{}{}{}'.format(settings.OCR_URL, 'general', '_basic', '?', textmod)
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    # image = opencv2base64(img)  # callers are expected to pass base64-encoded bytes already
    image = img
    data = {
        'image': image,
        'detect_direction': 'true',
        'language_type': 'CHN_ENG',
    }
    # All subjects (including 'english') currently go through the same accurate endpoint.
    resp = requests.post(url, data=data, headers=headers).json()
    if resp.get('error_msg'):
        if 'internal error' in resp.get('error_msg'):
            resp = requests.post(url_general, data=data, headers=headers).json()
            if resp.get('error_msg'):
                raise Exception("ocr {}!".format(resp.get('error_msg')))
        else:
            raise Exception("ocr {}!".format(resp.get('error_msg')))
    words_result = resp.get('words_result')
    text_list = [word.get('words') for word in words_result]
    # words_list = {'word': text_list, 'subject': subject}
    return text_list


def get_ocr_text_and_coordinate_in_raw_format(access_token, img):
    """Run OCR with fine-grained coordinates and return the raw response dict."""
    textmod = {'access_token': access_token}
    textmod = parse.urlencode(textmod)
    url = '{}{}{}{}'.format(settings.OCR_BOX_URL, settings.OCR_ACCURACY, '?', textmod)
    url_general = '{}{}{}{}'.format(settings.OCR_BOX_URL, 'general', '?', textmod)
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    image_type = 'base64'
    group_id = 'group001'
    user_id = 'usr001'
    # image = base64.b64encode(img)  # callers are expected to pass base64-encoded bytes already
    image = img
    data = {
        'image_type': image_type,
        'group_id': group_id,
        'user_id': user_id,
        'image': image,
        'detect_direction': 'true',
        'recognize_granularity': 'small',
        # 'vertexes_location': 'true',
        # 'probability': 'true'
    }
    resp = requests.post(url, data=data, headers=headers).json()
    if resp.get('error_msg'):
        if 'internal error' in resp.get('error_msg'):
            resp = requests.post(url_general, data=data, headers=headers).json()
            if resp.get('error_msg'):
                raise Exception("ocr {}!".format(resp.get('error_msg')))
        else:
            raise Exception("ocr {}!".format(resp.get('error_msg')))
    return resp

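# Illustrative sketch (not called anywhere in this module) of the token + OCR
# call flow. It needs a configured Django settings module, a populated OcrToken
# table, valid Baidu OCR credentials and network access, so treat it as a usage
# note rather than a test.
def _demo_ocr_text_flow(opencv_bgr_image):
    token = ocr_login()                          # cached token, refreshed when expired
    b64_image = opencv2base64(opencv_bgr_image)  # the OCR helpers expect base64 bytes
    lines = get_ocr_text(token, b64_image, subject='english')
    raw = get_ocr_text_and_coordinate_in_raw_format(token, b64_image)
    return lines, raw.get('words_result')
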
def get_ocr_text_and_coordinate(access_token, img):
    """Run OCR and return the recognized lines plus their top-left / bottom-right corner matrices."""
    textmod = {'access_token': access_token}
    textmod = parse.urlencode(textmod)
    url = '{}{}{}{}'.format(settings.OCR_BOX_URL, settings.OCR_ACCURACY, '?', textmod)
    url_general = '{}{}{}{}'.format(settings.OCR_BOX_URL, 'general', '?', textmod)
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    image_type = 'base64'
    group_id = 'group001'
    user_id = 'usr001'
    # image = base64.b64encode(img)  # callers are expected to pass base64-encoded bytes already
    image = img
    data = {
        'image_type': image_type,
        'group_id': group_id,
        'user_id': user_id,
        'image': image,
        'detect_direction': 'true',
        # 'recognize_granularity': 'small',
        # 'vertexes_location': 'true',
        # 'probability': 'true'
    }
    resp = requests.post(url, data=data, headers=headers).json()
    if resp.get('error_msg'):
        if 'internal error' in resp.get('error_msg'):
            resp = requests.post(url_general, data=data, headers=headers).json()
            if resp.get('error_msg'):
                raise Exception("ocr {}!".format(resp.get('error_msg')))
        else:
            raise Exception("ocr {}!".format(resp.get('error_msg')))
    words_result = resp.get('words_result')
    text_list = [word.get('words') for word in words_result]
    # words_list = {'word': text_list, 'subject': subject}
    matrix_lt, matrix_rb = resolve_json(words_result)
    return text_list, matrix_lt, matrix_rb


def get_ocr_text_and_coordinate_formula(img, access_token, base64=False):
    """Run OCR for formula images and return the raw words_result list.

    Note: the ``base64`` flag shadows the stdlib module name inside this
    function; when it is False the image is encoded here via opencv2base64.
    """
    textmod = {'access_token': access_token}
    textmod = parse.urlencode(textmod)
    url = '{}{}{}{}'.format(settings.OCR_BOX_URL, settings.OCR_ACCURACY, '?', textmod)
    url_general = '{}{}{}{}'.format(settings.OCR_BOX_URL, 'general', '?', textmod)
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    image_type = 'base64'
    group_id = 'group001'
    user_id = 'usr001'
    if base64:
        image = img
    else:
        image = opencv2base64(img)
    data = {
        'image_type': image_type,
        'group_id': group_id,
        'user_id': user_id,
        'image': image,
        'detect_direction': 'true',
        'recognize_granularity': 'small',
        'language_type': 'CHN_ENG',
        # 'vertexes_location': 'true',
        # 'probability': 'true'
    }
    resp = requests.post(url, data=data, headers=headers).json()
    if resp.get('error_msg'):
        if 'internal error' in resp.get('error_msg'):
            resp = requests.post(url_general, data=data, headers=headers).json()
            if resp.get('error_msg'):
                raise Exception("ocr {}!".format(resp.get('error_msg')))
        else:
            raise Exception("ocr {}!".format(resp.get('error_msg')))
    words_result = resp.get('words_result')
    return words_result


def resolve_json(words_result):
    """Convert OCR ``location`` entries into matrices of top-left and bottom-right corners."""
    box_list = [item[key] for item in words_result for key in item if key == 'location']
    matrix = np.array([0, 0, 0, 0])
    for box in box_list:
        # num_list = list(box.values())
        w = box.get('width')
        l = box.get('left')
        t = box.get('top')
        h = box.get('height')
        num_list = [w, t, l, h]
        matrix = np.vstack([matrix, np.array(num_list)])
    matrix = matrix[1:]
    matrix_w = matrix[:, 0:1]
    matrix_t = matrix[:, 1:2]
    matrix_l = matrix[:, 2:3]
    matrix_h = matrix[:, 3:]
    matrix_lt = np.hstack([matrix_l, matrix_t])
    matrix_wh = np.hstack([matrix_w, matrix_h])
    matrix_rb = matrix_lt + matrix_wh
    return matrix_lt, matrix_rb


def group_to_coordinate(group_list, matrix_lt, matrix_rb):
    """Merge each (start, end) index group of text lines into one enclosing bounding box."""
    matrix_box_vlist = np.array([0, 0, 0, 0])
    for element in group_list:
        if element[0] < element[1]:
            rb = matrix_rb[element[0]:element[1]].max(axis=0)
            lt = matrix_lt[element[0]:element[1]].min(axis=0)
            matrix_box = np.hstack([lt, rb])
            matrix_box_vlist = np.vstack([matrix_box_vlist, matrix_box])
    matrix_box_vlist = matrix_box_vlist[1:]
    return matrix_box_vlist.tolist()

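# Small worked example (not called anywhere in this module) of the coordinate
# helpers above, using a hand-made words_result. It needs only numpy, so it can
# be run directly to sanity-check the box-merging arithmetic.
def _demo_box_merging():
    fake_words_result = [
        {'words': 'line 1', 'location': {'left': 10, 'top': 20, 'width': 100, 'height': 30}},
        {'words': 'line 2', 'location': {'left': 15, 'top': 60, 'width': 80, 'height': 40}},
    ]
    matrix_lt, matrix_rb = resolve_json(fake_words_result)
    # matrix_lt == [[10, 20], [15, 60]]; matrix_rb == [[110, 50], [95, 100]]
    boxes = group_to_coordinate([(0, 2)], matrix_lt, matrix_rb)
    # The single group spanning both lines merges into one enclosing box.
    assert boxes == [[10, 20, 110, 100]]
    return boxes
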
def get_exam_box(img_raw_name, img_list, save_path, subject, access_token):
    """OCR each image part, group the text into exam items and write .txt / .xml backups of the boxes."""
    status = 1
    error_info = ''
    box_list = []
    words_list_all = []
    group_list_all = []
    try:
        for img_part in img_list:
            x_bias = img_part['x_bias']
            y_bias = img_part['y_bias']
            img = img_part['img_part']
            words_list, matrix_lt, matrix_rb = get_ocr_text_and_coordinate(access_token, img)
            matrix_lt = matrix_lt + np.asarray([x_bias, y_bias])
            matrix_rb = matrix_rb + np.asarray([x_bias, y_bias])
            group_list = group_text(words_list, subject)
            part_box_list = group_to_coordinate(group_list, matrix_lt, matrix_rb)
            box_list = box_list + part_box_list
            words_list.append('********************************')
            words_list_all = words_list_all + words_list
            group_list_all.append(group_list)
        try:
            txt_backup_path = save_path.replace('.jpg', '.txt')
            words_list = [line + ',\n' for line in words_list_all]
            with open(txt_backup_path, 'w', encoding='utf-8') as writer:
                writer.writelines('subject:' + subject + '\n')
                writer.writelines('[\n')
                writer.writelines(words_list)
                writer.writelines(']\n')
                writer.writelines(str(group_list_all))
            logger.info('{} paper: {} text saved successfully'.format(subject, img_raw_name))
        except Exception as e:
            logger.error('{} paper: {} failed to save text: {}'.format(subject, img_raw_name, e), exc_info=True)
        # Record the box coordinates in an XML file next to the image.
        tree = ET.parse(r'./segment/exam_info/000000-template.xml')  # xml tree
        for index_num, exam_bbox in enumerate(box_list):
            tree = create_xml('{:02d}'.format(index_num), tree, exam_bbox[0], exam_bbox[1], exam_bbox[2], exam_bbox[3])
        # print(exam_items_bbox)
        tree.write(save_path.replace('.jpg', '.xml'))
    except Exception as e:
        logger.error('{} paper: {} coordinate generation failed: {}'.format(subject, img_raw_name, e), exc_info=True)
        status = 0
        error_info = error_info + str(e)
    info = {'img_name': img_raw_name, 'coordinate': box_list}
    if error_info:
        info = {'img_name': img_raw_name, 'coordinate': box_list, 'error': error_info}
    logger.info('{} done'.format(img_raw_name))
    return status, info


def get_exam_ocr(img_raw_name, img_list, save_path, subject, access_token):
    """OCR each image part and write the recognized text to a .txt backup."""
    status = 1
    error_info = ''
    words_list = []
    for img_part in img_list:
        img = img_part['img_part']
        try:
            part_words_list = get_ocr_text(access_token, img, subject)
        except Exception as e:
            part_words_list = []
            error_info = error_info + str(e)
        words_list = words_list + part_words_list
    if len(words_list) < 1:
        logger.error('{} paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
        status = 0
    else:
        try:
            txt_backup_path = save_path.replace('.jpg', '.txt')
            words_list = [line + '\n' for line in words_list]
            # # words_list.append(group_list)
            with open(txt_backup_path, 'w', encoding='utf-8') as writer:
                writer.writelines('subject:' + subject + '\n')
                writer.writelines('[\n')
                writer.writelines(words_list)
                writer.writelines(']\n')
            logger.info('{} paper: {} text saved successfully'.format(subject, img_raw_name))
        except Exception as e:
            logger.error('{} paper: {} failed to save text: {}'.format(subject, img_raw_name, e), exc_info=True)
    info = {'img_name': img_raw_name, 'text': words_list}
    if error_info:
        info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
    logger.info('{} done'.format(img_raw_name))
    return status, info

def get_exam_ocr_single(img_raw_name, img, save_path, subject, access_token):
    """OCR a single image and write the recognized text to a .txt backup."""
    status = 1
    error_info = ''
    words_list = []
    try:
        part_words_list = get_ocr_text(access_token, img)
    except Exception as e:
        part_words_list = []
        error_info = error_info + str(e)
    words_list = words_list + part_words_list
    if len(words_list) < 1:
        logger.error('{} paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
        status = 0
    else:
        try:
            txt_backup_path = save_path.replace('.jpg', '.txt')
            words_list = [line + ',\n' for line in words_list]
            # # words_list.append(group_list)
            with open(txt_backup_path, 'w', encoding='utf-8') as writer:
                writer.writelines('subject:' + subject + '\n')
                writer.writelines('[\n')
                writer.writelines(words_list)
                writer.writelines(']\n')
            logger.info('{} paper: {} text saved successfully'.format(subject, img_raw_name))
        except Exception as e:
            logger.error('{} paper: {} failed to save text: {}'.format(subject, img_raw_name, e), exc_info=True)
    info = {'img_name': img_raw_name, 'text': words_list}
    if error_info:
        info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
    logger.info('{} done'.format(img_raw_name))
    return status, info


def get_segment_by_ocr_once(opencv_img, token, subject, save_path, img_raw_name):
    """Single-shot pipeline: OCR the whole page, split it into page columns, group the text and write .txt / .xml backups."""
    img = opencv2base64(opencv_img)
    resp = get_ocr_text_and_coordinate_in_raw_format(token, img)
    if len(opencv_img.shape) == 3:
        opencv_img = cv2.cvtColor(opencv_img, cv2.COLOR_BGR2GRAY)
    text_list = get_page_text(resp['words_result'], opencv_img)
    status = 1
    error_info = ''
    box_list = []
    words_list_all = []
    group_list_all = []
    try:
        for one_page_text in text_list:
            words_list = [word.get('words') for word in one_page_text]
            matrix_lt, matrix_rb = resolve_json(one_page_text)
            group_list = group_text(words_list, subject)
            part_box_list = group_to_coordinate(group_list, matrix_lt, matrix_rb)
            box_list = box_list + part_box_list
            words_list.append('********************************')
            words_list_all = words_list_all + words_list
            group_list_all.append(group_list)
        try:
            txt_backup_path = save_path.replace('.jpg', '.txt')
            words_list = [line + '\n' for line in words_list_all]
            with open(txt_backup_path, 'w', encoding='utf-8') as writer:
                writer.writelines('subject:' + subject + '\n')
                writer.writelines('[\n')
                writer.writelines(words_list)
                writer.writelines(']\n')
                writer.writelines(str(group_list_all))
            logger.info('{} paper: {} text saved successfully'.format(subject, img_raw_name))
        except Exception as e:
            logger.error('{} paper: {} failed to save text: {}'.format(subject, img_raw_name, e), exc_info=True)
        # Record the box coordinates in an XML file next to the image.
        tree = ET.parse(r'./segment/exam_info/000000-template.xml')  # xml tree
        for index_num, exam_bbox in enumerate(box_list):
            tree = create_xml('{:02d}'.format(index_num), tree, exam_bbox[0], exam_bbox[1], exam_bbox[2], exam_bbox[3])
        # print(exam_items_bbox)
        tree.write(save_path.replace('.jpg', '.xml'))
    except Exception as e:
        logger.error('{} paper: {} coordinate generation failed: {}'.format(subject, img_raw_name, e), exc_info=True)
        status = 0
        error_info = error_info + str(e)
    info = {'img_name': img_raw_name, 'coordinate': box_list}
    if error_info:
        info = {'img_name': img_raw_name, 'coordinate': box_list, 'error': error_info}
    logger.info('{} done'.format(img_raw_name))
    return status, info

def get_exam_ocr_once(opencv_img, token, subject, save_path, img_raw_name):
    """Single-shot pipeline: OCR the whole page and write the recognized text to a .txt backup."""
    img = opencv2base64(opencv_img)
    resp = get_ocr_text_and_coordinate_in_raw_format(token, img)
    if len(opencv_img.shape) == 3:
        opencv_img = cv2.cvtColor(opencv_img, cv2.COLOR_BGR2GRAY)
    text_list = get_page_text(resp['words_result'], opencv_img)
    words_list = []
    for one_page_raw_text in text_list:
        one_page_words_list = [word.get('words') for word in one_page_raw_text]
        words_list = words_list + one_page_words_list
    status = 1
    error_info = ''
    if len(words_list) < 1:
        logger.error('{} paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
        status = 0
    else:
        try:
            txt_backup_path = save_path.replace('.jpg', '.txt')
            words_list = [line + '\n' for line in words_list]
            # # words_list.append(group_list)
            with open(txt_backup_path, 'w', encoding='utf-8') as writer:
                writer.writelines('subject:' + subject + '\n')
                writer.writelines('[\n')
                writer.writelines(words_list)
                writer.writelines(']\n')
            logger.info('{} paper: {} text saved successfully'.format(subject, img_raw_name))
        except Exception as e:
            logger.error('{} paper: {} failed to save text: {}'.format(subject, img_raw_name, e), exc_info=True)
    info = {'img_name': img_raw_name, 'text': words_list}
    if error_info:
        info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
    logger.info('{} done'.format(img_raw_name))
    return status, info


def save_pdf_image(pdf_file, subject, time_str):
    """Save an uploaded PDF, render every page to JPEG and return the page paths plus BGR ndarrays."""
    name = pdf_file.name[:-4]
    ext0 = pdf_file.name.split('.')[-1]
    # Transliterate the (possibly Chinese) file name to pinyin for a filesystem-safe directory name.
    raw_name0 = ''.join([''.join(i) for i in pypinyin.pinyin(name, style=pypinyin.NORMAL)])
    save_dir0 = os.sep.join(
        [settings.MEDIA_ROOT, 'ocr', subject, time_str, raw_name0 + '_{}'.format(uuid.uuid4().hex[:10])])
    if not os.path.exists(save_dir0):
        os.makedirs(save_dir0)
    pdf_path = os.sep.join([save_dir0, raw_name0 + '.' + ext0])
    with open(pdf_path, 'wb') as pdfFileObj:
        for chunk in pdf_file.chunks():
            pdfFileObj.write(chunk)
    images_list = convert_from_path(pdf_path, dpi=200, output_folder=save_dir0, output_file='image',
                                    first_page=None, last_page=None, fmt='JPEG')
    upload_img_path_list = glob.glob(os.sep.join([save_dir0, '*.jpg']))
    try:
        images_list = [cv2.cvtColor(np.asarray(ele), cv2.COLOR_RGB2BGR) for ele in images_list]
    except Exception:
        images_list = [np.asarray(ele) for ele in images_list]
    return upload_img_path_list, images_list


def save_raw_image_without_segment_pdf(subject, datetime, raw_name, img_file, analysis_type):
    # Generate a random image name and build the save path.
    file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
    raw_img = Image.open(img_file)  # read the uploaded image
    save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    save_path = os.path.join(save_dir, file_name)
    channels = raw_img.split()
    if len(channels) > 3:
        # Keep the R, G, B planes and drop alpha (starting at index 1 would include the alpha plane).
        img = Image.merge("RGB", (channels[0], channels[1], channels[2]))
        open_cv_image = np.array(img)
        resize_img = resize_by_percent(open_cv_image, 0.5)
    else:
        img = raw_img
        open_cv_image = np.array(img)
        resize_img = resize_by_percent(open_cv_image, 0.5)
    try:
        img.save(save_path)
        # write_single_img(resize_img, save_path.replace('.jpg', '_small.jpg'))
    except Exception as e:
        raise e
    url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
    return save_path, url_path, open_cv_image

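# Illustrative sketch (not called anywhere in this module) of the pinyin-based
# file naming used by save_pdf_image. It only needs pypinyin and uuid, so it can
# be run without Django settings or poppler.
def _demo_pinyin_file_name(name='数学试卷'):
    # pypinyin.pinyin returns a list of one-element lists, e.g. [['shu'], ['xue'], ...]
    raw_name = ''.join([''.join(i) for i in pypinyin.pinyin(name, style=pypinyin.NORMAL)])
    # e.g. 'shuxueshijuan_1a2b3c4d5e'
    return '{}_{}'.format(raw_name, uuid.uuid4().hex[:10])
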
def get_exam_ocr_by_penguin(img_raw_name, raw_image, size, save_path, subject):
    """OCR an English exam image via the penguin OCR backend and write the text to a .txt backup."""
    status = 1
    error_info = ''
    words_list = []
    try:
        words_list = get_ocr_english_text(raw_image, size)
    except Exception as e:
        error_info = error_info + str(e)
    if len(words_list) < 1:
        logger.error('{} paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
        status = 0
    else:
        try:
            txt_backup_path = save_path.replace('.jpg', '.txt')
            words_list = [line + '\n' for line in words_list]
            # # words_list.append(group_list)
            with open(txt_backup_path, 'w', encoding='utf-8') as writer:
                writer.writelines('subject:' + subject + '\n')
                writer.writelines('[\n')
                writer.writelines(words_list)
                writer.writelines(']\n')
            logger.info('{} paper: {} text saved successfully'.format(subject, img_raw_name))
        except Exception as e:
            logger.error('{} paper: {} failed to save text: {}'.format(subject, img_raw_name, e), exc_info=True)
    info = {'img_name': img_raw_name, 'text': words_list}
    if error_info:
        info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
    logger.info('{} done'.format(img_raw_name))
    return status, info