Día 62 - Integra servicios simulados con LocalStack
Integra servicios simulados con LocalStack
🔗 ¡El día de la integración total!
¡Hola Roxs! Los últimos 3 días aprendimos S3, Lambda y DynamoDB por separado. Hoy los conectamos todos para crear arquitecturas serverless completas.
¿Listos para ser arquitectos de la nube? 🏗️✨
🧩 Arquitecturas serverless reales
🎯 ¿Qué vamos a construir hoy?
Vamos a crear 4 arquitecturas completas que usan múltiples servicios integrados:
- 📸 Procesador de imágenes: S3 → Lambda → DynamoDB
- 📊 Sistema de logs: Lambda → DynamoDB → S3
- 🔔 Sistema de notificaciones: SQS → Lambda → SNS → DynamoDB
- 🌐 API completa: API Gateway → Lambda → DynamoDB → S3
🌟 ¿Por qué esto es importante?
En el mundo real, los servicios AWS nunca trabajan solos. Las aplicaciones modernas integran múltiples servicios para crear soluciones robustas y escalables.
🏗️ Arquitectura 1: Procesador de imágenes
🗺️ Flujo de la arquitectura:
Usuario sube imagen → S3 → Trigger → Lambda → Procesa → DynamoDB (metadata) → S3 (thumbnail)
🚀 Paso 1: Configurar buckets S3
# Crear buckets especializados
awslocal s3 mb s3://roxs-images-original
awslocal s3 mb s3://roxs-images-processed
awslocal s3 mb s3://roxs-images-thumbnails
# Verificar creación
awslocal s3 ls
🗃️ Paso 2: Crear tabla DynamoDB para metadata
# Tabla para guardar información de procesamiento de imágenes
awslocal dynamodb create-table \
--table-name RoxsImageMetadata \
--attribute-definitions \
AttributeName=ImageId,AttributeType=S \
AttributeName=UploadTimestamp,AttributeType=N \
--key-schema \
AttributeName=ImageId,KeyType=HASH \
AttributeName=UploadTimestamp,KeyType=RANGE \
--provisioned-throughput \
ReadCapacityUnits=5,WriteCapacityUnits=5
# Verificar tabla
awslocal dynamodb describe-table --table-name RoxsImageMetadata
⚡ Paso 3: Crear función Lambda procesadora
# image_processor.py
import json
import boto3
import uuid
from datetime import datetime
import base64
def lambda_handler(event, context):
"""
Procesador de imágenes: S3 → Lambda → DynamoDB
"""
print(f"Event recibido: {json.dumps(event)}")
# Clientes AWS para LocalStack
s3_client = boto3.client(
's3',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
dynamodb = boto3.client(
'dynamodb',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
try:
# Extraer información del evento
source_bucket = event.get('bucket', 'roxs-images-original')
image_key = event.get('key', 'unknown.jpg')
# Generar ID único para la imagen
image_id = str(uuid.uuid4())
timestamp = int(datetime.now().timestamp())
print(f"Procesando imagen: {source_bucket}/{image_key}")
# 1. Obtener información del archivo original
head_response = s3_client.head_object(Bucket=source_bucket, Key=image_key)
file_size = head_response['ContentLength']
content_type = head_response.get('ContentType', 'unknown')
last_modified = head_response['LastModified'].isoformat()
# 2. Descargar imagen original
get_response = s3_client.get_object(Bucket=source_bucket, Key=image_key)
image_content = get_response['Body'].read()
# 3. "Procesar" imagen (simulado - en real usarías PIL/Pillow)
# Para este demo, creamos una versión "procesada" y un "thumbnail"
processed_content = image_content # En real: resize, filters, etc.
thumbnail_content = image_content[:len(image_content)//2] # Simular thumbnail más pequeño
# 4. Subir imagen procesada
processed_key = f"processed/{image_id}-{image_key}"
s3_client.put_object(
Bucket='roxs-images-processed',
Key=processed_key,
Body=processed_content,
ContentType=content_type,
Metadata={
'original-bucket': source_bucket,
'original-key': image_key,
'processed-by': 'roxs-image-processor',
'processed-at': datetime.now().isoformat()
}
)
# 5. Subir thumbnail
thumbnail_key = f"thumbnails/{image_id}-thumb-{image_key}"
s3_client.put_object(
Bucket='roxs-images-thumbnails',
Key=thumbnail_key,
Body=thumbnail_content,
ContentType=content_type,
Metadata={
'original-bucket': source_bucket,
'original-key': image_key,
'thumbnail-for': image_id
}
)
# 6. Guardar metadata en DynamoDB
dynamodb.put_item(
TableName='RoxsImageMetadata',
Item={
'ImageId': {'S': image_id},
'UploadTimestamp': {'N': str(timestamp)},
'OriginalBucket': {'S': source_bucket},
'OriginalKey': {'S': image_key},
'OriginalSize': {'N': str(file_size)},
'ContentType': {'S': content_type},
'ProcessedKey': {'S': processed_key},
'ThumbnailKey': {'S': thumbnail_key},
'ProcessedAt': {'S': datetime.now().isoformat()},
'Status': {'S': 'completed'},
'ProcessedBy': {'S': 'lambda'},
'LastModified': {'S': last_modified}
}
)
# 7. Respuesta de éxito
response = {
'statusCode': 200,
'body': json.dumps({
'message': 'Imagen procesada exitosamente',
'image_id': image_id,
'original': f"{source_bucket}/{image_key}",
'processed': f"roxs-images-processed/{processed_key}",
'thumbnail': f"roxs-images-thumbnails/{thumbnail_key}",
'size_bytes': file_size,
'processing_time': 'instant'
})
}
print(f"Procesamiento exitoso: {image_id}")
return response
except Exception as e:
error_msg = f"Error procesando imagen: {str(e)}"
print(error_msg)
# Guardar error en DynamoDB
try:
dynamodb.put_item(
TableName='RoxsImageMetadata',
Item={
'ImageId': {'S': str(uuid.uuid4())},
'UploadTimestamp': {'N': str(int(datetime.now().timestamp()))},
'OriginalBucket': {'S': source_bucket},
'OriginalKey': {'S': image_key},
'Status': {'S': 'error'},
'ErrorMessage': {'S': str(e)},
'ProcessedAt': {'S': datetime.now().isoformat()}
}
)
except:
pass # Si no podemos guardar el error, continuamos
return {
'statusCode': 500,
'body': json.dumps({'error': error_msg})
}
🚀 Paso 4: Desplegar la función
# Empaquetar función
zip image-processor.zip image_processor.py
# Crear función Lambda
awslocal lambda create-function \
--function-name roxs-image-processor \
--runtime python3.9 \
--role arn:aws:iam::000000000000:role/lambda-role \
--handler image_processor.lambda_handler \
--zip-file fileb://image-processor.zip \
--timeout 60 \
--description "Procesador de imágenes: S3 → Lambda → DynamoDB"
# Verificar función
awslocal lambda list-functions --query 'Functions[?FunctionName==`roxs-image-processor`]'
🧪 Paso 5: Probar la arquitectura
# 1. Crear imagen de prueba
echo "Esta es una imagen de prueba en formato texto" > test-image.jpg
echo "Otra imagen para procesar" > vacation-photo.png
# 2. Subir imágenes a S3
awslocal s3 cp test-image.jpg s3://roxs-images-original/
awslocal s3 cp vacation-photo.png s3://roxs-images-original/photos/
# 3. Procesar primera imagen
awslocal lambda invoke \
--function-name roxs-image-processor \
--payload '{
"bucket": "roxs-images-original",
"key": "test-image.jpg"
}' \
image-result1.json
cat image-result1.json
# 4. Procesar segunda imagen
awslocal lambda invoke \
--function-name roxs-image-processor \
--payload '{
"bucket": "roxs-images-original",
"key": "photos/vacation-photo.png"
}' \
image-result2.json
cat image-result2.json
# 5. Verificar que se crearon los archivos procesados
awslocal s3 ls s3://roxs-images-processed/ --recursive
awslocal s3 ls s3://roxs-images-thumbnails/ --recursive
# 6. Verificar metadata en DynamoDB
awslocal dynamodb scan --table-name RoxsImageMetadata \
--projection-expression "ImageId,OriginalKey,Status,ProcessedAt"
📊 Arquitectura 2: Sistema de logs distribuido
🗺️ Flujo de la arquitectura:
Aplicación → Lambda (log processor) → DynamoDB (índices) → S3 (archivos raw)
🗃️ Paso 1: Crear tabla de índices de logs
# Tabla para indexar logs por timestamp y nivel
awslocal dynamodb create-table \
--table-name RoxsLogIndex \
--attribute-definitions \
AttributeName=LogLevel,AttributeType=S \
AttributeName=Timestamp,AttributeType=N \
--key-schema \
AttributeName=LogLevel,KeyType=HASH \
AttributeName=Timestamp,KeyType=RANGE \
--provisioned-throughput \
ReadCapacityUnits=5,WriteCapacityUnits=5
# Bucket para logs raw
awslocal s3 mb s3://roxs-logs-storage
⚡ Paso 2: Función de procesamiento de logs
# log_processor.py
import json
import boto3
from datetime import datetime
import gzip
import uuid
def lambda_handler(event, context):
"""
Procesador de logs: recibe logs → DynamoDB (índice) → S3 (storage)
"""
print(f"Procesando logs: {json.dumps(event)}")
# Clientes AWS
s3_client = boto3.client(
's3',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
dynamodb = boto3.client(
'dynamodb',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
try:
# Procesar cada entrada de log
logs = event.get('logs', [])
if not logs:
logs = [event] # Si se envía un log individual
processed_logs = []
log_batch_id = str(uuid.uuid4())
for log_entry in logs:
timestamp = int(datetime.now().timestamp() * 1000) # milliseconds
log_id = str(uuid.uuid4())
# Extraer información del log
level = log_entry.get('level', 'INFO').upper()
message = log_entry.get('message', '')
service = log_entry.get('service', 'unknown')
user_id = log_entry.get('user_id', 'anonymous')
# Preparar entrada procesada
processed_log = {
'log_id': log_id,
'timestamp': timestamp,
'level': level,
'message': message,
'service': service,
'user_id': user_id,
'batch_id': log_batch_id,
'processed_at': datetime.now().isoformat()
}
processed_logs.append(processed_log)
# Indexar en DynamoDB (para búsquedas rápidas)
dynamodb.put_item(
TableName='RoxsLogIndex',
Item={
'LogLevel': {'S': level},
'Timestamp': {'N': str(timestamp)},
'LogId': {'S': log_id},
'Message': {'S': message[:100]}, # Truncar mensaje para índice
'Service': {'S': service},
'UserId': {'S': user_id},
'BatchId': {'S': log_batch_id}
}
)
# Guardar logs completos en S3 (comprimidos)
log_date = datetime.now().strftime('%Y/%m/%d')
log_hour = datetime.now().strftime('%H')
s3_key = f"logs/{log_date}/{log_hour}/{log_batch_id}.json.gz"
# Comprimir logs
logs_json = json.dumps(processed_logs, indent=2)
compressed_logs = gzip.compress(logs_json.encode('utf-8'))
# Subir a S3
s3_client.put_object(
Bucket='roxs-logs-storage',
Key=s3_key,
Body=compressed_logs,
ContentType='application/json',
ContentEncoding='gzip',
Metadata={
'log-count': str(len(processed_logs)),
'batch-id': log_batch_id,
'processed-at': datetime.now().isoformat()
}
)
return {
'statusCode': 200,
'body': json.dumps({
'message': f'Procesados {len(processed_logs)} logs exitosamente',
'batch_id': log_batch_id,
'storage_location': f"s3://roxs-logs-storage/{s3_key}",
'logs_indexed': len(processed_logs)
})
}
except Exception as e:
print(f"Error procesando logs: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
🚀 Paso 3: Desplegar sistema de logs
# Empaquetar función
zip log-processor.zip log_processor.py
# Crear función
awslocal lambda create-function \
--function-name roxs-log-processor \
--runtime python3.9 \
--role arn:aws:iam::000000000000:role/lambda-role \
--handler log_processor.lambda_handler \
--zip-file fileb://log-processor.zip \
--timeout 30 \
--description "Sistema de logs: Lambda → DynamoDB → S3"
🧪 Paso 4: Probar sistema de logs
# Simular logs de aplicación
awslocal lambda invoke \
--function-name roxs-log-processor \
--payload '{
"logs": [
{
"level": "ERROR",
"message": "Database connection failed",
"service": "user-api",
"user_id": "user123"
},
{
"level": "INFO",
"message": "User login successful",
"service": "auth-service",
"user_id": "user123"
},
{
"level": "WARNING",
"message": "High memory usage detected",
"service": "monitoring",
"user_id": "system"
}
]
}' \
logs-result.json
cat logs-result.json
# Verificar índice en DynamoDB
awslocal dynamodb scan --table-name RoxsLogIndex \
--projection-expression "LogLevel,#ts,LogId,Message,Service" \
--expression-attribute-names '{"#ts": "Timestamp"}'
# Verificar archivo en S3
awslocal s3 ls s3://roxs-logs-storage/ --recursive
🔔 Arquitectura 3: Sistema de notificaciones
🗺️ Flujo de la arquitectura:
Evento → SQS (cola) → Lambda (processor) → SNS (notificación) → DynamoDB (historial)
📬 Paso 1: Crear cola SQS y topic SNS
# Crear cola SQS para eventos
awslocal sqs create-queue --queue-name roxs-notifications-queue
# Crear topic SNS para notificaciones
awslocal sns create-topic --name roxs-notifications
# Obtener URLs/ARNs
queue_url=$(awslocal sqs get-queue-url --queue-name roxs-notifications-queue --query 'QueueUrl' --output text)
topic_arn=$(awslocal sns list-topics --query 'Topics[0].TopicArn' --output text)
echo "Queue URL: $queue_url"
echo "Topic ARN: $topic_arn"
🗃️ Paso 2: Crear tabla de historial de notificaciones
# Tabla para historial de notificaciones
awslocal dynamodb create-table \
--table-name RoxsNotificationHistory \
--attribute-definitions \
AttributeName=NotificationId,AttributeType=S \
AttributeName=CreatedAt,AttributeType=N \
--key-schema \
AttributeName=NotificationId,KeyType=HASH \
AttributeName=CreatedAt,KeyType=RANGE \
--provisioned-throughput \
ReadCapacityUnits=5,WriteCapacityUnits=5
⚡ Paso 3: Función procesadora de notificaciones
# notification_processor.py
import json
import boto3
from datetime import datetime
import uuid
def lambda_handler(event, context):
"""
Procesador de notificaciones: SQS → Lambda → SNS → DynamoDB
"""
print(f"Event: {json.dumps(event)}")
# Clientes AWS
sqs_client = boto3.client(
'sqs',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
sns_client = boto3.client(
'sns',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
dynamodb = boto3.client(
'dynamodb',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
try:
notification_id = str(uuid.uuid4())
timestamp = int(datetime.now().timestamp() * 1000)
# Extraer datos del evento
notification_type = event.get('type', 'general')
recipient = event.get('recipient', 'admin@roxs.com')
title = event.get('title', 'Notificación de Sistema')
message = event.get('message', 'Mensaje de notificación')
priority = event.get('priority', 'normal')
# Crear mensaje para SNS
sns_message = {
'notification_id': notification_id,
'type': notification_type,
'title': title,
'message': message,
'recipient': recipient,
'priority': priority,
'created_at': datetime.now().isoformat()
}
# Publicar en SNS
sns_response = sns_client.publish(
TopicArn='arn:aws:sns:us-east-1:000000000000:roxs-notifications',
Message=json.dumps(sns_message),
Subject=f"[{priority.upper()}] {title}",
MessageAttributes={
'notification_type': {
'DataType': 'String',
'StringValue': notification_type
},
'priority': {
'DataType': 'String',
'StringValue': priority
}
}
)
# Guardar en historial (DynamoDB)
dynamodb.put_item(
TableName='RoxsNotificationHistory',
Item={
'NotificationId': {'S': notification_id},
'CreatedAt': {'N': str(timestamp)},
'Type': {'S': notification_type},
'Recipient': {'S': recipient},
'Title': {'S': title},
'Message': {'S': message},
'Priority': {'S': priority},
'SNSMessageId': {'S': sns_response.get('MessageId', 'unknown')},
'Status': {'S': 'sent'},
'ProcessedAt': {'S': datetime.now().isoformat()}
}
)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Notificación procesada exitosamente',
'notification_id': notification_id,
'sns_message_id': sns_response.get('MessageId'),
'recipient': recipient,
'type': notification_type
})
}
except Exception as e:
print(f"Error procesando notificación: {str(e)}")
# Guardar error en historial
try:
dynamodb.put_item(
TableName='RoxsNotificationHistory',
Item={
'NotificationId': {'S': str(uuid.uuid4())},
'CreatedAt': {'N': str(int(datetime.now().timestamp() * 1000))},
'Status': {'S': 'error'},
'ErrorMessage': {'S': str(e)},
'ProcessedAt': {'S': datetime.now().isoformat()}
}
)
except:
pass
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
🚀 Paso 4: Desplegar sistema de notificaciones
# Empaquetar función
zip notification-processor.zip notification_processor.py
# Crear función
awslocal lambda create-function \
--function-name roxs-notification-processor \
--runtime python3.9 \
--role arn:aws:iam::000000000000:role/lambda-role \
--handler notification_processor.lambda_handler \
--zip-file fileb://notification-processor.zip \
--timeout 30 \
--description "Sistema de notificaciones: SQS → Lambda → SNS → DynamoDB"
🧪 Paso 5: Probar sistema de notificaciones
# Suscribirse al topic SNS (para ver las notificaciones)
awslocal sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:roxs-notifications \
--protocol email \
--notification-endpoint admin@roxs.com
# Enviar notificación de prueba
awslocal lambda invoke \
--function-name roxs-notification-processor \
--payload '{
"type": "alert",
"recipient": "admin@roxs.com",
"title": "Sistema de monitoreo",
"message": "CPU usage above 90% on server-01",
"priority": "high"
}' \
notification-result.json
cat notification-result.json
# Enviar notificación informativa
awslocal lambda invoke \
--function-name roxs-notification-processor \
--payload '{
"type": "info",
"recipient": "team@roxs.com",
"title": "Deploy completado",
"message": "La versión 2.1.0 se desplegó exitosamente",
"priority": "normal"
}' \
notification-result2.json
# Verificar historial en DynamoDB
awslocal dynamodb scan --table-name RoxsNotificationHistory \
--projection-expression "NotificationId,Title,#status,Priority,ProcessedAt" \
--expression-attribute-names '{"#status": "Status"}'
# Ver mensajes en SNS (si hay subscriptores)
awslocal sns list-subscriptions
🌐 Arquitectura 4: API Gateway completa
🗺️ Flujo de la arquitectura:
Cliente HTTP → API Gateway → Lambda (router) → DynamoDB/S3 → Respuesta JSON
⚡ Función Lambda para API
# api_handler.py
import json
import boto3
from datetime import datetime
import uuid
def lambda_handler(event, context):
"""
API Handler: maneja todas las rutas de la API
"""
print(f"API Event: {json.dumps(event)}")
# Clientes AWS
dynamodb = boto3.client(
'dynamodb',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
s3_client = boto3.client(
's3',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
try:
# Extraer información de la request
http_method = event.get('httpMethod', 'GET')
path = event.get('path', '/')
body = event.get('body', '{}')
if body:
try:
body_data = json.loads(body)
except:
body_data = {}
else:
body_data = {}
# Router básico
if path == '/health':
return api_response(200, {'status': 'healthy', 'timestamp': datetime.now().isoformat()})
elif path == '/images' and http_method == 'GET':
# Listar imágenes procesadas
response = dynamodb.scan(
TableName='RoxsImageMetadata',
ProjectionExpression='ImageId,OriginalKey,ProcessedAt,#status',
ExpressionAttributeNames={'#status': 'Status'}
)
images = []
for item in response.get('Items', []):
images.append({
'id': item.get('ImageId', {}).get('S'),
'filename': item.get('OriginalKey', {}).get('S'),
'processed_at': item.get('ProcessedAt', {}).get('S'),
'status': item.get('Status', {}).get('S', 'unknown')
})
return api_response(200, {
'images': images,
'count': len(images)
})
elif path == '/logs' and http_method == 'GET':
# Obtener logs recientes
response = dynamodb.scan(
TableName='RoxsLogIndex',
Limit=20
)
logs = []
for item in response.get('Items', []):
logs.append({
'level': item.get('LogLevel', {}).get('S'),
'timestamp': int(item.get('Timestamp', {}).get('N', '0')),
'message': item.get('Message', {}).get('S'),
'service': item.get('Service', {}).get('S')
})
# Ordenar por timestamp descendente
logs.sort(key=lambda x: x['timestamp'], reverse=True)
return api_response(200, {
'logs': logs[:10], # Top 10 más recientes
'total_scanned': len(logs)
})
elif path == '/notifications' and http_method == 'POST':
# Crear nueva notificación (integrar con sistema anterior)
title = body_data.get('title', 'Nueva notificación')
message = body_data.get('message', 'Mensaje desde API')
priority = body_data.get('priority', 'normal')
# Llamar al procesador de notificaciones
lambda_client = boto3.client(
'lambda',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
notification_payload = {
'type': 'api',
'title': title,
'message': message,
'priority': priority,
'recipient': 'api-user@roxs.com'
}
lambda_response = lambda_client.invoke(
FunctionName='roxs-notification-processor',
Payload=json.dumps(notification_payload)
)
result = json.loads(lambda_response['Payload'].read())
return api_response(200, {
'message': 'Notificación enviada exitosamente',
'notification_result': result
})
elif path.startswith('/files') and http_method == 'GET':
# Listar archivos en S3
bucket = 'roxs-images-original'
try:
response = s3_client.list_objects_v2(Bucket=bucket)
files = []
for obj in response.get('Contents', []):
files.append({
'key': obj['Key'],
'size': obj['Size'],
'last_modified': obj['LastModified'].isoformat(),
'url': f"http://localhost:4566/{bucket}/{obj['Key']}"
})
return api_response(200, {
'bucket': bucket,
'files': files,
'count': len(files)
})
except Exception as e:
return api_response(500, {'error': f'Error accessing S3: {str(e)}'})
elif path == '/stats' and http_method == 'GET':
# Estadísticas generales del sistema
stats = {}
# Contar imágenes
try:
img_response = dynamodb.scan(
TableName='RoxsImageMetadata',
Select='COUNT'
)
stats['total_images'] = img_response['Count']
except:
stats['total_images'] = 0
# Contar logs
try:
log_response = dynamodb.scan(
TableName='RoxsLogIndex',
Select='COUNT'
)
stats['total_logs'] = log_response['Count']
except:
stats['total_logs'] = 0
# Contar notificaciones
try:
notif_response = dynamodb.scan(
TableName='RoxsNotificationHistory',
Select='COUNT'
)
stats['total_notifications'] = notif_response['Count']
except:
stats['total_notifications'] = 0
# Contar archivos en S3
try:
s3_response = s3_client.list_objects_v2(Bucket='roxs-images-original')
stats['total_files'] = len(s3_response.get('Contents', []))
except:
stats['total_files'] = 0
stats['system_uptime'] = 'LocalStack simulation'
stats['generated_at'] = datetime.now().isoformat()
return api_response(200, stats)
else:
# Ruta no encontrada
return api_response(404, {
'error': 'Endpoint not found',
'path': path,
'method': http_method,
'available_endpoints': [
'GET /health',
'GET /images',
'GET /logs',
'POST /notifications',
'GET /files',
'GET /stats'
]
})
except Exception as e:
print(f"Error in API handler: {str(e)}")
return api_response(500, {'error': str(e)})
def api_response(status_code, body):
"""Helper para formatear respuestas de API"""
return {
'statusCode': status_code,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS'
},
### **🚀 Paso 2: Desplegar API Lambda**
```bash
# Empaquetar función API
zip api-handler.zip api_handler.py
# Crear función Lambda para API
awslocal lambda create-function \
--function-name roxs-api-handler \
--runtime python3.9 \
--role arn:aws:iam::000000000000:role/lambda-role \
--handler api_handler.lambda_handler \
--zip-file fileb://api-handler.zip \
--timeout 30 \
--description "API Gateway handler: rutas completas de la aplicación"
🌐 Paso 3: Crear API Gateway
# Crear API REST
api_id=$(awslocal apigateway create-rest-api \
--name 'roxs-complete-api' \
--description 'API completa con múltiples servicios integrados' \
--query 'id' --output text)
echo "API ID: $api_id"
# Obtener resource root
root_id=$(awslocal apigateway get-resources \
--rest-api-id $api_id \
--query 'items[0].id' --output text)
echo "Root Resource ID: $root_id"
# Crear resource para capturar todas las rutas
resource_id=$(awslocal apigateway create-resource \
--rest-api-id $api_id \
--parent-id $root_id \
--path-part '{proxy+}' \
--query 'id' --output text)
echo "Proxy Resource ID: $resource_id"
# Crear método ANY para manejar todos los HTTP methods
awslocal apigateway put-method \
--rest-api-id $api_id \
--resource-id $resource_id \
--http-method ANY \
--authorization-type NONE
# Integrar con Lambda
awslocal apigateway put-integration \
--rest-api-id $api_id \
--resource-id $resource_id \
--http-method ANY \
--type AWS_PROXY \
--integration-http-method POST \
--uri arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:000000000000:function:roxs-api-handler/invocations
# También agregar método root
awslocal apigateway put-method \
--rest-api-id $api_id \
--resource-id $root_id \
--http-method ANY \
--authorization-type NONE
awslocal apigateway put-integration \
--rest-api-id $api_id \
--resource-id $root_id \
--http-method ANY \
--type AWS_PROXY \
--integration-http-method POST \
--uri arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:000000000000:function:roxs-api-handler/invocations
# Desplegar API
awslocal apigateway create-deployment \
--rest-api-id $api_id \
--stage-name prod
echo "API URL: http://localhost:4566/restapis/$api_id/prod/_user_request_"
🧪 Paso 4: Probar API completa
# Guardar API URL para facilidad
API_URL="http://localhost:4566/restapis/$api_id/prod/_user_request_"
# 1. Health check
curl "$API_URL/health"
# 2. Ver estadísticas del sistema
curl "$API_URL/stats" | jq
# 3. Listar imágenes procesadas
curl "$API_URL/images" | jq
# 4. Ver logs recientes
curl "$API_URL/logs" | jq
# 5. Listar archivos en S3
curl "$API_URL/files" | jq
# 6. Crear notificación via API
curl -X POST "$API_URL/notifications" \
-H "Content-Type: application/json" \
-d '{
"title": "Notificación desde API",
"message": "Esta notificación fue creada via REST API",
"priority": "high"
}' | jq
# 7. Verificar endpoint no existente
curl "$API_URL/nonexistent" | jq
🔄 Flujo de integración completo
🎯 Demostración de arquitectura end-to-end
#!/bin/bash
# complete-demo.sh
echo "🚀 Demo de arquitectura completa LocalStack"
echo "=========================================="
API_URL="http://localhost:4566/restapis/$api_id/prod/_user_request_"
echo "1. 📊 Estado inicial del sistema"
curl -s "$API_URL/stats" | jq
echo -e "\n2. 📸 Subiendo nueva imagen..."
echo "Nueva imagen para procesar: $(date)" > new-demo-image.jpg
awslocal s3 cp new-demo-image.jpg s3://roxs-images-original/
echo -e "\n3. ⚡ Procesando imagen con Lambda..."
awslocal lambda invoke \
--function-name roxs-image-processor \
--payload '{
"bucket": "roxs-images-original",
"key": "new-demo-image.jpg"
}' \
demo-process-result.json > /dev/null
echo "Resultado del procesamiento:"
cat demo-process-result.json | jq
echo -e "\n4. 📝 Generando logs de aplicación..."
awslocal lambda invoke \
--function-name roxs-log-processor \
--payload '{
"logs": [
{
"level": "INFO",
"message": "Image processing completed successfully",
"service": "image-processor",
"user_id": "demo-user"
},
{
"level": "DEBUG",
"message": "Thumbnail generated and stored",
"service": "image-processor",
"user_id": "demo-user"
}
]
}' \
demo-logs-result.json > /dev/null
echo "Logs procesados exitosamente"
echo -e "\n5. 🔔 Enviando notificación de completitud..."
curl -s -X POST "$API_URL/notifications" \
-H "Content-Type: application/json" \
-d '{
"title": "Procesamiento completado",
"message": "La imagen new-demo-image.jpg fue procesada exitosamente",
"priority": "normal"
}' | jq
echo -e "\n6. 📊 Estado final del sistema"
curl -s "$API_URL/stats" | jq
echo -e "\n7. 🔍 Verificando resultados..."
echo "Imágenes procesadas:"
curl -s "$API_URL/images" | jq '.count'
echo "Logs en el sistema:"
curl -s "$API_URL/logs" | jq '.total_scanned'
echo "Archivos en S3:"
awslocal s3 ls s3://roxs-images-original/ | wc -l
echo -e "\n✅ Demo completo - Todos los servicios integrados funcionando!"
📊 Monitoreo y observabilidad
📈 Dashboard de métricas
# dashboard.py
import json
import boto3
from datetime import datetime, timedelta
def lambda_handler(event, context):
"""
Dashboard de métricas del sistema completo
"""
# Clientes AWS
dynamodb = boto3.client(
'dynamodb',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
s3_client = boto3.client(
's3',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
lambda_client = boto3.client(
'lambda',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
try:
dashboard_data = {}
# Métricas de imágenes
try:
img_response = dynamodb.scan(TableName='RoxsImageMetadata')
images = img_response.get('Items', [])
dashboard_data['images'] = {
'total': len(images),
'successful': len([img for img in images if img.get('Status', {}).get('S') == 'completed']),
'failed': len([img for img in images if img.get('Status', {}).get('S') == 'error']),
'avg_size': sum(int(img.get('OriginalSize', {}).get('N', '0')) for img in images) / max(len(images), 1)
}
except:
dashboard_data['images'] = {'total': 0, 'successful': 0, 'failed': 0, 'avg_size': 0}
# Métricas de logs
try:
log_response = dynamodb.scan(TableName='RoxsLogIndex')
logs = log_response.get('Items', [])
log_levels = {}
for log in logs:
level = log.get('LogLevel', {}).get('S', 'UNKNOWN')
log_levels[level] = log_levels.get(level, 0) + 1
dashboard_data['logs'] = {
'total': len(logs),
'by_level': log_levels,
'recent_errors': len([log for log in logs if log.get('LogLevel', {}).get('S') == 'ERROR'])
}
except:
dashboard_data['logs'] = {'total': 0, 'by_level': {}, 'recent_errors': 0}
# Métricas de notificaciones
try:
notif_response = dynamodb.scan(TableName='RoxsNotificationHistory')
notifications = notif_response.get('Items', [])
priority_count = {}
for notif in notifications:
priority = notif.get('Priority', {}).get('S', 'normal')
priority_count[priority] = priority_count.get(priority, 0) + 1
dashboard_data['notifications'] = {
'total': len(notifications),
'by_priority': priority_count,
'successful': len([n for n in notifications if n.get('Status', {}).get('S') == 'sent'])
}
except:
dashboard_data['notifications'] = {'total': 0, 'by_priority': {}, 'successful': 0}
# Métricas de S3
try:
buckets = ['roxs-images-original', 'roxs-images-processed', 'roxs-images-thumbnails', 'roxs-logs-storage']
s3_metrics = {}
for bucket in buckets:
try:
response = s3_client.list_objects_v2(Bucket=bucket)
objects = response.get('Contents', [])
total_size = sum(obj['Size'] for obj in objects)
s3_metrics[bucket] = {
'objects': len(objects),
'total_size_bytes': total_size
}
except:
s3_metrics[bucket] = {'objects': 0, 'total_size_bytes': 0}
dashboard_data['storage'] = s3_metrics
except:
dashboard_data['storage'] = {}
# Métricas de Lambda
try:
functions_response = lambda_client.list_functions()
functions = functions_response.get('Functions', [])
dashboard_data['functions'] = {
'total': len(functions),
'names': [f['FunctionName'] for f in functions]
}
except:
dashboard_data['functions'] = {'total': 0, 'names': []}
# Metadata del dashboard
dashboard_data['metadata'] = {
'generated_at': datetime.now().isoformat(),
'system': 'LocalStack',
'version': '1.0.0'
}
return {
'statusCode': 200,
'body': json.dumps(dashboard_data, indent=2)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
🚀 Desplegar dashboard
# Empaquetar dashboard
zip dashboard.zip dashboard.py
# Crear función
awslocal lambda create-function \
--function-name roxs-dashboard \
--runtime python3.9 \
--role arn:aws:iam::000000000000:role/lambda-role \
--handler dashboard.lambda_handler \
--zip-file fileb://dashboard.zip \
--timeout 30 \
--description "Dashboard de métricas del sistema completo"
# Probar dashboard
awslocal lambda invoke \
--function-name roxs-dashboard \
dashboard-output.json
cat dashboard-output.json | jq
💡 Tarea del Día
📝 Parte 1: Arquitectura básica (30 minutos)
Implementar el procesador de imágenes completo:
-
Crear toda la infraestructura:
# Buckets
awslocal s3 mb s3://mi-images-original
awslocal s3 mb s3://mi-images-processed
# Tabla DynamoDB
awslocal dynamodb create-table \
--table-name MiImageMetadata \
--attribute-definitions AttributeName=ImageId,AttributeType=S \
--key-schema AttributeName=ImageId,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 -
Desplegar función procesadora (usar código del ejemplo)
-
Probar con tus propias imágenes:
echo "Mi primera imagen" > mi-foto.jpg
awslocal s3 cp mi-foto.jpg s3://mi-images-original/
# Procesar
awslocal lambda invoke \
--function-name roxs-image-processor \
--payload '{"bucket": "mi-images-original", "key": "mi-foto.jpg"}' \
mi-resultado.json
⚡ Parte 2: Sistema de logs (20 minutos)
Implementar tu propio sistema de logs:
-
Desplegar sistema de logs (usar código del ejemplo)
-
Generar logs de tu aplicación imaginaria:
awslocal lambda invoke \
--function-name roxs-log-processor \
--payload '{
"logs": [
{
"level": "INFO",
"message": "Usuario TU_NOMBRE inició sesión",
"service": "mi-auth-service",
"user_id": "TU_NOMBRE"
},
{
"level": "ERROR",
"message": "Falló conexión a base de datos",
"service": "mi-api",
"user_id": "system"
}
]
}' \
mis-logs.json
🌐 Parte 3: API básica (25 minutos)
Crear una versión simplificada de la API:
-
Crear función API simple:
# mi_api.py
import json
import boto3
def lambda_handler(event, context):
path = event.get('path', '/')
if path == '/health':
return {
'statusCode': 200,
'body': json.dumps({'status': 'ok', 'student': 'TU_NOMBRE'})
}
elif path == '/stats':
return {
'statusCode': 200,
'body': json.dumps({
'images_processed': 'check_dynamodb',
'logs_generated': 'check_dynamodb',
'student': 'TU_NOMBRE'
})
}
else:
return {
'statusCode': 404,
'body': json.dumps({'error': 'Not found'})
} -
Desplegar y probar
📸 Parte 4: Evidencia y análisis
Capturas a tomar:
- Lista de todas las funciones Lambda que creaste
- Lista de todas las tablas DynamoDB
- Lista de todos los buckets S3 con contenido
- Respuesta de tu API personal
Análisis a incluir:
- ¿Qué fue lo más desafiante de integrar múltiples servicios?
- ¿Cómo cambió tu comprensión de las arquitecturas serverless?
- ¿Qué patrones de integración descubriste?
🏆 Desafío Master: ¡Sistema completo personalizado!
Crear tu propia arquitectura end-to-end:
- Tema libre: Elige un dominio (e-commerce, blog, IoT, etc.)
- Mínimo 3 servicios: S3 + Lambda + DynamoDB
- API funcional: Al menos 3 endpoints
- Flujo completo: Desde input hasta output
- Monitoreo: Dashboard básico de métricas
Ejemplos de sistemas:
- Blog: Subir posts → Lambda procesa → DynamoDB guarda → API lee
- E-commerce: Subir productos → Lambda valida → DynamoDB catalogo → API busca
- IoT: Sensores → Lambda analiza → DynamoDB histórico → API dashboard
📤 Compartir en Discord
Mensaje a incluir:
- ¿Qué arquitectura implementaste completamente?
- ¿Cuál fue la integración más compleja?
- ¿Qué aprendiste sobre el diseño de sistemas distribuidos?
- ¿Lograste el desafío master? ¿Qué sistema creaste?
- ¿Cómo aplicarías esto en un proyecto real?
Screenshots esenciales:
- Dashboard de métricas (output del dashboard.py)
- Lista completa de recursos (Lambda, S3, DynamoDB)
- Respuesta de tu API personalizada
- (Bonus) Flujo completo de tu sistema personalizado
🔮 Lo que viene mañana...
Día 63: ¡Desafío final de la semana!
- Proyecto integrador completo
- Caso de uso empresarial real
- Documentación de arquitectura
- Deploy de aplicación roxs-voting-app
💎 Tips Pro de integración
🔗 Patrones de integración
# Event-driven: S3 → Lambda (trigger automático)
# Request-response: API Gateway → Lambda → DynamoDB
# Async processing: SQS → Lambda → SNS
# Data pipeline: S3 → Lambda → DynamoDB → S3
⚡ Optimización de rendimiento
# Reutilizar clientes AWS
# Crear clientes fuera del handler cuando sea posible
s3_client = boto3.client('s3', endpoint_url='http://localhost:4566')
def lambda_handler(event, context):
# Usar cliente reutilizado
pass
🔄 Manejo de errores
# Siempre implementar retry logic y error handling
try:
result = process_data()
except Exception as e:
# Log error
# Send to DLQ (Dead Letter Queue)
# Return appropriate error response
pass
📊 Monitoring
# Siempre incluir métricas en tus funciones
# CloudWatch metrics (en AWS real)
# Logs estructurados
# Health checks
¡Mañana el gran finale de la semana ! 🎯🚀