```python
import os
import time
import random
import requests
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed
# from threading import Lock

from processor import WuBaProcessor
from verify.yidun import YiDun
from log import PPLogger
from setting import CONCURRENT_REQUESTS, USER_AGENT, DOWNLOADER_IP_PROXY, START_URL
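
# The project-local helpers (processor, verify.yidun, log) are not part of this
# listing; judging from the call sites below, they are assumed to expose roughly:
#   YiDun.verify(session, captcha_url) -> requests.Session  (solves the YiDun captcha)
#   PPLogger(name=...).setup_logger()  (configures handlers; then .info()/.error() to log)
#   WuBaProcessor.processor(html, url)  (parses one detail page and persists the data)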

class WuBaSpider:

    def __init__(self):
        self.ip_proxy = DOWNLOADER_IP_PROXY  # URL of the proxy-IP provisioning API
        self.ua = USER_AGENT  # list of user-agent strings to rotate through
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.114 Safari/537.36"
        }
        self.yidun = YiDun()  # NetEase YiDun captcha solver
        self.q = Queue()  # queue of detail-page URLs
        self.logger = PPLogger(name='58spider')
        self.logger.setup_logger()
        self.psr = WuBaProcessor()  # data processor
        self.ip_pool = []  # proxy IP pool
        # self.lock = Lock()

    def new_session(self, session_=None):
        """
        Reset the session and attach a fresh proxy.
        :param session_: old session whose proxy should be discarded
        :return: new requests.Session using a random proxy from the pool
        """
        if session_ and session_.proxies in self.ip_pool:
            # drop the dead proxy; the pool is shared, so it may already be gone
            self.ip_pool.remove(session_.proxies)
            self.logger.info(f'ip_pool remove {session_.proxies}')
        if not self.ip_pool:
            # refill the pool; the API is expected to return JSON like
            # {"code": 200, "data": [{"ip": "1.2.3.4", "port": 8080}, ...]}
            res_ip = requests.get(self.ip_proxy).json()
            self.logger.info(res_ip)
            if res_ip['code'] != 200:
                os._exit(0)  # proxy request failed, terminate the program
            for i in res_ip['data']:
                self.ip_pool.append({
                    'http': f'http://{i["ip"]}:{i["port"]}',
                    'https': f'http://{i["ip"]}:{i["port"]}'
                })
        session = requests.Session()
        session.proxies = random.choice(self.ip_pool)
        return session

    def get_page_urls(self, url):
        """
        Collect all shop listing pages.
        :param url: start URL, e.g.
            https://qd.58.com/licang/shangpucz/pn1/?area=100_300&huansuanyue=0_10000
        :return: list of listing-page URLs, or None on failure
        """
        session = self.new_session()
        try:
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        # YiDun captcha check
        if 'verifycode' in res.url:
            session = self.yidun.verify(session, res.url)
            try:
                res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
            except OSError as e:
                self.logger.error(e)
                return None
        soup = BeautifulSoup(res.text, 'lxml')
        pager = soup.find('div', class_='pager')
        if not pager:
            self.logger.error('pager not found, page layout may have changed')
            return None
        end_page = int(pager.find_all('span')[-2].text)
        # rewrite the page number in the URL: pn1 -> pn1, pn2, ..., pn{end_page}
        all_pages = [url.replace('pn1', f'pn{i + 1}') for i in range(end_page)]
        self.logger.info(f'all_pages {len(all_pages)}')
        return all_pages

    def get_detail_urls(self, page_url):
        """
        Collect the detail-page link of every shop on one listing page
        and push them onto the queue.
        :param page_url: listing-page URL
        """
        session = self.new_session()
        try:
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        # YiDun captcha check
        if 'verifycode' in res_page.url:
            session = self.yidun.verify(session, res_page.url)
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        soup_page = BeautifulSoup(res_page.text, 'lxml')
        detail_urls = [li.find('a').attrs['href']
                       for li in soup_page.find('div', class_='content-side-left').find_all('li')]
        for detail_url in detail_urls:
            self.q.put_nowait(detail_url)  # enqueue for the consumers
            self.logger.info(f'detail_url {detail_url}')

    def get_content(self):
        """
        Fetch shop detail pages from the queue and process them.
        """
        session = self.new_session()
        while True:
            if self.q.empty():
                time.sleep(5)  # wait 5s; exit only if the queue is still empty
                if self.q.empty():
                    break
            detail_url = self.q.get()
            self.headers['user-agent'] = random.choice(self.ua)
            try:
                res_detail = session.get(detail_url, headers=self.headers, timeout=10)
            except OSError as e:
                session = self.new_session(session)
                self.logger.error(e)
                self.q.put_nowait(detail_url)  # request failed, put the URL back
                continue
            else:
                self.logger.info(res_detail.url)
            # Geetest captcha check
            if 'geetest' in res_detail.text:
                self.q.put_nowait(detail_url)
                continue
            # YiDun captcha check
            elif 'verifycode' in res_detail.url:
                session = self.yidun.verify(session, res_detail.url)
                try:
                    res_detail = session.get(detail_url, headers=self.headers, allow_redirects=True, timeout=10)
                except OSError as e:
                    self.logger.error(e)
                    self.q.put_nowait(detail_url)
                    continue
            # IP ban / forced-login check
            elif 'verifylogin' in res_detail.url or 'deny' in res_detail.url:
                session = self.new_session(session)
                self.q.put_nowait(detail_url)
                continue
            # with self.lock:  # file-write lock (only needed if the processor shares a file)
            self.psr.processor(res_detail.text, res_detail.url)

    def start(self):
        page_urls = self.get_page_urls(START_URL)
        if not page_urls:
            self.logger.error('no listing pages found')
            return
        with ThreadPoolExecutor() as pool:
            # producers: only the first listing page is crawled here
            # ([:1] kept from the original; drop it to crawl every page)
            producers = [pool.submit(self.get_detail_urls, page_url) for page_url in page_urls[:1]]
            while self.q.empty():  # wait until the first detail URLs arrive
                time.sleep(1)
            consumers = [pool.submit(self.get_content) for _ in range(CONCURRENT_REQUESTS)]
            for future in as_completed(producers + consumers):
                future.result()  # propagate worker exceptions

if __name__ == '__main__':
    WuBaSpider().start()
```
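
The constants imported from `setting` are not shown in the listing. Below is a minimal sketch of what `setting.py` could look like, inferred from how each name is used above; every value is a placeholder, not the original configuration:

```python
# setting.py -- placeholder values inferred from usage, adjust to your setup

# number of concurrent get_content consumer threads
CONCURRENT_REQUESTS = 5

# pool of user-agent strings; get_content picks one at random per request
USER_AGENT = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.114 Safari/537.36",
]

# proxy provisioning API (hypothetical endpoint); new_session expects JSON like
# {"code": 200, "data": [{"ip": "1.2.3.4", "port": 8080}, ...]}
DOWNLOADER_IP_PROXY = "http://proxy-provider.example.com/api/get_ips"

# first listing page; get_page_urls rewrites 'pn1' to paginate
START_URL = "https://qd.58.com/licang/shangpucz/pn1/?area=100_300&huansuanyue=0_10000"
```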