"""Photo Object Removal API.

FastAPI service that accepts an image plus a removal mask (or an image with
pink/magenta paint on it), runs ``process_inpaint`` from ``src.core`` and
serves the resulting PNG.  Every inpaint request is logged to MongoDB, and
per-user usage counters are optionally written to an admin database.
"""

import hmac
import logging
import os
import re
import shutil
import time
import uuid
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional

import cv2
import numpy as np
from bson import ObjectId
from fastapi import (
    Depends,
    FastAPI,
    File,
    Form,
    Header,
    HTTPException,
    Request,
    UploadFile,
)
from fastapi.responses import FileResponse, JSONResponse
from PIL import Image
from pydantic import BaseModel
from pymongo import MongoClient
from pymongo.errors import ConfigurationError

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("api")

# Imported after logging is configured, as in the original module.
from src.core import process_inpaint  # noqa: E402

# Directories (use writable space on HF Spaces)
BASE_DIR = os.environ.get("DATA_DIR", "/data")
if not os.path.isdir(BASE_DIR):
    # Fallback to /tmp if /data not available
    BASE_DIR = "/tmp"

UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open
ENV_TOKEN = os.environ.get("API_TOKEN")

app = FastAPI(title="Photo Object Removal API", version="1.0.0")

# In-memory stores (process-local, lost on restart)
file_store: Dict[str, Dict[str, str]] = {}
logs: List[Dict[str, str]] = []

# SECURITY: this connection string embeds a database username and password in
# source control.  Set the MONGODB_LOGS environment variable to override it,
# then rotate the credential and delete the literal fallback below.
# TODO(review): remove the hard-coded fallback once deployments set MONGODB_LOGS.
MONGO_URI = os.environ.get(
    "MONGODB_LOGS",
    "mongodb+srv://harilogicgo_db_user:pdnh6UCMsWvuTCoi@kiddoimages.k2a4nuv.mongodb.net/?appName=KiddoImages",
)
mongo_client = MongoClient(MONGO_URI)
mongo_db = mongo_client["object_remover"]
mongo_logs = mongo_db["api_logs"]

ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN")
DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984"
admin_media_clicks = None

# Inclusive per-channel RGB bounds used to detect pink/magenta paint.
PINK_LOWER = np.array([150, 0, 100], dtype=np.uint8)
PINK_UPPER = np.array([255, 120, 255], dtype=np.uint8)


def _init_admin_mongo() -> None:
    """Connect to the admin database and bind ``admin_media_clicks``.

    Does nothing when ``MONGODB_ADMIN`` is unset.  Any connection error is
    logged and leaves media-click logging disabled instead of failing import.
    """
    global admin_media_clicks
    if not ADMIN_MONGO_URI:
        log.info("Admin Mongo URI not provided; media click logging disabled")
        return
    try:
        admin_client = MongoClient(ADMIN_MONGO_URI)
        # get_default_database() extracts database from connection string
        # (e.g., /adminPanel).  It raises ConfigurationError when the URI names
        # no database, so catch that to make the fallback below reachable.
        try:
            admin_db = admin_client.get_default_database()
        except ConfigurationError:
            admin_db = None
        if admin_db is None:
            # Fallback if no database in URI
            admin_db = admin_client["admin"]
            log.warning("No database in connection string, defaulting to 'admin'")
        admin_media_clicks = admin_db["media_clicks"]
        log.info(
            "Admin media click logging initialized: db=%s collection=%s",
            admin_db.name,
            admin_media_clicks.name,
        )
        try:
            admin_media_clicks.drop_index("user_id_1_header_1_media_id_1")
            log.info("Dropped legacy index user_id_1_header_1_media_id_1")
        except Exception as idx_err:
            # Index drop failure is non-critical (often permission issue)
            if "Unauthorized" not in str(idx_err):
                log.info("Skipping legacy index drop: %s", idx_err)
    except Exception as err:
        log.error("Failed to init admin Mongo client: %s", err)
        admin_media_clicks = None


_init_admin_mongo()


def _admin_logging_status() -> Dict[str, object]:
    """Report whether admin media-click logging is wired up (no secrets)."""
    if admin_media_clicks is None:
        return {
            "enabled": False,
            "db": None,
            "collection": None,
        }
    return {
        "enabled": True,
        "db": admin_media_clicks.database.name,
        "collection": admin_media_clicks.name,
    }


def _build_ai_edit_daily_count(
    existing: Optional[List[Dict[str, object]]],
    today: date,
) -> List[Dict[str, object]]:
    """
    Build / extend the ai_edit_daily_count array with the following rules:

    - Case A (no existing data): return [{date: today, count: 1}]
    - Case B (today already recorded): return list unchanged
    - Case C (gap in days): fill missing days with count=0 and append today with count=1

    Additionally, the returned list is capped to the most recent 32 entries
    (only on the Case C path; Case B returns the stored list as-is).

    The stored "date" value is a midnight UTC (naive UTC) datetime for the given day.
    The input list is never mutated.
    """

    def _to_date_only(value: object) -> date:
        # datetime is a subclass of date, so it must be tested first.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime
        try:
            text = str(value)
            if len(text) == 10:
                return datetime.strptime(text, "%Y-%m-%d").date()
            return datetime.fromisoformat(text).date()
        except (TypeError, ValueError):
            # If parsing fails, just treat as today to avoid crashing
            return today

    def _entry(day: date, count: int) -> Dict[str, object]:
        return {"date": datetime(day.year, day.month, day.day), "count": count}

    # Case A: first ever use (no array yet)
    if not existing:
        return [_entry(today, 1)]

    # Work on a shallow copy so we don't mutate original in-place
    result: List[Dict[str, object]] = list(existing)
    last_entry = result[-1] if result else None
    if not last_entry or "date" not in last_entry:
        # If structure is unexpected, re-initialize safely
        return [_entry(today, 1)]

    last_date = _to_date_only(last_entry["date"])

    # Case B (today already last entry) or a last date in the future:
    # leave history untouched to avoid corrupting it.
    if last_date >= today:
        return result

    # Case C: there is a gap, fill missing days with count=0 ...
    cursor = last_date + timedelta(days=1)
    while cursor < today:
        result.append(_entry(cursor, 0))
        cursor += timedelta(days=1)
    # ... and finally add today's presence indicator
    result.append(_entry(today, 1))

    # Sort by date ascending (older dates first): [oldest, ..., newest]
    try:
        result.sort(key=lambda entry: _to_date_only(entry.get("date")))
    except Exception:
        # Malformed historic entries: fall back to current ordering
        pass

    # Enforce 32-entry limit (keep the most recent 32 days)
    if len(result) > 32:
        result = result[-32:]
    return result


def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency enforcing ``Authorization: Bearer <API_TOKEN>``.

    No-op when ``API_TOKEN`` is unset.

    Raises:
        HTTPException: 401 if the header is missing or not a Bearer header,
            403 if the token does not match.
    """
    if not ENV_TOKEN:
        return
    if authorization is None or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")
    token = authorization.split(" ", 1)[1]
    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), ENV_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Forbidden")


class InpaintRequest(BaseModel):
    """JSON body for /inpaint and /inpaint-url."""

    image_id: str
    mask_id: str
    invert_mask: bool = True  # True => selected/painted area is removed
    passthrough: bool = False  # If True, return the original image unchanged
    user_id: Optional[str] = None
    category_id: Optional[str] = None


class SimpleRemoveRequest(BaseModel):
    """Request model carrying only an image id."""

    image_id: str  # Image with pink/magenta segments to remove


def _coerce_object_id(value: Optional[str]) -> ObjectId:
    """Map an arbitrary id string onto an ``ObjectId``.

    - ``None`` or unrecognised text -> a freshly generated ObjectId
    - 24 hex characters -> that ObjectId
    - decimal digits -> the number in hex, truncated to its last 24 hex digits
      and left-padded with zeros
    """
    if value is None:
        return ObjectId()
    value_str = str(value).strip()
    if re.fullmatch(r"[0-9a-fA-F]{24}", value_str):
        return ObjectId(value_str)
    if value_str.isdigit():
        hex_str = format(int(value_str), "x")
        if len(hex_str) > 24:
            hex_str = hex_str[-24:]
        return ObjectId(hex_str.rjust(24, "0"))
    return ObjectId()


def _coerce_category_id(category_id: Optional[str]) -> ObjectId:
    """Like ``_coerce_object_id`` but falls back to ``DEFAULT_CATEGORY_ID`` when empty."""
    return _coerce_object_id(category_id or DEFAULT_CATEGORY_ID)


def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None:
    """Log to admin media_clicks collection only if user_id is provided.

    Best-effort: any database error is logged as a warning and swallowed so a
    logging failure never fails the request.
    """
    if admin_media_clicks is None:
        return
    # Only log if user_id is provided (not None/empty)
    if not user_id or not user_id.strip():
        return
    try:
        user_obj = _coerce_object_id(user_id)
        category_obj = _coerce_category_id(category_id)
        now = datetime.utcnow()
        today = now.date()

        doc = admin_media_clicks.find_one({"userId": user_obj})
        if doc:
            updated_daily = _build_ai_edit_daily_count(
                doc.get("ai_edit_daily_count"), today
            )
            common_set = {
                "updatedAt": now,
                "ai_edit_last_date": now,
                "ai_edit_daily_count": updated_daily,
            }
            categories = doc.get("categories") or []
            if any(cat.get("categoryId") == category_obj for cat in categories):
                # Category exists: increment click_count and ai_edit_complete, update dates
                admin_media_clicks.update_one(
                    {"_id": doc["_id"], "categories.categoryId": category_obj},
                    {
                        # $inc handles missing fields (backward compatible)
                        "$inc": {
                            "categories.$.click_count": 1,
                            "ai_edit_complete": 1,
                        },
                        "$set": {"categories.$.lastClickedAt": now, **common_set},
                    },
                )
            else:
                # New category on existing document: push category, increment ai_edit_complete
                admin_media_clicks.update_one(
                    {"_id": doc["_id"]},
                    {
                        "$push": {
                            "categories": {
                                "categoryId": category_obj,
                                "click_count": 1,
                                "lastClickedAt": now,
                            }
                        },
                        "$inc": {"ai_edit_complete": 1},
                        "$set": common_set,
                    },
                )
        else:
            # New user.  A field may appear under only one update operator:
            # "updatedAt" and "ai_edit_complete" used to be listed in
            # $setOnInsert as well as $set/$inc, which MongoDB rejects with a
            # path-conflict error, so first-time users were never recorded.
            # $inc on the missing field yields ai_edit_complete=1 and $set
            # supplies updatedAt.
            daily_for_new = _build_ai_edit_daily_count(None, today)
            admin_media_clicks.update_one(
                {"userId": user_obj},
                {
                    "$setOnInsert": {
                        "userId": user_obj,
                        "categories": [
                            {
                                "categoryId": category_obj,
                                "click_count": 1,
                                "lastClickedAt": now,
                            }
                        ],
                        "createdAt": now,
                        "ai_edit_daily_count": daily_for_new,
                    },
                    "$inc": {"ai_edit_complete": 1},
                    "$set": {
                        "updatedAt": now,
                        "ai_edit_last_date": now,
                    },
                },
                upsert=True,
            )
    except Exception as err:
        err_str = str(err)
        if "Unauthorized" in err_str or "not authorized" in err_str.lower():
            log.warning(
                "Admin media click logging failed (permissions): user lacks read/write on db=%s collection=%s. "
                "Check MongoDB user permissions.",
                admin_media_clicks.database.name,
                admin_media_clicks.name,
            )
        else:
            log.warning("Admin media click logging failed: %s", err)


def _log_api_call(
    fields: Dict[str, object],
    output_id: Optional[str],
    status: str,
    error_msg: Optional[str],
    start_time: float,
) -> None:
    """Insert one request record into the ``api_logs`` collection.

    ``fields`` holds the endpoint-specific keys; the common keys are added
    here.  Insert failures are logged and swallowed.
    """
    log_doc: Dict[str, object] = dict(fields)
    log_doc.update(
        {
            "output_id": output_id,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": (time.time() - start_time) * 1000,
        }
    )
    if error_msg:
        log_doc["error"] = error_msg
    try:
        mongo_logs.insert_one(log_doc)
    except Exception as mongo_err:
        log.error("Mongo log insert failed: %s", mongo_err)


def _new_output_name() -> str:
    """Return a fresh unique file name for a result PNG."""
    return f"output_{uuid.uuid4().hex}.png"


def _save_png(result: np.ndarray, name: str) -> None:
    """Write ``result`` into OUTPUT_DIR as a lightly compressed PNG."""
    Image.fromarray(result).save(
        os.path.join(OUTPUT_DIR, name), "PNG", optimize=False, compress_level=1
    )


def _download_url(request: Optional[Request], name: str) -> Optional[str]:
    """Best-effort absolute download URL for ``name``; None if it cannot be built."""
    if request is None:
        return None
    try:
        return str(request.url_for("download_file", filename=name))
    except Exception:
        return None


@app.get("/")
def root() -> Dict[str, object]:
    """Describe the service and its endpoints."""
    return {
        "name": "Photo Object Removal API",
        "status": "ok",
        "endpoints": {
            "GET /health": "health check",
            "POST /upload-image": "form-data: image=file",
            "POST /upload-mask": "form-data: mask=file",
            "POST /inpaint": "JSON: {image_id, mask_id}",
            "POST /inpaint-multipart": "form-data: image=file, mask=file",
            "POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)",
            "GET /download/{filename}": "download result image",
            "GET /result/{filename}": "view result image in browser",
            "GET /logs": "recent uploads/results",
        },
        "auth": "set API_TOKEN env var to require Authorization: Bearer (except /health)",
    }


@app.get("/health")
def health() -> Dict[str, str]:
    """Liveness probe."""
    return {"status": "healthy"}


@app.get("/logging-status")
def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
    """Helper endpoint to verify admin media logging wiring (no secrets exposed)."""
    return _admin_logging_status()


def _store_upload(upload: UploadFile, kind: str) -> Dict[str, str]:
    """Persist an uploaded file under UPLOAD_DIR and register it in ``file_store``.

    ``kind`` is ``"image"`` or ``"mask"``.  The stored name is a fresh UUID
    plus the client's extension (``.png`` when there is none).
    """
    # filename may be None for some clients; splitext(None) would raise.
    original_name = upload.filename or ""
    ext = os.path.splitext(original_name)[1] or ".png"
    file_id = str(uuid.uuid4())
    stored_name = f"{file_id}{ext}"
    stored_path = os.path.join(UPLOAD_DIR, stored_name)
    with open(stored_path, "wb") as f:
        shutil.copyfileobj(upload.file, f)
    timestamp = datetime.utcnow().isoformat()
    file_store[file_id] = {
        "type": kind,
        "filename": original_name,
        "stored_name": stored_name,
        "path": stored_path,
        "timestamp": timestamp,
    }
    logs.append(
        {"id": file_id, "filename": original_name, "type": kind, "timestamp": timestamp}
    )
    return {"id": file_id, "filename": original_name}


@app.post("/upload-image")
def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Store an image and return its id for use with /inpaint."""
    return _store_upload(image, "image")


@app.post("/upload-mask")
def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Store a mask and return its id for use with /inpaint."""
    return _store_upload(mask, "mask")


def _load_rgba_image(path: str) -> Image.Image:
    """Open ``path`` and return it converted to RGBA; the file handle is closed."""
    with Image.open(path) as img:
        return img.convert("RGBA")


def _bw_to_rgba(mask_bw: np.ndarray) -> np.ndarray:
    """Replicate a 2-D 0/255 mask into R, G and B and add a fully opaque alpha."""
    rgba = np.empty(mask_bw.shape + (4,), dtype=np.uint8)
    rgba[:, :, :3] = mask_bw[:, :, None]
    rgba[:, :, 3] = 255
    return rgba


def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray:
    """
    Convert mask image to RGBA format (black/white mask).

    Standard convention: white (255) = area to remove, black (0) = area to keep
    Returns RGBA with white in RGB channels where removal is needed, alpha=255

    Three cases:
    - non-RGBA input: gray value > 128 means remove
    - RGBA with mean alpha > 200: gray value > 128 or exact magenta means remove
    - otherwise the alpha channel is the mask: alpha < 128 means remove
    """
    if img.mode != "RGBA":
        arr = np.array(img.convert("L"))
        mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8)
        log.info(
            "Loaded %s mask: %d white pixels (to remove)",
            img.mode,
            int((mask_bw > 0).sum()),
        )
        return _bw_to_rgba(mask_bw)

    arr = np.array(img)
    alpha = arr[:, :, 3]
    rgb = np.ascontiguousarray(arr[:, :, :3])

    # If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values
    if alpha.mean() > 200:
        gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
        bright = np.where(gray > 128, 255, 0).astype(np.uint8)
        # Also detect magenta specifically
        magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255
        mask_bw = np.maximum(bright, magenta)
        log.info(
            "Loaded RGBA mask (RGB-based): %d white pixels (to remove)",
            int((mask_bw > 0).sum()),
        )
        return _bw_to_rgba(mask_bw)

    # Alpha channel encodes the mask - convert to RGB-based
    # Transparent areas (alpha < 128) = remove, Opaque areas = keep
    mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8)
    log.info(
        "Loaded RGBA mask (alpha-based): %d white pixels (to remove)",
        int((mask_bw > 0).sum()),
    )
    return _bw_to_rgba(mask_bw)


def _detect_pink(rgb: np.ndarray) -> np.ndarray:
    """Return a 0/255 uint8 mask of pixels inside the fixed pink/magenta RGB bounds."""
    in_range = np.all((rgb >= PINK_LOWER) & (rgb <= PINK_UPPER), axis=2)
    return in_range.astype(np.uint8) * 255


def _clean_mask(binmask: np.ndarray) -> np.ndarray:
    """Close small gaps (2 iterations) then remove small noise (1 iteration)."""
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    closed = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
    return cv2.morphologyEx(closed, cv2.MORPH_OPEN, kernel, iterations=1)


def _encode_removal_mask(binmask: np.ndarray) -> np.ndarray:
    """Encode a 0/255 removal mask as RGBA.

    RGB is white where pixels are to be removed (for visualization only);
    alpha is ``255 - binmask`` so removed pixels are fully transparent.
    NOTE(review): the original comments state process_inpaint computes
    ``255 - mask[:, :, 3]``; that function is not visible here - confirm.
    """
    mask_rgba = np.zeros(binmask.shape + (4,), dtype=np.uint8)
    mask_rgba[:, :, :3] = binmask[:, :, None]
    mask_rgba[:, :, 3] = 255 - binmask
    return mask_rgba


def _stored_path(file_id: str, kind: str) -> str:
    """Return the on-disk path of a stored upload of ``kind`` or raise 404."""
    entry = file_store.get(file_id)
    if entry is None or entry["type"] != kind:
        raise HTTPException(status_code=404, detail=f"{kind}_id not found")
    return entry["path"]


def _inpaint_stored(req: InpaintRequest) -> np.ndarray:
    """Run the inpaint (or passthrough) for previously uploaded image/mask ids."""
    image_path = _stored_path(req.image_id, "image")
    mask_path = _stored_path(req.mask_id, "mask")
    img_rgba = _load_rgba_image(image_path)
    with Image.open(mask_path) as mask_img:  # may be RGB/gray/RGBA
        mask_rgba = _load_rgba_mask_from_image(mask_img)
    if req.passthrough:
        return np.array(img_rgba.convert("RGB"))
    return process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask)


@app.post("/inpaint")
def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Inpaint a stored image with a stored mask and return the result file name.

    Raises 404 when either id is unknown or of the wrong type.  The request is
    always recorded in MongoDB, whether it succeeds or fails.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    output_name = None
    try:
        result = _inpaint_stored(req)
        output_name = _new_output_name()
        _save_png(result, output_name)
        log_media_click(req.user_id, req.category_id)
        return {"result": output_name}
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        _log_api_call(
            {"input_image_id": req.image_id, "input_mask_id": req.mask_id},
            output_name,
            status,
            error_msg,
            start_time,
        )


@app.post("/inpaint-url")
def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Same as /inpaint but returns a JSON with a public download URL instead of image bytes."""
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None
    try:
        result = _inpaint_stored(req)
        result_name = _new_output_name()
        _save_png(result, result_name)
        # Unlike the multipart endpoints, a URL-building failure is an error here.
        url = str(request.url_for("download_file", filename=result_name))
        logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()})
        log_media_click(req.user_id, req.category_id)
        return {"result": result_name, "url": url}
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always log to regular MongoDB (mandatory)
        _log_api_call(
            {"input_image_id": req.image_id, "input_mask_id": req.mask_id},
            result_name,
            status,
            error_msg,
            start_time,
        )


@app.post("/inpaint-multipart")
def inpaint_multipart(
    image: UploadFile = File(...),
    mask: UploadFile = File(...),
    request: Request = None,
    invert_mask: bool = True,
    mask_is_painted: bool = False,  # if True, mask file is the painted-on image (e.g., black strokes on original)
    passthrough: bool = False,
    user_id: Optional[str] = Form(None),
    category_id: Optional[str] = Form(None),
    _: None = Depends(bearer_auth),
) -> Dict[str, str]:
    """Inpaint directly from uploaded image and mask files.

    With ``mask_is_painted`` the mask file is treated as a painted copy of the
    image: pink/magenta pixels, plus any pixel differing noticeably from the
    original when sizes match, are removed.  If fewer than 50 pixels are
    detected the original image is saved and returned together with an
    ``error`` key.  With ``passthrough`` the input image is echoed unchanged.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None
    try:
        # Load in-memory (both files are decoded even for passthrough)
        img = Image.open(image.file).convert("RGBA")
        m = Image.open(mask.file).convert("RGBA")

        if passthrough:
            # Just echo the input image, ignore mask
            result = np.array(img.convert("RGB"))
        else:
            if mask_is_painted:
                # Auto-detect pink/magenta paint and convert to black/white mask
                # White pixels = areas to remove, Black pixels = areas to keep
                log.info("Auto-detecting pink/magenta paint from uploaded image...")
                m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB)
                binmask = _detect_pink(m_rgb)

                # Also treat significant differences from the original as paint
                img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
                if img_rgb.shape == m_rgb.shape:
                    gray_diff = cv2.cvtColor(cv2.absdiff(img_rgb, m_rgb), cv2.COLOR_RGB2GRAY)
                    diff_mask = (gray_diff > 50).astype(np.uint8) * 255
                    binmask = cv2.bitwise_or(binmask, diff_mask)

                binmask = _clean_mask(binmask)
                nonzero = int((binmask > 0).sum())
                log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero)

                if nonzero < 50:
                    log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.")
                    result_name = _new_output_name()
                    _save_png(np.array(img.convert("RGB")), result_name)
                    return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"}

                mask_rgba = _encode_removal_mask(binmask)
                log.info(
                    "Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)",
                    nonzero,
                    binmask.size - nonzero,
                )
            else:
                mask_rgba = _load_rgba_mask_from_image(m)

            log.info("Using invert_mask=%s (mask_is_painted=%s)", invert_mask, mask_is_painted)
            result = process_inpaint(np.array(img), mask_rgba, invert_mask=invert_mask)

        result_name = _new_output_name()
        _save_png(result, result_name)

        url = _download_url(request, result_name)
        entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()}
        resp: Dict[str, str] = {"result": result_name}
        if url:
            entry["url"] = url
            resp["url"] = url
        logs.append(entry)
        log_media_click(user_id, category_id)
        return resp
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always log to regular MongoDB (mandatory)
        _log_api_call({"endpoint": "inpaint-multipart"}, result_name, status, error_msg, start_time)


@app.post("/remove-pink")
def remove_pink_segments(
    image: UploadFile = File(...),
    request: Request = None,
    user_id: Optional[str] = Form(None),
    category_id: Optional[str] = Form(None),
    _: None = Depends(bearer_auth),
) -> Dict[str, str]:
    """
    Simple endpoint: upload an image with pink/magenta segments to remove.

    - Pink/Magenta segments → automatically removed (white in mask)
    - Everything else → automatically kept (black in mask)

    Just paint pink/magenta on areas you want to remove, upload the image, and it works!
    If fewer than 50 pink pixels are found, the original image is saved and
    returned together with an ``error`` key.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None
    try:
        log.info("Simple remove-pink: processing image %s", image.filename)

        # Load the image (with pink paint on it)
        img = Image.open(image.file).convert("RGBA")
        img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)

        binmask = _clean_mask(_detect_pink(img_rgb))
        nonzero = int((binmask > 0).sum())
        total_pixels = binmask.size
        log.info(
            "Detected %d pink pixels (%.2f%% of image) to remove",
            nonzero,
            100 * nonzero / total_pixels,
        )
        log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]")

        if nonzero < 50:
            log.error("No pink segments detected! Returning original image.")
            result_name = _new_output_name()
            _save_png(np.array(img.convert("RGB")), result_name)
            return {
                "result": result_name,
                "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255).",
            }

        mask_rgba = _encode_removal_mask(binmask)

        # Verify mask encoding
        alpha_zero_count = int((mask_rgba[:, :, 3] == 0).sum())
        alpha_255_count = int((mask_rgba[:, :, 3] == 255).sum())
        log.info(
            "Mask encoding: %d pixels with alpha=0 (pink), %d pixels with alpha=255 (keep)",
            alpha_zero_count,
            alpha_255_count,
        )
        log.info(
            "After 255-alpha conversion: %d will become white (255/remove), %d will become black (0/keep)",
            alpha_zero_count,
            alpha_255_count,
        )

        # The painted image itself is passed in; the masked areas are inpainted over.
        log.info("Starting inpainting process...")
        result = process_inpaint(np.array(img), mask_rgba, invert_mask=True)
        log.info("Inpainting complete, result shape: %s", result.shape)

        result_name = _new_output_name()
        _save_png(result, result_name)

        url = _download_url(request, result_name)
        logs.append({
            "result": result_name,
            "filename": image.filename,
            "pink_pixels": nonzero,
            "timestamp": datetime.utcnow().isoformat(),
        })
        resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)}
        if url:
            resp["url"] = url
        log_media_click(user_id, category_id)
        return resp
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always log to regular MongoDB (mandatory)
        _log_api_call({"endpoint": "remove-pink"}, result_name, status, error_msg, start_time)


def _output_path_or_404(filename: str) -> str:
    """Resolve ``filename`` inside OUTPUT_DIR or raise 404.

    Only bare file names are accepted, so a name containing a path separator
    can never escape OUTPUT_DIR.
    """
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(OUTPUT_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="file not found")
    return path


@app.get("/download/{filename}")
def download_file(filename: str):
    """Download a result image."""
    return FileResponse(_output_path_or_404(filename))


@app.get("/result/{filename}")
def view_result(filename: str):
    """View result image directly in browser (same as download but with proper content-type for viewing)"""
    return FileResponse(_output_path_or_404(filename), media_type="image/png")


@app.get("/logs")
def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse:
    """Return the in-memory list of recent uploads/results."""
    return JSONResponse(content=logs)


# NOTE(review): everything below is a commented-out copy of an earlier revision
# of this module; it is dead code and a candidate for deletion.

# import os
# import uuid
# import shutil
# import re
# from datetime import datetime, timedelta, date
# from typing import Dict, List, Optional
# import numpy as np
# from fastapi import (
#     FastAPI,
#     UploadFile,
#     File,
#     HTTPException,
#     Depends,
#     Header,
#     Request,
#     Form,
# )
# from fastapi.responses import FileResponse, JSONResponse
# from pydantic import BaseModel
# from PIL import Image
# import cv2
# import logging
# from bson import ObjectId
# from pymongo import MongoClient
# import time
# logging.basicConfig(level=logging.INFO)
# log = logging.getLogger("api")
# from src.core import process_inpaint
# # Directories (use writable space on HF Spaces)
# BASE_DIR = os.environ.get("DATA_DIR", "/data")
# if not os.path.isdir(BASE_DIR):
#     # Fallback to /tmp if /data not available
#     BASE_DIR = "/tmp"
# UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
# OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
# os.makedirs(UPLOAD_DIR, exist_ok=True)
# os.makedirs(OUTPUT_DIR, exist_ok=True)
# # Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open
# ENV_TOKEN = os.environ.get("API_TOKEN")
# app = FastAPI(title="Photo Object Removal API", version="1.0.0")
# # In-memory stores
# file_store: Dict[str, Dict[str, str]] = {}
# logs: List[Dict[str, str]] = []
# MONGO_URI = "mongodb+srv://harilogicgo_db_user:pdnh6UCMsWvuTCoi@kiddoimages.k2a4nuv.mongodb.net/?appName=KiddoImages"
# mongo_client = MongoClient(MONGO_URI)
# mongo_db = mongo_client["object_remover"]
# mongo_logs = mongo_db["api_logs"]
# ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN")
# DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984"
# admin_media_clicks = None
# def _init_admin_mongo() -> None:
#     global admin_media_clicks
#     if not ADMIN_MONGO_URI:
#         log.info("Admin Mongo URI not provided; media click logging disabled")
#         return
#     try:
#         admin_client = MongoClient(ADMIN_MONGO_URI)
#         # get_default_database() extracts database from connection string (e.g., /adminPanel)
#         admin_db = admin_client.get_default_database()
#         if admin_db is None:
#             # Fallback if no database in URI
#             admin_db = admin_client["admin"]
#             log.warning("No database in connection string, defaulting to 'admin'")
#         admin_media_clicks = admin_db["media_clicks"]
#         log.info(
#             "Admin media click logging initialized: db=%s collection=%s",
#             admin_db.name,
#             admin_media_clicks.name,
#         )
#         try:
#             admin_media_clicks.drop_index("user_id_1_header_1_media_id_1")
#             log.info("Dropped legacy index user_id_1_header_1_media_id_1")
#         except Exception as idx_err:
#             # Index drop failure is non-critical (often permission issue)
#             if "Unauthorized" not in str(idx_err):
#                 log.info("Skipping legacy index drop: %s", idx_err)
#     except Exception as err:
#         log.error("Failed to init admin Mongo client: %s", err)
#         admin_media_clicks = None
# _init_admin_mongo()
# def _admin_logging_status() -> Dict[str, object]:
#     if admin_media_clicks is None:
#         return {
#             "enabled": False,
#             "db": None,
#             "collection": None,
#         }
#     return {
#         "enabled": True,
#         "db": admin_media_clicks.database.name,
#         "collection": admin_media_clicks.name,
#     }
# def _build_ai_edit_daily_count(
#     existing: Optional[List[Dict[str, object]]],
#     today: date,
# ) -> List[Dict[str, object]]:
#     """
#     Build / extend the ai_edit_daily_count array with the following rules:
#     - Case A (no existing data): return [{date: today, count: 1}]
#     - Case B (today already recorded): return list unchanged
#
- Case C (gap in days): fill missing days with count=0 and append today with count=1 # Additionally, the returned list is capped to the most recent 32 entries. # The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. # """ # def _to_date_only(value: object) -> date: # if isinstance(value, datetime): # return value.date() # if isinstance(value, date): # return value # # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime # try: # text = str(value) # if len(text) == 10: # return datetime.strptime(text, "%Y-%m-%d").date() # return datetime.fromisoformat(text).date() # except Exception: # # If parsing fails, just treat as today to avoid crashing # return today # # Case A: first ever use (no array yet) # if not existing: # return [ # { # "date": datetime(today.year, today.month, today.day), # "count": 1, # } # ] # # Work on a shallow copy so we don't mutate original in-place # result: List[Dict[str, object]] = list(existing) # last_entry = result[-1] if result else None # if not last_entry or "date" not in last_entry: # # If structure is unexpected, re-initialize safely # return [ # { # "date": datetime(today.year, today.month, today.day), # "count": 1, # } # ] # last_date = _to_date_only(last_entry["date"]) # # If somehow the last stored date is in the future, do nothing to avoid corrupting history # if last_date > today: # return result # # Case B: today's date already present as the last entry → unchanged # if last_date == today: # return result # # Case C: there is a gap, fill missing days with count=0 and append today with count=1 # cursor = last_date + timedelta(days=1) # while cursor < today: # result.append( # { # "date": datetime(cursor.year, cursor.month, cursor.day), # "count": 0, # } # ) # cursor += timedelta(days=1) # # Finally add today's presence indicator # result.append( # { # "date": datetime(today.year, today.month, today.day), # "count": 1, # } # ) # # Enforce 32-entry limit (keep the most recent 32 days) # if 
len(result) > 32: # result = result[-32:] # return result # def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None: # if not ENV_TOKEN: # return # if authorization is None or not authorization.lower().startswith("bearer "): # raise HTTPException(status_code=401, detail="Unauthorized") # token = authorization.split(" ", 1)[1] # if token != ENV_TOKEN: # raise HTTPException(status_code=403, detail="Forbidden") # class InpaintRequest(BaseModel): # image_id: str # mask_id: str # invert_mask: bool = True # True => selected/painted area is removed # passthrough: bool = False # If True, return the original image unchanged # user_id: Optional[str] = None # category_id: Optional[str] = None # class SimpleRemoveRequest(BaseModel): # image_id: str # Image with pink/magenta segments to remove # def _coerce_object_id(value: Optional[str]) -> ObjectId: # if value is None: # return ObjectId() # value_str = str(value).strip() # if re.fullmatch(r"[0-9a-fA-F]{24}", value_str): # return ObjectId(value_str) # if value_str.isdigit(): # hex_str = format(int(value_str), "x") # if len(hex_str) > 24: # hex_str = hex_str[-24:] # hex_str = hex_str.rjust(24, "0") # return ObjectId(hex_str) # return ObjectId() # def _coerce_category_id(category_id: Optional[str]) -> ObjectId: # raw = category_id or DEFAULT_CATEGORY_ID # raw_str = str(raw).strip() # if re.fullmatch(r"[0-9a-fA-F]{24}", raw_str): # return ObjectId(raw_str) # return _coerce_object_id(raw_str) # def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None: # """Log to admin media_clicks collection only if user_id is provided.""" # if admin_media_clicks is None: # return # # Only log if user_id is provided (not None/empty) # if not user_id or not user_id.strip(): # return # try: # user_obj = _coerce_object_id(user_id) # category_obj = _coerce_category_id(category_id) # now = datetime.utcnow() # today = now.date() # doc = admin_media_clicks.find_one({"userId": user_obj}) # if doc: # 
existing_daily = doc.get("ai_edit_daily_count") # updated_daily = _build_ai_edit_daily_count(existing_daily, today) # categories = doc.get("categories") or [] # if any(cat.get("categoryId") == category_obj for cat in categories): # # Category exists: increment click_count and ai_edit_complete, update dates # admin_media_clicks.update_one( # {"_id": doc["_id"], "categories.categoryId": category_obj}, # { # "$inc": { # "categories.$.click_count": 1, # "ai_edit_complete": 1, # $inc handles missing fields (backward compatible) # }, # "$set": { # "categories.$.lastClickedAt": now, # "updatedAt": now, # "ai_edit_last_date": now, # "ai_edit_daily_count": updated_daily, # }, # }, # ) # else: # # New category to existing document: push category, increment ai_edit_complete # admin_media_clicks.update_one( # {"_id": doc["_id"]}, # { # "$push": { # "categories": { # "categoryId": category_obj, # "click_count": 1, # "lastClickedAt": now, # } # }, # "$inc": {"ai_edit_complete": 1}, # $inc handles missing fields # "$set": { # "updatedAt": now, # "ai_edit_last_date": now, # "ai_edit_daily_count": updated_daily, # }, # }, # ) # else: # # New user: create document with default ai_edit_complete=0, then increment to 1 # daily_for_new = _build_ai_edit_daily_count(None, today) # admin_media_clicks.update_one( # {"userId": user_obj}, # { # "$setOnInsert": { # "userId": user_obj, # "categories": [ # { # "categoryId": category_obj, # "click_count": 1, # "lastClickedAt": now, # } # ], # "createdAt": now, # "updatedAt": now, # "ai_edit_complete": 0, # Default for new users # "ai_edit_daily_count": daily_for_new, # }, # "$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use # "$set": { # "updatedAt": now, # "ai_edit_last_date": now, # }, # }, # upsert=True, # ) # except Exception as err: # err_str = str(err) # if "Unauthorized" in err_str or "not authorized" in err_str.lower(): # log.warning( # "Admin media click logging failed (permissions): user lacks read/write on db=%s 
collection=%s. " # "Check MongoDB user permissions.", # admin_media_clicks.database.name, # admin_media_clicks.name, # ) # else: # log.warning("Admin media click logging failed: %s", err) # @app.get("/") # def root() -> Dict[str, object]: # return { # "name": "Photo Object Removal API", # "status": "ok", # "endpoints": { # "GET /health": "health check", # "POST /upload-image": "form-data: image=file", # "POST /upload-mask": "form-data: mask=file", # "POST /inpaint": "JSON: {image_id, mask_id}", # "POST /inpaint-multipart": "form-data: image=file, mask=file", # "POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)", # "GET /download/{filename}": "download result image", # "GET /result/{filename}": "view result image in browser", # "GET /logs": "recent uploads/results", # }, # "auth": "set API_TOKEN env var to require Authorization: Bearer (except /health)", # } # @app.get("/health") # def health() -> Dict[str, str]: # return {"status": "healthy"} # @app.get("/logging-status") # def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]: # """Helper endpoint to verify admin media logging wiring (no secrets exposed).""" # return _admin_logging_status() # @app.post("/upload-image") # def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: # ext = os.path.splitext(image.filename)[1] or ".png" # file_id = str(uuid.uuid4()) # stored_name = f"{file_id}{ext}" # stored_path = os.path.join(UPLOAD_DIR, stored_name) # with open(stored_path, "wb") as f: # shutil.copyfileobj(image.file, f) # file_store[file_id] = { # "type": "image", # "filename": image.filename, # "stored_name": stored_name, # "path": stored_path, # "timestamp": datetime.utcnow().isoformat(), # } # logs.append({"id": file_id, "filename": image.filename, "type": "image", "timestamp": datetime.utcnow().isoformat()}) # return {"id": file_id, "filename": image.filename} # @app.post("/upload-mask") # def upload_mask(mask: 
UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: # ext = os.path.splitext(mask.filename)[1] or ".png" # file_id = str(uuid.uuid4()) # stored_name = f"{file_id}{ext}" # stored_path = os.path.join(UPLOAD_DIR, stored_name) # with open(stored_path, "wb") as f: # shutil.copyfileobj(mask.file, f) # file_store[file_id] = { # "type": "mask", # "filename": mask.filename, # "stored_name": stored_name, # "path": stored_path, # "timestamp": datetime.utcnow().isoformat(), # } # logs.append({"id": file_id, "filename": mask.filename, "type": "mask", "timestamp": datetime.utcnow().isoformat()}) # return {"id": file_id, "filename": mask.filename} # def _load_rgba_image(path: str) -> Image.Image: # img = Image.open(path) # return img.convert("RGBA") # def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray: # """ # Convert mask image to RGBA format (black/white mask). # Standard convention: white (255) = area to remove, black (0) = area to keep # Returns RGBA with white in RGB channels where removal is needed, alpha=255 # """ # if img.mode != "RGBA": # # For RGB/Grayscale masks: white (value>128) = remove, black (value<=128) = keep # gray = img.convert("L") # arr = np.array(gray) # # Create proper black/white mask: white pixels (>128) = remove, black (<=128) = keep # mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8) # rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8) # rgba[:, :, 0] = mask_bw # R # rgba[:, :, 1] = mask_bw # G # rgba[:, :, 2] = mask_bw # B # rgba[:, :, 3] = 255 # Fully opaque # log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)") # return rgba # # For RGBA: check if alpha channel is meaningful # arr = np.array(img) # alpha = arr[:, :, 3] # rgb = arr[:, :, :3] # # If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values # if alpha.mean() > 200: # # Use RGB to determine mask: white/bright in RGB = remove # gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY) # # Also 
detect magenta specifically # magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255 # mask_bw = np.maximum(np.where(gray > 128, 255, 0).astype(np.uint8), magenta) # rgba = arr.copy() # rgba[:, :, 0] = mask_bw # R # rgba[:, :, 1] = mask_bw # G # rgba[:, :, 2] = mask_bw # B # rgba[:, :, 3] = 255 # Fully opaque # log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)") # return rgba # # Alpha channel encodes the mask - convert to RGB-based # # Transparent areas (alpha < 128) = remove, Opaque areas = keep # mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8) # rgba = arr.copy() # rgba[:, :, 0] = mask_bw # rgba[:, :, 1] = mask_bw # rgba[:, :, 2] = mask_bw # rgba[:, :, 3] = 255 # log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)") # return rgba # @app.post("/inpaint") # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: # start_time = time.time() # status = "success" # error_msg = None # output_name = None # try: # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": # raise HTTPException(status_code=404, detail="image_id not found") # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": # raise HTTPException(status_code=404, detail="mask_id not found") # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) # mask_img = Image.open(file_store[req.mask_id]["path"]) # mask_rgba = _load_rgba_mask_from_image(mask_img) # if req.passthrough: # result = np.array(img_rgba.convert("RGB")) # else: # result = process_inpaint( # np.array(img_rgba), # mask_rgba, # invert_mask=req.invert_mask # ) # output_name = f"output_{uuid.uuid4().hex}.png" # output_path = os.path.join(OUTPUT_DIR, output_name) # Image.fromarray(result).save( # output_path, "PNG", optimize=False, compress_level=1 # ) # log_media_click(req.user_id, req.category_id) # return {"result": output_name} # except Exception as 
e: # status = "fail" # error_msg = str(e) # raise # finally: # end_time = time.time() # response_time_ms = (end_time - start_time) * 1000 # log_doc = { # "input_image_id": req.image_id, # "input_mask_id": req.mask_id, # "output_id": output_name, # "status": status, # "timestamp": datetime.utcnow(), # "ts": int(time.time()), # "response_time_ms": response_time_ms # } # if error_msg: # log_doc["error"] = error_msg # try: # mongo_logs.insert_one(log_doc) # except Exception as mongo_err: # log.error(f"Mongo log insert failed: {mongo_err}") # # @app.post("/inpaint") # # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: # # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": # # raise HTTPException(status_code=404, detail="image_id not found") # # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": # # raise HTTPException(status_code=404, detail="mask_id not found") # # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) # # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA # # mask_rgba = _load_rgba_mask_from_image(mask_img) # # # Debug: check mask before processing # # white_pixels = int((mask_rgba[:,:,0] > 128).sum()) # # log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}") # # if req.passthrough: # # result = np.array(img_rgba.convert("RGB")) # # else: # # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) # # result_name = f"output_{uuid.uuid4().hex}.png" # # result_path = os.path.join(OUTPUT_DIR, result_name) # # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) # # logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()}) # # return {"result": result_name} # @app.post("/inpaint-url") # def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]: # """Same as 
/inpaint but returns a JSON with a public download URL instead of image bytes.""" # start_time = time.time() # status = "success" # error_msg = None # result_name = None # try: # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": # raise HTTPException(status_code=404, detail="image_id not found") # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": # raise HTTPException(status_code=404, detail="mask_id not found") # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA # mask_rgba = _load_rgba_mask_from_image(mask_img) # if req.passthrough: # result = np.array(img_rgba.convert("RGB")) # else: # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) # result_name = f"output_{uuid.uuid4().hex}.png" # result_path = os.path.join(OUTPUT_DIR, result_name) # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) # url = str(request.url_for("download_file", filename=result_name)) # logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()}) # log_media_click(req.user_id, req.category_id) # return {"result": result_name, "url": url} # except Exception as e: # status = "fail" # error_msg = str(e) # raise # finally: # # Always log to regular MongoDB (mandatory) # end_time = time.time() # response_time_ms = (end_time - start_time) * 1000 # log_doc = { # "input_image_id": req.image_id, # "input_mask_id": req.mask_id, # "output_id": result_name, # "status": status, # "timestamp": datetime.utcnow(), # "ts": int(time.time()), # "response_time_ms": response_time_ms, # } # if error_msg: # log_doc["error"] = error_msg # try: # mongo_logs.insert_one(log_doc) # except Exception as mongo_err: # log.error("Mongo log insert failed: %s", mongo_err) # @app.post("/inpaint-multipart") # def inpaint_multipart( # image: UploadFile = File(...), # mask: 
UploadFile = File(...), # request: Request = None, # invert_mask: bool = True, # mask_is_painted: bool = False, # if True, mask file is the painted-on image (e.g., black strokes on original) # passthrough: bool = False, # user_id: Optional[str] = Form(None), # category_id: Optional[str] = Form(None), # _: None = Depends(bearer_auth), # ) -> Dict[str, str]: # start_time = time.time() # status = "success" # error_msg = None # result_name = None # try: # # Load in-memory # img = Image.open(image.file).convert("RGBA") # m = Image.open(mask.file).convert("RGBA") # if passthrough: # # Just echo the input image, ignore mask # result = np.array(img.convert("RGB")) # result_name = f"output_{uuid.uuid4().hex}.png" # result_path = os.path.join(OUTPUT_DIR, result_name) # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) # url: Optional[str] = None # try: # if request is not None: # url = str(request.url_for("download_file", filename=result_name)) # except Exception: # url = None # entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} # if url: # entry["url"] = url # logs.append(entry) # resp: Dict[str, str] = {"result": result_name} # if url: # resp["url"] = url # log_media_click(user_id, category_id) # return resp # if mask_is_painted: # # Auto-detect pink/magenta paint and convert to black/white mask # # White pixels = areas to remove, Black pixels = areas to keep # log.info("Auto-detecting pink/magenta paint from uploaded image...") # m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB) # # Detect pink/magenta using fixed RGB bounds (same as /remove-pink) # lower = np.array([150, 0, 100], dtype=np.uint8) # upper = np.array([255, 120, 255], dtype=np.uint8) # magenta_detected = ( # (m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) & # (m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) & # (m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2]) # ).astype(np.uint8) * 255 # # Method 
2: Also check if original image was provided to find differences # if img is not None: # img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) # if img_rgb.shape == m_rgb.shape: # diff = cv2.absdiff(img_rgb, m_rgb) # gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY) # # Any significant difference (>50) could be paint # diff_mask = (gray_diff > 50).astype(np.uint8) * 255 # # Combine with magenta detection # binmask = cv2.bitwise_or(magenta_detected, diff_mask) # else: # binmask = magenta_detected # else: # # No original image provided, use magenta detection only # binmask = magenta_detected # # Clean up the mask: remove noise and fill small holes # kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) # # Close small gaps in the mask # binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) # # Remove small noise # binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) # nonzero = int((binmask > 0).sum()) # log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero) # # If very few pixels detected, assume the user may already be providing a BW mask # # and proceed without forcing strict detection # if nonzero < 50: # log.error("CRITICAL: Could not detect pink/magenta paint! 
Returning original image.") # result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB")) # result_name = f"output_{uuid.uuid4().hex}.png" # result_path = os.path.join(OUTPUT_DIR, result_name) # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) # return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"} # # Create binary mask: Pink pixels → white (255), Everything else → black (0) # # Encode in RGBA format for process_inpaint # # process_inpaint does: mask = 255 - mask[:,:,3] # # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) # # alpha=255 (opaque/keep) → becomes 0 (black/keep) # mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) # mask_rgba[:, :, 0] = binmask # R: white where pink (for visualization) # mask_rgba[:, :, 1] = binmask # G: white where pink # mask_rgba[:, :, 2] = binmask # B: white where pink # # Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha # mask_rgba[:, :, 3] = 255 - binmask # log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)", # nonzero, binmask.shape[0] * binmask.shape[1] - nonzero) # else: # mask_rgba = _load_rgba_mask_from_image(m) # # When mask_is_painted=true, we encode pink as alpha=0, so process_inpaint's default invert_mask=True works correctly # actual_invert = invert_mask # Use default True for painted masks # log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted) # result = process_inpaint(np.array(img), mask_rgba, invert_mask=actual_invert) # result_name = f"output_{uuid.uuid4().hex}.png" # result_path = os.path.join(OUTPUT_DIR, result_name) # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) # url: Optional[str] = None # try: # if request is not None: # url = str(request.url_for("download_file", filename=result_name)) # except Exception: # url = None # entry: 
Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} # if url: # entry["url"] = url # logs.append(entry) # resp: Dict[str, str] = {"result": result_name} # if url: # resp["url"] = url # log_media_click(user_id, category_id) # return resp # except Exception as e: # status = "fail" # error_msg = str(e) # raise # finally: # # Always log to regular MongoDB (mandatory) # end_time = time.time() # response_time_ms = (end_time - start_time) * 1000 # log_doc = { # "endpoint": "inpaint-multipart", # "output_id": result_name, # "status": status, # "timestamp": datetime.utcnow(), # "ts": int(time.time()), # "response_time_ms": response_time_ms, # } # if error_msg: # log_doc["error"] = error_msg # try: # mongo_logs.insert_one(log_doc) # except Exception as mongo_err: # log.error("Mongo log insert failed: %s", mongo_err) # @app.post("/remove-pink") # def remove_pink_segments( # image: UploadFile = File(...), # request: Request = None, # user_id: Optional[str] = Form(None), # category_id: Optional[str] = Form(None), # _: None = Depends(bearer_auth), # ) -> Dict[str, str]: # """ # Simple endpoint: upload an image with pink/magenta segments to remove. # - Pink/Magenta segments → automatically removed (white in mask) # - Everything else → automatically kept (black in mask) # Just paint pink/magenta on areas you want to remove, upload the image, and it works! 
# """ # start_time = time.time() # status = "success" # error_msg = None # result_name = None # try: # log.info(f"Simple remove-pink: processing image {image.filename}") # # Load the image (with pink paint on it) # img = Image.open(image.file).convert("RGBA") # img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) # # Auto-detect pink/magenta segments to remove # # Pink/Magenta → white in mask (remove) # # Everything else (natural image colors, including dark areas) → black in mask (keep) # # Detect pink/magenta using fixed RGB bounds per requested logic # lower = np.array([150, 0, 100], dtype=np.uint8) # upper = np.array([255, 120, 255], dtype=np.uint8) # binmask = ( # (img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) & # (img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) & # (img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2]) # ).astype(np.uint8) * 255 # # Clean up the pink mask # kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) # binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) # binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) # nonzero = int((binmask > 0).sum()) # total_pixels = binmask.shape[0] * binmask.shape[1] # log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove") # # Debug: log bounds used # log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]") # if nonzero < 50: # log.error("No pink segments detected! Returning original image.") # result = np.array(img.convert("RGB")) # result_name = f"output_{uuid.uuid4().hex}.png" # result_path = os.path.join(OUTPUT_DIR, result_name) # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) # return { # "result": result_name, # "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)." 
# } # # Create binary mask: Pink pixels → white (255), Everything else → black (0) # # Encode in RGBA format that process_inpaint expects # # process_inpaint does: mask = 255 - mask[:,:,3] # # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) # # alpha=255 (opaque/keep) → becomes 0 (black/keep) # mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) # # RGB channels don't matter for process_inpaint, but set them to white where pink for visualization # mask_rgba[:, :, 0] = binmask # R: white where pink # mask_rgba[:, :, 1] = binmask # G: white where pink # mask_rgba[:, :, 2] = binmask # B: white where pink # # Alpha: 0 (transparent) where pink → will become white after 255-alpha # # 255 (opaque) everywhere else → will become black after 255-alpha # mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255 # # Verify mask encoding # alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum()) # alpha_255_count = int((mask_rgba[:,:,3] == 255).sum()) # total_pixels = binmask.shape[0] * binmask.shape[1] # log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)") # log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)") # # IMPORTANT: We need to use the ORIGINAL image WITHOUT pink paint for inpainting! 
# # Remove pink from the original image before processing # # Create a clean version: where pink was detected, keep original image colors # img_clean = np.array(img.convert("RGBA")) # # Where pink is detected, we want to inpaint, so we can leave it (or blend it out) # # Actually, the model will inpaint over those areas, so we can pass the original # # But for better results, we might want to remove the pink overlay first # # Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal # log.info(f"Starting inpainting process...") # result = process_inpaint(img_clean, mask_rgba, invert_mask=True) # log.info(f"Inpainting complete, result shape: {result.shape}") # result_name = f"output_{uuid.uuid4().hex}.png" # result_path = os.path.join(OUTPUT_DIR, result_name) # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) # url: Optional[str] = None # try: # if request is not None: # url = str(request.url_for("download_file", filename=result_name)) # except Exception: # url = None # logs.append({ # "result": result_name, # "filename": image.filename, # "pink_pixels": nonzero, # "timestamp": datetime.utcnow().isoformat() # }) # resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)} # if url: # resp["url"] = url # log_media_click(user_id, category_id) # return resp # except Exception as e: # status = "fail" # error_msg = str(e) # raise # finally: # # Always log to regular MongoDB (mandatory) # end_time = time.time() # response_time_ms = (end_time - start_time) * 1000 # log_doc = { # "endpoint": "remove-pink", # "output_id": result_name, # "status": status, # "timestamp": datetime.utcnow(), # "ts": int(time.time()), # "response_time_ms": response_time_ms, # } # if error_msg: # log_doc["error"] = error_msg # try: # mongo_logs.insert_one(log_doc) # except Exception as mongo_err: # log.error("Mongo log insert failed: %s", mongo_err) # @app.get("/download/{filename}") # def 
download_file(filename: str): # path = os.path.join(OUTPUT_DIR, filename) # if not os.path.isfile(path): # raise HTTPException(status_code=404, detail="file not found") # return FileResponse(path) # @app.get("/result/{filename}") # def view_result(filename: str): # """View result image directly in browser (same as download but with proper content-type for viewing)""" # path = os.path.join(OUTPUT_DIR, filename) # if not os.path.isfile(path): # raise HTTPException(status_code=404, detail="file not found") # return FileResponse(path, media_type="image/png") # @app.get("/logs") # def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse: # return JSONResponse(content=logs)