import os

import pdfplumber
from google.cloud import storage
from langchain_community.vectorstores import Annoy
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import CharacterTextSplitter

from climateqa.engine.embeddings import get_embeddings_function

# Anonymous client: the bucket is read without credentials
# storage_client = storage.Client()
storage_client = storage.Client.create_anonymous_client()
bucket_name = "docs-axio-clara"

embeddings_function = get_embeddings_function()


def get_PDF_Names_from_GCP():
    """Return the names of the PDF blobs stored under 'sources/' in the bucket."""
    listName = []
    # Fetch the file list from GCP storage
    blobs = storage_client.list_blobs(bucket_name, prefix='sources/')
    for blob in blobs:
        listName.append(blob.name)
    return listName
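
# Usage sketch (assumes anonymous read access to the bucket configured above):
#   names = get_PDF_Names_from_GCP()
#   # the returned names keep the bucket prefix, e.g. 'sources/<document>.pdf'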


def get_PDF_from_GCP(folder_path, pdf_folder="./PDF"):
    """Download each PDF from GCP storage and dump its text into folder_path, one file per page."""
    # Fetch the files from GCP storage
    blobs = storage_client.list_blobs(bucket_name, prefix='sources/')
    for blob in blobs:
        print("\n" + blob.name + ":")
        print("  <- downloading from GCP")
        blob.download_to_filename(pdf_folder + "/" + blob.name)

        # Extract the text from the PDF files
        print("  >>> PDF extraction")
        for pdf_file in os.listdir(pdf_folder):
            if pdf_file.startswith("."):
                continue
            print("  > " + pdf_folder + "/" + pdf_file)
            pdf_total_pages = 0
            with pdfplumber.open(pdf_folder + "/" + pdf_file) as pdf:
                pdf_total_pages = len(pdf.pages)

            # Memory leak with large files: reopening the file every N pages
            # seems to work around the problem
            N_page = 300
            page_number = 0
            while page_number < pdf_total_pages:
                print("  -- opening the file for " + str(N_page) + " pages --")
                with pdfplumber.open(pdf_folder + "/" + pdf_file) as pdf:
                    npage = 0
                    while npage < N_page and page_number < pdf_total_pages:
                        print("  >>> " + str(page_number + 1))
                        # One text file per page, named '<pdf_name>..:page:..<page_number>'
                        with open(folder_path + "/" + pdf_file + "..:page:.." + str(page_number + 1), "w") as f:
                            for char_pdf in pdf.pages[page_number].chars:
                                f.write(char_pdf["text"])
                        npage = npage + 1
                        page_number = page_number + 1

        print("  X removing: " + blob.name)
        os.remove(pdf_folder + "/" + blob.name)
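
# Usage sketch (the folder names are illustrative and must already exist):
#   get_PDF_from_GCP("./TXT", pdf_folder="./PDF")
#   # writes one text file per page, '<pdf_name>..:page:..<page_number>', the naming
#   # convention parsed back into metadata by the vectorisation step further below.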


def build_vectores_stores(folder_path, pdf_folder="./PDF", vectors_path="./vectors"):
    """Return the Annoy vector store, downloading a prebuilt index from GCP when it is missing locally."""
    if os.path.isfile(vectors_path + "/index.annoy"):
        return Annoy.load_local(vectors_path, embeddings_function, allow_dangerous_deserialization=True)

    try:
        os.mkdir(vectors_path)
    except FileExistsError:
        pass

    try:
        # Fetch the prebuilt vectors from GCP storage
        blobs = storage_client.list_blobs(bucket_name, prefix='testvectors/')
        for blob in blobs:
            print("\n" + blob.name.split("/")[-1] + ":")
            print("  <- downloading from GCP")
            blob.download_to_filename(vectors_path + "/" + blob.name.split("/")[-1])
    except Exception:
        pass

    # TODO: factor this out into a helper to avoid code duplication
    if os.path.isfile(vectors_path + "/index.annoy"):
        return Annoy.load_local(vectors_path, embeddings_function, allow_dangerous_deserialization=True)

    print("MISSING VECTORS")
    exit(1)

    # Disabled build path, kept for reference: download the PDFs, split them per
    # page and build the Annoy index from scratch.
    # get_PDF_from_GCP(folder_path, pdf_folder)

    # print(" Vectorisation ...")
    # docs = []
    # vector_store_from_docs = ()  # Create a new Annoy object, or reuse the one already initialised in your existing code
    # for filename in os.listdir(folder_path):
    #     if filename.startswith("."):
    #         continue
    #     file_path = os.path.join(folder_path, filename)
    #     if os.path.isfile(file_path):
    #         loader = TextLoader(file_path)
    #         documents = loader.load()
    #
    #         for doc in documents:
    #             if doc.metadata:
    #                 doc.metadata["ax_page"] = doc.metadata['source'].split("..:page:..")[-1]
    #                 doc.metadata["ax_name"] = doc.metadata['source'].split("..:page:..")[0].split("/")[-1]
    #                 doc.metadata["ax_url"] = "https://storage.googleapis.com/docs-axio-clara/sources/" + doc.metadata["ax_name"]
    #
    #         text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    #         docs += text_splitter.split_documents(documents)
    # vector_store_from_docs = Annoy.from_documents(docs, embeddings_function)
    # vector_store_from_docs.save_local(vectors_path)
    # return vector_store_from_docs
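

# Retrieval sketch over the Annoy store returned above (the folder path and k are
# illustrative assumptions, not part of the existing code):
#   vectorstore = build_vectores_stores("./sources")
#   retriever = vectorstore.as_retriever(search_kwargs={"k": 5})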

# Pinecone
# More info at https://docs.pinecone.io/docs/langchain
# And https://python.langchain.com/docs/integrations/vectorstores/pinecone

# import os
# from pinecone import Pinecone
# from langchain_community.vectorstores import Pinecone as PineconeVectorstore

# LOAD ENVIRONMENT VARIABLES
# try:
#     from dotenv import load_dotenv
#     load_dotenv()
# except:
#     pass

# def get_pinecone_vectorstore(embeddings, text_key="content"):
#     # initialize pinecone (legacy client)
#     pinecone.init(
#         api_key=os.getenv("PINECONE_API_KEY"),  # find at app.pinecone.io
#         environment=os.getenv("PINECONE_API_ENVIRONMENT"),  # next to api key in console
#     )
#     index_name = os.getenv("PINECONE_API_INDEX")
#     vectorstore = Pinecone.from_existing_index(index_name, embeddings, text_key=text_key)
#     return vectorstore

#     # Alternative using the current Pinecone client
#     pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
#     index = pc.Index(os.getenv("PINECONE_API_INDEX"))
#     vectorstore = PineconeVectorstore(
#         index, embeddings, text_key,
#     )
#     return vectorstore

# def get_pinecone_retriever(vectorstore, k=10, namespace="vectors", sources=["IPBES", "IPCC"]):
#     assert isinstance(sources, list)
#     # Check if all elements in the list are either IPCC or IPBES
#     filter = {
#         "source": {"$in": sources},
#     }
#     retriever = vectorstore.as_retriever(search_kwargs={
#         "k": k,
#         "namespace": "vectors",
#         "filter": filter,
#     })
#     return retriever
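

# Minimal smoke test, a sketch only: load (or download) the Annoy index and run one
# query. The folder path, query string and k are illustrative assumptions; the
# ax_name / ax_page metadata keys come from the commented-out vectorisation step above.
if __name__ == "__main__":
    vectorstore = build_vectores_stores("./sources")
    for doc in vectorstore.similarity_search("example query", k=3):
        print(doc.metadata.get("ax_name"), doc.metadata.get("ax_page"))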