Using LangChain and LangGraph to Build a RAG-Powered Chatbot


Large language models have extensive general knowledge but can’t access your organization’s proprietary documents, internal policies, or specialized domain content. Retrieval-augmented generation (RAG) solves this by retrieving relevant information from your documents and including it in prompts to the LLM.

By building a chatbot with RAG, you can ground its responses in your specific content, ensuring accurate answers that reflect your documentation rather than the model’s training data. Building a chatbot with RAG requires solving several problems: processing and indexing documents, generating embeddings, performing vector searches, managing conversation state, and orchestrating LLM interactions.

This guide describes how to leverage LangChain and LangGraph, two open-source, production-ready frameworks, to simplify chatbot development.

Workflow Diagram

Below is the workflow of an example RAG chatbot architecture built with the LangChain, LangGraph, and FastAPI frameworks.

  1. LangChain is used to load source documents from an S3-compatible object storage bucket.

  2. The documents are split into chunks and then vector representations of these chunks are generated by an embedding model.

    Document chunking addresses two technical requirements: maintaining text segments within the model’s token limits and optimizing vector database queries for better accuracy and response times (a minimal sketch of chunking and similarity scoring follows this list).

  3. These generated vector embeddings are stored in the vector database. The example code assumes a PostgreSQL database with the pgvector extension enabled.

  4. When a user submits a question, the chatbot sends it to the same embedding model that processed the documents. This converts the query text into a vector representation in the same mathematical space as the document chunks, enabling meaningful comparisons.

  5. The query embedding is compared against all stored document embeddings using vector similarity search. The pgvector extension performs this efficiently using HNSW (Hierarchical Navigable Small World) indexing, returning the most semantically similar chunks.

  6. The chatbot retrieves the top matching document chunks identified by the similarity search. These chunks contain the specific text segments from your documents that are most relevant to the user’s question.

  7. LangGraph retrieves the conversation history for the current session from the PostgreSQL state database. This provides the LLM with previous messages and responses, enabling it to understand follow-up questions and maintain context across the conversation.

  8. The chatbot constructs a prompt that combines the user’s question, the retrieved document chunks as context, and the conversation history. This complete prompt is sent to the LLM (gpt-4o-mini in the example), which generates a response grounded in both your specific documents and the ongoing conversation.

  9. After the LLM responds, LangGraph saves both the user’s question and the assistant’s answer to the state database using its checkpointing mechanism. This persisted history allows users to continue conversations across sessions and enables the chatbot to reference earlier exchanges.
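
To make steps 2, 4, and 5 concrete, the following minimal sketch chunks a piece of text, embeds the chunks and a query with the same model, and ranks the chunks by cosine similarity. It assumes the langchain-text-splitters, langchain-openai, and numpy packages are installed and that the OPENAI_API_KEY environment variable is set; the architecture above stores embeddings in pgvector and lets the database perform this comparison instead of doing it in memory.

import numpy as np
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

text = "Replace this with the contents of one of your documents."

# Step 2: split the source text into overlapping chunks
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_text(text)

# Steps 2 and 4: embed the chunks and the user's question with the same model
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
chunk_vectors = np.array(embeddings.embed_documents(chunks))
query_vector = np.array(embeddings.embed_query("How do I reset my password?"))

# Step 5: cosine similarity between the question and every chunk
scores = chunk_vectors @ query_vector / (
    np.linalg.norm(chunk_vectors, axis=1) * np.linalg.norm(query_vector)
)
top_chunks = [chunks[i] for i in np.argsort(scores)[::-1][:3]]
print(top_chunks)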

Systems and Components

  • Python Application: Your chatbot application, built with LangChain, LangGraph, and FastAPI.

  • LangChain: Open-source framework that orchestrates document processing, embedding generation, vector retrieval, and prompt engineering.

  • LangGraph: Open-source framework that manages stateful language model conversations.

  • FastAPI: Python web framework providing the REST API endpoints that handle chat requests and responses.

  • Source Documents: S3-compatible object storage used to store source documents that form the chatbot’s knowledge base.

  • OpenAI API: External LLM service providing both the embedding model (text-embedding-3-small) for document vectorization and the chat model (gpt-4o-mini) for generating responses.

  • Vector Database: A PostgreSQL database with the pgvector extension enabled. Used for storing document embeddings and performing vector similarity searches.

  • State Database: A PostgreSQL database used by LangGraph to persist conversation history across chatbot sessions.

LangChain vs LangGraph

LangChain offers a comprehensive toolkit for building LLM-powered applications. It provides pre-built integrations with popular vector databases and language models. For retrieval-augmented generation (RAG) chatbots, LangChain includes methods for document loading, text splitting, embedding generation, and the retrieval pipeline. The LangChain Expression Language (LCEL) lets you chain operations together declaratively, improving the readability of your chatbot code.
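
As an illustration of this declarative style, here is a minimal LCEL chain. It is unrelated to the example repository and assumes the langchain-openai package is installed and OPENAI_API_KEY is set; it only shows how the | operator pipes one step’s output into the next.

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Each | pipes the previous step's output into the next step
prompt = ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
chain = prompt | ChatOpenAI(model="gpt-4o-mini") | StrOutputParser()

print(chain.invoke({"text": "LangChain chains prompts, models, and parsers."}))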

LangGraph orchestrates stateful AI agents. It provides persistent checkpointing that saves conversation history to a database, which means users can close a chat and resume it later without losing context. LangGraph models conversations as state graphs, where each node represents a processing step (like retrieval or response generation) and edges control the flow of the agent’s logic. LangChain and LangGraph can be used together, but LangGraph can also be used without LangChain.
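
The following minimal sketch shows LangGraph’s state-graph and checkpointing model. It uses the in-memory checkpointer for brevity (the example application described later uses a PostgreSQL checkpointer) and a trivial echo node in place of an LLM call.

import operator
from typing import Annotated, List, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

class ChatState(TypedDict):
    # operator.add tells LangGraph to append new messages to the saved list
    messages: Annotated[List[str], operator.add]

def respond(state: ChatState) -> dict:
    # A real node would call a RAG chain or an LLM here
    return {"messages": ["echo: " + state["messages"][-1]]}

workflow = StateGraph(ChatState)
workflow.add_node("respond", respond)
workflow.set_entry_point("respond")
workflow.add_edge("respond", END)
graph = workflow.compile(checkpointer=MemorySaver())

# The thread_id keys the checkpointed state, so later calls resume the thread
config = {"configurable": {"thread_id": "demo-thread"}}
graph.invoke({"messages": ["hello"]}, config=config)
result = graph.invoke({"messages": ["are you still there?"]}, config=config)
print(result["messages"])  # both user turns and both replies, restored across invocations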

Understanding Retrieval-Augmented Generation (RAG)

Here is a quick overview of how RAG solves the problem of LLMs having limited knowledge of your specific documents. RAG operates in two distinct phases:

  1. The indexing phase involves preparing your knowledge base: loading documents, splitting them into chunks, generating embeddings, and storing everything in your vector database.

  2. The query phase happens with every user question: converting the question to a vector, finding related documents through vector search, and passing that information to the LLM for answer generation.

The key insight is that the retriever uses vector similarity, not the LLM, to find relevant documents. The application involves the LLM only after retrieval to synthesize information into a natural language answer.
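
This distinction is easy to see with LangChain’s in-memory vector store, which is enough for a small demonstration (the example application uses pgvector instead). Assuming langchain-openai is installed and OPENAI_API_KEY is set, the similarity search below returns relevant chunks without any chat model being involved:

from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings

# Indexing phase: embed a few document chunks and store them
store = InMemoryVectorStore(OpenAIEmbeddings(model="text-embedding-3-small"))
store.add_documents([
    Document(page_content="Password resets are handled by the IT helpdesk."),
    Document(page_content="VPN access requires manager approval."),
])

# Query phase: vector similarity selects the relevant chunk; no LLM is called yet
hits = store.similarity_search("Who handles password resets?", k=1)
print(hits[0].page_content)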

Chatbot Code Walkthrough

The example chatbot application code can be found in the rag-pipeline-chatbot-langchain branch of the linode/docs-cloud-projects repository on GitHub. This section describes some key areas of the code and how they interact with the LangChain, LangGraph, and FastAPI frameworks.

Here is a quick breakdown of the key Python files in the repository:

  • app/api/
    • chat.py: Handles chat API endpoints for processing user messages and returning AI responses with conversation thread management.
    • health.py: Provides endpoints for monitoring application status, database connectivity, and system health.
  • app/core/
    • config.py: Loads environment variables and provides centralized settings for databases, APIs, and application parameters.
    • memory.py: Implements conversation memory persistence across sessions using LangGraph with PostgreSQL checkpointing.
    • rag.py: Core RAG pipeline implementation that handles document indexing from S3-compatible storage, vector storage with pgvector, and query processing.
  • app/scripts/
    • init_db.py: Database initialization script that creates necessary PostgreSQL databases, enables the pgvector extension, and sets up the required tables and indexes.
    • index_documents.py: Indexes documents in an object storage bucket by processing them through the RAG pipeline for chunking and embedding, then storing data in the vector database.

Implementing Document Indexing

The code for indexing documents is present in the app/core/rag.py file. Here are some highlights from the index_documents_from_s3 method:

File: app/core/rag.py (lines 121-218)
def index_documents_from_s3(self, object_keys: List[str]) -> Dict[str, Any]:
    """
    Index documents from S3-compatible Object Storage.

    Args:
        object_keys: List of object keys in the S3 bucket

    Returns:
        Dictionary with indexing results
    """
    try:
        total_chunks = 0
        processed_docs = 0

        for object_key in object_keys:
            logger.info(f"Processing document: {object_key}")

            # Load document from S3
            loader = S3FileLoader(
                bucket=settings.linode_object_storage_bucket,
                key=object_key,
                aws_access_key_id=settings.linode_object_storage_access_key,
                aws_secret_access_key=settings.linode_object_storage_secret_key,
                endpoint_url=settings.linode_object_storage_endpoint
            )

            documents = loader.load()

            if not documents:
                logger.warning(f"No content found in document: {object_key}")
                continue

            # Split documents into chunks
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=settings.chunk_size,
                chunk_overlap=settings.chunk_overlap,
                length_function=len,
                separators=["\n\n", "\n", " ", ""]
            )

            chunks = text_splitter.split_documents(documents)

            # Extract enhanced metadata from document
            enhanced_metadata = self._extract_document_metadata(object_key, documents[0])

            # Log the extracted metadata
            logger.info(f"Extracted metadata for {object_key}:")
            if enhanced_metadata.get("title"):
                logger.info(f"  Title: {enhanced_metadata['title']}")
            if enhanced_metadata.get("author"):
                logger.info(f"  Author: {enhanced_metadata['author']}")
            if enhanced_metadata.get("language"):
                logger.info(f"  Language: {enhanced_metadata['language']}")
            logger.info(f"  Document Type: {enhanced_metadata.get('document_type', 'unknown')}")
            logger.info(f"  Document Length: {enhanced_metadata.get('document_length', 0):,} characters")
            logger.info(f"  Indexed At: {enhanced_metadata.get('indexed_at', 'unknown')}")

            # Add metadata to chunks
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    "source": object_key,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    **enhanced_metadata  # Spread enhanced metadata
                })

            # Store chunks in vector database
            self.vector_store.add_documents(chunks)

            total_chunks += len(chunks)
            processed_docs += 1

            logger.info(f"Successfully indexed {len(chunks)} chunks from {object_key}")
            logger.info(f"  Chunk size: {settings.chunk_size} chars, overlap: {settings.chunk_overlap} chars")

        # Create vector index for better performance after all documents are added
        if total_chunks > 0:
            logger.info("Creating vector indexes for better search performance...")
            self._create_vector_index()

        result = {
            "success": True,
            "documents_processed": processed_docs,
            "chunks_created": total_chunks,
            "message": f"Successfully indexed {processed_docs} documents with {total_chunks} chunks"
        }

        logger.info(f"Document indexing completed: {result}")
        return result

    except Exception as e:
        logger.error(f"Failed to index documents: {e}")
        return {
            "success": False,
            "documents_processed": 0,
            "chunks_created": 0,
            "message": f"Failed to index documents: {str(e)}"
        }
  • On lines 139-147, LangChain’s S3FileLoader is used to load documents from S3-compatible object storage. It handles authentication and retrieval of each object.
  • On lines 154-159, RecursiveCharacterTextSplitter, a LangChain text-splitting utility, intelligently splits documents into chunks while respecting a configurable chunk size (chunk_size), creating overlap between chunks (chunk_overlap), and using hierarchical separators (paragraphs, lines, spaces, then characters).
  • On line 188, the add_documents method of LangChain’s vector store interface adds the chunks to the vector database (a hedged sketch of configuring such a store follows).
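
For reference, the kind of vector store that add_documents targets can be configured with the langchain-postgres integration along these lines. This is a hedged sketch rather than the repository’s code: the connection string and collection name are placeholders, and the parameter names follow current langchain-postgres releases, so check them against the version you install.

from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector

# Placeholder connection details; the example app reads its values from app/core/config.py
vector_store = PGVector(
    embeddings=OpenAIEmbeddings(model="text-embedding-3-small"),
    collection_name="document_chunks",
    connection="postgresql+psycopg://user:password@localhost:5432/vectordb",
)

# chunks is a list of Document objects produced by the text splitter
# vector_store.add_documents(chunks)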

Building the RAG Query Pipeline

The application uses LangChain to chain together the retrieval of relevant document chunks with the LLM-generated response to the user’s prompt. In app/core/rag.py, chaining these steps together looks like this:

File: app/core/rag.py (lines 59-97)
def _create_rag_chain(self):
    """Create the RAG chain for question answering."""
    try:
        # Create retriever
        self.retriever = self.vector_store.as_retriever(
            search_type="similarity",
            search_kwargs={"k": settings.retrieval_k}
        )

        # Define the RAG prompt template
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", """You are a helpful assistant that answers questions based on the provided context.

            Instructions:
            - Answer questions using ONLY the information provided in the context documents
            - Always cite your sources when referencing specific information
            - Include the document title, author, and source file when citing
            - Cite the source file as the original document name, not the chunk index or document number
            - Don't cite the document number (like "Document 1" or "Document 2") as this is not useful information
            - If the context doesn't contain relevant information, say so clearly
            - Be concise but comprehensive in your answers
            - Maintain a helpful and professional tone

            When citing sources, use this format: "According to [Title] by [Author] ([Source file])..." or "As mentioned in [Title] by [Author]..."."""),
            ("human", "Context:\n{context}\n\nQuestion: {question}")
        ])

        # Create the RAG chain using LangChain Expression Language (LCEL)
        self.rag_chain = (
            {"context": self.retriever | self._format_docs, "question": RunnablePassthrough()}
            | prompt_template
            | self.llm
            | StrOutputParser()
        )

        logger.info("RAG chain created successfully")
    except Exception as e:
        logger.error(f"Failed to create RAG chain: {e}")
        raise
  • Lines 63-66: The vector store retriever is configured to return the top 10 most similar chunks (settings.retrieval_k is defined in app/core/config.py).
  • A ChatPromptTemplate is defined on lines 69-84 that instructs the LLM to use the retrieved context and cite sources.
  • Lines 87-92 use the LangChain Expression Language (LCEL) to invoke the retriever and establish the context for a query, add that context to the prompt, send the enriched prompt to the LLM, and return the LLM’s response. The retriever’s output is piped through self._format_docs before it reaches the prompt; a sketch of what such a helper typically does follows.
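
The repository’s _format_docs implementation isn’t shown here, but a typical version of such a helper flattens the retrieved chunks into a single context string, roughly as sketched below. The metadata fields used are assumptions based on the chunk metadata added during indexing; the actual helper may differ.

from typing import List
from langchain_core.documents import Document

def format_docs(docs: List[Document]) -> str:
    """Join retrieved chunks into one context block, labeling each with its source."""
    parts = []
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        title = doc.metadata.get("title", source)
        parts.append(f"[{title} ({source})]\n{doc.page_content}")
    return "\n\n".join(parts)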

Adding Conversation Memory

To make the RAG system more user-friendly within a chatbot interface, extend it with persistent conversation memory using LangGraph. LangGraph stores conversation history in the conversations database, which enables persistence across restarts and supports multiple concurrent conversations. The example chatbot’s persistence code is implemented in app/core/memory.py:

  • _create_conversation_graph: This method compiles a graph, which LangGraph uses to represent the chatbot agent’s workflow.

    File: app/core/memory.py (lines 147-168)

        def _create_conversation_graph(self):
            """Create the LangGraph conversation graph."""
            try:
                # Create the graph with state schema
                workflow = StateGraph(ConversationState)
    
                # Add nodes
                workflow.add_node("rag_query", self._rag_query_node)
                workflow.add_node("generate_response", self._generate_response_node)
    
                # Define the flow
                workflow.set_entry_point("rag_query")
                workflow.add_edge("rag_query", "generate_response")
                workflow.add_edge("generate_response", END)
    
                # Compile the graph with checkpointer
                self.graph = workflow.compile(checkpointer=self.checkpointer)
    
                logger.info("Conversation graph created successfully")
            except Exception as e:
                logger.error(f"Failed to create conversation graph: {e}")
                raise

    Graphs specify the state of the chatbot application, the actions performed by the agent, and which actions an agent should take based on its current state.

    • Lines 154-155 create nodes for the graph. Nodes are individual processing steps for your agent. The rag_query node retrieves relevant documents, and the generate_response node generates the LLM response.

    • Lines 158-160 create edges for the graph. Edges determine which states should follow from each other, or the logical flow of the agent. These lines define this execution path: the agent starts with rag_query, proceeds to generate_response, then ends.

    • Line 163 compiles the graph, which also validates its logical consistency. The graph is compiled with a PostgreSQL checkpointer that automatically persists conversation state after each step, enabling conversation history across sessions (a hedged sketch of such a checkpointer appears at the end of this section).

  • process_message: This method handles a user’s chatbot question and retrieves an answer from the LLM, referring to and preserving the user’s conversation history as it does so. It accepts an optional thread ID argument that identifies the user’s conversation history with the chatbot.

    File: app/core/memory.py (lines 272-325)

        def process_message(self, message: str, thread_id: Optional[str] = None) -> Dict[str, Any]:
            """
            Process a user message and return the response.
    
            Args:
                message: The user's message
                thread_id: Optional thread ID for conversation continuity
    
            Returns:
                Dictionary with response and thread information
            """
            try:
                # Generate thread ID if not provided
                if not thread_id:
                    thread_id = str(uuid.uuid4())
    
                # Get existing conversation history first
                existing_history = self.get_conversation_history(thread_id)
                existing_messages = existing_history.get("messages", [])
    
                # Create human message in serializable format
                human_message = {
                    "type": "HumanMessage",
                    "content": message,
                    "timestamp": datetime.utcnow().isoformat()
                }
    
                # Prepare initial state with existing messages + new message
                initial_state = {
                    "messages": existing_messages + [human_message],
                    "thread_id": thread_id,
                    "user_input": message,
                    "rag_result": None
                }
    
                # Configure the graph with thread ID
                config = {"configurable": {"thread_id": thread_id}}
    
                # Run the conversation graph
                final_state = self.graph.invoke(initial_state, config=config)
    
                # Extract the response
                messages = final_state["messages"]
                ai_response = messages[-1]["content"] if messages else "No response generated."
    
                result = {
                    "response": ai_response,
                    "thread_id": thread_id,
                    "message_count": len(messages),
                    "timestamp": datetime.utcnow().isoformat()
                }
    
                logger.info(f"Message processed successfully for thread {thread_id}")
                return result
    • Lines 289-290 retrieve the conversation history for the user’s thread ID.

    • Lines 292-305 combine the previous conversation history with the new user message into an initial state for the agent’s graph.

    • Lines 307-311 ensure that the graph execution has access to the thread ID. This allows the PostgreSQL checkpointer to store and retrieve state for the conversation.

    • Line 311: The graph is invoked to execute the agent’s workflow (RAG querying and LLM response generation).
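
Mirroring the in-memory sketch shown earlier, here is a hedged, self-contained sketch of compiling a graph with a Postgres-backed checkpointer. It assumes the langgraph-checkpoint-postgres package is installed; the connection string, state schema, and echo node are placeholders rather than the repository’s code, which wires these pieces up through app/core/config.py and app/core/memory.py.

import operator
from typing import Annotated, List, TypedDict

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph, END

class ChatState(TypedDict):
    # operator.add appends new messages to the checkpointed list
    messages: Annotated[List[str], operator.add]

def respond(state: ChatState) -> dict:
    # Placeholder node; the example app runs its RAG chain here instead
    return {"messages": ["echo: " + state["messages"][-1]]}

workflow = StateGraph(ChatState)
workflow.add_node("respond", respond)
workflow.set_entry_point("respond")
workflow.add_edge("respond", END)

DB_URI = "postgresql://user:password@localhost:5432/conversations"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # creates LangGraph's checkpoint tables on first use
    graph = workflow.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "user-123"}}
    print(graph.invoke({"messages": ["hello"]}, config=config)["messages"])
    # Re-running later with the same thread_id resumes this conversation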

Creating the API

The application uses the FastAPI framework to create the web API that clients interact with to send messages and receive responses. The API is implemented in https://github.com/linode/docs-cloud-projects/blob/rag-pipeline-chatbot-langchain/app/api/chat.py. The key endpoint, which accepts messages and returns AI-generated responses, is implemented like this:

File: app/api/chat.py (excerpt)
@router.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    conversation_memory=Depends(get_conversation_memory)
) -> ChatResponse:
    """
    Process a chat message and return the AI response.

    Args:
        request: Chat request containing message and optional thread_id
        conversation_memory: Dependency injection for conversation memory

    Returns:
        ChatResponse with the AI's response and thread information
    """
    try:
        logger.info(f"Processing chat message: {request.message[:50]}...")

        # Process the message through the conversation memory system
        result = conversation_memory.process_message(
            message=request.message,
            thread_id=request.thread_id
        )

        # Create response
        response = ChatResponse(
            response=result["response"],
            thread_id=result["thread_id"]
        )

        logger.info(f"Chat message processed successfully for thread {result['thread_id']}")
        return response

