# spider.py
import os
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Empty, Queue
from threading import Thread, current_thread

import requests
from bs4 import BeautifulSoup

from log import PPLogger
from processor import WuBaProcessor
from setting import CONCURRENT_REQUESTS, USER_AGENT, DOWNLOADER_IP_PROXY, START_URL
from verify.yidun import YiDun
  14. class WuBaSpider:
  15. def __init__(self):
  16. self.ip_proxy = DOWNLOADER_IP_PROXY # 代理ip请求url
  17. self.ua = USER_AGENT
  18. self.headers = {
  19. "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
  20. "accept-encoding": "gzip, deflate, br",
  21. "accept-language": "zh-CN,zh;q=0.9",
  22. "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.114 Safari/537.36"
  23. }
  24. self.yidun = YiDun() # 易盾检测
  25. self.q = Queue()
  26. self.logger = PPLogger(name='58spider')
  27. self.logger.setup_logger()
  28. self.psr = WuBaProcessor() # 数据处理
  29. self.ip_pool = [] # 代理ip池
  30. # self.lock = Lock()
  31. def new_session(self, session_=None):
  32. """
  33. 重置session,添加代理
  34. :param session_: 旧session
  35. :return:
  36. """
  37. if session_:
  38. try:
  39. self.ip_pool.remove(session_.proxies)
  40. except Exception as e:
  41. pass
  42. self.logger.debug(f'ip_pool remove {session_.proxies}')
  43. if not self.ip_pool:
  44. res_ip = requests.get(self.ip_proxy).json()
  45. self.logger.debug(res_ip)
  46. if res_ip['code'] != 200:
  47. os._exit(0) # 代理ip请求失败,结束程序
  48. for i in res_ip['data']:
  49. self.ip_pool.append({
  50. 'http': f'http://{i["ip"]}:{i["port"]}',
  51. 'https': f'http://{i["ip"]}:{i["port"]}'
  52. })
  53. session = requests.Session()
  54. session.proxies = random.choice(self.ip_pool)
  55. return session
  56. def get_page_urls(self, url):
  57. """
  58. 获取所有商铺列表页
  59. :param url: 初始url
  60. :return:商铺列表页url列表
  61. """
  62. session = self.new_session()
  63. # url = 'https://qd.58.com/licang/shangpucz/pn1/?area=100_300&huansuanyue=0_10000'
  64. try:
  65. res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
  66. except OSError as e:
  67. self.logger.error(e)
  68. session = self.new_session(session)
  69. res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
  70. # print(res.text)
  71. # 易盾检测
  72. if 'verifycode' in res.url:
  73. session = self.yidun.verify(session, res.url)
  74. try:
  75. res = session.get(url, headers=self.headers, allow_redirects=True, timeout=10)
  76. except OSError as e:
  77. self.logger.error(e)
  78. return None
  79. soup = BeautifulSoup(res.text, 'lxml')
  80. end_page = int(soup.find('div', class_='pager').find_all('span')[-2].text)
  81. # end_page = 8
  82. all_pages = [url.replace('pn1', f'pn{i + 1}') for i in range(end_page)]
  83. # if not all_pages:
  84. # self.logger.info('no pages data')
  85. # os._exit(0)
  86. # session = requests.Session()
  87. self.logger.info(f'all_pages {len(all_pages)}')
  88. return all_pages
  89. def get_detail_urls(self, page_url):
  90. """
  91. 获取列表页上所有商铺详情页链接
  92. :param page_url: 商铺列表页url
  93. """
  94. session = self.new_session()
  95. try:
  96. res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
  97. except OSError as e:
  98. self.logger.error(e)
  99. session = self.new_session(session)
  100. res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
  101. # 易盾检测
  102. if 'verifycode' in res_page.url:
  103. session = self.yidun.verify(session, res_page.url)
  104. try:
  105. res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
  106. except Exception as e:
  107. self.logger.error(e)
  108. session = self.new_session(session)
  109. res_page = session.get(page_url, headers=self.headers, allow_redirects=True, timeout=10)
  110. soup_page = BeautifulSoup(res_page.text, 'lxml')
  111. if soup_page.find('div', class_='content-side-left'):
  112. detail_urls = [li.find('a').attrs['href'] for li in soup_page.find('div', class_='content-side-left').find_all('li')]
  113. for detail_url in detail_urls:
  114. self.q.put_nowait(detail_url) # 加入队列
  115. self.logger.debug(f'detail_url {self.q.qsize()}')
  116. def get_content(self):
  117. """
  118. 获取商铺详情页数据
  119. """
  120. session = self.new_session()
  121. while True:
  122. self.logger.debug(f'剩余 {self.q.qsize()}')
  123. if self.q.empty():
  124. time.sleep(10) # 无数据等待10秒再退出
  125. if self.q.empty():
  126. self.logger.info(current_thread().name+' quit')
  127. break
  128. detail_url = self.q.get()
  129. self.headers['user-agent'] = random.choice(self.ua)
  130. try:
  131. res_detail = session.get(detail_url, headers=self.headers, timeout=10)
  132. except OSError as e:
  133. session = self.new_session(session)
  134. self.logger.error(e)
  135. self.q.put_nowait(detail_url) # 请求失败后,url放回队列
  136. continue
  137. else:
  138. self.logger.debug(res_detail.url)
  139. # 极验检测
  140. if 'geetest' in res_detail.text:
  141. self.q.put_nowait(detail_url)
  142. continue
  143. # 易盾检测
  144. elif 'verifycode' in res_detail.url:
  145. session = self.yidun.verify(session, res_detail.url)
  146. try:
  147. res_detail = session.get(detail_url, headers=self.headers, allow_redirects=True, timeout=10)
  148. except OSError as e:
  149. self.logger.error(e)
  150. self.q.put_nowait(detail_url)
  151. continue
  152. # IP检测
  153. elif 'verifylogin' in res_detail.url or 'deny' in res_detail.url:
  154. session = self.new_session(session)
  155. self.q.put_nowait(detail_url)
  156. continue
  157. # with self.lock: # 文件写入锁
  158. try:
  159. self.psr.processor(res_detail.text, res_detail.url)
  160. except Exception as e:
  161. self.logger.error(str(e)+res_detail.url)
  162. # self.q.put_nowait(detail_url)
  163. def start(self):
  164. page_urls = self.get_page_urls(START_URL)
  165. # for page_url in page_urls:
  166. # self.get_detail_urls(page_url)
  167. with ThreadPoolExecutor() as pool:
  168. results1 = [pool.submit(self.get_detail_urls, page_url) for page_url in page_urls]
  169. for _ in range(CONCURRENT_REQUESTS):
  170. Thread(target=self.get_content).start()
# WuBaSpider().start()