日常工作部署 Web App 经常需要在 AWS 控制台上操作 S3 (存储桶)和 CloudFront(CDN),为了提高效率,我们需要通过 AWS 提供的 API 来实现自动部署。下面分别就 python 和 js 介绍一下对应的 API。
- JavaScript:aws-sdk-js,分为 v2 和 v3 两个版本,推荐用 v3 版本,支持 Promise 和 await / async。官方文档👉🏻AWS SDK for JavaScript v3 和 AWS SDK for JavaScript v2。
- Python:boto3,官方文档👉🏻Boto3 documentation。
接下来,我们就可以基于Jenkins Pipeline 使用 boto3 来实现部署流水线,使用 aws-sdk-js 实现基础看板。
- 封装 s3-client:
import asyncio import functools import os import re import traceback from typing import Optional from boto3 import client from log import getLogger logger = getLogger(__name__) class S3Client: """ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html """ def __init__(self, *args, **kwargs): self.client = client('s3', *args, **kwargs) def upload_file(self, file_path: str, bucket: str, object_name: str, extra_args: dict = None): if not os.path.isfile(file_path): logger.error(f'{file_path} is not a file !') return if not bucket or not object_name: logger.error(f'bucket or object name is invalid !') return try: file_name = os.path.split(file_path)[-1] extra_args = extra_args if extra_args else dict() # 对常见文件添加 ContentType 处理,默认为 application/octet-stream # https://www.iana.org/assignments/media-types/media-types.xhtml postfix_type = { '.js': 'application/javascript', '.json': 'application/json', '.xml': 'application/xml', '.otf': 'font/otf', '.ttf': 'font/ttf', '.woff': 'font/woff', '.woff2': 'font/woff2', '.gif': 'image/gif', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.css': 'text/css', '.html': 'text/html', '.svg': 'text/xml', } for k, v in postfix_type.items(): if file_name.endswith(k): extra_args = dict({'ContentType': v}, **extra_args) break logger.info(f'from: {file_path}, to: {bucket}/{object_name}, extra args: {extra_args}') self.client.upload_file(file_path, bucket, object_name, ExtraArgs=extra_args) except Exception as e: logger.error(e) traceback.print_exc() async def upload_file_async(self, file_path: str, bucket: str, object_name: str, extra_args: dict = None) -> None: return await asyncio.get_event_loop().run_in_executor( None, functools.partial(self.upload_file, file_path, bucket, object_name, extra_args) ) def upload_dir_async(self, dir_name: str, bucket: str, object_name: str, skip_dir_name: bool = True, extra_args: dict = None) -> Optional[list]: if not os.path.isdir(dir_name): logger.error(f'{dir_name} is not a dir !') return if not bucket or not object_name: logger.error(f'bucket or object name is invalid !') return extra_args = extra_args if extra_args else dict() path = [dir_name] upload_files = list() logger.info(f'from: {dir_name}, to: {bucket}/{object_name}, extra_args: {extra_args}') def dfs(): nonlocal extra_args, validation_urls path_ = os.path.join(*path) if os.path.isdir(path_): for item in os.listdir(path_): path.append(item) dfs() path.pop() else: if skip_dir_name: object_ = '/'.join([object_name] + path[1:]) else: object_ = '/'.join([object_name] + path[0].split('/')[-1:] + path[1:]) # 对首页html添加版本号处理 # if re.match(r'index.*\.html', os.path.split(path_)[-1]): if os.path.split(path_)[-1].endswith('.html'): if extra_args.get('Metadata', dict()).get('ver'): extra_args = {'Metadata': {'ver': extra_args['Metadata']['ver']}} upload_files.append((path_, bucket, object_, extra_args)) return upload_files.append((path_, bucket, object_)) return logger.info(f'start upload dir') dfs() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) tasks = [self.upload_file_async(*_) for _ in upload_files] results, pending = loop.run_until_complete(asyncio.wait(tasks)) loop.close() logger.info(f'end upload dir')
- 封装 cloudfront-client:
import time import traceback from typing import Optional, List from boto3 import client from log import getLogger logger = getLogger(__name__) class CfClient: """ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudfront.html """ def __init__(self, *args, **kwargs): self.client = client('cloudfront', *args, **kwargs) def create_invalidation(self, dist_id: str, path: str = '/*') -> Optional[str]: if not dist_id: logger.error('dist id is invalid !') return try: id_ = self.client.create_invalidation( DistributionId=dist_id, InvalidationBatch={ 'Paths': { 'Quantity': 1, 'Items': [ path, ] }, 'CallerReference': str(int(time.time())) } )['Invalidation']['Id'] logger.info(f'invalidation on {dist_id} is created: {id_}') return id_ except Exception as e: logger.error(f'failed to create invalidation on {dist_id}:{e}') traceback.print_exc() def get_invalidation(self, dist_id: str, inval_id: str) -> Optional[str]: if not dist_id or not inval_id: logger.error('dist id or inval id is invalid !') return try: status = self.client.get_invalidation( DistributionId=dist_id, Id=inval_id )['Invalidation']['Status'] logger.info(f'invalidation {inval_id} status is {status}') return status except Exception as e: logger.error(f'failed to get invalidation {inval_id} status:{e}') traceback.print_exc() def get_aliases(self, dist_id: str) -> Optional[List]: if not dist_id: logger.error('dist id is invalid !') return try: return self.client.get_distribution_config(Id=dist_id)['DistributionConfig']['Aliases']['Items'] except Exception as e: logger.error(f'failed to get aliases: {e}') traceback.print_exc()
