# _*_ coding : UTF-8 _*_ # @Time : 2024/11/20 21:49 # @UpdateTime : 2024/11/20 21:49 # @Author : haochen zhong # @File : CrawlZhongguoshehuibao.py # @Software : PyCharm # @Comment : 本程序采集中国社会报数据 import asyncio import random import re from datetime import datetime, timedelta, time from bs4 import BeautifulSoup from httpx import AsyncClient from motor.motor_asyncio import AsyncIOMotorClient start_date = datetime.strptime('2022-12-01', '%Y-%m-%d') """中国社会报2022年12月01日开始有数据""" end_date = datetime.today() """截止到今天""" headers = { 'User-Agent': 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.42'} # 正则表达式提取年份、月份和数组内容 pattern = r"_htep_(\d{4})_(\d{1,2})=new Array\((.*?)\);" pattern_url = r']*>' # 链接数据库 client = AsyncIOMotorClient('mongodb://localhost:27017') db = client['buweijiguanbao'] collection = db['zhongguoshehuibao'] async def main(): collection_names = await db.list_collection_names() # 判断数据表是否存在 if "zhongguoshehuibao" not in collection_names: # 如果不存在,则从2017年9月开始爬取 print("中国社会报数据表不存在,开始采集!") await getData(start_date, end_date) else: # 如果存在,则从数据库中获取最后一条记录的日期 last_record = await collection.find_one({}, sort=[('release_time', -1)]) last_date_str = last_record['release_time'] print("数据库截止时间:", last_date_str) await getData(last_date_str, end_date) async def getContent(soup: BeautifulSoup) -> str: """ :param soup: BeautifulSoup对象 :return: 文章内容 """ content = "" for p in soup.select("#articleFont p"): para = p.text.strip() if para: content += para content += '\n' return content async def seconds_until_next_allowed_time() -> int: """计算到下一个可运行时间的秒数""" now = datetime.now() current_time = now.time() start_time = time(7, 0, 0) end_time = time(23, 0, 0) if current_time < start_time: # 当前时间早于可运行时间,计算到 07:00:00 的时间差 next_run = datetime.combine(now.date(), start_time) elif current_time > end_time: # 当前时间晚于可运行时间,计算到第二天 07:00:00 的时间差 next_run = datetime.combine(now.date() + timedelta(days=1), start_time) else: # 当前时间在可运行时间内 return 0 delta = next_run - now return int(delta.total_seconds()) async def loading(): """ 等待程序 :return: """ # 获取当前时间 now = datetime.now().time() # 定义时间范围 start_time = time(7, 0, 0) # 07:00:00 end_time = time(23, 0, 0) # 23:00:00 # 判断当前时间是否在范围内 if start_time <= now <= end_time: # print("当前时间在07:00:00--23:00:00范围内,中国可正常采集!") return True else: print("当前时间不在07:00:00--23:00:00范围内,中国社会报无法采集") awaitTime = await seconds_until_next_allowed_time() """等待时间""" print(f"等待{awaitTime}秒后继续采集") await asyncio.sleep(awaitTime) async def getData(start_date: datetime, end_date: datetime): """" :param start_date: 开始日期 :param end_date: 结束日期 :return: None """ crawl_num = 0 date_url = "https://epaper.shehuiwang.cn/epaper/zgshb/pubdate.js" async with AsyncClient(headers=headers, timeout=60) as client: await loading() response = await client.get(date_url) response.encoding = response.charset_encoding js_text = response.text dayList = [] for item in js_text.split("\n"): matches = re.findall(pattern, item.strip()) # 解析匹配到的内容 for year, month, data in matches: if (datetime(int(year), int(month), 1) - start_date).days < 0: continue # 将数组数据转为列表 data_array = list(map(int, data.split(','))) for i, value in enumerate(data_array): current_date = datetime(int(year), int(month), 1) + timedelta(days=i) if value: dayList.append(current_date) for date in dayList: date_now_s = date.strftime('%Y/%m/%d') base_url = f"https://epaper.shehuiwang.cn/epaper/zgshb/{date_now_s}/" url = base_url + "pub_index.html" """https://epaper.shehuiwang.cn/epaper/zgshb/2022/11/23/pub_index.html""" print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), url) try: await loading() response = await client.get(url, follow_redirects=True) response.encoding = response.charset_encoding match = re.search(pattern_url, response.text, re.IGNORECASE) if match: url = "https://epaper.shehuiwang.cn" + match.group(1) response = await client.get(url) response.encoding = response.charset_encoding print(f"一级连接状态:{response.status_code}") if response.status_code == 200: soup = BeautifulSoup(response.text, 'lxml') for item in soup.select(".listTitle a"): banmianming = item.text.split(":")[-1] banmianhao = item.text.split(":")[0] url1 = "https://epaper.shehuiwang.cn" + item.get("href") print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), url1) await loading() response1 = await client.get(url1) response1.encoding = response1.charset_encoding print(f"二级连接状态:{response1.status_code}") if response1.status_code == 200: soup1 = BeautifulSoup(response1.text, 'lxml') for item2 in soup1.select(".contentNews .humor a"): title = item2.text.strip() url2 = "https://epaper.shehuiwang.cn" + "/".join( item.get("href").split("/")[:-1]) + "/" + item2.get("href") if await collection.find_one({"detail_url": url2}, {"_id": False}): continue print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), url2) await loading() response2 = await client.get(url2) response2.encoding = response2.charset_encoding print(f"三级连接状态:{response2.status_code}") if response2.status_code == 200: soup2 = BeautifulSoup(response2.text, 'lxml') try: title = soup2.select_one(".articleTitle").text.strip() except: title = title try: subtitle = soup2.select(".articleTitle2")[-1].text.strip() preTitle = soup2.select(".articleTitle2")[0].text.strip() except: subtitle = "" preTitle = "" content = await getContent(soup2) await collection.insert_one({ "title": title, "subtitle": subtitle, "preTitle": preTitle, "author": "empty", "banmianming": banmianming, "banmianhao": banmianhao, 'keywordlist': 'empty', 'detail_url': url2, 'release_time': date, 'insert_timestamp': datetime.today(), 'content': content }) crawl_num += 1 print( f"中国社会报---{date_now_s}---{banmianming}---{banmianhao}---{title}---采集完成!") await asyncio.sleep(random.randint(5, 15)) print(f"中国社会报---{date_now_s}---{banmianming}---{banmianhao}----采集完成!") await asyncio.sleep(random.randint(5, 15)) print(f"中国社会报---{date_now_s}-----采集完成!") await asyncio.sleep(random.randint(5, 15)) except Exception as e: await collection.insert_one( {'banmianhao': 'empty', 'banmianming': 'empty', 'preTitle': 'empty', 'title': 'empty', 'subtitle': 'empty', 'author': 'empty', 'keywordlist': 'empty', 'detail_url': url, 'release_time': date, 'insert_timestamp': datetime.today(), 'content': 'empty'} ) print(e) print(f"中国社会报采集完毕,共采集{crawl_num}条数据!") asyncio.run(main())