LangChain
LangChain is a framework for developing applications powered by Large Language Models (LLMs). It simplifies the entire LLM application lifecycle with open-source components, third-party integrations, and tools for building complex AI workflows.
Installation
# Core LangChain library
pip install langchain
# Specific integrations
pip install langchain-openai # OpenAI models
pip install langchain-anthropic # Claude models
pip install langchain-community # Community integrations
pip install langchain-experimental # Experimental features
# Vector stores
pip install langchain-chroma # ChromaDB
pip install langchain-pinecone # Pinecone
pip install faiss-cpu # FAISS
# All common packages
pip install langchain[all]
# Development installation
git clone https://github.com/langchain-ai/langchain.git
cd langchain
pip install -e .[all]
Quick Start
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Initialize LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Create prompt template
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("human", "{input}")
])
# Create chain using LCEL (LangChain Expression Language)
chain = prompt | llm | StrOutputParser()
# Invoke the chain
result = chain.invoke({"input": "What is LangChain?"})
print(result)
Core Concepts
1. LangChain Expression Language (LCEL)
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# Basic chain composition
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI()
output_parser = StrOutputParser()
# Chain components with | operator
chain = prompt | model | output_parser
# Invoke chain
result = chain.invoke({"topic": "programming"})
# Batch processing
results = chain.batch([
{"topic": "cats"},
{"topic": "dogs"}
])
# Streaming
for chunk in chain.stream({"topic": "AI"}):
print(chunk, end="", flush=True)
# Async support
import asyncio
async_result = await chain.ainvoke({"topic": "python"})
2. Prompts and Templates
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.prompts import FewShotPromptTemplate
# Basic prompt template
prompt = PromptTemplate(
input_variables=["product"],
template="What is a good name for a company that makes {product}?"
)
# Chat prompt template
chat_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that translates {input_language} to {output_language}."),
("human", "{text}")
])
# Few-shot prompting
examples = [
{"input": "happy", "output": "sad"},
{"input": "tall", "output": "short"},
]
example_prompt = PromptTemplate(
input_variables=["input", "output"],
template="Input: {input}\nOutput: {output}"
)
few_shot_prompt = FewShotPromptTemplate(
examples=examples,
example_prompt=example_prompt,
prefix="Give the antonym of every input",
suffix="Input: {adjective}\nOutput:",
input_variables=["adjective"]
)
# Partial prompts
partial_prompt = prompt.partial(product="smartphones")
result = partial_prompt.format()
# Prompt composition
final_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
chat_prompt,
("human", "Please also explain why this translation is correct.")
])
3. LLM Integration
from langchain_openai import ChatOpenAI, OpenAI
from langchain_anthropic import ChatAnthropic
from langchain_community.llms import Ollama
# OpenAI models
openai_chat = ChatOpenAI(
model="gpt-4o",
temperature=0.7,
max_tokens=1000,
api_key="your-api-key"
)
# Anthropic Claude
anthropic = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
temperature=0,
max_tokens=1000
)
# Local Ollama models
ollama = Ollama(
model="llama2",
base_url="http://localhost:11434"
)
# Model with callbacks for monitoring
from langchain_core.callbacks import BaseCallbackHandler
class TokenCountCallback(BaseCallbackHandler):
    """Callback that accumulates total token usage across LLM calls.

    Reads the ``token_usage`` entry that OpenAI-style chat models attach
    to ``response.llm_output`` when an LLM call finishes.
    """

    def __init__(self):
        # Running total of tokens consumed by every call seen so far.
        self.total_tokens = 0

    def on_llm_end(self, response, **kwargs):
        """Add this call's token count to the running total.

        ``llm_output`` can be ``None`` (e.g. when streaming, or for
        providers that do not report usage); the original membership test
        ``'token_usage' in response.llm_output`` raised ``TypeError`` in
        that case, so guard with a falsy check first.
        """
        llm_output = getattr(response, "llm_output", None) or {}
        usage = llm_output.get("token_usage") or {}
        self.total_tokens += usage.get("total_tokens", 0)
callback = TokenCountCallback()
llm_with_callback = ChatOpenAI(callbacks=[callback])
Common Patterns
1. Sequential Chains
from langchain.chains import LLMChain, SimpleSequentialChain
from langchain_core.prompts import PromptTemplate
# First chain: generate synopsis
synopsis_template = """You are a playwright. Given the title of play, it is your job to write a synopsis for that title.
Title: {title}
Playwright: This is a synopsis for the above play:"""
synopsis_prompt = PromptTemplate(
input_variables=["title"],
template=synopsis_template
)
synopsis_chain = LLMChain(llm=llm, prompt=synopsis_prompt)
# Second chain: write review
review_template = """You are a play critic from the New York Times. Given the synopsis of play, it is your job to write a review for that play.
Synopsis: {synopsis}
Review from a New York Times play critic of the above play:"""
review_prompt = PromptTemplate(
input_variables=["synopsis"],
template=review_template
)
review_chain = LLMChain(llm=llm, prompt=review_prompt)
# Combine chains
overall_chain = SimpleSequentialChain(
chains=[synopsis_chain, review_chain],
verbose=True
)
# Run the chain
review = overall_chain.invoke({"input": "Tragedy at sunset on the beach"})
2. Parallel Processing
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# Define parallel tasks
joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model | StrOutputParser()
poem_chain = ChatPromptTemplate.from_template("write a short poem about {topic}") | model | StrOutputParser()
# Run in parallel
parallel_chain = RunnableParallel({
"joke": joke_chain,
"poem": poem_chain,
"original_topic": RunnablePassthrough()
})
result = parallel_chain.invoke({"topic": "artificial intelligence"})
print(result["joke"])
print(result["poem"])
print(result["original_topic"])
3. Conditional Logic and Routing
from langchain_core.runnables import RunnableBranch
def route_question(info):
    """Pick the specialist chain whose topic appears in the question.

    Checks for "math" first, then "history"; anything else falls through
    to the general-purpose chain.
    """
    question = info["question"].lower()
    if "math" in question:
        return math_chain
    if "history" in question:
        return history_chain
    return general_chain
# Math chain
math_chain = ChatPromptTemplate.from_template(
"You are a math expert. Answer this question: {question}"
) | model | StrOutputParser()
# History chain
history_chain = ChatPromptTemplate.from_template(
"You are a history expert. Answer this question: {question}"
) | model | StrOutputParser()
# General chain
general_chain = ChatPromptTemplate.from_template(
"Answer this question: {question}"
) | model | StrOutputParser()
# Route based on question content
routing_chain = RunnableBranch(
(lambda x: "math" in x["question"].lower(), math_chain),
(lambda x: "history" in x["question"].lower(), history_chain),
general_chain # default
)
result = routing_chain.invoke({"question": "What is 2+2?"})
Memory Management
1. Conversation Buffer Memory
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# Basic conversation memory
memory = ConversationBufferMemory()
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True
)
# Have a conversation
response1 = conversation.invoke({"input": "Hi, I'm John"})
response2 = conversation.invoke({"input": "What's my name?"})
# Access memory
print(memory.buffer)
print(memory.chat_memory.messages)
2. Conversation Summary Memory
from langchain.memory import ConversationSummaryMemory
# Memory that summarizes conversation
summary_memory = ConversationSummaryMemory(
llm=llm,
return_messages=True
)
conversation_with_summary = ConversationChain(
llm=llm,
memory=summary_memory,
verbose=True
)
# Long conversation will be summarized
for i in range(5):
response = conversation_with_summary.invoke({
"input": f"Tell me a fact about number {i}"
})
3. Conversation Buffer Window Memory
from langchain.memory import ConversationBufferWindowMemory
# Keep only last k interactions
window_memory = ConversationBufferWindowMemory(
k=3, # Keep last 3 exchanges
return_messages=True
)
windowed_conversation = ConversationChain(
llm=llm,
memory=window_memory
)
4. Custom Memory with LCEL
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.messages import BaseMessage
from typing import List
# Custom memory implementation
class CustomMemory:
    """Minimal message store that renders a short rolling context window."""

    def __init__(self):
        # Full conversation history, oldest first.
        self.messages: List[BaseMessage] = []

    def add_message(self, message: BaseMessage):
        """Append one message to the history."""
        self.messages.append(message)

    def get_context(self) -> str:
        """Render the six most recent messages as "type: content" lines."""
        recent = self.messages[-6:]
        rendered = [f"{msg.type}: {msg.content}" for msg in recent]
        return "\n".join(rendered)
memory = CustomMemory()
# Chain with custom memory
def add_memory(inputs):
    """Record the user turn and inject the rolling context into *inputs*.

    NOTE(review): ``HumanMessage`` is not imported in this snippet — it
    lives in ``langchain_core.messages``; confirm the import exists.
    """
    # Add user input to memory
    memory.add_message(HumanMessage(content=inputs["input"]))
    # Expose the recent turns so the prompt template can interpolate them.
    inputs["chat_history"] = memory.get_context()
    return inputs
def save_response(response):
    """Record the model reply in memory and pass the response through unchanged.

    NOTE(review): ``AIMessage`` is not imported in this snippet — it lives
    in ``langchain_core.messages``; confirm the import exists.
    """
    # Save AI response to memory
    memory.add_message(AIMessage(content=response.content))
    return response
chat_with_memory = (
RunnableLambda(add_memory) |
ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Here's the chat history:\n{chat_history}"),
("human", "{input}")
]) |
model |
RunnableLambda(save_response) |
StrOutputParser()
)
Document Loading and Processing
1. Document Loaders
from langchain_community.document_loaders import (
TextLoader, PDFLoader, WebBaseLoader,
DirectoryLoader, CSVLoader, UnstructuredHTMLLoader
)
# Text files
text_loader = TextLoader("path/to/file.txt")
docs = text_loader.load()
# PDF files
pdf_loader = PDFLoader("path/to/document.pdf")
pdf_docs = pdf_loader.load()
# Web pages
web_loader = WebBaseLoader("https://example.com")
web_docs = web_loader.load()
# Directory of files
directory_loader = DirectoryLoader(
"path/to/directory",
glob="**/*.txt",
loader_cls=TextLoader
)
all_docs = directory_loader.load()
# CSV files
csv_loader = CSVLoader("path/to/data.csv")
csv_docs = csv_loader.load()
# Custom loader
from langchain_core.documents import Document
def custom_loader(file_path: str) -> List[Document]:
    """Load one file into a single-element list of ``Document`` objects.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A one-element list wrapping the file's entire contents, with the
        source path recorded in the metadata.
    """
    # Explicit UTF-8: the original relied on the locale default encoding,
    # which is platform-dependent and breaks on non-ASCII content.
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return [Document(
        page_content=content,
        metadata={"source": file_path, "custom_field": "value"}
    )]
2. Text Splitting
from langchain.text_splitter import (
CharacterTextSplitter, RecursiveCharacterTextSplitter,
TokenTextSplitter, MarkdownHeaderTextSplitter
)
# Character-based splitting
char_splitter = CharacterTextSplitter(
separator="\n\n",
chunk_size=1000,
chunk_overlap=200
)
char_chunks = char_splitter.split_documents(docs)
# Recursive character splitting (recommended)
recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", " ", ""]
)
recursive_chunks = recursive_splitter.split_documents(docs)
# Token-based splitting
token_splitter = TokenTextSplitter(
chunk_size=512,
chunk_overlap=50
)
token_chunks = token_splitter.split_documents(docs)
# Markdown-aware splitting
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
)
markdown_chunks = markdown_splitter.split_text(markdown_text)
# Custom splitter
from langchain.text_splitter import TextSplitter
class CustomSplitter(TextSplitter):
    # Splits on a literal "---" delimiter instead of size-based chunking.
    def split_text(self, text: str) -> List[str]:
        """Split *text* on every occurrence of the literal ``---`` separator."""
        # Custom splitting logic
        return text.split("---") # Split on custom separator
custom_splitter = CustomSplitter()
custom_chunks = custom_splitter.split_documents(docs)
Vector Stores and Embeddings
1. Embeddings
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings, OllamaEmbeddings
# OpenAI embeddings
openai_embeddings = OpenAIEmbeddings(
model="text-embedding-3-small"
)
# Hugging Face embeddings
hf_embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
# Local Ollama embeddings
ollama_embeddings = OllamaEmbeddings(
model="llama2",
base_url="http://localhost:11434"
)
# Test embeddings
text = "This is a test document"
embedding_vector = openai_embeddings.embed_query(text)
print(f"Embedding dimension: {len(embedding_vector)}")
# Batch embeddings
texts = ["Document 1", "Document 2", "Document 3"]
batch_embeddings = openai_embeddings.embed_documents(texts)
2. Vector Store Operations
from langchain_community.vectorstores import Chroma, FAISS, Pinecone
from langchain_core.documents import Document
# Create documents
docs = [
Document(page_content="The sky is blue", metadata={"source": "fact1"}),
Document(page_content="Grass is green", metadata={"source": "fact2"}),
Document(page_content="Fire is hot", metadata={"source": "fact3"}),
]
# ChromaDB vector store
chroma_db = Chroma.from_documents(
documents=docs,
embedding=openai_embeddings,
persist_directory="./chroma_db"
)
# FAISS vector store (in-memory)
faiss_db = FAISS.from_documents(
documents=docs,
embedding=openai_embeddings
)
# Save/load FAISS
faiss_db.save_local("./faiss_index")
loaded_faiss = FAISS.load_local("./faiss_index", openai_embeddings)
# Pinecone vector store
import pinecone
pinecone.init(api_key="your-api-key", environment="your-env")
pinecone_db = Pinecone.from_documents(
documents=docs,
embedding=openai_embeddings,
index_name="your-index"
)
# Search operations
query = "What color is the sky?"
similar_docs = chroma_db.similarity_search(query, k=2)
# Search with scores
docs_with_scores = chroma_db.similarity_search_with_score(query, k=2)
for doc, score in docs_with_scores:
print(f"Score: {score}, Content: {doc.page_content}")
# Filtered search
filtered_docs = chroma_db.similarity_search(
query,
k=2,
filter={"source": "fact1"}
)
# Add more documents
new_docs = [Document(page_content="Water is wet", metadata={"source": "fact4"})]
chroma_db.add_documents(new_docs)
# Delete documents
chroma_db.delete(ids=["doc_id_to_delete"])
Retrieval-Augmented Generation (RAG)
1. Basic RAG Chain
from langchain.chains import RetrievalQA
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
# Set up vector store as retriever
retriever = chroma_db.as_retriever(
search_type="similarity",
search_kwargs={"k": 3}
)
# Traditional approach with RetrievalQA
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True
)
result = qa_chain.invoke({"query": "What is the color of the sky?"})
print(result["result"])
print(result["source_documents"])
# Modern approach with LCEL
def format_docs(docs):
    """Join retrieved documents' text with blank-line separators."""
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)
rag_template = """Answer the question based only on the following context:
{context}
Question: {question}
Answer:"""
rag_prompt = ChatPromptTemplate.from_template(rag_template)
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| rag_prompt
| llm
| StrOutputParser()
)
answer = rag_chain.invoke("What is the color of the sky?")
2. Advanced RAG with Multiple Retrievers
from langchain.retrievers import EnsembleRetriever, MultiQueryRetriever
from langchain_community.retrievers import BM25Retriever
# BM25 retriever (keyword-based)
texts = [doc.page_content for doc in docs]
bm25_retriever = BM25Retriever.from_texts(texts)
# Ensemble retriever (combines multiple retrievers)
ensemble_retriever = EnsembleRetriever(
retrievers=[chroma_db.as_retriever(), bm25_retriever],
weights=[0.7, 0.3] # Weight vector search more than keyword search
)
# Multi-query retriever (generates multiple queries)
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=chroma_db.as_retriever(),
llm=llm
)
# Use with RAG chain
advanced_rag_chain = (
{"context": ensemble_retriever | format_docs, "question": RunnablePassthrough()}
| rag_prompt
| llm
| StrOutputParser()
)
3. RAG with Chat History
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
# Contextualized retrieval
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages([
("system", contextualize_q_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
history_aware_retriever = create_history_aware_retriever(
llm, retriever, contextualize_q_prompt
)
# Answer generation
qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know.
{context}"""
qa_prompt = ChatPromptTemplate.from_messages([
("system", qa_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
# Full RAG chain with history
rag_chain_with_history = create_retrieval_chain(
history_aware_retriever, question_answer_chain
)
# Usage with session history
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
store = {}
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the per-session chat history, creating it on first use."""
    history = store.get(session_id)
    if history is None:
        # First turn for this session: register a fresh history object.
        history = ChatMessageHistory()
        store[session_id] = history
    return history
conversational_rag_chain = RunnableWithMessageHistory(
rag_chain_with_history,
get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="answer",
)
# Chat with history
response = conversational_rag_chain.invoke(
{"input": "What is the sky color?"},
config={"configurable": {"session_id": "session_1"}},
)
follow_up = conversational_rag_chain.invoke(
{"input": "Why is that?"}, # References previous question
config={"configurable": {"session_id": "session_1"}},
)
Agents and Tool Usage
1. Basic Agent Setup
from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain_community.tools import WikipediaQueryRun, DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.tools import BaseTool
from typing import Type
# Define tools
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
search = DuckDuckGoSearchRun()
# Custom tool
class CalculatorTool(BaseTool):
    """Tool that evaluates a Python arithmetic expression string."""

    # Annotated field overrides (required by Pydantic v2 BaseTool).
    name: str = "calculator"
    description: str = "Useful for mathematical calculations"

    def _run(self, query: str) -> str:
        """Evaluate *query* and return the result, or an error message on failure."""
        try:
            # SECURITY: eval executes arbitrary code — never expose this to
            # untrusted input; prefer ast.literal_eval or a math parser.
            return str(eval(query)) # Be careful with eval in production
        except Exception as e:
            return f"Error: {str(e)}"
calculator = CalculatorTool()
# Available tools
tools = [wikipedia, search, calculator]
# Create agent
agent_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Use tools when needed."),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
agent = create_openai_tools_agent(llm, tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# Use agent
result = agent_executor.invoke({
"input": "What is the population of Tokyo? Then calculate 10% of that number."
})
2. Custom Tools
from langchain.tools import tool
from langchain.pydantic_v1 import BaseModel, Field
import requests
# Decorator-based tool
# Docstring doubles as the tool description shown to the model.
@tool
def get_weather(city: str) -> str:
    """Get current weather for a given city."""
    # Mock weather API call
    # NOTE(review): stub — replace with a real weather-service call.
    return f"The weather in {city} is sunny and 75°F"
# Class-based tool with input schema
class WeatherInput(BaseModel):
    """Argument schema for the weather tool (field descriptions reach the model)."""

    city: str = Field(description="The city to get weather for")
    units: str = Field(default="fahrenheit", description="Temperature units")
class WeatherTool(BaseTool):
    """Tool exposing a current-weather lookup with a typed input schema."""

    # Annotated field overrides (required by Pydantic v2 BaseTool).
    name: str = "weather_tool"
    description: str = "Get current weather information"
    args_schema: Type[BaseModel] = WeatherInput

    def _run(self, city: str, units: str = "fahrenheit") -> str:
        """Return a mock report; unit symbol is the first letter of *units*, uppercased."""
        # Weather API integration
        return f"Weather in {city}: 72°{units[0].upper()}, partly cloudy"
# API-based tool
@tool
def search_api(query: str) -> str:
    """Search the web using a custom API."""
    # Example API call. The explicit timeout keeps the agent from hanging
    # indefinitely when the endpoint is unreachable — requests has no
    # default timeout.
    response = requests.get(
        "https://api.example.com/search",
        params={"q": query},
        headers={"Authorization": "Bearer your-token"},
        timeout=10,
    )
    return response.json().get("results", "No results found")
# File system tool
@tool
def read_file(file_path: str) -> str:
    """Read contents of a file."""
    try:
        # Explicit UTF-8: the locale default encoding is platform-dependent.
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        # Broad catch is deliberate: tools must return text, not raise,
        # so the agent can observe the failure and recover.
        return f"Error reading file: {str(e)}"
# Database tool
@tool
def query_database(sql_query: str) -> str:
    """Execute SQL query on database."""
    import sqlite3
    # SECURITY: executing model-generated SQL is dangerous — restrict to a
    # read-only connection or an allow-list in real deployments.
    conn = sqlite3.connect("example.db")
    try:
        cursor = conn.cursor()
        cursor.execute(sql_query)
        results = cursor.fetchall()
        return str(results)
    except Exception as e:
        # Return the error as text so the agent can react instead of crashing.
        return f"Database error: {str(e)}"
    finally:
        # Single close path replaces the duplicated conn.close() calls and
        # also covers failures raised by cursor() itself.
        conn.close()
3. Agent Types and Strategies
from langchain.agents import create_react_agent, create_structured_chat_agent
# ReAct agent (Reasoning and Acting)
react_prompt = """Answer the following questions as best you can. You have access to the following tools:
{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {input}
Thought:{agent_scratchpad}"""
react_agent = create_react_agent(llm, tools, react_prompt)
react_executor = AgentExecutor(
agent=react_agent,
tools=tools,
verbose=True,
max_iterations=5
)
# Structured chat agent
structured_agent = create_structured_chat_agent(
llm=llm,
tools=tools,
prompt=agent_prompt
)
structured_executor = AgentExecutor(
agent=structured_agent,
tools=tools,
verbose=True,
handle_parsing_errors=True
)
# Custom agent with error handling
class CustomAgentExecutor(AgentExecutor):
    # Overrides error handling to feed a recovery hint back to the agent
    # loop instead of propagating the exception.
    def _handle_error(self, error: Exception) -> str:
        """Return a friendly retry message describing *error*."""
        return f"I encountered an error: {str(error)}. Let me try a different approach."
custom_executor = CustomAgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=3,
early_stopping_method="generate"
)
Output Parsing
1. Built-in Parsers
from langchain_core.output_parsers import (
StrOutputParser, JsonOutputParser, ListOutputParser,
CommaSeparatedListOutputParser, DatetimeOutputParser
)
from langchain_core.pydantic_v1 import BaseModel, Field
# String parser (default)
str_parser = StrOutputParser()
# JSON parser
json_parser = JsonOutputParser()
# List parser
list_parser = ListOutputParser()
# Comma-separated list parser
csv_parser = CommaSeparatedListOutputParser()
# Datetime parser
datetime_parser = DatetimeOutputParser()
# Usage with chains
json_chain = (
ChatPromptTemplate.from_template(
"Generate a JSON object with name and age for a person named {name}"
)
| llm
| json_parser
)
result = json_chain.invoke({"name": "Alice"})
2. Pydantic Parser
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List
# Define data model
class Person(BaseModel):
    """Structured record the parser extracts from free text via the LLM."""

    name: str = Field(description="person's name")
    age: int = Field(description="person's age")
    occupation: str = Field(description="person's job")
    skills: List[str] = Field(description="list of skills")
# Create parser
person_parser = PydanticOutputParser(pydantic_object=Person)
# Create prompt with format instructions
person_prompt = ChatPromptTemplate.from_messages([
("system", "Extract person information from the following text."),
("human", "{text}\n{format_instructions}"),
])
# Chain with structured output
person_chain = (
person_prompt.partial(format_instructions=person_parser.get_format_instructions())
| llm
| person_parser
)
# Usage
text = "John Doe is a 30-year-old software engineer who knows Python, JavaScript, and SQL."
structured_data = person_chain.invoke({"text": text})
print(f"Name: {structured_data.name}")
print(f"Skills: {structured_data.skills}")
3. Custom Output Parsers
from langchain_core.output_parsers import BaseOutputParser
from typing import Dict, Any
import re
class CustomEmailParser(BaseOutputParser[Dict[str, Any]]):
    """Parse email components from LLM output."""

    def parse(self, text: str) -> Dict[str, Any]:
        """Extract subject, body, and recipient fields from *text*.

        Missing fields come back as empty strings; the body is stripped of
        surrounding whitespace.
        """
        subject = re.search(r'Subject:\s*(.*)', text)
        recipient = re.search(r'To:\s*(.*)', text)
        # Body runs until a blank line or end of text (DOTALL spans newlines).
        body = re.search(r'Body:\s*(.*?)(?=\n\n|\Z)', text, re.DOTALL)

        parsed = {"subject": "", "body": "", "recipient": ""}
        if subject:
            parsed["subject"] = subject.group(1)
        if body:
            parsed["body"] = body.group(1).strip()
        if recipient:
            parsed["recipient"] = recipient.group(1)
        return parsed

    @property
    def _type(self) -> str:
        return "custom_email_parser"
# Usage
email_parser = CustomEmailParser()
email_prompt = ChatPromptTemplate.from_template(
"""Write a professional email with the following format:
Subject: [subject line]
To: [recipient email]
Body: [email content]
Topic: {topic}
Recipient: {recipient}"""
)
email_chain = email_prompt | llm | email_parser
parsed_email = email_chain.invoke({
"topic": "Meeting followup",
"recipient": "john@example.com"
})
print(parsed_email)
Advanced Features
1. Callbacks and Monitoring
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.callbacks import CallbackManager
import time
class TimingCallback(BaseCallbackHandler):
    """Print chain wall-clock duration and LLM token usage as events fire."""

    def __init__(self):
        # Timestamps captured by the chain start/end hooks.
        self.start_time = None
        self.end_time = None

    def on_chain_start(self, serialized, inputs, **kwargs):
        """Stamp the start time and echo the chain inputs."""
        self.start_time = time.time()
        print(f"Chain started with inputs: {inputs}")

    def on_chain_end(self, outputs, **kwargs):
        """Stamp the end time and report elapsed seconds."""
        self.end_time = time.time()
        elapsed = self.end_time - self.start_time
        print(f"Chain completed in {elapsed:.2f} seconds")

    def on_llm_start(self, serialized, prompts, **kwargs):
        """Echo the prompts sent to the LLM."""
        print(f"LLM called with prompts: {prompts}")

    def on_llm_end(self, response, **kwargs):
        """Report token usage if the provider attached it to the response."""
        llm_output = getattr(response, 'llm_output', None)
        if llm_output:
            token_usage = llm_output.get('token_usage', {})
            print(f"Token usage: {token_usage}")
# Use callback
timing_callback = TimingCallback()
callback_manager = CallbackManager([timing_callback])
chain_with_callbacks = (
prompt
| llm.with_config(callbacks=[timing_callback])
| StrOutputParser()
)
result = chain_with_callbacks.invoke({"input": "Hello, world!"})
2. Streaming and Async Operations
import asyncio
from langchain_core.callbacks import AsyncCallbackHandler
# Streaming responses
def stream_response(chain, input_data):
    """Print a chain's streamed chunks to stdout as they arrive (no newlines)."""
    for piece in chain.stream(input_data):
        print(piece, end="", flush=True)
# Async operations
async def async_chain_example():
    """Demonstrate ainvoke, astream, and abatch on a simple prompt chain.

    Returns the list of batch results.
    NOTE(review): relies on a module-level ``prompt`` template defined
    earlier in this document.
    """
    async_llm = ChatOpenAI(temperature=0)
    async_chain = prompt | async_llm | StrOutputParser()
    # Async invoke
    result = await async_chain.ainvoke({"input": "Hello async world!"})
    print(result)
    # Async streaming
    async for chunk in async_chain.astream({"input": "Stream this async"}):
        print(chunk, end="", flush=True)
    # Async batch processing
    batch_results = await async_chain.abatch([
        {"input": "First query"},
        {"input": "Second query"},
        {"input": "Third query"}
    ])
    return batch_results
# Run async example
# results = asyncio.run(async_chain_example())
# Async callback
class AsyncTimingCallback(AsyncCallbackHandler):
    """Async callback that logs chain start and end events to stdout."""
    async def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"Async chain started: {inputs}")
    async def on_chain_end(self, outputs, **kwargs):
        print(f"Async chain completed: {outputs}")
3. Configuration and Environment Management
from langchain_core.runnables import RunnableConfig
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration management
config = RunnableConfig(
tags=["production", "chat"],
metadata={"user_id": "123", "session_id": "abc"},
recursion_limit=10
)
# Chain with config
result = chain.invoke(
{"input": "Hello"},
config=config
)
# Environment-specific setup
class EnvironmentConfig:
    """Environment-driven LLM settings: model name, temperature, token budget."""

    # Per-environment presets; any env other than "production" gets the
    # development settings, matching the original if/else.
    _PRESETS = {
        "production": ("gpt-4", 0.1, 1000),
        "development": ("gpt-3.5-turbo", 0.7, 500),
    }

    def __init__(self, env: str = "development"):
        self.env = env
        preset_key = "production" if env == "production" else "development"
        self.llm_model, self.temperature, self.max_tokens = self._PRESETS[preset_key]

    def get_llm(self):
        """Build a ChatOpenAI client configured for this environment."""
        return ChatOpenAI(
            model=self.llm_model,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )
# Usage
env_config = EnvironmentConfig("production")
production_llm = env_config.get_llm()
4. Custom Chain Classes
from langchain_core.runnables import Runnable
from typing import Dict, Any
class CustomProcessingChain(Runnable):
    """Custom chain with preprocessing and postprocessing."""
    def __init__(self, llm, preprocessor=None, postprocessor=None):
        # Fall back to the built-in normalizers when no hooks are supplied.
        self.llm = llm
        self.preprocessor = preprocessor or self._default_preprocess
        self.postprocessor = postprocessor or self._default_postprocess
    def _default_preprocess(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize the "text" field in place: strip whitespace, lowercase."""
        # Default preprocessing
        if "text" in input_data:
            input_data["text"] = input_data["text"].strip().lower()
        return input_data
    def _default_postprocess(self, output: str) -> str:
        """Trim the model output and capitalize its first character."""
        # Default postprocessing
        return output.strip().capitalize()
    def invoke(self, input_data: Dict[str, Any], config=None) -> str:
        """Run preprocess -> prompt -> LLM -> parse -> postprocess and return text."""
        # Preprocess
        processed_input = self.preprocessor(input_data)
        # LLM call
        prompt = ChatPromptTemplate.from_template("Process this text: {text}")
        chain = prompt | self.llm | StrOutputParser()
        result = chain.invoke(processed_input, config=config)
        # Postprocess
        final_result = self.postprocessor(result)
        return final_result
# Custom preprocessing/postprocessing functions
def custom_preprocessor(data):
    """Prefix the "text" field with an importance marker (mutates *data* in place)."""
    original = data["text"]
    data["text"] = f"IMPORTANT: {original}"
    return data
def custom_postprocessor(output):
    """Tag processed output so downstream consumers can recognize it."""
    tagged = f"[PROCESSED] {output}"
    return tagged
# Usage
custom_chain = CustomProcessingChain(
llm=llm,
preprocessor=custom_preprocessor,
postprocessor=custom_postprocessor
)
result = custom_chain.invoke({"text": "hello world"})
Integration Examples
1. FastAPI Web Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
class ChatRequest(BaseModel):
    """Inbound /chat request payload."""
    # The user's message text.
    message: str
    # Optional conversation identifier; "default" is used when omitted.
    session_id: Optional[str] = None
class ChatResponse(BaseModel):
    """Outbound /chat response payload."""
    # Model-generated reply text.
    response: str
    # Echoes the request's session id (or "default").
    session_id: str
# Initialize chain
chat_chain = prompt | llm | StrOutputParser()
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Run the chat chain on the request message and wrap the reply."""
    try:
        response = await chat_chain.ainvoke({"input": request.message})
        return ChatResponse(
            response=response,
            session_id=request.session_id or "default"
        )
    except Exception as e:
        # Surface any chain failure as an HTTP 500 carrying the error text.
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
    """Liveness probe for deployment health checks."""
    return {"status": "healthy"}
# Run with: uvicorn main:app --reload
2. Streamlit Chat Interface
import streamlit as st
from langchain_core.messages import HumanMessage, AIMessage
# Initialize session state
if "messages" not in st.session_state:
st.session_state.messages = []
if "chain" not in st.session_state:
st.session_state.chain = prompt | llm | StrOutputParser()
st.title("LangChain Chatbot")
# Display chat history
for message in st.session_state.messages:
if isinstance(message, HumanMessage):
with st.chat_message("user"):
st.write(message.content)
else:
with st.chat_message("assistant"):
st.write(message.content)
# Chat input
if user_input := st.chat_input("Type your message..."):
# Add user message
st.session_state.messages.append(HumanMessage(content=user_input))
# Display user message
with st.chat_message("user"):
st.write(user_input)
# Get AI response
with st.chat_message("assistant"):
with st.spinner("Thinking..."):
response = st.session_state.chain.invoke({"input": user_input})
st.write(response)
st.session_state.messages.append(AIMessage(content=response))
3. Gradio Interface
import gradio as gr
def chat_interface(message, history):
    """Gradio callback: fold the chat history into one prompt and query the chain.

    `history` is a sequence of (user_text, assistant_text) pairs as
    supplied by gr.ChatInterface.
    """
    turns = []
    for turn in history:
        turns.append(f"Human: {turn[0]}\nAssistant: {turn[1]}")
    transcript = "\n".join(turns)
    prompt_text = f"{transcript}\nHuman: {message}\nAssistant:"
    # `chain` is the module-level LCEL chain defined elsewhere in the file.
    return chain.invoke({"input": prompt_text})
# Create Gradio interface
demo = gr.ChatInterface(
    fn=chat_interface,
    title="LangChain Chat",
    description="Chat with LangChain-powered AI assistant"
)

# Launch interface
if __name__ == "__main__":
    # NOTE(review): share=True opens a public Gradio tunnel URL — confirm
    # this exposure is intended before deploying.
    demo.launch(share=True)
Performance Optimization
1. Caching
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache

# In-memory caching
# Cache lives only for the lifetime of this process.
set_llm_cache(InMemoryCache())

# SQLite caching (persistent)
# NOTE(review): each set_llm_cache call replaces the previous global
# cache, so only the last one configured in this script is in effect.
set_llm_cache(SQLiteCache(database_path=".langchain.db"))

# Redis caching
from langchain.cache import RedisCache
import redis

# Assumes a Redis server on localhost:6379 — adjust for your deployment.
redis_client = redis.Redis(host='localhost', port=6379, db=0)
set_llm_cache(RedisCache(redis_client))

# Custom cache
from langchain_core.caches import BaseCache
class CustomCache(BaseCache):
    """Minimal dict-backed LLM cache keyed by (prompt, llm_string).

    ``BaseCache`` declares ``lookup``, ``update`` and ``clear`` as abstract;
    the original example omitted ``clear``, which makes the class
    un-instantiable (TypeError from the ABC machinery). It is added here.
    """

    def __init__(self):
        # Maps (prompt, llm_string) -> cached generation result.
        self._cache = {}

    def lookup(self, prompt, llm_string):
        """Return the cached value for this prompt/LLM pair, or None on miss."""
        return self._cache.get((prompt, llm_string))

    def update(self, prompt, llm_string, return_val):
        """Store a generation result for later lookups."""
        self._cache[(prompt, llm_string)] = return_val

    def clear(self, **kwargs):
        """Drop all cached entries (required by the BaseCache interface)."""
        self._cache.clear()
set_llm_cache(CustomCache())
2. Batch Processing
# Batch LLM calls
batch_prompts = [
    {"input": f"Summarize topic {i}"} for i in range(10)
]

# Sequential processing
# Baseline: one blocking invoke per prompt.
results = []
for prompt in batch_prompts:
    result = chain.invoke(prompt)
    results.append(result)

# Parallel batch processing
# .batch() fans the inputs out with at most 5 concurrent calls.
batch_results = chain.batch(batch_prompts, config={"max_concurrency": 5})

# Async batch processing
async def async_batch_processing():
    # Awaits all prompts concurrently on the running event loop.
    return await chain.abatch(batch_prompts)

# Streaming batch
# NOTE(review): .batch() returns a fully materialized list, so these
# results print only after the whole batch completes — they do not
# stream in as individual calls finish.
for result in chain.batch(batch_prompts):
    print(f"Completed: {result}")
3. Memory Management
# Efficient memory usage
from langchain.memory import ConversationSummaryBufferMemory

# Summary + buffer memory (hybrid approach)
# Keeps recent turns verbatim and summarizes older ones once the buffer
# would exceed max_token_limit tokens.
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=1000,
    return_messages=True
)
# Custom memory with cleanup
class EfficientMemory:
    """Fixed-size message buffer that evicts the oldest entries.

    Holds at most ``max_messages`` messages; older messages are dropped
    as new ones arrive.
    """

    def __init__(self, max_messages=10):
        self.messages = []
        self.max_messages = max_messages

    def add_message(self, message):
        """Append a message, trimming the buffer back to capacity."""
        self.messages.append(message)
        if len(self.messages) > self.max_messages:
            # Keep only the most recent max_messages entries.
            self.messages = self.messages[-self.max_messages:]

    def clear_old_messages(self, keep=5):
        """Trim the buffer down to the ``keep`` most recent messages.

        The original hard-coded 5; ``keep`` generalizes it while the
        default preserves the old behavior. ``keep=0`` empties the
        buffer (a bare ``[-0:]`` slice would keep everything).
        """
        self.messages = self.messages[-keep:] if keep > 0 else []
# Periodic cleanup
# Module-level rolling buffer (default capacity: 10 messages).
efficient_memory = EfficientMemory()
# Use context managers for cleanup
from contextlib import contextmanager

@contextmanager
def managed_chain(chain):
    """Yield ``chain``, clearing its memory (if it has one) on exit."""
    try:
        yield chain
    finally:
        # Best-effort cleanup: only chains exposing a truthy `memory`
        # attribute get cleared.
        chain_memory = getattr(chain, 'memory', None)
        if chain_memory:
            chain_memory.clear()
Best Practices
1. Error Handling
from langchain_core.exceptions import LangChainException
import logging

# Configure logging
# basicConfig is a no-op if the root logger was already configured.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def robust_chain_invoke(chain, input_data, max_retries=3):
    """Invoke ``chain`` with retries and exponential backoff.

    Args:
        chain: Any object exposing ``invoke(input_data)``.
        input_data: Payload forwarded to ``chain.invoke``.
        max_retries: Total number of attempts before giving up.

    Returns:
        The chain's result from the first successful attempt.

    Raises:
        LangChainException or any other exception from the final attempt
        once all retries are exhausted.
    """
    # Local imports/lookups keep this helper self-contained; `time` was
    # used but not imported in the original snippet.
    import time
    log = logging.getLogger(__name__)

    for attempt in range(max_retries):
        try:
            result = chain.invoke(input_data)
            # Lazy %-style args defer formatting until the record is emitted.
            log.info("Chain invocation successful on attempt %d", attempt + 1)
            return result
        except LangChainException as e:
            log.error("LangChain error on attempt %d: %s", attempt + 1, e)
            if attempt == max_retries - 1:
                raise
        except Exception as e:
            log.error("Unexpected error on attempt %d: %s", attempt + 1, e)
            if attempt == max_retries - 1:
                raise
        # Exponential backoff before the next attempt: 1s, 2s, 4s, ...
        time.sleep(2 ** attempt)
# Usage
# Degrade gracefully: fall back to a canned apology if every retry fails.
try:
    result = robust_chain_invoke(chain, {"input": "test"})
except Exception as e:
    logger.error(f"All retry attempts failed: {str(e)}")
    # Fallback behavior
    result = "I apologize, but I'm having trouble processing your request right now."
2. Testing
import pytest
from unittest.mock import Mock, patch

class TestLangChainComponents:
    """Unit tests for chain building blocks using mocked LLMs."""

    @pytest.fixture
    def mock_llm(self):
        """Mock LLM for testing."""
        mock = Mock()
        mock.invoke.return_value = "Mocked response"
        return mock

    @pytest.fixture
    def sample_chain(self, mock_llm):
        """Sample chain for testing."""
        # NOTE(review): piping a bare Mock into an LCEL chain relies on
        # runnable coercion — confirm this composes on the installed
        # langchain version.
        prompt = ChatPromptTemplate.from_template("Test: {input}")
        return prompt | mock_llm | StrOutputParser()

    def test_chain_invoke(self, sample_chain):
        """Test basic chain invocation."""
        result = sample_chain.invoke({"input": "hello"})
        assert result == "Mocked response"

    def test_prompt_formatting(self):
        """Test prompt template formatting."""
        prompt = ChatPromptTemplate.from_template("Hello {name}")
        formatted = prompt.format(name="Alice")
        assert "Alice" in str(formatted)

    @patch('langchain_openai.ChatOpenAI')
    def test_with_real_llm_mock(self, mock_openai):
        """Test with mocked OpenAI client."""
        # NOTE(review): patch() must target the name where it is *looked
        # up*; if this test module imported ChatOpenAI directly, patch
        # that module's reference instead — verify the patch target.
        mock_openai.return_value.invoke.return_value = "Test response"
        llm = ChatOpenAI()
        result = llm.invoke("test prompt")
        assert result == "Test response"
        mock_openai.assert_called_once()

# Run tests with: pytest test_langchain.py
3. Production Deployment
from langchain_core.runnables import RunnableConfig
import os
from typing import Dict, Any
class ProductionChainWrapper:
    """Production-ready chain wrapper with monitoring and error handling.

    Wraps any LCEL chain, validating inputs and outputs, tagging runs
    with environment metadata, and returning a canned fallback response
    (rather than raising) when anything goes wrong.
    """

    def __init__(self, chain, environment="production"):
        """Wrap ``chain`` and start the request/error counters at zero."""
        self.chain = chain
        self.environment = environment
        self.request_count = 0
        self.error_count = 0

    def invoke(self, input_data: Dict[str, Any]) -> str:
        """Invoke chain with production safeguards.

        All failures — validation errors, the rate-limit check, and
        chain errors — are counted and converted into the fallback
        response rather than propagated to the caller.
        """
        self.request_count += 1
        try:
            # Input validation
            self._validate_input(input_data)
            # Rate limiting check.  RuntimeError (was a bare Exception,
            # an anti-pattern) is still caught by the handler below, so
            # callers see the fallback response exactly as before.
            if self.request_count > 1000:  # Example limit
                raise RuntimeError("Rate limit exceeded")
            # Tag the run so traces can be filtered per environment.
            config = RunnableConfig(
                tags=[self.environment],
                metadata={"request_id": self.request_count}
            )
            result = self.chain.invoke(input_data, config=config)
            # Output validation
            self._validate_output(result)
            return result
        except Exception as e:
            self.error_count += 1
            # Lazy %-formatting defers the string build until emission.
            logger.error("Chain error: %s", e)
            # Fallback response
            return self._get_fallback_response(input_data)

    def _validate_input(self, input_data: Dict[str, Any]):
        """Validate input: a dict with a bounded-length 'input' string."""
        if not isinstance(input_data, dict):
            raise ValueError("Input must be a dictionary")
        if "input" not in input_data:
            raise ValueError("Input must contain 'input' key")
        if len(input_data["input"]) > 10000:  # Character limit
            raise ValueError("Input too long")

    def _validate_output(self, output: str):
        """Validate output: must be a non-empty string."""
        if not isinstance(output, str):
            raise ValueError("Output must be a string")
        if len(output) == 0:
            raise ValueError("Empty output")

    def _get_fallback_response(self, input_data: Dict[str, Any]) -> str:
        """Provide fallback response on error."""
        return "I apologize, but I'm unable to process your request at the moment."

    def get_metrics(self) -> Dict[str, Any]:
        """Get performance counters; error_rate guards divide-by-zero."""
        return {
            "request_count": self.request_count,
            "error_count": self.error_count,
            "error_rate": self.error_count / max(self.request_count, 1),
            "environment": self.environment
        }
# Usage
# Wrap the raw chain, serve one request, then read the counters.
production_chain = ProductionChainWrapper(chain, "production")
result = production_chain.invoke({"input": "Hello world"})
metrics = production_chain.get_metrics()
4. Monitoring and Observability
from langchain_community.callbacks import LangChainTracer
from langchain.callbacks.manager import tracing_v2_enabled

# LangSmith tracing (if available)
# NOTE(review): hard-coding an API key in source is unsafe — load it
# from the environment or a secrets manager instead.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# Use tracing context
# Runs inside this block are traced to LangSmith.
with tracing_v2_enabled():
    result = chain.invoke({"input": "traced request"})
# Custom monitoring
class MonitoringCallback(BaseCallbackHandler):
    """Callback handler that tracks request counts, errors, and latency."""

    def __init__(self):
        # Aggregate counters exposed to the health-check endpoint.
        self.metrics = {
            "total_requests": 0,
            "total_tokens": 0,
            "average_response_time": 0,
            "error_count": 0
        }
        # Maps run_id -> wall-clock start timestamp.
        self.start_times = {}

    def on_chain_start(self, serialized, inputs, run_id, **kwargs):
        """Count the run and remember when it started."""
        self.metrics["total_requests"] += 1
        self.start_times[run_id] = time.time()

    def on_chain_end(self, outputs, run_id, **kwargs):
        """Fold the finished run's latency into the running average."""
        started_at = self.start_times.pop(run_id, None)
        if started_at is None:
            return
        duration = time.time() - started_at
        # Incremental mean: avg' = (avg * (n - 1) + duration) / n.
        total = self.metrics["total_requests"]
        previous_avg = self.metrics["average_response_time"]
        self.metrics["average_response_time"] = (
            (previous_avg * (total - 1)) + duration
        ) / total

    def on_chain_error(self, error, run_id, **kwargs):
        """Count the failure and discard its start timestamp."""
        self.metrics["error_count"] += 1
        self.start_times.pop(run_id, None)
# Use monitoring
# Attach the callback so every run updates the shared metrics dict.
monitoring = MonitoringCallback()
monitored_chain = chain.with_config(callbacks=[monitoring])

# Health check endpoint
def health_check():
    # Snapshot of the live metrics for readiness/liveness probes.
    return {
        "status": "healthy",
        "metrics": monitoring.metrics,
        "timestamp": time.time()
    }
Troubleshooting
Common Issues and Solutions
- API Key Issues ```python # Check environment variables import os print("OpenAI API Key:", "SET" if os.getenv("OPENAI_API_KEY") else "NOT SET")
# Set programmatically os.environ["OPENAI_API_KEY"] = "your-api-key" ```
- Memory Issues with Large Documents ```python # Use streaming for large documents def process_large_document(doc, chunk_size=1000): chunks = [doc[i:i+chunk_size] for i in range(0, len(doc), chunk_size)] results = []
for chunk in chunks: result = chain.invoke({"input": chunk}) results.append(result)
return results ```
- Rate Limiting ```python import time from tenacity import retry, wait_exponential, stop_after_attempt
@retry( wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(3) ) def invoke_with_retry(chain, input_data): return chain.invoke(input_data) ```
- Token Limits ```python import tiktoken
def count_tokens(text, model="gpt-4"): encoding = tiktoken.encoding_for_model(model) return len(encoding.encode(text))
def truncate_text(text, max_tokens=4000, model="gpt-4"): encoding = tiktoken.encoding_for_model(model) tokens = encoding.encode(text) if len(tokens) > max_tokens: truncated_tokens = tokens[:max_tokens] return encoding.decode(truncated_tokens) return text ```
For the latest documentation and updates, visit the LangChain Documentation and GitHub Repository.