I want you to act as a javascript console. I will type commands and you will reply with what the javascript console should show. I want you to only reply with the terminal output inside one unique code block, and nothing else. do not write explanations. do not type commands unless I instruct you to do so. when i need to tell you something in english, i will do so by putting text inside curly brackets {like this}. my first command is console.log(“Hello World”);
import time from selenium import webdriver from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.common.action_chains import ActionChains from app.captcha_resolver import CaptchaResolver
# click captchas recognized_indices = [i for i, x in enumerate(recognized_results) if x] logger.debug(f'recognized_indices {recognized_indices}') click_targets = self.wait.until(EC.visibility_of_all_elements_located( (By.CSS_SELECTOR, '.task-image'))) for recognized_index in recognized_indices: click_target: WebElement = click_targets[recognized_index] click_target.click() time.sleep(random())
当然我们也可以通过执行 JavaScript 来对每个节点进行模拟点击,效果是类似的。
这里我们用 for 循环将 true false 列表转成了一个列表,列表的每个元素代表 true 在列表中的位置,其实就是我们的点击目标了。
然后接着我们获取了所有的验证码小图对应的节点,然后依次调用 click 方法进行点击即可。
这样我们就可以实现验证码小图的逐个识别了。
点击验证
好,那么有了上面的逻辑,我们就能完成整个 HCaptcha 的识别和点选了。
最后,我们模拟点击验证按钮就好了:
1 2 3 4 5
# after all captcha clicked verify_button: WebElement = self.get_verify_button() if verify_button.is_displayed: verify_button.click() time.sleep(3)
import time from selenium import webdriver from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.common.action_chains import ActionChains from app.captcha_resolver import CaptchaResolver
defget_question_id_by_target_name(target_name): logger.debug(f'try to get question id by {target_name}') question_id = CAPTCHA_TARGET_NAME_QUESTION_ID_MAPPING.get(target_name) logger.debug(f'question_id {question_id}') return question_id
single_captcha_elements = self.wait.until(EC.visibility_of_all_elements_located( (By.CSS_SELECTOR, '#rc-imageselect-target table td'))) for recognized_index in recognized_indices: single_captcha_element: WebElement = single_captcha_elements[recognized_index] single_captcha_element.click() # check if need verify single captcha self.verify_single_captcha(recognized_index)
defverify_single_captcha(self, index): time.sleep(3) elements = self.wait.until(EC.visibility_of_all_elements_located( (By.CSS_SELECTOR, '#rc-imageselect-target table td'))) single_captcha_element: WebElement = elements[index] class_name = single_captcha_element.get_attribute('class') logger.debug(f'verifiying single captcha {index}, class {class_name}') if'selected'in class_name: logger.debug(f'no new single captcha displayed') return logger.debug('new single captcha displayed') single_captcha_url = single_captcha_element.find_element_by_css_selector( 'img').get_attribute('src') logger.debug(f'single_captcha_url {single_captcha_url}') with open(CAPTCHA_SINGLE_IMAGE_FILE_PATH, 'wb') as f: f.write(requests.get(single_captcha_url).content) resized_single_captcha_base64_string = resize_base64_image( CAPTCHA_SINGLE_IMAGE_FILE_PATH, (100, 100)) single_captcha_recognize_result = self.captcha_resolver.create_task( resized_single_captcha_base64_string, get_question_id_by_target_name(self.captcha_target_name)) ifnot single_captcha_recognize_result: logger.error('count not get single captcha recognize result') return has_object = single_captcha_recognize_result.get( 'solution', {}).get('hasObject') if has_object isNone: logger.error('count not get captcha recognized indices') return if has_object isFalse: logger.debug('no more object in this single captcha') return if has_object: single_captcha_element.click() # check for new single captcha self.verify_single_captcha(index)
OK,这里我们定义了一个 verify_single_captcha 方法,然后传入了格子对应的序号。接着我们首先尝试查找格子对应的节点,然后找出对应的 HTML 的 class 属性。如果没有出现新的小图,那就是这样的选中状态,对应的 class 就包含了 selected 字样,如图所示:
比如说,我们有两台主机 A、B,我们最终想实现在 A 上控制 B。那么如果用正向 Shell,其实就是在 A 上输入 B 的连接地址,比如通过 ssh 连接到 B,连接成功之后,我们就可以在 A 上通过命令控制 B 了。如果用反向 Shell,那就是在 A 上先开启一个监听端口,然后让 B 去连接 A 的这个端口,连接成功之后,A 这边就能通过命令控制 B 了。
反弹 Shell 有什么用?
还是原来的例子,我们想用 A 来控制 B,如果想用 ssh 等命令来控制,那得输入 B 的 sshd 地址或者端口对吧?但是在很多情况下,由于防火墙、安全组、局域网、NAT 等原因,我们实际上是无法直接连接到 B 的,比如:
网页是运行在浏览器端的,当我们浏览一个网页时,其 HTML 代码、 JavaScript 代码都会被下载到浏览器中执行。借助浏览器的开发者工具,我们可以看到网页在加载过程中所有网络请求的详细信息,也能清楚地看到网站运行的 HTML 代码和 JavaScript 代码,这些代码中就包含了网站加载的全部逻辑,如加载哪些资源、请求接口是如何构造的、页面是如何渲染的等等。正因为代码是完全透明的,所以如果我们能够把其中的执行逻辑研究出来,就可以模拟各个网络请求进行数据爬取了。
网站运营者首先想到防护措施可能是对某些数据接口的参数进行加密,比如说对某些 URL 的一些参数加上校验码或者把一些 id 信息进行编码,使其变得难以阅读或构造;或者对某些 API 请求加上一些 token、sign 等签名,这样这些请求发送到服务器时,服务器会通过客户端发来的一些请求信息以及双方约定好的秘钥等来对当前的请求进行校验,如果校验通过,才返回对应数据结果。
var a = ["hello"]; (function (c, d) { var e = function (f) { while (--f) { c["push"](c["shift"]()); } }; e(++d); })(a, 0x9b); var b = function (c, d) { c = c - 0x0; var e = a[c]; return e; }; let hello = "1" + 0x1; console["log"](b("0x0"), hello);
在一些付费代理套餐中,大家可能会注意到有这样的一个套餐 - 独享代理或私密代理,这种其实就是用了专用服务器搭建了代理服务,相对一般的付费代理来说,其稳定性更好,速度也更快,同时 IP 可以动态变化。这种独享代理或私密代理的 IP 切换大多数都是基于 ADSL 拨号机制来实现的,一台云主机每拨号一次就可以换一个 IP,同时云主机上搭建了代理服务,我们就可以直接使用该云主机的 HTTP 代理来进行数据爬取了。
本节我们就来实际操作一下搭建 ADSL 拨号代理服务的方法。
1. 什么是 ADSL
ADSL,英文全称是 Asymmetric Digital Subscriber Line,即非对称数字用户环路。它的上行和下行带宽不对称,它采用频分复用技术把普通的电话线分成了电话、上行和下行 3 个相对独立的信道,从而避免了相互之间的干扰。
ADSL 通过拨号的方式上网,拨号时需要输入 ADSL 账号和密码,每次拨号就更换一个 IP。IP 分布在多个 A 段,如果 IP 都能使用,则意味着 IP 量级可达千万。如果我们将 ADSL 主机作为代理,每隔一段时间云主机拨号就换一个 IP,这样可以有效防止 IP 被封禁。另外,由于我们是直接使用专有的云主机搭建的代理服务,所以其代理的稳定性相对更好,代理响应速度也相对更快。
接口模块:需要用 API 来提供对外服务的接口。其实我们可以直接连接数据库来取对应的数据,但是这样就需要知道数据库的连接信息,并且要配置连接,而比较安全和方便的方式就是提供一个 Web API 接口,我们通过访问接口即可拿到可用代理。另外,由于可用代理可能有多个,所以我们可以设置一个随机返回某个可用代理的接口,这样就能保证每个可用代理都可以取到,实现负载均衡。
import redis from proxypool.exceptions import PoolEmptyException from proxypool.schemas.proxy import Proxy from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \ PROXY_SCORE_INIT from random import choice from typing import List from loguru import logger from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies
defadd(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int: """ add proxy and set it to init score :param proxy: proxy, ip:port, like 8.8.8.8:88 :param score: int score :return: result """ ifnot is_valid_proxy(f'{proxy.host}:{proxy.port}'): logger.info(f'invalid proxy {proxy}, throw it') return ifnot self.exists(proxy): if IS_REDIS_VERSION_2: return self.db.zadd(REDIS_KEY, score, proxy.string()) return self.db.zadd(REDIS_KEY, {proxy.string(): score})
defrandom(self) -> Proxy: """ get random proxy firstly try to get proxy with max score if not exists, try to get proxy by rank if not exists, raise error :return: proxy, like 8.8.8.8:8 """ # try to get proxy with max score proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX) if len(proxies): return convert_proxy_or_proxies(choice(proxies)) # else get proxy by rank proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX) if len(proxies): return convert_proxy_or_proxies(choice(proxies)) # else raise error raise PoolEmptyException
defdecrease(self, proxy: Proxy) -> int: """ decrease score of proxy, if small than PROXY_SCORE_MIN, delete it :param proxy: proxy :return: new score """ score = self.db.zscore(REDIS_KEY, proxy.string()) # current score is larger than PROXY_SCORE_MIN if score and score > PROXY_SCORE_MIN: logger.info(f'{proxy.string()} current score {score}, decrease 1') if IS_REDIS_VERSION_2: return self.db.zincrby(REDIS_KEY, proxy.string(), -1) return self.db.zincrby(REDIS_KEY, -1, proxy.string()) # otherwise delete proxy else: logger.info(f'{proxy.string()} current score {score}, remove') return self.db.zrem(REDIS_KEY, proxy.string())
defmax(self, proxy: Proxy) -> int: """ set proxy to max score :param proxy: proxy :return: new score """ logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}') if IS_REDIS_VERSION_2: return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string()) return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})
defcount(self) -> int: """ get count of proxies :return: count, int """ return self.db.zcard(REDIS_KEY)
defall(self) -> List[Proxy]: """ get all proxies :return: list of proxies """ return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))
defbatch(self, start, end) -> List[Proxy]: """ get batch of proxies :param start: start index :param end: end index :return: list of proxies """ return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))
if __name__ == '__main__': conn = RedisClient() result = conn.random() print(result)
from retrying import retry import requests from loguru import logger
classBaseCrawler(object): urls = []
@retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None) deffetch(self, url, **kwargs): try: response = requests.get(url, **kwargs) if response.status_code == 200: return response.text except requests.ConnectionError: return
@logger.catch defcrawl(self): """ crawl main method """ for url in self.urls: logger.info(f'fetching {url}') html = self.fetch(url) for proxy in self.parse(html): logger.info(f'fetched proxy {proxy.string()} from {url}') yield proxy
import pkgutil from .base import BaseCrawler import inspect
# load classes subclass of BaseCrawler classes = [] for loader, name, is_pkg in pkgutil.walk_packages(__path__): module = loader.find_module(name).load_module(name) for name, value in inspect.getmembers(module): globals()[name] = value if inspect.isclass(value) and issubclass(value, BaseCrawler) and value isnot BaseCrawler: classes.append(value) __all__ = __ALL__ = classes
asyncdeftest(self, proxy: Proxy): """ test single proxy :param proxy: Proxy object :return: """ asyncwith aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: try: logger.debug(f'testing {proxy.string()}') asyncwith session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT, allow_redirects=False) as response: if response.status in TEST_VALID_STATUS: self.redis.max(proxy) logger.debug(f'proxy {proxy.string()} is valid, set max score') else: self.redis.decrease(proxy) logger.debug(f'proxy {proxy.string()} is invalid, decrease score') except EXCEPTIONS: self.redis.decrease(proxy) logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
@logger.catch defrun(self): """ test main method :return: """ # event loop of aiohttp logger.info('stating tester...') count = self.redis.count() logger.debug(f'{count} proxies to test') for i in range(0, count, TEST_BATCH): # start end end offset start, end = i, min(i + TEST_BATCH, count) logger.debug(f'testing proxies from {start} to {end} indices') proxies = self.redis.batch(start, end) tasks = [self.test(proxy) for proxy in proxies] # run tasks using event loop self.loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__': tester = Tester() tester.run()
这里定义了一个类 Tester,__init__ 方法中建立了一个 RedisClient 对象,供该对象中其他方法使用。接下来,定义了一个 test 方法,这个方法用来检测单个代理的可用情况,其参数就是被检测的代理。注意,test 方法前面加了 async 关键词,这代表这个方法是异步的。方法内部首先创建了 aiohttp 的 ClientSession 对象,可以直接调用该对象的 get 方法来访问页面。
测试链接在这里定义为常量 TEST_URL。如果针对某个网站有抓取需求,建议将 TEST_URL 设置为目标网站的地址,因为在抓取过程中,代理本身可能是可用的,但是该代理的 IP 已经被目标网站封掉了。例如,某些代理可以正常访问百度等页面,但是对知乎来说可能就被封了,所以我们可以将 TEST_URL 设置为知乎的某个页面的链接。当请求失败、代理被封时,分数自然会减下来,失效的代理就不会被取到了。
如果想做一个通用的代理池,则不需要专门设置 TEST_URL,既可以将其设置为一个不会封 IP 的网站,也可以设置为百度这类响应稳定的网站。
defrun_server(self): """ run server for api """ ifnot ENABLE_SERVER: logger.info('server not enabled, exit') return app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
defrun(self): global tester_process, getter_process, server_process try: logger.info('starting proxypool...') if ENABLE_TESTER: tester_process = multiprocessing.Process(target=self.run_tester) logger.info(f'starting tester, pid {tester_process.pid}...') tester_process.start()
if ENABLE_GETTER: getter_process = multiprocessing.Process(target=self.run_getter) logger.info(f'starting getter, pid{getter_process.pid}...') getter_process.start()
if ENABLE_SERVER: server_process = multiprocessing.Process(target=self.run_server) logger.info(f'starting server, pid{server_process.pid}...') server_process.start()
tester_process.join() getter_process.join() server_process.join() except KeyboardInterrupt: logger.info('received keyboard interrupt signal') tester_process.terminate() getter_process.terminate() server_process.terminate() finally: # must call join method before calling is_alive tester_process.join() getter_process.join() server_process.join() logger.info(f'tester is {"alive"if tester_process.is_alive() else"dead"}') logger.info(f'getter is {"alive"if getter_process.is_alive() else"dead"}') logger.info(f'server is {"alive"if server_process.is_alive() else"dead"}') logger.info('proxy terminated')
if __name__ == '__main__': scheduler = Scheduler() scheduler.run()
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import WebDriverException import time from loguru import logger
COUNT = 1000
for i in range(1, COUNT + 1): try: browser = webdriver.Chrome() wait = WebDriverWait(browser, 10) browser.get('https://captcha1.scrape.center/') button = wait.until(EC.element_to_be_clickable( (By.CSS_SELECTOR, '.el-button'))) button.click() captcha = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, '.geetest_slicebg.geetest_absolute'))) time.sleep(5) captcha.screenshot(f'data/captcha/images/captcha_{i}.png') except WebDriverException as e: logger.error(f'webdriver error occurred {e.msg}') finally: browser.close()
我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么美好,然而一杯茶的功夫可能就会出现错误,比如 403 Forbidden,这时打开网页一看,可能会看到 “您的 IP 访问频率太高” 这样的提示。出现这种现象的原因是网站采取了一些反爬虫措施。比如,服务器会检测某个 IP 在单位时间内的请求次数,如果超过了这个阈值,就会直接拒绝服务,返回一些错误信息,这种情况可以称为封 IP。
既然服务器检测的是某个 IP 单位时间的请求次数,那么借助某种方式来伪装我们的 IP,让服务器识别不出是由我们本机发起的请求,不就可以成功防止封 IP 了吗?
一种有效的方式就是使用代理,后面会详细说明代理的用法。在这之前,需要先了解下代理的基本原理,它是怎样实现伪装 IP 的呢?
1. 基本原理
代理实际上指的就是代理服务器,英文叫作 Proxy Server,它的功能是代理网络用户去取得网络信息。形象地说,它是网络信息的中转站。在我们正常请求一个网站时,是发送了请求给 Web 服务器,Web 服务器把响应传回给我们。如果设置了代理服务器,实际上就是在本机和服务器之间搭建了一个桥,此时本机不是直接向 Web 服务器发起请求,而是向代理服务器发出请求,请求会发送给代理服务器,然后由代理服务器再发送给 Web 服务器,接着由代理服务器再把 Web 服务器返回的响应转发给本机。这样我们同样可以正常访问网页,但这个过程中 Web 服务器识别出的真实 IP 就不再是我们本机的 IP 了,就成功实现了 IP 伪装,这就是代理的基本原理。
由于验证码目标缺口通常具有比较明显的边缘,所以借助于一些边缘检测算法并通过调整阈值是可以找出它的位置的。目前应用比较广泛的边缘检测算法是 Canny,它是 John F. Canny 于 1986 年开发出来的一个多级边缘检测算法,效果还是不错的,OpenCV 也对此算法进行了实现,方法名称就叫做 Canny,声明如下:
OCR,即 Optical Character Recognition,中文翻译叫做光学字符识别。它是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程。OCR 现在已经广泛应用于生产生活中,如文档识别、证件识别、字幕识别、文档检索等等。当然对于本节所述的图形验证码的识别也没有问题。
import time import re import tesserocr from selenium import webdriver from io import BytesIO from PIL import Image from retrying import retry from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By from selenium.common.exceptions import TimeoutException import numpy as np
start = time.time() session = requests.Session() foriinrange(10): session.get('http://httpbin.org/delay/1') print(f'Finished {i + 1} requests') end = time.time() print('Cost time for get', end - start) start = time.time()
foriinrange(10): session.post('http://httpbin.org/delay/1') print(f'Finished {i + 1} requests') end = time.time() print('Cost time for post', end - start)
这里我们添加了一个 allowable_methods 指定了一个过滤器,只有 POST 请求会被缓存,GET 请求就不会。
Cookies <RequestsCookieJar[<Cookie sessionid=psnu8ij69f0ltecd5wasccyzc6ud41tc for login2.scrape.center/>]> Response Status 200 Response URL https://login2.scrape.center/page/1
这下没有问题了,我们发现其 URL 就是 INDEX_URL,模拟登录成功了!同时还可以进一步输出 response_index 的 text 属性看下是否获取成功。
可以看到,这里我们无须再关心 Cookie 的处理和传递问题,我们声明了一个 Session 对象,然后每次调用请求的时候都直接使用 Session 对象的 post 或 get 方法就好了。
运行效果是完全一样的,结果如下:
1 2 3
Cookies <RequestsCookieJar[<Cookie sessionid=ssngkl4i7en9vm73bb36hxif05k10k13 for login2.scrape.center/>]> Response Status 200 Response URL https://login2.scrape.center/page/1
asyncdefmain(): global session session = aiohttp.ClientSession() scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)] await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC
Coroutine: <coroutine object execute at 0x10e0f7830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
Coroutine: <coroutine object execute at 0x10aa33830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
可以发现,其运行效果都是一样的。
6. 绑定回调
另外,我们也可以为某个 task 绑定一个回调方法。比如,我们来看下面的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import asyncio import requests
asyncdefrequest(): url = 'https://www.baidu.com' status = requests.get(url) return status
Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 ... Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Cost time: 66.64284420013428
Waiting for https://httpbin.org/delay/5 Waiting for https://httpbin.org/delay/5 Waiting for https://httpbin.org/delay/5 Waiting for https://httpbin.org/delay/5 ... Task exception was never retrieved future: <Task finished coro=<request() done, defined at demo.py:8> exception=TypeError("object Response can't be used in 'await' expression")> Traceback (most recent call last): File "demo.py", line 11, in request response = await requests.get(url) TypeError: object Response can't be used in 'await' expression
Waiting for https://httpbin.org/delay/5 Get response fromhttps://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 ... Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Cost time: 65.394437756259273
with sync_playwright() as p: for browser_type in [p.chromium, p.firefox, p.webkit]: browser = browser_type.launch(headless=False) page = browser.new_page() page.goto('https://www.baidu.com') page.screenshot(path=f'screenshot-{browser_type.name}.png') print(page.title()) browser.close()
Options: -o, --output <file name> saves the generated script to a file --target <language> language to use, one of javascript, python, python-async, csharp (default: "python") -b, --browser <browserType> browser to use, one of cr, chromium, ff, firefox, wk, webkit (default: "chromium") --channel <channel> Chromium distribution channel, "chrome", "chrome-beta", "msedge-dev", etc --color-scheme <scheme> emulate preferred color scheme, "light" or "dark" --device <deviceName> emulate device, for example "iPhone 11" --geolocation <coordinates> specify geolocation coordinates, for example "37.819722,-122.478611" --load-storage <filename> load context storage state from the file, previously saved with --save-storage --lang <language> specify language / locale, for example "en-GB" --proxy-server <proxy> specify proxy server, for example "http://myproxy:3128" or "socks5://myproxy:8080" --save-storage <filename> save context storage state at the end, for later use with --load-storage --timezone <time zone> time zone to emulate, for example "Europe/Rome" --timeout <timeout> timeout for Playwright actions in milliseconds (default: "10000") --user-agent <ua string> specify user agent string --viewport-size <size> specify browser viewport size in pixels, for example "1280, 720" -h, --help display help for command
with sync_playwright() as p: browser = p.chromium.launch(headless=False) page = browser.new_page() page.goto('https://spa6.scrape.center/') page.wait_for_load_state('networkidle') elements = page.query_selector_all('a.name') for element in elements: print(element.get_attribute('href')) print(element.text_content()) browser.close()
/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIx 霸王别姬 - Farewell My Concubine /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIy 这个杀手不太冷 - Léon /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIz 肖申克的救赎 - The Shawshank Redemption /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI0 泰坦尼克号 - Titanic /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI1 罗马假日 - Roman Holiday /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI2 唐伯虎点秋香 - Flirting Scholar /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3 乱世佳人 - Gone with the Wind /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI4 喜剧之王 - The King of Comedy /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5 楚门的世界 - The Truman Show /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIxMA== 狮子王 - The Lion King
url = 'https://spa1.scrape.center/' html = requests.get(url).text print(html)
运行结果如下:
1
<!DOCTYPE html><htmllang=en><head><metacharset=utf-8><metahttp-equiv=X-UA-Compatiblecontent="IE=edge"><metaname=viewportcontent="width=device-width,initial-scale=1"><linkrel=iconhref=/favicon.ico><title>Scrape | Movie</title><linkhref=/css/chunk-700f70e1.1126d090.cssrel=prefetch><linkhref=/css/chunk-d1db5eda.0ff76b36.cssrel=prefetch><linkhref=/js/chunk-700f70e1.0548e2b4.jsrel=prefetch><linkhref=/js/chunk-d1db5eda.b564504d.jsrel=prefetch><linkhref=/css/app.ea9d802a.cssrel=preloadas=style><linkhref=/js/app.1435ecd5.jsrel=preloadas=script><linkhref=/js/chunk-vendors.77daf991.jsrel=preloadas=script><linkhref=/css/app.ea9d802a.cssrel=stylesheet></head><body><noscript><strong>We're sorry but portal doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><divid=app></div><scriptsrc=/js/chunk-vendors.77daf991.js></script><scriptsrc=/js/app.1435ecd5.js></script></body></html>
可以看到,爬取结果就只有这么一点 HTML 内容,而我们在浏览器中打开这个页面,却能看到如图所示的结果:
在 HTML 中,我们只能看到在源码中引用了一些 JavaScript 和 CSS 文件,并没有观察到有任何电影数据信息。
如果遇到这样的情况,这说明我们现在看到的整个页面便是 JavaScript 渲染得到的,浏览器执行了 HTML 中所引用的 JavaScript 文件,JavaScript 通过调用一些数据加载和页面渲染方法,才最终呈现了图中所示的结果。
defmain(): for page in range(1, TOTAL_PAGE + 1): index_data = scrape_index(page) for item in index_data.get('results'): id = item.get('id') detail_data = scrape_detail(id) logging.info('detail data %s', detail_data)
if __name__ == '__main__': main()
这里我们定义了一个 main 方法,首先遍历获取了页码 page,然后把 page 当参数传递给了 scrape_index 方法,得到列表页的数据。接着我们遍历每个列表页的每个结果,获取到每部电影的 id,然后把 id 当作参数传递给 scrape_detail 方法来爬取每部电影的详情数据,并将其赋值为 detail_data,输出即可。
defsave_data(data): collection.update_one({ 'name': data.get('name') }, { '$set': data }, upsert=True)
在这里我们声明了一个 save_data 方法,它接收一个 data 参数,也就是我们刚才提取的电影详情信息。在方法里面,我们调用了 update_one 方法,第一个参数是查询条件,即根据 name 进行查询;第二个参数就是 data 对象本身,就是所有的数据,这里我们用 $set 操作符表示更新操作;第三个参数很关键,这里实际上是 upsert 参数,如果把这个设置为 True,则可以做到存在即更新,不存在即插入的功能,更新会根据第一个参数设置的 name 字段,所以这样可以防止数据库中出现同名的电影数据。
defmain(): for page in range(1, TOTAL_PAGE + 1): index_data = scrape_index(page) for item in index_data.get('results'): id = item.get('id') detail_data = scrape_detail(id) logging.info('detail data %s', detail_data) save_data(detail_data) logging.info('data saved successfully')
JavaScript 有改变网页内容的能力,解析完响应内容之后,就可以调用 JavaScript 来针对解析完的内容对网页进行下一步处理了。比如,通过 document.getElementById().innerHTML 这样的操作,便可以对某个元素内的源代码进行更改,这样网页显示的内容就改变了,这样的操作也被称作 DOM 操作,即对网页文档进行操作,如更改、删除等。
上例中,document.getElementById("myDiv").innerHTML=xmlhttp.responseText 便将 ID 为 myDiv 的节点内部的 HTML 代码更改为服务器返回的内容,这样 myDiv 元素内部便会呈现出服务器返回的新数据,网页的部分内容看上去就更新了。
这样一来,我们就可以用索引来获取对应的内容了。例如,如果想取第一个元素里的 name 属性,就可以使用如下方式:
1 2
data[0]['name'] data[0].get('name')
得到的结果都是:
1
Bob
通过中括号加 0 索引,可以得到第一个字典元素,然后再调用其键名即可得到相应的键值。获取键值时有两种方式,一种是中括号加键名,另一种是通过 get 方法传入键名。这里推荐使用 get 方法,这样如果键名不存在,则不会报错,会返回 None。另外,get 方法还可以传入第二个参数(即默认值),示例如下:
可以看到两个结果都是 SelectorList 对象,它其实是一个可迭代对象。另外可以用 len 方法获取它的长度,都是 3,提取结果代表的节点其实也是一样的,都是第 1、3、5 个 li 节点,每个节点还是以 Selector 对象的形式返回了,其中每个 Selector 对象的 data 属性里面包含了提取节点的 HTML 代码。
好,既然刚才提取的结果是一个可迭代对象 SelectorList,那么要获取提取到的所有 li 节点的文本内容就要对结果进行遍历了,写法如下:
1 2 3 4 5 6
from parsel import Selector selector = Selector(text=html) items = selector.css('.item-0') for item in items: text = item.xpath('.//text()').get() print(text)
这里 get 方法的作用是从 SelectorList 里面提取第一个 Selector 对象,然后输出其中的结果。
我们再看一个实例:
1 2
result = selector.xpath('//li[contains(@class, "item-0")]//text()').get() print(result)
输出结果如下:
1
firstitem
其实这里我们使用 //li[contains(@class, "item-0")]//text() 选取了所有 class 包含 item-0 的 li 节点的文本内容。应该来说,返回结果 SelectorList 应该对应三个 li 对象,而这里 get 方法仅仅返回了第一个 li 对象的文本内容,因为其实它会只提取第一个 Selector 对象的结果。
那有没有能提取所有 Selector 的对应内容的方法呢?有,那就是 getall 方法。
所以如果要提取所有对应的 li 节点的文本内容的话,写法可以改写为如下内容:
1 2
result = selector.xpath('//li[contains(@class, "item-0")]//text()').getall() print(result)
from parsel import Selector selector = Selector(text=html) result = selector.css('.item-0.active a::attr(href)').get() print(result) result = selector.xpath('//li[contains(@class, "item-0") and contains(@class, "active")]/a/@href').get() print(result)
这里我们实现了两种写法,分别用 css 和 xpath 方法实现。我们根据同时包含 item-0 和 active 这两个 class 为依据来选取第三个 li 节点,然后进一步选取了里面的 a 节点,对于 CSS Selector,选取属性需要加 ::attr() 并传入对应的属性名称来选取,对于 XPath,直接用 /@ 再加属性名称即可选取。最后统一用 get 方法提取结果即可。
我们一般会用 // 开头的 XPath 规则来选取所有符合要求的节点。这里以前面的 HTML 文本为例,如果要选取所有节点,可以这样实现:
1 2 3 4
from lxml import etree html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//*') print(result)
运行结果如下:
1
[<Element html at 0x10510d9c8>, <Element body at 0x10510da08>, <Element div at 0x10510da48>, <Element ul at 0x10510da88>, <Element li at 0x10510dac8>, <Element a at 0x10510db48>, <Element li at 0x10510db88>, <Element a at 0x10510dbc8>, <Element li at 0x10510dc08>, <Element a at 0x10510db08>, <Element li at 0x10510dc48>, <Element a at 0x10510dc88>, <Element li at 0x10510dcc8>, <Element a at 0x10510dd08>]
这里使用 * 代表匹配所有节点,也就是整个 HTML 文本中的所有节点都会被获取。可以看到,返回形式是一个列表,每个元素是 Element 类型,其后跟了节点的名称,如 html、body、div、ul、li、a 等,所有节点都包含在列表中了。
当然,此处匹配也可以指定节点名称。如果想获取所有 li 节点,示例如下:
1 2 3 4 5
from lxml import etree html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//li') print(result) print(result[0])
这里要选取所有 li 节点,可以使用 //,然后直接加上节点名称即可,调用时直接使用 xpath 方法即可。
运行结果如下:
1 2
[<Element li at 0x105849208>, <Element li at 0x105849248>, <Element li at 0x105849288>, <Element li at 0x1058492c8>, <Element li at 0x105849308>] <Element li at 0x105849208>
这里可以看到,提取结果是一个列表形式,其中每个元素都是一个 Element 对象。如果要取出其中一个对象,可以直接用中括号加索引,如 [0]。
6. 子节点
我们通过 / 或 // 即可查找元素的子节点或子孙节点。假如现在想选择 li 节点的所有直接子节点 a,可以这样实现:
1 2 3 4 5
from lxml import etree
html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//li/a') print(result)
这里通过追加 /a 即选择了所有 li 节点的所有直接子节点 a。因为 //li 用于选中所有 li 节点,/a 用于选中 li 节点的所有直接子节点 a,二者组合在一起即获取所有 li 节点的所有直接子节点 a。
运行结果如下:
1
[<Element a at 0x106ee8688>, <Element a at 0x106ee86c8>, <Element a at 0x106ee8708>, <Element a at 0x106ee8748>, <Element a at 0x106ee8788>]
此处的 / 用于选取直接子节点,如果要获取所有子孙节点,就可以使用 //。例如,要获取 ul 节点下的所有子孙节点 a,可以这样实现:
1 2 3 4 5
from lxml import etree
html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//ul//a') print(result)
运行结果是相同的。
但是如果这里用 //ul/a,就无法获取任何结果了。因为 / 用于获取直接子节点,而在 ul 节点下没有直接的 a 子节点,只有 li 节点,所以无法获取任何匹配结果,代码如下:
1 2 3 4 5
from lxml import etree
html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//ul/a') print(result)
比如,现在首先选中 href 属性为 link4.html 的 a 节点,然后获取其父节点,再获取其 class 属性,相关代码如下:
1 2 3 4 5
from lxml import etree
html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//a[@href="link4.html"]/../@class') print(result)
运行结果如下:
1
['item-1']
检查一下结果发现,这正是我们获取的目标 li 节点的 class 属性。
同时,我们也可以通过 parent:: 来获取父节点,代码如下:
1 2 3 4
from lxml import etree html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//a[@href="link4.html"]/parent::*/@class') print(result)
8. 属性匹配
在选取的时候,我们还可以用 @ 符号进行属性过滤。比如,这里如果要选取 class 为 item-0 的 li 节点,可以这样实现:
1 2 3 4
from lxml import etree html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//li[@class="item-0"]') print(result)
这里我们通过加入 [@class="item-0"],限制了节点的 class 属性为 item-0,而 HTML 文本中符合条件的 li 节点有两个,所以结果应该返回两个匹配到的元素。结果如下:
1
<Element li at 0x10a399288>, <Element li at 0x10a3992c8>
可见,匹配结果正是两个,至于是不是那正确的两个,后面再验证。
9. 文本获取
我们用 XPath 中的 text 方法获取节点中的文本,接下来尝试获取前面 li 节点中的文本,相关代码如下:
1 2 3 4 5
from lxml import etree
html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//li[@class="item-0"]/text()') print(result)
运行结果如下:
1
['\n ']
奇怪的是,我们并没有获取到任何文本,只获取到了一个换行符,这是为什么呢?因为 XPath 中 text 方法前面是 /,而此处 / 的含义是选取直接子节点,很明显 li 的直接子节点都是 a 节点,文本都是在 a 节点内部的,所以这里匹配到的结果就是被修正的 li 节点内部的换行符,因为自动修正的 li 节点的尾标签换行了。
其中一个节点因为自动修正,li 节点的尾标签添加的时候换行了,所以提取文本得到的唯一结果就是 li 节点的尾标签和 a 节点的尾标签之间的换行符。
因此,如果想获取 li 节点内部的文本,就有两种方式,一种是先选取 a 节点再获取文本,另一种就是使用 //。接下来,我们来看下二者的区别。
首先,选取 a 节点再获取文本,代码如下:
1 2 3 4
from lxml import etree html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//li[@class="item-0"]/a/text()') print(result)
运行结果如下:
1
['first item', 'fifth item']
可以看到,这里的返回值是两个,内容都是属性为 item-0 的 li 节点的文本,这也印证了前面属性匹配的结果是正确的。
这里我们是逐层选取的,先选取了 li 节点,又利用 / 选取了其直接子节点 a,然后再选取其文本,得到的结果恰好是符合我们预期的两个结果。
再来看下用另一种方式(即使用 //)选取的结果,代码如下:
1 2 3 4 5
from lxml import etree
html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//li[@class="item-0"]//text()') print(result)
运行结果如下:
1
['first item', 'fifth item', '\n ']
不出所料,这里的返回结果是 3 个。可想而知,这里是选取所有子孙节点的文本,其中前两个就是 li 的子节点 a 内部的文本,另外一个就是最后一个 li 节点内部的文本,即换行符。
所以说,如果要想获取子孙节点内部的所有文本,可以直接用 // 加 text 方法的方式,这样可以保证获取到最全面的文本信息,但是可能会夹杂一些换行符等特殊字符。如果想获取某些特定子孙节点下的所有文本,可以先选取到特定的子孙节点,然后再调用 text 方法获取其内部文本,这样可以保证获取的结果是整洁的。
10. 属性获取
我们知道用 text 方法可以获取节点内部文本,那么节点属性该怎样获取呢?其实还是用 @ 符号就可以。例如,我们想获取所有 li 节点下所有 a 节点的 href 属性,代码如下:
1 2 3 4 5
from lxml import etree
html = etree.parse('./test.html', etree.HTMLParser()) result = html.xpath('//li/a/@href') print(result)
from lxml import etree text = ''' <li class="li li-first"><a href="link.html">first item</a></li> ''' html = etree.HTML(text) result = html.xpath('//li[@class="li"]/a/text()') print(result)
这里 HTML 文本中 li 节点的 class 属性有两个值 li 和 li-first,此时如果还想用之前的属性匹配获取,就无法匹配了,此时的运行结果如下:
1
[]
这时就需要用 contains 方法了,代码可以改写如下:
1 2 3 4 5 6 7
from lxml import etree text = ''' <li class="li li-first"><a href="link.html">first item</a></li> ''' html = etree.HTML(text) result = html.xpath('//li[contains(@class, "li")]/a/text()') print(result)
另外,我们可能还遇到一种情况,那就是根据多个属性确定一个节点,这时就需要同时匹配多个属性。此时可以使用运算符 and 来连接,示例如下:
1 2 3 4 5 6 7
from lxml import etree text = ''' <li class="li li-first" name="item"><a href="link.html">first item</a></li> ''' html = etree.HTML(text) result = html.xpath('//li[contains(@class, "li") and @name="item"]/a/text()') print(result)
这里的 li 节点又增加了一个属性 name。要确定这个节点,需要同时根据 class 和 name 属性来选择,一个条件是 class 属性里面包含 li 字符串,另一个条件是 name 属性为 item 字符串,二者需要同时满足,需要用 and 操作符相连,相连之后置于中括号内进行条件筛选。运行结果如下:
1
['first item']
这里的 and 其实是 XPath 中的运算符。另外,还有很多运算符,如 or、mod 等,在此总结为表 3-。
text = ''' <div> <ul> <li class="item-0"><a href="link1.html"><span>first item</span></a></li> <li class="item-1"><a href="link2.html">second item</a></li> <li class="item-inactive"><a href="link3.html">third item</a></li> <li class="item-1"><a href="link4.html">fourth item</a></li> <li class="item-0"><a href="link5.html">fifth item</a> </ul> </div> ''' html = etree.HTML(text) result = html.xpath('//li[1]/ancestor::*') print(result) result = html.xpath('//li[1]/ancestor::div') print(result) result = html.xpath('//li[1]/attribute::*') print(result) result = html.xpath('//li[1]/child::a[@href="link1.html"]') print(result) result = html.xpath('//li[1]/descendant::span') print(result) result = html.xpath('//li[1]/following::*[2]') print(result) result = html.xpath('//li[1]/following-sibling::*') print(result)
运行结果如下:
1 2 3 4 5 6 7
[<Element html at 0x107941808>, <Element body at 0x1079418c8>, <Element div at 0x107941908>, <Element ul at 0x107941948>] [<Element div at 0x107941908>] ['item-0'] [<Element a at 0x1079418c8>] [<Element span at 0x107941948>] [<Element a at 0x1079418c8>] [<Element li at 0x107941948>, <Element li at 0x107941988>, <Element li at 0x1079419c8>, <Element li at 0x107941a08>]
第一次选择时,我们调用了 ancestor 轴,可以获取所有祖先节点。其后需要跟两个冒号,然后是节点的选择器,这里我们直接使用 *,表示匹配所有节点,因此返回结果是第一个 li 节点的所有祖先节点,包括 html、body、div 和 ul。
第二次选择时,我们又加了限定条件,这次在冒号后面加了 div,这样得到的结果就只有 div 这个祖先节点了。
第三次选择时,我们调用了 attribute 轴,可以获取所有属性值,其后跟的选择器还是 *,这代表获取节点的所有属性,返回值就是 li 节点的所有属性值。
第四次选择时,我们调用了 child 轴,可以获取所有直接子节点。这里我们又加了限定条件,选取 href 属性为 link1.html 的 a 节点。
第五次选择时,我们调用了 descendant 轴,可以获取所有子孙节点。这里我们又加了限定条件获取 span 节点,所以返回的结果只包含 span 节点而不包含 a 节点。
第六次选择时,我们调用了 following 轴,可以获取当前节点之后的所有节点。这里我们虽然使用的是 * 匹配,但又加了索引选择,所以只获取了第二个后续节点。