Interface module: we need an API to expose the pool as a service. We could, of course, connect to the database directly to fetch proxies, but then every consumer would need to know the connection details and set up the connection itself. A safer and more convenient approach is to expose a Web API, so that a usable proxy can be obtained simply by requesting an endpoint. Moreover, since there may be multiple usable proxies at once, we can provide an endpoint that returns a random usable proxy; that way every usable proxy has a chance of being handed out, balancing the load across them.
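As a concrete illustration, a minimal version of such an interface could look like the following Flask sketch. The route name /random and the import path are assumptions for illustration; the sketch relies on the RedisClient storage class developed below.

from flask import Flask

# assumed import path, following the proxypool project layout
from proxypool.storages.redis import RedisClient

app = Flask(__name__)


@app.route('/random')
def get_random_proxy():
    # hand out a random usable proxy so load is spread across all of them
    conn = RedisClient()
    return conn.random().string()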
import redis

from proxypool.exceptions import PoolEmptyException
from proxypool.schemas.proxy import Proxy
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \
    PROXY_SCORE_INIT
from random import choice
from typing import List
from loguru import logger
from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies

# redis-py 2.x takes zadd/zincrby arguments in a different order,
# so detect the installed client version once up front
REDIS_CLIENT_VERSION = redis.__version__
IS_REDIS_VERSION_2 = REDIS_CLIENT_VERSION.startswith('2.')
def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:
    """
    add proxy and set it to init score
    :param proxy: proxy, ip:port, like 8.8.8.8:88
    :param score: int score
    :return: result
    """
    if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
        logger.info(f'invalid proxy {proxy}, throw it')
        return
    if not self.exists(proxy):
        if IS_REDIS_VERSION_2:
            return self.db.zadd(REDIS_KEY, score, proxy.string())
        return self.db.zadd(REDIS_KEY, {proxy.string(): score})
def random(self) -> Proxy:
    """
    get random proxy
    firstly try to get proxy with max score
    if not exists, try to get proxy by rank
    if not exists, raise error
    :return: proxy, like 8.8.8.8:8
    """
    # try to get proxy with max score
    proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX)
    if len(proxies):
        return convert_proxy_or_proxies(choice(proxies))
    # else get proxy by rank
    proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX)
    if len(proxies):
        return convert_proxy_or_proxies(choice(proxies))
    # else raise error
    raise PoolEmptyException
def decrease(self, proxy: Proxy) -> int:
    """
    decrease score of proxy, if smaller than PROXY_SCORE_MIN, delete it
    :param proxy: proxy
    :return: new score
    """
    score = self.db.zscore(REDIS_KEY, proxy.string())
    # current score is larger than PROXY_SCORE_MIN
    if score and score > PROXY_SCORE_MIN:
        logger.info(f'{proxy.string()} current score {score}, decrease 1')
        if IS_REDIS_VERSION_2:
            return self.db.zincrby(REDIS_KEY, proxy.string(), -1)
        return self.db.zincrby(REDIS_KEY, -1, proxy.string())
    # otherwise delete proxy
    else:
        logger.info(f'{proxy.string()} current score {score}, remove')
        return self.db.zrem(REDIS_KEY, proxy.string())
def max(self, proxy: Proxy) -> int:
    """
    set proxy to max score
    :param proxy: proxy
    :return: new score
    """
    logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')
    if IS_REDIS_VERSION_2:
        return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())
    return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})
def count(self) -> int:
    """
    get count of proxies
    :return: count, int
    """
    return self.db.zcard(REDIS_KEY)
def all(self) -> List[Proxy]:
    """
    get all proxies
    :return: list of proxies
    """
    return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))
def batch(self, start, end) -> List[Proxy]:
    """
    get batch of proxies
    :param start: start index
    :param end: end index
    :return: list of proxies
    """
    return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))
if __name__ == '__main__':
    conn = RedisClient()
    result = conn.random()
    print(result)
from retrying import retry
import requests
from loguru import logger
class BaseCrawler(object):
    urls = []
    @retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None)
    def fetch(self, url, **kwargs):
        try:
            response = requests.get(url, **kwargs)
            if response.status_code == 200:
                return response.text
        except requests.ConnectionError:
            return
    @logger.catch
    def crawl(self):
        """
        crawl main method
        """
        for url in self.urls:
            logger.info(f'fetching {url}')
            html = self.fetch(url)
            for proxy in self.parse(html):
                logger.info(f'fetched proxy {proxy.string()} from {url}')
                yield proxy
import pkgutil
from .base import BaseCrawler
import inspect
# load classes subclass of BaseCrawler
classes = []
for loader, name, is_pkg in pkgutil.walk_packages(__path__):
    module = loader.find_module(name).load_module(name)
    for name, value in inspect.getmembers(module):
        globals()[name] = value
        if inspect.isclass(value) and issubclass(value, BaseCrawler) \
                and value is not BaseCrawler:
            classes.append(value)
__all__ = __ALL__ = classes
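With this auto-discovery in place, a caller never has to import crawler classes by name. As a rough sketch of how the collected classes list might be consumed (the surrounding logic is simplified here; the project's real Getter stores the fetched proxies via RedisClient instead of printing them):

from proxypool.crawlers import classes

crawlers = [cls() for cls in classes]
for crawler in crawlers:
    # each crawler walks its own urls and yields Proxy objects
    for proxy in crawler.crawl():
        print(proxy.string())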
async def test(self, proxy: Proxy):
    """
    test single proxy
    :param proxy: Proxy object
    :return:
    """
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
        try:
            logger.debug(f'testing {proxy.string()}')
            async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
                                   allow_redirects=False) as response:
                if response.status in TEST_VALID_STATUS:
                    self.redis.max(proxy)
                    logger.debug(f'proxy {proxy.string()} is valid, set max score')
                else:
                    self.redis.decrease(proxy)
                    logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
        except EXCEPTIONS:
            self.redis.decrease(proxy)
            logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
@logger.catch
def run(self):
    """
    test main method
    :return:
    """
    # event loop of aiohttp
    logger.info('starting tester...')
    count = self.redis.count()
    logger.debug(f'{count} proxies to test')
    for i in range(0, count, TEST_BATCH):
        # start and end offsets of the current batch
        start, end = i, min(i + TEST_BATCH, count)
        logger.debug(f'testing proxies from {start} to {end} indices')
        proxies = self.redis.batch(start, end)
        tasks = [self.test(proxy) for proxy in proxies]
        # run tasks using event loop
        self.loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
    tester = Tester()
    tester.run()
Here we define a Tester class. Its __init__ method creates a RedisClient object for the other methods of the class to use. Next, a test method is defined to check the availability of a single proxy, which is passed in as its argument. Note that the test method is declared with the async keyword, which marks it as asynchronous. Inside, it first creates an aiohttp ClientSession object, whose get method can then be called directly to request a page.
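The __init__ method itself is not shown above; a minimal sketch, assuming the attribute names self.redis and self.loop used by test and run, would be:

import asyncio
from proxypool.storages.redis import RedisClient


class Tester(object):
    def __init__(self):
        # Redis connection shared by all test coroutines
        self.redis = RedisClient()
        # event loop that run() uses to drive the async tests
        self.loop = asyncio.get_event_loop()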
The test URL is defined here as the constant TEST_URL. If you are crawling a specific website, it is best to set TEST_URL to an address on that site, because a proxy may be generally usable while its IP has already been banned by the target site. For example, some proxies can access pages such as Baidu normally but are banned by Zhihu, so in that case we can set TEST_URL to a Zhihu page. When requests through a banned proxy fail, its score will naturally drop, and the dead proxy will no longer be returned.
If you want a general-purpose proxy pool instead, there is no need to tie TEST_URL to a particular site; you can set it to any site that does not ban IPs, or to a site with stable responses such as Baidu.
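For example, dedicating the pool to Zhihu could be as simple as the following line in setting.py (the exact configuration mechanism is project-specific; this is a sketch):

# proxypool/setting.py (sketch)
# score proxies against the actual target site, so proxies
# banned by it are marked invalid even if they work elsewhere
TEST_URL = 'https://www.zhihu.com/'
# for a general-purpose pool, use a stable site that does not ban IPs:
# TEST_URL = 'https://www.baidu.com/'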
def run_server(self):
    """
    run server for api
    """
    if not ENABLE_SERVER:
        logger.info('server not enabled, exit')
        return
    app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
def run(self):
    global tester_process, getter_process, server_process
    try:
        logger.info('starting proxypool...')
        if ENABLE_TESTER:
            tester_process = multiprocessing.Process(target=self.run_tester)
            # start first: pid is only assigned once the process is running
            tester_process.start()
            logger.info(f'started tester, pid {tester_process.pid}...')
        if ENABLE_GETTER:
            getter_process = multiprocessing.Process(target=self.run_getter)
            getter_process.start()
            logger.info(f'started getter, pid {getter_process.pid}...')
        if ENABLE_SERVER:
            server_process = multiprocessing.Process(target=self.run_server)
            server_process.start()
            logger.info(f'started server, pid {server_process.pid}...')
        tester_process.join()
        getter_process.join()
        server_process.join()
    except KeyboardInterrupt:
        logger.info('received keyboard interrupt signal')
        tester_process.terminate()
        getter_process.terminate()
        server_process.terminate()
    finally:
        # must call join method before calling is_alive
        tester_process.join()
        getter_process.join()
        server_process.join()
        logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
        logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
        logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
        logger.info('proxy terminated')
if __name__ == '__main__':
    scheduler = Scheduler()
    scheduler.run()