import os
import time
import random
from threading import Thread, current_thread
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from bs4 import BeautifulSoup

from processor import WuBaProcessor
from verify.yidun import YiDun
from log import PPLogger
from setting import CONCURRENT_REQUESTS, USER_AGENT, DOWNLOADER_IP_PROXY, START_URL


class WuBaSpider:
    def __init__(self):
        self.ip_proxy = DOWNLOADER_IP_PROXY  # proxy-IP API URL
        self.ua = USER_AGENT
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.114 Safari/537.36"
        }
        self.yidun = YiDun()  # YiDun captcha handling
        self.q = Queue()
        self.logger = PPLogger(name='58spider')
        self.logger.setup_logger()
        self.psr = WuBaProcessor()  # data processing
        self.ip_pool = []  # proxy IP pool
        # self.lock = Lock()

    def new_session(self, session_=None):
        """
        Reset the session and attach a fresh proxy.
        :param session_: old session whose proxy should be discarded
        :return: new session with a random proxy from the pool
        """
        if session_:
            try:
                self.ip_pool.remove(session_.proxies)  # drop the dead proxy from the pool
            except ValueError:
                pass
            self.logger.debug(f'ip_pool remove {session_.proxies}')
        if not self.ip_pool:
            res_ip = requests.get(self.ip_proxy).json()
            self.logger.debug(res_ip)
            if res_ip['code'] != 200:
                os._exit(0)  # proxy API request failed, terminate the program
            for i in res_ip['data']:
                self.ip_pool.append({
                    'http': f'http://{i["ip"]}:{i["port"]}',
                    'https': f'http://{i["ip"]}:{i["port"]}'
                })
        session = requests.Session()
        session.proxies = random.choice(self.ip_pool)
        return session

    def get_page_urls(self, url):
        """
        Collect all shop listing pages.
        :param url: start URL (first listing page)
        :return: list of listing-page URLs
        """
        session = self.new_session()
        # url = 'https://qd.58.com/licang/shangpucz/pn1/?area=100_300&huansuanyue=0_10000'
        try:
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        # print(res.text)
        # YiDun captcha check
        if 'verifycode' in res.url:
            session = self.yidun.verify(session, res.url)
            try:
                res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
            except OSError as e:
                self.logger.error(e)
                return None
        soup = BeautifulSoup(res.text, 'lxml')
        # the second-to-last span in the pager holds the last page number
        end_page = int(soup.find('div', class_='pager').find_all('span')[-2].text)
        # end_page = 8
        all_pages = [url.replace('pn1', f'pn{i + 1}') for i in range(end_page)]
        # if not all_pages:
        #     self.logger.info('no pages data')
        #     os._exit(0)
        # session = requests.Session()
        self.logger.info(f'all_pages {len(all_pages)}')
        return all_pages

    def get_detail_urls(self, page_url):
        """
        Collect every shop detail-page link on one listing page.
        :param page_url: listing-page URL
        """
        session = self.new_session()
        try:
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        # YiDun captcha check
        if 'verifycode' in res_page.url:
            session = self.yidun.verify(session, res_page.url)
            try:
                res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
            except Exception as e:
                self.logger.error(e)
                session = self.new_session(session)
                res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        soup_page = BeautifulSoup(res_page.text, 'lxml')
        if soup_page.find('div', class_='content-side-left'):
            detail_urls = [li.find('a').attrs['href']
                           for li in soup_page.find('div', class_='content-side-left').find_all('li')
                           if li.find('a')]
            for detail_url in detail_urls:
                self.q.put_nowait(detail_url)  # enqueue for the detail workers
            self.logger.debug(f'detail_url {self.q.qsize()}')

    def get_content(self):
        """
        Fetch and process shop detail-page data until the queue stays empty.
        """
        session = self.new_session()
        while True:
            self.logger.debug(f'remaining {self.q.qsize()}')
            if self.q.empty():
                time.sleep(10)  # queue empty: wait 10 seconds before quitting
                if self.q.empty():
                    self.logger.info(current_thread().name + ' quit')
                    break
            try:
                detail_url = self.q.get_nowait()
            except Empty:
                continue  # another worker drained the queue first
            # per-request header copy so worker threads don't race on the shared dict
            headers = {**self.headers, 'user-agent': random.choice(self.ua)}
            try:
                res_detail = session.get(detail_url, headers=headers, timeout=10)
            except OSError as e:
                session = self.new_session(session)
                self.logger.error(e)
                self.q.put_nowait(detail_url)  # request failed, put the URL back on the queue
                continue
            else:
                self.logger.debug(res_detail.url)
                # Geetest check
                if 'geetest' in res_detail.text:
                    self.q.put_nowait(detail_url)
                    continue
                # YiDun captcha check
                elif 'verifycode' in res_detail.url:
                    session = self.yidun.verify(session, res_detail.url)
                    try:
                        res_detail = session.get(detail_url, headers=headers, allow_redirects=True, timeout=10)
                    except OSError as e:
                        self.logger.error(e)
                        self.q.put_nowait(detail_url)
                        continue
                # IP ban / login-wall check
                elif 'verifylogin' in res_detail.url or 'deny' in res_detail.url:
                    session = self.new_session(session)
                    self.q.put_nowait(detail_url)
                    continue
                # with self.lock:  # file-write lock
                try:
                    self.psr.processor(res_detail.text, res_detail.url)
                except Exception as e:
                    self.logger.error(str(e) + res_detail.url)
                    # self.q.put_nowait(detail_url)

    def start(self):
        page_urls = self.get_page_urls(START_URL)
        if not page_urls:
            return  # start page could not be fetched, nothing to crawl
        # for page_url in page_urls:
        #     self.get_detail_urls(page_url)
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(self.get_detail_urls, page_url) for page_url in page_urls]
            for future in as_completed(futures):
                if future.exception():
                    self.logger.error(future.exception())
        for _ in range(CONCURRENT_REQUESTS):
            Thread(target=self.get_content).start()
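
# Entry point: a minimal launch sketch; assumes setting.py defines START_URL,
# CONCURRENT_REQUESTS, USER_AGENT (a list of user-agent strings) and
# DOWNLOADER_IP_PROXY, as imported above.
if __name__ == '__main__':
    WuBaSpider().start()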