GmGnClient

Basic WebSocket client for connecting to GMGN.ai's real-time data streams. Provides core functionality with automatic reconnection and event handling.

class GmGnClient(
    websocket_url: str = "wss://gmgn.ai/ws",
    access_token: Optional[str] = None,
    max_reconnect_attempts: int = 10,
    reconnect_delay: float = 1.0,
    heartbeat_interval: float = 30.0
)

Constructor Parameters

Parameter Type Default Description
websocket_url str "wss://gmgn.ai/ws" WebSocket endpoint URL
access_token Optional[str] None Authentication token for protected channels
max_reconnect_attempts int 10 Maximum number of reconnection attempts
reconnect_delay float 1.0 Initial delay between reconnection attempts (seconds)
heartbeat_interval float 30.0 Heartbeat ping interval (seconds)

Methods

async subscribe_new_pools(chain: str = "sol") -> None

Subscribe to real-time new liquidity pool notifications.

await client.subscribe_new_pools(chain="sol")

async subscribe_token_updates(tokens: List[str], chain: str = "sol") -> None

Subscribe to price and volume updates for specific tokens.

tokens = ["EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"]
await client.subscribe_token_updates(tokens, chain="sol")

async subscribe_wallet_trades(wallets: List[str], chain: str = "sol") -> None

Subscribe to trading activity from specific wallet addresses.

wallets = ["wallet_address_here"]
await client.subscribe_wallet_trades(wallets, chain="sol")

async listen() -> None

Start listening for WebSocket messages and dispatch events.

await client.listen()  # Runs until connection closes

async connect() -> None

Establish WebSocket connection manually.

async disconnect() -> None

Close WebSocket connection gracefully.

Event Decorators

@client.on_new_pool

Decorator for handling new pool notifications.

@client.on_new_pool
async def handle_new_pool(pool_data: NewPoolInfo):
    print(f"New pool: {pool_data.chain}")

@client.on_token_update

Decorator for handling token price/volume updates.

@client.on_token_update
async def handle_update(update: PairUpdateData):
    print(f"Price: ${update.price_usd}")

@client.on_wallet_trade

Decorator for handling wallet trading activity.

@client.on_error

Decorator for handling connection and data errors.

@client.on_connect

Decorator for handling successful connections.

@client.on_disconnect

Decorator for handling disconnections.

GmGnEnhancedClient

Advanced WebSocket client with filtering, monitoring, data export, and alerting capabilities. Extends GmGnClient with enterprise-grade features for production use.

class GmGnEnhancedClient(GmGnClient):
    def __init__(
        self,
        token_filter: Optional[TokenFilter] = None,
        export_config: Optional[DataExportConfig] = None,
        alert_config: Optional[AlertConfig] = None,
        enable_statistics: bool = True,
        **kwargs
    )

Additional Parameters

Parameter Type Description
token_filter Optional[TokenFilter] Filter configuration for token screening
export_config Optional[DataExportConfig] Data export settings (JSON, CSV, database)
alert_config Optional[AlertConfig] Alert and notification configuration
enable_statistics bool Enable real-time statistics collection

Enhanced Methods

get_statistics() -> MonitoringStats

Get current monitoring statistics and metrics.

stats = client.get_statistics()
print(f"Messages: {stats.total_messages}")
print(f"Uptime: {stats.connection_uptime}s")

async export_data(format: str = "json", file_path: Optional[str] = None) -> str

Export collected data to specified format.

file_path = await client.export_data("csv", "data.csv")
print(f"Data exported to: {file_path}")

update_filter(new_filter: TokenFilter) -> None

Update token filtering criteria dynamically.

add_alert_condition(condition: Dict[str, Any]) -> None

Add new alert condition at runtime.

Enhanced Events

@client.on_filtered_pool

Triggered when a pool passes all filter criteria.

@client.on_alert_triggered

Triggered when alert conditions are met.

@client.on_statistics_update

Periodic statistics updates (every 60 seconds).

Data Models

Pydantic data models for type-safe data handling and validation.

NewPoolInfo

Information about new liquidity pools

  • c: str - Chain identifier
  • p: List[PoolData] - Pool data list
  • rg: Optional[str] - Region

PoolData

Individual pool information

  • a: str - Pool address
  • ex: str - Exchange name
  • ba: str - Base token address
  • qa: str - Quote token address
  • bti: Optional[TokenInfo] - Base token info

TokenInfo

Detailed token information

  • s: Optional[str] - Symbol
  • n: Optional[str] - Name
  • mc: Optional[int] - Market cap
  • v24h: Optional[float] - 24h volume
  • p: Optional[float] - Price

PairUpdateData

Trading pair price/volume updates

  • pair_address: str
  • token_address: str
  • price_usd: Optional[Decimal]
  • volume_24h_usd: Optional[Decimal]
  • chain: str

WalletTradeData

Wallet trading activity

  • wallet_address: str
  • chain: str
  • trades: List[TradeData]
  • total_volume_24h_usd: Optional[Decimal]

TradeData

Individual trade information

  • transaction_hash: str
  • trade_type: Literal["buy", "sell"]
  • amount_usd: Decimal
  • timestamp: datetime

Configuration Models

TokenFilter

Configure token filtering criteria

from decimal import Decimal
from gmgnapi import TokenFilter

filter_config = TokenFilter(
    min_market_cap=Decimal("100000"),     # $100K minimum
    max_market_cap=Decimal("10000000"),   # $10M maximum
    min_liquidity=Decimal("50000"),       # $50K minimum liquidity
    min_volume_24h=Decimal("10000"),      # $10K 24h volume
    exchanges=["raydium", "orca"],        # Specific exchanges
    max_risk_score=0.3                    # Low risk only
)

DataExportConfig

Configure data export settings

from gmgnapi import DataExportConfig

export_config = DataExportConfig(
    enabled=True,
    format="json",                        # "json", "csv", "parquet"
    file_path="./data/pools.json",
    max_file_size_mb=100,
    rotation_interval_hours=24,
    compress=True,
    include_metadata=True
)

AlertConfig

Configure alerts and notifications

from gmgnapi import AlertConfig

alert_config = AlertConfig(
    enabled=True,
    webhook_url="https://hooks.slack.com/...",
    email="alerts@example.com",
    conditions=[
        {
            "field": "market_cap",
            "operator": ">",
            "value": 1000000
        },
        {
            "field": "volume_24h",
            "operator": ">",
            "value": 500000
        }
    ],
    rate_limit_seconds=60
)

Exceptions

GmGnAPIError

Base exception class for all GmGnAPI errors

ConnectionError

Raised when WebSocket connection fails

AuthenticationError

Raised when authentication fails for protected channels

ValidationError

Raised when received data fails validation

SubscriptionError

Raised when subscription to a channel fails

Exception Handling Example

from gmgnapi import GmGnClient, GmGnAPIError, AuthenticationError

try:
    async with GmGnClient() as client:
        await client.subscribe_wallet_trades(["wallet_address"])
        await client.listen()
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except GmGnAPIError as e:
    print(f"API error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")