GmGnAPI Professional Python Client

Connect to GMGN.ai's WebSocket API for real-time Solana blockchain data streams. Built with modern Python, async/await patterns, and enterprise-grade features.

100% Type Hinted
Async First
Python 3.8+ Compatible
quick_start.py
import asyncio
from gmgnapi import GmGnClient

async def main() -> None:
    """Print the chain of every new-pool event delivered by the client.

    Opens a ``GmGnClient`` session, attaches the new-pool handler, requests
    the new-pool subscription and then hands control to ``client.listen()``.
    """
    async with GmGnClient() as client:
        # Attach the handler *before* subscribing: any event dispatched while
        # the subscription request is still being awaited would otherwise
        # find no handler registered.
        @client.on_new_pool
        async def handle_pool(pool_data):
            print(f"New pool: {pool_data.chain}")

        # Subscribe to new token pools
        await client.subscribe_new_pools()

        # Start receiving data
        await client.listen()

# Only start the event loop when executed as a script, so importing this
# module (e.g. from a test or another example) does not open a connection.
if __name__ == "__main__":
    asyncio.run(main())

Key Features

Real-time WebSocket

Persistent connection with automatic reconnection and exponential backoff for reliable data streaming.

Type Safety

Full type hints with Pydantic v2 models for robust data validation and IDE support.

Advanced Filtering

Sophisticated filtering system for market cap, volume, liquidity, and custom conditions.

Data Export

Export to JSON, CSV, or SQLite with automatic rotation and compression options.

Monitoring & Stats

Real-time monitoring with detailed statistics and performance metrics.

Smart Alerts

Intelligent alerting system with webhook and email notifications for important events.

Quick Installation

PyPI (Recommended)

pip install gmgnapi

Development

git clone https://github.com/theshadow76/GmGnAPI.git
cd GmGnAPI
pip install -e .

See It In Action

import asyncio
from gmgnapi import GmGnClient

async def monitor_new_pools() -> None:
    """Monitor new liquidity pools in real-time.

    Subscribes to new pools on the ``"sol"`` chain and prints a short summary
    (symbol, market cap, liquidity, exchange) for every pool that carries
    base token info.
    """

    async with GmGnClient() as client:
        # Attach the handler before subscribing so no event dispatched while
        # the subscription request is being awaited goes unhandled.
        @client.on_new_pool
        async def handle_new_pool(pool_data):
            # `or ()` keeps the handler alive if an update carries no pool list.
            for pool in pool_data.pools or ():
                if not pool.bti:  # Base token info missing: nothing to show
                    continue
                print(f"🆕 New Pool: {pool.bti.s}")
                # Same `or 0` fallback as liquidity below, so a missing market
                # cap prints as $0.00 instead of raising inside the f-string.
                print(f"   💰 Market Cap: ${pool.bti.mc or 0:,.2f}")
                print(f"   📊 Liquidity: ${pool.il or 0:,.2f}")
                print(f"   🏪 Exchange: {pool.ex}")
                print("   " + "="*40)

        await client.subscribe_new_pools(chain="sol")

        await client.listen()

# Only start the event loop when executed as a script, so importing this
# module does not open a connection as a side effect.
if __name__ == "__main__":
    asyncio.run(monitor_new_pools())
from gmgnapi import GmGnEnhancedClient, TokenFilter
from decimal import Decimal

async def monitor_high_value_pools() -> None:
    """Monitor only high-value pools with custom filters.

    Builds a ``TokenFilter`` with minimum market cap, liquidity and 24h volume
    thresholds, passes it to ``GmGnEnhancedClient`` and prints the symbol and
    market cap of the first pool in every filtered update.
    """

    # Define filter for quality tokens
    token_filter = TokenFilter(
        min_market_cap=Decimal("100000"),    # Min $100K market cap
        min_liquidity=Decimal("50000"),      # Min $50K liquidity
        min_volume_24h=Decimal("10000"),     # Min $10K 24h volume
        max_risk_score=0.3,                  # Low risk only
        exchanges=["raydium", "orca"]        # Major DEXes only
    )

    async with GmGnEnhancedClient(token_filter=token_filter) as client:
        # Attach the handler before subscribing so no event dispatched while
        # the subscription request is being awaited goes unhandled.
        @client.on_filtered_pool
        async def handle_quality_pool(pool_data):
            pools = pool_data.pools
            first_pool = pools[0] if pools else None
            # Skip updates with no pools or no base token info rather than
            # raising IndexError/AttributeError inside the callback.
            if first_pool is None or not first_pool.bti:
                return
            token = first_pool.bti
            print("🎯 Quality Pool Found!")
            print(f"   Symbol: {token.s}")
            # `or 0` keeps the numeric format spec from failing on a missing cap.
            print(f"   Market Cap: ${token.mc or 0:,.2f}")

        await client.subscribe_new_pools()

        await client.listen()
from gmgnapi import GmGnEnhancedClient, AlertConfig, DataExportConfig

async def enterprise_monitoring():
    """Run a monitoring session with alerting and data export configured.

    Builds an ``AlertConfig`` and a ``DataExportConfig``, passes both to
    ``GmGnEnhancedClient``, subscribes to new pools and prints the statistics
    object handed to the statistics-update callback.
    """

    # Alert rules, kept separate so the thresholds are easy to spot.
    alert_rules = [
        {"field": "market_cap", "operator": ">", "value": 1000000},
        {"field": "volume_24h", "operator": ">", "value": 500000}
    ]
    alerts = AlertConfig(
        enabled=True,
        webhook_url="https://hooks.slack.com/...",
        conditions=alert_rules,
    )

    # Export settings: JSON output with a 6-hour rotation interval.
    exports = DataExportConfig(
        enabled=True,
        format="json",
        file_path="./data/pools.json",
        rotation_interval_hours=6,
    )

    # Collect the keyword arguments once, then unpack them into the client.
    client_options = {"alert_config": alerts, "export_config": exports}

    async with GmGnEnhancedClient(**client_options) as client:
        await client.subscribe_new_pools()

        # Print a three-line summary each time the callback receives stats.
        @client.on_statistics_update
        async def log_stats(stats):
            print(f"📊 Stats: {stats.total_messages} messages")
            print(f"   Uptime: {stats.connection_uptime:.1f}s")
            print(f"   Tokens: {stats.unique_tokens_seen}")

        await client.listen()