Binary file added aws_genai_chatbot_architecture.png
105 changes: 105 additions & 0 deletions genai_chatbot_diagram.py
@@ -0,0 +1,105 @@
from diagrams import Diagram, Cluster, Edge
from diagrams.aws.compute import Lambda
from diagrams.aws.network import APIGateway, CloudFront, Route53
from diagrams.aws.storage import S3
from diagrams.aws.database import Dynamodb
from diagrams.aws.ml import Sagemaker, Bedrock
from diagrams.aws.security import Cognito, SecretsManager
from diagrams.aws.integration import Eventbridge, SQS, SNS, StepFunctions
from diagrams.aws.analytics import KinesisDataAnalytics
from diagrams.onprem.client import Users

# Configuration for the diagram
graph_attr = {
    "fontsize": "14",
    "bgcolor": "white",
    "pad": "0.5",
}

with Diagram(
    "AWS GenAI LLM Chatbot Architecture",
    filename="aws_genai_chatbot_architecture",
    show=False,
    direction="TB",
    graph_attr=graph_attr,
):

    # Users
    user = Users("Users")

    with Cluster("Frontend Layer"):
        dns = Route53("Route 53")
        cdn = CloudFront("CloudFront")
        webapp = S3("Web App\n(S3 Static Site)")

    with Cluster("API Layer"):
        apigw = APIGateway("API Gateway")
        websocket_apigw = APIGateway("WebSocket API")

    with Cluster("Authentication"):
        auth = Cognito("Cognito\nUser Pool")

    with Cluster("Application Logic"):
        with Cluster("Lambda Functions"):
            api_lambda = Lambda("API Handler")
            websocket_lambda = Lambda("WebSocket\nHandler")
            chat_lambda = Lambda("Chat Processing")
            orchestrator = Lambda("Orchestrator")

    with Cluster("AI/ML Services"):
        bedrock = Bedrock("Amazon Bedrock\n(LLM Models)")
        sagemaker = Sagemaker("SageMaker\n(Custom Models)")

    with Cluster("Data Storage"):
        dynamodb = Dynamodb("DynamoDB\n(Chat History)")
        s3_data = S3("S3\n(Documents)")
        secrets = SecretsManager("Secrets Manager")

    with Cluster("Message Processing"):
        queue = SQS("SQS Queue")
        eventbus = Eventbridge("EventBridge")
        step_func = StepFunctions("Step Functions\n(Workflow)")

    with Cluster("Monitoring & Analytics"):
        analytics = KinesisDataAnalytics("Kinesis\nAnalytics")
        notifications = SNS("SNS\nNotifications")

    # Flow connections
    user >> Edge(label="HTTPS") >> dns >> cdn >> webapp

    webapp >> Edge(label="REST API") >> apigw
    webapp >> Edge(label="WebSocket") >> websocket_apigw

    apigw >> auth
    websocket_apigw >> auth

    auth >> api_lambda
    auth >> websocket_lambda

    api_lambda >> chat_lambda
    websocket_lambda >> chat_lambda

    chat_lambda >> orchestrator
    orchestrator >> queue
    queue >> step_func

    step_func >> bedrock
    step_func >> sagemaker
    step_func >> s3_data

    chat_lambda >> dynamodb
    orchestrator >> dynamodb

    bedrock >> Edge(label="Response") >> orchestrator
    sagemaker >> Edge(label="Response") >> orchestrator

    orchestrator >> Edge(label="Store") >> dynamodb
    s3_data >> Edge(label="RAG Context") >> orchestrator

    secrets >> api_lambda
    secrets >> chat_lambda

    eventbus >> analytics
    chat_lambda >> eventbus

    analytics >> notifications
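
The diagram is rendered as a side effect of the Diagram context manager exiting, and show=False writes the PNG next to the script instead of opening a viewer. A minimal smoke-test sketch, assuming the diagrams package is installed and a Graphviz binary is on PATH (neither dependency is pinned in this PR):

# Sketch of a smoke test (assumptions: `pip install diagrams` plus a local
# Graphviz install; run from the directory containing the script).
from pathlib import Path

import genai_chatbot_diagram  # noqa: F401 -- the import executes the Diagram block

# Diagram.__exit__ invokes Graphviz and writes <filename>.png in the
# current working directory.
assert Path("aws_genai_chatbot_architecture.png").exists()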
Binary file added recommendation_system_pipeline.png
133 changes: 133 additions & 0 deletions recommendation_system_pipeline.py
@@ -0,0 +1,133 @@
from diagrams import Diagram, Cluster, Edge
from diagrams.onprem.workflow import Airflow
from diagrams.onprem.analytics import Spark
from diagrams.onprem.database import PostgreSQL, MongoDB, Cassandra
from diagrams.onprem.inmemory import Redis
from diagrams.onprem.queue import Kafka
from diagrams.programming.framework import Flask
from diagrams.onprem.client import Users, Client
from diagrams.generic.storage import Storage
from diagrams.generic.database import SQL
from diagrams.onprem.compute import Server

# Configuration for the diagram
graph_attr = {
    "fontsize": "14",
    "bgcolor": "white",
    "pad": "0.5",
}

with Diagram(
    "Real-Time Recommendation System ML Pipeline",
    filename="recommendation_system_pipeline",
    show=False,
    direction="LR",
    graph_attr=graph_attr,
):

    # Data Sources
    with Cluster("Data Sources"):
        users = Users("User Events")
        app_logs = Client("Application\nLogs")
        transactions = SQL("Transaction\nData")

    # Data Ingestion Layer
    with Cluster("Data Ingestion"):
        kafka_stream = Kafka("Kafka\nStreaming")
        batch_ingestion = Storage("Batch\nIngestion")

    # Orchestration
    with Cluster("Orchestration"):
        airflow = Airflow("Apache Airflow\nWorkflow Scheduler")

    # Data Processing Layer
    with Cluster("Data Processing"):
        with Cluster("Stream Processing"):
            spark_streaming = Spark("Spark\nStreaming")
            event_processor = Server("Event\nProcessor")

        with Cluster("Batch Processing"):
            spark_batch = Spark("Spark\nBatch Jobs")
            feature_eng = Server("Feature\nEngineering")

    # Storage Layer
    with Cluster("Data Storage"):
        data_lake = Storage("Data Lake\n(Raw Data)")
        feature_store = PostgreSQL("Feature\nStore")
        user_profiles = MongoDB("User\nProfiles")

    # ML Pipeline
    with Cluster("ML Training Pipeline"):
        model_training = Server("Model\nTraining")
        model_eval = Server("Model\nEvaluation")
        model_registry = Storage("Model\nRegistry")

    # Serving Layer
    with Cluster("Real-Time Serving"):
        model_serving = Flask("Model\nServing API")
        cache = Redis("Redis\nCache")
        rec_engine = Server("Recommendation\nEngine")

    # Recommendation Storage
    with Cluster("Recommendation Storage"):
        rec_db = Cassandra("Pre-computed\nRecommendations")
        online_store = Redis("Online\nFeature Store")

    # Application Layer
    with Cluster("Application"):
        api = Flask("REST API")
        web_app = Client("Web/Mobile\nApp")

    # Data Flow - Ingestion
    users >> Edge(label="events") >> kafka_stream
    app_logs >> Edge(label="logs") >> kafka_stream
    transactions >> Edge(label="batch") >> batch_ingestion

    # Stream Processing Flow
    kafka_stream >> Edge(label="stream") >> spark_streaming
    spark_streaming >> event_processor
    event_processor >> Edge(label="real-time") >> user_profiles

    # Batch Processing Flow
    batch_ingestion >> Edge(label="schedule") >> airflow
    airflow >> Edge(label="trigger") >> spark_batch
    spark_batch >> feature_eng

    # Data Storage Flow
    spark_streaming >> Edge(label="store") >> data_lake
    spark_batch >> data_lake
    feature_eng >> Edge(label="features") >> feature_store
    event_processor >> user_profiles

    # ML Training Flow
    airflow >> Edge(label="trigger") >> model_training
    feature_store >> Edge(label="training data") >> model_training
    user_profiles >> model_training

    model_training >> model_eval
    model_eval >> Edge(label="validated") >> model_registry

    # Serving Flow
    model_registry >> Edge(label="deploy") >> model_serving
    feature_store >> Edge(label="features") >> online_store

    online_store >> rec_engine
    model_serving >> rec_engine
    user_profiles >> Edge(label="context") >> rec_engine

    # Pre-computation
    airflow >> Edge(label="schedule") >> rec_engine
    rec_engine >> Edge(label="batch recs") >> rec_db
    rec_engine >> Edge(label="hot cache") >> cache

    # API Flow
    rec_engine >> api
    rec_db >> api
    cache >> api

    api >> web_app
    web_app >> Edge(label="feedback") >> kafka_stream

    # Monitoring feedback loop
    web_app >> Edge(label="user actions", style="dashed") >> users
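
Both scripts emit PNG only, which is what the checked-in binaries reflect. Diagram also accepts an outformat argument if a scalable vector version is ever wanted for docs; a sketch of the variant header, identical except for the added parameter:

# Sketch: render the same pipeline as SVG. outformat is a standard Diagram
# parameter ("png" by default); "svg" is a Graphviz-supported output format.
with Diagram(
    "Real-Time Recommendation System ML Pipeline",
    filename="recommendation_system_pipeline",
    show=False,
    direction="LR",
    graph_attr=graph_attr,
    outformat="svg",
):
    ...  # same clusters and edges as the script above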