GmGnClient
Basic WebSocket client for connecting to GMGN.ai's real-time data streams. Provides core functionality with automatic reconnection and event handling.
class GmGnClient(
    websocket_url: str = "wss://gmgn.ai/ws",
    access_token: Optional[str] = None,
    max_reconnect_attempts: int = 10,
    reconnect_delay: float = 1.0,
    heartbeat_interval: float = 30.0
)
Constructor Parameters
Parameter | Type | Default | Description |
---|---|---|---|
websocket_url | str | "wss://gmgn.ai/ws" | WebSocket endpoint URL |
access_token | Optional[str] | None | Authentication token for protected channels |
max_reconnect_attempts | int | 10 | Maximum number of reconnection attempts |
reconnect_delay | float | 1.0 | Initial delay between reconnection attempts (seconds) |
heartbeat_interval | float | 30.0 | Heartbeat ping interval (seconds) |
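The description of reconnect_delay as an "initial" delay suggests that the wait grows between attempts, but the growth policy is not documented here. The sketch below assumes exponential backoff with a cap; the backoff_delays helper, the factor of 2, and the 60-second ceiling are all illustrative assumptions, not the library's confirmed behaviour.

```python
from typing import List


def backoff_delays(
    initial_delay: float = 1.0,   # mirrors reconnect_delay
    max_attempts: int = 10,       # mirrors max_reconnect_attempts
    factor: float = 2.0,          # assumed growth factor (not documented)
    max_delay: float = 60.0,      # assumed ceiling (not documented)
) -> List[float]:
    """Return the wait in seconds before each reconnection attempt."""
    delays: List[float] = []
    delay = initial_delay
    for _ in range(max_attempts):
        # Clamp each wait so long outages do not produce huge sleeps.
        delays.append(min(delay, max_delay))
        delay *= factor
    return delays


if __name__ == "__main__":
    print(backoff_delays())
```

Under these assumptions the defaults give waits of 1, 2, 4, 8, 16, and 32 seconds, after which every further attempt waits 60 seconds.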
Methods
async subscribe_new_pools(chain: str = "sol") -> None
Subscribe to real-time new liquidity pool notifications.
await client.subscribe_new_pools(chain="sol")
async subscribe_token_updates(tokens: List[str], chain: str = "sol") -> None
Subscribe to price and volume updates for specific tokens.
tokens = ["EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"]
await client.subscribe_token_updates(tokens, chain="sol")
async subscribe_wallet_trades(wallets: List[str], chain: str = "sol") -> None
Subscribe to trading activity from specific wallet addresses.
wallets = ["wallet_address_here"]
await client.subscribe_wallet_trades(wallets, chain="sol")
async listen() -> None
Start listening for WebSocket messages and dispatch events.
await client.listen() # Runs until connection closes
async connect() -> None
Establish WebSocket connection manually.
async disconnect() -> None
Close WebSocket connection gracefully.
Event Decorators
@client.on_new_pool
Decorator for handling new pool notifications.
@client.on_new_pool
async def handle_new_pool(pool_data: NewPoolInfo):
    print(f"New pool: {pool_data.chain}")
@client.on_token_update
Decorator for handling token price/volume updates.
@client.on_token_update
async def handle_update(update: PairUpdateData):
    print(f"Price: ${update.price_usd}")
@client.on_wallet_trade
Decorator for handling wallet trading activity.
@client.on_error
Decorator for handling connection and data errors.
@client.on_connect
Decorator for handling successful connections.
@client.on_disconnect
Decorator for handling disconnections.
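To make the decorator pattern above concrete, here is a minimal sketch of how decorator-based handler registration and dispatch can work in general. EventRegistry, its on() and dispatch() methods, and the "new_pool" event name are hypothetical stand-ins written for illustration; they are not GmGnClient's internals.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Any], Awaitable[None]]


class EventRegistry:
    """Hypothetical registry that maps event names to async handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def on(self, event: str) -> Callable[[Handler], Handler]:
        """Return a decorator that registers a handler for `event`."""
        def decorator(func: Handler) -> Handler:
            self._handlers.setdefault(event, []).append(func)
            return func  # hand the function back unchanged
        return decorator

    async def dispatch(self, event: str, payload: Any) -> None:
        """Await every handler registered for `event`, in order."""
        for handler in self._handlers.get(event, []):
            await handler(payload)


def demo() -> List[Any]:
    """Register one handler, dispatch one event, return what it saw."""
    registry = EventRegistry()
    received: List[Any] = []

    @registry.on("new_pool")
    async def handle_new_pool(pool_data: Any) -> None:
        received.append(pool_data)

    asyncio.run(registry.dispatch("new_pool", {"c": "sol"}))
    return received


if __name__ == "__main__":
    print(demo())
```

Returning the function unchanged from the decorator keeps the handler callable directly, which is convenient for unit-testing handlers without a live connection.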
GmGnEnhancedClient
Advanced WebSocket client with filtering, monitoring, data export, and alerting capabilities. Extends GmGnClient with enterprise-grade features for production use.
class GmGnEnhancedClient(GmGnClient):
    def __init__(
        self,
        token_filter: Optional[TokenFilter] = None,
        export_config: Optional[DataExportConfig] = None,
        alert_config: Optional[AlertConfig] = None,
        enable_statistics: bool = True,
        **kwargs
    )
Additional Parameters
Parameter | Type | Description |
---|---|---|
token_filter | Optional[TokenFilter] | Filter configuration for token screening |
export_config | Optional[DataExportConfig] | Data export settings (JSON, CSV, database) |
alert_config | Optional[AlertConfig] | Alert and notification configuration |
enable_statistics | bool | Enable real-time statistics collection |
Enhanced Methods
get_statistics() -> MonitoringStats
Get current monitoring statistics and metrics.
stats = client.get_statistics()
print(f"Messages: {stats.total_messages}")
print(f"Uptime: {stats.connection_uptime}s")
async export_data(format: str = "json", file_path: Optional[str] = None) -> str
Export collected data to specified format.
file_path = await client.export_data("csv", "data.csv")
print(f"Data exported to: {file_path}")
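What export_data writes is not described here. As a rough illustration of the JSON and CSV cases, the sketch below uses only the standard library. The export_records function, its fmt argument, and the record layout are assumptions made for this example and say nothing about the library's actual output format.

```python
import csv
import json
from pathlib import Path
from typing import Any, Dict, List


def export_records(records: List[Dict[str, Any]], fmt: str, file_path: str) -> str:
    """Write records to file_path as JSON or CSV and return the path."""
    path = Path(file_path)
    if fmt == "json":
        # default=str keeps Decimal and datetime values serialisable.
        path.write_text(json.dumps(records, indent=2, default=str), encoding="utf-8")
    elif fmt == "csv":
        # Use the union of all keys so sparse records still fit one header.
        fieldnames = sorted({key for record in records for key in record})
        with path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(records)
    else:
        raise ValueError(f"Unsupported export format: {fmt!r}")
    return str(path)
```

Like export_data, the helper returns the path it wrote to, so the caller can log it or pass it on.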
update_filter(new_filter: TokenFilter) -> None
Update token filtering criteria dynamically.
add_alert_condition(condition: Dict[str, Any]) -> None
Add new alert condition at runtime.
Enhanced Events
@client.on_filtered_pool
Triggered when a pool passes all filter criteria.
@client.on_alert_triggered
Triggered when alert conditions are met.
@client.on_statistics_update
Periodic statistics updates (every 60 seconds).
Data Models
Pydantic data models for type-safe data handling and validation.
NewPoolInfo
Information about new liquidity pools
c: str - Chain identifier
p: List[PoolData] - Pool data list
rg: Optional[str] - Region
PoolData
Individual pool information
a: str - Pool address
ex: str - Exchange name
ba: str - Base token address
qa: str - Quote token address
bti: Optional[TokenInfo] - Base token info
TokenInfo
Detailed token information
s: Optional[str] - Symbol
n: Optional[str] - Name
mc: Optional[int] - Market cap
v24h: Optional[float] - 24h volume
p: Optional[float] - Price
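The three models above use abbreviated field names, so it may help to see how a raw message could map onto them. The sketch below mirrors the documented fields with standard-library dataclasses instead of Pydantic, to stay dependency-free. The parse_new_pool helper is hypothetical and the payload shape is assumed from the field lists alone.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class TokenInfo:
    s: Optional[str] = None       # symbol
    n: Optional[str] = None       # name
    mc: Optional[int] = None      # market cap
    v24h: Optional[float] = None  # 24h volume
    p: Optional[float] = None     # price


@dataclass
class PoolData:
    a: str                        # pool address
    ex: str                       # exchange name
    ba: str                       # base token address
    qa: str                       # quote token address
    bti: Optional[TokenInfo] = None


@dataclass
class NewPoolInfo:
    c: str                        # chain identifier
    p: List[PoolData]             # pool data list
    rg: Optional[str] = None      # region


def parse_new_pool(raw: Dict[str, Any]) -> NewPoolInfo:
    """Build a NewPoolInfo from a raw dict, ignoring unknown token keys."""
    pools: List[PoolData] = []
    for item in raw.get("p", []):
        bti_raw = item.get("bti")
        token = None
        if bti_raw:
            known = ("s", "n", "mc", "v24h", "p")
            token = TokenInfo(**{key: bti_raw.get(key) for key in known})
        pools.append(
            PoolData(a=item["a"], ex=item["ex"], ba=item["ba"], qa=item["qa"], bti=token)
        )
    return NewPoolInfo(c=raw["c"], p=pools, rg=raw.get("rg"))
```

Unlike Pydantic, dataclasses do not validate or coerce types, so this sketch only illustrates the field mapping.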
PairUpdateData
Trading pair price/volume updates
pair_address: str
token_address: str
price_usd: Optional[Decimal]
volume_24h_usd: Optional[Decimal]
chain: str
WalletTradeData
Wallet trading activity
wallet_address: str
chain: str
trades: List[TradeData]
total_volume_24h_usd: Optional[Decimal]
TradeData
Individual trade information
transaction_hash: str
trade_type: Literal["buy", "sell"]
amount_usd: Decimal
timestamp: datetime
Configuration Models
TokenFilter
Configure token filtering criteria
from decimal import Decimal
from gmgnapi import TokenFilter
filter_config = TokenFilter(
    min_market_cap=Decimal("100000"),    # $100K minimum
    max_market_cap=Decimal("10000000"),  # $10M maximum
    min_liquidity=Decimal("50000"),      # $50K minimum liquidity
    min_volume_24h=Decimal("10000"),     # $10K 24h volume
    exchanges=["raydium", "orca"],       # Specific exchanges
    max_risk_score=0.3                   # Low risk only
)
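How these criteria are combined is not specified here. One plausible reading is that a token must satisfy every criterion that has been set. The sketch below implements that reading with a hypothetical passes_filter helper. The token keys (market_cap, liquidity, volume_24h, exchange, risk_score) and the rule that missing data fails a configured criterion are assumptions.

```python
from decimal import Decimal
from typing import Any, Dict, Optional


def _as_decimal(value: Any) -> Optional[Decimal]:
    """Convert through str so floats such as 0.3 compare exactly."""
    return None if value is None else Decimal(str(value))


def passes_filter(token: Dict[str, Any], criteria: Dict[str, Any]) -> bool:
    """Return True when the token satisfies every configured criterion."""
    # (token key, criterion key, True when the criterion is a lower bound)
    bounds = [
        ("market_cap", "min_market_cap", True),
        ("market_cap", "max_market_cap", False),
        ("liquidity", "min_liquidity", True),
        ("volume_24h", "min_volume_24h", True),
        ("risk_score", "max_risk_score", False),
    ]
    for token_key, criterion_key, is_minimum in bounds:
        limit = _as_decimal(criteria.get(criterion_key))
        if limit is None:
            continue  # criterion not configured
        value = _as_decimal(token.get(token_key))
        if value is None:
            return False  # assumption: missing data fails the criterion
        if is_minimum and value < limit:
            return False
        if not is_minimum and value > limit:
            return False
    exchanges = criteria.get("exchanges")
    if exchanges and token.get("exchange") not in exchanges:
        return False
    return True
```

Treating missing data as a failure is the conservative choice for screening; a more permissive filter would skip criteria it cannot evaluate.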
DataExportConfig
Configure data export settings
from gmgnapi import DataExportConfig
export_config = DataExportConfig(
    enabled=True,
    format="json",  # "json", "csv", "parquet"
    file_path="./data/pools.json",
    max_file_size_mb=100,
    rotation_interval_hours=24,
    compress=True,
    include_metadata=True
)
AlertConfig
Configure alerts and notifications
from gmgnapi import AlertConfig
alert_config = AlertConfig(
    enabled=True,
    webhook_url="https://hooks.slack.com/...",
    email="alerts@example.com",
    conditions=[
        {
            "field": "market_cap",
            "operator": ">",
            "value": 1000000
        },
        {
            "field": "volume_24h",
            "operator": ">",
            "value": 500000
        }
    ],
    rate_limit_seconds=60
)
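Each condition in the list above is a dictionary with field, operator, and value keys. The sketch below shows one way such dictionaries could be evaluated against a record. The condition_met and triggered_conditions helpers, the set of supported operators, and the record keys are assumptions. Whether the library requires all conditions or any one of them to hold is not stated, so the sketch simply reports which conditions hold.

```python
import operator
from typing import Any, Callable, Dict, List

# Assumed operator vocabulary; only ">" appears in the documented example.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
}


def condition_met(record: Dict[str, Any], condition: Dict[str, Any]) -> bool:
    """Evaluate one {"field", "operator", "value"} condition on a record."""
    compare = OPERATORS.get(condition["operator"])
    if compare is None:
        raise ValueError(f"Unsupported operator: {condition['operator']!r}")
    value = record.get(condition["field"])
    if value is None:
        return False  # assumption: a missing field never triggers an alert
    return compare(value, condition["value"])


def triggered_conditions(
    record: Dict[str, Any], conditions: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Return the subset of conditions that the record satisfies."""
    return [c for c in conditions if condition_met(record, c)]
```

A lookup table of operator functions avoids eval and makes an unsupported operator fail loudly instead of silently passing.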
Exceptions
GmGnAPIError
Base exception class for all GmGnAPI errors
ConnectionError
Raised when WebSocket connection fails
AuthenticationError
Raised when authentication fails for protected channels
ValidationError
Raised when received data fails validation
SubscriptionError
Raised when subscription to a channel fails
Exception Handling Example
import asyncio

from gmgnapi import GmGnClient, GmGnAPIError, AuthenticationError


async def main() -> None:
    try:
        async with GmGnClient() as client:
            await client.subscribe_wallet_trades(["wallet_address"])
            await client.listen()
    except AuthenticationError as e:
        # Catch the specific error before the more general GmGnAPIError.
        print(f"Authentication failed: {e}")
    except GmGnAPIError as e:
        print(f"API error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


asyncio.run(main())
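The order of the except clauses in the example matters whenever the specific errors derive from GmGnAPIError, which its description as the base class for all GmGnAPI errors implies. The sketch below defines stand-in classes with that assumed hierarchy to show which clause handles which error. The classes and the describe helper are illustrative and are not imported from the library.

```python
class GmGnAPIError(Exception):
    """Stand-in for the library's base exception."""


class AuthenticationError(GmGnAPIError):
    """Stand-in; assumed to derive from GmGnAPIError."""


class SubscriptionError(GmGnAPIError):
    """Stand-in; assumed to derive from GmGnAPIError."""


def describe(exc: Exception) -> str:
    """Route an exception through the same except chain as the example."""
    try:
        raise exc
    except AuthenticationError as e:
        # Must come first: the base-class clause below would also match.
        return f"Authentication failed: {e}"
    except GmGnAPIError as e:
        return f"API error: {e}"
    except Exception as e:
        return f"Unexpected error: {e}"


if __name__ == "__main__":
    print(describe(AuthenticationError("bad token")))
    print(describe(SubscriptionError("channel rejected")))
    print(describe(ValueError("not a library error")))
```

Note also that importing the library's ConnectionError by name shadows Python's built-in ConnectionError within that module, so importing it under an alias can avoid confusion.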