spider.py

import os
import time
import random
# from threading import Lock
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

import requests
from bs4 import BeautifulSoup

from processor import WuBaProcessor
from verify.yidun import YiDun
from log import PPLogger
from setting import CONCURRENT_REQUESTS, USER_AGENT, DOWNLOADER_IP_PROXY, START_URL
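
# Illustrative shapes for the names imported from setting.py (assumptions
# inferred from how they are used below, not the project's actual values):
#   CONCURRENT_REQUESTS = 8             # number of get_content worker threads
#   USER_AGENT = ['Mozilla/5.0 ...']    # pool of user-agent strings to rotate
#   DOWNLOADER_IP_PROXY = 'http://...'  # proxy-vendor API returning JSON
#   START_URL = 'https://qd.58.com/licang/shangpucz/pn1/?...'  # must contain 'pn1'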


class WuBaSpider:
    def __init__(self):
        self.ip_proxy = DOWNLOADER_IP_PROXY  # proxy-IP API URL
        self.ua = USER_AGENT
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.114 Safari/537.36"
        }
        self.yidun = YiDun()  # YiDun captcha handler
        self.q = Queue()
        self.logger = PPLogger(name='58spider')
        self.logger.setup_logger()
        self.psr = WuBaProcessor()  # data processing
        self.ip_pool = []  # proxy IP pool
        # self.lock = Lock()

    def new_session(self, session_=None):
        """
        Rebuild the session and attach a fresh proxy.
        :param session_: old session whose proxy is dropped from the pool
        :return: new requests.Session with a random proxy from the pool
        """
        if session_ and session_.proxies in self.ip_pool:
            self.ip_pool.remove(session_.proxies)
            self.logger.info(f'ip_pool remove {session_.proxies}')
        if not self.ip_pool:
            res_ip = requests.get(self.ip_proxy).json()
            self.logger.info(res_ip)
            if res_ip['code'] != 200:
                os._exit(0)  # proxy API request failed: terminate the program
            for i in res_ip['data']:
                self.ip_pool.append({
                    'http': f'http://{i["ip"]}:{i["port"]}',
                    'https': f'http://{i["ip"]}:{i["port"]}'
                })
        session = requests.Session()
        session.proxies = random.choice(self.ip_pool)
        return session
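
    # The proxy vendor's API is assumed to return JSON of this shape (the
    # shape is inferred from the parsing in new_session, not vendor docs):
    #   {"code": 200, "data": [{"ip": "1.2.3.4", "port": 8080}, ...]}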

    def get_page_urls(self, url):
        """
        Collect every shop listing page.
        :param url: start URL (its 'pn1' segment is rewritten per page)
        :return: list of listing-page URLs, or None on failure
        """
        session = self.new_session()
        # url = 'https://qd.58.com/licang/shangpucz/pn1/?area=100_300&huansuanyue=0_10000'
        try:
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
        # print(res.text)
        # YiDun captcha check
        if 'verifycode' in res.url:
            session = self.yidun.verify(session, res.url)
            try:
                res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
            except OSError as e:
                self.logger.error(e)
                return None
        soup = BeautifulSoup(res.text, 'lxml')
        # the last page number sits in the second-to-last <span> of the pager
        end_page = int(soup.find('div', class_='pager').find_all('span')[-2].text)
        # end_page = 8
        all_pages = [url.replace('pn1', f'pn{i + 1}') for i in range(end_page)]
        # if not all_pages:
        #     self.logger.info('no pages data')
        #     os._exit(0)
        # session = requests.Session()
        self.logger.info(f'all_pages {len(all_pages)}')
        return all_pages
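
    # Pagination rewrite example (assumes the URL carries a literal 'pn1'):
    #   url = 'https://qd.58.com/licang/shangpucz/pn1/', end_page = 3
    #   -> ['.../pn1/', '.../pn2/', '.../pn3/']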

    def get_detail_urls(self, page_url):
        """
        Collect every shop detail-page link on one listing page.
        :param page_url: listing-page URL
        """
        session = self.new_session()
        try:
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        except OSError as e:
            self.logger.error(e)
            session = self.new_session(session)
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        # YiDun captcha check
        if 'verifycode' in res_page.url:
            session = self.yidun.verify(session, res_page.url)
            res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
        soup_page = BeautifulSoup(res_page.text, 'lxml')
        detail_urls = [li.find('a').attrs['href']
                       for li in soup_page.find('div', class_='content-side-left').find_all('li')]
        for detail_url in detail_urls:
            self.q.put_nowait(detail_url)  # enqueue for the get_content workers
            self.logger.info(f'detail_url {detail_url}')
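
    # The selector above assumes 58.com's listing markup: every <li> under
    # <div class="content-side-left"> wraps exactly one <a href>; an <li>
    # without an anchor would raise AttributeError in the comprehension.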

    def get_content(self):
        """
        Fetch and process the shop detail pages queued by get_detail_urls.
        """
        session = self.new_session()
        while True:
            if self.q.empty():
                time.sleep(5)  # wait 5 s for stragglers; exit only if still empty
                if self.q.empty():
                    break
            detail_url = self.q.get()
            # self.headers is shared across worker threads, so this rotation
            # can race; the commented-out Lock was presumably meant for that
            self.headers['user-agent'] = random.choice(self.ua)
            try:
                res_detail = session.get(detail_url, headers=self.headers, timeout=10)
            except OSError as e:
                session = self.new_session(session)
                self.logger.error(e)
                self.q.put_nowait(detail_url)  # request failed: put the URL back
                continue
            else:
                self.logger.info(res_detail.url)
            # Geetest captcha check
            if 'geetest' in res_detail.text:
                self.q.put_nowait(detail_url)
                continue
            # YiDun captcha check
            elif 'verifycode' in res_detail.url:
                session = self.yidun.verify(session, res_detail.url)
                try:
                    res_detail = session.get(detail_url, headers=self.headers, allow_redirects=True, timeout=10)
                except OSError as e:
                    self.logger.error(e)
                    self.q.put_nowait(detail_url)
                    continue
            # IP block check
            elif 'verifylogin' in res_detail.url or 'deny' in res_detail.url:
                session = self.new_session(session)
                self.q.put_nowait(detail_url)
                continue
            # with self.lock:  # file-write lock
            self.psr.processor(res_detail.text, res_detail.url)

    def start(self):
        page_urls = self.get_page_urls(START_URL)
        if not page_urls:
            self.logger.info('no listing pages found')
            return
        with ThreadPoolExecutor() as pool:
            # note: the [:1] slice submits only the first listing page (it
            # looks like a debug limit; drop it to crawl every page)
            results1 = [pool.submit(self.get_detail_urls, page_url) for page_url in page_urls[:1]]
            while self.q.empty():
                time.sleep(1)  # wait until the first detail URLs are queued
            results2 = [pool.submit(self.get_content) for _ in range(CONCURRENT_REQUESTS)]
            # futures are joined implicitly when the executor context exits


# WuBaSpider().start()
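
# Minimal entry point: a sketch assuming setting.py supplies valid values
# (the original left the call commented out above).
if __name__ == '__main__':
    WuBaSpider().start()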