import os
import torch
from transformers import pipeline

# Loading the TTS and Vocoder ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import sys
import re
import json
import inflect
import random
import uroman as ur
import numpy as np
import torchaudio
import subprocess
import requests
from transformers import AutoModelForCausalLM, AutoTokenizer
from outetts.wav_tokenizer.decoder import WavTokenizer
# Helper to run shell commands and report success or failure
def run_command(command):
    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            universal_newlines=True
        )
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            print(f"Error executing: {command}")
            print(stderr)
            return False
        else:
            print(stdout)
            return True
    except Exception as e:
        print(f"Exception during execution of {command}: {e}")
        return False
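# Minimal usage sketch (commented out): run_command prints the command's output
# and returns True on success, False on failure.
# run_command("python --version")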
# Clone the YarnGPT repository
if not os.path.exists('yarngpt'):
    print("Cloning YarnGPT repository...")
    run_command("git clone https://github.com/saheedniyi02/yarngpt.git")
else:
    print("YarnGPT repository already exists")

# # Install required packages with specific versions
# print("Installing required packages with specific versions...")
# run_command("pip install -q outetts==0.3.3 uroman==1.3.1.1")

# Add the yarngpt directory to Python path instead of installing it
yarngpt_path = os.path.join(os.getcwd(), 'yarngpt')
if os.path.exists(yarngpt_path) and yarngpt_path not in sys.path:
    sys.path.append(yarngpt_path)
    print(f"Added {yarngpt_path} to Python path")

# Now you should be able to import from yarngpt
# Import this after adding to path
try:
    from yarngpt.audiotokenizer import AudioTokenizerV2
    print("Successfully imported AudioTokenizerV2 from yarngpt")
except ImportError as e:
    print(f"Error importing from yarngpt: {e}")
    # Check the content of the directory to debug
    if os.path.exists(yarngpt_path):
        print("Contents of yarngpt directory:")
        print(os.listdir(yarngpt_path))
# Download files using Python's requests library instead of !wget
def download_file(url, save_path):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        # Stream the body to disk in chunks instead of buffering it all in memory
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Downloaded {save_path}")
    else:
        print(f"Failed to download {url} (status {response.status_code})")
# Download required files
download_file(
    "https://huggingface.co/novateur/WavTokenizer-medium-speech-75token/resolve/main/wavtokenizer_mediumdata_frame75_3s_nq1_code4096_dim512_kmeans200_attn.yaml",
    "wavtokenizer_mediumdata_frame75_3s_nq1_code4096_dim512_kmeans200_attn.yaml"
)
download_file(
    "https://huggingface.co/novateur/WavTokenizer-large-speech-75token/resolve/main/wavtokenizer_large_speech_320_v2.ckpt",
    "wavtokenizer_large_speech_320_24k.ckpt"
)

from yarngpt.audiotokenizer import AudioTokenizerV2

device = "cuda:0" if torch.cuda.is_available() else "cpu"
tokenizer_path = "saheedniyi/YarnGPT2"
wav_tokenizer_config_path = "./wavtokenizer_mediumdata_frame75_3s_nq1_code4096_dim512_kmeans200_attn.yaml"
wav_tokenizer_model_path = "./wavtokenizer_large_speech_320_24k.ckpt"

audio_tokenizer = AudioTokenizerV2(tokenizer_path, wav_tokenizer_model_path, wav_tokenizer_config_path)
tts_model = AutoModelForCausalLM.from_pretrained(tokenizer_path, torch_dtype="auto").to(audio_tokenizer.device)
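# Optional sanity check (commented out), assuming both objects expose the .device attribute
# used above: confirm the audio tokenizer and the TTS model ended up on the same device.
# print(f"audio_tokenizer device: {audio_tokenizer.device} | tts_model device: {tts_model.device}")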
# The LLM Model ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from huggingface_hub import HfFolder
from openai import OpenAI

api_key = os.getenv("API_KEY")
if api_key is None:
    raise ValueError("API_KEY is not set in the environment variables.")
print("API key successfully loaded.")

# Initialize OpenAI client for Hugging Face Inference Endpoint
client = OpenAI(
    base_url="https://y1ztgv8tu09nay6u.us-east-1.aws.endpoints.huggingface.cloud/v1/",  # https://f2iozzwigntrzkve.us-east-1.aws.endpoints.huggingface.cloud/v1/
    api_key=api_key
)
def generate_llm_response(text, model_id="ccibeekeoc42/Llama3.1-8b-base-SFT-2024-11-09"):
    full_response = []
    try:
        chat_completion = client.chat.completions.create(
            model="tgi",
            messages=[
                {"role": "system", "content": "You are HypaAI, a very BRIEF AND DIRECT assistant. You were created by a Nigerian research lab called Hypa AI, led by Chris Ibe (the co-founder and CEO). You are part of a speech pipeline, so keep your responses short (under 60 words), fluent, and straight to the point. Avoid markdown or digits in responses."},
                {"role": "user", "content": text}
            ],
            top_p=0.3,
            temperature=1,
            max_tokens=150,
            stream=True,
            seed=None,
            stop=None,
            frequency_penalty=None,
            presence_penalty=None
        )
        for chunk in chat_completion:
            if chunk.choices[0].delta.content:
                full_response.append(chunk.choices[0].delta.content)
        return "".join(full_response)
    except Exception as e:
        # If the error has a response with status code 503, assume the GPU is booting up.
        if hasattr(e, 'response') and e.response is not None and e.response.status_code == 503:
            return "The GPU is currently booting up. Please wait about 10 minutes and try again."
        else:
            raise e
# generate_llm_response("Explain Deep Learning in Igbo")

# Loading the ST Model (Whisper) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
pipe = pipeline("automatic-speech-recognition", model="okezieowen/whisper-small-multilingual-naija-11-03-2024", device=device)

# Take audio and return transcribed text
def transcribe(audio):
    outputs = pipe(audio, max_new_tokens=256, generate_kwargs={"task": "transcribe"})
    return outputs["text"]
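# Quick check (commented out); "sample.wav" is a placeholder path to any local audio file:
# print(transcribe("sample.wav"))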
# Putting the ST and TTS systems together ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import numpy as np

def synthesise_yarn2(text):
    # change the language and voice
    prompt = audio_tokenizer.create_prompt(text, lang="english", speaker_name="idera")
    input_ids = audio_tokenizer.tokenize_prompt(prompt)
    output = tts_model.generate(
        input_ids=input_ids,
        temperature=0.1,
        repetition_penalty=1.1,
        max_length=4000,
        num_beams=5,  # using a beam size helps for the local languages but not english
    )
    codes = audio_tokenizer.get_codes(output)
    audio = audio_tokenizer.get_audio(codes)
    return audio.cpu()
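# Example (commented out): synthesise a short phrase and write it to disk with torchaudio.
# This assumes the WavTokenizer decoder outputs 24 kHz audio, as the checkpoint name suggests.
# wav = synthesise_yarn2("Hello, welcome to the demo.")
# torchaudio.save("yarn_test.wav", wav if wav.dim() == 2 else wav.unsqueeze(0), 24000)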
target_dtype = np.int16
max_range = np.iinfo(target_dtype).max  # Maximum value for 16-bit PCM audio conversion

def speech_to_speech_translation(audio, language="english"):
    # Speech to Text
    transcribed_text = transcribe(audio)
    print(f"Transcribed: {transcribed_text}")

    # Generate LLM Response
    print("Now making LLM Call ~~~~~~~~~~~~~~~~~~~~~~~~")
    llm_response = generate_llm_response(transcribed_text)
    print(f"LLM Response: {llm_response}")
    # Select a random voice based on the chosen language
    voice_mapping = {
        "english": ["idera", "chinenye", "jude", "emma", "umar", "joke", "zainab", "osagie", "remi", "tayo"],
        "yoruba": ["yoruba_male2", "yoruba_female2", "yoruba_female1"],
        "igbo": ["igbo_female2", "igbo_male2", "igbo_female1"],
        "hausa": ["hausa_female1", "hausa_female2", "hausa_male2", "hausa_male1"]
    }
    selected_voice = random.choice(voice_mapping.get(language.lower(), voice_mapping["english"]))
    print(f"Selected {language} voice: {selected_voice}")
    # Text to Speech
    print("Synthesizing Speech ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    # Use the selected language and voice
    prompt = audio_tokenizer.create_prompt(llm_response, lang=language.lower(), speaker_name=selected_voice)
    input_ids = audio_tokenizer.tokenize_prompt(prompt)
    output = tts_model.generate(
        input_ids=input_ids,
        temperature=0.1,
        repetition_penalty=1.1,
        max_length=4000,
    )
    codes = audio_tokenizer.get_codes(output)
    synthesised_speech = audio_tokenizer.get_audio(codes)
| if hasattr(synthesised_speech, 'numpy'): | |
| audio_np = synthesised_speech.numpy() | |
| else: | |
| audio_np = synthesised_speech | |
    # Handle NaN and Inf values
    audio_np = np.nan_to_num(audio_np)

    # Ensure audio is in [-1, 1] range
    if np.max(np.abs(audio_np)) > 0:
        audio_np = audio_np / np.max(np.abs(audio_np))

    # Convert to signed int16 (-32768 to 32767)
    int16_max = 32767  # Max value for signed 16-bit
    audio_int16 = np.clip(audio_np * int16_max, -int16_max, int16_max).astype(np.int16)

    # Ensure the audio is mono channel if needed
    if len(audio_int16.shape) > 1 and audio_int16.shape[0] == 1:
        audio_int16 = audio_int16[0]  # Convert from [1, samples] to [samples]

    # Debug info
    print(f"Audio stats - Min: {np.min(audio_int16)}, Max: {np.max(audio_int16)}, Shape: {audio_int16.shape}")
    # WavTokenizer decodes at 24 kHz, which is within Gradio's supported range (1-192000)
    sample_rate = 24000
| print("Speech Synthesis Completed~~~~~~~~~~~~~~~~~~~") | |
| return transcribed_text, llm_response, (sample_rate, audio_int16) | |
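# End-to-end example (commented out); "sample.wav" is a placeholder recording:
# text, reply, (sr, pcm) = speech_to_speech_translation("sample.wav", language="Igbo")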
# Gradio Demo
import gradio as gr

demo = gr.Blocks()

with demo:
    gr.Markdown("# Aware Speech-to-Speech Demo")
    with gr.Tab("Microphone"):
        with gr.Row():
            mic_input = gr.Audio(sources="microphone", type="filepath", label="Speak")
            lang_dropdown_mic = gr.Dropdown(
                choices=["English", "Yoruba", "Igbo", "Hausa"],
                value="English",
                label="Select Language"
            )
        mic_submit = gr.Button("Submit")
        with gr.Row():
            mic_transcribed = gr.Textbox(label="Transcribed Text", interactive=False)
            mic_response = gr.Textbox(label="HypaAI's Response", interactive=False)
            mic_audio_output = gr.Audio(label="Generated Speech", type="numpy")
        mic_submit.click(
            fn=speech_to_speech_translation,
            inputs=[mic_input, lang_dropdown_mic],
            outputs=[mic_transcribed, mic_response, mic_audio_output]
        )
    with gr.Tab("Audio File"):
        with gr.Row():
            file_input = gr.Audio(sources="upload", type="filepath", label="Upload Audio")
            lang_dropdown_file = gr.Dropdown(
                choices=["English", "Yoruba", "Igbo", "Hausa"],
                value="English",
                label="Select Language"
            )
        file_submit = gr.Button("Submit")
        with gr.Row():
            file_transcribed = gr.Textbox(label="Transcribed Text", interactive=False)
            file_response = gr.Textbox(label="HypaAI's Response", interactive=False)
            file_audio_output = gr.Audio(label="Generated Speech", type="numpy")
        file_submit.click(
            fn=speech_to_speech_translation,
            inputs=[file_input, lang_dropdown_file],
            outputs=[file_transcribed, file_response, file_audio_output]
        )

demo.launch(share=True)