Spaces:
Sleeping
Sleeping
| from typing import TypedDict, Optional, List | |
| from langchain_core.messages import AnyMessage, ToolMessage, HumanMessage, AIMessage | |
| from langgraph.graph.message import add_messages | |
| from typing import Sequence, Annotated | |
| from langchain_core.messages import RemoveMessage | |
| from langchain_core.documents import Document | |
| from src.config.llm import get_llm | |
| from src.utils.logger import logger | |
| from src.utils.helper import extract_transcript, extract_comment | |
| from .prompt import * | |
| import operator | |
| from src.config.mongo import PromptCRUD | |
| from pydantic import BaseModel, Field | |
| class State(TypedDict): | |
| language: str | |
| script_structure_analyzer_prompt: str | |
| comment_insight_extractor_prompt: str | |
| scientific_fact_finder_prompt: str | |
| script_re_outline_prompt: str | |
| script_writer_prompt: str | |
| video_link: str | |
| messages: Annotated[Sequence[AnyMessage], add_messages] | |
| transcript: str | |
| comment: str | |
| script_structure_analyzer_response: str | |
| comment_insight_extractor_response: str | |
| research_insight_response: str | |
| script_re_outline_response: str | |
| script_writer_response: List[str] | |
| target_word_count: int | |
| script_count: int | |
| current_script_index: int | |
| def trim_history(state: State): | |
| history = state.get("messages", []) | |
| if len(history) > 20: | |
| num_to_remove = len(history) - 20 | |
| remove_messages = [ | |
| RemoveMessage(id=history[i].id) for i in range(num_to_remove) | |
| ] | |
| return { | |
| "messages": remove_messages, | |
| "selected_ids": [], | |
| "selected_documents": [], | |
| } | |
| return {} | |
| async def extract_transcript_and_comment(state: State): | |
| transcript = extract_transcript(state["video_link"]) | |
| comment = extract_comment(state["video_link"]) | |
| prompt_template = await PromptCRUD.read({}) | |
| prompt_template = prompt_template[0] | |
| # Calculate script count based on target word count | |
| # Assume each script is around 200-300 words | |
| avg_words_per_script = 1000 | |
| script_count = max(1, state.get("target_word_count", 8000) // avg_words_per_script) | |
| return { | |
| "transcript": transcript, | |
| "comment": comment, | |
| "script_count": script_count, | |
| "messages": HumanMessage( | |
| content=f"Will generate {script_count} scripts for {state.get('target_word_count', 8000)} words target" | |
| ), | |
| "script_structure_analyzer_prompt": prompt_template[ | |
| "script_structure_analyzer_prompt" | |
| ], | |
| "comment_insight_extractor_prompt": prompt_template[ | |
| "comment_insight_extractor_prompt" | |
| ], | |
| "scientific_fact_finder_prompt": prompt_template[ | |
| "scientific_fact_finder_prompt" | |
| ], | |
| "script_re_outline_prompt": prompt_template["script_re_outline_prompt"], | |
| "script_writer_prompt": prompt_template["script_writer_prompt"], | |
| "script_structure_analyzer_response": "", | |
| "comment_insight_extractor_response": "", | |
| } | |
| def script_structure_analyzer(state: State): | |
| transcript = state["transcript"] | |
| response = chain_script_structure_analyzer.invoke( | |
| {"script": transcript, "prompt": state["script_structure_analyzer_prompt"]} | |
| ) | |
| return { | |
| "script_structure_analyzer_response": response.content, | |
| "messages": HumanMessage( | |
| content="Script Structure Analyzer Response: " + response.content | |
| ), | |
| } | |
| def comment_insight_extractor(state: State): | |
| response = chain_comment_insight_extractor.invoke( | |
| { | |
| "comment": state["comment"], | |
| "script_structure_analyzer_response": state[ | |
| "script_structure_analyzer_response" | |
| ], | |
| "prompt": state["comment_insight_extractor_prompt"], | |
| } | |
| ) | |
| return { | |
| "comment_insight_extractor_response": response.content, | |
| "messages": HumanMessage( | |
| content="Comment Insight Extractor Response: " + response.content | |
| ), | |
| } | |
| def scientific_fact_finder(state: State): | |
| input_message = {} | |
| input_message["messages"] = [ | |
| { | |
| "role": "user", | |
| "content": f"""Hãy tìm 3-5 nghiên cứu khoa học thực tế (PubMed, JAMA, Circulation, Nutrients…), | |
| Tóm tắt số liệu, trích nguồn, gợi ý số liệu phù hợp cho từng đoạn trong script mới. Dựa trên các thông tin sau: | |
| Script Structure Analyzer Response: {state["script_structure_analyzer_response"]} | |
| Comment Insight Extractor Response: {state["comment_insight_extractor_response"] if state["comment_insight_extractor_response"] else "Không có comment insight"} | |
| """, | |
| } | |
| ] | |
| input_message["prompt"] = state["scientific_fact_finder_prompt"] | |
| response = scientific_fact_finder_agent( | |
| state["scientific_fact_finder_prompt"] | |
| ).invoke(input_message) | |
| research_insight = response["messages"][-1].content | |
| return { | |
| "research_insight_response": research_insight, | |
| "messages": HumanMessage( | |
| content="Scientific Fact Finder Response: " + research_insight | |
| ), | |
| } | |
| def script_re_outline(state: State): | |
| language = state["language"] | |
| if not language: | |
| language = "English" | |
| response = chain_script_re_outline.invoke( | |
| { | |
| "messages": state["messages"], | |
| "prompt": state["script_re_outline_prompt"], | |
| } | |
| ) | |
| return { | |
| "script_re_outline_response": response.content, | |
| "messages": HumanMessage( | |
| content="Script Re-Outline Response: " + response.content | |
| ), | |
| } | |
| def script_writer_init(state: State): | |
| """Initialize script writing process""" | |
| return { | |
| "script_writer_response": [], | |
| "current_script_index": 0, | |
| "messages": HumanMessage(content="Starting script generation process..."), | |
| } | |
| def script_writer_single(state: State): | |
| """Generate a single script""" | |
| language = state["language"] | |
| if not language: | |
| language = "English" | |
| current_index = state.get("current_script_index", 0) | |
| script_count = state.get("script_count", 10) | |
| target_word_count = state.get("target_word_count", 8000) | |
| words_per_script = target_word_count // script_count if script_count > 0 else 1000 | |
| # Get existing scripts | |
| script_out = list(state.get("script_writer_response", [])) | |
| current_messages = list(state["messages"]) | |
| # Add word count guidance to the prompt | |
| if current_index == 0: | |
| word_prompt = f"Hãy viết script đầu tiên với khoảng {words_per_script} từ." | |
| elif current_index == script_count - 1: | |
| word_prompt = f"ok, viết cho tôi phần tiếp theo và cũng là phần cuối cùng, bám sát cấu trúc, khoảng {words_per_script} từ cho script này. Ở cuối phần này, hãy kêu gọi người xem like, subscribe và comment để ủng hộ video để có động lực ra video mới. Đừng quên đối tượng khán giả là người Mỹ,giới tính nữ, trên 20 tuổi, bắt đầu, trình bày thành dạng câu văn liền mạch, dùng để làm văn nói cho video YouTube, không dùng icon" | |
| else: | |
| word_prompt = f"ok, viết cho tôi phần tiếp theo, bám sát cấu trúc, khoảng {words_per_script} từ cho script này, các công thức tạo cảm xúc và đừng quên đối tượng khán giả là người Mỹ,giới tính nữ, trên 20 tuổi, bắt đầu, trình bày thành dạng câu văn liền mạch, dùng để làm văn nói cho video YouTube, không dùng icon" | |
| current_messages.append(HumanMessage(content=word_prompt)) | |
| # Generate script | |
| response = chain_script_writer.invoke( | |
| { | |
| "messages": current_messages, | |
| "prompt": state["script_writer_prompt"] | |
| + "Output script language must in: " | |
| + language | |
| + " Language", | |
| } | |
| ) | |
| script_out.append(response.content) | |
| # Add response to message history | |
| current_messages.append(AIMessage(content=response.content)) | |
| return { | |
| "script_writer_response": script_out, | |
| "current_script_index": current_index + 1, | |
| "messages": current_messages | |
| + [ | |
| HumanMessage(content=f"Script {current_index + 1}/{script_count} completed") | |
| ], | |
| } | |
| def should_continue_writing(state: State): | |
| """Check if we should continue writing more scripts""" | |
| current_index = state.get("current_script_index", 0) | |
| script_count = state.get("script_count", 10) | |
| return ( | |
| "script_writer_single" if current_index < script_count else "script_writer_end" | |
| ) | |
| def script_writer_end(state: State): | |
| """Finalize script writing""" | |
| script_count = len(state.get("script_writer_response", [])) | |
| return {"messages": HumanMessage(content=f"All {script_count} scripts completed!")} | |