我们还可以通过 position 参数控制二维码的位置,比如说一张图片里面有一个女生穿裙子,而我们想要把二维码放在裙子的位置并与之融合起来,我们就可以尝试改下二维码的位置,调用样例如下:
1 2 3 4 5 6 7 8 9
curl -X POST "https://api.zhishuyun.com/qrart/generate?token={token}" \ -H "accept: application/json" \ -H "content-type: application/json" \ -d '{ "type": "link", "content": "https://data.zhishuyun.com", "prompt": "one of the beautiful girls in the moonlight in the background, in the style of pixelated chaos, rococo-inspired art, dark white and sky-blue, made of plastic, delicate flowers, gongbi, wimmelbilder", "position": "bottom" }'
在这里我们就不再对各种 API 参数进行一一介绍了,更详细更实时的内容可以参见知数云的官方文档,链接为:https://data.zhishuyun.com/documents/ee085d2a-a0b9-4f0e-8b4d-8da407345138。
价格
知数云艺术二维码的 API 提供了阶梯定价,首次申请免费赠送 20 次,而且购买越多越便宜,由于价格会动态调整,所以大家可以查看知数云官网来查看最新实时价格:https://data.zhishuyun.com/services/38ecf158-36f2-42f2-8e7f-6786cdfc2452
价格怎么样呢?由于价格可能会动态变化,大家可以直接参考知数云的官方网站了解:https://data.zhishuyun.com/services/d87e5e99-b797-4ade-9e73-b896896b0461。但总的来说,能够以这个价格做到知数云 Midjourney API 这样的稳定性和并发的,业界寥寥无几,欢迎选购和评测。
动态住宅代理:这种代理实际上就是用真实的住宅用户的 IP 搭建的代理。一般来说,住宅代理对于很多场景的使用封禁概率会比较低,因为很多厂商对封禁住宅代理是比较谨慎的。动态住宅代理其实就是可以定时切换的IP,比如说做网络爬虫,我们就需要不断变换的不同的代理IP,这样可以进一步的减少被封禁的概率。
静态住宅代理:相对于动态代理来说,静态住宅代理的特点就是长效稳定,可以一直获取一个稳定不变的代理 IP,适合长久的稳定的海外网络环境使用。比如说,我们要进行自动化网站的爬取,如果在一个页面内 IP 地址频繁变动会增大被风控的概率。所以,如果有一个长效稳定的住宅 IP 代理,就会非常方便。
数据中心代理:这种代理实际上是很多服务器厂商的服务器搭建起来的代理。例如腾讯云、阿里云、微软云等服务器所在的IP地址段,就属于所谓的数据中心的 IP 地址段。因此,用这些服务器搭建出来的代理就叫做数据中心代理。一般来说,这种数据中心代理相对于住宅代理更容易被爬虫封禁,但是这种代理的优势就是价格更加便宜,而且网络速度也会相对较好。
I want you to act as a javascript console. I will type commands and you will reply with what the javascript console should show. I want you to only reply with the terminal output inside one unique code block, and nothing else. do not write explanations. do not type commands unless I instruct you to do so. when i need to tell you something in english, i will do so by putting text inside curly brackets {like this}. my first command is console.log(“Hello World”);
import time from selenium import webdriver from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.common.action_chains import ActionChains from app.captcha_resolver import CaptchaResolver
# click captchas recognized_indices = [i for i, x in enumerate(recognized_results) if x] logger.debug(f'recognized_indices {recognized_indices}') click_targets = self.wait.until(EC.visibility_of_all_elements_located( (By.CSS_SELECTOR, '.task-image'))) for recognized_index in recognized_indices: click_target: WebElement = click_targets[recognized_index] click_target.click() time.sleep(random())
当然我们也可以通过执行 JavaScript 来对每个节点进行模拟点击,效果是类似的。
这里我们用 for 循环将 true false 列表转成了一个列表,列表的每个元素代表 true 在列表中的位置,其实就是我们的点击目标了。
然后接着我们获取了所有的验证码小图对应的节点,然后依次调用 click 方法进行点击即可。
这样我们就可以实现验证码小图的逐个识别了。
点击验证
好,那么有了上面的逻辑,我们就能完成整个 HCaptcha 的识别和点选了。
最后,我们模拟点击验证按钮就好了:
1 2 3 4 5
# after all captcha clicked verify_button: WebElement = self.get_verify_button() if verify_button.is_displayed: verify_button.click() time.sleep(3)
import time from selenium import webdriver from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.common.action_chains import ActionChains from app.captcha_resolver import CaptchaResolver
defget_question_id_by_target_name(target_name): logger.debug(f'try to get question id by {target_name}') question_id = CAPTCHA_TARGET_NAME_QUESTION_ID_MAPPING.get(target_name) logger.debug(f'question_id {question_id}') return question_id
single_captcha_elements = self.wait.until(EC.visibility_of_all_elements_located( (By.CSS_SELECTOR, '#rc-imageselect-target table td'))) for recognized_index in recognized_indices: single_captcha_element: WebElement = single_captcha_elements[recognized_index] single_captcha_element.click() # check if need verify single captcha self.verify_single_captcha(recognized_index)
defverify_single_captcha(self, index): time.sleep(3) elements = self.wait.until(EC.visibility_of_all_elements_located( (By.CSS_SELECTOR, '#rc-imageselect-target table td'))) single_captcha_element: WebElement = elements[index] class_name = single_captcha_element.get_attribute('class') logger.debug(f'verifiying single captcha {index}, class {class_name}') if'selected'in class_name: logger.debug(f'no new single captcha displayed') return logger.debug('new single captcha displayed') single_captcha_url = single_captcha_element.find_element_by_css_selector( 'img').get_attribute('src') logger.debug(f'single_captcha_url {single_captcha_url}') with open(CAPTCHA_SINGLE_IMAGE_FILE_PATH, 'wb') as f: f.write(requests.get(single_captcha_url).content) resized_single_captcha_base64_string = resize_base64_image( CAPTCHA_SINGLE_IMAGE_FILE_PATH, (100, 100)) single_captcha_recognize_result = self.captcha_resolver.create_task( resized_single_captcha_base64_string, get_question_id_by_target_name(self.captcha_target_name)) ifnot single_captcha_recognize_result: logger.error('count not get single captcha recognize result') return has_object = single_captcha_recognize_result.get( 'solution', {}).get('hasObject') if has_object isNone: logger.error('count not get captcha recognized indices') return if has_object isFalse: logger.debug('no more object in this single captcha') return if has_object: single_captcha_element.click() # check for new single captcha self.verify_single_captcha(index)
OK,这里我们定义了一个 verify_single_captcha 方法,然后传入了格子对应的序号。接着我们首先尝试查找格子对应的节点,然后找出对应的 HTML 的 class 属性。如果没有出现新的小图,那就是这样的选中状态,对应的 class 就包含了 selected 字样,如图所示:
比如说,我们有两台主机 A、B,我们最终想实现在 A 上控制 B。那么如果用正向 Shell,其实就是在 A 上输入 B 的连接地址,比如通过 ssh 连接到 B,连接成功之后,我们就可以在 A 上通过命令控制 B 了。如果用反向 Shell,那就是在 A 上先开启一个监听端口,然后让 B 去连接 A 的这个端口,连接成功之后,A 这边就能通过命令控制 B了。
反弹 Shell 有什么用?
还是原来的例子,我们想用 A 来控制 B,如果想用 ssh 等命令来控制,那得输入 B 的 sshd 地址或者端口对吧?但是在很多情况下,由于防火墙、安全组、局域网、NAT 等原因,我们实际上是无法直接连接到 B 的,比如:
网页是运行在浏览器端的,当我们浏览一个网页时,其 HTML 代码、 JavaScript 代码都会被下载到浏览器中执行。借助浏览器的开发者工具,我们可以看到网页在加载过程中所有网络请求的详细信息,也能清楚地看到网站运行的 HTML 代码和 JavaScript 代码,这些代码中就包含了网站加载的全部逻辑,如加载哪些资源、请求接口是如何构造的、页面是如何渲染的等等。正因为代码是完全透明的,所以如果我们能够把其中的执行逻辑研究出来,就可以模拟各个网络请求进行数据爬取了。
网站运营者首先想到防护措施可能是对某些数据接口的参数进行加密,比如说对某些 URL 的一些参数加上校验码或者把一些 id 信息进行编码,使其变得难以阅读或构造;或者对某些 API 请求加上一些 token、sign 等签名,这样这些请求发送到服务器时,服务器会通过客户端发来的一些请求信息以及双方约定好的秘钥等来对当前的请求进行校验,如果校验通过,才返回对应数据结果。
var a = ["hello"]; (function (c, d) { var e = function (f) { while (--f) { c["push"](c["shift"]()); } }; e(++d); })(a, 0x9b); var b = function (c, d) { c = c - 0x0; var e = a[c]; return e; }; let hello = "1" + 0x1; console["log"](b("0x0"), hello);
在一些付费代理套餐中,大家可能会注意到有这样的一个套餐 - 独享代理或私密代理,这种其实就是用了专用服务器搭建了代理服务,相对一般的付费代理来说,其稳定性更好,速度也更快,同时 IP 可以动态变化。这种独享代理或私密代理的 IP 切换大多数都是基于 ADSL 拨号机制来实现的,一台云主机每拨号一次就可以换一个 IP,同时云主机上搭建了代理服务,我们就可以直接使用该云主机的 HTTP 代理来进行数据爬取了。
本节我们就来实际操作一下搭建 ADSL 拨号代理服务的方法。
1. 什么是 ADSL
ADSL,英文全称是 Asymmetric Digital Subscriber Line,即非对称数字用户环路。它的上行和下行带宽不对称,它采用频分复用技术把普通的电话线分成了电话、上行和下行 3 个相对独立的信道,从而避免了相互之间的干扰。
ADSL 通过拨号的方式上网,拨号时需要输入 ADSL 账号和密码,每次拨号就更换一个 IP。IP 分布在多个 A 段,如果 IP 都能使用,则意味着 IP 量级可达千万。如果我们将 ADSL 主机作为代理,每隔一段时间云主机拨号就换一个 IP,这样可以有效防止 IP 被封禁。另外,由于我们是直接使用专有的云主机搭建的代理服务,所以其代理的稳定性相对更好,代理响应速度也相对更快。
接口模块:需要用 API 来提供对外服务的接口。其实我们可以直接连接数据库来取对应的数据,但是这样就需要知道数据库的连接信息,并且要配置连接,而比较安全和方便的方式就是提供一个 Web API 接口,我们通过访问接口即可拿到可用代理。另外,由于可用代理可能有多个,所以我们可以设置一个随机返回某个可用代理的接口,这样就能保证每个可用代理都可以取到,实现负载均衡。
import redis from proxypool.exceptions import PoolEmptyException from proxypool.schemas.proxy import Proxy from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \ PROXY_SCORE_INIT from random import choice from typing import List from loguru import logger from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies
defadd(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int: """ add proxy and set it to init score :param proxy: proxy, ip:port, like 8.8.8.8:88 :param score: int score :return: result """ ifnot is_valid_proxy(f'{proxy.host}:{proxy.port}'): logger.info(f'invalid proxy {proxy}, throw it') return ifnot self.exists(proxy): if IS_REDIS_VERSION_2: return self.db.zadd(REDIS_KEY, score, proxy.string()) return self.db.zadd(REDIS_KEY, {proxy.string(): score})
defrandom(self) -> Proxy: """ get random proxy firstly try to get proxy with max score if not exists, try to get proxy by rank if not exists, raise error :return: proxy, like 8.8.8.8:8 """ # try to get proxy with max score proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX) if len(proxies): return convert_proxy_or_proxies(choice(proxies)) # else get proxy by rank proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX) if len(proxies): return convert_proxy_or_proxies(choice(proxies)) # else raise error raise PoolEmptyException
defdecrease(self, proxy: Proxy) -> int: """ decrease score of proxy, if small than PROXY_SCORE_MIN, delete it :param proxy: proxy :return: new score """ score = self.db.zscore(REDIS_KEY, proxy.string()) # current score is larger than PROXY_SCORE_MIN if score and score > PROXY_SCORE_MIN: logger.info(f'{proxy.string()} current score {score}, decrease 1') if IS_REDIS_VERSION_2: return self.db.zincrby(REDIS_KEY, proxy.string(), -1) return self.db.zincrby(REDIS_KEY, -1, proxy.string()) # otherwise delete proxy else: logger.info(f'{proxy.string()} current score {score}, remove') return self.db.zrem(REDIS_KEY, proxy.string())
defmax(self, proxy: Proxy) -> int: """ set proxy to max score :param proxy: proxy :return: new score """ logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}') if IS_REDIS_VERSION_2: return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string()) return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})
defcount(self) -> int: """ get count of proxies :return: count, int """ return self.db.zcard(REDIS_KEY)
defall(self) -> List[Proxy]: """ get all proxies :return: list of proxies """ return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))
defbatch(self, start, end) -> List[Proxy]: """ get batch of proxies :param start: start index :param end: end index :return: list of proxies """ return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))
if __name__ == '__main__': conn = RedisClient() result = conn.random() print(result)
from retrying import retry import requests from loguru import logger
classBaseCrawler(object): urls = []
@retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None) deffetch(self, url, **kwargs): try: response = requests.get(url, **kwargs) if response.status_code == 200: return response.text except requests.ConnectionError: return
@logger.catch defcrawl(self): """ crawl main method """ for url in self.urls: logger.info(f'fetching {url}') html = self.fetch(url) for proxy in self.parse(html): logger.info(f'fetched proxy {proxy.string()} from {url}') yield proxy
import pkgutil from .base import BaseCrawler import inspect
# load classes subclass of BaseCrawler classes = [] for loader, name, is_pkg in pkgutil.walk_packages(__path__): module = loader.find_module(name).load_module(name) for name, value in inspect.getmembers(module): globals()[name] = value if inspect.isclass(value) and issubclass(value, BaseCrawler) and value isnot BaseCrawler: classes.append(value) __all__ = __ALL__ = classes
asyncdeftest(self, proxy: Proxy): """ test single proxy :param proxy: Proxy object :return: """ asyncwith aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: try: logger.debug(f'testing {proxy.string()}') asyncwith session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT, allow_redirects=False) as response: if response.status in TEST_VALID_STATUS: self.redis.max(proxy) logger.debug(f'proxy {proxy.string()} is valid, set max score') else: self.redis.decrease(proxy) logger.debug(f'proxy {proxy.string()} is invalid, decrease score') except EXCEPTIONS: self.redis.decrease(proxy) logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
@logger.catch defrun(self): """ test main method :return: """ # event loop of aiohttp logger.info('stating tester...') count = self.redis.count() logger.debug(f'{count} proxies to test') for i in range(0, count, TEST_BATCH): # start end end offset start, end = i, min(i + TEST_BATCH, count) logger.debug(f'testing proxies from {start} to {end} indices') proxies = self.redis.batch(start, end) tasks = [self.test(proxy) for proxy in proxies] # run tasks using event loop self.loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__': tester = Tester() tester.run()
这里定义了一个类 Tester,__init__ 方法中建立了一个 RedisClient 对象,供该对象中其他方法使用。接下来,定义了一个 test 方法,这个方法用来检测单个代理的可用情况,其参数就是被检测的代理。注意,test 方法前面加了 async 关键词,这代表这个方法是异步的。方法内部首先创建了 aiohttp 的 ClientSession 对象,可以直接调用该对象的 get 方法来访问页面。
测试链接在这里定义为常量 TEST_URL。如果针对某个网站有抓取需求,建议将 TEST_URL 设置为目标网站的地址,因为在抓取过程中,代理本身可能是可用的,但是该代理的 IP 已经被目标网站封掉了。例如,某些代理可以正常访问百度等页面,但是对知乎来说可能就被封了,所以我们可以将 TEST_URL 设置为知乎的某个页面的链接。当请求失败、代理被封时,分数自然会减下来,失效的代理就不会被取到了。
如果想做一个通用的代理池,则不需要专门设置 TEST_URL,既可以将其设置为一个不会封 IP 的网站,也可以设置为百度这类响应稳定的网站。
defrun_server(self): """ run server for api """ ifnot ENABLE_SERVER: logger.info('server not enabled, exit') return app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
defrun(self): global tester_process, getter_process, server_process try: logger.info('starting proxypool...') if ENABLE_TESTER: tester_process = multiprocessing.Process(target=self.run_tester) logger.info(f'starting tester, pid {tester_process.pid}...') tester_process.start()
if ENABLE_GETTER: getter_process = multiprocessing.Process(target=self.run_getter) logger.info(f'starting getter, pid{getter_process.pid}...') getter_process.start()
if ENABLE_SERVER: server_process = multiprocessing.Process(target=self.run_server) logger.info(f'starting server, pid{server_process.pid}...') server_process.start()
tester_process.join() getter_process.join() server_process.join() except KeyboardInterrupt: logger.info('received keyboard interrupt signal') tester_process.terminate() getter_process.terminate() server_process.terminate() finally: # must call join method before calling is_alive tester_process.join() getter_process.join() server_process.join() logger.info(f'tester is {"alive"if tester_process.is_alive() else"dead"}') logger.info(f'getter is {"alive"if getter_process.is_alive() else"dead"}') logger.info(f'server is {"alive"if server_process.is_alive() else"dead"}') logger.info('proxy terminated')
if __name__ == '__main__': scheduler = Scheduler() scheduler.run()
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import WebDriverException import time from loguru import logger
COUNT = 1000
for i in range(1, COUNT + 1): try: browser = webdriver.Chrome() wait = WebDriverWait(browser, 10) browser.get('https://captcha1.scrape.center/') button = wait.until(EC.element_to_be_clickable( (By.CSS_SELECTOR, '.el-button'))) button.click() captcha = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, '.geetest_slicebg.geetest_absolute'))) time.sleep(5) captcha.screenshot(f'data/captcha/images/captcha_{i}.png') except WebDriverException as e: logger.error(f'webdriver error occurred {e.msg}') finally: browser.close()
我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么美好,然而一杯茶的功夫可能就会出现错误,比如 403 Forbidden,这时打开网页一看,可能会看到 “您的 IP 访问频率太高” 这样的提示。出现这种现象的原因是网站采取了一些反爬虫措施。比如,服务器会检测某个 IP 在单位时间内的请求次数,如果超过了这个阈值,就会直接拒绝服务,返回一些错误信息,这种情况可以称为封 IP。
既然服务器检测的是某个 IP 单位时间的请求次数,那么借助某种方式来伪装我们的 IP,让服务器识别不出是由我们本机发起的请求,不就可以成功防止封 IP 了吗?
一种有效的方式就是使用代理,后面会详细说明代理的用法。在这之前,需要先了解下代理的基本原理,它是怎样实现伪装 IP 的呢?
1. 基本原理
代理实际上指的就是代理服务器,英文叫作 Proxy Server,它的功能是代理网络用户去取得网络信息。形象地说,它是网络信息的中转站。在我们正常请求一个网站时,是发送了请求给 Web 服务器,Web 服务器把响应传回给我们。如果设置了代理服务器,实际上就是在本机和服务器之间搭建了一个桥,此时本机不是直接向 Web 服务器发起请求,而是向代理服务器发出请求,请求会发送给代理服务器,然后由代理服务器再发送给 Web 服务器,接着由代理服务器再把 Web 服务器返回的响应转发给本机。这样我们同样可以正常访问网页,但这个过程中 Web 服务器识别出的真实 IP 就不再是我们本机的 IP 了,就成功实现了 IP 伪装,这就是代理的基本原理。
由于验证码目标缺口通常具有比较明显的边缘,所以借助于一些边缘检测算法并通过调整阈值是可以找出它的位置的。目前应用比较广泛的边缘检测算法是 Canny,它是 John F. Canny 于 1986 年开发出来的一个多级边缘检测算法,效果还是不错的,OpenCV 也对此算法进行了实现,方法名称就叫做 Canny,声明如下:
OCR,即 Optical Character Recognition,中文翻译叫做光学字符识别。它是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程。OCR 现在已经广泛应用于生产生活中,如文档识别、证件识别、字幕识别、文档检索等等。当然对于本节所述的图形验证码的识别也没有问题。
import time import re import tesserocr from selenium import webdriver from io import BytesIO from PIL import Image from retrying import retry from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By from selenium.common.exceptions import TimeoutException import numpy as np
start = time.time() session = requests.Session() foriinrange(10): session.get('http://httpbin.org/delay/1') print(f'Finished {i + 1} requests') end = time.time() print('Cost time for get', end - start) start = time.time()
foriinrange(10): session.post('http://httpbin.org/delay/1') print(f'Finished {i + 1} requests') end = time.time() print('Cost time for post', end - start)
这里我们添加了一个 allowable_methods 指定了一个过滤器,只有 POST 请求会被缓存,GET 请求就不会。
Cookies <RequestsCookieJar[<Cookie sessionid=psnu8ij69f0ltecd5wasccyzc6ud41tc for login2.scrape.center/>]> Response Status 200 Response URL https://login2.scrape.center/page/1
这下没有问题了,我们发现其 URL 就是 INDEX_URL,模拟登录成功了!同时还可以进一步输出 response_index 的 text 属性看下是否获取成功。
可以看到,这里我们无须再关心 Cookie 的处理和传递问题,我们声明了一个 Session 对象,然后每次调用请求的时候都直接使用 Session 对象的 post 或 get 方法就好了。
运行效果是完全一样的,结果如下:
1 2 3
Cookies <RequestsCookieJar[<Cookie sessionid=ssngkl4i7en9vm73bb36hxif05k10k13 for login2.scrape.center/>]> Response Status 200 Response URL https://login2.scrape.center/page/1
asyncdefmain(): global session session = aiohttp.ClientSession() scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)] await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC
Coroutine: <coroutine object execute at 0x10e0f7830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
Coroutine: <coroutine object execute at 0x10aa33830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
可以发现,其运行效果都是一样的。
6. 绑定回调
另外,我们也可以为某个 task 绑定一个回调方法。比如,我们来看下面的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import asyncio import requests
asyncdefrequest(): url = 'https://www.baidu.com' status = requests.get(url) return status
Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 ... Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Cost time: 66.64284420013428
Waiting for https://httpbin.org/delay/5 Waiting for https://httpbin.org/delay/5 Waiting for https://httpbin.org/delay/5 Waiting for https://httpbin.org/delay/5 ... Task exception was never retrieved future: <Task finished coro=<request() done, defined at demo.py:8> exception=TypeError("object Response can't be used in 'await' expression")> Traceback (most recent call last): File "demo.py", line 11, in request response = await requests.get(url) TypeError: object Response can't be used in 'await' expression
Waiting for https://httpbin.org/delay/5 Get response fromhttps://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 ... Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Waiting for https://httpbin.org/delay/5 Get response from https://httpbin.org/delay/5 response <Response [200]> Cost time: 65.394437756259273
with sync_playwright() as p: for browser_type in [p.chromium, p.firefox, p.webkit]: browser = browser_type.launch(headless=False) page = browser.new_page() page.goto('https://www.baidu.com') page.screenshot(path=f'screenshot-{browser_type.name}.png') print(page.title()) browser.close()
Options: -o, --output <file name> saves the generated script to a file --target <language> language to use, one of javascript, python, python-async, csharp (default: "python") -b, --browser <browserType> browser to use, one of cr, chromium, ff, firefox, wk, webkit (default: "chromium") --channel <channel> Chromium distribution channel, "chrome", "chrome-beta", "msedge-dev", etc --color-scheme <scheme> emulate preferred color scheme, "light" or "dark" --device <deviceName> emulate device, for example "iPhone 11" --geolocation <coordinates> specify geolocation coordinates, for example "37.819722,-122.478611" --load-storage <filename> load context storage state from the file, previously saved with --save-storage --lang <language> specify language / locale, for example "en-GB" --proxy-server <proxy> specify proxy server, for example "http://myproxy:3128" or "socks5://myproxy:8080" --save-storage <filename> save context storage state at the end, for later use with --load-storage --timezone <time zone> time zone to emulate, for example "Europe/Rome" --timeout <timeout> timeout for Playwright actions in milliseconds (default: "10000") --user-agent <ua string> specify user agent string --viewport-size <size> specify browser viewport size in pixels, for example "1280, 720" -h, --help display help for command
with sync_playwright() as p: browser = p.chromium.launch(headless=False) page = browser.new_page() page.goto('https://spa6.scrape.center/') page.wait_for_load_state('networkidle') elements = page.query_selector_all('a.name') for element in elements: print(element.get_attribute('href')) print(element.text_content()) browser.close()
/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIx 霸王别姬 - Farewell My Concubine /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIy 这个杀手不太冷 - Léon /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIz 肖申克的救赎 - The Shawshank Redemption /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI0 泰坦尼克号 - Titanic /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI1 罗马假日 - Roman Holiday /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI2 唐伯虎点秋香 - Flirting Scholar /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3 乱世佳人 - Gone with the Wind /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI4 喜剧之王 - The King of Comedy /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5 楚门的世界 - The Truman Show /detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIxMA== 狮子王 - The Lion King
url = 'https://spa1.scrape.center/' html = requests.get(url).text print(html)
运行结果如下:
1
<!DOCTYPE html><htmllang=en><head><metacharset=utf-8><metahttp-equiv=X-UA-Compatiblecontent="IE=edge"><metaname=viewportcontent="width=device-width,initial-scale=1"><linkrel=iconhref=/favicon.ico><title>Scrape | Movie</title><linkhref=/css/chunk-700f70e1.1126d090.cssrel=prefetch><linkhref=/css/chunk-d1db5eda.0ff76b36.cssrel=prefetch><linkhref=/js/chunk-700f70e1.0548e2b4.jsrel=prefetch><linkhref=/js/chunk-d1db5eda.b564504d.jsrel=prefetch><linkhref=/css/app.ea9d802a.cssrel=preloadas=style><linkhref=/js/app.1435ecd5.jsrel=preloadas=script><linkhref=/js/chunk-vendors.77daf991.jsrel=preloadas=script><linkhref=/css/app.ea9d802a.cssrel=stylesheet></head><body><noscript><strong>We're sorry but portal doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><divid=app></div><scriptsrc=/js/chunk-vendors.77daf991.js></script><scriptsrc=/js/app.1435ecd5.js></script></body></html>
可以看到,爬取结果就只有这么一点 HTML 内容,而我们在浏览器中打开这个页面,却能看到如图所示的结果:
在 HTML 中,我们只能看到在源码中引用了一些 JavaScript 和 CSS 文件,并没有观察到有任何电影数据信息。
如果遇到这样的情况,这说明我们现在看到的整个页面便是 JavaScript 渲染得到的,浏览器执行了 HTML 中所引用的 JavaScript 文件,JavaScript 通过调用一些数据加载和页面渲染方法,才最终呈现了图中所示的结果。
defmain(): for page in range(1, TOTAL_PAGE + 1): index_data = scrape_index(page) for item in index_data.get('results'): id = item.get('id') detail_data = scrape_detail(id) logging.info('detail data %s', detail_data)
if __name__ == '__main__': main()
这里我们定义了一个 main 方法,首先遍历获取了页码 page,然后把 page 当参数传递给了 scrape_index 方法,得到列表页的数据。接着我们遍历每个列表页的每个结果,获取到每部电影的 id,然后把 id 当作参数传递给 scrape_detail 方法来爬取每部电影的详情数据,并将其赋值为 detail_data,输出即可。
defsave_data(data): collection.update_one({ 'name': data.get('name') }, { '$set': data }, upsert=True)
在这里我们声明了一个 save_data 方法,它接收一个 data 参数,也就是我们刚才提取的电影详情信息。在方法里面,我们调用了 update_one 方法,第一个参数是查询条件,即根据 name 进行查询;第二个参数就是 data 对象本身,就是所有的数据,这里我们用 $set 操作符表示更新操作;第三个参数很关键,这里实际上是 upsert 参数,如果把这个设置为 True,则可以做到存在即更新,不存在即插入的功能,更新会根据第一个参数设置的 name 字段,所以这样可以防止数据库中出现同名的电影数据。
defmain(): for page in range(1, TOTAL_PAGE + 1): index_data = scrape_index(page) for item in index_data.get('results'): id = item.get('id') detail_data = scrape_detail(id) logging.info('detail data %s', detail_data) save_data(detail_data) logging.info('data saved successfully')
JavaScript 有改变网页内容的能力,解析完响应内容之后,就可以调用 JavaScript 来针对解析完的内容对网页进行下一步处理了。比如,通过 document.getElementById().innerHTML 这样的操作,便可以对某个元素内的源代码进行更改,这样网页显示的内容就改变了,这样的操作也被称作 DOM 操作,即对网页文档进行操作,如更改、删除等。
上例中,document.getElementById("myDiv").innerHTML=xmlhttp.responseText 便将 ID 为 myDiv 的节点内部的 HTML 代码更改为服务器返回的内容,这样 myDiv 元素内部便会呈现出服务器返回的新数据,网页的部分内容看上去就更新了。