doc-ai-api / Libraries /Json_GetStructures.py
LongK171's picture
Add all
dbe2c62
import re
from typing import Dict, List, Any
from collections import Counter, defaultdict
class StructureAnalyzer:
    """Derive a heading hierarchy from the marker types of a document's paragraphs.

    The section labels B1-B4 suggest the methods are meant to be chained:
    ``extract_markers`` -> ``build_structures`` -> ``deduplicate`` ->
    ``select_top`` -> ``extend_top``.  Each step only depends on the output
    of the previous one, so they can also be called individually.
    """

    # A string made up solely of bullet glyphs (optionally surrounded by
    # whitespace).  Compiled once instead of on every extract_markers() call.
    _BULLET_PATTERN = re.compile(r"^\s*[-•●♦▪‣–—]+\s*$")

    def __init__(self, verbose: bool = False):
        """Store the ``verbose`` flag (not read by any method in this class)."""
        self.verbose = verbose

    # ---------------- B1 ---------------- #
    @staticmethod
    def _is_bullet(value: Any) -> bool:
        """Return True when *value* is a string consisting only of bullet glyphs."""
        # Non-string values (None, numbers, ...) can never be a bullet; guarding
        # here avoids a TypeError from re.match on unexpected input.
        return isinstance(value, str) and bool(StructureAnalyzer._BULLET_PATTERN.match(value))

    def extract_markers(self, RawDataDict) -> List[str]:
        """Collect the sequence of marker types found in the paragraphs.

        Args:
            RawDataDict: mapping with an optional ``"paragraphs"`` list (each
                item is read through ``.get("MarkerText")`` and
                ``.get("MarkerType")``) and an optional
                ``"general" -> "commonMarkers"`` list.

        Returns:
            Marker types in document order, with ``None`` rendered as
            ``"none"`` and runs of identical adjacent values collapsed to one.
        """
        # `or` fallbacks also cover keys that are present but set to None.
        paragraphs = RawDataDict.get("paragraphs") or []
        general = RawDataDict.get("general") or {}
        common_markers = set(general.get("commonMarkers") or [])

        raw_markers: List[Any] = []
        for paragraph in paragraphs:
            marker_text = paragraph.get("MarkerText")
            marker_type = paragraph.get("MarkerType")
            # Skip bullet paragraphs.
            if self._is_bullet(marker_text) or self._is_bullet(marker_type):
                continue
            # Keep only common marker types, plus paragraphs without a marker.
            if marker_type is None or marker_type in common_markers:
                raw_markers.append(marker_type)

        # Collapse adjacent duplicates and normalise None -> "none".
        cleaned: List[str] = []
        for marker in raw_markers:
            value = "none" if marker is None else str(marker)
            if not cleaned or cleaned[-1] != value:
                cleaned.append(value)
        return cleaned

    # ---------------- B2 ---------------- #
    def build_structures(self, markers: List[str]) -> List[Dict[str, Any]]:
        """Count contiguous marker sequences of increasing length.

        Depth 1 holds every distinct marker with its frequency.  For each
        deeper level a window of that length slides over *markers*; a window
        is counted only when all three conditions hold:

        1. its prefix (all but the last element) survived the previous depth;
        2. it contains no repeated marker;
        3. ``"none"`` is either absent or is the last element.

        Per depth, the least frequent sequences are dropped unless every
        sequence has the same frequency; the rest are ordered by descending
        count.  Growth stops at the first depth that yields nothing.

        Returns:
            A list of ``{"Depth": int, "Structure": list, "Count": int}``.
        """
        unique_markers = list(dict.fromkeys(markers))
        single_counts = Counter(markers)
        results: List[Dict[str, Any]] = [
            {"Depth": 1, "Structure": [m], "Count": single_counts[m]}
            for m in unique_markers
        ]

        # A structure cannot repeat a marker, so its length is bounded by the
        # number of distinct markers.
        max_depth = len(unique_markers)
        parents = {(m,) for m in unique_markers}

        for depth in range(2, max_depth + 1):
            counter: Counter = Counter()
            for start in range(len(markers) - depth + 1):
                window = tuple(markers[start:start + depth])
                # Condition 1: the parent sequence must exist.
                if window[:-1] not in parents:
                    continue
                # Condition 2: no marker may repeat within one structure.
                if len(window) != len(set(window)):
                    continue
                # Condition 3: "none" is only allowed in the last position.
                if "none" in window and window[-1] != "none":
                    continue
                counter[window] += 1

            if not counter:
                break

            lowest = min(counter.values())
            highest = max(counter.values())
            # Drop the rarest sequences, except when all counts are equal.
            kept = [
                (seq, count)
                for seq, count in counter.items()
                if count != lowest or count == highest
            ]
            # Stable sort: ties keep their first-seen order.
            kept.sort(key=lambda pair: pair[1], reverse=True)

            results.extend(
                {"Depth": depth, "Structure": list(seq), "Count": count}
                for seq, count in kept
            )
            parents = {seq for seq, _ in kept}
        return results

    # ---------------- B3 ---------------- #
    def deduplicate(self, structures: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Keep one structure per (depth, set of markers), preferring the highest count.

        Structures at the same depth whose markers are permutations of each
        other are treated as duplicates.  The result is sorted by depth, then
        descending count, then structure.
        """
        grouped = defaultdict(list)
        for item in structures:
            # Sorting the markers makes the key order-insensitive.
            key = (item["Depth"], tuple(sorted(item["Structure"])))
            grouped[key].append(item)

        # max() returns the first item on ties, i.e. the earliest in the input.
        best = [max(group, key=lambda x: x["Count"]) for group in grouped.values()]
        best.sort(key=lambda x: (x["Depth"], -x["Count"], x["Structure"]))
        return best

    # ---------------- B4 ---------------- #
    def select_top(self, dedup: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Turn the most frequent deepest structure(s) into level dictionaries.

        Among the entries with the greatest ``Depth``, those with the greatest
        ``Count`` are selected.  Each becomes a dict mapping ``"Level 1"``,
        ``"Level 2"``, ... to its markers, with the final marker stored under
        ``"Contents"``.  Returns an empty list for empty input.
        """
        if not dedup:
            return []

        max_depth = max(item["Depth"] for item in dedup)
        deepest = [item for item in dedup if item["Depth"] == max_depth]
        max_count = max(item["Count"] for item in deepest)

        result = []
        for entry in deepest:
            if entry["Count"] != max_count:
                continue
            structure = entry["Structure"]
            last_index = len(structure) - 1
            level_dict = {}
            for index, marker in enumerate(structure):
                # The last element is the content level.
                label = "Contents" if index == last_index else f"Level {index+1}"
                level_dict[label] = marker
            result.append(level_dict)
        return result

    @staticmethod
    def level_rank(level: str) -> int:
        """Convert a level label to a number so labels can be compared.

        ``"Contents"`` ranks highest (9999), ``"Level N"`` ranks ``N``, and
        anything else (including a malformed ``"Level ..."``) ranks 0.
        """
        # Declared static: the function takes no ``self``, so without the
        # decorator it could not be called on an instance.
        if level == "Contents":
            return 9999  # Contents is treated as the highest level
        if level.startswith("Level "):
            try:
                return int(level.split()[1])
            except (IndexError, ValueError):
                # No number after "Level", or the suffix is not an integer.
                return 0
        return 0

    @staticmethod
    def _merge_values(levels: Dict[str, Any], key: str, values: List[Any]) -> None:
        """Ensure ``levels[key]`` is a list and append each new value from *values*."""
        if key not in levels:
            levels[key] = []
        if not isinstance(levels[key], list):
            levels[key] = [levels[key]]
        bucket = levels[key]
        for value in values:
            if value not in bucket:
                bucket.append(value)

    def extend_top(self, top: List[Dict[str, Any]], dedup: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Extend the first entry of *top* with tails taken from *dedup*.

        For every marker of every level except ``"Level 1"``, structures in
        *dedup* (depth >= 2) that start with that marker and contain no other
        marker already present in *top* contribute their tail:

        - for ``"Contents"`` only the tail ``("none",)`` is accepted;
        - for other levels each tail element is added to the following levels;
        - if a level already exists the values are merged into a list.

        Afterwards the highest ``"Level N"`` is merged into ``"Contents"``,
        the second-to-last key is renamed to ``"Article"`` when it is a
        ``"Level N"`` key, ``"Contents"`` is renamed to ``"Content"``, and
        every value is normalised to a list.

        Only ``top[0]`` is used; *top* itself is never modified.

        Returns:
            A one-element list holding the resulting dict, or ``[]`` when
            *top* is empty.
        """
        if not top:
            return []

        # Copy list values as well: a plain dict() copy would share them and
        # the appends below would leak into the caller's data.
        levels: Dict[str, Any] = {
            key: (list(value) if isinstance(value, list) else value)
            for key, value in top[0].items()
        }
        all_markers = set(
            v
            for value in levels.values()
            for v in (value if isinstance(value, list) else [value])
        )
        seen_tails = set()

        # Iterate over a snapshot because new levels are added while looping.
        for level, marker_values in reversed(list(levels.items())):
            if level == "Level 1":
                continue
            # Normalise to a list for uniform handling.
            markers = marker_values if isinstance(marker_values, list) else [marker_values]
            for marker in markers:
                other_markers = all_markers - {marker}
                for candidate in dedup:
                    struct = candidate["Structure"]
                    if candidate["Depth"] < 2:
                        continue
                    if not struct or struct[0] != marker:
                        continue
                    # Reject structures that revisit a marker already placed.
                    if set(struct) & other_markers:
                        continue

                    tail = tuple(struct[1:])
                    # Contents may only be followed by "none".
                    if level == "Contents" and tail != ("none",):
                        continue
                    if tail in seen_tails:
                        continue
                    seen_tails.add(tail)

                    # Determine the level the tail is appended after.
                    if level.startswith("Level "):
                        base_level_num = int(level.split()[1])
                    elif level == "Contents":
                        # default=0 covers a top entry without any "Level N" key.
                        base_level_num = max(
                            (int(name.split()[1]) for name in levels if name.startswith("Level ")),
                            default=0,
                        )
                    else:
                        base_level_num = 0

                    # Put each tail element into the next level down.
                    for offset, element in enumerate(tail, start=1):
                        self._merge_values(levels, f"Level {base_level_num+offset}", [element])

        # Turn the highest level into Contents (merging if it already exists).
        level_keys = [name for name in levels if name.startswith("Level ")]
        if level_keys:
            highest_key = max(level_keys, key=lambda name: int(name.split()[1]))
            promoted = levels.pop(highest_key)
            self._merge_values(
                levels,
                "Contents",
                promoted if isinstance(promoted, list) else [promoted],
            )

        # --- Relabel right before returning the result --- #
        # NOTE(review): this relies on dict insertion order; when more than one
        # new level was appended after "Contents", keys[-2] is "Contents" and
        # no key is renamed to "Article" - confirm whether that is intended.
        keys = list(levels)
        if len(keys) > 1 and keys[-2].startswith("Level "):
            levels["Article"] = levels.pop(keys[-2])
        if "Contents" in levels:
            levels["Content"] = levels.pop("Contents")

        # Normalise every value to a list.
        normalised = {
            key: (value if isinstance(value, list) else [value])
            for key, value in levels.items()
        }
        return [normalised]