Skip to content

asin553/HostLens_AI

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

29 Commits
Β 
Β 
Β 
Β 

Repository files navigation

HostLens AI

image

Welcome to a walkthrough detailing the steps to develop an interactive AI assistant powered by a real-time data streaming pipeline, a graph-based storage engine, as well as a conversational AI agent -- all in Python!

Yes, that's right, the ETL layer to extract the data, the API & data layer to model the relational data, the backend ReAct agent equipped with different tools & the UI chat interface are all built following Pythonic standards.

This tutorial aims to empower other developers to build an app with a similar stack that is meaningful to them & future users. One should feel free to use their own data of interest to expand on and to generate live insights.

The code for this project is not yet open-source.

Architecture Overview

image

Motivation

As an Airbnb host myself as well as an avid user of Airbnb when looking for rentals during travel stays, I'm always on the lookout for the best prices. I often monitor market trends in the same area I host my timeshare to give competitive rates to renters, but I also hope to expand my host properties around the country.

My aim was to find a listings site showcasing the hottest trends on property listings. It's a good thing that Airbnb has a page for this. However, some problems I had included manually searching the listings, as well as not being able to easily get the data in a central forum via a public data API or purchasable dataset.

As a developer, I wanted to build something that would streamline this process of market analysis to then give me up-to-date intelligent recommendations & investment opportunities. We've all at one point had a "Fine, I'll do it myself" moment, and this situation marked a time where I had to take matters into my own hands.

Soon in this tutorial, we'll go through how I was able to automate the daily extraction of listings from Airbnb's Friendly webpage to later present the data on demand in a conversational AI app. Let's get started!

Quick Start

Prerequisites

  • Python 3.11+
  • Docker Engine (Running on a highly available server)
  • Selenium WebDriver (MS Edge used)
  • Apache Airflow 3 (Astro CLI used)
  • Apache Kafka
  • Apache Spark 3.4+ (w/ Spark Connect + Structured Streaming)
  • Neo4j Instance (AuraDB managed cloud service used)
  • Strawberry GraphQL (via FastAPI & Apollo Sandbox)
  • MLflow
  • Streamlit
  • LLM API Key (OpenAI w/ LangChain used)

Learning Links

Structured Streaming + Kafka Integration Guide

Neo4j Spark Structured Streaming

Intro to GraphQL with Python and Strawberry Course

LangChain GraphQL Integration

LangChain-based GraphRAG Workflow

Build a Neo4j-backed Chatbot using Python, LangChain & Streamlit

Environment Setup

It's a good idea to have a monorepo consisting of one codebase dedicated to the Astro project and the other for Streamlit. This way we don't include unnecessary dependencies that our Airflow container doesn't need to execute the DAG.

Sample AI Project Structure (API & Chat Layers)

AirbnbFriendlyGenAIApp       
β”œβ”€β”€ πŸ“ .streamlit             
β”‚   └── πŸ“„secrets.toml               
β”œβ”€β”€ πŸ“ api
β”‚   β”œβ”€β”€ πŸ“„__init__.py                                
β”‚   β”œβ”€β”€ πŸ“„main.py               
β”‚   β”œβ”€β”€ πŸ“„mutation.py               
β”‚   β”œβ”€β”€ πŸ“„query.py
β”‚   β”œβ”€β”€ πŸ“„schema.py       
β”‚   └── πŸ“„types.py              
β”œβ”€β”€ πŸ“ chat
β”‚   β”œβ”€β”€ πŸ“„__init__.py                                                
β”‚   β”œβ”€β”€ πŸ“„agent.py              
β”‚   β”œβ”€β”€ πŸ“„bot.py
β”‚   β”œβ”€β”€ πŸ“„graph.py                          
β”‚   β”œβ”€β”€ πŸ“„llm.py
β”‚   β”œβ”€β”€ πŸ“„utils.py                              
β”‚   β”œβ”€β”€ πŸ“ tools
β”‚       β”œβ”€β”€ πŸ“„__init__.py                                        
β”‚       β”œβ”€β”€ πŸ“„cypher.py      
β”‚       β”œβ”€β”€ πŸ“„graphql.py
β”‚       └── πŸ“„vector.py         
β”œβ”€β”€ πŸ“ tests
β”œβ”€β”€ πŸ“„README.md       
└── πŸ“„requirements.txt                

List Python packages in requirements.txt file in project directory

fastapi==0.115.12
gql==4.0.0
langchain>=0.3.25
langchain-community==0.3.25
langchainhub==0.1.21
langchain-neo4j==0.1.1
langchain-openai==0.2.10
mlflow==3.4.0
neo4j==5.27.0
openai==1.56.0
pytest==8.1.1
requests==2.32.4
strawberry-graphql>=0.213.0
streamlit==1.35.0
tenacity!=8.4.0
uvicorn==0.34.3

Sample Astro Project Structure (ETL Layer)

AirflowStructuredStreaming
β”œβ”€β”€ πŸ“ .astro
β”‚   └── πŸ“„config.yaml
β”œβ”€β”€ πŸ“ dags
β”‚   β”œβ”€β”€ πŸ“„__init__.py             
β”‚   └── πŸ“„etl.py
β”œβ”€β”€ πŸ“ data
β”œβ”€β”€ πŸ“ include
β”‚   β”œβ”€β”€ πŸ“„__init__.py
β”‚   └── πŸ“„embedding.py
β”‚   └── πŸ“„scraper.py
β”‚   └── πŸ“„scripts.py              
β”‚   └── πŸ“„streamer.py
β”œβ”€β”€ πŸ“ plugins
β”œβ”€β”€ πŸ“ tests
β”œβ”€β”€ πŸ“„.env
β”œβ”€β”€ πŸ“„airflow_settings.yaml
β”œβ”€β”€ πŸ“„docker-compose.override.yaml
β”œβ”€β”€ πŸ“„Dockerfile        
β”œβ”€β”€ πŸ“„packages.txt
β”œβ”€β”€ πŸ“„README.md       
└── πŸ“„requirements.txt              

List Python packages in requirements.txt file in project directory

apache-airflow-providers-common-compat
apache-airflow-providers-standard
astro-run-dag
grpcio>=1.48.1
grpcio-status
kafka-python==2.2.15
neo4j==5.27.0
openai==1.56.0
pyarrow>=4.0.0
pyspark==3.5.8
pytest==8.1.1
selenium==4.35.0

Setup virtual environment

Before installing the dependencies, let's set up 2 virtual environments dedicated for each one of our projects to keep project dependencies isolated:

python -m venv .airflow-streaming
python -m venv .graphrag-app

To activate any respective environment (Linux), for example run:

source .airflow-streaming/bin/activate

Install dependencies

pip install -r requirements.txt

Configuration Setup

Since we know we want to use Streamlit as our UI for conversational app, we can take advantage of its secrets.toml file and use it to manage all of our AI projects' environment variables.

Streamlit Secrets

OPENAI_API_KEY = "OPENAI_API_KEY"
OPENAI_MODEL = "OPENAI_MODEL"  # e.g. gpt-4o-mini

NEO4J_URI = "NEO4J_URI"  # e.g. neo4j+s://your-instance.databases.neo4j.io
NEO4J_USERNAME = "NEO4J_USERNAME"  
NEO4J_PASSWORD = "NEO4J_PASSWORD"
NEO4J_DATABASE = "NEO4J_DATABASE"

Similarly, in our Astro project we can just use the .env file to manage the environment variables.

Airflow Environment Variables

OPENAI_API_KEY=OPENAI_API_KEY

EDGE_DRIVER_URL=EDGE_DRIVER_URL # e.g. http://localhost:4444
WEB_URL=WEB_URL  # e.g. https://www.airbnb.com/airbnb-friendly

KAFKA_BROKER=KAFKA_BROKER # e.g. localhost:9092
KAFKA_TOPIC=KAFKA_TOPIC
CHECKPOINT_LOCATION=CHECKPOINT_LOCATION  # e.g. gs://{bucket}/{directory}

NEO4J_URI=NEO4J_URI  # e.g. neo4j+s://your-instance.databases.neo4j.io
NEO4J_USERNAME=NEO4J_USERNAME  
NEO4J_PASSWORD=NEO4J_PASSWORD

Manual Infrastructure Setup

For the sake of simplicity, we will follow the manual setup steps below to start running the services on a server that the app is dependent on. Feel free to use Docker Compose here as well if you please!

Service Port Description
Spark Web UI 4040 Monitor Spark application
Airflow Web UI 8080 DAG visualization
Kafka Web UI 8081 Kafka cluster visualization
Zookeeper 2181 Kafka coordination service
Kafka 9092 Message streaming platform
GraphQL API 8000 Strawberry Apollo Sandbox
Streamlit 8501 Interactive chat interface
MLflow 5000 GenAI apps & agents tracing

Useful commands

1. Start Apache Kafka

# Terminal 1: Start Zookeeper
cd /path/to/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

# Terminal 2: Start Kafka Server
bin/kafka-server-start.sh config/server.properties

# Terminal 3: Start Kafka UI (Docker)
# To monitor Kafka topic and its produced messages, we will use the pre-built Docker image to run Apache Kafka UI:
docker run -it -p 8080:8081 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui

It may be necessary to bind the Kafka broker port to IPv4, as Docker will try to connect to local Kafka on that port only. If so, open up the bin\kafka-run-class.sh file and add

KAFKA_OPTS="-Djava.net.preferIPv4Stack=True"

Further mandatory information to connect a local Kafka cluster to an Airflow instance running in Docker is here.

2. Start Spark Connect server

In order for Spark Structured Streaming to be able to consume messages from Kakfa topic & to also write to Neo4j, we first need to make sure we have the correct JAR placed in Spark's classpath for the checkpoint location. In this case we are using GCS:

gcs-connector-hadoop3-2.2.33-shaded.jar

Then navigate to the Spark installation folder and run the command to start server. Remember, this decoupled client-server architecture allows the Airflow instance running in docker (the client) to communicate with remote SparkSession over gRPC.

spark-submit --class org.apache.spark.sql.connect.service.SparkConnectServer --name "Spark Connect server" --packages org.apache.spark:spark-connect_2.13:3.5.8,org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.8,org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3

3. Start Airflow Docker container via Astro CLI

astro dev init
astro dev start

4. Start GraphQL API

python -m api.main

5. Start Streamlit App

streamlit run chat/bot.py

6. Start MLflow

mlflow ui

ETL Layer

Extract

We first need to decide on a data scheduler that will periodically, in this case daily, scrape our target Airbnb web page to make sure we are getting the most recent data for our GraphRAG application. For this demo we will use Airflow, which uses a set of tasks in a DAG pipeline to orchestrate different parts of the streaming pipeline. Below is a sample DAG view of a successful run:

hippo

We now need to focus on the scraping of data from the Airbnb Friendly webpage. Before moving one, one needs to pull the latest driver compatible with the local Edge installation. Again, Chrome or Firefox can be used here instead. Then in the Docker Compose override YAML file, one can specify the image with the available ports so the default template containers Airflow spins up at start time can be merged with our custom Selenium container.

Here, it is crucial to keep in mind the 'Developer Tools' that help us inspect the HTML page with DOM elements presented as a tree structure of interconnected nodes. Below is a quick demo detailing that specific elements could look like on a specific citys' listings:

hippo

As a general guideline, the hierarchy shows that <div> tags group our HTML elements into sections, while the class & href attributes contain the text or destination URLs we want to strip. To integrate these principles together, understanding XPath Axes & Selenium's web elements are crucial. Some helpful links I used to brush up on my foundations are:

Selenium Web Elements

XPath Axes

Hint: I only had to use 'following-sibling' & 'ancestor' axes to get the XPath of element nodes containing the target data I needed!

Once done, you can expect your Selenium code to extract all Airbnb listing data from each city available. Here is what our WebDriver will be automated to do for us in real-time:

hippo

In each city's webpage, the scrollable section will initially only load a certain number of listings as property cards. We need to simulate a scroll behavior to the maximum extent of this section to scrape all the data.

hippo

Transform

Now that we have our data organized in a way that's easy to parse, we can produce our data as message records so Kafka can queue it, then we will use Spark Structured Streaming to consume those messages and output it to Neo4j.

Our justification to use a real-time data streaming pipeline is to handle large records of data, especially if this project increases in scope and we decide to scrape listings in other countries outside of the US. If one doesn't feel the need to use streaming technologies and prefers to directly write the scraped data to Neo4j, a Python-Neo4j connector allows for exactly that.

When the streaming pipeline runs daily, the checkpoint location (a path in an HDFS-compatible storage system) may cause some challenges we need to keep in mind:

  1. Data loss
  2. Topic resets
  3. Checkpoint offsets

Here is a comprehensive article I referenced when needing to come up with solutions to those potential roadblocks.

In each iteration of the daily streaming pipeline, it's a good practice to first re-create & delete the Kakfa topic before producing new incoming data, then re-start the Spark Streaming query consuming that data so each microbatch starts with an offset of 0. Additionally, some further configurations to set in the code as a best practice are:

Client Configuration
Kafka Producer enable_idempotence=True
Spark Consumer startingOffsets="latest"
Spark Consumer failOnDataLoss="false"

We can monitor the messages produced to the Kafka topic using its Web UI:

hippo

We can also monitor the running Spark Streaming session in its respective Web UI:

hippo

We are now ready to write our data to Neo4j using Cypher queries via Graph Query Language (GQL)!

Load

Once we properly identify our nodes with their properties & relationships via a Cypher query to create our graph database model, we can have Spark write the data stream to our Neo4j AuraDB instance. This will kickstart progress on introducing a GraphRAG system that uses Neo4j as a knowledge graph to provide more accurate & relevant results to our GenAI application based on relationships between the data nodes.

Graph Visualization Overview:

Schema Visualization Subgraph

A summary of the graph database model the above images illustrate:

Node Structures

Node Type Properties Description
Country name: String Represents a country (e.g., "United States")
State code: String Represents a state (e.g., "CA", "NY")
City name: String
state: String
Represents a city with its state
Listing name: String
address: String
monthly_rent: Integer
weekly_earnings: Integer
net_revenue: Integer
roi: Float
description: String
Represents an Airbnb listing with financial metrics

Relationships

Relationship Direction From β†’ To Description
HAS_STATE β†’ Country β†’ State Country contains states
HAS_CITY β†’ State β†’ City State contains cities
HAS_LISTING β†’ City β†’ Listing City contains listings

API & Data Layer

Now we can move on to creating a single GraphQL endpoint that will seamlessly enable one to retrieve data with a single query. This is opposed to using & navigating multiple REST API endpoints on the client side where there can be large response data.

Strawberry integrates with FastAPI to give that endpoint and allows us to create resources that need to be shared across requests. We use context managers to provide the Neo4j driver to GraphQL resolvers. If we define resolvers for fields of the Query type, these fields become the entry point into the GraphQL schema.

The schema is a collection of types & fields written in schema definition language (SDL), and once created the resolver can retrieve data for a specific field in the schema. Strawberry translates Python types into GraphQL SDL in the background.

Once we find all properties of a class defined as a type, a field can be resolved with a specific function. Then we can use the Apollo Sandbox IDE to validate the GraphQL schema we've created with different types & fields as shown:

hippo

Some of the query operations focus on ROI, highest earnings by revenue, listings that fall within a certain range, just to name a few. This GraphQL web IDE allows us to become more familiar with our query operations. Next we will talk more about how those resolver functions are helpful in a GraphRAG workflow for a GenAI app hosted on Streamlit.

Conversational UI

We will create a Streamlit app that will interact with an OpenAI LLM through an agent. A list of tools with prompt templates need to be passed to the agent for it to then generate a response to a user's input in UI.

We specifically only work with LangChain here and not LangGraph because we want to take advantage of Neo4j's chat message history storage. This is what allows for a conversation history via the use of callback functions & stored session IDs, giving the agent a memory of recent messages.

We can either create a ReAct agent using a pre-existing RAG prompt template from LangSmith, provide our own custom prompt template, or include a general prompt template dedicated for chat models. In either case, we always need to specify the LLM, tools & type of prompt when creating the ReAct agent.

Some sample tools registered & passed to our agent include:

  1. An Airbnb general chat chain to fallback on when no other tool is available for the agent
  2. A Graph Cypher QA Chain to execute direct region-specific queries against Neo4j using Text2Cypher
  3. A custom Smart NLP text parser using existing resolver functions in GraphQL endpoint
  4. LangChain's BaseGraphQLTool to allow agent to generate & execute queries based on GraphQL API endpoint
  5. Semantic search similarity based on a listing's description due to vector indexes & text embeddings created in knowledge graph

A sample user input & agent response is shown below:

hippo

We can enable tracing for our application to visualize the data flow with MLflow's LangChain integration. In the Traces tab, we can see the reasoning flow of the agent after it parses the user input. The tool shown below is identified by the name given to it during initialization in the backend code:

hippo

Tip: If one ever encounters memory errors in the Streamlit application, some fine-tuning techniques involve reducing model token limits, lowering the temparature for more factual responses and adding timeout durations for requests in the LLM's initialization.

Future Considerations

As this project keeps growing, we look to enhance the Streamlit app with cosmetic changes and eventually deploy to its Community Cloud so users can interact with it over the Internet. This would also require hosting the GraphQL server publicly with either AWS AppSync or Neo4j AuraDB's Data API service.

Stay tuned for updates!

About

A guided demo to build an AI assistant giving users real-time ROI & revenue insights on U.S. apartment rentals based off Airbnb data.

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors