LangChain 介绍

LangChain(全称:LangChain Framework)
专门为开发基于大型语言模型(LLM)的应用程序而设计的开发框架。
核心理念:将大型语言模型与外部数据源和计算工具连接起来,通过模块化的组件设计,让开发者可以像搭积木一样构建AI应用。

  • Python(LangChain)
  • Java(LangChain4j)

学习前置要求:
AI 基础概念:了解机器学习、自然语言处理基本概念
API 使用经验:有 RESTful API 调用经验更佳

学习流程:
第一阶段(基础概念与环境准备):了解 LangChain 基本概念、搭建开发环境、掌握AI大模型基础知识,为后续学习打下理论基础。
第二阶段(LangChain 核心组件):学习模型接入、提示词工程、输出解析等核心组件,掌握 LangChain 的基本使用方法。
第三阶段(高级功能与实战应用):深入学习链式调用、记忆功能、RAG 系统、智能体开发等高级特性,提升 AI 应用开发能力。
第四阶段(项目实战):通过完整的项目实战,将理论知识转化为实际开发能力,培养独立构建AI应用的技能。

附加资料
LangChain 官方网站:https://langchain.com
LangChain 官方文档:https://python.langchain.com
LangChain4j 官方文档:https://docs.langchain4j.dev
GitHub 仓库:

LangChain 核心概念

核心优势

  1. 模块化设计
    LangChain采用高度模块化的设计,将复杂的AI应用拆分为可重用的组件。
    核心模块包括:
  • LLMs/ChatModels:语言模型和聊天模型的抽象层
  • Prompts:提示模板管理
  • Indexes:文档加载和索引
  • Memory:对话状态管理
  • Chains:将多个组件连接成处理流程
  • Agents:自主决策和工具使用的智能体
  1. 抽象与标准化
    LangChain为不同的LLM提供商、向量数据库和工具创建了统一的抽象层。
    这意味着可以轻松切换底层模型或服务,而无需大幅修改代码。
  2. 内置工具生态系统
    LangChain提供了丰富的工具集成,让语言模型能够与外部世界交互:
  • 网络搜索工具
  • 数据库查询接口
  • API调用封装
  • 代码执行环境
  • 文件操作工具
    这些工具极大扩展了LLM的能力边界,使其不再局限于训练数据中的知识,而能够访问实时信息和执行具体操作。
  1. 灵活的工作流构建
    通过Chains(链)和Agents(智能体)机制,LangChain支持构建复杂的多步骤工作流:
  • 顺序链:按预定顺序执行一系列操作
  • 路由链:根据输入内容动态选择处理路径
  • ReAct智能体:结合推理和行动的自主决策系统
  • 多智能体协作:支持多个专业智能体协同工作
    这种灵活性使得开发者可以构建从简单对话到复杂问题解决的各类应用。
  1. 活跃的社区与生态
    LangChain拥有活跃的开发者社区和不断扩展的生态系统:
  • GitHub上的频繁更新和贡献
  • 详细的文档和教程
  • 丰富的示例和模板
  • 第三方扩展和插件
    这种活跃的生态系统确保了框架的持续改进和问题的快速解决。

核心组件

  1. 模型提供商集成
  • OpenAl (GPT-3.5,GPT-4)
  • Anthropic(Claude)
  • Google (PaLM,Gemini)
  • Meta(lama2)
  • Hugging Face模型
  • 本地部署模型(如Ollama)
  1. 向量数据库与存储
    为了支持检索增强生成(RAG)应用,LangChain集成了各种向量数据库:
  • Pinecone
  • Chroma
  • FAISS
  • Veaviate
  • Milvus
  • Qdrant
  • Redis
  • 传统数据库的向量扩展(如PostgreSQL+pgvector))
  1. 工具与服务集成
    LangChain提供了丰富的工具集成,扩展LLM的能力:
  • 搜索引擎(Google,Bing,DuckDuckGo)
  • API服务(Zapier,IFTTT)
  • 代码执行环境(Python,JavaScript)
  • 文档处理工具
  • 数据分析工具
  1. 部署与托管服务
    LangChain应用可以通过多种方式部署:
  • LangServe:LangChain官方的部署工具
  • LangSmith:用于监控和调试LangChain应用
  • 云服务提供商(AWS,GCP,Azure)
  • 容器化部署(Docker,Kubernetes)
  • 无服务器函数(AWS Lambda,Cloud Functions)

扩展生态系统

围绕LangChain核心框架,形成了丰富的扩展生态:

  1. 社区贡献的组件
  • 自定义代理实现
  • 专业领域模板
  • 工具包和集成
  1. 配套工具
  • LangChain Templates:预构建的应用模板库
  • LangServe:快速部署LangChain应用
  • LangSmith:监控、评估和调试LangChain应用

开发环境搭建

Python 环境搭建

使用pip安装核心库:

1
2
3
pip install langchain
pip install langchain-community
pip install langchain-core

安装模型相关依赖
由于我们将使用通义千问模型,需要安装阿里云的SDK:

1
pip install dashscope

验证安装
创建一个简单的测试脚本来验证LangChain是否安装成功:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# test_langchain.py
from langchain_core.messages import HumanMessage
from langchain_community.chat_models import ChatTongyi
import os

# 设置环境变量(请替换为你的实际 API Key)
os.environ["DASHSCOPE_API_KEY"] = "your_api_key_here"

def test_langchain_installation():
"""测试 LangChain 安装是否成功"""
try:
# 初始化通义千问模型
llm = ChatTongyi(model="qwen-plus")

# 创建一个简单的消息
messages = [HumanMessage(content="你好,我是程序员鱼皮,请简单介绍一下 LangChain")]

# 测试调用(这里只是验证代码结构,实际需要有效的 API Key)
print("LangChain 安装成功!")
print("环境配置完成,可以开始开发了!")

except ImportError as e:
print(f"导入错误:{e}")
print("请检查 LangChain 是否正确安装")
except Exception as e:
print(f"其他错误:{e}")
print("请检查网络连接和 API Key 配置")

if __name__ == "__main__":
test_langchain_installation()

编写一个环境检测脚本,自动检查当前系统是否满足LangChain开发的所有要求,包括Python版本、必要的库安装情况以及API Key配置状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import sys
import subprocess
import os
import importlib.util

def check_python_version():
version = sys.version_info
required_major, required_minor = 3, 8

if version.major > required_major or (version.major == required_major and version.minor >= required_minor):
print(f"✅ Python 版本符合要求: {version.major}.{version.minor}.{version.micro}")
return True
else:
print(f"❌ Python 版本不符合要求: {version.major}.{version.minor}.{version.micro} (需要 >= 3.8)")
return False

def check_package_installed(package_name):
spec = importlib.util.find_spec(package_name)
return spec is not None

def check_environment_complete():
print("=== 编程导航 LangChain 环境检测工具 ===\n")

all_good = True

# 检查 Python 版本
if not check_python_version():
all_good = False

# 检查必要包
required_packages = [
'langchain', 'langchain_core', 'langchain_community',
'dashscope'
]

print("\n📦 检查必要的 Python 包:")
for package in required_packages:
if check_package_installed(package):
print(f"✅ {package}")
else:
print(f"❌ {package} (请运行: pip install {package})")
all_good = False

# 检查环境变量
print("\n🔑 检查环境变量:")
api_key = os.getenv('DASHSCOPE_API_KEY')
if api_key and len(api_key) > 10:
print("✅ DASHSCOPE_API_KEY 已正确配置")
else:
print("❌ DASHSCOPE_API_KEY 未配置或配置错误")
all_good = False

# 最终结果
print("\n" + "="*50)
if all_good:
print("🎉 恭喜!您的环境配置完美,可以开始 LangChain 学习之旅!")
else:
print("⚠️ 环境存在问题,请根据上述提示进行修复后再试")

return all_good

if __name__ == "__main__":
check_environment_complete()

创建一个简单的LangChain应用,使用通义千问模型实现一个”编程导航助手”,能够回答关于编程学习的问题。
要求包含错误处理和日志记录功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import os
import logging
from datetime import datetime
from langchain_community.chat_models import ChatTongyi
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate

os.environ["DASHSCOPE_API_KEY"] = "sk-xxxxx" # 替换为你的真实 API Key

class ProgrammingNavigatorAssistant:
def __init__(self, api_key=None):
"""初始化编程导航助手"""
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('navigator_assistant.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)

# 设置 API Key
self.api_key = api_key or os.getenv('DASHSCOPE_API_KEY')
if not self.api_key:
raise ValueError("请设置 DASHSCOPE_API_KEY 环境变量或传入 api_key 参数")

# 初始化模型
try:
self.llm = ChatTongyi(
model="qwen-plus",
dashscope_api_key=self.api_key,
temperature=0.7
)
self.logger.info("编程导航助手初始化成功")
except Exception as e:
self.logger.error(f"模型初始化失败: {e}")
raise

# 设置系统提示
self.system_prompt = """你是编程导航(codefather.cn)的专业编程学习助手。
你的任务是帮助用户解答编程学习相关的问题,提供实用的学习建议和技术指导。
请用温暖、专业且友好的语调回答问题。"""

def ask_question(self, question):
"""向助手提问"""
try:
self.logger.info(f"收到用户问题: {question}")

# 构建消息
messages = [
SystemMessage(content=self.system_prompt),
HumanMessage(content=question)
]

# 调用模型
response = self.llm.invoke(messages)
answer = response.content

self.logger.info("问题回答成功")
return answer

except Exception as e:
error_msg = f"回答问题时发生错误: {e}"
self.logger.error(error_msg)
return f"抱歉,我现在无法回答您的问题。错误信息:{error_msg}"

def chat_session(self):
"""开始聊天会话"""
print("🤖 编程导航助手启动成功!")
print("我是您的专属编程学习伙伴,可以帮您解答编程相关问题。")
print("输入 'quit' 或 'exit' 退出聊天\n")

while True:
try:
user_input = input("👤 您的问题: ").strip()

if user_input.lower() in ['quit', 'exit', '退出']:
print("👋 感谢使用编程导航助手,祝您学习愉快!")
break

if not user_input:
print("请输入您的问题~")
continue

print("🤖 正在思考...")
answer = self.ask_question(user_input)
print(f"🤖 助手回答: {answer}\n")

except KeyboardInterrupt:
print("\n👋 感谢使用编程导航助手!")
break
except Exception as e:
print(f"❌ 发生错误: {e}")


def main():
"""主函数"""
try:
# 创建助手实例
assistant = ProgrammingNavigatorAssistant()

# 测试单次问答
test_question = "我想学习 Python,应该从哪里开始?"
print("=== 测试问答 ===")
print(f"问题: {test_question}")
answer = assistant.ask_question(test_question)
print(f"回答: {answer}")

# 开始交互式聊天
print("\n=== 开始聊天会话 ===")
assistant.chat_session()

except Exception as e:
print(f"程序启动失败: {e}")


if __name__ == "__main__":
main()

设计一个自动化的开发环境部署脚本,能够在新的机器上一键安装和配置完整的LangChain开发环境(包括Python、Jva、必要的库和开发工具)。脚本应该支持不同的操作系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#!/usr/bin/env python3
"""
编程导航 LangChain 开发环境自动部署脚本
支持 Windows、macOS、Linux 系统
"""

import os
import sys
import subprocess
import platform
import shutil
from pathlib import Path


class EnvironmentDeployer:
def __init__(self):
self.system = platform.system().lower()
self.python_version = "3.9"
self.java_version = "11"
self.project_name = "langchain_env"

print(f"🚀 编程导航 LangChain 环境部署工具")
print(f"📋 检测到系统: {platform.system()} {platform.release()}")
print(f"🎯 将安装 Python {self.python_version} 和 Java {self.java_version}")

def run_command(self, command, shell=True):
"""执行系统命令"""
try:
print(f"🔧 执行命令: {command}")
result = subprocess.run(command, shell=shell, capture_output=True, text=True)
if result.returncode == 0:
print(f"✅ 命令执行成功")
return True, result.stdout
else:
print(f"❌ 命令执行失败: {result.stderr}")
return False, result.stderr
except Exception as e:
print(f"❌ 命令执行异常: {e}")
return False, str(e)

def check_command_exists(self, command):
"""检查命令是否存在"""
return shutil.which(command) is not None

def install_python(self):
"""安装 Python"""
print("\n🐍 安装 Python...")

if self.check_command_exists("python3"):
print("✅ Python 已安装")
return True

if self.system == "windows":
print("请手动从 https://www.python.org/ 下载并安装 Python")
return False
elif self.system == "darwin": # macOS
if self.check_command_exists("brew"):
return self.run_command(f"brew install python@{self.python_version}")[0]
else:
print(
"请先安装 Homebrew: /bin/bash -c \"$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)\"")
return False
elif self.system == "linux":
# 尝试不同的包管理器
if self.check_command_exists("apt"):
return self.run_command("sudo apt update && sudo apt install -y python3 python3-pip python3-venv")[0]
elif self.check_command_exists("yum"):
return self.run_command("sudo yum install -y python3 python3-pip")[0]
elif self.check_command_exists("dnf"):
return self.run_command("sudo dnf install -y python3 python3-pip")[0]

return False

def install_java(self):
"""安装 Java"""
print("\n☕ 安装 Java...")

if self.check_command_exists("java"):
print("✅ Java 已安装")
return True

if self.system == "windows":
print("请手动从 https://adoptopenjdk.net/ 下载并安装 OpenJDK 11")
return False
elif self.system == "darwin": # macOS
if self.check_command_exists("brew"):
return self.run_command(f"brew install openjdk@{self.java_version}")[0]
elif self.system == "linux":
if self.check_command_exists("apt"):
return self.run_command(f"sudo apt install -y openjdk-{self.java_version}-jdk")[0]
elif self.check_command_exists("yum"):
return self.run_command(f"sudo yum install -y java-{self.java_version}-openjdk-devel")[0]

return False

def create_virtual_environment(self):
"""创建 Python 虚拟环境"""
print(f"\n🌍 创建虚拟环境 {self.project_name}...")

venv_path = Path.home() / self.project_name
if venv_path.exists():
print(f"⚠️ 虚拟环境 {venv_path} 已存在")
return True

success, _ = self.run_command(f"python3 -m venv {venv_path}")
if success:
print(f"✅ 虚拟环境创建成功: {venv_path}")
return True
return False

def install_langchain_packages(self):
"""安装 LangChain 相关包"""
print("\n📦 安装 LangChain 包...")

venv_path = Path.home() / self.project_name

# 根据系统确定激活脚本路径
if self.system == "windows":
activate_script = venv_path / "Scripts" / "activate"
pip_cmd = f"{venv_path}\\Scripts\\pip"
else:
activate_script = venv_path / "bin" / "activate"
pip_cmd = f"{venv_path}/bin/pip"

packages = [
"langchain",
"langchain-core",
"langchain-community",
"dashscope",
"jupyter",
"pytest",
"black",
"pylint"
]

for package in packages:
print(f"📦 安装 {package}...")
success, _ = self.run_command(f"{pip_cmd} install {package}")
if not success:
print(f"❌ {package} 安装失败")
return False

print("✅ 所有 LangChain 包安装完成")
return True

def create_project_structure(self):
"""创建项目结构"""
print("\n📁 创建项目结构...")

project_dir = Path.home() / "langchain_projects"
project_dir.mkdir(exist_ok=True)

# 创建示例项目
demo_dir = project_dir / "demo"
demo_dir.mkdir(exist_ok=True)

# 创建示例文件
demo_file = demo_dir / "hello_langchain.py"
demo_content = '''"""
编程导航 LangChain 示例项目
"""
import os
from langchain_community.chat_models import ChatTongyi
from langchain_core.messages import HumanMessage

def main():
# 设置 API Key(请替换为您的实际 API Key)
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
print("请设置 DASHSCOPE_API_KEY 环境变量")
return

# 初始化模型
llm = ChatTongyi(model="qwen-plus", dashscope_api_key=api_key)

# 发送消息
messages = [HumanMessage(content="你好,我是编程导航的学员!")]
response = llm.invoke(messages)

print(f"AI 回复: {response.content}")

if __name__ == "__main__":
main()
'''

with open(demo_file, 'w', encoding='utf-8') as f:
f.write(demo_content)

# 创建环境配置文件
env_file = demo_dir / ".env.example"
env_content = '''# 阿里云通义千问 API Key
DASHSCOPE_API_KEY=your_api_key_here

# LangChain 跟踪配置(可选)
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langchain_api_key
'''

with open(env_file, 'w', encoding='utf-8') as f:
f.write(env_content)

print(f"✅ 项目结构创建完成: {project_dir}")
return True

def deploy(self):
"""执行完整部署"""
print("🚀 开始部署 LangChain 开发环境...\n")

steps = [
("安装 Python", self.install_python),
("安装 Java", self.install_java),
("创建虚拟环境", self.create_virtual_environment),
("安装 LangChain 包", self.install_langchain_packages),
("创建项目结构", self.create_project_structure),
]

for step_name, step_func in steps:
print(f"\n{'=' * 50}")
print(f"📋 步骤: {step_name}")
print('=' * 50)

if not step_func():
print(f"❌ {step_name} 失败,部署终止")
return False

self.print_completion_info()
return True

def print_completion_info(self):
"""打印完成信息"""
venv_path = Path.home() / self.project_name
project_path = Path.home() / "langchain_projects"

activation_cmd = {
'windows': f"{venv_path}\\Scripts\\activate",
'darwin': f"source {venv_path}/bin/activate",
'linux': f"source {venv_path}/bin/activate"
}.get(self.system, f"source {venv_path}/bin/activate")

print(f"\n{'=' * 60}")
print("🎉 LangChain 开发环境部署完成!")
print('=' * 60)
print(f"📂 虚拟环境位置: {venv_path}")
print(f"📂 项目目录: {project_path}")
print(f"\n🔧 激活虚拟环境:")
print(f" {activation_cmd}")
print(f"\n📝 下一步:")
print(f" 1. 激活虚拟环境")
print(f" 2. 配置 .env 文件中的 API Key")
print(f" 3. 运行示例: python {project_path}/demo/hello_langchain.py")
print(f"\n🌟 欢迎来到编程导航的 LangChain 学习之旅!")
print(f"📚 更多教程请访问: https://www.codefather.cn/")


def main():
"""主函数"""
try:
deployer = EnvironmentDeployer()
success = deployer.deploy()

if success:
print("\n✅ 部署成功完成!")
else:
print("\n❌ 部署过程中遇到问题,请检查错误信息")

except KeyboardInterrupt:
print("\n⚠️ 部署被用户中断")
except Exception as e:
print(f"\n❌ 部署过程中发生未预期的错误: {e}")


if __name__ == "__main__":
main()

Java 环境搭建

推荐插件:
Lombok:简化Java代码
Maven Helper:Maven依赖管理
GitToolBox:Git集成增强

Maven https://docs.langchain4j.dev/get-started/

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/dev.langchain4j/langchain4j-community-dashscope -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-community-dashscope</artifactId>
<version>1.0.0-beta2</version>
</dependency>

示例:

1
2
3
4
5
6
7
8
9
10
11
public class LangChainAiInvoke {

public static void main(String[] args) {
ChatLanguageModel qwenModel = QwenChatModel.builder()
.apiKey(TestApiKey.API_KEY)
.modelName("qwen-max")
.build();
String answer = qwenModel.chat("我是程序员鱼皮,这是编程导航 codefather.cn 的原创项目教程");
System.out.println(answer);
}
}

Maven测试配置:

1
2
3
4
5
6
7
8
9
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M7</version>
</plugin>
</plugins>
</build>

模型接入

目前主流的大模型接入方式可分为三类:

  1. API接入
  2. SDK接入
  3. 本地部署接入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pandas as pd
from IPython.display import display

# 创建比较数据
comparison_data = {
"接入方式": ["API接入", "SDK接入", "本地部署"],
"开发难度": ["低", "中", "高"],
"维护成本": ["无", "低", "高"],
"响应速度": ["依赖网络", "依赖网络", "快速稳定"],
"数据隐私": ["数据出境", "数据出境", "完全控制"],
"成本结构": ["按调用次数付费", "按调用次数付费", "前期硬件投入"],
"适用场景": ["通用应用", "需定制功能", "数据敏感场景"]
}

# 创建DataFrame并显示比较表
comparison_df = pd.DataFrame(comparison_data)
print("大模型接入方式对比:")
display(comparison_df)

大模型接入的基本流程通常包括:
准备工作(申请API密钥或准备部署环境)、
接口调用(构造请求、发送、解析响应)、
错误处理(网络问题、限流、异常)、
结果处理(解析响应、集成到应用逻辑)。
无论选择哪种接入方式,了解这一基本流程都很重要。

API接入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import requests
import json
import os

os.environ["DASHSCOPE_API_KEY"] = "sk-xxxx" # 替换为你的真实 API Key


def call_tongyi_api(prompt, model="qwen-plus"):
"""
通过HTTP请求直接调用阿里云通义千问API

参数:
prompt: 用户输入的提示词
model: 使用的模型名称,如 qwen-plus, qwen-turbo, qwen-max

返回:
模型生成的文本响应
"""
# 通义千问API端点(通义千问通用生成接口)
api_url = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"

# 从环境变量获取通义千问API密钥
api_key = os.environ.get("DASHSCOPE_API_KEY")
if not api_key:
raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量")

# 请求头,注意加上x-acs-dingtalk-access-token,具体按官方文档
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + api_key
}

# 构建请求体,注意格式需符合通义千问要求
request_body = {
"model": model,
"messages": [
{"role": "system", "content": "你是编程导航网站的智能助手,专注于帮助开发者解决问题。"},
{"role": "user", "content": prompt}
],
"temperature": 0.7,
"max_tokens": 500
}

try:
# 发送POST请求
response = requests.post(api_url, headers=headers, json=request_body)
response.raise_for_status()

# 解析响应结构,具体字段根据通义千问官方文档调整
response_data = response.json()

# 典型的返回路径,需根据实际接口返回确认
# 假设返回格式类似OpenAI: choices[0].message.content
return response_data["choices"][0]["message"]["content"]

except requests.exceptions.RequestException as e:
return f"API请求错误: {str(e)}"
except (KeyError, IndexError) as e:
return f"响应解析错误: {str(e)}"


# 使用示例
if __name__ == "__main__":
user_question = "面试鸭网站有哪些主要功能?"
response = call_tongyi_api(user_question)
print(f"问题: {user_question}\n")
print(f"回答: {response}")

这段程序展示了通过HTTP请求调用通义千问的完整过程:
构建请求头(包含API密钥进行认证)、构建请求体(包含模型名称、输入消息和参数设置)、发送请求并解析响应。
对于其他大模型提供商,如百度文心一言、ChatGPT、腾讯混元等,API调用的基本结构类似,主要区别在于:

  1. 端点URL不同
  2. 认证方式可能不同(有些使用API密钥,有些使用OAuth或其他认证方式)
  3. 请求参数的命名和结构可能不同
  4. 响应格式略有差异
    在使用API接入时,有几个优化点得注意:
  5. 错误处理:实现完善的错误处理机制,处理网络错误、服务不可用、请求超时等情况。
  6. 请求重试:对于失败的请求实现智能重试机制,通常采用指数退避策略。
  7. 请求优化:通过优化输入内容长度、减少不必要的上下文等手段降低toke消耗。
  8. 响应缓存:对于相同或类以的输入,可以缓存响应以减少API调用。

SDK接入

以通义干问的Python SDK为例,展示如何通过SDK调用大模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import os
from openai import OpenAI

os.environ["DASHSCOPE_API_KEY"] = "sk-xxxx" # 替换为你的真实 API Key


def call_tongyi_sdk(prompt, model="qwen-plus"):
"""
使用 OpenAI 官方 SDK 兼容模式调用阿里云通义千问(DashScope)大模型

参数:
prompt: 用户输入的提示词
model: 使用的模型名称,如 qwen-plus

返回:
模型生成的文本响应
"""
# 从环境变量读取 API Key
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量")

# 创建客户端,base_url 指向通义千问兼容接口地址
client = OpenAI(
api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

# 调用 chat.completions 接口
completion = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "你是面试鸭网站的技术面试助手,专注于帮助用户准备技术面试。"},
{"role": "user", "content": prompt},
],
temperature=0.7,
max_tokens=500,
# 如果用的是Qwen3开源版,取消下面注释:
# extra_body={"enable_thinking": False},
)

# 返回生成文本
return completion.choices[0].message.content


if __name__ == "__main__":
question = "如何准备Java后端开发面试?"
answer = call_tongyi_sdk(question)
print(f"问题: {question}\n")
print(f"回答: {answer}")

与直接API调用相比,SDK代码更简洁,不需要手动构造HTTP请求和处理响应解析。
除了官方SDK外,高级AI框架如LangChain为开发者提供了更强大的功能和更高层次的抽象。
以LangChain为例,它不仅简化了模型调用,还提供了链式处理、上下文管理、工具集成等高级功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import os
from langchain_community.chat_models import ChatTongyi
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

import os
from openai import OpenAI

os.environ["DASHSCOPE_API_KEY"] = "sk-xxx" # 替换为你的真实 API Key


def call_tongyi_sdk(prompt, model="qwen-plus"):
"""
使用 OpenAI 官方 SDK 兼容模式调用阿里云通义千问(DashScope)大模型

参数:
prompt: 用户输入的提示词
model: 使用的模型名称,如 qwen-plus

返回:
模型生成的文本响应
"""
# 从环境变量读取 API Key
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量")

# 创建客户端,base_url 指向通义千问兼容接口地址
client = OpenAI(
api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

# 调用 chat.completions 接口
completion = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "你是面试鸭网站的技术面试助手,专注于帮助用户准备技术面试。"},
{"role": "user", "content": prompt},
],
temperature=0.7,
max_tokens=500,
# 如果用的是Qwen3开源版,取消下面注释:
# extra_body={"enable_thinking": False},
)

# 返回生成文本
return completion.choices[0].message.content


if __name__ == "__main__":
question = "如何准备Java后端开发面试?"
answer = call_tongyi_sdk(question)
print(f"问题: {question}\n")
print(f"回答: {answer}")

def call_with_langchain_tongyi(prompt, model_name="qwen-plus"):
"""
使用 LangChain 框架调用阿里云通义千问大模型

参数:
prompt: 用户输入的提示词
model_name: 使用的模型名称,通义千问示例一般用 "qwen-plus"

返回:
模型生成的文本响应
"""
# 从环境变量获取通义千问API密钥
api_key = os.environ.get("DASHSCOPE_API_KEY")
if not api_key:
raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量")

try:
# 初始化通义千问聊天模型
chat_model = ChatTongyi(
model=model_name,
api_key=api_key,
temperature=0.7
)

# 创建提示模板
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是代码小抄的编程助手,专注于提供清晰易懂的编程解释和示例代码。"),
("human", "{input}")
])

# 构建处理链
chain = prompt_template | chat_model | StrOutputParser()

# 执行链并返回结果
return chain.invoke({"input": prompt})

except Exception as e:
return f"LangChain调用错误: {str(e)}"


# 使用示例
if __name__ == "__main__":
user_question = "请解释Python中的装饰器概念并提供一个简单示例"
response = call_with_langchain_tongyi(user_question)
print(f"问题: {user_question}\n")
print(f"回答: {response}")

使用SDK接入的主要优势在于:

  1. 代码简洁:减少了大量样板代码,提高开发效率
  2. 功能丰富:提供了更多高级功能,如流式输出、函数调用等
  3. 错误处理:内置了完善的错误处理和重试机制
  4. 类型安全:许多SDK提供了类型提示和接口定义,减少错误
  5. 抽象层:提供了更高级的抽象,如链式处理、代理等
  6. 跨模型兼容:高级框架通常支持多种模型,便于切换和比较
    在选择SDK时,应考虑以下因素:
  7. 官方支持度:优先选择模型提供商官方维护的SDK
  8. 社区活跃度:活跃的社区意味着更多的资源和更快的问题解决
  9. 功能完整性:是否支持所有需要的API功能
  10. 更新频率:是否能及时跟进模型提供商的API更新
  11. 文档质量:好的文档能大幅降低学习成本
  12. 依赖复杂度:过多的依赖可能增加项目维护难度
    值得一提的是,某些框架如LangChain还提供了模型切换的便捷性,只需更改少量配置就可以从一个模型切换到另一个,这对于比较不同模型性能或避免对单一供应商的依赖非常有用。

本地模型

通过API接口调用本地模型
Ollama提供了兼容REST API,可以通过HTTP请求调用本地模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import requests
import json

def call_local_model(prompt, model="llama2"):
"""
调用本地部署的Ollama模型

参数:
prompt: 用户输入的提示词
model: 模型名称,默认为llama2

返回:
模型生成的回答
"""
api_url = "http://localhost:11434/api/generate"

# 构建请求体
request_body = {
"model": model,
"prompt": prompt,
"stream": False
}

try:
# 发送请求
response = requests.post(api_url, json=request_body)
response.raise_for_status()

# 解析响应
result = response.json()
return result.get("response", "无响应内容")

except requests.exceptions.RequestException as e:
return f"本地模型调用错误: {str(e)}"

# 使用示例
if __name__ == "__main__":
user_prompt = "编程导航是什么网站?请简要介绍。"

print("正在调用本地Llama2模型...")
response = call_local_model(user_prompt)

print(f"问题: {user_prompt}\n")
print(f"回答: {response}")

使用Python客户端库
除了直接使用HTTP请求,还可以使用Ollama的Python客户端库,使调用更简便:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 先安装客户端库: pip install ollama
import ollama

def call_with_ollama_client(prompt, model="llama2"):
"""
使用Ollama客户端库调用本地模型

参数:
prompt: 用户输入的提示词
model: 模型名称

返回:
模型生成的回答
"""
try:
# 调用模型
response = ollama.generate(model=model, prompt=prompt)
return response['response']

except Exception as e:
return f"Ollama客户端调用错误: {str(e)}"

# 使用示例
if __name__ == "__main__":
user_prompt = "老鱼简历网站提供什么服务?"

print("正在通过Ollama客户端调用本地模型...")
response = call_with_ollama_client(user_prompt)

print(f"问题: {user_prompt}\n")
print(f"回答: {response}")

与LangChain集成
对于更复杂的应用场景,可以将本地模型与LangChain等高级框架集成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from langchain_community.llms import Ollama
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

def call_local_model_with_langchain(prompt, model_name="llama2"):
"""
使用LangChain框架调用本地Ollama模型

参数:
prompt: 用户输入的提示词
model_name: 模型名称

返回:
模型生成的回答
"""
try:
# 初始化Ollama模型
llm = Ollama(model=model_name)

# 创建提示模板
template = """
你是剪切助手的AI助手,请专业地回答用户问题。

用户问题: {question}

回答:
"""

prompt_template = PromptTemplate.from_template(template)

# 构建链
chain = prompt_template | llm | StrOutputParser()

# 执行链并返回结果
return chain.invoke({"question": prompt})

except Exception as e:
return f"LangChain集成错误: {str(e)}"

# 使用示例
if __name__ == "__main__":
user_question = "Python中如何高效处理大文件?"

print("正在通过LangChain调用本地模型...")
response = call_local_model_with_langchain(user_question)

print(f"问题: {user_question}\n")
print(f"回答: {response}")

多模型切换与管理

下面,我们设计一个多模型管理器,支持模型注册、切换、回退等功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import os
import time
from typing import Dict, List, Callable, Any, Optional
from enum import Enum

class ModelType(Enum):
"""模型类型枚举"""
OPENAI = "openai"
DASHSCOPE = "dashscope"
DEEPSEEK = "deepseek"
ANTHROPIC = "anthropic"
GOOGLE = "google"
LOCAL = "local"
CUSTOM = "custom"

class ModelManager:
"""
多模型管理器:支持注册、切换、回退等功能
"""

def __init__(self):
"""初始化模型管理器"""
self.models = {} # 已注册的模型
self.fallback_chain = [] # 回退链
self.default_model = None # 默认模型
self.metrics = {} # 模型调用指标

def register_model(self, model_name: str, model_type: ModelType,
inference_func: Callable, is_default: bool = False,
config: Dict = None):
"""
注册一个模型到管理器
"""
self.models[model_name] = {
"type": model_type,
"function": inference_func,
"config": config or {}
}

if is_default or self.default_model is None:
self.default_model = model_name

self.metrics[model_name] = {
"calls": 0,
"errors": 0,
"total_latency": 0,
"avg_latency": 0
}

print(f"模型 '{model_name}' 已注册")

def set_fallback_chain(self, model_names: List[str]):
"""
设置模型回退链,按优先级排序
"""
for name in model_names:
if name not in self.models:
raise ValueError(f"模型 '{name}' 未注册")

self.fallback_chain = model_names
print(f"回退链已设置: {' -> '.join(model_names)}")

def _update_metrics(self, model_name: str, latency: float, is_error: bool = False):
"""更新模型调用指标"""
self.metrics[model_name]["calls"] += 1
self.metrics[model_name]["total_latency"] += latency
self.metrics[model_name]["avg_latency"] = (
self.metrics[model_name]["total_latency"] / self.metrics[model_name]["calls"]
)

if is_error:
self.metrics[model_name]["errors"] += 1

def infer(self, input_text: str, model_name: Optional[str] = None,
use_fallback: bool = True) -> Dict:
"""
使用指定模型进行推理,支持回退机制
"""
if model_name is None:
model_name = self.default_model

if model_name not in self.models:
raise ValueError(f"模型 '{model_name}' 未注册")

models_to_try = [model_name]

if use_fallback and self.fallback_chain:
for fallback_model in self.fallback_chain:
if fallback_model != model_name and fallback_model in self.models:
models_to_try.append(fallback_model)

last_error = None
for current_model in models_to_try:
try:
model_config = self.models[current_model]
inference_func = model_config["function"]

start_time = time.time()
result = inference_func(input_text)
latency = time.time() - start_time

self._update_metrics(current_model, latency)

return {
"output": result,
"model_used": current_model,
"latency": latency,
"fallback_used": current_model != model_name,
"timestamp": time.time()
}

except Exception as e:
latency = time.time() - start_time
self._update_metrics(current_model, latency, is_error=True)

last_error = str(e)
print(f"模型 '{current_model}' 调用失败: {last_error}")

if current_model == models_to_try[-1]:
raise Exception(f"所有模型调用失败。最后错误: {last_error}")

raise RuntimeError("意外错误")

def get_metrics(self) -> Dict:
"""获取所有模型的调用指标"""
return self.metrics

def reset_metrics(self):
"""重置所有模型的调用指标"""
for model in self.metrics:
self.metrics[model] = {
"calls": 0,
"errors": 0,
"total_latency": 0,
"avg_latency": 0
}

现在,我们可以实现一个使用这个多模型管理器的示例,展示如何注册、切换和回退不同的模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# 导入必要的库
import os
import requests
import json

# 实现具体模型的推理函数
def openai_inference(text):
"""OpenAI 模型推理"""
api_key = os.environ.get("OPENAI_API_KEY", "")
if not api_key:
raise ValueError("未找到 OpenAI API 密钥")

headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}

data = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": text}],
"temperature": 0.7
}

response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=data
)

if response.status_code != 200:
raise Exception(f"API 错误: {response.status_code}")

return response.json()["choices"][0]["message"]["content"]

def anthropic_inference(text):
"""Anthropic Claude 模型推理"""
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
if not api_key:
raise ValueError("未找到 Anthropic API 密钥")

headers = {
"Content-Type": "application/json",
"x-api-key": api_key
}

data = {
"model": "claude-instant-1",
"prompt": f"\n\nHuman: {text}\n\nAssistant:",
"max_tokens_to_sample": 300
}

response = requests.post(
"https://api.anthropic.com/v1/complete",
headers=headers,
json=data
)

if response.status_code != 200:
raise Exception(f"API 错误: {response.status_code}")

return response.json()["completion"]

def local_model_inference(text):
"""本地模型推理 (使用 Ollama)"""
try:
response = requests.post(
"http://localhost:11434/api/generate",
json={"model": "llama2", "prompt": text}
)

if response.status_code != 200:
raise Exception(f"Ollama API 错误: {response.status_code}")

return response.json()["response"]
except requests.exceptions.ConnectionError:
raise Exception("无法连接到本地 Ollama 服务,请确保服务已启动")

def dashscope_inference(text):
api_key = os.environ.get("DASHSCOPE_API_KEY", "")
if not api_key:
raise ValueError("未找到 DashScope API 密钥")
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"model": "qwen-plus",
"messages": [{"role": "user", "content": text}],
"parameters": {
"temperature": 0.7
}
}
response = requests.post(
"https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
headers=headers,
json=data
)
if response.status_code != 200:
raise Exception(f"DashScope API 错误: {response.status_code} - {response.text}")
return response.json()["choices"][0]["message"]["content"]

def deepseek_inference(text):
api_key = os.environ.get("DEEPSEEK_API_KEY", "")
if not api_key:
raise ValueError("未找到 DeepSeek API 密钥")
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": text}],
"temperature": 0.7
}
response = requests.post(
"https://api.deepseek.com/v1/chat/completions",
headers=headers,
json=data
)
if response.status_code != 200:
raise Exception(f"DeepSeek API 错误: {response.status_code} - {response.text}")
return response.json()["choices"][0]["message"]["content"]


# 使用模型管理器
def demo_model_manager():
"""演示模型管理器的使用"""
manager = ModelManager()

# 注册多个模型
try:
manager.register_model(
"gpt-3.5",
ModelType.OPENAI,
openai_inference,
is_default=True
)
print("已注册 OpenAI GPT-3.5 模型")
except Exception as e:
print(f"OpenAI 模型注册失败: {str(e)}")

try:
manager.register_model(
"claude",
ModelType.ANTHROPIC,
anthropic_inference
)
print("已注册 Anthropic Claude 模型")
except Exception as e:
print(f"Anthropic 模型注册失败: {str(e)}")

try:
manager.register_model(
"llama-local",
ModelType.LOCAL,
local_model_inference
)
print("已注册本地 Llama2 模型")
except Exception as e:
print(f"本地模型注册失败: {str(e)}")

try:
manager.register_model("dashscope", ModelType.CUSTOM, dashscope_inference)
print("已注册通义千问 DashScope 模型")
except Exception as e:
print(f"DashScope 模型注册失败: {str(e)}")

try:
manager.register_model("deepseek", ModelType.CUSTOM, deepseek_inference)
print("已注册 DeepSeek 模型")
except Exception as e:
print(f"DeepSeek 模型注册失败: {str(e)}")

# 设置回退链
available_models = list(manager.models.keys())
if len(available_models) > 1:
manager.set_fallback_chain(available_models)

# 测试推理
test_queries = [
"编程导航网站提供哪些学习资源?",
"请介绍一下面试鸭网站的主要功能。",
"老鱼简历有哪些特色功能?"
]

for query in test_queries:
try:
print(f"\n问题: {query}")

# 尝试默认模型
result = manager.infer(query)
print(f"使用模型: {result['model_used']}")
print(f"延迟: {result['latency']:.2f}秒")
print(f"回答: {result['output'][:150]}..." if len(result['output']) > 150 else f"回答: {result['output']}")

except Exception as e:
print(f"推理失败: {str(e)}")

# 输出指标
print("\n模型性能指标:")
metrics = manager.get_metrics()
for model, data in metrics.items():
if data["calls"] > 0:
print(f"{model}: 调用次数={data['calls']}, 错误率={data['errors']/data['calls']:.1%}, 平均延迟={data['avg_latency']:.2f}秒")

if __name__ == "__main__":
demo_model_manager()

除了基础的模型切换功能外,高级多模型管理还可以考虑以下扩展:

  1. 智能路由:根据输入内容的特征自动选择最适合的模型
  2. 负载均衡:在多个模型实例间分配请求,避免单点压力过大
  3. 自适应选择:基于历史表现动态调整模型选择策略
  4. 混合策略:对同一请求使用多个模型,然后选择或合并结果
  5. 预算管理:根据成本预算智能分配不同价格层级的模型资源
    在构建多模型系统时,关键是设计清晰的接口抽象,使不同模型可以无缝替换,这也是为什么像LangChain这样的框架受欢迎的原因之一它们提供了统一的抽象层,简化了多模型管理。

异步调用与批处理

在处理大量AI模型请求时,同步调用会导致程序长时间等待响应,造成资源浪费和用户体验下降。
异步调用和批处理是两种重要的优化技术,可以显著提高应用程序的吞吐量和响应能力。

异步调用
Python的asyncio库提供了实现异步编程的基础设施。
以下是使用aiohttp和asyncio实现通义千问API异步调用的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import asyncio
import os
import time
from typing import List, Dict, Any
from langchain_community.chat_models import ChatTongyi

os.environ["DASHSCOPE_API_KEY"] = "sk-xxxx" # 替换为你的真实 API Key


async def async_tongyi_call(prompt: str) -> Dict[str, Any]:
"""
异步调用通义千问 ChatTongyi

参数:
prompt: 提示词

返回:
响应字典
"""
# 创建模型实例(你也可以改为全局实例避免重复创建)
model = ChatTongyi()

# 使用 asyncio.to_thread 把同步调用包装成异步
def sync_call():
response = model.invoke(prompt)
return response.content

try:
content = await asyncio.to_thread(sync_call)
except Exception as e:
raise Exception(f"通义千问调用异常: {str(e)}")

return {
"prompt": prompt,
"response": content,
"model": "tongyi"
}


async def process_multiple_prompts(prompts: List[str]) -> List[Dict[str, Any]]:
"""
异步处理多个提示词,调用通义千问

参数:
prompts: 提示词列表

返回:
响应列表
"""
tasks = [
async_tongyi_call(prompt)
for prompt in prompts
]
return await asyncio.gather(*tasks, return_exceptions=True)


# 使用示例
async def main():
prompts = [
"编程导航网站提供哪些服务?",
"面试鸭网站如何帮助程序员准备面试?",
"如何使用算法导航学习数据结构?",
"老鱼简历的特色功能有哪些?",
"代码小抄网站的主要用途是什么?"
]

print(f"开始异步处理 {len(prompts)} 个查询...")
start_time = time.time()

results = await process_multiple_prompts(prompts)

end_time = time.time()
successful = sum(1 for r in results if not isinstance(r, Exception))

print(f"完成 {successful}/{len(prompts)} 个查询, 耗时: {end_time - start_time:.2f} 秒")
print(f"平均每个查询: {(end_time - start_time) / len(prompts):.2f} 秒")

for i, result in enumerate(results):
print(f"\n查询 {i + 1}: {prompts[i]}")
if isinstance(result, Exception):
print(f"错误: {str(result)}")
else:
print(f"回答: {result['response'][:100]}..." if len(
result['response']) > 100 else f"回答: {result['response']}")


if __name__ == "__main__":
asyncio.run(main())

批处理
批处理是指将多个请求打包在一个AP调用中发送,进一步优化性能和成本。
许多大模型提供商支持批处理功能,如通义千问的ChatCompletion API。
以下是实现批处理的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import os
import openai
import time
from typing import List, Dict, Any

def batch_process_openai(prompts: List[str], model: str = "gpt-3.5-turbo") -> List[Dict[str, Any]]:
"""
批量处理多个提示词

参数:
prompts: 提示词列表
model: 模型名称

返回:
响应列表
"""
# 设置 API 密钥
openai.api_key = os.environ.get("OPENAI_API_KEY", "")
if not openai.api_key:
raise ValueError("未找到 OpenAI API 密钥")

# 准备批量请求
batch_messages = []
for prompt in prompts:
batch_messages.append([{"role": "user", "content": prompt}])

# 创建批请求客户端
client = openai.OpenAI()

# 发送批处理请求
try:
response = client.chat.completions.create(
model=model,
messages=batch_messages,
max_tokens=150
)

# 提取结果
results = []
for i, choice in enumerate(response.choices):
results.append({
"prompt": prompts[i],
"response": choice.message.content,
"model": model
})

return results

except Exception as e:
raise Exception(f"批处理请求失败: {str(e)}")

# 结合异步和批处理的高级示例
async def batch_async_openai(prompts: List[str], batch_size: int = 5) -> List[Dict[str, Any]]:
"""
结合批处理和异步处理

参数:
prompts: 提示词列表
batch_size: 每批的大小

返回:
所有响应的列表
"""
# 将提示词分组为批次
batches = [prompts[i:i + batch_size] for i in range(0, len(prompts), batch_size)]
print(f"将 {len(prompts)} 个提示词分为 {len(batches)} 批处理")

client = openai.AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY", ""))

async def process_batch(batch: List[str]):
# 准备这一批的消息
batch_messages = [[{"role": "user", "content": p}] for p in batch]

try:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=batch_messages,
max_tokens=150
)

return [
{
"prompt": batch[i],
"response": choice.message.content
}
for i, choice in enumerate(response.choices)
]
except Exception as e:
print(f"批处理错误: {str(e)}")
# 返回错误结果
return [{"prompt": p, "error": str(e)} for p in batch]

# 异步处理所有批次
tasks = [process_batch(batch) for batch in batches]
batch_results = await asyncio.gather(*tasks)

# 将所有批次结果合并
all_results = []
for batch_result in batch_results:
all_results.extend(batch_result)

return all_results

# 使用示例
async def batch_demo():
# 准备多个查询
prompts = [
"编程导航网站提供哪些服务?",
"面试鸭网站如何帮助程序员准备面试?",
"如何使用算法导航学习数据结构?",
"老鱼简历的特色功能有哪些?",
"代码小抄网站的主要用途是什么?",
"剪切助手如何提高工作效率?",
"程序员鱼皮是谁?他创建了哪些产品?"
]

print("开始批量异步处理...")
start_time = time.time()

# 批量异步处理
results = await batch_async_openai(prompts, batch_size=3)

# 统计结果
end_time = time.time()
successful = sum(1 for r in results if "error" not in r)

print(f"完成 {successful}/{len(prompts)} 个查询, 总耗时: {end_time - start_time:.2f} 秒")
print(f"平均每个查询: {(end_time - start_time) / len(prompts):.2f} 秒")

# 输出结果
for i, result in enumerate(results):
print(f"\n查询 {i+1}: {result['prompt']}")
if "error" in result:
print(f"错误: {result['error']}")
else:
print(f"回答: {result['response'][:100]}..." if len(result['response']) > 100 else f"回答: {result['response']}")

# 运行批处理示例
if __name__ == "__main__":
asyncio.run(batch_demo())

异步调用与批处理的实际应用
在实际应用中,异步调用和批处理特别适合以下场景:

  1. 高并发应用:如聊天机器人平台,需要同时处理多用户的请求
  2. 批量内容生成:如批量生成产品描述、文章摘要等
  3. 数据分析与处理:需要对大量文本数据进行分析和处理
  4. 并行任务处理:如多文档同时处理、多段落同时翻译等
    使用异步和批处理技术时,需要注意以下几点:
  5. 资源管理:控制并发数量,避免超出AP!速率限制
  6. 错误处理:妥善处理单个请求失败的情况,避免影响整体流程
  7. 结果汇总:正确收集和整合异步操作的结果
  8. 超时控制:设置合理的超时时间,避免长时间等待
  9. 重试机制:对失败的请求实施智能重试策略

流式处理
对于需要实时响应的场景,如聊天对话,流式处理(Streaming)是另一种重要的技术。
它允许模型生成部分响应时立即返回,而不必等待完整响应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
import asyncio
from langchain_community.chat_models import ChatTongyi

os.environ["DASHSCOPE_API_KEY"] = "sk-xxxxx" # 替换为你的真实 API Key


async def stream_tongyi_response(prompt: str):
"""
模拟流式处理通义千问响应

参数:
prompt: 用户提示词
"""
model = ChatTongyi()

def sync_call():
return model.invoke(prompt).content

print(f"问题: {prompt}")
print("回答: ", end="", flush=True)

# 在线程池执行同步调用,避免阻塞
response = await asyncio.to_thread(sync_call)

# 模拟流式分块打印,比如每次打印10个字符(可根据需求调整)
chunk_size = 10
for i in range(0, len(response), chunk_size):
chunk = response[i:i + chunk_size]
print(chunk, end="", flush=True)
await asyncio.sleep(0.1) # 模拟流式延迟

print() # 换行


# 使用示例
async def stream_demo():
await stream_tongyi_response("请用简短的段落介绍剪切助手这款产品。")


if __name__ == "__main__":
asyncio.run(stream_demo())

LangChain4j 基础

LangChain4j的架构主要包含以下几个核心组件:

  1. 模型接口层:提供统一的接口连接各种语言模型,如ChatLanguageModel、.EmbeddingModel等。
  2. 内存组件:管理对话历史和上下文信息,支持短期和长期记忆。
  3. 链(Chains):将多个组件组合成处理流程,如提示模板、模型调用、结果处理等。
  4. 工具(Tools):让语言模型能够执行外部操作,如搜索、计算、API调用等。
  5. 输出处理器:将模型输出解析成结构化数据,如SON、POO等。
  6. 检索器(Retrievers):实现文档检索和相关内容提取功能。

Maven 依赖

1
2
3
4
5
6
<!-- LangChain4j 核心依赖 -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j</artifactId>
<version>1.0.0-beta3</version>
</dependency>

还需要引入对应的模型集成模块

1
2
3
4
5
6
<!-- OpenAI 模型 -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai</artifactId>
<version>1.1.0</version>
</dependency>
1
2
3
4
5
6
7
<!-- 阿里云灵积模型集成 -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-community-dashscope</artifactId>
<version>1.0.0-beta2</version>
</dependency>

大模型支持列表
LangChain4j的模型集成仍在不断扩展中,社区版本提供了更多模型的支持。
你可以在官方文档的集成页面查看最新的支持列表:https://docs.langchain4j.dev/integrations/language-models/

基本使用方法

LangChain4j的使用流程通常包括初始化模型、构建提示、处理响应等步骤

简单对话

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import dev.langchain4j.community.model.dashscope.QwenChatModel;
import dev.langchain4j.model.chat.ChatLanguageModel;

public class SimpleChatExample {

public static void main(String[] args) {
// 初始化聊天模型
ChatLanguageModel model = QwenChatModel.builder()
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
.modelName("qwen-max")
.temperature(0.7f)
.build();

// 发送用户问题并获取回答
String response = model.chat("你好,我是程序员鱼皮,能介绍一下编程导航网站吗?");

System.out.println("AI 回答: " + response);
}
}

使用消息对象构建对话
如果需要更复杂的对话,可以使用Messages对象构建多轮对话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import dev.langchain4j.community.model.dashscope.QwenChatModel;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;

import java.util.ArrayList;
import java.util.List;

public class MultiTurnChatExample {

public static void main(String[] args) {
// 初始化聊天模型
ChatLanguageModel model = QwenChatModel.builder()
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
.modelName("qwen-max")
.temperature(0.7f)
.build();

// 构建对话历史
List<ChatMessage> messages = new ArrayList<>();

// 添加系统消息,设定角色和行为边界
messages.add(new SystemMessage("你是面试鸭的AI助手,专注于提供Java面试题解答"));

// 添加用户问题
messages.add(new UserMessage("请介绍一下Java中的多线程"));

// 获取AI回答
AiMessage response = model.chat(messages).content();
System.out.println("AI 回答: " + response.text());

// 将AI回答添加到对话历史
messages.add(response);

// 继续对话,提出后续问题
messages.add(new UserMessage("线程池有哪些常见参数?"));

// 获取新的回答
AiMessage followUpResponse = model.chat(messages).content();
System.out.println("\n后续问题 AI 回答: " + followUpResponse.text());
}
}

使用聊天服务构建应用
LangChain4j提供了ChatMemory来自动管理对话历史,简化多轮对话的开发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import dev.langchain4j.memory.ChatMemory;
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.community.model.dashscope.QwenChatModel;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.SystemPromptTemplate;

// 定义助手接口
@SystemPromptTemplate("你是代码小抄的AI助手,专注于提供简洁有效的{{language}}代码示例")
interface CodeAssistant {
String chat(String message);
String generateExample(String task, String language);
}

public class ChatServiceExample {

public static void main(String[] args) {
// 初始化聊天模型
ChatLanguageModel model = QwenChatModel.builder()
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
.modelName("qwen-max")
.temperature(0.7f)
.build();

// 创建聊天记忆,最多记住10条消息
ChatMemory chatMemory = MessageWindowChatMemory.builder()
.maxMessages(10)
.build();

// 创建助手服务
CodeAssistant assistant = AiServices.builder(CodeAssistant.class)
.chatLanguageModel(model)
.chatMemory(chatMemory)
.systemPromptVariable("language", "Java")
.build();

// 使用助手
String response = assistant.chat("如何在Java中创建一个多线程安全的单例模式?");
System.out.println("AI 回答: " + response);

// 使用特定方法生成代码示例
String example = assistant.generateExample("读取CSV文件", "Java");
System.out.println("\n代码示例: " + example);
}
}

结构化输出解析
LangChain4j支持将AI输出解析为结构化数据,非常适合需要从Al回答中提取特定信息的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.community.model.dashscope.QwenChatModel;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.SystemMessage;

// 定义结构化输出类
class JobRecommendation {
private String title;
private String description;
private int experienceYearsRequired;
private String[] requiredSkills;

// Getters and setters...
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
public int getExperienceYearsRequired() { return experienceYearsRequired; }
public void setExperienceYearsRequired(int experienceYearsRequired) { this.experienceYearsRequired = experienceYearsRequired; }
public String[] getRequiredSkills() { return requiredSkills; }
public void setRequiredSkills(String[] requiredSkills) { this.requiredSkills = requiredSkills; }

@Override
public String toString() {
return "职位名称: " + title + "\n" +
"职位描述: " + description + "\n" +
"所需经验: " + experienceYearsRequired + "年\n" +
"所需技能: " + String.join(", ", requiredSkills);
}
}

// 定义AI服务接口
interface JobAdvisor {
@SystemMessage("你是老鱼简历的AI职业顾问,根据用户的技能和经验推荐合适的工作")
JobRecommendation recommendJob(String skills, int yearsOfExperience);
}

public class StructuredOutputExample {

public static void main(String[] args) {
// 初始化聊天模型
ChatLanguageModel model = QwenChatModel.builder()
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
.modelName("qwen-max")
.temperature(0.7f)
.build();

// 创建AI服务
JobAdvisor jobAdvisor = AiServices.builder(JobAdvisor.class)
.chatLanguageModel(model)
.build();

// 获取结构化职位推荐
JobRecommendation recommendation = jobAdvisor.recommendJob(
"Java, Spring Boot, Microservices, Docker", 3);

System.out.println("AI 推荐职位:\n" + recommendation);
}
}

LangChain4j与Spring生态集成

Spring Boot Starter
LangChain4j提供了官方的Spring Boot Starter,简化了配置过程。
https://docs.langchain4j.info/tutorials/spring-boot-integration
通义千问
首先在项目中引入依赖:

1
2
3
4
5
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai-spring-boot-starter</artifactId>
<version>1.0.0-beta3</version>
</dependency>
1
2
3
4
5
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-community-dashscope-spring-boot-starter</artifactId>
<version>1.0.0-beta3</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
langchain4j:
community:
dashscope:
api-key: ${QWEN_API_KEY}
model-name: qwen-max
temperature: 0.7
max-tokens: 2000
timeout: PT30S
chat-memory:
max-messages: 10

服务调用
Spring Boot的自动配置功能让LangChain4j的使用变得异常简单。框架会自动创建和配置必要的Bean:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import dev.langchain4j.model.chat.ChatLanguageModel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ChatController {

ChatLanguageModel chatLanguageModel;

ChatController(ChatLanguageModel chatLanguageModel) {
this.chatLanguageModel = chatLanguageModel;
}

@GetMapping("/hello")
public String hello(@RequestParam(value = "message", defaultValue = "介绍一下编程导航吧") String message) {
return chatLanguageModel.chat(message);
}
}