0%

【2022 年】Python3 爬虫教程 - 便于高效检索的 Elasticsearch 存储

爬虫系列文章总目录:【2022 年】Python3 爬虫学习教程,本教程内容多数来自于《Python3网络爬虫开发实战(第二版)》一书,目前截止 2022 年,可以将爬虫基本技术进行系统讲解,同时将最新前沿爬虫技术如异步、JavaScript 逆向、AST、安卓逆向、Hook、智能解析、群控技术、WebAssembly、大规模分布式、Docker、Kubernetes 等,市面上目前就仅有《Python3 网络爬虫开发实战(第二版)》一书了,点击了解详情

想查数据,就免不了搜索,而搜索离不开搜索引擎。百度、谷歌都是非常庞大、复杂的搜索引擎,它们几乎索引了互联网上开放的所有网页和数据。然而对于我们自己的业务数据来说,没必要用这么复杂的技术。如果我们想实现自己的搜索引擎,为了便于存储和检索,Elasticsearch 就是不二选择。它是一个全文搜索引擎,可以快速存储、搜索和分析海量数据。

所以,如果我们我们将爬取到的数据存储到 Elasticsearch 里面,那将会非常方便检索。

1. Elasticsearch 介绍

Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上。

那 Lucene 又是什么呢?Lucene 可能是目前存在的(不论开源还是私有的)拥有最先进、高性能和全功能搜索引擎功能的库,但也仅仅只是一个库。要想用 Lucene,我们需要编写 Java 并引用 Lucene 包才可以,而且我们需要对信息检索有一定程度的理解。

为了解决这个问题,Elasticsearch 就诞生了。Elasticsearch 也是使用 Java 编写的,它的内部使用 Lucene 做索引与搜索,但是它的目标是使全文检索变得简单,相当于 Lucene 的一层封装,它提供了一套简单一致的 RESTful API 来帮助我们实现存储和检索。

所以 Elasticsearch 仅仅就是一个简易版的 Lucene 封装吗?那就大错特错了,Elasticsearch 不仅仅是 Lucene,并且也不仅仅只是一个全文搜索引擎。它可以这样准确形容:

  • 一个分布式的实时文档存储,每个字段可以被索引与搜索;
  • 一个分布式实时分析搜索引擎;
  • 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据。

总之,它是一个非常强大的搜索引擎,维基百科、Stack Overflow、GitHub 都纷纷采用它来做搜索,不仅仅提供强大的检索能力,也提供强大的存储能力。

2. Elasticsearch 相关概念

在 Elasticsearch 中有几个基本概念,如节点、索引、文档等,下面分别说明一下。理解了这些概念,对熟悉 Elasticsearch 是非常有帮助的。

节点和集群

Elasticsearch 本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个 Elasticsearch 实例。

单个 Elasticsearch 实例称为一个节点(Node),一组节点构成一个集群(Cluster)。

索引

索引,即 Index,Elasticsearch 会索引所有字段,经过处理后写入一个反向索引(Inverted Index)。查找数据的时候,直接查找该索引。

所以,Elasticsearch 数据管理的顶层单位就叫作索引,其实就相当于 MySQL、MongoDB 等中数据库的概念。另外,值得注意的是,每个索引 (即数据库)的名字必须小写。

文档

文档,即 Document。索引里面单条记录称为文档,许多条文档构成了一个索引。

同一个索引里面的文档,不要求有相同的结构(Schema),但是最好保持一致,因为这样有利于提高搜索效率。

类型

文档可以分组,比如 weather 这个索引里面,既可以按城市分组(北京和上海),也可以按气候分组(晴天和雨天)。这种分组就叫作类型(Type),它是虚拟的逻辑分组,用来过滤文档,类似 MySQL 中的数据表、MongoDB 中的 Collection。

不同的类型应该有相似的结构。举例来说,id 字段不能在这个组中是字符串,在另一个组中是数值。这是与关系型数据库的表的一个区别。性质完全不同的数据(比如 productslogs)应该存成两个索引,而不是一个索引里面的两个类型(虽然可以做到)。

根据规划,Elastic 6.x 版只允许每个索引包含一个类型,Elastic 7.x 开始将会将其彻底移除。

字段

每个文档都类似一个 JSON 结构,它包含了许多字段,每个字段都有其对应的值,多个字段组成了一个文档,其实就可以类比 MySQL 数据表中的字段。

在 Elasticsearch 中,文档归属于一种类型(Type),而这些类型存在于索引中,我们可以画一些简单的对比图来类比传统关系型数据库:

1
2
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields

以上就是 Elasticsearch 里面的一些基本概念,通过和关系型数据库的对比更加有助于理解。

3. 准备工作

在开始本节实际操作之前,请确保已经正确安装好了 Elasticsearch,安装方式可以参考:https://setup.scrape.center/elasticsearch,安装完成之后确保其在本地 9200 端口上正常运行即可。

Elasticsearch 实际上提供了一系列 Restful API 来进行存取和查询操作,我们可以使用 curl 等命令或者直接调用 API 来进行数据存储和修改操作,但总归来说并不是很方便。所以这里我们就直接介绍一个专门用来对接 Elasticsearch 操作的 Python 库,名称也叫做 Elasticsearch,使用 pip3 安装即可:

1
pip3 install elasticsearch

更详细的安装方式可以参考:https://setup.scrape.center/elasticsearch-py。

安装好了之后我们就可以开始本节的学习了。

4. 创建索引

我们先来看下怎样创建一个索引,这里我们创建一个名为 news 的索引:

1
2
3
4
5
from elasticsearch import Elasticsearch

es = Elasticsearch()
result = es.indices.create(index='news', ignore=400)
print(result)

这里我们首先创建了一个 Elasticsearch 对象,并且没有设置任何参数,默认情况下它会连接本地 9200 端口运行的 Elasticsearch 服务,我们也可以设置特定的连接信息,如:

1
2
3
4
es = Elasticsearch(
['https://[username:password@]hostname:port'],
verify_certs=True, # 是否验证 SSL 证书
)

第一个参数我们可以构造特定格式的链接字符串并传入,hostname 和 port 即 Elasticsearch 运行的地址和端口,username 和 password 是可选的,代表连接 Elasticsearch 需要的用户名和密码,另外而且还有其他的参数设置,比如 verify_certs 代表是否验证证书有效性。更多参数的设置可以参考:https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch。

声明 Elasticsearch 对象之后,我们调用了 es 的 indices 对象的 create 方法传入了 index 的名称,如果创建成功,会返回如下结果:

1
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'news'}

可以看到,其返回结果是 JSON 格式,其中的 acknowledged 字段表示创建操作执行成功。

但这时如果我们再把代码执行一次的话,就会返回如下结果:

1
{'error': {'root_cause': [{'type': 'resource_already_exists_exception', 'reason': 'index [news/hHEYozoqTzK_qRvV4j4a3w] already exists', 'index_uuid': 'hHEYozoqTzK_qRvV4j4a3w', 'index': 'news'}], 'type': 'resource_already_exists_exception', 'reason': 'index [news/hHEYozoqTzK_qRvV4j4a3w] already exists', 'index_uuid': 'hHEYozoqTzK_qRvV4j4a3w', 'index': 'news'}, 'status': 400}

它提示创建失败,status 状态码是 400,错误原因是索引已经存在了。

注意在这里的代码中,我们使用的 ignore 参数为 400,这说明如果返回结果是 400 的话,就忽略这个错误,不会报错,程序不会抛出异常。

假如我们不加 ignore 这个参数的话:

1
2
3
es = Elasticsearch()
result = es.indices.create(index='news')
print(result)

再次执行就会报错了:

1
2
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.RequestError: TransportError(400, 'resource_already_exists_exception', 'index [news/QM6yz2W8QE-bflKhc5oThw] already exists')

这样程序的执行就会出现问题。因此,我们需要善用 ignore 参数,把一些意外情况排除,这样可以保证程序正常执行而不会中断。

创建完之后,我们还可以设置下索引的字段映射定义,可以参考:https://elasticsearch-py.readthedocs.io/en/latest/api.html?#elasticsearch.client.IndicesClient.put_mapping。

5. 删除索引

删除索引也是类似的,代码如下:

1
2
3
4
5
from elasticsearch import Elasticsearch

es = Elasticsearch()
result = es.indices.delete(index='news', ignore=[400, 404])
print(result)

这里也使用了 ignore 参数来忽略索引不存在而删除失败导致程序中断的问题。

如果删除成功,会输出如下结果:

1
{'acknowledged': True}

如果索引已经被删除,再执行删除,则会输出如下结果:

1
{'error': {'root_cause': [{'type': 'index_not_found_exception', 'reason': 'no such index [news]', 'resource.type': 'index_or_alias', 'resource.id': 'news', 'index_uuid': '_na_', 'index': 'news'}], 'type': 'index_not_found_exception', 'reason': 'no such index [news]', 'resource.type': 'index_or_alias', 'resource.id': 'news', 'index_uuid': '_na_', 'index': 'news'}, 'status': 404}

这个结果表明当前索引不存在,删除失败。返回的结果同样是 JSON,状态码是 404,但是由于我们添加了 ignore 参数,忽略了 404 状态码,因此程序正常执行,输出 JSON 结果,而不是抛出异常。

6. 插入数据

Elasticsearch 就像 MongoDB 一样,在插入数据的时候可以直接插入结构化字典数据,插入数据可以调用 create 方法。例如,这里我们插入一条新闻数据:

1
2
3
4
5
6
7
8
9
10
11
from elasticsearch import Elasticsearch

es = Elasticsearch()
es.indices.create(index='news', ignore=400)

data = {
'title': '乘风破浪不负韶华,奋斗青春圆梦高考',
'url': 'http://view.inews.qq.com/a/EDU2021041600732200'
}
result = es.create(index='news', id=1, body=data)
print(result)

这里我们首先声明了一条新闻数据,包括标题和链接,然后通过调用 create 方法插入了这条数据。在调用 create 方法时,我们传入了 4 个参数,index 参数代表了索引名称,id 则是数据的唯一标识 ID,body 则代表了文档的具体内容。

运行结果如下:

1
{'_index': 'news', '_type': '_doc', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}

结果中 result 字段为 created,代表该数据插入成功。

另外,其实我们也可以使用 index 方法来插入数据。但与 create 不同的是,create 方法需要我们指定 id 字段来唯一标识该条数据,而 index 方法则不需要,如果不指定 id,会自动生成一个 id。调用 index 方法的写法如下:

1
es.index(index='news', body=data)

create 方法内部其实也是调用了 index 方法,是对 index 方法的封装。

7. 更新数据

更新数据也非常简单,我们同样需要指定数据的 id 和内容,调用 update 方法即可,代码如下:

1
2
3
4
5
6
7
8
9
10
from elasticsearch import Elasticsearch

es = Elasticsearch()
data = {
'title': '乘风破浪不负韶华,奋斗青春圆梦高考',
'url': 'http://view.inews.qq.com/a/EDU2021041600732200',
'date': '2021-07-05'
}
result = es.update(index='news', body=data, id=1)
print(result)

这里我们为数据增加了一个日期字段,然后调用了 update 方法,结果如下:

1
{'_index': 'news', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}

可以看到,返回结果中 result 字段为 updated,即表示更新成功。另外,我们还注意到一个字段 _version,这代表更新后的版本号数,2 代表这是第二个版本。因为之前已经插入过一次数据,所以第一次插入的数据是版本 1,可以参见上例的运行结果,这次更新之后版本号就变成了 2,以后每更新一次,版本号都会加 1。

另外,更新操作利用 index 方法同样可以做到,其写法如下:

1
es.index(index='news', body=data, id=1)

可以看到,index 方法可以代替我们完成插入和更新数据这两个操作。如果数据不存在,就执行插入操作,如果已经存在,就执行更新操作,非常方便。

8. 删除数据

如果想删除一条数据,可以调用 delete 方法并指定需要删除的数据 id 即可。其写法如下:

1
2
3
4
5
from elasticsearch import Elasticsearch

es = Elasticsearch()
result = es.delete(index='news', id=1)
print(result)

运行结果如下:

1
{'_index': 'news', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}

可以看到,运行结果中 result 字段为 deleted,代表删除成功;_version 变成了 3,又增加了 1。

9. 查询数据

上面的几个操作都是非常简单的操作,普通的数据库如 MongoDB 都可以完成,看起来并没有什么了不起的,Elasticsearch 更特殊的地方在于其异常强大的检索功能。

对于中文来说,我们需要安装一个分词插件,这里使用的是 elasticsearch-analysis-ik,其 GitHub 链接为https://github.com/medcl/elasticsearch-analysis-ik。这里我们使用 Elasticsearch 的另一个命令行工具 elasticsearch-plugin 来安装,这里安装的版本是 7.13.2,请确保和 Elasticsearch 的版本对应起来,命令如下:

1
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.13.2/elasticsearch-analysis-ik-7.13.2.zip

这里的版本号请替换成你的 Elasticsearch 版本号。

安装之后,我们需要重新启动 Elasticsearch,启动之后它会自动加载安装好的插件。

首先,我们重新新建一个索引并指定需要分词的字段,相应代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from elasticsearch import Elasticsearch

es = Elasticsearch()
mapping = {
'properties': {
'title': {
'type': 'text',
'analyzer': 'ik_max_word',
'search_analyzer': 'ik_max_word'
}
}
}
es.indices.delete(index='news', ignore=[400, 404])
es.indices.create(index='news', ignore=400)
result = es.indices.put_mapping(index='news', body=mapping)
print(result)

这里我们先将之前的索引删除了,然后新建了一个索引,接着更新了它的 mapping 信息。mapping 信息中指定了分词的字段,指定了字段的类型 typetext,分词器 analyzer 和搜索分词器 search_analyzerik_max_word,即使用我们刚才安装的中文分词插件。如果不指定的话,则使用默认的英文分词器。

接下来,我们插入几条新数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from elasticsearch import Elasticsearch

es = Elasticsearch()

datas = [
{
'title': '高考结局大不同',
'url': 'https://k.sina.com.cn/article_7571064628_1c3454734001011lz9.html',
},
{
'title': '进入职业大洗牌时代,“吃香”职业还吃香吗?',
'url': 'https://new.qq.com/omn/20210828/20210828A025LK00.html',
},
{
'title': '乘风破浪不负韶华,奋斗青春圆梦高考',
'url': 'http://view.inews.qq.com/a/EDU2021041600732200',
},
{
'title': '他,活出了我们理想的样子',
'url': 'https://new.qq.com/omn/20210821/20210821A020ID00.html',
}
]

for data in datas:
es.index(index='news', body=data)

这里我们指定了 4 条数据,它们都带有 titleurl 字段,然后通过 index 方法将其插入 Elasticsearch 中,索引名称为 news

接下来,我们根据关键词查询一下相关内容:

1
2
result = es.search(index='news')
print(result)

可以看到,这里查询出了插入的 4 条数据:

1
{'took': 11, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 4, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'news', '_type': '_doc', '_id': 'jebpkHsBm-BAny-7hOYp', '_score': 1.0, '_source': {'title': '高考结局大不同', 'url': 'https://k.sina.com.cn/article_7571064628_1c3454734001011lz9.html'}}, {'_index': 'news', '_type': '_doc', '_id': 'jubpkHsBm-BAny-7hObz', '_score': 1.0, '_source': {'title': '进入职业大洗牌时代,“吃香”职业还吃香吗?', 'url': 'https://new.qq.com/omn/20210828/20210828A025LK00.html'}}, {'_index': 'news', '_type': '_doc', '_id': 'j-bpkHsBm-BAny-7heZN', '_score': 1.0, '_source': {'title': '乘风破浪不负韶华,奋斗青春圆梦高考', 'url': 'http://view.inews.qq.com/a/EDU2021041600732200'}}, {'_index': 'news', '_type': '_doc', '_id': 'kObpkHsBm-BAny-7hean', '_score': 1.0, '_source': {'title': '他,活出了我们理想的样子', 'url': 'https://new.qq.com/omn/20210821/20210821A020ID00.html'}}]}}

可以看到,返回结果会出现在 hits 字段里面,其中 total 字段标明了查询的结果条目数,max_score 代表了最大匹配分数。

另外,我们还可以进行全文检索,这才是体现 Elasticsearch 搜索引擎特性的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from elasticsearch import Elasticsearch
import json

dsl = {
'query': {
'match': {
'title': '高考 圆梦'
}
}
}

es = Elasticsearch()
result = es.search(index='news', body=dsl)
print(result)

这里我们使用 Elasticsearch 支持的 DSL 语句来进行查询,使用 match 指定全文检索,检索的字段是 title,内容是“中国 领事馆”,搜索结果如下:

1
{'took': 6, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 2, 'relation': 'eq'}, 'max_score': 1.7796917, 'hits': [{'_index': 'news', '_type': '_doc', '_id': 'j-bpkHsBm-BAny-7heZN', '_score': 1.7796917, '_source': {'title': '乘风破浪不负韶华,奋斗青春圆梦高考', 'url': 'http://view.inews.qq.com/a/EDU2021041600732200'}}, {'_index': 'news', '_type': '_doc', '_id': 'jebpkHsBm-BAny-7hOYp', '_score': 0.81085134, '_source': {'title': '高考结局大不同', 'url': 'https://k.sina.com.cn/article_7571064628_1c3454734001011lz9.html'}}]}}

这里我们看到匹配的结果有两条,第一条的分数为 1.7796917,第二条的分数为 0.81085134,这是因为第一条匹配的数据中含有“高考”和“圆梦”两个词,第二条匹配的数据中不包含“圆梦”,但是包含了“高考”这个词,所以也被检索出来了,但是分数比较低。

因此,可以看出,检索时会对对应的字段进行全文检索,结果还会按照检索关键词的相关性进行排序,这就是一个基本的搜索引擎雏形。

另外,Elasticsearch 还支持非常多的查询方式,这里就不再一一展开描述了,总之其功能非常强大,详情可以参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl.html。

10. 总结

以上便是对 Elasticsearch 的基本介绍以及使用 Python 操作 Elasticsearch 的基本用法,但这仅仅是 Elasticsearch 的基本功能,它还有更多强大的功能等待着我们去探索。

本节代码地址:https://github.com/Python3WebSpider/ElasticSearchTest。