Developing Resilient Systems: How to Harness Persistence and Streams in AI Agents
A hands-on guide to implementing persistence and streaming in AI agents with LangGraph.
Intro to AI Agents:
👉 Article 1: Simple ReAct Agent from Scratch
👉 Article 2: ReAct Agent in LangGraph
👉 (This) Article 3: Persistence (memory) and streaming in LangGraph
👉 Article 4: Human in the loop
[GitHub] The Jupyter Notebook of this implementation
[Inspired by] Deeplearning.ai course
This blog post delves into the critical components of developing resilient AI systems: Persistence and Streams.
Persistence for AI Agents
Persistence allows an AI agent to save its state between interactions. In simple terms, it's like a digital bookmark that brings continuity to a conversation.
The Importance of Persistence
Persistence saves an agent's state at a specific point, so interactions can continue without loss of progress or context. Implementing persistence is simple in LangGraph, using a feature known as a checkpointer. We'll use an in-memory SQLite database as our persistence layer:
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
In your agent initialization, accept the saver as a `checkpointer` parameter and pass it on when compiling the graph:
self.graph = graph.compile(checkpointer=checkpointer)
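To make the idea of a checkpointer concrete, here is a minimal sketch that uses only Python's standard library. `ToyCheckpointer` and its `put`/`get` methods are hypothetical names invented for illustration; this is not LangGraph's implementation, only the underlying idea of saving one state per thread in SQLite and reading it back later.

```python
import json
import sqlite3


class ToyCheckpointer:
    """Hypothetical stand-in for a checkpointer: one saved state per thread_id."""

    def __init__(self, conn_string=":memory:"):
        self.conn = sqlite3.connect(conn_string)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints "
            "(thread_id TEXT PRIMARY KEY, state TEXT)"
        )

    def put(self, thread_id, state):
        # Overwrite the previous checkpoint for this thread with the new state.
        self.conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?)",
            (thread_id, json.dumps(state)),
        )
        self.conn.commit()

    def get(self, thread_id):
        # Return the last saved state, or an empty conversation if none exists.
        row = self.conn.execute(
            "SELECT state FROM checkpoints WHERE thread_id = ?", (thread_id,)
        ).fetchone()
        return json.loads(row[0]) if row else {"messages": []}


saver = ToyCheckpointer()
saver.put("1", {"messages": ["What is the weather in Zurich?"]})
restored = saver.get("1")   # state saved for thread "1"
unknown = saver.get("2")    # thread "2" has no checkpoint yet
print(restored)
print(unknown)
```

Because the database lives in `":memory:"`, the saved state disappears when the process exits. Passing a file path instead would keep it across restarts.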
Streams for AI Agents
Streams capture and emit events in real time, much like live updates on a news ticker. They show what the agent is doing at the moment. We could stream at the message level or go further and stream individual tokens. Let's start with message-level streaming.
Message Level Streaming
Running the graph with streams requires a thread configuration. The `thread_id` identifies a conversation, so the checkpointer can keep multiple conversations separate at the same time, a necessity in production environments:
thread = {"configurable": {"thread_id": "1"}}
To stream, we execute the graph with `stream` instead of `invoke` (as in our previous example):
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            # The message gets printed as and when the persistence storage is updated
            print(v['messages'])
This approach returns a stream of events as the persistent storage is updated.
Example of message level streaming
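The loop above consumes one event per graph step. As a rough illustration of that shape, the sketch below builds a tiny generator that runs two hypothetical nodes (`call_llm` and `take_action`, both made up here, with canned outputs) and yields a `{node_name: update}` dictionary after each one. It assumes nothing about LangGraph's internals and only mimics the pattern of the consuming loop.

```python
def call_llm(state):
    # Hypothetical node: pretend the model asks for a tool call.
    return {"messages": ["AI: calling search tool"]}


def take_action(state):
    # Hypothetical node: pretend the tool returned a result.
    return {"messages": ["Tool: partly cloudy, 14.3 C"]}


def stream(nodes, state):
    """Yield one event per node, shaped like {node_name: update}."""
    for name, node in nodes:
        update = node(state)
        # Append the update to the shared state before emitting the event.
        state["messages"].extend(update["messages"])
        yield {name: update}


state = {"messages": ["Human: What is the weather in Zurich?"]}
events = []
for event in stream([("llm", call_llm), ("action", take_action)], state):
    events.append(event)
    for v in event.values():
        print(v["messages"])
```

The caller sees each node's messages as soon as that node finishes, instead of waiting for the whole run to complete.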
Imagine resuming a paused conversation with an AI agent without having to start from scratch. That's the power of persistence.
Now, let's test the biggest advantage of persistence: its ability to continue conversations from where they left off. For instance, asking a follow-up question about Berlin's weather on the same thread ID, without restating the context, shows that the agent keeps and uses what was said earlier.
Following up with a comparison question like "which is warmer" shows that, thanks to persistence, the agent still has both earlier answers in context and can compare them.
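The effect of the thread ID can be sketched without any model at all. In the toy example below, `history` and `ask` are hypothetical helpers, and the questions are illustrative. The point is only that a follow-up on the same `thread_id` sees the earlier turns, while the same question on a new `thread_id` starts with no context.

```python
from collections import defaultdict

# Hypothetical in-process store: thread_id -> list of messages seen so far.
history = defaultdict(list)


def ask(question, config):
    """Append the question to its thread and return the context visible to it."""
    thread_id = config["configurable"]["thread_id"]
    history[thread_id].append(question)
    return list(history[thread_id])


thread_1 = {"configurable": {"thread_id": "1"}}
thread_2 = {"configurable": {"thread_id": "2"}}

ask("What is the weather in Zurich?", thread_1)
ask("What about in Berlin?", thread_1)
same_thread = ask("Which one is warmer?", thread_1)
new_thread = ask("Which one is warmer?", thread_2)

print(same_thread)  # the comparison question plus both earlier questions
print(new_thread)   # only the comparison question, with nothing to compare
```

With a real agent, the second case is where the model has to ask what "which one" refers to.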
Token Streaming for Enhanced Interactivity
The `astream_events` method, available on all LangChain and LangGraph objects, simplifies token streaming. By reacting to token-level events, you can display the model's output as it is generated. `astream_events` is an asynchronous method, so it should be used with an async checkpointer.
A very simple way to do that is to use `AsyncSqliteSaver` instead of the synchronous `SqliteSaver`.
We will also use a new thread ID, and we will react to the events emitted by the underlying stream. In this case, we want the events that correspond to new tokens, which are called `on_chat_model_stream`.
When we see these events, we will print out the tokens:
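from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

messages = [HumanMessage(content="What is the weather in zurich?")]
thread = {"configurable": {"thread_id": "5"}}
# Top-level `async with` works in a notebook; in a script, wrap this in an async function.
async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    # `version` picks the event schema; older langchain-core releases require it explicitly.
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Skip empty chunks and print each token as soon as it arrives.
                print(content, end="|")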
With this implementation, we can now observe the output on the fly, as shown below:
The| current| weather| in| Zurich| is| partly| cloudy| with| a| temperature| of| |14|.|3|°C| (|57|.|7|°F|).| The| wind| is ...
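The filtering pattern used for token streaming can be tried without a model or a checkpointer. The sketch below is an assumption-laden stand-in: `fake_events` is a hypothetical async generator that mixes token events with other event kinds, and its chunks are plain strings rather than message chunks with a `.content` attribute. It only shows how an `async for` loop picks out `on_chat_model_stream` events and skips empty chunks.

```python
import asyncio


async def fake_events():
    """Hypothetical event source mixing token events with other event kinds."""
    yield {"event": "on_chain_start", "data": {}}
    for token in ["The", " current", " weather", ""]:
        yield {"event": "on_chat_model_stream", "data": {"chunk": token}}
    yield {"event": "on_chain_end", "data": {}}


async def collect_tokens():
    tokens = []
    async for event in fake_events():
        # React only to token events and skip empty chunks.
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"]
            if content:
                tokens.append(content)
                print(content, end="|")
    return tokens


# In a plain script there is no running event loop, so start one explicitly.
tokens = asyncio.run(collect_tokens())
```

In a notebook, where an event loop is already running, `await collect_tokens()` would replace the `asyncio.run` call.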
Conclusion
In conclusion, developing resilient AI systems through persistence and streams provides a robust framework for seamless, real-time interactions. These technologies not only ensure continuity in conversations but also enhance the user experience by maintaining context and providing timely updates. As AI continues to evolve, such strategies will become increasingly vital in building advanced, interactive systems that can adapt and respond efficiently in dynamic environments.