GraphGen / graphgen /operators /build_kg /build_kg_service.py
github-actions[bot]
Auto-sync from demo at Tue Dec 16 08:21:05 UTC 2025
31086ae
from typing import List
import pandas as pd
from graphgen.bases import BaseGraphStorage, BaseLLMWrapper, BaseOperator
from graphgen.bases.datatypes import Chunk
from graphgen.common import init_llm, init_storage
from graphgen.utils import logger
from .build_mm_kg import build_mm_kg
from .build_text_kg import build_text_kg
class BuildKGService(BaseOperator):
def __init__(self, working_dir: str = "cache", graph_backend: str = "kuzu"):
super().__init__(working_dir=working_dir, op_name="build_kg_service")
self.llm_client: BaseLLMWrapper = init_llm("synthesizer")
self.graph_storage: BaseGraphStorage = init_storage(
backend=graph_backend, working_dir=working_dir, namespace="graph"
)
def process(self, batch: pd.DataFrame) -> pd.DataFrame:
docs = batch.to_dict(orient="records")
docs = [Chunk.from_dict(doc["_chunk_id"], doc) for doc in docs]
# consume the chunks and build kg
self.build_kg(docs)
return pd.DataFrame([{"status": "kg_building_completed"}])
def build_kg(self, chunks: List[Chunk]) -> None:
"""
Build knowledge graph (KG) and merge into kg_instance
"""
text_chunks = [chunk for chunk in chunks if chunk.type == "text"]
mm_chunks = [
chunk
for chunk in chunks
if chunk.type in ("image", "video", "table", "formula")
]
if len(text_chunks) == 0:
logger.info("All text chunks are already in the storage")
else:
logger.info("[Text Entity and Relation Extraction] processing ...")
build_text_kg(
llm_client=self.llm_client,
kg_instance=self.graph_storage,
chunks=text_chunks,
)
if len(mm_chunks) == 0:
logger.info("All multi-modal chunks are already in the storage")
else:
logger.info("[Multi-modal Entity and Relation Extraction] processing ...")
build_mm_kg(
llm_client=self.llm_client,
kg_instance=self.graph_storage,
chunks=mm_chunks,
)
self.graph_storage.index_done_callback()