HariLogicgo commited on
Commit
ec15dbc
Β·
1 Parent(s): 4b9cc62
Files changed (1) hide show
  1. app.py +52 -4
app.py CHANGED
@@ -17,6 +17,11 @@ from motor.motor_asyncio import AsyncIOMotorClient
17
  from bson.objectid import ObjectId
18
  from gradio import mount_gradio_app
19
  import uvicorn
 
 
 
 
 
20
 
21
  # -------------------------------------------------
22
  # Paths
@@ -35,6 +40,7 @@ os.makedirs(MODELS_DIR, exist_ok=True)
35
  # Download models once
36
  # -------------------------------------------------
37
  def download_models():
 
38
  inswapper_path = hf_hub_download(
39
  repo_id=REPO_ID,
40
  filename="models/inswapper_128.onnx",
@@ -55,6 +61,7 @@ def download_models():
55
  repo_type="model",
56
  local_dir=MODELS_DIR
57
  )
 
58
  return inswapper_path
59
 
60
  inswapper_path = download_models()
@@ -62,10 +69,12 @@ inswapper_path = download_models()
62
  # -------------------------------------------------
63
  # Initialize face analysis and swapper
64
  # -------------------------------------------------
65
- providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] # Prefer CUDA
 
66
  app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers)
67
  app.prepare(ctx_id=0, det_size=(640, 640))
68
  swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers)
 
69
 
70
  # -------------------------------------------------
71
  # CodeFormer setup
@@ -74,11 +83,13 @@ CODEFORMER_PATH = "CodeFormer/inference_codeformer.py"
74
 
75
  def ensure_codeformer():
76
  if not os.path.exists("CodeFormer"):
 
77
  subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True)
78
  subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=True)
79
  subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True)
80
  subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=True)
81
  subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=True)
 
82
 
83
  ensure_codeformer()
84
 
@@ -94,6 +105,7 @@ database = client.FaceSwap
94
  target_images_collection = database.get_collection("Target_Images")
95
  source_images_collection = database.get_collection("Source_Images")
96
  results_collection = database.get_collection("Results")
 
97
 
98
  # -------------------------------------------------
99
  # Lock for face swap
@@ -104,6 +116,7 @@ swap_lock = threading.Lock()
104
  # Pipeline Function
105
  # -------------------------------------------------
106
  def face_swap_and_enhance(src_img, tgt_img):
 
107
  try:
108
  with swap_lock:
109
  shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
@@ -111,38 +124,56 @@ def face_swap_and_enhance(src_img, tgt_img):
111
  os.makedirs(UPLOAD_DIR, exist_ok=True)
112
  os.makedirs(RESULT_DIR, exist_ok=True)
113
 
 
 
 
 
114
  src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
115
  tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
116
 
 
117
  src_faces = app.get(src_bgr)
118
  tgt_faces = app.get(tgt_bgr)
119
  if not src_faces or not tgt_faces:
120
- return None, None, "❌ Face not detected in one of the images."
 
121
 
122
  unique_name = f"swapped_{uuid.uuid4().hex[:8]}.jpg"
123
  swapped_path = os.path.join(UPLOAD_DIR, unique_name)
 
124
  swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
 
 
 
 
125
  cv2.imwrite(swapped_path, swapped_bgr)
 
126
 
127
  cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {RESULT_DIR} --bg_upsampler realesrgan --face_upsample"
 
128
  result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
129
  if result.returncode != 0:
 
130
  return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
131
 
132
  final_results_dir = os.path.join(RESULT_DIR, "final_results")
133
  if not os.path.exists(final_results_dir):
134
- return None, None, "❌ CodeFormer did not produce final results."
 
135
 
136
  final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")]
137
  if not final_files:
138
- return None, None, "❌ No enhanced image found in final results."
 
139
 
140
  final_path = os.path.join(final_results_dir, final_files[0])
141
  final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
 
142
 
143
  return final_img, final_path, ""
144
 
145
  except Exception as e:
 
146
  return None, None, f"❌ Error: {str(e)}"
147
 
148
  # -------------------------------------------------
@@ -182,6 +213,7 @@ async def health():
182
 
183
  @app.post("/source")
184
  async def upload_source(image: UploadFile = File(...)):
 
185
  contents = await image.read()
186
  doc = {
187
  "filename": image.filename,
@@ -189,10 +221,12 @@ async def upload_source(image: UploadFile = File(...)):
189
  "data": contents
190
  }
191
  result = await source_images_collection.insert_one(doc)
 
192
  return {"source_id": str(result.inserted_id)}
193
 
194
  @app.post("/target")
195
  async def upload_target(image: UploadFile = File(...)):
 
196
  contents = await image.read()
197
  doc = {
198
  "filename": image.filename,
@@ -200,6 +234,7 @@ async def upload_target(image: UploadFile = File(...)):
200
  "data": contents
201
  }
202
  result = await target_images_collection.insert_one(doc)
 
203
  return {"target_id": str(result.inserted_id)}
204
 
205
  class FaceSwapRequest(BaseModel):
@@ -208,24 +243,34 @@ class FaceSwapRequest(BaseModel):
208
 
209
  @app.post("/faceswap")
210
  async def perform_faceswap(request: FaceSwapRequest):
 
211
  source_doc = await source_images_collection.find_one({"_id": ObjectId(request.source_id)})
212
  if not source_doc:
 
213
  raise HTTPException(status_code=404, detail="Source image not found")
214
 
215
  target_doc = await target_images_collection.find_one({"_id": ObjectId(request.target_id)})
216
  if not target_doc:
 
217
  raise HTTPException(status_code=404, detail="Target image not found")
218
 
219
  source_array = np.frombuffer(source_doc["data"], np.uint8)
220
  source_bgr = cv2.imdecode(source_array, cv2.IMREAD_COLOR)
 
 
 
221
  source_rgb = cv2.cvtColor(source_bgr, cv2.COLOR_BGR2RGB)
222
 
223
  target_array = np.frombuffer(target_doc["data"], np.uint8)
224
  target_bgr = cv2.imdecode(target_array, cv2.IMREAD_COLOR)
 
 
 
225
  target_rgb = cv2.cvtColor(target_bgr, cv2.COLOR_BGR2RGB)
226
 
227
  final_img, final_path, err = face_swap_and_enhance(source_rgb, target_rgb)
228
  if err:
 
229
  raise HTTPException(status_code=500, detail=err)
230
 
231
  with open(final_path, "rb") as f:
@@ -239,12 +284,15 @@ async def perform_faceswap(request: FaceSwapRequest):
239
  "data": final_bytes
240
  }
241
  result = await results_collection.insert_one(result_doc)
 
242
  return {"result_id": str(result.inserted_id)}
243
 
244
  @app.get("/download/{result_id}")
245
  async def download_result(result_id: str):
 
246
  doc = await results_collection.find_one({"_id": ObjectId(result_id)})
247
  if not doc:
 
248
  raise HTTPException(status_code=404, detail="Result not found")
249
  return Response(
250
  content=doc["data"],
 
17
  from bson.objectid import ObjectId
18
  from gradio import mount_gradio_app
19
  import uvicorn
20
+ import logging
21
+
22
+ # Set up logging
23
+ logging.basicConfig(level=logging.INFO)
24
+ logger = logging.getLogger(__name__)
25
 
26
  # -------------------------------------------------
27
  # Paths
 
40
  # Download models once
41
  # -------------------------------------------------
42
  def download_models():
43
+ logger.info("Downloading models...")
44
  inswapper_path = hf_hub_download(
45
  repo_id=REPO_ID,
46
  filename="models/inswapper_128.onnx",
 
61
  repo_type="model",
62
  local_dir=MODELS_DIR
63
  )
64
+ logger.info("Models downloaded successfully")
65
  return inswapper_path
66
 
67
  inswapper_path = download_models()
 
69
  # -------------------------------------------------
70
  # Initialize face analysis and swapper
71
  # -------------------------------------------------
72
+ providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
73
+ logger.info(f"Initializing FaceAnalysis with providers: {providers}")
74
  app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers)
75
  app.prepare(ctx_id=0, det_size=(640, 640))
76
  swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers)
77
+ logger.info("FaceAnalysis and swapper initialized")
78
 
79
  # -------------------------------------------------
80
  # CodeFormer setup
 
83
 
84
  def ensure_codeformer():
85
  if not os.path.exists("CodeFormer"):
86
+ logger.info("Cloning CodeFormer repository...")
87
  subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True)
88
  subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=True)
89
  subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True)
90
  subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=True)
91
  subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=True)
92
+ logger.info("CodeFormer setup complete")
93
 
94
  ensure_codeformer()
95
 
 
105
  target_images_collection = database.get_collection("Target_Images")
106
  source_images_collection = database.get_collection("Source_Images")
107
  results_collection = database.get_collection("Results")
108
+ logger.info("MongoDB client initialized")
109
 
110
  # -------------------------------------------------
111
  # Lock for face swap
 
116
  # Pipeline Function
117
  # -------------------------------------------------
118
  def face_swap_and_enhance(src_img, tgt_img):
119
+ logger.info("Starting face swap and enhancement")
120
  try:
121
  with swap_lock:
122
  shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
 
124
  os.makedirs(UPLOAD_DIR, exist_ok=True)
125
  os.makedirs(RESULT_DIR, exist_ok=True)
126
 
127
+ if not isinstance(src_img, np.ndarray) or not isinstance(tgt_img, np.ndarray):
128
+ logger.error("Invalid input images: not numpy arrays")
129
+ return None, None, "❌ Invalid input images: not numpy arrays"
130
+
131
  src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
132
  tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
133
 
134
+ logger.info("Detecting faces...")
135
  src_faces = app.get(src_bgr)
136
  tgt_faces = app.get(tgt_bgr)
137
  if not src_faces or not tgt_faces:
138
+ logger.error("Face not detected in one of the images")
139
+ return None, None, "❌ Face not detected in one of the images"
140
 
141
  unique_name = f"swapped_{uuid.uuid4().hex[:8]}.jpg"
142
  swapped_path = os.path.join(UPLOAD_DIR, unique_name)
143
+ logger.info("Performing face swap...")
144
  swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
145
+ if swapped_bgr is None:
146
+ logger.error("Face swap failed: swapper returned None")
147
+ return None, None, "❌ Face swap failed"
148
+
149
  cv2.imwrite(swapped_path, swapped_bgr)
150
+ logger.info(f"Swapped image saved to {swapped_path}")
151
 
152
  cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {RESULT_DIR} --bg_upsampler realesrgan --face_upsample"
153
+ logger.info(f"Running CodeFormer: {cmd}")
154
  result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
155
  if result.returncode != 0:
156
+ logger.error(f"CodeFormer failed: {result.stderr}")
157
  return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
158
 
159
  final_results_dir = os.path.join(RESULT_DIR, "final_results")
160
  if not os.path.exists(final_results_dir):
161
+ logger.error("CodeFormer did not produce final results")
162
+ return None, None, "❌ CodeFormer did not produce final results"
163
 
164
  final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")]
165
  if not final_files:
166
+ logger.error("No enhanced image found in final results")
167
+ return None, None, "❌ No enhanced image found in final results"
168
 
169
  final_path = os.path.join(final_results_dir, final_files[0])
170
  final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
171
+ logger.info(f"Enhanced image ready at {final_path}")
172
 
173
  return final_img, final_path, ""
174
 
175
  except Exception as e:
176
+ logger.error(f"Face swap error: {str(e)}")
177
  return None, None, f"❌ Error: {str(e)}"
178
 
179
  # -------------------------------------------------
 
213
 
214
  @app.post("/source")
215
  async def upload_source(image: UploadFile = File(...)):
216
+ logger.info(f"Uploading source image: {image.filename}")
217
  contents = await image.read()
218
  doc = {
219
  "filename": image.filename,
 
221
  "data": contents
222
  }
223
  result = await source_images_collection.insert_one(doc)
224
+ logger.info(f"Source image uploaded with ID: {str(result.inserted_id)}")
225
  return {"source_id": str(result.inserted_id)}
226
 
227
  @app.post("/target")
228
  async def upload_target(image: UploadFile = File(...)):
229
+ logger.info(f"Uploading target image: {image.filename}")
230
  contents = await image.read()
231
  doc = {
232
  "filename": image.filename,
 
234
  "data": contents
235
  }
236
  result = await target_images_collection.insert_one(doc)
237
+ logger.info(f"Target image uploaded with ID: {str(result.inserted_id)}")
238
  return {"target_id": str(result.inserted_id)}
239
 
240
  class FaceSwapRequest(BaseModel):
 
243
 
244
  @app.post("/faceswap")
245
  async def perform_faceswap(request: FaceSwapRequest):
246
+ logger.info(f"Starting face swap for source_id: {request.source_id}, target_id: {request.target_id}")
247
  source_doc = await source_images_collection.find_one({"_id": ObjectId(request.source_id)})
248
  if not source_doc:
249
+ logger.error(f"Source image not found: {request.source_id}")
250
  raise HTTPException(status_code=404, detail="Source image not found")
251
 
252
  target_doc = await target_images_collection.find_one({"_id": ObjectId(request.target_id)})
253
  if not target_doc:
254
+ logger.error(f"Target image not found: {request.target_id}")
255
  raise HTTPException(status_code=404, detail="Target image not found")
256
 
257
  source_array = np.frombuffer(source_doc["data"], np.uint8)
258
  source_bgr = cv2.imdecode(source_array, cv2.IMREAD_COLOR)
259
+ if source_bgr is None:
260
+ logger.error("Failed to decode source image")
261
+ raise HTTPException(status_code=500, detail="Failed to decode source image")
262
  source_rgb = cv2.cvtColor(source_bgr, cv2.COLOR_BGR2RGB)
263
 
264
  target_array = np.frombuffer(target_doc["data"], np.uint8)
265
  target_bgr = cv2.imdecode(target_array, cv2.IMREAD_COLOR)
266
+ if target_bgr is None:
267
+ logger.error("Failed to decode target image")
268
+ raise HTTPException(status_code=500, detail="Failed to decode target image")
269
  target_rgb = cv2.cvtColor(target_bgr, cv2.COLOR_BGR2RGB)
270
 
271
  final_img, final_path, err = face_swap_and_enhance(source_rgb, target_rgb)
272
  if err:
273
+ logger.error(f"Face swap failed: {err}")
274
  raise HTTPException(status_code=500, detail=err)
275
 
276
  with open(final_path, "rb") as f:
 
284
  "data": final_bytes
285
  }
286
  result = await results_collection.insert_one(result_doc)
287
+ logger.info(f"Face swap result stored with ID: {str(result.inserted_id)}")
288
  return {"result_id": str(result.inserted_id)}
289
 
290
  @app.get("/download/{result_id}")
291
  async def download_result(result_id: str):
292
+ logger.info(f"Downloading result: {result_id}")
293
  doc = await results_collection.find_one({"_id": ObjectId(result_id)})
294
  if not doc:
295
+ logger.error(f"Result not found: {result_id}")
296
  raise HTTPException(status_code=404, detail="Result not found")
297
  return Response(
298
  content=doc["data"],