WebSocket Guide
Everything you need to know about GmGnAPI's WebSocket layer — connection lifecycle, message types, reconnection logic, and performance tuning for low-latency token monitoring.
How the WebSocket Connection Works
GmGnAPI maintains a single persistent WebSocket connection to wss://gmgn.ai/ws. When you call subscribe_new_pools(), a subscription message is sent over this connection. GMGN.ai's servers push matching events to your client as they happen — zero polling, minimal latency.
Connection Lifecycle
- Connect: GmGnClient.__aenter__ opens the WebSocket
- Authenticate: session handshake with GMGN servers
- Subscribe: send subscription message(s) for the channels you want
- Listen: client.listen() enters the event loop
- Reconnect: on disconnect, exponential backoff auto-reconnects and re-subscribes
- Close: __aexit__ sends a clean close frame
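To make the ordering concrete, here is a minimal sketch of how an `async with` block maps onto these stages. `FakeClient` is a hypothetical stand-in written for this illustration, not GmGnAPI's implementation: it performs no network I/O, only records the stages in the order they run, and leaves out the Reconnect stage.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in client: records lifecycle stages instead of doing I/O."""

    def __init__(self) -> None:
        self.stages = []

    async def __aenter__(self) -> "FakeClient":
        self.stages.append("connect")       # a real client opens the socket here
        self.stages.append("authenticate")  # followed by the session handshake
        return self

    async def subscribe(self, channel: str) -> None:
        self.stages.append(f"subscribe:{channel}")

    async def listen(self) -> None:
        # A real client would loop here until the connection ends.
        self.stages.append("listen")

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs when the block exits, even if the body raised an exception.
        self.stages.append("close")


async def run_lifecycle():
    async with FakeClient() as client:
        await client.subscribe("new_pools")
        await client.listen()
    return client.stages


print(asyncio.run(run_lifecycle()))
```

Because `__aexit__` runs whenever the block exits, the close step is not skipped when a handler raises.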
Subscription Types
| Method | Channel | Description |
|---|---|---|
| subscribe_new_pools() | new_pools | New liquidity pool creation events |
| subscribe_token_updates(address) | token:{address} | Price, volume, and holder updates for a specific token |
| subscribe_trades(address) | trades:{address} | Individual buy/sell transactions for a token |
| subscribe_trending() | trending | GMGN trending token list updates |
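The channel names in the table follow a simple pattern: a bare name for global channels, and `name:{address}` for per-token channels. The sketch below shows one way to build and split such names when routing raw messages yourself. `channel_name` and `parse_channel` are hypothetical helpers written for this illustration and are not part of GmGnAPI.

```python
from typing import Optional, Tuple

# Channel kinds from the table above; True means the channel is scoped to a token address.
CHANNELS = {"new_pools": False, "trending": False, "token": True, "trades": True}


def channel_name(kind: str, address: Optional[str] = None) -> str:
    """Build a channel string such as 'trades:<address>' (hypothetical helper)."""
    if kind not in CHANNELS:
        raise ValueError(f"unknown channel kind: {kind!r}")
    if CHANNELS[kind]:
        if not address:
            raise ValueError(f"{kind!r} requires a token address")
        return f"{kind}:{address}"
    if address is not None:
        raise ValueError(f"{kind!r} does not take a token address")
    return kind


def parse_channel(channel: str) -> Tuple[str, Optional[str]]:
    """Split 'token:<address>' into ('token', '<address>'); global channels get None."""
    kind, sep, address = channel.partition(":")
    return kind, (address if sep else None)


print(channel_name("trades", "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU"))
print(parse_channel("new_pools"))
```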
Reconnection & Resilience
GmGnAPI uses exponential backoff: 1s → 2s → 4s → 8s → 16s → 32s, capped at 60s. After reconnecting, all active subscriptions are automatically re-sent.
import asyncio

from gmgnapi import GmGnClient, ConnectionConfig

config = ConnectionConfig(
    max_reconnect_attempts=20,  # give up after 20 attempts
    initial_backoff=1.0,        # first retry after 1 second
    max_backoff=60.0,           # cap at 60 seconds
    backoff_multiplier=2.0,     # double each time
    ping_interval=30,           # send ping every 30s to keep alive
    ping_timeout=10,            # disconnect if no pong within 10s
)

async def main():
    async with GmGnClient(connection_config=config) as client:
        await client.subscribe_new_pools()

        # called after each successful reconnect
        @client.on_reconnect
        async def on_reconnect(attempt: int):
            print(f"Reconnected (attempt #{attempt})")

        await client.listen()

asyncio.run(main())
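The backoff schedule described above can be reproduced with a few lines of plain Python. This is a sketch of the arithmetic only, not GmGnAPI's internal code: `backoff_delays` is a hypothetical helper whose parameters mirror the ConnectionConfig fields shown above, and it assumes no random jitter is added to the delays.

```python
def backoff_delays(initial=1.0, multiplier=2.0, cap=60.0, attempts=8):
    """Return the wait (in seconds) before each reconnect attempt."""
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(min(delay, cap))  # never wait longer than the cap
        delay *= multiplier             # grow the uncapped delay geometrically
    return delays


print(backoff_delays())
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]

# Total time spent waiting if all 20 attempts are used up
print(sum(backoff_delays(attempts=20)))
# 903.0
```

Under these assumptions, max_reconnect_attempts=20 means the client keeps retrying for about 15 minutes before giving up, which is a useful number to have in mind when choosing the limit.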
Raw Message Format
If you need low-level access to the raw JSON before it's parsed into Pydantic models:
@client.on_raw_message
async def on_raw(message: dict):
    print(f"Channel: {message.get('channel')}")
    print(f"Raw data: {message.get('data')}")
Example raw new_pool message
{
  "channel": "new_pools",
  "data": {
    "address": "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU",
    "chain": "solana",
    "market_cap": 125000.50,
    "liquidity": 22500.00,
    "volume_1h": 8700.20,
    "buy_count_1h": 34,
    "sell_count_1h": 12,
    "top_holder_pct": 0.18,
    "created_at": "2025-06-09T14:22:00Z"
  }
}
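If you handle raw messages yourself, you will probably want to turn the payload into a typed object. The sketch below does this with the standard library only. `NewPool` and `parse_new_pool` are hypothetical names for this illustration; they use a plain dataclass as a stand-in and are not the Pydantic models GmGnAPI ships. The field list is taken from the example message above and may not cover every field the server sends.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone

RAW = """
{
  "channel": "new_pools",
  "data": {
    "address": "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU",
    "chain": "solana",
    "market_cap": 125000.50,
    "liquidity": 22500.00,
    "volume_1h": 8700.20,
    "buy_count_1h": 34,
    "sell_count_1h": 12,
    "top_holder_pct": 0.18,
    "created_at": "2025-06-09T14:22:00Z"
  }
}
"""


@dataclass(frozen=True)
class NewPool:
    """Hypothetical typed view of the example payload (not the library's model)."""
    address: str
    chain: str
    market_cap: float
    liquidity: float
    volume_1h: float
    buy_count_1h: int
    sell_count_1h: int
    top_holder_pct: float
    created_at: datetime


def parse_new_pool(raw: str) -> NewPool:
    message = json.loads(raw)
    if message.get("channel") != "new_pools":
        raise ValueError(f"unexpected channel: {message.get('channel')!r}")
    data = dict(message["data"])
    # datetime.fromisoformat() only accepts a trailing "Z" on Python 3.11+,
    # so rewrite it as an explicit UTC offset first.
    data["created_at"] = datetime.fromisoformat(data["created_at"].replace("Z", "+00:00"))
    return NewPool(**data)  # raises TypeError if fields are missing or unexpected


pool = parse_new_pool(RAW)
print(pool.address, pool.liquidity, pool.created_at.isoformat())
```

Failing loudly on unexpected fields is a deliberate choice for a sketch; a production parser might ignore unknown keys instead.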
Performance Tuning
Keep Handlers Fast
Your on_new_pool handler should return in <10ms. Offload slow work (DB writes, HTTP calls) to a background queue.
Use asyncio.Queue
Decouple the WebSocket receive loop from processing by pushing events to a queue and consuming them in a separate coroutine.
Filter Early
Use TokenFilter to reduce events at the source. Fewer events = less handler overhead = lower latency on the ones that matter.
Co-locate Your Bot
Run your bot on a VPS in a US data center close to GMGN's servers to minimize network latency.
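As an illustration of the "Filter Early" tip, the sketch below applies a cheap predicate to the raw payload before any heavier processing. It does not use TokenFilter, whose parameters are not shown in this guide. `PoolCriteria` is a hypothetical class for this example, and its thresholds are arbitrary placeholders rather than recommendations. The field names come from the example new_pools message.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolCriteria:
    """Hypothetical client-side filter; thresholds are placeholders."""
    min_liquidity: float = 10_000.0
    max_top_holder_pct: float = 0.25
    min_buy_sell_ratio: float = 1.5

    def matches(self, data: dict) -> bool:
        # Cheapest checks first so most events are rejected quickly.
        if data.get("liquidity", 0.0) < self.min_liquidity:
            return False
        if data.get("top_holder_pct", 1.0) > self.max_top_holder_pct:
            return False
        buys = data.get("buy_count_1h", 0)
        sells = data.get("sell_count_1h", 0)
        if sells:
            ratio = buys / sells
        else:
            # No sells yet: pass only if there has been at least one buy.
            ratio = float("inf") if buys else 0.0
        return ratio >= self.min_buy_sell_ratio


criteria = PoolCriteria()
sample = {"liquidity": 22500.0, "top_holder_pct": 0.18,
          "buy_count_1h": 34, "sell_count_1h": 12}
print(criteria.matches(sample))                          # True
print(criteria.matches({**sample, "liquidity": 500.0}))  # False
```

A predicate like this runs in microseconds, so it fits comfortably inside a fast handler; filtering at the source is still preferable where the library supports it, since rejected events then never reach your process.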
Queue Pattern Example
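import asyncio

from gmgnapi import GmGnClient

async def processor(queue: asyncio.Queue):
    while True:
        pool = await queue.get()
        try:
            # slow work here: DB, HTTP, etc.
            print(f"Processing: {pool.address}")
            await asyncio.sleep(0)  # yield to event loop
        finally:
            queue.task_done()  # mark the item done even if processing failed

async def main():
    # create the queue inside the running loop; on Python < 3.10 a
    # module-level Queue can end up bound to a different event loop
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)

    async with GmGnClient() as client:
        await client.subscribe_new_pools()

        @client.on_new_pool
        async def on_pool(pool):
            try:
                queue.put_nowait(pool)  # non-blocking
            except asyncio.QueueFull:
                print("⚠️ Queue full, dropping event")

        # run processor concurrently; keep a reference so the task
        # is not garbage-collected while it is still running
        worker = asyncio.create_task(processor(queue))
        try:
            await client.listen()
        finally:
            worker.cancel()  # stop the processor when listening ends

asyncio.run(main())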
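To see how the bounded queue behaves under load without a live connection, the pattern can be simulated with the standard library alone. The sketch below is an assumption-laden stand-in: `simulate` is a hypothetical function, integers play the role of pool events, and a short sleep stands in for slow work. It pushes a burst of events faster than the processor can drain them and counts how many are dropped.

```python
import asyncio


async def simulate(burst=10, maxsize=3):
    """Push a burst of events into a bounded queue; return (processed, dropped)."""
    queue = asyncio.Queue(maxsize=maxsize)
    processed = []
    dropped = 0

    async def processor():
        while True:
            event = await queue.get()
            try:
                await asyncio.sleep(0.01)  # stand-in for slow work (DB, HTTP)
                processed.append(event)
            finally:
                queue.task_done()

    worker = asyncio.create_task(processor())

    # The burst arrives without yielding to the event loop, so the
    # processor cannot drain anything until the loop below finishes.
    for event in range(burst):
        try:
            queue.put_nowait(event)
        except asyncio.QueueFull:
            dropped += 1

    await queue.join()  # wait until every accepted event has been processed
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)  # absorb the cancellation
    return processed, dropped


print(asyncio.run(simulate()))
# ([0, 1, 2], 7)
```

With a queue of size 3 and a burst of 10, only the first three events are accepted. Sizing maxsize is therefore a trade-off between memory use and how large a burst you can absorb before events are lost.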