import base64
import datetime
import os
import re
import time

import requests

import my_config


def get_image_base64(image):
    """Return the base64 text of an image, or ``"online"`` for a remote URL.

    :param image: an ``http(s)`` URL, a local file path (``str`` or
        ``os.PathLike``), or a binary file-like object exposing ``read()``.
    :return: the literal string ``"online"`` when *image* is a URL (the image
        is not downloaded here), otherwise the base64-encoded content as
        ``str``.
    """
    print("image:", str(image))
    if re.search("^https?:", str(image)):
        # Remote image: the caller passes the URL straight to the OCR service.
        return "online"
    if isinstance(image, (str, os.PathLike)):
        # Local image path. Any non-URL path is accepted, not only Windows
        # drive-letter paths, and the handle is closed deterministically.
        with open(image, "rb") as img_file:
            filebyte = img_file.read()
    else:
        # Binary stream.
        filebyte = image.read()
    return base64.b64encode(filebyte).rstrip().decode("utf-8")


class BaiduRecognitionApi:
    """Client for Baidu's handwriting OCR endpoint with a cached access token.

    The token, its lifetime and the time of its last refresh are seeded from
    ``my_config`` and written back to the attributes of that module whenever
    the token is refreshed.
    """

    # Seconds to wait on each HTTP call before requests raises a timeout,
    # so a stalled connection cannot block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # SECURITY NOTE(review): client_id (AK) and client_secret (SK) are
        # embedded in source control here. Consider loading them from
        # configuration or the environment and rotating these credentials.
        self.host = (
            'https://aip.baidubce.com/oauth/2.0/token'
            '?grant_type=client_credentials'
            '&client_id=OuO8mVZNIkDCP9eDQgF8txER'
            '&client_secret=2Wv28a9WBDKvkoXvcHMINNWZt8QdOzZg'
        )
        self.last_update_time = my_config.last_token_update_time
        self.expires_in = my_config.bce_expires_in
        self.access_token = my_config.bce_access_token

    def update_access_token(self):
        """Refresh the access token when it is missing or nearly expired.

        A refresh is attempted when no token is held or when at least 90% of
        ``expires_in`` seconds have elapsed since the last refresh. On success
        the new token, lifetime and refresh time are stored on the instance
        and mirrored onto the ``my_config`` module attributes (in memory
        only; nothing is written to disk here). On any failure
        ``self.access_token`` is set to ``""``.

        :return: None
        """
        token_age = int(time.time() - self.last_update_time)
        if token_age < self.expires_in * 0.9 and self.access_token:
            return

        try:
            # The token URL contains the client secret, so it is not printed.
            response = requests.get(self.host, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                print("获取Access Token失败")
                self.access_token = ""
                return
            result = response.json()
            # Parse everything before committing so a malformed response
            # cannot leave the state half-updated.
            new_token = result["access_token"]
            new_expires_in = int(result["expires_in"])
        except Exception as ex:
            # Best-effort refresh: network or parsing problems are reported
            # and leave the client without a token instead of raising.
            print("获取Access Token异常", ex)
            self.access_token = ""
            return

        self.last_update_time = time.time()
        self.access_token = new_token
        self.expires_in = new_expires_in
        my_config.last_token_update_time = self.last_update_time
        my_config.bce_access_token = self.access_token
        my_config.bce_expires_in = self.expires_in
        print(f"更新access_token: {self.access_token} expires_in: {self.expires_in}")

    def recognition(self, in_img_path):
        """Run handwriting OCR on one image.

        :param in_img_path: an ``http(s)`` URL (sent as the ``url`` form
            field), or a local path / binary stream (sent base64-encoded as
            the ``image`` form field).
        :return: ``None`` when no access token is available; otherwise a
            4-tuple ``(text, detail, post_start_time, token_update_seconds)``.
            On success ``text`` is the recognised lines joined with ``";"``
            and ``detail`` is the decoded JSON response. On failure ``text``
            is ``""`` and ``detail`` is an error string.
        """
        tt0 = time.time()
        self.update_access_token()
        update_time = time.time() - tt0

        access_token = self.access_token
        if not access_token:
            # Kept as a bare None for compatibility with existing callers.
            return None

        tt1 = time.time()
        image_base64 = get_image_base64(in_img_path)
        print("读取图片时间:", time.time() - tt1)

        if image_base64 == "online":
            params = {"url": in_img_path}
        else:
            params = {"image": image_base64}
        request_url = (
            "https://aip.baidubce.com/rest/2.0/ocr/v1/handwriting"
            + "?access_token=" + access_token
        )
        headers = {'content-type': 'application/x-www-form-urlencoded'}

        # NOTE(review): the retry below is unbounded; if the QPS-limit
        # response persists this loop never returns.
        while True:
            post_stime = str(datetime.datetime.now())
            response = requests.post(
                request_url,
                data=params,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            # Measured from tt1, so it includes image loading and retries.
            post_cost_time = time.time() - tt1
            print("调接口消费时间:", post_cost_time)

            if response.status_code != 200:
                return "", 'ocr异常2:' + str(response.text), post_stime, update_time

            result = response.json()
            print(result)
            if "words_result" in result and result["words_result"]:
                res_word = ";".join(i["words"] for i in result["words_result"])
                return res_word, result, post_stime, update_time
            if "Open api qps request limit reached" in str(result):
                # Rate limited: back off briefly and retry the same request.
                time.sleep(0.2)
                continue
            return "", 'ocr异常1:' + str(result), post_stime, update_time


if __name__ == '__main__':
    # Manual smoke test: recognise one remote sample image and time the call.
    api = BaiduRecognitionApi()
    st1 = time.time()
    image_path = r"http://zxhx-m-1302712961.cos.ap-nanjing.myqcloud.com/PC_Prod/client_SanFang_cutimg/2031/953235651420184576/886480770777941106/124.jpg"
    print(api.recognition(image_path))
    print("调接口时间:", time.time() - st1)