# doc-ai-api / Libraries / PDF_ExtractData.py
# (repository page header: "LongK171's picture", commit "Add all", dbe2c62)
import re
from typing import Dict, Any
from collections import Counter, defaultdict
from . import Common_TextProcess as TextProcess
from . import Common_PdfProcess as PdfProcess
# ===============================
# 1. Utils -> class U1_Utils
# ===============================
class U1_Utils:
    """Stateless helpers: proper-name collection, marker extraction/templating, Roman-numeral clean-up."""

    # ===== Automatic proper-name collection =====
    @staticmethod
    def collect_proper_names(lines, min_count=10):
        """Collect capitalised words frequent enough to be treated as proper names.

        Args:
            lines: Iterable of line dicts; only the ``"Text"`` entry is read.
            min_count: Minimum number of occurrences for a word to be kept.

        Returns:
            A set of words, each passed through ``TextProcess.normalize_word``.
        """
        tally = Counter()
        for entry in lines:
            tokens = re.findall(r"[A-Za-zÀ-ỹĐđ0-9]+", entry.get("Text", ""))
            # The first token of every line is never counted.
            for token in tokens[1:]:
                if not token.istitle():
                    continue
                normalized = TextProcess.normalize_word(token)
                if normalized:
                    tally[normalized] += 1
        return {
            TextProcess.normalize_word(word)
            for word, seen in tally.items()
            if seen >= min_count
        }

    @staticmethod
    def extract_marker(text, patterns):
        """Return ``{"marker_text": ...}`` for the first marker pattern matching at the start of ``text``.

        Leading whitespace of the match is removed and trailing whitespace is
        collapsed to a single space. ``marker_text`` is ``None`` when no
        pattern matches.
        """
        attempts = (info["pattern"].match(text) for info in patterns["markers"])
        hit = next((found for found in attempts if found), None)
        if hit is None:
            return {"marker_text": None}
        trimmed = re.sub(r'^\s+', '', hit.group(0))
        return {"marker_text": re.sub(r'\s+$', ' ', trimmed)}

    @staticmethod
    def format_marker(marker_text, patterns):
        """Turn a concrete marker into its template form.

        Whole-word digit runs become ``123`` and whole-word runs of the
        letters I/V/X/L/C become ``XVI``. Words listed in
        ``patterns["keywords_set"]`` (compared lower-cased) are kept verbatim,
        a single lower-case letter becomes ``abc`` and a single upper-case
        letter becomes ``ABC``. Returns ``None`` for an empty marker.
        """
        if not marker_text:
            return None

        template = re.sub(r'\b[0-9]+\b', '123', marker_text)
        template = re.sub(r'\b[IVXLC]+\b', 'XVI', template)

        keywords = patterns["keywords_set"]
        pieces = []
        # The capturing group keeps the separators so the marker can be re-joined.
        for token in re.split(r'(\W+)', template):
            if re.match(r'(\W+)', token) or token.lower() in keywords:
                replacement = token
            elif re.match(r'^[a-z]$', token) or re.match(r'^[a-zđêôơư]$', token):
                replacement = 'abc'
            elif re.match(r'^[A-Z]$', token) or re.match(r'^[A-ZĐÊÔƠƯ]$', token):
                replacement = 'ABC'
            else:
                replacement = token
            pieces.append(replacement)
        return ''.join(pieces)

    # ===== Roman-numeral normalisation =====
    @staticmethod
    def normalizeRomans(lines, mode="marker", replace_with="ABC"):
        """Replace Roman-numeral tokens in place and return ``lines``.

        ``mode="marker"``: lines are grouped by ``MarkerType``. In each group
        the leading run of markers containing a Roman numeral is collected;
        if their values do not form a gap-free, duplicate-free range, the
        Roman token in ``MarkerType`` of those lines is replaced by
        ``replace_with``.

        ``mode="text"``: every whole-word I/V/X/L/C run in ``Text``,
        ``MarkerText`` and ``MarkerType`` is replaced by ``replace_with``.
        """
        by_type = defaultdict(list)
        for position, entry in enumerate(lines):
            marker_type = entry.get("MarkerType")
            marker_text = entry.get("MarkerText")
            if marker_type and marker_text:
                by_type[marker_type].append((position, marker_text))

        # --- Check each MarkerType group ---
        if mode == "marker":
            for members in by_type.values():
                leading = []
                for position, marker_text in members:
                    found = re.search(r'\b([IVXLC]+)\b', marker_text)
                    if not (found and TextProcess.is_roman(found.group(1))):
                        # Only the uninterrupted run at the start of the group counts.
                        break
                    leading.append((position, found.group(1)))
                if not leading:
                    continue
                values = [TextProcess.roman_to_int(numeral) for _, numeral in leading]
                if sorted(values) == list(range(min(values), max(values) + 1)):
                    continue
                for position, _ in leading:
                    lines[position]["MarkerType"] = re.sub(
                        r'\b[IVXLC]+\b', replace_with, lines[position]["MarkerType"]
                    )
        # --- Normalise every Text/MarkerText/MarkerType ---
        elif mode == "text":
            for entry in lines:
                for field in ("Text", "MarkerText", "MarkerType"):
                    if entry.get(field):
                        entry[field] = re.sub(r'\b[IVXLC]+\b', replace_with, entry[field])
        return lines
# ===============================
# 2. Word-level functions (new) -> class U2_Word
# ===============================
class U2_Word:
    """Word-level style codes."""

    @staticmethod
    def caseStyle(word_text: str) -> int:
        """Return the case code of a word: 3000 (all upper), 2000 (title case), 1000 (anything else).

        Characters outside letters/digits are ignored before classifying.
        """
        letters = re.sub(r'[^A-Za-zÀ-ỹà-ỹ0-9]', '', word_text)
        if not letters:
            return 1000
        if letters.isupper():
            return 3000
        return 2000 if letters.istitle() else 1000

    @staticmethod
    def buildStyle(word_text, span):
        """Return the combined style code: case code + 100 (bold) + 10 (italic) + 1 (underline)."""
        case_code = U2_Word.caseStyle(word_text)
        bold, italic, underline = PdfProcess.fontFlags(span)
        font_code = 0
        if bold:
            font_code += 100
        if italic:
            font_code += 10
        if underline:
            font_code += 1
        return case_code + font_code

    @staticmethod
    def getWordStyle(line, index: int):
        """Return the style code of the word at ``index`` (negative indexes allowed), or 0 if out of range."""
        words = PdfProcess.extractWords(line)
        count = len(words)
        if index >= count or index < -count:
            return 0
        word, span = words[index]
        return U2_Word.buildStyle(word, span)
# ===============================
# 3. Line-level functions (new) -> class U3_Line
# ===============================
class U3_Line:
    """Line-level and page-level attribute helpers."""

    @staticmethod
    def getPageGeneralSize(page):
        """Return ``[height, width]`` of the page, each rounded to one decimal."""
        rect = page.rect
        return [round(rect.height, 1), round(rect.width, 1)]

    @staticmethod
    def getLineText(line):
        """Return the full text of the line (``""`` when the ``"text"`` key is absent)."""
        return line.get("text", "")

    @staticmethod
    def _exceptionTexts(exceptions):
        """Flatten the exception lists into one set of hashable texts.

        Entries may be plain values or dicts carrying the value under
        ``"text"`` (the shape other code in this module already tolerates for
        ``proper_names``); dicts without a ``"text"`` value are skipped.
        A missing or ``None`` list counts as empty.
        """
        texts = set()
        if not exceptions:
            return texts
        for key in ("common_words", "proper_names", "abbreviations"):
            for entry in exceptions.get(key) or []:
                if isinstance(entry, dict):
                    entry = entry.get("text")
                    if entry is None:
                        continue
                texts.add(entry)
        return texts

    @staticmethod
    def getLineStyle(line, exceptions=None):
        """Return the line style code = case code + font code.

        The case code is the minimum ``U2_Word.caseStyle`` over the words in
        ``line["words"]`` (pairs whose first item is the word text), ignoring
        words that normalise to nothing, that appear in ``exceptions`` or that
        ``TextProcess.is_abbreviation`` accepts; it defaults to 1000 when no
        word qualifies. The font code adds 100/10/1 when *every* span is
        bold/italic/underlined respectively.

        Args:
            line: Dict with optional ``"words"`` and ``"spans"`` lists.
            exceptions: Optional dict with ``common_words``, ``proper_names``
                and ``abbreviations`` lists.

        NOTE(review): ``U6_Document.getTextStatus`` builds its line dicts
        without a ``"words"`` key and does not pass ``exceptions``, so for
        those lines the case component is always 1000 — confirm whether that
        is intended.
        """
        words = line.get("words", [])
        spans = line.get("spans", [])
        exception_texts = U3_Line._exceptionTexts(exceptions)

        # ===== Case code =====
        case_codes = []
        for word, _ in words:
            clean_word = TextProcess.normalize_word(word)
            if not clean_word:
                continue
            if clean_word in exception_texts or TextProcess.is_abbreviation(clean_word):
                continue
            case_codes.append(U2_Word.caseStyle(clean_word))
        case_code = min(case_codes) if case_codes else 1000

        # ===== Font code =====
        font_code = 0
        if spans:
            bold_all = italic_all = underline_all = True
            for span in spans:
                bold, italic, underline = PdfProcess.fontFlags(span)
                # Truthiness (as in U2_Word.buildStyle) rather than bitwise &,
                # so a non-bool truthy flag cannot collapse to 0.
                bold_all = bold_all and bool(bold)
                italic_all = italic_all and bool(italic)
                underline_all = underline_all and bool(underline)
            font_code = (
                (100 if bold_all else 0)
                + (10 if italic_all else 0)
                + (1 if underline_all else 0)
            )
        return case_code + font_code
# ===============================
# 4. Compatibility wrappers -> class U4_Compat
# ===============================
class U4_Compat:
    """Wrappers that keep the older helper names available."""

    @staticmethod
    def getText(line):
        """Legacy alias for ``U3_Line.getLineText``."""
        return U3_Line.getLineText(line)

    @staticmethod
    def getCoords(line):
        """Legacy alias for ``PdfProcess.getLineCoord``; callers unpack it as (x0, x1, xm, y0, y1)."""
        return PdfProcess.getLineCoord(line)

    @staticmethod
    def _wordSummary(line, index):
        """Build the ``{Text, Style, FontSize}`` dict for the word at ``index``."""
        summary = {}
        summary["Text"] = PdfProcess.getWordText(line, index)
        summary["Style"] = U2_Word.getWordStyle(line, index)
        summary["FontSize"] = PdfProcess.getWordFontSize(line, index)
        return summary

    @staticmethod
    def getFirstWord(line):
        """Legacy API: ``{Text, Style, FontSize}`` of the first word (index 0)."""
        return U4_Compat._wordSummary(line, 0)

    @staticmethod
    def getLastWord(line):
        """Legacy API: ``{Text, Style, FontSize}`` of the last word (index -1)."""
        return U4_Compat._wordSummary(line, -1)
# ===============================
# 5. Marker / Style (line-level) -> class U5_MarkerStyle
# ===============================
class U5_MarkerStyle:
    """Line-level marker detection and span-based font size."""

    @staticmethod
    def getMarker(text, patterns):
        """Return ``(marker_text, marker_type)`` for a line of text.

        ``marker_type`` is the templated form from ``U1_Utils.format_marker``
        and is ``None`` whenever no non-empty marker was extracted.
        """
        marker_text = U1_Utils.extract_marker(text, patterns).get("marker_text")
        if not marker_text:
            return marker_text, None
        # Drop a '+' glued to the end of an alphanumeric token before templating.
        without_plus = re.sub(
            r'([A-Za-z0-9ĐÊÔƠƯđêôơư])\+(?=\W|$)', r'\1', marker_text
        )
        return marker_text, U1_Utils.format_marker(without_plus, patterns)

    @staticmethod
    def getFontSize(line):
        """Return the mean span font size rounded to the nearest 0.5 (legacy logic, kept for compatibility).

        Spans whose text is blank are ignored unless every span is blank;
        a span without ``"size"`` counts as 12.0, and a line without spans
        yields 12.0.
        """
        spans = line.get("spans", [])
        if not spans:
            return 12.0
        non_blank = [span for span in spans if span.get("text", "").strip()]
        considered = non_blank if non_blank else spans
        sizes = [span.get("size", 12.0) for span in considered]
        mean_size = sum(sizes) / len(sizes)
        return round(mean_size * 2) / 2
# ===============================
# 6. Whole-document aggregation -> class U6_Document
# ===============================
class U6_Document:
    """Whole-document aggregation of per-line attributes."""

    @staticmethod
    def getTextStatus(pdf_doc, exceptions, patterns):
        """Walk every page of ``pdf_doc`` and build one record per non-blank text line.

        Args:
            pdf_doc: Indexable, iterable document whose pages provide
                ``rect`` and ``get_text("dict")``.
            exceptions: Accepted for interface compatibility; not read here.
            patterns: Compiled marker patterns passed to
                ``U5_MarkerStyle.getMarker``.

        Returns:
            ``{"general": {"pageGeneralSize": [h, w]}, "lines": [...]}`` where
            the page size comes from the first page and each line record holds
            Line (1-based), Text, MarkerText, MarkerType, Style, FontSize,
            Words (First/Last) and Coords (X0, X1, XM, Y0, Y1).
        """
        general = {"pageGeneralSize": U3_Line.getPageGeneralSize(pdf_doc[0])}
        records = []
        for page in pdf_doc:
            for block in page.get_text("dict")["blocks"]:
                if "lines" not in block:
                    # Blocks without a "lines" entry carry no text lines.
                    continue
                for raw_line in block["lines"]:
                    spans = raw_line["spans"]
                    text = "".join(span["text"] for span in spans).strip()
                    if not text:
                        continue

                    # Marker
                    marker_text, marker_type = U5_MarkerStyle.getMarker(text, patterns)

                    # Style / FontSize / Coord
                    line_obj = {"text": text, "spans": spans}
                    style = U3_Line.getLineStyle(line_obj)
                    font_size = PdfProcess.getLineFontSize(line_obj)
                    x0, x1, xm, y0, y1 = PdfProcess.getLineCoord(line_obj)

                    # Words
                    first_word = U4_Compat.getFirstWord(line_obj)
                    last_word = U4_Compat.getLastWord(line_obj)

                    records.append({
                        "Line": len(records) + 1,
                        "Text": text,
                        "MarkerText": marker_text,
                        "MarkerType": marker_type,
                        "Style": style,
                        "FontSize": font_size,
                        "Words": {"First": first_word, "Last": last_word},
                        "Coords": {"X0": x0, "X1": x1, "XM": xm, "Y0": y0, "Y1": y1},
                    })
        return {"general": general, "lines": records}
# ===============================
# 7. set* functions -> class U7_Setters
# ===============================
class U7_Setters:
    """Document-wide statistics and derived per-line layout attributes."""

    # Returned by setCommonFontSize when no line carries a font size; same
    # value U5_MarkerStyle.getFontSize uses for a line without spans.
    DEFAULT_FONT_SIZE = 12.0

    @staticmethod
    def setCommonStatus(lines, attr, rank=1):
        """Return the ``rank`` most frequent non-``None`` values of ``attr`` as ``(value, count)`` pairs.

        ``rank=None`` returns every distinct value, most frequent first.
        """
        values = [line[attr] for line in lines if line.get(attr) is not None]
        return Counter(values).most_common(rank)

    @staticmethod
    def setCommonFontSize(lines):
        """Return the most frequent ``FontSize`` rounded to one decimal.

        Falls back to ``DEFAULT_FONT_SIZE`` when no line has a font size
        (for example a document with no text lines) instead of raising
        ``IndexError``.
        """
        ranked = U7_Setters.setCommonStatus(lines, "FontSize", 1)
        if not ranked:
            return U7_Setters.DEFAULT_FONT_SIZE
        size, _ = ranked[0]
        return round(size, 1)

    @staticmethod
    def setCommonFontSizes(lines):
        """Return every distinct ``FontSize`` with its count, most frequent first."""
        return [
            {"FontSize": round(size, 1), "Count": count}
            for size, count in U7_Setters.setCommonStatus(lines, "FontSize", None)
        ]

    @staticmethod
    def setCommonMarkers(lines):
        """Return up to 10 marker types, most frequent first, each used by at least 0.5% of all lines.

        Lines without a ``"MarkerType"`` key are treated like lines whose
        marker type is empty.
        """
        total = len(lines)
        marker_types = [line.get("MarkerType") for line in lines]
        counter = Counter(marker for marker in marker_types if marker)
        results = []
        for marker, count in counter.most_common(10):
            if count < total * 0.005:
                # most_common is sorted by count, so nothing later can qualify.
                break
            results.append(marker)
        return results

    @staticmethod
    def setTextStatus(baseJson):
        """Return a new document dict enriched with page statistics and per-line layout.

        ``general`` gains page coordinates, region size, common font size(s)
        and common markers; every line gains ``LineWidth``, ``LineHeight``,
        ``Position`` (Left/Right/Mid/Top/Bot) and ``Align``. The geometry is
        delegated to ``PdfProcess``.
        """
        lines = baseJson["lines"]
        pageGeneralSize = baseJson["general"]["pageGeneralSize"]
        xStart, yStart, xEnd, yEnd, xMid, yMid = PdfProcess.setPageCoords(lines, pageGeneralSize)
        regionWidth, regionHeight = PdfProcess.setPageRegionSize(xStart, yStart, xEnd, yEnd)
        commonFontSizes = U7_Setters.setCommonFontSizes(lines)
        commonFontSize = U7_Setters.setCommonFontSize(lines)
        commonMarkers = U7_Setters.setCommonMarkers(lines)

        new_general = {
            "pageGeneralSize": pageGeneralSize,
            "pageCoords": {
                "xStart": xStart, "yStart": yStart,
                "xEnd": xEnd, "yEnd": yEnd,
                "xMid": xMid, "yMid": yMid,
            },
            "pageRegionWidth": regionWidth,
            "pageRegionHeight": regionHeight,
            "commonFontSize": commonFontSize,
            "commonFontSizes": commonFontSizes,
            "commonMarkers": commonMarkers,
        }

        new_lines = []
        last_index = len(lines) - 1
        for i, line in enumerate(lines):
            lineWidth, lineHeight = PdfProcess.setLineSize(line)
            previous_line = lines[i - 1] if i > 0 else None
            next_line = lines[i + 1] if i < last_index else None
            pos = PdfProcess.setPosition(line, previous_line, next_line, xStart, xEnd, xMid)
            pos_dict = {"Left": pos[0], "Right": pos[1], "Mid": pos[2], "Top": pos[3], "Bot": pos[4]}
            new_lines.append({
                **line,
                "LineWidth": lineWidth,
                "LineHeight": lineHeight,
                "Position": pos_dict,
                "Align": PdfProcess.setAlign(pos_dict, regionWidth),
            })
        return {"general": new_general, "lines": new_lines}
# ===============================
# 8. del/reset functions -> class U8_Cleanup
# ===============================
class U8_Cleanup:
    """In-place clean-up passes over the extracted document dict."""

    @staticmethod
    def delStatus(jsonDict, deleteList):
        """Remove every attribute named in ``deleteList`` from each line; returns ``jsonDict``."""
        for line in jsonDict["lines"]:
            for attr in deleteList:
                line.pop(attr, None)
        return jsonDict

    @staticmethod
    def resetPosition(jsonDict):
        """Replace negative ``Top``/``Bot`` positions with the smaller value of the adjacent lines.

        Lines are processed in order and updated in place, so a value fixed on
        one line is what the next line sees as its predecessor. A negative
        value is left untouched when no neighbour provides one. Every line
        ends up with a ``"Position"`` key (an empty dict if it had none).
        """
        lines = jsonDict.get("lines", [])
        last_index = len(lines) - 1
        for i, line in enumerate(lines):
            pos = line.get("Position", {})
            for edge in ("Top", "Bot"):
                if edge in pos and pos[edge] < 0:
                    neighbours = []
                    if i > 0:
                        neighbours.append(lines[i - 1])
                    if i < last_index:
                        neighbours.append(lines[i + 1])
                    found = [n.get("Position", {}).get(edge) for n in neighbours]
                    usable = [value for value in found if value is not None]
                    if usable:
                        pos[edge] = min(usable)
            line["Position"] = pos
        return jsonDict

    @staticmethod
    def normalizeFinal(jsonDict):
        """Run ``TextProcess.strip_extra_spaces`` over line texts, marker texts and first/last word texts."""
        for line in jsonDict.get("lines", []):
            # Line-level Text and MarkerText
            if "Text" in line:
                line["Text"] = TextProcess.strip_extra_spaces(line["Text"])
            if line.get("MarkerText"):
                line["MarkerText"] = TextProcess.strip_extra_spaces(line["MarkerText"])

            # Word-level
            words = line.get("Words", {})
            for slot in ("First", "Last"):
                if slot not in words:
                    continue
                word = words[slot]
                if "Text" in word:
                    word["Text"] = TextProcess.strip_extra_spaces(word["Text"])
        return jsonDict
# ===============================
# 9. Main entry point extractData (keeps the old API)
# ===============================
def _compileMarkerPatterns(markers):
    """Compile the marker definitions into the ``patterns`` dict used by the pipeline.

    Empty keywords are dropped. ``{keywords}`` in a pattern is replaced by an
    alternation of the Title-case and UPPER-case form of every keyword; when
    there are no keywords the placeholder is left untouched rather than being
    replaced by ``"|"`` (which would match the empty string). Entries without
    a pattern, or whose pattern does not compile, are skipped.
    """
    keywords = [k for k in markers.get("keywords", []) if k]
    all_keywords = ""
    if keywords:
        title_keywords = '|'.join(re.escape(k[0].upper() + k[1:].lower()) for k in keywords)
        upper_keywords = '|'.join(re.escape(k.upper()) for k in keywords)
        all_keywords = f"{title_keywords}|{upper_keywords}"

    compiled_markers = []
    for item in markers.get("markers", []):
        pattern_str = item.get("pattern")
        if not pattern_str:
            # An empty pattern would match every line and hide later patterns.
            continue
        if all_keywords:
            pattern_str = pattern_str.replace("{keywords}", all_keywords)
        try:
            compiled_pattern = re.compile(pattern_str)
        except re.error:
            # Best effort: one invalid pattern must not disable the others.
            continue
        compiled_markers.append({
            "pattern": compiled_pattern,
            "description": item.get("description", ""),
            "type": item.get("type", ""),
        })
    return {
        "markers": compiled_markers,
        "keywords_set": {k.lower() for k in keywords},
    }


def extractData(pdf_doc, exceptData, markerData, statusData):
    """Run the extraction pipeline on one PDF document and return the extracted data.

    Args:
        pdf_doc: Opened document passed to ``U6_Document.getTextStatus``.
        exceptData: Mapping of exception lists; copied, never modified.
        markerData: Mapping with ``"keywords"`` and ``"markers"``.
        statusData: Kept for API compatibility; not used by the pipeline.

    Returns:
        ``{"general": ..., "lines": ...}`` after Roman-numeral normalisation,
        layout computation, position reset, removal of ``Coords`` and final
        whitespace normalisation.

    The earlier version also computed auto-detected proper names, but stored
    them only in a local copy that was discarded on return; that step is
    omitted here. ``B1Extractor`` keeps such names on the instance.
    """
    # ===== 1. Copy inputs so the caller's mappings stay untouched =====
    exceptions = dict(exceptData)
    markers = dict(markerData)

    # ===== 2. Compile markers =====
    patterns = _compileMarkerPatterns(markers)

    # ===== 3. Process the PDF =====
    baseJson = U6_Document.getTextStatus(pdf_doc, exceptions, patterns)
    baseJson["lines"] = U1_Utils.normalizeRomans(baseJson["lines"])
    modifiedJson = U7_Setters.setTextStatus(baseJson)
    cleanJson = U8_Cleanup.resetPosition(modifiedJson)
    extractedData = U8_Cleanup.delStatus(cleanJson, ["Coords"])
    extractedData = U8_Cleanup.normalizeFinal(extractedData)
    return extractedData
class B1Extractor:
    """Instance-based orchestrator for the extraction pipeline.

    Runs the same steps as ``extractData``, but the exception/marker/status
    data is copied and the marker regexes are compiled once, at construction.
    Proper names detected while extracting are merged into
    ``self.exceptions["proper_names"]`` and therefore accumulate across files.
    """

    def __init__(
        self,
        exceptData: Any,
        markerData: Any,
        statusData: Any,
        proper_name_min_count: int = 10,
    ) -> None:
        """Store the configuration and compile the marker patterns.

        Args:
            exceptData: Dict of exception lists.
            markerData: Dict with ``"keywords"`` and ``"markers"``.
            statusData: Dict of status data; stored but not read by this class.
            proper_name_min_count: Occurrence threshold for auto-detected
                proper names.

        Raises:
            ValueError: If any of the three data arguments is not a dict
                (file paths are not loaded here).
        """
        # ---- 1) Copy exceptions/markers/status ----
        self.exceptions: Dict[str, Any] = self._ensure_dict(exceptData)
        self.markers: Dict[str, Any] = self._ensure_dict(markerData)
        self.status: Dict[str, Any] = self._ensure_dict(statusData)
        self.proper_name_min_count = proper_name_min_count

        # ---- 2) Compile markers ----
        self.patterns = self._compile_patterns(self.markers)

    @staticmethod
    def _ensure_dict(src):
        """Return a shallow copy of ``src``; reject anything that is not a dict."""
        if isinstance(src, dict):
            return dict(src)
        raise ValueError("Vui lòng truyền dict đã load sẵn thay vì đường dẫn file.")

    @staticmethod
    def _compile_patterns(markers):
        """Build ``{"markers": [...], "keywords_set": {...}}`` from the marker data.

        Empty keywords are ignored (indexing their first character would
        fail). ``{keywords}`` is expanded to the Title-case and UPPER-case
        forms of every keyword, and left as is when there are none.
        """
        keywords = [k for k in markers.get("keywords", []) if k]
        all_keywords = ""
        if keywords:
            title_keywords = "|".join(re.escape(k[0].upper() + k[1:].lower()) for k in keywords)
            upper_keywords = "|".join(re.escape(k.upper()) for k in keywords)
            all_keywords = f"{title_keywords}|{upper_keywords}"

        compiled_markers = []
        for item in markers.get("markers", []):
            pattern_str = item.get("pattern", "")
            if not pattern_str:
                # An empty regex matches every line; since marker extraction
                # stops at the first match it would hide all later patterns.
                continue
            if all_keywords:
                pattern_str = pattern_str.replace("{keywords}", all_keywords)
            try:
                compiled = re.compile(pattern_str)
            except re.error:
                # Best effort: one invalid pattern must not disable the others.
                continue
            compiled_markers.append(
                {
                    "pattern": compiled,
                    "description": item.get("description", ""),
                    "type": item.get("type", ""),
                }
            )
        return {
            "markers": compiled_markers,
            "keywords_set": {k.lower() for k in keywords},
        }

    # ---------- Public API ----------
    def extract(self, pdf_doc) -> Dict[str, Any]:
        """Run the extraction pipeline on one PDF document and return the extracted data.

        Side effect: ``self.exceptions["proper_names"]`` is replaced by the
        union of its previous entries (dict entries reduced to their
        ``"text"`` value, others converted with ``str``) and the proper names
        detected in this document.
        """
        # ===== 3) Extract text and line attributes from the PDF =====
        baseJson = U6_Document.getTextStatus(pdf_doc, self.exceptions, self.patterns)

        # Roman-numeral normalisation
        baseJson["lines"] = U1_Utils.normalizeRomans(baseJson["lines"])

        # ===== 4) Compute status/position/align =====
        modifiedJson = U7_Setters.setTextStatus(baseJson)
        cleanJson = U8_Cleanup.resetPosition(modifiedJson)
        extractedData = U8_Cleanup.delStatus(cleanJson, ["Coords"])
        extractedData = U8_Cleanup.normalizeFinal(extractedData)

        # ===== 5) Merge dynamically detected proper names =====
        proper_names_auto = U1_Utils.collect_proper_names(
            extractedData["lines"], min_count=self.proper_name_min_count
        )
        proper_names_existing = [
            p["text"] if isinstance(p, dict) else str(p)
            for p in self.exceptions.get("proper_names", [])
        ]
        # Kept on the instance so consecutive files keep accumulating names.
        self.exceptions["proper_names"] = list(set(proper_names_existing) | proper_names_auto)
        return extractedData