# NOTE: this file was recovered from a web page view ("Spaces: Sleeping" banner removed).
import re
from typing import Dict, Any
from collections import Counter, defaultdict

from . import Common_TextProcess as TextProcess
from . import Common_PdfProcess as PdfProcess


# ===============================
# 1. Utils -> class U1_Utils
# ===============================
class U1_Utils:
    """Stateless helpers for proper-name collection and marker normalisation.

    The class is only a namespace: every helper is a ``@staticmethod`` so it can
    be called as ``U1_Utils.helper(...)`` or through an instance.
    """

    @staticmethod
    def collect_proper_names(lines, min_count=10):
        """Collect frequently occurring Title-case words as proper-name candidates.

        Args:
            lines: iterable of line dicts; only the ``"Text"`` entry is read.
            min_count: minimum number of occurrences for a word to be kept.

        Returns:
            A set of normalised words (as produced by
            ``TextProcess.normalize_word``) seen at least ``min_count`` times.
        """
        counts = Counter()
        for line in lines:
            # ``or ""`` keeps a present-but-None "Text" from crashing re.findall.
            words = re.findall(r"[A-Za-zÀ-ỹĐđ0-9]+", line.get("Text") or "")
            # The first word of a line is skipped (presumably because its
            # capitalisation only reflects the start of a sentence).
            for word in words[1:]:
                if not word.istitle():
                    continue
                clean_w = TextProcess.normalize_word(word)
                if clean_w:
                    counts[clean_w] += 1
        return {
            TextProcess.normalize_word(w)
            for w, cnt in counts.items()
            if cnt >= min_count
        }

    @staticmethod
    def extract_marker(text, patterns):
        """Return the leading marker of ``text`` matched by the first fitting pattern.

        Args:
            text: the line text to inspect.
            patterns: dict whose ``"markers"`` entry is a list of dicts, each
                holding a compiled regex under ``"pattern"``.

        Returns:
            ``{"marker_text": <str>}`` with leading whitespace removed and any
            trailing whitespace collapsed to one space, or
            ``{"marker_text": None}`` when no pattern matches.
        """
        for pattern_info in patterns["markers"]:
            match = pattern_info["pattern"].match(text)
            if match:
                marker_text = re.sub(r'^\s+', '', match.group(0))
                marker_text = re.sub(r'\s+$', ' ', marker_text)
                return {"marker_text": marker_text}
        return {"marker_text": None}

    @staticmethod
    def format_marker(marker_text, patterns):
        """Reduce a marker text to its generic shape (the "MarkerType").

        Digit runs become ``123``, runs of the letters ``IVXLC`` become ``XVI``,
        a single lowercase letter becomes ``abc`` and a single uppercase letter
        becomes ``ABC``. Words found in ``patterns["keywords_set"]``
        (compared lowercase) and all separators are kept unchanged.

        Returns:
            The normalised string, or ``None`` when ``marker_text`` is empty.
        """
        if not marker_text:
            return None

        formatted = re.sub(r'\b[0-9]+\b', '123', marker_text)
        formatted = re.sub(r'\b[IVXLC]+\b', 'XVI', formatted)

        keywords = patterns["keywords_set"]
        pieces = []
        # The capturing group keeps the separators in the split result.
        for part in re.split(r'(\W+)', formatted):
            if re.match(r'\W+', part):
                pieces.append(part)
            elif part.lower() in keywords:
                pieces.append(part)
            elif re.match(r'^[a-zđêôơư]$', part):
                pieces.append('abc')
            elif re.match(r'^[A-ZĐÊÔƠƯ]$', part):
                pieces.append('ABC')
            else:
                pieces.append(part)
        return ''.join(pieces)

    @staticmethod
    def normalizeRomans(lines, mode="marker", replace_with="ABC"):
        """Replace Roman-numeral tokens that are not really Roman numerals.

        Args:
            lines: list of line dicts, modified in place.
            mode: ``"marker"`` groups lines by ``"MarkerType"``; within each
                group the leading run of markers holding a valid Roman numeral
                is collected, and if their values do not form a gap-free,
                duplicate-free range the Roman token in each line's
                ``"MarkerType"`` is replaced. ``"text"`` replaces every
                ``IVXLC`` token in ``"Text"``, ``"MarkerText"`` and
                ``"MarkerType"`` unconditionally. Any other mode is a no-op.
            replace_with: replacement string for the Roman token.

        Returns:
            The same ``lines`` list.
        """
        roman_re = re.compile(r'\b[IVXLC]+\b')

        if mode == "marker":
            groups = defaultdict(list)
            for idx, line in enumerate(lines):
                fmt = line.get("MarkerType")
                marker = line.get("MarkerText")
                if fmt and marker:
                    groups[fmt].append((idx, marker))

            for group in groups.values():
                leading = []
                for idx, marker in group:
                    found = roman_re.search(marker)
                    # Stop at the first marker without a valid Roman numeral.
                    if not (found and TextProcess.is_roman(found.group(0))):
                        break
                    leading.append((idx, found.group(0)))
                if not leading:
                    continue
                numbers = sorted(TextProcess.roman_to_int(r) for _, r in leading)
                if numbers != list(range(numbers[0], numbers[-1] + 1)):
                    for idx, _ in leading:
                        lines[idx]["MarkerType"] = roman_re.sub(
                            replace_with, lines[idx]["MarkerType"]
                        )
        elif mode == "text":
            for line in lines:
                for key in ("Text", "MarkerText", "MarkerType"):
                    if line.get(key):
                        line[key] = roman_re.sub(replace_with, line[key])
        return lines


# ===============================
# 2. Word-level functions (new) -> class U2_Word
# ===============================
class U2_Word:
    """Word-level style helpers (namespace of static methods)."""

    @staticmethod
    def caseStyle(word_text: str) -> int:
        """Return the CaseStyle code of a word.

        Characters outside the letter/digit class are stripped first.

        Returns:
            3000 for UPPER case, 2000 for Title case, 1000 otherwise
            (including words with no letters or digits at all).
        """
        clean = re.sub(r'[^A-Za-zÀ-ỹà-ỹ0-9]', '', word_text)
        if not clean:
            return 1000
        if clean.isupper():
            return 3000
        if clean.istitle():
            return 2000
        return 1000

    @staticmethod
    def buildStyle(word_text, span):
        """Return the combined style: CaseStyle + FontStyle.

        FontStyle adds 100 for bold, 10 for italic and 1 for underline, using
        the three flags returned by ``PdfProcess.fontFlags(span)``.
        """
        bold, italic, underline = PdfProcess.fontFlags(span)
        font_style = (100 if bold else 0) + (10 if italic else 0) + (1 if underline else 0)
        return U2_Word.caseStyle(word_text) + font_style

    @staticmethod
    def getWordStyle(line, index: int):
        """Return the style of the word at ``index`` (negative indices allowed).

        Words come from ``PdfProcess.extractWords(line)`` as ``(word, span)``
        pairs. Returns 0 when ``index`` is out of range.
        """
        words = PdfProcess.extractWords(line)
        if -len(words) <= index < len(words):
            word, span = words[index]
            return U2_Word.buildStyle(word, span)
        return 0


# ===============================
# 3. Line-level functions (new) -> class U3_Line
# ===============================
class U3_Line:
    """Line-level helpers (namespace of static methods)."""

    @staticmethod
    def getPageGeneralSize(page):
        """Return ``[height, width]`` of ``page.rect``, rounded to one decimal."""
        rect = page.rect
        return [round(rect.height, 1), round(rect.width, 1)]

    @staticmethod
    def getLineText(line):
        """Return the full text of a line (its ``"text"`` entry, ``""`` if absent)."""
        return line.get("text", "")

    @staticmethod
    def getLineStyle(line, exceptions=None):
        """Return the line style: CaseStyle + FontStyle.

        CaseStyle is the minimum ``U2_Word.caseStyle`` over the words that are
        not excluded (1000 when no word qualifies). A word is excluded when its
        normalised form is empty, is listed in ``exceptions`` or is recognised
        by ``TextProcess.is_abbreviation``. FontStyle adds 100/10/1 for
        bold/italic/underline, each only when every span carries the flag.

        Args:
            line: dict with ``"spans"`` and optionally ``"words"``, a sequence
                of ``(word, span)`` pairs. When ``"words"`` is absent the words
                are taken from ``PdfProcess.extractWords(line)``, as
                ``U2_Word.getWordStyle`` does. Without this, callers that pass
                only text and spans always got CaseStyle 1000.
            exceptions: optional dict with ``"common_words"``,
                ``"proper_names"`` and ``"abbreviations"`` lists. Entries may
                be plain strings or dicts carrying the string under ``"text"``.
        """
        words = line.get("words")
        if words is None:
            words = PdfProcess.extractWords(line)
        spans = line.get("spans", [])

        # Flatten the exception lists; dict entries are unhashable, so they
        # are reduced to their "text" value before going into the set.
        exception_texts = set()
        if exceptions:
            for key in ("common_words", "proper_names", "abbreviations"):
                for entry in exceptions.get(key, []):
                    text = entry.get("text") if isinstance(entry, dict) else entry
                    if text is not None:
                        exception_texts.add(text)

        # ===== CaseStyle =====
        cs_values = []
        for word, _ in words:
            clean_w = TextProcess.normalize_word(word)
            if not clean_w:
                continue
            if clean_w in exception_texts or TextProcess.is_abbreviation(clean_w):
                continue
            cs_values.append(U2_Word.caseStyle(clean_w))
        cs_line = min(cs_values) if cs_values else 1000

        # ===== FontStyle =====
        fs_line = 0
        if spans:
            flags = [PdfProcess.fontFlags(s) for s in spans]
            if all(bold for bold, _, _ in flags):
                fs_line += 100
            if all(italic for _, italic, _ in flags):
                fs_line += 10
            if all(underline for _, _, underline in flags):
                fs_line += 1
        return cs_line + fs_line


# ===============================
# 4. Compatibility wrappers -> class U4_Compat
# ===============================
class U4_Compat:
    """Thin wrappers that keep the older function names available."""

    @staticmethod
    def getText(line):
        """Legacy alias: return the text of a line."""
        return U3_Line.getLineText(line)

    @staticmethod
    def getCoords(line):
        """Legacy alias: return ``PdfProcess.getLineCoord(line)``.

        The visible caller unpacks the result as ``(x0, x1, xm, y0, y1)``.
        """
        return PdfProcess.getLineCoord(line)

    @staticmethod
    def _wordInfo(line, index):
        """Build the ``{Text, Style, FontSize}`` dict for the word at ``index``."""
        return {
            "Text": PdfProcess.getWordText(line, index),
            "Style": U2_Word.getWordStyle(line, index),
            "FontSize": PdfProcess.getWordFontSize(line, index),
        }

    @staticmethod
    def getFirstWord(line):
        """Legacy API: return ``{Text, Style, FontSize}`` of the first word."""
        return U4_Compat._wordInfo(line, 0)

    @staticmethod
    def getLastWord(line):
        """Legacy API: return ``{Text, Style, FontSize}`` of the last word."""
        return U4_Compat._wordInfo(line, -1)


# ===============================
# 5. Marker / Style (line-level) -> class U5_MarkerStyle
# ===============================
class U5_MarkerStyle:
    """Line-level marker and font-size helpers (namespace of static methods)."""

    @staticmethod
    def getMarker(text, patterns):
        """Return ``(marker_text, marker_type)`` for a line.

        ``marker_text`` is the raw marker from ``U1_Utils.extract_marker``
        (``None`` when there is none). ``marker_type`` is its generic shape
        from ``U1_Utils.format_marker``, or ``None`` when the marker is empty.
        """
        marker_text = U1_Utils.extract_marker(text, patterns).get("marker_text")
        if not marker_text:
            return marker_text, None
        # A '+' directly after a letter/digit and before a separator (or the
        # end) is dropped before the type is computed; the returned
        # marker_text itself is left untouched.
        cleaned = re.sub(r'([A-Za-z0-9ĐÊÔƠƯđêôơư])\+(?=\W|$)', r'\1', marker_text)
        return marker_text, U1_Utils.format_marker(cleaned, patterns)

    @staticmethod
    def getFontSize(line):
        """Return the mean span font size of a line, rounded to the nearest 0.5.

        Legacy logic kept for compatibility. Only spans with non-blank text
        are averaged; if all spans are blank, every span is used. A span
        without ``"size"`` counts as 12.0, and a line without spans yields 12.0.
        """
        spans = line.get("spans", [])
        if not spans:
            return 12.0
        # ``or ""`` guards against a span whose "text" is present but None.
        visible = [s for s in spans if (s.get("text") or "").strip()]
        sizes = [s.get("size", 12.0) for s in (visible or spans)]
        return round(sum(sizes) / len(sizes) * 2) / 2


# ===============================
# 6. Whole-document aggregation -> class U6_Document
# ===============================
class U6_Document:
    """Document-level extraction (namespace of static methods)."""

    @staticmethod
    def getTextStatus(pdf_doc, exceptions, patterns):
        """Extract every non-empty text line of a PDF with its basic attributes.

        Args:
            pdf_doc: indexable, iterable document whose pages expose ``rect``
                and ``get_text("dict")`` (looks like a PyMuPDF document —
                confirm against the caller). Must have at least one page,
                because the page size is read from ``pdf_doc[0]``.
            exceptions: accepted for API compatibility.
                NOTE(review): it is not forwarded to
                ``U3_Line.getLineStyle``, so exception words do not currently
                influence ``"Style"`` — confirm whether that is intended.
            patterns: compiled marker patterns, passed to
                ``U5_MarkerStyle.getMarker``.

        Returns:
            ``{"general": {"pageGeneralSize": [h, w]}, "lines": [...]}`` where
            each line dict holds Line (1-based running number), Text,
            MarkerText, MarkerType, Style, FontSize, Words (First/Last) and
            Coords (X0, X1, XM, Y0, Y1).
        """
        general = {"pageGeneralSize": U3_Line.getPageGeneralSize(pdf_doc[0])}
        lines = []

        for page in pdf_doc:
            for block in page.get_text("dict")["blocks"]:
                # Blocks without a "lines" entry are skipped.
                for raw_line in block.get("lines", ()):
                    spans = raw_line["spans"]
                    text = "".join(span["text"] for span in spans).strip()
                    if not text:
                        continue

                    marker_text, marker_type = U5_MarkerStyle.getMarker(text, patterns)

                    line_obj = {"text": text, "spans": spans}
                    style = U3_Line.getLineStyle(line_obj)
                    fontsize = PdfProcess.getLineFontSize(line_obj)
                    x0, x1, xm, y0, y1 = PdfProcess.getLineCoord(line_obj)

                    lines.append({
                        "Line": len(lines) + 1,
                        "Text": text,
                        "MarkerText": marker_text,
                        "MarkerType": marker_type,
                        "Style": style,
                        "FontSize": fontsize,
                        "Words": {
                            "First": U4_Compat.getFirstWord(line_obj),
                            "Last": U4_Compat.getLastWord(line_obj),
                        },
                        "Coords": {"X0": x0, "X1": x1, "XM": xm, "Y0": y0, "Y1": y1},
                    })

        return {"general": general, "lines": lines}


# ===============================
# 7. set* functions -> class U7_Setters
# ===============================
class U7_Setters:
    """Document-wide statistics and derived per-line attributes."""

    @staticmethod
    def setCommonStatus(lines, attr, rank=1):
        """Return the ``rank`` most common non-None values of ``attr``.

        Returns a list of ``(value, count)`` pairs, most frequent first;
        ``rank=None`` returns all values.
        """
        counter = Counter(l[attr] for l in lines if l.get(attr) is not None)
        return counter.most_common(rank)

    @staticmethod
    def setCommonFontSize(lines):
        """Return the most common FontSize, rounded to one decimal.

        Falls back to 12.0 (the default size used elsewhere in this module)
        when no line carries a FontSize, e.g. a document without extractable
        text, instead of raising ``IndexError``.
        """
        top = U7_Setters.setCommonStatus(lines, "FontSize", 1)
        if not top:
            return 12.0
        font_size, _ = top[0]
        return round(font_size, 1)

    @staticmethod
    def setCommonFontSizes(lines):
        """Return every FontSize with its count, most frequent first.

        Each item is ``{"FontSize": <rounded to 1 decimal>, "Count": <int>}``.
        """
        return [
            {"FontSize": round(fs, 1), "Count": count}
            for fs, count in U7_Setters.setCommonStatus(lines, "FontSize", None)
        ]

    @staticmethod
    def setCommonMarkers(lines):
        """Return up to 10 of the most common MarkerType values.

        A type is kept only if it occurs on at least 0.5 % of all lines.
        Lines without a ``"MarkerType"`` key are ignored.
        """
        threshold = len(lines) * 0.005
        counter = Counter(l["MarkerType"] for l in lines if l.get("MarkerType"))
        results = []
        for marker, count in counter.most_common(10):
            # most_common is sorted, so the first miss ends the scan.
            if count < threshold:
                break
            results.append(marker)
        return results

    @staticmethod
    def setTextStatus(baseJson):
        """Add page-level statistics and per-line geometry to the base JSON.

        Args:
            baseJson: ``{"general": {"pageGeneralSize": ...}, "lines": [...]}``
                as produced by ``U6_Document.getTextStatus``.

        Returns:
            A new dict. ``"general"`` gains pageCoords, pageRegionWidth/Height,
            commonFontSize, commonFontSizes and commonMarkers. Each line is
            copied with LineWidth, LineHeight, Position (Left, Right, Mid,
            Top, Bot) and Align added. All geometry comes from ``PdfProcess``.
        """
        lines = baseJson["lines"]
        pageGeneralSize = baseJson["general"]["pageGeneralSize"]

        xStart, yStart, xEnd, yEnd, xMid, yMid = PdfProcess.setPageCoords(lines, pageGeneralSize)
        regionWidth, regionHeight = PdfProcess.setPageRegionSize(xStart, yStart, xEnd, yEnd)

        commonFontSizes = U7_Setters.setCommonFontSizes(lines)
        commonFontSize = U7_Setters.setCommonFontSize(lines)
        commonMarkers = U7_Setters.setCommonMarkers(lines)

        new_general = {
            "pageGeneralSize": pageGeneralSize,
            "pageCoords": {
                "xStart": xStart, "yStart": yStart,
                "xEnd": xEnd, "yEnd": yEnd,
                "xMid": xMid, "yMid": yMid,
            },
            "pageRegionWidth": regionWidth,
            "pageRegionHeight": regionHeight,
            "commonFontSize": commonFontSize,
            "commonFontSizes": commonFontSizes,
            "commonMarkers": commonMarkers,
        }

        new_lines = []
        last_index = len(lines) - 1
        for i, line in enumerate(lines):
            lineWidth, lineHeight = PdfProcess.setLineSize(line)
            # The neighbouring lines are passed along; None at either end.
            prev_line = lines[i - 1] if i > 0 else None
            next_line = lines[i + 1] if i < last_index else None
            pos = PdfProcess.setPosition(line, prev_line, next_line, xStart, xEnd, xMid)
            pos_dict = {"Left": pos[0], "Right": pos[1], "Mid": pos[2], "Top": pos[3], "Bot": pos[4]}
            new_lines.append({
                **line,
                "LineWidth": lineWidth,
                "LineHeight": lineHeight,
                "Position": pos_dict,
                "Align": PdfProcess.setAlign(pos_dict, regionWidth),
            })

        return {"general": new_general, "lines": new_lines}


# ===============================
# 8. del/reset functions -> class U8_Cleanup
# ===============================
class U8_Cleanup:
    """Post-processing helpers; all of them modify ``jsonDict`` in place."""

    @staticmethod
    def delStatus(jsonDict, deleteList):
        """Remove every attribute named in ``deleteList`` from every line.

        Returns the same ``jsonDict``.
        """
        for line in jsonDict["lines"]:
            for attr in deleteList:
                line.pop(attr, None)
        return jsonDict

    @staticmethod
    def _resetEdge(lines, i, key):
        """Replace a negative ``Position[key]`` of line ``i`` using its neighbours.

        The new value is the smallest non-negative value of the previous and
        next line. Negative neighbours are ignored, because copying one would
        keep the line invalid or make it worse. With no usable neighbour the
        value is left unchanged.
        """
        pos = lines[i]["Position"]
        value = pos.get(key)
        if value is None or value >= 0:
            return
        candidates = []
        for j in (i - 1, i + 1):
            if 0 <= j < len(lines):
                neighbour = lines[j].get("Position", {}).get(key)
                if neighbour is not None and neighbour >= 0:
                    candidates.append(neighbour)
        if candidates:
            pos[key] = min(candidates)

    @staticmethod
    def resetPosition(jsonDict):
        """Repair negative ``Top`` / ``Bot`` values in each line's ``Position``.

        Lines are processed in order, so an already repaired previous line can
        serve as the source for the next one. A line without ``"Position"``
        receives an empty dict under that key (behaviour kept from the
        original). Returns the same ``jsonDict``.
        """
        lines = jsonDict.get("lines", [])
        for i, line in enumerate(lines):
            line["Position"] = line.get("Position", {})
            U8_Cleanup._resetEdge(lines, i, "Top")
            U8_Cleanup._resetEdge(lines, i, "Bot")
        return jsonDict

    @staticmethod
    def normalizeFinal(jsonDict):
        """Run ``TextProcess.strip_extra_spaces`` over the text fields of every line.

        Affected fields: ``Text``, ``MarkerText`` (only when non-empty) and the
        ``Text`` of ``Words["First"]`` / ``Words["Last"]``.
        Returns the same ``jsonDict``.
        """
        for line in jsonDict.get("lines", []):
            # line-level text
            if "Text" in line:
                line["Text"] = TextProcess.strip_extra_spaces(line["Text"])
            if line.get("MarkerText"):
                line["MarkerText"] = TextProcess.strip_extra_spaces(line["MarkerText"])

            # word-level text
            words = line.get("Words", {})
            for key in ("First", "Last"):
                if key in words and "Text" in words[key]:
                    words[key]["Text"] = TextProcess.strip_extra_spaces(words[key]["Text"])
        return jsonDict


# ===============================
# 9. Main function extractData (keeps the old API)
# ===============================
def extractData(pdf_doc, exceptData, markerData, statusData):
    """Run the full extraction pipeline on one PDF (legacy functional API).

    Args:
        pdf_doc: the opened PDF document, passed to
            ``U6_Document.getTextStatus``.
        exceptData: mapping with the exception word lists.
        markerData: mapping with ``"keywords"`` (list of str) and ``"markers"``
            (list of dicts carrying a regex source under ``"pattern"``, in
            which ``{keywords}`` is replaced by the keyword alternation).
        statusData: mapping; only converted with ``dict()`` here, otherwise
            unused in this function.

    Returns:
        ``{"general": ..., "lines": ...}`` with ``Coords`` removed from every
        line and text fields whitespace-normalised.

    Marker patterns that fail to compile are skipped.
    """
    # ===== 1. Copy the inputs (raises on non-mapping input, as before) =====
    exceptions = dict(exceptData)
    markers = dict(markerData)
    dict(statusData)  # validated only; the status data is not used below

    # ===== 2. Compile the marker patterns =====
    # Blank keywords are dropped: ``k[0]`` would fail on "" and an empty
    # alternative would let the keyword group match nothing at all.
    keywords = [k for k in markers.get("keywords", []) if k]
    if keywords:
        title_keywords = '|'.join(re.escape(k[0].upper() + k[1:].lower()) for k in keywords)
        upper_keywords = '|'.join(re.escape(k.upper()) for k in keywords)
        all_keywords = f"{title_keywords}|{upper_keywords}"
    else:
        # No keywords: leave "{keywords}" unsubstituted (same as B1Extractor)
        # instead of inserting "|", which would match the empty string.
        all_keywords = ""

    compiled_markers = []
    for item in markers.get("markers", []):
        pattern_str = item["pattern"]
        if all_keywords:
            pattern_str = pattern_str.replace("{keywords}", all_keywords)
        try:
            compiled_pattern = re.compile(pattern_str)
        except re.error:
            continue
        compiled_markers.append({
            "pattern": compiled_pattern,
            "description": item.get("description", ""),
            "type": item.get("type", ""),
        })

    patterns = {
        "markers": compiled_markers,
        "keywords_set": set(k.lower() for k in keywords),
    }

    # ===== 3. Process the PDF =====
    baseJson = U6_Document.getTextStatus(pdf_doc, exceptions, patterns)
    baseJson["lines"] = U1_Utils.normalizeRomans(baseJson["lines"])
    modifiedJson = U7_Setters.setTextStatus(baseJson)
    cleanJson = U8_Cleanup.resetPosition(modifiedJson)
    extractedData = U8_Cleanup.delStatus(cleanJson, ["Coords"])
    extractedData = U8_Cleanup.normalizeFinal(extractedData)

    # NOTE(review): the original went on to merge automatically collected
    # proper names into ``exceptions["proper_names"]``. ``exceptions`` is a
    # local shallow copy that is discarded on return, so the step had no
    # observable effect and was removed. Use ``B1Extractor`` when the
    # accumulated names must persist between documents.
    return extractedData
class B1Extractor:
    """Instance-based orchestrator for the ``extractData`` pipeline.

    The exceptions/markers/status data are copied and the marker regexes are
    compiled once in ``__init__``; ``extract`` can then be called for many
    documents.
    """

    def __init__(
        self,
        exceptData: Any,
        markerData: Any,
        statusData: Any,
        proper_name_min_count: int = 10,
    ) -> None:
        """Store the configuration and compile the marker patterns.

        Args:
            exceptData: dict of exception lists (``proper_names`` is read and
                updated by ``extract``).
            markerData: dict with ``"keywords"`` and ``"markers"``.
            statusData: dict; stored as ``self.status``.
            proper_name_min_count: occurrence threshold for automatically
                collected proper names.

        Raises:
            ValueError: if any of the three data arguments is not a ``dict``
                (file paths are not loaded here).
        """
        # ---- 1) Copy exceptions/markers/status ----
        self.exceptions: Dict[str, Any] = self._ensureDict(exceptData)
        self.markers: Dict[str, Any] = self._ensureDict(markerData)
        self.status: Dict[str, Any] = self._ensureDict(statusData)
        self.proper_name_min_count = proper_name_min_count

        # ---- 2) Compile the marker patterns ----
        self.patterns = self._compilePatterns(self.markers)

    @staticmethod
    def _ensureDict(src: Any) -> Dict[str, Any]:
        """Return a shallow copy of ``src``; reject anything that is not a dict."""
        if isinstance(src, dict):
            return dict(src)
        raise ValueError("Vui lòng truyền dict đã load sẵn thay vì đường dẫn file.")

    @staticmethod
    def _compilePatterns(markers: Dict[str, Any]) -> Dict[str, Any]:
        """Build ``{"markers": [...], "keywords_set": {...}}`` from marker data.

        ``{keywords}`` in a pattern is replaced by an alternation of the
        Title-case and UPPER-case form of every keyword. Skipped entries:
        blank keywords (``k[0]`` would fail), items without a pattern (an
        empty regex matches every line and would shadow all later markers)
        and patterns that do not compile.
        """
        keywords = [k for k in markers.get("keywords", []) if k]
        title_keywords = "|".join(re.escape(k[0].upper() + k[1:].lower()) for k in keywords)
        upper_keywords = "|".join(re.escape(k.upper()) for k in keywords)
        all_keywords = f"{title_keywords}|{upper_keywords}" if keywords else ""

        compiled_markers = []
        for item in markers.get("markers", []):
            pattern_str = item.get("pattern", "")
            if not pattern_str:
                continue
            if all_keywords:
                pattern_str = pattern_str.replace("{keywords}", all_keywords)
            try:
                compiled = re.compile(pattern_str)
            except re.error:
                continue
            compiled_markers.append(
                {
                    "pattern": compiled,
                    "description": item.get("description", ""),
                    "type": item.get("type", ""),
                }
            )

        return {
            "markers": compiled_markers,
            "keywords_set": set(k.lower() for k in keywords),
        }

    # ---------- Public API ----------
    def extract(self, pdf_doc) -> Dict[str, Any]:
        """Run the extraction pipeline on one PDF and return the extracted data.

        Side effect: proper names collected from this document are merged into
        ``self.exceptions["proper_names"]``, so they accumulate across calls.
        """
        # ===== 3) Extract text and line attributes from the PDF =====
        baseJson = U6_Document.getTextStatus(pdf_doc, self.exceptions, self.patterns)

        # Normalise Roman numerals
        baseJson["lines"] = U1_Utils.normalizeRomans(baseJson["lines"])

        # ===== 4) Compute status/position/align, then clean up =====
        modifiedJson = U7_Setters.setTextStatus(baseJson)
        cleanJson = U8_Cleanup.resetPosition(modifiedJson)
        extractedData = U8_Cleanup.delStatus(cleanJson, ["Coords"])
        extractedData = U8_Cleanup.normalizeFinal(extractedData)

        # ===== 5) Add automatically collected proper names =====
        proper_names_auto = U1_Utils.collect_proper_names(
            extractedData["lines"], min_count=self.proper_name_min_count
        )
        proper_names_existing = [
            p["text"] if isinstance(p, dict) else str(p)
            for p in self.exceptions.get("proper_names", [])
        ]
        # Kept on the instance so consecutive documents keep accumulating names.
        self.exceptions["proper_names"] = list(set(proper_names_existing) | proper_names_auto)

        return extractedData