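"""Preprocess the Search-R1 dataset into a SearchR1-like multi-turn format.

Downloads the train/test Parquet splits from a HuggingFace dataset repository,
rewrites each row into a chat-style prompt with tool-calling metadata, saves
the results locally as Parquet, and optionally copies them to HDFS.
"""
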
import argparse
import logging
import os
import tempfile

import pandas as pd
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import EntryNotFoundError

from verl.utils.hdfs_io import copy, makedirs

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

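# Prompt template following the Search-R1 setup: the model reasons inside
# <think> tags, issues search queries via <tool_call>, receives results in
# <tool_response>, and emits the final answer inside <answer> tags.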
DEFAULT_SYSTEM_CONTENT = "You are a helpful and harmless assistant."
DEFAULT_USER_CONTENT_PREFIX = (
    "Answer the given question. You must conduct reasoning inside <think> and </think> "
    "first every time you get new information. After reasoning, if you find you lack "
    "some knowledge, you can call a search engine by <tool_call> query </tool_call> "
    "and it will return the top searched results between <tool_response> and "
    "</tool_response>. You can search as many times as you want. If you find no "
    "further external knowledge needed, you can directly provide the answer inside "
    "<answer> and </answer>, without detailed illustrations. For example, "
    "<answer> Beijing </answer>. Question: "
)


def process_single_row(row, current_split_name, row_index):
    """
    Process a single row of data into the SearchR1-like format.

    Args:
        row: DataFrame row containing the original data.
        current_split_name: Name of the current split (train/test).
        row_index: Index of the row in the DataFrame.

    Returns:
        pd.Series: Processed row data in the required format.
    """
    question = row.get("question", "")

    # system_content and user_content_prefix are module-level globals assigned
    # in the __main__ block below.
    user_content = user_content_prefix.rstrip("\n") + question
    prompt = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]

    # Prefer the ground truth embedded in reward_model; fall back to golden_answers.
    reward_model_data = row.get("reward_model")
    if isinstance(reward_model_data, dict) and "ground_truth" in reward_model_data:
        ground_truth = reward_model_data.get("ground_truth")
    else:
        ground_truth = row.get("golden_answers", [])

    # Tag the data source so downstream consumers can route by dataset.
    data_source_tagged = "searchR1_" + str(row.get("data_source", ""))

    tools_kwargs = {
        "search": {
            "create_kwargs": {
                "ground_truth": ground_truth,
                "question": question,
                "data_source": data_source_tagged,
            }
        }
    }

    extra_info = {
        "index": row_index,
        "need_tools_kwargs": True,
        "question": question,
        "split": current_split_name,
        "tools_kwargs": tools_kwargs,
    }

    return pd.Series(
        {
            "data_source": data_source_tagged,
            "prompt": prompt,
            "ability": row.get("ability"),
            "reward_model": reward_model_data,
            "extra_info": extra_info,
            "metadata": row.get("metadata"),
        }
    )


def main():
    # args is a module-level global parsed in the __main__ block below.
    local_save_dir = os.path.expanduser(args.local_dir)
    os.makedirs(local_save_dir, exist_ok=True)

    processed_files = []

    # Download each split into a temporary directory, process it, and write
    # the result to the local save directory.
    with tempfile.TemporaryDirectory() as tmp_download_dir:
        for split in ["train", "test"]:
            parquet_filename = f"{split}.parquet"
            logger.info(f"Processing {split} split...")

            try:
                logger.info(f"Downloading {parquet_filename} from {args.hf_repo_id}")
                local_parquet_filepath = hf_hub_download(
                    repo_id=args.hf_repo_id,
                    filename=parquet_filename,
                    repo_type="dataset",
                    local_dir=tmp_download_dir,
                    local_dir_use_symlinks=False,
                )

                df_raw = pd.read_parquet(local_parquet_filepath)
                logger.info(f"Loaded {len(df_raw)} rows from {parquet_filename}")

                # Bind the current split name as a default argument so each
                # row is tagged with the correct split label.
                def apply_process_row(row, split_name=split):
                    return process_single_row(row, current_split_name=split_name, row_index=row.name)

                df_processed = df_raw.apply(apply_process_row, axis=1)

                output_file_path = os.path.join(local_save_dir, f"{split}.parquet")
                df_processed.to_parquet(output_file_path, index=False)
                logger.info(f"Saved {len(df_processed)} processed rows to {output_file_path}")
                processed_files.append(output_file_path)

            except EntryNotFoundError:
                logger.warning(f"{parquet_filename} not found in repository {args.hf_repo_id}")
            except Exception as e:
                logger.error(f"Error processing {split} split: {e}")

    if not processed_files:
        logger.warning("No data was processed or saved")
        return

    logger.info(f"Successfully processed {len(processed_files)} files to {local_save_dir}")

    # Optionally mirror the processed files to HDFS.
    if args.hdfs_dir:
        try:
            makedirs(args.hdfs_dir)
            copy(src=local_save_dir, dst=args.hdfs_dir)
            logger.info(f"Successfully copied files to HDFS: {args.hdfs_dir}")
        except Exception as e:
            logger.error(f"Error copying files to HDFS: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download Search-R1 from HuggingFace, process, and save to Parquet.")
    parser.add_argument("--hf_repo_id", default="PeterJinGo/nq_hotpotqa_train", help="HuggingFace dataset repository ID.")
    parser.add_argument("--local_dir", default="~/data/searchR1_processed_direct", help="Local directory to save the processed Parquet files.")
    parser.add_argument("--hdfs_dir", default=None, help="Optional HDFS directory to copy the Parquet files to.")

    args = parser.parse_args()

    # Module-level globals consumed by process_single_row.
    system_content = DEFAULT_SYSTEM_CONTENT
    user_content_prefix = DEFAULT_USER_CONTENT_PREFIX

    main()