比如说,我们有两台主机 A、B,我们最终想实现在 A 上控制 B。那么如果用正向 Shell,其实就是在 A 上输入 B 的连接地址,比如通过 ssh 连接到 B,连接成功之后,我们就可以在 A 上通过命令控制 B 了。如果用反向 Shell,那就是在 A 上先开启一个监听端口,然后让 B 去连接 A 的这个端口,连接成功之后,A 这边就能通过命令控制 B了。
反弹 Shell 有什么用?
还是原来的例子,我们想用 A 来控制 B,如果想用 ssh 等命令来控制,那得输入 B 的 sshd 地址或者端口对吧?但是在很多情况下,由于防火墙、安全组、局域网、NAT 等原因,我们实际上是无法直接连接到 B 的,比如:
网页是运行在浏览器端的,当我们浏览一个网页时,其 HTML 代码、 JavaScript 代码都会被下载到浏览器中执行。借助浏览器的开发者工具,我们可以看到网页在加载过程中所有网络请求的详细信息,也能清楚地看到网站运行的 HTML 代码和 JavaScript 代码,这些代码中就包含了网站加载的全部逻辑,如加载哪些资源、请求接口是如何构造的、页面是如何渲染的等等。正因为代码是完全透明的,所以如果我们能够把其中的执行逻辑研究出来,就可以模拟各个网络请求进行数据爬取了。
网站运营者首先想到防护措施可能是对某些数据接口的参数进行加密,比如说对某些 URL 的一些参数加上校验码或者把一些 id 信息进行编码,使其变得难以阅读或构造;或者对某些 API 请求加上一些 token、sign 等签名,这样这些请求发送到服务器时,服务器会通过客户端发来的一些请求信息以及双方约定好的秘钥等来对当前的请求进行校验,如果校验通过,才返回对应数据结果。
var a = ["hello"]; (function (c, d) { var e = function (f) { while (--f) { c["push"](c["shift"]()); } }; e(++d); })(a, 0x9b); var b = function (c, d) { c = c - 0x0; var e = a[c]; return e; }; let hello = "1" + 0x1; console["log"](b("0x0"), hello);
在一些付费代理套餐中,大家可能会注意到有这样的一个套餐 - 独享代理或私密代理,这种其实就是用了专用服务器搭建了代理服务,相对一般的付费代理来说,其稳定性更好,速度也更快,同时 IP 可以动态变化。这种独享代理或私密代理的 IP 切换大多数都是基于 ADSL 拨号机制来实现的,一台云主机每拨号一次就可以换一个 IP,同时云主机上搭建了代理服务,我们就可以直接使用该云主机的 HTTP 代理来进行数据爬取了。
本节我们就来实际操作一下搭建 ADSL 拨号代理服务的方法。
1. 什么是 ADSL
ADSL,英文全称是 Asymmetric Digital Subscriber Line,即非对称数字用户环路。它的上行和下行带宽不对称,它采用频分复用技术把普通的电话线分成了电话、上行和下行 3 个相对独立的信道,从而避免了相互之间的干扰。
ADSL 通过拨号的方式上网,拨号时需要输入 ADSL 账号和密码,每次拨号就更换一个 IP。IP 分布在多个 A 段,如果 IP 都能使用,则意味着 IP 量级可达千万。如果我们将 ADSL 主机作为代理,每隔一段时间云主机拨号就换一个 IP,这样可以有效防止 IP 被封禁。另外,由于我们是直接使用专有的云主机搭建的代理服务,所以其代理的稳定性相对更好,代理响应速度也相对更快。
接口模块:需要用 API 来提供对外服务的接口。其实我们可以直接连接数据库来取对应的数据,但是这样就需要知道数据库的连接信息,并且要配置连接,而比较安全和方便的方式就是提供一个 Web API 接口,我们通过访问接口即可拿到可用代理。另外,由于可用代理可能有多个,所以我们可以设置一个随机返回某个可用代理的接口,这样就能保证每个可用代理都可以取到,实现负载均衡。
import redis from proxypool.exceptions import PoolEmptyException from proxypool.schemas.proxy import Proxy from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \ PROXY_SCORE_INIT from random import choice from typing import List from loguru import logger from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies
defadd(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int: """ add proxy and set it to init score :param proxy: proxy, ip:port, like 8.8.8.8:88 :param score: int score :return: result """ ifnot is_valid_proxy(f'{proxy.host}:{proxy.port}'): logger.info(f'invalid proxy {proxy}, throw it') return ifnot self.exists(proxy): if IS_REDIS_VERSION_2: return self.db.zadd(REDIS_KEY, score, proxy.string()) return self.db.zadd(REDIS_KEY, {proxy.string(): score})
defrandom(self) -> Proxy: """ get random proxy firstly try to get proxy with max score if not exists, try to get proxy by rank if not exists, raise error :return: proxy, like 8.8.8.8:8 """ # try to get proxy with max score proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX) if len(proxies): return convert_proxy_or_proxies(choice(proxies)) # else get proxy by rank proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX) if len(proxies): return convert_proxy_or_proxies(choice(proxies)) # else raise error raise PoolEmptyException
defdecrease(self, proxy: Proxy) -> int: """ decrease score of proxy, if small than PROXY_SCORE_MIN, delete it :param proxy: proxy :return: new score """ score = self.db.zscore(REDIS_KEY, proxy.string()) # current score is larger than PROXY_SCORE_MIN if score and score > PROXY_SCORE_MIN: logger.info(f'{proxy.string()} current score {score}, decrease 1') if IS_REDIS_VERSION_2: return self.db.zincrby(REDIS_KEY, proxy.string(), -1) return self.db.zincrby(REDIS_KEY, -1, proxy.string()) # otherwise delete proxy else: logger.info(f'{proxy.string()} current score {score}, remove') return self.db.zrem(REDIS_KEY, proxy.string())
defmax(self, proxy: Proxy) -> int: """ set proxy to max score :param proxy: proxy :return: new score """ logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}') if IS_REDIS_VERSION_2: return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string()) return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})
defcount(self) -> int: """ get count of proxies :return: count, int """ return self.db.zcard(REDIS_KEY)
defall(self) -> List[Proxy]: """ get all proxies :return: list of proxies """ return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))
defbatch(self, start, end) -> List[Proxy]: """ get batch of proxies :param start: start index :param end: end index :return: list of proxies """ return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))
if __name__ == '__main__': conn = RedisClient() result = conn.random() print(result)
from retrying import retry import requests from loguru import logger
classBaseCrawler(object): urls = []
@retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None) deffetch(self, url, **kwargs): try: response = requests.get(url, **kwargs) if response.status_code == 200: return response.text except requests.ConnectionError: return
@logger.catch defcrawl(self): """ crawl main method """ for url in self.urls: logger.info(f'fetching {url}') html = self.fetch(url) for proxy in self.parse(html): logger.info(f'fetched proxy {proxy.string()} from {url}') yield proxy
import pkgutil from .base import BaseCrawler import inspect
# load classes subclass of BaseCrawler classes = [] for loader, name, is_pkg in pkgutil.walk_packages(__path__): module = loader.find_module(name).load_module(name) for name, value in inspect.getmembers(module): globals()[name] = value if inspect.isclass(value) and issubclass(value, BaseCrawler) and value isnot BaseCrawler: classes.append(value) __all__ = __ALL__ = classes
asyncdeftest(self, proxy: Proxy): """ test single proxy :param proxy: Proxy object :return: """ asyncwith aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: try: logger.debug(f'testing {proxy.string()}') asyncwith session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT, allow_redirects=False) as response: if response.status in TEST_VALID_STATUS: self.redis.max(proxy) logger.debug(f'proxy {proxy.string()} is valid, set max score') else: self.redis.decrease(proxy) logger.debug(f'proxy {proxy.string()} is invalid, decrease score') except EXCEPTIONS: self.redis.decrease(proxy) logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
@logger.catch defrun(self): """ test main method :return: """ # event loop of aiohttp logger.info('stating tester...') count = self.redis.count() logger.debug(f'{count} proxies to test') for i in range(0, count, TEST_BATCH): # start end end offset start, end = i, min(i + TEST_BATCH, count) logger.debug(f'testing proxies from {start} to {end} indices') proxies = self.redis.batch(start, end) tasks = [self.test(proxy) for proxy in proxies] # run tasks using event loop self.loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__': tester = Tester() tester.run()
这里定义了一个类 Tester,__init__ 方法中建立了一个 RedisClient 对象,供该对象中其他方法使用。接下来,定义了一个 test 方法,这个方法用来检测单个代理的可用情况,其参数就是被检测的代理。注意,test 方法前面加了 async 关键词,这代表这个方法是异步的。方法内部首先创建了 aiohttp 的 ClientSession 对象,可以直接调用该对象的 get 方法来访问页面。
测试链接在这里定义为常量 TEST_URL。如果针对某个网站有抓取需求,建议将 TEST_URL 设置为目标网站的地址,因为在抓取过程中,代理本身可能是可用的,但是该代理的 IP 已经被目标网站封掉了。例如,某些代理可以正常访问百度等页面,但是对知乎来说可能就被封了,所以我们可以将 TEST_URL 设置为知乎的某个页面的链接。当请求失败、代理被封时,分数自然会减下来,失效的代理就不会被取到了。
如果想做一个通用的代理池,则不需要专门设置 TEST_URL,既可以将其设置为一个不会封 IP 的网站,也可以设置为百度这类响应稳定的网站。
defrun_server(self): """ run server for api """ ifnot ENABLE_SERVER: logger.info('server not enabled, exit') return app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
defrun(self): global tester_process, getter_process, server_process try: logger.info('starting proxypool...') if ENABLE_TESTER: tester_process = multiprocessing.Process(target=self.run_tester) logger.info(f'starting tester, pid {tester_process.pid}...') tester_process.start()
if ENABLE_GETTER: getter_process = multiprocessing.Process(target=self.run_getter) logger.info(f'starting getter, pid{getter_process.pid}...') getter_process.start()
if ENABLE_SERVER: server_process = multiprocessing.Process(target=self.run_server) logger.info(f'starting server, pid{server_process.pid}...') server_process.start()
tester_process.join() getter_process.join() server_process.join() except KeyboardInterrupt: logger.info('received keyboard interrupt signal') tester_process.terminate() getter_process.terminate() server_process.terminate() finally: # must call join method before calling is_alive tester_process.join() getter_process.join() server_process.join() logger.info(f'tester is {"alive"if tester_process.is_alive() else"dead"}') logger.info(f'getter is {"alive"if getter_process.is_alive() else"dead"}') logger.info(f'server is {"alive"if server_process.is_alive() else"dead"}') logger.info('proxy terminated')
if __name__ == '__main__': scheduler = Scheduler() scheduler.run()
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import WebDriverException import time from loguru import logger
COUNT = 1000
for i in range(1, COUNT + 1): try: browser = webdriver.Chrome() wait = WebDriverWait(browser, 10) browser.get('https://captcha1.scrape.center/') button = wait.until(EC.element_to_be_clickable( (By.CSS_SELECTOR, '.el-button'))) button.click() captcha = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, '.geetest_slicebg.geetest_absolute'))) time.sleep(5) captcha.screenshot(f'data/captcha/images/captcha_{i}.png') except WebDriverException as e: logger.error(f'webdriver error occurred {e.msg}') finally: browser.close()
我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么美好,然而一杯茶的功夫可能就会出现错误,比如 403 Forbidden,这时打开网页一看,可能会看到 “您的 IP 访问频率太高” 这样的提示。出现这种现象的原因是网站采取了一些反爬虫措施。比如,服务器会检测某个 IP 在单位时间内的请求次数,如果超过了这个阈值,就会直接拒绝服务,返回一些错误信息,这种情况可以称为封 IP。
既然服务器检测的是某个 IP 单位时间的请求次数,那么借助某种方式来伪装我们的 IP,让服务器识别不出是由我们本机发起的请求,不就可以成功防止封 IP 了吗?
一种有效的方式就是使用代理,后面会详细说明代理的用法。在这之前,需要先了解下代理的基本原理,它是怎样实现伪装 IP 的呢?
1. 基本原理
代理实际上指的就是代理服务器,英文叫作 Proxy Server,它的功能是代理网络用户去取得网络信息。形象地说,它是网络信息的中转站。在我们正常请求一个网站时,是发送了请求给 Web 服务器,Web 服务器把响应传回给我们。如果设置了代理服务器,实际上就是在本机和服务器之间搭建了一个桥,此时本机不是直接向 Web 服务器发起请求,而是向代理服务器发出请求,请求会发送给代理服务器,然后由代理服务器再发送给 Web 服务器,接着由代理服务器再把 Web 服务器返回的响应转发给本机。这样我们同样可以正常访问网页,但这个过程中 Web 服务器识别出的真实 IP 就不再是我们本机的 IP 了,就成功实现了 IP 伪装,这就是代理的基本原理。
由于验证码目标缺口通常具有比较明显的边缘,所以借助于一些边缘检测算法并通过调整阈值是可以找出它的位置的。目前应用比较广泛的边缘检测算法是 Canny,它是 John F. Canny 于 1986 年开发出来的一个多级边缘检测算法,效果还是不错的,OpenCV 也对此算法进行了实现,方法名称就叫做 Canny,声明如下:
OCR,即 Optical Character Recognition,中文翻译叫做光学字符识别。它是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程。OCR 现在已经广泛应用于生产生活中,如文档识别、证件识别、字幕识别、文档检索等等。当然对于本节所述的图形验证码的识别也没有问题。
import time import re import tesserocr from selenium import webdriver from io import BytesIO from PIL import Image from retrying import retry from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By from selenium.common.exceptions import TimeoutException import numpy as np
start = time.time() session = requests.Session() foriinrange(10): session.get('http://httpbin.org/delay/1') print(f'Finished {i + 1} requests') end = time.time() print('Cost time for get', end - start) start = time.time()
foriinrange(10): session.post('http://httpbin.org/delay/1') print(f'Finished {i + 1} requests') end = time.time() print('Cost time for post', end - start)
这里我们添加了一个 allowable_methods 指定了一个过滤器,只有 POST 请求会被缓存,GET 请求就不会。
Cookies <RequestsCookieJar[<Cookie sessionid=psnu8ij69f0ltecd5wasccyzc6ud41tc for login2.scrape.center/>]> Response Status 200 Response URL https://login2.scrape.center/page/1
这下没有问题了,我们发现其 URL 就是 INDEX_URL,模拟登录成功了!同时还可以进一步输出 response_index 的 text 属性看下是否获取成功。
可以看到,这里我们无须再关心 Cookie 的处理和传递问题,我们声明了一个 Session 对象,然后每次调用请求的时候都直接使用 Session 对象的 post 或 get 方法就好了。
运行效果是完全一样的,结果如下:
1 2 3
Cookies <RequestsCookieJar[<Cookie sessionid=ssngkl4i7en9vm73bb36hxif05k10k13 for login2.scrape.center/>]> Response Status 200 Response URL https://login2.scrape.center/page/1
asyncdefmain(): global session session = aiohttp.ClientSession() scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)] await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC
Coroutine: <coroutine object execute at 0x10e0f7830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
Coroutine: <coroutine object execute at 0x10aa33830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
可以发现,其运行效果都是一样的。
6. 绑定回调
另外,我们也可以为某个 task 绑定一个回调方法。比如,我们来看下面的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import asyncio import requests
asyncdefrequest(): url = 'https://www.baidu.com' status = requests.get(url) return status