Welcome to a walkthrough detailing the steps to develop an interactive AI assistant powered by a real-time data streaming pipeline, a graph-based storage engine, as well as a conversational AI agent -- all in Python!
Yes, that's right, the ETL layer to extract the data, the API & data layer to model the relational data, the backend ReAct agent equipped with different tools & the UI chat interface are all built following Pythonic standards.
This tutorial aims to empower other developers to build an app with a similar stack that is meaningful to them & future users. One should feel free to use their own data of interest to expand on and to generate live insights.
The code for this project is not yet open-source.
As an Airbnb host myself as well as an avid user of Airbnb when looking for rentals during travel stays, I'm always on the lookout for the best prices. I often monitor market trends in the same area I host my timeshare to give competitive rates to renters, but I also hope to expand my host properties around the country.
My aim was to find a listings site showcasing the hottest trends on property listings. It's a good thing that Airbnb has a page for this. However, some problems I had included manually searching the listings, as well as not being able to easily get the data in a central forum via a public data API or purchasable dataset.
As a developer, I wanted to build something that would streamline this process of market analysis to then give me up-to-date intelligent recommendations & investment opportunities. We've all at one point had a "Fine, I'll do it myself" moment, and this situation marked a time where I had to take matters into my own hands.
Soon in this tutorial, we'll go through how I was able to automate the daily extraction of listings from Airbnb's Friendly webpage to later present the data on demand in a conversational AI app. Let's get started!
- Python 3.11+
- Docker Engine (Running on a highly available server)
- Selenium WebDriver (MS Edge used)
- Apache Airflow 3 (Astro CLI used)
- Apache Kafka
- Apache Spark 3.4+ (w/ Spark Connect + Structured Streaming)
- Neo4j Instance (AuraDB managed cloud service used)
- Strawberry GraphQL (via FastAPI & Apollo Sandbox)
- MLflow
- Streamlit
- LLM API Key (OpenAI w/ LangChain used)
Structured Streaming + Kafka Integration Guide
Neo4j Spark Structured Streaming
Intro to GraphQL with Python and Strawberry Course
LangChain-based GraphRAG Workflow
Build a Neo4j-backed Chatbot using Python, LangChain & Streamlit
It's a good idea to have a monorepo consisting of one codebase dedicated to the Astro project and the other for Streamlit. This way we don't include unnecessary dependencies that our Airflow container doesn't need to execute the DAG.
Sample AI Project Structure (API & Chat Layers)
AirbnbFriendlyGenAIApp
βββ π .streamlit
β βββ πsecrets.toml
βββ π api
β βββ π__init__.py
β βββ πmain.py
β βββ πmutation.py
β βββ πquery.py
β βββ πschema.py
β βββ πtypes.py
βββ π chat
β βββ π__init__.py
β βββ πagent.py
β βββ πbot.py
β βββ πgraph.py
β βββ πllm.py
β βββ πutils.py
β βββ π tools
β βββ π__init__.py
β βββ πcypher.py
β βββ πgraphql.py
β βββ πvector.py
βββ π tests
βββ πREADME.md
βββ πrequirements.txt
List Python packages in requirements.txt file in project directory
fastapi==0.115.12
gql==4.0.0
langchain>=0.3.25
langchain-community==0.3.25
langchainhub==0.1.21
langchain-neo4j==0.1.1
langchain-openai==0.2.10
mlflow==3.4.0
neo4j==5.27.0
openai==1.56.0
pytest==8.1.1
requests==2.32.4
strawberry-graphql>=0.213.0
streamlit==1.35.0
tenacity!=8.4.0
uvicorn==0.34.3
Sample Astro Project Structure (ETL Layer)
AirflowStructuredStreaming
βββ π .astro
β βββ πconfig.yaml
βββ π dags
β βββ π__init__.py
β βββ πetl.py
βββ π data
βββ π include
β βββ π__init__.py
β βββ πembedding.py
β βββ πscraper.py
β βββ πscripts.py
β βββ πstreamer.py
βββ π plugins
βββ π tests
βββ π.env
βββ πairflow_settings.yaml
βββ πdocker-compose.override.yaml
βββ πDockerfile
βββ πpackages.txt
βββ πREADME.md
βββ πrequirements.txt
List Python packages in requirements.txt file in project directory
apache-airflow-providers-common-compat
apache-airflow-providers-standard
astro-run-dag
grpcio>=1.48.1
grpcio-status
kafka-python==2.2.15
neo4j==5.27.0
openai==1.56.0
pyarrow>=4.0.0
pyspark==3.5.8
pytest==8.1.1
selenium==4.35.0
Setup virtual environment
Before installing the dependencies, let's set up 2 virtual environments dedicated for each one of our projects to keep project dependencies isolated:
python -m venv .airflow-streaming
python -m venv .graphrag-appTo activate any respective environment (Linux), for example run:
source .airflow-streaming/bin/activateInstall dependencies
pip install -r requirements.txtSince we know we want to use Streamlit as our UI for conversational app, we can take advantage of its secrets.toml file and use it to manage all of our AI projects' environment variables.
Streamlit Secrets
OPENAI_API_KEY = "OPENAI_API_KEY"
OPENAI_MODEL = "OPENAI_MODEL" # e.g. gpt-4o-mini
NEO4J_URI = "NEO4J_URI" # e.g. neo4j+s://your-instance.databases.neo4j.io
NEO4J_USERNAME = "NEO4J_USERNAME"
NEO4J_PASSWORD = "NEO4J_PASSWORD"
NEO4J_DATABASE = "NEO4J_DATABASE"Similarly, in our Astro project we can just use the .env file to manage the environment variables.
Airflow Environment Variables
OPENAI_API_KEY=OPENAI_API_KEY
EDGE_DRIVER_URL=EDGE_DRIVER_URL # e.g. http://localhost:4444
WEB_URL=WEB_URL # e.g. https://www.airbnb.com/airbnb-friendly
KAFKA_BROKER=KAFKA_BROKER # e.g. localhost:9092
KAFKA_TOPIC=KAFKA_TOPIC
CHECKPOINT_LOCATION=CHECKPOINT_LOCATION # e.g. gs://{bucket}/{directory}
NEO4J_URI=NEO4J_URI # e.g. neo4j+s://your-instance.databases.neo4j.io
NEO4J_USERNAME=NEO4J_USERNAME
NEO4J_PASSWORD=NEO4J_PASSWORD
For the sake of simplicity, we will follow the manual setup steps below to start running the services on a server that the app is dependent on. Feel free to use Docker Compose here as well if you please!
| Service | Port | Description |
|---|---|---|
| Spark Web UI | 4040 | Monitor Spark application |
| Airflow Web UI | 8080 | DAG visualization |
| Kafka Web UI | 8081 | Kafka cluster visualization |
| Zookeeper | 2181 | Kafka coordination service |
| Kafka | 9092 | Message streaming platform |
| GraphQL API | 8000 | Strawberry Apollo Sandbox |
| Streamlit | 8501 | Interactive chat interface |
| MLflow | 5000 | GenAI apps & agents tracing |
1. Start Apache Kafka
# Terminal 1: Start Zookeeper
cd /path/to/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
# Terminal 2: Start Kafka Server
bin/kafka-server-start.sh config/server.properties
# Terminal 3: Start Kafka UI (Docker)
# To monitor Kafka topic and its produced messages, we will use the pre-built Docker image to run Apache Kafka UI:
docker run -it -p 8080:8081 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-uiIt may be necessary to bind the Kafka broker port to IPv4, as Docker will try to connect to local Kafka on that port only. If so, open up the bin\kafka-run-class.sh file and add
KAFKA_OPTS="-Djava.net.preferIPv4Stack=True"Further mandatory information to connect a local Kafka cluster to an Airflow instance running in Docker is here.
2. Start Spark Connect server
In order for Spark Structured Streaming to be able to consume messages from Kakfa topic & to also write to Neo4j, we first need to make sure we have the correct JAR placed in Spark's classpath for the checkpoint location. In this case we are using GCS:
gcs-connector-hadoop3-2.2.33-shaded.jar
Then navigate to the Spark installation folder and run the command to start server. Remember, this decoupled client-server architecture allows the Airflow instance running in docker (the client) to communicate with remote SparkSession over gRPC.
spark-submit --class org.apache.spark.sql.connect.service.SparkConnectServer --name "Spark Connect server" --packages org.apache.spark:spark-connect_2.13:3.5.8,org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.8,org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_33. Start Airflow Docker container via Astro CLI
astro dev init
astro dev start4. Start GraphQL API
python -m api.main5. Start Streamlit App
streamlit run chat/bot.py6. Start MLflow
mlflow uiWe first need to decide on a data scheduler that will periodically, in this case daily, scrape our target Airbnb web page to make sure we are getting the most recent data for our GraphRAG application. For this demo we will use Airflow, which uses a set of tasks in a DAG pipeline to orchestrate different parts of the streaming pipeline. Below is a sample DAG view of a successful run:
We now need to focus on the scraping of data from the Airbnb Friendly webpage. Before moving one, one needs to pull the latest driver compatible with the local Edge installation. Again, Chrome or Firefox can be used here instead. Then in the Docker Compose override YAML file, one can specify the image with the available ports so the default template containers Airflow spins up at start time can be merged with our custom Selenium container.
Here, it is crucial to keep in mind the 'Developer Tools' that help us inspect the HTML page with DOM elements presented as a tree structure of interconnected nodes. Below is a quick demo detailing that specific elements could look like on a specific citys' listings:
As a general guideline, the hierarchy shows that <div> tags group our HTML elements into sections, while the class & href attributes contain the text or destination URLs we want to strip. To integrate these principles together, understanding XPath Axes & Selenium's web elements are crucial. Some helpful links I used to brush up on my foundations are:
Hint: I only had to use 'following-sibling' & 'ancestor' axes to get the XPath of element nodes containing the target data I needed!
Once done, you can expect your Selenium code to extract all Airbnb listing data from each city available. Here is what our WebDriver will be automated to do for us in real-time:
In each city's webpage, the scrollable section will initially only load a certain number of listings as property cards. We need to simulate a scroll behavior to the maximum extent of this section to scrape all the data.
Now that we have our data organized in a way that's easy to parse, we can produce our data as message records so Kafka can queue it, then we will use Spark Structured Streaming to consume those messages and output it to Neo4j.
Our justification to use a real-time data streaming pipeline is to handle large records of data, especially if this project increases in scope and we decide to scrape listings in other countries outside of the US. If one doesn't feel the need to use streaming technologies and prefers to directly write the scraped data to Neo4j, a Python-Neo4j connector allows for exactly that.
When the streaming pipeline runs daily, the checkpoint location (a path in an HDFS-compatible storage system) may cause some challenges we need to keep in mind:
- Data loss
- Topic resets
- Checkpoint offsets
Here is a comprehensive article I referenced when needing to come up with solutions to those potential roadblocks.
In each iteration of the daily streaming pipeline, it's a good practice to first re-create & delete the Kakfa topic before producing new incoming data, then re-start the Spark Streaming query consuming that data so each microbatch starts with an offset of 0. Additionally, some further configurations to set in the code as a best practice are:
| Client | Configuration |
|---|---|
| Kafka Producer | enable_idempotence=True |
| Spark Consumer | startingOffsets="latest" |
| Spark Consumer | failOnDataLoss="false" |
We can monitor the messages produced to the Kafka topic using its Web UI:
We can also monitor the running Spark Streaming session in its respective Web UI:
We are now ready to write our data to Neo4j using Cypher queries via Graph Query Language (GQL)!
Once we properly identify our nodes with their properties & relationships via a Cypher query to create our graph database model, we can have Spark write the data stream to our Neo4j AuraDB instance. This will kickstart progress on introducing a GraphRAG system that uses Neo4j as a knowledge graph to provide more accurate & relevant results to our GenAI application based on relationships between the data nodes.
Graph Visualization Overview:
![]() |
![]() |
A summary of the graph database model the above images illustrate:
Node Structures
| Node Type | Properties | Description |
|---|---|---|
| Country | name: String |
Represents a country (e.g., "United States") |
| State | code: String |
Represents a state (e.g., "CA", "NY") |
| City | name: Stringstate: String |
Represents a city with its state |
| Listing | name: Stringaddress: Stringmonthly_rent: Integerweekly_earnings: Integernet_revenue: Integerroi: Floatdescription: String |
Represents an Airbnb listing with financial metrics |
Relationships
| Relationship | Direction | From β To | Description |
|---|---|---|---|
| HAS_STATE | β |
Country β State | Country contains states |
| HAS_CITY | β |
State β City | State contains cities |
| HAS_LISTING | β |
City β Listing | City contains listings |
Now we can move on to creating a single GraphQL endpoint that will seamlessly enable one to retrieve data with a single query. This is opposed to using & navigating multiple REST API endpoints on the client side where there can be large response data.
Strawberry integrates with FastAPI to give that endpoint and allows us to create resources that need to be shared across requests. We use context managers to provide the Neo4j driver to GraphQL resolvers. If we define resolvers for fields of the Query type, these fields become the entry point into the GraphQL schema.
The schema is a collection of types & fields written in schema definition language (SDL), and once created the resolver can retrieve data for a specific field in the schema. Strawberry translates Python types into GraphQL SDL in the background.
Once we find all properties of a class defined as a type, a field can be resolved with a specific function. Then we can use the Apollo Sandbox IDE to validate the GraphQL schema we've created with different types & fields as shown:
Some of the query operations focus on ROI, highest earnings by revenue, listings that fall within a certain range, just to name a few. This GraphQL web IDE allows us to become more familiar with our query operations. Next we will talk more about how those resolver functions are helpful in a GraphRAG workflow for a GenAI app hosted on Streamlit.
We will create a Streamlit app that will interact with an OpenAI LLM through an agent. A list of tools with prompt templates need to be passed to the agent for it to then generate a response to a user's input in UI.
We specifically only work with LangChain here and not LangGraph because we want to take advantage of Neo4j's chat message history storage. This is what allows for a conversation history via the use of callback functions & stored session IDs, giving the agent a memory of recent messages.
We can either create a ReAct agent using a pre-existing RAG prompt template from LangSmith, provide our own custom prompt template, or include a general prompt template dedicated for chat models. In either case, we always need to specify the LLM, tools & type of prompt when creating the ReAct agent.
Some sample tools registered & passed to our agent include:
- An Airbnb general chat chain to fallback on when no other tool is available for the agent
- A Graph Cypher QA Chain to execute direct region-specific queries against Neo4j using Text2Cypher
- A custom Smart NLP text parser using existing resolver functions in GraphQL endpoint
- LangChain's BaseGraphQLTool to allow agent to generate & execute queries based on GraphQL API endpoint
- Semantic search similarity based on a listing's description due to vector indexes & text embeddings created in knowledge graph
A sample user input & agent response is shown below:
We can enable tracing for our application to visualize the data flow with MLflow's LangChain integration. In the Traces tab, we can see the reasoning flow of the agent after it parses the user input. The tool shown below is identified by the name given to it during initialization in the backend code:
Tip: If one ever encounters memory errors in the Streamlit application, some fine-tuning techniques involve reducing model token limits, lowering the temparature for more factual responses and adding timeout durations for requests in the LLM's initialization.
As this project keeps growing, we look to enhance the Streamlit app with cosmetic changes and eventually deploy to its Community Cloud so users can interact with it over the Internet. This would also require hosting the GraphQL server publicly with either AWS AppSync or Neo4j AuraDB's Data API service.
Stay tuned for updates!











