- import base64
- import glob
- import json
- import os
- import time
- import uuid
- import shutil
- import xml.etree.ElementTree as ET
- from urllib import parse, request
- from io import BytesIO
- import cv2
- import numpy as np
- import pypinyin
- import requests
- from PIL import Image
- from django.conf import settings
- from pdf2image import convert_from_path
- import segment.logging_config as logging
- from segment.image_operation.exam_segment import get_page_text
- from segment.image_operation.pre_segment import segment2parts
- from segment.image_operation.segment import joint_image
- from segment.image_operation.split_lines import line_split
- from segment.image_operation.utils import create_xml, resize_by_percent
- from segment.image_operation.utils import write_single_img
- from segment.models import OcrToken
- from segment.ocr.group_pictures import group_pictures
- from segment.ocr.group_text import group_text
- from segment.ocr.penguin_ocr import get_ocr_english_text
- logger = logging.getLogger(settings.LOGGING_TYPE)
- def convert_pil_to_jpeg(raw_img):
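- """Normalize a PIL image (L/RGB/RGBA/other) to RGB and return both the PIL image and its numpy array."""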
- if raw_img.mode == 'L':
- channels = raw_img.split()
- img = Image.merge("RGB", (channels[0], channels[0], channels[0]))
- elif raw_img.mode == 'RGB':
- img = raw_img
- elif raw_img.mode == 'RGBA':
- img = Image.new("RGB", raw_img.size, (255, 255, 255))
- img.paste(raw_img, mask=raw_img.split()[3]) # 3 is the alpha channel
- else:
- img = raw_img
- open_cv_image = np.array(img)
- return img, open_cv_image
- def opencv2base64(image, to_pil=False):
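- """Encode an image as a base64 JPEG string; if to_pil is set, convert the OpenCV BGR array to PIL before encoding."""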
- # image = cv2.imencode('.jpg', img)[1]
- # base64_data = str(base64.b64encode(image))[2:-1]
- if to_pil:
- image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
- output_buffer = BytesIO()
- image.save(output_buffer, format='JPEG')
- byte_data = output_buffer.getvalue()
- base64_data = base64.b64encode(byte_data).decode('utf8')
- else:
- data = cv2.imencode('.jpg', image)[1]
- base64_data = base64.b64encode(data.tobytes()).decode('utf8')
- return base64_data
- def get_dir_next_index_name(path, file_type):
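- """Return the next numeric index for files with the given extension in a directory (1 if the directory is empty)."""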
- files_list = os.listdir(path)
- imgs_list = [file.replace(file_type, '') for file in files_list if file.endswith(file_type)]
- length = len(imgs_list)
- if length == 0:
- return 1
- else:
- index_name = max(int(name) for name in imgs_list)  # compare numerically, not lexicographically
- return index_name + 1
- def save_raw_image(subject, datetime, img_file, analysis_type):
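- """Save an uploaded image under MEDIA_ROOT, pre-segment it into parts and return (save_path, parts_list, url_path)."""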
- # Generate a random new image name and build a custom save path.
- ext = img_file.name.split('.')[-1]
- raw_name = img_file.name[0:-len(ext) - 1]
- file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
- raw_img = Image.open(img_file)  # read the uploaded image
- save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- save_path = os.path.join(save_dir, file_name)
- channels = raw_img.split()
- if len(channels) >= 3:
- img = Image.merge("RGB", (channels[0], channels[1], channels[2]))
- open_cv_image = np.array(img)
- img_reload = open_cv_image[:, :, ::-1].copy()
- parts_list = segment2parts(img_reload, save_path)
- else:
- img = raw_img
- open_cv_image = np.array(img)
- parts_list = segment2parts(open_cv_image, save_path)
- # for part in parts_list:
- # with open(part['img_part'], 'rb') as f:
- # bin_img = f.read()
- # part['img_part'] = bin_img
- try:
- img.save(save_path)
- except Exception as e:
- raise e
- url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
- return save_path, parts_list, url_path
- def save_raw_image_without_segment(subject, datetime, img_file, analysis_type):
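- """Save an uploaded image (plus a *_small.jpg copy) without segmentation and return (save_path, numpy image, url_path)."""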
- # Generate a random new image name and build a custom save path.
- ext = img_file.name.split('.')[-1]
- raw_name = img_file.name[0:-len(ext) - 1]
- file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
- raw_img = Image.open(img_file)  # read the uploaded image
- save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- save_path = os.path.join(save_dir, file_name)
- pil_img, open_cv_image = convert_pil_to_jpeg(raw_img)
- try:
- pil_img.save(save_path)
- shutil.copy(save_path, save_path.replace('.jpg', '_small.jpg'))
- except Exception as e:
- raise e
- url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
- return save_path, open_cv_image, url_path
- def save_raw_image_without_segment_formula(subject, datetime, img_file, analysis_type):
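- """Save an uploaded formula image unchanged and return (save_path, url_path, PIL image)."""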
- # Generate a random new image name and build a custom save path.
- ext = img_file.name.split('.')[-1]
- raw_name = img_file.name[0:-len(ext) - 1]
- file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], ext)
- raw_img = Image.open(img_file)  # read the uploaded image
- save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- save_path = os.path.join(save_dir, file_name)
- channels = raw_img.split()
- # if ext == 'png' and len(channels) >= 3:  # formula OCR: flatten transparent PNG
- # img = Image.merge("RGB", (channels[0], channels[1], channels[2]))
- # open_cv_image = np.array(img)
- # resize_img = resize_by_percent(open_cv_image, 0.5)
- #
- # else:
- # img = raw_img
- # open_cv_image = np.array(img)
- # resize_img = resize_by_percent(open_cv_image, 0.5)
- try:
- raw_img.save(save_path)
- # write_single_img(resize_img, save_path.replace('.jpg', '_small.jpg'))
- except Exception as e:
- raise e
- url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
- return save_path, url_path, raw_img
- def save_raw_image_in_jpeg(subject, datetime, img_file, analysis_type):
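- """Save an uploaded image as JPEG after converting it to RGB and return (save_path, url_path, numpy image)."""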
- # Generate a random new image name and build a custom save path.
- ext = img_file.name.split('.')[-1]
- raw_name = img_file.name[0:-len(ext) - 1]
- file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
- raw_img = Image.open(img_file)  # read the uploaded image
- save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- save_path = os.path.join(save_dir, file_name)
- img, open_cv_image = convert_pil_to_jpeg(raw_img)
- # resize_img = resize_by_percent(open_cv_image, 0.5)
- try:
- img.save(save_path)
- # write_single_img(resize_img, save_path.replace('.jpg', '_small.jpg'))
- except Exception as e:
- raise e
- url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
- return save_path, url_path, open_cv_image
- def ocr_login():
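- """Return a Baidu OCR access token, refreshing it via the OAuth endpoint when the cached one is older than OCR_TOKEN_UPDATE_DATE days."""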
- def login():
- grant_type = 'client_credentials'
- client_id = settings.OCR_CLIENT_ID
- client_secret = settings.OCR_CLIENT_SECRET
- textmod = {'grant_type': grant_type, 'client_id': client_id, 'client_secret': client_secret}
- textmod = parse.urlencode(textmod)
- # encode the credentials as a query string, e.g. grant_type=...&client_id=...&client_secret=...
- header_dict = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko'}
- url = 'https://aip.baidubce.com/oauth/2.0/token'
- req = request.Request(url='{}{}{}'.format(url, '?', textmod), headers=header_dict)
- res = request.urlopen(req).read()
- token = json.loads(res.decode(encoding='utf-8'))['access_token']
- latest_access_token = OcrToken(access_token=token)
- latest_access_token.save()
- return token
- try:
- objects = OcrToken.objects.latest('update_time')
- except OcrToken.DoesNotExist:
- # no cached token yet, fetch a fresh one
- return login()
- latest_access_token_db = objects.access_token
- latest_date = objects.update_time
- ans_time = time.mktime(latest_date.timetuple())
- update_date = settings.OCR_TOKEN_UPDATE_DATE
- current_time = time.time()
- if (ans_time + update_date * 24 * 60 * 60) > current_time:
- return latest_access_token_db
- else:
- return login()
- def get_exam_bbox_by_tesseract(img_raw_name, img_path, subject):
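- """Split the exam image into line images, group them into questions and return (status, info) with the question coordinates."""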
- error_info = ''
- status = 1
- text = []
- lines_save_dir = img_path.replace('.jpg', '_lines')
- img_path = os.path.abspath(img_path)
- lines_save_dir = os.path.abspath(lines_save_dir)
- if not os.path.exists(lines_save_dir):
- os.makedirs(lines_save_dir)
- start_time = time.time()
- try:
- bbox, lines_abs_path_list = line_split(img_path, lines_save_dir, settings.TOLERANCE_PIX_NUMBER) # 分行
- except Exception as e:
- logger.error('line_split failed: {}'.format(e), exc_info=True)
- status = 0
- error_info = str(e)
- info = {'img_name': img_raw_name, 'coordinate': text, 'error': error_info}
- return status, info
- time1 = time.time()
- logger.info('lines_segment, cost: {}'.format(time1 - start_time))
- exam_group = []
- try:
- _, exam_group = group_pictures(lines_abs_path_list, subject)
- logger.info('exam_group info : {}'.format(exam_group))
- except (SystemExit, KeyboardInterrupt):
- raise
- except Exception as e:
- logger.error('ocr failed: {}'.format(e), exc_info=True)
- status = 0
- error_info = error_info + str(e)
- time2 = time.time()
- logger.info('exam_grouped, cost: {}'.format(time2 - time1))
- try:
- text = joint_image(img_path, bbox, exam_group)
- except (SystemExit, KeyboardInterrupt):
- raise
- except Exception as e:
- logger.error('generate coordinate info failed: {}'.format(e), exc_info=True)
- status = 0
- error_info = error_info + str(e)
- info = {'img_name': img_raw_name, 'coordinate': text}
- if error_info:
- info = {'img_name': img_raw_name, 'coordinate': text, 'error': error_info}
- logger.info('{} done'.format(img_raw_name))
- return status, info
- def get_ocr_text(access_token, img, subject=None):
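- """Call the Baidu OCR *_basic endpoint (falling back to general_basic on internal errors) and return the recognized text lines."""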
- textmod = {'access_token': access_token}
- textmod = parse.urlencode(textmod)
- url = '{}{}{}{}{}'.format(settings.OCR_URL, settings.OCR_ACCURACY, '_basic', '?', textmod)
- url_general = '{}{}{}{}{}'.format(settings.OCR_URL, 'general', '_basic', '?', textmod)
- headers = {'Content-Type': 'application/x-www-form-urlencoded'}
- # image = opencv2base64(img)  # caller already passes base64-encoded data
- image = img
- data = {
- 'image': image,
- 'detect_direction': 'true',
- 'language_type': 'CHN_ENG',
- }
- # the 'subject' argument is accepted for API compatibility but currently does not change the request
- resp = requests.post(url, data=data, headers=headers).json()
- if resp.get('error_msg'):
- if 'internal error' in resp.get('error_msg'):
- resp = requests.post(url_general, data=data, headers=headers).json()
- if resp.get('error_msg'):
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- else:
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- words_result = resp.get('words_result')
- text_list = [word.get('words') for word in words_result]
- # words_list = {'word': text_list, 'subject': subject}
- return text_list
- def get_ocr_text_and_coordinate_in_raw_format(access_token, img):
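- """Call the Baidu OCR endpoint that returns word locations and return the raw JSON response."""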
- textmod = {'access_token': access_token}
- textmod = parse.urlencode(textmod)
- url = '{}{}{}{}'.format(settings.OCR_BOX_URL, settings.OCR_ACCURACY, '?', textmod)
- url_general = '{}{}{}{}'.format(settings.OCR_BOX_URL, 'general', '?', textmod)
- headers = {'Content-Type': 'application/x-www-form-urlencoded'}
- image_type = 'base64'
- group_id = 'group001'
- user_id = 'usr001'
- # image = base64.b64encode(img)  # caller already passes base64-encoded data
- image = img
- data = {
- 'image_type': image_type,
- 'group_id': group_id,
- 'user_id': user_id,
- 'image': image,
- 'detect_direction': 'true',
- 'recognize_granularity': 'small',
- # 'vertexes_location': 'true',
- # 'probability': 'true'
- }
- resp = requests.post(url, data=data, headers=headers).json()
- if resp.get('error_msg'):
- if 'internal error' in resp.get('error_msg'):
- resp = requests.post(url_general, data=data, headers=headers).json()
- if resp.get('error_msg'):
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- else:
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- return resp
- def get_ocr_text_and_coordinate(access_token, img):
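- """Call the Baidu OCR endpoint with locations and return (text lines, top-left matrix, bottom-right matrix)."""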
- textmod = {'access_token': access_token}
- textmod = parse.urlencode(textmod)
- url = '{}{}{}{}'.format(settings.OCR_BOX_URL, settings.OCR_ACCURACY, '?', textmod)
- url_general = '{}{}{}{}'.format(settings.OCR_BOX_URL, 'general', '?', textmod)
- headers = {'Content-Type': 'application/x-www-form-urlencoded'}
- image_type = 'base64'
- group_id = 'group001'
- user_id = 'usr001'
- # image = base64.b64encode(img)  # caller already passes base64-encoded data
- image = img
- data = {
- 'image_type': image_type,
- 'group_id': group_id,
- 'user_id': user_id,
- 'image': image,
- 'detect_direction': 'true',
- # 'recognize_granularity': 'small',
- # 'vertexes_location': 'true',
- # 'probability': 'true'
- }
- resp = requests.post(url, data=data, headers=headers).json()
- if resp.get('error_msg'):
- if 'internal error' in resp.get('error_msg'):
- resp = requests.post(url_general, data=data, headers=headers).json()
- if resp.get('error_msg'):
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- else:
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- words_result = resp.get('words_result')
- text_list = [word.get('words') for word in words_result]
- # words_list = {'word': text_list, 'subject': subject}
- matrix_lt, matrix_rb = resolve_json(words_result)
- return text_list, matrix_lt, matrix_rb
- def get_ocr_text_and_coordinate_formula(img, access_token, base64=False):
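- """Call the Baidu OCR endpoint with recognize_granularity='small' and return the raw words_result list."""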
- textmod = {'access_token': access_token}
- textmod = parse.urlencode(textmod)
- url = '{}{}{}{}'.format(settings.OCR_BOX_URL, settings.OCR_ACCURACY, '?', textmod)
- url_general = '{}{}{}{}'.format(settings.OCR_BOX_URL, 'general', '?', textmod)
- headers = {'Content-Type': 'application/x-www-form-urlencoded'}
- image_type = 'base64'
- group_id = 'group001'
- user_id = 'usr001'
- if base64:
- image = img
- else:
- image = opencv2base64(img)
- data = {
- 'image_type': image_type,
- 'group_id': group_id,
- 'user_id': user_id,
- 'image': image,
- 'detect_direction': 'true',
- 'recognize_granularity': 'small',
- 'language_type': 'CHN_ENG',
- # 'vertexes_location': 'true',
- # 'probability': 'true'
- }
- resp = requests.post(url, data=data, headers=headers).json()
- if resp.get('error_msg'):
- if 'internal error' in resp.get('error_msg'):
- resp = requests.post(url_general, data=data, headers=headers).json()
- if resp.get('error_msg'):
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- else:
- raise Exception("ocr {}!".format(resp.get('error_msg')))
- words_result = resp.get('words_result')
- return words_result
- def resolve_json(words_result):
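- """Convert the 'location' entries of an OCR words_result into stacked top-left and bottom-right coordinate matrices."""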
- box_list = [item[key] for item in words_result for key in item if key == 'location']
- matrix = np.array([0, 0, 0, 0])
- for box in box_list:
- # num_list = list(box.values())
- w = box.get('width')
- l = box.get('left')
- t = box.get('top')
- h = box.get('height')
- num_list = [w, t, l, h]
- matrix = np.vstack([matrix, np.array(num_list)])
- matrix = matrix[1:]
- matrix_w = matrix[:, 0:1]
- matrix_t = matrix[:, 1:2]
- matrix_l = matrix[:, 2:3]
- matrix_h = matrix[:, 3:]
- matrix_lt = np.hstack([matrix_l, matrix_t])
- matrix_wh = np.hstack([matrix_w, matrix_h])
- matrix_rb = matrix_lt + matrix_wh
- return matrix_lt, matrix_rb
- def group_to_coordinate(group_list, matrix_lt, matrix_rb):
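- """Merge the word boxes covered by each (start, end) group into a single [left, top, right, bottom] box."""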
- matrix_box_vlist = np.array([0, 0, 0, 0])
- for element in group_list:
- if element[0] < element[1]:
- rb = matrix_rb[element[0]:element[1]].max(axis=0)
- lt = matrix_lt[element[0]:element[1]].min(axis=0)
- matrix_box = np.hstack([lt, rb])
- matrix_box_vlist = np.vstack([matrix_box_vlist, matrix_box])
- matrix_box_vlist = matrix_box_vlist[1:]
- return matrix_box_vlist.tolist()
- def get_exam_box(img_raw_name, img_list, save_path, subject, access_token):
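- """OCR each image part, group the recognized lines into questions, back up the text to .txt and the boxes to .xml, and return (status, info)."""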
- status = 1
- error_info = ''
- box_list = []
- words_list_all = []
- group_list_all = []
- try:
- for img_part in img_list:
- x_bias = img_part['x_bias']
- y_bias = img_part['y_bias']
- img = img_part['img_part']
- words_list, matrix_lt, matrix_rb = get_ocr_text_and_coordinate(access_token, img)
- matrix_lt = matrix_lt + np.asarray([x_bias, y_bias])
- matrix_rb = matrix_rb + np.asarray([x_bias, y_bias])
- group_list = group_text(words_list, subject)
- part_box_list = group_to_coordinate(group_list, matrix_lt, matrix_rb)
- box_list = box_list + part_box_list
- words_list.append('********************************')
- words_list_all = words_list_all + words_list
- group_list_all.append(group_list)
- try:
- txt_backup_path = save_path.replace('.jpg', '.txt')
- words_list = [line + ',\n' for line in words_list_all]
- with open(txt_backup_path, 'w', encoding='utf-8') as writer:
- writer.writelines('subject:' + subject + '\n')
- writer.writelines('[\n')
- writer.writelines(words_list)
- writer.writelines(']\n')
- writer.writelines(str(group_list_all))
- logger.info('{} exam paper: {} text info saved successfully'.format(subject, img_raw_name))
- except Exception as e:
- logger.error('{} exam paper: {} failed to save text info: {}'.format(subject, img_raw_name, e), exc_info=True)
- # record the question bounding boxes in an XML file
- tree = ET.parse(r'./segment/exam_info/000000-template.xml') # xml tree
- for index_num, exam_bbox in enumerate(box_list):
- tree = create_xml('{:02d}'.format(index_num), tree,
- exam_bbox[0], exam_bbox[1], exam_bbox[2], exam_bbox[3])
- # print(exam_items_bbox)
- tree.write(save_path.replace('.jpg', '.xml'))
- except Exception as e:
- logger.error('{} exam paper: {} failed to generate coordinates: {}'.format(subject, img_raw_name, e), exc_info=True)
- status = 0
- error_info = error_info + str(e)
- info = {'img_name': img_raw_name, 'coordinate': box_list}
- if error_info:
- info = {'img_name': img_raw_name, 'coordinate': box_list, 'error': error_info}
- logger.info('{} done'.format(img_raw_name))
- return status, info
- def get_exam_ocr(img_raw_name, img_list, save_path, subject, access_token):
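- """OCR each image part and back up the recognized text to a .txt file beside the saved image; return (status, info)."""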
- status = 1
- error_info = ''
- words_list = []
- for img_part in img_list:
- img = img_part['img_part']
- try:
- part_words_list = get_ocr_text(access_token, img, subject)
- except Exception as e:
- part_words_list = []
- error_info = error_info + str(e)
- words_list = words_list + part_words_list
- if len(words_list) < 1:
- logger.error('{} exam paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
- status = 0
- else:
- try:
- txt_backup_path = save_path.replace('.jpg', '.txt')
- words_list = [line + '\n' for line in words_list]
- # # words_list.append(group_list)
- with open(txt_backup_path, 'w', encoding='utf-8') as writer:
- writer.writelines('subject:' + subject + '\n')
- writer.writelines('[\n')
- writer.writelines(words_list)
- writer.writelines(']\n')
- logger.info('{} exam paper: {} text info saved successfully'.format(subject, img_raw_name))
- except Exception as e:
- logger.error('{} exam paper: {} failed to save text info: {}'.format(subject, img_raw_name, e), exc_info=True)
- info = {'img_name': img_raw_name, 'text': words_list}
- if error_info:
- info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
- logger.info('{} done'.format(img_raw_name))
- return status, info
- def get_exam_ocr_single(img_raw_name, img, save_path, subject, access_token):
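- """OCR a single image and back up the recognized text to a .txt file beside the saved image; return (status, info)."""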
- status = 1
- error_info = ''
- words_list = []
- try:
- part_words_list = get_ocr_text(access_token, img)
- except Exception as e:
- part_words_list = []
- error_info = error_info + str(e)
- words_list = words_list + part_words_list
- if len(words_list) < 1:
- logger.error('{} exam paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
- status = 0
- else:
- try:
- txt_backup_path = save_path.replace('.jpg', '.txt')
- words_list = [line + ',\n' for line in words_list]
- # # words_list.append(group_list)
- with open(txt_backup_path, 'w', encoding='utf-8') as writer:
- writer.writelines('subject:' + subject + '\n')
- writer.writelines('[\n')
- writer.writelines(words_list)
- writer.writelines(']\n')
- logger.info('{} exam paper: {} text info saved successfully'.format(subject, img_raw_name))
- except Exception as e:
- logger.error('{} exam paper: {} failed to save text info: {}'.format(subject, img_raw_name, e), exc_info=True)
- info = {'img_name': img_raw_name, 'text': words_list}
- if error_info:
- info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
- logger.info('{} done'.format(img_raw_name))
- return status, info
- def get_segment_by_ocr_once(opencv_img, token, subject, save_path, img_raw_name):
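- """OCR the whole image once, split the result with get_page_text, group questions and back up text/boxes to .txt/.xml; return (status, info)."""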
- img = opencv2base64(opencv_img)
- resp = get_ocr_text_and_coordinate_in_raw_format(token, img)
- if len(opencv_img.shape) == 3:
- opencv_img = cv2.cvtColor(opencv_img, cv2.COLOR_BGR2GRAY)
- test_list = get_page_text(resp['words_result'], opencv_img)
- status = 1
- error_info = ''
- box_list = []
- words_list_all = []
- group_list_all = []
- try:
- for one_page_text in test_list:
- words_list = [word.get('words') for word in one_page_text]
- matrix_lt, matrix_rb = resolve_json(one_page_text)
- group_list = group_text(words_list, subject)
- part_box_list = group_to_coordinate(group_list, matrix_lt, matrix_rb)
- box_list = box_list + part_box_list
- words_list.append('********************************')
- words_list_all = words_list_all + words_list
- group_list_all.append(group_list)
- try:
- txt_backup_path = save_path.replace('.jpg', '.txt')
- words_list = [line + '\n' for line in words_list_all]
- with open(txt_backup_path, 'w', encoding='utf-8') as writer:
- writer.writelines('subject:' + subject + '\n')
- writer.writelines('[\n')
- writer.writelines(words_list)
- writer.writelines(']\n')
- writer.writelines(str(group_list_all))
- logger.info('{} exam paper: {} text info saved successfully'.format(subject, img_raw_name))
- except Exception as e:
- logger.error('{} exam paper: {} failed to save text info: {}'.format(subject, img_raw_name, e), exc_info=True)
- # record the question bounding boxes in an XML file
- tree = ET.parse(r'./segment/exam_info/000000-template.xml') # xml tree
- for index_num, exam_bbox in enumerate(box_list):
- tree = create_xml('{:02d}'.format(index_num), tree,
- exam_bbox[0], exam_bbox[1], exam_bbox[2], exam_bbox[3])
- # print(exam_items_bbox)
- tree.write(save_path.replace('.jpg', '.xml'))
- except Exception as e:
- logger.error('{} exam paper: {} failed to generate coordinates: {}'.format(subject, img_raw_name, e), exc_info=True)
- status = 0
- error_info = error_info + str(e)
- info = {'img_name': img_raw_name, 'coordinate': box_list}
- if error_info:
- info = {'img_name': img_raw_name, 'coordinate': box_list, 'error': error_info}
- logger.info('{} done'.format(img_raw_name))
- return status, info
- def get_exam_ocr_once(opencv_img, token, subject, save_path, img_raw_name):
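- """OCR the whole image once, split the result with get_page_text and back up the text to a .txt file; return (status, info)."""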
- img = opencv2base64(opencv_img)
- resp = get_ocr_text_and_coordinate_in_raw_format(token, img)
- if len(opencv_img.shape) == 3:
- opencv_img = cv2.cvtColor(opencv_img, cv2.COLOR_BGR2GRAY)
- test_list = get_page_text(resp['words_result'], opencv_img)
- words_list = []
- for one_page_raw_text in test_list:
- one_page_words_list = [word.get('words') for word in one_page_raw_text]
- words_list = words_list + one_page_words_list
- status = 1
- error_info = ''
- if len(words_list) < 1:
- logger.error('{} exam paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
- status = 0
- else:
- try:
- txt_backup_path = save_path.replace('.jpg', '.txt')
- words_list = [line + '\n' for line in words_list]
- # # words_list.append(group_list)
- with open(txt_backup_path, 'w', encoding='utf-8') as writer:
- writer.writelines('subject:' + subject + '\n')
- writer.writelines('[\n')
- writer.writelines(words_list)
- writer.writelines(']\n')
- logger.info('{} exam paper: {} text info saved successfully'.format(subject, img_raw_name))
- except Exception as e:
- logger.error('{} exam paper: {} failed to save text info: {}'.format(subject, img_raw_name, e), exc_info=True)
- info = {'img_name': img_raw_name, 'text': words_list}
- if error_info:
- info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
- logger.info('{} done'.format(img_raw_name))
- return status, info
- def save_pdf_image(pdf_file, subject, time_str):
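- """Save an uploaded PDF (file name converted to pinyin), render its pages to JPEG via pdf2image and return the page paths and images."""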
- name = pdf_file.name[:-4]
- ext0 = pdf_file.name.split('.')[-1]
- raw_name0 = ''.join([''.join(i) for i in pypinyin.pinyin(name, style=pypinyin.NORMAL)])
- save_dir0 = os.sep.join(
- [settings.MEDIA_ROOT, 'ocr', subject, time_str, raw_name0 + '_{}'.format(uuid.uuid4().hex[:10])])
- if not os.path.exists(save_dir0):
- os.makedirs(save_dir0)
- pdf_path = os.sep.join([save_dir0, raw_name0 + '.' + ext0])
- with open(pdf_path, 'wb') as pdfFileObj:
- for chunk in pdf_file.chunks():
- pdfFileObj.write(chunk)
- images_list = convert_from_path(pdf_path, dpi=200, output_folder=save_dir0,
- output_file='image',
- first_page=None, last_page=None, fmt='JPEG')
- upload_img_path_list = glob.glob(os.sep.join([save_dir0, '*.jpg']))
- try:
- images_list = [cv2.cvtColor(np.asarray(ele), cv2.COLOR_RGB2BGR) for ele in images_list]
- except Exception:
- images_list = [np.asarray(ele) for ele in images_list]
- return upload_img_path_list, images_list
- def save_raw_image_without_segment_pdf(subject, datetime, raw_name, img_file, analysis_type):
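- """Save a single PDF page image under MEDIA_ROOT and return (save_path, url_path, numpy image)."""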
- # Generate a random new image name and build a custom save path.
- file_name = '{}_{}.{}'.format(raw_name, uuid.uuid4().hex[:10], 'jpg')
- raw_img = Image.open(img_file)  # read the uploaded image
- save_dir = os.path.join(settings.MEDIA_ROOT, analysis_type, subject, datetime)
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- save_path = os.path.join(save_dir, file_name)
- channels = raw_img.split()
- if len(channels) >= 3:
- img = Image.merge("RGB", (channels[0], channels[1], channels[2]))
- open_cv_image = np.array(img)
- resize_img = resize_by_percent(open_cv_image, 0.5)
- else:
- img = raw_img
- open_cv_image = np.array(img)
- resize_img = resize_by_percent(open_cv_image, 0.5)
- try:
- img.save(save_path)
- # write_single_img(resize_img, save_path.replace('.jpg', '_small.jpg'))
- except Exception as e:
- raise e
- url_path = os.path.join(settings.MEDIA_URL, analysis_type, subject, datetime, file_name).replace('\\', '/')
- return save_path, url_path, open_cv_image
- def get_exam_ocr_by_penguin(img_raw_name, raw_image, size, save_path, subject):
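- """Run the penguin English OCR on the raw image and back up the recognized text to a .txt file; return (status, info)."""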
- status = 1
- error_info = ''
- words_list = []
- try:
- words_list = get_ocr_english_text(raw_image, size)
- except Exception as e:
- error_info = error_info + str(e)
- if len(words_list) < 1:
- logger.error('{} exam paper: {} OCR parsing failed: {}'.format(subject, img_raw_name, error_info), exc_info=True)
- status = 0
- else:
- try:
- txt_backup_path = save_path.replace('.jpg', '.txt')
- words_list = [line + '\n' for line in words_list]
- # # words_list.append(group_list)
- with open(txt_backup_path, 'w', encoding='utf-8') as writer:
- writer.writelines('subject:' + subject + '\n')
- writer.writelines('[\n')
- writer.writelines(words_list)
- writer.writelines(']\n')
- logger.info('{} exam paper: {} text info saved successfully'.format(subject, img_raw_name))
- except Exception as e:
- logger.error('{} exam paper: {} failed to save text info: {}'.format(subject, img_raw_name, e), exc_info=True)
- info = {'img_name': img_raw_name, 'text': words_list}
- if error_info:
- info = {'img_name': img_raw_name, 'text': words_list, 'error': error_info}
- logger.info('{} done'.format(img_raw_name))
- return status, info