# Examples and Tutorials

## Complete Workflow Examples
This page provides real-world examples demonstrating the full capabilities of Phase 2, from basic scene discovery to advanced metadata analysis.
## Example 1: Agricultural Monitoring Workflow

Monitor crop development across the growing season with quality assessment.
```python
from planetscope_py import PlanetScopeQuery, MetadataProcessor
from planetscope_py.utils import validate_geometry, calculate_area_km2

# Define agricultural region - Po Valley, Italy
po_valley = {
    "type": "Polygon",
    "coordinates": [[
        [8.5, 45.0],   # Southwest
        [12.5, 45.0],  # Southeast
        [12.5, 46.0],  # Northeast
        [8.5, 46.0],   # Northwest
        [8.5, 45.0]    # Close polygon
    ]]
}

# Validate and calculate area
validated_geom = validate_geometry(po_valley)
area_km2 = calculate_area_km2(validated_geom)
print(f"Monitoring area: {area_km2:.0f} km²")

# Initialize systems
query = PlanetScopeQuery()
processor = MetadataProcessor()

# Search for growing season imagery
print("Searching for growing season imagery...")
results = query.search_scenes(
    geometry=po_valley,
    start_date="2024-04-01",  # Spring planting
    end_date="2024-09-30",    # Harvest season
    cloud_cover_max=0.15,     # 15% max cloud cover
    sun_elevation_min=35,     # Good illumination
    item_types=["PSScene"]
)
print(f"Found {len(results['features'])} scenes")

# Comprehensive quality assessment
print("\nAnalyzing scene quality...")
assessment = processor.assess_coverage_quality(
    scenes=results["features"],
    target_geometry=po_valley
)

# Display results
print(f"Total scenes: {assessment['total_scenes']}")
print(f"Date range: {assessment['temporal_analysis']['date_range']['start']} to "
      f"{assessment['temporal_analysis']['date_range']['end']}")
print(f"Temporal span: {assessment['temporal_analysis']['span_days']} days")

# Quality distribution
quality_dist = assessment['quality_analysis']['suitability_distribution']
print("\nQuality Distribution:")
for quality, count in quality_dist.items():
    percentage = (count / assessment['total_scenes']) * 100
    print(f"  {quality.capitalize()}: {count} scenes ({percentage:.1f}%)")

# Seasonal coverage
seasonal = assessment['temporal_analysis']['seasonal_distribution']
print("\nSeasonal Coverage:")
for season, count in seasonal.items():
    print(f"  {season.capitalize()}: {count} scenes")

# Cloud cover statistics
cloud_stats = assessment['quality_analysis']['cloud_cover']
print("\nCloud Cover Statistics:")
print(f"  Mean: {cloud_stats['mean']:.1%}")
print(f"  Median: {cloud_stats['median']:.1%}")
print(f"  Range: {cloud_stats['min']:.1%} - {cloud_stats['max']:.1%}")

# Recommendations
print("\nRecommendations:")
for rec in assessment['recommendations']:
    print(f"  • {rec}")

# Filter for best quality scenes
print("\nFiltering for optimal scenes...")
optimal_scenes, stats = processor.filter_by_metadata_criteria(
    scenes=results['features'],
    criteria={
        "max_cloud_cover": 0.05,    # Excellent cloud cover
        "min_sun_elevation": 45.0,  # Optimal sun angle
        "min_overall_quality": 0.9  # High overall quality
    }
)
print(f"Optimal scenes: {stats['filtered_count']} ({stats['retention_rate']:.1%})")
print("Rejection reasons:", dict(stats['rejection_reasons']))
```
## Example 2: Urban Development Tracking

Track urban expansion around Milan with monthly temporal analysis.
```python
import calendar

from planetscope_py import PlanetScopeQuery, MetadataProcessor
from planetscope_py.utils import calculate_area_km2

# Milan metropolitan area
milan_metro = {
    "type": "Polygon",
    "coordinates": [[
        [8.8, 45.3],  # Southwest
        [9.5, 45.3],  # Southeast
        [9.5, 45.7],  # Northeast
        [8.8, 45.7],  # Northwest
        [8.8, 45.3]   # Close polygon
    ]]
}

query = PlanetScopeQuery()
processor = MetadataProcessor()

print("Tracking urban development around Milan...")
print(f"Study area: {calculate_area_km2(milan_metro):.0f} km²")

# Analyze monthly coverage for the entire year
monthly_analysis = {}
total_scenes = 0

for month in range(1, 13):
    # Calculate month boundaries
    year = 2024
    start_date = f"{year}-{month:02d}-01"

    # Get last day of month
    last_day = calendar.monthrange(year, month)[1]
    end_date = f"{year}-{month:02d}-{last_day:02d}"

    print(f"\nAnalyzing {calendar.month_name[month]} {year}...")

    # Search for scenes in this month
    monthly_results = query.search_scenes(
        geometry=milan_metro,
        start_date=start_date,
        end_date=end_date,
        cloud_cover_max=0.25,
        item_types=["PSScene"]
    )

    scene_count = len(monthly_results['features'])
    total_scenes += scene_count

    if scene_count > 0:
        # Analyze monthly scenes
        monthly_assessment = processor.assess_coverage_quality(
            scenes=monthly_results['features'],
            target_geometry=milan_metro
        )

        quality_dist = monthly_assessment['quality_analysis']['suitability_distribution']
        excellent_count = quality_dist.get('excellent', 0)
        good_count = quality_dist.get('good', 0)

        monthly_analysis[month] = {
            'month_name': calendar.month_name[month],
            'total_scenes': scene_count,
            'excellent_scenes': excellent_count,
            'good_scenes': good_count,
            'cloud_cover_mean': monthly_assessment['quality_analysis']['cloud_cover']['mean'],
            'recommendations': monthly_assessment['recommendations']
        }

        print(f"  Scenes found: {scene_count}")
        print(f"  Excellent quality: {excellent_count}")
        print(f"  Good quality: {good_count}")
        print(f"  Average cloud cover: {monthly_analysis[month]['cloud_cover_mean']:.1%}")
    else:
        monthly_analysis[month] = {
            'month_name': calendar.month_name[month],
            'total_scenes': 0,
            'excellent_scenes': 0,
            'good_scenes': 0,
            'cloud_cover_mean': None,
            'recommendations': ['No scenes available for this month']
        }
        print("  No scenes found")

# Summary analysis
print(f"\n{'='*50}")
print("ANNUAL SUMMARY")
print(f"{'='*50}")
print(f"Total scenes analyzed: {total_scenes}")

# Best months for imagery
best_months = sorted(
    [(month, data['excellent_scenes'] + data['good_scenes'])
     for month, data in monthly_analysis.items()],
    key=lambda x: x[1], reverse=True
)

print("\nBest months for high-quality imagery:")
for month_num, quality_count in best_months[:6]:
    month_name = monthly_analysis[month_num]['month_name']
    total = monthly_analysis[month_num]['total_scenes']
    print(f"  {month_name}: {quality_count} quality scenes / {total} total")

# Cloud cover trends
print("\nMonthly cloud cover trends:")
for month in range(1, 13):
    data = monthly_analysis[month]
    if data['cloud_cover_mean'] is not None:
        print(f"  {data['month_name']}: {data['cloud_cover_mean']:.1%}")
    else:
        print(f"  {data['month_name']}: No data")
```
## Example 3: Multi-Region Quality Comparison

Compare imagery availability and quality across multiple European cities.
```python
from planetscope_py import PlanetScopeQuery, MetadataProcessor

# Define multiple cities for comparison
european_cities = {
    "Milan": {"type": "Point", "coordinates": [9.19, 45.46]},
    "Rome": {"type": "Point", "coordinates": [12.49, 41.90]},
    "Florence": {"type": "Point", "coordinates": [11.26, 43.77]},
    "Venice": {"type": "Point", "coordinates": [12.34, 45.44]},
    "Naples": {"type": "Point", "coordinates": [14.25, 40.83]},
    "Turin": {"type": "Point", "coordinates": [7.69, 45.07]}
}

query = PlanetScopeQuery()
processor = MetadataProcessor()

print("Multi-city imagery quality comparison")
print("Study period: Summer 2024 (June-August)")
print(f"Cities analyzed: {len(european_cities)}")

# Analyze each city
city_results = {}

for city_name, geometry in european_cities.items():
    print(f"\nAnalyzing {city_name}...")

    # Search for summer imagery
    results = query.search_scenes(
        geometry=geometry,
        start_date="2024-06-01",
        end_date="2024-08-31",
        cloud_cover_max=0.30,
        sun_elevation_min=30,
        item_types=["PSScene"]
    )

    scene_count = len(results['features'])
    print(f"  Found {scene_count} scenes")

    if scene_count > 0:
        # Quality assessment
        assessment = processor.assess_coverage_quality(
            scenes=results['features'],
            target_geometry=geometry
        )

        # Extract key metrics
        quality_dist = assessment['quality_analysis']['suitability_distribution']
        cloud_stats = assessment['quality_analysis']['cloud_cover']
        sun_stats = assessment['quality_analysis']['sun_elevation']
        overall_quality = assessment['quality_analysis']['overall_quality']

        city_results[city_name] = {
            'total_scenes': scene_count,
            'excellent_scenes': quality_dist.get('excellent', 0),
            'good_scenes': quality_dist.get('good', 0),
            'fair_scenes': quality_dist.get('fair', 0),
            'poor_scenes': quality_dist.get('poor', 0),
            'avg_cloud_cover': cloud_stats['mean'],
            'min_cloud_cover': cloud_stats['min'],
            'avg_sun_elevation': sun_stats['mean'],
            'overall_quality_mean': overall_quality['mean'],
            'temporal_span': assessment['temporal_analysis']['span_days']
        }

        print(f"  Quality distribution: {quality_dist}")
        print(f"  Average cloud cover: {cloud_stats['mean']:.1%}")
        print(f"  Average sun elevation: {sun_stats['mean']:.1f}°")
    else:
        city_results[city_name] = None

# Comparative analysis
print(f"\n{'='*60}")
print("COMPARATIVE ANALYSIS")
print(f"{'='*60}")

# Rank cities by total high-quality scenes
ranked_cities = []
for city, data in city_results.items():
    if data:
        high_quality = data['excellent_scenes'] + data['good_scenes']
        ranked_cities.append((city, high_quality, data['total_scenes']))

ranked_cities.sort(key=lambda x: x[1], reverse=True)

print("\nRanking by high-quality scenes available:")
for i, (city, high_quality, total) in enumerate(ranked_cities, 1):
    percentage = (high_quality / total) * 100 if total > 0 else 0
    print(f"{i:2d}. {city:10s}: {high_quality:3d} quality / {total:3d} total ({percentage:5.1f}%)")

# Best overall quality
print("\nAverage overall quality scores:")
quality_ranking = [
    (city, data['overall_quality_mean'])
    for city, data in city_results.items() if data
]
quality_ranking.sort(key=lambda x: x[1], reverse=True)

for city, quality in quality_ranking:
    print(f"  {city:10s}: {quality:.3f}")

# Cloud cover comparison
print("\nAverage cloud cover (lower is better):")
cloud_ranking = [
    (city, data['avg_cloud_cover'])
    for city, data in city_results.items() if data
]
cloud_ranking.sort(key=lambda x: x[1])

for city, cloud_cover in cloud_ranking:
    print(f"  {city:10s}: {cloud_cover:.1%}")

# Best city for summer imagery
best_city = ranked_cities[0][0] if ranked_cities else "None"
print(f"\nBest city for summer 2024 imagery: {best_city}")

if best_city != "None":
    best_data = city_results[best_city]
    print(f"  Total scenes: {best_data['total_scenes']}")
    print(f"  Excellent scenes: {best_data['excellent_scenes']}")
    print(f"  Average cloud cover: {best_data['avg_cloud_cover']:.1%}")
    print(f"  Overall quality: {best_data['overall_quality_mean']:.3f}")
```
## Example 4: Batch Processing with Error Handling

Process multiple regions with comprehensive error handling and progress tracking.
```python
import time
import logging

from planetscope_py import PlanetScopeQuery, MetadataProcessor
from planetscope_py.exceptions import APIError, RateLimitError, ValidationError
from planetscope_py.utils import validate_geometry, calculate_area_km2

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def robust_batch_processing(geometries, start_date, end_date, max_retries=3):
    """Process multiple geometries with error handling and progress tracking."""
    query = PlanetScopeQuery()
    processor = MetadataProcessor()

    results = []
    errors = []
    processed = 0

    print(f"Starting batch processing of {len(geometries)} geometries")
    print(f"Date range: {start_date} to {end_date}")

    for i, geometry in enumerate(geometries):
        geometry_id = f"geom_{i+1:03d}"
        print(f"\nProcessing {geometry_id} ({i+1}/{len(geometries)})...")

        # Validate geometry first
        try:
            validated_geom = validate_geometry(geometry)
            area_km2 = calculate_area_km2(validated_geom)
        except ValidationError as e:
            logger.error(f"{geometry_id}: Validation error - {e.message}")
            errors.append({
                'geometry_id': geometry_id,
                'error_type': 'validation_error',
                'error_message': e.message
            })
            continue

        # Skip if area too large
        if area_km2 > 50000:  # 50,000 km² limit
            logger.warning(f"{geometry_id}: Area too large ({area_km2:.0f} km²), skipping")
            errors.append({
                'geometry_id': geometry_id,
                'error_type': 'area_too_large',
                'error_message': f'Area {area_km2:.0f} km² exceeds 50,000 km² limit'
            })
            continue

        # Retry loop: a rate-limit response waits, then retries the SAME geometry
        for attempt in range(max_retries):
            try:
                # Search for scenes
                search_results = query.search_scenes(
                    geometry=validated_geom,
                    start_date=start_date,
                    end_date=end_date,
                    cloud_cover_max=0.25,
                    item_types=["PSScene"]
                )

                scene_count = len(search_results['features'])
                print(f"  Found {scene_count} scenes (area: {area_km2:.1f} km²)")

                if scene_count > 0:
                    # Process metadata
                    assessment = processor.assess_coverage_quality(
                        scenes=search_results['features'],
                        target_geometry=validated_geom
                    )

                    # Extract summary
                    quality_dist = assessment['quality_analysis']['suitability_distribution']
                    cloud_stats = assessment['quality_analysis']['cloud_cover']

                    results.append({
                        'geometry_id': geometry_id,
                        'area_km2': area_km2,
                        'total_scenes': scene_count,
                        'quality_distribution': quality_dist,
                        'avg_cloud_cover': cloud_stats['mean'],
                        'recommendations': assessment['recommendations'][:2],  # Top 2
                        'processed_at': time.strftime('%Y-%m-%d %H:%M:%S')
                    })
                    processed += 1

                    print(f"  Quality: {quality_dist}")
                    print(f"  Cloud cover: {cloud_stats['mean']:.1%}")
                else:
                    print("  No scenes found")
                    results.append({
                        'geometry_id': geometry_id,
                        'area_km2': area_km2,
                        'total_scenes': 0,
                        'message': 'No scenes found',
                        'processed_at': time.strftime('%Y-%m-%d %H:%M:%S')
                    })
                    processed += 1

                break  # success: move on to the next geometry

            except RateLimitError as e:
                logger.warning(f"{geometry_id}: Rate limited, waiting {e.retry_after}s")
                time.sleep(e.retry_after)
                # Loop continues and retries the same geometry

            except APIError as e:
                logger.error(f"{geometry_id}: API error - {e.message}")
                errors.append({
                    'geometry_id': geometry_id,
                    'error_type': 'api_error',
                    'error_message': e.message,
                    'status_code': getattr(e, 'status_code', None)
                })
                break

            except Exception as e:
                logger.error(f"{geometry_id}: Unexpected error - {str(e)}")
                errors.append({
                    'geometry_id': geometry_id,
                    'error_type': 'unexpected_error',
                    'error_message': str(e)
                })
                break
        else:
            # All attempts were rate-limited; record the failure
            errors.append({
                'geometry_id': geometry_id,
                'error_type': 'rate_limit_exhausted',
                'error_message': f'Gave up after {max_retries} rate-limited attempts'
            })

        # Progress update every 10 geometries
        if (i + 1) % 10 == 0:
            success_rate = (processed / (i + 1)) * 100
            print(f"\nProgress: {i+1}/{len(geometries)} processed ({success_rate:.1f}% success rate)")
            print(f"Rate limiter status: {query.rate_limiter.get_current_rate_status()}")

    # Final summary
    print(f"\n{'='*50}")
    print("BATCH PROCESSING SUMMARY")
    print(f"{'='*50}")
    print(f"Total geometries: {len(geometries)}")
    print(f"Successfully processed: {processed}")
    print(f"Errors: {len(errors)}")
    print(f"Success rate: {(processed / len(geometries)) * 100:.1f}%")

    if errors:
        print("\nError breakdown:")
        error_types = {}
        for error in errors:
            error_type = error['error_type']
            error_types[error_type] = error_types.get(error_type, 0) + 1
        for error_type, count in error_types.items():
            print(f"  {error_type}: {count}")

    return results, errors


# Example usage with multiple test geometries
test_geometries = [
    {"type": "Point", "coordinates": [9.19, 45.46]},   # Milan
    {"type": "Point", "coordinates": [11.26, 43.77]},  # Florence
    {"type": "Point", "coordinates": [12.49, 41.90]},  # Rome
    {"type": "Point", "coordinates": [7.69, 45.07]},   # Turin
    {"type": "Point", "coordinates": [14.25, 40.83]},  # Naples
]

# Run batch processing
batch_results, batch_errors = robust_batch_processing(
    geometries=test_geometries,
    start_date="2024-07-01",
    end_date="2024-07-31"
)

# Analyze batch results
if batch_results:
    print("\nTop performing regions:")
    sorted_results = sorted(
        [r for r in batch_results if r.get('total_scenes', 0) > 0],
        key=lambda x: x.get('total_scenes', 0),
        reverse=True
    )
    for result in sorted_results[:3]:
        print(f"  {result['geometry_id']}: {result['total_scenes']} scenes, "
              f"{result['avg_cloud_cover']:.1%} avg cloud cover")
```
## Example 5: Advanced Quality Filtering Pipeline

Create a sophisticated filtering pipeline for high-precision applications.
```python
from planetscope_py import PlanetScopeQuery, MetadataProcessor


def advanced_quality_pipeline(search_results, target_geometry,
                              application_type="precision_agriculture"):
    """Advanced quality filtering pipeline for different application types.

    Args:
        search_results: Results from PlanetScopeQuery.search_scenes()
        target_geometry: Target area geometry
        application_type: Type of application requiring specific quality standards
    """
    processor = MetadataProcessor()

    # Define quality standards for different applications
    quality_standards = {
        "precision_agriculture": {
            "max_cloud_cover": 0.05,     # 5% max cloud cover
            "min_sun_elevation": 40.0,   # Good illumination
            "min_overall_quality": 0.9,  # High overall quality
            "required_seasons": ["spring", "summer", "autumn"],
            "exclude_conditions": ["poor"]
        },
        "urban_planning": {
            "max_cloud_cover": 0.10,     # 10% max cloud cover
            "min_sun_elevation": 35.0,   # Moderate illumination
            "min_overall_quality": 0.8,  # Good overall quality
            "required_seasons": ["spring", "summer"],
            "exclude_conditions": ["poor"]
        },
        "environmental_monitoring": {
            "max_cloud_cover": 0.15,     # 15% max cloud cover
            "min_sun_elevation": 30.0,   # Lower illumination acceptable
            "min_overall_quality": 0.7,  # Moderate overall quality
            "required_seasons": ["spring", "summer", "autumn", "winter"],
            "exclude_conditions": []
        },
        "research": {
            "max_cloud_cover": 0.20,     # 20% max cloud cover
            "min_sun_elevation": 25.0,   # Lower illumination acceptable
            "min_overall_quality": 0.6,  # Lower quality acceptable
            "required_seasons": ["spring", "summer", "autumn", "winter"],
            "exclude_conditions": []
        }
    }

    if application_type not in quality_standards:
        raise ValueError(f"Unknown application type: {application_type}")

    standards = quality_standards[application_type]
    print(f"Advanced Quality Pipeline: {application_type}")
    print(f"Quality standards: {standards}")

    # Step 1: Initial assessment
    print("\nStep 1: Initial quality assessment...")
    initial_assessment = processor.assess_coverage_quality(
        scenes=search_results["features"],
        target_geometry=target_geometry
    )

    initial_count = initial_assessment['total_scenes']
    initial_quality = initial_assessment['quality_analysis']['suitability_distribution']
    print(f"Initial scenes: {initial_count}")
    print(f"Initial quality distribution: {initial_quality}")

    # Step 2: Apply basic quality filters
    print("\nStep 2: Applying basic quality filters...")
    basic_criteria = {
        "max_cloud_cover": standards["max_cloud_cover"],
        "min_sun_elevation": standards["min_sun_elevation"],
        "min_overall_quality": standards["min_overall_quality"]
    }

    filtered_scenes, filter_stats = processor.filter_by_metadata_criteria(
        scenes=search_results["features"],
        criteria=basic_criteria
    )

    print(f"After basic filtering: {filter_stats['filtered_count']} scenes "
          f"({filter_stats['retention_rate']:.1%} retention)")
    print(f"Rejection reasons: {dict(filter_stats['rejection_reasons'])}")

    # Step 3: Advanced filtering by season and conditions
    print("\nStep 3: Advanced filtering...")
    advanced_filtered = []

    for scene in filtered_scenes:
        try:
            metadata = processor.extract_scene_metadata(scene)

            # Season filter
            scene_season = metadata.get('season', 'unknown')
            if scene_season not in standards["required_seasons"]:
                continue

            # Solar conditions filter
            solar_conditions = metadata.get('solar_conditions', 'unknown')
            if solar_conditions in standards["exclude_conditions"]:
                continue

            # Suitability filter
            suitability = metadata.get('suitability', 'unknown')
            if suitability in standards["exclude_conditions"]:
                continue

            advanced_filtered.append(scene)
        except Exception as e:
            print(f"Warning: Could not process scene {scene.get('id', 'unknown')}: {e}")
            continue

    print(f"After advanced filtering: {len(advanced_filtered)} scenes")

    # Step 4: Temporal optimization
    print("\nStep 4: Temporal optimization...")
    if len(advanced_filtered) > 50:  # If too many scenes, optimize temporally
        # Group by month and select the best from each
        monthly_scenes = {}
        for scene in advanced_filtered:
            try:
                metadata = processor.extract_scene_metadata(scene)
                month = metadata.get('month', 0)
                monthly_scenes.setdefault(month, []).append((scene, metadata))
            except Exception:
                continue

        # Select the best 3 scenes per month based on overall quality
        optimized_scenes = []
        for month, month_scenes in monthly_scenes.items():
            # Sort by overall quality, best first
            month_scenes.sort(key=lambda x: x[1].get('overall_quality', 0), reverse=True)
            # Take the top 3 from each month
            selected = month_scenes[:3]
            optimized_scenes.extend([scene for scene, metadata in selected])
            print(f"  Month {month}: {len(month_scenes)} -> {len(selected)} scenes")

        final_scenes = optimized_scenes
    else:
        final_scenes = advanced_filtered

    print(f"Final optimized scenes: {len(final_scenes)}")

    # Step 5: Final assessment
    print("\nStep 5: Final quality assessment...")
    if final_scenes:
        final_assessment = processor.assess_coverage_quality(
            scenes=final_scenes,
            target_geometry=target_geometry
        )

        final_quality = final_assessment['quality_analysis']['suitability_distribution']
        final_cloud = final_assessment['quality_analysis']['cloud_cover']

        print(f"Final quality distribution: {final_quality}")
        print(f"Final cloud cover - mean: {final_cloud['mean']:.1%}, "
              f"max: {final_cloud['max']:.1%}")
        print(f"Recommendations: {final_assessment['recommendations']}")

        # Pipeline effectiveness metrics
        improvement = {
            'scene_reduction': ((initial_count - len(final_scenes)) / initial_count) * 100,
            'quality_retention': len(final_scenes) / initial_count if initial_count > 0 else 0
        }

        print("\nPipeline effectiveness:")
        print(f"  Scene reduction: {improvement['scene_reduction']:.1f}%")
        print(f"  Quality retention: {improvement['quality_retention']:.1%}")

        return final_scenes, final_assessment, improvement
    else:
        print("No scenes meet the quality standards!")
        return [], None, None


# Example usage for precision agriculture
milan_agriculture = {
    "type": "Polygon",
    "coordinates": [[
        [9.0, 45.3], [9.4, 45.3], [9.4, 45.6], [9.0, 45.6], [9.0, 45.3]
    ]]
}

# Search for scenes
query = PlanetScopeQuery()
search_results = query.search_scenes(
    geometry=milan_agriculture,
    start_date="2024-04-01",
    end_date="2024-09-30",
    cloud_cover_max=0.30,  # Initial broader search
    item_types=["PSScene"]
)

print(f"Initial search: {len(search_results['features'])} scenes")

# Apply advanced quality pipeline
final_scenes, final_assessment, improvement = advanced_quality_pipeline(
    search_results=search_results,
    target_geometry=milan_agriculture,
    application_type="precision_agriculture"
)

if final_scenes:
    print(f"\nFinal result: {len(final_scenes)} high-quality scenes selected")
    print("Ready for precision agriculture analysis!")
else:
    print("\nNo scenes meet precision agriculture standards")
    print("Consider relaxing criteria or expanding the date range")
```
## Example 6: Performance Monitoring Dashboard

Create a monitoring system for API performance and rate limiting.
```python
import time
import threading
from datetime import datetime, timedelta

from planetscope_py import PlanetScopeQuery


class PlanetScopeMonitor:
    """Real-time monitoring system for planetscope-py performance."""

    def __init__(self):
        self.query = PlanetScopeQuery()
        self.rate_limiter = self.query.rate_limiter
        self.monitoring = False
        self.stats_history = []

    def start_monitoring(self, interval=60):
        """Start monitoring with the specified interval in seconds."""
        self.monitoring = True

        def monitor_loop():
            while self.monitoring:
                try:
                    stats = self.collect_stats()
                    self.stats_history.append(stats)

                    # Keep only the last 24 hours of data
                    cutoff_time = datetime.now() - timedelta(hours=24)
                    self.stats_history = [
                        s for s in self.stats_history
                        if s['timestamp'] > cutoff_time
                    ]

                    self.print_dashboard()
                except Exception as e:
                    print(f"Monitoring error: {e}")

                time.sleep(interval)

        monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        monitor_thread.start()
        print(f"Monitoring started (interval: {interval}s)")

    def stop_monitoring(self):
        """Stop monitoring."""
        self.monitoring = False
        print("Monitoring stopped")

    def collect_stats(self):
        """Collect current performance statistics."""
        current_status = self.rate_limiter.get_current_rate_status()
        performance_metrics = self.rate_limiter.get_performance_metrics()

        # The circuit breaker may be absent; fall back to UNKNOWN
        session = getattr(self.query, 'session', None)
        circuit_breaker = getattr(session, 'circuit_breaker', None)
        cb_state = circuit_breaker.state if circuit_breaker is not None else 'UNKNOWN'

        return {
            'timestamp': datetime.now(),
            'rate_status': current_status,
            'performance': performance_metrics,
            'circuit_breaker_state': cb_state
        }

    def print_dashboard(self):
        """Print the monitoring dashboard."""
        if not self.stats_history:
            return

        latest = self.stats_history[-1]

        # Clear screen (works on most terminals)
        print("\033[2J\033[H")

        print("=" * 60)
        print("PLANETSCOPE-PY PERFORMANCE DASHBOARD")
        print(f"Last updated: {latest['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        # Rate limiting status
        print("\nRATE LIMITING STATUS:")
        for endpoint, status in latest['rate_status'].items():
            capacity_bar = self.create_bar(status['capacity_used'], width=20)
            print(f"  {endpoint:8s}: {capacity_bar} "
                  f"{status['capacity_used']:6.1%} "
                  f"({status['current_rate']:4.1f}/{status['limit']:4.1f} req/min)")

        # Performance metrics
        perf = latest['performance']
        print("\nPERFORMANCE METRICS:")
        print(f"  Total requests: {perf['total_requests']}")
        print(f"  Avg response time: {perf['average_response_time']:.2f}s")

        if 'endpoint_metrics' in perf:
            print("\n  Endpoint performance:")
            for endpoint, metrics in perf['endpoint_metrics'].items():
                print(f"    {endpoint:8s}: {metrics['request_count']:4d} requests, "
                      f"{metrics['average_response_time']:5.2f}s avg")

        # Circuit breaker status
        cb_state = latest['circuit_breaker_state']
        cb_indicator = "🟢" if cb_state == "CLOSED" else "🔴" if cb_state == "OPEN" else "🟡"
        print(f"\nCIRCUIT BREAKER: {cb_indicator} {cb_state}")

        # Historical trends (if enough data)
        if len(self.stats_history) >= 2:
            print(f"\nTRENDS (last {len(self.stats_history)} samples):")

            # Response time trend
            response_times = [s['performance']['average_response_time']
                              for s in self.stats_history[-10:]]
            if response_times:
                trend = "📈" if response_times[-1] > response_times[0] else "📉"
                print(f"  Response time: {trend} {response_times[-1]:.2f}s "
                      f"(was {response_times[0]:.2f}s)")

            # Request volume trend
            request_counts = [s['performance']['total_requests']
                              for s in self.stats_history[-5:]]
            if len(request_counts) >= 2:
                recent_requests = request_counts[-1] - request_counts[-2]
                print(f"  Recent activity: {recent_requests} requests in last interval")

        print("\nPress Ctrl+C to stop monitoring")

    def create_bar(self, percentage, width=20):
        """Create a text-based progress bar."""
        filled = int(percentage * width)
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}]"

    def get_alerts(self):
        """Get current system alerts."""
        alerts = []
        if not self.stats_history:
            return alerts

        latest = self.stats_history[-1]

        # Rate limiting alerts
        for endpoint, status in latest['rate_status'].items():
            if status['capacity_used'] > 0.9:
                alerts.append(f"HIGH: {endpoint} endpoint at {status['capacity_used']:.1%} capacity")
            elif status['capacity_used'] > 0.8:
                alerts.append(f"MEDIUM: {endpoint} endpoint at {status['capacity_used']:.1%} capacity")

        # Performance alerts
        avg_response_time = latest['performance']['average_response_time']
        if avg_response_time > 10.0:
            alerts.append(f"HIGH: Slow response times ({avg_response_time:.1f}s)")
        elif avg_response_time > 5.0:
            alerts.append(f"MEDIUM: Elevated response times ({avg_response_time:.1f}s)")

        # Circuit breaker alerts
        if latest['circuit_breaker_state'] == 'OPEN':
            alerts.append("CRITICAL: Circuit breaker is OPEN")
        elif latest['circuit_breaker_state'] == 'HALF_OPEN':
            alerts.append("MEDIUM: Circuit breaker is testing recovery")

        return alerts


# Example usage
if __name__ == "__main__":
    # Create and start the monitor
    monitor = PlanetScopeMonitor()

    try:
        # Start monitoring with 30-second intervals
        monitor.start_monitoring(interval=30)

        # Simulate some API activity
        query = PlanetScopeQuery()
        test_geometry = {"type": "Point", "coordinates": [9.19, 45.46]}

        for i in range(5):
            print(f"\nMaking test API call {i+1}/5...")
            results = query.search_scenes(
                geometry=test_geometry,
                start_date="2024-01-01",
                end_date="2024-01-31",
                cloud_cover_max=0.2
            )
            print(f"Found {len(results['features'])} scenes")

            # Check for alerts
            alerts = monitor.get_alerts()
            if alerts:
                print("\nALERTS:")
                for alert in alerts:
                    print(f"  {alert}")

            time.sleep(10)  # Wait between calls

        # Keep monitoring running
        print("\nTest calls complete. Monitoring continues...")
        print("Press Ctrl+C to stop...")
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        monitor.stop_monitoring()
        print("\nMonitoring stopped by user")
```
These examples demonstrate the full power of Phase 2, from basic scene discovery to advanced quality analysis and performance monitoring. Each example can be adapted for specific use cases and requirements.
## Summary
Phase 2 provides a complete foundation for satellite imagery workflows:
- Scene Discovery: Intelligent search with advanced filtering
- Metadata Analysis: Comprehensive quality assessment
- Batch Processing: Efficient multi-region processing
- Error Handling: Robust error recovery and reporting
- Performance Monitoring: Real-time system monitoring
- Quality Pipelines: Sophisticated filtering for specific applications
These capabilities enable production-ready satellite imagery analysis workflows with Planet's vast data archive.