构建 RAG 应用:LangChain 检索、记忆与调试

1. 从最小 RAG 到可用应用

上一篇已经完成了一个最小版 RAG。它可以把检索结果交给 LLM,让模型基于资料整合答案。完整链路大致是:

1
Document → Chunk → Embedding → Vector Store → Retrieve → Context → Prompt → LLM → Answer

也就是说,系统已经能完成“先找资料,再回答”这件事。例如用户问:

1
良渚古城有几重结构?

系统会先从向量库中召回相关片段,再把这些片段放进 Prompt,最后让模型生成答案。到这里,RAG 的基本原理都已经涉及到了。但这还只是 Minimal RAG。它比较适合理解底层流程,但不能当做一个可以长期维护的问答应用。实际应用时可能还会碰到其它问题:

  • 知识接入还比较手工
    上一篇主要围绕一批示例文本演示切块、向量化、写入 Chroma 和检索。如果资料来源变成 PDF、Word、网页、本地目录、企业 Wiki,就不能每来一种数据源都手写一遍导入逻辑。

  • 链路拼装还比较散
    检索函数、Prompt 拼接、模型调用、答案输出都能工作,但它们更像是示例代码。继续加功能时,代码容易变成:load → split → embed → search → format_context → build_prompt → call_llm → parse_answer。每一步都写得很清楚,但也意味着每一步都需要自己维护。

  • 它还不能自然连续对话
    第一轮问,良渚古城有几重城址结构?外围水利系统是谁修建的?
    第二轮继续问:这些水利设施有什么作用?
    如果系统没有保存历史消息,就不知道“这些水利设施”指的是良渚古城外围水利系统。模型只能根据当前这句话重新猜,RAG 链路本身并不会自动记住上一轮发生了什么。

  • 系统内部不够好观察
    回答错了时,很难马上判断问题出在哪里:是检索没找对,Prompt 没组织好,历史消息没带上,还是模型没有按上下文回答。只看最终答案,问题定位成本会很高。

所以本文主要目标是把手写 Minimal RAG 往应用方向推进一步:

1
多来源资料接入 → Retriever → Prompt → LLM → Memory → Tracing → 高层封装

这里开始出现几个新的模块:

  • Loader 负责把 PDF、Word、网页和文本统一变成 Document
  • Retriever 负责把底层向量库包装成检索接口
  • Prompt 负责稳定组织上下文和问题
  • Memory 负责保存多轮对话状态
  • Tracing 负责观察系统内部到底发生了什么

这意味着关注点发生变化。上一篇解决的是如何手写跑通一条 RAG 链路。 这一篇想要解决如何把这条链路整理成更像应用的结构。

接下来先完成第一步,把已有向量库升级成统一的知识接入层,让 PDF、Word、网页和文本都能够进入同一个问答系统。

2. 构建统一的知识接入层

上一篇已经完成了一条简单的 RAG 链路。但真实项目里,知识接入不能只靠临时示例。资料来源会变多,导入过程会失败,向量库也需要持久化,否则每次重跑都要重新写入。

这一章对应的代码拆成两个文件:

  • common.py:统一模型、Embedding、切块器和 Chroma 配置。
  • 02_ingest_documents.py:读取良渚资料,切块后写入 Chroma。

整体流程仍然很简单:

1
Loader → Document → Splitter → Chroma

2.1 统一模型、Embedding 与向量库配置

先把公共配置放到 common.py。后面所有脚本都从这里拿 LLM、Embedding 和向量库,避免重复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
from pathlib import Path

from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter


BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR / "Liangzhu"
CHROMA_DIR = BASE_DIR / "chroma_db"

CHAT_COMPLETIONS_API_KEY = os.getenv("CHAT_COMPLETIONS_API_KEY")
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"


def get_llm():
return ChatOpenAI(
api_key=CHAT_COMPLETIONS_API_KEY,
base_url=DASHSCOPE_BASE_URL,
model="deepseek-v4-pro",
temperature=0,
)


def get_embedding():
return OpenAIEmbeddings(
api_key=CHAT_COMPLETIONS_API_KEY,
base_url=DASHSCOPE_BASE_URL,
model="text-embedding-v4",
check_embedding_ctx_length=False,
)


def get_text_splitter():
# overlap 保留相邻 chunk 的上下文,降低关键信息被切断的概率。
return RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=100,
)


def get_vector_db():
# 所有章节共用同一个 collection 和本地持久化目录。
return Chroma(
collection_name="liangzhu",
embedding_function=get_embedding(),
persist_directory=str(CHROMA_DIR),
)


def require_api_key():
if not CHAT_COMPLETIONS_API_KEY:
raise RuntimeError(
"请先设置环境变量 CHAT_COMPLETIONS_API_KEY,再运行脚本。"
)

这里有几个细节要注意一下

  • get_llm()get_embedding() 分开写,后面要换模型只改这里。
  • chunk_overlap=100 可以减少切块时把关键信息切断的问题。
  • persist_directory=str(CHROMA_DIR) 会把 Chroma 数据落到 lang-chain-rag/chroma_db,脚本结束后知识库不会消失。
  • require_api_key() 调用报错提示。

2.2 封装多来源文档导入流程

02_ingest_documents.py 负责导入资料。不同文件格式只影响 Loader,后面的读取、切块、写入逻辑都走同一个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from langchain_community.document_loaders import (
Docx2txtLoader,
PyPDFLoader,
TextLoader,
WikipediaLoader,
)

from common import DATA_DIR, get_text_splitter, get_vector_db, require_api_key


text_splitter = get_text_splitter()
vector_db = get_vector_db()


def add_documents_in_batches(chunks, batch_size=10):
# 分批写入,避免一次提交过多 chunk 时请求或本地写入不稳定
for i in range(0, len(chunks), batch_size):
batch = chunks[i : i + batch_size]
vector_db.add_documents(batch)


def split_and_import(loader):
try:
documents = loader.load()

if not documents:
print(f"未读取到内容:{loader}")
return

chunks = text_splitter.split_documents(documents)

if not chunks:
print(f"未切分出 chunks:{loader}")
return

# 保留来源 Loader,后面排查“哪个文件/哪种 Loader 贡献了答案”会用到。
for chunk in chunks:
chunk.metadata["source_type"] = loader.__class__.__name__

add_documents_in_batches(chunks, batch_size=10)

print(f"导入成功 | {loader.__class__.__name__}")
print(f"documents={len(documents)} chunks={len(chunks)}")

except Exception as e:
print(f"导入失败:{loader}")
print(f"错误信息:{e}")

这个函数做了几件事,先读取文档,再检查空内容,然后切块并补充 source_type,最后分批写入 Chroma。分批写入是为了避免资料多时一次性写入太不稳定。

入口部分只负责创建 Loader:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def main():
require_api_key()

# Wikipedia 依赖外部网络,失败不影响本地资料导入。
try:
wikipedia_loader = WikipediaLoader(
query="良渚古城",
lang="zh",
load_max_docs=2,
)
split_and_import(wikipedia_loader)
except Exception as e:
print(f"Wikipedia 跳过:{e}")

split_and_import(Docx2txtLoader(str(DATA_DIR / "Liangzhu-Overview.docx")))
split_and_import(PyPDFLoader(str(DATA_DIR / "Liangzhu-Water-System.pdf")))
split_and_import(
TextLoader(
str(DATA_DIR / "Liangzhu-Encyclopedia.txt"),
encoding="utf-8",
)
)

if __name__ == "__main__":
main()

这里把 Wikipedia 放在 try 里,是因为它经常受网络影响。如果它失败,就会跳过,继续导入本地 Word、PDF 和 TXT。真正稳定的知识来源应该优先使用自己可控的本地文件或内部知识库。

日志输出结果类似:

1
2
3
4
5
6
7
8
导入成功 | WikipediaLoader
documents=2 chunks=18
导入成功 | Docx2txtLoader
documents=1 chunks=1
导入成功 | PyPDFLoader
documents=1 chunks=1
导入成功 | TextLoader
documents=1 chunks=1

到这里,知识入口就变成了一个可以继续扩展的导入层。下一步就需要把检索能力包装成更统一的接口。

3. 构建基础 RAG 问答链

第二章完成的是知识入库。现在 03_retrieval_and_rag.py 负责先检查检索结果,再把检索结果交给模型生成回答。整体结构是:

1
Chroma → Retriever → Prompt → LLM → Answer

3.1 验证向量检索结果

RAG 的回答质量首先取决于检索。如果这里找错资料,后面模型再强也只能基于错误上下文生成。脚本里先定义问题,再打印前 4 条相似结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough

from common import get_llm, get_vector_db, require_api_key

QUESTION = """
良渚古城有几重城址结构?
外围水利系统是谁修建的?
"""

def check_retrieval(vector_db, question=QUESTION):
# 先看检索结果,再接 LLM,RAG 答错时通常要先查这一层。
results = vector_db.similarity_search(question, k=4)
print("\n=== 检索结果 ===")
for index, document in enumerate(results, start=1):
print(f"\n[{index}] metadata={document.metadata}")
print(document.page_content[:500])
return results

这里看的不是最终答案,而是检索层到底拿到了什么。重点检查两件事:

  • page_content 里是否真的包含良渚古城、三重城址结构、外围水利系统等信息。
  • metadata 里是否保留了来源文件和 Loader 类型,方便后面引用和调试。

3.2 组合 Retriever、Prompt 与 LLM

检索没问题后,再构建第一版 RAG 链。这里的 retriever 来自 vector_db.as_retriever(),也就是把 Chroma 包装成 LangChain 统一的检索接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def build_rag_chain(vector_db):
chatbot = get_llm()
# retriever 是 Chroma 的统一检索接口,后面的链不再直接调用 similarity_search。
retriever = vector_db.as_retriever()
# 同一个 question 既要进入 retriever,也要原样进入 Prompt。
question_feeder = RunnablePassthrough()

rag_prompt_template = """
请仅根据提供的上下文回答问题。

要求:
1. 只能使用上下文中的信息回答
2. 不允许编造内容
3. 如果上下文中没有答案,请直接回答:“无法根据提供资料确定”
4. 回答尽量简洁,最多三句话

上下文:
{context}

问题:
{question}

回答:
"""

rag_prompt = PromptTemplate.from_template(rag_prompt_template)

return {
# LCEL 会把用户问题传给 retriever,返回的 Documents 填入 {context}。
"context": retriever,
# RunnablePassthrough 保留原始问题,填入 {question}。
"question": question_feeder,
} | rag_prompt | chatbot

这段链路里,问题会同时走两条路:

1
2
Question → Retriever → context
Question → RunnablePassthrough → question

然后二者一起进入 Prompt,最后交给 LLM 生成答案。

3.3 运行基础问答流程

最后封装一个执行函数,再在 main() 里串起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def execute_chain(chain, question):
return chain.invoke(question)


def main():
require_api_key()
vector_db = get_vector_db()
check_retrieval(vector_db)

rag_chain = build_rag_chain(vector_db)
question = """
良渚古城有几重城址结构?
外围水利系统是谁修建的?

请同时说明来源。
"""
answer = execute_chain(rag_chain, question)

print("\n=== RAG 回答 ===")
print(answer.content)


if __name__ == "__main__":
main()

到这里,第一版 RAG 已经跑通。它已经不是单纯返回文档片段,而是完成了:

1
Question → Retrieve → Context → Prompt → LLM → Answer

输出结果类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
=== 检索结果 ===

[1] metadata={'source_type': 'Docx2txtLoader', 'source': 'lang-chain-rag/Liangzhu/Liangzhu-Overview.docx'}
Liangzhu Ancient City Overview 良渚古城遗址...

[2] metadata={'source': 'lang-chain-rag/Liangzhu/Liangzhu-Encyclopedia.txt', 'source_type': 'TextLoader'}
良渚古城遗址位于...

问答提示:
- 良渚古城有几重城址结构?答案:三重城址结构。
- 外围水利系统是谁修建的?答案:良渚先民。
- 这些水利设施有什么作用?答案:防洪、蓄水、调水,并支持农业生产和古城运行。

[3] metadata={'title': '良渚古城外围水利工程遗址', 'source': 'https://zh.wikipedia.org/wiki/%E8%89%AF%E6%B8%9A%E5%8F%A4%E5%9F%8E%E5%A4%96%E5%9B%B4%E6%B0%B4%E5%88%A9%E5%B7%A5%E7%A8%8B%E9%81%97%E5%9D%80', 'source_type': 'WikipediaLoader', 'summary': '良渚古城外围水利工程遗...'}
2000年,在当时彭公镇...

[4] metadata={'source': 'https://zh.wikipedia.org/wiki/%E8%89%AF%E6%B8%9A%E5%8F%A4%E5%9F%8E%E5%A4%96%E5%9B%B4%E6%B0%B4%E5%88%A9%E5%B7%A5%E7%A8%8B%E9%81%97%E5%9D%80', 'source_type': 'WikipediaLoader', 'title': '良渚古城外围水利工程遗址', 'summary': '良渚古城外围水利...'}
== 布局和结构 ==

遗址由11条坝体构成,面积达100平方公里。堤坝可分山前长堤、谷口高坝和平原低坝3类。

山前长堤:即塘山,是水利系统中最大的单层坝体。位于良渚古城北侧,北靠天目山脉,全长约五千米,呈东北西南走向。
谷口高坝:位于西北侧较高丘陵的谷口位置,包括6条坝体。可分为东、西两组,各自封堵一个山谷,形成水库。高坝体高程约为海拔30-35米。坝体长度约为50-200米,厚度近100米。
平原低坝:位于高坝南侧的平原内,四条坝将平原上的孤丘连接而成,坝顶高程大约为10米。坝长35-360米不等。高坝与低坝之间的库区面积约8.5平方公里。库区东端与塘山长堤相接,组成统一的水利体系。

=== RAG 回答 ===
良渚古城为三重城址结构;外围水利系统由良渚先民修建。
来源:详见《良渚古城概述》和《良渚古城百科》资料中的明确记载。

但它还有一个明显问题:每次问答都是独立的。系统没有记忆能力,所以下面还需要处理连续对话。

4. 为 RAG 增加多轮对话记忆

第三章的 RAG 已经可以回答问题,但每次调用都是独立的。第一轮问“良渚古城有几重城址结构,外围水利系统是谁修建的”,第二轮再问“这些水利设施有什么作用”,如果不带历史消息,系统就不知道“这些水利设施”指的是什么。

第四章对应的代码是 04_rag_with_memory.py,核心变化是加入 ChatMessageHistory,并在每次调用时把历史消息重新放回 Prompt。

4.1 使用消息模板组织 Prompt

普通 Prompt 更适合单轮问答。要支持连续对话,需要改用 ChatPromptTemplate,让系统消息、历史消息、检索上下文和当前问题分开进入模型。

1
2
3
4
5
6
7
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

from common import get_llm, get_vector_db, require_api_key

chat_history_memory = ChatMessageHistory()

接着构建带记忆的 RAG 链:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def build_rag_chain_with_memory(vector_db):
chatbot = get_llm()
retriever = vector_db.as_retriever()
question_feeder = RunnablePassthrough()

rag_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"你是一个熟悉中国考古和良渚古城的问答助手。"
"请只依据提供的上下文回答问题。"
"如果上下文中没有答案,请直接说无法根据提供资料确定。"
"回答最多三句话,并尽量说明来源。",
),
("placeholder", "{chat_history_messages}"),
("assistant", "{retrieved_context}"),
("human", "{question}"),
]
)

def get_messages(_):
# 每次调用链时动态读取历史,而不是在构建链时固定住历史。
return chat_history_memory.messages

return {
"retrieved_context": retriever,
"question": question_feeder,
# placeholder 名称必须和 ChatPromptTemplate 里的 {chat_history_messages} 对上。
"chat_history_messages": RunnableLambda(get_messages),
} | rag_prompt | chatbot

这里的关键是 chat_history_messages。它不是固定文本,而是每次执行时通过 RunnableLambda(get_messages) 动态读取当前历史。

4.2 维护对话历史状态

链本身会读取历史,但历史还需要在每轮调用前后手动更新:

1
2
3
4
5
6
def execute_chain_with_memory(chain, question):
# 先记录用户问题,再执行链;回答完成后再把 AI 消息写回历史。
chat_history_memory.add_user_message(question)
answer = chain.invoke(question)
chat_history_memory.add_ai_message(answer)
return answer

这样一轮调用之后,用户问题和模型回答都会进入 chat_history_memory。下一轮问题再来时,Prompt 里就会同时包含历史和当前问题。

4.3 验证连续问答效果

完整执行入口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def main():
require_api_key()
vector_db = get_vector_db()
rag_chain = build_rag_chain_with_memory(vector_db)

question = """
良渚古城有几重城址结构?外围水利系统是谁修建的?
请同时说明来源。
"""

answer = execute_chain_with_memory(rag_chain, question)
print("\n=== 第一轮回答 ===")
print(answer.content)

follow_up = """
这些水利设施有什么作用?
请同时说明来源。
"""

answer = execute_chain_with_memory(rag_chain, follow_up)
print("\n=== 第二轮回答 ===")
print(answer.content)

print("\n=== 当前历史消息 ===")
for message in chat_history_memory.messages:
print(message)


if __name__ == "__main__":
main()

现在执行过程就从单轮问答变成了:

1
History + Question → Retriever → Prompt → LLM → Answer

这里没有换模型,也没有增加新资料,只是补上了状态管理。但这个变化会直接影响聊天体验。

输出结果类似:

1
2
3
4
5
6
7
8
9
10
11
12
=== 第一轮回答 ===
良渚古城拥有**三重城址结构**,包括宫殿区、内城和外城。外围水利系统由**良渚先民**修建。以上信息来源于《良渚古城概述》与《良渚古城百科》资料。

=== 第二轮回答 ===
这些水利设施的主要功能包括**防洪、蓄水、调水**,并为**水稻农业和城市运转**提供支持。以上信息来源于《良渚古城外围水利工程》资料。

=== 当前历史消息 ===
content='\n 良渚古城有几重城址结构?外围水利系统是谁修建的?\n 请同时说明来源。\n ' additional_kwargs={} response_metadata={}
content='良渚古城拥有**三重城址结构**,包括宫殿区、内城和外城。外围水利系统由**良渚先民**修建。以上信息来源于《良渚古城概述》与《良渚古城百科》资料。' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 186, 'prompt_tokens': 1714, 'total_tokens': 1900, 'completion_tokens_details': {'accepted_prediction_tokens': None, 'audio_tokens': None, 'reasoning_tokens': 134, 'rejected_prediction_tokens': None}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'deepseek-v4-pro', 'system_fingerprint': None, 'id': 'chatcmpl-1cb009e8-b37f-9de0-9d4f-40e4b0d93982', 'finish_reason': 'stop', 'logprobs': None} id='lc_run--019e6c9f-5cad-7c31-9eba-0bfcafc5ca7a-0' tool_calls=[] invalid_tool_calls=[] usage_metadata={'input_tokens': 1714, 'output_tokens': 186, 'total_tokens': 1900, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 134}}
content='\n 这些水利设施有什么作用?\n 请同时说明来源。\n ' additional_kwargs={} response_metadata={}
content='这些水利设施的主要功能包括**防洪、蓄水、调水**,并为**水稻农业和城市运转**提供支持。以上信息来源于《良渚古城外围水利工程》资料。' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 344, 'prompt_tokens': 1877, 'total_tokens': 2221, 'completion_tokens_details': {'accepted_prediction_tokens': None, 'audio_tokens': None, 'reasoning_tokens': 302, 'rejected_prediction_tokens': None}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'deepseek-v4-pro', 'system_fingerprint': None, 'id': 'chatcmpl-e174f309-27bb-9beb-84cb-8e2bf3638f7e', 'finish_reason': 'stop', 'logprobs': None} id='lc_run--019e6c9f-7869-7d22-a602-8830ae00cfe9-0' tool_calls=[] invalid_tool_calls=[] usage_metadata={'input_tokens': 1877, 'output_tokens': 344, 'total_tokens': 2221, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 302}}

5. 引入 RAG 调试与链路观测

到这里,已经算是一个完整的问答系统了,系统已经具备:

1
多来源知识 → Retriever → Memory → LLM → Answer

正常情况下已经能够:

  • 基于知识回答问题
  • 进行连续对话
  • 保留上下文状态

但真正进入开发阶段之后,系统开始变复杂。复杂系统最难的是出问题之后找不到原因。例如下面这些情况。

1
2
3
4
回答错了
没有引用正确来源
回答越来越慢
第二轮突然失忆

以前开发普通程序时,可以依赖,日志、断点、调用栈,但大模型不一样。整个过程通常是:

1
Question → Retriever → Prompt → LLM → Answer

中间经过大量动态数据。如果没有好的调试方式,很难知道问题发生在哪一层。这一章需要解决工程化问题。让 RAG 从能运行,升级到可观察、可调试。

5.1 RAG 问题的多层来源

普通函数大多数时候是确定性的。输入一致。输出通常一致。例如,add(1,2),永远得到3。但 RAG 不一样。同一个问题。可能受到很多因素影响:

1
2
3
4
检索结果变化
Prompt 变化
历史消息变化
模型采样变化

例如:

1
2
3
4
5
6
7
8
第一次问:
公司的远程办公政策是什么

系统返回:
允许每周两天

如果第二次问,系统就可能返回:
允许每周三天

这时候问题可能来自Retriever,也可能来自Prompt,甚至来自历史消息,如果只看最终答案,很难定位。因此需要能够把整条执行链展开。

5.2 使用 LangSmith 记录执行过程

这里引入了 LangSmith。它不是模型。也不是框架。它更像一套运行监控系统。作用可以理解成:

1
GitHub → 管理代码;LangSmith → 管理 LLM 执行过程

启用后。每次运行链都会自动记录:

1
输入 → 中间步骤 → 最终输出

形成完整 Trace。配置也比较直接。先安装依赖。然后配置环境变量:

1
2
3
4
export LANGSMITH_TRACING=true
export LANGSMITH_ENDPOINT=https://api.smith.langchain.com
export LANGSMITH_PROJECT=liangzhu-rag-debug
export LANGSMITH_API_KEY="你的 LangSmith API Key"

对应的可运行脚本是 05_langsmith_tracing.py。它复用第四章的带记忆 RAG 链,只额外做两件事:开启 LangSmith 环境变量,并给链增加 run_nametagsmetadata,方便在 LangSmith 控制台里过滤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import os

from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

from common import get_llm, get_vector_db, require_api_key

# LangSmith 通过环境变量开启;项目名会出现在 LangSmith 控制台。
os.environ.setdefault("LANGSMITH_TRACING", "true")
os.environ.setdefault("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com")
os.environ.setdefault("LANGSMITH_PROJECT", "liangzhu-rag-debug")

chat_history_memory = ChatMessageHistory()

def require_langsmith_key():
if os.getenv("LANGSMITH_TRACING") == "true" and not os.getenv("LANGSMITH_API_KEY"):
raise RuntimeError(
"已开启 LANGSMITH_TRACING=true,请先设置 LANGSMITH_API_KEY。"
)

链本身和第四章基本一致,只是在返回前加了一层 with_config()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def build_traced_rag_chain(vector_db):
chatbot = get_llm()
retriever = vector_db.as_retriever()
question_feeder = RunnablePassthrough()

rag_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"你是一个熟悉中国考古和良渚古城的问答助手。"
"请只依据提供的上下文回答问题。"
"如果上下文中没有答案,请直接说无法根据提供资料确定。"
"回答最多三句话,并尽量说明来源。",
),
("placeholder", "{chat_history_messages}"),
("assistant", "{retrieved_context}"),
("human", "{question}"),
]
)

def get_messages(_):
# LangSmith trace 中可以看到这一步读取了哪些历史消息。
return chat_history_memory.messages

chain = {
"retrieved_context": retriever,
"question": question_feeder,
"chat_history_messages": RunnableLambda(get_messages),
} | rag_prompt | chatbot

# run_name / tags / metadata 会出现在 LangSmith Trace 详情里,便于过滤和排查。
return chain.with_config(
{
"run_name": "liangzhu_rag_with_memory",
"tags": ["rag", "liangzhu", "debug"],
"metadata": {
"collection": "liangzhu",
"example": "chapter_05_langsmith_tracing",
},
}
)

最后执行两轮问题。运行完成后,到 LangSmith 控制台里就能看到这次调用的完整 Trace。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def execute_chain_with_memory(chain, question):
chat_history_memory.add_user_message(question)
answer = chain.invoke(question)
chat_history_memory.add_ai_message(answer)
return answer


def main():
require_api_key()
require_langsmith_key()

vector_db = get_vector_db()
rag_chain = build_traced_rag_chain(vector_db)

question = """
良渚古城有几重城址结构?外围水利系统是谁修建的?
请同时说明来源。
"""
answer = execute_chain_with_memory(rag_chain, question)
print("\n=== 第一轮回答 ===")
print(answer.content)

follow_up = """
这些水利设施有什么作用?
请同时说明来源。
"""

answer = execute_chain_with_memory(rag_chain, follow_up)
print("\n=== 第二轮回答 ===")
print(answer.content)

print("\nLangSmith project:", os.getenv("LANGSMITH_PROJECT"))
print("运行完成后,到 LangSmith 控制台查看本次 Trace。")


if __name__ == "__main__":
main()

5.3 分析一次完整 Trace

运行一次问答之后。进入 LangSmith。可以看到一条完整执行记录。整体结构类似:

1
Question → RunnableSequence → Retriever → Prompt → ChatModel → Answer

这里真正有价值的是中间过程。先看 Retriever,它通常会展示 Question → Document[],继续展开可以看到每个文档的 page_contentmetadatasource。这一层用来判断检索是否正确:如果这里拿错文档,后面的回答基本都会错。

继续看 Prompt,可以看到真正送给模型的 systemhistorycontextquestion。这一层主要检查三件事:历史是否丢失、上下文是否为空、Prompt 是否过长。

再进入模型调用,可以重点看 token_usagemodelfinish_reason。其中 prompt_tokens 表示输入消耗,completion_tokens 表示输出消耗。如果 prompt_tokens 很高,例如 Prompt = 12000 tokens,通常意味着上下文过大或检索返回过多;如果 finish_reason = length,则说明回答被截断。

5.4 基于 Trace 定位问题

观察链之后,大多数问题可以按层归类:

现象 优先检查
文档不对 Retriever
回答跑偏 Prompt
忽略上下文 Memory
成本过高 Token

整个排查过程可以理解为沿着 Question → Retriever → Prompt → Memory → LLM 逐层定位。这一章关注如何知道系统为什么这样回答。这是从 Demo 向应用演进时最容易被忽略的一层能力。下一章开始进一步收敛代码,不再继续手写完整链,而是利用 LangChain 提供的高层封装能力。

6. 使用 LCEL 收敛检索问答链

前面几章已经分别实现了检索、Prompt、模型调用和记忆。继续往后写时,容易混乱的是检索结果如何进入 Prompt。因此本章对应的代码 06_retrieval_chain.py 先把这件事单独收敛:把 Retriever 返回的 Document 列表格式化成上下文字符串,再通过 LCEL 组合成一条完整链。

1
Question → Retriever → format_docs → Prompt → LLM → Answer

6.1 格式化 Retriever 返回的文档

Retriever 返回的是 Document 列表,而 Prompt 需要的是一段可以填入 {context} 的字符串。所以先定义一个 format_docs()

1
2
3
4
5
6
7
8
9
10
11
12
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from common import get_llm, get_vector_db, require_api_key


def format_docs(docs):
"""
把 Retriever 返回的 Document 列表
转成 Prompt 中可用的上下文字符串。
"""
return "\n\n".join(doc.page_content for doc in docs)

这个函数只做一件事,把多个文档片段拼成上下文。后面如果要加入来源、页码或更复杂的格式,也可以从这里扩展。

6.2 构建检索问答链

接下来构建链路。这里的关键是,context 这一路先经过 retriever,再经过 format_docsquestion 这一路用 RunnablePassthrough() 保留原始问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def build_retrieval_chain(vector_db):
chatbot = get_llm()
retriever = vector_db.as_retriever()

qa_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""
请根据以下上下文回答问题。

要求:
1. 只能使用上下文中的信息
2. 如果上下文中没有答案,请直接说“无法根据提供资料确定”
3. 回答尽量简洁
4. 如有可能,请说明依据来自哪些资料

上下文:
{context}
""",
),
(
"human",
"{question}",
),
]
)

rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough(),
}
| qa_prompt
| chatbot
)
return rag_chain

这段代码比直接把 retriever 塞进 Prompt 多了一步 format_docs,但逻辑更清楚:Retriever 负责找资料,format_docs 负责整理上下文,Prompt 负责组织输入,LLM 负责生成答案。

6.3 运行检索问答链

最后在 main() 中加载向量库,构建链路并执行问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def main():
require_api_key()
vector_db = get_vector_db()
rag_chain = build_retrieval_chain(
vector_db
)

question = "良渚古城有几重城址结构?外围水利系统是谁修建的?"

answer = rag_chain.invoke(question)

print("\n=== answer ===")
print(answer.content)


if __name__ == "__main__":
main()

输出结果类似:

1
2
3
=== answer ===
良渚古城有三重城址结构。外围水利系统由良渚先民修建。
依据:上下文指出“良渚古城通常被描述为三重城址结构”和“外围水利系统由良渚先民修建”。

这个版本没有引入新的能力,但把链路表达得更集中。到这里,RAG 已经不只是一个 Prompt,而是一套有数据入口、检索、生成、记忆和调试能力的应用骨架。

7. 总结:从 RAG 原型到应用结构

这一章把已有的向量检索能力逐步整理成一个完整的问答系统。回头看整个过程,变化不在于模型本身变得更强,而在于模型周围多了几层稳定结构。

7.1 从向量存储到知识服务

上一篇完成的是 Document → Embedding → ChromaDB → Similarity Search,系统已经能够回答“有没有相关内容”。这一篇之后,结构变成 Question → Retriever → Prompt → LLM → Answer,系统开始回答“如何使用这些内容”。

这里最大的变化不是增加模型,而是增加组织能力。模型不再直接面对原始知识,而是通过 检索 → 上下文 → 生成 完成回答。于是系统从一个数据工具,开始演化成一个应用系统。

7.2 中间层决定系统能力

很多人在第一次接触 RAG 时,会把重点放在模型。本篇中涉及到模型的变化反而很少,真正新增的是模型周围的中间层。

层次 作用
Loader 统一 PDF、Word、文本等不同知识来源
Retriever 接管检索,让业务层不直接依赖 Chroma 查询细节
Prompt 组织上下文、问题和回答约束
Memory 保存多轮对话状态
Tracing 记录链路执行过程,方便调试

这些层共同形成 Knowledge → Retrieval → Conversation → Generation → Observation。真正决定体验的,通常不是模型名称,而是这些层之间如何协作。

7.3 从单次调用到持续交互

能力变化可以压缩成三步:第一版是 Question → Answer,只能完成单次问答;加入历史后变成 History + Question → Answer,开始支持连续交互;加入 Trace 后,系统进一步具备调试能力,可以从 History + Question → Chain → Trace 中观察每一步发生了什么。

再加上后面的链路封装,系统就不只是“能回答”,而是开始具备可维护的应用结构。

7.4 工程化能力的补齐

如果只看最终代码,这一章增加的东西其实很少:as_retriever()ChatMessageHistory()RunnableLambda()with_config() 等都只是很小的接口。但组合起来之后,系统已经有了几个关键能力,数据入口统一、检索接口统一、多轮对话可维护、执行过程可追踪、链路结构可继续扩展。

这意味着项目已经可以长期可维护了。

7.5 从 RAG 走向 Agent

到这里,系统已经能够完成 理解问题 → 检索知识 → 生成回答 → 保存上下文。但它仍然是被动系统,所有行为都依赖用户输入。下一阶段,系统会继续向 任务拆解 → 工具调用 → 状态流转 → 自主执行 演进,也就是从 RAG 逐步进入 Agent。

8.备注

完整代码:https://github.com/keychankc/AI_agent_code/tree/main/lang-chain-rag