Pipedream Auth
This example shows how to add authorization when integrating Pipedream MCP servers with Agno Agents.
Code
"""
🔒 Using Pipedream MCP servers with authentication

This example shows how to use Pipedream MCP servers with authentication.
This is useful if your app interfaces with the MCP servers on behalf of your users.

1. Get your access token. See Pipedream's docs: https://pipedream.com/docs/connect/mcp/developers/
2. Get the URL of the MCP server. It will look like this: https://remote.mcp.pipedream.net/<External user id>/<MCP app slug>
3. Set the environment variables:
    - MCP_SERVER_URL: The URL of the MCP server from step 2
    - MCP_ACCESS_TOKEN: The access token from step 1
    - PIPEDREAM_PROJECT_ID: The ID of the Pipedream project you want to use
    - PIPEDREAM_ENVIRONMENT: The environment of the Pipedream project you want to use
4. Install dependencies: pip install agno mcp openai
"""

import asyncio
from os import getenv

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MCPTools, StreamableHTTPClientParams
from agno.utils.log import log_exception

# Connection settings are read from the environment (see the docstring above).
mcp_server_url = getenv("MCP_SERVER_URL")
mcp_access_token = getenv("MCP_ACCESS_TOKEN")
pipedream_project_id = getenv("PIPEDREAM_PROJECT_ID")
pipedream_environment = getenv("PIPEDREAM_ENVIRONMENT")

# The access token and the Pipedream project details are sent as HTTP headers.
server_params = StreamableHTTPClientParams(
    url=mcp_server_url,
    headers={
        "Authorization": f"Bearer {mcp_access_token}",
        "x-pd-project-id": pipedream_project_id,
        "x-pd-environment": pipedream_environment,
    },
)


async def run_agent(task: str) -> None:
    try:
        async with MCPTools(
            server_params=server_params, transport="streamable-http", timeout_seconds=20
        ) as mcp:
            agent = Agent(
                model=OpenAIChat(id="gpt-4o-mini"),
                tools=[mcp],
                markdown=True,
            )
            await agent.aprint_response(message=task, stream=True)
    except Exception as e:
        log_exception(f"Unexpected error: {e}")


if __name__ == "__main__":
    # The agent can read channels, users, messages, etc.
    asyncio.run(run_agent("Show me the latest message in the channel #general"))
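
The example reads its four settings with getenv, which returns None for any variable that is not set, so a missing value would only surface later as a failed or rejected request. The sketch below is one way to check the settings up front and build the same three headers. It uses only the standard library, and the names REQUIRED_VARS, load_pipedream_settings, and build_headers are hypothetical helpers introduced for illustration; they are not part of Agno or Pipedream.

```python
import os
from typing import Dict, Mapping

# The four variables the example expects (names taken from its docstring).
REQUIRED_VARS = (
    "MCP_SERVER_URL",
    "MCP_ACCESS_TOKEN",
    "PIPEDREAM_PROJECT_ID",
    "PIPEDREAM_ENVIRONMENT",
)


def load_pipedream_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Hypothetical helper: return the settings, raising if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


def build_headers(settings: Mapping[str, str]) -> Dict[str, str]:
    """Hypothetical helper: build the same three headers the example sends."""
    return {
        "Authorization": f"Bearer {settings['MCP_ACCESS_TOKEN']}",
        "x-pd-project-id": settings["PIPEDREAM_PROJECT_ID"],
        "x-pd-environment": settings["PIPEDREAM_ENVIRONMENT"],
    }
```

With these helpers, the settings would be loaded once at startup, and settings["MCP_SERVER_URL"] and build_headers(settings) passed as the url and headers arguments of StreamableHTTPClientParams.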