Production Deployment

Best practices for deploying GmGnAPI in production environments.

Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 gmgnapi && chown -R gmgnapi:gmgnapi /app
USER gmgnapi

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import gmgnapi; print('OK')" || exit 1

# Run the application
CMD ["python", "main.py"]

# docker-compose.yml
version: '3.8'

services:
  gmgnapi:
    build: .
    restart: unless-stopped
    environment:
      - GMGN_ACCESS_TOKEN=${GMGN_ACCESS_TOKEN}
      - LOG_LEVEL=INFO
      - DATA_DIR=/app/data
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    networks:
      - gmgn-network

  redis:
    image: redis:7-alpine
    restart: unless-stopped
    volumes:
      - redis_data:/data
    networks:
      - gmgn-network

  prometheus:
    image: prom/prometheus:latest
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - gmgn-network

volumes:
  redis_data:
  prometheus_data:

networks:
  gmgn-network:
    driver: bridge

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gmgnapi
  labels:
    app: gmgnapi
spec:
  replicas: 3
  selector:
    matchLabels:
      app: gmgnapi
  template:
    metadata:
      labels:
        app: gmgnapi
    spec:
      containers:
      - name: gmgnapi
        image: gmgnapi:latest
        ports:
        - containerPort: 8080
        env:
        - name: GMGN_ACCESS_TOKEN
          valueFrom:
            secretKeyRef:
              name: gmgn-secrets
              key: access-token
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
          requests:
            memory: "256Mi"
            cpu: "250m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: gmgnapi-service
spec:
  selector:
    app: gmgnapi
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: ClusterIP
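
The liveness and readiness probes above expect the application to serve /health and /ready over HTTP on port 8080. GmGnAPI does not ship such endpoints, so the application has to provide them; a minimal sketch using aiohttp (an assumed dependency, any HTTP framework works) might look like this:

# health_server.py -- illustrative probe endpoints
from aiohttp import web

app_ready = False  # set to True once the WebSocket client is connected and subscribed

async def health(request: web.Request) -> web.Response:
    # Liveness: the process is up and able to answer requests
    return web.json_response({"status": "ok"})

async def ready(request: web.Request) -> web.Response:
    # Readiness: only accept traffic once the client is actually connected
    if app_ready:
        return web.json_response({"status": "ready"})
    return web.json_response({"status": "starting"}, status=503)

app = web.Application()
app.add_routes([web.get("/health", health), web.get("/ready", ready)])

if __name__ == "__main__":
    # In practice, run this alongside the WebSocket client (e.g. via web.AppRunner)
    web.run_app(app, port=8080)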

Advanced Custom Filtering

Create sophisticated filtering logic for complex trading strategies.

from typing import Dict, Any, Callable, List
from gmgnapi import GmGnEnhancedClient
import asyncio
import time

class AdvancedTokenFilter:
    """
    Advanced filtering system with custom rules and weighted scoring.
    """
    
    def __init__(self):
        self.custom_rules: List[Callable] = []
        self.scoring_weights = {
            'market_cap_score': 0.3,
            'volume_score': 0.25,
            'liquidity_score': 0.2,
            'community_score': 0.15,
            'risk_score': 0.1
        }
        
        # Historical data for pattern analysis
        self.token_history = {}
        
    def add_custom_rule(self, rule_func: Callable[[Dict[str, Any]], bool]):
        """Add a custom filtering rule."""
        self.custom_rules.append(rule_func)
    
    def calculate_token_score(self, token_data: Dict[str, Any]) -> float:
        """Calculate a comprehensive score for token quality."""
        
        scores = {
            'market_cap_score': self._score_market_cap(token_data),
            'volume_score': self._score_volume(token_data),
            'liquidity_score': self._score_liquidity(token_data),
            'community_score': self._score_community(token_data),
            'risk_score': self._score_risk(token_data)
        }
        
        # Weighted average
        total_score = sum(
            scores[key] * self.scoring_weights[key] 
            for key in scores
        )
        
        return min(max(total_score, 0), 100)  # Clamp to 0-100
    
    def _score_market_cap(self, token_data: Dict[str, Any]) -> float:
        """Score based on market cap (0-100)."""
        mc = token_data.get('market_cap', 0)
        
        if mc < 10000:
            return 0
        elif mc < 100000:
            return 20
        elif mc < 1000000:
            return 60
        elif mc < 10000000:
            return 90
        else:
            return 100
    
    def _score_volume(self, token_data: Dict[str, Any]) -> float:
        """Score based on 24h volume activity."""
        volume = token_data.get('volume_24h', 0)
        mc = token_data.get('market_cap', 1)
        
        volume_ratio = volume / mc if mc > 0 else 0
        
        if volume_ratio < 0.01:
            return 0
        elif volume_ratio < 0.05:
            return 30
        elif volume_ratio < 0.1:
            return 60
        elif volume_ratio < 0.3:
            return 90
        else:
            return 100
    
    def _score_liquidity(self, token_data: Dict[str, Any]) -> float:
        """Score based on liquidity depth."""
        liquidity = token_data.get('liquidity', 0)
        
        if liquidity < 1000:
            return 0
        elif liquidity < 10000:
            return 30
        elif liquidity < 50000:
            return 60
        elif liquidity < 200000:
            return 90
        else:
            return 100
    
    def _score_community(self, token_data: Dict[str, Any]) -> float:
        """Score based on community metrics."""
        holders = token_data.get('holder_count', 0)
        
        if holders < 10:
            return 0
        elif holders < 100:
            return 25
        elif holders < 1000:
            return 50
        elif holders < 5000:
            return 75
        else:
            return 100
    
    def _score_risk(self, token_data: Dict[str, Any]) -> float:
        """Score based on risk factors (higher score = lower risk)."""
        risk_factors = 0
        
        # Check for risk indicators
        if token_data.get('creator_balance_ratio', 0) > 0.5:
            risk_factors += 30  # High creator holding
        
        if token_data.get('top_10_ratio', 0) > 0.8:
            risk_factors += 25  # High concentration
        
        if token_data.get('burn_rate', 0) > 0.1:
            risk_factors += 20  # High burn rate
        
        # Suspicious name patterns
        symbol = token_data.get('symbol', '').upper()
        suspicious_keywords = ['MOON', 'SAFE', 'BABY', 'DOGE', 'PEPE', 'TEST']
        if any(keyword in symbol for keyword in suspicious_keywords):
            risk_factors += 15
        
        return max(0, 100 - risk_factors)
    
    async def apply_filters(self, token_data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply all filters and return results."""
        
        # Calculate base score
        score = self.calculate_token_score(token_data)
        
        # Apply custom rules
        custom_results = []
        for rule in self.custom_rules:
            try:
                result = rule(token_data)
                custom_results.append(result)
            except Exception as e:
                print(f"Custom rule error: {e}")
                custom_results.append(False)
        
        # Determine if token passes filters
        passes_score = score >= 60  # Minimum score threshold
        passes_custom = all(custom_results) if custom_results else True
        
        return {
            'score': score,
            'passes_filter': passes_score and passes_custom,
            'custom_rule_results': custom_results,
            'breakdown': {
                'market_cap_score': self._score_market_cap(token_data),
                'volume_score': self._score_volume(token_data),
                'liquidity_score': self._score_liquidity(token_data),
                'community_score': self._score_community(token_data),
                'risk_score': self._score_risk(token_data)
            }
        }

# Example usage with custom rules
async def advanced_filtering_example():
    """Example of advanced filtering with custom rules."""
    
    filter_system = AdvancedTokenFilter()
    
    # Add custom rules
    def no_meme_coins(token_data):
        """Filter out obvious meme coins."""
        symbol = token_data.get('symbol', '').upper()
        meme_keywords = ['DOGE', 'SHIB', 'PEPE', 'FLOKI', 'BABY']
        return not any(keyword in symbol for keyword in meme_keywords)
    
    def minimum_age_rule(token_data):
        """Require tokens to be at least 1 hour old."""
        created_time = token_data.get('created_timestamp', 0)
        current_time = time.time()  # compare wall-clock Unix timestamps
        age_hours = (current_time - created_time) / 3600
        return age_hours >= 1
    
    def whale_activity_rule(token_data):
        """Check for healthy whale activity."""
        top_10_ratio = token_data.get('top_10_ratio', 1)
        return 0.3 <= top_10_ratio <= 0.7  # Balanced distribution
    
    # Register custom rules
    filter_system.add_custom_rule(no_meme_coins)
    filter_system.add_custom_rule(minimum_age_rule)
    filter_system.add_custom_rule(whale_activity_rule)
    
    # Use with enhanced client
    async with GmGnEnhancedClient() as client:
        await client.subscribe_new_pools()
        
        @client.on_new_pool
        async def analyze_with_advanced_filter(pool_data):
            for pool in pool_data.pools:
                if pool.bti:
                    token_info = {
                        'symbol': pool.bti.s,
                        'market_cap': pool.bti.mc,
                        'volume_24h': pool.bti.v24h,
                        'holder_count': pool.bti.hc,
                        'liquidity': pool.il,
                        'created_timestamp': time.time(),  # placeholder; use the pool's actual creation time if available
                        'top_10_ratio': pool.bti.t10hr or 0.5,
                        'creator_balance_ratio': pool.bti.cbr or 0.1
                    }
                    
                    result = await filter_system.apply_filters(token_info)
                    
                    if result['passes_filter']:
                        print(f"✅ HIGH QUALITY TOKEN: {token_info['symbol']}")
                        print(f"   Score: {result['score']:.1f}/100")
                        print(f"   Breakdown: {result['breakdown']}")
                    else:
                        print(f"❌ Filtered out: {token_info['symbol']} (Score: {result['score']:.1f})")
        
        await client.listen()

if __name__ == "__main__":
    asyncio.run(advanced_filtering_example())

Enterprise Data Persistence

Advanced data storage and retrieval patterns for large-scale operations.

Time-Series Database Integration

import asyncio
import asyncpg
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json

class TimeSeriesDataManager:
    """
    Advanced time-series data management with PostgreSQL and TimescaleDB.
    """
    
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool: Optional[asyncpg.Pool] = None
        
    async def initialize(self):
        """Initialize database connection pool and create tables."""
        
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        
        await self.create_tables()
        
    async def create_tables(self):
        """Create optimized tables for blockchain data."""
        
        async with self.pool.acquire() as conn:
            # Main pools table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS pools_timeseries (
                    time TIMESTAMPTZ NOT NULL,
                    pool_address TEXT NOT NULL,
                    token_symbol TEXT,
                    token_name TEXT,
                    market_cap NUMERIC,
                    volume_24h NUMERIC,
                    liquidity NUMERIC,
                    price NUMERIC,
                    holder_count INTEGER,
                    exchange TEXT,
                    chain TEXT,
                    metadata JSONB,
                    PRIMARY KEY (time, pool_address)
                );
            """)
            
            # Create hypertable (TimescaleDB extension)
            try:
                await conn.execute("""
                    SELECT create_hypertable('pools_timeseries', 'time', 
                                           if_not_exists => TRUE);
                """)
            except Exception as e:
                print(f"Note: TimescaleDB not available: {e}")
            
            # Indexes for fast queries
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_pools_symbol_time 
                ON pools_timeseries (token_symbol, time DESC);
            """)
            
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_pools_mcap 
                ON pools_timeseries (market_cap) WHERE market_cap IS NOT NULL;
            """)
            
            # Materialized view for aggregated data
            await conn.execute("""
                CREATE MATERIALIZED VIEW IF NOT EXISTS daily_token_stats AS
                SELECT 
                    date_trunc('day', time) AS day,
                    token_symbol,
                    avg(market_cap) AS avg_market_cap,
                    max(market_cap) AS max_market_cap,
                    avg(volume_24h) AS avg_volume,
                    avg(price) AS avg_price,
                    count(*) AS data_points
                FROM pools_timeseries
                WHERE token_symbol IS NOT NULL
                GROUP BY date_trunc('day', time), token_symbol;
            """)
    
    async def store_pool_data(self, pool_data: Dict):
        """Store pool data with optimized batch insertion."""
        
        async with self.pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO pools_timeseries (
                    time, pool_address, token_symbol, token_name,
                    market_cap, volume_24h, liquidity, price,
                    holder_count, exchange, chain, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
                ON CONFLICT (time, pool_address) DO UPDATE SET
                    market_cap = EXCLUDED.market_cap,
                    volume_24h = EXCLUDED.volume_24h,
                    liquidity = EXCLUDED.liquidity,
                    price = EXCLUDED.price,
                    holder_count = EXCLUDED.holder_count,
                    metadata = EXCLUDED.metadata;
            """, 
                datetime.now(),
                pool_data['pool_address'],
                pool_data.get('token_symbol'),
                pool_data.get('token_name'),
                pool_data.get('market_cap'),
                pool_data.get('volume_24h'),
                pool_data.get('liquidity'),
                pool_data.get('price'),
                pool_data.get('holder_count'),
                pool_data.get('exchange'),
                pool_data.get('chain'),
                json.dumps(pool_data.get('metadata', {}))
            )
    
    async def get_token_history(self, 
                               symbol: str, 
                               hours: int = 24) -> List[Dict]:
        """Get historical data for a token."""
        
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT time, market_cap, volume_24h, price, liquidity
                FROM pools_timeseries
                WHERE token_symbol = $1 
                AND time >= NOW() - ($2 * INTERVAL '1 hour')
                ORDER BY time DESC;
            """, symbol, hours)
            
            return [dict(row) for row in rows]
    
    async def get_market_trends(self, timeframe: str = 'hour') -> List[Dict]:
        """Get market trend analysis aggregated per date_trunc unit."""
        
        # date_trunc only accepts unit names such as 'hour' or 'day';
        # whitelist the value because it is interpolated into the query.
        if timeframe not in ('hour', 'day', 'week'):
            raise ValueError(f"Unsupported timeframe: {timeframe}")
        
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(f"""
                SELECT 
                    date_trunc('{timeframe}', time) AS period,
                    count(DISTINCT token_symbol) AS new_tokens,
                    avg(market_cap) AS avg_market_cap,
                    sum(volume_24h) AS total_volume
                FROM pools_timeseries
                WHERE time >= NOW() - INTERVAL '7 days'
                GROUP BY date_trunc('{timeframe}', time)
                ORDER BY period DESC;
            """)
            
            return [dict(row) for row in rows]
    
    async def cleanup_old_data(self, days_to_keep: int = 30):
        """Clean up old data to manage storage."""
        
        async with self.pool.acquire() as conn:
            result = await conn.execute("""
                DELETE FROM pools_timeseries
                WHERE time < NOW() - ($1 * INTERVAL '1 day');
            """, days_to_keep)
            
            print(f"Cleaned up old data: {result}")
            
            # Refresh materialized view
            await conn.execute("REFRESH MATERIALIZED VIEW daily_token_stats;")
    
    async def close(self):
        """Close database connections."""
        if self.pool:
            await self.pool.close()
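
A minimal usage sketch for the manager above; the connection string and field values are illustrative and should be replaced with your own:

# Example usage (illustrative DSN and values)
import asyncio

async def store_example():
    manager = TimeSeriesDataManager("postgresql://user:password@localhost:5432/gmgn")
    await manager.initialize()

    await manager.store_pool_data({
        'pool_address': 'ExamplePoolAddress',  # hypothetical address
        'token_symbol': 'DEMO',
        'token_name': 'Demo Token',
        'market_cap': 250_000,
        'volume_24h': 40_000,
        'liquidity': 60_000,
        'price': 0.0025,
        'holder_count': 320,
        'exchange': 'raydium',
        'chain': 'sol',
    })

    history = await manager.get_token_history('DEMO', hours=6)
    print(f"Stored {len(history)} data point(s) for DEMO")

    await manager.close()

asyncio.run(store_example())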

Performance Optimization

Optimize GmGnAPI for high-throughput, low-latency operations.

Connection Pooling

import asyncio
from gmgnapi import GmGnEnhancedClient
from typing import List

class OptimizedClientPool:
    """Manage multiple client connections for high throughput."""
    
    def __init__(self, pool_size: int = 5):
        self.pool_size = pool_size
        self.clients: List[GmGnEnhancedClient] = []
        self.current_client = 0
        
    async def initialize(self):
        """Initialize client pool."""
        for i in range(self.pool_size):
            client = GmGnEnhancedClient()
            await client.connect()
            self.clients.append(client)
    
    def get_client(self) -> GmGnEnhancedClient:
        """Get next client in round-robin fashion."""
        client = self.clients[self.current_client]
        self.current_client = (self.current_client + 1) % self.pool_size
        return client
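
A brief usage sketch; connect(), subscribe_new_pools(), and listen() follow the client usage shown elsewhere in these docs:

# Example usage (illustrative): spread work across pooled connections
import asyncio

async def run_pool():
    pool = OptimizedClientPool(pool_size=3)
    await pool.initialize()

    # Round-robin: each call hands back the next connection in the pool
    for _ in range(pool.pool_size):
        client = pool.get_client()
        await client.subscribe_new_pools()

    # Consume messages from every connection concurrently
    await asyncio.gather(*(client.listen() for client in pool.clients))

asyncio.run(run_pool())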

Message Batching

import asyncio
import time
from collections import deque
from typing import Any, Deque, Dict, List

class MessageBatcher:
    """Batch messages for efficient processing."""
    
    def __init__(self, batch_size: int = 100, flush_interval: float = 1.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer: Deque[Dict[str, Any]] = deque()
        self.last_flush = time.monotonic()
        
    async def add_message(self, message: Dict[str, Any]):
        """Add message to batch."""
        self.buffer.append(message)
        
        current_time = time.monotonic()
        
        # Flush if batch is full or interval exceeded
        if (len(self.buffer) >= self.batch_size or 
            current_time - self.last_flush >= self.flush_interval):
            await self.flush()
    
    async def flush(self):
        """Process batched messages."""
        if not self.buffer:
            return
            
        batch = list(self.buffer)
        self.buffer.clear()
        self.last_flush = time.monotonic()
        
        # Process batch
        await self.process_batch(batch)
    
    async def process_batch(self, batch: List[Dict[str, Any]]):
        """Override this method for custom batch processing."""
        print(f"Processing batch of {len(batch)} messages")
        # Custom processing logic here
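
To use the batcher, subclass it and override process_batch, then feed it from your message handler. A short sketch, with the bulk write left as a placeholder:

# Example: write batches instead of individual rows (illustrative)
class DatabaseBatcher(MessageBatcher):
    async def process_batch(self, batch: List[Dict[str, Any]]):
        # Replace with a real bulk insert (executemany, COPY, etc.)
        print(f"Writing {len(batch)} records in one round trip")

batcher = DatabaseBatcher(batch_size=50, flush_interval=0.5)

async def on_message(message: Dict[str, Any]):
    await batcher.add_message(message)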

Memory Management

import gc
import psutil
import asyncio

class MemoryMonitor:
    """Monitor and manage memory usage."""
    
    def __init__(self, max_memory_mb: int = 1024):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process()
        
    async def start_monitoring(self):
        """Start memory monitoring loop."""
        while True:
            await self.check_memory()
            await asyncio.sleep(30)  # Check every 30 seconds
    
    async def check_memory(self):
        """Check current memory usage."""
        memory_mb = self.process.memory_info().rss / 1024 / 1024
        
        if memory_mb > self.max_memory_mb:
            print(f"⚠️ High memory usage: {memory_mb:.1f}MB")
            
            # Force garbage collection
            gc.collect()
            
            # Check again after GC
            new_memory_mb = self.process.memory_info().rss / 1024 / 1024
            print(f"After GC: {new_memory_mb:.1f}MB")
            
            if new_memory_mb > self.max_memory_mb * 0.9:
                print("🚨 Memory usage still high after GC")
                # Implement additional cleanup strategies

Security Best Practices

Implement security measures for production deployments.

Credential Management

  • Use environment variables for sensitive data
  • Implement credential rotation
  • Use encrypted storage for API tokens
  • Enable audit logging for credential access

Network Security

  • Use TLS 1.3 for all connections
  • Implement connection rate limiting (a client-side sketch follows this list)
  • Set up firewall rules for outbound connections
  • Monitor for suspicious network activity
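
Client-side rate limiting helps stay under server-side limits. A minimal asyncio token-bucket sketch (the rate and burst values are illustrative):

# Token-bucket rate limiter (illustrative parameters)
import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate                  # tokens replenished per second
        self.capacity = capacity          # maximum burst size
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a token is available, then consume it."""
        async with self._lock:
            while True:
                now = time.monotonic()
                self.tokens = min(self.capacity,
                                  self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                await asyncio.sleep((1 - self.tokens) / self.rate)

# Usage: await limiter.acquire() before each outbound request or subscription
limiter = RateLimiter(rate=5, capacity=10)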

Data Protection

  • Encrypt sensitive data at rest
  • Implement data retention policies
  • Use secure data deletion methods
  • Regular security audits of stored data

The example below ties these practices together: encrypted credential storage, an environment-variable-first token lookup, and a hardened TLS context.

import os
import ssl
from cryptography.fernet import Fernet
from typing import Optional

class SecureConfigManager:
    """Secure configuration and credential management."""
    
    def __init__(self, encryption_key: Optional[bytes] = None):
        if encryption_key:
            self.cipher = Fernet(encryption_key)
        else:
            # Generate new key (store securely!)
            self.cipher = Fernet(Fernet.generate_key())
    
    def encrypt_credential(self, credential: str) -> str:
        """Encrypt a credential for secure storage."""
        return self.cipher.encrypt(credential.encode()).decode()
    
    def decrypt_credential(self, encrypted_credential: str) -> str:
        """Decrypt a stored credential."""
        return self.cipher.decrypt(encrypted_credential.encode()).decode()
    
    def get_secure_token(self) -> str:
        """Get API token with fallback hierarchy."""
        
        # Try environment variable first
        token = os.getenv('GMGN_ACCESS_TOKEN')
        if token:
            return token
        
        # Try encrypted config file
        config_file = os.getenv('GMGN_CONFIG_FILE')
        if config_file and os.path.exists(config_file):
            with open(config_file, 'r') as f:
                encrypted_token = f.read().strip()
                return self.decrypt_credential(encrypted_token)
        
        raise ValueError("No valid API token found")
    
    @staticmethod
    def create_ssl_context() -> ssl.SSLContext:
        """Create secure SSL context for connections."""
        context = ssl.create_default_context()
        context.minimum_version = ssl.TLSVersion.TLSv1_3
        context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED
        return context

Troubleshooting Guide

Common issues and their solutions for production deployments.

Connection Issues

Problem: WebSocket connections dropping frequently
Solutions:
  • Increase reconnection delay and max attempts (see the backoff sketch below)
  • Check network stability and firewall settings
  • Implement connection health checks
  • Use connection pooling for redundancy
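
A reconnection loop with exponential backoff, sketched against the client usage shown earlier (delay values are illustrative):

# Reconnect with exponential backoff (illustrative)
import asyncio
from gmgnapi import GmGnEnhancedClient

async def run_with_reconnect(max_attempts: int = 10, base_delay: float = 1.0):
    attempt = 0
    while attempt < max_attempts:
        try:
            async with GmGnEnhancedClient() as client:
                await client.subscribe_new_pools()
                attempt = 0  # a successful connection resets the counter
                await client.listen()
        except Exception as exc:
            attempt += 1
            delay = min(base_delay * (2 ** attempt), 60)  # cap the backoff
            print(f"Connection lost ({exc}); retry {attempt}/{max_attempts} in {delay:.0f}s")
            await asyncio.sleep(delay)
        else:
            break  # listen() returned cleanly; stop reconnecting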

Memory Leaks

Problem: Memory usage continuously increasing
Solutions:
  • Implement data rotation and cleanup
  • Use weak references for caches
  • Monitor with memory profiling tools
  • Set maximum buffer sizes (see the bounded-buffer sketch below)
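
A bounded buffer is the simplest guard against unbounded growth: a deque with maxlen silently drops the oldest entries once full. A short sketch with an illustrative size:

# Bounded in-memory buffer (illustrative size)
from collections import deque

recent_pools = deque(maxlen=10_000)  # keeps only the newest 10,000 entries

def remember_pool(pool_data: dict):
    recent_pools.append(pool_data)   # oldest entry is dropped automatically when full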

Performance Degradation

Problem: Slow message processing under high load
Solutions:
  • Implement message batching
  • Use async processing queues (see the worker sketch after this list)
  • Optimize database queries
  • Scale horizontally with multiple instances
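
Decoupling ingestion from processing with an asyncio.Queue keeps the WebSocket handler fast under load; a bounded queue also applies backpressure. A minimal sketch (queue size and worker count are illustrative):

# Async processing queue: fast producer, parallel consumers (illustrative)
import asyncio

queue: asyncio.Queue = asyncio.Queue(maxsize=10_000)  # bounded to apply backpressure

async def enqueue_message(message: dict):
    # Called from the WebSocket handler; enqueue and return quickly
    await queue.put(message)

async def worker():
    while True:
        message = await queue.get()
        try:
            # Replace with real processing (scoring, DB writes, alerts, ...)
            await asyncio.sleep(0)
        finally:
            queue.task_done()

def start_workers(count: int = 4):
    return [asyncio.create_task(worker()) for _ in range(count)]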

Authentication Failures

Problem: Random authentication errors
Solutions:
  • Implement token refresh mechanism
  • Add retry logic with exponential backoff (sketched below)
  • Monitor token expiration
  • Use secure credential storage
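
A generic retry helper with exponential backoff and jitter; wrap whatever coroutine performs the (re)connect or authentication step:

# Retry transient failures with exponential backoff and jitter (illustrative)
import asyncio
import random

async def with_retries(coro_factory, max_attempts: int = 5, base_delay: float = 1.0):
    """Run coro_factory() until it succeeds or max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception as exc:
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            await asyncio.sleep(delay)

# Usage: await with_retries(lambda: client.connect())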

Diagnostic Commands

# Memory usage analysis
import tracemalloc
import gc

def analyze_memory():
    """Analyze current memory usage."""
    tracemalloc.start()
    
    # Your application code here
    
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage: {current / 1024 / 1024:.1f} MB")
    print(f"Peak memory usage: {peak / 1024 / 1024:.1f} MB")
    
    # Get top memory consumers
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')
    
    print("\nTop 10 memory consumers:")
    for stat in top_stats[:10]:
        print(stat)

# Connection diagnostics
async def diagnose_connection():
    """Diagnose WebSocket connection issues."""
    import websockets
    import time
    
    start_time = time.time()
    
    try:
        async with websockets.connect("wss://gmgn.ai/ws") as ws:
            # Test ping/pong
            pong_waiter = await ws.ping()
            await pong_waiter
            
            latency = (time.time() - start_time) * 1000
            print(f"✅ Connection successful, latency: {latency:.1f}ms")
            
    except Exception as e:
        print(f"❌ Connection failed: {e}")

# Performance monitoring
import asyncio
import time
from collections import deque

class PerformanceMonitor:
    def __init__(self, window_size: int = 100):
        self.message_times = deque(maxlen=window_size)
        self.processing_times = deque(maxlen=window_size)
    
    def record_message(self, processing_time: float):
        """Record message processing time."""
        current_time = time.time()
        self.message_times.append(current_time)
        self.processing_times.append(processing_time)
    
    def get_stats(self) -> dict:
        """Get performance statistics."""
        if len(self.message_times) < 2:
            return {"error": "Not enough data"}
        
        # Calculate message rate (messages per second)
        time_span = self.message_times[-1] - self.message_times[0]
        message_rate = len(self.message_times) / time_span if time_span > 0 else 0
        
        # Average processing time
        avg_processing_time = sum(self.processing_times) / len(self.processing_times)
        
        return {
            "message_rate": message_rate,
            "avg_processing_time_ms": avg_processing_time * 1000,
            "total_messages": len(self.message_times)
        }