Skip to main content

Overview

Build automated price monitoring systems to track competitor pricing, detect price changes, and optimize your pricing strategy.

Real-Time Price Checker

Simple price check across multiple retailers.
import { AGIClient } from 'agi';

const client = new AGIClient({ apiKey: 'your_api_key' });
const session = await client.createSession('agi-0-fast');

try {
    const retailers = ['amazon.com', 'bestbuy.com', 'apple.com'];
    const result = await session.runTask(`
        Check current price for: iPhone 15 Pro 256GB

        Retailers: ${retailers.join(', ')}

        For each retailer, provide: Store name, Current price (numeric), In stock (yes/no), Product URL, Shipping cost (if shown).

        Return as JSON array with retailer, price, in_stock, url, shipping.
    `);
    
    console.log(`Found ${result.length} prices:`);
    result.forEach(p => {
        const status = p.in_stock ? '✓ In Stock' : '✗ Out of Stock';
        console.log(`${p.retailer}: $${p.price} - ${status}`);
    });
} finally {
    await session.delete();
}

Automated Price Drop Alerts

Monitor products and send alerts when price drops below your target.
import { AGIClient } from 'agi';

const client = new AGIClient({ apiKey: 'your_api_key' });

async function checkPrice(productName, url) {
    const session = await client.createSession('agi-0-fast');
    try {
        const result = await session.runTask(`
            Get the current price from: ${url}
            Product: ${productName}
            Return ONLY the numeric price value (no currency symbols).
            If out of stock, return null.
        `);
        return parseFloat(result) || null;
    } finally {
        await session.delete();
    }
}

// Usage: Check price and send alert if below target
const currentPrice = await checkPrice('Sony WH-1000XM5', 'https://www.amazon.com/dp/B09XS7JWHH');
if (currentPrice && currentPrice <= 299.99) {
    console.log(`🎉 Price Alert: Target reached! Current: $${currentPrice}`);
}
For full monitoring implementations with classes, see the complete example below (Python only for reference):
import time
from datetime import datetime

class PriceDropMonitor:
    def __init__(self, api_key, alert_email):
        self.api_key = api_key
        self.alert_email = alert_email
        self.tracked_products = {}

    def add_product(self, product_name, url, target_price, alert_threshold_pct=5):
        """Add a product to track"""
        self.tracked_products[product_name] = {
            'url': url,
            'target_price': target_price,
            'alert_threshold': alert_threshold_pct,
            'last_check': None,
            'price_history': []
        }
        print(f"Now tracking: {product_name} (target: ${target_price})")

    def check_all_prices(self):
        """Check prices for all tracked products"""
        for product_name, data in self.tracked_products.items():
            try:
                current_price = self._get_price(product_name, data['url'])

                if current_price:
                    data['price_history'].append({
                        'price': current_price,
                        'timestamp': datetime.now().isoformat()
                    })

                    # Check if price dropped below target
                    if current_price <= data['target_price']:
                        self._send_alert(
                            product_name,
                            current_price,
                            data['target_price'],
                            data['url'],
                            alert_type="TARGET_REACHED"
                        )

                    # Check for significant price drop
                    elif data['last_check']:
                        pct_change = ((current_price - data['last_check']) / data['last_check']) * 100
                        if abs(pct_change) >= data['alert_threshold']:
                            self._send_alert(
                                product_name,
                                current_price,
                                data['last_check'],
                                data['url'],
                                alert_type="PRICE_CHANGE",
                                percent_change=pct_change
                            )

                    data['last_check'] = current_price

            except Exception as e:
                print(f"Error checking {product_name}: {e}")

    def _get_price(self, product_name, url):
        """Get current price from URL"""
        session = requests.post(
            "https://api.agi.tech/v1/sessions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"agent_name": "agi-0-fast"}
        ).json()

        session_id = session["session_id"]

        try:
            requests.post(
                f"https://api.agi.tech/v1/sessions/{session_id}/message",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "message": f"""
                    Get the current price from: {url}

                    Product: {product_name}

                    Return ONLY the numeric price value (no currency symbols).
                    If out of stock, return null.
                    """
                }
            )

            # Wait for result
            while True:
                status = requests.get(
                    f"https://api.agi.tech/v1/sessions/{session_id}/status",
                    headers={"Authorization": f"Bearer {self.api_key}"}
                ).json()

                if status["status"] == "finished":
                    messages = requests.get(
                        f"https://api.agi.tech/v1/sessions/{session_id}/messages",
                        headers={"Authorization": f"Bearer {self.api_key}"}
                    ).json()

                    for msg in messages["messages"]:
                        if msg["type"] == "DONE":
                            try:
                                return float(msg["content"])
                            except:
                                return None
                    break

                time.sleep(2)

        finally:
            requests.delete(
                f"https://api.agi.tech/v1/sessions/{session_id}",
                headers={"Authorization": f"Bearer {self.api_key}"}
            )

    def _send_alert(self, product_name, current_price, comparison_price, url, alert_type, percent_change=None):
        """Send price alert notification"""
        if alert_type == "TARGET_REACHED":
            subject = f"🎉 Price Alert: {product_name} - Target Reached!"
            body = f"""
            Great news! The price has dropped to your target.

            Product: {product_name}
            Current Price: ${current_price:.2f}
            Your Target: ${comparison_price:.2f}
            Savings: ${comparison_price - current_price:.2f}

            Buy now: {url}
            """
        else:
            direction = "📉 dropped" if percent_change < 0 else "📈 increased"
            subject = f"Price Alert: {product_name} - Price {direction}"
            body = f"""
            Significant price change detected!

            Product: {product_name}
            Previous Price: ${comparison_price:.2f}
            Current Price: ${current_price:.2f}
            Change: {percent_change:.1f}%

            View product: {url}
            """

        print(f"\n{'='*60}")
        print(subject)
        print(body)
        print(f"{'='*60}\n")

        # Optional: Send actual email
        # self._send_email(subject, body)

    def run_continuously(self, check_interval_minutes=60):
        """Run price checks continuously"""
        print(f"Starting price monitoring (checking every {check_interval_minutes} minutes)")

        while True:
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Checking prices...")
            self.check_all_prices()
            print(f"Next check in {check_interval_minutes} minutes")
            time.sleep(check_interval_minutes * 60)

# Usage
monitor = PriceDropMonitor(
    api_key="your_api_key",
    alert_email="[email protected]"
)

# Add products to track
monitor.add_product(
    product_name="Sony WH-1000XM5",
    url="https://www.amazon.com/dp/B09XS7JWHH",
    target_price=299.99,
    alert_threshold_pct=5
)

monitor.add_product(
    product_name="iPad Pro 11-inch",
    url="https://www.apple.com/shop/buy-ipad/ipad-pro",
    target_price=749.00,
    alert_threshold_pct=3
)

# Run monitoring
monitor.run_continuously(check_interval_minutes=30)

Historical Price Tracking

Build a database of historical prices for trend analysis.
import json
from datetime import datetime, timedelta

class PriceHistoryTracker:
    def __init__(self, api_key, db_file="price_history.json"):
        self.api_key = api_key
        self.db_file = db_file
        self.load_history()

    def load_history(self):
        """Load price history from file"""
        try:
            with open(self.db_file, 'r') as f:
                self.history = json.load(f)
        except FileNotFoundError:
            self.history = {}

    def save_history(self):
        """Save price history to file"""
        with open(self.db_file, 'w') as f:
            json.dump(self.history, f, indent=2)

    def record_price(self, product_id, product_name, url):
        """Record current price"""
        current_price = self._fetch_price(product_name, url)

        if not current_price:
            return None

        if product_id not in self.history:
            self.history[product_id] = {
                'name': product_name,
                'url': url,
                'prices': []
            }

        price_record = {
            'price': current_price,
            'timestamp': datetime.now().isoformat(),
            'day_of_week': datetime.now().strftime('%A'),
            'date': datetime.now().strftime('%Y-%m-%d')
        }

        self.history[product_id]['prices'].append(price_record)
        self.save_history()

        return price_record

    def get_price_stats(self, product_id, days=30):
        """Get price statistics for a product"""
        if product_id not in self.history:
            return None

        cutoff_date = datetime.now() - timedelta(days=days)
        recent_prices = [
            p for p in self.history[product_id]['prices']
            if datetime.fromisoformat(p['timestamp']) >= cutoff_date
        ]

        if not recent_prices:
            return None

        prices = [p['price'] for p in recent_prices]

        return {
            'product_name': self.history[product_id]['name'],
            'current_price': prices[-1],
            'min_price': min(prices),
            'max_price': max(prices),
            'avg_price': sum(prices) / len(prices),
            'price_range': max(prices) - min(prices),
            'trend': 'increasing' if prices[-1] > prices[0] else 'decreasing',
            'total_samples': len(recent_prices),
            'first_seen': recent_prices[0]['timestamp'],
            'last_updated': recent_prices[-1]['timestamp']
        }

    def find_best_day_to_buy(self, product_id):
        """Analyze which day of week has lowest prices"""
        if product_id not in self.history:
            return None

        day_prices = {}

        for record in self.history[product_id]['prices']:
            day = record['day_of_week']
            if day not in day_prices:
                day_prices[day] = []
            day_prices[day].append(record['price'])

        day_averages = {
            day: sum(prices) / len(prices)
            for day, prices in day_prices.items()
        }

        best_day = min(day_averages, key=day_averages.get)

        return {
            'best_day': best_day,
            'average_price': day_averages[best_day],
            'all_day_averages': day_averages
        }

    def _fetch_price(self, product_name, url):
        """Fetch current price using AGI Agent"""
        # Implementation similar to previous examples
        pass

# Usage
tracker = PriceHistoryTracker(api_key="your_api_key")

# Record prices daily
products_to_track = [
    ("laptop_dell_xps13", "Dell XPS 13", "https://www.dell.com/..."),
    ("headphones_sony_xm5", "Sony WH-1000XM5", "https://www.amazon.com/...")
]

for product_id, name, url in products_to_track:
    record = tracker.record_price(product_id, name, url)
    print(f"Recorded: {name} - ${record['price']}")

# Get statistics
stats = tracker.get_price_stats("laptop_dell_xps13", days=30)
print(f"\n30-Day Price Statistics:")
print(f"Current: ${stats['current_price']}")
print(f"Min: ${stats['min_price']}")
print(f"Max: ${stats['max_price']}")
print(f"Average: ${stats['avg_price']:.2f}")
print(f"Trend: {stats['trend']}")

# Find best day to buy
best_day_info = tracker.find_best_day_to_buy("laptop_dell_xps13")
print(f"\nBest day to buy: {best_day_info['best_day']}")
print(f"Average price on {best_day_info['best_day']}: ${best_day_info['average_price']:.2f}")

Competitive Price Intelligence

Monitor competitor pricing strategies and respond dynamically.
class CompetitivePriceMonitor:
    def __init__(self, api_key):
        self.api_key = api_key

    def compare_with_competitors(self, your_price, your_product_url, competitor_urls):
        """Compare your price with competitors"""

        competitor_prices = []

        for comp_url in competitor_urls:
            price = self._get_price_from_url(comp_url)
            if price:
                competitor_prices.append({
                    'url': comp_url,
                    'price': price,
                    'domain': self._extract_domain(comp_url)
                })

        if not competitor_prices:
            return None

        sorted_prices = sorted(competitor_prices, key=lambda x: x['price'])
        lowest_competitor = sorted_prices[0]
        highest_competitor = sorted_prices[-1]
        avg_competitor_price = sum(p['price'] for p in competitor_prices) / len(competitor_prices)

        analysis = {
            'your_price': your_price,
            'lowest_competitor_price': lowest_competitor['price'],
            'highest_competitor_price': highest_competitor['price'],
            'average_competitor_price': avg_competitor_price,
            'your_position': self._calculate_position(your_price, [p['price'] for p in competitor_prices]),
            'price_difference_from_lowest': your_price - lowest_competitor['price'],
            'price_difference_pct': ((your_price - lowest_competitor['price']) / lowest_competitor['price']) * 100,
            'competitors': competitor_prices,
            'recommendation': self._get_pricing_recommendation(your_price, avg_competitor_price, lowest_competitor['price'])
        }

        return analysis

    def _calculate_position(self, your_price, competitor_prices):
        """Calculate where your price ranks"""
        all_prices = sorted(competitor_prices + [your_price])
        position = all_prices.index(your_price) + 1
        total = len(all_prices)

        if position == 1:
            return "LOWEST"
        elif position == total:
            return "HIGHEST"
        elif position <= total / 3:
            return "LOW"
        elif position <= 2 * total / 3:
            return "MIDDLE"
        else:
            return "HIGH"

    def _get_pricing_recommendation(self, your_price, avg_price, lowest_price):
        """Generate pricing recommendation"""
        if your_price > avg_price * 1.1:
            return {
                'action': 'REDUCE',
                'reason': 'Your price is significantly above average',
                'suggested_price': round(avg_price * 0.95, 2)
            }
        elif your_price < lowest_price:
            return {
                'action': 'INCREASE',
                'reason': 'You have the lowest price, room to increase margin',
                'suggested_price': round(lowest_price * 0.98, 2)
            }
        else:
            return {
                'action': 'MAINTAIN',
                'reason': 'Your price is competitive',
                'suggested_price': your_price
            }

# Usage
monitor = CompetitivePriceMonitor(api_key="your_api_key")

analysis = monitor.compare_with_competitors(
    your_price=349.99,
    your_product_url="https://yourstore.com/product",
    competitor_urls=[
        "https://amazon.com/product/...",
        "https://bestbuy.com/product/...",
        "https://walmart.com/product/..."
    ]
)

print("Competitive Price Analysis")
print(f"Your Price: ${analysis['your_price']}")
print(f"Market Position: {analysis['your_position']}")
print(f"Lowest Competitor: ${analysis['lowest_competitor_price']}")
print(f"Average Market Price: ${analysis['average_competitor_price']:.2f}")
print(f"\nRecommendation: {analysis['recommendation']['action']}")
print(f"Reason: {analysis['recommendation']['reason']}")
print(f"Suggested Price: ${analysis['recommendation']['suggested_price']}")

Best Implementation

Price checking is straightforward, so use agi-0-fast for:
  • Faster results (30-50% quicker)
  • Lower costs
  • Higher throughput
To avoid overwhelming retailers or hitting rate limits:
  • Add delays between checks (2-5 seconds)
  • Batch requests when possible
  • Use exponential backoff on errors
Reduce unnecessary API calls:
  • Cache prices for 15-30 minutes
  • Store results in a database
  • Only re-check when necessary
Account for common issues:
  • Out of stock items
  • Temporary price unavailability
  • Page load failures
  • Format variations ($99.99 vs 99.99)