## Introduction: The Data Value Behind Weibo Hot Search

The Weibo hot search list is the most-watched real-time topic indicator on Chinese social media, drawing hundreds of millions of users every day. It not only reflects current social hotspots and shifts in public opinion, but also serves as an important data source for online marketing, sentiment analysis, and trend prediction. This article walks through how to build a high-performance Weibo hot search crawler with a modern Python stack, covering real-time collection and persistent storage of the data.

## Technology Selection and Architecture Design

### Core Technology Stack

- Async framework: aiohttp + asyncio for high-concurrency requests
- HTML parsing: BeautifulSoup4 + lxml for efficient DOM parsing
- Data storage: SQLAlchemy + Alembic for ORM mapping and database migrations
- Anti-crawler countermeasures: Playwright + proxy pool + rotating request headers
- Scheduling and monitoring: APScheduler for timed task management
- Configuration management: pydantic for data validation and settings

### System Architecture

```text
Weibo hot search collection system architecture
1. Scheduling layer: APScheduler triggers crawl jobs on a schedule
2. Collection layer: async crawler cluster with distributed proxy support
3. Parsing layer: multiple parsing strategies to survive page-structure changes
4. Storage layer: MySQL/PostgreSQL + Redis cache
5. Monitoring layer: logging + exception alerting
```

## Complete Code Implementation

### 1. Environment Setup and Dependencies

```text
# requirements.txt
aiohttp==3.9.1
beautifulsoup4==4.12.2
sqlalchemy==2.0.23
alembic==1.12.1
playwright==1.40.0
apscheduler==3.10.4
pydantic==2.5.0
pydantic-settings==2.1.0
redis==5.0.1
pandas==2.1.4
httpx==0.25.2
tenacity==8.2.3
loguru==0.7.2
# PostgreSQL drivers used by the SQLAlchemy engines below (left unpinned here)
psycopg2-binary
asyncpg
```

### 2. Configuration Management

```python
# config.py
from pydantic_settings import BaseSettings
from pydantic import Field, RedisDsn, PostgresDsn
from typing import List, Optional
import os


class Settings(BaseSettings):
    """Application settings."""

    # Database configuration
    database_url: PostgresDsn = Field(
        default="postgresql://user:password@localhost:5432/weibo_hot"
    )
    redis_url: RedisDsn = Field(
        default="redis://localhost:6379/0"
    )

    # Crawler configuration
    weibo_hot_url: str = "https://s.weibo.com/top/summary"
    request_timeout: int = 30
    max_concurrent: int = 10
    retry_times: int = 3

    # Proxy configuration
    proxy_enabled: bool = False
    proxy_pool: List[str] = []

    # Request header configuration
    user_agents: List[str] = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    ]

    # Crawl frequency
    crawl_interval: int = 300  # 5 minutes

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


settings = Settings()
```

### 3. Data Models

```python
# models.py
from sqlalchemy import (
    Column, Integer, String, DateTime, Float, Text,
    Boolean, Index, BigInteger
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
from datetime import datetime
import json

Base = declarative_base()


class HotSearchItem(Base):
    """A single hot search entry."""

    __tablename__ = "hot_search_items"

    id = Column(BigInteger, primary_key=True, autoincrement=True)
    rank = Column(Integer, nullable=False, comment="rank on the list")
    keyword = Column(String(255), nullable=False, comment="keyword")
    url = Column(String(500), comment="Weibo link")
    tag = Column(String(50), comment="tag (hot, new, boom, etc.)")
    hot_value = Column(Integer, comment="hotness value")
    category = Column(String(50), comment="category")
    timestamp = Column(DateTime, default=datetime.now, comment="crawl time")
    created_at = Column(DateTime, default=datetime.now, comment="creation time")
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Composite indexes for the most common query patterns
    __table_args__ = (
        Index("idx_timestamp_rank", "timestamp", "rank"),
        Index("idx_keyword_timestamp", "keyword", "timestamp"),
    )

    def to_dict(self):
        """Convert the row to a plain dict."""
        return {
            "id": self.id,
            "rank": self.rank,
            "keyword": self.keyword,
            "url": self.url,
            "tag": self.tag,
            "hot_value": self.hot_value,
            "category": self.category,
            "timestamp": self.timestamp.isoformat() if self.timestamp else None,
        }


class HotSearchHistory(Base):
    """A full-list snapshot of the hot search board."""

    __tablename__ = "hot_search_history"

    id = Column(BigInteger, primary_key=True, autoincrement=True)
    snapshot_data = Column(Text, comment="snapshot data (JSON)")
    total_items = Column(Integer, comment="total number of items")
    timestamp = Column(DateTime, default=datetime.now, comment="snapshot time")
    created_at = Column(DateTime, default=datetime.now)
```
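Because `Settings` is a `pydantic-settings` model, every field above can be overridden through environment variables or the `.env` file shown later. A minimal sketch of that behaviour, assuming the `config.py` module above (the demo file name is made up):

```python
# settings_demo.py - hypothetical demo of environment overrides for config.py
import os

# pydantic-settings matches environment variables to field names
# case-insensitively, so these override the defaults declared in Settings.
os.environ["CRAWL_INTERVAL"] = "60"
os.environ["PROXY_ENABLED"] = "true"

from config import Settings

settings = Settings()
print(settings.crawl_interval)  # 60
print(settings.proxy_enabled)   # True
print(settings.weibo_hot_url)   # https://s.weibo.com/top/summary
```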
### 4. Async HTTP Client

```python
# http_client.py
import aiohttp
import asyncio
from typing import Optional, Dict, Any
from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type
)
import random
from loguru import logger

from config import settings


class AsyncHTTPClient:
    """Async HTTP client with retries, random headers, and optional proxies."""

    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.user_agents = settings.user_agents
        self.timeout = aiohttp.ClientTimeout(total=settings.request_timeout)

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def _get_random_headers(self) -> Dict[str, str]:
        """Build a request header set with a randomly chosen User-Agent."""
        return {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }

    @retry(
        stop=stop_after_attempt(settings.retry_times),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
    )
    async def fetch(self, url: str, use_proxy: bool = False) -> str:
        """Fetch a page and return its HTML."""
        if not self.session:
            self.session = aiohttp.ClientSession(timeout=self.timeout)

        headers = self._get_random_headers()
        proxy = random.choice(settings.proxy_pool) if use_proxy and settings.proxy_pool else None

        try:
            async with self.session.get(
                url, headers=headers, proxy=proxy, ssl=False
            ) as response:
                response.raise_for_status()
                html = await response.text()
                logger.info(f"Fetched page: {url}, status: {response.status}")
                return html
        except aiohttp.ClientError as e:
            logger.error(f"Request failed: {url}, error: {str(e)}")
            raise

    async def close(self):
        """Close the underlying session."""
        if self.session:
            await self.session.close()
```
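The client can be exercised on its own before the parser and database layers exist. A short usage sketch, assuming the `http_client.py` and `config.py` modules above (the script name is made up):

```python
# fetch_demo.py - usage sketch for AsyncHTTPClient
import asyncio

from config import settings
from http_client import AsyncHTTPClient


async def main():
    # The async context manager opens and closes the aiohttp session.
    async with AsyncHTTPClient() as client:
        html = await client.fetch(settings.weibo_hot_url)
        print(f"Fetched {len(html)} characters of HTML")


if __name__ == "__main__":
    asyncio.run(main())
```

Because `fetch` retries `aiohttp.ClientError` and timeout failures through tenacity with exponential backoff, a blocked or unreachable endpoint may take several attempts before the call finally raises.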
### 5. Weibo Hot Search Parser

```python
# parser.py
from bs4 import BeautifulSoup
import re
from typing import List, Dict, Any, Optional
from datetime import datetime
import json
from loguru import logger


class WeiboHotSearchParser:
    """Parser for the Weibo hot search page."""

    @staticmethod
    def parse_hot_list(html: str) -> List[Dict[str, Any]]:
        """Parse the hot search table into a list of dicts."""
        soup = BeautifulSoup(html, "lxml")
        hot_items = []

        # Locate the hot search table body
        hot_table = soup.find("tbody")
        if not hot_table:
            logger.warning("Hot search table not found")
            return hot_items

        # Parse each entry, skipping the header row
        rows = hot_table.find_all("tr", class_=lambda x: x != "thead")
        for row in rows:
            try:
                item = WeiboHotSearchParser._parse_row(row)
                if item:
                    hot_items.append(item)
            except Exception as e:
                logger.error(f"Failed to parse row: {str(e)}")
                continue

        logger.info(f"Parsed {len(hot_items)} hot search items")
        return hot_items

    @staticmethod
    def _parse_row(row) -> Optional[Dict[str, Any]]:
        """Parse a single table row."""
        # Rank
        rank_td = row.find("td", class_="td-01")
        if not rank_td:
            return None
        rank_text = rank_td.get_text(strip=True)
        rank = int(rank_text) if rank_text.isdigit() else 0

        # Keyword and link
        keyword_td = row.find("td", class_="td-02")
        if not keyword_td:
            return None
        keyword_link = keyword_td.find("a")
        if not keyword_link:
            return None
        keyword = keyword_link.get_text(strip=True)
        url = keyword_link.get("href", "")

        # Normalize relative URLs
        if url and not url.startswith("http"):
            url = f"https://s.weibo.com{url}"

        # Tag (hot, new, boom, etc.)
        tag_span = keyword_td.find("span")
        tag = tag_span.get_text(strip=True) if tag_span else None

        # Hotness value
        hot_value_td = row.find("td", class_="td-03")
        hot_value = None
        if hot_value_td:
            hot_text = hot_value_td.get_text(strip=True)
            if hot_text.isdigit():
                hot_value = int(hot_text)

        # Category
        category_td = row.find("td", class_="td-04")
        category = category_td.get_text(strip=True) if category_td else None

        return {
            "rank": rank,
            "keyword": keyword,
            "url": url,
            "tag": tag,
            "hot_value": hot_value,
            "category": category,
            "timestamp": datetime.now()
        }

    @staticmethod
    def parse_real_time_hot(html: str) -> List[Dict[str, Any]]:
        """Parse real-time hot search data from embedded JSON (fallback strategy)."""
        # Match the script block carrying the hot search payload
        pattern = r"<script>.*?STK\.pageId.*?hot.*?</script>"
        match = re.search(pattern, html, re.DOTALL)

        if match:
            try:
                # Extract the JSON array from the matched script
                json_pattern = r"\[.*?\]"
                json_match = re.search(json_pattern, match.group(), re.DOTALL)
                if json_match:
                    data = json.loads(json_match.group())
                    return data
            except (json.JSONDecodeError, AttributeError) as e:
                logger.error(f"JSON parsing failed: {str(e)}")

        return []
```

### 6. Database Layer

```python
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Generator
import asyncio
from loguru import logger

from config import settings
from models import Base, HotSearchItem, HotSearchHistory


class DatabaseManager:
    """Owns the sync/async engines and session factories."""

    def __init__(self):
        # Sync engine, used for Alembic migrations and table creation
        self.sync_engine = create_engine(
            str(settings.database_url),
            pool_size=20,
            max_overflow=30,
            pool_pre_ping=True,
            echo=False
        )

        # Async engine (asyncpg driver)
        async_database_url = str(settings.database_url).replace(
            "postgresql://", "postgresql+asyncpg://"
        )
        self.async_engine = create_async_engine(
            async_database_url,
            pool_size=20,
            max_overflow=30,
            pool_pre_ping=True,
            echo=False
        )

        # Session factories
        self.SyncSessionLocal = sessionmaker(
            bind=self.sync_engine,
            autocommit=False,
            autoflush=False
        )
        self.AsyncSessionLocal = sessionmaker(
            bind=self.async_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    def init_database(self):
        """Create all tables."""
        Base.metadata.create_all(bind=self.sync_engine)
        logger.info("Database tables created")

    @asynccontextmanager
    async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield an async session with commit/rollback handling."""
        session = self.AsyncSessionLocal()
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            logger.error(f"Database operation failed: {str(e)}")
            raise
        finally:
            await session.close()

    def get_sync_session(self) -> Generator[Session, None, None]:
        """Yield a sync session with commit/rollback handling."""
        session = self.SyncSessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Database operation failed: {str(e)}")
            raise
        finally:
            session.close()


db_manager = DatabaseManager()
```
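Downstream consumers read the stored data back through the same session factory. The snippet below is an illustrative sketch (the `top_keywords_today` helper is not part of the original code) showing how the async session and a SQLAlchemy `select` can pull the most recent ranking:

```python
# query_demo.py - illustrative read path, assuming database.py and models.py above
import asyncio
from datetime import datetime, timedelta

from sqlalchemy import select

from database import db_manager
from models import HotSearchItem


async def top_keywords_today(limit: int = 10):
    """Return the hottest items captured in the last 24 hours."""
    since = datetime.now() - timedelta(hours=24)
    async with db_manager.get_async_session() as session:
        result = await session.execute(
            select(HotSearchItem)
            .where(HotSearchItem.timestamp >= since)
            .order_by(HotSearchItem.timestamp.desc(), HotSearchItem.rank)
            .limit(limit)
        )
        return [item.to_dict() for item in result.scalars().all()]


if __name__ == "__main__":
    for row in asyncio.run(top_keywords_today()):
        print(row["rank"], row["keyword"], row["hot_value"])
```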
### 7. Main Spider Class

```python
# spider.py
import asyncio
import time
from typing import List, Dict, Any
from datetime import datetime
import json
from loguru import logger
from sqlalchemy import select

from http_client import AsyncHTTPClient
from parser import WeiboHotSearchParser
from database import db_manager, HotSearchItem, HotSearchHistory
from config import settings


class WeiboHotSearchSpider:
    """Weibo hot search spider."""

    def __init__(self):
        self.http_client = AsyncHTTPClient()
        self.parser = WeiboHotSearchParser()
        self.crawl_count = 0
        # Success counters consumed by the AlertSystem in the monitoring section
        self.success_count = 0
        self.last_success_time = time.time()

    async def crawl(self) -> List[Dict[str, Any]]:
        """Run one crawl cycle."""
        logger.info(f"Starting crawl #{self.crawl_count + 1}")

        try:
            # Fetch the page
            async with self.http_client:
                html = await self.http_client.fetch(
                    settings.weibo_hot_url,
                    use_proxy=settings.proxy_enabled
                )

            # Parse the hot search data
            hot_items = self.parser.parse_hot_list(html)

            if not hot_items:
                # Fall back to the regex-based parser
                hot_items = self.parser.parse_real_time_hot(html)

            # Persist the items
            await self.save_to_database(hot_items)

            # Persist a full-list snapshot
            await self.save_snapshot(hot_items)

            self.crawl_count += 1
            self.success_count += 1
            self.last_success_time = time.time()
            logger.success(f"Crawl finished, fetched {len(hot_items)} items")
            return hot_items

        except Exception as e:
            logger.error(f"Crawl failed: {str(e)}")
            return []

    async def save_to_database(self, hot_items: List[Dict[str, Any]]):
        """Save items to the database."""
        async with db_manager.get_async_session() as session:
            for item in hot_items:
                # Skip items already stored for this keyword and timestamp
                existing = await session.execute(
                    select(HotSearchItem).where(
                        HotSearchItem.keyword == item["keyword"],
                        HotSearchItem.timestamp == item["timestamp"]
                    )
                )
                if existing.scalar() is None:
                    db_item = HotSearchItem(**item)
                    session.add(db_item)

            await session.commit()

    async def save_snapshot(self, hot_items: List[Dict[str, Any]]):
        """Save a history snapshot of the whole list."""
        async with db_manager.get_async_session() as session:
            snapshot = HotSearchHistory(
                snapshot_data=json.dumps(hot_items, ensure_ascii=False, default=str),
                total_items=len(hot_items),
                timestamp=datetime.now()
            )
            session.add(snapshot)
            await session.commit()

    async def batch_crawl(self, times: int = 10, interval: int = 300):
        """Run several crawl rounds with a fixed interval."""
        for i in range(times):
            logger.info(f"Starting round {i + 1}/{times}")
            await self.crawl()

            if i < times - 1:
                logger.info(f"Waiting {interval} seconds before the next round...")
                await asyncio.sleep(interval)
```

### 8. Scheduling and Monitoring

```python
# scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from datetime import datetime, timedelta
import asyncio
from loguru import logger
import signal
import sys

from spider import WeiboHotSearchSpider
from config import settings


class SpiderScheduler:
    """Spider scheduler."""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.spider = WeiboHotSearchSpider()
        self.is_running = True

        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Handle shutdown signals."""
        logger.info("Stop signal received, shutting down...")
        self.is_running = False
        self.scheduler.shutdown()
        sys.exit(0)

    async def crawl_job(self):
        """Scheduled crawl job."""
        try:
            await self.spider.crawl()
        except Exception as e:
            logger.error(f"Scheduled job failed: {str(e)}")

    def start(self):
        """Start the scheduler."""
        # Recurring crawl job
        trigger = IntervalTrigger(seconds=settings.crawl_interval)
        self.scheduler.add_job(
            self.crawl_job,
            trigger=trigger,
            id="weibo_hot_crawl",
            name="Weibo hot search crawl job",
            replace_existing=True
        )

        # Run once shortly after startup
        self.scheduler.add_job(
            self.crawl_job,
            trigger="date",
            run_date=datetime.now() + timedelta(seconds=5),
            id="initial_crawl"
        )

        # Start the scheduler and keep the event loop alive
        self.scheduler.start()
        logger.info("Spider scheduler started")

        try:
            asyncio.get_event_loop().run_forever()
        except (KeyboardInterrupt, SystemExit):
            logger.info("Exiting")

    def stop(self):
        """Stop the scheduler."""
        self.scheduler.shutdown()
        logger.info("Spider scheduler stopped")
```
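The `max_concurrent` setting is declared in `config.py` but not used by the spider above. If the project later fetches a detail page for each hot keyword, a semaphore is the usual way to keep aiohttp concurrency within that limit. The helper below is a sketch under that assumption (`fetch_many` is not part of the original code):

```python
# concurrency_sketch.py - hypothetical helper bounding concurrency with a semaphore
import asyncio
from typing import Dict, List

from config import settings
from http_client import AsyncHTTPClient


async def fetch_many(urls: List[str]) -> Dict[str, str]:
    """Fetch several URLs, never exceeding settings.max_concurrent in flight."""
    semaphore = asyncio.Semaphore(settings.max_concurrent)
    results: Dict[str, str] = {}

    async with AsyncHTTPClient() as client:
        async def bounded_fetch(url: str):
            async with semaphore:  # at most max_concurrent tasks pass this point
                results[url] = await client.fetch(url)

        # Failed URLs are simply missing from the result dict
        await asyncio.gather(*(bounded_fetch(u) for u in urls),
                             return_exceptions=True)
    return results
```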
### 9. Main Entry Point

```python
# main.py
import asyncio
import argparse
from loguru import logger
import sys

from scheduler import SpiderScheduler
from database import db_manager
from config import settings

# Logging configuration
logger.add(
    "logs/weibo_spider_{time:YYYY-MM-DD}.log",
    rotation="00:00",
    retention="30 days",
    level="INFO",
    encoding="utf-8",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)


async def single_crawl():
    """Run a single crawl and print the top results."""
    from spider import WeiboHotSearchSpider

    spider = WeiboHotSearchSpider()
    results = await spider.crawl()

    # Show only the first 10 entries
    for item in results[:10]:
        print(f"{item['rank']:2d}. {item['keyword']} ({item.get('tag', '')}) "
              f"- hot value: {item.get('hot_value', 'N/A')}")

    print(f"\nFetched {len(results)} items in total")


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description="Weibo hot search spider")
    parser.add_argument("--mode", choices=["single", "daemon"], default="daemon",
                        help="run mode")
    parser.add_argument("--init-db", action="store_true", help="initialize the database")

    args = parser.parse_args()

    # Initialize the database
    if args.init_db:
        logger.info("Initializing database...")
        db_manager.init_database()
        logger.success("Database initialized")

    # Run mode
    if args.mode == "single":
        asyncio.run(single_crawl())
    else:
        # Daemon mode
        scheduler = SpiderScheduler()
        scheduler.start()


if __name__ == "__main__":
    main()
```

### 10. Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency list
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the log directory
RUN mkdir -p logs

# Apply database migrations
# (the database must be reachable; in practice this step is often moved to the
#  container entrypoint so it runs at start-up rather than at build time)
RUN alembic upgrade head

# Run the application
CMD ["python", "main.py", "--mode", "daemon"]
```

```yaml
# docker-compose.yml
version: "3.8"

services:
  weibo-spider:
    build: .
    container_name: weibo-hot-search-spider
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/weibo_hot
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
    restart: unless-stopped

  db:
    image: postgres:15
    container_name: weibo-db
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=weibo_hot
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    container_name: weibo-redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@example.com
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db

volumes:
  postgres_data:
  redis_data:
```

## Advanced Features and Optimization Suggestions

### 1. Countering Anti-Crawler Measures

```python
# Use Playwright to drive a real headless browser when plain requests are blocked
import asyncio
import random

from config import settings


async def crawl_with_playwright():
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        # Set the user agent
        await page.set_extra_http_headers({
            "User-Agent": random.choice(settings.user_agents)
        })

        # Open the page and wait for network activity to settle
        await page.goto(settings.weibo_hot_url, wait_until="networkidle")

        # Scroll to the bottom to trigger lazy-loaded content
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(2)

        # Grab the rendered HTML
        html = await page.content()
        await browser.close()
        return html
```
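The HTML rendered by Playwright has the same structure BeautifulSoup expects, so it can serve as a fallback when the plain aiohttp request is blocked. One possible wiring, sketched here rather than taken from the original project (the `browser_fetch` module name is hypothetical and refers to wherever the snippet above is placed; the fallback items are not persisted in this sketch):

```python
# playwright_fallback.py - sketch of using the Playwright fetch as a fallback path
import asyncio

from browser_fetch import crawl_with_playwright  # hypothetical home of the snippet above
from parser import WeiboHotSearchParser
from spider import WeiboHotSearchSpider


async def crawl_with_fallback():
    spider = WeiboHotSearchSpider()
    items = await spider.crawl()  # the normal aiohttp-based path
    if not items:
        # Fall back to a headless browser when the plain request is blocked
        html = await crawl_with_playwright()
        items = WeiboHotSearchParser.parse_hot_list(html)
    return items


if __name__ == "__main__":
    print(len(asyncio.run(crawl_with_fallback())))
```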
### 2. Data Visualization

Plotly is an additional dependency that is not pinned in the requirements list above.

```python
# Generate a heatmap with Plotly
import plotly.graph_objects as go
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import select

from database import db_manager
from models import HotSearchItem


async def generate_heatmap(days: int = 7):
    """Generate a hot search heatmap for the last N days."""
    async with db_manager.get_async_session() as session:
        # Query the last N days of data
        start_date = datetime.now() - timedelta(days=days)
        result = await session.execute(
            select(HotSearchItem)
            .where(HotSearchItem.timestamp >= start_date)
            .order_by(HotSearchItem.timestamp)
        )
        items = result.scalars().all()

        # Convert to a DataFrame
        df = pd.DataFrame([item.to_dict() for item in items])

        # Build the heatmap
        fig = go.Figure(data=go.Heatmap(
            z=df["hot_value"],
            x=df["timestamp"],
            y=df["keyword"],
            colorscale="Viridis"
        ))

        fig.update_layout(
            title=f"Weibo hot search heatmap (last {days} days)",
            xaxis_title="Time",
            yaxis_title="Keyword",
            height=800
        )

        fig.write_html(f"heatmap_{datetime.now().strftime('%Y%m%d')}.html")
```

### 3. Exception Monitoring and Alerting

```python
# Monitoring and alerting
import time
import smtplib
from email.mime.text import MIMEText
from typing import List
from loguru import logger

from spider import WeiboHotSearchSpider


class AlertSystem:
    """Simple health checks with e-mail alerts."""

    def __init__(self):
        self.error_count = 0
        self.last_alert_time = None

    async def check_health(self, spider: WeiboHotSearchSpider):
        """Check the spider's health counters."""
        if spider.crawl_count == 0:
            return

        success_rate = spider.success_count / spider.crawl_count
        if success_rate < 0.8:
            await self.send_alert(f"Spider success rate too low: {success_rate:.2%}")

        if time.time() - spider.last_success_time > 3600:  # no success for one hour
            await self.send_alert("Spider has had no successful crawl for over an hour")

    async def send_alert(self, message: str):
        """Send an alert e-mail."""
        msg = MIMEText(message, "plain", "utf-8")
        msg["Subject"] = "Weibo spider alert"
        msg["From"] = "alert@example.com"
        msg["To"] = "admin@example.com"

        try:
            smtp = smtplib.SMTP("smtp.example.com", 587)
            smtp.starttls()
            smtp.login("user", "password")
            smtp.send_message(msg)
            smtp.quit()
        except Exception as e:
            logger.error(f"Failed to send alert e-mail: {str(e)}")
```

## Deployment and Operation Guide

### 1. Environment Preparation

```bash
# Clone the code
git clone https://github.com/yourusername/weibo-hot-search-spider.git
cd weibo-hot-search-spider

# Create a virtual environment
python -m venv venv
source venv/bin/activate   # Linux/Mac
# venv\Scripts\activate    # Windows

# Install dependencies
pip install -r requirements.txt

# Install the Playwright browser
playwright install chromium

# Initialize the database
python main.py --init-db
```

### 2. Environment Variables

```bash
# .env
DATABASE_URL=postgresql://user:password@localhost:5432/weibo_hot
REDIS_URL=redis://localhost:6379/0
CRAWL_INTERVAL=300
MAX_CONCURRENT=10
```

### 3. Running the Spider

```bash
# Single run
python main.py --mode single

# Daemon mode
python main.py --mode daemon

# Run with Docker
docker-compose up -d
```

## Summary and Outlook

This article walked through building a complete Weibo hot search crawler with a modern Python stack. By combining asynchronous programming, anti-crawler countermeasures, and database optimizations, the system achieves efficient and stable data collection.

System highlights:

- High performance: asynchronous, concurrent processing that scales to large collection volumes
- High availability: layered fault tolerance with automatic retries and failure recovery
- Extensible: modular design that supports distributed deployment
- Feature-rich: data storage, monitoring and alerting, and visualization

Directions for extension:

- Add machine learning models to predict hot search trends
- Add sentiment analysis to mine public-opinion leanings
- Build a real-time dashboard that shows topic changes as they happen
- Develop a RESTful API to serve the collected data (a minimal sketch follows below)
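As one concrete example of the API direction, a thin read-only endpoint over the existing tables could look like the following. This is a sketch only: FastAPI and uvicorn are not in the requirements list above, and the route layout is an assumption rather than part of the original project.

```python
# api.py - hypothetical FastAPI read layer over the existing tables
from datetime import datetime, timedelta

from fastapi import FastAPI
from sqlalchemy import select

from database import db_manager
from models import HotSearchItem

app = FastAPI(title="Weibo Hot Search API")


@app.get("/hot/latest")
async def latest_hot(limit: int = 50):
    """Return the hot search items captured within the last hour."""
    since = datetime.now() - timedelta(hours=1)
    async with db_manager.get_async_session() as session:
        result = await session.execute(
            select(HotSearchItem)
            .where(HotSearchItem.timestamp >= since)
            .order_by(HotSearchItem.rank)
            .limit(limit)
        )
        return [item.to_dict() for item in result.scalars().all()]

# Run with: uvicorn api:app --reload   (uvicorn assumed to be installed separately)
```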