Error Handling and Debugging Guide for AxiomTradeAPI
Comprehensive guide to error handling, debugging, and troubleshooting when using AxiomTradeAPI. Learn professional debugging techniques and error recovery strategies used by successful traders on chipa.tech.
Table of Contents
- Error Handling Fundamentals
- Common Error Types
- Robust Error Recovery
- Debugging Techniques
- Logging Best Practices
- Testing Error Scenarios
- Production Error Management
- Troubleshooting Guide
Error Handling Fundamentals
Proper error handling is crucial for building reliable trading systems. The AxiomTradeAPI provides comprehensive error information to help diagnose and resolve issues quickly.
Exception Hierarchy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import logging
from typing import Optional, Dict, Any, List
from enum import Enum
import traceback
import sys
from datetime import datetime
class AxiomErrorType(Enum):
"""Comprehensive error type classification"""
AUTHENTICATION = "authentication"
RATE_LIMIT = "rate_limit"
NETWORK = "network"
API_ERROR = "api_error"
VALIDATION = "validation"
INSUFFICIENT_BALANCE = "insufficient_balance"
TOKEN_NOT_FOUND = "token_not_found"
WEBSOCKET_CONNECTION = "websocket_connection"
DATA_PARSING = "data_parsing"
SYSTEM_ERROR = "system_error"
class AxiomTradeException(Exception):
"""Base exception for all AxiomTradeAPI errors"""
def __init__(self, message: str, error_type: AxiomErrorType = AxiomErrorType.SYSTEM_ERROR,
status_code: Optional[int] = None, response_data: Optional[Dict] = None,
retry_after: Optional[int] = None):
super().__init__(message)
self.message = message
self.error_type = error_type
self.status_code = status_code
self.response_data = response_data or {}
self.retry_after = retry_after
self.timestamp = datetime.utcnow()
self.trace_id = self._generate_trace_id()
def _generate_trace_id(self) -> str:
"""Generate unique trace ID for error tracking"""
import uuid
return str(uuid.uuid4())[:8]
def to_dict(self) -> Dict[str, Any]:
"""Convert exception to dictionary for logging/reporting"""
return {
'message': self.message,
'error_type': self.error_type.value,
'status_code': self.status_code,
'response_data': self.response_data,
'retry_after': self.retry_after,
'timestamp': self.timestamp.isoformat(),
'trace_id': self.trace_id
}
class AuthenticationError(AxiomTradeException):
"""Authentication related errors"""
def __init__(self, message: str = "Authentication failed", **kwargs):
super().__init__(message, AxiomErrorType.AUTHENTICATION, **kwargs)
class RateLimitError(AxiomTradeException):
"""Rate limiting errors"""
def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60, **kwargs):
super().__init__(message, AxiomErrorType.RATE_LIMIT, retry_after=retry_after, **kwargs)
class NetworkError(AxiomTradeException):
"""Network connectivity errors"""
def __init__(self, message: str = "Network error occurred", **kwargs):
super().__init__(message, AxiomErrorType.NETWORK, **kwargs)
class APIError(AxiomTradeException):
"""API server errors"""
def __init__(self, message: str = "API error occurred", **kwargs):
super().__init__(message, AxiomErrorType.API_ERROR, **kwargs)
class ValidationError(AxiomTradeException):
"""Data validation errors"""
def __init__(self, message: str = "Validation failed", **kwargs):
super().__init__(message, AxiomErrorType.VALIDATION, **kwargs)
class InsufficientBalanceError(AxiomTradeException):
"""Insufficient balance for operation"""
def __init__(self, message: str = "Insufficient balance", **kwargs):
super().__init__(message, AxiomErrorType.INSUFFICIENT_BALANCE, **kwargs)
class TokenNotFoundError(AxiomTradeException):
"""Token not found errors"""
def __init__(self, message: str = "Token not found", **kwargs):
super().__init__(message, AxiomErrorType.TOKEN_NOT_FOUND, **kwargs)
class WebSocketError(AxiomTradeException):
"""WebSocket connection errors"""
def __init__(self, message: str = "WebSocket error", **kwargs):
super().__init__(message, AxiomErrorType.WEBSOCKET_CONNECTION, **kwargs)
Error Response Parser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class ErrorResponseParser:
"""
Parse and classify API error responses
Advanced error parsing from chipa.tech error handling system
"""
@staticmethod
def parse_error_response(status_code: int, response_data: Dict[str, Any]) -> AxiomTradeException:
"""Parse API error response and return appropriate exception"""
error_message = response_data.get('error', 'Unknown error')
error_code = response_data.get('code', 'UNKNOWN')
details = response_data.get('details', {})
# Authentication errors
if status_code in [401, 403]:
if 'token' in error_message.lower() or 'auth' in error_message.lower():
return AuthenticationError(
message=f"Authentication failed: {error_message}",
status_code=status_code,
response_data=response_data
)
# Rate limiting
if status_code == 429:
retry_after = int(response_data.get('retry_after', 60))
return RateLimitError(
message=f"Rate limit exceeded: {error_message}",
status_code=status_code,
response_data=response_data,
retry_after=retry_after
)
# Validation errors
if status_code == 400:
if error_code in ['INVALID_ADDRESS', 'INVALID_AMOUNT', 'INVALID_TOKEN']:
return ValidationError(
message=f"Validation error: {error_message}",
status_code=status_code,
response_data=response_data
)
if error_code == 'INSUFFICIENT_BALANCE':
return InsufficientBalanceError(
message=f"Insufficient balance: {error_message}",
status_code=status_code,
response_data=response_data
)
# Not found errors
if status_code == 404:
if 'token' in error_message.lower():
return TokenNotFoundError(
message=f"Token not found: {error_message}",
status_code=status_code,
response_data=response_data
)
# Server errors
if status_code >= 500:
return APIError(
message=f"Server error: {error_message}",
status_code=status_code,
response_data=response_data
)
# Default to generic API error
return APIError(
message=f"API error: {error_message}",
status_code=status_code,
response_data=response_data
)
Error Recovery Strategies
Implementing robust error recovery ensures your trading bot can handle failures gracefully:
Retry Logic with Exponential Backoff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import asyncio
import random
from typing import Callable, Any, TypeVar, Generic
T = TypeVar('T')
class RetryConfig:
"""Configuration for retry behavior"""
def __init__(self,
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True):
self.max_attempts = max_attempts
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
class RetryHandler:
"""
Advanced retry handler with exponential backoff
Production-tested strategies from chipa.tech reliability engineering
"""
def __init__(self, config: RetryConfig = None):
self.config = config or RetryConfig()
self.logger = logging.getLogger(__name__)
async def execute_with_retry(self,
operation: Callable[[], Any],
operation_name: str = "operation",
retry_on: tuple = (Exception,),
no_retry_on: tuple = (AuthenticationError, ValidationError)) -> Any:
"""Execute operation with intelligent retry logic"""
last_exception = None
for attempt in range(1, self.config.max_attempts + 1):
try:
self.logger.debug(f"Executing {operation_name} (attempt {attempt}/{self.config.max_attempts})")
result = await operation()
# Log successful retry
if attempt > 1:
self.logger.info(f"✅ {operation_name} succeeded on attempt {attempt}")
return result
except no_retry_on as e:
self.logger.error(f"❌ {operation_name} failed with non-retryable error: {e}")
raise
except retry_on as e:
last_exception = e
# Don't retry on last attempt
if attempt == self.config.max_attempts:
break
# Calculate delay with exponential backoff
delay = self._calculate_delay(attempt, e)
self.logger.warning(
f"⚠️ {operation_name} failed on attempt {attempt}: {str(e)[:100]}... "
f"Retrying in {delay:.2f}s"
)
await asyncio.sleep(delay)
# All attempts failed
self.logger.error(f"❌ {operation_name} failed after {self.config.max_attempts} attempts")
raise last_exception
def _calculate_delay(self, attempt: int, exception: Exception) -> float:
"""Calculate delay with exponential backoff and jitter"""
# Check for rate limit specific delay
if isinstance(exception, RateLimitError) and exception.retry_after:
base_delay = exception.retry_after
else:
base_delay = self.config.base_delay
# Exponential backoff
delay = base_delay * (self.config.exponential_base ** (attempt - 1))
# Apply maximum delay limit
delay = min(delay, self.config.max_delay)
# Add jitter to prevent thundering herd
if self.config.jitter:
jitter_amount = delay * 0.1 # 10% jitter
delay += random.uniform(-jitter_amount, jitter_amount)
return max(0, delay)
# Usage example with AxiomTradeAPI
class RobustAxiomClient:
"""
Robust AxiomTradeAPI client with comprehensive error handling
Professional error handling patterns from chipa.tech trading systems
"""
def __init__(self, auth_token: str):
self.auth_token = auth_token
self.retry_handler = RetryHandler(
RetryConfig(
max_attempts=5,
base_delay=1.0,
max_delay=30.0,
exponential_base=2.0,
jitter=True
)
)
self.logger = logging.getLogger(__name__)
self.circuit_breaker = CircuitBreaker()
async def get_balance_with_retry(self, wallet_address: str) -> Dict[str, Any]:
"""Get balance with robust error handling"""
async def operation():
return await self._get_balance_internal(wallet_address)
try:
return await self.retry_handler.execute_with_retry(
operation=operation,
operation_name=f"get_balance({wallet_address})",
retry_on=(NetworkError, APIError, RateLimitError),
no_retry_on=(AuthenticationError, ValidationError, TokenNotFoundError)
)
except Exception as e:
self.logger.error(f"Failed to get balance for {wallet_address}: {e}")
# Return safe default or raise based on your requirements
raise
async def _get_balance_internal(self, wallet_address: str) -> Dict[str, Any]:
"""Internal balance retrieval with error parsing"""
try:
# Simulate API call - replace with actual AxiomTradeAPI call
response = await self._make_api_call(f'/balance/{wallet_address}')
return response
except Exception as raw_error:
# Parse and classify the error
if hasattr(raw_error, 'status_code') and hasattr(raw_error, 'response'):
parsed_error = ErrorResponseParser.parse_error_response(
raw_error.status_code,
raw_error.response
)
raise parsed_error
else:
# Network or other system error
raise NetworkError(f"Network error: {str(raw_error)}")
async def _make_api_call(self, endpoint: str) -> Dict[str, Any]:
"""Make API call with circuit breaker protection"""
return await self.circuit_breaker.call(
self._raw_api_call,
endpoint
)
async def _raw_api_call(self, endpoint: str) -> Dict[str, Any]:
"""Raw API call implementation"""
# Implement actual API call here
# This is a placeholder
pass
Circuit Breaker Pattern
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import time
from enum import Enum
from typing import Callable, Any
class CircuitBreakerState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing fast
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
"""
Circuit breaker pattern for API reliability
Advanced resilience patterns from chipa.tech infrastructure
"""
def __init__(self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: tuple = (Exception,)):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitBreakerState.CLOSED
self.logger = logging.getLogger(__name__)
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function through circuit breaker"""
if self.state == CircuitBreakerState.OPEN:
if self._should_attempt_reset():
self.state = CircuitBreakerState.HALF_OPEN
self.logger.info("🔄 Circuit breaker transitioning to HALF_OPEN")
else:
raise APIError("Circuit breaker is OPEN - failing fast")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _should_attempt_reset(self) -> bool:
"""Check if we should attempt to reset the circuit breaker"""
return (
self.last_failure_time and
time.time() - self.last_failure_time >= self.recovery_timeout
)
def _on_success(self):
"""Handle successful operation"""
if self.state == CircuitBreakerState.HALF_OPEN:
self.logger.info("✅ Circuit breaker reset to CLOSED")
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
def _on_failure(self):
"""Handle failed operation"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
if self.state != CircuitBreakerState.OPEN:
self.logger.warning(f"🚨 Circuit breaker OPENED after {self.failure_count} failures")
self.state = CircuitBreakerState.OPEN
def get_state(self) -> Dict[str, Any]:
"""Get current circuit breaker state"""
return {
'state': self.state.value,
'failure_count': self.failure_count,
'failure_threshold': self.failure_threshold,
'last_failure_time': self.last_failure_time,
'time_until_retry': max(0, self.recovery_timeout - (time.time() - (self.last_failure_time or 0)))
}
Debugging Techniques
Advanced debugging techniques for identifying and resolving issues:
Comprehensive Debug Logger
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json
import os
from typing import Dict, Any, Optional
import inspect
import functools
class DebugLogger:
"""
Advanced debugging logger for AxiomTradeAPI
Professional debugging techniques from chipa.tech development team
"""
def __init__(self, enable_debug: bool = None):
self.enable_debug = enable_debug or os.getenv('AXIOM_DEBUG', 'false').lower() == 'true'
self.logger = logging.getLogger('axiom_debug')
if self.enable_debug:
self.logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter(
'🐛 %(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def debug_api_call(self, func_name: str = None):
"""Decorator for debugging API calls"""
def decorator(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if not self.enable_debug:
return await func(*args, **kwargs)
call_name = func_name or func.__name__
call_id = self._generate_call_id()
# Log entry
self.logger.debug(f"🚀 [{call_id}] Starting {call_name}")
self.logger.debug(f"📝 [{call_id}] Args: {self._safe_repr(args)}")
self.logger.debug(f"📝 [{call_id}] Kwargs: {self._safe_repr(kwargs)}")
start_time = time.time()
try:
result = await func(*args, **kwargs)
execution_time = time.time() - start_time
self.logger.debug(f"✅ [{call_id}] Completed {call_name} in {execution_time:.3f}s")
self.logger.debug(f"📤 [{call_id}] Result: {self._safe_repr(result)}")
return result
except Exception as e:
execution_time = time.time() - start_time
self.logger.debug(f"❌ [{call_id}] Failed {call_name} in {execution_time:.3f}s")
self.logger.debug(f"🔥 [{call_id}] Error: {str(e)}")
self.logger.debug(f"📚 [{call_id}] Traceback: {traceback.format_exc()}")
raise
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
if not self.enable_debug:
return func(*args, **kwargs)
call_name = func_name or func.__name__
call_id = self._generate_call_id()
self.logger.debug(f"🚀 [{call_id}] Starting {call_name}")
start_time = time.time()
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
self.logger.debug(f"✅ [{call_id}] Completed {call_name} in {execution_time:.3f}s")
return result
except Exception as e:
execution_time = time.time() - start_time
self.logger.debug(f"❌ [{call_id}] Failed {call_name} in {execution_time:.3f}s: {e}")
raise
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
def debug_websocket_message(self, message: Dict[str, Any], direction: str = "received"):
"""Debug WebSocket messages"""
if not self.enable_debug:
return
direction_emoji = "📥" if direction == "received" else "📤"
self.logger.debug(f"{direction_emoji} WebSocket {direction}: {self._safe_repr(message)}")
def debug_state_change(self, component: str, old_state: Any, new_state: Any):
"""Debug state changes"""
if not self.enable_debug:
return
self.logger.debug(f"🔄 {component} state change: {old_state} -> {new_state}")
def _generate_call_id(self) -> str:
"""Generate unique call ID for tracking"""
import uuid
return str(uuid.uuid4())[:8]
def _safe_repr(self, obj: Any, max_length: int = 500) -> str:
"""Safe representation of objects for logging"""
try:
if isinstance(obj, dict):
# Hide sensitive information
safe_obj = {}
for key, value in obj.items():
if any(sensitive in key.lower() for sensitive in ['token', 'key', 'secret', 'password']):
safe_obj[key] = "[HIDDEN]"
else:
safe_obj[key] = value
repr_str = json.dumps(safe_obj, indent=2, default=str)
else:
repr_str = repr(obj)
if len(repr_str) > max_length:
repr_str = repr_str[:max_length] + "... [TRUNCATED]"
return repr_str
except Exception:
return f"<Unable to represent {type(obj).__name__}>"
# Usage with AxiomTradeAPI
class DebuggableAxiomClient:
"""AxiomTradeAPI client with comprehensive debugging"""
def __init__(self, auth_token: str, enable_debug: bool = False):
self.auth_token = auth_token
self.debug_logger = DebugLogger(enable_debug)
self.client_id = self._generate_client_id()
def _generate_client_id(self) -> str:
import uuid
return str(uuid.uuid4())[:8]
@property
def debug_api_call(self):
return self.debug_logger.debug_api_call
@debug_api_call("get_balance")
async def get_balance(self, wallet_address: str) -> Dict[str, Any]:
"""Get balance with debugging"""
# Validate input
if not wallet_address:
raise ValidationError("Wallet address is required")
if len(wallet_address) < 32: # Basic Solana address validation
raise ValidationError("Invalid wallet address format")
# Make API call
return await self._make_api_request(f'/balance/{wallet_address}')
@debug_api_call("websocket_connect")
async def connect_websocket(self) -> None:
"""Connect to WebSocket with debugging"""
self.debug_logger.debug_state_change("WebSocket", "disconnected", "connecting")
try:
# WebSocket connection logic here
self.debug_logger.debug_state_change("WebSocket", "connecting", "connected")
except Exception as e:
self.debug_logger.debug_state_change("WebSocket", "connecting", "failed")
raise WebSocketError(f"Failed to connect WebSocket: {e}")
async def _make_api_request(self, endpoint: str) -> Dict[str, Any]:
"""Make API request with debug logging"""
# This would contain actual API request logic
pass
Advanced Error Analysis
Error Pattern Detection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import statistics
class ErrorAnalyzer:
"""
Advanced error pattern analysis
Error intelligence from chipa.tech monitoring systems
"""
def __init__(self, retention_hours: int = 24):
self.retention_hours = retention_hours
self.errors = []
self.error_patterns = defaultdict(list)
self.error_stats = defaultdict(int)
def record_error(self, error: AxiomTradeException, context: Dict[str, Any] = None):
"""Record error for analysis"""
error_record = {
'timestamp': datetime.utcnow(),
'error_type': error.error_type.value,
'message': error.message,
'status_code': error.status_code,
'trace_id': error.trace_id,
'context': context or {}
}
self.errors.append(error_record)
self.error_stats[error.error_type.value] += 1
# Clean old errors
self._cleanup_old_errors()
# Analyze patterns
self._analyze_error_patterns()
def _cleanup_old_errors(self):
"""Remove errors older than retention period"""
cutoff_time = datetime.utcnow() - timedelta(hours=self.retention_hours)
self.errors = [e for e in self.errors if e['timestamp'] > cutoff_time]
def _analyze_error_patterns(self):
"""Analyze error patterns and trends"""
if len(self.errors) < 10: # Need minimum errors for analysis
return
# Group errors by time windows
time_windows = defaultdict(list)
for error in self.errors:
# Group by 5-minute windows
window = error['timestamp'].replace(minute=(error['timestamp'].minute // 5) * 5, second=0, microsecond=0)
time_windows[window].append(error)
# Detect error spikes
error_counts = [len(errors) for errors in time_windows.values()]
if len(error_counts) > 1:
mean_errors = statistics.mean(error_counts)
std_errors = statistics.stdev(error_counts) if len(error_counts) > 1 else 0
for window, errors in time_windows.items():
if len(errors) > mean_errors + (2 * std_errors): # 2 standard deviations
self._alert_error_spike(window, errors)
def _alert_error_spike(self, window: datetime, errors: List[Dict]):
"""Alert on error spike detection"""
error_types = Counter(e['error_type'] for e in errors)
print(f"🚨 ERROR SPIKE DETECTED at {window}")
print(f" Total errors: {len(errors)}")
print(f" Error types: {dict(error_types)}")
# Additional analysis could trigger alerts to monitoring systems
def get_error_summary(self) -> Dict[str, Any]:
"""Get comprehensive error summary"""
if not self.errors:
return {'status': 'No errors recorded'}
recent_errors = [e for e in self.errors if e['timestamp'] > datetime.utcnow() - timedelta(hours=1)]
error_types = Counter(e['error_type'] for e in self.errors)
recent_error_types = Counter(e['error_type'] for e in recent_errors)
# Calculate error rate trends
hourly_counts = defaultdict(int)
for error in self.errors:
hour = error['timestamp'].replace(minute=0, second=0, microsecond=0)
hourly_counts[hour] += 1
return {
'total_errors': len(self.errors),
'recent_errors_1h': len(recent_errors),
'error_types': dict(error_types),
'recent_error_types': dict(recent_error_types),
'hourly_error_trend': dict(hourly_counts),
'most_common_error': error_types.most_common(1)[0] if error_types else None,
'error_rate_per_hour': len(self.errors) / max(1, self.retention_hours)
}
def get_troubleshooting_suggestions(self) -> List[str]:
"""Get troubleshooting suggestions based on error patterns"""
suggestions = []
error_types = Counter(e['error_type'] for e in self.errors)
# Authentication errors
if error_types.get('authentication', 0) > 5:
suggestions.append("🔐 Multiple authentication errors detected. Check your API token validity.")
suggestions.append("🔄 Consider implementing token refresh mechanism.")
# Rate limiting
if error_types.get('rate_limit', 0) > 3:
suggestions.append("⏱️ Rate limiting detected. Implement exponential backoff.")
suggestions.append("🚀 Consider upgrading to higher rate limit tier.")
# Network errors
if error_types.get('network', 0) > 10:
suggestions.append("🌐 High network error rate. Check internet connectivity.")
suggestions.append("🔄 Implement connection pooling and retry logic.")
# WebSocket errors
if error_types.get('websocket_connection', 0) > 5:
suggestions.append("🔌 WebSocket connection issues. Check firewall settings.")
suggestions.append("💔 Implement WebSocket reconnection logic.")
return suggestions
Production Error Management
Centralized Error Handling
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os
import json
import aiofiles
from typing import Dict, Any, Optional
import httpx
class ProductionErrorHandler:
"""
Production-grade error handling and reporting
Enterprise error management from chipa.tech production systems
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.error_analyzer = ErrorAnalyzer()
self.logger = logging.getLogger('production_errors')
# External monitoring integration
self.webhook_url = config.get('webhook_url')
self.slack_webhook = config.get('slack_webhook')
self.enable_file_logging = config.get('enable_file_logging', True)
if self.enable_file_logging:
self._setup_file_logging()
def _setup_file_logging(self):
"""Setup file logging for errors"""
os.makedirs('logs', exist_ok=True)
file_handler = logging.FileHandler('logs/errors.log')
file_handler.setLevel(logging.ERROR)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
async def handle_error(self, error: Exception, context: Dict[str, Any] = None):
"""Centralized error handling"""
# Convert to AxiomTradeException if needed
if not isinstance(error, AxiomTradeException):
axiom_error = AxiomTradeException(
message=str(error),
error_type=AxiomErrorType.SYSTEM_ERROR
)
else:
axiom_error = error
# Record for analysis
self.error_analyzer.record_error(axiom_error, context)
# Log error
self.logger.error(f"Error occurred: {axiom_error.to_dict()}")
# Determine severity
severity = self._determine_severity(axiom_error)
# Handle based on severity
if severity == 'critical':
await self._handle_critical_error(axiom_error, context)
elif severity == 'high':
await self._handle_high_severity_error(axiom_error, context)
elif severity == 'medium':
await self._handle_medium_severity_error(axiom_error, context)
# Send to external monitoring if configured
if self.webhook_url:
await self._send_to_webhook(axiom_error, context, severity)
if self.slack_webhook:
await self._send_to_slack(axiom_error, context, severity)
def _determine_severity(self, error: AxiomTradeException) -> str:
"""Determine error severity"""
critical_errors = [AxiomErrorType.AUTHENTICATION, AxiomErrorType.SYSTEM_ERROR]
high_errors = [AxiomErrorType.API_ERROR, AxiomErrorType.WEBSOCKET_CONNECTION]
medium_errors = [AxiomErrorType.RATE_LIMIT, AxiomErrorType.NETWORK]
if error.error_type in critical_errors:
return 'critical'
elif error.error_type in high_errors:
return 'high'
elif error.error_type in medium_errors:
return 'medium'
else:
return 'low'
async def _handle_critical_error(self, error: AxiomTradeException, context: Dict[str, Any]):
"""Handle critical errors"""
print(f"🚨 CRITICAL ERROR: {error.message}")
# Save error details
await self._save_error_details(error, context)
# Could trigger emergency procedures here
# e.g., stop trading, send alerts, etc.
async def _handle_high_severity_error(self, error: AxiomTradeException, context: Dict[str, Any]):
"""Handle high severity errors"""
print(f"⚠️ HIGH SEVERITY ERROR: {error.message}")
async def _handle_medium_severity_error(self, error: AxiomTradeException, context: Dict[str, Any]):
"""Handle medium severity errors"""
print(f"⚡ MEDIUM SEVERITY ERROR: {error.message}")
async def _save_error_details(self, error: AxiomTradeException, context: Dict[str, Any]):
"""Save detailed error information"""
error_details = {
'error': error.to_dict(),
'context': context,
'system_info': {
'python_version': sys.version,
'platform': sys.platform,
'timestamp': datetime.utcnow().isoformat()
}
}
filename = f"logs/error_detail_{error.trace_id}.json"
async with aiofiles.open(filename, 'w') as f:
await f.write(json.dumps(error_details, indent=2, default=str))
async def _send_to_webhook(self, error: AxiomTradeException, context: Dict[str, Any], severity: str):
"""Send error to external webhook"""
try:
payload = {
'severity': severity,
'error': error.to_dict(),
'context': context,
'timestamp': datetime.utcnow().isoformat()
}
async with httpx.AsyncClient() as client:
response = await client.post(self.webhook_url, json=payload, timeout=10)
response.raise_for_status()
except Exception as e:
self.logger.error(f"Failed to send error to webhook: {e}")
async def _send_to_slack(self, error: AxiomTradeException, context: Dict[str, Any], severity: str):
"""Send error alert to Slack"""
try:
severity_emoji = {
'critical': '🚨',
'high': '⚠️',
'medium': '⚡',
'low': 'ℹ️'
}
message = {
'text': f"{severity_emoji.get(severity, 'ℹ️')} AxiomTradeAPI Error - {severity.upper()}",
'attachments': [
{
'color': 'danger' if severity == 'critical' else 'warning',
'fields': [
{'title': 'Error Type', 'value': error.error_type.value, 'short': True},
{'title': 'Message', 'value': error.message[:200], 'short': False},
{'title': 'Trace ID', 'value': error.trace_id, 'short': True},
{'title': 'Timestamp', 'value': error.timestamp.isoformat(), 'short': True}
]
}
]
}
async with httpx.AsyncClient() as client:
response = await client.post(self.slack_webhook, json=message, timeout=10)
response.raise_for_status()
except Exception as e:
self.logger.error(f"Failed to send error to Slack: {e}")
def get_health_status(self) -> Dict[str, Any]:
"""Get system health status based on error patterns"""
error_summary = self.error_analyzer.get_error_summary()
# Determine overall health
recent_errors = error_summary.get('recent_errors_1h', 0)
error_rate = error_summary.get('error_rate_per_hour', 0)
if recent_errors > 50 or error_rate > 20:
health_status = 'unhealthy'
elif recent_errors > 20 or error_rate > 10:
health_status = 'degraded'
else:
health_status = 'healthy'
return {
'status': health_status,
'error_summary': error_summary,
'troubleshooting_suggestions': self.error_analyzer.get_troubleshooting_suggestions()
}
Troubleshooting Guide
Common issues and their solutions:
Quick Troubleshooting Checklist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
🔍 AxiomTradeAPI Troubleshooting Checklist
==========================================
🔐 Authentication Issues:
□ Verify API token is valid and not expired
□ Check token permissions and scopes
□ Ensure proper authorization headers
□ Test with minimal API call first
🌐 Network Connectivity:
□ Check internet connection stability
□ Verify API endpoints are accessible
□ Test with curl or similar tool
□ Check firewall and proxy settings
⚡ Performance Issues:
□ Monitor API response times
□ Check for rate limiting
□ Verify connection pooling is enabled
□ Review concurrent request limits
🔌 WebSocket Problems:
□ Test WebSocket connectivity separately
□ Check for proxy/firewall blocking WebSockets
□ Verify authentication for WebSocket connection
□ Implement reconnection logic
💾 Memory Issues:
□ Monitor memory usage over time
□ Check for memory leaks in long-running processes
□ Implement proper cleanup procedures
□ Use memory profiling tools
📊 Data Issues:
□ Validate input data format
□ Check API response format changes
□ Verify token addresses are correct
□ Handle missing or null data gracefully
Error Code Reference
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
ERROR_CODES = {
# Authentication Errors (401, 403)
'INVALID_TOKEN': {
'description': 'API token is invalid or expired',
'solution': 'Generate a new API token from your dashboard',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/authentication'
},
'INSUFFICIENT_PERMISSIONS': {
'description': 'Token lacks required permissions',
'solution': 'Update token permissions in your account settings',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/permissions'
},
# Rate Limiting (429)
'RATE_LIMIT_EXCEEDED': {
'description': 'Too many requests in time window',
'solution': 'Implement exponential backoff and respect rate limits',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/rate-limits'
},
# Validation Errors (400)
'INVALID_WALLET_ADDRESS': {
'description': 'Wallet address format is invalid',
'solution': 'Verify Solana address format (44 characters, base58)',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/wallet-addresses'
},
'INVALID_TOKEN_ADDRESS': {
'description': 'Token address format is invalid',
'solution': 'Check token address on Solana explorer',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/token-addresses'
},
'INSUFFICIENT_BALANCE': {
'description': 'Wallet has insufficient balance for operation',
'solution': 'Check wallet balance and add funds if needed',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/balance-management'
},
# Not Found Errors (404)
'TOKEN_NOT_FOUND': {
'description': 'Specified token does not exist',
'solution': 'Verify token address on Solana blockchain',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/token-lookup'
},
'WALLET_NOT_FOUND': {
'description': 'Wallet address not found or has no activity',
'solution': 'Check wallet address and ensure it exists on-chain',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/wallet-lookup'
},
# Server Errors (500+)
'INTERNAL_SERVER_ERROR': {
'description': 'Internal server error occurred',
'solution': 'Retry request with exponential backoff',
'documentation': 'https://chipa.tech/axiomtradeapi-docs/error-handling'
},
'SERVICE_UNAVAILABLE': {
'description': 'Service temporarily unavailable',
'solution': 'Check service status and retry later',
'documentation': 'https://chipa.tech/status'
}
}
def get_error_help(error_code: str) -> Dict[str, str]:
"""Get help information for specific error code"""
return ERROR_CODES.get(error_code, {
'description': 'Unknown error code',
'solution': 'Check API documentation for details',
'documentation': 'https://chipa.tech/axiomtradeapi-docs'
})
Best Practices Summary
Error Handling Principles
- Fail Fast: Detect and handle errors as early as possible
- Graceful Degradation: Provide fallback behavior when possible
- Comprehensive Logging: Log all errors with sufficient context
- User-Friendly Messages: Provide clear, actionable error messages
- Monitoring Integration: Connect errors to monitoring systems
Production Readiness Checklist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
✅ Production Error Handling Checklist
=====================================
🔧 Error Classification:
□ All error types properly classified
□ Custom exceptions for business logic errors
□ Appropriate error codes and messages
🔄 Retry Logic:
□ Exponential backoff implemented
□ Maximum retry limits set
□ Non-retryable errors identified
🚨 Monitoring:
□ Error tracking and alerting configured
□ Health check endpoints implemented
□ Performance metrics collected
📝 Logging:
□ Structured logging format
□ Appropriate log levels
□ Sensitive data sanitization
🧪 Testing:
□ Error scenarios covered in tests
□ Chaos engineering practices
□ Load testing with error injection
Community Support
For additional help with error handling and debugging:
- 🛠️ Technical Support: chipa.tech/support
- 📚 Documentation: chipa.tech/axiomtradeapi-docs
- 💬 Discord Community: chipa.tech/discord
- 🐛 Bug Reports: chipa.tech/bug-reports
- 📈 Status Page: chipa.tech/status
Conclusion
Effective error handling is crucial for building reliable trading systems. This guide provides comprehensive strategies for handling errors, debugging issues, and maintaining production stability with AxiomTradeAPI.
Key takeaways:
- Implement comprehensive error classification and handling
- Use retry logic with exponential backoff
- Set up proper monitoring and alerting
- Test error scenarios thoroughly
- Connect with the chipa.tech community for support
Remember that good error handling is not just about catching exceptions—it’s about providing a robust, reliable experience for your users and maintaining system stability under all conditions.
This error handling guide represents production-tested practices from successful trading systems. For the latest updates and community discussions, visit chipa.tech.