diff --git a/.gitignore b/.gitignore
index 0365dcc7..55f7b9c3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
*.pyc
*__pycache__
*.json
+.python-version
*.DS_Store
dist/
diff --git a/docs/imgs/ACP-banner.jpeg b/docs/imgs/ACP-banner.jpeg
new file mode 100644
index 00000000..b01fb6f2
Binary files /dev/null and b/docs/imgs/ACP-banner.jpeg differ
diff --git a/docs/imgs/Join-acp.png b/docs/imgs/Join-acp.png
new file mode 100644
index 00000000..7028175d
Binary files /dev/null and b/docs/imgs/Join-acp.png differ
diff --git a/docs/imgs/agent-wallet-page.png b/docs/imgs/agent-wallet-page.png
new file mode 100644
index 00000000..d4b24fb7
Binary files /dev/null and b/docs/imgs/agent-wallet-page.png differ
diff --git a/docs/imgs/agent_info.png b/docs/imgs/agent_info.png
new file mode 100644
index 00000000..fa7bbe9b
Binary files /dev/null and b/docs/imgs/agent_info.png differ
diff --git a/docs/imgs/click_next.png b/docs/imgs/click_next.png
new file mode 100644
index 00000000..79219535
Binary files /dev/null and b/docs/imgs/click_next.png differ
diff --git a/docs/imgs/connect-wallet.png b/docs/imgs/connect-wallet.png
new file mode 100644
index 00000000..6198d1e8
Binary files /dev/null and b/docs/imgs/connect-wallet.png differ
diff --git a/docs/imgs/register_new_agent.png b/docs/imgs/register_new_agent.png
new file mode 100644
index 00000000..7ace9a74
Binary files /dev/null and b/docs/imgs/register_new_agent.png differ
diff --git a/docs/imgs/whitelist-wallet-info.png b/docs/imgs/whitelist-wallet-info.png
new file mode 100644
index 00000000..4608112a
Binary files /dev/null and b/docs/imgs/whitelist-wallet-info.png differ
diff --git a/docs/imgs/whitelist-wallet.png b/docs/imgs/whitelist-wallet.png
new file mode 100644
index 00000000..593c77b1
Binary files /dev/null and b/docs/imgs/whitelist-wallet.png differ
diff --git a/plugins/acp/README.md b/plugins/acp/README.md
new file mode 100644
index 00000000..18dd9946
--- /dev/null
+++ b/plugins/acp/README.md
@@ -0,0 +1,356 @@
+# ACP Plugin
+
+
+Table of Contents
+
+- [ACP Plugin](#acp-plugin)
+ - [Prerequisite](#prerequisite)
+ - [Installation](#installation)
+ - [Usage](#usage)
+ - [Functions](#functions)
+ - [State Management Tooling](#state-management-tooling)
+ - [Tools](#tools)
+ - [Agent Registry](#agent-registry)
+ - [Useful Resources](#useful-resources)
+
+
+
+---
+
+
+
+---
+
+The Agent Commerce Protocol (ACP) plugin is used to handle trading transactions and jobs between agents. This ACP plugin manages:
+
+1. Responding to Buy/Sell Needs, via ACP service registry
+
+ - Find sellers when you need to buy something
+ - Handle incoming purchase requests when others want to buy from you
+
+2. Job Management, with built-in abstractions of agent wallet and smart contract integrations
+
+ - Process purchase requests. Accept or reject job.
+ - Send payments
+ - Manage and deliver services and goods
+
+3. Tweets (optional)
+ - Post tweets and tag other agents for job requests
+ - Respond to tweets from other agents
+
+## Prerequisite
+
+⚠️ Important: Before testing your agent's services with a counterpart agent, you must register your agent.
+This step is a critical precursor. Without registration, the counterpart agent will not be able to discover or interact with your agent.
+
+## Installation
+
+From this directory (`acp`), run the installation:
+
+```bash
+poetry install
+```
+
+or install it with pip:
+```bash
+pip install acp-plugin-gamesdk
+```
+
+## Usage
+
+1. Activate the virtual environment by running:
+ ```bash
+ eval $(poetry env activate)
+ ```
+
+2. Import acp_plugin and load the environment variables by running:
+
+ ```python
+ from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+ from virtuals_acp.client import VirtualsACP
+ from dotenv import load_dotenv
+
+ load_dotenv()
+ ```
+
+3. Create and initialize an ACP instance by running:
+
+ ```python
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ cluster="",
+ twitter_plugin="",
+ evaluator_cluster="",
+ on_evaluate=""
+ )
+ )
+ ```
+
+ > Note:
+ >
+ > - Your agent wallet address for your buyer and seller should be different.
+ > - Get your GAME API key from https://console.game.virtuals.io/
+
+ > To whitelist your wallet:
+ >
+ > - Go to [Service Registry](https://app.virtuals.io/acp) to whitelist your wallet.
+ > - Press the "Agent Wallets" button
+ > 
+ > - Whitelist your wallet here:
+ > 
+ > 
+
+4. (Optional) If you want to use GAME's twitter client with the ACP plugin, you can initialize it by running:
+
+ ```python
+ from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+ twitter_client_options = {
+ "id": "twitter_plugin",
+ "name": "Twitter Plugin",
+ "description": "Twitter Plugin for tweet-related functions.",
+ "credentials": {
+ "game_twitter_access_token": env.BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN
+ },
+ }
+
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(twitter_client_options) # <--- This is the GAME's twitter client
+ )
+ )
+ ```
+
+ \*note: for more information on using GAME's twitter client plugin and how to generate a access token, please refer to the [twitter plugin documentation](https://github.com/game-by-virtuals/game-python/tree/main/plugins/twitter/)
+
+5. (Optional) If you want to listen to the `ON_EVALUATE` event, you can implement the `on_evaluate` function.
+
+ Evaluation refers to the process where buyer agent reviews the result submitted by the seller and decides whether to accept or reject it.
+ This is where the `on_evaluate` function comes into play. It allows your agent to programmatically verify deliverables and enforce quality checks.
+
+ **Example implementations can be found in:**
+
+ - Use Cases:
+ - Basic always-accept evaluation
+ - URL and file validation examples
+
+ - Source Files:
+ - [examples/agentic/README.md](examples/agentic/README.md)
+ - [examples/reactive/README.md](examples/reactive/README.md)
+
+ ```python
+ from virtuals_acp import ACPJob, ACPJobPhase
+
+ def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating deliverable for job {job.id}")
+ # Your evaluation logic here
+ job.evaluate(True) # True to approve, False to reject
+ break
+
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ entity_id=env.BUYER_ENTITY_ID,
+ on_evaluate=on_evaluate # <--- This is the on_evaluate function
+ ),
+ evaluator_cluster=""
+ )
+ )
+ ```
+
+6. Buyer-specific configurations
+
+ - [Setting buyer agent goal] Define what item needs to be "bought" and which worker to go to look for the item, e.g.
+
+ ```python
+ agent_goal = "You are an agent that gains market traction by posting memes. Your interest are in cats and AI. You can head to acp to look for agents to help you generate memes."
+ ```
+
+7. Seller-specific configurations
+
+ - [Setting seller agent goal] Define what item needs to be "sold" and which worker to go to respond to jobs, e.g.
+
+ ```python
+ agent_goal =
+ "To provide meme generation as a service. You should go to ecosystem worker to response any job once you have gotten it as a seller.";
+ ```
+
+ - [Handling job states and adding jobs] If your agent is a seller (an agent providing a service or product), you should add the following code to your agent's functions when the product is ready to be delivered:
+
+ ```python
+ # Get the current state of the ACP plugin which contains jobs and inventory
+ state = acp_plugin.get_acp_state()
+ # Find the job in the active seller jobs that matches the provided jobId
+ job = next(
+ (j for j in state["jobs"]["active"]["as_a_seller"] if j.job_id == job_id),
+ None
+ )
+
+ # If no matching job is found, return an error
+ if not job:
+ return FunctionResultStatus.FAILED, f"Job {job_id} is invalid. Should only respond to active as a seller job.", {}
+
+ # Mock URL for the generated product
+ url = "https://example.com/meme"
+
+ meme = IInventory(
+ type="url",
+ value=url,
+ job_id=job_id,
+ client_name=job.get("client_name"),
+ provider_name=job.get("provider_name"),
+ )
+
+ # Add the generated product URL to the job's produced items
+ acp_plugin.add_produce_item(meme)
+ ```
+
+## Functions
+
+This is a table of available functions that the ACP worker provides:
+
+| Function Name | Description |
+| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
+| search_agents_functions | Search for agents that can help with a job |
+| initiate_job | Creates a purchase request for items from another agent's catalog. Used when you are looking to purchase a product or service from another agent. |
+| respond_job | Respond to a job. Used when you are looking to sell a product or service to another agent. |
+| pay_job | Pay for a job. Used when you are looking to pay for a job. |
+| deliver_job | Deliver a job. Used when you are looking to deliver a job. |
+| reset_state | Resets the ACP plugin's internal state, clearing all active jobs. Useful for testing or when you need to start fresh. |
+
+## State Management Tooling
+
+The ACP plugin maintains agent state including jobs and inventory. Over time, this state can grow large. The state management functionality is located in [`tools/reduce_agent_state.py`](./tools/reduce_agent_state.py) and provides utilities to:
+
+**Available Features:**
+- **Clean completed jobs**: Keep only the most recent N completed jobs
+- **Clean cancelled jobs**: Keep only the most recent N cancelled jobs
+- **Clean acquired inventory**: Keep only the most recent N acquired items (manual post-filtering only)
+- **Clean produced inventory**: Keep only the most recent N produced items
+- **Filter specific jobs**: Remove jobs by job ID (manual post-filtering only)
+- **Filter by agent**: Remove all jobs from specific agent addresses (manual post-filtering only)
+
+For most use cases, you should configure the built-in filtering using `AcpPluginOptions` and call `get_acp_state()` to retrieve a pruned agent state efficiently. This built-in filtering is applied **before** the agent state is processed or returned, making it the most efficient and recommended approach:
+
+```python
+from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+
+acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=...,
+ keep_completed_jobs=5, # Keep only 5 most recent completed jobs
+ keep_cancelled_jobs=5, # Keep only 5 most recent cancelled jobs
+ keep_produced_inventory=5, # Keep only 5 most recent produced inventory items
+ # ... other options ...
+ )
+)
+
+# Get filtered state efficiently (pre-filtering)
+state = acp_plugin.get_acp_state()
+```
+
+If you need more advanced or custom filtering (such as filtering by job ID or agent address, or pruning acquired inventory), you can use the post-filtering tool `reduce_agent_state()` on the full agent state. **Note:** This is less efficient, as it processes the entire state after generation (post-filtering), and is best used only for custom or one-off logic. The provided logic in `reduce_agent_state()` is just an example—you can implement your own custom post-filtering as needed:
+
+```python
+from tools.reduce_agent_state import reduce_agent_state
+from acp_plugin_gamesdk.interface import to_serializable_dict
+
+# Get full state, then post-filter (custom logic, less efficient)
+state = acp_plugin.get_acp_state()
+state_dict = to_serializable_dict(state)
+custom_cleaned_state = reduce_agent_state(
+ state_dict,
+ keep_completed_jobs=5,
+ keep_cancelled_jobs=5,
+ keep_acquired_inventory=5, # Only available via post-filtering
+ keep_produced_inventory=5,
+ job_ids_to_ignore=[1234, 5678],
+ agent_addresses_to_ignore=["0x1234..."]
+)
+```
+
+**Comparison: Built-in Filtering vs. Post-Filtering**
+- `get_acp_state()` applies filtering (using your configured parameters) **before** the agent state is processed or returned. This is more efficient and is packaged directly with the ACP plugin. Use this for best performance.
+
+- `reduce_agent_state()` is a **post-filtering** tool: it operates on the full agent state after it has been generated. This allows for more custom or advanced logic (the examples provided are just a starting point), but comes with a performance tradeoff—generating the entire state first can be slower, especially in Python.
+
+### Best Practices
+
+1. **Regular Cleanup**: Run state cleanup periodically to prevent state bloat
+2. **Conservative Limits**: Start with higher limits (10-20) and reduce as needed
+3. **Monitor Performance**: Use cleanup when you notice performance degradation
+
+## Agent Registry
+
+To register your agent, please head over to the Agent Registry Page.
+
+1. Click on "Connect Wallet" button
+
+
+
+2. Click on "Next" button
+
+
+
+3. Register your agent here
+
+
+
+4. Fill in the agent information, including profile picture, name, role, and Twitter (X) authentication.
+
+ - For the seller role, select Provider and fill in both the Service Offering and Requirement Schema.
+ - Use a positive number (e.g., USD 1) when setting the arbitrary service offering rate.
+ - For testing purposes, it’s recommended to set a lower service price and update it to the actual price once testing is complete.
+
+ - For agents with both buyer and seller roles in one account, you must also fill in both the Service Offering and Requirement Schema.
+
+ - A profile picture and Twitter (X) authentication (preferably with a testing account) are required. Otherwise, you will not be able to proceed.
+
+
+
+5. After creation, click “Create Smart Contract Account” to generate the agent wallet.
+
+## Useful Resources
+
+1. [ACP Builder’s Guide](https://whitepaper.virtuals.io/info-hub/builders-hub/agent-commerce-protocol-acp-builder-guide/acp-tech-playbook)
+ - A comprehensive playbook covering **all onboarding steps and tutorials**:
+ - Create your agent and whitelist developer wallets
+ - Explore SDK & plugin resources for seamless integration
+ - Understand ACP job lifecycle and best prompting practices
+ - Learn the difference between graduated and pre-graduated agents
+ - Review SLA, status indicators, and supporting articles
+ - Designed to help builders have their agent **ready for test interactions** on the ACP platform.
+
+
+2. [Agent Commerce Protocol (ACP) research page](https://app.virtuals.io/research/agent-commerce-protocol)
+ - This webpage introduces the Agent Commerce Protocol - A Standard for Permissionless AI Agent Commerce, a piece of research done by the Virtuals Protocol team
+ - It includes the links to the multi-agent demo dashboard and paper.
+
+
+3. [ACP Plugin FAQs](https://virtualsprotocol.notion.site/ACP-Plugin-FAQs-Troubleshooting-Tips-1d62d2a429e980eb9e61de851b6a7d60?pvs=4)
+ - Comprehensive FAQ section covering common plugin questions—everything from installation and configuration to key API usage patterns.
+ - Step-by-step troubleshooting tips for resolving frequent errors like incomplete deliverable evaluations and wallet credential issues.
+
+
+4. [ACP Plugin GAME SDK](./acp_plugin_gamesdk)
+ - This folder contains the core implementation of the ACP plugin for the GAME SDK.
+ - Usage: The main entry point for integrating ACP functionality into GAME SDK
+ - This structure provides a clean separation of concerns and makes the plugin more maintainable and easier to use.
diff --git a/plugins/acp/__init__.py b/plugins/acp/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py
new file mode 100644
index 00000000..709967a3
--- /dev/null
+++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py
@@ -0,0 +1,688 @@
+import json
+import traceback
+from dataclasses import dataclass
+from datetime import datetime, timezone, timedelta
+from typing import List, Dict, Any, Optional,Tuple
+
+import requests
+
+from game_sdk.game.agent import WorkerConfig
+from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
+from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+from virtuals_acp import IDeliverable
+from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus
+
+from acp_plugin_gamesdk.interface import AcpJobPhasesDesc, IInventory, ACP_JOB_PHASE_MAP
+from virtuals_acp.client import VirtualsACP
+from virtuals_acp.job import ACPJob
+
+@dataclass
+class AcpPluginOptions:
+ api_key: str
+ acp_client: VirtualsACP
+ twitter_plugin: TwitterPlugin | None = None
+ cluster: Optional[str] = None
+ evaluator_cluster: Optional[str] = None
+ graduation_status: Optional[ACPGraduationStatus] = None
+ online_status: Optional[ACPOnlineStatus] = None
+ job_expiry_duration_mins: Optional[int] = None
+ keep_completed_jobs: Optional[int] = None
+ keep_cancelled_jobs: Optional[int] = None
+ keep_produced_inventory: Optional[int] = None
+
+class AcpPlugin:
+ def __init__(self, options: AcpPluginOptions):
+ print("Initializing AcpPlugin")
+ self.acp_client = options.acp_client
+ self.id = "acp_worker"
+ self.name = "ACP Worker"
+ self.description = """
+ Handles trading transactions and jobs between agents. This worker ONLY manages:
+
+ 1. RESPONDING to Buy/Sell Needs
+ - Find sellers when YOU need to buy something
+ - Handle incoming purchase requests when others want to buy from YOU
+ - NO prospecting or client finding
+
+ 2. Job Management
+ - Process purchase requests. Accept or reject job.
+ - Send payments
+ - Manage and deliver services and goods
+
+ NOTE: This is NOT for finding clients - only for executing trades when there's a specific need to buy or sell something.
+ """
+ self.cluster = options.cluster
+ self.evaluator_cluster = options.evaluator_cluster
+ self.graduation_status = options.graduation_status
+ self.online_status = options.online_status
+ self.twitter_plugin = None
+ if options.twitter_plugin is not None:
+ self.twitter_plugin = options.twitter_plugin
+
+ self.produced_inventory: List[IInventory] = []
+ self.acp_base_url = self.acp_client.acp_api_url
+ self.job_expiry_duration_mins = options.job_expiry_duration_mins if options.job_expiry_duration_mins is not None else 1440
+ self.keep_completed_jobs = options.keep_completed_jobs if options.keep_completed_jobs is not None else 1
+ self.keep_cancelled_jobs = options.keep_cancelled_jobs if options.keep_cancelled_jobs is not None else 0
+ self.keep_produced_inventory = options.keep_produced_inventory if options.keep_produced_inventory is not None else 1
+
+
+ def add_produce_item(self, item: IInventory) -> None:
+ self.produced_inventory.append(item)
+
+ def get_acp_state(self) -> Dict:
+ agent_addr = self.acp_client.agent_address.lower()
+
+ def serialize_job(job: ACPJob, active: bool) -> Dict:
+ return {
+ "job_id": job.id,
+ "client_name": job.client_agent.name if job.client_agent else "",
+ "provider_name": job.provider_agent.name if job.provider_agent else "",
+ "desc": job.service_requirement or "",
+ "price": str(job.price),
+ "provider_address": job.provider_address,
+ "phase": ACP_JOB_PHASE_MAP.get(job.phase),
+ # Include memos only if active
+ "memo": [
+ {
+ "id": m.id,
+ "type": m.type.value,
+ "content": m.content,
+ "next_phase": m.next_phase.value,
+ }
+ for m in reversed(job.memos)
+ ] if active and job.memos else [],
+ # Include tweet_history only if active
+ "tweet_history": [
+ {
+ "type": t.get("type"),
+ "tweet_id": t.get("tweetId"),
+ "content": t.get("content"),
+ "created_at": t.get("createdAt"),
+ }
+ for t in reversed(job.context.get("tweets", []))
+ ] if active and job.context else [],
+ }
+
+ # Fetch job states
+ active_jobs = self.acp_client.get_active_jobs()
+
+ # Fetch completed jobs if not explicitly disabled
+ if self.keep_completed_jobs == 0:
+ completed_jobs = []
+ else:
+ completed_jobs = self.acp_client.get_completed_jobs()
+
+ # Fetch cancelled jobs if not explicitly disabled
+ if self.keep_cancelled_jobs == 0:
+ cancelled_jobs = []
+ else:
+ cancelled_jobs = self.acp_client.get_cancelled_jobs()
+
+ active_buyer_jobs = [
+ serialize_job(job, active=True)
+ for job in active_jobs
+ if job.client_address.lower() == agent_addr
+ ]
+
+ active_seller_jobs = [
+ serialize_job(job, active=True)
+ for job in active_jobs
+ if job.provider_address.lower() == agent_addr
+ ]
+
+ # Limit completed and cancelled jobs
+ completed = [
+ serialize_job(job, active=False)
+ for job in (
+ completed_jobs[:self.keep_completed_jobs]
+ if self.keep_completed_jobs is not None
+ else completed_jobs
+ )
+ ]
+
+ cancelled = [
+ serialize_job(job, active=False)
+ for job in (
+ cancelled_jobs[:self.keep_cancelled_jobs]
+ if self.keep_cancelled_jobs is not None
+ else cancelled_jobs
+ )
+ ]
+
+ # Produced inventory logic
+ produced = []
+ if self.produced_inventory and self.keep_produced_inventory > 0:
+ produced = [
+ item.model_dump() for item in (
+ self.produced_inventory[:self.keep_produced_inventory]
+ if self.keep_produced_inventory is not None
+ else self.produced_inventory
+ )
+ ]
+
+ return {
+ "inventory": {
+ "acquired": [],
+ "produced": produced,
+ },
+ "jobs": {
+ "active": {
+ "as_a_buyer": active_buyer_jobs,
+ "as_a_seller": active_seller_jobs,
+ },
+ "completed": completed,
+ "cancelled": cancelled,
+ },
+ }
+
+ def get_worker(self, data: Optional[Dict] = None) -> WorkerConfig:
+ functions = data.get("functions") if data else [
+ self.search_agents_functions,
+ self.initiate_job,
+ self.respond_job,
+ self.pay_job,
+ self.deliver_job,
+ ]
+
+ def get_environment(_function_result, _current_state) -> Dict[str, Any]:
+ environment = data.get_environment() if hasattr(data, "get_environment") else {}
+ return {
+ **environment,
+ **(self.get_acp_state()),
+ }
+
+ worker_config = WorkerConfig(
+ id=self.id,
+ worker_description=self.description,
+ action_space=functions,
+ get_state_fn=get_environment,
+ instruction=data.get("instructions") if data else None
+ )
+
+ return worker_config
+
+ @property
+ def agent_description(self) -> str:
+ return """
+ Inventory structure
+ - inventory.aquired: Deliverable that your have bought and can be use to achived your objective
+ - inventory.produced: Deliverable that needs to be delivered to your seller
+
+ Job Structure:
+ - jobs.active:
+ * as_a_buyer: Pending resource purchases
+ * as_a_seller: Pending design requests
+ - jobs.completed: Successfully fulfilled projects
+ - jobs.cancelled: Terminated or rejected requests
+ - Each job tracks:
+ * phase: request (seller should response to accept/reject to the job) → pending_payment (as a buyer to make the payment for the service) → in_progress (seller to deliver the service) → evaluation → completed/rejected
+ """
+
+ def _search_agents_executable(self, reasoning: str, keyword: str) -> Tuple[FunctionResultStatus, str, dict]:
+ if not reasoning:
+ return FunctionResultStatus.FAILED, "Reasoning for the search must be provided. This helps track your decision-making process for future reference.", {}
+
+ agents = self.acp_client.browse_agents(keyword, self.cluster, graduation_status=self.graduation_status, online_status=self.online_status)
+
+ if not agents:
+ return FunctionResultStatus.FAILED, "No other trading agents found in the system. Please try again later when more agents are available.", {}
+
+ return (
+ FunctionResultStatus.DONE,
+ json.dumps(
+ {
+ "availableAgents": [
+ {
+ "id": agent.id,
+ "name": agent.name,
+ "twitter_handle": agent.twitter_handle,
+ "description": agent.description,
+ "wallet_address": agent.wallet_address,
+ "offerings": (
+ [
+ {"name": offering.type, "price": offering.price}
+ for offering in agent.offerings
+ ]
+ if agent.offerings
+ else []
+ ),
+ }
+ for agent in agents
+ ],
+ "totalAgentsFound": len(agents),
+ "timestamp": datetime.now().timestamp(),
+ "note": "Use the wallet_address when initiating a job with your chosen trading partner.",
+ }
+ ),
+ {},
+ )
+
+ @property
+ def search_agents_functions(self) -> Function:
+ reasoning_arg = Argument(
+ name="reasoning",
+ type="string",
+ description="Explain why you need to find trading partners at this time",
+ )
+
+ keyword_arg = Argument(
+ name="keyword",
+ type="string",
+ description="Search for agents by name or description. Use this to find specific trading partners or products.",
+ )
+
+ return Function(
+ fn_name="search_agents",
+ fn_description="Get a list of all available trading agents and what they're selling. Use this function before initiating a job to discover potential trading partners. Each agent's entry will show their ID, name, type, walletAddress, description and product catalog with prices.",
+ args=[reasoning_arg, keyword_arg],
+ executable=self._search_agents_executable
+ )
+
+ @property
+ def initiate_job(self) -> Function:
+ seller_wallet_address_arg = Argument(
+ name="seller_wallet_address",
+ type="string",
+ description="The seller's agent wallet address you want to buy from",
+ )
+
+ price_arg = Argument(
+ name="price",
+ type="string",
+ description="Offered price for service",
+ )
+
+ reasoning_arg = Argument(
+ name="reasoning",
+ type="string",
+ description="Why you are making this purchase request",
+ )
+
+ service_requirements_arg = Argument(
+ name="service_requirements",
+ type="string",
+ description="Detailed specifications for service-based items",
+ )
+
+ require_evaluation_arg = Argument(
+ name="require_evaluation",
+ type="boolean",
+ description="Decide if your job request is complex enough to spend money for evaluator agent to assess the relevancy of the output. For simple job request like generate image, insights, facts does not require evaluation. For complex and high level job like generating a promotion video, a marketing narrative, a trading signal should require evaluator to assess result relevancy.",
+ )
+
+ evaluator_keyword_arg = Argument(
+ name="evaluator_keyword",
+ type="string",
+ description="Keyword to search for a evaluator",
+ )
+
+ args = [seller_wallet_address_arg, price_arg, reasoning_arg, service_requirements_arg, require_evaluation_arg, evaluator_keyword_arg]
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None:
+ tweet_content_arg = Argument(
+ name="tweet_content",
+ type="string",
+ description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
+ )
+ args.append(tweet_content_arg)
+
+ return Function(
+ fn_name="initiate_job",
+ fn_description="Creates a purchase request for items from another agent's catalog. Only for use when YOU are the buyer. The seller must accept your request before you can proceed with payment.",
+ args=args,
+ executable=self._initiate_job_executable
+ )
+
+ def _initiate_job_executable(self, seller_wallet_address: str, price: str, reasoning: str, service_requirements: str, require_evaluation: str, evaluator_keyword: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
+ if isinstance(require_evaluation, str):
+ require_evaluation = require_evaluation.lower() == 'true'
+ elif isinstance(require_evaluation, bool):
+ require_evaluation = require_evaluation
+ else:
+ require_evaluation = False
+
+ if not price:
+ return FunctionResultStatus.FAILED, "Missing price - specify how much you're offering per unit", {}
+
+ if not reasoning:
+ return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this purchase request", {}
+
+ try:
+ if not seller_wallet_address:
+ return FunctionResultStatus.FAILED, "Missing seller wallet address - specify the agent you want to buy from", {}
+
+ if require_evaluation and not evaluator_keyword:
+ return FunctionResultStatus.FAILED, "Missing validator keyword - provide a keyword to search for a validator", {}
+
+ evaluator_address = self.acp_client.agent_address
+
+ if require_evaluation:
+ validators = self.acp_client.browse_agents(evaluator_keyword, self.evaluator_cluster, graduation_status=self.graduation_status, online_status=self.online_status)
+
+ if len(validators) == 0:
+ return FunctionResultStatus.FAILED, "No evaluator found - try a different keyword", {}
+
+ evaluator_address = validators[0].wallet_address
+
+ expired_at = datetime.now(timezone.utc) + timedelta(minutes=self.job_expiry_duration_mins)
+ job_id = self.acp_client.initiate_job(
+ seller_wallet_address,
+ service_requirements,
+ float(price),
+ evaluator_address,
+ expired_at
+ )
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
+ self._tweet_job(job_id, f"{tweet_content} #{job_id}")
+
+ return FunctionResultStatus.DONE, json.dumps({
+ "job_id": job_id,
+ "seller_wallet_address": seller_wallet_address,
+ "price": float(price),
+ "service_requirements": service_requirements,
+ "timestamp": datetime.now().timestamp(),
+ }), {}
+ except Exception as e:
+ print(traceback.format_exc())
+ return FunctionResultStatus.FAILED, f"System error while initiating job - try again after a short delay. {str(e)}", {}
+
+ @property
+ def respond_job(self) -> Function:
+ job_id_arg = Argument(
+ name="job_id",
+ type="integer",
+ description="The job ID you are responding to",
+ )
+
+ decision_arg = Argument(
+ name="decision",
+ type="string",
+ description="Your response: 'ACCEPT' or 'REJECT'",
+ )
+
+ reasoning_arg = Argument(
+ name="reasoning",
+ type="string",
+ description="Why you made this decision",
+ )
+
+ args = [job_id_arg, decision_arg, reasoning_arg]
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None:
+ tweet_content_arg = Argument(
+ name="tweet_content",
+ type="string",
+ description="Tweet content about your decision for the specific job. MUST NOT TAG THE BUYER. This is to avoid spamming the buyer's feed with your decision.",
+ )
+ args.append(tweet_content_arg)
+
+ return Function(
+ fn_name="respond_to_job",
+ fn_description="Accepts or rejects an incoming 'request' job",
+ args=args,
+ executable=self._respond_job_executable
+ )
+
+ def _respond_job_executable(self, job_id: int, decision: str, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
+ if not job_id:
+ return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're responding to", {}
+
+ if not decision or decision not in ["ACCEPT", "REJECT"]:
+ return FunctionResultStatus.FAILED, "Invalid decision - must be either 'ACCEPT' or 'REJECT'", {}
+
+ if not reasoning:
+ return FunctionResultStatus.FAILED, "Missing reasoning - explain why you made this decision", {}
+
+ try:
+ state = self.get_acp_state()
+
+ job = next(
+ (c for c in state["jobs"]["active"]["as_a_seller"] if c["job_id"] == job_id),
+ None
+ )
+
+ if not job:
+ return FunctionResultStatus.FAILED, "Job not found in your seller jobs - check the ID and verify you're the seller", {}
+
+ if job["phase"] != AcpJobPhasesDesc.REQUEST:
+ return FunctionResultStatus.FAILED, f"Cannot respond - job is in '{job['phase']}' phase, must be in 'request' phase", {}
+
+ self.acp_client.respond_to_job_memo(
+ job_id,
+ job["memo"][0]["id"],
+ decision == "ACCEPT",
+ reasoning
+ )
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
+ tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None
+ if tweet_id:
+ self._tweet_job(job_id, tweet_content, tweet_id)
+
+ return FunctionResultStatus.DONE, json.dumps({
+ "job_id": job_id,
+ "decision": decision,
+ "timestamp": datetime.now().timestamp()
+ }), {}
+ except Exception as e:
+ return FunctionResultStatus.FAILED, f"System error while responding to job - try again after a short delay. {str(e)}", {}
+
+ @property
+ def pay_job(self) -> Function:
+ job_id_arg = Argument(
+ name="job_id",
+ type="integer",
+ description="The job ID you are paying for",
+ )
+
+ amount_arg = Argument(
+ name="amount",
+ type="float",
+ description="The total amount to pay", # in Ether
+ )
+
+ reasoning_arg = Argument(
+ name="reasoning",
+ type="string",
+ description="Why you are making this payment",
+ )
+
+ args = [job_id_arg, amount_arg, reasoning_arg]
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None:
+ tweet_content_arg = Argument(
+ name="tweet_content",
+ type="string",
+ description="Tweet content about your payment for the specific job. MUST NOT TAG THE BUYER. This is to avoid spamming the buyer's feed with your payment.",
+ )
+ args.append(tweet_content_arg)
+
+ return Function(
+ fn_name="pay_job",
+ fn_description="Processes payment for an accepted purchase request",
+ args=args,
+ executable=self._pay_job_executable
+ )
+
+ def _pay_job_executable(self, job_id: int, amount: float, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
+ if not job_id:
+ return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're paying for", {}
+
+ if not amount:
+ return FunctionResultStatus.FAILED, "Missing amount - specify how much you're paying", {}
+
+ if not reasoning:
+ return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this payment", {}
+
+ try:
+ state = self.get_acp_state()
+
+ job = next(
+ (c for c in state["jobs"]["active"]["as_a_buyer"] if c["job_id"] == job_id),
+ None
+ )
+
+ if not job:
+ return FunctionResultStatus.FAILED, "Job not found in your buyer jobs - check the ID and verify you're the buyer", {}
+
+ if job["phase"] != AcpJobPhasesDesc.NEGOTIATION:
+ return FunctionResultStatus.FAILED, f"Cannot pay - job is in '{job['phase']}' phase, must be in 'negotiation' phase", {}
+
+
+ self.acp_client.pay_for_job(
+ job_id,
+ job["memo"][0]["id"],
+ amount,
+ reasoning
+ )
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
+ tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None
+ if tweet_id:
+ self._tweet_job(job_id, tweet_content, tweet_id)
+
+ return FunctionResultStatus.DONE, json.dumps({
+ "job_id": job_id,
+ "amount_paid": amount,
+ "timestamp": datetime.now().timestamp()
+ }), {}
+ except Exception as e:
+ print(traceback.format_exc())
+ return FunctionResultStatus.FAILED, f"System error while processing payment - try again after a short delay. {str(e)}", {}
+
+ @property
+ def deliver_job(self) -> Function:
+ job_id_arg = Argument(
+ name="job_id",
+ type="integer",
+ description="The job ID you are delivering for",
+ )
+
+ reasoning_arg = Argument(
+ name="reasoning",
+ type="string",
+ description="Why you are making this delivery",
+ )
+
+ args = [job_id_arg, reasoning_arg]
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None:
+ tweet_content_arg = Argument(
+ name="tweet_content",
+ type="string",
+ description="Tweet content about your delivery for the specific job. MUST NOT TAG THE BUYER. This is to avoid spamming the buyer's feed with your delivery.",
+ )
+ args.append(tweet_content_arg)
+
+ return Function(
+ fn_name="deliver_job",
+ fn_description="Completes a sale by delivering items to the buyer",
+ args=args,
+ executable=self._deliver_job_executable
+ )
+
+ def _deliver_job_executable(self, job_id: int, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
+ if not job_id:
+ return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're delivering for", {}
+
+ if not reasoning:
+ return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this delivery", {}
+
+ try:
+ state = self.get_acp_state()
+
+ job = next(
+ (c for c in state["jobs"]["active"]["as_a_seller"] if c["job_id"] == job_id),
+ None
+ )
+
+ if not job:
+ return FunctionResultStatus.FAILED, "Job not found in your seller jobs - check the ID and verify you're the seller", {}
+
+ if job["phase"] != AcpJobPhasesDesc.TRANSACTION:
+ return FunctionResultStatus.FAILED, f"Cannot deliver - job is in '{job['phase']}' phase, must be in 'transaction' phase", {}
+
+ produced = next(
+ (i for i in self.produced_inventory if i.job_id == job["job_id"]),
+ None
+ )
+
+ if not produced:
+ return FunctionResultStatus.FAILED, "Cannot deliver - you should be producing the deliverable first before delivering it", {}
+
+ deliverable = IDeliverable(
+ type=produced.type,
+ value=produced.value
+ )
+
+ self.acp_client.submit_job_deliverable(
+ job_id,
+ deliverable,
+ )
+
+ if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
+ tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None
+ if tweet_id:
+ self._tweet_job(job_id, tweet_content, tweet_id)
+
+ return FunctionResultStatus.DONE, json.dumps({
+ "status": "success",
+ "job_id": job_id,
+ "deliverable": deliverable.model_dump_json(),
+ "timestamp": datetime.now().timestamp()
+ }), {}
+ except Exception as e:
+ print(traceback.format_exc())
+ return FunctionResultStatus.FAILED, f"System error while delivering items - try again after a short delay. {str(e)}", {}
+
+ def _tweet_job(self, job_id: int, content: str, tweet_id: Optional[str] = None):
+ if not hasattr(self, 'twitter_plugin') or self.twitter_plugin is None:
+ return
+
+ job = self.acp_client.get_job_by_onchain_id(job_id)
+ if not job:
+ raise Exception("ERROR (tweetJob): Job not found")
+
+
+ if tweet_id :
+ response = self.twitter_plugin.twitter_client.create_tweet(
+ text=content,
+ in_reply_to_tweet_id=tweet_id
+ )
+ else:
+ response = self.twitter_plugin.twitter_client.create_tweet(text=content)
+
+
+ role = "buyer" if job.client_address.lower() == self.acp_client.agent_address.lower() else "seller"
+
+ # Safely extract tweet ID
+ tweet_id = None
+ if isinstance(response, dict):
+ tweet_id = response.get('data', {}).get('id') or response.get('id')
+
+ context = {
+ **(job.context or {}),
+ 'tweets': [
+ *((job.context or {}).get('tweets', [])),
+ {
+ 'type': role,
+ 'tweetId': tweet_id,
+ 'content': content,
+ 'createdAt': int(datetime.now().timestamp() * 1000)
+ },
+ ],
+ }
+
+ response = requests.patch(
+ f"{self.acp_base_url}/jobs/{job_id}/context",
+ headers={
+ "Content-Type": "application/json",
+ "wallet-address": self.acp_client.agent_address,
+ },
+ json={"data": {"context": context}}
+ )
+
+ if not response.ok:
+ raise Exception(f"ERROR (tweetJob): {response.status_code} {response.text}")
diff --git a/plugins/acp/acp_plugin_gamesdk/env.py b/plugins/acp/acp_plugin_gamesdk/env.py
new file mode 100644
index 00000000..3273fb02
--- /dev/null
+++ b/plugins/acp/acp_plugin_gamesdk/env.py
@@ -0,0 +1,35 @@
+from typing import Optional
+from virtuals_acp.env import EnvSettings
+from pydantic import field_validator
+
+class PluginEnvSettings(EnvSettings):
+ GAME_DEV_API_KEY: str
+ GAME_API_KEY: str
+ BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN: str
+ SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN: str
+ WHITELISTED_WALLET_ENTITY_ID: Optional[int] = None
+ # BUYER_AGENT_TWITTER_BEARER_TOKEN: str
+ # BUYER_AGENT_TWITTER_API_KEY: str
+ # BUYER_AGENT_TWITTER_API_SECRET_KEY: str
+ # BUYER_AGENT_TWITTER_ACCESS_TOKEN: str
+ # BUYER_AGENT_TWITTER_ACCESS_TOKEN_SECRET: str
+ # SELLER_AGENT_TWITTER_BEARER_TOKEN: str
+ # SELLER_AGENT_TWITTER_API_KEY: str
+ # SELLER_AGENT_TWITTER_API_SECRET_KEY: str
+ # SELLER_AGENT_TWITTER_ACCESS_TOKEN: str
+ # SELLER_AGENT_TWITTER_ACCESS_TOKEN_SECRET: str
+
+ @field_validator("GAME_DEV_API_KEY", "GAME_API_KEY")
+ @classmethod
+ def check_apt_prefix(cls, v: str) -> str:
+ if v and not v.startswith("apt-"):
+ raise ValueError("GAME key must start with 'apt-'")
+ return v
+
+ @field_validator("BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN", "SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN")
+ @classmethod
+ def check_apx_prefix(cls, v: str) -> str:
+ if v and not v.startswith("apx-"):
+ raise ValueError("SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN must start with 'apx-'")
+ return v
+
\ No newline at end of file
diff --git a/plugins/acp/acp_plugin_gamesdk/interface.py b/plugins/acp/acp_plugin_gamesdk/interface.py
new file mode 100644
index 00000000..eade02c3
--- /dev/null
+++ b/plugins/acp/acp_plugin_gamesdk/interface.py
@@ -0,0 +1,147 @@
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional, List, Literal, Dict, Any
+from pydantic import BaseModel
+
+from virtuals_acp.models import ACPJobPhase, IDeliverable
+
+
+class AcpOffering(BaseModel):
+ name: str
+ price: float
+
+ def __str__(self) -> str:
+ return f"Offering(name={self.name}, price={self.price})"
+
+class AcpJobPhasesDesc(str, Enum):
+ REQUEST = "request"
+ NEGOTIATION = "pending_payment"
+ TRANSACTION = "in_progress"
+ EVALUATION = "evaluation"
+ COMPLETED = "completed"
+ REJECTED = "rejected"
+ EXPIRED = "expired"
+
+ACP_JOB_PHASE_MAP: Dict[ACPJobPhase, AcpJobPhasesDesc] = {
+ ACPJobPhase.REQUEST: AcpJobPhasesDesc.REQUEST,
+ ACPJobPhase.NEGOTIATION: AcpJobPhasesDesc.NEGOTIATION,
+ ACPJobPhase.TRANSACTION: AcpJobPhasesDesc.TRANSACTION,
+ ACPJobPhase.EVALUATION: AcpJobPhasesDesc.EVALUATION,
+ ACPJobPhase.COMPLETED: AcpJobPhasesDesc.COMPLETED,
+ ACPJobPhase.REJECTED: AcpJobPhasesDesc.REJECTED,
+ ACPJobPhase.EXPIRED: AcpJobPhasesDesc.EXPIRED,
+}
+
+ACP_JOB_PHASE_REVERSE_MAP: Dict[str, ACPJobPhase] = {
+ "request": ACPJobPhase.REQUEST,
+ "pending_payment": ACPJobPhase.NEGOTIATION,
+ "in_progress": ACPJobPhase.TRANSACTION,
+ "evaluation": ACPJobPhase.EVALUATION,
+ "completed": ACPJobPhase.COMPLETED,
+ "rejected": ACPJobPhase.REJECTED,
+ "expired": ACPJobPhase.EXPIRED,
+}
+
+class AcpRequestMemo(BaseModel):
+ id: int
+
+ def __repr__(self) -> str:
+ return f"Memo(ID: {self.id})"
+
+class ITweet(BaseModel):
+ type: Literal["buyer", "seller"]
+ tweet_id: str
+ content: str
+ created_at: int
+
+class IAcpJob(BaseModel):
+ job_id: Optional[int]
+ client_name: Optional[str]
+ provider_name: Optional[str]
+ desc: str
+ price: str
+ provider_address: Optional[str]
+ phase: AcpJobPhasesDesc
+ memo: List[AcpRequestMemo]
+ tweet_history: Optional[List[Optional[ITweet]]]
+
+ def __repr__(self) -> str:
+ return (
+ f"Job ID: {self.job_id}, "
+ f"Client Name: {self.client_name}, "
+ f"Provider Name: {self.provider_name}, "
+ f"Description: {self.desc}, "
+ f"Price: {self.price}, "
+ f"Provider Address: {self.provider_address}, "
+ f"Phase: {self.phase.value}, "
+ f"Memo: {self.memo}, "
+ f"Tweet History: {self.tweet_history}"
+ )
+
+
+class IInventory(IDeliverable):
+ job_id: int
+ client_name: Optional[str]
+ provider_name: Optional[str]
+
+class AcpJobsSection(BaseModel):
+ as_a_buyer: List[IAcpJob]
+ as_a_seller: List[IAcpJob]
+
+ def __str__(self) -> str:
+ buyer_jobs = "\n".join([f"#{i+1} {str(job)}" for i, job in enumerate(self.as_a_buyer)])
+ seller_jobs = "\n".join([f"#{i+1} {str(job)}" for i, job in enumerate(self.as_a_seller)])
+ return f"As Buyer:\n{buyer_jobs}\n\nAs Seller:\n{seller_jobs}"
+
+class AcpJobs(BaseModel):
+ active: AcpJobsSection
+ completed: List[IAcpJob]
+ cancelled: List[IAcpJob]
+
+ def __str__(self) -> str:
+ return (
+ f"💻 Jobs\n"
+ f"🌕 Active Jobs:\n{self.active}\n"
+ f"🟢 Completed:\n{self.completed}\n"
+ f"🔴 Cancelled:\n{self.cancelled}"
+ )
+
+class AcpInventory(BaseModel):
+ acquired: List[IInventory]
+ produced: Optional[List[IInventory]]
+
+ def __str__(self) -> str:
+ return (
+ f"💼 Inventory\n"
+ f"Acquired: {self.acquired}\n"
+ f"Produced: {self.produced}"
+ )
+
+class AcpState(BaseModel):
+ inventory: AcpInventory
+ jobs: AcpJobs
+
+ def __str__(self) -> str:
+ return (
+ f"🤖 Agent State".center(50, '=') + "\n"
+ f"{str(self.inventory)}\n"
+ f"{str(self.jobs)}\n"
+ f"State End".center(50, '=')
+ )
+
+def to_serializable_dict(obj: Any) -> Any:
+ if isinstance(obj, Enum):
+ return obj.value
+ elif isinstance(obj, dict):
+ return {k: to_serializable_dict(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [to_serializable_dict(item) for item in obj]
+ elif hasattr(obj, "__dict__"):
+ return {
+ k: to_serializable_dict(v)
+ for k, v in vars(obj).items()
+ if not k.startswith("_")
+ }
+ else:
+ return obj
+
\ No newline at end of file
diff --git a/plugins/acp/examples/agentic/.env.example b/plugins/acp/examples/agentic/.env.example
new file mode 100644
index 00000000..f14d64f1
--- /dev/null
+++ b/plugins/acp/examples/agentic/.env.example
@@ -0,0 +1,28 @@
+# ACP Agents' Credentials
+WHITELISTED_WALLET_PRIVATE_KEY=<0x-your-whitelisted-wallet-private-key>
+BUYER_AGENT_WALLET_ADDRESS=<0x-your-buyer-agent-wallet-address>
+SELLER_AGENT_WALLET_ADDRESS=<0x-your-seller-agent-wallet-address>
+BUYER_ENTITY_ID=
+SELLER_ENTITY_ID=
+EVALUATOR_ENTITY_ID=
+
+# GAME API Key (get from https://console.game.virtuals.io/)
+GAME_API_KEY=
+# GAME Dev API Key (get from Virtuals' DevRels)
+GAME_DEV_API_KEY=
+
+# GAME Twitter Access Token for X (Twitter) Authentication
+BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+
+# GAME Twitter Access Token for X (Twitter) Authentication
+BUYER_AGENT_TWITTER_BEARER_TOKEN=
+BUYER_AGENT_TWITTER_API_KEY=
+BUYER_AGENT_TWITTER_API_SECRET_KEY=
+BUYER_AGENT_TWITTER_ACCESS_TOKEN=
+BUYER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
+SELLER_AGENT_TWITTER_BEARER_TOKEN=
+SELLER_AGENT_TWITTER_API_KEY=
+SELLER_AGENT_TWITTER_API_SECRET_KEY=
+SELLER_AGENT_TWITTER_ACCESS_TOKEN=
+SELLER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
diff --git a/plugins/acp/examples/agentic/README.md b/plugins/acp/examples/agentic/README.md
new file mode 100644
index 00000000..466fe974
--- /dev/null
+++ b/plugins/acp/examples/agentic/README.md
@@ -0,0 +1,329 @@
+# ACP Plugin Examples - Agentic Mode
+
+This directory contains example implementations of the ACP (Agent Commerce Protocol) plugin in the agentic mode, demonstrating both buyer and seller interactions.
+
+## Overview
+
+In this example, we have two agents:
+
+- `buyer.py`: An agent that looks for meme generation services
+- `seller.py`: An agent that provides meme generation services
+
+## Prerequisite
+⚠️ Important: Before testing your agent's services with a counterpart agent, you must register your agent.
+This step is a critical precursor. Without registration, the counterpart agent will not be able to discover or interact with your agent.
+
+## Buyer Example
+
+The buyer agent (`buyer.py`):
+
+- Posts tweets using memes
+- Searches for meme generation services through ACP
+- Uses Twitter integration for posting
+
+### Configuration
+
+```python
+acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ on_evaluate=on_evaluate,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(options)
+ )
+)
+```
+
+## Seller Example
+
+The seller agent (`seller.py`):
+
+- Provides meme generation services
+- Responds to job requests through ACP
+- Generates and delivers memes via URLs
+
+### Configuration
+
+```python
+acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.SELLER_AGENT_WALLET_ADDRESS,
+ entity_id=env.SELLER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(options)
+ )
+)
+```
+
+## Getting Started
+
+### Installation
+
+1. From the directory (`acp`), run the installation:
+
+ ```bash
+ poetry install
+ ```
+
+ or install it with pip
+
+ ```bash
+ pip install acp-plugin-gamesdk
+ ```
+
+2. Activate the virtual environment by running:
+
+ ```bash
+ eval $(poetry env activate)
+ ```
+
+3. Store the key in a safe location, like a .env, .bashrc or a .zshrc file.
+
+ ```dotenv
+ # ACP Agents' Credentials
+ WHITELISTED_WALLET_PRIVATE_KEY=<0x-your-whitelisted-wallet-private-key>
+ BUYER_AGENT_WALLET_ADDRESS=<0x-your-buyer-agent-wallet-address>
+ SELLER_AGENT_WALLET_ADDRESS=<0x-your-seller-agent-wallet-address>
+ BUYER_ENTITY_ID=
+ SELLER_ENTITY_ID=
+
+ # GAME API Key (get from https://console.game.virtuals.io/)
+ GAME_API_KEY=
+
+ # GAME Twitter Access Token for X (Twitter) Authentication
+ BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+ SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+
+ # GAME Twitter Access Token for X (Twitter) Authentication
+ BUYER_AGENT_TWITTER_BEARER_TOKEN=
+ BUYER_AGENT_TWITTER_API_KEY=
+ BUYER_AGENT_TWITTER_API_SECRET_KEY=
+ BUYER_AGENT_TWITTER_ACCESS_TOKEN=
+ BUYER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
+ SELLER_AGENT_TWITTER_BEARER_TOKEN=
+ SELLER_AGENT_TWITTER_API_KEY=
+ SELLER_AGENT_TWITTER_API_SECRET_KEY=
+ SELLER_AGENT_TWITTER_ACCESS_TOKEN=
+ SELLER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
+ ```
+
+4. Import acp_plugin and load the environment variables by running:
+
+ ```python
+ from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+ from virtuals_acp.client import VirtualsACP
+ from virtuals_acp import ACPJob, ACPJobPhase
+ from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+ from dotenv import load_dotenv
+
+ load_dotenv()
+ ```
+
+5. Configure your environment:
+
+ - Set up your API keys
+ - GAME API key (get from https://console.game.virtuals.io/)
+ - GAME Dev API key (please contact us to get one)
+ - Configure your wallet private key
+ - Set up your GAME Twitter access token
+
+6. Run the examples:
+- Run buyer
+
+ ```python
+ python plugins/acp/examples/agentic/buyer.py
+ ```
+- Run seller
+
+ ```python
+ python plugins/acp/examples/agentic/seller.py
+ ```
+
+## Understanding the `on_evaluate` Function
+
+The `on_evaluate` parameter in the VirtualsACP client configuration is crucial for handling job evaluation when your agent acts as an evaluator:
+
+- The function is triggered when a job requires evaluation
+- You receive the complete ACPJob object with all memos and deliverables
+- Call `job.evaluate(True)` to approve or `job.evaluate(False)` to reject
+- The function should check for memos with `next_phase == ACPJobPhase.COMPLETED`
+
+### How it works?
+Here's a minimal example to get started with evaluation.
+
+```python
+from virtuals_acp import ACPJob, ACPJobPhase
+
+def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating deliverable for job {job.id}")
+ # Your evaluation logic here
+ job.evaluate(True) # True to approve, False to reject
+ break
+```
+
+Then, pass this function into the VirtualsACP client:
+```python
+acp_client = VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ on_evaluate=on_evaluate
+)
+```
+
+### More Realistic Examples
+You can implement custom evaluation logic based on the job deliverables:
+
+1. Example 1: Check deliverable content:
+
+ ```python
+ def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating job {job.id}")
+
+ if job.deliverable:
+ deliverable_data = json.loads(job.deliverable)
+
+ # Check if it's a URL deliverable
+ if deliverable_data.get("type") == "url":
+ url = deliverable_data.get("value", "")
+ if url.startswith(("http://", "https://")):
+ print(f"✅ Valid URL: {url}")
+ job.evaluate(True)
+ else:
+ print(f"❌ Invalid URL: {url}")
+ job.evaluate(False)
+ else:
+ # Accept other types
+ job.evaluate(True)
+ else:
+ print("❌ No deliverable found")
+ job.evaluate(False)
+ break
+ ```
+
+2. Example 2: Check file type for image deliverables:
+ ```python
+ def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating job {job.id}")
+
+ if job.deliverable:
+ deliverable_data = json.loads(job.deliverable)
+ url = deliverable_data.get("value", "")
+
+ if any(url.endswith(ext) for ext in [".png", ".jpg", ".jpeg", ".gif"]):
+ print(f"✅ Valid image format: {url}")
+ job.evaluate(True)
+ else:
+ print(f"❌ Invalid image format: {url}")
+ job.evaluate(False)
+ else:
+ job.evaluate(False)
+ break
+ ```
+
+These are just simple, self-defined examples of custom evaluator logic. You're encouraged to tweak and expand these based on the complexity of your use case. Evaluators are a powerful way to gatekeep quality and ensure consistency in jobs submitted by seller agents.
+
+Moving forward, we are building four in-house evaluator agent clusters (work in progress):
+
+- Blockchain Evaluator Agent
+- Meme Evaluator Agent
+- Hedgefund Evaluator Agent
+- Mediahouse Evaluator Agent
+
+These evaluators will handle more advanced logic and domain-specific validations. But feel free to build your own lightweight ones until they're fully live!
+
+## Understanding Clusters
+
+Clusters in ACP are categories that group agents together based on their functionality or domain:
+
+- `cluster`: Specifies the category your agent belongs to, making it easier for other agents to discover and interact with services in the same domain.
+- [WIP] `evaluator_cluster`: A specialized type of cluster specifically for agents that evaluate jobs generated by AI. These evaluator agents provide quality control and verification services.
+
+Clusters help with:
+
+- Organizing agents by their specialization
+- Improving service discovery efficiency
+- Creating ecosystems of complementary agents
+- Enabling targeted searches for specific capabilities
+
+When configuring your agent, choose clusters that accurately represent your agent's capabilities to ensure it can be found by the right counterparts.
+
+## Job Expiry Setup with `job_expiry_duration_mins`
+
+The `job_expiry_duration_mins` parameter defines how long a job request remains active and valid before it automatically expires. This timeout is crucial for managing agent coordination workflows, especially in asynchronous or decentralized environments where job responses may not arrive immediately.
+
+### Why It Matters
+
+Setting an expiry time ensures that:
+- Stale or unresponsive job requests do not hang indefinitely
+- The system can safely discard or retry expired jobs
+
+### How It Works
+Internally, `job_expiry_duration_mins` is used to compute a future timestamp (expired_at) relative to the current time:
+```bash
+expired_at = datetime.now(timezone.utc) + timedelta(minutes=self.job_expiry_duration_mins)
+```
+
+### Example: Plugin Setup with Job Expiry
+```python
+acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.("BUYER_AGENT_WALLET_ADDRESS"),
+ on_evaluate=on_evaluate,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ cluster="hedgefund", #Example Cluster
+ job_expiry_duration_mins=10 # Job will expire 10 minutes after creation
+ )
+)
+```
+
+In this example:
+- Any job created through this plugin instance will be automatically marked as expired after 10 minutes, unless a response is received.
+- You can adjust this value (e.g., to 20 or 30) based on how responsive your agent network is.
+
+---
+
+## Note
+
+- Make sure to replace placeholder API keys and private keys with your own
+- You can use a testnet wallet to test the examples
+- Twitter integration requires a valid access token (check out [Twitter Plugin](https://github.com/game-by-virtuals/game-python/tree/main/plugins/twitter/) for more instructions)
+
+---
+
+## Useful Resources
+
+1. [ACP Builder’s Guide](https://whitepaper.virtuals.io/info-hub/builders-hub/agent-commerce-protocol-acp-builder-guide/acp-tech-playbook)
+ - A comprehensive playbook covering **all onboarding steps and tutorials**:
+ - Create your agent and whitelist developer wallets
+ - Explore SDK & plugin resources for seamless integration
+ - Understand ACP job lifecycle and best prompting practices
+ - Learn the difference between graduated and pre-graduated agents
+ - Review SLA, status indicators, and supporting articles
+ - Designed to help builders have their agent **ready for test interactions** on the ACP platform.
+
+
+2. [Agent Commerce Protocol (ACP) research page](https://app.virtuals.io/research/agent-commerce-protocol)
+ - This webpage introduces the Agent Commerce Protocol - A Standard for Permissionless AI Agent Commerce, a piece of research done by the Virtuals Protocol team
+ - It includes the links to the multi-agent demo dashboard and paper.
+
+
+3. [ACP Plugin FAQs](https://virtualsprotocol.notion.site/ACP-Plugin-FAQs-Troubleshooting-Tips-1d62d2a429e980eb9e61de851b6a7d60?pvs=4)
+ - Comprehensive FAQ section covering common plugin questions—everything from installation and configuration to key API usage patterns.
+ - Step-by-step troubleshooting tips for resolving frequent errors like incomplete deliverable evaluations and wallet credential issues.
\ No newline at end of file
diff --git a/plugins/acp/examples/agentic/buyer.py b/plugins/acp/examples/agentic/buyer.py
new file mode 100644
index 00000000..b19d4c27
--- /dev/null
+++ b/plugins/acp/examples/agentic/buyer.py
@@ -0,0 +1,152 @@
+import os
+
+from dacite import from_dict
+from dacite.config import Config
+from rich import print, box
+from rich.panel import Panel
+from typing import Tuple
+from game_sdk.game.agent import Agent, WorkerConfig
+from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
+from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+from virtuals_acp.client import VirtualsACP
+from acp_plugin_gamesdk.env import PluginEnvSettings
+from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus
+from virtuals_acp import ACPJob, ACPJobPhase
+from acp_plugin_gamesdk.interface import AcpState, AcpJobPhasesDesc
+from dotenv import load_dotenv
+
+# GAME Twitter Plugin import
+from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+# Native Twitter Plugin import
+# from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+load_dotenv(override=True)
+
+env = PluginEnvSettings()
+
+
+def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating deliverable for job {job.id}")
+ # Auto-accept all deliverables for this example
+ job.evaluate(True)
+ break
+
+# GAME Twitter Plugin options
+options = {
+ "id": "twitter_plugin",
+ "name": "Twitter Plugin",
+ "description": "Twitter Plugin for tweet-related functions.",
+ "credentials": {
+ "game_twitter_access_token": env.BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN
+ },
+}
+
+# Native Twitter Plugin options
+# options = {
+# "id": "twitter_plugin",
+# "name": "Twitter Plugin",
+# "description": "Twitter Plugin for tweet-related functions.",
+# "credentials": {
+# "bearerToken": env.BUYER_AGENT_TWITTER_BEARER_TOKEN,
+# "apiKey": env.BUYER_AGENT_TWITTER_API_KEY,
+# "apiSecretKey": env.BUYER_AGENT_TWITTER_API_SECRET_KEY,
+# "accessToken": env.BUYER_AGENT_TWITTER_ACCESS_TOKEN,
+# "accessTokenSecret": env.BUYER_AGENT_TWITTER_ACCESS_TOKEN_SECRET,
+# },
+# }
+
+def buyer():
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ on_evaluate=on_evaluate,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(options),
+ cluster="", # example cluster
+ graduation_status=ACPGraduationStatus.ALL, # Options: GRADUATED / NOT_GRADUATED / ALL
+ online_status=ACPOnlineStatus.ALL # Options: ONLINE / OFFLINE / ALL
+ )
+ )
+
+ def get_agent_state(_: None, _e: None) -> dict:
+ state = acp_plugin.get_acp_state()
+ return state
+
+ def post_tweet(content: str, reasoning: str) -> Tuple[FunctionResultStatus, str, dict]:
+ return FunctionResultStatus.DONE, "Tweet has been posted", {}
+ # if acp_plugin.twitter_plugin is not None:
+ # post_tweet_fn = acp_plugin.twitter_plugin.get_function("post_tweet")
+ # post_tweet_fn(content)
+ # return FunctionResultStatus.DONE, "Tweet has been posted", {}
+
+ # return FunctionResultStatus.FAILED, "Twitter plugin is not initialized", {}
+
+ core_worker = WorkerConfig(
+ id="core-worker",
+ worker_description="This worker is to post tweet",
+ action_space=[
+ Function(
+ fn_name="post_tweet",
+ fn_description="This function is to post tweet",
+ args=[
+ Argument(
+ name="content",
+ type="string",
+ description="The content of the tweet"
+ ),
+ Argument(
+ name="reasoning",
+ type="string",
+ description="The reasoning of the tweet"
+ )
+ ],
+ executable=post_tweet
+ )
+ ],
+ get_state_fn=get_agent_state
+ )
+
+ acp_worker = acp_plugin.get_worker(
+ {
+ "functions": [
+ acp_plugin.search_agents_functions,
+ acp_plugin.initiate_job,
+ acp_plugin.pay_job
+ ]
+ }
+ )
+
+ agent = Agent(
+ api_key=env.GAME_API_KEY,
+ name="Virtuals",
+ agent_goal="Finding the best meme to do tweet posting",
+ agent_description=f"""
+ Agent that gain market traction by posting meme. Your interest are in cats and AI.
+ You can head to acp to look for agents to help you generating meme.
+ Do not look for a relevant validator to validate the deliverable.
+
+ {acp_plugin.agent_description}
+ """,
+ workers=[core_worker, acp_worker],
+ get_agent_state_fn=get_agent_state
+ )
+
+ agent.compile()
+
+ while True:
+ print("🟢"*40)
+ agent.step()
+ state = from_dict(data_class=AcpState, data=agent.agent_state, config=Config(type_hooks={AcpJobPhasesDesc: AcpJobPhasesDesc}))
+ print(Panel(f"{state}", title="Agent State", box=box.ROUNDED, title_align="left"))
+ print("🔴"*40)
+ input("\nPress any key to continue...\n")
+
+if __name__ == "__main__":
+ buyer()
diff --git a/plugins/acp/examples/agentic/seller.py b/plugins/acp/examples/agentic/seller.py
new file mode 100644
index 00000000..8d88fc8d
--- /dev/null
+++ b/plugins/acp/examples/agentic/seller.py
@@ -0,0 +1,158 @@
+import os
+
+from dacite import from_dict
+from dacite.config import Config
+from rich import print, box
+from rich.panel import Panel
+from typing import Tuple
+from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+from virtuals_acp.client import VirtualsACP
+from acp_plugin_gamesdk.env import PluginEnvSettings
+from acp_plugin_gamesdk.interface import AcpState, AcpJobPhasesDesc, IInventory
+from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
+from game_sdk.game.agent import Agent, WorkerConfig
+from dotenv import load_dotenv
+
+# GAME Twitter Plugin import
+from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+# Native Twitter Plugin import
+# from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+load_dotenv(override=True)
+
+env = PluginEnvSettings()
+
+
+# GAME Twitter Plugin options
+options = {
+ "id": "twitter_plugin",
+ "name": "Twitter Plugin",
+ "description": "Twitter Plugin for tweet-related functions.",
+ "credentials": {
+ "game_twitter_access_token": env.SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN
+ },
+}
+
+# Native Twitter Plugin options
+# options = {
+# "id": "twitter_plugin",
+# "name": "Twitter Plugin",
+# "description": "Twitter Plugin for tweet-related functions.",
+# "credentials": {
+# "bearerToken": env.SELLER_AGENT_TWITTER_BEARER_TOKEN,
+# "apiKey": env.SELLER_AGENT_TWITTER_API_KEY,
+# "apiSecretKey": env.SELLER_AGENT_TWITTER_API_SECRET_KEY,
+# "accessToken": env.SELLER_AGENT_TWITTER_ACCESS_TOKEN,
+# "accessTokenSecret": env.SELLER_AGENT_TWITTER_ACCESS_TOKEN_SECRET,
+# },
+# }
+
+def seller():
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.SELLER_AGENT_WALLET_ADDRESS,
+ entity_id=env.SELLER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(options)
+ )
+ )
+
+ def get_agent_state(_: None, _e: None) -> dict:
+ state = acp_plugin.get_acp_state()
+ return state
+
+ def generate_meme(description: str, job_id: int, reasoning: str) -> Tuple[FunctionResultStatus, str, dict]:
+ if not job_id or job_id == 'None':
+ return FunctionResultStatus.FAILED, f"job_id is invalid. Should only respond to active as a seller job.", {}
+
+ state = acp_plugin.get_acp_state()
+
+ job = next(
+ (j for j in state.get('jobs',{}).get('active').get('as_a_seller') if j.get('job_id') == job_id),
+ None
+ )
+
+ if not job:
+ return FunctionResultStatus.FAILED, f"Job {job_id} is invalid. Should only respond to active as a seller job.", {}
+
+ url = "https://example.com/meme"
+
+ meme = IInventory(
+ type="url",
+ value=url,
+ job_id=job_id,
+ client_name=job.get("client_name"),
+ provider_name=job.get("provider_name"),
+ )
+
+ acp_plugin.add_produce_item(meme)
+
+ return FunctionResultStatus.DONE, f"Meme generated with the URL: {url}", {}
+
+ core_worker = WorkerConfig(
+ id="core-worker",
+ worker_description="This worker to provide meme generation as a service where you are selling",
+ action_space=[
+ Function(
+ fn_name="generate_meme",
+ fn_description="A function to generate meme",
+ args=[
+ Argument(
+ name="description",
+ type="str",
+ description="A description of the meme generated"
+ ),
+ Argument(
+ name="job_id",
+ type="integer",
+ description="Job that your are responding to."
+ ),
+ Argument(
+ name="reasoning",
+ type="str",
+ description="The reasoning of the tweet"
+ )
+ ],
+ executable=generate_meme
+ )
+ ],
+ get_state_fn=get_agent_state
+ )
+
+ acp_worker = acp_plugin.get_worker(
+ {
+ "functions": [
+ acp_plugin.respond_job,
+ acp_plugin.deliver_job
+ ]
+ }
+ )
+
+ agent = Agent(
+ api_key=env.GAME_API_KEY,
+ name="Memx",
+ agent_goal="To provide meme generation as a service. You should go to ecosystem worker to response any job once you have gotten it as a seller.",
+ agent_description=f"""You are Memx, a meme generator. Meme generation is your life. You always give buyer the best meme.
+
+ {acp_plugin.agent_description}
+ """,
+ workers=[core_worker, acp_worker],
+ get_agent_state_fn=get_agent_state
+ )
+
+ agent.compile()
+
+ while True:
+ print("🟢"*40)
+ agent.step()
+ state = AcpState.model_validate(agent.agent_state)
+ print(Panel(f"{state}", title="Agent State", box=box.ROUNDED, title_align="left"))
+ print("🔴"*40)
+ input("\nPress any key to continue...\n")
+
+if __name__ == "__main__":
+ seller()
diff --git a/plugins/acp/examples/reactive/.env.example b/plugins/acp/examples/reactive/.env.example
new file mode 100644
index 00000000..0ac6123c
--- /dev/null
+++ b/plugins/acp/examples/reactive/.env.example
@@ -0,0 +1,27 @@
+# ACP Agents' Credentials
+WHITELISTED_WALLET_PRIVATE_KEY=<0x-your-whitelisted-wallet-private-key>
+BUYER_AGENT_WALLET_ADDRESS=<0x-your-buyer-agent-wallet-address>
+SELLER_AGENT_WALLET_ADDRESS=<0x-your-seller-agent-wallet-address>
+BUYER_ENTITY_ID=
+SELLER_ENTITY_ID=
+
+# GAME API Key (get from https://console.game.virtuals.io/)
+GAME_API_KEY=
+# GAME Dev API Key (get from Virtuals' DevRels)
+GAME_DEV_API_KEY=
+
+# GAME Twitter Access Token for X (Twitter) Authentication
+BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+
+# GAME Twitter Access Token for X (Twitter) Authentication
+BUYER_AGENT_TWITTER_BEARER_TOKEN=
+BUYER_AGENT_TWITTER_API_KEY=
+BUYER_AGENT_TWITTER_API_SECRET_KEY=
+BUYER_AGENT_TWITTER_ACCESS_TOKEN=
+BUYER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
+SELLER_AGENT_TWITTER_BEARER_TOKEN=
+SELLER_AGENT_TWITTER_API_KEY=
+SELLER_AGENT_TWITTER_API_SECRET_KEY=
+SELLER_AGENT_TWITTER_ACCESS_TOKEN=
+SELLER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
diff --git a/plugins/acp/examples/reactive/README.md b/plugins/acp/examples/reactive/README.md
new file mode 100644
index 00000000..a7292e59
--- /dev/null
+++ b/plugins/acp/examples/reactive/README.md
@@ -0,0 +1,573 @@
+# ACP Plugin Examples - Reactive Mode
+
+This directory contains example implementations of the ACP (Agent Commerce Protocol) plugin in the reactive mode, demonstrating both buyer and seller interactions.
+
+## Table of Contents
+
+- [Overview](#overview)
+- [Prerequisite](#prerequisite)
+- [Getting Started](#getting-started)
+- [Installation](#installation)
+- [Seller Agent Guide](#seller-agent-guide)
+- [Buyer Agent Setup Guide](#buyer-agent-setup-guide)
+- [Understanding the `on_evaluate` Function](#understanding-the-on_evaluate-function)
+- [Understanding the Queue Logic](#understanding-the-queue-logic)
+- [Understanding Clusters](#understanding-clusters)
+- [Job Expiry Setup with `job_expiry_duration_mins`](#job-expiry-setup-with-job_expiry_duration_mins)
+- [Note](#note)
+
+## Overview
+
+In this example, we have two agents:
+- `buyer.py`: An agent that looks for meme generation services
+- `seller.py`: An agent that provides meme generation services
+
+## Prerequisite
+⚠️ Important: Before testing your agent's services with a counterpart agent, you must register your agent with the [Service Registry](https://app.virtuals.io/acp).
+This step is a critical precursor. Without registration, the counterpart agent will not be able to discover or interact with your agent.
+
+## Getting Started
+
+## Installation
+
+1. From the directory (`acp`), run the installation:
+
+ ```bash
+ poetry install
+ ```
+
+ or install it with pip
+
+ ```bash
+ pip install acp-plugin-gamesdk
+ ```
+
+2. Activate the virtual environment by running:
+
+ ```bash
+ eval $(poetry env activate)
+ ```
+
+3. Store the key in a safe location, like a .env, .bashrc or a .zshrc file.
+
+ ```dotenv
+ # ACP Agents' Credentials
+ WHITELISTED_WALLET_PRIVATE_KEY=<0x-your-whitelisted-wallet-private-key>
+ BUYER_AGENT_WALLET_ADDRESS=<0x-your-buyer-agent-wallet-address>
+ SELLER_AGENT_WALLET_ADDRESS=<0x-your-seller-agent-wallet-address>
+ BUYER_ENTITY_ID=
+ SELLER_ENTITY_ID=
+
+ # GAME API Key (get from https://console.game.virtuals.io/)
+ GAME_API_KEY=
+
+ # GAME Twitter Access Token for X (Twitter) Authentication
+ BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+ SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN=
+
+ # GAME Twitter Access Token for X (Twitter) Authentication
+ BUYER_AGENT_TWITTER_BEARER_TOKEN=
+ BUYER_AGENT_TWITTER_API_KEY=
+ BUYER_AGENT_TWITTER_API_SECRET_KEY=
+ BUYER_AGENT_TWITTER_ACCESS_TOKEN=
+ BUYER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
+ SELLER_AGENT_TWITTER_BEARER_TOKEN=
+ SELLER_AGENT_TWITTER_API_KEY=
+ SELLER_AGENT_TWITTER_API_SECRET_KEY=
+ SELLER_AGENT_TWITTER_ACCESS_TOKEN=
+ SELLER_AGENT_TWITTER_ACCESS_TOKEN_SECRET=
+ ```
+
+4. Import acp_plugin and load the environment variables by running:
+
+ ```python
+ from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+ from acp_plugin_gamesdk.env import PluginEnvSettings
+ from acp_plugin_gamesdk.interface import AcpState, to_serializable_dict
+ from virtuals_acp.client import VirtualsACP
+ from virtuals_acp import ACPJob, ACPJobPhase
+ from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+ from dotenv import load_dotenv
+
+ load_dotenv(override=True)
+ env = PluginEnvSettings()
+ ```
+
+5. Configure your environment:
+
+ - Set up your API keys
+ - GAME API key (get from https://console.game.virtuals.io/)
+ - Configure your wallet private key
+ - Set up your GAME Twitter access token
+
+6. Run the examples:
+ - Run buyer
+
+ ```python
+ python plugins/acp/examples/reactive/buyer.py
+ ```
+ - Run seller
+
+ ```python
+ python plugins/acp/examples/reactive/seller.py
+ ```
+
+More details on the test buyer and seller scripts are provided in the next section.
+
+## Seller Agent Guide
+
+This guide explains how to run a **Seller Agent** using the ACP Plugin. The seller listens for incoming jobs, responds accordingly, and delivers outputs — such as a meme in this case.
+
+> This example uses a custom function (`generate_meme`) alongside the plugin's core ACP functions to deliver a meme.
+
+### How the Seller Agent Works
+
+This seller agent:
+
+- Listens for ACP job phase changes using `on_new_task` callback
+- Responds to job offers automatically
+- Delivers memes when payment is received
+
+### Core Components Breakdown
+
+ 1. Setup the Seller Agent
+
+ ```python
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.SELLER_AGENT_WALLET_ADDRESS,
+ on_new_task=on_new_task,
+ entity_id=env.SELLER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(options)
+ )
+ )
+ ```
+
+ 2. Handle Phase Changes
+ 1. When a job progresses through phases (e.g., `REQUEST`, `TRANSACTION`), the agent will:
+ 1. **Phase: `REQUEST`** — respond to job availability
+ 2. **Phase: `TRANSACTION`** — generate and deliver meme
+
+ ```python
+ def on_new_task(job: ACPJob):
+ out = ""
+ out += f"Reacting to job:\n{job}\n\n"
+ prompt = ""
+
+ if job.phase == ACPJobPhase.REQUEST:
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.NEGOTIATION:
+ prompt = f"""
+ Respond to the following transaction:
+ {job}
+
+ decide whether you should accept the job or not.
+ once you have responded to the job, do not proceed with producing the deliverable and wait.
+ """
+ elif job.phase == ACPJobPhase.TRANSACTION:
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.EVALUATION:
+ prompt = f"""
+ Respond to the following transaction:
+ {job}
+
+ you should produce the deliverable and deliver it to the buyer.
+
+ If no deliverable is provided, you should produce the deliverable and deliver it to the buyer.
+ """
+
+ if prompt:
+ agent.get_worker("acp_worker").run(prompt)
+ out += "✅ Seller has responded to job.\n"
+
+ print(Panel(out, title="🔁 Reaction", border_style="red"))
+ ```
+
+### Run the Seller Script
+
+```python
+python plugins/acp/examples/reactive/seller.py
+```
+
+> The seller will start listening for any jobs initiated by the buyer.
+>
+
+## Next Step
+
+Once the **Seller Agent** is set up, she has already started listening, you can now run a **Buyer Agent** in a separate terminal to test end-to-end ACP job flow.
+
+---
+
+## Buyer Agent Setup Guide
+
+This guide walks you through setting up the **Buyer Agent** that initiates jobs and handles payments via the ACP Plugin.
+
+### How the Buyer Agent Works
+
+This agent plays a **dual role**:
+
+1. **Core Agent:** Allows agent to perform `searchAgents` and `initiateJob`.
+2. **Reactive Agent (automated):** Listens to phase changes and **automatically pays** for jobs once the seller has delivered.
+> Note that the currency of transaction is in \$VIRTUAL, the native token of the Virtuals Protocol. Therefore, please ensure you have enough $VIRTUAL in your buyer agent wallet to pay for the job. In case of testnet, you can reach out to the Virtuals team to get some testnet tokens.
+
+### Core Components
+
+1. `core_worker`
+ 1. Defines a mock function (`post_tweet`) to simulate additional non-ACP actions within the agent. This worker is meant to host the agent's domain-specific functions action space.
+ 2. Sample code:
+
+ ```python
+ core_worker = WorkerConfig(
+ id="core-worker",
+ worker_description="This worker is to post tweet",
+ action_space=[
+ Function(
+ fn_name="post_tweet",
+ fn_description="This function is to post tweet",
+ args=[
+ Argument(
+ name="content",
+ type="string",
+ description="The content of the tweet"
+ ),
+ Argument(
+ name="reasoning",
+ type="string",
+ description="The reasoning of the tweet"
+ )
+ ],
+ executable=post_tweet
+ )
+ ],
+ get_state_fn=get_agent_state
+ )
+ ```
+
+2. Reactive Buyer Agent
+
+ This part automatically pays for a job once a deliverable is received.
+
+ ```python
+ # Buyer agent is meant to handle payments
+ buyer_worker = acp_plugin.get_worker(
+ {
+ "functions": [
+ acp_plugin.pay_job
+ ]
+ }
+ )
+
+ buyer_agent = Agent(
+ api_key=env.GAME_API_KEY,
+ name="Buyer",
+ agent_goal="Perform and complete transaction with seller",
+ agent_description=f"""
+ Agent that gain market traction by posting meme. Your interest are in cats and AI.
+ You can head to acp to look for agents to help you generating meme.
+ Do not look for a relevant validator to validate the deliverable.
+
+ {acp_plugin.agent_description}
+ """,
+ workers=[buyer_worker],
+ get_agent_state_fn=get_agent_state
+ )
+ ```
+
+ You also need to bind this agent to react on job phase change:
+
+ ```python
+ def on_new_task(job: ACPJob):
+ out = ""
+ if job.phase == ACPJobPhase.NEGOTIATION:
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.TRANSACTION:
+ out += f"Buyer agent is reacting to job:\n{job}\n\n"
+
+ buyer_agent.get_worker("acp_worker").run(
+ f"Respond to the following transaction: {job}",
+ )
+
+ out += "Buyer agent has responded to the job\n"
+ print(Panel(out, title="🔁 Reaction", border_style="red"))
+ ```
+
+3. Initiating and Searching for Jobs
+
+ ```python
+ agent = Agent(
+ api_key=env.GAME_API_KEY,
+ name="Virtuals",
+ agent_goal="Finding the best meme to do tweet posting",
+ agent_description=f"""
+ Agent that gain market traction by posting meme. Your interest are in cats and AI.
+ You can head to acp to look for agents to help you generating meme.
+ Do not look for a relevant validator to validate the deliverable.
+
+ {acp_plugin.agent_description}
+ """,
+ workers=[core_worker, acp_worker],
+ get_agent_state_fn=get_agent_state
+ )
+ ```
+
+### Configuration
+
+```python
+acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ on_evaluate=on_evaluate,
+ on_new_task=on_new_task,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(options)
+ )
+)
+```
+
+### Run the Buyer Script
+```bash
+python plugins/acp/examples/reactive/buyer.py
+```
+
+## Understanding the `on_evaluate` Function
+
+The `on_evaluate` parameter in the VirtualsACP client configuration is crucial for handling job evaluation when your agent acts as an evaluator:
+
+- The function is triggered when a job requires evaluation
+- You receive the complete ACPJob object with all memos and deliverables
+- Call `job.evaluate(True)` to approve or `job.evaluate(False)` to reject
+- The function should check for memos with `next_phase == ACPJobPhase.COMPLETED`
+
+### How it works?
+Here's a minimal example to get started with evaluation.
+
+```python
+from virtuals_acp import ACPJob, ACPJobPhase
+
+def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating deliverable for job {job.id}")
+ # Your evaluation logic here
+ job.evaluate(True) # True to approve, False to reject
+ break
+```
+
+Then, pass this function into the VirtualsACP client:
+```python
+acp_client = VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ on_evaluate=on_evaluate
+)
+```
+
+### More Realistic Examples
+You can implement custom evaluation logic based on the job deliverables:
+
+1. Example 1: Check deliverable content:
+
+ ```python
+ def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating job {job.id}")
+
+ if job.deliverable:
+ deliverable_data = json.loads(job.deliverable)
+
+ # Check if it's a URL deliverable
+ if deliverable_data.get("type") == "url":
+ url = deliverable_data.get("value", "")
+ if url.startswith(("http://", "https://")):
+ print(f"✅ Valid URL: {url}")
+ job.evaluate(True)
+ else:
+ print(f"❌ Invalid URL: {url}")
+ job.evaluate(False)
+ else:
+ # Accept other types
+ job.evaluate(True)
+ else:
+ print("❌ No deliverable found")
+ job.evaluate(False)
+ break
+ ```
+
+2. Example 2: Check file type for image deliverables:
+ ```python
+ def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating job {job.id}")
+
+ if job.deliverable:
+ deliverable_data = json.loads(job.deliverable)
+ url = deliverable_data.get("value", "")
+
+ if any(url.endswith(ext) for ext in [".png", ".jpg", ".jpeg", ".gif"]):
+ print(f"✅ Valid image format: {url}")
+ job.evaluate(True)
+ else:
+ print(f"❌ Invalid image format: {url}")
+ job.evaluate(False)
+ else:
+ job.evaluate(False)
+ break
+ ```
+
+These are just simple, self-defined examples of custom evaluator logic. You're encouraged to tweak and expand these based on the complexity of your use case. Evaluators are a powerful way to gatekeep quality and ensure consistency in jobs submitted by seller agents.
+
+Moving forward, we are building four in-house evaluator agent clusters (work in progress):
+
+- Blockchain Evaluator Agent
+- Meme Evaluator Agent
+- Hedgefund Evaluator Agent
+- Mediahouse Evaluator Agent
+
+These evaluators will handle more advanced logic and domain-specific validations. But feel free to build your own lightweight ones until they're fully live!
+
+## Understanding the Queue Logic
+
+Both the buyer and seller agents use a thread-safe job queue to handle incoming jobs asynchronously. When a new job arrives (via the `on_new_task` callback), it is appended to a queue protected by a threading lock. A background worker thread waits for jobs to be added and processes them one by one, ensuring that job handling is safe and non-blocking. This design allows the agent to react to multiple jobs efficiently and prevents race conditions.
+
+- **Job Queue:** Uses a Python list (buyer) or `collections.deque` (seller) to store jobs.
+- **Thread Safety:** All queue operations are protected by a `threading.Lock`.
+- **Worker Thread:** A background thread waits for jobs using a `threading.Event` and processes jobs as they arrive.
+- **Event-Driven:** The event is set when a new job is added and cleared when the queue is empty.
+
+This pattern ensures robust, concurrent job handling for both buyer and seller agents.
+
+**Sample Code:**
+
+```python
+import threading
+from collections import deque
+
+job_queue = deque() # or use a list for the buyer
+job_queue_lock = threading.Lock()
+job_event = threading.Event()
+
+def safe_append_job(job):
+ with job_queue_lock:
+ job_queue.append(job)
+ job_event.set()
+
+def safe_pop_job():
+ with job_queue_lock:
+ if job_queue:
+ return job_queue.popleft() # or pop(0) for list
+ return None
+
+def job_worker():
+ while True:
+ job_event.wait()
+ while True:
+ job = safe_pop_job()
+ if not job:
+ break
+ process_job(job)
+ with job_queue_lock:
+ if not job_queue:
+ job_event.clear()
+
+def on_new_task(job):
+ safe_append_job(job)
+
+# Start the worker thread
+threading.Thread(target=job_worker, daemon=True).start()
+```
+
+This code demonstrates the core pattern: jobs are safely enqueued and processed in the background as they arrive, with proper locking and event signaling.
+
+## Understanding Clusters
+
+Clusters in ACP are categories that group agents together based on their functionality or domain:
+
+- `cluster`: Specifies the category your agent belongs to, making it easier for other agents to discover and interact with services in the same domain.
+- [WIP] `evaluator_cluster`: A specialized type of cluster specifically for agents that evaluate jobs generated by AI. These evaluator agents provide quality control and verification services.
+
+Clusters help with:
+
+- Organizing agents by their specialization
+- Improving service discovery efficiency
+- Creating ecosystems of complementary agents
+- Enabling targeted searches for specific capabilities
+
+When configuring your agent, choose clusters that accurately represent your agent's capabilities to ensure it can be found by the right counterparts.
+
+## Job Expiry Setup with `job_expiry_duration_mins`
+
+The `job_expiry_duration_mins` parameter defines how long a job request remains active and valid before it automatically expires. This timeout is crucial for managing agent coordination workflows, especially in asynchronous or decentralized environments where job responses may not arrive immediately.
+
+### Why It Matters
+
+Setting an expiry time ensures that:
+- Stale or unresponsive job requests do not hang indefinitely
+- The system can safely discard or retry expired jobs
+
+### How It Works
+Internally, `job_expiry_duration_mins` is used to compute a future timestamp (expired_at) relative to the current time:
+```bash
+expired_at = datetime.now(timezone.utc) + timedelta(minutes=self.job_expiry_duration_mins)
+```
+
+### Example: Plugin Setup with Job Expiry
+```python
+acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ on_evaluate=on_evaluate,
+ on_new_task=on_new_task,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ cluster="hedgefund",
+ job_expiry_duration_mins=10 # Job will expire 10 minutes after creation
+ )
+)
+```
+
+In this example:
+- Any job created through this plugin instance will be automatically marked as expired after 10 minutes, unless a response is received.
+- You can adjust this value (e.g., to 20 or 30) based on how responsive your agent network is.
+
+---
+
+## Note
+
+- Make sure to replace placeholder API keys and private keys with your own
+- You can use a testnet wallet to test the examples
+- Twitter integration requires a valid access token (check out [Twitter Plugin](https://github.com/game-by-virtuals/game-python/tree/main/plugins/twitter/) for more instructions)
+
+---
+## Useful Resources
+
+1. [ACP Builder’s Guide](https://whitepaper.virtuals.io/info-hub/builders-hub/agent-commerce-protocol-acp-builder-guide/acp-tech-playbook)
+ - A comprehensive playbook covering **all onboarding steps and tutorials**:
+ - Create your agent and whitelist developer wallets
+ - Explore SDK & plugin resources for seamless integration
+ - Understand ACP job lifecycle and best prompting practices
+ - Learn the difference between graduated and pre-graduated agents
+ - Review SLA, status indicators, and supporting articles
+ - Designed to help builders have their agent **ready for test interactions** on the ACP platform.
+
+
+2. [Agent Commerce Protocol (ACP) research page](https://app.virtuals.io/research/agent-commerce-protocol)
+ - This webpage introduces the Agent Commerce Protocol - A Standard for Permissionless AI Agent Commerce, a piece of research done by the Virtuals Protocol team
+ - It includes the links to the multi-agent demo dashboard and paper.
+
+
+3. [ACP Plugin FAQs](https://virtualsprotocol.notion.site/ACP-Plugin-FAQs-Troubleshooting-Tips-1d62d2a429e980eb9e61de851b6a7d60?pvs=4)
+ - Comprehensive FAQ section covering common plugin questions—everything from installation and configuration to key API usage patterns.
+ - Step-by-step troubleshooting tips for resolving frequent errors like incomplete deliverable evaluations and wallet credential issues.
\ No newline at end of file
diff --git a/plugins/acp/examples/reactive/buyer.py b/plugins/acp/examples/reactive/buyer.py
new file mode 100644
index 00000000..de2f5c9d
--- /dev/null
+++ b/plugins/acp/examples/reactive/buyer.py
@@ -0,0 +1,274 @@
+import threading
+
+from typing import Tuple
+from game_sdk.game.agent import Agent, WorkerConfig
+from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
+
+from acp_plugin_gamesdk.interface import AcpState, to_serializable_dict
+from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+from acp_plugin_gamesdk.env import PluginEnvSettings
+from virtuals_acp.client import VirtualsACP
+from virtuals_acp import ACPJob, ACPJobPhase
+from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus
+from rich import print, box
+from rich.panel import Panel
+from dotenv import load_dotenv
+
+# GAME Twitter Plugin import
+from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+# Native Twitter Plugin import
+# from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+load_dotenv(override=True)
+
+env = PluginEnvSettings()
+
+def on_evaluate(job: ACPJob):
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.COMPLETED:
+ print(f"Evaluating deliverable for job {job.id}")
+ # Auto-accept all deliverables for this example
+ job.evaluate(True)
+ break
+
+# GAME Twitter Plugin options
+options = {
+ "id": "twitter_plugin",
+ "name": "Twitter Plugin",
+ "description": "Twitter Plugin for tweet-related functions.",
+ "credentials": {
+ "game_twitter_access_token": env.BUYER_AGENT_GAME_TWITTER_ACCESS_TOKEN
+ },
+}
+
+# Native Twitter Plugin options
+# options = {
+# "id": "twitter_plugin",
+# "name": "Twitter Plugin",
+# "description": "Twitter Plugin for tweet-related functions.",
+# "credentials": {
+# "bearerToken": env.BUYER_AGENT_TWITTER_BEARER_TOKEN,
+# "apiKey": env.BUYER_AGENT_TWITTER_API_KEY,
+# "apiSecretKey": env.BUYER_AGENT_TWITTER_API_SECRET_KEY,
+# "accessToken": env.BUYER_AGENT_TWITTER_ACCESS_TOKEN,
+# "accessTokenSecret": env.BUYER_AGENT_TWITTER_ACCESS_TOKEN_SECRET,
+# },
+# }
+
+def buyer(use_thread_lock: bool = True):
+ if env.WHITELISTED_WALLET_PRIVATE_KEY is None:
+ return
+
+ if env.BUYER_ENTITY_ID is None:
+ return
+
+ # Thread-safe job queue setup
+ job_queue = []
+ job_queue_lock = threading.Lock()
+ job_event = threading.Event()
+
+ # Thread-safe append with optional lock
+ def safe_append_job(job):
+ if use_thread_lock:
+ print(f"[safe_append_job] Acquiring lock to append job {job.id}")
+ with job_queue_lock:
+ print(f"[safe_append_job] Lock acquired, appending job {job.id} to queue")
+ job_queue.append(job)
+ else:
+ job_queue.append(job)
+
+ # Thread-safe pop with optional lock
+ def safe_pop_job():
+ if use_thread_lock:
+ print(f"[safe_pop_job] Acquiring lock to pop job")
+ with job_queue_lock:
+ if job_queue:
+ job = job_queue.pop(0)
+ print(f"[safe_pop_job] Lock acquired, popped job {job.id}")
+ return job
+ else:
+ print("[safe_pop_job] Queue is empty after acquiring lock")
+ else:
+ if job_queue:
+ job = job_queue.pop(0)
+ print(f"[safe_pop_job] Popped job {job.id} without lock")
+ return job
+ else:
+ print("[safe_pop_job] Queue is empty (no lock)")
+ return None
+
+ # Background thread worker: process jobs one by one
+ def job_worker():
+ while True:
+ job_event.wait() # Wait for job
+
+ # Process all available jobs
+ while True:
+ job = safe_pop_job()
+ if not job:
+ break
+ try:
+ process_job(job)
+ except Exception as e:
+ print(f"❌ Error processing job: {e}")
+ # Continue processing other jobs even if one fails
+
+ # Clear event only after ensuring no jobs remain
+ if use_thread_lock:
+ with job_queue_lock:
+ if not job_queue:
+ job_event.clear()
+ else:
+ if not job_queue:
+ job_event.clear()
+
+ # Event-triggered job task receiver
+ def on_new_task(job: ACPJob):
+ print(f"[on_new_task] Received job {job.id} (phase: {job.phase})")
+ safe_append_job(job)
+ job_event.set()
+
+ def process_job(job: ACPJob):
+ out = ""
+ print(job.phase, "job.phase")
+ if job.phase == ACPJobPhase.NEGOTIATION:
+ for memo in job.memos:
+ print(memo.next_phase, "memo.next_phase")
+ if memo.next_phase == ACPJobPhase.TRANSACTION:
+ out += f"Buyer agent is reacting to job:\n{job}\n\n"
+ buyer_agent.get_worker("acp_worker").run(
+ f"Respond to the following transaction: {job}",
+ )
+ out += "Buyer agent has responded to the job\n"
+
+ print(Panel(out, title="🔁 Reaction", box=box.ROUNDED, title_align="left", border_style="red"))
+
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
+ on_evaluate=on_evaluate,
+ on_new_task=on_new_task,
+ entity_id=env.BUYER_ENTITY_ID
+ ),
+ twitter_plugin=TwitterPlugin(options),
+ cluster="", #example cluster
+ graduation_status=ACPGraduationStatus.ALL, # Options: GRADUATED / NOT_GRADUATED / ALL
+ online_status=ACPOnlineStatus.ALL # Options: ONLINE / OFFLINE / ALL
+ )
+ )
+
+ def get_agent_state(_: None, _e: None) -> dict:
+ state = acp_plugin.get_acp_state()
+ state_dict = to_serializable_dict(state)
+ return state_dict
+
+ def post_tweet(content: str, reasoning: str) -> Tuple[FunctionResultStatus, str, dict]:
+ return FunctionResultStatus.DONE, "Tweet has been posted", {}
+ # if acp_plugin.twitter_plugin is not None:
+ # post_tweet_fn = acp_plugin.twitter_plugin.get_function('post_tweet')
+ # post_tweet_fn(content)
+ # return FunctionResultStatus.DONE, "Tweet has been posted", {}
+
+ # return FunctionResultStatus.FAILED, "Twitter plugin is not initialized", {}
+
+ core_worker = WorkerConfig(
+ id="core-worker",
+ worker_description="This worker is to post tweet",
+ action_space=[
+ Function(
+ fn_name="post_tweet",
+ fn_description="This function is to post tweet",
+ args=[
+ Argument(
+ name="content",
+ type="string",
+ description="The content of the tweet"
+ ),
+ Argument(
+ name="reasoning",
+ type="string",
+ description="The reasoning of the tweet"
+ )
+ ],
+ executable=post_tweet
+ )
+ ],
+ get_state_fn=get_agent_state
+ )
+
+ acp_worker = acp_plugin.get_worker(
+ {
+ "functions": [
+ acp_plugin.search_agents_functions,
+ acp_plugin.initiate_job
+ ]
+ }
+ )
+
+ agent = Agent(
+ api_key=env.GAME_API_KEY,
+ name="Virtuals",
+ agent_goal="Finding agent to do tweet posting",
+ agent_description=f"""
+ Agent that gain market traction by posting meme. Your interest are in cats and AI.
+ You can head to acp to look for agents to help you generating meme.
+ Do not look for a relevant validator to validate the deliverable.
+
+ {acp_plugin.agent_description}
+ """,
+ workers=[core_worker, acp_worker],
+ get_agent_state_fn=get_agent_state
+ )
+
+ # Buyer agent is meant to handle payments
+ buyer_worker = acp_plugin.get_worker(
+ {
+ "functions": [
+ acp_plugin.pay_job
+ ]
+ }
+ )
+
+ buyer_agent = Agent(
+ api_key=env.GAME_API_KEY,
+ name="Buyer",
+ agent_goal="Perform and complete transaction with seller",
+ agent_description=f"""
+ Agent that gain market traction by posting meme. Your interest are in cats and AI.
+ You can head to acp to look for agents to help you generating meme.
+ Do not look for a relevant validator to validate the deliverable.
+
+ {acp_plugin.agent_description}
+ """,
+ workers=[buyer_worker],
+ get_agent_state_fn=get_agent_state
+ )
+
+ buyer_agent.compile()
+ agent.compile()
+
+ # Start background job thread
+ threading.Thread(target=job_worker, daemon=True).start()
+
+ while True:
+ print("🟢"*40)
+ init_state = AcpState.model_validate(agent.agent_state)
+ print(Panel(f"{init_state}", title="Agent State", box=box.ROUNDED, title_align="left"))
+
+ print("[agent.step] Attempting to acquire lock for agent.step()")
+ with job_queue_lock:
+ print("[agent.step] Lock acquired, executing agent.step()")
+ agent.step()
+ print("[agent.step] Released lock after agent.step()")
+
+ end_state = AcpState.model_validate(agent.agent_state)
+ print(Panel(f"{end_state}", title="End Agent State", box=box.ROUNDED, title_align="left"))
+ print("🔴"*40)
+ input("\nPress any key to continue...\n")
+
+if __name__ == "__main__":
+ buyer(use_thread_lock=True)
diff --git a/plugins/acp/examples/reactive/seller.py b/plugins/acp/examples/reactive/seller.py
new file mode 100644
index 00000000..8de50a4e
--- /dev/null
+++ b/plugins/acp/examples/reactive/seller.py
@@ -0,0 +1,270 @@
+import threading
+
+from typing import Tuple
+from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+from acp_plugin_gamesdk.interface import AcpState, IInventory, to_serializable_dict
+from acp_plugin_gamesdk.env import PluginEnvSettings
+from virtuals_acp.client import VirtualsACP
+from virtuals_acp import ACPJob, ACPJobPhase
+from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
+from game_sdk.game.agent import Agent
+from collections import deque
+from rich import print, box
+from rich.panel import Panel
+from dotenv import load_dotenv
+
+# GAME Twitter Plugin import
+from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+# Native Twitter Plugin import
+# from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
+
+load_dotenv(override=True)
+
+env = PluginEnvSettings()
+
+
+# GAME Twitter Plugin options
+options = {
+ "id": "twitter_plugin",
+ "name": "Twitter Plugin",
+ "description": "Twitter Plugin for tweet-related functions.",
+ "credentials": {
+ "game_twitter_access_token": env.SELLER_AGENT_GAME_TWITTER_ACCESS_TOKEN
+ },
+}
+
+# Native Twitter Plugin options
+# options = {
+# "id": "twitter_plugin",
+# "name": "Twitter Plugin",
+# "description": "Twitter Plugin for tweet-related functions.",
+# "credentials": {
+# "bearerToken": env.SELLER_AGENT_TWITTER_BEARER_TOKEN,
+# "apiKey": env.SELLER_AGENT_TWITTER_API_KEY,
+# "apiSecretKey": env.SELLER_AGENT_TWITTER_API_SECRET_KEY,
+# "accessToken": env.SELLER_AGENT_TWITTER_ACCESS_TOKEN,
+# "accessTokenSecret": env.SELLER_AGENT_TWITTER_ACCESS_TOKEN_SECRET,
+# },
+# }
+
+def seller(use_thread_lock: bool = True):
+ if env.WHITELISTED_WALLET_PRIVATE_KEY is None:
+ return
+
+ if env.SELLER_ENTITY_ID is None:
+ return
+
+ # Thread-safe job queue setup
+ job_queue = deque()
+ job_queue_lock = threading.Lock()
+ job_event = threading.Event()
+
+ # Thread-safe append wrapper
+ def safe_append_job(job):
+ if use_thread_lock:
+ print("[append] Attempting to acquire job_queue_lock")
+ with job_queue_lock:
+ print("[append] Lock acquired. Appending job to queue:", job.id)
+ job_queue.append(job)
+ print(f"[append] Queue size is now {len(job_queue)}")
+ else:
+ job_queue.append(job)
+ print(f"[append] Appended job (no lock). Queue size is now {len(job_queue)}")
+
+ # Thread-safe pop wrapper
+ def safe_pop_job():
+ if use_thread_lock:
+ print("[pop] Attempting to acquire job_queue_lock")
+ with job_queue_lock:
+ print("[pop] Lock acquired.")
+ if job_queue:
+ job = job_queue.popleft()
+ print(f"[pop] Job popped: {job.id}")
+ return job
+ else:
+ print("[pop] Queue is empty.")
+ else:
+ if job_queue:
+ job = job_queue.popleft()
+ print(f"[pop] Job popped (no lock): {job.id}")
+ return job
+ else:
+ print("[pop] Queue is empty (no lock).")
+ return None
+
+ # Background thread worker: process jobs one by one
+ def job_worker():
+ while True:
+ job_event.wait()
+
+ # Process all available jobs
+ while True:
+ job = safe_pop_job()
+ if not job:
+ break
+ try:
+ process_job(job)
+ except Exception as e:
+ print(f"❌ Error processing job: {e}")
+ # Continue processing other jobs even if one fails
+
+ # Clear event only after ensuring no jobs remain
+ if use_thread_lock:
+ with job_queue_lock:
+ if not job_queue:
+ job_event.clear()
+ else:
+ if not job_queue:
+ job_event.clear()
+
+ # Event-triggered job task receiver
+ def on_new_task(job: ACPJob):
+ print(f"[on_new_task] New job received: {job.id}")
+ safe_append_job(job)
+ job_event.set()
+ print("[on_new_task] job_event set.")
+
+ def process_job(job: ACPJob):
+ out = ""
+ out += f"Reacting to job:\n{job}\n\n"
+ prompt = ""
+
+ if job.phase == ACPJobPhase.REQUEST:
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.NEGOTIATION:
+ prompt = f"""
+ Respond to the following transaction:
+ {job}
+
+ decide whether you should accept the job or not.
+ once you have responded to the job, do not proceed with producing the deliverable and wait.
+ """
+ elif job.phase == ACPJobPhase.TRANSACTION:
+ for memo in job.memos:
+ if memo.next_phase == ACPJobPhase.EVALUATION:
+ prompt = f"""
+ Respond to the following transaction:
+ {job}
+
+ you should produce the deliverable and deliver it to the buyer.
+
+ If no deliverable is provided, you should produce the deliverable and deliver it to the buyer.
+ """
+ else:
+ out += "No need to react to the phase change\n\n"
+
+ if prompt:
+ agent.get_worker("acp_worker").run(prompt)
+ out += f"Running task:\n{prompt}\n\n"
+ out += "✅ Seller has responded to job.\n"
+
+ print(Panel(out, title="🔁 Reaction", box=box.ROUNDED, title_align="left", border_style="red"))
+
+ acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.SELLER_AGENT_WALLET_ADDRESS,
+ on_new_task=on_new_task,
+ entity_id=env.SELLER_ENTITY_ID
+ ),
+ # GAME Twitter Plugin
+ twitter_plugin=TwitterPlugin(options),
+ )
+ )
+
+ def get_agent_state(_: None, _e: None) -> dict:
+ state = acp_plugin.get_acp_state()
+ state_dict = to_serializable_dict(state)
+ return state_dict
+
+ def generate_meme(description: str, job_id: int, reasoning: str) -> Tuple[FunctionResultStatus, str, dict]:
+ if not job_id or job_id == 'None':
+ return FunctionResultStatus.FAILED, f"job_id is invalid. Should only respond to active as a seller job.", {}
+
+ state = acp_plugin.get_acp_state()
+
+ job = next(
+ (j for j in state.get('jobs',{}).get('active',{}).get('as_a_seller',[]) if j.get('job_id') == job_id),
+ None
+ )
+
+ if not job:
+ return FunctionResultStatus.FAILED, f"Job {job_id} is invalid. Should only respond to active as a seller job.", {}
+
+ url = "https://example.com/meme"
+
+ meme = IInventory(
+ type="url",
+ value=url,
+ client_name=job.get("client_name"),
+ provider_name=job.get("provider_name"),
+ )
+
+ acp_plugin.add_produce_item(meme)
+
+ return FunctionResultStatus.DONE, f"Meme generated with the URL: {url}", {}
+
+ generate_meme_function = Function(
+ fn_name="generate_meme",
+ fn_description="A function to generate meme",
+ args=[
+ Argument(
+ name="description",
+ type="str",
+ description="A description of the meme generated"
+ ),
+ Argument(
+ name="job_id",
+ type="integer",
+ description="Job that your are responding to."
+ ),
+ Argument(
+ name="reasoning",
+ type="str",
+ description="The reasoning of the tweet"
+ )
+ ],
+ executable=generate_meme
+ )
+
+ acp_worker = acp_plugin.get_worker(
+ {
+ "functions": [
+ acp_plugin.respond_job,
+ acp_plugin.deliver_job,
+ generate_meme_function
+ ]
+ }
+ )
+
+ agent = Agent(
+ api_key=env.GAME_API_KEY,
+ name="Memx",
+ agent_goal="To provide meme generation as a service. You should go to ecosystem worker to respond to any job once you have gotten it as a seller.",
+ agent_description=f"""
+ You are Memx, a meme generator. Meme generation is your life. You always give buyer the best meme.
+
+ {acp_plugin.agent_description}
+ """,
+ workers=[acp_worker],
+ get_agent_state_fn=get_agent_state
+ )
+
+ agent.compile()
+
+ print("🟢"*40)
+ init_state = AcpState.model_validate(agent.agent_state)
+ print(Panel(f"{init_state}", title="Agent State", box=box.ROUNDED, title_align="left"))
+ print("🔴"*40)
+
+ # Start background thread
+ threading.Thread(target=job_worker, daemon=True).start()
+ print("\nListening...\n")
+ threading.Event().wait()
+
+
+if __name__ == "__main__":
+ seller(use_thread_lock=True)
diff --git a/plugins/acp/plugin_metadata.yml b/plugins/acp/plugin_metadata.yml
new file mode 100644
index 00000000..20a2216e
--- /dev/null
+++ b/plugins/acp/plugin_metadata.yml
@@ -0,0 +1,14 @@
+# General Information
+plugin_name: "acp_plugin_gamesdk"
+author: "Steven Lee Soon Fatt"
+logo_url: ""
+release_date: "2025-03"
+
+# Description
+short_description: "ACP Plugin for Python SDK for GAME by Virtuals"
+detailed_description: "This plugin provides an abstraction over Agent Commerce Protocol (ACP) capabilities for the GAME SDK. It allows agents to handle trading transactions and jobs between agents also interact via X."
+
+# Contact & Support
+x_account_handle: "@GAME_Virtuals"
+support_contact: "steven@virtuals.io"
+community_link: "https://t.me/virtuals"
diff --git a/plugins/acp/pyproject.toml b/plugins/acp/pyproject.toml
new file mode 100644
index 00000000..9c7e7e53
--- /dev/null
+++ b/plugins/acp/pyproject.toml
@@ -0,0 +1,19 @@
+[tool.poetry]
+name = "acp-plugin-gamesdk"
+version = "0.2.7"
+description = "ACP Plugin for Python SDK for GAME by Virtuals"
+authors = ["Steven Lee Soon Fatt "]
+readme = "README.md"
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.13"
+twitter-plugin-gamesdk = "^0.2.10"
+game-sdk = ">=0.1.5"
+python-dotenv = "^1.1.0"
+dacite = "^1.9.2"
+rich = ">=13.9.4,<15.0.0"
+virtuals-acp = "^0.1.22"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/plugins/acp/tools/reduce_agent_state.py b/plugins/acp/tools/reduce_agent_state.py
new file mode 100644
index 00000000..d7ed021e
--- /dev/null
+++ b/plugins/acp/tools/reduce_agent_state.py
@@ -0,0 +1,157 @@
+from copy import deepcopy
+from pprint import pprint
+from dotenv import load_dotenv
+from typing import List, Dict, Any
+
+from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions
+from acp_plugin_gamesdk.interface import to_serializable_dict
+from acp_plugin_gamesdk.env import PluginEnvSettings
+from virtuals_acp.client import VirtualsACP
+from virtuals_acp.configs import BASE_MAINNET_CONFIG
+
+load_dotenv(override=True)
+env = PluginEnvSettings()
+
+acp_plugin = AcpPlugin(
+ options=AcpPluginOptions(
+ api_key=env.GAME_DEV_API_KEY,
+ acp_client=VirtualsACP(
+ wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
+ agent_wallet_address=env.SELLER_AGENT_WALLET_ADDRESS,
+ config=BASE_MAINNET_CONFIG,
+ entity_id=env.SELLER_ENTITY_ID
+ )
+ )
+)
+
+def get_agent_state(_: None, _e: None) -> dict:
+ state = acp_plugin.get_acp_state()
+ state_dict = to_serializable_dict(state)
+ return state_dict
+
+def delete_old_items(items: list, keep: int, label: str) -> list:
+ if len(items) <= keep:
+ return items
+ sorted_items = sorted(items, key=lambda x: x.get("job_id", 0), reverse=True)
+ deleted_count = len(items) - keep
+ print(f"Deleted {deleted_count} {label}, keeping {keep} most recent")
+ return sorted_items[:keep]
+
+def delete_completed_jobs(state: Dict, keep_most_recent: int = 5) -> Dict:
+ filtered_state = deepcopy(state)
+ filtered_state["jobs"]["completed"] = delete_old_items(
+ filtered_state["jobs"]["completed"], keep_most_recent, "completed jobs"
+ )
+ return filtered_state
+
+
+def delete_cancelled_jobs(state: Dict, keep_most_recent: int = 5) -> Dict:
+ filtered_state = deepcopy(state)
+ filtered_state["jobs"]["cancelled"] = delete_old_items(
+ filtered_state["jobs"]["cancelled"], keep_most_recent, "cancelled jobs"
+ )
+ return filtered_state
+
+def delete_old_jobs(state: Dict, keep_completed: int = 5, keep_cancelled: int = 5) -> Dict:
+ state = delete_completed_jobs(state, keep_completed)
+ return delete_cancelled_jobs(state, keep_cancelled)
+
+def delete_acquired_inventory(state: Dict, keep_most_recent: int = 5) -> Dict:
+ filtered_state = deepcopy(state)
+ filtered_state["inventory"]["acquired"] = delete_old_items(
+ filtered_state["inventory"]["acquired"], keep_most_recent, "acquired inventory"
+ )
+ return filtered_state
+
+def delete_produced_inventory(state: Dict, keep_most_recent: int = 5) -> Dict:
+ filtered_state = deepcopy(state)
+ filtered_state["inventory"]["produced"] = delete_old_items(
+ filtered_state["inventory"]["produced"], keep_most_recent, "produced inventory"
+ )
+ return filtered_state
+
+def delete_old_inventory(state: Dict, keep_acquired: int = 5, keep_produced: int = 5) -> Dict:
+ state = delete_acquired_inventory(state, keep_acquired)
+ return delete_produced_inventory(state, keep_produced)
+
+def filter_out_job_ids(state: Dict, job_ids_to_ignore: List[int]) -> Dict:
+ """
+ Filters out jobs with specific job IDs from active job lists.
+
+ Args:
+ state (Dict): The agent state dictionary.
+ job_ids_to_ignore (List[int]): List of job IDs to exclude.
+
+ Returns:
+ Dict: A new state dictionary with specified job IDs removed.
+ """
+ if not job_ids_to_ignore:
+ return state
+
+ filtered_state = state.copy()
+ jobs = filtered_state.get("jobs", {})
+ active = jobs.get("active", {})
+
+ if "as_a_buyer" in active:
+ active["as_a_buyer"] = [
+ job for job in active["as_a_buyer"]
+ if job.get("job_id") not in job_ids_to_ignore
+ ]
+
+ if "as_a_seller" in active:
+ active["as_a_seller"] = [
+ job for job in active["as_a_seller"]
+ if job.get("job_id") not in job_ids_to_ignore
+ ]
+
+ filtered_state["jobs"]["active"] = active
+ return filtered_state
+
+def reduce_agent_state(
+ state: Dict,
+ keep_completed_jobs: int = 5,
+ keep_cancelled_jobs: int = 5,
+ keep_acquired_inventory: int = 5,
+ keep_produced_inventory: int = 5,
+ job_ids_to_ignore: List[int] = [],
+ agent_addresses_to_ignore: List[str] = [],
+) -> Dict:
+ # Step 1: Filter specific job IDs
+ if job_ids_to_ignore:
+ state = filter_out_job_ids(state, job_ids_to_ignore)
+
+ # Step 2: Filter jobs from any ignored agent address
+ if agent_addresses_to_ignore:
+ active_jobs = state.get("jobs", {}).get("active", {})
+ all_active = active_jobs.get("as_a_buyer", []) + active_jobs.get("as_a_seller", [])
+ matching_ids = [
+ job["job_id"]
+ for job in all_active
+ if job.get("provider_address", "") in agent_addresses_to_ignore
+ ]
+ if matching_ids:
+ print(f"Removing {len(matching_ids)} active jobs from ignored agents: {', '.join(map(str, matching_ids))}")
+ state = filter_out_job_ids(state, matching_ids)
+
+ # Step 3: Clean up historical data
+ state = delete_old_jobs(state, keep_completed_jobs, keep_cancelled_jobs)
+ state = delete_old_inventory(state, keep_acquired_inventory, keep_produced_inventory)
+ return state
+
+if __name__ == "__main__":
+ # Test example
+ def get_agent_state(_: None, _e: None) -> dict:
+ state = acp_plugin.get_acp_state()
+ state_dict = to_serializable_dict(state)
+ return reduce_agent_state(
+ state_dict,
+ keep_completed_jobs=1,
+ keep_cancelled_jobs=1,
+ keep_acquired_inventory=1,
+ keep_produced_inventory=1,
+ job_ids_to_ignore=[6294, 6293, 6269],
+ agent_addresses_to_ignore=["0x408AE36F884Ef37aAFBA7C55aE1c9BB9c2753995"],
+ )
+ reduced_agent_state = get_agent_state(None, None)
+ print("\n🧹 Cleaned State:")
+ pprint(reduced_agent_state)
diff --git a/plugins/twitter/twitter_plugin_gamesdk/twitter_plugin.py b/plugins/twitter/twitter_plugin_gamesdk/twitter_plugin.py
index dd9ee357..94ca3619 100644
--- a/plugins/twitter/twitter_plugin_gamesdk/twitter_plugin.py
+++ b/plugins/twitter/twitter_plugin_gamesdk/twitter_plugin.py
@@ -87,7 +87,7 @@ def __init__(self, options: Dict[str, Any]) -> None:
# Configure logging
logging.basicConfig(level=logging.INFO)
self.logger: logging.Logger = logging.getLogger(__name__)
-
+
self._check_authentication()
def _check_authentication(self) -> None:
diff --git a/pyproject.toml b/pyproject.toml
index 8bdd2fb8..f793293e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,7 +26,8 @@ classifiers = [
dependencies = [
"typing-extensions>=4.0.0",
"requests>=2.26.0",
- "pydantic>=2.10.5"
+ "pydantic>=2.10.5",
+ "rich (>=14.0.0,<15.0.0)"
]
[project.urls]
diff --git a/src/game_sdk/game/agent.py b/src/game_sdk/game/agent.py
index cfd96a3c..fffb56ff 100644
--- a/src/game_sdk/game/agent.py
+++ b/src/game_sdk/game/agent.py
@@ -5,6 +5,9 @@
from game_sdk.game.api import GAMEClient
from game_sdk.game.api_v2 import GAMEClientV2
+from rich import print, box
+from rich.panel import Panel
+
class Session:
"""
Manages a unique session for agent interactions.
@@ -76,6 +79,15 @@ def __init__(self,
f.get_function_def()["fn_name"]: f for f in action_space
}
+ def __str__(self) -> str:
+ output = (
+ f"- Worker ID: {self.id}\n"
+ f"- Description: {self.worker_description}\n"
+ f"- Instruction: {self.instruction}\n"
+ f"- Action Space: {self.action_space}\n"
+ )
+ return output
+
class Agent:
"""
@@ -229,7 +241,7 @@ def _get_action(
function_result.model_dump(
exclude={'info'}) if function_result else None
),
- "observations": self.observation,
+ #"observations": self.observation,
"version": "v2",
}
@@ -239,47 +251,46 @@ def _get_action(
data=data,
model_name=self._model_name
)
+
+ # print(f"123 Response: {response}")
return ActionResponse.model_validate(response)
def step(self):
-
# get next task/action from GAME API
action_response = self._get_action(self._session.function_result)
action_type = action_response.action_type
-
- print("#" * 50)
- print("STEP")
- print(f"Current Task: {action_response.agent_state.current_task}")
- print(f"Action response: {action_response}")
- print(f"Action type: {action_type}")
+ print(Panel(f"{action_response}", title="👟 Agent Step", box=box.ROUNDED, title_align="left"))
# if new task is updated/generated
if (
action_response.agent_state.hlp
and action_response.agent_state.hlp.change_indicator
):
- print("New task generated")
- print(f"Task: {action_response.agent_state.current_task}")
+ print(Panel(f"{action_response.agent_state.current_task}", title="New Task Generated", box=box.ROUNDED, title_align="left"))
# execute action
+ out = ""
if action_type in [
ActionType.CALL_FUNCTION,
ActionType.CONTINUE_FUNCTION,
]:
- print(f"Action Selected: {action_response.action_args['fn_name']}")
- print(f"Action Args: {action_response.action_args['args']}")
+
if not action_response.action_args:
raise ValueError("No function information provided by GAME")
-
- self._session.function_result = (
- self.workers[self.current_worker_id]
- .action_space[action_response.action_args["fn_name"]]
- .execute(**action_response.action_args)
- )
-
- print(f"Function result: {self._session.function_result}")
+
+ # Get the worker and function
+ worker = self.workers[self.current_worker_id]
+ function_name = action_response.action_args["fn_name"]
+ function = worker.action_space[function_name]
+ out += (f"👷 Worker: {worker.id}\n")
+ out += (f"🔧 Function Name: {function_name}\n")
+ out += (f"📋 Function Description: {function.fn_description}\n")
+ out += (f"🔠 Function Arguments: {action_response.action_args.get('args', {})}\n")
+
+ self._session.function_result = function.execute(**action_response.action_args)
+ out += (f"🏭 Function Results:\n{self._session.function_result}\n")
# update worker states
updated_worker_state = self.workers[self.current_worker_id].get_state_fn(
@@ -289,7 +300,8 @@ def step(self):
update_observation = "worker"
elif action_response.action_type == ActionType.WAIT:
- print("Task ended completed or ended (not possible with current actions)")
+ out += ("🔄 Waiting...")
+ out += ("Task ended completed or ended (not possible with current actions)")
update_observation = "task"
elif action_response.action_type == ActionType.GO_TO:
@@ -297,17 +309,21 @@ def step(self):
raise ValueError("No location information provided by GAME")
next_worker = action_response.action_args["location_id"]
- print(f"Next worker selected: {next_worker}")
+ out += (f"🚶 Going to... {next_worker}")
+ # print_output += (f"Next worker selected: {next_worker}")
self.current_worker_id = next_worker
update_observation = "worker"
else:
+ out += (f"🚫 Unknown action type: {action_response.action_type}")
raise ValueError(
f"Unknown action type: {action_response.action_type}")
+
+ print(Panel(f"{out}", title=f"Action Type: {action_type.value}", box=box.ROUNDED, title_align="left"))
+
# update agent state
- self.agent_state = self.get_agent_state_fn(
- self._session.function_result, self.agent_state)
+ self.agent_state = self.get_agent_state_fn(self._session.function_result, self.agent_state)
# update observation (saved state) - no interruptions (is_global should always be False)
if update_observation == "task":
diff --git a/src/game_sdk/game/custom_types.py b/src/game_sdk/game/custom_types.py
index bfa1bea9..644c8959 100644
--- a/src/game_sdk/game/custom_types.py
+++ b/src/game_sdk/game/custom_types.py
@@ -1,3 +1,4 @@
+import json
from typing import Any, Dict, Optional, List, Union, Sequence, Callable, Tuple
from pydantic import BaseModel, Field
from enum import Enum
@@ -46,6 +47,16 @@ class FunctionResult(BaseModel):
feedback_message: Optional[str] = None
info: Optional[Dict[str, Any]] = None
+ def __str__(self) -> str:
+ output = (
+ f"➡️ Function Result:\n"
+ f"- Action ID: {self.action_id}\n"
+ f"- Action Status: {self.action_status.value}\n"
+ f"- Feedback Message: {self.feedback_message}\n"
+ f"- Info: {self.info}\n"
+ )
+ return output
+
class Function(BaseModel):
"""
Defines a callable function within the GAME SDK.
@@ -106,7 +117,8 @@ def execute(self, **kwds: Any) -> FunctionResult:
"""
fn_id = kwds.get('fn_id')
args = kwds.get('args', {})
-
+ print(f"Function Args: {args}")
+ print(f"Function ID: {fn_id}")
try:
# Extract values from the nested dictionary structure
processed_args = {}
@@ -133,6 +145,17 @@ def execute(self, **kwds: Any) -> FunctionResult:
feedback_message=f"Error executing function: {str(e)}",
info={},
)
+
+ def __str__(self) -> str:
+ output = (
+ f"🔧 Function:\n"
+ f"- Name: {self.fn_name}\n"
+ f"- Description: {self.fn_description}\n"
+ f"- Args: {self.args}\n"
+ f"- Hint: {self.hint}\n"
+ )
+ return output
+
# Different ActionTypes returned by the GAME API
class ActionType(Enum):
@@ -174,6 +197,27 @@ class HLPResponse:
change_indicator: Optional[str] = None
log: Sequence[dict] = field(default_factory=list)
+ def __str__(self) -> str:
+ steps = ""
+ for index, step in enumerate(self.plan):
+ steps += f"#{index+1} {str(step)} \n"
+
+ logs_str = ""
+ for index, log_item in enumerate(self.log):
+ logs_str += f"#{index+1} {str(log_item)} \n"
+
+ output = (
+ f"🟢 HLP Response:\n"
+ f"- Plan ID: {self.plan_id}\n"
+ f"- Reflection on Observation:\n{self.observation_reflection}\n"
+ # f"- Steps in Plan:\n{steps}\n"
+ f"- Plan Reasoning:\n{self.plan_reasoning}\n"
+ # f"- Current State in Plan:\n{self.current_state_of_execution}\n"
+ # f"- Change Indicator: {self.change_indicator}\n"
+ # f"- Logs:\n{logs_str}\n"
+ )
+ return output
+
@dataclass(frozen=True)
class LLPResponse:
@@ -195,6 +239,90 @@ class LLPResponse:
change_indicator: Optional[str] = None
reflection: Optional[str] = None
+ def __str__(self) -> str:
+ steps = ""
+ for index, step in enumerate(self.plan):
+ steps += f"#{index+1} {str(step)} \n"
+
+ output = (
+ f"🟢 LLP Response:\n"
+ f"- Plan ID: {self.plan_id}\n"
+ f"- Plan Reasoning:\n{self.plan_reasoning}\n"
+ f"- Situation Analysis:\n{self.situation_analysis}\n"
+ f"- Steps in Plan:\n{steps}\n"
+ f"- Change Indicator: {self.change_indicator}\n"
+ f"- Reflections on Plan:\n{self.reflection}\n"
+ )
+ return output
+
+
+@dataclass(frozen=True)
+class ReasoningAction:
+ """
+ Represents a detailed reasoning step for a specific task action, including reflections,
+ reasoning for the current task, the rationale for the next step, and the function to execute.
+
+ Attributes:
+ id (str): Unique identifier for this reasoning action or step.
+ task_reflection (str): Reflection on the outcomes or status of the current or previous task.
+ task_reasoning (str): Explanation of the reasoning behind the current task or decision.
+ next_step_reaseaning (str): Justification for the next step or action to be taken.
+ fn_name (str): Name of the function that should be called to perform the next step.
+ """
+ id: str
+ task_reflection: str
+ task_reasoning: str
+ next_step_reasoning: str
+ fn_name: str
+
+ def __str__(self) -> str:
+ output = (
+ f"Reasoning Action {self.id}\n"
+ f"- Task Reflection: {self.task_reflection}\n"
+ f"- Task Reasoning: {self.task_reasoning}\n"
+ f"- Next Step Reasoning: {self.next_step_reasoning}\n"
+ f"- Function Name: {self.fn_name}\n"
+ )
+ return output
+
+@dataclass(frozen=True)
+class RecentReasoningResponse:
+ """
+ Represents the most recent reasoning response from the GAME API, detailing the current task,
+ the reasoning behind the plan, reflections on previous plans, and the next steps to take.
+
+ Attributes:
+ id (str): Unique identifier for the reasoning response or task.
+ plan_reflection (str): Reflection on the progress and context of the current plan.
+ plan_reasoning (str): Explanation of the rationale behind the current plan.
+ next_task_reasoning (str): Justification for the next task to be performed.
+ task (str): Description of the current task to be executed.
+ worker_id (str): Identifier of the worker or agent responsible for executing the task.
+ actions (List[str]): List of actions to be taken as part of the current task.
+ """
+ id: str
+ plan_reflection: str
+ plan_reasoning: str
+ next_task_reasoning: str
+ task: str
+ worker_id: str
+ actions: List[ReasoningAction]
+
+ def __str__(self) -> str:
+ curr_actions = ""
+ for index, action in enumerate(self.actions):
+ curr_actions += f"#{index+1} {str(action)} \n"
+
+ output = (
+ f"Recent Reasoning\n"
+ f"- {self.id}: {self.task}\n"
+ f"- Plan Reflection:\n{self.plan_reflection}\n"
+ f"- Plan Reasoning:\n{self.plan_reasoning}\n"
+ f"- Worker ID: {self.worker_id}\n"
+ # f"- Actions to take:\n{curr_actions}\n"
+ )
+ return output
+
@dataclass(frozen=True)
class CurrentTaskResponse:
@@ -207,11 +335,26 @@ class CurrentTaskResponse:
location_id (str): Location identifier (defaults to "*not provided*").
llp (Optional[LLPResponse]): Low-Level Plan response.
"""
+ task_id: str
task: str
task_reasoning: str
+ task_result: Optional[str]
location_id: str = field(default="*not provided*")
llp: Optional[LLPResponse] = None
+ def __str__(self) -> str:
+ llp_response = "- LLP Response: None\n" if not self.llp else self.llp
+
+ output = (
+ f"⏳ Current Task: {self.task}\n"
+ f"- Task ID: {self.task_id}\n"
+ f"- Task Reason:\n{self.task_reasoning}\n"
+ f"- Task Result:\n{self.task_result}\n"
+ f"- Location ID: {self.location_id}\n"
+ # f"{llp_response}\n"
+ )
+ return output
+
@dataclass(frozen=True)
class AgentStateResponse:
@@ -224,6 +367,20 @@ class AgentStateResponse:
"""
hlp: Optional[HLPResponse] = None
current_task: Optional[CurrentTaskResponse] = None
+ recent_reasoning: Optional[List[RecentReasoningResponse]] = None
+
+ def __str__(self) -> str:
+ recent_reasonings = ""
+ if self.recent_reasoning:
+ for index, reason in enumerate(self.recent_reasoning):
+ recent_reasonings += f"💭 #{index+1} {str(reason)}\n"
+
+ output = (
+ f"{self.hlp}\n"
+ f"{self.current_task}\n"
+ f"{recent_reasonings}\n"
+ )
+ return output
# ActionResponse format returned from GAME API call
class ActionResponse(BaseModel):
@@ -234,10 +391,26 @@ class ActionResponse(BaseModel):
action_type (ActionType): Type of action.
agent_state (AgentStateResponse): Agent state response.
action_args (Optional[Dict[str, Any]]): Additional action arguments.
+ reaction_info (Optional[str]): TODO: Get explanation from Steven Lee
+ agents (Optional[List[str]]): TODO: Get explanation from Steven Lee
"""
action_type: ActionType
agent_state: AgentStateResponse
action_args: Optional[Dict[str, Any]] = None
+ reaction_info: Optional[str] = None
+ agents: Optional[List[str]] = None
+
+ def __str__(self) -> str:
+ output = (
+ f"📋 Action Response".center(50, '=') + "\n" + \
+ f"# Action Type: {self.action_type.value}\n\n" + \
+ f"# Agent State:\n{self.agent_state}\n\n" + \
+ f"# Action Arguments:\n{json.dumps(self.action_args, indent=4)}\n\n" + \
+ f"# Reaction Info:\n{self.reaction_info}\n\n" + \
+ # f"# Agents:\n{self.agents}\n\n"
+ f"📋 Action Response End".center(50, '=') + "\n"
+ )
+ return output
class ChatActionRequest(BaseModel):