# LCEL简介

全称LangChain Expression Language。用处是使用“|”运算符链接langchain的各个组件

是一种声明式的方法来链接langchian组件。LCEL从第一天起就被设计为支持将原型投入生产,无需代码更改。

# LCEL语法

  • Runnable组件
  • 组件调用、批量、流式运行
  • 组合成chain
  • 并行调用运行
  • 合并输入和输出字典
  • 后备选项
  • 重复多次执行Runnable组件
  • 条件构建chain
  • map高阶处理
  • 打印chain图形
  • 生命周期管理

# 组件

标准:。可以是函数、工具。。。但必须是Runnable接口。

chain本身就是组件 可以把一个小 Chain 当作一个积木,去搭建一个更大的 Chain。

# 一切皆 Runnable

在 LCEL 中,不要把对象看作“类”,要把它们看作“可以被调用的函数”。

# 1. Runnable 协议(The Protocol)RunnableLambda

LangChain 中的核心组件(Model, Prompt, Retriever, Tool)都实现了 Runnable 接口。这意味着它们都可以直接被“执行”。

  • 标准接口
    • invoke(input): 单次调用。
    • stream(input): 流式返回(返回迭代器)。
    • batch([inputs]): 批量并行处理。
    • ainvoke, astream, abatch: 对应的异步方法

使用 RunnableLambda 封装成runnable接口

导入方式:

from langchain_core.runnables import RunnableLambda

定义一个函数作为组件-三种调用方式

# 定义一个函数做组件
from langchain_core.runnables import RunnableLambda, RunnableParallel

# def test1(x:int):
#     return x + 1

# # 封装成runnable组件
# r1 = RunnableLambda(test1)

# # 1、调用组件 看函数情况传参
# res = r1.invoke(3)
# print(res)  # 4

# # 2、批量调用 调用多次
# res2 = r1.batch([4, 5])
# print(res2)  # [5, 6]

# # 3、流式调用
# def test2(prompt: str):
#     for item in prompt.split(' '):
#         yield item
# r2 = RunnableLambda(test2)
# # res3 = r2.stream("tihe is a cat")
# for chunk in res3:
#     print(chunk)

# 2. 管道符 | 的本质

a | b 的意思是:a 的输出,作为 b 的输入

  • 这就要求:a 的输出数据结构,必须完全符合 b 的输入数据结构要求(通常是 strdict)。
from langchain_core.runnables import RunnableLambda
# 4、组件组合成链 两个以上的组件
def test3(x: int):
    return x + 2
def test4(x:int):
    return x**2
r3 = RunnableLambda(test3)
r4 = RunnableLambda(test4)
chain = r3|r4  # r3运行的结果作为参数传到r4
print(chain.invoke(3))  # 25

# 3.链的并行运算 RunnableParallel

调用时可以加一些配置

max_concurrency:最大并发数

chain.invoke(args, **kwargs, config={'max_concurrency':2})  # 两个并发
from langchain_core.runnables import RunnableLambda, RunnableParallel
# 4、组件组合成链 两个以上的组件
def test3(x: int):
    return x + 2
def test4(x:int):
    return x**2
r3 = RunnableLambda(test3)
r4 = RunnableLambda(test4)
# 5、链并行运行  输出是一个字典 r3=r3, r4=r4里的第一个r3\r4是输出的字典里结果的key. 两个组件同时运行
chain2 = RunnableParallel(r3=r3, r4=r4)
print(chain2.invoke(3, config={'max_concurrency':2}))  # {'r3': 5, 'r4': 9}

# 4.查看链的图像描述

pip install grandalf
print(chain.get_graph().print_ascii())

# 5.透传 - 合并输入并处理中间数据 RunnablePassthrough

RunnablePassthrough: 允许传递输入数据,可以保持不变或添加额外的键。还可以过滤数据。

使用 RunnablePassthrough.assign 可以保留原始输入并添加新数据

使用 Runnable对象.pick 可以过滤数据,支持传入字符串或字符串列表

RunnablePassthrough是恒等函数f(x) = xRunnablePassthrough.assign是字典的update方法,所以x必须是dict

导入方式

from langchain_core.runnables import RunnablePassthrough

保持不变

from langchain_core.runnables import RunnablePassthrough, RunnableLambda
r1 = RunnableLambda(lambda x:x)
r2 = RunnableLambda(lambda x:x + 10)
chain = r1 | RunnablePassthrough() | r2
print(chain.invoke(2))  # 12

添加数据 RunnablePassthrough.assign必须接收字典

from langchain_core.runnables import RunnablePassthrough, RunnableLambda
r3 = RunnableLambda(lambda x:{'key1': x})
r4 = RunnableLambda(lambda x:x['key1'] + 10)
chain = r3 | RunnablePassthrough.assign(new_key=r4)  # new_key是输出字典的key
print(chain.invoke(2))
'''
{'key1': 2, 'new_key': 12}
'''

扩展:并行+透传

from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
r1 = RunnableLambda(lambda x:{'key1': x})
r2 = RunnableLambda(lambda x:x['key1'] + 10)
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key= r2))
print(chain.invoke(2))

第一步:r1 (数据初始化)

  • 输入: 2 (整数)
  • 逻辑: lambda x: {'key1': x}
  • 输出: {'key1': 2} (字典)
    • 注意:这一步至关重要,因为后续的 .assign() 要求输入必须是字典。

第二步:RunnableParallel (并行分流)

上一步的输出 {'key1': 2} 同时进入 b1b2 两个分支。

  • 分支 b1: RunnablePassthrough()
    • 含义: “什么都不做,给我什么,我就吐出什么”。
    • 输入: {'key1': 2}
    • 输出: {'key1': 2} (原样保留)
  • 分支 b2: RunnablePassthrough.assign(new_key=r2)
    • 含义: “拿着原始数据,计算出 new_key,然后把它到原始数据上”。
    • 内部计算过程:
      1. 系统把输入 {'key1': 2} 传给 r2
      2. r2 执行逻辑 x['key1'] + 10 -> 2 + 10 = 12
      3. assignnew_key=12 合并回原始字典。
    • 输出: {'key1': 2, 'new_key': 12}

第三步:最终合并

RunnableParallel 将两个分支的结果合并到一个大字典中,Key 就是定义的 b1b2

过滤

r1 = RunnableLambda(lambda x:{'key1': x})
r2 = RunnableLambda(lambda x:x['key1'] + 10)
# 方式1
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key=r2))
print(chain.invoke(2))  # {'b1': {'key1': 2}, 'b2': {'key1': 2, 'new_key': 12}}
chain2 = chain.pick('b2').pick('new_key')  # 过滤,按键取值
print(chain2.invoke(2))  # 12
# 方式2
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key=r2)) | RunnablePassthrough().pick('b2')
print(chain.invoke(2))  # {'key1': 2, 'new_key': 12}

场景

from langchain_core.runnables import RunnablePassthrough

# 场景:Prompt 需要 "question",但我们想直接传字符串给 chain.invoke("什么是LCEL")
# 使用 RunnablePassthrough.assign 可以保留原始输入并添加新数据
chain = (
    {"question": RunnablePassthrough()} 
    | prompt 
    | model
)
chain.invoke('什么是LCEL')

# 6、后备选项 组件.with_fallbacks

在紧急情况下(报错时)使用的一种替代方案,确保整个链能正常执行完。

from langchain_core.runnables import RunnableLambda
def test1(x:int):
    return x + 10

r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x:int(x) + 10)
# 在加法运算中的后备选项
chain = r1.with_fallbacks([r2])
print(chain.invoke(2))  # 正常 r1不会报错
print(chain.invoke('2'))  # 传入字符串 r1会报错,使用后备选项r2

# 7、根据条件动态构建链

# 根据r1的输出结果判断是否执行r2  判断本身也是一个节点
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
def test1(x:int):
    return x + 10

r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x :[x] * 2)
chain = r1 | RunnableLambda(lambda x: r2 if x > 15 else RunnablePassthrough())  # r1的输出>15就调用r2  <15就直接输出
print(chain.invoke(6))  # [16, 16]
print(chain.invoke(3))  # 13

# 8、生命周期监听器 组件.with_listeners

轻量级、针对特定节点 的回调注入方式。

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers import Run
import time
def test4(n :int):
    time.sleep(2)
    return n ** 2

r1 = RunnableLambda(test4)
def on_start(run_obj : Run):
    '''r1节点启动时自动执行'''
    print('r1启动时间:', run_obj.start_time)

def on_end(run_obj: Run):
    '''r1节点执行结束的时间'''
    print('r1节点执行结束的时间',run_obj.end_time)
chain = r1.with_listeners(on_start= on_start, on_end=on_end)
print(chain.invoke(2))
'''
r1启动时间: 2026-01-11 07:53:45.300157+00:00
r1节点执行结束的时间 2026-01-11 07:53:47.302160+00:00
4
'''
  1. 核心作用:监控“这一步”的运行状况

通常我们用 callbacks=[MyHandler()] 是为了监控整个链条。而 .with_listeners() 是为了监控某一个特定的节点(在这里就是 r1)。

  • on_start: 在 test4 函数真正执行之前触发。
  • on_end: 在 test4 函数执行完毕并返回结果之后触发。
  • on_error (代码中未展示,但也支持): 如果 test4 报错了,会触发这个。
  1. 关键参数:Run 对象

on_start and on_end 接收了一个参数 run_obj: Run。这个对象非常重要,它是 LangSmith 追踪系统 的核心数据结构,包含了这次调用的所有元数据:

  • run_obj.id: 这次调用的唯一 UUID。
  • run_obj.inputs: 输入给 test4 的参数(例如 2)。
  • run_obj.outputs: test4 算出来的结果(在 on_end 里才有,例如 4)。
  • run_obj.start_time / run_obj.end_time: 也就是代码里打印的时间戳。
  • run_obj.extra: 额外的配置信息。

综合使用案例

import time
import os
from operator import itemgetter
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.tracers import Run
from dotenv import load_dotenv, find_dotenv

# 1. 环境配置
load_dotenv()
llm = ChatOpenAI(
    base_url=os.getenv('OPENAI_BASE_URL'),
    api_key=os.getenv('OPENAI_API_KEY'),
    model=os.getenv('OPENAI_MODEL'),
    temperature=0.5  # 值越高创造力高
)

# 2. 模拟工具函数 (保持不变)
def mock_db_query(sql: str):
    print(f"\n[DEBUG] 正在执行 SQL: {sql}")
    return "销售额: 100万"

def mock_internet_search(query: str):
    print(f"\n[DEBUG] 正在搜索互联网: {query}")
    time.sleep(1) 
    return f"【外网结果】关于 {query} 的新闻..."

def mock_internal_wiki(query: str):
    print(f"\n[DEBUG] 正在搜索内网: {query}")
    return f"【内网文档】公司关于 {query} 的政策..."

# 监听器
def log_sql_start(run_obj: Run):
    print(f"\n[生命周期] SQL链开始执行,ID: {run_obj.id}")

# ==========================================
# 3. 定义组件 (LCEL 核心部分)
# ==========================================

# --- A. 分类器链 ---
# 这里的 prompt 只需要一个字符串模板
classifier_prompt = ChatPromptTemplate.from_template(
    "你是一个意图分类助手。分析用户问题:{question}\n"
    "只输出以下单词之一:SQL, RESEARCH, CHAT。"
)
# 这一步输入是 {"question": "..."} -> 输出是 "SQL" (Str)
classifier_chain = classifier_prompt | llm | StrOutputParser()


# --- B. SQL 处理链 ---
sql_prompt = ChatPromptTemplate.from_template(
    "你是一个数据分析师。将用户问题转化为SQL语句。只输出SQL。\n问题:{question}"
)
# 注意:最后一步接上了 mock_db_query,实现真正的查询
# 并且在这里挂载了 .with_listeners
sql_chain = (
    sql_prompt 
    | llm 
    | StrOutputParser() 
    | RunnableLambda(mock_db_query)
).with_listeners(on_start=log_sql_start)


# --- C. 调研处理链 (包含并行) ---
# 定义并行层:同时去外网和内网
search_layer = RunnableParallel(
    internet=itemgetter("question") | RunnableLambda(mock_internet_search),
    internal=itemgetter("question") | RunnableLambda(mock_internal_wiki)
)

summary_prompt = ChatPromptTemplate.from_template(
    "基于以下信息回答用户问题:\n1. {internet}\n2. {internal}"
)

# 组合:并行搜索 -> 结果汇总 -> LLM生成
research_chain = (
    search_layer 
    | summary_prompt 
    | llm 
    | StrOutputParser()
)


# --- D. 闲聊链 ---
chat_chain = (
    ChatPromptTemplate.from_template("像朋友一样回答:{question}")
    | llm
    | StrOutputParser()
)

# ==========================================
# 4. 路由逻辑 (重点!)
# ==========================================

def route_logic(info):
    """
    接收上一步的输出字典,包含 {"intent": "...", "question": "..."}
    返回一个 Runnable 对象(子链)
    """
    # 1. 提取意图,并清洗(去掉空格,转大写)
    intent = info["intent"].strip().upper()
    print(f"--- [路由判定] 用户意图: {intent} ---")
    
    # 2. 根据意图返回对应的链对象
    if "SQL" in intent:
        return sql_chain
    elif "RESEARCH" in intent:
        return research_chain
    else:
        return chat_chain

# ==========================================
# 5. 组装总链 (Full Chain)
# ==========================================
full_chain = (
    # 第一步:计算意图,同时保留原始问题
    # 此时数据变成:{"question": "用户输入", "intent": "SQL"}
    RunnablePassthrough.assign(intent=classifier_chain)
    
    # 第二步:路由选择
    # route_logic 返回哪个链,LangChain 就自动执行哪个链
    | RunnableLambda(route_logic)
)

# ==========================================
# 6. 运行测试
# ==========================================
if __name__ == "__main__":
    # 模拟用户输入(不要在 Chain 内部写 input())
    questions = [
        "上个月的销售额是多少?",  # 应该走 SQL
        "帮我查一下竞争对手A的最新动态", # 应该走 RESEARCH (并行)
        "你好,今天心情怎么样?"    # 应该走 CHAT
    ]
    
    for q in questions:
        print(f"\n\n>>> 用户提问: {q}")
        # invoke 只需要传最原始的字典
        result = full_chain.invoke({"question": q})
        print(f"最终结果: {result}")