"""58.com (WuBa) shop-rental spider: collects listing pages, queues shop
detail-page URLs, and hands each detail page to WuBaProcessor."""
import os
import random
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
# from threading import Lock

import requests
from bs4 import BeautifulSoup

from log import PPLogger
from processor import WuBaProcessor
from setting import CONCURRENT_REQUESTS, USER_AGENT, DOWNLOADER_IP_PROXY, START_URL
from verify.yidun import YiDun


class WuBaSpider:
    def __init__(self):
        self.ip_proxy = DOWNLOADER_IP_PROXY  # URL of the proxy-IP API
        self.ua = USER_AGENT  # pool of user-agent strings
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.114 Safari/537.36"
        }
        self.yidun = YiDun()  # YiDun captcha solver
        self.q = Queue()  # queue of detail-page URLs shared by the workers
        self.logger = PPLogger(name='58spider')
        self.logger.setup_logger()
        self.psr = WuBaProcessor()  # data processor
        self.ip_pool = []  # proxy IP pool
        # self.lock = Lock()

    def new_session(self, session_=None):
        """
        Build a fresh session with a proxy attached, refilling the pool if needed.
        :param session_: old session whose proxy should be dropped from the pool
        :return: new requests.Session using a random proxy from the pool
        """
        if session_ and session_.proxies in self.ip_pool:
            # drop the failing proxy; the membership check guards against a
            # concurrent worker having already removed it
            self.ip_pool.remove(session_.proxies)
            self.logger.info(f'ip_pool remove {session_.proxies}')
        if not self.ip_pool:
            res_ip = requests.get(self.ip_proxy).json()
            self.logger.info(res_ip)
            if res_ip['code'] != 200:
                os._exit(0)  # proxy API request failed; terminate the program
            for i in res_ip['data']:
                self.ip_pool.append({
                    'http': f'http://{i["ip"]}:{i["port"]}',
                    'https': f'http://{i["ip"]}:{i["port"]}'
                })
        session = requests.Session()
        session.proxies = random.choice(self.ip_pool)
        return session
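    # A sketch of the JSON payload new_session() assumes the DOWNLOADER_IP_PROXY
    # endpoint returns; the shape is inferred from the fields read above
    # (res_ip['code'], res_ip['data'][n]['ip' / 'port']) and is the proxy
    # vendor's contract, not something this repo confirms:
    #
    #   {"code": 200,
    #    "data": [{"ip": "1.2.3.4", "port": 8080},
    #             {"ip": "5.6.7.8", "port": 3128}]}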
    def get_page_urls(self, url):
        """
        Collect the URLs of every shop listing page.
        :param url: initial 'pn1' listing URL
        :return: list of listing-page URLs, or None if the captcha retry failed
        """
        session = self.new_session()
        # url = 'https://qd.58.com/licang/shangpucz/pn1/?area=100_300&huansuanyue=0_10000'
        try:
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        # YiDun captcha check
        if 'verifycode' in res.url:
            session = self.yidun.verify(session, res.url)
            try:
                res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
            except OSError as e:
                self.logger.error(e)
                return None
        soup = BeautifulSoup(res.text, 'lxml')
        # the second-to-last <span> in the pager holds the last page number
        end_page = int(soup.find('div', class_='pager').find_all('span')[-2].text)
        # end_page = 8  # uncomment to cap the page count while testing
        all_pages = [url.replace('pn1', f'pn{i + 1}') for i in range(end_page)]
        self.logger.info(f'all_pages {len(all_pages)}')
        return all_pages

    def get_detail_urls(self, page_url):
        """
        Collect every shop detail-page link on one listing page and queue it.
        :param page_url: listing-page URL
        """
        session = self.new_session()
        try:
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        # YiDun captcha check
        if 'verifycode' in res_page.url:
            session = self.yidun.verify(session, res_page.url)
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        soup_page = BeautifulSoup(res_page.text, 'lxml')
        detail_urls = [li.find('a').attrs['href']
                       for li in soup_page.find('div', class_='content-side-left').find_all('li')]
        for detail_url in detail_urls:
            self.q.put_nowait(detail_url)  # hand off to the detail workers
            self.logger.info(f'detail_url {detail_url}')

    def get_content(self):
        """
        Pull detail URLs off the queue, fetch each page, and pass it to the processor.
        """
        session = self.new_session()
        while True:
            if self.q.empty():
                time.sleep(5)  # queue empty: wait 5 s, then exit if it is still empty
                if self.q.empty():
                    break
            try:
                # non-blocking get: another worker may have drained the queue
                # between the empty check and here; a blocking get() would hang
                detail_url = self.q.get_nowait()
            except Empty:
                continue
            # per-request copy: self.headers is shared across worker threads,
            # so mutating it in place would race
            headers = {**self.headers, 'user-agent': random.choice(self.ua)}
            try:
                res_detail = session.get(detail_url, headers=headers, timeout=10)
            except OSError as e:
                session = self.new_session(session)
                self.logger.error(e)
                self.q.put_nowait(detail_url)  # request failed: put the URL back on the queue
                continue
            else:
                self.logger.info(res_detail.url)
                # Geetest captcha check
                if 'geetest' in res_detail.text:
                    self.q.put_nowait(detail_url)
                    continue
                # YiDun captcha check
                elif 'verifycode' in res_detail.url:
                    session = self.yidun.verify(session, res_detail.url)
                    try:
                        res_detail = session.get(detail_url, headers=headers, allow_redirects=True, timeout=10)
                    except OSError as e:
                        self.logger.error(e)
                        self.q.put_nowait(detail_url)
                        continue
                # IP-ban check
                elif 'verifylogin' in res_detail.url or 'deny' in res_detail.url:
                    session = self.new_session(session)
                    self.q.put_nowait(detail_url)
                    continue
                # with self.lock:  # file-write lock
                self.psr.processor(res_detail.text, res_detail.url)

    def start(self):
        page_urls = self.get_page_urls(START_URL)
        if not page_urls:  # get_page_urls returns None when the captcha retry fails
            self.logger.error('no listing pages')
            return
        with ThreadPoolExecutor() as pool:
            # NOTE: [:1] submits only the first listing page; drop the slice to crawl all pages
            results1 = [pool.submit(self.get_detail_urls, page_url) for page_url in page_urls[:1]]
            while self.q.empty():  # let at least one detail URL land in the queue first
                time.sleep(1)
            results2 = [pool.submit(self.get_content) for _ in range(CONCURRENT_REQUESTS)]
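
# A minimal entry point, assuming setting.py supplies the names imported at the
# top: START_URL (a 'pn1' listing URL), CONCURRENT_REQUESTS (detail-worker
# count), USER_AGENT (a list of UA strings for random.choice), and
# DOWNLOADER_IP_PROXY (the proxy API URL).
if __name__ == '__main__':
    WuBaSpider().start()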