Production Deployment
Best practices for deploying GmGnAPI in production environments.
Docker Deployment
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies.
# --no-install-recommends keeps optional packages out of the slim image;
# the apt lists are removed in the same layer so they never reach the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
# (done before copying the code so this layer is cached across code changes)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 gmgnapi && chown -R gmgnapi:gmgnapi /app
USER gmgnapi

# Health check: passes when the gmgnapi package can be imported
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import gmgnapi; print('OK')" || exit 1

# Run the application
CMD ["python", "main.py"]
# docker-compose.yml
version: '3.8'

services:
  # The application container, built from the Dockerfile in this directory.
  gmgnapi:
    build: .
    restart: unless-stopped
    environment:
      # Taken from the shell / .env file of whoever runs docker compose.
      - GMGN_ACCESS_TOKEN=${GMGN_ACCESS_TOKEN}
      - LOG_LEVEL=INFO
      - DATA_DIR=/app/data
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    networks:
      - gmgn-network

  redis:
    image: redis:7-alpine
    restart: unless-stopped
    volumes:
      - redis_data:/data
    networks:
      - gmgn-network

  prometheus:
    image: prom/prometheus:latest
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - gmgn-network

# Named volumes referenced by the redis and prometheus services.
volumes:
  redis_data:
  prometheus_data:

networks:
  gmgn-network:
    driver: bridge
Kubernetes Deployment
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gmgnapi
  labels:
    app: gmgnapi
spec:
  replicas: 3
  selector:
    matchLabels:
      app: gmgnapi
  template:
    metadata:
      labels:
        app: gmgnapi
    spec:
      containers:
        - name: gmgnapi
          image: gmgnapi:latest
          ports:
            - containerPort: 8080
          env:
            # The token is read from the "gmgn-secrets" Secret, key "access-token".
            - name: GMGN_ACCESS_TOKEN
              valueFrom:
                secretKeyRef:
                  name: gmgn-secrets
                  key: access-token
          resources:
            limits:
              memory: "512Mi"
              cpu: "500m"
            requests:
              memory: "256Mi"
              cpu: "250m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: gmgnapi-service
spec:
  selector:
    app: gmgnapi
  ports:
    # Service port 80 forwards to the container's port 8080.
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: ClusterIP
Advanced Custom Filtering
Create sophisticated filtering logic for complex trading strategies.
from decimal import Decimal
from typing import Dict, Any, Callable, List
from gmgnapi import TokenFilter, GmGnEnhancedClient
import asyncio
class AdvancedTokenFilter:
    """
    Score tokens on several weighted metrics and apply user-supplied rules.

    Each ``_score_*`` helper maps one raw metric of a token dict onto a 0-100
    scale. ``calculate_token_score`` combines those component scores using
    ``scoring_weights``; ``apply_filters`` additionally evaluates every rule
    registered through ``add_custom_rule``.

    A metric that is missing from the dict or present with the value ``None``
    falls back to that metric's default, so partially populated token data is
    scored instead of raising ``TypeError``.
    """

    # Minimum weighted score a token needs for ``apply_filters`` to pass it.
    MIN_PASSING_SCORE = 60

    # Tier tables: (exclusive upper bound, score). A value that is not below
    # any bound receives the top score of 100.
    _MARKET_CAP_TIERS = ((10000, 0), (100000, 20), (1000000, 60), (10000000, 90))
    _VOLUME_RATIO_TIERS = ((0.01, 0), (0.05, 30), (0.1, 60), (0.3, 90))
    _LIQUIDITY_TIERS = ((1000, 0), (10000, 30), (50000, 60), (200000, 90))
    _HOLDER_TIERS = ((10, 0), (100, 25), (1000, 50), (5000, 75))

    # Substrings of a symbol that add to the risk penalty.
    _SUSPICIOUS_KEYWORDS = ('MOON', 'SAFE', 'BABY', 'DOGE', 'PEPE', 'TEST')

    def __init__(self):
        self.custom_rules: List[Callable] = []
        self.scoring_weights = {
            'market_cap_score': 0.3,
            'volume_score': 0.25,
            'liquidity_score': 0.2,
            'community_score': 0.15,
            'risk_score': 0.1
        }
        # Historical data for pattern analysis
        self.token_history = {}

    @staticmethod
    def _metric(token_data: Dict[str, Any], key: str, default: Any = 0) -> Any:
        """Return ``token_data[key]``, or ``default`` when missing or ``None``."""
        value = token_data.get(key, default)
        return default if value is None else value

    @staticmethod
    def _tier_score(value: float, tiers, top: float = 100) -> float:
        """Return the score of the first tier whose upper bound exceeds ``value``."""
        for upper_bound, score in tiers:
            if value < upper_bound:
                return score
        return top

    def add_custom_rule(self, rule_func: Callable[[Dict[str, Any]], bool]):
        """Register a rule; it receives the token dict and returns pass/fail."""
        self.custom_rules.append(rule_func)

    def calculate_token_score(self, token_data: Dict[str, Any]) -> float:
        """Return the weighted overall score for a token, clamped to 0-100."""
        return self._weighted_score(self._score_breakdown(token_data))

    def _score_breakdown(self, token_data: Dict[str, Any]) -> Dict[str, float]:
        """Compute every component score once, keyed like ``scoring_weights``."""
        return {
            'market_cap_score': self._score_market_cap(token_data),
            'volume_score': self._score_volume(token_data),
            'liquidity_score': self._score_liquidity(token_data),
            'community_score': self._score_community(token_data),
            'risk_score': self._score_risk(token_data)
        }

    def _weighted_score(self, breakdown: Dict[str, float]) -> float:
        """Combine component scores with ``scoring_weights`` and clamp to 0-100."""
        total_score = sum(
            breakdown[key] * self.scoring_weights[key]
            for key in breakdown
        )
        return min(max(total_score, 0), 100)

    def _score_market_cap(self, token_data: Dict[str, Any]) -> float:
        """Score based on market cap (0-100)."""
        market_cap = self._metric(token_data, 'market_cap', 0)
        return self._tier_score(market_cap, self._MARKET_CAP_TIERS)

    def _score_volume(self, token_data: Dict[str, Any]) -> float:
        """Score based on 24h volume relative to market cap."""
        volume = self._metric(token_data, 'volume_24h', 0)
        # An absent market cap defaults to 1, so the ratio is the raw volume.
        market_cap = self._metric(token_data, 'market_cap', 1)
        volume_ratio = volume / market_cap if market_cap > 0 else 0
        return self._tier_score(volume_ratio, self._VOLUME_RATIO_TIERS)

    def _score_liquidity(self, token_data: Dict[str, Any]) -> float:
        """Score based on liquidity depth."""
        liquidity = self._metric(token_data, 'liquidity', 0)
        return self._tier_score(liquidity, self._LIQUIDITY_TIERS)

    def _score_community(self, token_data: Dict[str, Any]) -> float:
        """Score based on the number of holders."""
        holders = self._metric(token_data, 'holder_count', 0)
        return self._tier_score(holders, self._HOLDER_TIERS)

    def _score_risk(self, token_data: Dict[str, Any]) -> float:
        """Score based on risk factors (higher score = lower risk)."""
        risk_factors = 0

        if self._metric(token_data, 'creator_balance_ratio', 0) > 0.5:
            risk_factors += 30  # High creator holding
        if self._metric(token_data, 'top_10_ratio', 0) > 0.8:
            risk_factors += 25  # High concentration
        if self._metric(token_data, 'burn_rate', 0) > 0.1:
            risk_factors += 20  # High burn rate

        # Suspicious name patterns
        symbol = self._metric(token_data, 'symbol', '').upper()
        if any(keyword in symbol for keyword in self._SUSPICIOUS_KEYWORDS):
            risk_factors += 15

        return max(0, 100 - risk_factors)

    async def apply_filters(self, token_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Score a token and evaluate all custom rules.

        Returns a dict with the keys ``score``, ``passes_filter``,
        ``custom_rule_results`` (one entry per registered rule, in
        registration order) and ``breakdown`` (the component scores).
        A rule that raises is reported and counted as a failure.
        """
        # Compute the component scores once and reuse them for both the
        # overall score and the returned breakdown.
        breakdown = self._score_breakdown(token_data)
        score = self._weighted_score(breakdown)

        custom_results = []
        for rule in self.custom_rules:
            try:
                custom_results.append(rule(token_data))
            except Exception as e:
                # Rules are caller-supplied; one broken rule must not abort
                # the evaluation, so it is logged and treated as "not passed".
                print(f"Custom rule error: {e}")
                custom_results.append(False)

        passes_score = score >= self.MIN_PASSING_SCORE
        passes_custom = all(custom_results)  # all([]) is True: no rules, no veto

        return {
            'score': score,
            'passes_filter': passes_score and passes_custom,
            'custom_rule_results': custom_results,
            'breakdown': breakdown
        }
# Example usage with custom rules
async def advanced_filtering_example():
    """Stream new pools and print the filter verdict for each token."""
    token_filter = AdvancedTokenFilter()

    # --- custom rules -----------------------------------------------------
    def no_meme_coins(token_data):
        """Reject symbols containing a well-known meme-coin keyword."""
        meme_keywords = ['DOGE', 'SHIB', 'PEPE', 'FLOKI', 'BABY']
        upper_symbol = token_data.get('symbol', '').upper()
        for keyword in meme_keywords:
            if keyword in upper_symbol:
                return False
        return True

    def minimum_age_rule(token_data):
        """Require tokens to be at least 1 hour old."""
        # Age is measured on the event loop's clock, the same clock the
        # handler below uses when it fills in 'created_timestamp'.
        now = asyncio.get_event_loop().time()
        created_at = token_data.get('created_timestamp', 0)
        return (now - created_at) / 3600 >= 1

    def whale_activity_rule(token_data):
        """Accept only a balanced top-10 holder share (30%-70%)."""
        share = token_data.get('top_10_ratio', 1)
        return 0.3 <= share <= 0.7

    for rule in (no_meme_coins, minimum_age_rule, whale_activity_rule):
        token_filter.add_custom_rule(rule)

    # --- wire the filter into the enhanced client --------------------------
    async with GmGnEnhancedClient() as client:
        await client.subscribe_new_pools()

        @client.on_new_pool
        async def analyze_with_advanced_filter(pool_data):
            for pool in pool_data.pools:
                token = pool.bti
                if not token:
                    continue

                # NOTE(review): 'created_timestamp' is set to the moment the
                # event is handled, so minimum_age_rule sees an age of ~0 for
                # every token built here. Presumably the pool's real creation
                # time should be used instead - confirm which field carries it.
                token_info = {
                    'symbol': token.s,
                    'market_cap': token.mc,
                    'volume_24h': token.v24h,
                    'holder_count': token.hc,
                    'liquidity': pool.il,
                    'created_timestamp': asyncio.get_event_loop().time(),
                    'top_10_ratio': token.t10hr or 0.5,
                    'creator_balance_ratio': token.cbr or 0.1
                }

                verdict = await token_filter.apply_filters(token_info)

                if not verdict['passes_filter']:
                    print(f"❌ Filtered out: {token_info['symbol']} (Score: {verdict['score']:.1f})")
                    continue

                print(f"✅ HIGH QUALITY TOKEN: {token_info['symbol']}")
                print(f" Score: {verdict['score']:.1f}/100")
                print(f" Breakdown: {verdict['breakdown']}")

        await client.listen()


if __name__ == "__main__":
    asyncio.run(advanced_filtering_example())
Enterprise Data Persistence
Advanced data storage and retrieval patterns for large-scale operations.
Time-Series Database Integration
import asyncio
import asyncpg
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
class TimeSeriesDataManager:
    """
    Advanced time-series data management with PostgreSQL and TimescaleDB.

    Call ``initialize()`` before any other method: it opens the asyncpg
    connection pool and creates the schema. Call ``close()`` when done.
    """

    # Short aliases accepted by ``get_market_trends`` -> date_trunc() field.
    _TIMEFRAME_ALIASES = {
        '1m': 'minute',
        '1h': 'hour',
        '1d': 'day',
        '1w': 'week',
    }
    # date_trunc() fields that may be requested directly.
    _TRUNC_UNITS = frozenset({
        'second', 'minute', 'hour', 'day', 'week', 'month', 'quarter', 'year',
    })

    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool: Optional[asyncpg.Pool] = None

    @classmethod
    def _resolve_trunc_unit(cls, timeframe: str) -> str:
        """
        Map a timeframe such as ``'1h'`` or ``'hour'`` to a date_trunc field.

        Raises:
            ValueError: if the timeframe is not a known alias or unit. This
                whitelist keeps arbitrary text out of the SQL statement.
        """
        key = timeframe.strip().lower()
        unit = cls._TIMEFRAME_ALIASES.get(key, key)
        if unit not in cls._TRUNC_UNITS:
            allowed = sorted(cls._TRUNC_UNITS | set(cls._TIMEFRAME_ALIASES))
            raise ValueError(
                f"Unsupported timeframe {timeframe!r}; expected one of {allowed}"
            )
        return unit

    async def initialize(self):
        """Initialize database connection pool and create tables."""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        await self.create_tables()

    async def create_tables(self):
        """Create the table, indexes and materialized view if they do not exist."""
        async with self.pool.acquire() as conn:
            # Main pools table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS pools_timeseries (
                    time TIMESTAMPTZ NOT NULL,
                    pool_address TEXT NOT NULL,
                    token_symbol TEXT,
                    token_name TEXT,
                    market_cap NUMERIC,
                    volume_24h NUMERIC,
                    liquidity NUMERIC,
                    price NUMERIC,
                    holder_count INTEGER,
                    exchange TEXT,
                    chain TEXT,
                    metadata JSONB,
                    PRIMARY KEY (time, pool_address)
                );
            """)

            # Create hypertable (TimescaleDB extension). Any failure here is
            # reported and setup continues with the plain table.
            try:
                await conn.execute("""
                    SELECT create_hypertable('pools_timeseries', 'time',
                                             if_not_exists => TRUE);
                """)
            except Exception as e:
                print(f"Note: TimescaleDB not available: {e}")

            # Indexes for fast queries
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_pools_symbol_time
                ON pools_timeseries (token_symbol, time DESC);
            """)
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_pools_mcap
                ON pools_timeseries (market_cap) WHERE market_cap IS NOT NULL;
            """)

            # Materialized view for aggregated data
            await conn.execute("""
                CREATE MATERIALIZED VIEW IF NOT EXISTS daily_token_stats AS
                SELECT
                    date_trunc('day', time) AS day,
                    token_symbol,
                    avg(market_cap) AS avg_market_cap,
                    max(market_cap) AS max_market_cap,
                    avg(volume_24h) AS avg_volume,
                    avg(price) AS avg_price,
                    count(*) AS data_points
                FROM pools_timeseries
                WHERE token_symbol IS NOT NULL
                GROUP BY date_trunc('day', time), token_symbol;
            """)

    async def store_pool_data(self, pool_data: Dict):
        """
        Upsert one pool snapshot, timestamped with the current time.

        ``pool_data`` must contain ``'pool_address'`` (``KeyError`` otherwise);
        every other column is optional and stored as NULL when absent. On a
        (time, pool_address) conflict the numeric columns and metadata of the
        existing row are overwritten.
        """
        async with self.pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO pools_timeseries (
                    time, pool_address, token_symbol, token_name,
                    market_cap, volume_24h, liquidity, price,
                    holder_count, exchange, chain, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
                ON CONFLICT (time, pool_address) DO UPDATE SET
                    market_cap = EXCLUDED.market_cap,
                    volume_24h = EXCLUDED.volume_24h,
                    liquidity = EXCLUDED.liquidity,
                    price = EXCLUDED.price,
                    holder_count = EXCLUDED.holder_count,
                    metadata = EXCLUDED.metadata;
            """,
                datetime.now(),
                pool_data['pool_address'],
                pool_data.get('token_symbol'),
                pool_data.get('token_name'),
                pool_data.get('market_cap'),
                pool_data.get('volume_24h'),
                pool_data.get('liquidity'),
                pool_data.get('price'),
                pool_data.get('holder_count'),
                pool_data.get('exchange'),
                pool_data.get('chain'),
                json.dumps(pool_data.get('metadata', {}))
            )

    async def get_token_history(self,
                                symbol: str,
                                hours: int = 24) -> List[Dict]:
        """Return the rows for ``symbol`` from the last ``hours`` hours, newest first."""
        async with self.pool.acquire() as conn:
            # The look-back window is bound as an interval parameter ($2);
            # a '%s' inside the SQL text would be sent to the server verbatim.
            rows = await conn.fetch("""
                SELECT time, market_cap, volume_24h, price, liquidity
                FROM pools_timeseries
                WHERE token_symbol = $1
                  AND time >= NOW() - $2::interval
                ORDER BY time DESC;
            """, symbol, timedelta(hours=hours))
            return [dict(row) for row in rows]

    async def get_market_trends(self, timeframe: str = '1h') -> List[Dict]:
        """
        Aggregate the last 7 days into buckets of the given timeframe.

        Args:
            timeframe: ``'1m'``, ``'1h'``, ``'1d'``, ``'1w'`` or a date_trunc
                field name such as ``'hour'``.

        Returns:
            One dict per bucket with ``period``, ``new_tokens`` (distinct
            symbols seen in the bucket), ``avg_market_cap`` and
            ``total_volume``, newest bucket first.

        Raises:
            ValueError: for an unsupported timeframe.
        """
        # Validate before touching the database; the unit is then passed as a
        # bound parameter and never spliced into the statement text.
        unit = self._resolve_trunc_unit(timeframe)
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT
                    date_trunc($1::text, time) AS period,
                    count(DISTINCT token_symbol) AS new_tokens,
                    avg(market_cap) AS avg_market_cap,
                    sum(volume_24h) AS total_volume
                FROM pools_timeseries
                WHERE time >= NOW() - INTERVAL '7 days'
                GROUP BY period
                ORDER BY period DESC;
            """, unit)
            return [dict(row) for row in rows]

    async def cleanup_old_data(self, days_to_keep: int = 30):
        """
        Delete rows older than ``days_to_keep`` days and refresh the daily view.

        Raises:
            ValueError: if ``days_to_keep`` is negative (the cutoff would lie
                in the future and every row would be deleted).
        """
        if days_to_keep < 0:
            raise ValueError("days_to_keep must be non-negative")
        async with self.pool.acquire() as conn:
            result = await conn.execute("""
                DELETE FROM pools_timeseries
                WHERE time < NOW() - $1::interval;
            """, timedelta(days=days_to_keep))
            print(f"Cleaned up old data: {result}")

            # Refresh materialized view
            await conn.execute("REFRESH MATERIALIZED VIEW daily_token_stats;")

    async def close(self):
        """Close database connections."""
        if self.pool:
            await self.pool.close()
Performance Optimization
Optimize GmGnAPI for high-throughput, low-latency operations.
Connection Pooling
import asyncio
from gmgnapi import GmGnEnhancedClient
from typing import List
class OptimizedClientPool:
    """Manage multiple client connections for high throughput."""

    def __init__(self, pool_size: int = 5):
        self.pool_size = pool_size
        self.clients: List[GmGnEnhancedClient] = []
        # Index of the client that the next get_client() call will return.
        self.current_client = 0

    async def initialize(self):
        """Create and connect ``pool_size`` clients, one after another."""
        for _ in range(self.pool_size):
            client = GmGnEnhancedClient()
            await client.connect()
            self.clients.append(client)

    def get_client(self) -> GmGnEnhancedClient:
        """
        Get next client in round-robin fashion.

        Raises:
            RuntimeError: if the pool holds no clients yet, i.e.
                ``initialize()`` has not completed.
        """
        if not self.clients:
            raise RuntimeError("Client pool is empty; call initialize() first")
        # Wrap on the number of clients actually held, so the rotation stays
        # valid even when that differs from pool_size.
        index = self.current_client % len(self.clients)
        self.current_client = (index + 1) % len(self.clients)
        return self.clients[index]
Message Batching
import asyncio
import time
from collections import deque
from typing import Any, Deque, Dict, List
class MessageBatcher:
    """
    Batch messages for efficient processing.

    Messages accumulate in ``buffer`` and are handed to ``process_batch`` as
    one list when the buffer reaches ``batch_size`` or when at least
    ``flush_interval`` seconds have passed since the previous flush. Both
    conditions are only checked inside ``add_message``, so a partially filled
    buffer stays put until the next message arrives or ``flush()`` is called
    explicitly.
    """

    def __init__(self, batch_size: int = 100, flush_interval: float = 1.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer: Deque[Dict[str, Any]] = deque()
        # time.monotonic() needs no event loop, so the batcher can be
        # constructed before (or outside of) any running loop.
        self.last_flush = time.monotonic()

    async def add_message(self, message: Dict[str, Any]):
        """Add message to batch and flush when size or age limit is reached."""
        self.buffer.append(message)

        batch_full = len(self.buffer) >= self.batch_size
        interval_elapsed = time.monotonic() - self.last_flush >= self.flush_interval
        if batch_full or interval_elapsed:
            await self.flush()

    async def flush(self):
        """Hand all buffered messages to ``process_batch``; no-op when empty."""
        if not self.buffer:
            return

        # Detach the batch and reset the timer before processing, so messages
        # added while process_batch is awaiting start a fresh batch.
        batch = list(self.buffer)
        self.buffer.clear()
        self.last_flush = time.monotonic()

        await self.process_batch(batch)

    async def process_batch(self, batch: List[Dict[str, Any]]):
        """Override this method for custom batch processing."""
        print(f"Processing batch of {len(batch)} messages")
        # Custom processing logic here
Memory Management
import gc
import psutil
import asyncio
from typing import Optional
class MemoryMonitor:
    """Watch this process's resident memory and react when it grows too large."""

    def __init__(self, max_memory_mb: int = 1024):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process()

    async def start_monitoring(self):
        """Run ``check_memory`` forever, pausing 30 seconds between checks."""
        while True:
            await self.check_memory()
            await asyncio.sleep(30)  # Check every 30 seconds

    async def check_memory(self):
        """Report high memory usage and try to reclaim it with a GC pass."""
        usage_mb = self.process.memory_info().rss / 1024 / 1024
        if not usage_mb > self.max_memory_mb:
            return  # within budget, nothing to report

        print(f"⚠️ High memory usage: {usage_mb:.1f}MB")

        # Force garbage collection, then measure again.
        gc.collect()
        usage_after_gc_mb = self.process.memory_info().rss / 1024 / 1024
        print(f"After GC: {usage_after_gc_mb:.1f}MB")

        # Still above 90% of the limit: collection alone was not enough.
        still_high = usage_after_gc_mb > self.max_memory_mb * 0.9
        if still_high:
            print("🚨 Memory usage still high after GC")
            # Implement additional cleanup strategies
Security Best Practices
Implement security measures for production deployments.
Credential Management
- Use environment variables for sensitive data
- Implement credential rotation
- Use encrypted storage for API tokens
- Enable audit logging for credential access
Network Security
- Use TLS 1.3 for all connections
- Implement connection rate limiting
- Set up firewall rules for outbound connections
- Monitor for suspicious network activity
Data Protection
- Encrypt sensitive data at rest
- Implement data retention policies
- Use secure data deletion methods
- Regular security audits of stored data
import os
import ssl
import hashlib
from cryptography.fernet import Fernet
from typing import Optional
class SecureConfigManager:
    """
    Secure configuration and credential management.

    Credentials are encrypted with Fernet. The key in use is available as
    ``encryption_key`` so that a key generated by this class can be persisted
    by the caller; without it, previously encrypted values cannot be
    decrypted by a later instance.
    """

    def __init__(self, encryption_key: Optional[bytes] = None):
        """
        Args:
            encryption_key: Fernet key to use. When omitted (or empty) a new
                key is generated; read it back from ``encryption_key`` and
                store it securely.
        """
        if encryption_key:
            self.encryption_key = encryption_key
        else:
            # Generate new key (store securely!)
            self.encryption_key = Fernet.generate_key()
        self.cipher = Fernet(self.encryption_key)

    def encrypt_credential(self, credential: str) -> str:
        """Encrypt a credential and return the token as text."""
        return self.cipher.encrypt(credential.encode()).decode()

    def decrypt_credential(self, encrypted_credential: str) -> str:
        """Decrypt a token produced by ``encrypt_credential`` with the same key."""
        return self.cipher.decrypt(encrypted_credential.encode()).decode()

    def get_secure_token(self) -> str:
        """
        Get API token with fallback hierarchy.

        Lookup order:
          1. the ``GMGN_ACCESS_TOKEN`` environment variable (used as-is);
          2. the file named by ``GMGN_CONFIG_FILE``, whose stripped content
             is decrypted with this manager's key.

        Raises:
            ValueError: if neither source yields a token.
        """
        # Try environment variable first
        token = os.getenv('GMGN_ACCESS_TOKEN')
        if token:
            return token

        # Try encrypted config file
        config_file = os.getenv('GMGN_CONFIG_FILE')
        if config_file and os.path.exists(config_file):
            # Explicit encoding: the result must not depend on the locale.
            with open(config_file, 'r', encoding='utf-8') as f:
                encrypted_token = f.read().strip()
            return self.decrypt_credential(encrypted_token)

        raise ValueError("No valid API token found")

    @staticmethod
    def create_ssl_context() -> ssl.SSLContext:
        """Create an SSL context that requires TLS 1.3 and verified certificates."""
        context = ssl.create_default_context()
        context.minimum_version = ssl.TLSVersion.TLSv1_3
        # Set explicitly so the requirements are visible at the call site.
        context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED
        return context
Troubleshooting Guide
Common issues and their solutions for production deployments.
Connection Issues
Problem: WebSocket connections dropping frequently
Solutions:
- Increase reconnection delay and max attempts
- Check network stability and firewall settings
- Implement connection health checks
- Use connection pooling for redundancy
Memory Leaks
Problem: Memory usage continuously increasing
Solutions:
- Implement data rotation and cleanup
- Use weak references for caches
- Monitor with memory profiling tools
- Set maximum buffer sizes
Performance Degradation
Problem: Slow message processing under high load
Solutions:
- Implement message batching
- Use async processing queues
- Optimize database queries
- Scale horizontally with multiple instances
Authentication Failures
Problem: Random authentication errors
Solutions:
- Implement token refresh mechanism
- Add retry logic with exponential backoff
- Monitor token expiration
- Use secure credential storage
Diagnostic Commands
# Memory usage analysis
import tracemalloc
import gc
def analyze_memory():
    """Start tracemalloc, then print traced totals and the top allocation sites."""
    tracemalloc.start()

    # Your application code here

    traced_now, traced_peak = tracemalloc.get_traced_memory()
    current_mb = traced_now / 1024 / 1024
    peak_mb = traced_peak / 1024 / 1024
    print(f"Current memory usage: {current_mb:.1f} MB")
    print(f"Peak memory usage: {peak_mb:.1f} MB")

    # Get top memory consumers, grouped by source line.
    by_line = tracemalloc.take_snapshot().statistics('lineno')

    print("\nTop 10 memory consumers:")
    for entry in by_line[:10]:
        print(entry)
# Connection diagnostics
async def diagnose_connection():
    """
    Open a WebSocket to gmgn.ai, ping it once and print the outcome.

    The reported time runs from just before the connection attempt until the
    pong arrives, so it includes connection setup as well as the ping round
    trip. Any exception is caught and printed as a failure.
    """
    import websockets
    import time

    began = time.time()

    try:
        async with websockets.connect("wss://gmgn.ai/ws") as socket:
            # Test ping/pong: ping() yields an awaitable that resolves when
            # the matching pong is received.
            await (await socket.ping())

            elapsed_ms = (time.time() - began) * 1000
            print(f"✅ Connection successful, latency: {elapsed_ms:.1f}ms")
    except Exception as exc:
        print(f"❌ Connection failed: {exc}")
# Performance monitoring
import asyncio
import time
from collections import deque
class PerformanceMonitor:
    """
    Track message throughput and processing time over a sliding window.

    Only the most recent ``window_size`` samples are kept, so every statistic
    describes that window and not the whole lifetime of the monitor.
    """

    def __init__(self, window_size: int = 100):
        # Arrival time (time.time()) of each recorded message.
        self.message_times = deque(maxlen=window_size)
        # Processing duration, in seconds, of each recorded message.
        self.processing_times = deque(maxlen=window_size)

    def record_message(self, processing_time: float):
        """Record that a message was handled now and took ``processing_time`` seconds."""
        current_time = time.time()
        self.message_times.append(current_time)
        self.processing_times.append(processing_time)

    def get_stats(self) -> dict:
        """
        Get performance statistics for the current window.

        Returns:
            ``{"error": "Not enough data"}`` with fewer than two samples,
            otherwise a dict with ``message_rate`` (messages per second),
            ``avg_processing_time_ms`` and ``total_messages`` (the number of
            samples currently in the window).
        """
        sample_count = len(self.message_times)
        if sample_count < 2:
            return {"error": "Not enough data"}

        # N timestamps delimit N-1 inter-arrival intervals, so the rate over
        # the span is (N-1)/span; dividing N by the span would overstate it
        # (two messages one second apart are 1 msg/s, not 2).
        time_span = self.message_times[-1] - self.message_times[0]
        message_rate = (sample_count - 1) / time_span if time_span > 0 else 0

        # Average processing time
        avg_processing_time = sum(self.processing_times) / len(self.processing_times)

        return {
            "message_rate": message_rate,
            "avg_processing_time_ms": avg_processing_time * 1000,
            "total_messages": sample_count
        }