# Standard library imports import os import json import time import random import logging import hashlib from functools import wraps from pathlib import Path from typing import Tuple, Optional, Dict, Any from enum import Enum import shutil # Third-party library imports import requests from requests import ConnectionError from flask import Flask, send_file, jsonify, request from PIL import Image import numpy as np import cv2 from pydantic import BaseModel, Field, HttpUrl, field_validator from dotenv import load_dotenv import psutil load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Directory setup IMAGES_DIR = Path("images") IMAGES_DIR.mkdir(exist_ok=True) # Custom Exceptions class ImageProcessingError(Exception): """Base exception for image processing errors""" pass class APIError(Exception): """Exception for API-related errors""" pass class ImageNotFoundError(Exception): """Exception for missing image files""" pass # Cache Configuration CACHE_EXPIRY_SECONDS = 3 * 60 * 60 # 3 hours _api_cache: Dict[str, Dict[str, Any]] = {} def get_cache_key(url: str, method: str = "GET", data: Optional[Dict] = None, headers: Optional[Dict] = None, params: Optional[Dict] = None) -> str: """ Generate a unique cache key for API requests. Args: url (str): The API endpoint URL method (str, optional): HTTP method. Defaults to "GET" data (Optional[Dict], optional): Request data for POST requests. Defaults to None headers (Optional[Dict], optional): Request headers. Defaults to None params (Optional[Dict], optional): URL parameters. Defaults to None Returns: str: MD5 hash of the request parameters for use as cache key """ key_data = f"{method}:{url}" if data: key_data += f":data:{json.dumps(data, sort_keys=True)}" if headers: # Sort headers to ensure consistent key generation sorted_headers = json.dumps(dict(sorted(headers.items())), sort_keys=True) key_data += f":headers:{sorted_headers}" if params: key_data += f":params:{json.dumps(params, sort_keys=True)}" return hashlib.md5(key_data.encode()).hexdigest() def is_cache_valid(cache_entry: Dict[str, Any]) -> bool: """ Check if a cache entry is still valid based on expiry time. Args: cache_entry (Dict[str, Any]): Cache entry containing timestamp and data Returns: bool: True if cache entry is still valid, False if expired """ return time.time() - cache_entry["timestamp"] < CACHE_EXPIRY_SECONDS def cached_request(method: str, url: str, **kwargs) -> requests.Response: """ Wrapper for requests that caches responses for 3 hours. Excludes 'original' endpoints where actual image data is downloaded. This function implements intelligent caching for API requests: - Caches JSON responses for non-image endpoints - Bypasses cache for '/original' endpoints to ensure fresh image data - Automatically handles cache expiry and cleanup Args: method (str): HTTP method (GET, POST, etc.) url (str): Target URL for the request **kwargs: Additional arguments passed to requests.request() Returns: requests.Response: Either cached response or fresh API response Raises: requests.RequestException: For network or HTTP errors """ # Don't cache original image downloads if "/original" in url: logger.info(f"Bypassing cache for original image request: {url}") return requests.request(method, url, **kwargs) # Generate cache key including headers and other parameters data = kwargs.get('json') if method.upper() == 'POST' else kwargs.get('data') headers = kwargs.get('headers') params = kwargs.get('params') cache_key = get_cache_key(url, method, data, headers, params) # Check if we have a valid cached response if cache_key in _api_cache and is_cache_valid(_api_cache[cache_key]): logger.info(f"Using cached response for: {url}") cached_entry = _api_cache[cache_key] # Create a mock response object with cached data response = requests.Response() response._content = json.dumps(cached_entry["data"]).encode() response.status_code = cached_entry["status_code"] response.headers.update(cached_entry["headers"]) return response # Make the actual request logger.info(f"Making fresh API request to: {url}") response = requests.request(method, url, **kwargs) # Cache successful responses (excluding binary content) if response.status_code == 200: try: # Only cache JSON responses response_data = response.json() _api_cache[cache_key] = { "data": response_data, "status_code": response.status_code, "headers": dict(response.headers), "timestamp": time.time() } logger.info(f"Cached response for: {url}") except (json.JSONDecodeError, ValueError): # Don't cache non-JSON responses logger.info(f"Not caching non-JSON response for: {url}") return response def clear_expired_cache(): """ Remove expired entries from the cache to prevent memory bloat. This function iterates through all cache entries and removes those that have exceeded the CACHE_EXPIRY_SECONDS threshold. """ current_time = time.time() expired_keys = [ key for key, entry in _api_cache.items() if current_time - entry["timestamp"] >= CACHE_EXPIRY_SECONDS ] for key in expired_keys: del _api_cache[key] if expired_keys: logger.info(f"Cleared {len(expired_keys)} expired cache entries") # Pydantic Models class ImageFormat(str, Enum): """Supported image output formats.""" PNG = "png" JPEG = "jpeg" HEIC = "heic" class SearchQuery(BaseModel): """Model for Immich search API requests.""" query: str = Field(..., min_length=1, description="Search query string") type: str = Field("IMAGE", description="Asset type to search for") @field_validator('query') def validate_query(cls, v): # Add query sanitization forbidden_chars = ['<', '>', '"', "'"] if any(char in v for char in forbidden_chars): raise ValueError("Query contains forbidden characters") return v.strip() @field_validator("type") def validate_asset_type(cls, v): allowed_types = ["IMAGE", "VIDEO"] if v not in allowed_types: raise ValueError(f"Invalid asset type: {v}. Allowed values are {allowed_types}.") return v class APIConfig(BaseModel): """Configuration for Immich API connection.""" base_url: HttpUrl = Field(os.getenv("IMMICH_URL"), description="Base URL for Immich API") api_key: str = Field(os.getenv("IMMICH_API_KEY"), min_length=1, description="API key for authentication") @field_validator("base_url") def validate_url(cls, v): if not v.endswith("/api"): raise ValueError("Base URL must end with '/api'.") return v class ImageDimensions(BaseModel): """Target dimensions for image processing.""" width: int = Field(..., gt=0, description="Target width in pixels") height: int = Field(..., gt=0, description="Target height in pixels") @field_validator("width", "height") def validate_positive(cls, v): if v <= 0: raise ValueError("Width and height must be positive integers.") return v class ProcessingConfig(BaseModel): """Configuration for image processing operations.""" target_dimensions: ImageDimensions = ImageDimensions(width=3840, height=2160) target_aspect_ratio: Tuple[int, int] = Field(default=(16, 9), description="Target aspect ratio as (width, height)") output_format: ImageFormat = ImageFormat.PNG output_filename: str = "current_image.png" @field_validator("output_filename") def validate_filename(cls, v): if not v.endswith((".png", ".jpeg", ".jpg", ".heic")): raise ValueError("Output filename must end with a supported file extension (.png, .jpeg, .heic).") return v.strip() @property def get_output_path(self) -> str: """Generate full path for output file.""" return IMAGES_DIR / self.output_filename # Initialize Flask app and configs app = Flask(__name__) api_config = APIConfig() processing_config = ProcessingConfig() def error_handler(func): """ Decorator for consistent error handling across Flask routes. This decorator catches and properly formats various types of exceptions: - APIError: Network/API related issues (503 status) - ImageProcessingError: Image manipulation issues (500 status) - General exceptions: Unexpected errors (500 status) Args: func: The Flask route function to wrap Returns: Wrapped function with error handling """ @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except APIError as e: logger.error(f"API Error: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 503 except ImageProcessingError as e: logger.error(f"Image Processing Error: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500 except Exception as e: logger.error(f"Unexpected Error: {str(e)}") return jsonify({"status": "error", "message": "Internal server error"}), 500 return wrapper def resize_to_4k(img: np.ndarray) -> np.ndarray: """ Resize an image to 4K resolution while maintaining aspect ratio. This function calculates the optimal scale factor to fit the image within 4K dimensions (3840x2160) without distortion. Args: img (np.ndarray): Input image as OpenCV array Returns: np.ndarray: Resized image array Raises: ImageProcessingError: If resize operation fails """ try: target = processing_config.target_dimensions h, w = img.shape[:2] scale = min(target.width / w, target.height / h) new_w = int(w * scale) new_h = int(h * scale) return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) except Exception as e: raise ImageProcessingError(f"Failed to resize image: {str(e)}") def get_salient_crop(img: np.ndarray, target_aspect_ratio: Optional[Tuple[int, int]] = None) -> np.ndarray: """ Crop image around the most salient region while maintaining specified aspect ratio. This function uses gradient-based saliency detection to identify the most interesting region of the image, then crops around that area while maintaining the target aspect ratio. Algorithm: 1. Convert to grayscale and compute gradients using Sobel operators 2. Calculate gradient magnitude as saliency measure 3. Apply Gaussian blur to smooth the saliency map 4. Find the point of maximum saliency 5. Crop around this point with the target aspect ratio Args: img (np.ndarray): Input image as OpenCV array target_aspect_ratio (Optional[Tuple[int, int]]): Desired aspect ratio as (width, height). Defaults to config value if None. Returns: np.ndarray: Cropped image array Raises: ImageProcessingError: If crop operation fails """ try: target_aspect_ratio = target_aspect_ratio or processing_config.target_aspect_ratio height, width = img.shape[:2] # Convert to grayscale and calculate saliency gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) gradient_magnitude = np.sqrt(grad_x ** 2 + grad_y ** 2) gradient_magnitude = cv2.normalize(gradient_magnitude, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) saliency_map = cv2.GaussianBlur(gradient_magnitude, (21, 21), 0) _, _, _, max_loc = cv2.minMaxLoc(saliency_map) # Calculate crop dimensions crop_ratio = target_aspect_ratio[0] / target_aspect_ratio[1] crop_width = min(width, int(height * crop_ratio)) crop_height = min(height, int(width / crop_ratio)) x_center, y_center = max_loc x1 = max(0, x_center - crop_width // 2) y1 = max(0, y_center - crop_height // 2) x2 = min(width, x1 + crop_width) y2 = min(height, y1 + crop_height) # Adjust crop coordinates if necessary if x2 - x1 < crop_width: x1 = 0 if x1 == 0 else width - crop_width x2 = crop_width if x1 == 0 else width if y2 - y1 < crop_height: y1 = 0 if y1 == 0 else height - crop_height y2 = crop_height if y1 == 0 else height return img[y1:y2, x1:x2] except Exception as e: raise ImageProcessingError(f"Failed to crop image: {str(e)}") def get_new_random_image(query: Optional[str] = None, range_limit: Optional[int] = None) -> bool: """ Fetch and process a random image from Immich API. This function performs the complete workflow: 1. Search for images matching the query using cached requests 2. Select a random image from search results 3. Download the original image (bypasses cache) 4. Fetch image metadata (cached) 5. Process the image (crop and resize) 6. Save the final image to the images directory The function uses smart caching to reduce API calls while ensuring fresh image data for downloads. Args: query (Optional[str]): Search query string. Defaults to "Mountains. Landscape. Nature" range_limit (Optional[int]): Maximum number of results to consider. Defaults to 50 Returns: bool: True if image processing completed successfully Raises: APIError: If API requests fail or no images found ImageProcessingError: If image processing operations fail """ try: # Clear expired cache entries clear_expired_cache() # Use provided query or default search_query_text = query or os.getenv("DEFAULT_QUERY", "Mountains") result_limit = range_limit or 50 # Prepare API request search_query = SearchQuery( query=search_query_text ) headers = { 'x-api-key': api_config.api_key, 'Content-Type': 'application/json' } # Search for images - this will be cached response = cached_request( "POST", f"{api_config.base_url}/search/smart", headers=headers, json=search_query.model_dump() ) response.raise_for_status() # Process response response_data = response.json() asset_count = min(result_limit, response_data["assets"]["count"]) if asset_count == 0: raise APIError("No images found") random_asset_id = response_data["assets"]["items"][random.randint(0, asset_count - 1)]["id"] # Fetch image - this will NOT be cached (bypassed for /original endpoints) random_asset = cached_request( "GET", f"{api_config.base_url}/assets/{random_asset_id}/original", headers=headers ) random_asset.raise_for_status() # Fetch metadata - this will be cached asset_metadata = cached_request( "GET", f"{api_config.base_url}/assets/{random_asset_id}", headers=headers ) asset_metadata.raise_for_status() # Process image actual_extension = asset_metadata.json()["originalMimeType"].split("/")[1] temp_image_path = IMAGES_DIR / f"temp_random_asset.{actual_extension}" output_path = processing_config.get_output_path try: # Save temporary image file with open(temp_image_path, "wb") as f: f.write(random_asset.content) # Handle HEIC format if actual_extension.lower() == 'heic': from pillow_heif import register_heif_opener register_heif_opener() # Process the image with Image.open(temp_image_path) as img: # Convert to appropriate color mode if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info): img = img.convert('RGBA') else: img = img.convert('RGB') # Convert to OpenCV format and process opencv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) cropped_img = get_salient_crop(opencv_img) resized_img = resize_to_4k(cropped_img) # Convert back to PIL and save final image final_img = Image.fromarray(cv2.cvtColor(resized_img, cv2.COLOR_BGR2RGB)) final_img.save( output_path, processing_config.output_format.value, optimize=True ) finally: # Clean up temporary file if temp_image_path.exists(): temp_image_path.unlink() logger.info(f"Successfully processed and saved image to: {output_path}") return True except requests.exceptions.RequestException as e: raise APIError(f"Failed to fetch image: {str(e)}") except Exception as e: raise ImageProcessingError(f"Failed to process image: {str(e)}") @app.route('/new-image') @error_handler def new_image(): """ Flask endpoint to generate a new processed image. This endpoint triggers the complete image processing workflow: fetching, processing, and saving a new random image. Query Parameters: query (str, optional): Search query for image selection range (int, optional): Maximum number of search results to consider (default: 50) Returns: JSON response indicating success or error status """ # Get optional query parameters query = request.args.get('query') range_limit = request.args.get('range', type=int) get_new_random_image(query=query, range_limit=range_limit) return jsonify({"status": "success", "message": "New image generated successfully"}) @app.route('/image') @error_handler def serve_image(): """ Flask endpoint to serve the current processed image. Returns the most recently processed image file from the images directory. Returns: Flask file response with the current image Raises: ImageNotFoundError: If no processed image is available """ output_path = processing_config.get_output_path if not output_path.exists(): raise ImageNotFoundError("No image available") return send_file( output_path, mimetype=f'image/{processing_config.output_format.value}' ) @app.route('/cache-status') @error_handler def cache_status(): """ Flask endpoint to check cache status and statistics. Provides information about the current state of the API cache, including number of entries and expiry configuration. Returns: JSON response with cache statistics """ clear_expired_cache() return jsonify({ "status": "success", "cache_entries": len(_api_cache), "cache_expiry_hours": CACHE_EXPIRY_SECONDS / 3600, "images_directory": str(IMAGES_DIR.absolute()) }) @app.route('/images-info') @error_handler def images_info(): """ Flask endpoint to get information about the images directory. Returns statistics about saved images including directory path, current image status, and directory size. Returns: JSON response with images directory information """ output_path = processing_config.get_output_path return jsonify({ "status": "success", "images_directory": str(IMAGES_DIR.absolute()), "current_image_exists": output_path.exists(), "current_image_path": str(output_path) if output_path.exists() else None, "current_image_size_bytes": output_path.stat().st_size if output_path.exists() else 0 }) @app.route('/health', methods=['GET']) @error_handler def health_check(): """ Health check endpoint for monitoring system status. Returns: JSON response with service status, container health, and dependency checks. """ try: # Check disk space in container total, used, free = shutil.disk_usage("/") disk_ok = free / total > 0.1 # Ensure at least 10% free disk space # Memory usage memory = psutil.virtual_memory() memory_ok = memory.available / memory.total > 0.1 # At least 10% memory free # Check Immich API connectivity try: response = requests.get(str(api_config.base_url)) api_accessible = response.status_code == 404 except ConnectionError: api_accessible = False # Determine overall status healthy = disk_ok and memory_ok and api_accessible # Construct the response return jsonify({ "status": "healthy" if healthy else "unhealthy", "disk_usage": { "total_gb": total / (1024 ** 3), "used_gb": used / (1024 ** 3), "free_gb": free / (1024 ** 3), "status_ok": disk_ok }, "memory_usage": { "total_gb": memory.total / (1024 ** 3), "available_gb": memory.available / (1024 ** 3), "status_ok": memory_ok }, "api_connectivity": { "url": api_config.base_url, "status_ok": api_accessible } }), 200 except Exception as e: logger.error(f"Health check error: {str(e)}") return jsonify({ "status": "error", "message": str(e) }), 500 if __name__ == '__main__': # Ensure images directory exists and generate initial image IMAGES_DIR.mkdir(exist_ok=True) get_new_random_image() app.run()