# LCEL简介
全称LangChain Expression Language。用处是使用“|”运算符链接langchain的各个组件
是一种声明式的方法来链接langchian组件。LCEL从第一天起就被设计为支持将原型投入生产,无需代码更改。
# LCEL语法
- Runnable组件
- 组件调用、批量、流式运行
- 组合成chain
- 并行调用运行
- 合并输入和输出字典
- 后备选项
- 重复多次执行Runnable组件
- 条件构建chain
- map高阶处理
- 打印chain图形
- 生命周期管理
# 组件
标准:。可以是函数、工具。。。但必须是Runnable接口。
chain本身就是组件 可以把一个小 Chain 当作一个积木,去搭建一个更大的 Chain。
# 一切皆 Runnable
在 LCEL 中,不要把对象看作“类”,要把它们看作“可以被调用的函数”。
# 1. Runnable 协议(The Protocol)RunnableLambda
LangChain 中的核心组件(Model, Prompt, Retriever, Tool)都实现了 Runnable 接口。这意味着它们都可以直接被“执行”。
- 标准接口:
invoke(input): 单次调用。stream(input): 流式返回(返回迭代器)。batch([inputs]): 批量并行处理。ainvoke,astream,abatch: 对应的异步方法
使用 RunnableLambda 封装成runnable接口
导入方式:
from langchain_core.runnables import RunnableLambda
定义一个函数作为组件-三种调用方式
# 定义一个函数做组件
from langchain_core.runnables import RunnableLambda, RunnableParallel
# def test1(x:int):
# return x + 1
# # 封装成runnable组件
# r1 = RunnableLambda(test1)
# # 1、调用组件 看函数情况传参
# res = r1.invoke(3)
# print(res) # 4
# # 2、批量调用 调用多次
# res2 = r1.batch([4, 5])
# print(res2) # [5, 6]
# # 3、流式调用
# def test2(prompt: str):
# for item in prompt.split(' '):
# yield item
# r2 = RunnableLambda(test2)
# # res3 = r2.stream("tihe is a cat")
# for chunk in res3:
# print(chunk)
# 2. 管道符 | 的本质
a | b 的意思是:把 a 的输出,作为 b 的输入。
- 这就要求:
a的输出数据结构,必须完全符合b的输入数据结构要求(通常是str或dict)。
from langchain_core.runnables import RunnableLambda
# 4、组件组合成链 两个以上的组件
def test3(x: int):
return x + 2
def test4(x:int):
return x**2
r3 = RunnableLambda(test3)
r4 = RunnableLambda(test4)
chain = r3|r4 # r3运行的结果作为参数传到r4
print(chain.invoke(3)) # 25
# 3.链的并行运算 RunnableParallel
调用时可以加一些配置
max_concurrency:最大并发数
chain.invoke(args, **kwargs, config={'max_concurrency':2}) # 两个并发
from langchain_core.runnables import RunnableLambda, RunnableParallel
# 4、组件组合成链 两个以上的组件
def test3(x: int):
return x + 2
def test4(x:int):
return x**2
r3 = RunnableLambda(test3)
r4 = RunnableLambda(test4)
# 5、链并行运行 输出是一个字典 r3=r3, r4=r4里的第一个r3\r4是输出的字典里结果的key. 两个组件同时运行
chain2 = RunnableParallel(r3=r3, r4=r4)
print(chain2.invoke(3, config={'max_concurrency':2})) # {'r3': 5, 'r4': 9}
# 4.查看链的图像描述
pip install grandalf
print(chain.get_graph().print_ascii())
# 5.透传 - 合并输入并处理中间数据 RunnablePassthrough
RunnablePassthrough: 允许传递输入数据,可以保持不变或添加额外的键。还可以过滤数据。
使用 RunnablePassthrough.assign 可以保留原始输入并添加新数据
使用 Runnable对象.pick 可以过滤数据,支持传入字符串或字符串列表
RunnablePassthrough是恒等函数f(x) = x。 RunnablePassthrough.assign是字典的update方法,所以x必须是dict
导入方式
from langchain_core.runnables import RunnablePassthrough
保持不变
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
r1 = RunnableLambda(lambda x:x)
r2 = RunnableLambda(lambda x:x + 10)
chain = r1 | RunnablePassthrough() | r2
print(chain.invoke(2)) # 12
添加数据 RunnablePassthrough.assign必须接收字典
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
r3 = RunnableLambda(lambda x:{'key1': x})
r4 = RunnableLambda(lambda x:x['key1'] + 10)
chain = r3 | RunnablePassthrough.assign(new_key=r4) # new_key是输出字典的key
print(chain.invoke(2))
'''
{'key1': 2, 'new_key': 12}
'''
扩展:并行+透传
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
r1 = RunnableLambda(lambda x:{'key1': x})
r2 = RunnableLambda(lambda x:x['key1'] + 10)
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key= r2))
print(chain.invoke(2))
第一步:r1 (数据初始化)
- 输入:
2(整数) - 逻辑:
lambda x: {'key1': x} - 输出:
{'key1': 2}(字典)- 注意:这一步至关重要,因为后续的
.assign()要求输入必须是字典。
- 注意:这一步至关重要,因为后续的
第二步:RunnableParallel (并行分流)
上一步的输出 {'key1': 2} 同时进入 b1 和 b2 两个分支。
- 分支
b1:RunnablePassthrough()- 含义: “什么都不做,给我什么,我就吐出什么”。
- 输入:
{'key1': 2} - 输出:
{'key1': 2}(原样保留)
- 分支
b2:RunnablePassthrough.assign(new_key=r2)- 含义: “拿着原始数据,计算出
new_key,然后把它贴到原始数据上”。 - 内部计算过程:
- 系统把输入
{'key1': 2}传给r2。 r2执行逻辑x['key1'] + 10->2 + 10=12。assign将new_key=12合并回原始字典。
- 系统把输入
- 输出:
{'key1': 2, 'new_key': 12}
- 含义: “拿着原始数据,计算出
第三步:最终合并
RunnableParallel 将两个分支的结果合并到一个大字典中,Key 就是定义的 b1 和 b2。
过滤
r1 = RunnableLambda(lambda x:{'key1': x})
r2 = RunnableLambda(lambda x:x['key1'] + 10)
# 方式1
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key=r2))
print(chain.invoke(2)) # {'b1': {'key1': 2}, 'b2': {'key1': 2, 'new_key': 12}}
chain2 = chain.pick('b2').pick('new_key') # 过滤,按键取值
print(chain2.invoke(2)) # 12
# 方式2
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key=r2)) | RunnablePassthrough().pick('b2')
print(chain.invoke(2)) # {'key1': 2, 'new_key': 12}
场景
from langchain_core.runnables import RunnablePassthrough
# 场景:Prompt 需要 "question",但我们想直接传字符串给 chain.invoke("什么是LCEL")
# 使用 RunnablePassthrough.assign 可以保留原始输入并添加新数据
chain = (
{"question": RunnablePassthrough()}
| prompt
| model
)
chain.invoke('什么是LCEL')
# 6、后备选项 组件.with_fallbacks
在紧急情况下(报错时)使用的一种替代方案,确保整个链能正常执行完。
from langchain_core.runnables import RunnableLambda
def test1(x:int):
return x + 10
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x:int(x) + 10)
# 在加法运算中的后备选项
chain = r1.with_fallbacks([r2])
print(chain.invoke(2)) # 正常 r1不会报错
print(chain.invoke('2')) # 传入字符串 r1会报错,使用后备选项r2
# 7、根据条件动态构建链
# 根据r1的输出结果判断是否执行r2 判断本身也是一个节点
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
def test1(x:int):
return x + 10
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x :[x] * 2)
chain = r1 | RunnableLambda(lambda x: r2 if x > 15 else RunnablePassthrough()) # r1的输出>15就调用r2 <15就直接输出
print(chain.invoke(6)) # [16, 16]
print(chain.invoke(3)) # 13
# 8、生命周期监听器 组件.with_listeners
轻量级、针对特定节点 的回调注入方式。
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers import Run
import time
def test4(n :int):
time.sleep(2)
return n ** 2
r1 = RunnableLambda(test4)
def on_start(run_obj : Run):
'''r1节点启动时自动执行'''
print('r1启动时间:', run_obj.start_time)
def on_end(run_obj: Run):
'''r1节点执行结束的时间'''
print('r1节点执行结束的时间',run_obj.end_time)
chain = r1.with_listeners(on_start= on_start, on_end=on_end)
print(chain.invoke(2))
'''
r1启动时间: 2026-01-11 07:53:45.300157+00:00
r1节点执行结束的时间 2026-01-11 07:53:47.302160+00:00
4
'''
- 核心作用:监控“这一步”的运行状况
通常我们用 callbacks=[MyHandler()] 是为了监控整个链条。而 .with_listeners() 是为了监控某一个特定的节点(在这里就是 r1)。
on_start: 在test4函数真正执行之前触发。on_end: 在test4函数执行完毕并返回结果之后触发。on_error(代码中未展示,但也支持): 如果test4报错了,会触发这个。
- 关键参数:
Run对象
on_start and on_end 接收了一个参数 run_obj: Run。这个对象非常重要,它是 LangSmith 追踪系统 的核心数据结构,包含了这次调用的所有元数据:
run_obj.id: 这次调用的唯一 UUID。run_obj.inputs: 输入给test4的参数(例如2)。run_obj.outputs:test4算出来的结果(在on_end里才有,例如4)。run_obj.start_time/run_obj.end_time: 也就是代码里打印的时间戳。run_obj.extra: 额外的配置信息。
综合使用案例
import time
import os
from operator import itemgetter
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.tracers import Run
from dotenv import load_dotenv, find_dotenv
# 1. 环境配置
load_dotenv()
llm = ChatOpenAI(
base_url=os.getenv('OPENAI_BASE_URL'),
api_key=os.getenv('OPENAI_API_KEY'),
model=os.getenv('OPENAI_MODEL'),
temperature=0.5 # 值越高创造力高
)
# 2. 模拟工具函数 (保持不变)
def mock_db_query(sql: str):
print(f"\n[DEBUG] 正在执行 SQL: {sql}")
return "销售额: 100万"
def mock_internet_search(query: str):
print(f"\n[DEBUG] 正在搜索互联网: {query}")
time.sleep(1)
return f"【外网结果】关于 {query} 的新闻..."
def mock_internal_wiki(query: str):
print(f"\n[DEBUG] 正在搜索内网: {query}")
return f"【内网文档】公司关于 {query} 的政策..."
# 监听器
def log_sql_start(run_obj: Run):
print(f"\n[生命周期] SQL链开始执行,ID: {run_obj.id}")
# ==========================================
# 3. 定义组件 (LCEL 核心部分)
# ==========================================
# --- A. 分类器链 ---
# 这里的 prompt 只需要一个字符串模板
classifier_prompt = ChatPromptTemplate.from_template(
"你是一个意图分类助手。分析用户问题:{question}。\n"
"只输出以下单词之一:SQL, RESEARCH, CHAT。"
)
# 这一步输入是 {"question": "..."} -> 输出是 "SQL" (Str)
classifier_chain = classifier_prompt | llm | StrOutputParser()
# --- B. SQL 处理链 ---
sql_prompt = ChatPromptTemplate.from_template(
"你是一个数据分析师。将用户问题转化为SQL语句。只输出SQL。\n问题:{question}"
)
# 注意:最后一步接上了 mock_db_query,实现真正的查询
# 并且在这里挂载了 .with_listeners
sql_chain = (
sql_prompt
| llm
| StrOutputParser()
| RunnableLambda(mock_db_query)
).with_listeners(on_start=log_sql_start)
# --- C. 调研处理链 (包含并行) ---
# 定义并行层:同时去外网和内网
search_layer = RunnableParallel(
internet=itemgetter("question") | RunnableLambda(mock_internet_search),
internal=itemgetter("question") | RunnableLambda(mock_internal_wiki)
)
summary_prompt = ChatPromptTemplate.from_template(
"基于以下信息回答用户问题:\n1. {internet}\n2. {internal}"
)
# 组合:并行搜索 -> 结果汇总 -> LLM生成
research_chain = (
search_layer
| summary_prompt
| llm
| StrOutputParser()
)
# --- D. 闲聊链 ---
chat_chain = (
ChatPromptTemplate.from_template("像朋友一样回答:{question}")
| llm
| StrOutputParser()
)
# ==========================================
# 4. 路由逻辑 (重点!)
# ==========================================
def route_logic(info):
"""
接收上一步的输出字典,包含 {"intent": "...", "question": "..."}
返回一个 Runnable 对象(子链)
"""
# 1. 提取意图,并清洗(去掉空格,转大写)
intent = info["intent"].strip().upper()
print(f"--- [路由判定] 用户意图: {intent} ---")
# 2. 根据意图返回对应的链对象
if "SQL" in intent:
return sql_chain
elif "RESEARCH" in intent:
return research_chain
else:
return chat_chain
# ==========================================
# 5. 组装总链 (Full Chain)
# ==========================================
full_chain = (
# 第一步:计算意图,同时保留原始问题
# 此时数据变成:{"question": "用户输入", "intent": "SQL"}
RunnablePassthrough.assign(intent=classifier_chain)
# 第二步:路由选择
# route_logic 返回哪个链,LangChain 就自动执行哪个链
| RunnableLambda(route_logic)
)
# ==========================================
# 6. 运行测试
# ==========================================
if __name__ == "__main__":
# 模拟用户输入(不要在 Chain 内部写 input())
questions = [
"上个月的销售额是多少?", # 应该走 SQL
"帮我查一下竞争对手A的最新动态", # 应该走 RESEARCH (并行)
"你好,今天心情怎么样?" # 应该走 CHAT
]
for q in questions:
print(f"\n\n>>> 用户提问: {q}")
# invoke 只需要传最原始的字典
result = full_chain.invoke({"question": q})
print(f"最终结果: {result}")