Complete Analysis Workflows
End-to-end workflows combining all PlanetScope-py capabilities
This guide demonstrates end-to-end workflows that integrate scene discovery, spatial analysis, temporal analysis, asset management, and data export into comprehensive research and operational pipelines.
Overview
Complete workflows in PlanetScope-py combine all major components:
- Scene Discovery: Finding and filtering PlanetScope scenes
- Spatial Analysis: Density calculations and coverage assessment
- Temporal Analysis: Pattern detection and gap analysis
- Asset Management: Quota monitoring and downloads
- Data Export: Professional GeoPackage creation with imagery
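Each component is demonstrated in depth below. As a quick orientation, a condensed sketch of how they chain together might look like this (call signatures follow the full example in the next section; treat it as a sketch, not a complete program):
from planetscope_py import (
    PlanetScopeQuery, SpatialDensityEngine, TemporalAnalyzer, GeoPackageManager
)
from shapely.geometry import box

roi = box(9.04, 45.40, 9.28, 45.52)
query = PlanetScopeQuery()

# 1. Scene discovery
scenes = query.search_scenes(
    geometry=roi, start_date="2024-01-01", end_date="2024-06-30",
    cloud_cover_max=0.3, item_types=["PSScene"]
)['features']

# 2. Spatial density
density = SpatialDensityEngine().calculate_density(scene_footprints=scenes, roi_geometry=roi)

# 3. Temporal patterns
datacube = TemporalAnalyzer().create_spatiotemporal_datacube(scenes, roi)

# 4. Asset management (async): await AssetManager(query.auth).activate_and_download_assets(...)

# 5. Export
GeoPackageManager().create_scene_geopackage(scenes=scenes, output_path="overview.gpkg", roi=roi)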
Basic Complete Workflow
Standard Research Workflow
import asyncio
import json
from datetime import datetime, timedelta
from pathlib import Path

from planetscope_py import (
    PlanetScopeQuery, SpatialDensityEngine, TemporalAnalyzer,
    AssetManager, GeoPackageManager, GeoPackageConfig,
    TemporalConfig, TemporalResolution
)
from shapely.geometry import box
async def standard_research_workflow(roi, date_range, output_dir="analysis_results"):
"""
Standard research workflow for environmental monitoring.
Args:
roi: Region of interest (Shapely geometry)
date_range: Tuple of (start_date, end_date) strings
output_dir: Directory for outputs
"""
print("🚀 Starting Standard Research Workflow")
print("=" * 50)
# Create output directory
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Step 1: Scene Discovery
print("\n📡 Step 1: Scene Discovery")
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.3,
sun_elevation_min=30,
item_types=["PSScene"]
)
scenes = results['features']
print(f" Found {len(scenes)} scenes")
if len(scenes) == 0:
print(" ❌ No scenes found - try expanding date range or ROI")
return None
# Step 2: Spatial Analysis
print("\n📊 Step 2: Spatial Density Analysis")
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(
scene_footprints=scenes,
roi_geometry=roi
)
print(f" Method used: {spatial_result.method_used.value}")
print(f" Grid size: {spatial_result.grid_info['width']}×{spatial_result.grid_info['height']}")
print(f" Density range: {spatial_result.stats['min']}-{spatial_result.stats['max']} scenes/cell")
# Export spatial analysis
spatial_output = output_path / "spatial_density.tif"
spatial_result.export_geotiff(str(spatial_output))
print(f" Exported: {spatial_output}")
# Step 3: Temporal Analysis
print("\n⏱️ Step 3: Temporal Pattern Analysis")
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.WEEKLY,
spatial_resolution=1000.0, # 1km for efficiency
enable_gap_analysis=True
)
temporal_analyzer = TemporalAnalyzer(temporal_config)
# Create data cube
datacube = temporal_analyzer.create_spatiotemporal_datacube(
scenes=scenes,
roi=roi,
start_date=datetime.fromisoformat(date_range[0]),
end_date=datetime.fromisoformat(date_range[1])
)
# Analyze patterns
patterns = temporal_analyzer.analyze_acquisition_patterns(datacube)
gaps = temporal_analyzer.detect_temporal_gaps(datacube)
recommendations = temporal_analyzer.generate_acquisition_recommendations(datacube)
print(f" Seasonal patterns: {len(patterns['seasonal_patterns'])}")
print(f" Temporal gaps: {len(gaps)}")
print(f" Data availability: {patterns['frequency_stats']['data_availability_percentage']:.1%}")
# Step 4: Asset Management (with user confirmation)
print("\n💾 Step 4: Asset Management")
asset_manager = AssetManager(query.auth)
# Check quota
quota_info = await asset_manager.get_quota_info()
print(f" Current quota usage: {quota_info.usage_percentage:.1%}")
print(f" Remaining area: {quota_info.remaining_area_km2:.1f} km²")
# Download high-quality subset
high_quality_scenes = [
scene for scene in scenes
if scene['properties'].get('cloud_cover', 1.0) < 0.15
and scene['properties'].get('sun_elevation', 0) > 40
][:20] # Limit to 20 best scenes
downloads = None
if high_quality_scenes and quota_info.remaining_area_km2 > 100:
print(f" Downloading {len(high_quality_scenes)} high-quality scenes...")
downloads = await asset_manager.activate_and_download_assets(
scenes=high_quality_scenes,
asset_types=["ortho_analytic_4b"],
output_dir=str(output_path / "imagery"),
clip_to_roi=roi,
confirm_download=True
)
successful_downloads = [d for d in downloads if d.status.value == "completed"]
print(f" Downloaded: {len(successful_downloads)}/{len(downloads)} assets")
else:
print(" Skipping downloads (insufficient quota or no suitable scenes)")
# Step 5: Comprehensive Data Export
print("\n📦 Step 5: Data Export")
# Configure GeoPackage with imagery if available
geopackage_config = GeoPackageConfig(
include_imagery=downloads is not None,
clip_to_roi=True,
attribute_schema="comprehensive"
)
geopackage_manager = GeoPackageManager(config=geopackage_config)
# Get downloaded file paths
downloaded_files = []
if downloads:
downloaded_files = [
d.downloaded_file_path for d in downloads
if hasattr(d, 'downloaded_file_path') and d.downloaded_file_path
]
# Create comprehensive GeoPackage
geopackage_path = output_path / "complete_analysis.gpkg"
layer_info = geopackage_manager.create_scene_geopackage(
scenes=scenes,
output_path=str(geopackage_path),
roi=roi,
downloaded_files=downloaded_files if downloaded_files else None
)
print(f" Created GeoPackage: {geopackage_path}")
print(f" Vector features: {layer_info.feature_count}")
if downloaded_files:
print(f" Raster layers: {len(downloaded_files)} imagery files")
# Generate summary report
report = {
'workflow': 'standard_research',
'completion_time': datetime.now().isoformat(),
'roi_area_km2': calculate_area_km2(roi),
'date_range': date_range,
'results': {
'total_scenes': len(scenes),
'high_quality_scenes': len(high_quality_scenes),
'downloaded_assets': len(downloaded_files) if downloaded_files else 0,
'spatial_analysis': {
'method': spatial_result.method_used.value,
'density_stats': spatial_result.stats
},
'temporal_analysis': {
'seasonal_patterns': len(patterns['seasonal_patterns']),
'temporal_gaps': len(gaps),
'data_availability': patterns['frequency_stats']['data_availability_percentage']
},
'quota_usage': {
'remaining_km2': quota_info.remaining_area_km2,
'usage_percentage': quota_info.usage_percentage
}
},
'outputs': {
'geopackage': str(geopackage_path),
'spatial_density': str(spatial_output),
'imagery_directory': str(output_path / "imagery") if downloads else None
}
}
# Save report
report_path = output_path / "workflow_report.json"
with open(report_path, 'w') as f:
json.dump(report, f, indent=2, default=str)
print(f"\n✅ Workflow Complete!")
print(f" Report saved: {report_path}")
print(f" Total processing time: {datetime.now() - workflow_start}")
return report
# Example usage
roi = box(9.04, 45.40, 9.28, 45.52) # Milan, Italy
date_range = ("2024-01-01", "2024-06-30")
# Run workflow (inside an async context)
# result = await standard_research_workflow(roi, date_range)
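The workflow above reports roi_area_km2 via a calculate_area_km2 helper that is not defined on this page. A minimal sketch, assuming pyproj is available for geodesic area computation:
from pyproj import Geod

def calculate_area_km2(geometry):
    """Approximate geodesic area of a WGS84 Shapely geometry in square kilometers."""
    geod = Geod(ellps="WGS84")
    area_m2, _perimeter = geod.geometry_area_perimeter(geometry)
    return abs(area_m2) / 1e6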
Specialized Workflows
Environmental Monitoring Workflow
async def environmental_monitoring_workflow(roi, monitoring_period_months=12):
"""
Specialized workflow for environmental monitoring with temporal emphasis.
Focuses on seasonal patterns, change detection, and long-term trends.
"""
print("🌍 Environmental Monitoring Workflow")
print("=" * 40)
# Calculate date range for monitoring period
end_date = datetime.now()
start_date = end_date - timedelta(days=monitoring_period_months * 30)
# Step 1: Extended Scene Discovery
query = PlanetScopeQuery()
# Search with relaxed cloud cover for temporal completeness
results = query.search_scenes(
geometry=roi,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
cloud_cover_max=0.5, # More permissive for temporal coverage
item_types=["PSScene"]
)
scenes = results['features']
print(f"📡 Found {len(scenes)} scenes over {monitoring_period_months} months")
# Step 2: Temporal-Focused Analysis
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.MONTHLY,
spatial_resolution=500.0,
enable_gap_analysis=True,
gap_threshold_days=45 # Longer gap threshold for monitoring
)
temporal_analyzer = TemporalAnalyzer(temporal_config)
# Create fine-scale temporal analysis
datacube = temporal_analyzer.create_spatiotemporal_datacube(scenes, roi)
seasonal_analysis = temporal_analyzer.analyze_seasonal_patterns(datacube)
quality_trends = temporal_analyzer.analyze_quality_trends(datacube)
coverage_gaps = temporal_analyzer.detect_temporal_gaps(datacube)
print(f"⏱️ Temporal Analysis:")
print(f" Quality trend: {quality_trends['trend_direction']}")
print(f" Coverage gaps: {len(coverage_gaps)} periods")
print(f" Best season: {quality_trends['best_period']}")
# Step 3: Change Detection Preparation
# Identify scenes for change detection (seasonal comparisons)
seasonal_scenes = {}
for season, data in seasonal_analysis.items():
# Get best scenes from each season
season_scenes = [
scene for scene in scenes
if get_season_from_scene(scene) == season
and scene['properties'].get('cloud_cover', 1.0) < 0.2
]
# Sort by quality and take best
season_scenes.sort(key=lambda s: s['properties'].get('cloud_cover', 1.0))
seasonal_scenes[season] = season_scenes[:5] # Top 5 per season
# Step 4: Strategic Downloads for Monitoring
asset_manager = AssetManager(query.auth)
# Download representative scenes from each season
monitoring_scenes = []
for season, season_scenes in seasonal_scenes.items():
monitoring_scenes.extend(season_scenes[:2]) # 2 best per season
if monitoring_scenes:
print(f"💾 Downloading {len(monitoring_scenes)} monitoring scenes")
downloads = await asset_manager.activate_and_download_assets(
scenes=monitoring_scenes,
asset_types=["ortho_analytic_4b", "ortho_analytic_sr"], # Include surface reflectance
clip_to_roi=roi
)
# Step 5: Monitoring-Specific Export
config = GeoPackageConfig(
include_imagery=True,
attribute_schema="comprehensive"
)
geopackage_manager = GeoPackageManager(config=config)
# Add monitoring-specific metadata
for scene in scenes:
props = scene['properties']
props['monitoring_season'] = get_season_from_scene(scene)
props['monitoring_quality'] = assess_monitoring_quality(scene)
props['change_detection_suitable'] = (
props.get('cloud_cover', 1.0) < 0.15 and
props.get('sun_elevation', 0) > 35
)
# Create monitoring GeoPackage
layer_info = geopackage_manager.create_scene_geopackage(
scenes=scenes,
output_path="environmental_monitoring.gpkg",
roi=roi,
downloaded_files=[d.downloaded_file_path for d in downloads] if monitoring_scenes else None
)
# Generate monitoring report
monitoring_report = {
'monitoring_type': 'environmental',
'period_months': monitoring_period_months,
'temporal_analysis': {
'seasonal_patterns': seasonal_analysis,
'quality_trends': quality_trends,
'coverage_gaps': len(coverage_gaps)
},
'change_detection_candidates': {
season: len(scenes) for season, scenes in seasonal_scenes.items()
},
'recommendations': generate_monitoring_recommendations(
seasonal_analysis, quality_trends, coverage_gaps
)
}
return monitoring_report
def get_season_from_scene(scene):
"""Extract season from scene acquisition date."""
acquired = scene['properties']['acquired']
date_obj = datetime.fromisoformat(acquired.replace('Z', '+00:00'))
month = date_obj.month
if month in [12, 1, 2]:
return 'winter'
elif month in [3, 4, 5]:
return 'spring'
elif month in [6, 7, 8]:
return 'summer'
else:
return 'autumn'
def assess_monitoring_quality(scene):
"""Assess scene quality for monitoring purposes."""
props = scene['properties']
cloud_cover = props.get('cloud_cover', 1.0)
sun_elevation = props.get('sun_elevation', 0)
if cloud_cover < 0.05 and sun_elevation > 50:
return 'excellent'
elif cloud_cover < 0.15 and sun_elevation > 35:
return 'good'
elif cloud_cover < 0.3:
return 'fair'
else:
return 'poor'
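The monitoring report also calls generate_monitoring_recommendations, which is not defined on this page. A simple sketch (the dictionary keys are assumptions based on the analyses above):
def generate_monitoring_recommendations(seasonal_analysis, quality_trends, coverage_gaps):
    """Hypothetical helper: turn the temporal analyses into plain-text recommendations."""
    recommendations = []
    if coverage_gaps:
        recommendations.append(
            f"Address {len(coverage_gaps)} temporal coverage gaps with targeted acquisitions"
        )
    if quality_trends.get('trend_direction') == 'declining':
        recommendations.append("Investigate declining scene quality (cloud cover and sun elevation trends)")
    if quality_trends.get('best_period'):
        recommendations.append(f"Prioritize change-detection baselines during {quality_trends['best_period']}")
    if not recommendations:
        recommendations.append("Coverage and quality are adequate - maintain the current acquisition strategy")
    return recommendations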
Operational Monitoring Workflow
async def operational_monitoring_workflow(roi, alert_thresholds=None):
"""
High-frequency operational monitoring with alerting capabilities.
Designed for rapid response and operational decision making.
"""
print("🚨 Operational Monitoring Workflow")
print("=" * 35)
alert_thresholds = alert_thresholds or {
'cloud_cover_max': 0.2,
'min_scenes_per_week': 3,
'max_gap_days': 7
}
# Step 1: Recent Scene Discovery (last 30 days)
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
cloud_cover_max=alert_thresholds['cloud_cover_max'],
item_types=["PSScene"]
)
recent_scenes = results['features']
print(f"📡 Recent scenes (30 days): {len(recent_scenes)}")
# Step 2: Operational Alerts
alerts = []
# Check scene frequency
scenes_per_week = len(recent_scenes) / 4.3 # 30 days ≈ 4.3 weeks
if scenes_per_week < alert_thresholds['min_scenes_per_week']:
alerts.append({
'type': 'LOW_FREQUENCY',
'severity': 'HIGH',
'message': f"Only {scenes_per_week:.1f} scenes/week (threshold: {alert_thresholds['min_scenes_per_week']})"
})
# Check recent gaps
if recent_scenes:
# Sort by date
recent_scenes.sort(key=lambda s: s['properties']['acquired'])
# Check for gaps
last_acquisition = None
for scene in recent_scenes:
current_date = datetime.fromisoformat(scene['properties']['acquired'].replace('Z', '+00:00'))
if last_acquisition:
gap_days = (current_date - last_acquisition).days
if gap_days > alert_thresholds['max_gap_days']:
alerts.append({
'type': 'TEMPORAL_GAP',
'severity': 'MEDIUM',
'message': f"Gap of {gap_days} days detected",
'gap_start': last_acquisition.isoformat(),
'gap_end': current_date.isoformat()
})
last_acquisition = current_date
# Check data quality
if recent_scenes:
avg_cloud_cover = sum(s['properties'].get('cloud_cover', 1.0) for s in recent_scenes) / len(recent_scenes)
if avg_cloud_cover > 0.3:
alerts.append({
'type': 'POOR_QUALITY',
'severity': 'MEDIUM',
'message': f"High average cloud cover: {avg_cloud_cover:.1%}"
})
# Print alerts
if alerts:
print(f"🚨 {len(alerts)} alerts detected:")
for alert in alerts:
print(f" [{alert['severity']}] {alert['type']}: {alert['message']}")
else:
print("✅ No alerts - system operating normally")
# Step 3: Immediate Download of Critical Scenes
if recent_scenes:
# Get best scenes from last 7 days
week_ago = end_date - timedelta(days=7)
critical_scenes = [
scene for scene in recent_scenes
if datetime.fromisoformat(scene['properties']['acquired'].replace('Z', '+00:00')) > week_ago
and scene['properties'].get('cloud_cover', 1.0) < 0.1
]
if critical_scenes:
print(f"💾 Downloading {len(critical_scenes)} critical scenes")
asset_manager = AssetManager(query.auth)
# Fast download without confirmation for operational use
downloads = await asset_manager.activate_and_download_assets(
scenes=critical_scenes,
asset_types=["ortho_analytic_4b"],
confirm_download=False, # Operational mode - no confirmation
max_concurrent=10 # Faster download
)
print(f" Downloaded: {len([d for d in downloads if d.status.value == 'completed'])} assets")
# Step 4: Operational Report Generation
operational_status = {
'timestamp': datetime.now().isoformat(),
'monitoring_period_days': 30,
'operational_status': 'NORMAL' if not alerts else 'ALERT',
'alerts': alerts,
'metrics': {
'total_recent_scenes': len(recent_scenes),
'scenes_per_week': scenes_per_week,
'avg_cloud_cover': avg_cloud_cover if recent_scenes else None,
'last_acquisition': recent_scenes[-1]['properties']['acquired'] if recent_scenes else None
},
'recommendations': generate_operational_recommendations(alerts, recent_scenes)
}
# Save operational report
with open('operational_status.json', 'w') as f:
json.dump(operational_status, f, indent=2, default=str)
print(f"📊 Operational status: {operational_status['operational_status']}")
return operational_status
def generate_operational_recommendations(alerts, recent_scenes):
"""Generate operational recommendations based on alerts."""
recommendations = []
for alert in alerts:
if alert['type'] == 'LOW_FREQUENCY':
recommendations.append("Consider expanding search criteria or increasing monitoring frequency")
elif alert['type'] == 'TEMPORAL_GAP':
recommendations.append("Review recent weather conditions and satellite tasking priorities")
elif alert['type'] == 'POOR_QUALITY':
recommendations.append("Wait for better weather conditions or consider alternative data sources")
if not alerts and recent_scenes:
recommendations.append("System operating normally - maintain current monitoring parameters")
return recommendations
Research Publication Workflow
import platform

import planetscope_py  # used below for version reporting in the reproducibility metadata
from planetscope_py import DensityConfig  # assumed to be exported alongside the other classes

async def research_publication_workflow(roi, study_period, research_questions):
"""
Comprehensive workflow for research publications with full documentation.
Includes reproducibility metadata, quality assessments, and publication-ready outputs.
"""
print("📚 Research Publication Workflow")
print("=" * 35)
# Step 1: Comprehensive Scene Discovery
query = PlanetScopeQuery()
# Multiple search strategies for completeness
search_strategies = [
{'cloud_cover_max': 0.1, 'label': 'high_quality'},
{'cloud_cover_max': 0.3, 'label': 'standard_quality'},
{'cloud_cover_max': 0.6, 'label': 'all_available'}
]
all_scenes = {}
for strategy in search_strategies:
results = query.search_scenes(
geometry=roi,
start_date=study_period[0],
end_date=study_period[1],
cloud_cover_max=strategy['cloud_cover_max'],
item_types=["PSScene"]
)
all_scenes[strategy['label']] = results['features']
print(f"📡 {strategy['label']}: {len(results['features'])} scenes")
# Use highest quality dataset
primary_scenes = all_scenes['high_quality']
if len(primary_scenes) < 50: # If insufficient high-quality scenes
primary_scenes = all_scenes['standard_quality']
print("⚠️ Using standard quality scenes due to insufficient high-quality data")
# Step 2: Comprehensive Spatial Analysis
print("\n📊 Comprehensive Spatial Analysis")
# Test multiple methods for comparison
spatial_engine = SpatialDensityEngine()
# High-resolution analysis
high_res_result = spatial_engine.calculate_density(
scene_footprints=primary_scenes,
roi_geometry=roi,
config=DensityConfig(resolution=30.0) # High resolution
)
# Export for publication
high_res_result.export_geotiff("publication_density_30m.tif")
# Multiple resolution analysis for scale sensitivity
resolutions = [100, 500, 1000, 2000]
scale_analysis = {}
for resolution in resolutions:
result = spatial_engine.calculate_density(
scene_footprints=primary_scenes,
roi_geometry=roi,
config=DensityConfig(resolution=float(resolution))
)
scale_analysis[resolution] = {
'method': result.method_used.value,
'stats': result.stats,
'processing_time': result.processing_time_seconds
}
print(f" Scale analysis: {len(resolutions)} resolutions tested")
# Step 3: Advanced Temporal Analysis
print("\n⏱️ Advanced Temporal Analysis")
# Multiple temporal resolutions
temporal_results = {}
temporal_resolutions = [TemporalResolution.WEEKLY, TemporalResolution.MONTHLY]
for temp_res in temporal_resolutions:
config = TemporalConfig(
temporal_resolution=temp_res,
spatial_resolution=500.0,
enable_gap_analysis=True
)
analyzer = TemporalAnalyzer(config)
datacube = analyzer.create_spatiotemporal_datacube(primary_scenes, roi)
patterns = analyzer.analyze_acquisition_patterns(datacube)
gaps = analyzer.detect_temporal_gaps(datacube)
temporal_results[temp_res.value] = {
'patterns': patterns,
'gaps': len(gaps),
'data_availability': patterns['frequency_stats']['data_availability_percentage']
}
print(f" Temporal analysis: {len(temporal_resolutions)} resolutions")
# Step 4: Statistical Analysis
print("\n📈 Statistical Analysis")
# Quality statistics
quality_stats = analyze_scene_quality_statistics(primary_scenes)
# Temporal coverage statistics
temporal_stats = analyze_temporal_coverage_statistics(primary_scenes, study_period)
# Spatial coverage statistics
spatial_stats = analyze_spatial_coverage_statistics(primary_scenes, roi)
print(f" Quality assessment: {quality_stats['excellent_scenes']} excellent scenes")
print(f" Temporal coverage: {temporal_stats['coverage_percentage']:.1%}")
print(f" Spatial coverage: {spatial_stats['roi_coverage_percentage']:.1%}")
# Step 5: Data Download for Analysis
print("\n💾 Research Data Download")
# Strategic scene selection for research
research_scenes = select_research_scenes(
primary_scenes,
research_questions,
max_scenes=100 # Limit for research budget
)
asset_manager = AssetManager(query.auth)
# Download with multiple asset types for research
downloads = await asset_manager.activate_and_download_assets(
scenes=research_scenes,
asset_types=["ortho_analytic_4b", "ortho_analytic_sr", "ortho_udm2"],
clip_to_roi=roi
)
successful_downloads = [d for d in downloads if d.status.value == "completed"]
print(f" Downloaded: {len(successful_downloads)} research assets")
# Step 6: Publication-Ready Export
print("\n📦 Publication Data Export")
# Create comprehensive research GeoPackage
research_config = GeoPackageConfig(
include_imagery=True,
attribute_schema="comprehensive",
clip_to_roi=True
)
geopackage_manager = GeoPackageManager(config=research_config)
# Add research metadata
for scene in primary_scenes:
props = scene['properties']
props.update({
'research_quality_class': quality_stats['scene_classifications'].get(
props['id'], 'unclassified'
),
'temporal_completeness': temporal_stats['scene_temporal_scores'].get(
props['id'], 0.0
),
'spatial_relevance': spatial_stats['scene_spatial_scores'].get(
props['id'], 0.0
),
'research_priority': calculate_research_priority(scene, research_questions)
})
# Create research GeoPackage
layer_info = geopackage_manager.create_scene_geopackage(
scenes=primary_scenes,
output_path="research_publication_data.gpkg",
roi=roi,
downloaded_files=[d.downloaded_file_path for d in successful_downloads]
)
# Step 7: Reproducibility Documentation
print("\n📋 Reproducibility Documentation")
reproducibility_metadata = {
'study_metadata': {
'title': 'PlanetScope Analysis Study',
'study_period': study_period,
'roi_wkt': roi.wkt,
'research_questions': research_questions,
'methodology': 'PlanetScope-py automated workflow'
},
'data_sources': {
'primary_dataset': f"{len(primary_scenes)} PlanetScope scenes",
'quality_criteria': {
'cloud_cover_max': 0.1 if primary_scenes == all_scenes['high_quality'] else 0.3,
'sun_elevation_min': 30,
'acquisition_period': study_period
},
'alternative_datasets': {
label: len(scenes) for label, scenes in all_scenes.items()
}
},
'analysis_methods': {
'spatial_analysis': {
'primary_resolution': 30.0,
'method_used': high_res_result.method_used.value,
'scale_sensitivity': scale_analysis
},
'temporal_analysis': {
'resolutions_tested': [tr.value for tr in temporal_resolutions],
'results': temporal_results
}
},
'software_environment': {
'library': f"planetscope-py v{planetscope_py.__version__}",
'python_version': platform.python_version(),
'execution_platform': platform.system(),
'processing_date': datetime.now().isoformat()
},
'quality_assessment': quality_stats,
'statistical_summary': {
'temporal_coverage': temporal_stats,
'spatial_coverage': spatial_stats
},
'data_availability': {
'total_scenes_analyzed': len(primary_scenes),
'downloaded_for_research': len(successful_downloads),
'data_volume_gb': sum(d.file_size_mb for d in successful_downloads) / 1024
}
}
# Save reproducibility metadata
with open('research_reproducibility_metadata.json', 'w') as f:
json.dump(reproducibility_metadata, f, indent=2, default=str)
# Create publication summary
publication_summary = generate_publication_summary(
reproducibility_metadata, research_questions
)
with open('publication_summary.md', 'w') as f:
f.write(publication_summary)
print(f"✅ Research workflow complete!")
print(f" Primary dataset: {len(primary_scenes)} scenes")
print(f" Downloaded assets: {len(successful_downloads)}")
print(f" Documentation: research_reproducibility_metadata.json")
print(f" Summary: publication_summary.md")
return reproducibility_metadata
def select_research_scenes(scenes, research_questions, max_scenes=100):
"""Select optimal scenes for research based on research questions."""
# Score scenes based on research relevance
scored_scenes = []
for scene in scenes:
score = 0
props = scene['properties']
# Quality score
cloud_cover = props.get('cloud_cover', 1.0)
sun_elevation = props.get('sun_elevation', 0)
score += (1 - cloud_cover) * 50 # Cloud cover contribution
score += min(sun_elevation / 50, 1) * 30 # Sun elevation contribution
# Temporal distribution score
# Add logic based on research questions
if 'seasonal' in ' '.join(research_questions).lower():
# Prefer even seasonal distribution
month = datetime.fromisoformat(props['acquired'].replace('Z', '+00:00')).month
if month in [3, 6, 9, 12]: # Seasonal transition months
score += 20
if 'change detection' in ' '.join(research_questions).lower():
# Prefer high-quality scenes with good temporal spacing
if cloud_cover < 0.05 and sun_elevation > 45:
score += 25
scored_scenes.append((scene, score))
# Sort by score and take top scenes
scored_scenes.sort(key=lambda x: x[1], reverse=True)
return [scene for scene, score in scored_scenes[:max_scenes]]
def analyze_scene_quality_statistics(scenes):
"""Analyze quality statistics for scenes."""
quality_classes = {'excellent': 0, 'good': 0, 'fair': 0, 'poor': 0}
scene_classifications = {}
for scene in scenes:
props = scene['properties']
cloud_cover = props.get('cloud_cover', 1.0)
sun_elevation = props.get('sun_elevation', 0)
if cloud_cover < 0.05 and sun_elevation > 50:
quality_class = 'excellent'
elif cloud_cover < 0.15 and sun_elevation > 35:
quality_class = 'good'
elif cloud_cover < 0.3:
quality_class = 'fair'
else:
quality_class = 'poor'
quality_classes[quality_class] += 1
scene_classifications[props['id']] = quality_class
return {
'quality_distribution': quality_classes,
'scene_classifications': scene_classifications,
'excellent_scenes': quality_classes['excellent'],
'total_scenes': len(scenes)
}
def generate_publication_summary(metadata, research_questions):
"""Generate publication-ready summary."""
summary = f"""# PlanetScope Analysis Study - Publication Summary
## Study Overview
**Study Period**: {metadata['study_metadata']['study_period'][0]} to {metadata['study_metadata']['study_period'][1]}
**Geographic Area**: {metadata['data_sources']['primary_dataset']}
**Processing Date**: {metadata['software_environment']['processing_date'][:10]}
## Research Questions
{chr(10).join(f"- {q}" for q in research_questions)}
## Methodology
### Data Sources
- **Primary Dataset**: {metadata['data_sources']['primary_dataset']}
- **Quality Criteria**:
- Maximum cloud cover: {metadata['data_sources']['quality_criteria']['cloud_cover_max']:.0%}
- Minimum sun elevation: {metadata['data_sources']['quality_criteria']['sun_elevation_min']}°
- **Processing Software**: {metadata['software_environment']['library']}
### Spatial Analysis
- **Primary Resolution**: {metadata['analysis_methods']['spatial_analysis']['primary_resolution']}m
- **Method Used**: {metadata['analysis_methods']['spatial_analysis']['method_used']}
- **Scale Analysis**: {len(metadata['analysis_methods']['spatial_analysis']['scale_sensitivity'])} resolutions tested
### Temporal Analysis
- **Temporal Resolutions**: {', '.join(metadata['analysis_methods']['temporal_analysis']['resolutions_tested'])}
- **Data Availability**: {metadata['analysis_methods']['temporal_analysis']['results']['weekly']['data_availability']:.1%}
## Results Summary
### Data Quality Assessment
- **Excellent Quality Scenes**: {metadata['quality_assessment']['excellent_scenes']} ({metadata['quality_assessment']['excellent_scenes']/metadata['quality_assessment']['total_scenes']:.1%})
- **Total Scenes Analyzed**: {metadata['quality_assessment']['total_scenes']}
### Coverage Analysis
- **Temporal Coverage**: {metadata['statistical_summary']['temporal_coverage']['coverage_percentage']:.1%}
- **Spatial Coverage**: {metadata['statistical_summary']['spatial_coverage']['roi_coverage_percentage']:.1%}
### Downloaded Research Data
- **Research Assets**: {metadata['data_availability']['downloaded_for_research']} files
- **Data Volume**: {metadata['data_availability']['data_volume_gb']:.1f} GB
## Reproducibility Information
All analysis code, parameters, and intermediate results are documented in the accompanying metadata file (`research_reproducibility_metadata.json`). The complete dataset is available in GeoPackage format (`research_publication_data.gpkg`).
**Software Environment**:
- Python {metadata['software_environment']['python_version']}
- {metadata['software_environment']['library']}
- Platform: {metadata['software_environment']['execution_platform']}
## Data Availability Statement
The processed PlanetScope data used in this study is available in standardized GeoPackage format. Raw PlanetScope imagery is available through Planet Labs PBC subject to licensing agreements.
---
*Generated automatically by planetscope-py workflow system*
"""
return summary
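The publication workflow also relies on analyze_temporal_coverage_statistics and analyze_spatial_coverage_statistics, which are not defined on this page. Minimal sketches, assuming the return keys used above (coverage_percentage, scene_temporal_scores, roi_coverage_percentage, scene_spatial_scores) and that each scene carries its item id under properties['id'], as in analyze_scene_quality_statistics:
from datetime import datetime
from shapely.geometry import shape
from shapely.ops import unary_union

def analyze_temporal_coverage_statistics(scenes, study_period):
    """Hypothetical sketch: share of study-period days with at least one acquisition."""
    start = datetime.fromisoformat(study_period[0])
    end = datetime.fromisoformat(study_period[1])
    total_days = max((end - start).days, 1)
    covered_days = {s['properties']['acquired'][:10] for s in scenes}
    return {
        'coverage_percentage': len(covered_days) / total_days,
        # Placeholder per-scene score; replace with a real temporal-uniqueness metric if needed
        'scene_temporal_scores': {s['properties']['id']: 1.0 for s in scenes},
    }

def analyze_spatial_coverage_statistics(scenes, roi):
    """Hypothetical sketch: fraction of the ROI covered by the union of scene footprints."""
    footprints = [shape(s['geometry']) for s in scenes]
    union = unary_union(footprints) if footprints else None
    return {
        'roi_coverage_percentage': union.intersection(roi).area / roi.area if union else 0.0,
        'scene_spatial_scores': {
            s['properties']['id']: shape(s['geometry']).intersection(roi).area / roi.area
            for s in scenes
        },
    }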
Batch Processing Workflows
Multi-Site Analysis Workflow
async def multi_site_analysis_workflow(sites, global_date_range, output_base_dir="multi_site_analysis"):
"""
Process multiple study sites with standardized methodology.
Args:
sites: Dictionary of {site_name: roi_geometry}
global_date_range: Tuple of (start_date, end_date)
output_base_dir: Base directory for outputs
"""
print("🌐 Multi-Site Analysis Workflow")
print("=" * 35)
# Initialize components
query = PlanetScopeQuery()
spatial_engine = SpatialDensityEngine()
temporal_analyzer = TemporalAnalyzer()
asset_manager = AssetManager(query.auth)
geopackage_manager = GeoPackageManager()
# Global results storage
global_results = {
'sites': {},
'comparative_analysis': {},
'processing_metadata': {
'start_time': datetime.now().isoformat(),
'total_sites': len(sites),
'date_range': global_date_range
}
}
# Create output directory structure
output_path = Path(output_base_dir)
output_path.mkdir(exist_ok=True)
for site_name in sites.keys():
(output_path / site_name).mkdir(exist_ok=True)
# Process each site
for site_name, roi in sites.items():
print(f"\n📍 Processing site: {site_name}")
print("-" * 30)
site_start_time = datetime.now()
try:
# Site-specific scene discovery
results = query.search_scenes(
geometry=roi,
start_date=global_date_range[0],
end_date=global_date_range[1],
cloud_cover_max=0.3,
item_types=["PSScene"]
)
scenes = results['features']
print(f" Scenes found: {len(scenes)}")
if len(scenes) == 0:
print(f" ⚠️ No scenes found for {site_name}")
global_results['sites'][site_name] = {'status': 'no_data'}
continue
# Spatial analysis
spatial_result = spatial_engine.calculate_density(scenes, roi)
# Export spatial analysis
site_output_dir = output_path / site_name
spatial_output = site_output_dir / f"{site_name}_density.tif"
spatial_result.export_geotiff(str(spatial_output))
# Temporal analysis
datacube = temporal_analyzer.create_spatiotemporal_datacube(scenes, roi)
patterns = temporal_analyzer.analyze_acquisition_patterns(datacube)
gaps = temporal_analyzer.detect_temporal_gaps(datacube)
# Download best scenes (limited per site)
best_scenes = [
scene for scene in scenes
if scene['properties'].get('cloud_cover', 1.0) < 0.15
][:10] # Limit to 10 per site
downloads = []
if best_scenes:
site_downloads = await asset_manager.activate_and_download_assets(
scenes=best_scenes,
asset_types=["ortho_analytic_4b"],
output_dir=str(site_output_dir / "imagery"),
clip_to_roi=roi,
confirm_download=False
)
downloads = [d for d in site_downloads if d.status.value == "completed"]
# Create site GeoPackage
site_geopackage = site_output_dir / f"{site_name}_analysis.gpkg"
layer_info = geopackage_manager.create_scene_geopackage(
scenes=scenes,
output_path=str(site_geopackage),
roi=roi,
downloaded_files=[d.downloaded_file_path for d in downloads] if downloads else None
)
# Store site results
processing_time = (datetime.now() - site_start_time).total_seconds()
global_results['sites'][site_name] = {
'status': 'completed',
'roi_area_km2': calculate_area_km2(roi),
'total_scenes': len(scenes),
'downloaded_assets': len(downloads),
'processing_time_seconds': processing_time,
'spatial_analysis': {
'method': spatial_result.method_used.value,
'density_stats': spatial_result.stats
},
'temporal_analysis': {
'seasonal_patterns': len(patterns['seasonal_patterns']),
'temporal_gaps': len(gaps),
'data_availability': patterns['frequency_stats']['data_availability_percentage']
},
'outputs': {
'geopackage': str(site_geopackage),
'density_map': str(spatial_output),
'imagery_count': len(downloads)
}
}
print(f" ✅ Completed in {processing_time:.1f}s")
except Exception as e:
print(f" ❌ Error processing {site_name}: {e}")
global_results['sites'][site_name] = {
'status': 'failed',
'error': str(e)
}
# Comparative analysis across sites
print(f"\n📊 Comparative Analysis")
successful_sites = {
name: data for name, data in global_results['sites'].items()
if data.get('status') == 'completed'
}
if len(successful_sites) > 1:
comparative_analysis = perform_comparative_analysis(successful_sites)
global_results['comparative_analysis'] = comparative_analysis
# Create comparative visualization
create_comparative_plots(successful_sites, output_path / "comparative_analysis")
print(f" Compared {len(successful_sites)} sites")
print(f" Scene count range: {comparative_analysis['scene_count_range']}")
print(f" Data availability range: {comparative_analysis['data_availability_range']}")
# Global summary
global_results['processing_metadata']['end_time'] = datetime.now().isoformat()
global_results['processing_metadata']['total_processing_time'] = sum(
site_data.get('processing_time_seconds', 0)
for site_data in global_results['sites'].values()
if isinstance(site_data.get('processing_time_seconds'), (int, float))
)
# Save global results
global_report_path = output_path / "multi_site_analysis_report.json"
with open(global_report_path, 'w') as f:
json.dump(global_results, f, indent=2, default=str)
print(f"\n✅ Multi-site analysis complete!")
print(f" Successful sites: {len(successful_sites)}/{len(sites)}")
print(f" Total processing time: {global_results['processing_metadata']['total_processing_time']:.1f}s")
print(f" Global report: {global_report_path}")
return global_results
def perform_comparative_analysis(sites_data):
"""Perform comparative analysis across multiple sites."""
scene_counts = [data['total_scenes'] for data in sites_data.values()]
data_availability = [
data['temporal_analysis']['data_availability']
for data in sites_data.values()
]
roi_areas = [data['roi_area_km2'] for data in sites_data.values()]
return {
'scene_count_range': [min(scene_counts), max(scene_counts)],
'scene_count_mean': sum(scene_counts) / len(scene_counts),
'data_availability_range': [min(data_availability), max(data_availability)],
'data_availability_mean': sum(data_availability) / len(data_availability),
'roi_area_range': [min(roi_areas), max(roi_areas)],
'total_area_analyzed': sum(roi_areas),
'site_rankings': {
'by_scene_count': sorted(
sites_data.items(),
key=lambda x: x[1]['total_scenes'],
reverse=True
)[:5],
'by_data_availability': sorted(
sites_data.items(),
key=lambda x: x[1]['temporal_analysis']['data_availability'],
reverse=True
)[:5]
}
}
def create_comparative_plots(sites_data, output_dir):
"""Create comparative visualization plots."""
try:
import matplotlib.pyplot as plt
import numpy as np
output_dir.mkdir(exist_ok=True)
# Scene count comparison
site_names = list(sites_data.keys())
scene_counts = [data['total_scenes'] for data in sites_data.values()]
plt.figure(figsize=(12, 6))
plt.bar(site_names, scene_counts)
plt.title('Scene Count by Site')
plt.ylabel('Number of Scenes')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig(output_dir / 'scene_count_comparison.png', dpi=300)
plt.close()
# Data availability comparison
data_availability = [
data['temporal_analysis']['data_availability']
for data in sites_data.values()
]
plt.figure(figsize=(12, 6))
plt.bar(site_names, data_availability)
plt.title('Data Availability by Site')
plt.ylabel('Data Availability (%)')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig(output_dir / 'data_availability_comparison.png', dpi=300)
plt.close()
print(f" Comparative plots saved to {output_dir}")
except ImportError:
print(" Matplotlib not available - skipping plot generation")
Automated Monitoring Workflow
from shapely.geometry import shape

async def automated_monitoring_workflow(config_file="monitoring_config.json"):
"""
Fully automated monitoring workflow that can be scheduled.
Designed for operational deployment with minimal human intervention.
"""
print("🤖 Automated Monitoring Workflow")
print("=" * 32)
# Load configuration
try:
with open(config_file, 'r') as f:
config = json.load(f)
except FileNotFoundError:
# Create default configuration
config = create_default_monitoring_config()
with open(config_file, 'w') as f:
json.dump(config, f, indent=2)
print(f" Created default configuration: {config_file}")
# Initialize systems
query = PlanetScopeQuery()
asset_manager = AssetManager(query.auth)
# Create timestamped output directory
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = Path(config['output_base_dir']) / f"monitoring_{timestamp}"
output_dir.mkdir(parents=True, exist_ok=True)
monitoring_results = {
'execution_timestamp': timestamp,
'config': config,
'sites': {},
'global_status': 'UNKNOWN',
'alerts': [],
'recommendations': []
}
# Process each monitored site
for site_config in config['monitored_sites']:
site_name = site_config['name']
roi = shape(site_config['roi_geojson'])
print(f"\n🔍 Monitoring: {site_name}")
try:
# Recent scene discovery
end_date = datetime.now()
start_date = end_date - timedelta(days=config['monitoring_period_days'])
results = query.search_scenes(
geometry=roi,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
cloud_cover_max=config['quality_thresholds']['cloud_cover_max'],
item_types=["PSScene"]
)
recent_scenes = results['features']
# Automated analysis
site_analysis = perform_automated_site_analysis(
recent_scenes, roi, site_config, config
)
# Check for alerts
site_alerts = check_site_alerts(
site_analysis, site_config, config['alert_thresholds']
)
# Automated downloads if needed
downloads = []
if site_alerts and config['auto_download']['enabled']:
critical_scenes = [
scene for scene in recent_scenes
if scene['properties'].get('cloud_cover', 1.0) < config['auto_download']['quality_threshold']
][:config['auto_download']['max_scenes_per_site']]
if critical_scenes:
downloads = await asset_manager.activate_and_download_assets(
scenes=critical_scenes,
asset_types=config['auto_download']['asset_types'],
output_dir=str(output_dir / site_name / "imagery"),
confirm_download=False # Automated mode
)
monitoring_results['sites'][site_name] = {
'status': 'completed',
'analysis': site_analysis,
'alerts': site_alerts,
'downloads': len([d for d in downloads if d.status.value == "completed"])
}
monitoring_results['alerts'].extend(site_alerts)
print(f" Status: {'⚠️ ALERTS' if site_alerts else '✅ NORMAL'}")
except Exception as e:
print(f" ❌ Error: {e}")
monitoring_results['sites'][site_name] = {
'status': 'failed',
'error': str(e)
}
monitoring_results['alerts'].append({
'site': site_name,
'type': 'SYSTEM_ERROR',
'severity': 'HIGH',
'message': f"Monitoring failed: {str(e)}"
})
# Global status assessment
total_alerts = len(monitoring_results['alerts'])
high_severity_alerts = len([
alert for alert in monitoring_results['alerts']
if alert.get('severity') == 'HIGH'
])
if high_severity_alerts > 0:
monitoring_results['global_status'] = 'CRITICAL'
elif total_alerts > 0:
monitoring_results['global_status'] = 'WARNING'
else:
monitoring_results['global_status'] = 'NORMAL'
# Generate recommendations
monitoring_results['recommendations'] = generate_automated_recommendations(
monitoring_results
)
# Save monitoring report
report_path = output_dir / "automated_monitoring_report.json"
with open(report_path, 'w') as f:
json.dump(monitoring_results, f, indent=2, default=str)
# Send alerts if configured
if config.get('notifications', {}).get('enabled', False):
await send_monitoring_alerts(monitoring_results, config['notifications'])
print(f"\n🎯 Monitoring Complete")
print(f" Global Status: {monitoring_results['global_status']}")
print(f" Total Alerts: {total_alerts}")
print(f" Report: {report_path}")
return monitoring_results
def create_default_monitoring_config():
"""Create default monitoring configuration."""
return {
"output_base_dir": "automated_monitoring",
"monitoring_period_days": 14,
"monitored_sites": [
{
"name": "example_site",
"roi_geojson": {
"type": "Polygon",
"coordinates": [[9.0, 45.4], [9.2, 45.4], [9.2, 45.6], [9.0, 45.6], [9.0, 45.4](/Black-Lights/planetscope-py/wiki/[9.0,-45.4],-[9.2,-45.4],-[9.2,-45.6],-[9.0,-45.6],-[9.0,-45.4)]
},
"expected_frequency_days": 7,
"minimum_quality_threshold": 0.8
}
],
"quality_thresholds": {
"cloud_cover_max": 0.3,
"sun_elevation_min": 30
},
"alert_thresholds": {
"max_gap_days": 14,
"min_scenes_per_week": 2,
"quality_degradation_threshold": 0.2
},
"auto_download": {
"enabled": True,
"quality_threshold": 0.1,
"max_scenes_per_site": 5,
"asset_types": ["ortho_analytic_4b"]
},
"notifications": {
"enabled": False,
"email": {
"recipients": ["[email protected]"],
"smtp_server": "smtp.example.com"
}
}
}
async def send_monitoring_alerts(monitoring_results, notification_config):
"""Send monitoring alerts via configured channels."""
if notification_config.get('email', {}).get('enabled', False):
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Create email content
subject = f"PlanetScope Monitoring Alert - {monitoring_results['global_status']}"
body = f"""
Automated PlanetScope Monitoring Report
=====================================
Global Status: {monitoring_results['global_status']}
Execution Time: {monitoring_results['execution_timestamp']}
Total Alerts: {len(monitoring_results['alerts'])}
Site Status:
{format_site_status_for_email(monitoring_results['sites'])}
Alerts:
{format_alerts_for_email(monitoring_results['alerts'])}
Recommendations:
{chr(10).join(f"- {rec}" for rec in monitoring_results['recommendations'])}
This is an automated message from the PlanetScope monitoring system.
"""
# Send the message here via smtplib; the actual send depends on your SMTP service
print(" 📧 Email notification prepared")
except Exception as e:
print(f" ❌ Email notification failed: {e}")
def format_site_status_for_email(sites):
"""Format site status for email notification."""
lines = []
for site_name, site_data in sites.items():
status_emoji = "✅" if site_data['status'] == 'completed' and not site_data.get('alerts') else "⚠️"
lines.append(f"{status_emoji} {site_name}: {site_data['status']}")
return '\n'.join(lines)
def format_alerts_for_email(alerts):
"""Format alerts for email notification."""
if not alerts:
return "No alerts"
lines = []
for alert in alerts:
severity_emoji = "🚨" if alert.get('severity') == 'HIGH' else "⚠️"
lines.append(f"{severity_emoji} {alert.get('type', 'UNKNOWN')}: {alert.get('message', 'No message')}")
return '\n'.join(lines)
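The automated workflow delegates to perform_automated_site_analysis and check_site_alerts, which are not defined on this page. Hedged sketches, under the assumption that the analysis dictionary carries scenes_per_week, max_gap_days, and avg_cloud_cover:
from datetime import datetime

def perform_automated_site_analysis(recent_scenes, roi, site_config, config):
    """Hypothetical sketch: compute the per-site metrics that the alert checks rely on."""
    dates = sorted(
        datetime.fromisoformat(s['properties']['acquired'].replace('Z', '+00:00'))
        for s in recent_scenes
    )
    gaps = [(later - earlier).days for earlier, later in zip(dates, dates[1:])]
    period_weeks = max(config['monitoring_period_days'] / 7.0, 1.0)
    return {
        'scene_count': len(recent_scenes),
        'scenes_per_week': len(recent_scenes) / period_weeks,
        'max_gap_days': max(gaps) if gaps else config['monitoring_period_days'],
        'avg_cloud_cover': (
            sum(s['properties'].get('cloud_cover', 1.0) for s in recent_scenes) / len(recent_scenes)
            if recent_scenes else None
        ),
    }

def check_site_alerts(site_analysis, site_config, alert_thresholds):
    """Hypothetical sketch: compare site metrics against the configured alert thresholds."""
    alerts = []
    if site_analysis['scenes_per_week'] < alert_thresholds['min_scenes_per_week']:
        alerts.append({
            'site': site_config['name'], 'type': 'LOW_FREQUENCY', 'severity': 'MEDIUM',
            'message': f"{site_analysis['scenes_per_week']:.1f} scenes/week below threshold",
        })
    if site_analysis['max_gap_days'] > alert_thresholds['max_gap_days']:
        alerts.append({
            'site': site_config['name'], 'type': 'TEMPORAL_GAP', 'severity': 'HIGH',
            'message': f"Largest acquisition gap of {site_analysis['max_gap_days']} days",
        })
    return alerts
A generate_automated_recommendations helper could follow the same per-alert mapping shown in generate_operational_recommendations earlier.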
Performance and Optimization
Workflow Performance Monitoring
import time
import psutil
from contextlib import contextmanager
@contextmanager
def performance_monitor(workflow_name):
"""Context manager for monitoring workflow performance."""
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
print(f"🚀 Starting {workflow_name}")
print(f" Initial memory: {start_memory:.1f} MB")
try:
yield
finally:
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
duration = end_time - start_time
memory_delta = end_memory - start_memory
print(f"✅ {workflow_name} completed")
print(f" Duration: {duration:.1f} seconds")
print(f" Memory usage: {end_memory:.1f} MB ({memory_delta:+.1f} MB)")
# Usage in workflows
async def optimized_workflow_example():
"""Example of performance-optimized workflow."""
with performance_monitor("Optimized Research Workflow"):
# Use performance monitoring throughout workflow
with performance_monitor("Scene Discovery"):
# Scene discovery code
pass
with performance_monitor("Spatial Analysis"):
# Spatial analysis code
pass
with performance_monitor("Data Export"):
# Export code
pass
Best Practices for Complete Workflows
1. Error Handling and Recovery
async def robust_workflow_wrapper(workflow_func, *args, **kwargs):
"""Robust wrapper for any workflow with error handling and recovery."""
max_retries = 3
retry_delay = 60 # seconds
for attempt in range(max_retries):
try:
return await workflow_func(*args, **kwargs)
except Exception as e:
print(f"❌ Workflow failed (attempt {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
print(f"⏳ Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
print("❌ Workflow failed permanently after all retries")
# Save partial results if available
save_partial_results(e, *args, **kwargs)
raise e
def save_partial_results(error, *args, **kwargs):
"""Save partial results when workflow fails."""
error_report = {
'error_timestamp': datetime.now().isoformat(),
'error_message': str(error),
'workflow_args': str(args),
'workflow_kwargs': str(kwargs)
}
with open('workflow_error_report.json', 'w') as f:
json.dump(error_report, f, indent=2, default=str)
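The wrapper can be applied to any workflow above; for example, inside an async context:
# Retry the standard research workflow up to three times with exponential backoff
# result = await robust_workflow_wrapper(standard_research_workflow, roi, date_range)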
2. Configuration Management
def load_workflow_config(config_path="workflow_config.yaml"):
"""Load workflow configuration from file."""
try:
import yaml
with open(config_path, 'r') as f:
return yaml.safe_load(f)
except ImportError:
# Fallback to JSON
with open(config_path.replace('.yaml', '.json'), 'r') as f:
return json.load(f)
def validate_workflow_config(config):
"""Validate workflow configuration."""
required_fields = ['roi', 'date_range', 'output_dir']
for field in required_fields:
if field not in config:
raise ValueError(f"Missing required configuration field: {field}")
# Additional validation logic
return True
3. Resource Management
class WorkflowResourceManager:
"""Manage computational resources during workflows."""
def __init__(self, max_memory_gb=8, max_concurrent_downloads=5):
self.max_memory_gb = max_memory_gb
self.max_concurrent_downloads = max_concurrent_downloads
self.active_downloads = 0
def check_memory_usage(self):
"""Check current memory usage."""
memory_gb = psutil.Process().memory_info().rss / (1024**3)
if memory_gb > self.max_memory_gb:
raise MemoryError(f"Memory usage ({memory_gb:.1f} GB) exceeds limit ({self.max_memory_gb} GB)")
async def managed_download(self, download_func, *args, **kwargs):
"""Manage download with resource limits."""
while self.active_downloads >= self.max_concurrent_downloads:
await asyncio.sleep(1)
self.active_downloads += 1
try:
result = await download_func(*args, **kwargs)
return result
finally:
self.active_downloads -= 1
# Usage
resource_manager = WorkflowResourceManager(max_memory_gb=16)
async def resource_managed_workflow():
"""Workflow with resource management."""
resource_manager.check_memory_usage()
# Use managed downloads
downloads = await resource_manager.managed_download(
asset_manager.activate_and_download_assets,
scenes=scenes
)
Example Usage
Running Complete Workflows
import asyncio
from shapely.geometry import box
async def main():
"""Example of running complete workflows."""
# Define study area (Milan, Italy)
roi = box(9.04, 45.40, 9.28, 45.52)
date_range = ("2024-01-01", "2024-06-30")
# Run standard research workflow
research_result = await standard_research_workflow(roi, date_range)
# Run environmental monitoring workflow
monitoring_result = await environmental_monitoring_workflow(roi, monitoring_period_months=12)
# Run operational monitoring
operational_result = await operational_monitoring_workflow(roi)
# Multi-site analysis
sites = {
'milan': box(9.04, 45.40, 9.28, 45.52),
'rome': box(12.35, 41.70, 12.65, 42.00),
'naples': box(14.15, 40.75, 14.35, 40.95)
}
multi_site_result = await multi_site_analysis_workflow(sites, date_range)
print("\n🎉 All workflows completed successfully!")
if __name__ == "__main__":
asyncio.run(main())
Workflow Scheduling and Automation
Scheduled Monitoring
import asyncio
import schedule
import time
from datetime import datetime
def setup_scheduled_monitoring():
"""Setup scheduled monitoring workflows."""
# Daily operational monitoring
schedule.every().day.at("08:00").do(
lambda: asyncio.run(operational_monitoring_workflow(roi))
)
# Weekly environmental monitoring
schedule.every().monday.at("06:00").do(
lambda: asyncio.run(environmental_monitoring_workflow(roi))
)
# Monthly comprehensive analysis
# (the schedule library has no native monthly trigger, so run a daily job
# that only executes on the first day of the month)
def monthly_comprehensive_analysis():
    if datetime.now().day == 1:
        asyncio.run(standard_research_workflow(roi, get_last_month_range()))

schedule.every().day.at("05:00").do(monthly_comprehensive_analysis)
print("📅 Scheduled monitoring setup complete")
print(" Daily operational monitoring: 08:00")
print(" Weekly environmental monitoring: Monday 06:00")
print(" Monthly comprehensive analysis: 1st of month")
def run_scheduler():
"""Run the monitoring scheduler."""
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
def get_last_month_range():
"""Get date range for last month."""
now = datetime.now()
if now.month == 1:
last_month = 12
year = now.year - 1
else:
last_month = now.month - 1
year = now.year
start_date = f"{year}-{last_month:02d}-01"
# Calculate end date (last day of last month)
import calendar
last_day = calendar.monthrange(year, last_month)[1]
end_date = f"{year}-{last_month:02d}-{last_day}"
return (start_date, end_date)
Docker Deployment
# Dockerfile for automated workflows
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gdal-bin \
libgdal-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy workflow code
COPY workflows/ ./workflows/
COPY config/ ./config/
# Set environment variables
ENV PLANET_API_KEY=""
ENV PYTHONPATH=/app
# Default command
CMD ["python", "workflows/automated_monitoring.py"]
# docker-compose.yml for workflow services
version: '3.8'
services:
planetscope-monitor:
build: .
environment:
- PLANET_API_KEY=${PLANET_API_KEY}
volumes:
- ./data:/app/data
- ./config:/app/config
restart: unless-stopped
planetscope-scheduler:
build: .
command: python workflows/scheduler.py
environment:
- PLANET_API_KEY=${PLANET_API_KEY}
volumes:
- ./data:/app/data
- ./config:/app/config
restart: unless-stopped
Advanced Workflow Patterns
Conditional Workflows
async def adaptive_workflow(roi, date_range, conditions=None):
"""
Adaptive workflow that changes behavior based on conditions.
"""
conditions = conditions or {}
print("🔄 Adaptive Workflow")
print("=" * 20)
# Initialize query
query = PlanetScopeQuery()
# Adaptive scene discovery based on data availability
initial_results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.2, # Start strict
item_types=["PSScene"]
)
scenes = initial_results['features']
# Adapt strategy based on initial results
if len(scenes) < 10:
print(" 📊 Low data availability - relaxing criteria")
# Relax cloud cover requirements
relaxed_results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.5, # More permissive
item_types=["PSScene"]
)
scenes = relaxed_results['features']
workflow_type = 'relaxed_quality'
elif len(scenes) > 1000:
print(" 📊 High data availability - applying strict filtering")
# Apply stricter quality filters
high_quality_scenes = [
scene for scene in scenes
if (scene['properties'].get('cloud_cover', 1.0) < 0.05 and
scene['properties'].get('sun_elevation', 0) > 45)
]
if len(high_quality_scenes) > 50:
scenes = high_quality_scenes
workflow_type = 'high_quality'
else:
workflow_type = 'standard'
else:
workflow_type = 'standard'
print(f" 🎯 Selected workflow type: {workflow_type}")
print(f" 📡 Final scene count: {len(scenes)}")
# Conditional analysis based on workflow type
if workflow_type == 'high_quality':
# High-resolution spatial analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(
scenes, roi, config=DensityConfig(resolution=30.0)
)
# Detailed temporal analysis
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.DAILY,
enable_gap_analysis=True
)
elif workflow_type == 'relaxed_quality':
# Faster, lower-resolution analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(
scenes, roi, config=DensityConfig(resolution=1000.0)
)
# Weekly temporal analysis
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.WEEKLY,
enable_gap_analysis=False
)
else: # standard
# Balanced analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(scenes, roi)
temporal_config = TemporalConfig(
temporal_resolution=TemporalResolution.WEEKLY,
enable_gap_analysis=True
)
# Continue with conditional processing...
temporal_analyzer = TemporalAnalyzer(temporal_config)
datacube = temporal_analyzer.create_spatiotemporal_datacube(scenes, roi)
# Adaptive export based on scene count
if len(scenes) > 500:
# Large dataset - create compressed outputs
geopackage_config = GeoPackageConfig(
include_imagery=False, # Skip imagery for large datasets
attribute_schema="minimal"
)
else:
# Standard dataset - full export
geopackage_config = GeoPackageConfig(
include_imagery=True,
attribute_schema="comprehensive"
)
# Create adaptive report
adaptive_report = {
'workflow_type': workflow_type,
'adaptation_reason': get_adaptation_reason(workflow_type, len(scenes)),
'final_scene_count': len(scenes),
'processing_parameters': {
'spatial_resolution': spatial_result.resolution_meters,
'temporal_resolution': temporal_config.temporal_resolution.value,
'include_imagery': geopackage_config.include_imagery
}
}
return adaptive_report
def get_adaptation_reason(workflow_type, scene_count):
"""Get human-readable adaptation reason."""
if workflow_type == 'high_quality':
return f"High data availability ({scene_count} scenes) enabled high-quality analysis"
elif workflow_type == 'relaxed_quality':
return f"Low data availability ({scene_count} scenes) required relaxed quality criteria"
else:
return f"Standard analysis with {scene_count} scenes"
Parallel Processing Workflows
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
async def parallel_processing_workflow(sites, date_range, max_workers=4):
"""
Process multiple sites in parallel for faster execution.
"""
print("⚡ Parallel Processing Workflow")
print("=" * 30)
# Create semaphore to limit concurrent API calls
api_semaphore = asyncio.Semaphore(max_workers)
async def process_site_with_semaphore(site_name, roi):
"""Process single site with API rate limiting."""
async with api_semaphore:
return await process_single_site(site_name, roi, date_range)
# Process all sites concurrently
tasks = [
process_site_with_semaphore(site_name, roi)
for site_name, roi in sites.items()
]
print(f" 🚀 Starting {len(tasks)} parallel site analyses...")
# Execute with progress tracking
results = {}
completed = 0
for task in asyncio.as_completed(tasks):
site_result = await task
site_name = site_result['site_name']
results[site_name] = site_result
completed += 1
print(f" ✅ Completed {site_name} ({completed}/{len(tasks)})")
print(f" 🎯 All {len(tasks)} sites processed")
return results
async def process_single_site(site_name, roi, date_range):
"""Process a single site (can be run in parallel)."""
try:
# Scene discovery
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.3,
item_types=["PSScene"]
)
scenes = results['features']
# Quick spatial analysis
spatial_engine = SpatialDensityEngine()
spatial_result = spatial_engine.calculate_density(scenes, roi)
return {
'site_name': site_name,
'status': 'success',
'scene_count': len(scenes),
'spatial_stats': spatial_result.stats,
'processing_time': datetime.now().isoformat()
}
except Exception as e:
return {
'site_name': site_name,
'status': 'failed',
'error': str(e),
'processing_time': datetime.now().isoformat()
}
# CPU-intensive processing with ProcessPoolExecutor
def cpu_intensive_analysis(scenes_data):
"""CPU-intensive analysis that can be parallelized."""
import numpy as np
# Example: Complex statistical analysis
cloud_covers = [scene['properties'].get('cloud_cover', 1.0) for scene in scenes_data]
# Simulate complex calculations
stats = {
'mean': np.mean(cloud_covers),
'std': np.std(cloud_covers),
'percentiles': np.percentile(cloud_covers, [10, 25, 50, 75, 90]).tolist()
}
return stats
async def workflow_with_cpu_processing(sites, date_range):
"""Workflow combining async I/O with CPU-intensive processing."""
# Step 1: Async data collection
all_scenes = {}
for site_name, roi in sites.items():
query = PlanetScopeQuery()
results = query.search_scenes(
geometry=roi,
start_date=date_range[0],
end_date=date_range[1],
cloud_cover_max=0.3,
item_types=["PSScene"]
)
all_scenes[site_name] = results['features']
# Step 2: CPU-intensive processing in parallel
with ProcessPoolExecutor(max_workers=4) as executor:
loop = asyncio.get_running_loop()  # we are inside a coroutine, so the running loop is available
cpu_tasks = {
site_name: loop.run_in_executor(
executor, cpu_intensive_analysis, scenes
)
for site_name, scenes in all_scenes.items()
}
# Wait for all CPU tasks to complete
cpu_results = {}
for site_name, task in cpu_tasks.items():
cpu_results[site_name] = await task
return cpu_results
Workflow Testing and Validation
Unit Testing Workflows
import asyncio
import copy
import os
import unittest
from unittest.mock import Mock, patch

from shapely.geometry import box
class TestWorkflows(unittest.TestCase):
"""Test suite for workflow functions."""
def setUp(self):
"""Set up test fixtures."""
self.test_roi = box(9.0, 45.0, 9.1, 45.1)
self.test_date_range = ("2024-01-01", "2024-01-31")
# Mock scene data
self.mock_scene = {
'id': 'test_scene_001',
'properties': {
'acquired': '2024-01-15T10:30:00Z',
'cloud_cover': 0.1,
'sun_elevation': 45.0
},
'geometry': {
'type': 'Polygon',
'coordinates': [[
[9.0, 45.0], [9.1, 45.0], [9.1, 45.1], [9.0, 45.1], [9.0, 45.0]
]]
}
}
# Note: patch the name where it is looked up; if the workflow module does
# "from planetscope_py import PlanetScopeQuery", patch that module's attribute instead.
@patch('planetscope_py.PlanetScopeQuery')
def test_standard_workflow_success(self, mock_query_class):
"""Test successful execution of standard workflow."""
# Setup mock
mock_query = mock_query_class.return_value
mock_query.search_scenes.return_value = {
'features': [self.mock_scene] * 10
}
# Run workflow
async def run_test():
result = await standard_research_workflow(
self.test_roi,
self.test_date_range,
output_dir="test_output"
)
return result
result = asyncio.run(run_test())
# Assertions
self.assertIsNotNone(result)
self.assertEqual(result['results']['total_scenes'], 10)
self.assertEqual(result['workflow'], 'standard_research')
@patch('planetscope_py.PlanetScopeQuery')
def test_workflow_no_scenes(self, mock_query_class):
"""Test workflow behavior when no scenes are found."""
# Setup mock to return no scenes
mock_query = mock_query_class.return_value
mock_query.search_scenes.return_value = {'features': []}
# Run workflow
async def run_test():
result = await standard_research_workflow(
self.test_roi,
self.test_date_range
)
return result
result = asyncio.run(run_test())
# Should return None when no scenes found
self.assertIsNone(result)
def test_get_season_from_scene(self):
"""Test season extraction from scene."""
# Test winter scene
winter_scene = copy.deepcopy(self.mock_scene)  # deep copy so the nested properties dict is not shared
winter_scene['properties']['acquired'] = '2024-12-15T10:30:00Z'
self.assertEqual(get_season_from_scene(winter_scene), 'winter')
# Test spring scene
spring_scene = copy.deepcopy(self.mock_scene)
spring_scene['properties']['acquired'] = '2024-04-15T10:30:00Z'
self.assertEqual(get_season_from_scene(spring_scene), 'spring')
def test_assess_monitoring_quality(self):
"""Test quality assessment function."""
# Excellent quality scene
excellent_scene = copy.deepcopy(self.mock_scene)
excellent_scene['properties']['cloud_cover'] = 0.02
excellent_scene['properties']['sun_elevation'] = 55
self.assertEqual(assess_monitoring_quality(excellent_scene), 'excellent')
# Poor quality scene
poor_scene = copy.deepcopy(self.mock_scene)
poor_scene['properties']['cloud_cover'] = 0.8
poor_scene['properties']['sun_elevation'] = 20
self.assertEqual(assess_monitoring_quality(poor_scene), 'poor')
class TestWorkflowIntegration(unittest.TestCase):
"""Integration tests for complete workflows."""
@unittest.skipUnless(
os.getenv('PLANET_API_KEY'),
"Requires valid Planet API key"
)
def test_real_api_integration(self):
"""Test workflow with real API (requires API key)."""
# Small test area
test_roi = box(9.18, 45.46, 9.19, 45.47) # Very small area in Milan
test_date_range = ("2024-01-01", "2024-01-07") # One week
async def run_integration_test():
try:
result = await standard_research_workflow(
test_roi,
test_date_range,
output_dir="integration_test_output"
)
return result
except Exception as e:
self.fail(f"Integration test failed: {e}")
result = asyncio.run(run_integration_test())
self.assertIsNotNone(result)
if __name__ == '__main__':
unittest.main()
Workflow Validation
import os

def validate_workflow_outputs(workflow_result, expected_outputs=None):
"""
Validate workflow outputs meet expected criteria.
Args:
workflow_result: Result dictionary from workflow
expected_outputs: Dictionary of expected output criteria
"""
expected_outputs = expected_outputs or {}
validation_results = {
'valid': True,
'errors': [],
'warnings': []
}
# Check required fields
required_fields = ['workflow', 'completion_time', 'results', 'outputs']
for field in required_fields:
if field not in workflow_result:
validation_results['valid'] = False
validation_results['errors'].append(f"Missing required field: {field}")
# Validate results structure
if 'results' in workflow_result:
results = workflow_result['results']
# Check scene counts are reasonable
total_scenes = results.get('total_scenes', 0)
if total_scenes == 0:
validation_results['warnings'].append("No scenes found in analysis")
elif total_scenes > 10000:
validation_results['warnings'].append(f"Very large scene count: {total_scenes}")
# Check data availability
temporal_analysis = results.get('temporal_analysis', {})
data_availability = temporal_analysis.get('data_availability')
if data_availability is not None and data_availability < 0.5:
validation_results['warnings'].append(
f"Low data availability: {data_availability:.1%}"
)
# Validate output files exist
if 'outputs' in workflow_result:
outputs = workflow_result['outputs']
for output_type, output_path in outputs.items():
if output_path and not os.path.exists(output_path):
validation_results['valid'] = False
validation_results['errors'].append(
f"Output file missing: {output_path}"
)
# Custom validation criteria
if expected_outputs:
min_scenes = expected_outputs.get('min_scenes')
if min_scenes and workflow_result.get('results', {}).get('total_scenes', 0) < min_scenes:
validation_results['valid'] = False
validation_results['errors'].append(
f"Scene count below minimum: {min_scenes}"
)
return validation_results
# Example usage
async def validated_workflow_execution():
"""Execute workflow with validation."""
roi = box(9.04, 45.40, 9.28, 45.52)
date_range = ("2024-01-01", "2024-06-30")
# Run workflow
result = await standard_research_workflow(roi, date_range)
# Validate results
validation = validate_workflow_outputs(
result,
expected_outputs={'min_scenes': 10}
)
if validation['valid']:
print("✅ Workflow validation passed")
else:
print("❌ Workflow validation failed:")
for error in validation['errors']:
print(f" Error: {error}")
if validation['warnings']:
print("⚠️ Warnings:")
for warning in validation['warnings']:
print(f" Warning: {warning}")
return result, validation
These complete workflow examples demonstrate how to integrate all PlanetScope-py components into comprehensive, production-ready analysis pipelines suitable for research, operational monitoring, and automated processing scenarios. The workflows include error handling, performance monitoring, validation, and deployment patterns for real-world usage.