• 售前

  • 售后

热门帖子
入门百科

python 阿里云oss实现直传签名与回调验证的示例方法

[复制链接]
123457549 显示全部楼层 发表于 2021-10-26 13:57:31 |阅读模式 打印 上一主题 下一主题
签名
  1. import base64
  2. import json
  3. import time
  4. from datetime import datetime
  5. import hmac
  6. from hashlib import sha1
  7. access_key_id = ''
  8. # 请填写您的AccessKeySecret。
  9. access_key_secret = ''
  10. # host的格式为 bucketname.endpoint ,请替换为您的真实信息。
  11. host = ''
  12. # callback_url为 上传回调服务器的URL,请将下面的IP和Port配置为您自己的真实信息。
  13. callback_url = ""
  14. # 用户上传文件时指定的前缀。
  15. upload_dir = 'user-dir-prefix/'
  16. expire_time = 1200
  17. expire_syncpoint = int(time.time() + expire_time)
  18. policy_dict = {
  19.   'expiration': datetime.utcfromtimestamp(expire_syncpoint).isoformat() + 'Z',
  20.   'conditions': [
  21.     {"bucket": "test-paige"},
  22.     ['starts-with', '$key', 'user/test/']
  23.   ]
  24. }
  25. policy = json.dumps(policy_dict).strip()
  26. policy_encode = base64.b64encode(policy.encode())
  27. signature = base64.encodebytes(hmac.new(access_key_secret.encode(), policy_encode, sha1).digest())
  28. callback_dict = {
  29.   'callbackUrl': callback_url,
  30.   'callbackBody': 'filename=${object}&size=${size}&mimeType=${mimeType}&height=${imageInfo.height}&width=${'
  31.           'imageInfo.width}',
  32.   'callbackBodyType': 'application/json'
  33. }
  34. callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode()
  35. var = {
  36.   'accessid': access_key_id,
  37.   'host': host,
  38.   'policy': policy_encode.decode(),
  39.   'signature': signature.decode().strip(),
  40.   'expire': expire_syncpoint,
  41.   'callback': callback
  42. }
复制代码
回调验签
  1. import asyncio
  2. import base64
  3. import time
  4. import aiomysql
  5. import rsa
  6. from aiohttp import web, ClientSession
  7. from urllib import parse
  8. import uuid
  9. def success(msg='', data=None):
  10.   if data is None:
  11.     data = {}
  12.   dict_data = {
  13.     'code': 1,
  14.     'msg': msg,
  15.     'data': data
  16.   }
  17.   return web.json_response(dict_data)
  18. def failed(msg='', data=None):
  19.   if data is None:
  20.     data = {}
  21.   dict_data = {
  22.     'code': 0,
  23.     'msg': msg,
  24.     'data': data
  25.   }
  26.   return web.json_response(dict_data)
  27. async def handle(request):
  28.   """
  29.   获取连接池
  30.   :param web.BaseRequest request:
  31.   :return:
  32.   """
  33.   authorization_base64 = request.headers['authorization']
  34.   x_oss_pub_key_url_base64 = request.headers['x-oss-pub-key-url']
  35.   pub_key_url = base64.b64decode(x_oss_pub_key_url_base64.encode())
  36.   authorization = base64.b64decode(authorization_base64.encode())
  37.   path = request.path
  38.   async with ClientSession() as session:
  39.     async with session.get(pub_key_url.decode()) as resp:
  40.       pub_key_body = await resp.text()
  41.       pubkey = rsa.PublicKey.load_pkcs1_openssl_pem(pub_key_body.encode())
  42.       body = await request.content.read()
  43.       auth_str = parse.unquote(path) + '\n' + body.decode()
  44.       parse_url = parse.parse_qs(body.decode())
  45.       print(parse_url)
  46.       try:
  47.         rsa.verify(auth_str.encode(), authorization, pubkey)
  48.         pool = request.app['mysql_pool']
  49.         async with pool.acquire() as conn:
  50.           async with conn.cursor() as cur:
  51.             id = str(uuid.uuid4())
  52.             url = parse_url['filename'][0]
  53.             mime = parse_url['mimeType'][0]
  54.             disk = 'oss'
  55.             time_at = time.strftime("%Y-%m-%d %H:%I:%S", time.localtime())
  56.             sql = "INSERT INTO media(id,url,mime,disk,created_at,updated_at) VALUES(%s,%s,%s,%s,%s,%s)"
  57.             await cur.execute(sql, (id, url, mime, disk, time_at, time_at))
  58.             await conn.commit()
  59.         dict_data = {
  60.           'id': id,
  61.           'url': url,
  62.           'cdn_url': 'https://cdn.***.net' + '/' + url,
  63.           'mime': mime,
  64.           'disk': disk,
  65.           'created_at': time_at,
  66.           'updated_at': time_at,
  67.         }
  68.         return success(data=dict_data)
  69.       except rsa.pkcs1.VerificationError:
  70.         return failed(msg='验证错误')
  71. async def init(loop):
  72.   # 创建连接池
  73.   mysql_pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
  74.                       user='', password='',
  75.                       db='', loop=loop)
  76.   async def on_shutdown(application):
  77.     """
  78.     接收到关闭信号时,要先关闭连接池,并等待连接池关闭成功.
  79.     :param web.Application application:
  80.     :return:
  81.     """
  82.     application['mysql_pool'].close()
  83.     # 没有下面这句话会报错 RuntimeError: Event loop is closed ,因为连接池没有真正关关闭程序就关闭了,引发python的报错
  84.     await application['mysql_pool'].wait_closed()
  85.   application = web.Application()
  86.   application.on_shutdown.append(on_shutdown)
  87.   # 把连接池放到 application 实例中
  88.   application['mysql_pool'] = mysql_pool
  89.   application.add_routes([web.get('/', handle), web.post('/oss', handle)])
  90.   return application
  91. if __name__ == '__main__':
  92.   loop = asyncio.get_event_loop()
  93.   application = loop.run_until_complete(init(loop))
  94.   web.run_app(application, host='127.0.0.1')
  95.   loop.close()
复制代码
到此这篇关于python 阿里云oss实现直传签名与回调验证的文章就先容到这了,更多相干python 直传签名与回调验证内容请搜刮草根技术分享以前的文章或继续欣赏下面的相干文章希望各人以后多多支持草根技术分享!

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作