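In my previous article "Elastic AI agent builder 介绍(一)" (Introduction to Elastic AI agent builder, part 1), I described in detail how to create tools and agents in Kibana. In today's article, I will show how to access these agents from a Python application. For background, see the article "开始使用 Elastic Agent Builder 和 Microsoft Agent Framework" (Getting started with Elastic Agent Builder and Microsoft Agent Framework).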

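Installation

We first follow the article "Elastic AI agent builder 介绍(一)" to complete the installation. Following that same article, we also need to create an agent named find_the_cheapest_ticket_from_cn_us.

Once it is created, we can query this agent in the Agents UI.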

Download the code

Next, we download the ready-made code:

git clone https://github.com/liu-xiao-guo/ai_agent_builder

Then we change into the project's root directory and create a file named .env:

.env

ES_AGENT_URL=http://localhost:5601/api/agent_builder/a2a
ES_AGENT_BUILDER_URL=http://localhost:5601/api/agent_builder
ES_API_KEY=WnJwR3RKb0JGQVVCVjdnb29yUkI6RHotbGZBTmJzMDJWUWszbTAtbDVjQQ==
AGENT_ID=find_the_cheapest_ticket_from_cn_us
TOOL_ID=find_cheapest_ticket_from_cn_us

We need to adjust these values to match our own setup. The a2a address can be found in Kibana.

In my case, the address shown there is:

http://localhost:5601/api/agent_builder/mcp

Clearly, this address is meant for MCP. We need to replace mcp with a2a, which gives the following address:

http://localhost:5601/api/agent_builder/a2a
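
The substitution described above can also be done in code. The following is a minimal sketch; the helper name mcp_to_a2a_url is made up for illustration and is not part of the repository or of any Elastic library. It assumes the MCP address ends with the path segment mcp, as in the example above.

```python
def mcp_to_a2a_url(mcp_url: str) -> str:
    """Swap a trailing /mcp path segment for /a2a (hypothetical helper)."""
    base, _, last = mcp_url.rstrip("/").rpartition("/")
    if last != "mcp":
        # Refuse to guess if the URL does not look like the MCP endpoint.
        raise ValueError(f"expected a URL ending in /mcp, got: {mcp_url}")
    return f"{base}/a2a"


print(mcp_to_a2a_url("http://localhost:5601/api/agent_builder/mcp"))
# prints http://localhost:5601/api/agent_builder/a2a
```

The resulting value is what goes into ES_AGENT_URL in the .env file.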

Next, we create an API key for accessing Elasticsearch:

Copy the generated key. That completes the API key setup.

In addition, we need to fill in the ID of the agent we want to access, for example find_the_cheapest_ticket_from_cn_us as above.
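
Since every script in this article reads its settings from .env, it may help to check up front that nothing is missing. Here is a minimal sketch under that assumption; missing_settings and REQUIRED are names I made up for this illustration, and the variable names are the ones listed in the .env file above.

```python
import os

# The five settings used by the scripts in this article.
REQUIRED = (
    "ES_AGENT_URL",
    "ES_AGENT_BUILDER_URL",
    "ES_API_KEY",
    "AGENT_ID",
    "TOOL_ID",
)


def missing_settings(env=None, required=REQUIRED):
    """Return the names of required settings that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Example with an incomplete configuration passed in as a plain dict.
print(missing_settings({"ES_AGENT_URL": "http://localhost:5601/api/agent_builder/a2a"}))
# prints ['ES_AGENT_BUILDER_URL', 'ES_API_KEY', 'AGENT_ID', 'TOOL_ID']
```

In a real script, one would call load_dotenv() first and then missing_settings() with no arguments, so that the check runs against the process environment.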

Next, we install the required packages with the following command:

pip install -r requirements.txt 

Run the code

Our code is very simple:

access_agents.py

import asyncio
from dotenv import load_dotenv
import httpx
import os
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
 
 
async def main():
    load_dotenv()
    a2a_agent_host = os.getenv("ES_AGENT_URL")
    a2a_agent_key = os.getenv("ES_API_KEY")
    agent_id = os.getenv("AGENT_ID")
 
    print(f"Connection to Elastic A2A agent at: {a2a_agent_host}")
 
    custom_headers = {"Authorization": f"ApiKey {a2a_agent_key}"}
 
    async with httpx.AsyncClient(timeout=60.0, headers=custom_headers) as http_client:
        # Resolve the A2A Agent Card
        resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
        agent_card = await resolver.get_agent_card(
            relative_card_path=f"/{agent_id}.json"
        )
        print(f"Found Agent: {agent_card.name} - {agent_card.description}")
 
        # Use the Agent
        agent = A2AAgent(
            name=agent_card.name,
            description=agent_card.description,
            agent_card=agent_card,
            url=a2a_agent_host,
            http_client=http_client,
        )
        prompt = input("Enter Greeting >>> ")
        print("\nSending message to Elastic A2A agent...")
        response = await agent.run(prompt)
        print("\nAgent Response:")
        for message in response.messages:
            print(message.text)
 
 
if __name__ == "__main__":
    asyncio.run(main())

We run the application above:

 python access_agents.py 
从中国到美国最便宜的机票是多少?出发城市和到达城市是什么?
$ pwd
/Users/liuxg/python/ai_agent_builder
$ ls
README.md        access_agents.py requirements.txt
$ python access_agents.py 
Connection to Elastic A2A agent at: http://localhost:5601/api/agent_builder/a2a
Found Agent: Find cheapest price from China to US - Find cheapest price from China to US
Enter Greeting >>> 从中国到美国最便宜的机票是多少?出发城市和到达城市是什么?

Sending message to Elastic A2A agent...

Agent Response:
根据查询结果,从中国到美国最便宜的机票信息如下:

- **机票价格**:272.68美元
- **出发城市**:广州 (Guangzhou)
- **到达城市**:塔尔萨 (Tulsa)

Clearly, we got the result we wanted.

We can also ask the question in English:

What is the cheapest air-ticket price from China to US? What are the origin city and the destination city?
$ python access_agents.py 
Connection to Elastic A2A agent at: http://localhost:5601/api/agent_builder/a2a
Found Agent: Find cheapest price from China to US - Find cheapest price from China to US
Enter Greeting >>> What is the cheapest air-ticket price from China to US? What are the origin city and the destination city?

Sending message to Elastic A2A agent...

Agent Response:
Based on the flight data, the cheapest air-ticket from China to the US is:

- **Price**: $272.68
- **Origin City**: Guangzhou, China
- **Destination City**: Tulsa, USA
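
As a side note on access_agents.py, the two pieces of connection information it builds are the Authorization header and the location of the agent card. The sketch below spells these out as plain functions. It assumes that the card resolver simply appends the relative card path to the base URL; I have not verified the resolver's internals, and auth_headers and agent_card_url are hypothetical helper names used only here.

```python
def auth_headers(api_key: str) -> dict:
    """Build the API key header in the same form the script uses."""
    return {"Authorization": f"ApiKey {api_key}"}


def agent_card_url(base_url: str, agent_id: str) -> str:
    """Join the a2a base URL with the card path /{agent_id}.json.

    Assumption: the resolver concatenates base URL and relative path.
    """
    return f"{base_url.rstrip('/')}/{agent_id}.json"


print(agent_card_url(
    "http://localhost:5601/api/agent_builder/a2a",
    "find_the_cheapest_ticket_from_cn_us",
))
# prints http://localhost:5601/api/agent_builder/a2a/find_the_cheapest_ticket_from_cn_us.json
```

If the card cannot be resolved, checking this URL and the header value is a reasonable first debugging step.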

If we want to build a web page with gradio, we can change our code to the following:

access_agents_gradio.py

import asyncio
from dotenv import load_dotenv
import httpx
import os
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
import gradio as gr

# ---------- Global variables ----------
a2a_agent_host = None
a2a_agent_key = None
agent_id = None
agent_card = None
custom_headers = None


# ---------- Initialize everything once ----------
async def init_agent():
    global a2a_agent_host, a2a_agent_key, agent_id, agent_card, custom_headers

    load_dotenv()
    a2a_agent_host = os.getenv("ES_AGENT_URL")
    a2a_agent_key = os.getenv("ES_API_KEY")
    agent_id = os.getenv("AGENT_ID")

    print(f"Connecting to Elastic A2A agent: {a2a_agent_host}")

    custom_headers = {"Authorization": f"ApiKey {a2a_agent_key}"}

    async with httpx.AsyncClient(timeout=60.0, headers=custom_headers) as http_client:
        resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
        agent_card = await resolver.get_agent_card(
            relative_card_path=f"/{agent_id}.json"
        )
        print(f"Loaded Agent Card: {agent_card.name} - {agent_card.description}")


# ---------- Function that Gradio will call ----------
async def call_agent(prompt):
    async with httpx.AsyncClient(timeout=60.0, headers=custom_headers) as http_client:
        agent = A2AAgent(
            name=agent_card.name,
            description=agent_card.description,
            agent_card=agent_card,
            url=a2a_agent_host,
            http_client=http_client,
        )
        response = await agent.run(prompt)
        return "\n".join([msg.text for msg in response.messages])


# ---------- Gradio wrapper (sync → async) ----------
def gradio_handler(prompt):
    return asyncio.run(call_agent(prompt))


# ---------- Launch Gradio UI ----------
async def launch_gradio():
    await init_agent()   # Load everything before UI starts

    iface = gr.Interface(
        fn=gradio_handler,
        inputs=gr.Textbox(lines=2, label="Enter Greeting"),
        outputs=gr.Textbox(label="Agent Response"),
        title="Elastic A2A Agent Web UI"
    )
    iface.launch()


if __name__ == "__main__":
    asyncio.run(launch_gradio())

We start the web application with:

python access_agents_gradio.py
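
The gradio_handler function above bridges synchronous and asynchronous code by calling asyncio.run on each request. The same pattern can be shown in isolation. This is a minimal sketch in which to_sync and fake_agent are made-up names and fake_agent stands in for the real call to the agent, so no Kibana instance is needed.

```python
import asyncio
import functools


def to_sync(async_fn):
    """Wrap a coroutine function so that it can be called synchronously."""
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run creates a fresh event loop for every call; it raises
        # RuntimeError if the calling thread already has a running loop.
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper


async def fake_agent(prompt: str) -> str:
    """Stand-in for call_agent; it just echoes the prompt."""
    await asyncio.sleep(0)
    return f"echo: {prompt}"


handler = to_sync(fake_agent)
print(handler("hello"))
# prints echo: hello
```

Creating a new event loop per request is simple but not free, which is one reason the script also opens a new httpx client on every call instead of sharing one across loops.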

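Calling a tool

We can call a tool directly as follows. For background, see the article "Elastic AI agent builder 介绍(二)" (Introduction to Elastic AI agent builder, part 2).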

execute_tool.py

from dotenv import load_dotenv
import requests
import os

load_dotenv()

a2a_agent_host = os.getenv("ES_AGENT_URL")
es_agent_builder_url = os.getenv("ES_AGENT_BUILDER_URL")
a2a_agent_key = os.getenv("ES_API_KEY")
agent_id = os.getenv("AGENT_ID")
tool_id = os.getenv("TOOL_ID")

HEADERS = {
    "Content-Type": "application/json",
    "kbn-xsrf": "true",
    "Authorization": f"ApiKey {a2a_agent_key}",
}

def make_http_request(url, method="GET", headers=None, data=None, params=None):
    """
    Make an HTTP request to the specified URL.
    
    Args:
        url (str): The URL to make the request to
        method (str): HTTP method (GET, POST, PUT, DELETE, etc.)
        headers (dict): Optional headers to include in the request
        data (dict): Optional data to send in the request body
        params (dict): Optional query parameters
    
    Returns:
        requests.Response: The response object
    """
    try:
        response = requests.request(
            method=method.upper(),
            url=url,
            headers=headers,
            json=data,
            params=params
            # verify=False
        )
        response.raise_for_status()
        return response
    except requests.exceptions.RequestException as e:
        print(f"Error making request: {e}")
        return None
    

# Use POST /api/agent_builder/tools/_execute to execute the tool
def call_tool(tool_id, tool_params):
    url = f"{es_agent_builder_url}/tools/_execute"
    headers = HEADERS
    data = {
        "tool_id": tool_id,
        "tool_params": {
            "query": tool_params
        }
    }
    response = make_http_request(url, method="POST", headers=headers, data=data)
    if response is not None:
        return response.json()
    else:
        return {"error": "Failed to call tool"} 
    
# Example usage:
tool_response = call_tool(tool_id, "What is the cheapest air-ticket price from China to US? What are the origin city and the destination city?")
print("Tool Response:", tool_response)

Running the code above, we see:

python execute_tool.py
$ python execute_tool.py
Tool Response: {'results': [{'type': 'query', 'data': {'esql': 'FROM kibana_sample_data_flights\n  METADATA _score\n| WHERE MATCH(OriginCountry, "CN") AND MATCH(DestCountry, "US")\n| SORT AvgTicketPrice ASC\n| KEEP AvgTicketPrice, OriginCityName, DestCityName\n| LIMIT 1'}, 'tool_result_id': 'g9SNNq'}, {'tool_result_id': 'udTnRw', 'type': 'tabular_data', 'data': {'source': 'esql', 'query': 'FROM kibana_sample_data_flights\n  METADATA _score\n| WHERE MATCH(OriginCountry, "CN") AND MATCH(DestCountry, "US")\n| SORT AvgTicketPrice ASC\n| KEEP AvgTicketPrice, OriginCityName, DestCityName\n| LIMIT 1', 'columns': [{'name': 'AvgTicketPrice', 'type': 'double'}, {'name': 'OriginCityName', 'type': 'keyword'}, {'name': 'DestCityName', 'type': 'keyword'}], 'values': [[272.6810302734375, 'Guangzhou', 'Tulsa']]}}]}

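From the output above, we can see that we successfully ran the tool named find_cheapest_ticket_from_cn_us (the value of TOOL_ID). It returns the query results, but without any reasoning. The reasoning is done in the agent.

Happy building with AI agent builder!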
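
The raw tool response is a nested dictionary, so a small amount of post-processing makes it easier to use. Here is a minimal sketch that turns the tabular_data entries into one dictionary per row. It assumes the response has the shape shown in the output above; rows_from_tool_response is a name I made up, and the sample is a trimmed copy of that output.

```python
def rows_from_tool_response(tool_response: dict) -> list:
    """Collect rows from every 'tabular_data' result as column->value dicts."""
    rows = []
    for result in tool_response.get("results", []):
        if result.get("type") != "tabular_data":
            continue  # skip e.g. the 'query' entry that only carries the ES|QL text
        data = result.get("data", {})
        names = [column["name"] for column in data.get("columns", [])]
        for values in data.get("values", []):
            rows.append(dict(zip(names, values)))
    return rows


# Trimmed copy of the response printed by execute_tool.py.
sample = {
    "results": [
        {"type": "query", "data": {"esql": "FROM kibana_sample_data_flights"}},
        {
            "type": "tabular_data",
            "data": {
                "columns": [
                    {"name": "AvgTicketPrice", "type": "double"},
                    {"name": "OriginCityName", "type": "keyword"},
                    {"name": "DestCityName", "type": "keyword"},
                ],
                "values": [[272.6810302734375, "Guangzhou", "Tulsa"]],
            },
        },
    ]
}

for row in rows_from_tool_response(sample):
    print(f"{row['OriginCityName']} -> {row['DestCityName']}: {row['AvgTicketPrice']:.2f}")
# prints Guangzhou -> Tulsa: 272.68
```

Because the function uses .get with defaults, it returns an empty list for the {"error": ...} dictionary that call_tool produces on failure.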
