CrewAI lets you build multi-agent systems where specialized agents collaborate on tasks. This guide shows how to give CrewAI agents access to Catalogian tools for product catalog monitoring and analysis.
Catalogian API key (cat_live_) — requires Brand plan or higher

pip install crewai openai

First, create tool functions that call Catalogian's Responses API:
import os
import json
from openai import OpenAI
from crewai.tools import tool
# Shared OpenAI SDK client configured with Catalogian's base URL.
# The API key is read from the CATALOGIAN_KEY environment variable when this
# statement runs; a missing variable raises KeyError here.
catalogian = OpenAI(
    api_key=os.environ["CATALOGIAN_KEY"],
    base_url="https://api.catalogian.com/v1",
)
def _call_catalogian(tool_name: str, args: dict | None = None) -> str:
    """Invoke one Catalogian tool through the Responses API and return its text.

    Args:
        tool_name: Name of the function tool to declare and to force through
            ``tool_choice``.
        args: Tool arguments. They are serialized to JSON and sent as the
            request ``input``. ``None`` or an empty dict sends ``"{}"``.

    Returns:
        The ``text`` of the first content part that has one, scanning the
        response output items in order, or the literal string
        ``"No result"`` when no such part exists.
    """
    # None replaces the former mutable ``{}`` default, which was a single dict
    # object shared by every call.
    payload = json.dumps(args or {})

    response = catalogian.responses.create(
        model="catalogian-1",
        input=payload,
        tools=[{"type": "function", "name": tool_name}],
        tool_choice={"type": "function", "name": tool_name},
    )

    for item in response.output:
        # An item may have no ``content`` attribute, or have it set to None;
        # both are treated as "nothing to scan" instead of raising TypeError.
        for part in getattr(item, "content", None) or ():
            if hasattr(part, "text"):
                return part.text
    return "No result"
# Registered with CrewAI under the name "list_sources"; takes no arguments.
# NOTE(review): the docstring is presumably what the agent sees as the tool
# description, so its wording is left untouched.
@tool("list_sources")
def list_sources() -> str:
    """List all product feed sources in Catalogian."""
    listing = _call_catalogian("list_sources")
    return listing
# Registered with CrewAI under the name "get_delta".
# Python-side snake_case parameters are mapped to the camelCase keys that are
# sent to Catalogian ("sourceSlug", "limit").
@tool("get_delta")
def get_delta(source_slug: str, limit: int = 5) -> str:
    """Get recent change events for a product feed source."""
    params = dict(sourceSlug=source_slug, limit=limit)
    return _call_catalogian("get_delta", params)
# Registered with CrewAI under the name "profile_snapshot".
# Forwards the source slug to Catalogian under the "sourceSlug" key.
@tool("profile_snapshot")
def profile_snapshot(source_slug: str) -> str:
    """Get field stats, null rates, and data quality metrics for a source."""
    params = dict(sourceSlug=source_slug)
    return _call_catalogian("profile_snapshot", params)
# Registered with CrewAI under the name "search_snapshot".
# Sends the source slug and the free-text query as "sourceSlug" / "query".
@tool("search_snapshot")
def search_snapshot(source_slug: str, query: str) -> str:
    """Search across all fields in a product feed."""
    params = dict(sourceSlug=source_slug, query=query)
    return _call_catalogian("search_snapshot", params)
# Registered with CrewAI under the name "get_delta_rows".
# Python-side snake_case parameters are mapped to the camelCase keys that are
# sent to Catalogian ("sourceSlug", "deltaEventId", "changeType").
# NOTE(review): the docstring is presumably what the agent sees as the tool
# description, so its wording is left untouched.
@tool("get_delta_rows")
def get_delta_rows(source_slug: str, delta_event_id: str, change_type: str = "changed") -> str:
    """Get row-level before/after data for a specific delta event."""
    return _call_catalogian("get_delta_rows", {
        "sourceSlug": source_slug,
        "deltaEventId": delta_event_id,
        "changeType": change_type,
    })


# Create specialized agents for different catalog monitoring tasks:
from crewai import Agent
# Monitoring agent: can list sources, read delta events, profile a source and
# fetch row-level before/after data.
# The backstory is assembled from explicit "\n"-terminated pieces so the
# prompt text does not depend on how this file is indented.
catalog_monitor = Agent(
    role="Product Catalog Monitor",
    goal="Monitor product feeds for significant changes and anomalies",
    backstory=(
        "You are a product data specialist who monitors vendor feeds\n"
        "for changes. You detect price changes, new products, discontinued items,\n"
        "and data quality issues. Always call profile_snapshot before analyzing\n"
        "a new source to understand its structure."
    ),
    tools=[list_sources, get_delta, profile_snapshot, get_delta_rows],
    verbose=True,
)
# Analysis agent: searches and profiles feeds and inspects delta events.
# get_delta_rows is included because the analysis task asks for before/after
# comparisons, and that is the only tool returning row-level before/after data.
# The backstory is assembled from explicit "\n"-terminated pieces so the
# prompt text does not depend on how this file is indented.
data_analyst = Agent(
    role="Product Data Analyst",
    goal="Analyze product catalog data and provide actionable insights",
    backstory=(
        "You analyze product feed data to find trends, anomalies,\n"
        "and opportunities. You search for specific products and compare data\n"
        "across time periods."
    ),
    tools=[search_snapshot, profile_snapshot, get_delta, get_delta_rows],
    verbose=True,
)

from crewai import Crew, Task
# Define tasks
# First task: assigned to catalog_monitor, produces the change summary.
# The description is built from explicit "\n"-terminated pieces so the prompt
# text does not depend on how this file is indented.
monitor_task = Task(
    agent=catalog_monitor,
    description=(
        "Check all sources for recent changes.\n"
        "For any source with changes in the last 24 hours:\n"
        "1. Get the delta events\n"
        "2. Summarize what changed (new products, price changes, deletions)\n"
        "3. Flag anything unusual (large deletions, many price changes)"
    ),
    expected_output="A summary of recent catalog changes across all sources",
)
# Second task: assigned to data_analyst, digs into the changes reported by the
# monitoring task.
analysis_task = Task(
    agent=data_analyst,
    description=(
        "Based on the monitoring report, analyze the most significant\n"
        "changes in detail. For sources with price changes, compare before/after\n"
        "values. Identify any patterns or concerns."
    ),
    expected_output="Detailed analysis with specific examples and recommendations",
)
# Assemble and run
# NOTE(review): the analysis task relies on the monitoring report, so the
# tasks are listed in that order — confirm that the crew's process runs them
# sequentially.
crew = Crew(
    agents=[catalog_monitor, data_analyst],
    tasks=[monitor_task, analysis_task],
    verbose=True,
)
result = crew.kickoff()
print(result)

# For simpler use cases, a single agent works well:
from crewai import Agent, Task, Crew
# Single general-purpose agent with the four read/query tools
# (get_delta_rows is not included in this example).
agent = Agent(
    role="Catalog Assistant",
    goal="Answer questions about product catalog data",
    backstory="You help users query and understand their product feeds.",
    tools=[
        list_sources,
        get_delta,
        profile_snapshot,
        search_snapshot,
    ],
)
# One-off question for the single agent: a product search combined with a
# check for recent price changes.
task = Task(
    agent=agent,
    description=(
        "Find all wireless headphones in the acme-products feed and check "
        "if any had price changes this week."
    ),
    expected_output="List of matching products with any recent price changes",
)
# Run the single-agent crew and print whatever kickoff() returns.
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
print(result)

# Rate limits: Each tool call makes one request to Catalogian. The
# MCP/Responses endpoint allows 30 requests per minute per key. For
# multi-agent crews with many tool calls, consider adding delays between
# tasks or using separate API keys per agent.
Tips for building effective AI agent integrations. Agent Best Practices →