Web Interface Architecture¶
Overview¶
The EEMT Web Interface is a modern, containerized FastAPI application that provides browser-based access to EEMT workflows. This document details the architectural design, component interactions, and implementation patterns that power the web interface.
System Architecture¶
High-Level Design¶
graph TB
subgraph "Client Layer"
Browser[Web Browser]
API[REST API Client]
end
subgraph "Application Layer"
subgraph "FastAPI Application"
Routes[API Routes]
Templates[HTML Templates]
Static[Static Assets]
WM[Workflow Manager]
DB[SQLite Database]
end
end
subgraph "Container Layer"
Docker[Docker SDK]
Containers[Workflow Containers]
end
subgraph "Storage Layer"
Uploads[Uploads Volume]
Results[Results Volume]
Temp[Temp Volume]
end
Browser --> Routes
API --> Routes
Routes --> WM
Routes --> DB
WM --> Docker
Docker --> Containers
Containers --> Uploads
Containers --> Results
Containers --> Temp
Routes --> Templates
Templates --> Static
Component Overview¶
The web interface consists of several interconnected components:
- FastAPI Application: Core web framework handling HTTP requests
- Workflow Manager: Orchestrates container execution and job management
- Database Layer: SQLite for job persistence and tracking
- Docker Integration: Container lifecycle management
- Frontend: HTML/JavaScript interface for user interaction
- Storage Management: Volume handling for data persistence
FastAPI Application Structure¶
Directory Layout¶
web-interface/
├── app.py # Main FastAPI application
├── containers/
│ ├── __init__.py
│ └── workflow_manager.py # Container orchestration
├── models/
│ ├── __init__.py
│ └── job.py # Pydantic models
├── templates/
│ ├── index.html # Job submission page
│ ├── monitor.html # Job monitoring dashboard
│ └── base.html # Base template
├── static/
│ ├── css/
│ │ └── style.css
│ ├── js/
│ │ ├── main.js
│ │ └── monitor.js
│ └── img/
├── uploads/ # DEM file uploads
├── results/ # Job outputs
├── temp/ # Temporary processing
├── cache/ # Workflow cache
├── jobs.db # SQLite database
└── requirements.txt # Python dependencies
Application Initialization¶
# app.py core structure
from fastapi import FastAPI, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import sqlite3

app = FastAPI(
    title="EEMT Web Interface",
    description="Effective Energy and Mass Transfer Workflow System",
    version="2.0.0"
)

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Configure templates
templates = Jinja2Templates(directory="templates")

# Initialize database
def init_db():
    conn = sqlite3.connect('jobs.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            workflow_type TEXT,
            status TEXT,
            created_at TIMESTAMP,
            started_at TIMESTAMP,
            completed_at TIMESTAMP,
            parameters JSON,
            dem_filename TEXT,
            error_message TEXT,
            progress INTEGER DEFAULT 0
        )
    ''')
    conn.commit()
    conn.close()

@app.on_event("startup")
async def startup_event():
    init_db()
    ensure_directories()
    check_docker_availability()
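The startup hook calls two helpers that are not shown in this excerpt. A minimal sketch of `ensure_directories`, with the directory names assumed from the layout above (the real helper may differ):

```python
from pathlib import Path

# Data directories the application expects, per the directory layout above
REQUIRED_DIRS = ["uploads", "results", "temp", "cache"]

def ensure_directories(base_dir: str = ".") -> list:
    """Create any missing application data directories; return their paths."""
    paths = []
    for name in REQUIRED_DIRS:
        path = Path(base_dir) / name
        path.mkdir(parents=True, exist_ok=True)
        paths.append(path)
    return paths
```

Because `mkdir` is called with `exist_ok=True`, the helper is safe to run on every startup.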
Request Handling Flow¶
Job Submission Workflow¶
sequenceDiagram
participant User
participant Browser
participant FastAPI
participant Validator
participant Storage
participant WorkflowManager
participant Docker
participant Container
User->>Browser: Select DEM & Parameters
Browser->>FastAPI: POST /api/submit-job
FastAPI->>Validator: Validate Input
Validator-->>FastAPI: Validation Result
alt Validation Success
FastAPI->>Storage: Save DEM File
FastAPI->>WorkflowManager: Create Job
WorkflowManager->>Docker: Start Container
Docker->>Container: Execute Workflow
FastAPI-->>Browser: Return Job ID
Browser-->>User: Show Success
else Validation Failed
FastAPI-->>Browser: Return Error
Browser-->>User: Show Error
end
API Route Structure¶
# Core API routes
import os
import uuid

from fastapi import File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse

@app.post("/api/submit-job")
async def submit_job(
    background_tasks: BackgroundTasks,
    workflow_type: str = Form(...),
    dem_file: UploadFile = File(...),
    step: float = Form(15.0),
    linke_value: float = Form(3.0),
    albedo_value: float = Form(0.2),
    num_threads: int = Form(4),
):
    # Validate input and assign a job ID
    job_id = str(uuid.uuid4())

    # Save uploaded file
    file_path = save_upload(dem_file, job_id)

    # Create job record
    parameters = {
        'step': step,
        'linke_value': linke_value,
        'albedo_value': albedo_value,
        'num_threads': num_threads,
    }
    create_job_record(job_id, workflow_type, parameters)

    # Start workflow in background
    background_tasks.add_task(
        workflow_manager.execute_workflow,
        job_id, workflow_type, file_path, parameters
    )

    return {"job_id": job_id, "status": "submitted"}

@app.get("/api/jobs")
async def list_jobs():
    return get_all_jobs()

@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    job = get_job_by_id(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    return job

@app.get("/api/jobs/{job_id}/results")
async def download_results(job_id: str):
    results_path = f"results/{job_id}"
    if not os.path.exists(results_path):
        raise HTTPException(404, "Results not found")
    zip_path = create_results_archive(job_id)
    return FileResponse(zip_path, media_type='application/zip')
Workflow Manager Architecture¶
Container Orchestration¶
The Workflow Manager is responsible for container lifecycle management:
import docker
from pathlib import Path

class WorkflowManager:
    def __init__(self, base_dir: Path):
        self.base_dir = base_dir
        self.docker_client = docker.from_env()
        self.image_name = "eemt:ubuntu24.04"

    async def execute_workflow(
        self,
        job_id: str,
        workflow_type: str,
        dem_path: Path,
        parameters: dict
    ):
        """Execute workflow in container"""
        # Prepare container configuration
        container_config = self._prepare_container_config(
            job_id, workflow_type, dem_path, parameters
        )

        # Start container
        container = self._start_container(container_config)

        # Monitor execution
        await self._monitor_container(container, job_id)

        # Collect results
        self._collect_results(container, job_id)

        # Cleanup
        self._cleanup_container(container)

    def _prepare_container_config(self, job_id, workflow_type, dem_path, parameters):
        """Prepare Docker container configuration"""
        # Base configuration
        config = {
            'image': self.image_name,
            'name': f'eemt-job-{job_id}',
            'detach': True,
            'remove': False,
            'volumes': {
                str(self.base_dir / 'uploads'): {
                    'bind': '/data/input',
                    'mode': 'ro'
                },
                str(self.base_dir / 'results' / job_id): {
                    'bind': '/data/output',
                    'mode': 'rw'
                },
                str(self.base_dir / 'temp'): {
                    'bind': '/tmp/eemt',
                    'mode': 'rw'
                }
            },
            'environment': {
                'JOB_ID': job_id,
                'WORKFLOW_TYPE': workflow_type,
                **self._parameters_to_env(parameters)
            },
            # With Docker's default cpu_period of 100000 microseconds,
            # a quota of num_threads * 100000 grants num_threads CPUs
            'cpu_quota': parameters.get('num_threads', 4) * 100000,
            'mem_limit': '8g'
        }

        # Workflow-specific command
        if workflow_type == 'sol':
            config['command'] = self._build_solar_command(parameters)
        else:
            config['command'] = self._build_eemt_command(parameters)

        return config
Container Monitoring¶
import re

class ContainerMonitor:
    def __init__(self, docker_client):
        self.docker_client = docker_client
        self.active_containers = {}

    async def monitor_container(self, container, job_id):
        """Monitor container execution and update progress"""
        # Track container
        self.active_containers[job_id] = container

        try:
            # Update job status to running
            update_job_status(job_id, 'running')

            # Stream logs and parse progress
            for line in container.logs(stream=True, follow=True):
                progress = self._parse_progress(line)
                if progress is not None:  # 0% is a valid progress value
                    update_job_progress(job_id, progress)

            # Wait for completion
            result = container.wait()

            if result['StatusCode'] == 0:
                update_job_status(job_id, 'completed')
            else:
                error_msg = self._get_error_message(container)
                update_job_status(job_id, 'failed', error_msg)
        except Exception as e:
            update_job_status(job_id, 'failed', str(e))
        finally:
            # Cleanup; pop() tolerates a job that was never tracked
            self.active_containers.pop(job_id, None)

    def _parse_progress(self, log_line):
        """Extract progress from container logs"""
        # Parse progress indicators from workflow output
        if b'Processing day' in log_line:
            match = re.search(rb'Processing day (\d+) of 365', log_line)
            if match:
                day = int(match.group(1))
                return int((day / 365) * 100)
        return None
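The progress heuristic can be exercised without a running container; a standalone sketch of the same day-counter parsing:

```python
import re

def parse_progress(log_line: bytes):
    """Map a 'Processing day N of 365' log line to a 0-100 percentage."""
    match = re.search(rb'Processing day (\d+) of 365', log_line)
    if match:
        day = int(match.group(1))
        return int((day / 365) * 100)
    return None  # line carries no progress information
```

Lines that do not match the pattern yield `None`, which the monitor must distinguish from a genuine 0% reading.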
Database Architecture¶
Schema Design¶
-- Jobs table
CREATE TABLE jobs (
    id TEXT PRIMARY KEY,
    workflow_type TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    parameters JSON,
    dem_filename TEXT,
    container_id TEXT,
    error_message TEXT,
    progress INTEGER DEFAULT 0,
    CHECK (status IN ('pending', 'running', 'completed', 'failed')),
    CHECK (workflow_type IN ('sol', 'eemt'))
);

-- Job logs table
CREATE TABLE job_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    level TEXT,
    message TEXT,
    FOREIGN KEY (job_id) REFERENCES jobs(id)
);

-- Metrics table
CREATE TABLE metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    cpu_usage REAL,
    memory_usage INTEGER,
    disk_io INTEGER,
    FOREIGN KEY (job_id) REFERENCES jobs(id)
);

-- Indexes for performance
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_created ON jobs(created_at);
CREATE INDEX idx_logs_job ON job_logs(job_id);
CREATE INDEX idx_metrics_job ON metrics(job_id);
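The `CHECK` constraints enforce the status and workflow vocabularies at the database level, so a bug elsewhere cannot record an unknown state. A small sketch against an in-memory SQLite database (trimmed to the columns the demonstration needs):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('''
    CREATE TABLE jobs (
        id TEXT PRIMARY KEY,
        workflow_type TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        CHECK (status IN ('pending', 'running', 'completed', 'failed')),
        CHECK (workflow_type IN ('sol', 'eemt'))
    )
''')

# A valid row is accepted and picks up the default status
conn.execute("INSERT INTO jobs (id, workflow_type) VALUES ('a1', 'sol')")

# An invalid workflow_type is rejected by the CHECK constraint
try:
    conn.execute("INSERT INTO jobs (id, workflow_type) VALUES ('a2', 'bogus')")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True
```

SQLite raises `IntegrityError` rather than silently coercing, which is why the application layer can trust the stored status values.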
Database Access Layer¶
import json
import sqlite3

class JobDatabase:
    def __init__(self, db_path='jobs.db'):
        self.db_path = db_path

    def create_job(self, job_id, workflow_type, parameters, dem_filename):
        """Create new job record"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO jobs (id, workflow_type, parameters, dem_filename)
                VALUES (?, ?, ?, ?)
            ''', (job_id, workflow_type, json.dumps(parameters), dem_filename))
            conn.commit()

    def update_job_status(self, job_id, status, error_message=None):
        """Update job status"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            if status == 'running':
                cursor.execute('''
                    UPDATE jobs
                    SET status = ?, started_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (status, job_id))
            elif status in ['completed', 'failed']:
                cursor.execute('''
                    UPDATE jobs
                    SET status = ?, completed_at = CURRENT_TIMESTAMP,
                        error_message = ?
                    WHERE id = ?
                ''', (status, error_message, job_id))
            else:
                cursor.execute('''
                    UPDATE jobs SET status = ? WHERE id = ?
                ''', (status, job_id))
            conn.commit()

    def get_job(self, job_id):
        """Get job by ID"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM jobs WHERE id = ?', (job_id,))
            row = cursor.fetchone()
            return dict(row) if row else None
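End to end, the access layer follows the pattern sketched below: parameters are serialized to JSON text on insert, the transition to `running` stamps `started_at`, and `sqlite3.Row` turns fetched rows into plain dicts (a compressed, self-contained version of the three methods above):

```python
import json
import sqlite3

conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row  # rows become dict-convertible
conn.execute('''
    CREATE TABLE jobs (
        id TEXT PRIMARY KEY,
        workflow_type TEXT,
        status TEXT DEFAULT 'pending',
        started_at TIMESTAMP,
        parameters JSON
    )
''')

# create_job: parameters are serialized to JSON text
conn.execute(
    'INSERT INTO jobs (id, workflow_type, parameters) VALUES (?, ?, ?)',
    ('job-1', 'sol', json.dumps({'step': 15.0}))
)

# update_job_status('running') stamps started_at
conn.execute(
    "UPDATE jobs SET status = 'running', started_at = CURRENT_TIMESTAMP "
    "WHERE id = ?",
    ('job-1',)
)

# get_job: Row -> plain dict, JSON text -> dict
row = conn.execute('SELECT * FROM jobs WHERE id = ?', ('job-1',)).fetchone()
job = dict(row)
job['parameters'] = json.loads(job['parameters'])
```

Note that SQLite stores the `JSON` column as text, so deserialization is the caller's responsibility.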
Frontend Architecture¶
HTML Templates¶
The frontend uses Jinja2 templates with Bootstrap for styling:
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}EEMT Web Interface{% endblock %}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
    <link href="{{ url_for('static', path='/css/style.css') }}" rel="stylesheet">
    {% block head %}{% endblock %}
</head>
<body>
    <nav class="navbar navbar-dark bg-primary">
        <div class="container-fluid">
            <a class="navbar-brand" href="/">EEMT Web Interface</a>
            <ul class="navbar-nav ms-auto">
                <li class="nav-item">
                    <a class="nav-link" href="/monitor">Monitor Jobs</a>
                </li>
            </ul>
        </div>
    </nav>

    <div class="container mt-4">
        {% block content %}{% endblock %}
    </div>

    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
    <script src="{{ url_for('static', path='/js/main.js') }}"></script>
    {% block scripts %}{% endblock %}
</body>
</html>
JavaScript Architecture¶
// static/js/monitor.js
class JobMonitor {
    constructor() {
        this.jobs = new Map();
        this.updateInterval = 5000; // 5 seconds
        this.intervalId = null;
    }

    async start() {
        // Initial load
        await this.updateJobs();

        // Start periodic updates
        this.intervalId = setInterval(() => {
            this.updateJobs();
        }, this.updateInterval);
    }

    async updateJobs() {
        try {
            const response = await fetch('/api/jobs');
            const jobs = await response.json();

            // Update job list
            this.renderJobs(jobs);

            // Update statistics
            this.updateStatistics(jobs);
        } catch (error) {
            console.error('Failed to update jobs:', error);
        }
    }

    renderJobs(jobs) {
        const tbody = document.getElementById('job-table-body');
        tbody.innerHTML = '';

        jobs.forEach(job => {
            const row = this.createJobRow(job);
            tbody.appendChild(row);
        });
    }

    createJobRow(job) {
        const row = document.createElement('tr');
        row.innerHTML = `
            <td>${job.id.substring(0, 8)}</td>
            <td>${job.workflow_type.toUpperCase()}</td>
            <td>${this.renderStatus(job.status)}</td>
            <td>${this.renderProgress(job.progress)}</td>
            <td>${new Date(job.created_at).toLocaleString()}</td>
            <td>${this.renderActions(job)}</td>
        `;
        return row;
    }

    renderStatus(status) {
        const badges = {
            'pending': 'badge bg-secondary',
            'running': 'badge bg-primary',
            'completed': 'badge bg-success',
            'failed': 'badge bg-danger'
        };
        return `<span class="${badges[status]}">${status}</span>`;
    }

    renderProgress(progress) {
        return `
            <div class="progress">
                <div class="progress-bar" style="width: ${progress}%">
                    ${progress}%
                </div>
            </div>
        `;
    }
}

// Initialize monitor on page load
document.addEventListener('DOMContentLoaded', () => {
    const monitor = new JobMonitor();
    monitor.start();
});
Storage Management¶
Volume Architecture¶
graph LR
subgraph "Host Filesystem"
HD[Host Data Directory]
end
subgraph "Docker Volumes"
UV[Uploads Volume]
RV[Results Volume]
TV[Temp Volume]
CV[Cache Volume]
end
subgraph "Container Mounts"
CI[/data/input]
CO[/data/output]
CT[/tmp/eemt]
CC[/data/cache]
end
HD --> UV
HD --> RV
HD --> TV
HD --> CV
UV --> CI
RV --> CO
TV --> CT
CV --> CC
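The mapping in the diagram is the same one the Workflow Manager hands to docker-py; extracted as a pure function, it can be sketched and checked in isolation (the cache mount is included here, matching the diagram):

```python
from pathlib import Path

def build_volume_config(base_dir: Path, job_id: str) -> dict:
    """Map host directories to the container mount points shown above."""
    return {
        str(base_dir / 'uploads'): {'bind': '/data/input', 'mode': 'ro'},
        str(base_dir / 'results' / job_id): {'bind': '/data/output', 'mode': 'rw'},
        str(base_dir / 'temp'): {'bind': '/tmp/eemt', 'mode': 'rw'},
        str(base_dir / 'cache'): {'bind': '/data/cache', 'mode': 'rw'},
    }
```

Only the uploads mount is read-only; each job gets its own `results/<job_id>` subdirectory, which keeps concurrent jobs from overwriting each other's output.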
File Management¶
import os
import shutil
import zipfile
from pathlib import Path

import aiofiles
from fastapi import UploadFile

class FileManager:
    def __init__(self, base_dir: Path):
        self.base_dir = base_dir
        self.uploads_dir = base_dir / 'uploads'
        self.results_dir = base_dir / 'results'
        self.temp_dir = base_dir / 'temp'
        self.cache_dir = base_dir / 'cache'

        # Ensure directories exist
        for dir_path in [self.uploads_dir, self.results_dir,
                         self.temp_dir, self.cache_dir]:
            dir_path.mkdir(parents=True, exist_ok=True)

    async def save_upload(self, file: UploadFile, job_id: str) -> Path:
        """Save uploaded file"""
        file_path = self.uploads_dir / f"{job_id}_{file.filename}"

        async with aiofiles.open(file_path, 'wb') as f:
            content = await file.read()
            await f.write(content)

        return file_path

    def create_results_archive(self, job_id: str) -> Path:
        """Create ZIP archive of results"""
        results_path = self.results_dir / job_id
        archive_path = self.temp_dir / f"results_{job_id}.zip"

        with zipfile.ZipFile(archive_path, 'w') as zipf:
            for root, dirs, files in os.walk(results_path):
                for file in files:
                    file_path = Path(root) / file
                    arcname = file_path.relative_to(results_path)
                    zipf.write(file_path, arcname)

        return archive_path

    def cleanup_job_files(self, job_id: str, keep_results=False):
        """Clean up job-related files"""
        # Remove upload
        for file in self.uploads_dir.glob(f"{job_id}_*"):
            file.unlink()

        # Remove temp files
        for file in self.temp_dir.glob(f"*{job_id}*"):
            file.unlink()

        # Optionally remove results
        if not keep_results:
            results_path = self.results_dir / job_id
            if results_path.exists():
                shutil.rmtree(results_path)
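The archiving step stores member names relative to the per-job results root, so the ZIP unpacks without a `results/<job_id>/` prefix. A self-contained roundtrip sketch of that logic:

```python
import os
import zipfile
from pathlib import Path

def archive_results(results_path: Path, archive_path: Path) -> Path:
    """Zip a results tree, storing member names relative to its root."""
    with zipfile.ZipFile(archive_path, 'w') as zipf:
        for root, dirs, files in os.walk(results_path):
            for name in files:
                file_path = Path(root) / name
                # relative_to() drops the job-specific prefix from members
                zipf.write(file_path, file_path.relative_to(results_path))
    return archive_path
```

Passing the relative path as the second argument to `ZipFile.write` is what controls the stored member name; without it the archive would embed the full host path.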
Security Architecture¶
Input Validation¶
from typing import Optional

from fastapi import UploadFile
from pydantic import BaseModel, Field, validator

class WorkflowParameters(BaseModel):
    workflow_type: str = Field(..., regex='^(sol|eemt)$')
    step: float = Field(15.0, ge=3.0, le=60.0)
    linke_value: float = Field(3.0, ge=1.0, le=8.0)
    albedo_value: float = Field(0.2, ge=0.0, le=1.0)
    num_threads: int = Field(4, ge=1, le=32)
    start_year: Optional[int] = Field(None, ge=1980, le=2024)
    end_year: Optional[int] = Field(None, ge=1980, le=2024)

    @validator('end_year')
    def validate_year_range(cls, v, values):
        # values.get() guards against start_year being absent or None
        if v and values.get('start_year'):
            if v < values['start_year']:
                raise ValueError('End year must be >= start year')
        return v

class FileValidator:
    @staticmethod
    def validate_dem_file(file: UploadFile) -> bool:
        """Validate uploaded DEM file"""
        # Check file extension
        if not file.filename.lower().endswith(('.tif', '.tiff')):
            raise ValueError("File must be GeoTIFF format")

        # Check file size (max 1GB)
        if file.size > 1024 * 1024 * 1024:
            raise ValueError("File size exceeds 1GB limit")

        # Verify GDAL can read it
        # (Implementation would check file headers)
        return True
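The header check left as a comment above can be approximated without GDAL: every classic TIFF (and therefore GeoTIFF) file begins with one of two 4-byte signatures. A minimal sketch (BigTIFF and deeper structural validation are out of scope here):

```python
# Classic TIFF magic numbers: little-endian 'II*\0', big-endian 'MM\0*'
TIFF_SIGNATURES = (b'II*\x00', b'MM\x00*')

def looks_like_tiff(header: bytes) -> bool:
    """Cheap signature pre-check before handing the file to GDAL."""
    return header[:4] in TIFF_SIGNATURES
```

This only rejects obviously mislabeled uploads (e.g. a ZIP renamed to `.tif`); a valid signature still says nothing about whether the raster itself is usable.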
Authentication & Authorization (Future)¶
import jwt  # PyJWT
from fastapi import Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials):
    """Verify JWT token"""
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")

@app.post("/api/submit-job")
async def submit_job(
    credentials: HTTPAuthorizationCredentials = Security(security),
    ...
):
    user = await verify_token(credentials)
    # Process job with user context
Performance Considerations¶
Async Operations¶
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncWorkflowManager:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)

    async def execute_workflow_async(self, job_id, params):
        """Execute workflow asynchronously"""
        # Run blocking Docker operations in a thread pool so they
        # do not stall the event loop
        loop = asyncio.get_running_loop()
        container = await loop.run_in_executor(
            self.executor,
            self._start_container_sync,
            job_id, params
        )

        # Monitor asynchronously
        await self._monitor_container_async(container, job_id)
Caching Strategy¶
import json

import redis

class CacheManager:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )

    def get_cached_result(self, cache_key: str):
        """Get cached computation result.

        Note: do not wrap this in functools.lru_cache -- memoizing a
        Redis read in-process would keep serving stale values after the
        key expires or is overwritten.
        """
        return self.redis_client.get(cache_key)

    def cache_result(self, cache_key: str, data: dict, ttl=3600):
        """Cache computation result with a TTL (seconds)"""
        self.redis_client.setex(
            cache_key,
            ttl,
            json.dumps(data)
        )
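Cached results are only reusable if equivalent submissions map to the same key, so the key must not depend on dict ordering. A deterministic key can be derived from the workflow parameters; a sketch (the key scheme shown is an assumption, not the shipped implementation):

```python
import hashlib
import json

def make_cache_key(workflow_type: str, parameters: dict) -> str:
    """Hash the canonical JSON form of the parameters into a stable key."""
    # sort_keys makes the serialization independent of insertion order
    canonical = json.dumps(parameters, sort_keys=True, separators=(',', ':'))
    digest = hashlib.sha256(canonical.encode()).hexdigest()
    return f'eemt:{workflow_type}:{digest}'
```

Namespacing the key with the workflow type keeps `sol` and `eemt` results from colliding even when their parameter sets happen to match.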
Monitoring & Observability¶
Metrics Collection¶
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
job_submissions = Counter('eemt_job_submissions_total',
                          'Total job submissions',
                          ['workflow_type'])
job_duration = Histogram('eemt_job_duration_seconds',
                         'Job execution duration',
                         ['workflow_type'])
active_jobs = Gauge('eemt_active_jobs',
                    'Currently active jobs')

# Use in application
@app.post("/api/submit-job")
async def submit_job(...):
    job_submissions.labels(workflow_type=workflow_type).inc()
    active_jobs.inc()
    # ...
Health Checks¶
import shutil
import sqlite3
from datetime import datetime

import docker

@app.get("/health")
async def health_check():
    """Application health check endpoint"""
    checks = {
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'checks': {}
    }

    # Check Docker availability
    try:
        docker_client = docker.from_env()
        docker_client.ping()
        checks['checks']['docker'] = 'ok'
    except Exception:
        checks['checks']['docker'] = 'failed'
        checks['status'] = 'degraded'

    # Check database
    try:
        conn = sqlite3.connect('jobs.db')
        conn.execute('SELECT 1')
        conn.close()
        checks['checks']['database'] = 'ok'
    except Exception:
        checks['checks']['database'] = 'failed'
        checks['status'] = 'unhealthy'

    # Check disk space
    disk_usage = shutil.disk_usage('/')
    if disk_usage.free < 1024 * 1024 * 1024:  # Less than 1 GB free
        checks['checks']['disk'] = 'low'
        checks['status'] = 'degraded'
    else:
        checks['checks']['disk'] = 'ok'

    return checks
Future Enhancements¶
Planned Features¶
- WebSocket Support: Real-time progress updates
- User Authentication: JWT-based auth system
- Job Queuing: Advanced queue management with priorities
- Result Visualization: In-browser GeoTIFF viewing
- Workflow Templates: Pre-configured parameter sets
- API Rate Limiting: Request throttling
- Distributed Tracing: OpenTelemetry integration
Architecture Evolution¶
graph TD
subgraph "Current Architecture"
A1[Monolithic FastAPI]
A2[SQLite]
A3[Local Docker]
end
subgraph "Future Architecture"
B1[API Gateway]
B2[Microservices]
B3[PostgreSQL]
B4[Redis Cache]
B5[Message Queue]
B6[Kubernetes]
end
A1 --> B1
A1 --> B2
A2 --> B3
A3 --> B6
B2 --> B4
B2 --> B5