Basic Pool Monitoring

A simple example that monitors new liquidity pools in real time.

Basic Pool Monitor

import asyncio
import logging
from datetime import datetime
from gmgnapi import GmGnClient

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def basic_pool_monitor():
    """
    Monitor new Solana pools and log basic information.
    Perfect for getting started with real-time blockchain data.
    """
    
    async with GmGnClient() as client:
        # Subscribe to new pools on Solana
        await client.subscribe_new_pools(chain="sol")
        
        @client.on_new_pool
        async def handle_new_pool(pool_data):
            timestamp = datetime.now().strftime("%H:%M:%S")
            logger.info(f"[{timestamp}] New pools detected: {len(pool_data.pools)}")
            
            for pool in pool_data.pools:
                if pool.bti:  # Base token info available
                    token_info = pool.bti
                    logger.info(f"  šŸŖ™ Token: {token_info.s} ({token_info.n})")
                    logger.info(f"  šŸ’° Market Cap: ${token_info.mc:,.2f}" if token_info.mc else "  šŸ’° Market Cap: Unknown")
                    logger.info(f"  šŸŖ Exchange: {pool.ex}")
                    logger.info(f"  šŸ“ Pool Address: {pool.a}")
                    logger.info("  " + "="*50)
        
        @client.on_connect
        async def handle_connect():
            logger.info("āœ… Connected to GMGN WebSocket")
        
        @client.on_disconnect
        async def handle_disconnect():
            logger.warning("āš ļø Disconnected from GMGN WebSocket")
        
        @client.on_error
        async def handle_error(error):
            logger.error(f"āŒ Error: {error}")
        
        logger.info("šŸš€ Starting basic pool monitor...")
        await client.listen()

if __name__ == "__main__":
    try:
        asyncio.run(basic_pool_monitor())
    except KeyboardInterrupt:
        logger.info("šŸ‘‹ Monitor stopped by user")
    except Exception as e:
        logger.error(f"šŸ’„ Unexpected error: {e}")
        raise
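
For reference, the payload in these examples uses GMGN's abbreviated field names: pool.a is the pool address, pool.ex the exchange, and pool.bti the base-token info with s (symbol), n (name), and mc (market cap). The helper below is a minimal sketch, assuming those same attributes, that expands a pool into a plain dict with readable keys.

def pool_to_dict(pool) -> dict:
    """Expand GMGN's abbreviated pool fields into readable keys.

    Assumes the attributes used in handle_new_pool above
    (pool.a, pool.ex, pool.bti.s / .n / .mc).
    """
    token = pool.bti
    return {
        "pool_address": pool.a,
        "exchange": pool.ex,
        "symbol": token.s if token else None,
        "name": token.n if token else None,
        "market_cap": token.mc if token else None,
    }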

Advanced Token Filtering

Filter tokens based on market cap, volume, and risk criteria.

Smart Token Filter

import asyncio
from decimal import Decimal
from gmgnapi import GmGnEnhancedClient, TokenFilter

class SmartTokenMonitor:
    """
    Advanced token monitoring with intelligent filtering.
    Only shows tokens that meet specific quality criteria.
    """
    
    def __init__(self):
        # Define filter for high-quality tokens
        self.quality_filter = TokenFilter(
            min_market_cap=Decimal("50000"),      # Minimum $50K market cap
            max_market_cap=Decimal("50000000"),   # Maximum $50M market cap
            min_liquidity=Decimal("25000"),       # Minimum $25K liquidity
            min_volume_24h=Decimal("5000"),       # Minimum $5K daily volume
            exchanges=["raydium", "orca", "meteora"],  # Trusted exchanges only
            max_risk_score=0.4,                   # Medium-low risk only
            exclude_symbols=["SCAM", "TEST", "FAKE"]  # Filter out obvious scams
        )
        
        # Track statistics
        self.total_pools_seen = 0
        self.quality_pools_found = 0
        
    async def start_monitoring(self):
        """Start the smart monitoring system."""
        
        async with GmGnEnhancedClient(token_filter=self.quality_filter) as client:
            await client.subscribe_new_pools(chain="sol")
            
            @client.on_new_pool
            async def handle_all_pools(pool_data):
                """Track all pools for statistics."""
                self.total_pools_seen += len(pool_data.pools)
                
            @client.on_filtered_pool
            async def handle_quality_pool(pool_data):
                """Handle pools that pass quality filters."""
                self.quality_pools_found += len(pool_data.pools)
                
                for pool in pool_data.pools:
                    if pool.bti:
                        await self.analyze_quality_token(pool)
                
                # Log statistics
                filter_rate = (self.quality_pools_found / max(self.total_pools_seen, 1)) * 100
                print(f"šŸ“Š Filter Rate: {filter_rate:.1f}% ({self.quality_pools_found}/{self.total_pools_seen})")
            
            @client.on_statistics_update
            async def handle_stats(stats):
                """Log monitoring statistics."""
                print(f"šŸ“ˆ Monitoring Stats:")
                print(f"   Messages: {stats.total_messages}")
                print(f"   Uptime: {stats.connection_uptime:.1f}s")
                print(f"   Unique Tokens: {stats.unique_tokens_seen}")
            
            print("🧠 Starting smart token monitor...")
            await client.listen()
    
    async def analyze_quality_token(self, pool):
        """Analyze a quality token that passed filters."""
        token = pool.bti
        
        print(f"\nšŸŽÆ QUALITY TOKEN FOUND:")
        print(f"  Symbol: {token.s}")
        print(f"  Name: {token.n}")
        print(f"  Market Cap: ${token.mc:,.2f}")
        print(f"  24h Volume: ${token.v24h:,.2f}" if token.v24h else "  24h Volume: Unknown")
        print(f"  Exchange: {pool.ex}")
        print(f"  Pool: {pool.a}")
        
        # Additional analysis
        if token.mc and token.v24h:
            volume_to_mcap_ratio = token.v24h / token.mc
            if volume_to_mcap_ratio > 0.1:  # High activity
                print(f"  šŸ”„ HIGH ACTIVITY: {volume_to_mcap_ratio:.2%} volume/mcap ratio")
        
        if token.hc and token.hc > 1000:  # High holder count
            print(f"  šŸ‘„ STRONG COMMUNITY: {token.hc:,} holders")
        
        print("  " + "="*60)

async def main():
    monitor = SmartTokenMonitor()
    try:
        await monitor.start_monitoring()
    except KeyboardInterrupt:
        print("\nšŸ‘‹ Smart monitor stopped")

if __name__ == "__main__":
    asyncio.run(main())
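
The thresholds above are only a starting point. As a hedged sketch reusing the same TokenFilter fields shown in the constructor, a looser configuration for earlier-stage tokens might look like the following; the values are illustrative, not recommendations.

from decimal import Decimal
from gmgnapi import TokenFilter

# Looser, earlier-stage filter sketch: same TokenFilter fields as above,
# with relaxed thresholds (illustrative values only).
early_stage_filter = TokenFilter(
    min_market_cap=Decimal("10000"),      # Accept smaller launches ($10K)
    max_market_cap=Decimal("5000000"),    # Stay below $5M to catch tokens early
    min_liquidity=Decimal("5000"),
    min_volume_24h=Decimal("1000"),
    exchanges=["raydium", "orca", "meteora"],
    max_risk_score=0.6,                   # Tolerate somewhat higher risk
    exclude_symbols=["SCAM", "TEST", "FAKE"],
)

# Use it exactly like the quality filter:
# async with GmGnEnhancedClient(token_filter=early_stage_filter) as client:
#     ...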

Data Export & Storage

Export real-time data to various formats for analysis and archiving.

Comprehensive Data Exporter

import asyncio
import json
import csv
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from gmgnapi import GmGnEnhancedClient, DataExportConfig

class DataExporter:
    """
    Export blockchain data to multiple formats:
    - JSON for programmatic access
    - CSV for spreadsheet analysis
    - SQLite for structured queries
    """
    
    def __init__(self, data_dir="./blockchain_data"):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)
        
        # Configure different export formats
        self.json_config = DataExportConfig(
            enabled=True,
            format="json",
            file_path=str(self.data_dir / "pools.json"),
            max_file_size_mb=50,
            rotation_interval_hours=12,
            compress=True,
            include_metadata=True
        )
        
        self.csv_config = DataExportConfig(
            enabled=True,
            format="csv",
            file_path=str(self.data_dir / "pools.csv"),
            max_file_size_mb=25,
            rotation_interval_hours=24
        )
        
        # Set up SQLite database
        self.setup_database()
        
    def setup_database(self):
        """Initialize SQLite database for structured storage."""
        db_path = self.data_dir / "blockchain_data.db"
        
        with sqlite3.connect(db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS pools (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    pool_address TEXT,
                    token_symbol TEXT,
                    token_name TEXT,
                    market_cap REAL,
                    volume_24h REAL,
                    exchange TEXT,
                    chain TEXT,
                    holder_count INTEGER
                )
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp ON pools(timestamp);
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_symbol ON pools(token_symbol);
            """)
    
    async def start_export_monitoring(self):
        """Start monitoring and exporting data."""
        
        async with GmGnEnhancedClient(export_config=self.json_config) as client:
            await client.subscribe_new_pools(chain="sol")
            
            @client.on_new_pool
            async def export_pool_data(pool_data):
                """Export pool data to all configured formats."""
                
                for pool in pool_data.pools:
                    if pool.bti:
                        # Export to CSV
                        await self.export_to_csv(pool)
                        
                        # Export to SQLite
                        await self.export_to_sqlite(pool)
                        
                        # JSON export is handled automatically by the client
                        
                        print(f"šŸ“ Exported: {pool.bti.s} to all formats")
            
            print("šŸ’¾ Starting data export monitoring...")
            await client.listen()
    
    async def export_to_csv(self, pool):
        """Export pool data to CSV format."""
        csv_file = self.data_dir / f"pools_{datetime.now().strftime('%Y%m%d')}.csv"
        
        # Check if file exists to write headers
        write_headers = not csv_file.exists()
        
        with open(csv_file, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            
            if write_headers:
                writer.writerow([
                    'timestamp', 'pool_address', 'token_symbol', 'token_name',
                    'market_cap', 'volume_24h', 'exchange', 'chain', 'holder_count'
                ])
            
            token = pool.bti
            writer.writerow([
                datetime.now().isoformat(),
                pool.a,
                token.s or '',
                token.n or '',
                token.mc or 0,
                token.v24h or 0,
                pool.ex,
                'sol',
                token.hc or 0
            ])
    
    async def export_to_sqlite(self, pool):
        """Export pool data to SQLite database."""
        db_path = self.data_dir / "blockchain_data.db"
        
        with sqlite3.connect(db_path) as conn:
            token = pool.bti
            conn.execute("""
                INSERT INTO pools (
                    pool_address, token_symbol, token_name, market_cap,
                    volume_24h, exchange, chain, holder_count
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                pool.a,
                token.s,
                token.n,
                token.mc,
                token.v24h,
                pool.ex,
                'sol',
                token.hc
            ))
    
    async def generate_daily_report(self):
        """Generate a daily summary report."""
        db_path = self.data_dir / "blockchain_data.db"
        report_date = datetime.now().strftime('%Y-%m-%d')
        
        with sqlite3.connect(db_path) as conn:
            # Get daily statistics
            cursor = conn.execute("""
                SELECT 
                    COUNT(*) as total_pools,
                    COUNT(DISTINCT token_symbol) as unique_tokens,
                    AVG(market_cap) as avg_market_cap,
                    SUM(volume_24h) as total_volume,
                    exchange,
                    COUNT(*) as exchange_count
                FROM pools 
                WHERE date(timestamp) = date('now')
                GROUP BY exchange
                ORDER BY exchange_count DESC
            """)
            
            results = cursor.fetchall()
            
            report_file = self.data_dir / f"daily_report_{report_date}.txt"
            with open(report_file, 'w') as f:
                f.write(f"Daily Blockchain Data Report - {report_date}\n")
                f.write("=" * 50 + "\n\n")
                
                for row in results:
                    f.write(f"Exchange: {row[4]}\n")
                    f.write(f"  Total Pools: {row[5]}\n")
                    f.write(f"  Avg Market Cap: ${row[2]:,.2f}\n")
                    f.write(f"  Total Volume: ${row[3]:,.2f}\n\n")
            
            print(f"šŸ“Š Daily report generated: {report_file}")

async def main():
    exporter = DataExporter()
    
    # Start monitoring in background
    monitor_task = asyncio.create_task(exporter.start_export_monitoring())
    
    # Generate daily reports every 24 hours
    async def daily_report_scheduler():
        while True:
            await asyncio.sleep(24 * 60 * 60)  # 24 hours
            await exporter.generate_daily_report()
    
    report_task = asyncio.create_task(daily_report_scheduler())
    
    try:
        await asyncio.gather(monitor_task, report_task)
    except KeyboardInterrupt:
        print("\nšŸ“ Data export stopped")
        monitor_task.cancel()
        report_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())
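
Once data is flowing into the SQLite file above, you can query it offline without the client running. A minimal sketch, using only the pools table and columns created in setup_database:

import sqlite3
from pathlib import Path

def top_tokens_by_volume(data_dir="./blockchain_data", limit=10):
    """Print the highest-volume tokens recorded in the pools table.

    Relies only on the schema created by DataExporter.setup_database above.
    """
    db_path = Path(data_dir) / "blockchain_data.db"
    with sqlite3.connect(db_path) as conn:
        rows = conn.execute(
            """
            SELECT token_symbol,
                   MAX(volume_24h) AS volume,
                   MAX(market_cap) AS market_cap
            FROM pools
            GROUP BY token_symbol
            ORDER BY volume DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()

    for symbol, volume, market_cap in rows:
        print(f"{symbol or '?':<12} volume: ${volume or 0:,.2f}  mcap: ${market_cap or 0:,.2f}")

if __name__ == "__main__":
    top_tokens_by_volume()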

Intelligent Alert System

Set up smart alerts for significant market events and opportunities.

Multi-Channel Alert System

import asyncio
import aiohttp
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
from decimal import Decimal
from gmgnapi import GmGnEnhancedClient, AlertConfig, TokenFilter

class IntelligentAlertSystem:
    """
    Advanced alert system with multiple notification channels:
    - Slack/Discord webhooks
    - Email notifications
    - Console alerts with severity levels
    """
    
    def __init__(self, slack_webhook=None, email_config=None):
        self.slack_webhook = slack_webhook
        self.email_config = email_config
        
        # Configure alerts for different scenarios
        self.alert_config = AlertConfig(
            enabled=True,
            webhook_url=slack_webhook,
            conditions=[
                # Large market cap opportunities
                {
                    "name": "large_cap_opportunity",
                    "field": "market_cap",
                    "operator": ">",
                    "value": 1000000,
                    "severity": "high"
                },
                # High volume activity
                {
                    "name": "volume_spike",
                    "field": "volume_24h",
                    "operator": ">",
                    "value": 500000,
                    "severity": "medium"
                },
                # New exchange listings
                {
                    "name": "major_exchange",
                    "field": "exchange",
                    "operator": "in",
                    "value": ["raydium", "orca"],
                    "severity": "medium"
                }
            ],
            rate_limit_seconds=30  # Prevent spam
        )
        
        # Filter for alert-worthy tokens
        self.alert_filter = TokenFilter(
            min_market_cap=Decimal("10000"),
            min_liquidity=Decimal("5000"),
            max_risk_score=0.5
        )
    
    async def start_alert_monitoring(self):
        """Start the intelligent alert monitoring system."""
        
        async with GmGnEnhancedClient(
            token_filter=self.alert_filter,
            alert_config=self.alert_config
        ) as client:
            
            await client.subscribe_new_pools(chain="sol")
            
            @client.on_filtered_pool
            async def analyze_for_alerts(pool_data):
                """Analyze filtered pools for alert conditions."""
                
                for pool in pool_data.pools:
                    if pool.bti:
                        await self.check_alert_conditions(pool)
            
            @client.on_alert_triggered
            async def handle_triggered_alert(alert_data):
                """Handle alerts triggered by the system."""
                await self.send_alert_notifications(alert_data)
            
            print("🚨 Starting intelligent alert system...")
            await client.listen()
    
    async def check_alert_conditions(self, pool):
        """Check pool against custom alert conditions."""
        token = pool.bti
        
        # Mega cap alert (>$10M)
        if token.mc and token.mc > 10000000:
            await self.trigger_custom_alert(
                "mega_cap_alert",
                f"šŸš€ MEGA CAP TOKEN: {token.s}",
                f"Market Cap: ${token.mc:,.2f}",
                "critical"
            )
        
        # Rapid growth alert
        if token.v24h and token.mc:
            volume_ratio = token.v24h / token.mc
            if volume_ratio > 0.5:  # Volume > 50% of market cap
                await self.trigger_custom_alert(
                    "rapid_growth",
                    f"šŸ“ˆ RAPID GROWTH: {token.s}",
                    f"Volume/MarketCap: {volume_ratio:.1%}",
                    "high"
                )
        
        # Community strength alert
        if token.hc and token.hc > 5000:
            await self.trigger_custom_alert(
                "strong_community",
                f"šŸ‘„ STRONG COMMUNITY: {token.s}",
                f"Holders: {token.hc:,}",
                "medium"
            )
    
    async def trigger_custom_alert(self, alert_type, title, message, severity):
        """Trigger a custom alert with notifications."""
        
        alert_data = {
            "type": alert_type,
            "title": title,
            "message": message,
            "severity": severity,
            "timestamp": datetime.now().isoformat()
        }
        
        await self.send_alert_notifications(alert_data)
    
    async def send_alert_notifications(self, alert_data):
        """Send notifications through all configured channels."""
        
        # Console alert
        self.log_console_alert(alert_data)
        
        # Slack notification
        if self.slack_webhook:
            await self.send_slack_alert(alert_data)
        
        # Email notification for high/critical severity
        if self.email_config and alert_data.get("severity") in ["high", "critical"]:
            await self.send_email_alert(alert_data)
    
    def log_console_alert(self, alert_data):
        """Log alert to console with color coding."""
        severity = alert_data.get("severity", "low")
        
        # Color coding based on severity
        colors = {
            "critical": "šŸ”“",
            "high": "🟠", 
            "medium": "🟔",
            "low": "🟢"
        }
        
        icon = colors.get(severity, "ā„¹ļø")
        timestamp = datetime.now().strftime("%H:%M:%S")
        
        print(f"\n{icon} [{timestamp}] {alert_data['title']}")
        print(f"   {alert_data['message']}")
        print(f"   Severity: {severity.upper()}")
        print("   " + "="*50)
    
    async def send_slack_alert(self, alert_data):
        """Send alert to Slack webhook."""
        if not self.slack_webhook:
            return
        
        severity_colors = {
            "critical": "#ff0000",
            "high": "#ff8800",
            "medium": "#ffaa00",
            "low": "#00ff00"
        }
        
        color = severity_colors.get(alert_data.get("severity"), "#808080")
        
        payload = {
            "attachments": [
                {
                    "color": color,
                    "title": alert_data["title"],
                    "text": alert_data["message"],
                    "fields": [
                        {
                            "title": "Severity",
                            "value": alert_data.get("severity", "unknown").upper(),
                            "short": True
                        },
                        {
                            "title": "Time",
                            "value": alert_data["timestamp"],
                            "short": True
                        }
                    ]
                }
            ]
        }
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.slack_webhook, json=payload) as response:
                    if response.status == 200:
                        print("āœ… Slack alert sent successfully")
                    else:
                        print(f"āŒ Failed to send Slack alert: {response.status}")
        except Exception as e:
            print(f"āŒ Slack alert error: {e}")
    
    async def send_email_alert(self, alert_data):
        """Send email alert for high-priority events."""
        if not self.email_config:
            return
        
        try:
            subject = f"🚨 Blockchain Alert: {alert_data['title']}"
            body = f"""
            Alert Details:
            
            Title: {alert_data['title']}
            Message: {alert_data['message']}
            Severity: {alert_data.get('severity', 'unknown').upper()}
            Time: {alert_data['timestamp']}
            
            This is an automated alert from GmGnAPI monitoring system.
            """
            
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            
            # Send email (implementation depends on your email provider)
            print(f"šŸ“§ Email alert prepared: {subject}")
            
        except Exception as e:
            print(f"āŒ Email alert error: {e}")

async def main():
    # Configure your notification channels
    SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
    EMAIL_CONFIG = {
        "from": "alerts@yourapp.com",
        "to": "you@email.com",
        "smtp_server": "smtp.gmail.com",
        "smtp_port": 587,
        "username": "your_email@gmail.com",
        "password": "your_app_password"
    }
    
    alert_system = IntelligentAlertSystem(
        slack_webhook=SLACK_WEBHOOK,
        email_config=EMAIL_CONFIG
    )
    
    try:
        await alert_system.start_alert_monitoring()
    except KeyboardInterrupt:
        print("\n🚨 Alert system stopped")

if __name__ == "__main__":
    asyncio.run(main())
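
The class docstring mentions Discord webhooks alongside Slack, but Discord expects a different JSON payload (a top-level content or embeds field instead of Slack-style attachments). A hedged variant of send_slack_alert for Discord could look like this; the webhook URL is a placeholder you supply.

import aiohttp

async def send_discord_alert(webhook_url: str, alert_data: dict):
    """Post an alert to a Discord webhook.

    Discord accepts a top-level "content" field and usually returns
    204 No Content on success. webhook_url is a placeholder.
    """
    content = (
        f"**{alert_data['title']}**\n"
        f"{alert_data['message']}\n"
        f"Severity: {alert_data.get('severity', 'unknown').upper()} | "
        f"Time: {alert_data['timestamp']}"
    )
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(webhook_url, json={"content": content}) as resp:
                if resp.status in (200, 204):
                    print("āœ… Discord alert sent")
                else:
                    print(f"āŒ Discord alert failed: {resp.status}")
    except Exception as e:
        print(f"āŒ Discord alert error: {e}")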

Whale Activity Monitor

Track large wallet movements and whale trading patterns.

Whale Detection System

import asyncio
from decimal import Decimal
from collections import defaultdict, deque
from datetime import datetime, timedelta
from gmgnapi import GmGnClient

class WhaleActivityMonitor:
    """
    Monitor and analyze whale trading activity.
    Tracks large transactions and wallet patterns.
    """
    
    def __init__(self, whale_threshold_usd=50000):
        self.whale_threshold = Decimal(str(whale_threshold_usd))
        
        # Track whale wallets and their activity
        self.whale_wallets = set()
        self.wallet_activity = defaultdict(lambda: {
            'total_volume': Decimal('0'),
            'trade_count': 0,
            'first_seen': None,
            'last_activity': None,
            'tokens_traded': set()
        })
        
        # Track recent large transactions
        self.recent_large_trades = deque(maxlen=100)
        
        # Known whale wallets (you can populate this from external sources)
        self.known_whales = {
            # Add known whale addresses here
            # "wallet_address": "Whale Name/Description"
        }
    
    async def start_whale_monitoring(self):
        """Start monitoring whale activity."""
        
        # Note: Wallet monitoring requires authentication
        # Make sure you have GMGN_ACCESS_TOKEN set
        
        async with GmGnClient() as client:
            
            # Monitor new pools for whale involvement
            await client.subscribe_new_pools(chain="sol")
            
            # If you have whale wallet addresses, monitor them directly
            if self.known_whales:
                whale_addresses = list(self.known_whales.keys())
                await client.subscribe_wallet_trades(whale_addresses, chain="sol")
            
            @client.on_new_pool
            async def analyze_pool_for_whales(pool_data):
                """Analyze new pools for whale involvement."""
                
                for pool in pool_data.pools:
                    if pool.bti and pool.bti.mc:
                        await self.check_pool_whale_activity(pool)
            
            @client.on_wallet_trade
            async def track_whale_trades(trade_data):
                """Track direct whale trading activity."""
                await self.analyze_whale_trade(trade_data)
            
            print("šŸ‹ Starting whale activity monitor...")
            print(f"šŸ’° Whale threshold: ${self.whale_threshold:,}")
            await client.listen()
    
    async def check_pool_whale_activity(self, pool):
        """Check if a new pool shows signs of whale involvement."""
        token = pool.bti
        
        # Large initial liquidity suggests whale involvement
        if pool.il and pool.il > float(self.whale_threshold):
            await self.log_whale_activity(
                "large_liquidity",
                f"šŸŠ Large Initial Liquidity: {token.s}",
                f"Liquidity: ${pool.il:,.2f} | Pool: {pool.a[:8]}..."
            )
        
        # High market cap at launch
        if token.mc and token.mc > float(self.whale_threshold * 2):
            await self.log_whale_activity(
                "high_mcap_launch",
                f"šŸ’Ž High Market Cap Launch: {token.s}",
                f"Market Cap: ${token.mc:,.2f}"
            )
        
        # Check volume to market cap ratio (whales often create high volume)
        if token.v24h and token.mc:
            volume_ratio = token.v24h / token.mc
            if volume_ratio > 0.3:  # High volume relative to market cap
                await self.log_whale_activity(
                    "high_volume_ratio",
                    f"šŸ“Š High Volume Activity: {token.s}",
                    f"Volume/MCap: {volume_ratio:.1%} | Volume: ${token.v24h:,.2f}"
                )
    
    async def analyze_whale_trade(self, trade_data):
        """Analyze individual whale trades."""
        wallet = trade_data.wallet_address
        
        # Update wallet statistics
        wallet_stats = self.wallet_activity[wallet]
        
        for trade in trade_data.trades:
            # Check if this is a whale-sized trade
            if trade.amount_usd >= self.whale_threshold:
                await self.record_whale_trade(wallet, trade)
        
        # Update wallet activity
        wallet_stats['total_volume'] += trade_data.total_volume_24h_usd or Decimal('0')
        wallet_stats['trade_count'] += len(trade_data.trades)
        wallet_stats['last_activity'] = datetime.now()
        
        if not wallet_stats['first_seen']:
            wallet_stats['first_seen'] = datetime.now()
        
        # Add wallet to whale list if volume threshold exceeded
        if wallet_stats['total_volume'] >= self.whale_threshold:
            if wallet not in self.whale_wallets:
                self.whale_wallets.add(wallet)
                await self.log_whale_activity(
                    "new_whale_detected",
                    f"šŸ†• New Whale Detected: {wallet[:8]}...",
                    f"Total Volume: ${wallet_stats['total_volume']:,.2f}"
                )
    
    async def record_whale_trade(self, wallet, trade):
        """Record a significant whale trade."""
        
        trade_info = {
            'wallet': wallet,
            'trade': trade,
            'timestamp': datetime.now()
        }
        
        self.recent_large_trades.append(trade_info)
        
        # Determine whale name if known
        whale_name = self.known_whales.get(wallet, f"Whale {wallet[:8]}...")
        
        await self.log_whale_activity(
            "whale_trade",
            f"šŸ‹ {whale_name} {trade.trade_type.upper()}",
            f"Amount: ${trade.amount_usd:,.2f} | Token: {trade.token_address[:8]}..."
        )
        
        # Check for unusual patterns
        await self.check_whale_patterns(wallet)
    
    async def check_whale_patterns(self, wallet):
        """Check for unusual whale trading patterns."""
        
        # Check recent trades from this wallet
        recent_trades = [
            t for t in self.recent_large_trades 
            if t['wallet'] == wallet and 
            t['timestamp'] > datetime.now() - timedelta(hours=1)
        ]
        
        if len(recent_trades) >= 3:
            total_amount = sum(t['trade'].amount_usd for t in recent_trades)
            await self.log_whale_activity(
                "whale_activity_burst",
                f"⚔ Whale Activity Burst: {wallet[:8]}...",
                f"3+ trades in 1h | Total: ${total_amount:,.2f}",
                severity="high"
            )
    
    async def log_whale_activity(self, activity_type, title, details, severity="medium"):
        """Log whale activity with appropriate formatting."""
        
        timestamp = datetime.now().strftime("%H:%M:%S")
        
        # Severity indicators
        indicators = {
            "low": "🟢",
            "medium": "🟔", 
            "high": "šŸ”“"
        }
        
        indicator = indicators.get(severity, "ā„¹ļø")
        
        print(f"\n{indicator} [{timestamp}] {title}")
        print(f"   {details}")
        print(f"   Activity Type: {activity_type}")
        print("   " + "="*60)
    
    def get_whale_statistics(self):
        """Get current whale monitoring statistics."""
        
        print(f"\nšŸ‹ WHALE MONITORING STATISTICS")
        print("=" * 50)
        print(f"Known Whales: {len(self.whale_wallets)}")
        print(f"Recent Large Trades: {len(self.recent_large_trades)}")
        print(f"Monitoring Threshold: ${self.whale_threshold:,}")
        
        # Top whales by volume
        top_whales = sorted(
            self.wallet_activity.items(),
            key=lambda x: x[1]['total_volume'],
            reverse=True
        )[:5]
        
        print(f"\nTop 5 Whales by Volume:")
        for i, (wallet, stats) in enumerate(top_whales, 1):
            whale_name = self.known_whales.get(wallet, f"{wallet[:8]}...")
            print(f"  {i}. {whale_name}")
            print(f"     Volume: ${stats['total_volume']:,.2f}")
            print(f"     Trades: {stats['trade_count']}")

async def main():
    # Create whale monitor with $50K threshold
    whale_monitor = WhaleActivityMonitor(whale_threshold_usd=50000)
    
    try:
        await whale_monitor.start_whale_monitoring()
    except KeyboardInterrupt:
        print("\nšŸ‹ Whale monitor stopped")
        
        # Show final statistics
        whale_monitor.get_whale_statistics()

if __name__ == "__main__":
    asyncio.run(main())
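
The known_whales dict above is left empty; as its comment notes, you can populate it from external sources. The sketch below loads it from a local JSON file and prints statistics on a timer; the file name and interval are assumptions, and it reuses WhaleActivityMonitor and get_whale_statistics from above.

import asyncio
import json
from pathlib import Path

def load_known_whales(path="known_whales.json") -> dict:
    """Load {"wallet_address": "label"} pairs from a JSON file, if present."""
    file = Path(path)
    if not file.exists():
        return {}
    return json.loads(file.read_text())

async def run_with_periodic_stats(interval_seconds=600):
    """Run the whale monitor and print statistics every interval_seconds."""
    monitor = WhaleActivityMonitor(whale_threshold_usd=50000)
    monitor.known_whales = load_known_whales()

    async def stats_loop():
        while True:
            await asyncio.sleep(interval_seconds)
            monitor.get_whale_statistics()

    await asyncio.gather(monitor.start_whale_monitoring(), stats_loop())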

Quick Examples

Short, focused examples for specific use cases.

Portfolio Tracker

async def track_portfolio():
    """Track specific tokens in your portfolio."""
    
    portfolio_tokens = [
        "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",  # USDC
        "So11111111111111111111111111111111111111112",   # SOL
    ]
    
    async with GmGnClient() as client:
        await client.subscribe_token_updates(portfolio_tokens)
        
        @client.on_token_update
        async def handle_portfolio_update(update):
            print(f"šŸ’¼ {update.token_address[:8]}...")
            print(f"   Price: ${update.price_usd}")
            print(f"   24h Change: {update.price_change_24h:.2%}")
        
        await client.listen()
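
If you also know how much of each token you hold, the same updates can drive a rough portfolio value. A minimal sketch, assuming the update.token_address and update.price_usd fields used above; the holding amounts are placeholders.

async def track_portfolio_value():
    """Track a rough portfolio value from live token updates."""

    # Placeholder holdings: token address -> amount held
    holdings = {
        "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v": 1500.0,  # USDC (example amount)
        "So11111111111111111111111111111111111111112": 10.0,     # SOL (example amount)
    }
    latest_prices = {}

    async with GmGnClient() as client:
        await client.subscribe_token_updates(list(holdings))

        @client.on_token_update
        async def handle_update(update):
            latest_prices[update.token_address] = float(update.price_usd)
            total = sum(
                amount * latest_prices.get(address, 0.0)
                for address, amount in holdings.items()
            )
            print(f"šŸ’¼ Portfolio value: ${total:,.2f}")

        await client.listen()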

Price Alert Bot

async def price_alert_bot():
    """Simple price alert system."""
    
    price_targets = {
        "SOL": {"above": 200, "below": 150},
        "BTC": {"above": 100000, "below": 90000}
    }
    
    async with GmGnClient() as client:
        await client.subscribe_new_pools()
        
        @client.on_new_pool
        async def check_prices(pool_data):
            for pool in pool_data.pools:
                if pool.bti and pool.bti.s in price_targets:
                    price = pool.bti.p
                    symbol = pool.bti.s
                    targets = price_targets[symbol]
                    
                    if price > targets["above"]:
                        print(f"šŸš€ {symbol} above ${targets['above']}: ${price}")
                    elif price < targets["below"]:
                        print(f"šŸ“‰ {symbol} below ${targets['below']}: ${price}")
        
        await client.listen()
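
The bot above prints an alert every time a matching pool arrives, which gets noisy if the same condition stays true. A hedged cooldown sketch (the 300-second window is arbitrary) suppresses repeat alerts per symbol:

from datetime import datetime, timedelta

class AlertCooldown:
    """Suppress repeat alerts for the same key within a cooldown window."""

    def __init__(self, cooldown_seconds=300):
        self.cooldown = timedelta(seconds=cooldown_seconds)
        self.last_sent = {}

    def should_send(self, key: str) -> bool:
        now = datetime.now()
        last = self.last_sent.get(key)
        if last and now - last < self.cooldown:
            return False
        self.last_sent[key] = now
        return True

# Illustrative use inside check_prices above:
# cooldown = AlertCooldown()
# if price > targets["above"] and cooldown.should_send(f"{symbol}:above"):
#     print(f"šŸš€ {symbol} above ${targets['above']}: ${price}")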

Volume Scanner

async def volume_scanner():
                    """Scan for high volume trading activity."""
                    
                    volume_threshold = 1000000  # $1M+ volume
                    
                    async with GmGnClient() as client:
                        await client.subscribe_new_pools()
                        
                        @client.on_new_pool
                        async def scan_volume(pool_data):
                            for pool in pool_data.pools:
                                if pool.bti and pool.bti.v24h:
                                    if pool.bti.v24h > volume_threshold:
                                        print(f"šŸ“Š HIGH VOLUME: {pool.bti.s}")
                                        print(f"   24h Volume: ${pool.bti.v24h:,.2f}")
                                        print(f"   Exchange: {pool.ex}")
                        
                        await client.listen()

New Token Announcer

async def new_token_announcer():
                    """Announce new tokens with basic info."""
                    
                    async with GmGnClient() as client:
                        await client.subscribe_new_pools()
                        
                        @client.on_new_pool
                        async def announce_token(pool_data):
                            for pool in pool_data.pools:
                                if pool.bti:
                                    token = pool.bti
                                    print(f"šŸ†• NEW TOKEN: {token.s}")
                                    print(f"   Name: {token.n}")
                                    print(f"   Market Cap: ${token.mc:,.2f}")
                                    print(f"   Pool: {pool.a}")
                                    print(f"   Exchange: {pool.ex}")
                                    print("   " + "-"*30)
                        
                        await client.listen()
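
If the feed can deliver the same pool more than once, a small dedupe keyed on pool.a keeps the announcer from repeating itself. A minimal sketch, reusing the same fields as above:

async def new_token_announcer_dedup():
    """Announce each pool address at most once."""

    seen_pools = set()

    async with GmGnClient() as client:
        await client.subscribe_new_pools()

        @client.on_new_pool
        async def announce_once(pool_data):
            for pool in pool_data.pools:
                if not pool.bti or pool.a in seen_pools:
                    continue
                seen_pools.add(pool.a)
                token = pool.bti
                print(f"šŸ†• NEW TOKEN: {token.s} ({token.n}) on {pool.ex} | {pool.a}")

        await client.listen()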