object_remover / api /main.py
LogicGoInfotechSpaces's picture
Update api/main.py
f2ba5b8 verified
import os
import uuid
import shutil
import re
from datetime import datetime, timedelta, date
from typing import Dict, List, Optional
import numpy as np
from fastapi import (
FastAPI,
UploadFile,
File,
HTTPException,
Depends,
Header,
Request,
Form,
)
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from PIL import Image
import cv2
import logging
from bson import ObjectId
from pymongo import MongoClient
import time
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("api")
from src.core import process_inpaint
# Directories (use writable space on HF Spaces)
BASE_DIR = os.environ.get("DATA_DIR", "/data")
if not os.path.isdir(BASE_DIR):
# Fallback to /tmp if /data not available
BASE_DIR = "/tmp"
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open
ENV_TOKEN = os.environ.get("API_TOKEN")
app = FastAPI(title="Photo Object Removal API", version="1.0.0")
# In-memory stores
file_store: Dict[str, Dict[str, str]] = {}
logs: List[Dict[str, str]] = []
MONGO_URI = "mongodb+srv://harilogicgo_db_user:[email protected]/?appName=KiddoImages"
mongo_client = MongoClient(MONGO_URI)
mongo_db = mongo_client["object_remover"]
mongo_logs = mongo_db["api_logs"]
ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN")
DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984"
admin_media_clicks = None
def _init_admin_mongo() -> None:
global admin_media_clicks
if not ADMIN_MONGO_URI:
log.info("Admin Mongo URI not provided; media click logging disabled")
return
try:
admin_client = MongoClient(ADMIN_MONGO_URI)
# get_default_database() extracts database from connection string (e.g., /adminPanel)
admin_db = admin_client.get_default_database()
if admin_db is None:
# Fallback if no database in URI
admin_db = admin_client["admin"]
log.warning("No database in connection string, defaulting to 'admin'")
admin_media_clicks = admin_db["media_clicks"]
log.info(
"Admin media click logging initialized: db=%s collection=%s",
admin_db.name,
admin_media_clicks.name,
)
try:
admin_media_clicks.drop_index("user_id_1_header_1_media_id_1")
log.info("Dropped legacy index user_id_1_header_1_media_id_1")
except Exception as idx_err:
# Index drop failure is non-critical (often permission issue)
if "Unauthorized" not in str(idx_err):
log.info("Skipping legacy index drop: %s", idx_err)
except Exception as err:
log.error("Failed to init admin Mongo client: %s", err)
admin_media_clicks = None
_init_admin_mongo()
def _admin_logging_status() -> Dict[str, object]:
if admin_media_clicks is None:
return {
"enabled": False,
"db": None,
"collection": None,
}
return {
"enabled": True,
"db": admin_media_clicks.database.name,
"collection": admin_media_clicks.name,
}
def _build_ai_edit_daily_count(
existing: Optional[List[Dict[str, object]]],
today: date,
) -> List[Dict[str, object]]:
"""
Build / extend the ai_edit_daily_count array with the following rules:
- Case A (no existing data): return [{date: today, count: 1}]
- Case B (today already recorded): return list unchanged
- Case C (gap in days): fill missing days with count=0 and append today with count=1
Additionally, the returned list is capped to the most recent 32 entries.
The stored "date" value is a midnight UTC (naive UTC) datetime for the given day.
"""
def _to_date_only(value: object) -> date:
if isinstance(value, datetime):
return value.date()
if isinstance(value, date):
return value
# Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime
try:
text = str(value)
if len(text) == 10:
return datetime.strptime(text, "%Y-%m-%d").date()
return datetime.fromisoformat(text).date()
except Exception:
# If parsing fails, just treat as today to avoid crashing
return today
# Case A: first ever use (no array yet)
if not existing:
return [
{
"date": datetime(today.year, today.month, today.day),
"count": 1,
}
]
# Work on a shallow copy so we don't mutate original in-place
result: List[Dict[str, object]] = list(existing)
last_entry = result[-1] if result else None
if not last_entry or "date" not in last_entry:
# If structure is unexpected, re-initialize safely
return [
{
"date": datetime(today.year, today.month, today.day),
"count": 1,
}
]
last_date = _to_date_only(last_entry["date"])
# If somehow the last stored date is in the future, do nothing to avoid corrupting history
if last_date > today:
return result
# Case B: today's date already present as the last entry → unchanged
if last_date == today:
return result
# Case C: there is a gap, fill missing days with count=0 and append today with count=1
cursor = last_date + timedelta(days=1)
while cursor < today:
result.append(
{
"date": datetime(cursor.year, cursor.month, cursor.day),
"count": 0,
}
)
cursor += timedelta(days=1)
# Finally add today's presence indicator
result.append(
{
"date": datetime(today.year, today.month, today.day),
"count": 1,
}
)
# Sort by date ascending (older dates first) to guarantee stable ordering:
# [oldest, ..., newest]
try:
result.sort(key=lambda entry: _to_date_only(entry.get("date")))
except Exception:
# If anything goes wrong during sort, fall back to current ordering
pass
# Enforce 32-entry limit (keep the most recent 32 days)
if len(result) > 32:
result = result[-32:]
return result
def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None:
if not ENV_TOKEN:
return
if authorization is None or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="Unauthorized")
token = authorization.split(" ", 1)[1]
if token != ENV_TOKEN:
raise HTTPException(status_code=403, detail="Forbidden")
class InpaintRequest(BaseModel):
image_id: str
mask_id: str
invert_mask: bool = True # True => selected/painted area is removed
passthrough: bool = False # If True, return the original image unchanged
user_id: Optional[str] = None
category_id: Optional[str] = None
class SimpleRemoveRequest(BaseModel):
image_id: str # Image with pink/magenta segments to remove
def _coerce_object_id(value: Optional[str]) -> ObjectId:
if value is None:
return ObjectId()
value_str = str(value).strip()
if re.fullmatch(r"[0-9a-fA-F]{24}", value_str):
return ObjectId(value_str)
if value_str.isdigit():
hex_str = format(int(value_str), "x")
if len(hex_str) > 24:
hex_str = hex_str[-24:]
hex_str = hex_str.rjust(24, "0")
return ObjectId(hex_str)
return ObjectId()
def _coerce_category_id(category_id: Optional[str]) -> ObjectId:
raw = category_id or DEFAULT_CATEGORY_ID
raw_str = str(raw).strip()
if re.fullmatch(r"[0-9a-fA-F]{24}", raw_str):
return ObjectId(raw_str)
return _coerce_object_id(raw_str)
def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None:
"""Log to admin media_clicks collection only if user_id is provided."""
if admin_media_clicks is None:
return
# Only log if user_id is provided (not None/empty)
if not user_id or not user_id.strip():
return
try:
user_obj = _coerce_object_id(user_id)
category_obj = _coerce_category_id(category_id)
now = datetime.utcnow()
today = now.date()
doc = admin_media_clicks.find_one({"userId": user_obj})
if doc:
existing_daily = doc.get("ai_edit_daily_count")
updated_daily = _build_ai_edit_daily_count(existing_daily, today)
categories = doc.get("categories") or []
if any(cat.get("categoryId") == category_obj for cat in categories):
# Category exists: increment click_count and ai_edit_complete, update dates
admin_media_clicks.update_one(
{"_id": doc["_id"], "categories.categoryId": category_obj},
{
"$inc": {
"categories.$.click_count": 1,
"ai_edit_complete": 1, # $inc handles missing fields (backward compatible)
},
"$set": {
"categories.$.lastClickedAt": now,
"updatedAt": now,
"ai_edit_last_date": now,
"ai_edit_daily_count": updated_daily,
},
},
)
else:
# New category to existing document: push category, increment ai_edit_complete
admin_media_clicks.update_one(
{"_id": doc["_id"]},
{
"$push": {
"categories": {
"categoryId": category_obj,
"click_count": 1,
"lastClickedAt": now,
}
},
"$inc": {"ai_edit_complete": 1}, # $inc handles missing fields
"$set": {
"updatedAt": now,
"ai_edit_last_date": now,
"ai_edit_daily_count": updated_daily,
},
},
)
else:
# New user: create document with default ai_edit_complete=0, then increment to 1
daily_for_new = _build_ai_edit_daily_count(None, today)
admin_media_clicks.update_one(
{"userId": user_obj},
{
"$setOnInsert": {
"userId": user_obj,
"categories": [
{
"categoryId": category_obj,
"click_count": 1,
"lastClickedAt": now,
}
],
"createdAt": now,
"updatedAt": now,
"ai_edit_complete": 0, # Default for new users
"ai_edit_daily_count": daily_for_new,
},
"$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use
"$set": {
"updatedAt": now,
"ai_edit_last_date": now,
},
},
upsert=True,
)
except Exception as err:
err_str = str(err)
if "Unauthorized" in err_str or "not authorized" in err_str.lower():
log.warning(
"Admin media click logging failed (permissions): user lacks read/write on db=%s collection=%s. "
"Check MongoDB user permissions.",
admin_media_clicks.database.name,
admin_media_clicks.name,
)
else:
log.warning("Admin media click logging failed: %s", err)
@app.get("/")
def root() -> Dict[str, object]:
return {
"name": "Photo Object Removal API",
"status": "ok",
"endpoints": {
"GET /health": "health check",
"POST /upload-image": "form-data: image=file",
"POST /upload-mask": "form-data: mask=file",
"POST /inpaint": "JSON: {image_id, mask_id}",
"POST /inpaint-multipart": "form-data: image=file, mask=file",
"POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)",
"GET /download/{filename}": "download result image",
"GET /result/{filename}": "view result image in browser",
"GET /logs": "recent uploads/results",
},
"auth": "set API_TOKEN env var to require Authorization: Bearer <token> (except /health)",
}
@app.get("/health")
def health() -> Dict[str, str]:
return {"status": "healthy"}
@app.get("/logging-status")
def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
"""Helper endpoint to verify admin media logging wiring (no secrets exposed)."""
return _admin_logging_status()
@app.post("/upload-image")
def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
ext = os.path.splitext(image.filename)[1] or ".png"
file_id = str(uuid.uuid4())
stored_name = f"{file_id}{ext}"
stored_path = os.path.join(UPLOAD_DIR, stored_name)
with open(stored_path, "wb") as f:
shutil.copyfileobj(image.file, f)
file_store[file_id] = {
"type": "image",
"filename": image.filename,
"stored_name": stored_name,
"path": stored_path,
"timestamp": datetime.utcnow().isoformat(),
}
logs.append({"id": file_id, "filename": image.filename, "type": "image", "timestamp": datetime.utcnow().isoformat()})
return {"id": file_id, "filename": image.filename}
@app.post("/upload-mask")
def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
ext = os.path.splitext(mask.filename)[1] or ".png"
file_id = str(uuid.uuid4())
stored_name = f"{file_id}{ext}"
stored_path = os.path.join(UPLOAD_DIR, stored_name)
with open(stored_path, "wb") as f:
shutil.copyfileobj(mask.file, f)
file_store[file_id] = {
"type": "mask",
"filename": mask.filename,
"stored_name": stored_name,
"path": stored_path,
"timestamp": datetime.utcnow().isoformat(),
}
logs.append({"id": file_id, "filename": mask.filename, "type": "mask", "timestamp": datetime.utcnow().isoformat()})
return {"id": file_id, "filename": mask.filename}
def _load_rgba_image(path: str) -> Image.Image:
img = Image.open(path)
return img.convert("RGBA")
def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray:
"""
Convert mask image to RGBA format (black/white mask).
Standard convention: white (255) = area to remove, black (0) = area to keep
Returns RGBA with white in RGB channels where removal is needed, alpha=255
"""
if img.mode != "RGBA":
# For RGB/Grayscale masks: white (value>128) = remove, black (value<=128) = keep
gray = img.convert("L")
arr = np.array(gray)
# Create proper black/white mask: white pixels (>128) = remove, black (<=128) = keep
mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8)
rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8)
rgba[:, :, 0] = mask_bw # R
rgba[:, :, 1] = mask_bw # G
rgba[:, :, 2] = mask_bw # B
rgba[:, :, 3] = 255 # Fully opaque
log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)")
return rgba
# For RGBA: check if alpha channel is meaningful
arr = np.array(img)
alpha = arr[:, :, 3]
rgb = arr[:, :, :3]
# If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values
if alpha.mean() > 200:
# Use RGB to determine mask: white/bright in RGB = remove
gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
# Also detect magenta specifically
magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255
mask_bw = np.maximum(np.where(gray > 128, 255, 0).astype(np.uint8), magenta)
rgba = arr.copy()
rgba[:, :, 0] = mask_bw # R
rgba[:, :, 1] = mask_bw # G
rgba[:, :, 2] = mask_bw # B
rgba[:, :, 3] = 255 # Fully opaque
log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
return rgba
# Alpha channel encodes the mask - convert to RGB-based
# Transparent areas (alpha < 128) = remove, Opaque areas = keep
mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8)
rgba = arr.copy()
rgba[:, :, 0] = mask_bw
rgba[:, :, 1] = mask_bw
rgba[:, :, 2] = mask_bw
rgba[:, :, 3] = 255
log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
return rgba
@app.post("/inpaint")
def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]:
start_time = time.time()
status = "success"
error_msg = None
output_name = None
try:
if req.image_id not in file_store or file_store[req.image_id]["type"] != "image":
raise HTTPException(status_code=404, detail="image_id not found")
if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask":
raise HTTPException(status_code=404, detail="mask_id not found")
img_rgba = _load_rgba_image(file_store[req.image_id]["path"])
mask_img = Image.open(file_store[req.mask_id]["path"])
mask_rgba = _load_rgba_mask_from_image(mask_img)
if req.passthrough:
result = np.array(img_rgba.convert("RGB"))
else:
result = process_inpaint(
np.array(img_rgba),
mask_rgba,
invert_mask=req.invert_mask
)
output_name = f"output_{uuid.uuid4().hex}.png"
output_path = os.path.join(OUTPUT_DIR, output_name)
Image.fromarray(result).save(
output_path, "PNG", optimize=False, compress_level=1
)
log_media_click(req.user_id, req.category_id)
return {"result": output_name}
except Exception as e:
status = "fail"
error_msg = str(e)
raise
finally:
end_time = time.time()
response_time_ms = (end_time - start_time) * 1000
log_doc = {
"input_image_id": req.image_id,
"input_mask_id": req.mask_id,
"output_id": output_name,
"status": status,
"timestamp": datetime.utcnow(),
"ts": int(time.time()),
"response_time_ms": response_time_ms
}
if error_msg:
log_doc["error"] = error_msg
try:
mongo_logs.insert_one(log_doc)
except Exception as mongo_err:
log.error(f"Mongo log insert failed: {mongo_err}")
# @app.post("/inpaint")
# def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]:
# if req.image_id not in file_store or file_store[req.image_id]["type"] != "image":
# raise HTTPException(status_code=404, detail="image_id not found")
# if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask":
# raise HTTPException(status_code=404, detail="mask_id not found")
# img_rgba = _load_rgba_image(file_store[req.image_id]["path"])
# mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA
# mask_rgba = _load_rgba_mask_from_image(mask_img)
# # Debug: check mask before processing
# white_pixels = int((mask_rgba[:,:,0] > 128).sum())
# log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}")
# if req.passthrough:
# result = np.array(img_rgba.convert("RGB"))
# else:
# result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask)
# result_name = f"output_{uuid.uuid4().hex}.png"
# result_path = os.path.join(OUTPUT_DIR, result_name)
# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()})
# return {"result": result_name}
@app.post("/inpaint-url")
def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
"""Same as /inpaint but returns a JSON with a public download URL instead of image bytes."""
start_time = time.time()
status = "success"
error_msg = None
result_name = None
try:
if req.image_id not in file_store or file_store[req.image_id]["type"] != "image":
raise HTTPException(status_code=404, detail="image_id not found")
if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask":
raise HTTPException(status_code=404, detail="mask_id not found")
img_rgba = _load_rgba_image(file_store[req.image_id]["path"])
mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA
mask_rgba = _load_rgba_mask_from_image(mask_img)
if req.passthrough:
result = np.array(img_rgba.convert("RGB"))
else:
result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask)
result_name = f"output_{uuid.uuid4().hex}.png"
result_path = os.path.join(OUTPUT_DIR, result_name)
Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
url = str(request.url_for("download_file", filename=result_name))
logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()})
log_media_click(req.user_id, req.category_id)
return {"result": result_name, "url": url}
except Exception as e:
status = "fail"
error_msg = str(e)
raise
finally:
# Always log to regular MongoDB (mandatory)
end_time = time.time()
response_time_ms = (end_time - start_time) * 1000
log_doc = {
"input_image_id": req.image_id,
"input_mask_id": req.mask_id,
"output_id": result_name,
"status": status,
"timestamp": datetime.utcnow(),
"ts": int(time.time()),
"response_time_ms": response_time_ms,
}
if error_msg:
log_doc["error"] = error_msg
try:
mongo_logs.insert_one(log_doc)
except Exception as mongo_err:
log.error("Mongo log insert failed: %s", mongo_err)
@app.post("/inpaint-multipart")
def inpaint_multipart(
image: UploadFile = File(...),
mask: UploadFile = File(...),
request: Request = None,
invert_mask: bool = True,
mask_is_painted: bool = False, # if True, mask file is the painted-on image (e.g., black strokes on original)
passthrough: bool = False,
user_id: Optional[str] = Form(None),
category_id: Optional[str] = Form(None),
_: None = Depends(bearer_auth),
) -> Dict[str, str]:
start_time = time.time()
status = "success"
error_msg = None
result_name = None
try:
# Load in-memory
img = Image.open(image.file).convert("RGBA")
m = Image.open(mask.file).convert("RGBA")
if passthrough:
# Just echo the input image, ignore mask
result = np.array(img.convert("RGB"))
result_name = f"output_{uuid.uuid4().hex}.png"
result_path = os.path.join(OUTPUT_DIR, result_name)
Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
url: Optional[str] = None
try:
if request is not None:
url = str(request.url_for("download_file", filename=result_name))
except Exception:
url = None
entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()}
if url:
entry["url"] = url
logs.append(entry)
resp: Dict[str, str] = {"result": result_name}
if url:
resp["url"] = url
log_media_click(user_id, category_id)
return resp
if mask_is_painted:
# Auto-detect pink/magenta paint and convert to black/white mask
# White pixels = areas to remove, Black pixels = areas to keep
log.info("Auto-detecting pink/magenta paint from uploaded image...")
m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB)
# Detect pink/magenta using fixed RGB bounds (same as /remove-pink)
lower = np.array([150, 0, 100], dtype=np.uint8)
upper = np.array([255, 120, 255], dtype=np.uint8)
magenta_detected = (
(m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) &
(m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) &
(m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2])
).astype(np.uint8) * 255
# Method 2: Also check if original image was provided to find differences
if img is not None:
img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
if img_rgb.shape == m_rgb.shape:
diff = cv2.absdiff(img_rgb, m_rgb)
gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY)
# Any significant difference (>50) could be paint
diff_mask = (gray_diff > 50).astype(np.uint8) * 255
# Combine with magenta detection
binmask = cv2.bitwise_or(magenta_detected, diff_mask)
else:
binmask = magenta_detected
else:
# No original image provided, use magenta detection only
binmask = magenta_detected
# Clean up the mask: remove noise and fill small holes
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
# Close small gaps in the mask
binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
# Remove small noise
binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)
nonzero = int((binmask > 0).sum())
log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero)
# If very few pixels detected, assume the user may already be providing a BW mask
# and proceed without forcing strict detection
if nonzero < 50:
log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.")
result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB"))
result_name = f"output_{uuid.uuid4().hex}.png"
result_path = os.path.join(OUTPUT_DIR, result_name)
Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"}
# Create binary mask: Pink pixels → white (255), Everything else → black (0)
# Encode in RGBA format for process_inpaint
# process_inpaint does: mask = 255 - mask[:,:,3]
# So: alpha=0 (transparent/pink) → becomes 255 (white/remove)
# alpha=255 (opaque/keep) → becomes 0 (black/keep)
mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8)
mask_rgba[:, :, 0] = binmask # R: white where pink (for visualization)
mask_rgba[:, :, 1] = binmask # G: white where pink
mask_rgba[:, :, 2] = binmask # B: white where pink
# Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha
mask_rgba[:, :, 3] = 255 - binmask
log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)",
nonzero, binmask.shape[0] * binmask.shape[1] - nonzero)
else:
mask_rgba = _load_rgba_mask_from_image(m)
# When mask_is_painted=true, we encode pink as alpha=0, so process_inpaint's default invert_mask=True works correctly
actual_invert = invert_mask # Use default True for painted masks
log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted)
result = process_inpaint(np.array(img), mask_rgba, invert_mask=actual_invert)
result_name = f"output_{uuid.uuid4().hex}.png"
result_path = os.path.join(OUTPUT_DIR, result_name)
Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
url: Optional[str] = None
try:
if request is not None:
url = str(request.url_for("download_file", filename=result_name))
except Exception:
url = None
entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()}
if url:
entry["url"] = url
logs.append(entry)
resp: Dict[str, str] = {"result": result_name}
if url:
resp["url"] = url
log_media_click(user_id, category_id)
return resp
except Exception as e:
status = "fail"
error_msg = str(e)
raise
finally:
# Always log to regular MongoDB (mandatory)
end_time = time.time()
response_time_ms = (end_time - start_time) * 1000
log_doc = {
"endpoint": "inpaint-multipart",
"output_id": result_name,
"status": status,
"timestamp": datetime.utcnow(),
"ts": int(time.time()),
"response_time_ms": response_time_ms,
}
if error_msg:
log_doc["error"] = error_msg
try:
mongo_logs.insert_one(log_doc)
except Exception as mongo_err:
log.error("Mongo log insert failed: %s", mongo_err)
@app.post("/remove-pink")
def remove_pink_segments(
image: UploadFile = File(...),
request: Request = None,
user_id: Optional[str] = Form(None),
category_id: Optional[str] = Form(None),
_: None = Depends(bearer_auth),
) -> Dict[str, str]:
"""
Simple endpoint: upload an image with pink/magenta segments to remove.
- Pink/Magenta segments → automatically removed (white in mask)
- Everything else → automatically kept (black in mask)
Just paint pink/magenta on areas you want to remove, upload the image, and it works!
"""
start_time = time.time()
status = "success"
error_msg = None
result_name = None
try:
log.info(f"Simple remove-pink: processing image {image.filename}")
# Load the image (with pink paint on it)
img = Image.open(image.file).convert("RGBA")
img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
# Auto-detect pink/magenta segments to remove
# Pink/Magenta → white in mask (remove)
# Everything else (natural image colors, including dark areas) → black in mask (keep)
# Detect pink/magenta using fixed RGB bounds per requested logic
lower = np.array([150, 0, 100], dtype=np.uint8)
upper = np.array([255, 120, 255], dtype=np.uint8)
binmask = (
(img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) &
(img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) &
(img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2])
).astype(np.uint8) * 255
# Clean up the pink mask
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)
nonzero = int((binmask > 0).sum())
total_pixels = binmask.shape[0] * binmask.shape[1]
log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove")
# Debug: log bounds used
log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]")
if nonzero < 50:
log.error("No pink segments detected! Returning original image.")
result = np.array(img.convert("RGB"))
result_name = f"output_{uuid.uuid4().hex}.png"
result_path = os.path.join(OUTPUT_DIR, result_name)
Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
return {
"result": result_name,
"error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)."
}
# Create binary mask: Pink pixels → white (255), Everything else → black (0)
# Encode in RGBA format that process_inpaint expects
# process_inpaint does: mask = 255 - mask[:,:,3]
# So: alpha=0 (transparent/pink) → becomes 255 (white/remove)
# alpha=255 (opaque/keep) → becomes 0 (black/keep)
mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8)
# RGB channels don't matter for process_inpaint, but set them to white where pink for visualization
mask_rgba[:, :, 0] = binmask # R: white where pink
mask_rgba[:, :, 1] = binmask # G: white where pink
mask_rgba[:, :, 2] = binmask # B: white where pink
# Alpha: 0 (transparent) where pink → will become white after 255-alpha
# 255 (opaque) everywhere else → will become black after 255-alpha
mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255
# Verify mask encoding
alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum())
alpha_255_count = int((mask_rgba[:,:,3] == 255).sum())
total_pixels = binmask.shape[0] * binmask.shape[1]
log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)")
log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)")
# IMPORTANT: We need to use the ORIGINAL image WITHOUT pink paint for inpainting!
# Remove pink from the original image before processing
# Create a clean version: where pink was detected, keep original image colors
img_clean = np.array(img.convert("RGBA"))
# Where pink is detected, we want to inpaint, so we can leave it (or blend it out)
# Actually, the model will inpaint over those areas, so we can pass the original
# But for better results, we might want to remove the pink overlay first
# Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal
log.info(f"Starting inpainting process...")
result = process_inpaint(img_clean, mask_rgba, invert_mask=True)
log.info(f"Inpainting complete, result shape: {result.shape}")
result_name = f"output_{uuid.uuid4().hex}.png"
result_path = os.path.join(OUTPUT_DIR, result_name)
Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
url: Optional[str] = None
try:
if request is not None:
url = str(request.url_for("download_file", filename=result_name))
except Exception:
url = None
logs.append({
"result": result_name,
"filename": image.filename,
"pink_pixels": nonzero,
"timestamp": datetime.utcnow().isoformat()
})
resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)}
if url:
resp["url"] = url
log_media_click(user_id, category_id)
return resp
except Exception as e:
status = "fail"
error_msg = str(e)
raise
finally:
# Always log to regular MongoDB (mandatory)
end_time = time.time()
response_time_ms = (end_time - start_time) * 1000
log_doc = {
"endpoint": "remove-pink",
"output_id": result_name,
"status": status,
"timestamp": datetime.utcnow(),
"ts": int(time.time()),
"response_time_ms": response_time_ms,
}
if error_msg:
log_doc["error"] = error_msg
try:
mongo_logs.insert_one(log_doc)
except Exception as mongo_err:
log.error("Mongo log insert failed: %s", mongo_err)
@app.get("/download/{filename}")
def download_file(filename: str):
path = os.path.join(OUTPUT_DIR, filename)
if not os.path.isfile(path):
raise HTTPException(status_code=404, detail="file not found")
return FileResponse(path)
@app.get("/result/{filename}")
def view_result(filename: str):
"""View result image directly in browser (same as download but with proper content-type for viewing)"""
path = os.path.join(OUTPUT_DIR, filename)
if not os.path.isfile(path):
raise HTTPException(status_code=404, detail="file not found")
return FileResponse(path, media_type="image/png")
@app.get("/logs")
def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse:
return JSONResponse(content=logs)
# import os
# import uuid
# import shutil
# import re
# from datetime import datetime, timedelta, date
# from typing import Dict, List, Optional
# import numpy as np
# from fastapi import (
# FastAPI,
# UploadFile,
# File,
# HTTPException,
# Depends,
# Header,
# Request,
# Form,
# )
# from fastapi.responses import FileResponse, JSONResponse
# from pydantic import BaseModel
# from PIL import Image
# import cv2
# import logging
# from bson import ObjectId
# from pymongo import MongoClient
# import time
# logging.basicConfig(level=logging.INFO)
# log = logging.getLogger("api")
# from src.core import process_inpaint
# # Directories (use writable space on HF Spaces)
# BASE_DIR = os.environ.get("DATA_DIR", "/data")
# if not os.path.isdir(BASE_DIR):
# # Fallback to /tmp if /data not available
# BASE_DIR = "/tmp"
# UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
# OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
# os.makedirs(UPLOAD_DIR, exist_ok=True)
# os.makedirs(OUTPUT_DIR, exist_ok=True)
# # Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open
# ENV_TOKEN = os.environ.get("API_TOKEN")
# app = FastAPI(title="Photo Object Removal API", version="1.0.0")
# # In-memory stores
# file_store: Dict[str, Dict[str, str]] = {}
# logs: List[Dict[str, str]] = []
# MONGO_URI = "mongodb+srv://harilogicgo_db_user:[email protected]/?appName=KiddoImages"
# mongo_client = MongoClient(MONGO_URI)
# mongo_db = mongo_client["object_remover"]
# mongo_logs = mongo_db["api_logs"]
# ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN")
# DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984"
# admin_media_clicks = None
# def _init_admin_mongo() -> None:
# global admin_media_clicks
# if not ADMIN_MONGO_URI:
# log.info("Admin Mongo URI not provided; media click logging disabled")
# return
# try:
# admin_client = MongoClient(ADMIN_MONGO_URI)
# # get_default_database() extracts database from connection string (e.g., /adminPanel)
# admin_db = admin_client.get_default_database()
# if admin_db is None:
# # Fallback if no database in URI
# admin_db = admin_client["admin"]
# log.warning("No database in connection string, defaulting to 'admin'")
# admin_media_clicks = admin_db["media_clicks"]
# log.info(
# "Admin media click logging initialized: db=%s collection=%s",
# admin_db.name,
# admin_media_clicks.name,
# )
# try:
# admin_media_clicks.drop_index("user_id_1_header_1_media_id_1")
# log.info("Dropped legacy index user_id_1_header_1_media_id_1")
# except Exception as idx_err:
# # Index drop failure is non-critical (often permission issue)
# if "Unauthorized" not in str(idx_err):
# log.info("Skipping legacy index drop: %s", idx_err)
# except Exception as err:
# log.error("Failed to init admin Mongo client: %s", err)
# admin_media_clicks = None
# _init_admin_mongo()
# def _admin_logging_status() -> Dict[str, object]:
# if admin_media_clicks is None:
# return {
# "enabled": False,
# "db": None,
# "collection": None,
# }
# return {
# "enabled": True,
# "db": admin_media_clicks.database.name,
# "collection": admin_media_clicks.name,
# }
# def _build_ai_edit_daily_count(
# existing: Optional[List[Dict[str, object]]],
# today: date,
# ) -> List[Dict[str, object]]:
# """
# Build / extend the ai_edit_daily_count array with the following rules:
# - Case A (no existing data): return [{date: today, count: 1}]
# - Case B (today already recorded): return list unchanged
# - Case C (gap in days): fill missing days with count=0 and append today with count=1
# Additionally, the returned list is capped to the most recent 32 entries.
# The stored "date" value is a midnight UTC (naive UTC) datetime for the given day.
# """
# def _to_date_only(value: object) -> date:
# if isinstance(value, datetime):
# return value.date()
# if isinstance(value, date):
# return value
# # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime
# try:
# text = str(value)
# if len(text) == 10:
# return datetime.strptime(text, "%Y-%m-%d").date()
# return datetime.fromisoformat(text).date()
# except Exception:
# # If parsing fails, just treat as today to avoid crashing
# return today
# # Case A: first ever use (no array yet)
# if not existing:
# return [
# {
# "date": datetime(today.year, today.month, today.day),
# "count": 1,
# }
# ]
# # Work on a shallow copy so we don't mutate original in-place
# result: List[Dict[str, object]] = list(existing)
# last_entry = result[-1] if result else None
# if not last_entry or "date" not in last_entry:
# # If structure is unexpected, re-initialize safely
# return [
# {
# "date": datetime(today.year, today.month, today.day),
# "count": 1,
# }
# ]
# last_date = _to_date_only(last_entry["date"])
# # If somehow the last stored date is in the future, do nothing to avoid corrupting history
# if last_date > today:
# return result
# # Case B: today's date already present as the last entry → unchanged
# if last_date == today:
# return result
# # Case C: there is a gap, fill missing days with count=0 and append today with count=1
# cursor = last_date + timedelta(days=1)
# while cursor < today:
# result.append(
# {
# "date": datetime(cursor.year, cursor.month, cursor.day),
# "count": 0,
# }
# )
# cursor += timedelta(days=1)
# # Finally add today's presence indicator
# result.append(
# {
# "date": datetime(today.year, today.month, today.day),
# "count": 1,
# }
# )
# # Enforce 32-entry limit (keep the most recent 32 days)
# if len(result) > 32:
# result = result[-32:]
# return result
# def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None:
# if not ENV_TOKEN:
# return
# if authorization is None or not authorization.lower().startswith("bearer "):
# raise HTTPException(status_code=401, detail="Unauthorized")
# token = authorization.split(" ", 1)[1]
# if token != ENV_TOKEN:
# raise HTTPException(status_code=403, detail="Forbidden")
# class InpaintRequest(BaseModel):
# image_id: str
# mask_id: str
# invert_mask: bool = True # True => selected/painted area is removed
# passthrough: bool = False # If True, return the original image unchanged
# user_id: Optional[str] = None
# category_id: Optional[str] = None
# class SimpleRemoveRequest(BaseModel):
# image_id: str # Image with pink/magenta segments to remove
# def _coerce_object_id(value: Optional[str]) -> ObjectId:
# if value is None:
# return ObjectId()
# value_str = str(value).strip()
# if re.fullmatch(r"[0-9a-fA-F]{24}", value_str):
# return ObjectId(value_str)
# if value_str.isdigit():
# hex_str = format(int(value_str), "x")
# if len(hex_str) > 24:
# hex_str = hex_str[-24:]
# hex_str = hex_str.rjust(24, "0")
# return ObjectId(hex_str)
# return ObjectId()
# def _coerce_category_id(category_id: Optional[str]) -> ObjectId:
# raw = category_id or DEFAULT_CATEGORY_ID
# raw_str = str(raw).strip()
# if re.fullmatch(r"[0-9a-fA-F]{24}", raw_str):
# return ObjectId(raw_str)
# return _coerce_object_id(raw_str)
# def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None:
# """Log to admin media_clicks collection only if user_id is provided."""
# if admin_media_clicks is None:
# return
# # Only log if user_id is provided (not None/empty)
# if not user_id or not user_id.strip():
# return
# try:
# user_obj = _coerce_object_id(user_id)
# category_obj = _coerce_category_id(category_id)
# now = datetime.utcnow()
# today = now.date()
# doc = admin_media_clicks.find_one({"userId": user_obj})
# if doc:
# existing_daily = doc.get("ai_edit_daily_count")
# updated_daily = _build_ai_edit_daily_count(existing_daily, today)
# categories = doc.get("categories") or []
# if any(cat.get("categoryId") == category_obj for cat in categories):
# # Category exists: increment click_count and ai_edit_complete, update dates
# admin_media_clicks.update_one(
# {"_id": doc["_id"], "categories.categoryId": category_obj},
# {
# "$inc": {
# "categories.$.click_count": 1,
# "ai_edit_complete": 1, # $inc handles missing fields (backward compatible)
# },
# "$set": {
# "categories.$.lastClickedAt": now,
# "updatedAt": now,
# "ai_edit_last_date": now,
# "ai_edit_daily_count": updated_daily,
# },
# },
# )
# else:
# # New category to existing document: push category, increment ai_edit_complete
# admin_media_clicks.update_one(
# {"_id": doc["_id"]},
# {
# "$push": {
# "categories": {
# "categoryId": category_obj,
# "click_count": 1,
# "lastClickedAt": now,
# }
# },
# "$inc": {"ai_edit_complete": 1}, # $inc handles missing fields
# "$set": {
# "updatedAt": now,
# "ai_edit_last_date": now,
# "ai_edit_daily_count": updated_daily,
# },
# },
# )
# else:
# # New user: create document with default ai_edit_complete=0, then increment to 1
# daily_for_new = _build_ai_edit_daily_count(None, today)
# admin_media_clicks.update_one(
# {"userId": user_obj},
# {
# "$setOnInsert": {
# "userId": user_obj,
# "categories": [
# {
# "categoryId": category_obj,
# "click_count": 1,
# "lastClickedAt": now,
# }
# ],
# "createdAt": now,
# "updatedAt": now,
# "ai_edit_complete": 0, # Default for new users
# "ai_edit_daily_count": daily_for_new,
# },
# "$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use
# "$set": {
# "updatedAt": now,
# "ai_edit_last_date": now,
# },
# },
# upsert=True,
# )
# except Exception as err:
# err_str = str(err)
# if "Unauthorized" in err_str or "not authorized" in err_str.lower():
# log.warning(
# "Admin media click logging failed (permissions): user lacks read/write on db=%s collection=%s. "
# "Check MongoDB user permissions.",
# admin_media_clicks.database.name,
# admin_media_clicks.name,
# )
# else:
# log.warning("Admin media click logging failed: %s", err)
# @app.get("/")
# def root() -> Dict[str, object]:
# return {
# "name": "Photo Object Removal API",
# "status": "ok",
# "endpoints": {
# "GET /health": "health check",
# "POST /upload-image": "form-data: image=file",
# "POST /upload-mask": "form-data: mask=file",
# "POST /inpaint": "JSON: {image_id, mask_id}",
# "POST /inpaint-multipart": "form-data: image=file, mask=file",
# "POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)",
# "GET /download/{filename}": "download result image",
# "GET /result/{filename}": "view result image in browser",
# "GET /logs": "recent uploads/results",
# },
# "auth": "set API_TOKEN env var to require Authorization: Bearer <token> (except /health)",
# }
# @app.get("/health")
# def health() -> Dict[str, str]:
# return {"status": "healthy"}
# @app.get("/logging-status")
# def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
# """Helper endpoint to verify admin media logging wiring (no secrets exposed)."""
# return _admin_logging_status()
# @app.post("/upload-image")
# def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
# ext = os.path.splitext(image.filename)[1] or ".png"
# file_id = str(uuid.uuid4())
# stored_name = f"{file_id}{ext}"
# stored_path = os.path.join(UPLOAD_DIR, stored_name)
# with open(stored_path, "wb") as f:
# shutil.copyfileobj(image.file, f)
# file_store[file_id] = {
# "type": "image",
# "filename": image.filename,
# "stored_name": stored_name,
# "path": stored_path,
# "timestamp": datetime.utcnow().isoformat(),
# }
# logs.append({"id": file_id, "filename": image.filename, "type": "image", "timestamp": datetime.utcnow().isoformat()})
# return {"id": file_id, "filename": image.filename}
# @app.post("/upload-mask")
# def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
# ext = os.path.splitext(mask.filename)[1] or ".png"
# file_id = str(uuid.uuid4())
# stored_name = f"{file_id}{ext}"
# stored_path = os.path.join(UPLOAD_DIR, stored_name)
# with open(stored_path, "wb") as f:
# shutil.copyfileobj(mask.file, f)
# file_store[file_id] = {
# "type": "mask",
# "filename": mask.filename,
# "stored_name": stored_name,
# "path": stored_path,
# "timestamp": datetime.utcnow().isoformat(),
# }
# logs.append({"id": file_id, "filename": mask.filename, "type": "mask", "timestamp": datetime.utcnow().isoformat()})
# return {"id": file_id, "filename": mask.filename}
# def _load_rgba_image(path: str) -> Image.Image:
# img = Image.open(path)
# return img.convert("RGBA")
# def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray:
# """
# Convert mask image to RGBA format (black/white mask).
# Standard convention: white (255) = area to remove, black (0) = area to keep
# Returns RGBA with white in RGB channels where removal is needed, alpha=255
# """
# if img.mode != "RGBA":
# # For RGB/Grayscale masks: white (value>128) = remove, black (value<=128) = keep
# gray = img.convert("L")
# arr = np.array(gray)
# # Create proper black/white mask: white pixels (>128) = remove, black (<=128) = keep
# mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8)
# rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8)
# rgba[:, :, 0] = mask_bw # R
# rgba[:, :, 1] = mask_bw # G
# rgba[:, :, 2] = mask_bw # B
# rgba[:, :, 3] = 255 # Fully opaque
# log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)")
# return rgba
# # For RGBA: check if alpha channel is meaningful
# arr = np.array(img)
# alpha = arr[:, :, 3]
# rgb = arr[:, :, :3]
# # If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values
# if alpha.mean() > 200:
# # Use RGB to determine mask: white/bright in RGB = remove
# gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
# # Also detect magenta specifically
# magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255
# mask_bw = np.maximum(np.where(gray > 128, 255, 0).astype(np.uint8), magenta)
# rgba = arr.copy()
# rgba[:, :, 0] = mask_bw # R
# rgba[:, :, 1] = mask_bw # G
# rgba[:, :, 2] = mask_bw # B
# rgba[:, :, 3] = 255 # Fully opaque
# log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
# return rgba
# # Alpha channel encodes the mask - convert to RGB-based
# # Transparent areas (alpha < 128) = remove, Opaque areas = keep
# mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8)
# rgba = arr.copy()
# rgba[:, :, 0] = mask_bw
# rgba[:, :, 1] = mask_bw
# rgba[:, :, 2] = mask_bw
# rgba[:, :, 3] = 255
# log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
# return rgba
# @app.post("/inpaint")
# def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]:
# start_time = time.time()
# status = "success"
# error_msg = None
# output_name = None
# try:
# if req.image_id not in file_store or file_store[req.image_id]["type"] != "image":
# raise HTTPException(status_code=404, detail="image_id not found")
# if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask":
# raise HTTPException(status_code=404, detail="mask_id not found")
# img_rgba = _load_rgba_image(file_store[req.image_id]["path"])
# mask_img = Image.open(file_store[req.mask_id]["path"])
# mask_rgba = _load_rgba_mask_from_image(mask_img)
# if req.passthrough:
# result = np.array(img_rgba.convert("RGB"))
# else:
# result = process_inpaint(
# np.array(img_rgba),
# mask_rgba,
# invert_mask=req.invert_mask
# )
# output_name = f"output_{uuid.uuid4().hex}.png"
# output_path = os.path.join(OUTPUT_DIR, output_name)
# Image.fromarray(result).save(
# output_path, "PNG", optimize=False, compress_level=1
# )
# log_media_click(req.user_id, req.category_id)
# return {"result": output_name}
# except Exception as e:
# status = "fail"
# error_msg = str(e)
# raise
# finally:
# end_time = time.time()
# response_time_ms = (end_time - start_time) * 1000
# log_doc = {
# "input_image_id": req.image_id,
# "input_mask_id": req.mask_id,
# "output_id": output_name,
# "status": status,
# "timestamp": datetime.utcnow(),
# "ts": int(time.time()),
# "response_time_ms": response_time_ms
# }
# if error_msg:
# log_doc["error"] = error_msg
# try:
# mongo_logs.insert_one(log_doc)
# except Exception as mongo_err:
# log.error(f"Mongo log insert failed: {mongo_err}")
# # @app.post("/inpaint")
# # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]:
# # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image":
# # raise HTTPException(status_code=404, detail="image_id not found")
# # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask":
# # raise HTTPException(status_code=404, detail="mask_id not found")
# # img_rgba = _load_rgba_image(file_store[req.image_id]["path"])
# # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA
# # mask_rgba = _load_rgba_mask_from_image(mask_img)
# # # Debug: check mask before processing
# # white_pixels = int((mask_rgba[:,:,0] > 128).sum())
# # log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}")
# # if req.passthrough:
# # result = np.array(img_rgba.convert("RGB"))
# # else:
# # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask)
# # result_name = f"output_{uuid.uuid4().hex}.png"
# # result_path = os.path.join(OUTPUT_DIR, result_name)
# # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# # logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()})
# # return {"result": result_name}
# @app.post("/inpaint-url")
# def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
# """Same as /inpaint but returns a JSON with a public download URL instead of image bytes."""
# start_time = time.time()
# status = "success"
# error_msg = None
# result_name = None
# try:
# if req.image_id not in file_store or file_store[req.image_id]["type"] != "image":
# raise HTTPException(status_code=404, detail="image_id not found")
# if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask":
# raise HTTPException(status_code=404, detail="mask_id not found")
# img_rgba = _load_rgba_image(file_store[req.image_id]["path"])
# mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA
# mask_rgba = _load_rgba_mask_from_image(mask_img)
# if req.passthrough:
# result = np.array(img_rgba.convert("RGB"))
# else:
# result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask)
# result_name = f"output_{uuid.uuid4().hex}.png"
# result_path = os.path.join(OUTPUT_DIR, result_name)
# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# url = str(request.url_for("download_file", filename=result_name))
# logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()})
# log_media_click(req.user_id, req.category_id)
# return {"result": result_name, "url": url}
# except Exception as e:
# status = "fail"
# error_msg = str(e)
# raise
# finally:
# # Always log to regular MongoDB (mandatory)
# end_time = time.time()
# response_time_ms = (end_time - start_time) * 1000
# log_doc = {
# "input_image_id": req.image_id,
# "input_mask_id": req.mask_id,
# "output_id": result_name,
# "status": status,
# "timestamp": datetime.utcnow(),
# "ts": int(time.time()),
# "response_time_ms": response_time_ms,
# }
# if error_msg:
# log_doc["error"] = error_msg
# try:
# mongo_logs.insert_one(log_doc)
# except Exception as mongo_err:
# log.error("Mongo log insert failed: %s", mongo_err)
# @app.post("/inpaint-multipart")
# def inpaint_multipart(
# image: UploadFile = File(...),
# mask: UploadFile = File(...),
# request: Request = None,
# invert_mask: bool = True,
# mask_is_painted: bool = False, # if True, mask file is the painted-on image (e.g., black strokes on original)
# passthrough: bool = False,
# user_id: Optional[str] = Form(None),
# category_id: Optional[str] = Form(None),
# _: None = Depends(bearer_auth),
# ) -> Dict[str, str]:
# start_time = time.time()
# status = "success"
# error_msg = None
# result_name = None
# try:
# # Load in-memory
# img = Image.open(image.file).convert("RGBA")
# m = Image.open(mask.file).convert("RGBA")
# if passthrough:
# # Just echo the input image, ignore mask
# result = np.array(img.convert("RGB"))
# result_name = f"output_{uuid.uuid4().hex}.png"
# result_path = os.path.join(OUTPUT_DIR, result_name)
# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# url: Optional[str] = None
# try:
# if request is not None:
# url = str(request.url_for("download_file", filename=result_name))
# except Exception:
# url = None
# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()}
# if url:
# entry["url"] = url
# logs.append(entry)
# resp: Dict[str, str] = {"result": result_name}
# if url:
# resp["url"] = url
# log_media_click(user_id, category_id)
# return resp
# if mask_is_painted:
# # Auto-detect pink/magenta paint and convert to black/white mask
# # White pixels = areas to remove, Black pixels = areas to keep
# log.info("Auto-detecting pink/magenta paint from uploaded image...")
# m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB)
# # Detect pink/magenta using fixed RGB bounds (same as /remove-pink)
# lower = np.array([150, 0, 100], dtype=np.uint8)
# upper = np.array([255, 120, 255], dtype=np.uint8)
# magenta_detected = (
# (m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) &
# (m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) &
# (m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2])
# ).astype(np.uint8) * 255
# # Method 2: Also check if original image was provided to find differences
# if img is not None:
# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
# if img_rgb.shape == m_rgb.shape:
# diff = cv2.absdiff(img_rgb, m_rgb)
# gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY)
# # Any significant difference (>50) could be paint
# diff_mask = (gray_diff > 50).astype(np.uint8) * 255
# # Combine with magenta detection
# binmask = cv2.bitwise_or(magenta_detected, diff_mask)
# else:
# binmask = magenta_detected
# else:
# # No original image provided, use magenta detection only
# binmask = magenta_detected
# # Clean up the mask: remove noise and fill small holes
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
# # Close small gaps in the mask
# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
# # Remove small noise
# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)
# nonzero = int((binmask > 0).sum())
# log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero)
# # If very few pixels detected, assume the user may already be providing a BW mask
# # and proceed without forcing strict detection
# if nonzero < 50:
# log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.")
# result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB"))
# result_name = f"output_{uuid.uuid4().hex}.png"
# result_path = os.path.join(OUTPUT_DIR, result_name)
# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"}
# # Create binary mask: Pink pixels → white (255), Everything else → black (0)
# # Encode in RGBA format for process_inpaint
# # process_inpaint does: mask = 255 - mask[:,:,3]
# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove)
# # alpha=255 (opaque/keep) → becomes 0 (black/keep)
# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8)
# mask_rgba[:, :, 0] = binmask # R: white where pink (for visualization)
# mask_rgba[:, :, 1] = binmask # G: white where pink
# mask_rgba[:, :, 2] = binmask # B: white where pink
# # Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha
# mask_rgba[:, :, 3] = 255 - binmask
# log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)",
# nonzero, binmask.shape[0] * binmask.shape[1] - nonzero)
# else:
# mask_rgba = _load_rgba_mask_from_image(m)
# # When mask_is_painted=true, we encode pink as alpha=0, so process_inpaint's default invert_mask=True works correctly
# actual_invert = invert_mask # Use default True for painted masks
# log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted)
# result = process_inpaint(np.array(img), mask_rgba, invert_mask=actual_invert)
# result_name = f"output_{uuid.uuid4().hex}.png"
# result_path = os.path.join(OUTPUT_DIR, result_name)
# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# url: Optional[str] = None
# try:
# if request is not None:
# url = str(request.url_for("download_file", filename=result_name))
# except Exception:
# url = None
# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()}
# if url:
# entry["url"] = url
# logs.append(entry)
# resp: Dict[str, str] = {"result": result_name}
# if url:
# resp["url"] = url
# log_media_click(user_id, category_id)
# return resp
# except Exception as e:
# status = "fail"
# error_msg = str(e)
# raise
# finally:
# # Always log to regular MongoDB (mandatory)
# end_time = time.time()
# response_time_ms = (end_time - start_time) * 1000
# log_doc = {
# "endpoint": "inpaint-multipart",
# "output_id": result_name,
# "status": status,
# "timestamp": datetime.utcnow(),
# "ts": int(time.time()),
# "response_time_ms": response_time_ms,
# }
# if error_msg:
# log_doc["error"] = error_msg
# try:
# mongo_logs.insert_one(log_doc)
# except Exception as mongo_err:
# log.error("Mongo log insert failed: %s", mongo_err)
# @app.post("/remove-pink")
# def remove_pink_segments(
# image: UploadFile = File(...),
# request: Request = None,
# user_id: Optional[str] = Form(None),
# category_id: Optional[str] = Form(None),
# _: None = Depends(bearer_auth),
# ) -> Dict[str, str]:
# """
# Simple endpoint: upload an image with pink/magenta segments to remove.
# - Pink/Magenta segments → automatically removed (white in mask)
# - Everything else → automatically kept (black in mask)
# Just paint pink/magenta on areas you want to remove, upload the image, and it works!
# """
# start_time = time.time()
# status = "success"
# error_msg = None
# result_name = None
# try:
# log.info(f"Simple remove-pink: processing image {image.filename}")
# # Load the image (with pink paint on it)
# img = Image.open(image.file).convert("RGBA")
# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
# # Auto-detect pink/magenta segments to remove
# # Pink/Magenta → white in mask (remove)
# # Everything else (natural image colors, including dark areas) → black in mask (keep)
# # Detect pink/magenta using fixed RGB bounds per requested logic
# lower = np.array([150, 0, 100], dtype=np.uint8)
# upper = np.array([255, 120, 255], dtype=np.uint8)
# binmask = (
# (img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) &
# (img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) &
# (img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2])
# ).astype(np.uint8) * 255
# # Clean up the pink mask
# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)
# nonzero = int((binmask > 0).sum())
# total_pixels = binmask.shape[0] * binmask.shape[1]
# log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove")
# # Debug: log bounds used
# log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]")
# if nonzero < 50:
# log.error("No pink segments detected! Returning original image.")
# result = np.array(img.convert("RGB"))
# result_name = f"output_{uuid.uuid4().hex}.png"
# result_path = os.path.join(OUTPUT_DIR, result_name)
# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# return {
# "result": result_name,
# "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)."
# }
# # Create binary mask: Pink pixels → white (255), Everything else → black (0)
# # Encode in RGBA format that process_inpaint expects
# # process_inpaint does: mask = 255 - mask[:,:,3]
# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove)
# # alpha=255 (opaque/keep) → becomes 0 (black/keep)
# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8)
# # RGB channels don't matter for process_inpaint, but set them to white where pink for visualization
# mask_rgba[:, :, 0] = binmask # R: white where pink
# mask_rgba[:, :, 1] = binmask # G: white where pink
# mask_rgba[:, :, 2] = binmask # B: white where pink
# # Alpha: 0 (transparent) where pink → will become white after 255-alpha
# # 255 (opaque) everywhere else → will become black after 255-alpha
# mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255
# # Verify mask encoding
# alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum())
# alpha_255_count = int((mask_rgba[:,:,3] == 255).sum())
# total_pixels = binmask.shape[0] * binmask.shape[1]
# log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)")
# log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)")
# # IMPORTANT: We need to use the ORIGINAL image WITHOUT pink paint for inpainting!
# # Remove pink from the original image before processing
# # Create a clean version: where pink was detected, keep original image colors
# img_clean = np.array(img.convert("RGBA"))
# # Where pink is detected, we want to inpaint, so we can leave it (or blend it out)
# # Actually, the model will inpaint over those areas, so we can pass the original
# # But for better results, we might want to remove the pink overlay first
# # Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal
# log.info(f"Starting inpainting process...")
# result = process_inpaint(img_clean, mask_rgba, invert_mask=True)
# log.info(f"Inpainting complete, result shape: {result.shape}")
# result_name = f"output_{uuid.uuid4().hex}.png"
# result_path = os.path.join(OUTPUT_DIR, result_name)
# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
# url: Optional[str] = None
# try:
# if request is not None:
# url = str(request.url_for("download_file", filename=result_name))
# except Exception:
# url = None
# logs.append({
# "result": result_name,
# "filename": image.filename,
# "pink_pixels": nonzero,
# "timestamp": datetime.utcnow().isoformat()
# })
# resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)}
# if url:
# resp["url"] = url
# log_media_click(user_id, category_id)
# return resp
# except Exception as e:
# status = "fail"
# error_msg = str(e)
# raise
# finally:
# # Always log to regular MongoDB (mandatory)
# end_time = time.time()
# response_time_ms = (end_time - start_time) * 1000
# log_doc = {
# "endpoint": "remove-pink",
# "output_id": result_name,
# "status": status,
# "timestamp": datetime.utcnow(),
# "ts": int(time.time()),
# "response_time_ms": response_time_ms,
# }
# if error_msg:
# log_doc["error"] = error_msg
# try:
# mongo_logs.insert_one(log_doc)
# except Exception as mongo_err:
# log.error("Mongo log insert failed: %s", mongo_err)
# @app.get("/download/{filename}")
# def download_file(filename: str):
# path = os.path.join(OUTPUT_DIR, filename)
# if not os.path.isfile(path):
# raise HTTPException(status_code=404, detail="file not found")
# return FileResponse(path)
# @app.get("/result/{filename}")
# def view_result(filename: str):
# """View result image directly in browser (same as download but with proper content-type for viewing)"""
# path = os.path.join(OUTPUT_DIR, filename)
# if not os.path.isfile(path):
# raise HTTPException(status_code=404, detail="file not found")
# return FileResponse(path, media_type="image/png")
# @app.get("/logs")
# def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse:
# return JSONResponse(content=logs)