🐥
Honda
project/
├── aws_common.py # 文件1:访问 AWS (S3 / PostgreSQL / DynamoDB) 的共通函数
├── test_aws_common.py # 文件2:模拟器 + 测试驱动(不访问真实AWS)
├── app.py # 文件3:后端 Flask API 示例,调用文件1的共通函数
├── frontend.js # 文件4:前端,调用 Flask API
└── index.html # 简单网页
# aws_common.py
import boto3
import psycopg2
import json
import os
class AWSCommon:
"""统一访问 AWS 资源的共通函数类"""
def __init__(self):
self.s3 = boto3.client('s3')
self.dynamodb = boto3.resource('dynamodb')
self.db_conn = psycopg2.connect(
host=os.getenv('PG_HOST', 'localhost'),
port=os.getenv('PG_PORT', '5432'),
user=os.getenv('PG_USER', 'postgres'),
password=os.getenv('PG_PASSWORD', 'postgres'),
dbname=os.getenv('PG_DB', 'testdb')
)
def read_s3_file(self, bucket, key):
"""从 S3 读取文件"""
obj = self.s3.get_object(Bucket=bucket, Key=key)
return obj['Body'].read().decode('utf-8')
def run_sql(self, query):
"""执行 PostgreSQL SQL"""
with self.db_conn.cursor() as cur:
cur.execute(query)
return cur.fetchall()
def get_tree_data(self, table_name, key_name, key_value):
"""从 DynamoDB 获取树状数据"""
table = self.dynamodb.Table(table_name)
response = table.get_item(Key={key_name: key_value})
return response.get('Item', {})
# test_aws_common.py
import unittest
from unittest.mock import patch, MagicMock
from aws_common import AWSCommon
class TestAWSCommon(unittest.TestCase):
@patch('boto3.client')
@patch('boto3.resource')
@patch('psycopg2.connect')
def test_all(self, mock_connect, mock_resource, mock_client):
# --- 模拟 S3 ---
s3_mock = MagicMock()
s3_mock.get_object.return_value = {'Body': MagicMock(read=lambda: b'Hello from S3')}
mock_client.return_value = s3_mock
# --- 模拟 PostgreSQL ---
conn_mock = MagicMock()
cursor_mock = MagicMock()
cursor_mock.fetchall.return_value = [('row1',), ('row2',)]
conn_mock.cursor.return_value.__enter__.return_value = cursor_mock
mock_connect.return_value = conn_mock
# --- 模拟 DynamoDB ---
table_mock = MagicMock()
table_mock.get_item.return_value = {'Item': {'id': '123', 'name': 'Root Node'}}
resource_mock = MagicMock()
resource_mock.Table.return_value = table_mock
mock_resource.return_value = resource_mock
# --- 执行被测方法 ---
aws = AWSCommon()
print('S3:', aws.read_s3_file('test-bucket', 'a.txt'))
print('SQL:', aws.run_sql('SELECT * FROM table'))
print('TREE:', aws.get_tree_data('tree_table', 'id', '123'))
# --- 验证 ---
self.assertEqual(aws.read_s3_file('test-bucket', 'a.txt'), 'Hello from S3')
self.assertEqual(aws.run_sql('SELECT * FROM table'), [('row1',), ('row2',)])
self.assertEqual(aws.get_tree_data('tree_table', 'id', '123')['name'], 'Root Node')
if __name__ == '__main__':
unittest.main()
# app.py
from flask import Flask, jsonify
from aws_common import AWSCommon
app = Flask(__name__)
aws = AWSCommon()
@app.route('/api/s3')
def get_s3():
data = aws.read_s3_file('my-bucket', 'data.txt')
return jsonify({'data': data})
@app.route('/api/sql')
def get_sql():
result = aws.run_sql('SELECT * FROM my_table')
return jsonify({'rows': result})
@app.route('/api/tree')
def get_tree():
tree = aws.get_tree_data('tree_table', 'id', 'root')
return jsonify(tree)
if __name__ == '__main__':
app.run(debug=True, port=5000)
// frontend.js
async function callAPI(endpoint) {
const res = await fetch(endpoint);
const data = await res.json();
document.getElementById('output').textContent = JSON.stringify(data, null, 2);
}
document.getElementById('btnS3').onclick = () => callAPI('/api/s3');
document.getElementById('btnSQL').onclick = () => callAPI('/api/sql');
document.getElementById('btnTree').onclick = () => callAPI('/api/tree');
index.html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>AWS Demo</title>
</head>
<body>
<h3>测试 AWS 共通函数</h3>
<button id="btnS3">读取 S3</button>
<button id="btnSQL">执行 SQL</button>
<button id="btnTree">获取树状数据</button>
<pre id="output"></pre>
<script src="frontend.js"></script>
</body>
</html>
Discussion