import json
import os
import time

import requests
from bs4 import BeautifulSoup

# Single-letter crawl codes mapped to the arXiv category string used in the
# "cat:<name>" search query. The letter also appears in the output file name.
CATEGORY_DICT = {
    "A": "quant-ph",
    "B": "physics.chem-ph",
    "C": "physics.atom-ph",
    "D": "cond-mat.soft",
    "E": "cs.RO",
    "F": "cs.CL",
    "G": "cs.SE",
    "H": "cs.IR",
    "I": "hep-th",
    "J": "hep-ph",
    "K": "physics.optics",
    "L": "cs.AI",
    "M": "cs.CV",
    "N": "nucl-th",
    "O": "astro-ph",
    "P": "math.PR",
    "Q": "cs.OS",
    "R": "eess.SP",
    "S": "math.OC",
    "T": "math.DS",
    "U": "math.DG",
    "V": "math.MP",
    "W": "cs.MM",
    "X": "stat.ME",
    "Y": "math.CO",
    "Z": "cs.NE",
}


def _tag_text(tag):
    """Return the stripped text of a parsed XML tag, or "" if the tag is absent."""
    return tag.text.strip() if tag is not None else ""


def _parse_entry(entry):
    """Convert one <entry> tag of the API response into a plain dict record.

    Every field is extracted defensively: a missing child tag yields an empty
    string (or an empty list) instead of raising, so one malformed entry does
    not cause the whole batch to be discarded.
    """
    # Authors: one <name> child per <author>; authors without a name are skipped.
    name_tags = (author.find("name") for author in entry.find_all("author"))
    author_names = [tag.text.strip() for tag in name_tags if tag is not None]

    # Categories: the value of the "term" attribute of every <category> tag.
    category_list = [cat.get("term") for cat in entry.find_all("category")]

    # Key order is kept stable because it determines the JSONL field order.
    return {
        "id": _tag_text(entry.find("id")),
        "title": _tag_text(entry.find("title")),
        "authors": author_names,
        "summary": _tag_text(entry.find("summary")),
        "categories": category_list,
        "published": _tag_text(entry.find("published")),
        "updated": _tag_text(entry.find("updated")),
    }


def fetch_arxiv_papers_batch(query, start, max_results=100):
    """Fetch one page of paper records from the arXiv query API.

    Args:
        query: Search query string sent as ``search_query``.
        start: Zero-based offset of the first result to return.
        max_results: Number of results requested for this page.
            NOTE(review): an earlier comment here claimed the API accepts up
            to 10000 per call - confirm against the arXiv API manual.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``authors``,
        ``summary``, ``categories``, ``published`` and ``updated``. An empty
        list is returned when the response contains no entries, when the HTTP
        status is not 200, or when any exception occurs; the failure is
        printed, never raised.
    """
    base_url = "http://export.arxiv.org/api/query"
    params = {
        "search_query": query,
        "start": start,
        "max_results": max_results,
    }

    # Deliberately broad: this function is a best-effort boundary whose
    # contract is "return [] on any failure", so callers never see an
    # exception from the network or the parser.
    try:
        response = requests.get(base_url, params=params, timeout=30)
        if response.status_code != 200:
            print(f"请求失败,状态码: {response.status_code}")
            return []

        soup = BeautifulSoup(response.content, "xml")
        return [_parse_entry(entry) for entry in soup.find_all("entry")]
    except Exception as e:
        print(f"请求异常: {e}")
        return []


def save_papers_to_jsonl(papers, category_code, category_name):
    """Append paper records to the category's JSONL file.

    The file is ``arxiv_papers/arxiv_papers_<code>_<name>.jsonl`` relative to
    the current working directory, with dots in the category name replaced by
    underscores. The directory is created if needed. Records are appended, so
    repeated runs accumulate lines in the same file.

    Args:
        papers: List of JSON-serializable paper dicts.
        category_code: Category code (e.g. "A").
        category_name: Category name (e.g. "quant-ph").
    """
    # All categories share one output folder.
    folder_name = "arxiv_papers"
    os.makedirs(folder_name, exist_ok=True)

    safe_name = category_name.replace(".", "_")
    filename = f"arxiv_papers_{category_code}_{safe_name}.jsonl"
    file_path = os.path.join(folder_name, filename)

    with open(file_path, "a", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII text readable in the output file.
        f.writelines(
            json.dumps(paper, ensure_ascii=False) + "\n" for paper in papers
        )

    print(f"已追加保存 {len(papers)} 条数据到 {file_path}")


def crawl_category(category_code, category_name, target_count=500,
                   batch_size=100, request_delay=3):
    """Crawl papers of a single category and save them batch by batch.

    Pages are requested until ``target_count`` papers have been collected or
    a request returns no papers (which covers both "no more results" and a
    failed request, since the fetch function reports both as an empty list).

    Args:
        category_code: Category code used in the output file name.
        category_name: arXiv category name; queried as ``cat:<name>``.
        target_count: Number of papers to collect.
        batch_size: Maximum number of papers requested per call.
        request_delay: Seconds to sleep after each successful batch.

    Returns:
        The number of papers actually collected and saved.
    """
    query = f"cat:{category_name}"
    # Doubles as the result offset for the next request: every collected
    # paper advances the offset by one.
    collected_count = 0

    print(f"开始爬取类别 {category_code} ({category_name}) 的论文...")

    while collected_count < target_count:
        # Never request more than what is still missing.
        needed_count = min(batch_size, target_count - collected_count)
        print(f"正在获取 {collected_count+1} 到 {collected_count+needed_count} 篇论文...")

        papers = fetch_arxiv_papers_batch(query, collected_count, needed_count)
        if not papers:
            print("未获取到更多论文,停止爬取")
            break

        # Persist each batch immediately so progress survives a later failure.
        save_papers_to_jsonl(papers, category_code, category_name)
        collected_count += len(papers)
        print(f"当前已获取 {collected_count} 篇论文")

        # Avoid sending requests too frequently.
        time.sleep(request_delay)

    print(f"完成类别 {category_code} ({category_name}) 的爬取,共获取 {collected_count} 篇论文\n")
    return collected_count


def main():
    """Entry point: crawl every category in CATEGORY_DICT in turn.

    An error in one category is printed and does not stop the remaining
    categories from being crawled.
    """
    for category_code, category_name in CATEGORY_DICT.items():
        try:
            crawl_category(category_code, category_name, target_count=500)
        except Exception as e:
            print(f"爬取类别 {category_code} ({category_name}) 时出现错误: {e}")
            continue


if __name__ == "__main__":
    main()