LangChain 介绍 LangChain(全称:LangChain Framework) 专门为开发基于大型语言模型(LLM)的应用程序而设计的开发框架。 核心理念:将大型语言模型与外部数据源和计算工具连接起来,通过模块化的组件设计,让开发者可以像搭积木一样构建AI应用。
Python(LangChain)
Java(LangChain4j)
学习前置要求: AI 基础概念:了解机器学习、自然语言处理基本概念 API 使用经验:有 RESTful API 调用经验更佳
学习流程: 第一阶段(基础概念与环境准备):了解 LangChain 基本概念、搭建开发环境、掌握AI大模型基础知识,为后续学习打下理论基础。 第二阶段(LangChain 核心组件):学习模型接入、提示词工程、输出解析等核心组件,掌握 LangChain 的基本使用方法。 第三阶段(高级功能与实战应用):深入学习链式调用、记忆功能、RAG 系统、智能体开发等高级特性,提升 AI 应用开发能力。 第四阶段(项目实战):通过完整的项目实战,将理论知识转化为实际开发能力,培养独立构建AI应用的技能。
附加资料 LangChain 官方网站:https://langchain.com LangChain 官方文档:https://python.langchain.com LangChain4j 官方文档:https://docs.langchain4j.dev GitHub 仓库:
LangChain 核心概念 核心优势
模块化设计 LangChain采用高度模块化的设计,将复杂的AI应用拆分为可重用的组件。 核心模块包括:
LLMs/ChatModels:语言模型和聊天模型的抽象层
Prompts:提示模板管理
Indexes:文档加载和索引
Memory:对话状态管理
Chains:将多个组件连接成处理流程
Agents:自主决策和工具使用的智能体
抽象与标准化 LangChain为不同的LLM提供商、向量数据库和工具创建了统一的抽象层。 这意味着可以轻松切换底层模型或服务,而无需大幅修改代码。
内置工具生态系统 LangChain提供了丰富的工具集成,让语言模型能够与外部世界交互:
网络搜索工具
数据库查询接口
API调用封装
代码执行环境
文件操作工具 这些工具极大扩展了LLM的能力边界,使其不再局限于训练数据中的知识,而能够访问实时信息和执行具体操作。
灵活的工作流构建 通过Chains(链)和Agents(智能体)机制,LangChain支持构建复杂的多步骤工作流:
顺序链:按预定顺序执行一系列操作
路由链:根据输入内容动态选择处理路径
ReAct智能体:结合推理和行动的自主决策系统
多智能体协作:支持多个专业智能体协同工作 这种灵活性使得开发者可以构建从简单对话到复杂问题解决的各类应用。
活跃的社区与生态 LangChain拥有活跃的开发者社区和不断扩展的生态系统:
GitHub上的频繁更新和贡献
详细的文档和教程
丰富的示例和模板
第三方扩展和插件 这种活跃的生态系统确保了框架的持续改进和问题的快速解决。
核心组件
模型提供商集成
OpenAl (GPT-3.5,GPT-4)
Anthropic(Claude)
Google (PaLM,Gemini)
Meta(lama2)
Hugging Face模型
本地部署模型(如Ollama)
向量数据库与存储 为了支持检索增强生成(RAG)应用,LangChain集成了各种向量数据库:
Pinecone
Chroma
FAISS
Veaviate
Milvus
Qdrant
Redis
传统数据库的向量扩展(如PostgreSQL+pgvector))
工具与服务集成 LangChain提供了丰富的工具集成,扩展LLM的能力:
搜索引擎(Google,Bing,DuckDuckGo)
API服务(Zapier,IFTTT)
代码执行环境(Python,JavaScript)
文档处理工具
数据分析工具
部署与托管服务 LangChain应用可以通过多种方式部署:
LangServe:LangChain官方的部署工具
LangSmith:用于监控和调试LangChain应用
云服务提供商(AWS,GCP,Azure)
容器化部署(Docker,Kubernetes)
无服务器函数(AWS Lambda,Cloud Functions)
扩展生态系统 围绕LangChain核心框架,形成了丰富的扩展生态:
社区贡献的组件
配套工具
LangChain Templates:预构建的应用模板库
LangServe:快速部署LangChain应用
LangSmith:监控、评估和调试LangChain应用
开发环境搭建 Python 环境搭建 使用pip安装核心库:
1 2 3 pip install langchain pip install langchain-community pip install langchain-core
安装模型相关依赖 由于我们将使用通义千问模型,需要安装阿里云的SDK:
验证安装 创建一个简单的测试脚本来验证LangChain是否安装成功:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from langchain_core.messages import HumanMessagefrom langchain_community.chat_models import ChatTongyiimport osos.environ["DASHSCOPE_API_KEY" ] = "your_api_key_here" def test_langchain_installation (): """测试 LangChain 安装是否成功""" try : llm = ChatTongyi(model="qwen-plus" ) messages = [HumanMessage(content="你好,我是程序员鱼皮,请简单介绍一下 LangChain" )] print ("LangChain 安装成功!" ) print ("环境配置完成,可以开始开发了!" ) except ImportError as e: print (f"导入错误:{e} " ) print ("请检查 LangChain 是否正确安装" ) except Exception as e: print (f"其他错误:{e} " ) print ("请检查网络连接和 API Key 配置" ) if __name__ == "__main__" : test_langchain_installation()
编写一个环境检测脚本,自动检查当前系统是否满足LangChain开发的所有要求,包括Python版本、必要的库安装情况以及API Key配置状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import sysimport subprocessimport osimport importlib.utildef check_python_version (): version = sys.version_info required_major, required_minor = 3 , 8 if version.major > required_major or (version.major == required_major and version.minor >= required_minor): print (f"✅ Python 版本符合要求: {version.major} .{version.minor} .{version.micro} " ) return True else : print (f"❌ Python 版本不符合要求: {version.major} .{version.minor} .{version.micro} (需要 >= 3.8)" ) return False def check_package_installed (package_name ): spec = importlib.util.find_spec(package_name) return spec is not None def check_environment_complete (): print ("=== 编程导航 LangChain 环境检测工具 ===\n" ) all_good = True if not check_python_version(): all_good = False required_packages = [ 'langchain' , 'langchain_core' , 'langchain_community' , 'dashscope' ] print ("\n📦 检查必要的 Python 包:" ) for package in required_packages: if check_package_installed(package): print (f"✅ {package} " ) else : print (f"❌ {package} (请运行: pip install {package} )" ) all_good = False print ("\n🔑 检查环境变量:" ) api_key = os.getenv('DASHSCOPE_API_KEY' ) if api_key and len (api_key) > 10 : print ("✅ DASHSCOPE_API_KEY 已正确配置" ) else : print ("❌ DASHSCOPE_API_KEY 未配置或配置错误" ) all_good = False print ("\n" + "=" *50 ) if all_good: print ("🎉 恭喜!您的环境配置完美,可以开始 LangChain 学习之旅!" ) else : print ("⚠️ 环境存在问题,请根据上述提示进行修复后再试" ) return all_good if __name__ == "__main__" : check_environment_complete()
创建一个简单的LangChain应用,使用通义千问模型实现一个”编程导航助手”,能够回答关于编程学习的问题。 要求包含错误处理和日志记录功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 import osimport loggingfrom datetime import datetimefrom langchain_community.chat_models import ChatTongyifrom langchain_core.messages import HumanMessage, SystemMessagefrom langchain_core.prompts import ChatPromptTemplateos.environ["DASHSCOPE_API_KEY" ] = "sk-xxxxx" class ProgrammingNavigatorAssistant : def __init__ (self, api_key=None ): """初始化编程导航助手""" logging.basicConfig( level=logging.INFO, format ='%(asctime)s - %(levelname)s - %(message)s' , handlers=[ logging.FileHandler('navigator_assistant.log' ), logging.StreamHandler() ] ) self .logger = logging.getLogger(__name__) self .api_key = api_key or os.getenv('DASHSCOPE_API_KEY' ) if not self .api_key: raise ValueError("请设置 DASHSCOPE_API_KEY 环境变量或传入 api_key 参数" ) try : self .llm = ChatTongyi( model="qwen-plus" , dashscope_api_key=self .api_key, temperature=0.7 ) self .logger.info("编程导航助手初始化成功" ) except Exception as e: self .logger.error(f"模型初始化失败: {e} " ) raise self .system_prompt = """你是编程导航(codefather.cn)的专业编程学习助手。 你的任务是帮助用户解答编程学习相关的问题,提供实用的学习建议和技术指导。 请用温暖、专业且友好的语调回答问题。""" def ask_question (self, question ): """向助手提问""" try : self .logger.info(f"收到用户问题: {question} " ) messages = [ SystemMessage(content=self .system_prompt), HumanMessage(content=question) ] response = self .llm.invoke(messages) answer = response.content self .logger.info("问题回答成功" ) return answer except Exception as e: error_msg = f"回答问题时发生错误: {e} " self .logger.error(error_msg) return f"抱歉,我现在无法回答您的问题。错误信息:{error_msg} " def chat_session (self ): """开始聊天会话""" print ("🤖 编程导航助手启动成功!" ) print ("我是您的专属编程学习伙伴,可以帮您解答编程相关问题。" ) print ("输入 'quit' 或 'exit' 退出聊天\n" ) while True : try : user_input = input ("👤 您的问题: " ).strip() if user_input.lower() in ['quit' , 'exit' , '退出' ]: print ("👋 感谢使用编程导航助手,祝您学习愉快!" ) break if not user_input: print ("请输入您的问题~" ) continue print ("🤖 正在思考..." ) answer = self .ask_question(user_input) print (f"🤖 助手回答: {answer} \n" ) except KeyboardInterrupt: print ("\n👋 感谢使用编程导航助手!" ) break except Exception as e: print (f"❌ 发生错误: {e} " ) def main (): """主函数""" try : assistant = ProgrammingNavigatorAssistant() test_question = "我想学习 Python,应该从哪里开始?" print ("=== 测试问答 ===" ) print (f"问题: {test_question} " ) answer = assistant.ask_question(test_question) print (f"回答: {answer} " ) print ("\n=== 开始聊天会话 ===" ) assistant.chat_session() except Exception as e: print (f"程序启动失败: {e} " ) if __name__ == "__main__" : main()
设计一个自动化的开发环境部署脚本,能够在新的机器上一键安装和配置完整的LangChain开发环境(包括Python、Jva、必要的库和开发工具)。脚本应该支持不同的操作系统。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 """ 编程导航 LangChain 开发环境自动部署脚本 支持 Windows、macOS、Linux 系统 """ import osimport sysimport subprocessimport platformimport shutilfrom pathlib import Pathclass EnvironmentDeployer : def __init__ (self ): self .system = platform.system().lower() self .python_version = "3.9" self .java_version = "11" self .project_name = "langchain_env" print (f"🚀 编程导航 LangChain 环境部署工具" ) print (f"📋 检测到系统: {platform.system()} {platform.release()} " ) print (f"🎯 将安装 Python {self.python_version} 和 Java {self.java_version} " ) def run_command (self, command, shell=True ): """执行系统命令""" try : print (f"🔧 执行命令: {command} " ) result = subprocess.run(command, shell=shell, capture_output=True , text=True ) if result.returncode == 0 : print (f"✅ 命令执行成功" ) return True , result.stdout else : print (f"❌ 命令执行失败: {result.stderr} " ) return False , result.stderr except Exception as e: print (f"❌ 命令执行异常: {e} " ) return False , str (e) def check_command_exists (self, command ): """检查命令是否存在""" return shutil.which(command) is not None def install_python (self ): """安装 Python""" print ("\n🐍 安装 Python..." ) if self .check_command_exists("python3" ): print ("✅ Python 已安装" ) return True if self .system == "windows" : print ("请手动从 https://www.python.org/ 下载并安装 Python" ) return False elif self .system == "darwin" : if self .check_command_exists("brew" ): return self .run_command(f"brew install python@{self.python_version} " )[0 ] else : print ( "请先安装 Homebrew: /bin/bash -c \"$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)\"" ) return False elif self .system == "linux" : if self .check_command_exists("apt" ): return self .run_command("sudo apt update && sudo apt install -y python3 python3-pip python3-venv" )[0 ] elif self .check_command_exists("yum" ): return self .run_command("sudo yum install -y python3 python3-pip" )[0 ] elif self .check_command_exists("dnf" ): return self .run_command("sudo dnf install -y python3 python3-pip" )[0 ] return False def install_java (self ): """安装 Java""" print ("\n☕ 安装 Java..." ) if self .check_command_exists("java" ): print ("✅ Java 已安装" ) return True if self .system == "windows" : print ("请手动从 https://adoptopenjdk.net/ 下载并安装 OpenJDK 11" ) return False elif self .system == "darwin" : if self .check_command_exists("brew" ): return self .run_command(f"brew install openjdk@{self.java_version} " )[0 ] elif self .system == "linux" : if self .check_command_exists("apt" ): return self .run_command(f"sudo apt install -y openjdk-{self.java_version} -jdk" )[0 ] elif self .check_command_exists("yum" ): return self .run_command(f"sudo yum install -y java-{self.java_version} -openjdk-devel" )[0 ] return False def create_virtual_environment (self ): """创建 Python 虚拟环境""" print (f"\n🌍 创建虚拟环境 {self.project_name} ..." ) venv_path = Path.home() / self .project_name if venv_path.exists(): print (f"⚠️ 虚拟环境 {venv_path} 已存在" ) return True success, _ = self .run_command(f"python3 -m venv {venv_path} " ) if success: print (f"✅ 虚拟环境创建成功: {venv_path} " ) return True return False def install_langchain_packages (self ): """安装 LangChain 相关包""" print ("\n📦 安装 LangChain 包..." ) venv_path = Path.home() / self .project_name if self .system == "windows" : activate_script = venv_path / "Scripts" / "activate" pip_cmd = f"{venv_path} \\Scripts\\pip" else : activate_script = venv_path / "bin" / "activate" pip_cmd = f"{venv_path} /bin/pip" packages = [ "langchain" , "langchain-core" , "langchain-community" , "dashscope" , "jupyter" , "pytest" , "black" , "pylint" ] for package in packages: print (f"📦 安装 {package} ..." ) success, _ = self .run_command(f"{pip_cmd} install {package} " ) if not success: print (f"❌ {package} 安装失败" ) return False print ("✅ 所有 LangChain 包安装完成" ) return True def create_project_structure (self ): """创建项目结构""" print ("\n📁 创建项目结构..." ) project_dir = Path.home() / "langchain_projects" project_dir.mkdir(exist_ok=True ) demo_dir = project_dir / "demo" demo_dir.mkdir(exist_ok=True ) demo_file = demo_dir / "hello_langchain.py" demo_content = '''""" 编程导航 LangChain 示例项目 """ import os from langchain_community.chat_models import ChatTongyi from langchain_core.messages import HumanMessage def main(): # 设置 API Key(请替换为您的实际 API Key) api_key = os.getenv("DASHSCOPE_API_KEY") if not api_key: print("请设置 DASHSCOPE_API_KEY 环境变量") return # 初始化模型 llm = ChatTongyi(model="qwen-plus", dashscope_api_key=api_key) # 发送消息 messages = [HumanMessage(content="你好,我是编程导航的学员!")] response = llm.invoke(messages) print(f"AI 回复: {response.content}") if __name__ == "__main__": main() ''' with open (demo_file, 'w' , encoding='utf-8' ) as f: f.write(demo_content) env_file = demo_dir / ".env.example" env_content = '''# 阿里云通义千问 API Key DASHSCOPE_API_KEY=your_api_key_here # LangChain 跟踪配置(可选) LANGCHAIN_TRACING_V2=true LANGCHAIN_API_KEY=your_langchain_api_key ''' with open (env_file, 'w' , encoding='utf-8' ) as f: f.write(env_content) print (f"✅ 项目结构创建完成: {project_dir} " ) return True def deploy (self ): """执行完整部署""" print ("🚀 开始部署 LangChain 开发环境...\n" ) steps = [ ("安装 Python" , self .install_python), ("安装 Java" , self .install_java), ("创建虚拟环境" , self .create_virtual_environment), ("安装 LangChain 包" , self .install_langchain_packages), ("创建项目结构" , self .create_project_structure), ] for step_name, step_func in steps: print (f"\n{'=' * 50 } " ) print (f"📋 步骤: {step_name} " ) print ('=' * 50 ) if not step_func(): print (f"❌ {step_name} 失败,部署终止" ) return False self .print_completion_info() return True def print_completion_info (self ): """打印完成信息""" venv_path = Path.home() / self .project_name project_path = Path.home() / "langchain_projects" activation_cmd = { 'windows' : f"{venv_path} \\Scripts\\activate" , 'darwin' : f"source {venv_path} /bin/activate" , 'linux' : f"source {venv_path} /bin/activate" }.get(self .system, f"source {venv_path} /bin/activate" ) print (f"\n{'=' * 60 } " ) print ("🎉 LangChain 开发环境部署完成!" ) print ('=' * 60 ) print (f"📂 虚拟环境位置: {venv_path} " ) print (f"📂 项目目录: {project_path} " ) print (f"\n🔧 激活虚拟环境:" ) print (f" {activation_cmd} " ) print (f"\n📝 下一步:" ) print (f" 1. 激活虚拟环境" ) print (f" 2. 配置 .env 文件中的 API Key" ) print (f" 3. 运行示例: python {project_path} /demo/hello_langchain.py" ) print (f"\n🌟 欢迎来到编程导航的 LangChain 学习之旅!" ) print (f"📚 更多教程请访问: https://www.codefather.cn/" ) def main (): """主函数""" try : deployer = EnvironmentDeployer() success = deployer.deploy() if success: print ("\n✅ 部署成功完成!" ) else : print ("\n❌ 部署过程中遇到问题,请检查错误信息" ) except KeyboardInterrupt: print ("\n⚠️ 部署被用户中断" ) except Exception as e: print (f"\n❌ 部署过程中发生未预期的错误: {e} " ) if __name__ == "__main__" : main()
Java 环境搭建 推荐插件: Lombok:简化Java代码 Maven Helper:Maven依赖管理 GitToolBox:Git集成增强
Maven https://docs.langchain4j.dev/get-started/
1 2 3 4 5 6 <dependency > <groupId > dev.langchain4j</groupId > <artifactId > langchain4j-community-dashscope</artifactId > <version > 1.0.0-beta2</version > </dependency >
示例:
1 2 3 4 5 6 7 8 9 10 11 public class LangChainAiInvoke { public static void main (String[] args) { ChatLanguageModel qwenModel = QwenChatModel.builder() .apiKey(TestApiKey.API_KEY) .modelName("qwen-max" ) .build(); String answer = qwenModel.chat("我是程序员鱼皮,这是编程导航 codefather.cn 的原创项目教程" ); System.out.println(answer); } }
Maven测试配置:
1 2 3 4 5 6 7 8 9 <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-surefire-plugin</artifactId > <version > 3.0.0-M7</version > </plugin > </plugins > </build >
模型接入 目前主流的大模型接入方式可分为三类:
API接入
SDK接入
本地部署接入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import pandas as pdfrom IPython.display import displaycomparison_data = { "接入方式" : ["API接入" , "SDK接入" , "本地部署" ], "开发难度" : ["低" , "中" , "高" ], "维护成本" : ["无" , "低" , "高" ], "响应速度" : ["依赖网络" , "依赖网络" , "快速稳定" ], "数据隐私" : ["数据出境" , "数据出境" , "完全控制" ], "成本结构" : ["按调用次数付费" , "按调用次数付费" , "前期硬件投入" ], "适用场景" : ["通用应用" , "需定制功能" , "数据敏感场景" ] } comparison_df = pd.DataFrame(comparison_data) print ("大模型接入方式对比:" )display(comparison_df)
大模型接入的基本流程通常包括: 准备工作(申请API密钥或准备部署环境)、 接口调用(构造请求、发送、解析响应)、 错误处理(网络问题、限流、异常)、 结果处理(解析响应、集成到应用逻辑)。 无论选择哪种接入方式,了解这一基本流程都很重要。
API接入 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import requestsimport jsonimport osos.environ["DASHSCOPE_API_KEY" ] = "sk-xxxx" def call_tongyi_api (prompt, model="qwen-plus" ): """ 通过HTTP请求直接调用阿里云通义千问API 参数: prompt: 用户输入的提示词 model: 使用的模型名称,如 qwen-plus, qwen-turbo, qwen-max 返回: 模型生成的文本响应 """ api_url = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions" api_key = os.environ.get("DASHSCOPE_API_KEY" ) if not api_key: raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量" ) headers = { "Content-Type" : "application/json" , "Authorization" : "Bearer " + api_key } request_body = { "model" : model, "messages" : [ {"role" : "system" , "content" : "你是编程导航网站的智能助手,专注于帮助开发者解决问题。" }, {"role" : "user" , "content" : prompt} ], "temperature" : 0.7 , "max_tokens" : 500 } try : response = requests.post(api_url, headers=headers, json=request_body) response.raise_for_status() response_data = response.json() return response_data["choices" ][0 ]["message" ]["content" ] except requests.exceptions.RequestException as e: return f"API请求错误: {str (e)} " except (KeyError, IndexError) as e: return f"响应解析错误: {str (e)} " if __name__ == "__main__" : user_question = "面试鸭网站有哪些主要功能?" response = call_tongyi_api(user_question) print (f"问题: {user_question} \n" ) print (f"回答: {response} " )
这段程序展示了通过HTTP请求调用通义千问的完整过程: 构建请求头(包含API密钥进行认证)、构建请求体(包含模型名称、输入消息和参数设置)、发送请求并解析响应。 对于其他大模型提供商,如百度文心一言、ChatGPT、腾讯混元等,API调用的基本结构类似,主要区别在于:
端点URL不同
认证方式可能不同(有些使用API密钥,有些使用OAuth或其他认证方式)
请求参数的命名和结构可能不同
响应格式略有差异 在使用API接入时,有几个优化点得注意:
错误处理:实现完善的错误处理机制,处理网络错误、服务不可用、请求超时等情况。
请求重试:对于失败的请求实现智能重试机制,通常采用指数退避策略。
请求优化:通过优化输入内容长度、减少不必要的上下文等手段降低toke消耗。
响应缓存:对于相同或类以的输入,可以缓存响应以减少API调用。
SDK接入 以通义干问的Python SDK为例,展示如何通过SDK调用大模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import osfrom openai import OpenAIos.environ["DASHSCOPE_API_KEY" ] = "sk-xxxx" def call_tongyi_sdk (prompt, model="qwen-plus" ): """ 使用 OpenAI 官方 SDK 兼容模式调用阿里云通义千问(DashScope)大模型 参数: prompt: 用户输入的提示词 model: 使用的模型名称,如 qwen-plus 返回: 模型生成的文本响应 """ api_key = os.getenv("DASHSCOPE_API_KEY" ) if not api_key: raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量" ) client = OpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" , ) completion = client.chat.completions.create( model=model, messages=[ {"role" : "system" , "content" : "你是面试鸭网站的技术面试助手,专注于帮助用户准备技术面试。" }, {"role" : "user" , "content" : prompt}, ], temperature=0.7 , max_tokens=500 , ) return completion.choices[0 ].message.content if __name__ == "__main__" : question = "如何准备Java后端开发面试?" answer = call_tongyi_sdk(question) print (f"问题: {question} \n" ) print (f"回答: {answer} " )
与直接API调用相比,SDK代码更简洁,不需要手动构造HTTP请求和处理响应解析。 除了官方SDK外,高级AI框架如LangChain为开发者提供了更强大的功能和更高层次的抽象。 以LangChain为例,它不仅简化了模型调用,还提供了链式处理、上下文管理、工具集成等高级功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 import osfrom langchain_community.chat_models import ChatTongyifrom langchain_core.prompts import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParserimport osfrom openai import OpenAIos.environ["DASHSCOPE_API_KEY" ] = "sk-xxx" def call_tongyi_sdk (prompt, model="qwen-plus" ): """ 使用 OpenAI 官方 SDK 兼容模式调用阿里云通义千问(DashScope)大模型 参数: prompt: 用户输入的提示词 model: 使用的模型名称,如 qwen-plus 返回: 模型生成的文本响应 """ api_key = os.getenv("DASHSCOPE_API_KEY" ) if not api_key: raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量" ) client = OpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" , ) completion = client.chat.completions.create( model=model, messages=[ {"role" : "system" , "content" : "你是面试鸭网站的技术面试助手,专注于帮助用户准备技术面试。" }, {"role" : "user" , "content" : prompt}, ], temperature=0.7 , max_tokens=500 , ) return completion.choices[0 ].message.content if __name__ == "__main__" : question = "如何准备Java后端开发面试?" answer = call_tongyi_sdk(question) print (f"问题: {question} \n" ) print (f"回答: {answer} " ) def call_with_langchain_tongyi (prompt, model_name="qwen-plus" ): """ 使用 LangChain 框架调用阿里云通义千问大模型 参数: prompt: 用户输入的提示词 model_name: 使用的模型名称,通义千问示例一般用 "qwen-plus" 返回: 模型生成的文本响应 """ api_key = os.environ.get("DASHSCOPE_API_KEY" ) if not api_key: raise ValueError("没有找到 DASHSCOPE_API_KEY 环境变量" ) try : chat_model = ChatTongyi( model=model_name, api_key=api_key, temperature=0.7 ) prompt_template = ChatPromptTemplate.from_messages([ ("system" , "你是代码小抄的编程助手,专注于提供清晰易懂的编程解释和示例代码。" ), ("human" , "{input}" ) ]) chain = prompt_template | chat_model | StrOutputParser() return chain.invoke({"input" : prompt}) except Exception as e: return f"LangChain调用错误: {str (e)} " if __name__ == "__main__" : user_question = "请解释Python中的装饰器概念并提供一个简单示例" response = call_with_langchain_tongyi(user_question) print (f"问题: {user_question} \n" ) print (f"回答: {response} " )
使用SDK接入的主要优势在于:
代码简洁:减少了大量样板代码,提高开发效率
功能丰富:提供了更多高级功能,如流式输出、函数调用等
错误处理:内置了完善的错误处理和重试机制
类型安全:许多SDK提供了类型提示和接口定义,减少错误
抽象层:提供了更高级的抽象,如链式处理、代理等
跨模型兼容:高级框架通常支持多种模型,便于切换和比较 在选择SDK时,应考虑以下因素:
官方支持度:优先选择模型提供商官方维护的SDK
社区活跃度:活跃的社区意味着更多的资源和更快的问题解决
功能完整性:是否支持所有需要的API功能
更新频率:是否能及时跟进模型提供商的API更新
文档质量:好的文档能大幅降低学习成本
依赖复杂度:过多的依赖可能增加项目维护难度 值得一提的是,某些框架如LangChain还提供了模型切换的便捷性,只需更改少量配置就可以从一个模型切换到另一个,这对于比较不同模型性能或避免对单一供应商的依赖非常有用。
本地模型 通过API接口调用本地模型 Ollama提供了兼容REST API,可以通过HTTP请求调用本地模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import requestsimport jsondef call_local_model (prompt, model="llama2" ): """ 调用本地部署的Ollama模型 参数: prompt: 用户输入的提示词 model: 模型名称,默认为llama2 返回: 模型生成的回答 """ api_url = "http://localhost:11434/api/generate" request_body = { "model" : model, "prompt" : prompt, "stream" : False } try : response = requests.post(api_url, json=request_body) response.raise_for_status() result = response.json() return result.get("response" , "无响应内容" ) except requests.exceptions.RequestException as e: return f"本地模型调用错误: {str (e)} " if __name__ == "__main__" : user_prompt = "编程导航是什么网站?请简要介绍。" print ("正在调用本地Llama2模型..." ) response = call_local_model(user_prompt) print (f"问题: {user_prompt} \n" ) print (f"回答: {response} " )
使用Python客户端库 除了直接使用HTTP请求,还可以使用Ollama的Python客户端库,使调用更简便:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import ollamadef call_with_ollama_client (prompt, model="llama2" ): """ 使用Ollama客户端库调用本地模型 参数: prompt: 用户输入的提示词 model: 模型名称 返回: 模型生成的回答 """ try : response = ollama.generate(model=model, prompt=prompt) return response['response' ] except Exception as e: return f"Ollama客户端调用错误: {str (e)} " if __name__ == "__main__" : user_prompt = "老鱼简历网站提供什么服务?" print ("正在通过Ollama客户端调用本地模型..." ) response = call_with_ollama_client(user_prompt) print (f"问题: {user_prompt} \n" ) print (f"回答: {response} " )
与LangChain集成 对于更复杂的应用场景,可以将本地模型与LangChain等高级框架集成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 from langchain_community.llms import Ollamafrom langchain_core.prompts import PromptTemplatefrom langchain_core.output_parsers import StrOutputParserdef call_local_model_with_langchain (prompt, model_name="llama2" ): """ 使用LangChain框架调用本地Ollama模型 参数: prompt: 用户输入的提示词 model_name: 模型名称 返回: 模型生成的回答 """ try : llm = Ollama(model=model_name) template = """ 你是剪切助手的AI助手,请专业地回答用户问题。 用户问题: {question} 回答: """ prompt_template = PromptTemplate.from_template(template) chain = prompt_template | llm | StrOutputParser() return chain.invoke({"question" : prompt}) except Exception as e: return f"LangChain集成错误: {str (e)} " if __name__ == "__main__" : user_question = "Python中如何高效处理大文件?" print ("正在通过LangChain调用本地模型..." ) response = call_local_model_with_langchain(user_question) print (f"问题: {user_question} \n" ) print (f"回答: {response} " )
多模型切换与管理 下面,我们设计一个多模型管理器,支持模型注册、切换、回退等功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 import osimport timefrom typing import Dict , List , Callable , Any , Optional from enum import Enumclass ModelType (Enum ): """模型类型枚举""" OPENAI = "openai" DASHSCOPE = "dashscope" DEEPSEEK = "deepseek" ANTHROPIC = "anthropic" GOOGLE = "google" LOCAL = "local" CUSTOM = "custom" class ModelManager : """ 多模型管理器:支持注册、切换、回退等功能 """ def __init__ (self ): """初始化模型管理器""" self .models = {} self .fallback_chain = [] self .default_model = None self .metrics = {} def register_model (self, model_name: str , model_type: ModelType, inference_func: Callable , is_default: bool = False , config: Dict = None ): """ 注册一个模型到管理器 """ self .models[model_name] = { "type" : model_type, "function" : inference_func, "config" : config or {} } if is_default or self .default_model is None : self .default_model = model_name self .metrics[model_name] = { "calls" : 0 , "errors" : 0 , "total_latency" : 0 , "avg_latency" : 0 } print (f"模型 '{model_name} ' 已注册" ) def set_fallback_chain (self, model_names: List [str ] ): """ 设置模型回退链,按优先级排序 """ for name in model_names: if name not in self .models: raise ValueError(f"模型 '{name} ' 未注册" ) self .fallback_chain = model_names print (f"回退链已设置: {' -> ' .join(model_names)} " ) def _update_metrics (self, model_name: str , latency: float , is_error: bool = False ): """更新模型调用指标""" self .metrics[model_name]["calls" ] += 1 self .metrics[model_name]["total_latency" ] += latency self .metrics[model_name]["avg_latency" ] = ( self .metrics[model_name]["total_latency" ] / self .metrics[model_name]["calls" ] ) if is_error: self .metrics[model_name]["errors" ] += 1 def infer (self, input_text: str , model_name: Optional [str ] = None , use_fallback: bool = True ) -> Dict : """ 使用指定模型进行推理,支持回退机制 """ if model_name is None : model_name = self .default_model if model_name not in self .models: raise ValueError(f"模型 '{model_name} ' 未注册" ) models_to_try = [model_name] if use_fallback and self .fallback_chain: for fallback_model in self .fallback_chain: if fallback_model != model_name and fallback_model in self .models: models_to_try.append(fallback_model) last_error = None for current_model in models_to_try: try : model_config = self .models[current_model] inference_func = model_config["function" ] start_time = time.time() result = inference_func(input_text) latency = time.time() - start_time self ._update_metrics(current_model, latency) return { "output" : result, "model_used" : current_model, "latency" : latency, "fallback_used" : current_model != model_name, "timestamp" : time.time() } except Exception as e: latency = time.time() - start_time self ._update_metrics(current_model, latency, is_error=True ) last_error = str (e) print (f"模型 '{current_model} ' 调用失败: {last_error} " ) if current_model == models_to_try[-1 ]: raise Exception(f"所有模型调用失败。最后错误: {last_error} " ) raise RuntimeError("意外错误" ) def get_metrics (self ) -> Dict : """获取所有模型的调用指标""" return self .metrics def reset_metrics (self ): """重置所有模型的调用指标""" for model in self .metrics: self .metrics[model] = { "calls" : 0 , "errors" : 0 , "total_latency" : 0 , "avg_latency" : 0 }
现在,我们可以实现一个使用这个多模型管理器的示例,展示如何注册、切换和回退不同的模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 import osimport requestsimport jsondef openai_inference (text ): """OpenAI 模型推理""" api_key = os.environ.get("OPENAI_API_KEY" , "" ) if not api_key: raise ValueError("未找到 OpenAI API 密钥" ) headers = { "Content-Type" : "application/json" , "Authorization" : f"Bearer {api_key} " } data = { "model" : "gpt-3.5-turbo" , "messages" : [{"role" : "user" , "content" : text}], "temperature" : 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions" , headers=headers, json=data ) if response.status_code != 200 : raise Exception(f"API 错误: {response.status_code} " ) return response.json()["choices" ][0 ]["message" ]["content" ] def anthropic_inference (text ): """Anthropic Claude 模型推理""" api_key = os.environ.get("ANTHROPIC_API_KEY" , "" ) if not api_key: raise ValueError("未找到 Anthropic API 密钥" ) headers = { "Content-Type" : "application/json" , "x-api-key" : api_key } data = { "model" : "claude-instant-1" , "prompt" : f"\n\nHuman: {text} \n\nAssistant:" , "max_tokens_to_sample" : 300 } response = requests.post( "https://api.anthropic.com/v1/complete" , headers=headers, json=data ) if response.status_code != 200 : raise Exception(f"API 错误: {response.status_code} " ) return response.json()["completion" ] def local_model_inference (text ): """本地模型推理 (使用 Ollama)""" try : response = requests.post( "http://localhost:11434/api/generate" , json={"model" : "llama2" , "prompt" : text} ) if response.status_code != 200 : raise Exception(f"Ollama API 错误: {response.status_code} " ) return response.json()["response" ] except requests.exceptions.ConnectionError: raise Exception("无法连接到本地 Ollama 服务,请确保服务已启动" ) def dashscope_inference (text ): api_key = os.environ.get("DASHSCOPE_API_KEY" , "" ) if not api_key: raise ValueError("未找到 DashScope API 密钥" ) headers = { "Content-Type" : "application/json" , "Authorization" : f"Bearer {api_key} " } data = { "model" : "qwen-plus" , "messages" : [{"role" : "user" , "content" : text}], "parameters" : { "temperature" : 0.7 } } response = requests.post( "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions" , headers=headers, json=data ) if response.status_code != 200 : raise Exception(f"DashScope API 错误: {response.status_code} - {response.text} " ) return response.json()["choices" ][0 ]["message" ]["content" ] def deepseek_inference (text ): api_key = os.environ.get("DEEPSEEK_API_KEY" , "" ) if not api_key: raise ValueError("未找到 DeepSeek API 密钥" ) headers = { "Content-Type" : "application/json" , "Authorization" : f"Bearer {api_key} " } data = { "model" : "deepseek-chat" , "messages" : [{"role" : "user" , "content" : text}], "temperature" : 0.7 } response = requests.post( "https://api.deepseek.com/v1/chat/completions" , headers=headers, json=data ) if response.status_code != 200 : raise Exception(f"DeepSeek API 错误: {response.status_code} - {response.text} " ) return response.json()["choices" ][0 ]["message" ]["content" ] def demo_model_manager (): """演示模型管理器的使用""" manager = ModelManager() try : manager.register_model( "gpt-3.5" , ModelType.OPENAI, openai_inference, is_default=True ) print ("已注册 OpenAI GPT-3.5 模型" ) except Exception as e: print (f"OpenAI 模型注册失败: {str (e)} " ) try : manager.register_model( "claude" , ModelType.ANTHROPIC, anthropic_inference ) print ("已注册 Anthropic Claude 模型" ) except Exception as e: print (f"Anthropic 模型注册失败: {str (e)} " ) try : manager.register_model( "llama-local" , ModelType.LOCAL, local_model_inference ) print ("已注册本地 Llama2 模型" ) except Exception as e: print (f"本地模型注册失败: {str (e)} " ) try : manager.register_model("dashscope" , ModelType.CUSTOM, dashscope_inference) print ("已注册通义千问 DashScope 模型" ) except Exception as e: print (f"DashScope 模型注册失败: {str (e)} " ) try : manager.register_model("deepseek" , ModelType.CUSTOM, deepseek_inference) print ("已注册 DeepSeek 模型" ) except Exception as e: print (f"DeepSeek 模型注册失败: {str (e)} " ) available_models = list (manager.models.keys()) if len (available_models) > 1 : manager.set_fallback_chain(available_models) test_queries = [ "编程导航网站提供哪些学习资源?" , "请介绍一下面试鸭网站的主要功能。" , "老鱼简历有哪些特色功能?" ] for query in test_queries: try : print (f"\n问题: {query} " ) result = manager.infer(query) print (f"使用模型: {result['model_used' ]} " ) print (f"延迟: {result['latency' ]:.2 f} 秒" ) print (f"回答: {result['output' ][:150 ]} ..." if len (result['output' ]) > 150 else f"回答: {result['output' ]} " ) except Exception as e: print (f"推理失败: {str (e)} " ) print ("\n模型性能指标:" ) metrics = manager.get_metrics() for model, data in metrics.items(): if data["calls" ] > 0 : print (f"{model} : 调用次数={data['calls' ]} , 错误率={data['errors' ]/data['calls' ]:.1 %} , 平均延迟={data['avg_latency' ]:.2 f} 秒" ) if __name__ == "__main__" : demo_model_manager()
除了基础的模型切换功能外,高级多模型管理还可以考虑以下扩展:
智能路由:根据输入内容的特征自动选择最适合的模型
负载均衡:在多个模型实例间分配请求,避免单点压力过大
自适应选择:基于历史表现动态调整模型选择策略
混合策略:对同一请求使用多个模型,然后选择或合并结果
预算管理:根据成本预算智能分配不同价格层级的模型资源 在构建多模型系统时,关键是设计清晰的接口抽象,使不同模型可以无缝替换,这也是为什么像LangChain这样的框架受欢迎的原因之一它们提供了统一的抽象层,简化了多模型管理。
异步调用与批处理 在处理大量AI模型请求时,同步调用会导致程序长时间等待响应,造成资源浪费和用户体验下降。 异步调用和批处理是两种重要的优化技术,可以显著提高应用程序的吞吐量和响应能力。
异步调用 Python的asyncio库提供了实现异步编程的基础设施。 以下是使用aiohttp和asyncio实现通义千问API异步调用的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 import asyncioimport osimport timefrom typing import List , Dict , Any from langchain_community.chat_models import ChatTongyios.environ["DASHSCOPE_API_KEY" ] = "sk-xxxx" async def async_tongyi_call (prompt: str ) -> Dict [str , Any ]: """ 异步调用通义千问 ChatTongyi 参数: prompt: 提示词 返回: 响应字典 """ model = ChatTongyi() def sync_call (): response = model.invoke(prompt) return response.content try : content = await asyncio.to_thread(sync_call) except Exception as e: raise Exception(f"通义千问调用异常: {str (e)} " ) return { "prompt" : prompt, "response" : content, "model" : "tongyi" } async def process_multiple_prompts (prompts: List [str ] ) -> List [Dict [str , Any ]]: """ 异步处理多个提示词,调用通义千问 参数: prompts: 提示词列表 返回: 响应列表 """ tasks = [ async_tongyi_call(prompt) for prompt in prompts ] return await asyncio.gather(*tasks, return_exceptions=True ) async def main (): prompts = [ "编程导航网站提供哪些服务?" , "面试鸭网站如何帮助程序员准备面试?" , "如何使用算法导航学习数据结构?" , "老鱼简历的特色功能有哪些?" , "代码小抄网站的主要用途是什么?" ] print (f"开始异步处理 {len (prompts)} 个查询..." ) start_time = time.time() results = await process_multiple_prompts(prompts) end_time = time.time() successful = sum (1 for r in results if not isinstance (r, Exception)) print (f"完成 {successful} /{len (prompts)} 个查询, 耗时: {end_time - start_time:.2 f} 秒" ) print (f"平均每个查询: {(end_time - start_time) / len (prompts):.2 f} 秒" ) for i, result in enumerate (results): print (f"\n查询 {i + 1 } : {prompts[i]} " ) if isinstance (result, Exception): print (f"错误: {str (result)} " ) else : print (f"回答: {result['response' ][:100 ]} ..." if len ( result['response' ]) > 100 else f"回答: {result['response' ]} " ) if __name__ == "__main__" : asyncio.run(main())
批处理 批处理是指将多个请求打包在一个AP调用中发送,进一步优化性能和成本。 许多大模型提供商支持批处理功能,如通义千问的ChatCompletion API。 以下是实现批处理的示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 import osimport openaiimport timefrom typing import List , Dict , Any def batch_process_openai (prompts: List [str ], model: str = "gpt-3.5-turbo" ) -> List [Dict [str , Any ]]: """ 批量处理多个提示词 参数: prompts: 提示词列表 model: 模型名称 返回: 响应列表 """ openai.api_key = os.environ.get("OPENAI_API_KEY" , "" ) if not openai.api_key: raise ValueError("未找到 OpenAI API 密钥" ) batch_messages = [] for prompt in prompts: batch_messages.append([{"role" : "user" , "content" : prompt}]) client = openai.OpenAI() try : response = client.chat.completions.create( model=model, messages=batch_messages, max_tokens=150 ) results = [] for i, choice in enumerate (response.choices): results.append({ "prompt" : prompts[i], "response" : choice.message.content, "model" : model }) return results except Exception as e: raise Exception(f"批处理请求失败: {str (e)} " ) async def batch_async_openai (prompts: List [str ], batch_size: int = 5 ) -> List [Dict [str , Any ]]: """ 结合批处理和异步处理 参数: prompts: 提示词列表 batch_size: 每批的大小 返回: 所有响应的列表 """ batches = [prompts[i:i + batch_size] for i in range (0 , len (prompts), batch_size)] print (f"将 {len (prompts)} 个提示词分为 {len (batches)} 批处理" ) client = openai.AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY" , "" )) async def process_batch (batch: List [str ] ): batch_messages = [[{"role" : "user" , "content" : p}] for p in batch] try : response = await client.chat.completions.create( model="gpt-3.5-turbo" , messages=batch_messages, max_tokens=150 ) return [ { "prompt" : batch[i], "response" : choice.message.content } for i, choice in enumerate (response.choices) ] except Exception as e: print (f"批处理错误: {str (e)} " ) return [{"prompt" : p, "error" : str (e)} for p in batch] tasks = [process_batch(batch) for batch in batches] batch_results = await asyncio.gather(*tasks) all_results = [] for batch_result in batch_results: all_results.extend(batch_result) return all_results async def batch_demo (): prompts = [ "编程导航网站提供哪些服务?" , "面试鸭网站如何帮助程序员准备面试?" , "如何使用算法导航学习数据结构?" , "老鱼简历的特色功能有哪些?" , "代码小抄网站的主要用途是什么?" , "剪切助手如何提高工作效率?" , "程序员鱼皮是谁?他创建了哪些产品?" ] print ("开始批量异步处理..." ) start_time = time.time() results = await batch_async_openai(prompts, batch_size=3 ) end_time = time.time() successful = sum (1 for r in results if "error" not in r) print (f"完成 {successful} /{len (prompts)} 个查询, 总耗时: {end_time - start_time:.2 f} 秒" ) print (f"平均每个查询: {(end_time - start_time) / len (prompts):.2 f} 秒" ) for i, result in enumerate (results): print (f"\n查询 {i+1 } : {result['prompt' ]} " ) if "error" in result: print (f"错误: {result['error' ]} " ) else : print (f"回答: {result['response' ][:100 ]} ..." if len (result['response' ]) > 100 else f"回答: {result['response' ]} " ) if __name__ == "__main__" : asyncio.run(batch_demo())
异步调用与批处理的实际应用 在实际应用中,异步调用和批处理特别适合以下场景:
高并发应用:如聊天机器人平台,需要同时处理多用户的请求
批量内容生成:如批量生成产品描述、文章摘要等
数据分析与处理:需要对大量文本数据进行分析和处理
并行任务处理:如多文档同时处理、多段落同时翻译等 使用异步和批处理技术时,需要注意以下几点:
资源管理:控制并发数量,避免超出AP!速率限制
错误处理:妥善处理单个请求失败的情况,避免影响整体流程
结果汇总:正确收集和整合异步操作的结果
超时控制:设置合理的超时时间,避免长时间等待
重试机制:对失败的请求实施智能重试策略
流式处理 对于需要实时响应的场景,如聊天对话,流式处理(Streaming)是另一种重要的技术。 它允许模型生成部分响应时立即返回,而不必等待完整响应:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import osimport asynciofrom langchain_community.chat_models import ChatTongyios.environ["DASHSCOPE_API_KEY" ] = "sk-xxxxx" async def stream_tongyi_response (prompt: str ): """ 模拟流式处理通义千问响应 参数: prompt: 用户提示词 """ model = ChatTongyi() def sync_call (): return model.invoke(prompt).content print (f"问题: {prompt} " ) print ("回答: " , end="" , flush=True ) response = await asyncio.to_thread(sync_call) chunk_size = 10 for i in range (0 , len (response), chunk_size): chunk = response[i:i + chunk_size] print (chunk, end="" , flush=True ) await asyncio.sleep(0.1 ) print () async def stream_demo (): await stream_tongyi_response("请用简短的段落介绍剪切助手这款产品。" ) if __name__ == "__main__" : asyncio.run(stream_demo())
LangChain4j 基础 LangChain4j的架构主要包含以下几个核心组件:
模型接口层:提供统一的接口连接各种语言模型,如ChatLanguageModel、.EmbeddingModel等。
内存组件:管理对话历史和上下文信息,支持短期和长期记忆。
链(Chains):将多个组件组合成处理流程,如提示模板、模型调用、结果处理等。
工具(Tools):让语言模型能够执行外部操作,如搜索、计算、API调用等。
输出处理器:将模型输出解析成结构化数据,如SON、POO等。
检索器(Retrievers):实现文档检索和相关内容提取功能。
Maven 依赖 1 2 3 4 5 6 <dependency > <groupId > dev.langchain4j</groupId > <artifactId > langchain4j</artifactId > <version > 1.0.0-beta3</version > </dependency >
还需要引入对应的模型集成模块
1 2 3 4 5 6 <dependency > <groupId > dev.langchain4j</groupId > <artifactId > langchain4j-open-ai</artifactId > <version > 1.1.0</version > </dependency >
1 2 3 4 5 6 7 <dependency > <groupId > dev.langchain4j</groupId > <artifactId > langchain4j-community-dashscope</artifactId > <version > 1.0.0-beta2</version > </dependency >
大模型支持列表 LangChain4j的模型集成仍在不断扩展中,社区版本提供了更多模型的支持。 你可以在官方文档的集成页面查看最新的支持列表:https://docs.langchain4j.dev/integrations/language-models/
基本使用方法 LangChain4j的使用流程通常包括初始化模型、构建提示、处理响应等步骤
简单对话
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import dev.langchain4j.community.model.dashscope.QwenChatModel;import dev.langchain4j.model.chat.ChatLanguageModel;public class SimpleChatExample { public static void main (String[] args) { ChatLanguageModel model = QwenChatModel.builder() .apiKey(System.getenv("DASHSCOPE_API_KEY" )) .modelName("qwen-max" ) .temperature(0.7f ) .build(); String response = model.chat("你好,我是程序员鱼皮,能介绍一下编程导航网站吗?" ); System.out.println("AI 回答: " + response); } }
使用消息对象构建对话 如果需要更复杂的对话,可以使用Messages对象构建多轮对话:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import dev.langchain4j.community.model.dashscope.QwenChatModel;import dev.langchain4j.model.chat.ChatLanguageModel;import dev.langchain4j.data.message.AiMessage;import dev.langchain4j.data.message.ChatMessage;import dev.langchain4j.data.message.SystemMessage;import dev.langchain4j.data.message.UserMessage;import java.util.ArrayList;import java.util.List;public class MultiTurnChatExample { public static void main (String[] args) { ChatLanguageModel model = QwenChatModel.builder() .apiKey(System.getenv("DASHSCOPE_API_KEY" )) .modelName("qwen-max" ) .temperature(0.7f ) .build(); List<ChatMessage> messages = new ArrayList <>(); messages.add(new SystemMessage ("你是面试鸭的AI助手,专注于提供Java面试题解答" )); messages.add(new UserMessage ("请介绍一下Java中的多线程" )); AiMessage response = model.chat(messages).content(); System.out.println("AI 回答: " + response.text()); messages.add(response); messages.add(new UserMessage ("线程池有哪些常见参数?" )); AiMessage followUpResponse = model.chat(messages).content(); System.out.println("\n后续问题 AI 回答: " + followUpResponse.text()); } }
使用聊天服务构建应用 LangChain4j提供了ChatMemory来自动管理对话历史,简化多轮对话的开发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import dev.langchain4j.memory.ChatMemory;import dev.langchain4j.memory.chat.MessageWindowChatMemory;import dev.langchain4j.model.chat.ChatLanguageModel;import dev.langchain4j.community.model.dashscope.QwenChatModel;import dev.langchain4j.service.AiServices;import dev.langchain4j.service.SystemPromptTemplate;@SystemPromptTemplate("你是代码小抄的AI助手,专注于提供简洁有效的{{language}}代码示例") interface CodeAssistant { String chat (String message) ; String generateExample (String task, String language) ; } public class ChatServiceExample { public static void main (String[] args) { ChatLanguageModel model = QwenChatModel.builder() .apiKey(System.getenv("DASHSCOPE_API_KEY" )) .modelName("qwen-max" ) .temperature(0.7f ) .build(); ChatMemory chatMemory = MessageWindowChatMemory.builder() .maxMessages(10 ) .build(); CodeAssistant assistant = AiServices.builder(CodeAssistant.class) .chatLanguageModel(model) .chatMemory(chatMemory) .systemPromptVariable("language" , "Java" ) .build(); String response = assistant.chat("如何在Java中创建一个多线程安全的单例模式?" ); System.out.println("AI 回答: " + response); String example = assistant.generateExample("读取CSV文件" , "Java" ); System.out.println("\n代码示例: " + example); } }
结构化输出解析 LangChain4j支持将AI输出解析为结构化数据,非常适合需要从Al回答中提取特定信息的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import dev.langchain4j.model.chat.ChatLanguageModel;import dev.langchain4j.community.model.dashscope.QwenChatModel;import dev.langchain4j.service.AiServices;import dev.langchain4j.service.SystemMessage;class JobRecommendation { private String title; private String description; private int experienceYearsRequired; private String[] requiredSkills; public String getTitle () { return title; } public void setTitle (String title) { this .title = title; } public String getDescription () { return description; } public void setDescription (String description) { this .description = description; } public int getExperienceYearsRequired () { return experienceYearsRequired; } public void setExperienceYearsRequired (int experienceYearsRequired) { this .experienceYearsRequired = experienceYearsRequired; } public String[] getRequiredSkills() { return requiredSkills; } public void setRequiredSkills (String[] requiredSkills) { this .requiredSkills = requiredSkills; } @Override public String toString () { return "职位名称: " + title + "\n" + "职位描述: " + description + "\n" + "所需经验: " + experienceYearsRequired + "年\n" + "所需技能: " + String.join(", " , requiredSkills); } } interface JobAdvisor { @SystemMessage("你是老鱼简历的AI职业顾问,根据用户的技能和经验推荐合适的工作") JobRecommendation recommendJob (String skills, int yearsOfExperience) ; } public class StructuredOutputExample { public static void main (String[] args) { ChatLanguageModel model = QwenChatModel.builder() .apiKey(System.getenv("DASHSCOPE_API_KEY" )) .modelName("qwen-max" ) .temperature(0.7f ) .build(); JobAdvisor jobAdvisor = AiServices.builder(JobAdvisor.class) .chatLanguageModel(model) .build(); JobRecommendation recommendation = jobAdvisor.recommendJob( "Java, Spring Boot, Microservices, Docker" , 3 ); System.out.println("AI 推荐职位:\n" + recommendation); } }
LangChain4j与Spring生态集成 Spring Boot Starter LangChain4j提供了官方的Spring Boot Starter,简化了配置过程。https://docs.langchain4j.info/tutorials/spring-boot-integration 通义千问 首先在项目中引入依赖:
1 2 3 4 5 <dependency > <groupId > dev.langchain4j</groupId > <artifactId > langchain4j-open-ai-spring-boot-starter</artifactId > <version > 1.0.0-beta3</version > </dependency >
1 2 3 4 5 <dependency > <groupId > dev.langchain4j</groupId > <artifactId > langchain4j-community-dashscope-spring-boot-starter</artifactId > <version > 1.0.0-beta3</version > </dependency >
1 2 3 4 5 6 7 8 9 10 langchain4j: community: dashscope: api-key: ${QWEN_API_KEY} model-name: qwen-max temperature: 0.7 max-tokens: 2000 timeout: PT30S chat-memory: max-messages: 10
服务调用 Spring Boot的自动配置功能让LangChain4j的使用变得异常简单。框架会自动创建和配置必要的Bean:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import dev.langchain4j.model.chat.ChatLanguageModel;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestController public class ChatController { ChatLanguageModel chatLanguageModel; ChatController(ChatLanguageModel chatLanguageModel) { this .chatLanguageModel = chatLanguageModel; } @GetMapping("/hello") public String hello (@RequestParam(value = "message", defaultValue = "介绍一下编程导航吧") String message) { return chatLanguageModel.chat(message); } }