## Introduction: The Value and Technical Challenges of Recruitment Data Mining

In today's big-data era, the data held by recruitment platforms carries enormous commercial and research value. As one of China's leading classified-information platforms, 58同城 (58.com) aggregates a massive volume of corporate job postings in its recruitment channel, and this data is highly valuable for market analysis, talent-trend forecasting, and competitor research. However, as websites keep upgrading their anti-crawling defenses, traditional static crawlers can no longer cope with layered protections such as dynamic rendering, request encryption, and behavioral verification. This article walks through how to build an efficient, stable collection system for 58.com job listings using modern Python crawling techniques.

## Technology Stack Overview: Core Components of a Modern Crawler

Before writing any code, here are the key technologies used in this walkthrough:

- **Playwright**: Microsoft's open-source browser automation framework; supports headless operation and handles JavaScript-rendered pages well
- **Asyncio**: Python's built-in asynchronous I/O framework, used for high-concurrency fetching
- **BeautifulSoup4 + PyQuery**: two parsing engines that back each other up to make extraction more robust
- **Redis**: distributed task queue and deduplication store
- **Pydantic**: data validation and serialization
- **User-Agent pool + proxy IP pool**: bypasses basic anti-crawling checks
- **Browser fingerprinting**: simulates a realistic browser environment

## Project Architecture

```text
58.com crawler architecture
├── Anti-Anti-Crawler Layer
│   ├── Dynamic User-Agent rotation
│   ├── Proxy IP middleware
│   ├── Browser fingerprint simulation
│   └── Randomized request delays
├── Data Acquisition Layer
│   ├── Playwright async controller
│   ├── Page rendering engine
│   └── AJAX interceptor
├── Data Processing Layer
│   ├── HTML parser
│   ├── Data cleaner
│   └── Structured storage
└── Task Scheduler Layer
    ├── Redis queue management
    ├── Distributed scheduling
    └── Failure retry mechanism
```

## Environment Setup and Dependencies

```text
# requirements.txt
playwright==1.40.0
aiohttp==3.9.1
beautifulsoup4==4.12.2
pyquery==2.0.0
redis==5.0.1          # also provides async support via redis.asyncio
pydantic==2.5.0
pydantic-settings==2.1.0
fake-useragent==1.4.0
lxml==4.9.3
pandas==2.1.4
# Note: asyncio is part of the Python 3 standard library, and the asyncio-redis
# package is not needed; async Redis access comes from redis>=4.2.
```

Installation:

```bash
pip install -r requirements.txt
playwright install chromium
```

## Core Implementation

### 1. Configuration and Data Models

```python
# config/settings.py
from typing import Optional

from pydantic_settings import BaseSettings


class CrawlerSettings(BaseSettings):
    # Basic crawler settings
    crawl_depth: int = 3
    max_concurrent_tasks: int = 10
    request_timeout: int = 30
    retry_times: int = 3

    # Redis settings
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    redis_password: Optional[str] = None

    # Proxy settings
    proxy_enabled: bool = True
    proxy_pool_url: str = "http://proxy-pool/api/get"

    # Output settings
    save_format: str = "both"   # json, csv, or both
    output_dir: str = "./data"

    class Config:
        env_file = ".env"
```

```python
# models/job_model.py
import re
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field, validator  # `field_validator` is the preferred name in Pydantic v2


class CompanyInfo(BaseModel):
    name: str
    scale: Optional[str] = None     # company size
    industry: Optional[str] = None  # industry
    nature: Optional[str] = None    # company type


class JobSalary(BaseModel):
    min_salary: Optional[float] = None
    max_salary: Optional[float] = None
    unit: str = "月"                # per month / year / hour
    is_negotiable: bool = False

    @validator("min_salary", "max_salary", pre=True)
    def parse_salary(cls, v):
        if isinstance(v, str):
            # Handle salary strings such as "8-13K" or "8K"
            match = re.search(r"(\d+\.?\d*)[kK千]?-?(\d+\.?\d*)?[kK千]?", v)
            if match:
                groups = match.groups()
                return float(groups[0]) * 1000 if groups[0] else None
        return v


class JobInfo(BaseModel):
    # Basic information
    job_id: str
    title: str
    url: str
    publish_date: datetime

    # Salary
    salary: JobSalary

    # Company
    company: CompanyInfo

    # Job details
    work_location: str
    experience: Optional[str] = None        # required experience
    education: Optional[str] = None         # required education
    job_type: Optional[str] = None          # job type
    job_description: Optional[str] = None
    job_requirements: Optional[str] = None

    # Benefits
    benefits: List[str] = []

    # Crawler metadata
    source: str = "58同城"
    crawl_time: datetime = Field(default_factory=datetime.now)
    page_url: Optional[str] = None
```
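To see how the configuration and models behave, here is a small usage sketch of my own (not part of the original project); it assumes the two modules above are importable as `config.settings` and `models.job_model`, and the job values are placeholders:

```python
# quick_check.py -- illustrative sketch only
from datetime import datetime

from config.settings import CrawlerSettings
from models.job_model import CompanyInfo, JobInfo, JobSalary

settings = CrawlerSettings()            # values can be overridden via a .env file
print(settings.max_concurrent_tasks)    # 10 unless overridden

# The pre-validator converts raw strings such as "8K" into floats (8000.0)
salary = JobSalary(min_salary="8K", max_salary="13K")
print(salary.min_salary, salary.max_salary)

job = JobInfo(
    job_id="demo-001",
    title="Python开发工程师",
    url="https://example.com/job/demo-001",  # placeholder URL
    publish_date=datetime.now(),
    salary=salary,
    company=CompanyInfo(name="示例科技有限公司"),
    work_location="北京",
)
print(job.model_dump_json(indent=2))    # Pydantic v2 serialization
```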
### 2. Anti-Crawling Countermeasures

```python
# core/anti_spider.py
import asyncio
import random
from typing import Dict, List

import aiohttp
from fake_useragent import UserAgent


class AntiSpiderMiddleware:
    def __init__(self):
        self.ua = UserAgent()
        self.proxy_pool: List[Dict] = []
        self.load_balancing_index = 0
        # Referenced by PlaywrightCrawler; could also be driven by CrawlerSettings.proxy_enabled
        self.proxy_enabled = True

    async def rotate_user_agent(self) -> str:
        """Pick a User-Agent at random."""
        agents = [
            self.ua.random,
            # A custom mobile User-Agent
            "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 "
            "(KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
            # A recent desktop Chrome
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        ]
        return random.choice(agents)

    async def get_proxy(self) -> Dict[str, str]:
        """Return the next proxy from the pool (round-robin)."""
        if not self.proxy_pool:
            await self.refresh_proxy_pool()

        proxy = self.proxy_pool[self.load_balancing_index % len(self.proxy_pool)]
        self.load_balancing_index += 1

        return {
            "server": f"http://{proxy['ip']}:{proxy['port']}",
            "username": proxy.get("username", ""),
            "password": proxy.get("password", ""),
        }

    async def refresh_proxy_pool(self):
        """Reload the proxy pool from the proxy service."""
        async with aiohttp.ClientSession() as session:
            async with session.get("http://proxy-pool/api/all") as resp:
                self.proxy_pool = await resp.json()

    def generate_browser_fingerprint(self) -> Dict:
        """Return a browser fingerprint profile."""
        return {
            "webgl_vendor": "Google Inc.",
            "renderer": "ANGLE (Intel, Intel(R) UHD Graphics Direct3D11 vs_5_0 ps_5_0)",
            "platform": "Win32",
            "hardware_concurrency": 8,
            "device_memory": 8,
            "timezone": "Asia/Shanghai",
            "language": "zh-CN",
            "screen_resolution": "1920x1080",
        }

    async def random_delay(self, min_sec: float = 1.0, max_sec: float = 3.0):
        """Sleep for a random interval to mimic human pacing."""
        await asyncio.sleep(random.uniform(min_sec, max_sec))
```

### 3. Playwright Async Crawler Core

```python
# core/playwright_crawler.py
import asyncio
import logging
from typing import Dict, List, Optional
from urllib.parse import urljoin

from playwright.async_api import async_playwright, Browser, BrowserContext, Page

from core.anti_spider import AntiSpiderMiddleware

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PlaywrightCrawler:
    def __init__(self, anti_spider: AntiSpiderMiddleware):
        self.anti_spider = anti_spider
        self.playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None

    async def init_browser(self, headless: bool = True):
        """Launch the browser and create a hardened context."""
        self.playwright = await async_playwright().start()

        # Optional proxy configuration
        proxy_config = await self.anti_spider.get_proxy() if self.anti_spider.proxy_enabled else None

        # Launch Chromium with flags that hide common automation traits
        self.browser = await self.playwright.chromium.launch(
            headless=headless,
            proxy=proxy_config,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-dev-shm-usage",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
            ],
        )

        # Create a context that mimics a real user
        user_agent = await self.anti_spider.rotate_user_agent()
        fingerprint = self.anti_spider.generate_browser_fingerprint()  # generated but not yet injected

        self.context = await self.browser.new_context(
            user_agent=user_agent,
            viewport={"width": 1920, "height": 1080},
            locale="zh-CN",
            timezone_id="Asia/Shanghai",
            extra_http_headers={
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Cache-Control": "max-age=0",
            },
        )

        # Override navigator properties commonly used for bot detection
        await self.context.add_init_script(
            """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => false
            });
            // Override the plugins property
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
            // Override the languages property
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en']
            });
            """
        )
        logger.info("Browser initialized")

    async def crawl_job_list(self, url: str, max_pages: int = 10) -> List[str]:
        """Crawl job-list pages and collect detail-page links."""
        page = await self.context.new_page()
        job_urls = []

        try:
            await page.goto(url, wait_until="networkidle", timeout=60000)
            await self.anti_spider.random_delay(2, 4)

            current_page = 1
            while current_page <= max_pages:
                logger.info(f"Scraping list page {current_page}")

                # Wait for the job list to render
                await page.wait_for_selector(".job-list", timeout=30000)

                # Extract detail-page links
                job_items = await page.query_selector_all(".job-item")
                for item in job_items:
                    link = await item.query_selector("a.job-title")
                    if link:
                        href = await link.get_attribute("href")
                        if href and "job" in href:
                            full_url = urljoin(url, href)
                            job_urls.append(full_url)

                # Move on to the next page if there is one
                next_button = await page.query_selector("a.next-page")
                if next_button and current_page < max_pages:
                    await next_button.click()
                    await page.wait_for_load_state("networkidle")
                    await self.anti_spider.random_delay(3, 5)
                    current_page += 1
                else:
                    break

        except Exception as e:
            logger.error(f"Failed to crawl list page: {e}")
        finally:
            await page.close()

        return job_urls

    async def crawl_job_detail(self, url: str) -> Optional[Dict]:
        """Crawl a single job detail page."""
        page = await self.context.new_page()

        try:
            # Intercept responses to capture AJAX/API payloads
            api_data = {}

            async def intercept_response(response):
                if "api" in response.url or "interface" in response.url:
                    try:
                        api_data[response.url] = await response.json()
                    except Exception:
                        pass

            page.on("response", intercept_response)

            # Open the page
            await page.goto(url, wait_until="networkidle", timeout=60000)
            await self.anti_spider.random_delay(1, 2)

            # Dismiss any popups
            await self.handle_popups(page)

            # Wait for the key content to render
            await page.wait_for_selector(".job-detail", timeout=20000)

            # Extract structured data from the page
            job_data = await self.extract_job_data(page, url)

            # Merge intercepted API data
            if api_data:
                job_data["api_data"] = api_data

            logger.info(f"Scraped job: {job_data.get('title', 'Unknown')}")
            return job_data

        except Exception as e:
            logger.error(f"Failed to crawl detail page {url}: {e}")
            return None
        finally:
            await page.close()

    async def handle_popups(self, page: Page):
        """Close common popups and dialogs."""
        popup_selectors = [
            ".popup-close",
            ".modal-close",
            "button:has-text('关闭')",
            "div[class*='mask'] button",
            "div[class*='dialog'] button",
        ]

        for selector in popup_selectors:
            try:
                close_btn = await page.query_selector(selector)
                if close_btn:
                    await close_btn.click(timeout=3000)
                    await asyncio.sleep(0.5)
            except Exception:
                continue

    async def extract_job_data(self, page: Page, url: str) -> Dict:
        """Extract job fields from the rendered page."""
        # Several candidate selectors per field improve robustness against layout changes
        selectors = {
            "title": [".job-title", "h1.title", "div.title"],
            "salary": [".salary", ".job-salary", ".pay"],
            "company": [".company-name", ".comp-name", "a.comp-title"],
            "location": [".job-location", ".location", ".address"],
            "experience": [".job-experience", ".experience", ".req-exp"],
            "education": [".job-education", ".education", ".req-edu"],
            "description": [".job-description", ".des", ".job-detail-content"],
        }

        data = {"url": url}

        for field, selector_list in selectors.items():
            for selector in selector_list:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        text = await element.text_content()
                        if text and text.strip():
                            data[field] = text.strip()
                            break
                except Exception:
                    continue

        # Extract welfare / benefit tags
        benefit_elements = await page.query_selector_all(".welfare-item, .benefit-item, .tag")
        benefits = []
        for elem in benefit_elements:
            text = await elem.text_content()
            if text and text.strip():
                benefits.append(text.strip())
        data["benefits"] = benefits

        # Extract the publish time
        try:
            date_elem = await page.query_selector(".publish-time, .update-time")
            if date_elem:
                data["publish_date"] = await date_elem.text_content()
        except Exception:
            data["publish_date"] = "未知"  # unknown

        return data

    async def close(self):
        """Release browser resources."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
```
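Before wiring the crawler into the scheduler, it can be exercised on its own. The following is a minimal smoke-test sketch of my own; the start URL is one of the example URLs used later in this article, and whether the selectors still match depends on the live page structure:

```python
# smoke_test.py -- a minimal sketch for exercising the crawler in isolation
import asyncio

from core.anti_spider import AntiSpiderMiddleware
from core.playwright_crawler import PlaywrightCrawler


async def smoke_test():
    anti_spider = AntiSpiderMiddleware()
    anti_spider.proxy_enabled = False          # skip the proxy pool for a local dry run

    crawler = PlaywrightCrawler(anti_spider)
    await crawler.init_browser(headless=True)
    try:
        urls = await crawler.crawl_job_list("https://bj.58.com/python/pn1/", max_pages=1)
        print(f"Found {len(urls)} detail links")
        if urls:
            detail = await crawler.crawl_job_detail(urls[0])
            print(detail)
    finally:
        await crawler.close()


if __name__ == "__main__":
    asyncio.run(smoke_test())
```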
### 4. Async Task Scheduler

```python
# core/task_scheduler.py
import asyncio
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional

import redis.asyncio as aioredis  # the standalone aioredis package was merged into redis-py (redis>=4.2)

from core.playwright_crawler import PlaywrightCrawler

logger = logging.getLogger(__name__)


class TaskStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class CrawlTask:
    url: str
    priority: int = 1
    retry_count: int = 0
    max_retries: int = 3
    metadata: Optional[Dict] = None


class AsyncTaskScheduler:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis_url = redis_url
        self.redis: Optional[aioredis.Redis] = None
        self.task_queue = asyncio.Queue()
        self.active_tasks = set()

    async def init_redis(self):
        """Create the Redis client."""
        self.redis = aioredis.from_url(  # from_url is synchronous; it returns a client immediately
            self.redis_url,
            encoding="utf-8",
            decode_responses=True,
        )
        logger.info("Redis connection initialized")

    async def add_task(self, task: CrawlTask):
        """Put a task on the in-memory queue and persist it to Redis."""
        await self.task_queue.put(task)

        if self.redis:
            task_key = f"58job:task:{hash(task.url)}"
            await self.redis.hset(task_key, mapping={
                "url": task.url,
                "status": TaskStatus.PENDING.value,
                "priority": str(task.priority),
                "retry_count": str(task.retry_count),
            })

    async def get_task(self) -> Optional[CrawlTask]:
        """Fetch the next task, or None if the queue is momentarily empty."""
        try:
            return await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            return None

    async def mark_task_done(self, task: CrawlTask, status: TaskStatus, result: Optional[Dict] = None):
        """Record a task's final status and, if present, its result."""
        if self.redis:
            task_key = f"58job:task:{hash(task.url)}"
            await self.redis.hset(task_key, "status", status.value)

            if result:
                result_key = f"58job:result:{hash(task.url)}"
                await self.redis.set(result_key, json.dumps(result, ensure_ascii=False))

        self.task_queue.task_done()

    async def process_tasks(self, crawler: PlaywrightCrawler, max_concurrent: int = 5):
        """Consume queued tasks with bounded concurrency."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_single_task(task: CrawlTask):
            async with semaphore:
                try:
                    logger.info(f"Processing task: {task.url}")
                    result = await crawler.crawl_job_detail(task.url)
                    if result:
                        await self.mark_task_done(task, TaskStatus.SUCCESS, result)
                        logger.info(f"Task finished: {task.url}")
                    else:
                        await self.retry_task(task)
                except Exception as e:
                    logger.error(f"Task failed {task.url}: {e}")
                    await self.retry_task(task)

        # Dispatch loop
        while True:
            task = await self.get_task()
            if task:
                asyncio.create_task(process_single_task(task))
            else:
                # Sleep briefly to avoid spinning the CPU
                await asyncio.sleep(0.1)

    async def retry_task(self, task: CrawlTask):
        """Requeue a failed task with a simple backoff, up to max_retries."""
        task.retry_count += 1
        if task.retry_count <= task.max_retries:
            logger.warning(f"Retrying task {task.url}, attempt {task.retry_count}")
            # Balance the earlier get() so that queue.join() can still complete
            self.task_queue.task_done()
            await asyncio.sleep(task.retry_count * 2)
            await self.add_task(task)
        else:
            logger.error(f"Task permanently failed {task.url}: max retries reached")
            await self.mark_task_done(task, TaskStatus.FAILED)
```

### 5. Main Program Entry Point

```python
# main.py
import asyncio
import csv
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import List

from config.settings import CrawlerSettings
from core.anti_spider import AntiSpiderMiddleware
from core.playwright_crawler import PlaywrightCrawler
from core.task_scheduler import AsyncTaskScheduler, CrawlTask
from models.job_model import JobInfo

logger = logging.getLogger(__name__)


class Job58Crawler:
    def __init__(self, settings):
        self.settings = settings
        self.anti_spider = AntiSpiderMiddleware()
        self.crawler = None
        self.scheduler = AsyncTaskScheduler()
        self.results = []

    async def init(self):
        """Initialize all crawler components."""
        await self.scheduler.init_redis()
        self.crawler = PlaywrightCrawler(self.anti_spider)
        await self.crawler.init_browser(headless=True)

    async def crawl(self, start_urls: List[str], max_pages_per_url: int = 5):
        """Main crawl workflow."""
        # 1. Collect all job detail links
        all_job_urls = []
        for start_url in start_urls:
            logger.info(f"Collecting job links from: {start_url}")
            job_urls = await self.crawler.crawl_job_list(start_url, max_pages_per_url)
            all_job_urls.extend(job_urls)
            logger.info(f"Collected {len(job_urls)} job links")

        # Deduplicate
        all_job_urls = list(set(all_job_urls))
        logger.info(f"{len(all_job_urls)} unique job links after deduplication")

        # 2. Create tasks
        for url in all_job_urls:
            task = CrawlTask(url=url, priority=1)
            await self.scheduler.add_task(task)

        # 3. Start the task processor
        processor_task = asyncio.create_task(
            self.scheduler.process_tasks(self.crawler, self.settings.max_concurrent_tasks)
        )

        # 4. Wait for all tasks to finish
        await self.scheduler.task_queue.join()
        processor_task.cancel()

        # 5. Collect the results
        await self.collect_results()

    async def collect_results(self):
        """Collect all results from Redis."""
        if self.scheduler.redis:
            # Fetch the results of all successful tasks
            keys = await self.scheduler.redis.keys("58job:result:*")
            for key in keys:
                result_json = await self.scheduler.redis.get(key)
                if result_json:
                    try:
                        result = json.loads(result_json)
                        self.results.append(result)
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse result {key}: {e}")

        logger.info(f"Collected {len(self.results)} valid records")

    async def save_results(self):
        """Save results to disk."""
        output_dir = Path(self.settings.output_dir)
        output_dir.mkdir(exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if self.settings.save_format in ["json", "both"]:
            json_path = output_dir / f"58_jobs_{timestamp}.json"
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(self.results, f, ensure_ascii=False, indent=2, default=str)
            logger.info(f"Results saved to JSON: {json_path}")

        if self.settings.save_format in ["csv", "both"] and self.results:
            csv_path = output_dir / f"58_jobs_{timestamp}.csv"
            self.save_to_csv(csv_path)
            logger.info(f"Results saved to CSV: {csv_path}")

    def save_to_csv(self, filepath: Path):
        """Write the results to a CSV file."""
        if not self.results:
            return

        # Collect every field name that appears in the results
        all_keys = set()
        for item in self.results:
            all_keys.update(item.keys())

        with open(filepath, "w", newline="", encoding="utf-8-sig") as f:
            writer = csv.DictWriter(f, fieldnames=list(all_keys))
            writer.writeheader()
            writer.writerows(self.results)

    async def cleanup(self):
        """Release all resources."""
        await self.crawler.close()
        if self.scheduler.redis:
            await self.scheduler.redis.close()


async def main():
    """Entry point."""
    settings = CrawlerSettings()

    # Example start URLs: Python development jobs
    start_urls = [
        "https://bj.58.com/python/pn1/",
        "https://sh.58.com/python/pn1/",
        "https://sz.58.com/python/pn1/",
        "https://hz.58.com/python/pn1/",
    ]

    crawler = Job58Crawler(settings)

    try:
        await crawler.init()
        await crawler.crawl(start_urls, max_pages_per_url=3)
        await crawler.save_results()
        logger.info("Crawl finished")
    except KeyboardInterrupt:
        logger.info("Crawl interrupted by user")
    except Exception as e:
        logger.error(f"Crawler failed: {e}", exc_info=True)
    finally:
        await crawler.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
```

## Advanced Optimization Techniques

### 1. Distributed Crawler Extension

```python
# distributed/worker.py
import asyncio
import signal
from multiprocessing import Process
from typing import Dict

import aiohttp


class DistributedWorker:
    def __init__(self, worker_id: str, master_url: str):
        self.worker_id = worker_id
        self.master_url = master_url
        self.running = True

    async def fetch_task(self):
        """Request a task from the master node."""
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.master_url}/api/task/acquire") as resp:
                return await resp.json()

    async def report_result(self, task_id: str, result: Dict):
        """Report a completed task back to the master node."""
        async with aiohttp.ClientSession() as session:
            await session.post(
                f"{self.master_url}/api/task/complete",
                json={"task_id": task_id, "result": result},
            )

    def signal_handler(self, signum, frame):
        """Stop the worker when a termination signal is received."""
        self.running = False
```
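The worker class above only provides the building blocks; it still needs a polling loop. A possible run loop is sketched below (my own illustration: the `/api/task/acquire` response shape and the master URL are assumptions, and the actual crawl call is left as a placeholder):

```python
# distributed/run_worker.py -- illustrative polling loop for DistributedWorker
import asyncio
import signal

from distributed.worker import DistributedWorker


async def run_worker(worker_id: str, master_url: str):
    worker = DistributedWorker(worker_id, master_url)

    # Stop gracefully on Ctrl+C / SIGTERM (add_signal_handler is Unix-only)
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, lambda: setattr(worker, "running", False))

    while worker.running:
        task = await worker.fetch_task()       # assumed shape: {"task_id": ..., "url": ...}
        if not task or not task.get("task_id"):
            await asyncio.sleep(2)             # nothing to do; poll again later
            continue

        # Placeholder: a real worker would run PlaywrightCrawler.crawl_job_detail here
        result = {"url": task.get("url"), "status": "done"}
        await worker.report_result(task["task_id"], result)


if __name__ == "__main__":
    asyncio.run(run_worker("worker-1", "http://localhost:8000"))
```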
### 2. Intelligent Parsing Strategy

```python
# core/intelligent_parser.py
import re
from typing import Any, Dict


class IntelligentParser:
    @staticmethod
    def parse_salary(text: str) -> Dict[str, Any]:
        """Match salary text against a set of known patterns."""
        patterns = [
            # 8-13K / 8K-13K
            r"(\d+\.?\d*)[kK千]?-(\d+\.?\d*)[kK千]",
            # 面议 (negotiable)
            r"面议|Negotiable",
            # 8K以上 (8K and above)
            r"(\d+\.?\d*)[kK千]以上",
            # 8万/年 (80,000 per year)
            r"(\d+\.?\d*)万/年",
        ]

        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                return {"pattern": pattern, "match": match.groups()}

        return {"pattern": "unknown", "match": None}
```
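A quick, illustrative check of the parser against a few typical salary strings:

```python
# Illustrative usage of IntelligentParser
from core.intelligent_parser import IntelligentParser

samples = ["8-13K", "面议", "9千以上", "15万/年", "薪资保密"]
for text in samples:
    parsed = IntelligentParser.parse_salary(text)
    print(text, "->", parsed["pattern"], parsed["match"])
```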