import sklearn.metrics  # get_nmis below calls sklearn.metrics explicitly
from tenacity import retry, stop_after_attempt, wait_random_exponential
from tqdm import tqdm
import sys
# import openai
import time
# import pandas as pd
import random
import csv
import os
import pickle
import json
from langchain.chat_models import AzureChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from langchain.callbacks import get_openai_callback
from langchain.llms import OpenAI
import tiktoken
from sklearn.feature_extraction.text import CountVectorizer
from collections import Counter
import math
import io
import contextlib
# os.system('pip install pandas reportlab')
# os.system('pip install openai==0.27.2')
# os.system('pip install tenacity')
import requests
from bs4 import BeautifulSoup
import ast
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
import string
import numpy as np
import evaluate

def tree_edit_distance(tree1, tree2):
    def cost(node1, node2):
        """Cost to transform node1 into node2."""
        if node1 == node2:
            return 0
        return 1

    def tree_size(tree):
        """Calculate the size of the tree."""
        if not isinstance(tree, list) or not tree:
            return 1
        return 1 + sum(tree_size(child) for child in tree)

    def ted(tree1, tree2):
        """Compute tree edit distance between two trees."""
        if not isinstance(tree1, list) and not isinstance(tree2, list):
            return cost(tree1, tree2)
        if not isinstance(tree1, list):
            return tree_size(tree2)
        if not isinstance(tree2, list):
            return tree_size(tree1)
        if not tree1 and not tree2:
            return 0
        if not tree1:
            return sum(tree_size(child) for child in tree2)
        if not tree2:
            return sum(tree_size(child) for child in tree1)
        dp = [[0] * (len(tree2) + 1) for _ in range(len(tree1) + 1)]
        for i in range(1, len(tree1) + 1):
            dp[i][0] = dp[i-1][0] + tree_size(tree1[i-1])
        for j in range(1, len(tree2) + 1):
            dp[0][j] = dp[0][j-1] + tree_size(tree2[j-1])
        for i in range(1, len(tree1) + 1):
            for j in range(1, len(tree2) + 1):
                dp[i][j] = min(dp[i-1][j] + tree_size(tree1[i-1]),
                               dp[i][j-1] + tree_size(tree2[j-1]),
                               dp[i-1][j-1] + ted(tree1[i-1], tree2[j-1]))
        return dp[len(tree1)][len(tree2)]

    return ted(tree1, tree2)
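# Example usage (illustrative sketch; the nested lists below are hypothetical trees):
#   tree_edit_distance(["a", ["b", "c"]], ["a", ["b", "d"]])   # -> 1 (relabel "c" as "d")
#   tree_edit_distance(["a"], ["a", ["b"]])                    # grows with the inserted subtree size
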
def preprocess_code_str(code_str):
    prefix = "citation_bracket = {}\nsentence = {}\n"
    code_str = code_str.replace(" ", "")
    code_lines = code_str.split("\n")
    code_line_list = []
    for line in code_lines:
        if "citation_bracket[" in line.split("=")[0]:
            code_line_list.append(line)
        if "sentence[" in line.split("=")[0]:
            code_line_list.append(line)
    return prefix + "\n".join(code_line_list) + "\nprint(sentence)"

def run_code(code_str):
    # Redirect stdout to capture print statements
    f = io.StringIO()
    with contextlib.redirect_stdout(f):
        exec(preprocess_code_str(code_str))
    # Get the standard output
    output = f.getvalue()
    return ast.literal_eval(output)
def replace_with_char(input_list, char='a'):
    def replace_in_nested_list(nested_list):
        if isinstance(nested_list, list):
            return [replace_in_nested_list(item) for item in nested_list]
        else:
            return char
    return replace_in_nested_list(input_list)

def top_k_keys(input_dict, k):
    # Sort the dictionary keys by value in descending order
    sorted_keys = sorted(input_dict, key=input_dict.get, reverse=True)
    # Return the top-k keys
    return sorted_keys[:k]

def keys_with_least_k_values(d, k):
    if k <= 0:
        return []
    # Get the sorted list of (key, value) tuples based on the values
    sorted_items = sorted(d.items(), key=lambda item: item[1])
    # Extract the keys of the first k items
    least_k_keys = [item[0] for item in sorted_items[:k]]
    return least_k_keys
def edit_distance_code_str(code1, code2, just_tree_structure=False):
    # code1 = preprocess_code_str(code1)
    # code2 = preprocess_code_str(code2)
    sentence1 = run_code(code1)
    list_1 = [sentence1[key] for key in sentence1]
    sentence2 = run_code(code2)
    list_2 = [sentence2[key] for key in sentence2]
    if just_tree_structure:
        list_1 = replace_with_char(list_1)
        list_2 = replace_with_char(list_2)
    return tree_edit_distance(list_1, list_2)
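# Example usage (illustrative sketch; the code strings are hypothetical model outputs that
# assign into `citation_bracket` and `sentence`, as preprocess_code_str expects):
#   code_a = 'citation_bracket["b1"] = ["@cite_1"]\nsentence["s1"] = [citation_bracket["b1"]]'
#   code_b = 'citation_bracket["b1"] = ["@cite_2"]\nsentence["s1"] = [citation_bracket["b1"]]'
#   edit_distance_code_str(code_a, code_b)                             # label-aware distance
#   edit_distance_code_str(code_a, code_b, just_tree_structure=True)   # 0: identical shapes
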
class eval_metrics:
    def __init__(self):
        pass
        # if is_bertscore:
        #     pass

    def get_rouge_l(self, pred, refs):
        rouge = evaluate.load('rouge')
        results = rouge.compute(predictions=pred, references=refs)
        return results['rougeL']

    def get_bleu(self, pred, refs):
        bleu = evaluate.load('bleu')
        tmp_refs = [[item] for item in refs]
        results = bleu.compute(predictions=pred, references=tmp_refs)
        return results['bleu']

    def get_meteor(self, pred, refs):
        meteor = evaluate.load('meteor')
        results = meteor.compute(predictions=pred, references=refs)
        return results['meteor']

    def get_bertscore(self, pred, refs):
        bertscore = evaluate.load('bertscore')
        results = bertscore.compute(predictions=pred, references=refs, lang="en")
        return np.mean(results['f1'])

    def get_bleurt(self, pred, refs):
        bleurt = evaluate.load('bleurt', module_type="metric")
        # tmp_refs = [[item] for item in refs]
        results = bleurt.compute(predictions=pred, references=refs)
        return np.mean(results['scores'])
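# Example usage (illustrative; each call loads the corresponding Hugging Face `evaluate` metric):
#   metrics = eval_metrics()
#   preds = ["the cat sat on the mat"]
#   refs = ["a cat sat on the mat"]
#   metrics.get_rouge_l(preds, refs)
#   metrics.get_bertscore(preds, refs)
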
class BM25:
    def __init__(self, documents, k1=1.5, b=0.75):
        self.documents = documents
        self.k1 = k1
        self.b = b
        self.vectorizer = CountVectorizer().fit(documents)
        self.doc_term_matrix = self.vectorizer.transform(documents)
        self.doc_lengths = np.array(self.doc_term_matrix.sum(axis=1)).flatten()
        self.avg_doc_length = np.mean(self.doc_lengths)
        # Document frequency per term: number of non-zero entries in each column
        self.df = np.diff(self.doc_term_matrix.tocsc().indptr)
        self.idf = self.compute_idf()

    def compute_idf(self):
        N = len(self.documents)
        idf = np.log((N - self.df + 0.5) / (self.df + 0.5) + 1)
        return idf

    def compute_bm25(self, query):
        query_vec = self.vectorizer.transform([query])
        scores = []
        for doc_idx in range(self.doc_term_matrix.shape[0]):
            score = 0
            for term_idx in query_vec.indices:
                if term_idx in self.doc_term_matrix[doc_idx].indices:
                    tf = self.doc_term_matrix[doc_idx, term_idx]
                    idf = self.idf[term_idx]
                    numerator = tf * (self.k1 + 1)
                    denominator = tf + self.k1 * (1 - self.b + self.b * (self.doc_lengths[doc_idx] / self.avg_doc_length))
                    score += idf * numerator / denominator
            scores.append(score)
        return scores

    def get_top_k(self, query, k=5):
        scores = self.compute_bm25(query)
        top_k_indices = np.argsort(scores)[::-1][:k]
        top_k_docs = [self.documents[i] for i in top_k_indices]
        return top_k_docs, top_k_indices
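# Example usage (illustrative sketch):
#   corpus = ["neural abstractive summarization", "bm25 is a ranking baseline", "citation graphs of papers"]
#   bm25 = BM25(corpus)
#   top_docs, top_idx = bm25.get_top_k("ranking papers", k=2)
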
def get_nmis(true_dict, pred_dict):
    labels_true = []
    labels_pred = []
    # print(true_dict.keys())
    # print(pred_dict.keys())
    # print()
    for key in true_dict:
        labels_true.append(true_dict[key])
        if key not in pred_dict:
            labels_pred.append(-1)
        else:
            labels_pred.append(pred_dict[key])
    # Keys missing from pred_dict were marked -1; assign each of them a fresh, unique cluster label
    if len(labels_pred) == 0:
        max_label_pred = 0
    else:
        max_label_pred = np.max(labels_pred) + 1
    for label_idx, item in enumerate(labels_pred):
        if item == -1:
            labels_pred[label_idx] = max_label_pred
            max_label_pred += 1
    return (sklearn.metrics.normalized_mutual_info_score(labels_true=labels_true, labels_pred=labels_pred),
            sklearn.metrics.adjusted_mutual_info_score(labels_true=labels_true, labels_pred=labels_pred))
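# Example usage (illustrative; the dicts map item ids to cluster labels):
#   gold = {"@cite_1": 0, "@cite_2": 0, "@cite_3": 1}
#   pred = {"@cite_1": 1, "@cite_2": 1}          # "@cite_3" missing -> gets its own cluster
#   nmi, ami = get_nmis(gold, pred)
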
def calculate_precision_recall_f1(predicted, ground_truth):
    # print(predicted)
    # print()
    # print(ground_truth)
    # print("-------------")
    # Convert lists to sets to handle duplicates and perform set operations
    predicted_set = set(predicted)
    ground_truth_set = set(ground_truth)
    # Calculate true positives (intersection of predicted and ground truth)
    true_positives = predicted_set.intersection(ground_truth_set)
    # Calculate precision
    precision = len(true_positives) / len(predicted_set) if predicted_set else 0
    # Calculate recall
    recall = len(true_positives) / len(ground_truth_set) if ground_truth_set else 0
    # Calculate F1-score
    if precision + recall == 0:
        f1_score = 0
    else:
        f1_score = 2 * (precision * recall) / (precision + recall)
    return precision, recall, f1_score
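# Example usage (illustrative):
#   p, r, f1 = calculate_precision_recall_f1(["@cite_1", "@cite_2"], ["@cite_2", "@cite_3"])
#   # p = r = f1 = 0.5: one of two predictions is correct, one of two references is recovered
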
def get_introduction(arxiv_id):
    # Step 1: Construct the URL
    url = f"https://ar5iv.org/html/{arxiv_id}"
    # Step 2: Fetch the HTML content of the page
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch the page: Status code {response.status_code}")
    # Step 3: Parse the HTML content
    soup = BeautifulSoup(response.content, 'html.parser')
    # Step 4: Locate the introduction section
    # We assume the introduction is the first section after the abstract.
    # This heuristic might need adjustment depending on the exact structure of the paper.
    introduction_text = ""
    found_introduction = False
    # Look for h2/h3 tags, which usually denote sections
    for tag in soup.find_all(['h2', 'h3']):
        # print(tag.text.lower())
        if 'introduction' in tag.text.lower():
            # print(tag.text)
            introduction_text += tag.text.strip() + "\n\n"
            next_node = tag.find_next_sibling()
            while next_node and next_node.name not in ['h2', 'h3']:
                introduction_text += next_node.get_text().strip() + "\n\n"
                next_node = next_node.find_next_sibling()
            break
    return introduction_text
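# Example usage (illustrative; needs network access and an arXiv id that ar5iv can render as HTML):
#   intro = get_introduction("1706.03762")
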
def write_to_file(filepath, content):
    if '.txt' in filepath:
        with open(filepath, 'w') as fw:
            fw.write(content)
    elif '.json' in filepath:
        with open(filepath, 'w') as fw:
            json.dump(content, fw)
    elif '.pickle' in filepath or '.pkl' in filepath:
        with open(filepath, 'wb') as fw:
            pickle.dump(content, fw)
    elif '.npy' in filepath:
        np.save(filepath, content)

def read_from_file(filepath):
    if '.txt' in filepath:
        with open(filepath, 'r') as fr:
            return fr.read()
    elif '.json' in filepath:
        with open(filepath, 'r') as fr:
            return json.load(fr)
    elif '.pickle' in filepath or '.pkl' in filepath:
        with open(filepath, 'rb') as fr:
            return pickle.load(fr)
    elif '.npy' in filepath:
        return np.load(filepath)
def remove_stopwords_and_punctuation(text):
    # Get the list of stopwords
    stop_words = set(stopwords.words('english'))
    # Remove punctuation from the text, keeping '_' and '@' (so markers like "@cite_1" survive)
    text = text.translate(str.maketrans('', '', string.punctuation.replace('_', '').replace('@', '')))
    # Split the text into words
    words = text.split()
    # Remove stopwords
    filtered_words = [word for word in words if word.lower() not in stop_words]
    # Join the words back into a single string
    filtered_text = ' '.join(filtered_words)
    return filtered_text
class AzureModels:
    def __init__(self, model_name):
        if model_name == "gpt4":
            DEPLOYMENT_NAME = "gentech-gpt4-research"
            BASE_URL = "https://gentechworkbench-stage.openai.azure.com/"
            API_KEY = "f074d7f2bfdf486783db5f4605b263a6"
            self.model = AzureChatOpenAI(
                openai_api_base=BASE_URL,
                openai_api_version="2023-03-15-preview",
                deployment_name=DEPLOYMENT_NAME,
                openai_api_key=API_KEY,
                openai_api_type="azure",
            )
            self.enc = tiktoken.encoding_for_model("gpt-4-0314")
        elif model_name == "gpt4o":
            DEPLOYMENT_NAME = "gpt-4o"
            BASE_URL = "https://docexpresearch.openai.azure.com/"
            API_KEY = "2d6dc256edd94e65a2fa4b5658651377"
            self.model = AzureChatOpenAI(
                openai_api_base=BASE_URL,
                openai_api_version="2023-07-01-preview",
                deployment_name=DEPLOYMENT_NAME,
                openai_api_key=API_KEY,
                openai_api_type="azure",
            )
            self.enc = tiktoken.encoding_for_model("gpt-4o")

    def get_completion(self, question, max_tokens, stop=None):
        gpt_answer = self.model(
            [
                HumanMessage(
                    content=question
                )
            ], max_tokens=max_tokens, stop=stop
        )
        gpt_answer_content = gpt_answer.content  # Access the content attribute
        # Convert the answer content to a string
        if isinstance(gpt_answer_content, str):
            gpt_answer_string = gpt_answer_content  # Already a string, use it directly
        else:
            gpt_answer_string = str(gpt_answer_content)  # Convert to string otherwise
        return gpt_answer_string

    def get_num_inp_tokens(self, inp):
        tokens = self.enc.encode(inp)
        return len(tokens)
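# Example usage (illustrative sketch; assumes the Azure deployments configured above are reachable):
#   azure_model = AzureModels("gpt4o")
#   answer = azure_model.get_completion("Summarize the related work section.", max_tokens=256)
#   n_tokens = azure_model.get_num_inp_tokens(answer)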