# File: data-prepare/crawl-arxiv.py
# (Repository viewer header: 176 lines, 5.3 KiB, Python.)

import requests
from bs4 import BeautifulSoup
import json
import time
import os
# Maps a single-letter code (A-Z) to the arXiv category name to crawl.
# Iteration order of this dict is the order in which categories are crawled.
CATEGORY_DICT = {
    "A": "quant-ph",
    "B": "physics.chem-ph",
    "C": "physics.atom-ph",
    "D": "cond-mat.soft",
    "E": "cs.RO",
    "F": "cs.CL",
    "G": "cs.SE",
    "H": "cs.IR",
    "I": "hep-th",
    "J": "hep-ph",
    "K": "physics.optics",
    "L": "cs.AI",
    "M": "cs.CV",
    "N": "nucl-th",
    "O": "astro-ph",
    "P": "math.PR",
    "Q": "cs.OS",
    "R": "eess.SP",
    "S": "math.OC",
    "T": "math.DS",
    "U": "math.DG",
    "V": "math.MP",
    "W": "cs.MM",
    "X": "stat.ME",
    "Y": "math.CO",
    "Z": "cs.NE",
}
def _tag_text(tag):
    """Return the stripped text of a parsed XML tag, or "" if the tag is absent."""
    return tag.text.strip() if tag is not None else ""


def _parse_arxiv_entry(entry):
    """Convert one parsed Atom <entry> element into a plain dict of paper data."""
    # Author information: each <author> element carries a <name> child.
    author_names = []
    for author in entry.find_all("author"):
        # find("name") is required here: on a BeautifulSoup tag, `.name` is
        # the tag's own name ("author"), not a child element.
        name = author.find("name")
        if name is not None:
            author_names.append(name.text.strip())

    # Category information: the `term` attribute of every <category> element.
    category_list = [cat.get("term") for cat in entry.find_all("category")]

    # Every text field is read None-safely so that a single entry with a
    # missing element cannot make the whole batch fail.
    return {
        "id": _tag_text(entry.find("id")),
        "title": _tag_text(entry.find("title")),
        "authors": author_names,
        "summary": _tag_text(entry.find("summary")),
        "categories": category_list,
        "published": _tag_text(entry.find("published")),
        "updated": _tag_text(entry.find("updated")),
    }


def fetch_arxiv_papers_batch(query, start, max_results=100):
    """Fetch one batch of paper records from the arXiv query API.

    Args:
        query: Search query string, sent as ``search_query``.
        start: Zero-based offset of the first result, sent as ``start``.
        max_results: Number of results to request in this call.
            NOTE(review): the original note claimed the arXiv API supports
            at most 10000 here; confirm the real per-request limit against
            the arXiv API user manual.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``authors``,
        ``summary``, ``categories``, ``published`` and ``updated``. Text
        fields whose element is missing from an entry are "". An empty list
        is returned when the HTTP status is not 200 or when any exception
        occurs (the problem is printed, never raised).
    """
    base_url = "http://export.arxiv.org/api/query"
    params = {
        "search_query": query,
        "start": start,
        "max_results": max_results,
    }
    # Deliberately best-effort: callers treat an empty list as "nothing
    # fetched", so every failure is reported and mapped to [].
    try:
        response = requests.get(base_url, params=params, timeout=30)
        if response.status_code != 200:
            print(f"请求失败,状态码: {response.status_code}")
            return []
        soup = BeautifulSoup(response.content, "xml")
        return [_parse_arxiv_entry(entry) for entry in soup.find_all("entry")]
    except Exception as e:
        print(f"请求异常: {e}")
        return []
def save_papers_to_jsonl(papers, category_code, category_name):
    """Append paper records to the category's JSONL file.

    The file lives in the ``arxiv_papers`` folder (created if needed,
    relative to the current working directory) and is named
    ``arxiv_papers_<code>_<name>.jsonl``, with every "." in the category
    name replaced by "_". Records are appended, one JSON object per line,
    with non-ASCII characters written as-is.

    Args:
        papers: List of paper records (JSON-serializable dicts).
        category_code: Category code, e.g. "A".
        category_name: Category name, e.g. "quant-ph".
    """
    out_dir = "arxiv_papers"
    os.makedirs(out_dir, exist_ok=True)

    safe_name = category_name.replace(".", "_")
    file_path = os.path.join(out_dir, f"arxiv_papers_{category_code}_{safe_name}.jsonl")

    # Append mode: successive batches of one category accumulate in one file.
    with open(file_path, "a", encoding="utf-8") as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in papers
        )

    print(f"已追加保存 {len(papers)} 条数据到 {file_path}")
def crawl_category(category_code, category_name, target_count=500):
    """Crawl the papers of a single category and save them batch by batch.

    Papers are requested in batches of at most 100 with the query
    ``cat:<category_name>`` and each batch is saved as soon as it arrives.
    The loop ends when ``target_count`` papers have been collected or when a
    batch comes back empty. Because the fetch function also returns an empty
    list on a failed request, a request failure ends the crawl of this
    category as well.

    Args:
        category_code: Category code (used in log output and the file name).
        category_name: Category name used to build the search query.
        target_count: Target number of papers to collect.
    """
    query = f"cat:{category_name}"
    batch_size = 100  # number of papers requested per batch
    # Doubles as the result offset of the next request: every paper already
    # collected is one result to skip.
    collected_count = 0

    print(f"开始爬取类别 {category_code} ({category_name}) 的论文...")
    while collected_count < target_count:
        needed_count = min(batch_size, target_count - collected_count)
        # The two bounds need a separator; without it "1" and "100" print as "1100".
        print(f"正在获取 {collected_count + 1}-{collected_count + needed_count} 篇论文...")
        papers = fetch_arxiv_papers_batch(query, collected_count, needed_count)
        if not papers:
            print("未获取到更多论文,停止爬取")
            break

        # Save this batch right away so progress survives a later failure.
        save_papers_to_jsonl(papers, category_code, category_name)
        collected_count += len(papers)
        print(f"当前已获取 {collected_count} 篇论文")

        # Pause between requests to avoid hitting the API too frequently;
        # this also spaces out the first request of the next category.
        time.sleep(3)

    print(f"完成类别 {category_code} ({category_name}) 的爬取,共获取 {collected_count} 篇论文\n")
def main():
    """Crawl every category in CATEGORY_DICT, targeting 500 papers each.

    An exception raised while crawling one category is printed and the loop
    moves on to the next category.
    """
    for code, name in CATEGORY_DICT.items():
        try:
            crawl_category(code, name, target_count=500)
        except Exception as err:
            # Top-level boundary: report the failure and keep going.
            print(f"爬取类别 {code} ({name}) 时出现错误: {err}")


# Run the crawl only when executed as a script, not when imported.
if __name__ == "__main__":
    main()