123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- # -*- coding=utf-8
- # appid 已在配置中移除,请在参数 Bucket 中带上 appid。Bucket 由 BucketName-APPID 组成
- # 1. 设置用户配置, 包括 secretId,secretKey 以及 Region
- import datetime
- import random
- import hashlib
- from qcloud_cos import CosConfig
- from qcloud_cos import CosS3Client
- from qcloud_cos import CosServiceError
- from qcloud_cos import CosClientError
- import sys, io
- import logging
- import requests
- import configs
- # 日志
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf8')
- handler = logging.StreamHandler(sys.stdout)
- handler.encoding = 'UTF-8' # 解决控制台输出编码问题
- image_upload_log = configs.parse_log_dir + "/image_log.txt"
- logging.basicConfig(level=logging.INFO,
- format="[%(asctime)s, %(levelname)s]-%(filename)s-%(lineno)s: %(message)s",
- filename=image_upload_log,
- # handlers=[handler],
- )
- secret_id = "AKIDC9pETRbZfWBbmhoglkT4PUJGzRjmj3Ia" # "云 API 密钥 SecretId";
- secret_key = "C6jlX4LKfleGdmfQvGNgj74lESRpBIEJ" # "云 API 密钥 SecretKey";
- APPID = '1302712961'
- TIMEOUT = 30
- token = None # 使用临时密钥需要传入Token,默认为空,可不填
- scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
- addr = ".cos." + configs.region + ".myqcloud.com" # 后缀拼接
- cos_config = CosConfig(Region=configs.region,
- SecretId=secret_id,
- SecretKey=secret_key,
- Token=token) # 获取配置对象
- client = CosS3Client(cos_config) # 获取客户端对象
- def upload_img_to_qcloud(param_qcloud):
- put_key, localfile = param_qcloud
- # 高级上传接口(推荐)
- # 根据文件大小自动选择简单上传或分块上传,分块上传具备断点续传功能。
- response = client.upload_file(
- Bucket=configs.public_bucket,
- LocalFilePath=localfile, # 本地文件的路径
- Key=put_key, # 上传到桶之后的文件名
- # PartSize=10,
- PartSize=1, # 上传分成几部分
- MAXThread=10, # 支持最多的线程数
- EnableMD5=False # 是否支持MD5
- )
- # print(response['ETag'])
- def filestream_upload(param_qcloud):
- """
- file upload:file byte
- 文件流简单上传(不支持超过5G的文件,推荐使用下方高级上传接口)
- img upload addrs="http://{}.cos.{}/{}".format(public_bucket,region,ucloud_name)
- http://zxhx-pro-1302712961.cos.ap-beijing.myqcloud.com/xueguan/uploadfiles/wording/5fc8c620c6a4dcc23e55a5d0/image4.png"
- :return:
- """
- put_key, fileurl = param_qcloud
- filebyte = requests.get(fileurl).content
- response = client.put_object(
- Bucket=configs.public_bucket,
- Body=filebyte,
- Key=put_key, # 文件名
- StorageClass='STANDARD',
- EnableMD5=False
- )
- def img_inbucket_count(wordid, flag='w'):
- """
- 统计腾讯云桶中某个wordid下的图片个数
- flag:
- 'w':word解析中的图片存放的腾讯云地址
- 'p':图片、pdf类结构化图片存放的腾讯云地址
- 一份试卷在第一步解析中,腾讯云地址只有一种情况
- :return:
- """
- key_d = {"w": "zyk/uploadfiles/wording/", "p": "imgpaper/lqy_upload/"}
- try:
- obj = client.list_objects(
- Bucket=configs.public_bucket,
- Prefix='{}'.format(key_d[flag])+ wordid,
- # Marker=Marker,
- )
- if 'Contents' in obj:
- return obj['Contents']
- except:
- pass
- return ""
- if __name__ == "__main__":
- localfile1 = r"C:\Users\Python\Desktop\test\5fc0a554e3e93cca5b9e2193\word\media\image9.png"
- put_key2 = "zyk/uploadfiles/wording/5fc0a554e3e93cca5b9e2193/image9.png"
- pic_name = "99.png"
- put_key1 = "https://" + str(configs.public_bucket) + ".cos." + configs.region + ".myqcloud.com/" + pic_name
- # put_key2 = "http://zxhx-1302712961.cos.ap-shanghai.myqcloud.com/" + pic_name
- # response = client.upload_file(
- # Bucket=public_bucket,
- # LocalFilePath=localfile1, # 本地文件的路径
- # Key=put_key2, # 上传到桶之后的文件名
- # # PartSize=10,
- # # MAXThread=10,
- # # EnableMD5=False
- # )
- # print(response)
- # print("http://{0}.cos.{1}.myqcloud.com/{2}".format(public_bucket, region, put_key2))
- # get_object list_objects
- 'http://zxhx-1302712961.cos.ap-shanghai.myqcloud.com/imgpaper/lqy_upload/img_hebei_4.png'
- obj = client.list_objects(
- Bucket=configs.public_bucket,
- # Key=put_key2
- # Prefix='zyk/uploadfiles/wording/5fc64a0a4994183dda7e74b9',
- Prefix='imgpaper/lqy_upload',
- # Marker=Marker,
- )
- objects = {
- "Quiet": "true",
- "Object": [item["Key"] for item in obj['Contents']]
- }
- print(obj['Contents'])
- # print(response['ETag']) # 可以与Web资源关联的记号
- #### 字节流简单上传
- # response = client.put_object(
- # Bucket=bucket,
- # Body=b'bytes',
- # Key='picture.jpg',
- # EnableMD5=False
- # )
- # print(response['ETag'])
|