Spaces:
Running
on
T4
Running
on
T4
| import os | |
| import uuid | |
| import shutil | |
| import re | |
| from datetime import datetime, timedelta, date | |
| from typing import Dict, List, Optional | |
| import numpy as np | |
| from fastapi import ( | |
| FastAPI, | |
| UploadFile, | |
| File, | |
| HTTPException, | |
| Depends, | |
| Header, | |
| Request, | |
| Form, | |
| ) | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel | |
| from PIL import Image | |
| import cv2 | |
| import logging | |
| from bson import ObjectId | |
| from pymongo import MongoClient | |
| import time | |
# Process-wide logging: INFO level, shared "api" logger used throughout this module.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("api")
from src.core import process_inpaint
# Directories (use writable space on HF Spaces)
# DATA_DIR overrides the default "/data" base directory.
BASE_DIR = os.environ.get("DATA_DIR", "/data")
if not os.path.isdir(BASE_DIR):
    # Fallback to /tmp if /data not available
    BASE_DIR = "/tmp"
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
# Create both directories at import time so handlers can write immediately.
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open
ENV_TOKEN = os.environ.get("API_TOKEN")
app = FastAPI(title="Photo Object Removal API", version="1.0.0")
# In-memory stores
# file_store maps an upload id to its metadata (type, filename, stored_name, path, timestamp).
# logs is an append-only list of upload/result records. Both live only in this process.
file_store: Dict[str, Dict[str, str]] = {}
logs: List[Dict[str, str]] = []
# NOTE(review): SECURITY - this connection string (including credentials) is hard-coded in
# source, unlike ADMIN_MONGO_URI below which is read from the environment. Consider moving
# it to an environment variable/secret and rotating the credentials.
MONGO_URI = "mongodb+srv://harilogicgo_db_user:[email protected]/?appName=KiddoImages"
# Client/collection used for per-request API logs ("object_remover" db, "api_logs" collection).
mongo_client = MongoClient(MONGO_URI)
mongo_db = mongo_client["object_remover"]
mongo_logs = mongo_db["api_logs"]
# Optional second MongoDB (env MONGODB_ADMIN) used for media click logging.
ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN")
# Category id used by _coerce_category_id when a caller supplies none.
DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984"
# Set by _init_admin_mongo(); stays None while admin logging is disabled.
admin_media_clicks = None
def _init_admin_mongo() -> None:
    """Connect to the admin MongoDB and bind the ``media_clicks`` collection.

    Reads the module-level ``ADMIN_MONGO_URI``. When it is unset, media click
    logging stays disabled. On success the module-level ``admin_media_clicks``
    is set to the ``media_clicks`` collection of the database named in the
    URI, or of the ``admin`` database when the URI names none. A best-effort
    attempt is then made to drop the legacy index
    ``user_id_1_header_1_media_id_1``.

    Never raises: any connection/setup failure is logged and leaves
    ``admin_media_clicks`` as ``None``.
    """
    global admin_media_clicks
    # Function-scope import keeps this block self-contained.
    from pymongo.errors import ConfigurationError

    if not ADMIN_MONGO_URI:
        log.info("Admin Mongo URI not provided; media click logging disabled")
        return
    try:
        admin_client = MongoClient(ADMIN_MONGO_URI)
        try:
            # get_default_database() extracts database from connection string (e.g., /adminPanel)
            admin_db = admin_client.get_default_database()
        except ConfigurationError:
            # PyMongo raises (it does not return None) when the URI carries no
            # database name, so the fallback has to live in an except clause.
            admin_db = admin_client["admin"]
            log.warning("No database in connection string, defaulting to 'admin'")
        admin_media_clicks = admin_db["media_clicks"]
        log.info(
            "Admin media click logging initialized: db=%s collection=%s",
            admin_db.name,
            admin_media_clicks.name,
        )
        try:
            admin_media_clicks.drop_index("user_id_1_header_1_media_id_1")
            log.info("Dropped legacy index user_id_1_header_1_media_id_1")
        except Exception as idx_err:
            # Index drop failure is non-critical (often permission issue);
            # permission errors are deliberately not logged.
            if "Unauthorized" not in str(idx_err):
                log.info("Skipping legacy index drop: %s", idx_err)
    except Exception as err:
        log.error("Failed to init admin Mongo client: %s", err)
        admin_media_clicks = None
# Run once at import time so admin_media_clicks is set (or left None) before any handler runs.
_init_admin_mongo()
| def _admin_logging_status() -> Dict[str, object]: | |
| if admin_media_clicks is None: | |
| return { | |
| "enabled": False, | |
| "db": None, | |
| "collection": None, | |
| } | |
| return { | |
| "enabled": True, | |
| "db": admin_media_clicks.database.name, | |
| "collection": admin_media_clicks.name, | |
| } | |
| def _build_ai_edit_daily_count( | |
| existing: Optional[List[Dict[str, object]]], | |
| today: date, | |
| ) -> List[Dict[str, object]]: | |
| """ | |
| Build / extend the ai_edit_daily_count array with the following rules: | |
| - Case A (no existing data): return [{date: today, count: 1}] | |
| - Case B (today already recorded): return list unchanged | |
| - Case C (gap in days): fill missing days with count=0 and append today with count=1 | |
| Additionally, the returned list is capped to the most recent 32 entries. | |
| The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. | |
| """ | |
| def _to_date_only(value: object) -> date: | |
| if isinstance(value, datetime): | |
| return value.date() | |
| if isinstance(value, date): | |
| return value | |
| # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime | |
| try: | |
| text = str(value) | |
| if len(text) == 10: | |
| return datetime.strptime(text, "%Y-%m-%d").date() | |
| return datetime.fromisoformat(text).date() | |
| except Exception: | |
| # If parsing fails, just treat as today to avoid crashing | |
| return today | |
| # Case A: first ever use (no array yet) | |
| if not existing: | |
| return [ | |
| { | |
| "date": datetime(today.year, today.month, today.day), | |
| "count": 1, | |
| } | |
| ] | |
| # Work on a shallow copy so we don't mutate original in-place | |
| result: List[Dict[str, object]] = list(existing) | |
| last_entry = result[-1] if result else None | |
| if not last_entry or "date" not in last_entry: | |
| # If structure is unexpected, re-initialize safely | |
| return [ | |
| { | |
| "date": datetime(today.year, today.month, today.day), | |
| "count": 1, | |
| } | |
| ] | |
| last_date = _to_date_only(last_entry["date"]) | |
| # If somehow the last stored date is in the future, do nothing to avoid corrupting history | |
| if last_date > today: | |
| return result | |
| # Case B: today's date already present as the last entry → unchanged | |
| if last_date == today: | |
| return result | |
| # Case C: there is a gap, fill missing days with count=0 and append today with count=1 | |
| cursor = last_date + timedelta(days=1) | |
| while cursor < today: | |
| result.append( | |
| { | |
| "date": datetime(cursor.year, cursor.month, cursor.day), | |
| "count": 0, | |
| } | |
| ) | |
| cursor += timedelta(days=1) | |
| # Finally add today's presence indicator | |
| result.append( | |
| { | |
| "date": datetime(today.year, today.month, today.day), | |
| "count": 1, | |
| } | |
| ) | |
| # Sort by date ascending (older dates first) to guarantee stable ordering: | |
| # [oldest, ..., newest] | |
| try: | |
| result.sort(key=lambda entry: _to_date_only(entry.get("date"))) | |
| except Exception: | |
| # If anything goes wrong during sort, fall back to current ordering | |
| pass | |
| # Enforce 32-entry limit (keep the most recent 32 days) | |
| if len(result) > 32: | |
| result = result[-32:] | |
| return result | |
def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency enforcing the optional Bearer token.

    When ``ENV_TOKEN`` is unset the dependency is a no-op and endpoints are
    open. Otherwise the ``Authorization`` header must be ``Bearer <token>``.

    Raises:
        HTTPException: 401 when the header is missing or not a Bearer scheme;
            403 when the presented token does not match ``ENV_TOKEN``.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if not ENV_TOKEN:
        return
    if authorization is None or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")
    token = authorization.split(" ", 1)[1]
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), ENV_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Forbidden")
class InpaintRequest(BaseModel):
    """JSON body taken by the id-based handlers ``inpaint`` and ``inpaint_url``."""

    # Ids previously stored in file_store; the handlers require the entries to
    # have type "image" and "mask" respectively.
    image_id: str
    mask_id: str
    invert_mask: bool = True  # True => selected/painted area is removed
    passthrough: bool = False  # If True, return the original image unchanged
    # Optional identifiers forwarded to log_media_click after a successful run.
    user_id: Optional[str] = None
    category_id: Optional[str] = None
class SimpleRemoveRequest(BaseModel):
    """Request body carrying a single image id.

    NOTE(review): no handler in the visible part of this file takes this
    model - confirm whether it is still used.
    """

    image_id: str  # Image with pink/magenta segments to remove
def _coerce_object_id(value: Optional[str]) -> ObjectId:
    """Turn an arbitrary identifier into an ``ObjectId``.

    - A 24-character hex string (after stripping whitespace) is used as-is.
    - An all-digit string is converted to hex, truncated to its last 24 hex
      characters and left-padded with zeros to 24.
    - ``None`` or anything else yields a freshly generated ``ObjectId``.
    """
    if value is not None:
        text = str(value).strip()
        if re.fullmatch(r"[0-9a-fA-F]{24}", text):
            return ObjectId(text)
        if text.isdigit():
            as_hex = f"{int(text):x}"
            # Keep the low-order 24 hex digits, then pad short values.
            return ObjectId(as_hex[-24:].zfill(24))
    return ObjectId()
def _coerce_category_id(category_id: Optional[str]) -> ObjectId:
    """Resolve a category id, defaulting to ``DEFAULT_CATEGORY_ID`` when empty.

    A 24-character hex value becomes the matching ``ObjectId``; anything else
    is delegated to ``_coerce_object_id``.
    """
    candidate = str(category_id or DEFAULT_CATEGORY_ID).strip()
    is_hex_id = re.fullmatch(r"[0-9a-fA-F]{24}", candidate) is not None
    return ObjectId(candidate) if is_hex_id else _coerce_object_id(candidate)
def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None:
    """Log to admin media_clicks collection only if user_id is provided.

    Does nothing when admin logging is disabled (``admin_media_clicks`` is
    ``None``) or when ``user_id`` is empty/blank. Otherwise the user's
    document is updated as follows:

    - existing user, known category: increment that category's
      ``click_count`` and ``ai_edit_complete``, refresh timestamps;
    - existing user, new category: push the category with ``click_count=1``
      and increment ``ai_edit_complete``;
    - new user: upsert a document with the category, ``ai_edit_complete=1``
      and an initial ``ai_edit_daily_count``.

    Never raises: every failure is logged as a warning (best-effort logging).

    Args:
        user_id: Caller-supplied user identifier (coerced to ``ObjectId``).
        category_id: Caller-supplied category identifier; falls back to the
            default category when empty.
    """
    if admin_media_clicks is None:
        return
    # Only log if user_id is provided (not None/empty)
    if not user_id or not user_id.strip():
        return
    try:
        user_obj = _coerce_object_id(user_id)
        category_obj = _coerce_category_id(category_id)
        now = datetime.utcnow()
        today = now.date()
        doc = admin_media_clicks.find_one({"userId": user_obj})
        if doc:
            existing_daily = doc.get("ai_edit_daily_count")
            updated_daily = _build_ai_edit_daily_count(existing_daily, today)
            categories = doc.get("categories") or []
            if any(cat.get("categoryId") == category_obj for cat in categories):
                # Category exists: increment click_count and ai_edit_complete, update dates
                admin_media_clicks.update_one(
                    {"_id": doc["_id"], "categories.categoryId": category_obj},
                    {
                        "$inc": {
                            "categories.$.click_count": 1,
                            "ai_edit_complete": 1,  # $inc handles missing fields (backward compatible)
                        },
                        "$set": {
                            "categories.$.lastClickedAt": now,
                            "updatedAt": now,
                            "ai_edit_last_date": now,
                            "ai_edit_daily_count": updated_daily,
                        },
                    },
                )
            else:
                # New category to existing document: push category, increment ai_edit_complete
                admin_media_clicks.update_one(
                    {"_id": doc["_id"]},
                    {
                        "$push": {
                            "categories": {
                                "categoryId": category_obj,
                                "click_count": 1,
                                "lastClickedAt": now,
                            }
                        },
                        "$inc": {"ai_edit_complete": 1},  # $inc handles missing fields
                        "$set": {
                            "updatedAt": now,
                            "ai_edit_last_date": now,
                            "ai_edit_daily_count": updated_daily,
                        },
                    },
                )
        else:
            # New user: upsert the document. A field path may appear under only
            # one update operator, otherwise MongoDB rejects the whole update
            # with a conflict error. "updatedAt" is therefore written by $set
            # alone and "ai_edit_complete" by $inc alone ($inc creates a missing
            # field with the increment value, so a new user starts at 1).
            daily_for_new = _build_ai_edit_daily_count(None, today)
            admin_media_clicks.update_one(
                {"userId": user_obj},
                {
                    "$setOnInsert": {
                        "userId": user_obj,
                        "categories": [
                            {
                                "categoryId": category_obj,
                                "click_count": 1,
                                "lastClickedAt": now,
                            }
                        ],
                        "createdAt": now,
                        "ai_edit_daily_count": daily_for_new,
                    },
                    "$inc": {"ai_edit_complete": 1},
                    "$set": {
                        "updatedAt": now,
                        "ai_edit_last_date": now,
                    },
                },
                upsert=True,
            )
    except Exception as err:
        err_str = str(err)
        if "Unauthorized" in err_str or "not authorized" in err_str.lower():
            log.warning(
                "Admin media click logging failed (permissions): user lacks read/write on db=%s collection=%s. "
                "Check MongoDB user permissions.",
                admin_media_clicks.database.name,
                admin_media_clicks.name,
            )
        else:
            log.warning("Admin media click logging failed: %s", err)
def root() -> Dict[str, object]:
    """Return a static description of the service and its endpoints."""
    endpoints: Dict[str, str] = {
        "GET /health": "health check",
        "POST /upload-image": "form-data: image=file",
        "POST /upload-mask": "form-data: mask=file",
        "POST /inpaint": "JSON: {image_id, mask_id}",
        "POST /inpaint-multipart": "form-data: image=file, mask=file",
        "POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)",
        "GET /download/{filename}": "download result image",
        "GET /result/{filename}": "view result image in browser",
        "GET /logs": "recent uploads/results",
    }
    description: Dict[str, object] = {
        "name": "Photo Object Removal API",
        "status": "ok",
        "endpoints": endpoints,
        "auth": "set API_TOKEN env var to require Authorization: Bearer <token> (except /health)",
    }
    return description
def health() -> Dict[str, str]:
    """Return the fixed liveness payload."""
    payload: Dict[str, str] = {"status": "healthy"}
    return payload
def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
    """Expose the admin media-logging wiring for verification (no secrets exposed)."""
    report = _admin_logging_status()
    return report
def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Store an uploaded image under ``UPLOAD_DIR`` and register it.

    The file is saved as ``<uuid><ext>`` (extension taken from the client
    filename, ``.png`` when absent), recorded in ``file_store`` with type
    ``"image"`` and appended to ``logs``.

    Returns:
        ``{"id": <generated id>, "filename": <client filename>}``.
    """
    # The client filename can be missing; splitext(None) would raise TypeError.
    original_name = image.filename or ""
    ext = os.path.splitext(original_name)[1] or ".png"
    file_id = str(uuid.uuid4())
    stored_name = f"{file_id}{ext}"
    stored_path = os.path.join(UPLOAD_DIR, stored_name)
    with open(stored_path, "wb") as f:
        shutil.copyfileobj(image.file, f)
    file_store[file_id] = {
        "type": "image",
        "filename": original_name,
        "stored_name": stored_name,
        "path": stored_path,
        "timestamp": datetime.utcnow().isoformat(),
    }
    logs.append({"id": file_id, "filename": original_name, "type": "image", "timestamp": datetime.utcnow().isoformat()})
    return {"id": file_id, "filename": original_name}
def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Store an uploaded mask under ``UPLOAD_DIR`` and register it.

    The file is saved as ``<uuid><ext>`` (extension taken from the client
    filename, ``.png`` when absent), recorded in ``file_store`` with type
    ``"mask"`` and appended to ``logs``.

    Returns:
        ``{"id": <generated id>, "filename": <client filename>}``.
    """
    # The client filename can be missing; splitext(None) would raise TypeError.
    original_name = mask.filename or ""
    ext = os.path.splitext(original_name)[1] or ".png"
    file_id = str(uuid.uuid4())
    stored_name = f"{file_id}{ext}"
    stored_path = os.path.join(UPLOAD_DIR, stored_name)
    with open(stored_path, "wb") as f:
        shutil.copyfileobj(mask.file, f)
    file_store[file_id] = {
        "type": "mask",
        "filename": original_name,
        "stored_name": stored_name,
        "path": stored_path,
        "timestamp": datetime.utcnow().isoformat(),
    }
    logs.append({"id": file_id, "filename": original_name, "type": "mask", "timestamp": datetime.utcnow().isoformat()})
    return {"id": file_id, "filename": original_name}
def _load_rgba_image(path: str) -> Image.Image:
    """Open the image at ``path`` and return it converted to RGBA.

    The source file is opened in a context manager so its handle is closed
    deterministically, including when the conversion fails; the returned
    image is the new object produced by ``convert``.
    """
    with Image.open(path) as img:
        return img.convert("RGBA")
def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray:
    """Normalise a mask image into an opaque black/white RGBA array.

    Convention of the result: white (255) in R, G and B marks pixels to
    remove, black (0) marks pixels to keep, and alpha is always 255.

    Three input shapes are handled:
    - non-RGBA image: grayscale value > 128 means remove;
    - RGBA with mean alpha > 200: grayscale of RGB > 128, or exact magenta
      (255, 0, 255), means remove;
    - other RGBA: alpha < 128 (transparent) means remove.
    """

    def _opaque_rgba(mask_bw: np.ndarray) -> np.ndarray:
        # Replicate the single-channel mask into R, G, B and add full alpha.
        out = np.empty(mask_bw.shape + (4,), dtype=np.uint8)
        out[:, :, 0] = mask_bw
        out[:, :, 1] = mask_bw
        out[:, :, 2] = mask_bw
        out[:, :, 3] = 255
        return out

    if img.mode != "RGBA":
        luminance = np.array(img.convert("L"))
        mask_bw = np.where(luminance > 128, 255, 0).astype(np.uint8)
        rgba = _opaque_rgba(mask_bw)
        log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)")
        return rgba

    arr = np.array(img)
    alpha = arr[:, :, 3]
    rgb = arr[:, :, :3]

    if not alpha.mean() > 200:
        # Alpha channel encodes the mask: transparent pixels are the removal area.
        mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8)
        rgba = _opaque_rgba(mask_bw)
        log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
        return rgba

    # Mostly opaque image: the RGB values themselves carry the mask.
    gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
    bright = np.where(gray > 128, 255, 0).astype(np.uint8)
    # Exact magenta is matched separately, in addition to the brightness test.
    magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255
    mask_bw = np.maximum(bright, magenta)
    rgba = _opaque_rgba(mask_bw)
    log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
    return rgba
def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Inpaint a stored image with a stored mask and return the output filename.

    Looks up ``req.image_id`` / ``req.mask_id`` in ``file_store``, runs
    ``process_inpaint`` (or echoes the image when ``req.passthrough`` is set),
    saves the result as a PNG in ``OUTPUT_DIR`` and records a media click.
    Whatever the outcome, a log document is inserted into ``mongo_logs``.

    Raises:
        HTTPException: 404 when either id is unknown or of the wrong type.
    """
    started = time.time()
    status = "success"
    error_msg = None
    output_name = None
    try:
        image_entry = file_store.get(req.image_id)
        if image_entry is None or image_entry["type"] != "image":
            raise HTTPException(status_code=404, detail="image_id not found")
        mask_entry = file_store.get(req.mask_id)
        if mask_entry is None or mask_entry["type"] != "mask":
            raise HTTPException(status_code=404, detail="mask_id not found")

        img_rgba = _load_rgba_image(image_entry["path"])
        mask_rgba = _load_rgba_mask_from_image(Image.open(mask_entry["path"]))

        if not req.passthrough:
            result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask)
        else:
            result = np.array(img_rgba.convert("RGB"))

        output_name = f"output_{uuid.uuid4().hex}.png"
        destination = os.path.join(OUTPUT_DIR, output_name)
        Image.fromarray(result).save(destination, "PNG", optimize=False, compress_level=1)
        log_media_click(req.user_id, req.category_id)
        return {"result": output_name}
    except Exception as exc:
        # Record the failure for the log document, then let it propagate.
        status = "fail"
        error_msg = str(exc)
        raise
    finally:
        elapsed_ms = (time.time() - started) * 1000
        log_doc = {
            "input_image_id": req.image_id,
            "input_mask_id": req.mask_id,
            "output_id": output_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": elapsed_ms,
        }
        if error_msg:
            log_doc["error"] = error_msg
        try:
            mongo_logs.insert_one(log_doc)
        except Exception as mongo_err:
            # Logging must never mask the handler's own outcome.
            log.error(f"Mongo log insert failed: {mongo_err}")
| # @app.post("/inpaint") | |
| # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: | |
| # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": | |
| # raise HTTPException(status_code=404, detail="image_id not found") | |
| # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": | |
| # raise HTTPException(status_code=404, detail="mask_id not found") | |
| # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) | |
| # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA | |
| # mask_rgba = _load_rgba_mask_from_image(mask_img) | |
| # # Debug: check mask before processing | |
| # white_pixels = int((mask_rgba[:,:,0] > 128).sum()) | |
| # log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}") | |
| # if req.passthrough: | |
| # result = np.array(img_rgba.convert("RGB")) | |
| # else: | |
| # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) | |
| # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()}) | |
| # return {"result": result_name} | |
def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Same as /inpaint but returns a JSON with a public download URL instead of image bytes.

    The URL is built with ``request.url_for("download_file", ...)``. The
    result is also appended to the in-memory ``logs`` list, a media click is
    recorded, and a log document is always inserted into ``mongo_logs``.

    Raises:
        HTTPException: 404 when either id is unknown or of the wrong type.
    """
    started = time.time()
    status = "success"
    error_msg = None
    result_name = None
    try:
        image_entry = file_store.get(req.image_id)
        if image_entry is None or image_entry["type"] != "image":
            raise HTTPException(status_code=404, detail="image_id not found")
        mask_entry = file_store.get(req.mask_id)
        if mask_entry is None or mask_entry["type"] != "mask":
            raise HTTPException(status_code=404, detail="mask_id not found")

        img_rgba = _load_rgba_image(image_entry["path"])
        # The mask may be RGB / grayscale / RGBA; the loader normalises it.
        mask_rgba = _load_rgba_mask_from_image(Image.open(mask_entry["path"]))

        if not req.passthrough:
            result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask)
        else:
            result = np.array(img_rgba.convert("RGB"))

        result_name = f"output_{uuid.uuid4().hex}.png"
        destination = os.path.join(OUTPUT_DIR, result_name)
        Image.fromarray(result).save(destination, "PNG", optimize=False, compress_level=1)

        url = str(request.url_for("download_file", filename=result_name))
        logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()})
        log_media_click(req.user_id, req.category_id)
        return {"result": result_name, "url": url}
    except Exception as exc:
        # Record the failure for the log document, then let it propagate.
        status = "fail"
        error_msg = str(exc)
        raise
    finally:
        # Always log to regular MongoDB (mandatory)
        elapsed_ms = (time.time() - started) * 1000
        log_doc = {
            "input_image_id": req.image_id,
            "input_mask_id": req.mask_id,
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": elapsed_ms,
        }
        if error_msg:
            log_doc["error"] = error_msg
        try:
            mongo_logs.insert_one(log_doc)
        except Exception as mongo_err:
            log.error("Mongo log insert failed: %s", mongo_err)
def inpaint_multipart(
    image: UploadFile = File(...),
    mask: UploadFile = File(...),
    request: Request = None,
    invert_mask: bool = True,
    mask_is_painted: bool = False,  # if True, mask file is the painted-on image (e.g., black strokes on original)
    passthrough: bool = False,
    user_id: Optional[str] = Form(None),
    category_id: Optional[str] = Form(None),
    _: None = Depends(bearer_auth),
) -> Dict[str, str]:
    """Inpaint an uploaded image using an uploaded mask, all in one request.

    Modes:
    - ``passthrough``: the input image is saved unchanged and returned.
    - ``mask_is_painted``: the mask upload is treated as a painted copy of
      the image; a removal mask is derived from pink/magenta pixels combined
      with pixels that differ from the original image.
    - otherwise the mask upload is normalised by ``_load_rgba_mask_from_image``.

    Returns:
        ``{"result": <output filename>}`` plus ``"url"`` when a download URL
        could be built. When paint detection finds fewer than 50 pixels, the
        original image is saved and the response carries an ``"error"`` key
        instead.

    A log document is inserted into ``mongo_logs`` on every exit path; any
    exception is recorded there and then re-raised.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None
    try:
        # Load in-memory
        img = Image.open(image.file).convert("RGBA")
        m = Image.open(mask.file).convert("RGBA")
        if passthrough:
            # Just echo the input image, ignore mask
            result = np.array(img.convert("RGB"))
            result_name = f"output_{uuid.uuid4().hex}.png"
            result_path = os.path.join(OUTPUT_DIR, result_name)
            Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
            # The URL is optional: any failure while building it is ignored.
            url: Optional[str] = None
            try:
                if request is not None:
                    url = str(request.url_for("download_file", filename=result_name))
            except Exception:
                url = None
            entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()}
            if url:
                entry["url"] = url
            logs.append(entry)
            resp: Dict[str, str] = {"result": result_name}
            if url:
                resp["url"] = url
            log_media_click(user_id, category_id)
            return resp
        if mask_is_painted:
            # Auto-detect pink/magenta paint and convert to black/white mask
            # White pixels = areas to remove, Black pixels = areas to keep
            log.info("Auto-detecting pink/magenta paint from uploaded image...")
            m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB)
            # Detect pink/magenta using fixed RGB bounds:
            # R in [150, 255], G in [0, 120], B in [100, 255]
            lower = np.array([150, 0, 100], dtype=np.uint8)
            upper = np.array([255, 120, 255], dtype=np.uint8)
            magenta_detected = (
                (m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) &
                (m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) &
                (m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2])
            ).astype(np.uint8) * 255
            # Method 2: also diff the painted upload against the original image.
            # NOTE(review): img is always assigned above, so the outer else
            # branch below looks unreachable - confirm before relying on it.
            if img is not None:
                img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
                if img_rgb.shape == m_rgb.shape:
                    diff = cv2.absdiff(img_rgb, m_rgb)
                    gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY)
                    # Any significant difference (>50) could be paint
                    diff_mask = (gray_diff > 50).astype(np.uint8) * 255
                    # Combine with magenta detection
                    binmask = cv2.bitwise_or(magenta_detected, diff_mask)
                else:
                    # Shapes differ, so a pixel-wise diff is not possible.
                    binmask = magenta_detected
            else:
                # No original image provided, use magenta detection only
                binmask = magenta_detected
            # Clean up the mask: remove noise and fill small holes
            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
            # Close small gaps in the mask
            binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
            # Remove small noise
            binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)
            nonzero = int((binmask > 0).sum())
            log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero)
            # Fewer than 50 detected pixels is treated as a detection failure:
            # the original image is saved and returned together with an "error"
            # message. This path does not call log_media_click, and the Mongo
            # log status stays "success" because no exception is raised.
            if nonzero < 50:
                log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.")
                result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB"))
                result_name = f"output_{uuid.uuid4().hex}.png"
                result_path = os.path.join(OUTPUT_DIR, result_name)
                Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
                return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"}
            # Create binary mask: Pink pixels → white (255), Everything else → black (0)
            # Encode in RGBA format for process_inpaint.
            # Per the original author's note, process_inpaint derives its mask as
            # 255 - mask[:,:,3] (not visible in this file - TODO confirm), so:
            #   alpha=0 (transparent/pink) → becomes 255 (white/remove)
            #   alpha=255 (opaque/keep) → becomes 0 (black/keep)
            mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8)
            mask_rgba[:, :, 0] = binmask  # R: white where pink (for visualization)
            mask_rgba[:, :, 1] = binmask  # G: white where pink
            mask_rgba[:, :, 2] = binmask  # B: white where pink
            # Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha
            mask_rgba[:, :, 3] = 255 - binmask
            log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)",
                     nonzero, binmask.shape[0] * binmask.shape[1] - nonzero)
        else:
            mask_rgba = _load_rgba_mask_from_image(m)
        # The caller's invert_mask value (default True) is forwarded unchanged
        # for both painted and regular masks.
        actual_invert = invert_mask
        log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted)
        result = process_inpaint(np.array(img), mask_rgba, invert_mask=actual_invert)
        result_name = f"output_{uuid.uuid4().hex}.png"
        result_path = os.path.join(OUTPUT_DIR, result_name)
        Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
        # The URL is optional: any failure while building it is ignored.
        url: Optional[str] = None
        try:
            if request is not None:
                url = str(request.url_for("download_file", filename=result_name))
        except Exception:
            url = None
        entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()}
        if url:
            entry["url"] = url
        logs.append(entry)
        resp: Dict[str, str] = {"result": result_name}
        if url:
            resp["url"] = url
        log_media_click(user_id, category_id)
        return resp
    except Exception as e:
        # Record the failure for the log document, then let it propagate.
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always log to regular MongoDB (mandatory)
        end_time = time.time()
        response_time_ms = (end_time - start_time) * 1000
        log_doc = {
            "endpoint": "inpaint-multipart",
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": response_time_ms,
        }
        if error_msg:
            log_doc["error"] = error_msg
        try:
            mongo_logs.insert_one(log_doc)
        except Exception as mongo_err:
            # A failed log insert is reported but never raised.
            log.error("Mongo log insert failed: %s", mongo_err)
| def remove_pink_segments( | |
| image: UploadFile = File(...), | |
| request: Request = None, | |
| user_id: Optional[str] = Form(None), | |
| category_id: Optional[str] = Form(None), | |
| _: None = Depends(bearer_auth), | |
| ) -> Dict[str, str]: | |
| """ | |
| Simple endpoint: upload an image with pink/magenta segments to remove. | |
| - Pink/Magenta segments → automatically removed (white in mask) | |
| - Everything else → automatically kept (black in mask) | |
| Just paint pink/magenta on areas you want to remove, upload the image, and it works! | |
| """ | |
| start_time = time.time() | |
| status = "success" | |
| error_msg = None | |
| result_name = None | |
| try: | |
| log.info(f"Simple remove-pink: processing image {image.filename}") | |
| # Load the image (with pink paint on it) | |
| img = Image.open(image.file).convert("RGBA") | |
| img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) | |
| # Auto-detect pink/magenta segments to remove | |
| # Pink/Magenta → white in mask (remove) | |
| # Everything else (natural image colors, including dark areas) → black in mask (keep) | |
| # Detect pink/magenta using fixed RGB bounds per requested logic | |
| lower = np.array([150, 0, 100], dtype=np.uint8) | |
| upper = np.array([255, 120, 255], dtype=np.uint8) | |
| binmask = ( | |
| (img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) & | |
| (img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) & | |
| (img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2]) | |
| ).astype(np.uint8) * 255 | |
| # Clean up the pink mask | |
| kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) | |
| binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) | |
| binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) | |
| nonzero = int((binmask > 0).sum()) | |
| total_pixels = binmask.shape[0] * binmask.shape[1] | |
| log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove") | |
| # Debug: log bounds used | |
| log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]") | |
| if nonzero < 50: | |
| log.error("No pink segments detected! Returning original image.") | |
| result = np.array(img.convert("RGB")) | |
| result_name = f"output_{uuid.uuid4().hex}.png" | |
| result_path = os.path.join(OUTPUT_DIR, result_name) | |
| Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| return { | |
| "result": result_name, | |
| "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)." | |
| } | |
| # Create binary mask: Pink pixels → white (255), Everything else → black (0) | |
| # Encode in RGBA format that process_inpaint expects | |
| # process_inpaint does: mask = 255 - mask[:,:,3] | |
| # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) | |
| # alpha=255 (opaque/keep) → becomes 0 (black/keep) | |
| mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) | |
| # RGB channels don't matter for process_inpaint, but set them to white where pink for visualization | |
| mask_rgba[:, :, 0] = binmask # R: white where pink | |
| mask_rgba[:, :, 1] = binmask # G: white where pink | |
| mask_rgba[:, :, 2] = binmask # B: white where pink | |
| # Alpha: 0 (transparent) where pink → will become white after 255-alpha | |
| # 255 (opaque) everywhere else → will become black after 255-alpha | |
| mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255 | |
| # Verify mask encoding | |
| alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum()) | |
| alpha_255_count = int((mask_rgba[:,:,3] == 255).sum()) | |
| total_pixels = binmask.shape[0] * binmask.shape[1] | |
| log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)") | |
| log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)") | |
| # IMPORTANT: We need to use the ORIGINAL image WITHOUT pink paint for inpainting! | |
| # Remove pink from the original image before processing | |
| # Create a clean version: where pink was detected, keep original image colors | |
| img_clean = np.array(img.convert("RGBA")) | |
| # Where pink is detected, we want to inpaint, so we can leave it (or blend it out) | |
| # Actually, the model will inpaint over those areas, so we can pass the original | |
| # But for better results, we might want to remove the pink overlay first | |
| # Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal | |
| log.info(f"Starting inpainting process...") | |
| result = process_inpaint(img_clean, mask_rgba, invert_mask=True) | |
| log.info(f"Inpainting complete, result shape: {result.shape}") | |
| result_name = f"output_{uuid.uuid4().hex}.png" | |
| result_path = os.path.join(OUTPUT_DIR, result_name) | |
| Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| url: Optional[str] = None | |
| try: | |
| if request is not None: | |
| url = str(request.url_for("download_file", filename=result_name)) | |
| except Exception: | |
| url = None | |
| logs.append({ | |
| "result": result_name, | |
| "filename": image.filename, | |
| "pink_pixels": nonzero, | |
| "timestamp": datetime.utcnow().isoformat() | |
| }) | |
| resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)} | |
| if url: | |
| resp["url"] = url | |
| log_media_click(user_id, category_id) | |
| return resp | |
| except Exception as e: | |
| status = "fail" | |
| error_msg = str(e) | |
| raise | |
| finally: | |
| # Always log to regular MongoDB (mandatory) | |
| end_time = time.time() | |
| response_time_ms = (end_time - start_time) * 1000 | |
| log_doc = { | |
| "endpoint": "remove-pink", | |
| "output_id": result_name, | |
| "status": status, | |
| "timestamp": datetime.utcnow(), | |
| "ts": int(time.time()), | |
| "response_time_ms": response_time_ms, | |
| } | |
| if error_msg: | |
| log_doc["error"] = error_msg | |
| try: | |
| mongo_logs.insert_one(log_doc) | |
| except Exception as mongo_err: | |
| log.error("Mongo log insert failed: %s", mongo_err) | |
def download_file(filename: str):
    """Serve a stored result file from ``OUTPUT_DIR``.

    Args:
        filename: Bare name of a file inside ``OUTPUT_DIR`` (for example an
            ``output_<hex>.png`` name produced by the processing endpoints).

    Returns:
        A ``FileResponse`` streaming the file.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or no
            such file exists in ``OUTPUT_DIR``.
    """
    # The name is caller-supplied: refuse anything carrying a directory
    # component so the joined path can never resolve outside OUTPUT_DIR.
    # Answer with the same 404 as a missing file to avoid leaking details.
    if not filename or "\\" in filename or os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(OUTPUT_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(path)
def view_result(filename: str):
    """View a result image directly in the browser.

    Behaves like a plain file download from ``OUTPUT_DIR`` but sets the
    response media type to ``image/png`` explicitly.

    Args:
        filename: Bare name of a file inside ``OUTPUT_DIR``.

    Returns:
        A ``FileResponse`` for the file with ``media_type="image/png"``.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or no
            such file exists in ``OUTPUT_DIR``.
    """
    # The name is caller-supplied: refuse anything carrying a directory
    # component so the joined path can never resolve outside OUTPUT_DIR.
    if not filename or "\\" in filename or os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(OUTPUT_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(path, media_type="image/png")
def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse:
    """Return the module-level in-memory ``logs`` list as a JSON response.

    The unused ``_`` parameter exists only so that the ``bearer_auth``
    dependency is evaluated for this handler.
    """
    entries = logs
    response = JSONResponse(content=entries)
    return response
| # import os | |
| # import uuid | |
| # import shutil | |
| # import re | |
| # from datetime import datetime, timedelta, date | |
| # from typing import Dict, List, Optional | |
| # import numpy as np | |
| # from fastapi import ( | |
| # FastAPI, | |
| # UploadFile, | |
| # File, | |
| # HTTPException, | |
| # Depends, | |
| # Header, | |
| # Request, | |
| # Form, | |
| # ) | |
| # from fastapi.responses import FileResponse, JSONResponse | |
| # from pydantic import BaseModel | |
| # from PIL import Image | |
| # import cv2 | |
| # import logging | |
| # from bson import ObjectId | |
| # from pymongo import MongoClient | |
| # import time | |
| # logging.basicConfig(level=logging.INFO) | |
| # log = logging.getLogger("api") | |
| # from src.core import process_inpaint | |
| # # Directories (use writable space on HF Spaces) | |
| # BASE_DIR = os.environ.get("DATA_DIR", "/data") | |
| # if not os.path.isdir(BASE_DIR): | |
| # # Fallback to /tmp if /data not available | |
| # BASE_DIR = "/tmp" | |
| # UPLOAD_DIR = os.path.join(BASE_DIR, "uploads") | |
| # OUTPUT_DIR = os.path.join(BASE_DIR, "outputs") | |
| # os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| # os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| # # Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open | |
| # ENV_TOKEN = os.environ.get("API_TOKEN") | |
| # app = FastAPI(title="Photo Object Removal API", version="1.0.0") | |
| # # In-memory stores | |
| # file_store: Dict[str, Dict[str, str]] = {} | |
| # logs: List[Dict[str, str]] = [] | |
| # MONGO_URI = "mongodb+srv://harilogicgo_db_user:[email protected]/?appName=KiddoImages" | |
| # mongo_client = MongoClient(MONGO_URI) | |
| # mongo_db = mongo_client["object_remover"] | |
| # mongo_logs = mongo_db["api_logs"] | |
| # ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN") | |
| # DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984" | |
| # admin_media_clicks = None | |
| # def _init_admin_mongo() -> None: | |
| # global admin_media_clicks | |
| # if not ADMIN_MONGO_URI: | |
| # log.info("Admin Mongo URI not provided; media click logging disabled") | |
| # return | |
| # try: | |
| # admin_client = MongoClient(ADMIN_MONGO_URI) | |
| # # get_default_database() extracts database from connection string (e.g., /adminPanel) | |
| # admin_db = admin_client.get_default_database() | |
| # if admin_db is None: | |
| # # Fallback if no database in URI | |
| # admin_db = admin_client["admin"] | |
| # log.warning("No database in connection string, defaulting to 'admin'") | |
| # admin_media_clicks = admin_db["media_clicks"] | |
| # log.info( | |
| # "Admin media click logging initialized: db=%s collection=%s", | |
| # admin_db.name, | |
| # admin_media_clicks.name, | |
| # ) | |
| # try: | |
| # admin_media_clicks.drop_index("user_id_1_header_1_media_id_1") | |
| # log.info("Dropped legacy index user_id_1_header_1_media_id_1") | |
| # except Exception as idx_err: | |
| # # Index drop failure is non-critical (often permission issue) | |
| # if "Unauthorized" not in str(idx_err): | |
| # log.info("Skipping legacy index drop: %s", idx_err) | |
| # except Exception as err: | |
| # log.error("Failed to init admin Mongo client: %s", err) | |
| # admin_media_clicks = None | |
| # _init_admin_mongo() | |
| # def _admin_logging_status() -> Dict[str, object]: | |
| # if admin_media_clicks is None: | |
| # return { | |
| # "enabled": False, | |
| # "db": None, | |
| # "collection": None, | |
| # } | |
| # return { | |
| # "enabled": True, | |
| # "db": admin_media_clicks.database.name, | |
| # "collection": admin_media_clicks.name, | |
| # } | |
| # def _build_ai_edit_daily_count( | |
| # existing: Optional[List[Dict[str, object]]], | |
| # today: date, | |
| # ) -> List[Dict[str, object]]: | |
| # """ | |
| # Build / extend the ai_edit_daily_count array with the following rules: | |
| # - Case A (no existing data): return [{date: today, count: 1}] | |
| # - Case B (today already recorded): return list unchanged | |
| # - Case C (gap in days): fill missing days with count=0 and append today with count=1 | |
| # Additionally, the returned list is capped to the most recent 32 entries. | |
| # The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. | |
| # """ | |
| # def _to_date_only(value: object) -> date: | |
| # if isinstance(value, datetime): | |
| # return value.date() | |
| # if isinstance(value, date): | |
| # return value | |
| # # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime | |
| # try: | |
| # text = str(value) | |
| # if len(text) == 10: | |
| # return datetime.strptime(text, "%Y-%m-%d").date() | |
| # return datetime.fromisoformat(text).date() | |
| # except Exception: | |
| # # If parsing fails, just treat as today to avoid crashing | |
| # return today | |
| # # Case A: first ever use (no array yet) | |
| # if not existing: | |
| # return [ | |
| # { | |
| # "date": datetime(today.year, today.month, today.day), | |
| # "count": 1, | |
| # } | |
| # ] | |
| # # Work on a shallow copy so we don't mutate original in-place | |
| # result: List[Dict[str, object]] = list(existing) | |
| # last_entry = result[-1] if result else None | |
| # if not last_entry or "date" not in last_entry: | |
| # # If structure is unexpected, re-initialize safely | |
| # return [ | |
| # { | |
| # "date": datetime(today.year, today.month, today.day), | |
| # "count": 1, | |
| # } | |
| # ] | |
| # last_date = _to_date_only(last_entry["date"]) | |
| # # If somehow the last stored date is in the future, do nothing to avoid corrupting history | |
| # if last_date > today: | |
| # return result | |
| # # Case B: today's date already present as the last entry → unchanged | |
| # if last_date == today: | |
| # return result | |
| # # Case C: there is a gap, fill missing days with count=0 and append today with count=1 | |
| # cursor = last_date + timedelta(days=1) | |
| # while cursor < today: | |
| # result.append( | |
| # { | |
| # "date": datetime(cursor.year, cursor.month, cursor.day), | |
| # "count": 0, | |
| # } | |
| # ) | |
| # cursor += timedelta(days=1) | |
| # # Finally add today's presence indicator | |
| # result.append( | |
| # { | |
| # "date": datetime(today.year, today.month, today.day), | |
| # "count": 1, | |
| # } | |
| # ) | |
| # # Enforce 32-entry limit (keep the most recent 32 days) | |
| # if len(result) > 32: | |
| # result = result[-32:] | |
| # return result | |
| # def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None: | |
| # if not ENV_TOKEN: | |
| # return | |
| # if authorization is None or not authorization.lower().startswith("bearer "): | |
| # raise HTTPException(status_code=401, detail="Unauthorized") | |
| # token = authorization.split(" ", 1)[1] | |
| # if token != ENV_TOKEN: | |
| # raise HTTPException(status_code=403, detail="Forbidden") | |
| # class InpaintRequest(BaseModel): | |
| # image_id: str | |
| # mask_id: str | |
| # invert_mask: bool = True # True => selected/painted area is removed | |
| # passthrough: bool = False # If True, return the original image unchanged | |
| # user_id: Optional[str] = None | |
| # category_id: Optional[str] = None | |
| # class SimpleRemoveRequest(BaseModel): | |
| # image_id: str # Image with pink/magenta segments to remove | |
| # def _coerce_object_id(value: Optional[str]) -> ObjectId: | |
| # if value is None: | |
| # return ObjectId() | |
| # value_str = str(value).strip() | |
| # if re.fullmatch(r"[0-9a-fA-F]{24}", value_str): | |
| # return ObjectId(value_str) | |
| # if value_str.isdigit(): | |
| # hex_str = format(int(value_str), "x") | |
| # if len(hex_str) > 24: | |
| # hex_str = hex_str[-24:] | |
| # hex_str = hex_str.rjust(24, "0") | |
| # return ObjectId(hex_str) | |
| # return ObjectId() | |
| # def _coerce_category_id(category_id: Optional[str]) -> ObjectId: | |
| # raw = category_id or DEFAULT_CATEGORY_ID | |
| # raw_str = str(raw).strip() | |
| # if re.fullmatch(r"[0-9a-fA-F]{24}", raw_str): | |
| # return ObjectId(raw_str) | |
| # return _coerce_object_id(raw_str) | |
| # def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None: | |
| # """Log to admin media_clicks collection only if user_id is provided.""" | |
| # if admin_media_clicks is None: | |
| # return | |
| # # Only log if user_id is provided (not None/empty) | |
| # if not user_id or not user_id.strip(): | |
| # return | |
| # try: | |
| # user_obj = _coerce_object_id(user_id) | |
| # category_obj = _coerce_category_id(category_id) | |
| # now = datetime.utcnow() | |
| # today = now.date() | |
| # doc = admin_media_clicks.find_one({"userId": user_obj}) | |
| # if doc: | |
| # existing_daily = doc.get("ai_edit_daily_count") | |
| # updated_daily = _build_ai_edit_daily_count(existing_daily, today) | |
| # categories = doc.get("categories") or [] | |
| # if any(cat.get("categoryId") == category_obj for cat in categories): | |
| # # Category exists: increment click_count and ai_edit_complete, update dates | |
| # admin_media_clicks.update_one( | |
| # {"_id": doc["_id"], "categories.categoryId": category_obj}, | |
| # { | |
| # "$inc": { | |
| # "categories.$.click_count": 1, | |
| # "ai_edit_complete": 1, # $inc handles missing fields (backward compatible) | |
| # }, | |
| # "$set": { | |
| # "categories.$.lastClickedAt": now, | |
| # "updatedAt": now, | |
| # "ai_edit_last_date": now, | |
| # "ai_edit_daily_count": updated_daily, | |
| # }, | |
| # }, | |
| # ) | |
| # else: | |
| # # New category to existing document: push category, increment ai_edit_complete | |
| # admin_media_clicks.update_one( | |
| # {"_id": doc["_id"]}, | |
| # { | |
| # "$push": { | |
| # "categories": { | |
| # "categoryId": category_obj, | |
| # "click_count": 1, | |
| # "lastClickedAt": now, | |
| # } | |
| # }, | |
| # "$inc": {"ai_edit_complete": 1}, # $inc handles missing fields | |
| # "$set": { | |
| # "updatedAt": now, | |
| # "ai_edit_last_date": now, | |
| # "ai_edit_daily_count": updated_daily, | |
| # }, | |
| # }, | |
| # ) | |
| # else: | |
| # # New user: create document with default ai_edit_complete=0, then increment to 1 | |
| # daily_for_new = _build_ai_edit_daily_count(None, today) | |
| # admin_media_clicks.update_one( | |
| # {"userId": user_obj}, | |
| # { | |
| # "$setOnInsert": { | |
| # "userId": user_obj, | |
| # "categories": [ | |
| # { | |
| # "categoryId": category_obj, | |
| # "click_count": 1, | |
| # "lastClickedAt": now, | |
| # } | |
| # ], | |
| # "createdAt": now, | |
| # "updatedAt": now, | |
| # "ai_edit_complete": 0, # Default for new users | |
| # "ai_edit_daily_count": daily_for_new, | |
| # }, | |
| # "$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use | |
| # "$set": { | |
| # "updatedAt": now, | |
| # "ai_edit_last_date": now, | |
| # }, | |
| # }, | |
| # upsert=True, | |
| # ) | |
| # except Exception as err: | |
| # err_str = str(err) | |
| # if "Unauthorized" in err_str or "not authorized" in err_str.lower(): | |
| # log.warning( | |
| # "Admin media click logging failed (permissions): user lacks read/write on db=%s collection=%s. " | |
| # "Check MongoDB user permissions.", | |
| # admin_media_clicks.database.name, | |
| # admin_media_clicks.name, | |
| # ) | |
| # else: | |
| # log.warning("Admin media click logging failed: %s", err) | |
| # @app.get("/") | |
| # def root() -> Dict[str, object]: | |
| # return { | |
| # "name": "Photo Object Removal API", | |
| # "status": "ok", | |
| # "endpoints": { | |
| # "GET /health": "health check", | |
| # "POST /upload-image": "form-data: image=file", | |
| # "POST /upload-mask": "form-data: mask=file", | |
| # "POST /inpaint": "JSON: {image_id, mask_id}", | |
| # "POST /inpaint-multipart": "form-data: image=file, mask=file", | |
| # "POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)", | |
| # "GET /download/{filename}": "download result image", | |
| # "GET /result/{filename}": "view result image in browser", | |
| # "GET /logs": "recent uploads/results", | |
| # }, | |
| # "auth": "set API_TOKEN env var to require Authorization: Bearer <token> (except /health)", | |
| # } | |
| # @app.get("/health") | |
| # def health() -> Dict[str, str]: | |
| # return {"status": "healthy"} | |
| # @app.get("/logging-status") | |
| # def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]: | |
| # """Helper endpoint to verify admin media logging wiring (no secrets exposed).""" | |
| # return _admin_logging_status() | |
| # @app.post("/upload-image") | |
| # def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: | |
| # ext = os.path.splitext(image.filename)[1] or ".png" | |
| # file_id = str(uuid.uuid4()) | |
| # stored_name = f"{file_id}{ext}" | |
| # stored_path = os.path.join(UPLOAD_DIR, stored_name) | |
| # with open(stored_path, "wb") as f: | |
| # shutil.copyfileobj(image.file, f) | |
| # file_store[file_id] = { | |
| # "type": "image", | |
| # "filename": image.filename, | |
| # "stored_name": stored_name, | |
| # "path": stored_path, | |
| # "timestamp": datetime.utcnow().isoformat(), | |
| # } | |
| # logs.append({"id": file_id, "filename": image.filename, "type": "image", "timestamp": datetime.utcnow().isoformat()}) | |
| # return {"id": file_id, "filename": image.filename} | |
| # @app.post("/upload-mask") | |
| # def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: | |
| # ext = os.path.splitext(mask.filename)[1] or ".png" | |
| # file_id = str(uuid.uuid4()) | |
| # stored_name = f"{file_id}{ext}" | |
| # stored_path = os.path.join(UPLOAD_DIR, stored_name) | |
| # with open(stored_path, "wb") as f: | |
| # shutil.copyfileobj(mask.file, f) | |
| # file_store[file_id] = { | |
| # "type": "mask", | |
| # "filename": mask.filename, | |
| # "stored_name": stored_name, | |
| # "path": stored_path, | |
| # "timestamp": datetime.utcnow().isoformat(), | |
| # } | |
| # logs.append({"id": file_id, "filename": mask.filename, "type": "mask", "timestamp": datetime.utcnow().isoformat()}) | |
| # return {"id": file_id, "filename": mask.filename} | |
| # def _load_rgba_image(path: str) -> Image.Image: | |
| # img = Image.open(path) | |
| # return img.convert("RGBA") | |
| # def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray: | |
| # """ | |
| # Convert mask image to RGBA format (black/white mask). | |
| # Standard convention: white (255) = area to remove, black (0) = area to keep | |
| # Returns RGBA with white in RGB channels where removal is needed, alpha=255 | |
| # """ | |
| # if img.mode != "RGBA": | |
| # # For RGB/Grayscale masks: white (value>128) = remove, black (value<=128) = keep | |
| # gray = img.convert("L") | |
| # arr = np.array(gray) | |
| # # Create proper black/white mask: white pixels (>128) = remove, black (<=128) = keep | |
| # mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8) | |
| # rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8) | |
| # rgba[:, :, 0] = mask_bw # R | |
| # rgba[:, :, 1] = mask_bw # G | |
| # rgba[:, :, 2] = mask_bw # B | |
| # rgba[:, :, 3] = 255 # Fully opaque | |
| # log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)") | |
| # return rgba | |
| # # For RGBA: check if alpha channel is meaningful | |
| # arr = np.array(img) | |
| # alpha = arr[:, :, 3] | |
| # rgb = arr[:, :, :3] | |
| # # If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values | |
| # if alpha.mean() > 200: | |
| # # Use RGB to determine mask: white/bright in RGB = remove | |
| # gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY) | |
| # # Also detect magenta specifically | |
| # magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255 | |
| # mask_bw = np.maximum(np.where(gray > 128, 255, 0).astype(np.uint8), magenta) | |
| # rgba = arr.copy() | |
| # rgba[:, :, 0] = mask_bw # R | |
| # rgba[:, :, 1] = mask_bw # G | |
| # rgba[:, :, 2] = mask_bw # B | |
| # rgba[:, :, 3] = 255 # Fully opaque | |
| # log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)") | |
| # return rgba | |
| # # Alpha channel encodes the mask - convert to RGB-based | |
| # # Transparent areas (alpha < 128) = remove, Opaque areas = keep | |
| # mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8) | |
| # rgba = arr.copy() | |
| # rgba[:, :, 0] = mask_bw | |
| # rgba[:, :, 1] = mask_bw | |
| # rgba[:, :, 2] = mask_bw | |
| # rgba[:, :, 3] = 255 | |
| # log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)") | |
| # return rgba | |
| # @app.post("/inpaint") | |
| # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: | |
| # start_time = time.time() | |
| # status = "success" | |
| # error_msg = None | |
| # output_name = None | |
| # try: | |
| # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": | |
| # raise HTTPException(status_code=404, detail="image_id not found") | |
| # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": | |
| # raise HTTPException(status_code=404, detail="mask_id not found") | |
| # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) | |
| # mask_img = Image.open(file_store[req.mask_id]["path"]) | |
| # mask_rgba = _load_rgba_mask_from_image(mask_img) | |
| # if req.passthrough: | |
| # result = np.array(img_rgba.convert("RGB")) | |
| # else: | |
| # result = process_inpaint( | |
| # np.array(img_rgba), | |
| # mask_rgba, | |
| # invert_mask=req.invert_mask | |
| # ) | |
| # output_name = f"output_{uuid.uuid4().hex}.png" | |
| # output_path = os.path.join(OUTPUT_DIR, output_name) | |
| # Image.fromarray(result).save( | |
| # output_path, "PNG", optimize=False, compress_level=1 | |
| # ) | |
| # log_media_click(req.user_id, req.category_id) | |
| # return {"result": output_name} | |
| # except Exception as e: | |
| # status = "fail" | |
| # error_msg = str(e) | |
| # raise | |
| # finally: | |
| # end_time = time.time() | |
| # response_time_ms = (end_time - start_time) * 1000 | |
| # log_doc = { | |
| # "input_image_id": req.image_id, | |
| # "input_mask_id": req.mask_id, | |
| # "output_id": output_name, | |
| # "status": status, | |
| # "timestamp": datetime.utcnow(), | |
| # "ts": int(time.time()), | |
| # "response_time_ms": response_time_ms | |
| # } | |
| # if error_msg: | |
| # log_doc["error"] = error_msg | |
| # try: | |
| # mongo_logs.insert_one(log_doc) | |
| # except Exception as mongo_err: | |
| # log.error(f"Mongo log insert failed: {mongo_err}") | |
| # # @app.post("/inpaint") | |
| # # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: | |
| # # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": | |
| # # raise HTTPException(status_code=404, detail="image_id not found") | |
| # # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": | |
| # # raise HTTPException(status_code=404, detail="mask_id not found") | |
| # # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) | |
| # # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA | |
| # # mask_rgba = _load_rgba_mask_from_image(mask_img) | |
| # # # Debug: check mask before processing | |
| # # white_pixels = int((mask_rgba[:,:,0] > 128).sum()) | |
| # # log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}") | |
| # # if req.passthrough: | |
| # # result = np.array(img_rgba.convert("RGB")) | |
| # # else: | |
| # # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) | |
| # # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # # logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()}) | |
| # # return {"result": result_name} | |
| # @app.post("/inpaint-url") | |
| # def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]: | |
| # """Same as /inpaint but returns a JSON with a public download URL instead of image bytes.""" | |
| # start_time = time.time() | |
| # status = "success" | |
| # error_msg = None | |
| # result_name = None | |
| # try: | |
| # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": | |
| # raise HTTPException(status_code=404, detail="image_id not found") | |
| # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": | |
| # raise HTTPException(status_code=404, detail="mask_id not found") | |
| # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) | |
| # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA | |
| # mask_rgba = _load_rgba_mask_from_image(mask_img) | |
| # if req.passthrough: | |
| # result = np.array(img_rgba.convert("RGB")) | |
| # else: | |
| # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) | |
| # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # url = str(request.url_for("download_file", filename=result_name)) | |
| # logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()}) | |
| # log_media_click(req.user_id, req.category_id) | |
| # return {"result": result_name, "url": url} | |
| # except Exception as e: | |
| # status = "fail" | |
| # error_msg = str(e) | |
| # raise | |
| # finally: | |
| # # Always log to regular MongoDB (mandatory) | |
| # end_time = time.time() | |
| # response_time_ms = (end_time - start_time) * 1000 | |
| # log_doc = { | |
| # "input_image_id": req.image_id, | |
| # "input_mask_id": req.mask_id, | |
| # "output_id": result_name, | |
| # "status": status, | |
| # "timestamp": datetime.utcnow(), | |
| # "ts": int(time.time()), | |
| # "response_time_ms": response_time_ms, | |
| # } | |
| # if error_msg: | |
| # log_doc["error"] = error_msg | |
| # try: | |
| # mongo_logs.insert_one(log_doc) | |
| # except Exception as mongo_err: | |
| # log.error("Mongo log insert failed: %s", mongo_err) | |
| # @app.post("/inpaint-multipart") | |
| # def inpaint_multipart( | |
| # image: UploadFile = File(...), | |
| # mask: UploadFile = File(...), | |
| # request: Request = None, | |
| # invert_mask: bool = True, | |
| # mask_is_painted: bool = False, # if True, mask file is the painted-on image (e.g., black strokes on original) | |
| # passthrough: bool = False, | |
| # user_id: Optional[str] = Form(None), | |
| # category_id: Optional[str] = Form(None), | |
| # _: None = Depends(bearer_auth), | |
| # ) -> Dict[str, str]: | |
| # start_time = time.time() | |
| # status = "success" | |
| # error_msg = None | |
| # result_name = None | |
| # try: | |
| # # Load in-memory | |
| # img = Image.open(image.file).convert("RGBA") | |
| # m = Image.open(mask.file).convert("RGBA") | |
| # if passthrough: | |
| # # Just echo the input image, ignore mask | |
| # result = np.array(img.convert("RGB")) | |
| # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # url: Optional[str] = None | |
| # try: | |
| # if request is not None: | |
| # url = str(request.url_for("download_file", filename=result_name)) | |
| # except Exception: | |
| # url = None | |
| # entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} | |
| # if url: | |
| # entry["url"] = url | |
| # logs.append(entry) | |
| # resp: Dict[str, str] = {"result": result_name} | |
| # if url: | |
| # resp["url"] = url | |
| # log_media_click(user_id, category_id) | |
| # return resp | |
| # if mask_is_painted: | |
| # # Auto-detect pink/magenta paint and convert to black/white mask | |
| # # White pixels = areas to remove, Black pixels = areas to keep | |
| # log.info("Auto-detecting pink/magenta paint from uploaded image...") | |
| # m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB) | |
| # # Detect pink/magenta using fixed RGB bounds (same as /remove-pink) | |
| # lower = np.array([150, 0, 100], dtype=np.uint8) | |
| # upper = np.array([255, 120, 255], dtype=np.uint8) | |
| # magenta_detected = ( | |
| # (m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) & | |
| # (m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) & | |
| # (m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2]) | |
| # ).astype(np.uint8) * 255 | |
| # # Method 2: Also check if original image was provided to find differences | |
| # if img is not None: | |
| # img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) | |
| # if img_rgb.shape == m_rgb.shape: | |
| # diff = cv2.absdiff(img_rgb, m_rgb) | |
| # gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY) | |
| # # Any significant difference (>50) could be paint | |
| # diff_mask = (gray_diff > 50).astype(np.uint8) * 255 | |
| # # Combine with magenta detection | |
| # binmask = cv2.bitwise_or(magenta_detected, diff_mask) | |
| # else: | |
| # binmask = magenta_detected | |
| # else: | |
| # # No original image provided, use magenta detection only | |
| # binmask = magenta_detected | |
| # # Clean up the mask: remove noise and fill small holes | |
| # kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) | |
| # # Close small gaps in the mask | |
| # binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) | |
| # # Remove small noise | |
| # binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) | |
| # nonzero = int((binmask > 0).sum()) | |
| # log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero) | |
| # # If very few pixels detected, assume the user may already be providing a BW mask | |
| # # and proceed without forcing strict detection | |
| # if nonzero < 50: | |
| # log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.") | |
| # result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB")) | |
| # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"} | |
| # # Create binary mask: Pink pixels → white (255), Everything else → black (0) | |
| # # Encode in RGBA format for process_inpaint | |
| # # process_inpaint does: mask = 255 - mask[:,:,3] | |
| # # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) | |
| # # alpha=255 (opaque/keep) → becomes 0 (black/keep) | |
| # mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) | |
| # mask_rgba[:, :, 0] = binmask # R: white where pink (for visualization) | |
| # mask_rgba[:, :, 1] = binmask # G: white where pink | |
| # mask_rgba[:, :, 2] = binmask # B: white where pink | |
| # # Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha | |
| # mask_rgba[:, :, 3] = 255 - binmask | |
| # log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)", | |
| # nonzero, binmask.shape[0] * binmask.shape[1] - nonzero) | |
| # else: | |
| # mask_rgba = _load_rgba_mask_from_image(m) | |
| # # When mask_is_painted=true, we encode pink as alpha=0, so process_inpaint's default invert_mask=True works correctly | |
| # actual_invert = invert_mask # Use default True for painted masks | |
| # log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted) | |
| # result = process_inpaint(np.array(img), mask_rgba, invert_mask=actual_invert) | |
| # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # url: Optional[str] = None | |
| # try: | |
| # if request is not None: | |
| # url = str(request.url_for("download_file", filename=result_name)) | |
| # except Exception: | |
| # url = None | |
| # entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} | |
| # if url: | |
| # entry["url"] = url | |
| # logs.append(entry) | |
| # resp: Dict[str, str] = {"result": result_name} | |
| # if url: | |
| # resp["url"] = url | |
| # log_media_click(user_id, category_id) | |
| # return resp | |
| # except Exception as e: | |
| # status = "fail" | |
| # error_msg = str(e) | |
| # raise | |
| # finally: | |
| # # Always log to regular MongoDB (mandatory) | |
| # end_time = time.time() | |
| # response_time_ms = (end_time - start_time) * 1000 | |
| # log_doc = { | |
| # "endpoint": "inpaint-multipart", | |
| # "output_id": result_name, | |
| # "status": status, | |
| # "timestamp": datetime.utcnow(), | |
| # "ts": int(time.time()), | |
| # "response_time_ms": response_time_ms, | |
| # } | |
| # if error_msg: | |
| # log_doc["error"] = error_msg | |
| # try: | |
| # mongo_logs.insert_one(log_doc) | |
| # except Exception as mongo_err: | |
| # log.error("Mongo log insert failed: %s", mongo_err) | |
| # @app.post("/remove-pink") | |
| # def remove_pink_segments( | |
| # image: UploadFile = File(...), | |
| # request: Request = None, | |
| # user_id: Optional[str] = Form(None), | |
| # category_id: Optional[str] = Form(None), | |
| # _: None = Depends(bearer_auth), | |
| # ) -> Dict[str, str]: | |
| # """ | |
| # Simple endpoint: upload an image with pink/magenta segments to remove. | |
| # - Pink/Magenta segments → automatically removed (white in mask) | |
| # - Everything else → automatically kept (black in mask) | |
| # Just paint pink/magenta on areas you want to remove, upload the image, and it works! | |
| # """ | |
| # start_time = time.time() | |
| # status = "success" | |
| # error_msg = None | |
| # result_name = None | |
| # try: | |
| # log.info(f"Simple remove-pink: processing image {image.filename}") | |
| # # Load the image (with pink paint on it) | |
| # img = Image.open(image.file).convert("RGBA") | |
| # img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) | |
| # # Auto-detect pink/magenta segments to remove | |
| # # Pink/Magenta → white in mask (remove) | |
| # # Everything else (natural image colors, including dark areas) → black in mask (keep) | |
| # # Detect pink/magenta with fixed, inclusive per-channel RGB bounds: R 150-255, G 0-120, B 100-255 | |
| # lower = np.array([150, 0, 100], dtype=np.uint8) | |
| # upper = np.array([255, 120, 255], dtype=np.uint8) | |
| # binmask = ( | |
| # (img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) & | |
| # (img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) & | |
| # (img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2]) | |
| # ).astype(np.uint8) * 255 | |
| # # Clean up the pink mask | |
| # kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) | |
| # binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) | |
| # binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) | |
| # nonzero = int((binmask > 0).sum()) | |
| # total_pixels = binmask.shape[0] * binmask.shape[1] | |
| # log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove") | |
| # # Debug: log bounds used | |
| # log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]") | |
| # if nonzero < 50: | |
| # log.error("No pink segments detected! Returning original image.") | |
| # result = np.array(img.convert("RGB")) | |
| # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # return { | |
| # "result": result_name, | |
| # "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)." | |
| # } | |
| # # Create binary mask: Pink pixels → white (255), Everything else → black (0) | |
| # # Encode in RGBA format that process_inpaint expects | |
| # # process_inpaint does: mask = 255 - mask[:,:,3] | |
| # # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) | |
| # # alpha=255 (opaque/keep) → becomes 0 (black/keep) | |
| # mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) | |
| # # RGB channels don't matter for process_inpaint, but set them to white where pink for visualization | |
| # mask_rgba[:, :, 0] = binmask # R: white where pink | |
| # mask_rgba[:, :, 1] = binmask # G: white where pink | |
| # mask_rgba[:, :, 2] = binmask # B: white where pink | |
| # # Alpha: 0 (transparent) where pink → will become white after 255-alpha | |
| # # 255 (opaque) everywhere else → will become black after 255-alpha | |
| # mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255 | |
| # # Log the mask encoding counts (diagnostic only; nothing is asserted or corrected here) | |
| # alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum()) | |
| # alpha_255_count = int((mask_rgba[:,:,3] == 255).sum()) | |
| # total_pixels = binmask.shape[0] * binmask.shape[1] | |
| # log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)") | |
| # log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)") | |
| # # NOTE: ideally inpainting would run on the ORIGINAL image WITHOUT the pink paint, | |
| # # but no pink removal is done here: img_clean is just the uploaded (painted) image as RGBA. | |
| # img_clean = np.array(img.convert("RGBA")) | |
| # # The detected pink pixels form the mask region, so the model inpaints over them | |
| # # and the painted image can be passed as-is. | |
| # # Possible improvement: remove or blend out the pink overlay first for better results. | |
| # # Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal | |
| # log.info(f"Starting inpainting process...") | |
| # result = process_inpaint(img_clean, mask_rgba, invert_mask=True) | |
| # log.info(f"Inpainting complete, result shape: {result.shape}") | |
| # result_name = f"output_{uuid.uuid4().hex}.png" | |
| # result_path = os.path.join(OUTPUT_DIR, result_name) | |
| # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) | |
| # url: Optional[str] = None | |
| # try: | |
| # if request is not None: | |
| # url = str(request.url_for("download_file", filename=result_name)) | |
| # except Exception: | |
| # url = None | |
| # logs.append({ | |
| # "result": result_name, | |
| # "filename": image.filename, | |
| # "pink_pixels": nonzero, | |
| # "timestamp": datetime.utcnow().isoformat() | |
| # }) | |
| # resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)} | |
| # if url: | |
| # resp["url"] = url | |
| # log_media_click(user_id, category_id) | |
| # return resp | |
| # except Exception as e: | |
| # status = "fail" | |
| # error_msg = str(e) | |
| # raise | |
| # finally: | |
| # # Always log to regular MongoDB (mandatory) | |
| # end_time = time.time() | |
| # response_time_ms = (end_time - start_time) * 1000 | |
| # log_doc = { | |
| # "endpoint": "remove-pink", | |
| # "output_id": result_name, | |
| # "status": status, | |
| # "timestamp": datetime.utcnow(), | |
| # "ts": int(time.time()), | |
| # "response_time_ms": response_time_ms, | |
| # } | |
| # if error_msg: | |
| # log_doc["error"] = error_msg | |
| # try: | |
| # mongo_logs.insert_one(log_doc) | |
| # except Exception as mongo_err: | |
| # log.error("Mongo log insert failed: %s", mongo_err) | |
| # @app.get("/download/{filename}") | |
| # def download_file(filename: str): | |
| # path = os.path.join(OUTPUT_DIR, filename) | |
| # if not os.path.isfile(path): | |
| # raise HTTPException(status_code=404, detail="file not found") | |
| # return FileResponse(path) | |
| # @app.get("/result/{filename}") | |
| # def view_result(filename: str): | |
| # """View result image directly in browser (same as download but with proper content-type for viewing)""" | |
| # path = os.path.join(OUTPUT_DIR, filename) | |
| # if not os.path.isfile(path): | |
| # raise HTTPException(status_code=404, detail="file not found") | |
| # return FileResponse(path, media_type="image/png") | |
| # @app.get("/logs") | |
| # def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse: | |
| # return JSONResponse(content=logs) |