Agentic Workflows: Build your own Weather Agent with MAX Serve, FastAPI and NextJS
This recipe demonstrates how to build an intelligent weather assistant that combines MAX Serve for LLM inference, a FastAPI backend, and a Next.js frontend. While this recipe focuses on weather data, the patterns demonstrated here can be adapted for various reporting and automation needs, such as financial reports, product recommendations, and medical summaries.

You'll learn how to build a multi-stage LLM pipeline featuring intent detection, city name normalization, OpenAI-compatible function calling, semantic caching, and natural-language report generation.
Please make sure your system meets our system requirements.
To proceed, ensure you have the `magic` CLI installed, with `magic --version` reporting 0.7.2 or newer:

```bash
curl -ssL https://magic.modular.com/ | bash
```

or update it via:

```bash
magic self-update
```

Then install `max-pipelines` via:

```bash
magic global install max-pipelines=="25.2.0.dev2025031705"
```
For this recipe, you will also need a Hugging Face access token and a WeatherAPI API key.
Set up your environment variables:

```bash
cp backend/.env.sample backend/.env
echo "HUGGING_FACE_HUB_TOKEN=your_hf_token" >> backend/.env
echo "WEATHERAPI_API_KEY=your_api_key" >> backend/.env
```

(Replace `your_hf_token` and `your_api_key` with your actual credentials.)
Download the code for this recipe using the `magic` CLI:

```bash
magic init ai-weather-agent --from modular/max-recipes/ai-weather-agent
cd ai-weather-agent
```
Run the application. Make sure ports 7999, 8001, and 8010 are available; you can adjust the port settings in the Procfile.

```bash
magic run app
```

Note that it may take a few minutes for the models to be downloaded and compiled.
Open http://localhost:3000 in your browser to see the UI when all of the services (ports 7999, 8000, and 8001) are ready. Then you can ask weather-related questions and get detailed reports.
Actions to take: try sending `Hi`, then `Vancouver`. Notice that the app understands it's a city name and provides a weather report. Once you're done with the app, clean up the resources by running:

```bash
magic run clean
```
The weather assistant uses a multi-tier architecture. It consists of several key components: a Next.js frontend, a FastAPI backend, MAX Serve running Llama 3.1 for LLM inference, and `sentence-transformers/all-mpnet-base-v2` for generating embeddings for semantic caching. Each component is designed to be independently scalable and maintainable. The backend uses FastAPI's async capabilities to handle concurrent requests efficiently, while MAX Serve provides high-performance inference for the LLM components.
Here's how a typical weather query flows through the system. The sequence represents a complete query lifecycle: the query is classified for intent, the city name is normalized, weather and space weather data are fetched concurrently, and the LLM turns the collected data into a natural-language report. The entire process typically completes in 2-3 seconds, with cached responses returning in under 500 ms.
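To make the flow concrete, here is a minimal sketch of how the stages covered below might be wired together in a FastAPI endpoint. The endpoint path and the `ChatRequest` model are assumptions; the stage functions are the ones shown in the following sections:

```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):  # hypothetical request model
    message: str

@app.post("/chat")  # assumed endpoint path
async def chat(request: ChatRequest):
    timing_collector = TimingCollector()
    intent = await detect_intent(request.message, timing_collector)
    if intent == "WEATHER_QUERY":
        city = await normalize_city_name(request.message, timing_collector)
        data = await fetch_all_weather_data(city)
        report = await analyze_weather_data(request.message, data)
        return {
            "content": report,
            "data": data,
            "timings": await timing_collector.get_timings(),
        }
    # Fall back to general conversation
    return {"content": await generate_chat_response(request.message, timing_collector)}
```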
The backend implements a sophisticated multi-stage LLM pipeline that processes user queries through several key stages. Weather and space weather data are fetched concurrently for optimal performance:
```python
async def fetch_all_weather_data(city: str) -> dict:
    """Fetch both weather and space weather data concurrently."""
    weather_task = fetch_weather_data(city)
    space_weather_task = fetch_space_weather()
    results = await asyncio.gather(
        weather_task,
        space_weather_task,
    )
    return {
        "weather": results[0],
        "space_weather": results[1],
    }
```
1. Intent detection

The first stage determines whether the user is asking about weather or making general conversation:
```python
# Example queries and their classifications:
# "What's the weather like in London?" -> "WEATHER_QUERY"
# "How are you doing today?" -> "GENERAL_CHAT"
# "Will it rain in Paris tomorrow?" -> "WEATHER_QUERY"
# "Tell me a joke" -> "GENERAL_CHAT"

@track_operation_time("intent_detection")
async def detect_intent(request_message: str, timing_collector: TimingCollector):
    """Detect if the user is asking about weather or just chatting."""
    response = await llm_client.chat.completions.create(
        model="modularai/Llama-3.1-8B-Instruct-GGUF",
        messages=[
            {"role": "system", "content": INTENT_PROMPT},
            {"role": "user", "content": request_message},
        ],
        temperature=0,  # Use 0 for deterministic outputs
    )
    return response.choices[0].message.content.strip()
```
The intent classifier uses temperature=0 for consistent outputs and is wrapped with timing tracking for performance monitoring.
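The `track_operation_time` decorator itself isn't shown in the snippets. Here is a minimal sketch of what it could look like; the recipe's actual implementation may differ:

```python
import functools
import time

def track_operation_time(operation: str):
    """Record how long a wrapped async operation takes, in milliseconds."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Assumes the wrapped function receives a TimingCollector,
            # either positionally or as a keyword argument.
            collector = kwargs.get("timing_collector") or next(
                (a for a in args if isinstance(a, TimingCollector)), None
            )
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                if collector is not None:
                    duration_ms = (time.perf_counter() - start) * 1000
                    await collector.add_timing(operation, duration_ms)
        return wrapper
    return decorator
```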
2. City name normalization

After detecting weather-related intent, the backend normalizes city names using the LLM to handle variations and abbreviations:
```python
CITY_NORMALIZATION_PROMPT = """Normalize the following city name to its standard form.
Examples:
- "NYC" -> "New York City"
- "SF" -> "San Francisco"
- "LA" -> "Los Angeles"

City: {city}

Respond with only the normalized city name, nothing else."""

async def normalize_city_name(city: str, timing_collector: TimingCollector) -> str:
    """Standardize city names for consistent API calls and caching."""
    response = await llm_client.chat.completions.create(
        model="modularai/Llama-3.1-8B-Instruct-GGUF",
        messages=[
            {"role": "system", "content": CITY_NORMALIZATION_PROMPT.format(city=city)},
        ],
        max_tokens=50,
        temperature=0,  # Use 0 for consistent outputs
    )
    return response.choices[0].message.content.strip()
```
This normalization ensures that variants like "NYC" and "New York City" resolve to the same city, giving consistent weather API calls and shared cache entries.
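For example, a hypothetical call (the exact output depends on the LLM, so treat the expected value as illustrative):

```python
city = await normalize_city_name("NYC", timing_collector)
print(city)  # Expected: "New York City"
```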
3. Function calling and data fetching

When a weather query is detected, the backend uses OpenAI-compatible function calling to structure the request and fetch relevant data:
```python
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather and forecast data for a city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "The city name to get weather for",
                    }
                },
                "required": ["city"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_air_quality",
            "description": "Get air quality data for a city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string"},
                    "include_forecast": {"type": "boolean", "default": False},
                },
                "required": ["city"],
            },
        },
    },
]
```
```python
async def handle_function_calling(message: str) -> dict:
    """Extract structured data requirements from natural language."""
    response = await llm_client.chat.completions.create(
        model="modularai/Llama-3.1-8B-Instruct-GGUF",
        messages=[{"role": "user", "content": message}],
        tools=TOOLS,
        tool_choice="auto",
    )
    # Process tool calls and fetch data (guard against no tool calls)
    results = {}
    for tool_call in response.choices[0].message.tool_calls or []:
        func_name = tool_call.function.name
        arguments = json.loads(tool_call.function.arguments)
        if func_name == "get_weather":
            results["weather"] = await fetch_weather_data(arguments["city"])
        elif func_name == "get_air_quality":
            results["air_quality"] = await fetch_air_quality(arguments["city"])
    return results
```
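An illustrative invocation (the exact shape of the fetched data depends on the weather APIs):

```python
results = await handle_function_calling(
    "What's the weather and air quality in Paris?"
)
# results would look roughly like:
# {"weather": {...}, "air_quality": {...}}
```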
4. Weather analysis and report generation

The final stage generates a natural language report from the collected weather data:
```python
WEATHER_ANALYSIS_PROMPT = """Given the user request about weather:
User: {user}

Analyze the following weather data and provide a natural, conversational summary:
Weather data: {weather_data}

Focus on:
1. Current conditions and how they feel
2. Notable patterns or changes in the forecast
3. Any relevant warnings or recommendations
"""

async def analyze_weather_data(request_message: str, weather_data: dict):
    """Generate a natural language report from weather data."""
    content = WEATHER_ANALYSIS_PROMPT.format(
        user=request_message,
        weather_data=str(weather_data),
    )
    response = await llm_client.chat.completions.create(
        model="modularai/Llama-3.1-8B-Instruct-GGUF",
        messages=[{"role": "system", "content": content}],
        max_tokens=512,
        temperature=0.7,  # Slightly higher for more natural language
    )
    return response.choices[0].message.content
```
The backend implements several advanced techniques to optimize performance and monitor system behavior:
1. Connection pooling and HTTP optimization

The backend implements efficient HTTP connection pooling for external API calls:
```python
@asynccontextmanager
async def get_http_client():
    """Connection pooling: shared HTTP client for better connection reuse."""
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(30.0, connect=10.0),
        limits=httpx.Limits(max_keepalive_connections=5),
    ) as client:
        yield client
```
This optimization provides connection reuse across requests, bounded connect and read timeouts, and a capped pool of keep-alive connections to the external APIs.
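The `fetch_weather_data` helper referenced throughout isn't shown above. Here is a minimal sketch using the pooled client, assuming the WeatherAPI forecast endpoint and a `WEATHERAPI_API_KEY` loaded from the environment:

```python
import os

WEATHERAPI_API_KEY = os.environ["WEATHERAPI_API_KEY"]

async def fetch_weather_data(city: str) -> dict:
    """Fetch current conditions and forecast through the shared client."""
    async with get_http_client() as client:
        response = await client.get(
            "https://api.weatherapi.com/v1/forecast.json",  # assumed endpoint
            params={"key": WEATHERAPI_API_KEY, "q": city, "days": 3},
        )
        response.raise_for_status()
        return response.json()
```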
2. Operation timing and monitoring

The backend uses asyncio locks and precise timing for operation tracking:
```python
class TimingCollector:
    def __init__(self, lock: asyncio.Lock = None):
        self._timings: List[Dict[str, Union[str, float]]] = []
        self._lock = lock or asyncio.Lock()

    async def add_timing(self, operation: str, duration_ms: float):
        async with self._lock:
            self._timings.append({
                "operation": operation,
                "duration_ms": duration_ms,
            })

    async def get_timings(self) -> List[Dict[str, Union[str, float]]]:
        async with self._lock:
            return self._timings.copy()
```
This system helps identify slow pipeline stages, monitor performance regressions, and surface per-operation timings to the frontend alongside each response.
3. Semantic caching

To reduce API calls and improve response times, the backend implements semantic caching using embeddings:
```python
class SemanticCache:
    def __init__(self, threshold=0.75, ttl_seconds=CACHE_TTL):
        self.threshold = threshold
        self.ttl_seconds = ttl_seconds
        self.cache: Dict[Tuple[float, ...], Tuple[Any, datetime]] = {}
        self._lock = asyncio.Lock()

    async def _compute_embedding(self, text: str) -> np.ndarray:
        """Get embeddings from the MAX Serve embedding endpoint."""
        response = await embedding_client.embeddings.create(
            model=EMBEDDING_MODEL,
            input=text,
        )
        return np.array(response.data[0].embedding)

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors."""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    async def get(self, text: str, normalized_city: str = None) -> Tuple[bool, Any]:
        """Try to find a semantically similar cached result."""
        async with self._lock:
            now = datetime.now()
            # Clean expired entries
            expired = [
                embedding_tuple
                for embedding_tuple, (_, timestamp) in self.cache.items()
                if (now - timestamp).total_seconds() > self.ttl_seconds
            ]
            for emb in expired:
                del self.cache[emb]
            # Use the normalized city if provided, otherwise the original text
            query_text = normalized_city if normalized_city else text
            query_embedding = await self._compute_embedding(query_text)
            # Find the most similar cached query
            max_similarity = 0.0
            best_match = None
            for cached_embedding_tuple, (value, _) in self.cache.items():
                cached_embedding = np.array(cached_embedding_tuple)
                similarity = self._cosine_similarity(query_embedding, cached_embedding)
                if similarity > max_similarity:
                    max_similarity = similarity
                    best_match = value
            if max_similarity > self.threshold:
                return True, best_match
            return False, None
```
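The class as shown only reads from the cache; the recipe also needs a way to store results. A minimal sketch of a matching `set` method, continuing the `SemanticCache` class and assuming entries are keyed by the embedding of the (normalized) query text:

```python
    async def set(self, text: str, value: Any, normalized_city: str = None) -> None:
        """Store a value keyed by the embedding of the query text."""
        query_text = normalized_city if normalized_city else text
        embedding = await self._compute_embedding(query_text)
        async with self._lock:
            self.cache[tuple(embedding.tolist())] = (value, datetime.now())
```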
```python
# Example cache decorator usage
@semantic_cache(threshold=0.90, ttl_seconds=CACHE_TTL)
@track_operation_time("chat_response")
async def generate_chat_response(request_message: str, timing_collector: TimingCollector):
    """Generate a general chat response with caching."""
    response = await llm_client.chat.completions.create(
        model="modularai/Llama-3.1-8B-Instruct-GGUF",
        messages=[
            {
                "role": "system",
                "content": "You are a friendly weather assistant. Provide helpful and concise responses.",
            },
            {"role": "user", "content": request_message},
        ],
        max_tokens=256,
        temperature=0,
    )
    return response.choices[0].message.content
```
The semantic cache provides several key features: TTL-based expiration of stale entries, embedding-based lookup with a configurable similarity threshold, and keys based on normalized city names so equivalent queries share entries.

Example similar queries that would hit the cache: "What's the weather in NYC?" and "How's the weather in New York City?" resolve to the same normalized city and embed close enough to exceed the similarity threshold.
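The `semantic_cache` decorator used in the example above isn't defined in these snippets. Here is a plausible sketch built on `SemanticCache` (using the `set` method sketched earlier); the real implementation may differ:

```python
import functools

def semantic_cache(threshold: float, ttl_seconds: int):
    """Cache async results keyed by the semantic embedding of the first argument."""
    cache = SemanticCache(threshold=threshold, ttl_seconds=ttl_seconds)

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(request_message: str, *args, **kwargs):
            hit, value = await cache.get(request_message)
            if hit:
                return value
            result = await func(request_message, *args, **kwargs)
            await cache.set(request_message, result)
            return result
        return wrapper
    return decorator
```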
The backend implements comprehensive error handling:
```python
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": str(exc.detail)},
    )

@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    logger.exception("Unexpected error")
    return JSONResponse(
        status_code=500,
        content={"detail": "An unexpected error occurred"},
    )
```
```python
async def wait_for_llm_server(base_url: str):
    """Ensure the LLM server is healthy before starting."""

    @retry(
        stop=stop_after_attempt(20),
        wait=wait_fixed(60),
        retry=(
            retry_if_exception_type(httpx.RequestError)
            | retry_if_result(lambda response: response.status_code != 200)
        ),
    )
    async def _check_health():
        async with httpx.AsyncClient() as client:
            return await client.get(f"{base_url}/health")

    return await _check_health()
```
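One way this could be wired into application startup; the `LLM_SERVER_URL` constant is an assumption:

```python
LLM_SERVER_URL = "http://localhost:8000"  # assumed MAX Serve address

@app.on_event("startup")
async def on_startup():
    # Block until MAX Serve responds to health checks.
    await wait_for_llm_server(LLM_SERVER_URL)
```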
The system includes features such as structured JSON error responses, catch-all logging for unexpected exceptions, and health-check retries that keep the backend from accepting traffic before MAX Serve is ready.
The frontend is built with Next.js 14 and TypeScript, featuring three main components:
1. Chat interface

```typescript
interface ChatMessage {
  role: 'user' | 'assistant';
  content: string;
  data?: WeatherData;
  timings?: Array<{
    operation: string;
    duration_ms: number;
  }>;
}

export default function Chat() {
  const [messages, setMessages] = useState<ChatMessage[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  // ...rendering and submit handling omitted
}
```
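A minimal sketch of how the component might call the backend; the endpoint URL and response shape are assumptions:

```typescript
async function sendMessage(message: string): Promise<ChatMessage> {
  const response = await fetch('http://localhost:8001/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }
  return response.json();
}
```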
2. Weather data display

```typescript
interface WeatherData {
  weather: {
    location: {
      name: string;
      country: string;
      localtime: string;
    };
    current: {
      temperature: number;
      condition: string;
      feels_like: number;
      humidity: number;
      wind_kph: number;
    };
    forecast: Array<{
      date: string;
      max_temp: number;
      min_temp: number;
      condition: string;
    }>;
  };
  air_quality?: {
    aqi: number;
    pm2_5: number;
  };
}
```
3. Performance timing display

```typescript
interface OperationTiming {
  operation: string;
  duration_ms: number;
}

const operationLabels: Record<string, string> = {
  'intent_detection': 'Analyzing question',
  'weather_data_fetch': 'Getting weather data',
  'weather_analysis': 'Creating weather report',
};
```
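These labels can be used to render the backend's per-operation timings in a friendly form, for example:

```typescript
function formatTimings(timings: OperationTiming[]): string[] {
  return timings.map(
    (t) => `${operationLabels[t.operation] ?? t.operation}: ${t.duration_ms.toFixed(0)} ms`
  );
}

// formatTimings([{ operation: 'intent_detection', duration_ms: 142 }])
// -> ['Analyzing question: 142 ms']
```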
Common issues and solutions:

- LLM server connection issues: make sure all of the services started by `magic run app` are healthy before sending requests (see the health-check retry logic above).
- Weather API problems: verify that the API key in `backend/.env` is set and valid.
- Frontend development: clear the `.next` cache if builds fail, and run `npm install` after pulling updates.
- Performance issues: inspect the per-operation timings returned with each response to identify the slow stage.
This recipe demonstrates patterns that can be applied to many domains beyond weather reporting, such as financial reports, product recommendations, and medical summaries. Components like the multi-stage LLM pipeline, the semantic cache, and the timing instrumentation can be directly reused. The multi-stage LLM pipeline pattern, combined with efficient data handling and caching, provides a robust foundation for building various domain-specific AI assistants.
This recipe demonstrates how to build a production-ready AI assistant that combines MAX Serve for LLM inference, a FastAPI backend, and a Next.js frontend. The patterns and components shown here provide a solid foundation for building your own domain-specific AI assistants. Whether you're working with financial data, medical records, or product catalogs, the architecture can be adapted while maintaining performance, reliability, and user experience.

Now that you've built a foundation for AI-powered applications, you can explore more advanced deployments and features. We're excited to see what you'll build with MAX! Share your projects with us using #ModularAI on social media.
---

Code: ai-weather-agent · Author: Ehsan M. Kermani · Available tasks: `magic run app`, `magic run clean` · Problems with the code? File an Issue.