Examples & Use Cases
Practical implementations and real-world trading scenarios using the OlympTrade API
Basic Trading Bot
A simple automated trading bot that places trades based on basic market conditions.
Simple Moving Average Strategy
This bot uses a simple moving average crossover strategy to make trading decisions.
import time
import statistics
from olymptrade import OlympTrade
class SimpleMovingAverageBot:
def __init__(self, email, password):
self.api = OlympTrade(email, password)
self.asset = "EURUSD"
self.trade_amount = 10.0
self.sma_short = 5 # 5-period SMA
self.sma_long = 20 # 20-period SMA
self.price_history = []
def get_sma(self, period):
"""Calculate Simple Moving Average"""
if len(self.price_history) < period:
return None
return statistics.mean(self.price_history[-period:])
def update_price_history(self):
"""Fetch latest price and update history"""
candles = self.api.get_candles(self.asset, 60, count=1)
if candles:
latest_price = candles[0]['close']
self.price_history.append(latest_price)
# Keep only last 50 prices to manage memory
if len(self.price_history) > 50:
self.price_history.pop(0)
return latest_price
return None
def check_trading_signal(self):
"""Check for trading signals based on SMA crossover"""
sma_short = self.get_sma(self.sma_short)
sma_long = self.get_sma(self.sma_long)
if sma_short is None or sma_long is None:
return None
# Buy signal: short SMA crosses above long SMA
if sma_short > sma_long:
return "call"
# Sell signal: short SMA crosses below long SMA
elif sma_short < sma_long:
return "put"
return None
def place_trade(self, direction):
"""Place a trade based on signal"""
try:
result = self.api.place_binary_order(
asset=self.asset,
amount=self.trade_amount,
direction=direction,
duration=300 # 5 minutes
)
if result['success']:
print(f"ā
{direction.upper()} order placed successfully!")
print(f"Order ID: {result['order_id']}")
print(f"Expected payout: {result['payout']}%")
return True
else:
print(f"ā Failed to place order: {result['error']}")
return False
except Exception as e:
print(f"ā Error placing trade: {e}")
return False
def run(self):
"""Main bot loop"""
print("š Starting Simple Moving Average Bot...")
print(f"Asset: {self.asset}")
print(f"Trade Amount: ${self.trade_amount}")
print(f"SMA Periods: {self.sma_short}/{self.sma_long}")
print("-" * 50)
while True:
try:
# Update price history
current_price = self.update_price_history()
if current_price:
print(f"Current price: {current_price}")
# Check for trading signal
signal = self.check_trading_signal()
if signal:
print(f"š Trading signal detected: {signal.upper()}")
# Check account balance before trading
balance = self.api.get_balance()
if balance['available'] >= self.trade_amount:
self.place_trade(signal)
else:
print("ā Insufficient balance for trading")
else:
print("ā³ No trading signal, waiting...")
# Wait 1 minute before next check
time.sleep(60)
except KeyboardInterrupt:
print("\nš Bot stopped by user")
break
except Exception as e:
print(f"ā Error in main loop: {e}")
time.sleep(60)
# Cleanup
self.api.disconnect()
# Usage
if __name__ == "__main__":
bot = SimpleMovingAverageBot("your-email@example.com", "your-password")
bot.run()
Real-Time Price Monitoring
Monitor multiple assets simultaneously and track price movements with alerts.
Multi-Asset Price Monitor
Monitor prices across multiple assets and send alerts when significant price movements occur.
import json
import time
from datetime import datetime
from olymptrade import OlympTrade
class PriceMonitor:
def __init__(self, email, password):
self.api = OlympTrade(email, password)
self.assets = ["EURUSD", "GBPUSD", "BTCUSD", "ETHUSD", "XAUUSD"]
self.price_data = {}
self.alert_thresholds = {
"EURUSD": 0.005, # 0.5% change
"GBPUSD": 0.007, # 0.7% change
"BTCUSD": 0.02, # 2% change
"ETHUSD": 0.03, # 3% change
"XAUUSD": 0.01 # 1% change
}
def price_update_callback(self, data):
"""Handle incoming price updates"""
asset = data['asset']
price = data['price']
timestamp = datetime.now()
# Store price data
if asset not in self.price_data:
self.price_data[asset] = {
'current': price,
'previous': price,
'high_24h': price,
'low_24h': price,
'change_24h': 0,
'last_alert': None
}
else:
prev_price = self.price_data[asset]['current']
self.price_data[asset]['previous'] = prev_price
self.price_data[asset]['current'] = price
# Update 24h high/low
if price > self.price_data[asset]['high_24h']:
self.price_data[asset]['high_24h'] = price
if price < self.price_data[asset]['low_24h']:
self.price_data[asset]['low_24h'] = price
# Calculate percentage change
if prev_price > 0:
change = ((price - prev_price) / prev_price) * 100
self.price_data[asset]['change_24h'] = change
# Check for alert conditions
self.check_price_alert(asset, price, change)
# Display current prices
self.display_prices()
def check_price_alert(self, asset, price, change_pct):
"""Check if price change exceeds alert threshold"""
threshold = self.alert_thresholds.get(asset, 0.01)
if abs(change_pct) >= (threshold * 100):
direction = "š UP" if change_pct > 0 else "š DOWN"
alert_msg = f"šØ PRICE ALERT: {asset} {direction} {change_pct:.2f}% | Price: ${price:.4f}"
print(alert_msg)
# Update last alert time
self.price_data[asset]['last_alert'] = datetime.now()
# You could send email/SMS alerts here
self.send_alert(alert_msg)
def send_alert(self, message):
"""Send alert notification (placeholder)"""
# Implement your preferred notification method
# Email, SMS, Discord webhook, etc.
with open("price_alerts.log", "a") as f:
f.write(f"{datetime.now()}: {message}\n")
def display_prices(self):
"""Display current price information"""
print("\n" + "="*80)
print("š OLYMPTRADE PRICE MONITOR")
print("="*80)
for asset in self.assets:
if asset in self.price_data:
data = self.price_data[asset]
change_color = "š¢" if data['change_24h'] >= 0 else "š“"
print(f"{asset:<8} | ${data['current']:<10.4f} | "
f"{change_color} {data['change_24h']:>6.2f}% | "
f"H: ${data['high_24h']:.4f} | L: ${data['low_24h']:.4f}")
print(f"\nLast Update: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*80)
def export_data(self):
"""Export price data to JSON file"""
export_data = {
'timestamp': datetime.now().isoformat(),
'prices': self.price_data
}
with open(f"price_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", "w") as f:
json.dump(export_data, f, indent=2)
print("š Price data exported successfully!")
def start_monitoring(self):
"""Start the price monitoring service"""
print("š Starting Price Monitor...")
print(f"Monitoring assets: {', '.join(self.assets)}")
print("Press Ctrl+C to stop monitoring\n")
try:
# Subscribe to real-time price updates
self.api.subscribe_to_quotes(
assets=self.assets,
callback=self.price_update_callback
)
# Keep the program running
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nš Stopping price monitor...")
self.api.disconnect()
# Export final data
self.export_data()
# Usage
if __name__ == "__main__":
monitor = PriceMonitor("your-email@example.com", "your-password")
monitor.start_monitoring()
Technical Analysis Bot
Advanced trading bot using multiple technical indicators for decision making.
RSI + MACD Strategy Bot
Combines RSI and MACD indicators to identify optimal entry and exit points.
import numpy as np
import pandas as pd
from olymptrade import OlympTrade
class TechnicalAnalysisBot:
def __init__(self, email, password):
self.api = OlympTrade(email, password)
self.asset = "EURUSD"
self.timeframe = 300 # 5-minute candles
self.trade_amount = 25.0
def calculate_rsi(self, prices, period=14):
"""Calculate Relative Strength Index"""
deltas = np.diff(prices)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gains = pd.Series(gains).rolling(window=period).mean()
avg_losses = pd.Series(losses).rolling(window=period).mean()
rs = avg_gains / avg_losses
rsi = 100 - (100 / (1 + rs))
return rsi.iloc[-1] if not pd.isna(rsi.iloc[-1]) else 50
def calculate_macd(self, prices, fast=12, slow=26, signal=9):
"""Calculate MACD indicator"""
prices_series = pd.Series(prices)
ema_fast = prices_series.ewm(span=fast).mean()
ema_slow = prices_series.ewm(span=slow).mean()
macd_line = ema_fast - ema_slow
signal_line = macd_line.ewm(span=signal).mean()
histogram = macd_line - signal_line
return {
'macd': macd_line.iloc[-1],
'signal': signal_line.iloc[-1],
'histogram': histogram.iloc[-1]
}
def get_market_data(self, count=100):
"""Fetch recent market data for analysis"""
candles = self.api.get_candles(self.asset, self.timeframe, count=count)
if not candles:
return None
prices = [candle['close'] for candle in candles]
highs = [candle['high'] for candle in candles]
lows = [candle['low'] for candle in candles]
volumes = [candle['volume'] for candle in candles]
return {
'prices': prices,
'highs': highs,
'lows': lows,
'volumes': volumes,
'current_price': prices[-1]
}
def analyze_market(self):
"""Perform technical analysis and generate signals"""
data = self.get_market_data()
if not data:
return None
# Calculate technical indicators
rsi = self.calculate_rsi(data['prices'])
macd_data = self.calculate_macd(data['prices'])
# Determine market conditions
rsi_oversold = rsi < 30
rsi_overbought = rsi > 70
rsi_neutral = 30 <= rsi <= 70
macd_bullish = macd_data['macd'] > macd_data['signal'] and macd_data['histogram'] > 0
macd_bearish = macd_data['macd'] < macd_data['signal'] and macd_data['histogram'] < 0
# Generate trading signals
signal = None
confidence = 0
reasoning = []
# CALL signals
if rsi_oversold and macd_bullish:
signal = "call"
confidence = 85
reasoning.append("RSI oversold + MACD bullish crossover")
elif rsi_neutral and macd_bullish:
signal = "call"
confidence = 70
reasoning.append("MACD bullish momentum")
# PUT signals
elif rsi_overbought and macd_bearish:
signal = "put"
confidence = 85
reasoning.append("RSI overbought + MACD bearish crossover")
elif rsi_neutral and macd_bearish:
signal = "put"
confidence = 70
reasoning.append("MACD bearish momentum")
return {
'signal': signal,
'confidence': confidence,
'reasoning': reasoning,
'indicators': {
'rsi': rsi,
'macd': macd_data,
'current_price': data['current_price']
}
}
def execute_trade(self, analysis):
"""Execute trade based on analysis"""
if not analysis or not analysis['signal']:
return False
# Only trade with high confidence signals
if analysis['confidence'] < 75:
print(f"ā ļø Low confidence signal ({analysis['confidence']}%), skipping trade")
return False
try:
result = self.api.place_binary_order(
asset=self.asset,
amount=self.trade_amount,
direction=analysis['signal'],
duration=900 # 15 minutes
)
if result['success']:
print(f"ā
{analysis['signal'].upper()} trade executed successfully!")
print(f"Confidence: {analysis['confidence']}%")
print(f"Reasoning: {', '.join(analysis['reasoning'])}")
print(f"Order ID: {result['order_id']}")
return True
else:
print(f"ā Trade execution failed: {result['error']}")
return False
except Exception as e:
print(f"ā Error executing trade: {e}")
return False
def run_analysis_cycle(self):
"""Run one complete analysis and trading cycle"""
print(f"\nš Analyzing {self.asset}...")
analysis = self.analyze_market()
if analysis:
indicators = analysis['indicators']
print(f"Current Price: ${indicators['current_price']:.4f}")
print(f"RSI: {indicators['rsi']:.2f}")
print(f"MACD: {indicators['macd']['macd']:.6f}")
print(f"Signal Line: {indicators['macd']['signal']:.6f}")
print(f"Histogram: {indicators['macd']['histogram']:.6f}")
if analysis['signal']:
print(f"\nšÆ Trading Signal: {analysis['signal'].upper()}")
print(f"Confidence: {analysis['confidence']}%")
print(f"Reasoning: {', '.join(analysis['reasoning'])}")
# Check account balance
balance = self.api.get_balance()
if balance['available'] >= self.trade_amount:
self.execute_trade(analysis)
else:
print("ā Insufficient balance for trading")
else:
print("ā³ No clear trading signal detected")
else:
print("ā Failed to retrieve market data")
def start_bot(self):
"""Start the technical analysis bot"""
print("š Starting Technical Analysis Bot")
print(f"Asset: {self.asset}")
print(f"Timeframe: {self.timeframe // 60} minutes")
print(f"Trade Amount: ${self.trade_amount}")
print("-" * 50)
while True:
try:
self.run_analysis_cycle()
# Wait for next candle
print(f"\nā° Waiting {self.timeframe // 60} minutes for next analysis...")
time.sleep(self.timeframe)
except KeyboardInterrupt:
print("\nš Bot stopped by user")
break
except Exception as e:
print(f"ā Error in analysis cycle: {e}")
time.sleep(60) # Wait 1 minute before retry
self.api.disconnect()
# Usage
if __name__ == "__main__":
bot = TechnicalAnalysisBot("your-email@example.com", "your-password")
bot.start_bot()
Advanced Risk Management
Implement sophisticated risk management strategies to protect your trading capital.
Portfolio Risk Manager
Comprehensive risk management system with position sizing, stop-loss, and exposure limits.
import json
from datetime import datetime, timedelta
from olymptrade import OlympTrade
class RiskManager:
def __init__(self, email, password, risk_config=None):
self.api = OlympTrade(email, password)
# Default risk configuration
self.config = risk_config or {
'max_daily_loss': 100.0, # Maximum daily loss in USD
'max_position_size': 0.05, # Max 5% of balance per trade
'max_concurrent_trades': 3, # Maximum open positions
'max_asset_exposure': 0.15, # Max 15% exposure per asset
'daily_profit_target': 200.0, # Daily profit target
'max_consecutive_losses': 5, # Stop after 5 consecutive losses
'recovery_mode_threshold': 0.2 # Reduce risk after 20% drawdown
}
# Tracking variables
self.daily_pnl = 0.0
self.consecutive_losses = 0
self.daily_trades = 0
self.start_balance = 0.0
self.recovery_mode = False
self.trade_history = []
def initialize_session(self):
"""Initialize trading session with current balance"""
balance = self.api.get_balance()
self.start_balance = balance['balance']
print(f"š° Session started with balance: ${self.start_balance:.2f}")
def calculate_position_size(self, current_balance, risk_level=1.0):
"""Calculate appropriate position size based on risk parameters"""
# Base position size (percentage of balance)
base_size = current_balance * self.config['max_position_size']
# Adjust based on consecutive losses
if self.consecutive_losses > 2:
risk_multiplier = max(0.5, 1 - (self.consecutive_losses * 0.1))
base_size *= risk_multiplier
# Recovery mode - reduce position size
if self.recovery_mode:
base_size *= 0.5
# Apply custom risk level
position_size = base_size * risk_level
# Ensure minimum viable trade size
return max(position_size, 1.0)
def check_risk_limits(self, asset, amount):
"""Check if proposed trade violates risk limits"""
violations = []
# Check daily loss limit
if self.daily_pnl <= -self.config['max_daily_loss']:
violations.append("Daily loss limit exceeded")
# Check if we've hit profit target
if self.daily_pnl >= self.config['daily_profit_target']:
violations.append("Daily profit target reached - consider stopping")
# Check consecutive losses
if self.consecutive_losses >= self.config['max_consecutive_losses']:
violations.append("Maximum consecutive losses reached")
# Check concurrent trades
open_positions = self.api.get_open_positions()
if len(open_positions) >= self.config['max_concurrent_trades']:
violations.append("Maximum concurrent trades reached")
# Check asset exposure
asset_exposure = sum(pos['amount'] for pos in open_positions if pos['asset'] == asset)
balance = self.api.get_balance()['balance']
if (asset_exposure + amount) / balance > self.config['max_asset_exposure']:
violations.append(f"Asset exposure limit exceeded for {asset}")
return violations
def update_pnl(self, trade_result):
"""Update P&L tracking after trade completion"""
if trade_result['result'] == 'win':
profit = trade_result['payout']
self.daily_pnl += profit
self.consecutive_losses = 0
print(f"ā
WIN: +${profit:.2f} | Daily P&L: ${self.daily_pnl:.2f}")
else:
loss = trade_result['amount']
self.daily_pnl -= loss
self.consecutive_losses += 1
print(f"ā LOSS: -${loss:.2f} | Daily P&L: ${self.daily_pnl:.2f}")
self.daily_trades += 1
self.trade_history.append(trade_result)
# Check for recovery mode
current_balance = self.api.get_balance()['balance']
drawdown = (self.start_balance - current_balance) / self.start_balance
if drawdown >= self.config['recovery_mode_threshold']:
self.recovery_mode = True
print("ā ļø Recovery mode activated - reducing position sizes")
elif drawdown <= 0.05: # 5% profit
self.recovery_mode = False
def execute_safe_trade(self, asset, direction, duration, risk_level=1.0):
"""Execute trade with full risk management"""
print(f"\nšÆ Attempting to place trade: {asset} {direction.upper()}")
# Get current balance
balance = self.api.get_balance()
current_balance = balance['available']
# Calculate position size
amount = self.calculate_position_size(current_balance, risk_level)
# Check risk limits
violations = self.check_risk_limits(asset, amount)
if violations:
print("š« Trade blocked due to risk violations:")
for violation in violations:
print(f" - {violation}")
return False
# Execute the trade
try:
result = self.api.place_binary_order(
asset=asset,
amount=amount,
direction=direction,
duration=duration
)
if result['success']:
print(f"ā
Trade placed successfully!")
print(f"Amount: ${amount:.2f}")
print(f"Expected payout: {result['payout']}%")
print(f"Order ID: {result['order_id']}")
# Store trade for tracking
trade_record = {
'timestamp': datetime.now(),
'asset': asset,
'direction': direction,
'amount': amount,
'order_id': result['order_id'],
'expected_payout': result['payout']
}
return trade_record
else:
print(f"ā Trade failed: {result['error']}")
return False
except Exception as e:
print(f"ā Error executing trade: {e}")
return False
def generate_risk_report(self):
"""Generate comprehensive risk report"""
current_balance = self.api.get_balance()['balance']
report = {
'session_summary': {
'start_balance': self.start_balance,
'current_balance': current_balance,
'daily_pnl': self.daily_pnl,
'total_return': ((current_balance - self.start_balance) / self.start_balance) * 100,
'total_trades': self.daily_trades
},
'risk_metrics': {
'consecutive_losses': self.consecutive_losses,
'recovery_mode': self.recovery_mode,
'win_rate': self.calculate_win_rate(),
'max_drawdown': self.calculate_max_drawdown()
},
'limits_status': {
'daily_loss_used': abs(min(0, self.daily_pnl)),
'daily_loss_limit': self.config['max_daily_loss'],
'profit_target_reached': self.daily_pnl >= self.config['daily_profit_target']
}
}
print("\nš RISK MANAGEMENT REPORT")
print("=" * 50)
print(f"Starting Balance: ${report['session_summary']['start_balance']:.2f}")
print(f"Current Balance: ${report['session_summary']['current_balance']:.2f}")
print(f"Daily P&L: ${report['session_summary']['daily_pnl']:.2f}")
print(f"Total Return: {report['session_summary']['total_return']:.2f}%")
print(f"Total Trades: {report['session_summary']['total_trades']}")
print(f"Win Rate: {report['risk_metrics']['win_rate']:.1f}%")
print(f"Consecutive Losses: {report['risk_metrics']['consecutive_losses']}")
print(f"Recovery Mode: {'Active' if report['risk_metrics']['recovery_mode'] else 'Inactive'}")
print("=" * 50)
return report
def calculate_win_rate(self):
"""Calculate win rate from trade history"""
if not self.trade_history:
return 0.0
wins = sum(1 for trade in self.trade_history if trade.get('result') == 'win')
return (wins / len(self.trade_history)) * 100
def calculate_max_drawdown(self):
"""Calculate maximum drawdown during session"""
if not self.trade_history:
return 0.0
running_balance = self.start_balance
max_balance = self.start_balance
max_drawdown = 0.0
for trade in self.trade_history:
if trade.get('result') == 'win':
running_balance += trade.get('payout', 0)
else:
running_balance -= trade.get('amount', 0)
if running_balance > max_balance:
max_balance = running_balance
drawdown = (max_balance - running_balance) / max_balance
if drawdown > max_drawdown:
max_drawdown = drawdown
return max_drawdown * 100
# Example usage
if __name__ == "__main__":
# Initialize risk manager
risk_manager = RiskManager("your-email@example.com", "your-password")
risk_manager.initialize_session()
# Example trading with risk management
assets = ["EURUSD", "GBPUSD", "BTCUSD"]
for i in range(10): # Simulate 10 trades
asset = assets[i % len(assets)]
direction = "call" if i % 2 == 0 else "put"
# Execute trade with risk management
trade = risk_manager.execute_safe_trade(
asset=asset,
direction=direction,
duration=300, # 5 minutes
risk_level=0.8 # Slightly conservative
)
if trade:
# Simulate trade result (in real scenario, you'd get this from API)
import random
win = random.choice([True, False])
trade_result = {
'result': 'win' if win else 'loss',
'amount': trade['amount'],
'payout': trade['amount'] * (trade['expected_payout'] / 100) if win else 0
}
risk_manager.update_pnl(trade_result)
# Generate report every 5 trades
if (i + 1) % 5 == 0:
risk_manager.generate_risk_report()
# Final report
risk_manager.generate_risk_report()
Portfolio Performance Tracker
Track and analyze your trading performance with detailed metrics and visualizations.
Performance Analytics Dashboard
Comprehensive portfolio tracking with performance metrics, trade analysis, and reporting.
import json
import csv
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from olymptrade import OlympTrade
class PortfolioTracker:
def __init__(self, email, password):
self.api = OlympTrade(email, password)
self.trades = []
self.daily_balances = []
self.start_date = datetime.now()
def record_trade(self, trade_data):
"""Record a completed trade"""
trade_record = {
'timestamp': datetime.now(),
'asset': trade_data['asset'],
'direction': trade_data['direction'],
'amount': trade_data['amount'],
'duration': trade_data['duration'],
'entry_price': trade_data.get('entry_price', 0),
'exit_price': trade_data.get('exit_price', 0),
'result': trade_data['result'], # 'win' or 'loss'
'payout': trade_data.get('payout', 0),
'profit_loss': trade_data.get('profit_loss', 0)
}
self.trades.append(trade_record)
self.save_trades()
def record_daily_balance(self):
"""Record daily balance for performance tracking"""
balance = self.api.get_balance()
balance_record = {
'date': datetime.now().date(),
'balance': balance['balance'],
'available': balance['available'],
'demo': balance.get('demo', False)
}
self.daily_balances.append(balance_record)
self.save_balances()
def calculate_performance_metrics(self):
"""Calculate comprehensive performance metrics"""
if not self.trades:
return None
# Basic statistics
total_trades = len(self.trades)
winning_trades = [t for t in self.trades if t['result'] == 'win']
losing_trades = [t for t in self.trades if t['result'] == 'loss']
win_count = len(winning_trades)
loss_count = len(losing_trades)
win_rate = (win_count / total_trades) * 100 if total_trades > 0 else 0
# Profit/Loss calculations
total_profit = sum(t['profit_loss'] for t in winning_trades)
total_loss = sum(abs(t['profit_loss']) for t in losing_trades)
net_profit = total_profit - total_loss
# Average trade values
avg_win = total_profit / win_count if win_count > 0 else 0
avg_loss = total_loss / loss_count if loss_count > 0 else 0
# Risk metrics
profit_factor = total_profit / total_loss if total_loss > 0 else float('inf')
# Streak analysis
current_streak = self.calculate_current_streak()
max_win_streak = self.calculate_max_streak('win')
max_loss_streak = self.calculate_max_streak('loss')
# Asset analysis
asset_performance = self.analyze_asset_performance()
# Time-based analysis
hourly_performance = self.analyze_hourly_performance()
return {
'basic_stats': {
'total_trades': total_trades,
'winning_trades': win_count,
'losing_trades': loss_count,
'win_rate': win_rate
},
'financial_metrics': {
'total_profit': total_profit,
'total_loss': total_loss,
'net_profit': net_profit,
'avg_win': avg_win,
'avg_loss': avg_loss,
'profit_factor': profit_factor
},
'risk_metrics': {
'current_streak': current_streak,
'max_win_streak': max_win_streak,
'max_loss_streak': max_loss_streak
},
'analysis': {
'asset_performance': asset_performance,
'hourly_performance': hourly_performance
}
}
def calculate_current_streak(self):
"""Calculate current winning/losing streak"""
if not self.trades:
return {'type': 'none', 'count': 0}
recent_trades = sorted(self.trades, key=lambda x: x['timestamp'], reverse=True)
if not recent_trades:
return {'type': 'none', 'count': 0}
streak_type = recent_trades[0]['result']
streak_count = 1
for trade in recent_trades[1:]:
if trade['result'] == streak_type:
streak_count += 1
else:
break
return {'type': streak_type, 'count': streak_count}
def calculate_max_streak(self, result_type):
"""Calculate maximum streak of specified type"""
if not self.trades:
return 0
sorted_trades = sorted(self.trades, key=lambda x: x['timestamp'])
max_streak = 0
current_streak = 0
for trade in sorted_trades:
if trade['result'] == result_type:
current_streak += 1
max_streak = max(max_streak, current_streak)
else:
current_streak = 0
return max_streak
def analyze_asset_performance(self):
"""Analyze performance by asset"""
asset_stats = {}
for trade in self.trades:
asset = trade['asset']
if asset not in asset_stats:
asset_stats[asset] = {
'total_trades': 0,
'wins': 0,
'losses': 0,
'total_profit': 0,
'total_loss': 0
}
stats = asset_stats[asset]
stats['total_trades'] += 1
if trade['result'] == 'win':
stats['wins'] += 1
stats['total_profit'] += trade['profit_loss']
else:
stats['losses'] += 1
stats['total_loss'] += abs(trade['profit_loss'])
# Calculate win rates and profit factors
for asset, stats in asset_stats.items():
stats['win_rate'] = (stats['wins'] / stats['total_trades']) * 100
stats['net_profit'] = stats['total_profit'] - stats['total_loss']
stats['profit_factor'] = (stats['total_profit'] / stats['total_loss']
if stats['total_loss'] > 0 else float('inf'))
return asset_stats
def analyze_hourly_performance(self):
"""Analyze performance by hour of day"""
hourly_stats = {}
for trade in self.trades:
hour = trade['timestamp'].hour
if hour not in hourly_stats:
hourly_stats[hour] = {
'total_trades': 0,
'wins': 0,
'total_profit': 0
}
stats = hourly_stats[hour]
stats['total_trades'] += 1
if trade['result'] == 'win':
stats['wins'] += 1
stats['total_profit'] += trade['profit_loss']
# Calculate win rates
for hour, stats in hourly_stats.items():
stats['win_rate'] = (stats['wins'] / stats['total_trades']) * 100
return hourly_stats
def generate_performance_report(self):
"""Generate detailed performance report"""
metrics = self.calculate_performance_metrics()
if not metrics:
print("š No trading data available for analysis")
return
print("\n" + "="*60)
print("š PORTFOLIO PERFORMANCE REPORT")
print("="*60)
# Basic Statistics
basic = metrics['basic_stats']
print(f"\nš TRADING STATISTICS")
print(f"Total Trades: {basic['total_trades']}")
print(f"Winning Trades: {basic['winning_trades']}")
print(f"Losing Trades: {basic['losing_trades']}")
print(f"Win Rate: {basic['win_rate']:.1f}%")
# Financial Metrics
financial = metrics['financial_metrics']
print(f"\nš° FINANCIAL PERFORMANCE")
print(f"Total Profit: ${financial['total_profit']:.2f}")
print(f"Total Loss: ${financial['total_loss']:.2f}")
print(f"Net Profit: ${financial['net_profit']:.2f}")
print(f"Average Win: ${financial['avg_win']:.2f}")
print(f"Average Loss: ${financial['avg_loss']:.2f}")
print(f"Profit Factor: {financial['profit_factor']:.2f}")
# Risk Metrics
risk = metrics['risk_metrics']
current_streak = risk['current_streak']
print(f"\nā” STREAK ANALYSIS")
print(f"Current Streak: {current_streak['count']} {current_streak['type']}s")
print(f"Max Win Streak: {risk['max_win_streak']}")
print(f"Max Loss Streak: {risk['max_loss_streak']}")
# Asset Performance
print(f"\nšÆ ASSET PERFORMANCE")
asset_perf = metrics['analysis']['asset_performance']
for asset, stats in asset_perf.items():
print(f"{asset}:")
print(f" Trades: {stats['total_trades']}, Win Rate: {stats['win_rate']:.1f}%")
print(f" Net Profit: ${stats['net_profit']:.2f}, Profit Factor: {stats['profit_factor']:.2f}")
print("="*60)
def export_trades_to_csv(self, filename=None):
"""Export trade history to CSV file"""
if not filename:
filename = f"trades_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(filename, 'w', newline='') as csvfile:
fieldnames = ['timestamp', 'asset', 'direction', 'amount', 'duration',
'entry_price', 'exit_price', 'result', 'payout', 'profit_loss']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for trade in self.trades:
writer.writerow(trade)
print(f"š Trades exported to {filename}")
def create_performance_chart(self):
"""Create performance visualization charts"""
if not self.daily_balances:
print("No balance data available for charting")
return
# Balance over time chart
dates = [b['date'] for b in self.daily_balances]
balances = [b['balance'] for b in self.daily_balances]
plt.figure(figsize=(12, 8))
# Subplot 1: Balance over time
plt.subplot(2, 2, 1)
plt.plot(dates, balances, 'b-', linewidth=2)
plt.title('Account Balance Over Time')
plt.xlabel('Date')
plt.ylabel('Balance ($)')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)
# Subplot 2: Win/Loss distribution
if self.trades:
plt.subplot(2, 2, 2)
wins = len([t for t in self.trades if t['result'] == 'win'])
losses = len([t for t in self.trades if t['result'] == 'loss'])
plt.pie([wins, losses], labels=['Wins', 'Losses'],
colors=['green', 'red'], autopct='%1.1f%%')
plt.title('Win/Loss Distribution')
# Subplot 3: Asset performance
if self.trades:
plt.subplot(2, 2, 3)
asset_profits = {}
for trade in self.trades:
asset = trade['asset']
if asset not in asset_profits:
asset_profits[asset] = 0
asset_profits[asset] += trade['profit_loss']
assets = list(asset_profits.keys())
profits = list(asset_profits.values())
colors = ['green' if p >= 0 else 'red' for p in profits]
plt.bar(assets, profits, color=colors, alpha=0.7)
plt.title('Profit/Loss by Asset')
plt.xlabel('Asset')
plt.ylabel('Profit/Loss ($)')
plt.xticks(rotation=45)
# Subplot 4: Hourly performance
if self.trades:
plt.subplot(2, 2, 4)
hourly_profits = {}
for trade in self.trades:
hour = trade['timestamp'].hour
if hour not in hourly_profits:
hourly_profits[hour] = 0
hourly_profits[hour] += trade['profit_loss']
hours = sorted(hourly_profits.keys())
profits = [hourly_profits[h] for h in hours]
plt.bar(hours, profits, alpha=0.7)
plt.title('Hourly Performance')
plt.xlabel('Hour of Day')
plt.ylabel('Profit/Loss ($)')
plt.xticks(hours)
plt.tight_layout()
plt.savefig(f"performance_chart_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.show()
print("š Performance chart saved and displayed")
def save_trades(self):
"""Save trades to JSON file"""
trades_data = []
for trade in self.trades:
trade_copy = trade.copy()
trade_copy['timestamp'] = trade_copy['timestamp'].isoformat()
trades_data.append(trade_copy)
with open('trades_history.json', 'w') as f:
json.dump(trades_data, f, indent=2)
def save_balances(self):
"""Save balance history to JSON file"""
balances_data = []
for balance in self.daily_balances:
balance_copy = balance.copy()
balance_copy['date'] = balance_copy['date'].isoformat()
balances_data.append(balance_copy)
with open('balance_history.json', 'w') as f:
json.dump(balances_data, f, indent=2)
# Example usage
if __name__ == "__main__":
tracker = PortfolioTracker("your-email@example.com", "your-password")
# Record daily balance
tracker.record_daily_balance()
# Example of recording trades (you would get this data from actual trading)
sample_trades = [
{
'asset': 'EURUSD',
'direction': 'call',
'amount': 25.0,
'duration': 300,
'entry_price': 1.1234,
'exit_price': 1.1245,
'result': 'win',
'payout': 45.0,
'profit_loss': 20.0
},
{
'asset': 'GBPUSD',
'direction': 'put',
'amount': 30.0,
'duration': 600,
'entry_price': 1.3456,
'exit_price': 1.3445,
'result': 'win',
'payout': 54.0,
'profit_loss': 24.0
},
{
'asset': 'BTCUSD',
'direction': 'call',
'amount': 50.0,
'duration': 900,
'entry_price': 45000,
'exit_price': 44800,
'result': 'loss',
'payout': 0,
'profit_loss': -50.0
}
]
# Record the sample trades
for trade in sample_trades:
tracker.record_trade(trade)
# Generate performance report
tracker.generate_performance_report()
# Export data
tracker.export_trades_to_csv()
# Create visualization
tracker.create_performance_chart()
Want More Advanced Examples?
Get access to professional trading bot implementations, custom strategies, and advanced automation tools