Error Handling and Debugging Guide for AxiomTradeAPI

Comprehensive guide to error handling, debugging, and troubleshooting when using AxiomTradeAPI. Learn professional debugging techniques and error recovery strategies used by successful traders on chipa.tech.

Table of Contents

Error Handling Fundamentals

Proper error handling is crucial for building reliable trading systems. The AxiomTradeAPI provides comprehensive error information to help diagnose and resolve issues quickly.

Exception Hierarchy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import logging
from typing import Optional, Dict, Any, List
from enum import Enum
import traceback
import sys
from datetime import datetime

class AxiomErrorType(Enum):
    """Comprehensive error type classification"""
    AUTHENTICATION = "authentication"
    RATE_LIMIT = "rate_limit"
    NETWORK = "network"
    API_ERROR = "api_error"
    VALIDATION = "validation"
    INSUFFICIENT_BALANCE = "insufficient_balance"
    TOKEN_NOT_FOUND = "token_not_found"
    WEBSOCKET_CONNECTION = "websocket_connection"
    DATA_PARSING = "data_parsing"
    SYSTEM_ERROR = "system_error"

class AxiomTradeException(Exception):
    """Base exception for all AxiomTradeAPI errors"""
    
    def __init__(self, message: str, error_type: AxiomErrorType = AxiomErrorType.SYSTEM_ERROR,
                 status_code: Optional[int] = None, response_data: Optional[Dict] = None,
                 retry_after: Optional[int] = None):
        super().__init__(message)
        self.message = message
        self.error_type = error_type
        self.status_code = status_code
        self.response_data = response_data or {}
        self.retry_after = retry_after
        self.timestamp = datetime.utcnow()
        self.trace_id = self._generate_trace_id()
    
    def _generate_trace_id(self) -> str:
        """Generate unique trace ID for error tracking"""
        import uuid
        return str(uuid.uuid4())[:8]
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert exception to dictionary for logging/reporting"""
        return {
            'message': self.message,
            'error_type': self.error_type.value,
            'status_code': self.status_code,
            'response_data': self.response_data,
            'retry_after': self.retry_after,
            'timestamp': self.timestamp.isoformat(),
            'trace_id': self.trace_id
        }

class AuthenticationError(AxiomTradeException):
    """Authentication related errors"""
    def __init__(self, message: str = "Authentication failed", **kwargs):
        super().__init__(message, AxiomErrorType.AUTHENTICATION, **kwargs)

class RateLimitError(AxiomTradeException):
    """Rate limiting errors"""
    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60, **kwargs):
        super().__init__(message, AxiomErrorType.RATE_LIMIT, retry_after=retry_after, **kwargs)

class NetworkError(AxiomTradeException):
    """Network connectivity errors"""
    def __init__(self, message: str = "Network error occurred", **kwargs):
        super().__init__(message, AxiomErrorType.NETWORK, **kwargs)

class APIError(AxiomTradeException):
    """API server errors"""
    def __init__(self, message: str = "API error occurred", **kwargs):
        super().__init__(message, AxiomErrorType.API_ERROR, **kwargs)

class ValidationError(AxiomTradeException):
    """Data validation errors"""
    def __init__(self, message: str = "Validation failed", **kwargs):
        super().__init__(message, AxiomErrorType.VALIDATION, **kwargs)

class InsufficientBalanceError(AxiomTradeException):
    """Insufficient balance for operation"""
    def __init__(self, message: str = "Insufficient balance", **kwargs):
        super().__init__(message, AxiomErrorType.INSUFFICIENT_BALANCE, **kwargs)

class TokenNotFoundError(AxiomTradeException):
    """Token not found errors"""
    def __init__(self, message: str = "Token not found", **kwargs):
        super().__init__(message, AxiomErrorType.TOKEN_NOT_FOUND, **kwargs)

class WebSocketError(AxiomTradeException):
    """WebSocket connection errors"""
    def __init__(self, message: str = "WebSocket error", **kwargs):
        super().__init__(message, AxiomErrorType.WEBSOCKET_CONNECTION, **kwargs)

Error Response Parser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class ErrorResponseParser:
    """
    Parse and classify API error responses
    Advanced error parsing from chipa.tech error handling system
    """
    
    @staticmethod
    def parse_error_response(status_code: int, response_data: Dict[str, Any]) -> AxiomTradeException:
        """Parse API error response and return appropriate exception"""
        
        error_message = response_data.get('error', 'Unknown error')
        error_code = response_data.get('code', 'UNKNOWN')
        details = response_data.get('details', {})
        
        # Authentication errors
        if status_code in [401, 403]:
            if 'token' in error_message.lower() or 'auth' in error_message.lower():
                return AuthenticationError(
                    message=f"Authentication failed: {error_message}",
                    status_code=status_code,
                    response_data=response_data
                )
        
        # Rate limiting
        if status_code == 429:
            retry_after = int(response_data.get('retry_after', 60))
            return RateLimitError(
                message=f"Rate limit exceeded: {error_message}",
                status_code=status_code,
                response_data=response_data,
                retry_after=retry_after
            )
        
        # Validation errors
        if status_code == 400:
            if error_code in ['INVALID_ADDRESS', 'INVALID_AMOUNT', 'INVALID_TOKEN']:
                return ValidationError(
                    message=f"Validation error: {error_message}",
                    status_code=status_code,
                    response_data=response_data
                )
            
            if error_code == 'INSUFFICIENT_BALANCE':
                return InsufficientBalanceError(
                    message=f"Insufficient balance: {error_message}",
                    status_code=status_code,
                    response_data=response_data
                )
        
        # Not found errors
        if status_code == 404:
            if 'token' in error_message.lower():
                return TokenNotFoundError(
                    message=f"Token not found: {error_message}",
                    status_code=status_code,
                    response_data=response_data
                )
        
        # Server errors
        if status_code >= 500:
            return APIError(
                message=f"Server error: {error_message}",
                status_code=status_code,
                response_data=response_data
            )
        
        # Default to generic API error
        return APIError(
            message=f"API error: {error_message}",
            status_code=status_code,
            response_data=response_data
        )

Error Recovery Strategies

Implementing robust error recovery ensures your trading bot can handle failures gracefully:

Retry Logic with Exponential Backoff

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import asyncio
import random
from typing import Callable, Any, TypeVar, Generic

T = TypeVar('T')

class RetryConfig:
    """Configuration for retry behavior"""
    
    def __init__(self, 
                 max_attempts: int = 3,
                 base_delay: float = 1.0,
                 max_delay: float = 60.0,
                 exponential_base: float = 2.0,
                 jitter: bool = True):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

class RetryHandler:
    """
    Advanced retry handler with exponential backoff
    Production-tested strategies from chipa.tech reliability engineering
    """
    
    def __init__(self, config: RetryConfig = None):
        self.config = config or RetryConfig()
        self.logger = logging.getLogger(__name__)
    
    async def execute_with_retry(self, 
                               operation: Callable[[], Any],
                               operation_name: str = "operation",
                               retry_on: tuple = (Exception,),
                               no_retry_on: tuple = (AuthenticationError, ValidationError)) -> Any:
        """Execute operation with intelligent retry logic"""
        
        last_exception = None
        
        for attempt in range(1, self.config.max_attempts + 1):
            try:
                self.logger.debug(f"Executing {operation_name} (attempt {attempt}/{self.config.max_attempts})")
                result = await operation()
                
                # Log successful retry
                if attempt > 1:
                    self.logger.info(f"✅ {operation_name} succeeded on attempt {attempt}")
                
                return result
                
            except no_retry_on as e:
                self.logger.error(f"❌ {operation_name} failed with non-retryable error: {e}")
                raise
                
            except retry_on as e:
                last_exception = e
                
                # Don't retry on last attempt
                if attempt == self.config.max_attempts:
                    break
                
                # Calculate delay with exponential backoff
                delay = self._calculate_delay(attempt, e)
                
                self.logger.warning(
                    f"⚠️ {operation_name} failed on attempt {attempt}: {str(e)[:100]}... "
                    f"Retrying in {delay:.2f}s"
                )
                
                await asyncio.sleep(delay)
        
        # All attempts failed
        self.logger.error(f"❌ {operation_name} failed after {self.config.max_attempts} attempts")
        raise last_exception
    
    def _calculate_delay(self, attempt: int, exception: Exception) -> float:
        """Calculate delay with exponential backoff and jitter"""
        
        # Check for rate limit specific delay
        if isinstance(exception, RateLimitError) and exception.retry_after:
            base_delay = exception.retry_after
        else:
            base_delay = self.config.base_delay
        
        # Exponential backoff
        delay = base_delay * (self.config.exponential_base ** (attempt - 1))
        
        # Apply maximum delay limit
        delay = min(delay, self.config.max_delay)
        
        # Add jitter to prevent thundering herd
        if self.config.jitter:
            jitter_amount = delay * 0.1  # 10% jitter
            delay += random.uniform(-jitter_amount, jitter_amount)
        
        return max(0, delay)

# Usage example with AxiomTradeAPI
class RobustAxiomClient:
    """
    Robust AxiomTradeAPI client with comprehensive error handling
    Professional error handling patterns from chipa.tech trading systems
    """
    
    def __init__(self, auth_token: str):
        self.auth_token = auth_token
        self.retry_handler = RetryHandler(
            RetryConfig(
                max_attempts=5,
                base_delay=1.0,
                max_delay=30.0,
                exponential_base=2.0,
                jitter=True
            )
        )
        self.logger = logging.getLogger(__name__)
        self.circuit_breaker = CircuitBreaker()
    
    async def get_balance_with_retry(self, wallet_address: str) -> Dict[str, Any]:
        """Get balance with robust error handling"""
        
        async def operation():
            return await self._get_balance_internal(wallet_address)
        
        try:
            return await self.retry_handler.execute_with_retry(
                operation=operation,
                operation_name=f"get_balance({wallet_address})",
                retry_on=(NetworkError, APIError, RateLimitError),
                no_retry_on=(AuthenticationError, ValidationError, TokenNotFoundError)
            )
        except Exception as e:
            self.logger.error(f"Failed to get balance for {wallet_address}: {e}")
            # Return safe default or raise based on your requirements
            raise
    
    async def _get_balance_internal(self, wallet_address: str) -> Dict[str, Any]:
        """Internal balance retrieval with error parsing"""
        try:
            # Simulate API call - replace with actual AxiomTradeAPI call
            response = await self._make_api_call(f'/balance/{wallet_address}')
            return response
            
        except Exception as raw_error:
            # Parse and classify the error
            if hasattr(raw_error, 'status_code') and hasattr(raw_error, 'response'):
                parsed_error = ErrorResponseParser.parse_error_response(
                    raw_error.status_code,
                    raw_error.response
                )
                raise parsed_error
            else:
                # Network or other system error
                raise NetworkError(f"Network error: {str(raw_error)}")
    
    async def _make_api_call(self, endpoint: str) -> Dict[str, Any]:
        """Make API call with circuit breaker protection"""
        return await self.circuit_breaker.call(
            self._raw_api_call,
            endpoint
        )
    
    async def _raw_api_call(self, endpoint: str) -> Dict[str, Any]:
        """Raw API call implementation"""
        # Implement actual API call here
        # This is a placeholder
        pass

Circuit Breaker Pattern

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import time
from enum import Enum
from typing import Callable, Any

class CircuitBreakerState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing fast
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    """
    Circuit breaker pattern for API reliability
    Advanced resilience patterns from chipa.tech infrastructure
    """
    
    def __init__(self, 
                 failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 expected_exception: tuple = (Exception,)):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitBreakerState.CLOSED
        self.logger = logging.getLogger(__name__)
    
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function through circuit breaker"""
        
        if self.state == CircuitBreakerState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitBreakerState.HALF_OPEN
                self.logger.info("🔄 Circuit breaker transitioning to HALF_OPEN")
            else:
                raise APIError("Circuit breaker is OPEN - failing fast")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception as e:
            self._on_failure()
            raise
    
    def _should_attempt_reset(self) -> bool:
        """Check if we should attempt to reset the circuit breaker"""
        return (
            self.last_failure_time and
            time.time() - self.last_failure_time >= self.recovery_timeout
        )
    
    def _on_success(self):
        """Handle successful operation"""
        if self.state == CircuitBreakerState.HALF_OPEN:
            self.logger.info("✅ Circuit breaker reset to CLOSED")
            self.state = CircuitBreakerState.CLOSED
        
        self.failure_count = 0
    
    def _on_failure(self):
        """Handle failed operation"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            if self.state != CircuitBreakerState.OPEN:
                self.logger.warning(f"🚨 Circuit breaker OPENED after {self.failure_count} failures")
                self.state = CircuitBreakerState.OPEN
    
    def get_state(self) -> Dict[str, Any]:
        """Get current circuit breaker state"""
        return {
            'state': self.state.value,
            'failure_count': self.failure_count,
            'failure_threshold': self.failure_threshold,
            'last_failure_time': self.last_failure_time,
            'time_until_retry': max(0, self.recovery_timeout - (time.time() - (self.last_failure_time or 0)))
        }

Debugging Techniques

Advanced debugging techniques for identifying and resolving issues:

Comprehensive Debug Logger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json
import os
from typing import Dict, Any, Optional
import inspect
import functools

class DebugLogger:
    """
    Advanced debugging logger for AxiomTradeAPI
    Professional debugging techniques from chipa.tech development team
    """
    
    def __init__(self, enable_debug: bool = None):
        self.enable_debug = enable_debug or os.getenv('AXIOM_DEBUG', 'false').lower() == 'true'
        self.logger = logging.getLogger('axiom_debug')
        
        if self.enable_debug:
            self.logger.setLevel(logging.DEBUG)
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '🐛 %(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
    
    def debug_api_call(self, func_name: str = None):
        """Decorator for debugging API calls"""
        def decorator(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                if not self.enable_debug:
                    return await func(*args, **kwargs)
                
                call_name = func_name or func.__name__
                call_id = self._generate_call_id()
                
                # Log entry
                self.logger.debug(f"🚀 [{call_id}] Starting {call_name}")
                self.logger.debug(f"📝 [{call_id}] Args: {self._safe_repr(args)}")
                self.logger.debug(f"📝 [{call_id}] Kwargs: {self._safe_repr(kwargs)}")
                
                start_time = time.time()
                
                try:
                    result = await func(*args, **kwargs)
                    execution_time = time.time() - start_time
                    
                    self.logger.debug(f"✅ [{call_id}] Completed {call_name} in {execution_time:.3f}s")
                    self.logger.debug(f"📤 [{call_id}] Result: {self._safe_repr(result)}")
                    
                    return result
                    
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.logger.debug(f"❌ [{call_id}] Failed {call_name} in {execution_time:.3f}s")
                    self.logger.debug(f"🔥 [{call_id}] Error: {str(e)}")
                    self.logger.debug(f"📚 [{call_id}] Traceback: {traceback.format_exc()}")
                    raise
            
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                if not self.enable_debug:
                    return func(*args, **kwargs)
                
                call_name = func_name or func.__name__
                call_id = self._generate_call_id()
                
                self.logger.debug(f"🚀 [{call_id}] Starting {call_name}")
                start_time = time.time()
                
                try:
                    result = func(*args, **kwargs)
                    execution_time = time.time() - start_time
                    self.logger.debug(f"✅ [{call_id}] Completed {call_name} in {execution_time:.3f}s")
                    return result
                except Exception as e:
                    execution_time = time.time() - start_time
                    self.logger.debug(f"❌ [{call_id}] Failed {call_name} in {execution_time:.3f}s: {e}")
                    raise
            
            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
        return decorator
    
    def debug_websocket_message(self, message: Dict[str, Any], direction: str = "received"):
        """Debug WebSocket messages"""
        if not self.enable_debug:
            return
        
        direction_emoji = "📥" if direction == "received" else "📤"
        self.logger.debug(f"{direction_emoji} WebSocket {direction}: {self._safe_repr(message)}")
    
    def debug_state_change(self, component: str, old_state: Any, new_state: Any):
        """Debug state changes"""
        if not self.enable_debug:
            return
        
        self.logger.debug(f"🔄 {component} state change: {old_state} -> {new_state}")
    
    def _generate_call_id(self) -> str:
        """Generate unique call ID for tracking"""
        import uuid
        return str(uuid.uuid4())[:8]
    
    def _safe_repr(self, obj: Any, max_length: int = 500) -> str:
        """Safe representation of objects for logging"""
        try:
            if isinstance(obj, dict):
                # Hide sensitive information
                safe_obj = {}
                for key, value in obj.items():
                    if any(sensitive in key.lower() for sensitive in ['token', 'key', 'secret', 'password']):
                        safe_obj[key] = "[HIDDEN]"
                    else:
                        safe_obj[key] = value
                repr_str = json.dumps(safe_obj, indent=2, default=str)
            else:
                repr_str = repr(obj)
            
            if len(repr_str) > max_length:
                repr_str = repr_str[:max_length] + "... [TRUNCATED]"
            
            return repr_str
        except Exception:
            return f"<Unable to represent {type(obj).__name__}>"

# Usage with AxiomTradeAPI
class DebuggableAxiomClient:
    """AxiomTradeAPI client with comprehensive debugging"""
    
    def __init__(self, auth_token: str, enable_debug: bool = False):
        self.auth_token = auth_token
        self.debug_logger = DebugLogger(enable_debug)
        self.client_id = self._generate_client_id()
        
    def _generate_client_id(self) -> str:
        import uuid
        return str(uuid.uuid4())[:8]
    
    @property
    def debug_api_call(self):
        return self.debug_logger.debug_api_call
    
    @debug_api_call("get_balance")
    async def get_balance(self, wallet_address: str) -> Dict[str, Any]:
        """Get balance with debugging"""
        # Validate input
        if not wallet_address:
            raise ValidationError("Wallet address is required")
        
        if len(wallet_address) < 32:  # Basic Solana address validation
            raise ValidationError("Invalid wallet address format")
        
        # Make API call
        return await self._make_api_request(f'/balance/{wallet_address}')
    
    @debug_api_call("websocket_connect")
    async def connect_websocket(self) -> None:
        """Connect to WebSocket with debugging"""
        self.debug_logger.debug_state_change("WebSocket", "disconnected", "connecting")
        
        try:
            # WebSocket connection logic here
            self.debug_logger.debug_state_change("WebSocket", "connecting", "connected")
        except Exception as e:
            self.debug_logger.debug_state_change("WebSocket", "connecting", "failed")
            raise WebSocketError(f"Failed to connect WebSocket: {e}")
    
    async def _make_api_request(self, endpoint: str) -> Dict[str, Any]:
        """Make API request with debug logging"""
        # This would contain actual API request logic
        pass

Advanced Error Analysis

Error Pattern Detection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import statistics

class ErrorAnalyzer:
    """
    Advanced error pattern analysis
    Error intelligence from chipa.tech monitoring systems
    """
    
    def __init__(self, retention_hours: int = 24):
        self.retention_hours = retention_hours
        self.errors = []
        self.error_patterns = defaultdict(list)
        self.error_stats = defaultdict(int)
    
    def record_error(self, error: AxiomTradeException, context: Dict[str, Any] = None):
        """Record error for analysis"""
        error_record = {
            'timestamp': datetime.utcnow(),
            'error_type': error.error_type.value,
            'message': error.message,
            'status_code': error.status_code,
            'trace_id': error.trace_id,
            'context': context or {}
        }
        
        self.errors.append(error_record)
        self.error_stats[error.error_type.value] += 1
        
        # Clean old errors
        self._cleanup_old_errors()
        
        # Analyze patterns
        self._analyze_error_patterns()
    
    def _cleanup_old_errors(self):
        """Remove errors older than retention period"""
        cutoff_time = datetime.utcnow() - timedelta(hours=self.retention_hours)
        self.errors = [e for e in self.errors if e['timestamp'] > cutoff_time]
    
    def _analyze_error_patterns(self):
        """Analyze error patterns and trends"""
        if len(self.errors) < 10:  # Need minimum errors for analysis
            return
        
        # Group errors by time windows
        time_windows = defaultdict(list)
        for error in self.errors:
            # Group by 5-minute windows
            window = error['timestamp'].replace(minute=(error['timestamp'].minute // 5) * 5, second=0, microsecond=0)
            time_windows[window].append(error)
        
        # Detect error spikes
        error_counts = [len(errors) for errors in time_windows.values()]
        if len(error_counts) > 1:
            mean_errors = statistics.mean(error_counts)
            std_errors = statistics.stdev(error_counts) if len(error_counts) > 1 else 0
            
            for window, errors in time_windows.items():
                if len(errors) > mean_errors + (2 * std_errors):  # 2 standard deviations
                    self._alert_error_spike(window, errors)
    
    def _alert_error_spike(self, window: datetime, errors: List[Dict]):
        """Alert on error spike detection"""
        error_types = Counter(e['error_type'] for e in errors)
        
        print(f"🚨 ERROR SPIKE DETECTED at {window}")
        print(f"   Total errors: {len(errors)}")
        print(f"   Error types: {dict(error_types)}")
        
        # Additional analysis could trigger alerts to monitoring systems
    
    def get_error_summary(self) -> Dict[str, Any]:
        """Get comprehensive error summary"""
        if not self.errors:
            return {'status': 'No errors recorded'}
        
        recent_errors = [e for e in self.errors if e['timestamp'] > datetime.utcnow() - timedelta(hours=1)]
        
        error_types = Counter(e['error_type'] for e in self.errors)
        recent_error_types = Counter(e['error_type'] for e in recent_errors)
        
        # Calculate error rate trends
        hourly_counts = defaultdict(int)
        for error in self.errors:
            hour = error['timestamp'].replace(minute=0, second=0, microsecond=0)
            hourly_counts[hour] += 1
        
        return {
            'total_errors': len(self.errors),
            'recent_errors_1h': len(recent_errors),
            'error_types': dict(error_types),
            'recent_error_types': dict(recent_error_types),
            'hourly_error_trend': dict(hourly_counts),
            'most_common_error': error_types.most_common(1)[0] if error_types else None,
            'error_rate_per_hour': len(self.errors) / max(1, self.retention_hours)
        }
    
    def get_troubleshooting_suggestions(self) -> List[str]:
        """Get troubleshooting suggestions based on error patterns"""
        suggestions = []
        error_types = Counter(e['error_type'] for e in self.errors)
        
        # Authentication errors
        if error_types.get('authentication', 0) > 5:
            suggestions.append("🔐 Multiple authentication errors detected. Check your API token validity.")
            suggestions.append("🔄 Consider implementing token refresh mechanism.")
        
        # Rate limiting
        if error_types.get('rate_limit', 0) > 3:
            suggestions.append("⏱️ Rate limiting detected. Implement exponential backoff.")
            suggestions.append("🚀 Consider upgrading to higher rate limit tier.")
        
        # Network errors
        if error_types.get('network', 0) > 10:
            suggestions.append("🌐 High network error rate. Check internet connectivity.")
            suggestions.append("🔄 Implement connection pooling and retry logic.")
        
        # WebSocket errors
        if error_types.get('websocket_connection', 0) > 5:
            suggestions.append("🔌 WebSocket connection issues. Check firewall settings.")
            suggestions.append("💔 Implement WebSocket reconnection logic.")
        
        return suggestions

Production Error Management

Centralized Error Handling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os
import json
import aiofiles
from typing import Dict, Any, Optional
import httpx

class ProductionErrorHandler:
    """
    Production-grade error handling and reporting
    Enterprise error management from chipa.tech production systems
    """
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.error_analyzer = ErrorAnalyzer()
        self.logger = logging.getLogger('production_errors')
        
        # External monitoring integration
        self.webhook_url = config.get('webhook_url')
        self.slack_webhook = config.get('slack_webhook')
        self.enable_file_logging = config.get('enable_file_logging', True)
        
        if self.enable_file_logging:
            self._setup_file_logging()
    
    def _setup_file_logging(self):
        """Setup file logging for errors"""
        os.makedirs('logs', exist_ok=True)
        
        file_handler = logging.FileHandler('logs/errors.log')
        file_handler.setLevel(logging.ERROR)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
    
    async def handle_error(self, error: Exception, context: Dict[str, Any] = None):
        """Centralized error handling"""
        
        # Convert to AxiomTradeException if needed
        if not isinstance(error, AxiomTradeException):
            axiom_error = AxiomTradeException(
                message=str(error),
                error_type=AxiomErrorType.SYSTEM_ERROR
            )
        else:
            axiom_error = error
        
        # Record for analysis
        self.error_analyzer.record_error(axiom_error, context)
        
        # Log error
        self.logger.error(f"Error occurred: {axiom_error.to_dict()}")
        
        # Determine severity
        severity = self._determine_severity(axiom_error)
        
        # Handle based on severity
        if severity == 'critical':
            await self._handle_critical_error(axiom_error, context)
        elif severity == 'high':
            await self._handle_high_severity_error(axiom_error, context)
        elif severity == 'medium':
            await self._handle_medium_severity_error(axiom_error, context)
        
        # Send to external monitoring if configured
        if self.webhook_url:
            await self._send_to_webhook(axiom_error, context, severity)
        
        if self.slack_webhook:
            await self._send_to_slack(axiom_error, context, severity)
    
    def _determine_severity(self, error: AxiomTradeException) -> str:
        """Determine error severity"""
        
        critical_errors = [AxiomErrorType.AUTHENTICATION, AxiomErrorType.SYSTEM_ERROR]
        high_errors = [AxiomErrorType.API_ERROR, AxiomErrorType.WEBSOCKET_CONNECTION]
        medium_errors = [AxiomErrorType.RATE_LIMIT, AxiomErrorType.NETWORK]
        
        if error.error_type in critical_errors:
            return 'critical'
        elif error.error_type in high_errors:
            return 'high'
        elif error.error_type in medium_errors:
            return 'medium'
        else:
            return 'low'
    
    async def _handle_critical_error(self, error: AxiomTradeException, context: Dict[str, Any]):
        """Handle critical errors"""
        print(f"🚨 CRITICAL ERROR: {error.message}")
        
        # Save error details
        await self._save_error_details(error, context)
        
        # Could trigger emergency procedures here
        # e.g., stop trading, send alerts, etc.
    
    async def _handle_high_severity_error(self, error: AxiomTradeException, context: Dict[str, Any]):
        """Handle high severity errors"""
        print(f"⚠️ HIGH SEVERITY ERROR: {error.message}")
    
    async def _handle_medium_severity_error(self, error: AxiomTradeException, context: Dict[str, Any]):
        """Handle medium severity errors"""
        print(f"⚡ MEDIUM SEVERITY ERROR: {error.message}")
    
    async def _save_error_details(self, error: AxiomTradeException, context: Dict[str, Any]):
        """Save detailed error information"""
        error_details = {
            'error': error.to_dict(),
            'context': context,
            'system_info': {
                'python_version': sys.version,
                'platform': sys.platform,
                'timestamp': datetime.utcnow().isoformat()
            }
        }
        
        filename = f"logs/error_detail_{error.trace_id}.json"
        async with aiofiles.open(filename, 'w') as f:
            await f.write(json.dumps(error_details, indent=2, default=str))
    
    async def _send_to_webhook(self, error: AxiomTradeException, context: Dict[str, Any], severity: str):
        """Send error to external webhook"""
        try:
            payload = {
                'severity': severity,
                'error': error.to_dict(),
                'context': context,
                'timestamp': datetime.utcnow().isoformat()
            }
            
            async with httpx.AsyncClient() as client:
                response = await client.post(self.webhook_url, json=payload, timeout=10)
                response.raise_for_status()
                
        except Exception as e:
            self.logger.error(f"Failed to send error to webhook: {e}")
    
    async def _send_to_slack(self, error: AxiomTradeException, context: Dict[str, Any], severity: str):
        """Send error alert to Slack"""
        try:
            severity_emoji = {
                'critical': '🚨',
                'high': '⚠️',
                'medium': '⚡',
                'low': 'ℹ️'
            }
            
            message = {
                'text': f"{severity_emoji.get(severity, 'ℹ️')} AxiomTradeAPI Error - {severity.upper()}",
                'attachments': [
                    {
                        'color': 'danger' if severity == 'critical' else 'warning',
                        'fields': [
                            {'title': 'Error Type', 'value': error.error_type.value, 'short': True},
                            {'title': 'Message', 'value': error.message[:200], 'short': False},
                            {'title': 'Trace ID', 'value': error.trace_id, 'short': True},
                            {'title': 'Timestamp', 'value': error.timestamp.isoformat(), 'short': True}
                        ]
                    }
                ]
            }
            
            async with httpx.AsyncClient() as client:
                response = await client.post(self.slack_webhook, json=message, timeout=10)
                response.raise_for_status()
                
        except Exception as e:
            self.logger.error(f"Failed to send error to Slack: {e}")
    
    def get_health_status(self) -> Dict[str, Any]:
        """Get system health status based on error patterns"""
        error_summary = self.error_analyzer.get_error_summary()
        
        # Determine overall health
        recent_errors = error_summary.get('recent_errors_1h', 0)
        error_rate = error_summary.get('error_rate_per_hour', 0)
        
        if recent_errors > 50 or error_rate > 20:
            health_status = 'unhealthy'
        elif recent_errors > 20 or error_rate > 10:
            health_status = 'degraded'
        else:
            health_status = 'healthy'
        
        return {
            'status': health_status,
            'error_summary': error_summary,
            'troubleshooting_suggestions': self.error_analyzer.get_troubleshooting_suggestions()
        }

Troubleshooting Guide

Common issues and their solutions:

Quick Troubleshooting Checklist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
🔍 AxiomTradeAPI Troubleshooting Checklist
==========================================

🔐 Authentication Issues:
□ Verify API token is valid and not expired
□ Check token permissions and scopes
□ Ensure proper authorization headers
□ Test with minimal API call first

🌐 Network Connectivity:
□ Check internet connection stability
□ Verify API endpoints are accessible
□ Test with curl or similar tool
□ Check firewall and proxy settings

⚡ Performance Issues:
□ Monitor API response times
□ Check for rate limiting
□ Verify connection pooling is enabled
□ Review concurrent request limits

🔌 WebSocket Problems:
□ Test WebSocket connectivity separately
□ Check for proxy/firewall blocking WebSockets
□ Verify authentication for WebSocket connection
□ Implement reconnection logic

💾 Memory Issues:
□ Monitor memory usage over time
□ Check for memory leaks in long-running processes
□ Implement proper cleanup procedures
□ Use memory profiling tools

📊 Data Issues:
□ Validate input data format
□ Check API response format changes
□ Verify token addresses are correct
□ Handle missing or null data gracefully

Error Code Reference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
ERROR_CODES = {
    # Authentication Errors (401, 403)
    'INVALID_TOKEN': {
        'description': 'API token is invalid or expired',
        'solution': 'Generate a new API token from your dashboard',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/authentication'
    },
    'INSUFFICIENT_PERMISSIONS': {
        'description': 'Token lacks required permissions',
        'solution': 'Update token permissions in your account settings',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/permissions'
    },
    
    # Rate Limiting (429)
    'RATE_LIMIT_EXCEEDED': {
        'description': 'Too many requests in time window',
        'solution': 'Implement exponential backoff and respect rate limits',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/rate-limits'
    },
    
    # Validation Errors (400)
    'INVALID_WALLET_ADDRESS': {
        'description': 'Wallet address format is invalid',
        'solution': 'Verify Solana address format (44 characters, base58)',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/wallet-addresses'
    },
    'INVALID_TOKEN_ADDRESS': {
        'description': 'Token address format is invalid',
        'solution': 'Check token address on Solana explorer',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/token-addresses'
    },
    'INSUFFICIENT_BALANCE': {
        'description': 'Wallet has insufficient balance for operation',
        'solution': 'Check wallet balance and add funds if needed',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/balance-management'
    },
    
    # Not Found Errors (404)
    'TOKEN_NOT_FOUND': {
        'description': 'Specified token does not exist',
        'solution': 'Verify token address on Solana blockchain',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/token-lookup'
    },
    'WALLET_NOT_FOUND': {
        'description': 'Wallet address not found or has no activity',
        'solution': 'Check wallet address and ensure it exists on-chain',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/wallet-lookup'
    },
    
    # Server Errors (500+)
    'INTERNAL_SERVER_ERROR': {
        'description': 'Internal server error occurred',
        'solution': 'Retry request with exponential backoff',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs/error-handling'
    },
    'SERVICE_UNAVAILABLE': {
        'description': 'Service temporarily unavailable',
        'solution': 'Check service status and retry later',
        'documentation': 'https://chipa.tech/status'
    }
}

def get_error_help(error_code: str) -> Dict[str, str]:
    """Get help information for specific error code"""
    return ERROR_CODES.get(error_code, {
        'description': 'Unknown error code',
        'solution': 'Check API documentation for details',
        'documentation': 'https://chipa.tech/axiomtradeapi-docs'
    })

Best Practices Summary

Error Handling Principles

  1. Fail Fast: Detect and handle errors as early as possible
  2. Graceful Degradation: Provide fallback behavior when possible
  3. Comprehensive Logging: Log all errors with sufficient context
  4. User-Friendly Messages: Provide clear, actionable error messages
  5. Monitoring Integration: Connect errors to monitoring systems

Production Readiness Checklist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
✅ Production Error Handling Checklist
=====================================

🔧 Error Classification:
□ All error types properly classified
□ Custom exceptions for business logic errors
□ Appropriate error codes and messages

🔄 Retry Logic:
□ Exponential backoff implemented
□ Maximum retry limits set
□ Non-retryable errors identified

🚨 Monitoring:
□ Error tracking and alerting configured
□ Health check endpoints implemented
□ Performance metrics collected

📝 Logging:
□ Structured logging format
□ Appropriate log levels
□ Sensitive data sanitization

🧪 Testing:
□ Error scenarios covered in tests
□ Chaos engineering practices
□ Load testing with error injection

Community Support

For additional help with error handling and debugging:

Conclusion

Effective error handling is crucial for building reliable trading systems. This guide provides comprehensive strategies for handling errors, debugging issues, and maintaining production stability with AxiomTradeAPI.

Key takeaways:

Remember that good error handling is not just about catching exceptions—it’s about providing a robust, reliable experience for your users and maintaining system stability under all conditions.


This error handling guide represents production-tested practices from successful trading systems. For the latest updates and community discussions, visit chipa.tech.