Overview
Build automated price monitoring systems to track competitor pricing, detect price changes, and optimize your pricing strategy.Real-Time Price Checker
Simple price check across multiple retailers.- JavaScript
- Python
- HTTPie
Copy
import { AGIClient } from 'agi';
const client = new AGIClient({ apiKey: 'your_api_key' });
const session = await client.createSession('agi-0-fast');
try {
const retailers = ['amazon.com', 'bestbuy.com', 'apple.com'];
const result = await session.runTask(`
Check current price for: iPhone 15 Pro 256GB
Retailers: ${retailers.join(', ')}
For each retailer, provide: Store name, Current price (numeric), In stock (yes/no), Product URL, Shipping cost (if shown).
Return as JSON array with retailer, price, in_stock, url, shipping.
`);
console.log(`Found ${result.length} prices:`);
result.forEach(p => {
const status = p.in_stock ? '✓ In Stock' : '✗ Out of Stock';
console.log(`${p.retailer}: $${p.price} - ${status}`);
});
} finally {
await session.delete();
}
Copy
from pyagi import AGIClient
client = AGIClient(api_key="your_api_key")
with client.session("agi-0-fast") as session:
retailers = ["amazon.com", "bestbuy.com", "apple.com"]
result = session.run_task(f"""
Check current price for: iPhone 15 Pro 256GB
Retailers: {', '.join(retailers)}
For each retailer, provide: Store name, Current price (numeric), In stock (yes/no), Product URL, Shipping cost (if shown).
Return as JSON array with retailer, price, in_stock, url, shipping.
""")
print(f"Found {len(result)} prices:")
for p in result:
status = "✓ In Stock" if p['in_stock'] else "✗ Out of Stock"
print(f"{p['retailer']}: ${p['price']} - {status}")
Copy
export AGI_API_KEY="your_api_key"
SESSION=$(http POST https://api.agi.tech/v1/sessions \
Authorization:"Bearer $AGI_API_KEY" \
agent_name=agi-0-fast | jq -r '.session_id')
http POST https://api.agi.tech/v1/sessions/$SESSION/message \
Authorization:"Bearer $AGI_API_KEY" \
message="Check current price for: iPhone 15 Pro 256GB. Retailers: amazon.com, bestbuy.com, apple.com. For each retailer provide: Store name, Current price (numeric), In stock (yes/no), Product URL, Shipping cost. Return as JSON array."
while true; do
STATUS=$(http GET https://api.agi.tech/v1/sessions/$SESSION/status \
Authorization:"Bearer $AGI_API_KEY" | jq -r '.status')
if [ "$STATUS" = "finished" ]; then
http GET https://api.agi.tech/v1/sessions/$SESSION/messages \
Authorization:"Bearer $AGI_API_KEY" | jq '.messages[] | select(.type=="DONE")'
break
fi
sleep 2
done
http DELETE https://api.agi.tech/v1/sessions/$SESSION \
Authorization:"Bearer $AGI_API_KEY"
Automated Price Drop Alerts
Monitor products and send alerts when price drops below your target.- JavaScript
- Python
- HTTPie
Copy
import { AGIClient } from 'agi';
const client = new AGIClient({ apiKey: 'your_api_key' });
async function checkPrice(productName, url) {
const session = await client.createSession('agi-0-fast');
try {
const result = await session.runTask(`
Get the current price from: ${url}
Product: ${productName}
Return ONLY the numeric price value (no currency symbols).
If out of stock, return null.
`);
return parseFloat(result) || null;
} finally {
await session.delete();
}
}
// Usage: Check price and send alert if below target
const currentPrice = await checkPrice('Sony WH-1000XM5', 'https://www.amazon.com/dp/B09XS7JWHH');
if (currentPrice && currentPrice <= 299.99) {
console.log(`🎉 Price Alert: Target reached! Current: $${currentPrice}`);
}
Copy
from pyagi import AGIClient
client = AGIClient(api_key="your_api_key")
def check_price(product_name, url):
with client.session("agi-0-fast") as session:
result = session.run_task(f"""
Get the current price from: {url}
Product: {product_name}
Return ONLY the numeric price value (no currency symbols).
If out of stock, return null.
""")
try:
return float(result)
except:
return None
# Usage: Check price and send alert if below target
current_price = check_price('Sony WH-1000XM5', 'https://www.amazon.com/dp/B09XS7JWHH')
if current_price and current_price <= 299.99:
print(f"🎉 Price Alert: Target reached! Current: ${current_price:.2f}")
Copy
# Check price
http POST https://api.agi.tech/v1/sessions/$SESSION/message \
Authorization:"Bearer $AGI_API_KEY" \
message="Get the current price from: https://www.amazon.com/dp/B09XS7JWHH. Product: Sony WH-1000XM5. Return ONLY the numeric price value (no currency symbols). If out of stock, return null."
# Poll for result and check if below target
# (See quickstart for full polling example)
For full monitoring implementations with classes, see the complete example below (Python only for reference):
Copy
import time
from datetime import datetime
class PriceDropMonitor:
def __init__(self, api_key, alert_email):
self.api_key = api_key
self.alert_email = alert_email
self.tracked_products = {}
def add_product(self, product_name, url, target_price, alert_threshold_pct=5):
"""Add a product to track"""
self.tracked_products[product_name] = {
'url': url,
'target_price': target_price,
'alert_threshold': alert_threshold_pct,
'last_check': None,
'price_history': []
}
print(f"Now tracking: {product_name} (target: ${target_price})")
def check_all_prices(self):
"""Check prices for all tracked products"""
for product_name, data in self.tracked_products.items():
try:
current_price = self._get_price(product_name, data['url'])
if current_price:
data['price_history'].append({
'price': current_price,
'timestamp': datetime.now().isoformat()
})
# Check if price dropped below target
if current_price <= data['target_price']:
self._send_alert(
product_name,
current_price,
data['target_price'],
data['url'],
alert_type="TARGET_REACHED"
)
# Check for significant price drop
elif data['last_check']:
pct_change = ((current_price - data['last_check']) / data['last_check']) * 100
if abs(pct_change) >= data['alert_threshold']:
self._send_alert(
product_name,
current_price,
data['last_check'],
data['url'],
alert_type="PRICE_CHANGE",
percent_change=pct_change
)
data['last_check'] = current_price
except Exception as e:
print(f"Error checking {product_name}: {e}")
def _get_price(self, product_name, url):
"""Get current price from URL"""
session = requests.post(
"https://api.agi.tech/v1/sessions",
headers={"Authorization": f"Bearer {self.api_key}"},
json={"agent_name": "agi-0-fast"}
).json()
session_id = session["session_id"]
try:
requests.post(
f"https://api.agi.tech/v1/sessions/{session_id}/message",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"message": f"""
Get the current price from: {url}
Product: {product_name}
Return ONLY the numeric price value (no currency symbols).
If out of stock, return null.
"""
}
)
# Wait for result
while True:
status = requests.get(
f"https://api.agi.tech/v1/sessions/{session_id}/status",
headers={"Authorization": f"Bearer {self.api_key}"}
).json()
if status["status"] == "finished":
messages = requests.get(
f"https://api.agi.tech/v1/sessions/{session_id}/messages",
headers={"Authorization": f"Bearer {self.api_key}"}
).json()
for msg in messages["messages"]:
if msg["type"] == "DONE":
try:
return float(msg["content"])
except:
return None
break
time.sleep(2)
finally:
requests.delete(
f"https://api.agi.tech/v1/sessions/{session_id}",
headers={"Authorization": f"Bearer {self.api_key}"}
)
def _send_alert(self, product_name, current_price, comparison_price, url, alert_type, percent_change=None):
"""Send price alert notification"""
if alert_type == "TARGET_REACHED":
subject = f"🎉 Price Alert: {product_name} - Target Reached!"
body = f"""
Great news! The price has dropped to your target.
Product: {product_name}
Current Price: ${current_price:.2f}
Your Target: ${comparison_price:.2f}
Savings: ${comparison_price - current_price:.2f}
Buy now: {url}
"""
else:
direction = "📉 dropped" if percent_change < 0 else "📈 increased"
subject = f"Price Alert: {product_name} - Price {direction}"
body = f"""
Significant price change detected!
Product: {product_name}
Previous Price: ${comparison_price:.2f}
Current Price: ${current_price:.2f}
Change: {percent_change:.1f}%
View product: {url}
"""
print(f"\n{'='*60}")
print(subject)
print(body)
print(f"{'='*60}\n")
# Optional: Send actual email
# self._send_email(subject, body)
def run_continuously(self, check_interval_minutes=60):
"""Run price checks continuously"""
print(f"Starting price monitoring (checking every {check_interval_minutes} minutes)")
while True:
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Checking prices...")
self.check_all_prices()
print(f"Next check in {check_interval_minutes} minutes")
time.sleep(check_interval_minutes * 60)
# Usage
monitor = PriceDropMonitor(
api_key="your_api_key",
alert_email="[email protected]"
)
# Add products to track
monitor.add_product(
product_name="Sony WH-1000XM5",
url="https://www.amazon.com/dp/B09XS7JWHH",
target_price=299.99,
alert_threshold_pct=5
)
monitor.add_product(
product_name="iPad Pro 11-inch",
url="https://www.apple.com/shop/buy-ipad/ipad-pro",
target_price=749.00,
alert_threshold_pct=3
)
# Run monitoring
monitor.run_continuously(check_interval_minutes=30)
Historical Price Tracking
Build a database of historical prices for trend analysis.Copy
import json
from datetime import datetime, timedelta
class PriceHistoryTracker:
def __init__(self, api_key, db_file="price_history.json"):
self.api_key = api_key
self.db_file = db_file
self.load_history()
def load_history(self):
"""Load price history from file"""
try:
with open(self.db_file, 'r') as f:
self.history = json.load(f)
except FileNotFoundError:
self.history = {}
def save_history(self):
"""Save price history to file"""
with open(self.db_file, 'w') as f:
json.dump(self.history, f, indent=2)
def record_price(self, product_id, product_name, url):
"""Record current price"""
current_price = self._fetch_price(product_name, url)
if not current_price:
return None
if product_id not in self.history:
self.history[product_id] = {
'name': product_name,
'url': url,
'prices': []
}
price_record = {
'price': current_price,
'timestamp': datetime.now().isoformat(),
'day_of_week': datetime.now().strftime('%A'),
'date': datetime.now().strftime('%Y-%m-%d')
}
self.history[product_id]['prices'].append(price_record)
self.save_history()
return price_record
def get_price_stats(self, product_id, days=30):
"""Get price statistics for a product"""
if product_id not in self.history:
return None
cutoff_date = datetime.now() - timedelta(days=days)
recent_prices = [
p for p in self.history[product_id]['prices']
if datetime.fromisoformat(p['timestamp']) >= cutoff_date
]
if not recent_prices:
return None
prices = [p['price'] for p in recent_prices]
return {
'product_name': self.history[product_id]['name'],
'current_price': prices[-1],
'min_price': min(prices),
'max_price': max(prices),
'avg_price': sum(prices) / len(prices),
'price_range': max(prices) - min(prices),
'trend': 'increasing' if prices[-1] > prices[0] else 'decreasing',
'total_samples': len(recent_prices),
'first_seen': recent_prices[0]['timestamp'],
'last_updated': recent_prices[-1]['timestamp']
}
def find_best_day_to_buy(self, product_id):
"""Analyze which day of week has lowest prices"""
if product_id not in self.history:
return None
day_prices = {}
for record in self.history[product_id]['prices']:
day = record['day_of_week']
if day not in day_prices:
day_prices[day] = []
day_prices[day].append(record['price'])
day_averages = {
day: sum(prices) / len(prices)
for day, prices in day_prices.items()
}
best_day = min(day_averages, key=day_averages.get)
return {
'best_day': best_day,
'average_price': day_averages[best_day],
'all_day_averages': day_averages
}
def _fetch_price(self, product_name, url):
"""Fetch current price using AGI Agent"""
# Implementation similar to previous examples
pass
# Usage
tracker = PriceHistoryTracker(api_key="your_api_key")
# Record prices daily
products_to_track = [
("laptop_dell_xps13", "Dell XPS 13", "https://www.dell.com/..."),
("headphones_sony_xm5", "Sony WH-1000XM5", "https://www.amazon.com/...")
]
for product_id, name, url in products_to_track:
record = tracker.record_price(product_id, name, url)
print(f"Recorded: {name} - ${record['price']}")
# Get statistics
stats = tracker.get_price_stats("laptop_dell_xps13", days=30)
print(f"\n30-Day Price Statistics:")
print(f"Current: ${stats['current_price']}")
print(f"Min: ${stats['min_price']}")
print(f"Max: ${stats['max_price']}")
print(f"Average: ${stats['avg_price']:.2f}")
print(f"Trend: {stats['trend']}")
# Find best day to buy
best_day_info = tracker.find_best_day_to_buy("laptop_dell_xps13")
print(f"\nBest day to buy: {best_day_info['best_day']}")
print(f"Average price on {best_day_info['best_day']}: ${best_day_info['average_price']:.2f}")
Competitive Price Intelligence
Monitor competitor pricing strategies and respond dynamically.Copy
class CompetitivePriceMonitor:
def __init__(self, api_key):
self.api_key = api_key
def compare_with_competitors(self, your_price, your_product_url, competitor_urls):
"""Compare your price with competitors"""
competitor_prices = []
for comp_url in competitor_urls:
price = self._get_price_from_url(comp_url)
if price:
competitor_prices.append({
'url': comp_url,
'price': price,
'domain': self._extract_domain(comp_url)
})
if not competitor_prices:
return None
sorted_prices = sorted(competitor_prices, key=lambda x: x['price'])
lowest_competitor = sorted_prices[0]
highest_competitor = sorted_prices[-1]
avg_competitor_price = sum(p['price'] for p in competitor_prices) / len(competitor_prices)
analysis = {
'your_price': your_price,
'lowest_competitor_price': lowest_competitor['price'],
'highest_competitor_price': highest_competitor['price'],
'average_competitor_price': avg_competitor_price,
'your_position': self._calculate_position(your_price, [p['price'] for p in competitor_prices]),
'price_difference_from_lowest': your_price - lowest_competitor['price'],
'price_difference_pct': ((your_price - lowest_competitor['price']) / lowest_competitor['price']) * 100,
'competitors': competitor_prices,
'recommendation': self._get_pricing_recommendation(your_price, avg_competitor_price, lowest_competitor['price'])
}
return analysis
def _calculate_position(self, your_price, competitor_prices):
"""Calculate where your price ranks"""
all_prices = sorted(competitor_prices + [your_price])
position = all_prices.index(your_price) + 1
total = len(all_prices)
if position == 1:
return "LOWEST"
elif position == total:
return "HIGHEST"
elif position <= total / 3:
return "LOW"
elif position <= 2 * total / 3:
return "MIDDLE"
else:
return "HIGH"
def _get_pricing_recommendation(self, your_price, avg_price, lowest_price):
"""Generate pricing recommendation"""
if your_price > avg_price * 1.1:
return {
'action': 'REDUCE',
'reason': 'Your price is significantly above average',
'suggested_price': round(avg_price * 0.95, 2)
}
elif your_price < lowest_price:
return {
'action': 'INCREASE',
'reason': 'You have the lowest price, room to increase margin',
'suggested_price': round(lowest_price * 0.98, 2)
}
else:
return {
'action': 'MAINTAIN',
'reason': 'Your price is competitive',
'suggested_price': your_price
}
# Usage
monitor = CompetitivePriceMonitor(api_key="your_api_key")
analysis = monitor.compare_with_competitors(
your_price=349.99,
your_product_url="https://yourstore.com/product",
competitor_urls=[
"https://amazon.com/product/...",
"https://bestbuy.com/product/...",
"https://walmart.com/product/..."
]
)
print("Competitive Price Analysis")
print(f"Your Price: ${analysis['your_price']}")
print(f"Market Position: {analysis['your_position']}")
print(f"Lowest Competitor: ${analysis['lowest_competitor_price']}")
print(f"Average Market Price: ${analysis['average_competitor_price']:.2f}")
print(f"\nRecommendation: {analysis['recommendation']['action']}")
print(f"Reason: {analysis['recommendation']['reason']}")
print(f"Suggested Price: ${analysis['recommendation']['suggested_price']}")
Best Implementation
Use Fast Model for Price Checks
Use Fast Model for Price Checks
Price checking is straightforward, so use
agi-0-fast for:- Faster results (30-50% quicker)
- Lower costs
- Higher throughput
Implement Rate Limiting
Implement Rate Limiting
To avoid overwhelming retailers or hitting rate limits:
- Add delays between checks (2-5 seconds)
- Batch requests when possible
- Use exponential backoff on errors
Cache Results
Cache Results
Reduce unnecessary API calls:
- Cache prices for 15-30 minutes
- Store results in a database
- Only re-check when necessary
Handle Edge Cases
Handle Edge Cases
Account for common issues:
- Out of stock items
- Temporary price unavailability
- Page load failures
- Format variations ($99.99 vs 99.99)