Basic Pool Monitoring
Simple example to monitor new liquidity pools in real-time.
Basic Pool Monitor
import asyncio
import logging
from datetime import datetime
from gmgnapi import GmGnClient
# Set up logging: configure the root logger at INFO level and create the
# module-level logger used by the event handlers and the entry point below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def basic_pool_monitor():
    """
    Monitor new Solana pools and log basic information.

    Opens a ``GmGnClient``, subscribes to new pools on the ``"sol"`` chain,
    registers handlers for the new-pool, connect, disconnect and error
    events, and then awaits ``client.listen()``.

    Perfect for getting started with real-time blockchain data.
    """
    async with GmGnClient() as client:
        # Subscribe to new pools on Solana
        await client.subscribe_new_pools(chain="sol")

        @client.on_new_pool
        async def handle_new_pool(pool_data):
            """Log one summary line per message, then details per pool."""
            timestamp = datetime.now().strftime("%H:%M:%S")
            logger.info(f"[{timestamp}] New pools detected: {len(pool_data.pools)}")

            for pool in pool_data.pools:
                if not pool.bti:  # No base token info available
                    continue

                token_info = pool.bti
                logger.info(f" šŖ Token: {token_info.s} ({token_info.n})")
                # Market cap is optional; avoid formatting a missing value.
                logger.info(f" š° Market Cap: ${token_info.mc:,.2f}" if token_info.mc else " š° Market Cap: Unknown")
                logger.info(f" šŖ Exchange: {pool.ex}")
                logger.info(f" š Pool Address: {pool.a}")
                logger.info(" " + "="*50)

        @client.on_connect
        async def handle_connect():
            """Log that the connection was established."""
            logger.info("✅ Connected to GMGN WebSocket")

        @client.on_disconnect
        async def handle_disconnect():
            """Log that the connection was lost."""
            logger.warning("ā ļø Disconnected from GMGN WebSocket")

        @client.on_error
        async def handle_error(error):
            """Log an error reported by the client."""
            logger.error(f"ā Error: {error}")

        logger.info("š Starting basic pool monitor...")
        await client.listen()
if __name__ == "__main__":
    # Script entry point: run the monitor until it finishes or is interrupted.
    try:
        asyncio.run(basic_pool_monitor())
    except KeyboardInterrupt:
        # Ctrl+C is an expected way to stop the monitor, not an error.
        logger.info("š Monitor stopped by user")
    except Exception as e:
        # Log for visibility, then re-raise so the traceback and a non-zero
        # exit status are preserved.
        logger.error(f"š„ Unexpected error: {e}")
        raise
Advanced Token Filtering
Filter tokens based on market cap, volume, and risk criteria.
Smart Token Filter
import asyncio
from decimal import Decimal
from gmgnapi import GmGnEnhancedClient, TokenFilter
class SmartTokenMonitor:
    """
    Advanced token monitoring with intelligent filtering.
    Only shows tokens that meet specific quality criteria.

    Attributes:
        quality_filter: ``TokenFilter`` handed to the enhanced client.
        total_pools_seen: Number of pools received via ``on_new_pool``.
        quality_pools_found: Number of pools received via ``on_filtered_pool``.
    """

    def __init__(self):
        """Build the quality filter and reset the counters."""
        # Define filter for high-quality tokens
        self.quality_filter = TokenFilter(
            min_market_cap=Decimal("50000"),  # Minimum $50K market cap
            max_market_cap=Decimal("50000000"),  # Maximum $50M market cap
            min_liquidity=Decimal("25000"),  # Minimum $25K liquidity
            min_volume_24h=Decimal("5000"),  # Minimum $5K daily volume
            exchanges=["raydium", "orca", "meteora"],  # Trusted exchanges only
            max_risk_score=0.4,  # Medium-low risk only
            exclude_symbols=["SCAM", "TEST", "FAKE"],  # Filter out obvious scams
        )

        # Track statistics
        self.total_pools_seen = 0
        self.quality_pools_found = 0

    async def start_monitoring(self):
        """Start the smart monitoring system.

        Subscribes to new pools on the ``"sol"`` chain, registers the
        handlers below and awaits ``client.listen()``.
        """
        async with GmGnEnhancedClient(token_filter=self.quality_filter) as client:
            await client.subscribe_new_pools(chain="sol")

            @client.on_new_pool
            async def handle_all_pools(pool_data):
                """Track all pools for statistics."""
                self.total_pools_seen += len(pool_data.pools)

            @client.on_filtered_pool
            async def handle_quality_pool(pool_data):
                """Handle pools that pass quality filters."""
                # Count pools, not messages, so the ratio below compares the
                # same unit as total_pools_seen.
                self.quality_pools_found += len(pool_data.pools)

                for pool in pool_data.pools:
                    if pool.bti:
                        await self.analyze_quality_token(pool)

                # Log statistics; max(..., 1) avoids division by zero.
                filter_rate = (self.quality_pools_found / max(self.total_pools_seen, 1)) * 100
                print(f"š Filter Rate: {filter_rate:.1f}% ({self.quality_pools_found}/{self.total_pools_seen})")

            @client.on_statistics_update
            async def handle_stats(stats):
                """Log monitoring statistics."""
                print(f"š Monitoring Stats:")
                print(f" Messages: {stats.total_messages}")
                print(f" Uptime: {stats.connection_uptime:.1f}s")
                print(f" Unique Tokens: {stats.unique_tokens_seen}")

            print("š§ Starting smart token monitor...")
            await client.listen()

    async def analyze_quality_token(self, pool):
        """Analyze a quality token that passed filters.

        Prints the token summary for ``pool.bti`` plus optional highlights
        for a high volume/market-cap ratio and a large holder count.

        Args:
            pool: Pool object whose ``bti`` attribute holds the token info.
        """
        token = pool.bti

        print(f"\nšÆ QUALITY TOKEN FOUND:")
        print(f" Symbol: {token.s}")
        print(f" Name: {token.n}")
        # Market cap may be missing; formatting None would raise TypeError.
        print(f" Market Cap: ${token.mc:,.2f}" if token.mc else " Market Cap: Unknown")
        print(f" 24h Volume: ${token.v24h:,.2f}" if token.v24h else " 24h Volume: Unknown")
        print(f" Exchange: {pool.ex}")
        print(f" Pool: {pool.a}")

        # Additional analysis
        if token.mc and token.v24h:
            volume_to_mcap_ratio = token.v24h / token.mc
            if volume_to_mcap_ratio > 0.1:  # High activity
                print(f" š„ HIGH ACTIVITY: {volume_to_mcap_ratio:.2%} volume/mcap ratio")

        if token.hc and token.hc > 1000:  # High holder count
            print(f" š„ STRONG COMMUNITY: {token.hc:,} holders")

        print(" " + "="*60)
async def main():
    """Create a SmartTokenMonitor and run it until it stops or is cancelled."""
    monitor = SmartTokenMonitor()
    await monitor.start_monitoring()


if __name__ == "__main__":
    # Ctrl+C is reliably reported by asyncio.run() itself rather than inside
    # the awaited coroutine, so the interrupt is handled around the call.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nš Smart monitor stopped")
Data Export & Storage
Export real-time data to various formats for analysis and archiving.
Comprehensive Data Exporter
import asyncio
import json
import csv
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from gmgnapi import GmGnEnhancedClient, DataExportConfig
class DataExporter:
    """
    Export blockchain data to multiple formats:
    - JSON for programmatic access
    - CSV for spreadsheet analysis
    - SQLite for structured queries

    Attributes:
        data_dir: Directory that receives the CSV files, the SQLite database
            and the daily reports.
        json_config: ``DataExportConfig`` passed to the client for JSON export.
        csv_config: ``DataExportConfig`` describing a CSV export; it is built
            here but not passed to the client by this class.
    """

    def __init__(self, data_dir="./blockchain_data"):
        """Create the data directory, the export configs and the database.

        Args:
            data_dir: Target directory; created (with parents) if missing.
        """
        self.data_dir = Path(data_dir)
        # parents=True so a nested target directory does not fail.
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Configure different export formats
        self.json_config = DataExportConfig(
            enabled=True,
            format="json",
            file_path=str(self.data_dir / "pools.json"),
            max_file_size_mb=50,
            rotation_interval_hours=12,
            compress=True,
            include_metadata=True,
        )
        self.csv_config = DataExportConfig(
            enabled=True,
            format="csv",
            file_path=str(self.data_dir / "pools.csv"),
            max_file_size_mb=25,
            rotation_interval_hours=24,
        )

        # Set up SQLite database
        self.setup_database()

    def _open_db(self):
        """Open a new connection to this exporter's SQLite database file."""
        return sqlite3.connect(self.data_dir / "blockchain_data.db")

    @staticmethod
    def _format_usd(value):
        """Format a dollar amount, or "N/A" when the aggregate is NULL."""
        return "N/A" if value is None else f"${value:,.2f}"

    def setup_database(self):
        """Initialize SQLite database for structured storage.

        Creates the ``pools`` table and its two indexes if they do not exist.
        """
        conn = self._open_db()
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS pools (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                        pool_address TEXT,
                        token_symbol TEXT,
                        token_name TEXT,
                        market_cap REAL,
                        volume_24h REAL,
                        exchange TEXT,
                        chain TEXT,
                        holder_count INTEGER
                    )
                """)
                conn.execute("""
                    CREATE INDEX IF NOT EXISTS idx_timestamp ON pools(timestamp);
                """)
                conn.execute("""
                    CREATE INDEX IF NOT EXISTS idx_symbol ON pools(token_symbol);
                """)
        finally:
            # The connection context manager only ends the transaction; it
            # does not close the connection.
            conn.close()

    async def start_export_monitoring(self):
        """Start monitoring and exporting data.

        Subscribes to new pools on the ``"sol"`` chain and writes every pool
        that has token info to CSV and SQLite.
        """
        async with GmGnEnhancedClient(export_config=self.json_config) as client:
            await client.subscribe_new_pools(chain="sol")

            @client.on_new_pool
            async def export_pool_data(pool_data):
                """Export pool data to all configured formats."""
                for pool in pool_data.pools:
                    if not pool.bti:
                        continue
                    # Export to CSV
                    await self.export_to_csv(pool)
                    # Export to SQLite
                    await self.export_to_sqlite(pool)
                    # JSON export is handled automatically by the client
                    print(f"š Exported: {pool.bti.s} to all formats")

            print("š¾ Starting data export monitoring...")
            await client.listen()

    async def export_to_csv(self, pool):
        """Export pool data to CSV format.

        Appends one row to ``pools_YYYYMMDD.csv`` in ``data_dir``, writing the
        header row first when the file does not exist yet.

        Args:
            pool: Pool object; ``pool.bti`` must hold the token info.
        """
        csv_file = self.data_dir / f"pools_{datetime.now().strftime('%Y%m%d')}.csv"

        # Check if file exists to write headers
        write_headers = not csv_file.exists()

        token = pool.bti
        with open(csv_file, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            if write_headers:
                writer.writerow([
                    'timestamp', 'pool_address', 'token_symbol', 'token_name',
                    'market_cap', 'volume_24h', 'exchange', 'chain', 'holder_count'
                ])
            writer.writerow([
                datetime.now().isoformat(),
                pool.a,
                token.s or '',
                token.n or '',
                token.mc or 0,
                token.v24h or 0,
                pool.ex,
                'sol',
                token.hc or 0
            ])

    async def export_to_sqlite(self, pool):
        """Export pool data to SQLite database.

        Inserts one row into ``pools``; missing token fields are stored as
        NULL. The ``timestamp`` column is filled by its column default.

        Args:
            pool: Pool object; ``pool.bti`` must hold the token info.
        """
        token = pool.bti
        conn = self._open_db()
        try:
            with conn:
                conn.execute("""
                    INSERT INTO pools (
                        pool_address, token_symbol, token_name, market_cap,
                        volume_24h, exchange, chain, holder_count
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    pool.a,
                    token.s,
                    token.n,
                    token.mc,
                    token.v24h,
                    pool.ex,
                    'sol',
                    token.hc
                ))
        finally:
            # Called once per pool, so an unclosed connection would leak on
            # every export.
            conn.close()

    async def generate_daily_report(self):
        """Generate a daily summary report.

        Aggregates today's rows per exchange and writes them to
        ``daily_report_YYYY-MM-DD.txt`` in ``data_dir``.
        """
        report_date = datetime.now().strftime('%Y-%m-%d')

        conn = self._open_db()
        try:
            # Get daily statistics. Stored timestamps are UTC, so both sides
            # are converted to local time to match the local report_date.
            cursor = conn.execute("""
                SELECT
                    COUNT(*) as total_pools,
                    COUNT(DISTINCT token_symbol) as unique_tokens,
                    AVG(market_cap) as avg_market_cap,
                    SUM(volume_24h) as total_volume,
                    exchange,
                    COUNT(*) as exchange_count
                FROM pools
                WHERE date(timestamp, 'localtime') = date('now', 'localtime')
                GROUP BY exchange
                ORDER BY exchange_count DESC
            """)
            results = cursor.fetchall()
        finally:
            conn.close()

        report_file = self.data_dir / f"daily_report_{report_date}.txt"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(f"Daily Blockchain Data Report - {report_date}\n")
            f.write("=" * 50 + "\n\n")
            for _total, _unique, avg_cap, total_volume, exchange, count in results:
                f.write(f"Exchange: {exchange}\n")
                f.write(f" Total Pools: {count}\n")
                # AVG/SUM are NULL when every value in the group is NULL.
                f.write(f" Avg Market Cap: {self._format_usd(avg_cap)}\n")
                f.write(f" Total Volume: {self._format_usd(total_volume)}\n\n")

        print(f"š Daily report generated: {report_file}")
async def main():
    """Run the export monitor alongside a 24-hourly report scheduler."""
    exporter = DataExporter()

    # Generate daily reports every 24 hours
    async def daily_report_scheduler():
        while True:
            await asyncio.sleep(24 * 60 * 60)  # 24 hours
            await exporter.generate_daily_report()

    # Start monitoring in background
    monitor_task = asyncio.create_task(exporter.start_export_monitoring())
    report_task = asyncio.create_task(daily_report_scheduler())

    try:
        await asyncio.gather(monitor_task, report_task)
    finally:
        # Always stop both tasks: this runs when main() is cancelled and
        # also when one task fails, which would otherwise leave its sibling
        # running.
        monitor_task.cancel()
        report_task.cancel()


if __name__ == "__main__":
    # Ctrl+C is reliably reported by asyncio.run() itself rather than inside
    # the awaited coroutine, so the interrupt is handled around the call.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nš Data export stopped")
Intelligent Alert System
Set up smart alerts for significant market events and opportunities.
Multi-Channel Alert System
import asyncio
import smtplib
from datetime import datetime
from decimal import Decimal
from email.mime.text import MIMEText

import aiohttp
from gmgnapi import AlertConfig, GmGnEnhancedClient, TokenFilter
class IntelligentAlertSystem:
    """
    Advanced alert system with multiple notification channels:
    - Slack/Discord webhooks
    - Email notifications
    - Console alerts with severity levels
    """

    def __init__(self, slack_webhook=None, email_config=None):
        """Store the channel settings and build the alert and filter configs.

        Args:
            slack_webhook: Webhook URL that alerts are POSTed to; a falsy
                value disables webhook delivery.
            email_config: Mapping read for its ``"from"`` and ``"to"`` keys;
                a falsy value disables email alerts.
        """
        self.slack_webhook = slack_webhook
        self.email_config = email_config

        # Configure alerts for different scenarios
        self.alert_config = AlertConfig(
            enabled=True,
            webhook_url=slack_webhook,
            conditions=[
                # Large market cap opportunities
                {
                    "name": "large_cap_opportunity",
                    "field": "market_cap",
                    "operator": ">",
                    "value": 1000000,
                    "severity": "high"
                },
                # High volume activity
                {
                    "name": "volume_spike",
                    "field": "volume_24h",
                    "operator": ">",
                    "value": 500000,
                    "severity": "medium"
                },
                # New exchange listings
                {
                    "name": "major_exchange",
                    "field": "exchange",
                    "operator": "in",
                    "value": ["raydium", "orca"],
                    "severity": "medium"
                }
            ],
            rate_limit_seconds=30  # Prevent spam
        )

        # Filter for alert-worthy tokens
        self.alert_filter = TokenFilter(
            min_market_cap=Decimal("10000"),
            min_liquidity=Decimal("5000"),
            max_risk_score=0.5
        )

    async def start_alert_monitoring(self):
        """Start the intelligent alert monitoring system.

        Subscribes to new pools on the ``"sol"`` chain, checks every filtered
        pool against the custom conditions, forwards client-triggered alerts
        to the notification channels and awaits ``client.listen()``.
        """
        async with GmGnEnhancedClient(
            token_filter=self.alert_filter,
            alert_config=self.alert_config
        ) as client:
            await client.subscribe_new_pools(chain="sol")

            @client.on_filtered_pool
            async def analyze_for_alerts(pool_data):
                """Analyze filtered pools for alert conditions."""
                for pool in pool_data.pools:
                    if pool.bti:
                        await self.check_alert_conditions(pool)

            @client.on_alert_triggered
            async def handle_triggered_alert(alert_data):
                """Handle alerts triggered by the system."""
                await self.send_alert_notifications(alert_data)

            print("šØ Starting intelligent alert system...")
            await client.listen()

    async def check_alert_conditions(self, pool):
        """Check pool against custom alert conditions.

        Up to three independent alerts can fire for one pool: market cap
        above $10M, 24h volume above 50% of market cap, and more than 5000
        holders.

        Args:
            pool: Pool object whose ``bti`` attribute holds the token info.
        """
        token = pool.bti

        # Mega cap alert (>$10M)
        if token.mc and token.mc > 10000000:
            await self.trigger_custom_alert(
                "mega_cap_alert",
                f"š MEGA CAP TOKEN: {token.s}",
                f"Market Cap: ${token.mc:,.2f}",
                "critical"
            )

        # Rapid growth alert
        if token.v24h and token.mc:
            volume_ratio = token.v24h / token.mc
            if volume_ratio > 0.5:  # Volume > 50% of market cap
                await self.trigger_custom_alert(
                    "rapid_growth",
                    f"š RAPID GROWTH: {token.s}",
                    f"Volume/MarketCap: {volume_ratio:.1%}",
                    "high"
                )

        # Community strength alert
        if token.hc and token.hc > 5000:
            await self.trigger_custom_alert(
                "strong_community",
                f"š„ STRONG COMMUNITY: {token.s}",
                f"Holders: {token.hc:,}",
                "medium"
            )

    async def trigger_custom_alert(self, alert_type, title, message, severity):
        """Trigger a custom alert with notifications.

        Builds the alert dict (stamped with the current local time) and
        passes it to ``send_alert_notifications``.
        """
        alert_data = {
            "type": alert_type,
            "title": title,
            "message": message,
            "severity": severity,
            "timestamp": datetime.now().isoformat()
        }
        await self.send_alert_notifications(alert_data)

    async def send_alert_notifications(self, alert_data):
        """Send notifications through all configured channels.

        Args:
            alert_data: Mapping with ``"title"``, ``"message"`` and
                ``"timestamp"`` keys and an optional ``"severity"``.
        """
        # Console alert
        self.log_console_alert(alert_data)

        # Slack notification
        if self.slack_webhook:
            await self.send_slack_alert(alert_data)

        # Email notification for high/critical severity
        if self.email_config and alert_data.get("severity") in ["high", "critical"]:
            await self.send_email_alert(alert_data)

    def log_console_alert(self, alert_data):
        """Log alert to console with a severity icon."""
        severity = alert_data.get("severity", "low")

        # Icon per severity; unknown severities fall back to the info icon.
        colors = {
            "critical": "š“",
            "high": "š ",
            "medium": "š”",
            "low": "š¢"
        }
        icon = colors.get(severity, "ā¹ļø")
        timestamp = datetime.now().strftime("%H:%M:%S")

        print(f"\n{icon} [{timestamp}] {alert_data['title']}")
        print(f" {alert_data['message']}")
        print(f" Severity: {severity.upper()}")
        print(" " + "="*50)

    async def send_slack_alert(self, alert_data):
        """Send alert to Slack webhook.

        Best effort: any failure is printed and never propagates to the
        caller.
        """
        if not self.slack_webhook:
            return

        severity_colors = {
            "critical": "#ff0000",
            "high": "#ff8800",
            "medium": "#ffaa00",
            "low": "#00ff00"
        }
        color = severity_colors.get(alert_data.get("severity"), "#808080")

        payload = {
            "attachments": [
                {
                    "color": color,
                    "title": alert_data["title"],
                    "text": alert_data["message"],
                    "fields": [
                        {
                            "title": "Severity",
                            "value": alert_data.get("severity", "unknown").upper(),
                            "short": True
                        },
                        {
                            "title": "Time",
                            "value": alert_data["timestamp"],
                            "short": True
                        }
                    ]
                }
            ]
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.slack_webhook, json=payload) as response:
                    if response.status == 200:
                        print("✅ Slack alert sent successfully")
                    else:
                        print(f"ā Failed to send Slack alert: {response.status}")
        except Exception as e:
            # Deliberate best effort: a notification failure must not stop
            # the monitoring loop.
            print(f"ā Slack alert error: {e}")

    async def send_email_alert(self, alert_data):
        """Prepare an email alert for high-priority events.

        Builds the message only; actually sending it is left to the
        integrator because it depends on the email provider.
        """
        if not self.email_config:
            return

        try:
            subject = f"šØ Blockchain Alert: {alert_data['title']}"
            body = (
                "\n"
                "Alert Details:\n"
                f"Title: {alert_data['title']}\n"
                f"Message: {alert_data['message']}\n"
                f"Severity: {alert_data.get('severity', 'unknown').upper()}\n"
                f"Time: {alert_data['timestamp']}\n"
                "This is an automated alert from GmGnAPI monitoring system.\n"
            )

            # The stdlib class is MIMEText; "MimeText" does not exist.
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']

            # Send email (implementation depends on your email provider)
            print(f"š§ Email alert prepared: {subject}")
        except Exception as e:
            # Deliberate best effort, same as the Slack channel.
            print(f"ā Email alert error: {e}")
async def main():
    """Build the alert system from the placeholder settings and run it."""
    # Configure your notification channels.
    # These are placeholders: replace them with your own values and keep
    # real credentials out of source control.
    SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
    EMAIL_CONFIG = {
        "from": "alerts@yourapp.com",
        "to": "you@email.com",
        "smtp_server": "smtp.gmail.com",
        "smtp_port": 587,
        "username": "your_email@gmail.com",
        "password": "your_app_password"
    }

    alert_system = IntelligentAlertSystem(
        slack_webhook=SLACK_WEBHOOK,
        email_config=EMAIL_CONFIG
    )
    await alert_system.start_alert_monitoring()


if __name__ == "__main__":
    # Ctrl+C is reliably reported by asyncio.run() itself rather than inside
    # the awaited coroutine, so the interrupt is handled around the call.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nšØ Alert system stopped")
Whale Activity Monitor
Track large wallet movements and whale trading patterns.
Whale Detection System
import asyncio
from decimal import Decimal
from collections import defaultdict, deque
from datetime import datetime, timedelta
from gmgnapi import GmGnClient
class WhaleActivityMonitor:
    """
    Monitor and analyze whale trading activity.
    Tracks large transactions and wallet patterns.

    Attributes:
        whale_threshold: USD amount (``Decimal``) at or above which a trade,
            or a wallet's accumulated volume, counts as whale-sized.
        whale_wallets: Wallet addresses whose accumulated volume reached the
            threshold.
        wallet_activity: Per-wallet statistics, created on first access.
        recent_large_trades: The last 100 whale-sized trades.
        known_whales: Optional mapping of wallet address to a display name.
    """

    def __init__(self, whale_threshold_usd=50000):
        """Set the threshold and create the empty tracking structures.

        Args:
            whale_threshold_usd: Whale threshold in USD.
        """
        # Going through str() avoids binary-float artefacts in the Decimal.
        self.whale_threshold = Decimal(str(whale_threshold_usd))

        # Track whale wallets and their activity
        self.whale_wallets = set()
        self.wallet_activity = defaultdict(lambda: {
            'total_volume': Decimal('0'),
            'trade_count': 0,
            'first_seen': None,
            'last_activity': None,
            'tokens_traded': set()
        })

        # Track recent large transactions
        self.recent_large_trades = deque(maxlen=100)

        # Known whale wallets (you can populate this from external sources)
        self.known_whales = {
            # Add known whale addresses here
            # "wallet_address": "Whale Name/Description"
        }

    async def start_whale_monitoring(self):
        """Start monitoring whale activity.

        Subscribes to new pools on the ``"sol"`` chain and, when
        ``known_whales`` is populated, to those wallets' trades; then awaits
        ``client.listen()``.
        """
        # Note: Wallet monitoring requires authentication
        # Make sure you have GMGN_ACCESS_TOKEN set
        async with GmGnClient() as client:
            # Monitor new pools for whale involvement
            await client.subscribe_new_pools(chain="sol")

            # If you have whale wallet addresses, monitor them directly
            if self.known_whales:
                whale_addresses = list(self.known_whales.keys())
                await client.subscribe_wallet_trades(whale_addresses, chain="sol")

            @client.on_new_pool
            async def analyze_pool_for_whales(pool_data):
                """Analyze new pools for whale involvement."""
                for pool in pool_data.pools:
                    if pool.bti and pool.bti.mc:
                        await self.check_pool_whale_activity(pool)

            @client.on_wallet_trade
            async def track_whale_trades(trade_data):
                """Track direct whale trading activity."""
                await self.analyze_whale_trade(trade_data)

            print("š Starting whale activity monitor...")
            print(f"š° Whale threshold: ${self.whale_threshold:,}")
            await client.listen()

    async def check_pool_whale_activity(self, pool):
        """Check if a new pool shows signs of whale involvement.

        Three independent heuristics: initial liquidity above the threshold,
        market cap above twice the threshold, and 24h volume above 30% of
        market cap.

        Args:
            pool: Pool object whose ``bti`` attribute holds the token info.
        """
        token = pool.bti

        # Large initial liquidity suggests whale involvement
        if pool.il and pool.il > float(self.whale_threshold):
            await self.log_whale_activity(
                "large_liquidity",
                f"š Large Initial Liquidity: {token.s}",
                f"Liquidity: ${pool.il:,.2f} | Pool: {pool.a[:8]}..."
            )

        # High market cap at launch
        if token.mc and token.mc > float(self.whale_threshold * 2):
            await self.log_whale_activity(
                "high_mcap_launch",
                f"š High Market Cap Launch: {token.s}",
                f"Market Cap: ${token.mc:,.2f}"
            )

        # Check volume to market cap ratio (whales often create high volume)
        if token.v24h and token.mc:
            volume_ratio = token.v24h / token.mc
            if volume_ratio > 0.3:  # High volume relative to market cap
                await self.log_whale_activity(
                    "high_volume_ratio",
                    f"š High Volume Activity: {token.s}",
                    f"Volume/MCap: {volume_ratio:.1%} | Volume: ${token.v24h:,.2f}"
                )

    async def analyze_whale_trade(self, trade_data):
        """Analyze individual whale trades.

        Records every whale-sized trade, updates the wallet's statistics and
        promotes the wallet to ``whale_wallets`` once its accumulated volume
        reaches the threshold.

        Args:
            trade_data: Message exposing ``wallet_address`` and ``trades``;
                each trade exposes ``amount_usd``.
        """
        wallet = trade_data.wallet_address
        trades = trade_data.trades

        # Update wallet statistics
        wallet_stats = self.wallet_activity[wallet]

        message_volume = Decimal('0')
        for trade in trades:
            amount = Decimal(str(trade.amount_usd))
            message_volume += amount
            # Check if this is a whale-sized trade
            if amount >= self.whale_threshold:
                await self.record_whale_trade(wallet, trade)

        # Update wallet activity. Accumulate the trades actually observed;
        # adding the rolling 24h total on every message would count the same
        # volume repeatedly.
        now = datetime.now()
        wallet_stats['total_volume'] += message_volume
        wallet_stats['trade_count'] += len(trades)
        wallet_stats['last_activity'] = now
        if not wallet_stats['first_seen']:
            wallet_stats['first_seen'] = now

        # Add wallet to whale list if volume threshold exceeded
        if wallet_stats['total_volume'] >= self.whale_threshold and wallet not in self.whale_wallets:
            self.whale_wallets.add(wallet)
            await self.log_whale_activity(
                "new_whale_detected",
                f"š New Whale Detected: {wallet[:8]}...",
                f"Total Volume: ${wallet_stats['total_volume']:,.2f}"
            )

    async def record_whale_trade(self, wallet, trade):
        """Record a significant whale trade.

        Appends the trade to ``recent_large_trades``, logs it and then checks
        the wallet for a burst of activity.
        """
        trade_info = {
            'wallet': wallet,
            'trade': trade,
            'timestamp': datetime.now()
        }
        self.recent_large_trades.append(trade_info)

        # Determine whale name if known
        whale_name = self.known_whales.get(wallet, f"Whale {wallet[:8]}...")

        await self.log_whale_activity(
            "whale_trade",
            f"š {whale_name} {trade.trade_type.upper()}",
            f"Amount: ${trade.amount_usd:,.2f} | Token: {trade.token_address[:8]}..."
        )

        # Check for unusual patterns
        await self.check_whale_patterns(wallet)

    async def check_whale_patterns(self, wallet):
        """Check for unusual whale trading patterns.

        Logs a high-severity burst when the wallet has three or more recorded
        large trades within the last hour.
        """
        cutoff = datetime.now() - timedelta(hours=1)

        # Check recent trades from this wallet
        recent_trades = [
            t for t in self.recent_large_trades
            if t['wallet'] == wallet and t['timestamp'] > cutoff
        ]

        if len(recent_trades) >= 3:
            total_amount = sum(t['trade'].amount_usd for t in recent_trades)
            await self.log_whale_activity(
                "whale_activity_burst",
                f"ā” Whale Activity Burst: {wallet[:8]}...",
                f"3+ trades in 1h | Total: ${total_amount:,.2f}",
                severity="high"
            )

    async def log_whale_activity(self, activity_type, title, details, severity="medium"):
        """Log whale activity with appropriate formatting.

        Args:
            activity_type: Short machine-readable label for the event.
            title: Headline printed next to the severity indicator.
            details: Second line with the event's figures.
            severity: ``"low"``, ``"medium"`` or ``"high"``; other values get
                the fallback indicator.
        """
        timestamp = datetime.now().strftime("%H:%M:%S")

        # Severity indicators
        indicators = {
            "low": "š¢",
            "medium": "š”",
            "high": "š“"
        }
        indicator = indicators.get(severity, "ā¹ļø")

        print(f"\n{indicator} [{timestamp}] {title}")
        print(f" {details}")
        print(f" Activity Type: {activity_type}")
        print(" " + "="*60)

    def get_whale_statistics(self):
        """Print current whale monitoring statistics.

        Shows the counters and the five wallets with the highest accumulated
        volume.
        """
        print(f"\nš WHALE MONITORING STATISTICS")
        print("=" * 50)
        print(f"Known Whales: {len(self.whale_wallets)}")
        print(f"Recent Large Trades: {len(self.recent_large_trades)}")
        print(f"Monitoring Threshold: ${self.whale_threshold:,}")

        # Top whales by volume
        top_whales = sorted(
            self.wallet_activity.items(),
            key=lambda x: x[1]['total_volume'],
            reverse=True
        )[:5]

        print(f"\nTop 5 Whales by Volume:")
        for i, (wallet, stats) in enumerate(top_whales, 1):
            whale_name = self.known_whales.get(wallet, f"{wallet[:8]}...")
            print(f" {i}. {whale_name}")
            print(f" Volume: ${stats['total_volume']:,.2f}")
            print(f" Trades: {stats['trade_count']}")
async def main():
    """Run the whale monitor and always print final statistics on exit."""
    # Create whale monitor with $50K threshold
    whale_monitor = WhaleActivityMonitor(whale_threshold_usd=50000)

    try:
        await whale_monitor.start_whale_monitoring()
    finally:
        # Show final statistics; finally also covers cancellation, which is
        # how an interrupt can reach this coroutine.
        whale_monitor.get_whale_statistics()


if __name__ == "__main__":
    # Ctrl+C is reliably reported by asyncio.run() itself rather than inside
    # the awaited coroutine, so the interrupt is handled around the call.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nš Whale monitor stopped")
Quick Examples
Short, focused examples for specific use cases.
Portfolio Tracker
async def track_portfolio():
    """Track specific tokens in your portfolio.

    Subscribes to updates for the listed token addresses and prints the
    price and 24h change carried by each update.
    """
    portfolio_tokens = [
        "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",  # USDC
        "So11111111111111111111111111111111111111112",  # SOL
    ]

    async with GmGnClient() as client:
        await client.subscribe_token_updates(portfolio_tokens)

        @client.on_token_update
        async def handle_portfolio_update(update):
            """Print one portfolio update."""
            print(f"š¼ {update.token_address[:8]}...")
            print(f" Price: ${update.price_usd}")
            print(f" 24h Change: {update.price_change_24h:.2%}")

        await client.listen()
Price Alert Bot
async def price_alert_bot():
    """Simple price alert system.

    Watches new pools and prints a line when a tracked symbol's price is
    above its "above" target or below its "below" target.
    """
    price_targets = {
        "SOL": {"above": 200, "below": 150},
        "BTC": {"above": 100000, "below": 90000}
    }

    async with GmGnClient() as client:
        await client.subscribe_new_pools()

        @client.on_new_pool
        async def check_prices(pool_data):
            """Compare each tracked token's price with its targets."""
            for pool in pool_data.pools:
                if not (pool.bti and pool.bti.s in price_targets):
                    continue

                price = pool.bti.p
                if price is None:
                    # No price reported; comparing None with a number would
                    # raise TypeError.
                    continue

                symbol = pool.bti.s
                targets = price_targets[symbol]
                if price > targets["above"]:
                    print(f"š {symbol} above ${targets['above']}: ${price}")
                elif price < targets["below"]:
                    print(f"š {symbol} below ${targets['below']}: ${price}")

        await client.listen()
Volume Scanner
async def volume_scanner():
    """Scan for high volume trading activity.

    Watches new pools and prints every token whose 24h volume exceeds the
    threshold.
    """
    volume_threshold = 1000000  # $1M+ volume

    async with GmGnClient() as client:
        await client.subscribe_new_pools()

        @client.on_new_pool
        async def scan_volume(pool_data):
            """Print the pools whose token volume is above the threshold."""
            for pool in pool_data.pools:
                token = pool.bti
                # Skip pools without token info or without a volume figure.
                if token and token.v24h and token.v24h > volume_threshold:
                    print(f"š HIGH VOLUME: {token.s}")
                    print(f" 24h Volume: ${token.v24h:,.2f}")
                    print(f" Exchange: {pool.ex}")

        await client.listen()
New Token Announcer
async def new_token_announcer():
    """Announce new tokens with basic info.

    Watches new pools and prints symbol, name, market cap, pool address and
    exchange for every pool that carries token info.
    """
    async with GmGnClient() as client:
        await client.subscribe_new_pools()

        @client.on_new_pool
        async def announce_token(pool_data):
            """Print one announcement per pool with token info."""
            for pool in pool_data.pools:
                if not pool.bti:
                    continue

                token = pool.bti
                print(f"š NEW TOKEN: {token.s}")
                print(f" Name: {token.n}")
                # Market cap may be missing; formatting None would raise
                # TypeError.
                print(f" Market Cap: ${token.mc:,.2f}" if token.mc else " Market Cap: Unknown")
                print(f" Pool: {pool.a}")
                print(f" Exchange: {pool.ex}")
                print(" " + "-"*30)

        await client.listen()