176 lines
5.3 KiB
Python
176 lines
5.3 KiB
Python
import requests
|
||
from bs4 import BeautifulSoup
|
||
import json
|
||
import time
|
||
import os
|
||
|
||
CATEGORY_DICT = {
|
||
"A": "quant-ph",
|
||
"B": "physics.chem-ph",
|
||
"C": "physics.atom-ph",
|
||
"D": "cond-mat.soft",
|
||
"E": "cs.RO",
|
||
"F": "cs.CL",
|
||
"G": "cs.SE",
|
||
"H": "cs.IR",
|
||
"I": "hep-th",
|
||
"J": "hep-ph",
|
||
"K": "physics.optics",
|
||
"L": "cs.AI",
|
||
"M": "cs.CV",
|
||
"N": "nucl-th",
|
||
"O": "astro-ph",
|
||
"P": "math.PR",
|
||
"Q": "cs.OS",
|
||
"R": "eess.SP",
|
||
"S": "math.OC",
|
||
"T": "math.DS",
|
||
"U": "math.DG",
|
||
"V": "math.MP",
|
||
"W": "cs.MM",
|
||
"X": "stat.ME",
|
||
"Y": "math.CO",
|
||
"Z": "cs.NE"
|
||
}
|
||
|
||
def fetch_arxiv_papers_batch(query, start, max_results=100):
|
||
"""
|
||
从arXiv获取一批论文数据
|
||
|
||
Args:
|
||
query: 搜索查询
|
||
start: 起始位置
|
||
max_results: 本次获取结果数(arXiv API最大支持10000)
|
||
"""
|
||
base_url = "http://export.arxiv.org/api/query"
|
||
params = {
|
||
"search_query": query,
|
||
"start": start,
|
||
"max_results": max_results
|
||
}
|
||
|
||
try:
|
||
response = requests.get(base_url, params=params, timeout=30)
|
||
|
||
if response.status_code == 200:
|
||
soup = BeautifulSoup(response.content, "xml")
|
||
entries = soup.find_all("entry")
|
||
papers = []
|
||
|
||
for entry in entries:
|
||
title = entry.title.text.strip()
|
||
summary = entry.summary.text.strip()
|
||
|
||
# 获取作者信息
|
||
authors = entry.find_all("author")
|
||
author_names = []
|
||
for author in authors:
|
||
name = author.find("name")
|
||
if name:
|
||
author_names.append(name.text.strip())
|
||
|
||
# 获取分类信息
|
||
categories = entry.find_all("category")
|
||
category_list = [cat.get("term") for cat in categories]
|
||
|
||
# 获取论文ID和链接
|
||
paper_id = entry.id.text.strip()
|
||
published = entry.published.text.strip() if entry.published else ""
|
||
updated = entry.updated.text.strip() if entry.updated else ""
|
||
|
||
# 构建论文数据结构
|
||
paper_data = {
|
||
"id": paper_id,
|
||
"title": title,
|
||
"authors": author_names,
|
||
"summary": summary,
|
||
"categories": category_list,
|
||
"published": published,
|
||
"updated": updated
|
||
}
|
||
|
||
papers.append(paper_data)
|
||
|
||
return papers
|
||
else:
|
||
print(f"请求失败,状态码: {response.status_code}")
|
||
return []
|
||
except Exception as e:
|
||
print(f"请求异常: {e}")
|
||
return []
|
||
|
||
def save_papers_to_jsonl(papers, category_code, category_name):
|
||
"""
|
||
将论文数据保存为JSONL格式文件
|
||
|
||
Args:
|
||
papers: 论文数据列表
|
||
category_code: 类别代码(如"A")
|
||
category_name: 类别名称(如"quant-ph")
|
||
"""
|
||
# 创建统一的子文件夹
|
||
folder_name = "arxiv_papers"
|
||
os.makedirs(folder_name, exist_ok=True)
|
||
|
||
# 文件路径
|
||
filename = f"arxiv_papers_{category_code}_{category_name.replace('.', '_')}.jsonl"
|
||
file_path = os.path.join(folder_name, filename)
|
||
|
||
with open(file_path, 'a', encoding='utf-8') as f:
|
||
for paper in papers:
|
||
f.write(json.dumps(paper, ensure_ascii=False) + '\n')
|
||
|
||
print(f"已追加保存 {len(papers)} 条数据到 {file_path}")
|
||
|
||
def crawl_category(category_code, category_name, target_count=500):
|
||
"""
|
||
爬取单个类别的论文数据
|
||
|
||
Args:
|
||
category_code: 类别代码
|
||
category_name: 类别名称
|
||
target_count: 目标论文数量
|
||
"""
|
||
query = f"cat:{category_name}"
|
||
collected_count = 0
|
||
start = 0
|
||
batch_size = 100 # 每批获取的论文数量
|
||
|
||
print(f"开始爬取类别 {category_code} ({category_name}) 的论文...")
|
||
|
||
while collected_count < target_count:
|
||
needed_count = min(batch_size, target_count - collected_count)
|
||
print(f"正在获取 {collected_count+1} 到 {collected_count+needed_count} 篇论文...")
|
||
|
||
papers = fetch_arxiv_papers_batch(query, start, needed_count)
|
||
|
||
if not papers:
|
||
print("未获取到更多论文,停止爬取")
|
||
break
|
||
|
||
# 保存这批论文
|
||
save_papers_to_jsonl(papers, category_code, category_name)
|
||
|
||
collected_count += len(papers)
|
||
start += len(papers)
|
||
|
||
print(f"当前已获取 {collected_count} 篇论文")
|
||
|
||
# 避免请求过于频繁
|
||
time.sleep(3)
|
||
|
||
print(f"完成类别 {category_code} ({category_name}) 的爬取,共获取 {collected_count} 篇论文\n")
|
||
|
||
def main():
|
||
"""
|
||
主函数:遍历所有类别进行爬取
|
||
"""
|
||
for category_code, category_name in CATEGORY_DICT.items():
|
||
try:
|
||
crawl_category(category_code, category_name, target_count=500)
|
||
except Exception as e:
|
||
print(f"爬取类别 {category_code} ({category_name}) 时出现错误: {e}")
|
||
continue
|
||
|
||
if __name__ == "__main__":
|
||
main() |