关于这个,我体会也很深。 我回顾了自己一年以来没有做和已经做的事情,发现了这么一个现象:有件事我确实给自己定目标了,比如我要学习 Go 语言。然后我就把这个加到了我的待做清单里面,没有给他设置时间限度,也没有具体规划我怎样去做,哪个时间去学什么,反而自己的时间被一些零碎的或更紧急的事情占据了,最后我一整年都没有学 Go。我仔细想想,其实也并不是没有时间,有时候,我在某个时间段,确实是完全闲着的,比如我周六的时候,可能会躺在床上玩手机,一玩一上午,但那会啥也不想做,也没想好要那会要做什么。 我反思了一下自己,还是因为自己给自己的规划不明确。 主要有这么两点:
第一,某些目标我设置的太大,没有详细去规划什么时间做什么。比如学 Go 语言,我应该去好好思考一下,我要在多久时间内达成这个目标,我应该什么时间去做什么,我应该去细分到每一章节,在最开始的时候可能没必要所有的都分的那么细,但真正下一步要做的,一定要列得详细再详细。 比如说,我要三个月内学好 Go 语言,我可以先思考,三个月,我要学多少知识模块,比如有十个知识模块,那么我就规划每一个模块大体什么时候完成,每个模块列到自己的 Todo List 里面,设定好期限,注意,一定要设置好期限,不然真的会一拖再拖!然后,最开始我可能没必要把大把的时间把每个模块里面的每个小知识点都拆分好,但前面的一定要列好,比如我十个模块,我最开始的一两个模块一定要再拆分规划好,同时再设定好每个小知识点的时间。要是前面的模块学完了,再去抽时间规划下一个模块就好了。
代码中有 3 对 b 标签,第 1 对 b 标签中包含 3 对 i 标签,i 标签中的数字都是 7,也就是说第 1 对 b 标签的显示结果应该是 777。而第 2 对 b 标签中的数字是 6,第 3 对 b 标签中的数字是 4。 这些数字与页面所显示票价 467 的关系是什么呢? 这一步找到的标签和数字有可能是数据源,但是数字的组合有很多种可能,如图 6-9 所示。 图 6-9 数字组合推测 5 个数字的组合结果太多了,我们必须找出其中的规律,这样就能知道网页为什么显示 467 而不是 764 或者 776 。在仔细查看过后,发现每个带有数字的标签都设定了样式。第 1 对 b 标签的样式为:
1
width:48px;left:-48px
第 2 对 b 标签的样式为:
1
width: 16px;left:-32px
第 3 对 b 标签的样式为:
1
width: 16px;left:-48px
i 标签对的样式是相同的,都是:
1
width: 16px;
另外,还注意到最外层的 span 标签对的样式为:
1
width:48px
如果按照 CSS 样式这条线索来分析的话,第 1 对 b 标签中的 3 对 i 标签刚好占满 span 标签对的位置,其位置如图 6-10 所示。 图 6-10 span 标签对和 i 标签对位置图 此时网页中显示的价格应该是 777,但是由于第 2 和第 3 对 b 标签中有值,所以我们还需要计算它们的位置。此时标签位置的变化如图 6-11 所示。 图 6-11 标签位置变化 右侧是标签位置变化后的结果,由于第 2 对 b 标签的位置样式是 left:-32px,所以第 2 对 b 标签中的值 6 就会覆盖原来第 1 对 b 标签中的中的第 2 个数字 7,此时页面应该显示的数字是 767。 按此规律推算,第 3 对 b 标签的位置样式是 left:-48px,这个标签的值会覆盖第 1 对 b 标签中的第 1 个数字 7,覆盖结果如图 6-12 所示,最后显示的票价是 467。 图 6-12 覆盖结果 根据结果来看这种算法是合理的,不过我们还需要对其进行验证,现在将第二架航班的 HTML 值 和 CSS 样式按照这个规律进行推算。最后推算得到的结果与页面显示结果相同,说明这个位置偏移的计算方法是正确的,这样我们就可以编写 Python 代码获取网页中的票价信息了。因为 b 标签包裹在 class 属性为 rel 的 em 标签下,所以我们要定位所有的 em 标签。对应的 Python 代码如下:
1 2 3 4 5 6 7
import requests import re from parsel import Selector url = 'http://www.porters.vip/confusion/flight.html' resp = requests.get(url) sel = Selector(resp.text) em = sel.css('em.rel').extract()
接着定位所有的 b 标签。由于 b 标签中还有 i 标签,而且 i 标签的值是基准数据,所以可以直接提取。对应的 Python 代码如下:
alternate_price = [] for eb in element_b: eb = Selector(eb) # 提取<b>标签的 style 属性值 style = eb.css('b::attr("style")').get() # 获得具体的位置 position = ''.join(re.findall('left:(.*)px', style)) # 获得该标签下的数字 value = eb.css('b::text').get() # 将<b>标签的位置信息和数字以字典的格式添加到替补票价列表中 alternate_price.append({'position': position, 'value': value})
然后根据偏移量决定基准数据列表的覆盖元素,实际上是完成图 6-11 中的操作。
1 2 3 4 5 6 7 8 9 10
foral in alternate_price: position = int(al.get('position')) value = al.get('value') # 判断位置的数值是否正整数 plus = True if position >= 0else False # 计算下标,以 16px 为基准 index = int(position / 16) # 替换第一对<b>标签值列表中的元素,也就是完成值覆盖操作 base_price[index] = value print(base_price)
最后将数据列表打印出来,得到的输出结果为:
1 2
['4', '6', '7'] ['8', '7', '0', '5']
令人感到奇怪的是,输出结果中第一组票价数字与页面中显示的相同,但第二组却不同。这是因为第二架航班的票价基准数据有 4 个值。航班票价对应的 HTML 代码如下:
包含很多的 d 标签,难道它使用 d 标签进行占位,然后用元素进行覆盖吗?我们可以将 d 标签的数量和数字的数量进行对比,发现它们的数量是相同的,也就是说一对 d 标签代表一个数字。 每一对 d 标签都有 class 属性,有些 class 属性值是相同的,有些则不同。我们再将 class 属性值与数字进行对比,看一看能否找到规律,如图 6-17 所示。 图 6-17 class 属性值和数字的对比 从图 6-17 中可以看出,class 属性值和数字是一一对应的,如属性值 vhk08k 与数字 0 对应。根据这个线索,我们可以猜测每个数字都与一个属性值对应,对应关系如图 6-18 所示。 图 6-18 数字与属性值对应关系 浏览器在渲染页面的时候就会按照这个对应关系进行映射,所以页面中显示的是数字,而我们在 HTML 代码中看到的则是这些 class 属性值。浏览器在渲染时将 HTML 中的 d 标签与数字按照此关系进行映射,并将映射结果呈现在页面中。映射逻辑如图 6-19 所示。 图 6-19 映射逻辑 我们的爬虫代码可以按照同样的逻辑实现映射功能,在解析 HTML 代码时将 d 标签的 class 属性值取出来,然后进行映射即可得到页面中显示的数字。如何在爬虫代码中实现映射关系呢?实际上网页中使用的是“属性名数字”这种结构,Python 中内置的字典正好可以满足我们的需求。我们可以用 Python 代码测试一下,代码如下:
import re pile = '.%s{background:-(d+)px-(d+)px;}' % css_class_name pattern = re.compile(pile) css = css_resp.replace('n', '').replace(' ', '') coord = pattern.findall(css) if coord: x, y = coord[0] x, y = int(x), int(y)
此时得到的坐标值是正数,可以直接用于 SVG 字符定位。定位前我们要先拿到 SVG 中所有 text 标签的 Element 对象:
1 2 3
from parsel import Selector svg_data = Selector(svg_resp) texts = svg_data.xpath('//text')
然后获取所有 text 标签中的 y 值,接着我们将上一步得到的 Element 对象进行循环取值即可:
1
axis_y = [i.attrib.get('y') for i in texts if y <= int(i.attrib.get('y'))][0]
得到 y 值后就可以开始字符定位了。要注意的是,SVG 中 text 标签的 y 值与 CSS 样式中得到的 y 值并不需要完全相等,因为样式可以随意调整,比如 CSS 样式中-90 和-92 对于 SVG 的定位来说并没有什么差别,所以我们只需要知道具体是哪一个 text 即可。 那么如何确定是哪一个 text呢? 我们可以用排除法来确定,假如当前 CSS 样式中的 y 值是-97,那么在 SVG 中 text 的 y 值就不可能小于 97,我们只需要取到比 97 大且最相近的 text 标签 y 值即可。比如当前 SVG 所有 text 标签的 y 值为:
1
[38, 83, 120, 164]
那么大于 97 且最相近的是 120。将这个逻辑转化为代码:
1
axis_y = [i.attrib.get('y') for i in texts if y <= int(i.attrib.get('y'))][0]
页面中重要的数据都是一些奇怪的字符,本应该显示“9.7”的地方在 HTML 中显示的是“☒.☒”,而本应该显示“56.83”的地方在 HTML 中显示的是“☒☒.☒☒”。与 6.3 节中的映射反爬虫不同,案例中的文字都被“☒”符号代替了,根本无法分辨。这就很奇怪了,“☒”能代表这么多种数字吗? 要注意的是,Chrome 开发者工具的元素面板中显示的内容不一定是相应正文的原文,要想知道“☒”符号是什么,还需要到网页源代码中确认。对应的网页源代码如下:
web_code = '.' # 编码文字替换 woff_code = [i.upper().replace('&#X', 'uni') for i in web_code.split('.')] import hashlib result = [] for w in woff_code: # 从字体文件中取出对应编码的字形信息 content = font['glyf'].glyphs.get(w).data # 字形信息 MD5 glyph = hashlib.md5(content).hexdigest() for b in base_font.get('font'): # 与基准字形中的 MD5 值进行对比,如果相同则取出该字形描述的文字 if b.get('hex') == glyph: result.append(b.get('value')) break # 打印映射结果 print(result)
在 Bloom Filter 中使用位数组来辅助实现检测判断。在初始状态下,我们声明一个包含 m 位的位数组,它的所有位都是 0,如图 14-7 所示。 图 14-7 初始位数组 现在我们有了一个待检测集合,我们表示为 S={x1, x2, …, xn},我们接下来需要做的就是检测一个 x 是否已经存在于集合 S 中。在 BloomFilter 算法中首先使用 k 个相互独立的、随机的哈希函数来将这个集合 S 中的每个元素 x1、x2、…、xn 映射到这个长度为 m 的位数组上,哈希函数得到的结果记作位置索引,然后将位数组该位置索引的位置 1。例如这里我们取 k 为 3,即有三个哈希函数,x1 经过三个哈希函数映射得到的结果分别为 1、4、8,x2 经过三个哈希函数映射得到的结果分别为 4、6、10,那么就会将位数组的 1、4、6、8、10 这五位置 1,如图 14-8 所示: 图 14-8 映射后位数组 这时如果再有一个新的元素 x,我们要判断 x 是否属于 S 这个集合,我们便会将仍然用 k 个哈希函数对 x 求映射结果,如果所有结果对应的位数组位置均为 1,那么我们就认为 x 属于 S 这个集合,否则如果有一个不为 1,则 x 不属于 S 集合。 例如一个新元素 x 经过三个哈希函数映射的结果为 4、6、8,对应的位置均为 1,则判断 x 属于 S 这个集合。如果结果为 4、6、7,7 对应的位置为 0,则判定 x 不属于 S 这个集合。 注意这里 m、n、k 满足的关系是 m>nk,也就是说位数组的长度 m 要比集合元素 n 和哈希函数 k 的乘积还要大。 这样的判定方法很高效,但是也是有代价的,它可能把不属于这个集合的元素误认为属于这个集合,我们来估计一下它的错误率。当集合 S={x1, x2,…, xn} 的所有元素都被 k 个哈希函数映射到 m 位的位数组中时,这个位数组中某一位还是 0 的概率是: 因为哈希函数是随机的,所以任意一个哈希函数选中这一位的概率为 1/m,那么 1-1/m 就代表哈希函数一次没有选中这一位的概率,要把 S 完全映射到 m 位数组中,需要做 kn 次哈希运算,所以最后的概率就是 1-1/m 的 kn 次方。 一个不属于 S 的元素 x 如果要被误判定为在 S 中,那么这个概率就是 k 次哈希运算得到的结果对应的位数组位置都为 1,所以误判概率为: 根据: 可以将误判概率转化为: 在给定 m、n 时,可以求出使得 f 最小化的 k 值为: 在这里将误判概率归纳如下: 表 14-1 误判概率
m/n
最优 k
k=1
k=2
k=3
k=4
k=5
k=6
k=7
k=8
2
1.39
0.393
0.400
3
2.08
0.283
0.237
0.253
4
2.77
0.221
0.155
0.147
0.160
5
3.46
0.181
0.109
0.092
0.092
0.101
6
4.16
0.154
0.0804
0.0609
0.0561
0.0578
0.0638
7
4.85
0.133
0.0618
0.0423
0.0359
0.0347
0.0364
8
5.55
0.118
0.0489
0.0306
0.024
0.0217
0.0216
0.0229
9
6.24
0.105
0.0397
0.0228
0.0166
0.0141
0.0133
0.0135
0.0145
10
6.93
0.0952
0.0329
0.0174
0.0118
0.00943
0.00844
0.00819
0.00846
11
7.62
0.0869
0.0276
0.0136
0.00864
0.0065
0.00552
0.00513
0.00509
12
8.32
0.08
0.0236
0.0108
0.00646
0.00459
0.00371
0.00329
0.00314
13
9.01
0.074
0.0203
0.00875
0.00492
0.00332
0.00255
0.00217
0.00199
14
9.7
0.0689
0.0177
0.00718
0.00381
0.00244
0.00179
0.00146
0.00129
15
10.4
0.0645
0.0156
0.00596
0.003
0.00183
0.00128
0.001
0.000852
16
11.1
0.0606
0.0138
0.005
0.00239
0.00139
0.000935
0.000702
0.000574
17
11.8
0.0571
0.0123
0.00423
0.00193
0.00107
0.000692
0.000499
0.000394
18
12.5
0.054
0.0111
0.00362
0.00158
0.000839
0.000519
0.00036
0.000275
19
13.2
0.0513
0.00998
0.00312
0.0013
0.000663
0.000394
0.000264
0.000194
20
13.9
0.0488
0.00906
0.0027
0.00108
0.00053
0.000303
0.000196
0.00014
21
14.6
0.0465
0.00825
0.00236
0.000905
0.000427
0.000236
0.000147
0.000101
22
15.2
0.0444
0.00755
0.00207
0.000764
0.000347
0.000185
0.000112
7.46e-05
23
15.9
0.0425
0.00694
0.00183
0.000649
0.000285
0.000147
8.56e-05
5.55e-05
24
16.6
0.0408
0.00639
0.00162
0.000555
0.000235
0.000117
6.63e-05
4.17e-05
25
17.3
0.0392
0.00591
0.00145
0.000478
0.000196
9.44e-05
5.18e-05
3.16e-05
26
18
0.0377
0.00548
0.00129
0.000413
0.000164
7.66e-05
4.08e-05
2.42e-05
27
18.7
0.0364
0.0051
0.00116
0.000359
0.000138
6.26e-05
3.24e-05
1.87e-05
28
19.4
0.0351
0.00475
0.00105
0.000314
0.000117
5.15e-05
2.59e-05
1.46e-05
29
20.1
0.0339
0.00444
0.000949
0.000276
9.96e-05
4.26e-05
2.09e-05
1.14e-05
30
20.8
0.0328
0.00416
0.000862
0.000243
8.53e-05
3.55e-05
1.69e-05
9.01e-06
31
21.5
0.0317
0.0039
0.000785
0.000215
7.33e-05
2.97e-05
1.38e-05
7.16e-06
32
22.2
0.0308
0.00367
0.000717
0.000191
6.33e-05
2.5e-05
1.13e-05
5.73e-06
表 14-1 中第一列为 m/n 的值,第二列为最优 k 值,其后列为不同 k 值的误判概率,可以看到当 k 值确定时,随着 m/n 的增大,误判概率逐渐变小。当 m/n 的值确定时,当 k 越靠近最优 K 值,误判概率越小。另外误判概率总体来看都是极小的,在容忍此误判概率的情况下,大幅减小存储空间和判定速度是完全值得的。 接下来我们就将 BloomFilter 算法应用到 Scrapy-Redis 分布式爬虫的去重过程中,以解决 Redis 内存不足的问题。
classHashMap(object): def__init__(self, m, seed): self.m = m self.seed = seed
defhash(self, value): """ Hash Algorithm :param value: Value :return: Hash Value """ ret = 0 for i in range(len(value)): ret += self.seed * ret + ord(value[i]) return (self.m - 1) & ret
在这里新建了一个 HashMap 类,构造函数传入两个值,一个是 m 位数组的位数,另一个是种子值 seed,不同的哈希函数需要有不同的 seed,这样可以保证不同的哈希函数的结果不会碰撞。 在 hash() 方法的实现中,value 是要被处理的内容,在这里我们遍历了该字符的每一位并利用 ord() 方法取到了它的 ASCII 码值,然后混淆 seed 进行迭代求和运算,最终会得到一个数值。这个数值的结果就由 value 和 seed 唯一确定,然后我们再将它和 m 进行按位与运算,即可获取到 m 位数组的映射结果,这样我们就实现了一个由字符串和 seed 来确定的哈希函数。当 m 固定时,只要 seed 值相同,就代表是同一个哈希函数,相同的 value 必然会映射到相同的位置。所以如果我们想要构造几个不同的哈希函数,只需要改变其 seed 就好了,以上便是一个简易的哈希函数的实现。 接下来我们再实现 BloomFilter,BloomFilter 里面需要用到 k 个哈希函数,所以在这里我们需要对这几个哈希函数指定相同的 m 值和不同的 seed 值,在这里构造如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
BLOOMFILTER_HASH_NUMBER = 6 BLOOMFILTER_BIT = 30
class BloomFilter(object): def __init__(self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER): """ Initialize BloomFilter :param server: Redis Server :param key: BloomFilter Key :param bit: m = 2 ^ bit :param hash_number: the number of hash function """ # default to 1 << 30 = 10,7374,1824 = 2^30 = 128MB, max filter 2^30/hash_number = 1,7895,6970 fingerprints self.m = 1 << bit self.seeds = range(hash_number) self.maps = [HashMap(self.m, seed) for seed in self.seeds] self.server = server self.key = key
defexists(self, value): """ if value exists :param value: :return: """ ifnot value: returnFalse exist = 1 for map in self.maps: offset = map.hash(value) exist = exist & self.server.getbit(self.key, offset) return exist
definsert(self, value): """ add value to bloom :param value: :return: """ for f in self.maps: offset = f.hash(value) self.server.setbit(self.key, offset, 1)
首先我们先看下 insert() 方法,BloomFilter 算法中会逐个调用哈希函数对放入集合中的元素进行运算得到在 m 位位数组中的映射位置,然后将位数组对应的位置置 1,所以这里在代码中我们遍历了初始化好的哈希函数,然后调用其 hash() 方法算出映射位置 offset,再利用 Redis 的 setbit() 方法将该位置 1。 在 exists() 方法中我们就需要实现判定是否重复的逻辑了,方法参数 value 即为待判断的元素,在这里我们首先定义了一个变量 exist,然后遍历了所有哈希函数对 value 进行哈希运算,得到映射位置,然后我们用 getbit() 方法取得该映射位置的结果,依次进行与运算。这样只有每次 getbit() 得到的结果都为 1 时,最后的 exist 才为 True,即代表 value 属于这个集合。如果其中只要有一次 getbit() 得到的结果为 0,即 m 位数组中有对应的 0 位,那么最终的结果 exist 就为 False,即代表 value 不属于这个集合。这样此方法最后的返回结果就是判定重复与否的结果了。 到现在为止 BloomFilter 的实现就已经完成了,我们可以用一个实例来测试一下,代码如下:
def__len__(self): """Return the length of the queue""" return self.server.llen(self.key)
defpush(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
defpop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)
可以看到这个类继承了 Base 类,并重写了 len()、push()、pop() 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen()、lpush()、rpop() 等,那这就代表此爬取队列是使用的 Redis 的列表,序列化后的 Request 会被存入列表中,就是列表的其中一个元素,len() 方法是获取列表的长度,push() 方法中调用了 lpush() 操作,这代表从列表左侧存入数据,pop() 方法中调用了 rpop() 操作,这代表从列表右侧取出数据。 所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫做 First Input First Output,也被简称作 Fifo,而此类的名称就叫做 FifoQueue。 另外还有一个与之相反的实现类,叫做 LifoQueue,实现如下:
def__len__(self): """Return the length of the stack""" return self.server.llen(self.key)
defpush(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
defpop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key)
if data: return self._decode_request(data)
与 FifoQueue 不同的就是它的 pop() 方法,在这里使用的是 lpop() 操作,也就是从左侧出,而 push() 方法依然是使用的 lpush() 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出,英文叫做 Last In First Out,简称为 Lifo,而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。 另外在源码中还有一个子类实现,叫做 PriorityQueue,顾名思义,它叫做优先级队列,实现如下:
classPriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set"""
def__len__(self): """Return the length of the queue""" return self.server.zcard(self.key)
defpush(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data)
defpop(self, timeout=0): """ Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
classRFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger def__init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True
@classmethod deffrom_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug)
defrequest_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request)
defclose(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear()
import hashlib def request_fingerprint(request, include_headers=None): if include_headers: include_headers = tuple(to_bytes(h.lower()) for h in sorted(include_headers)) cache = _fingerprint_cache.setdefault(request, {}) if include_headers not in cache: fp = hashlib.sha1() fp.update(to_bytes(request.method)) fp.update(to_bytes(canonicalize_url(request.url))) fp.update(request.body or b'') if include_headers: for hdr in include_headers: if hdr in request.headers: fp.update(hdr) for v in request.headers.getlist(hdr): fp.update(v) cache[include_headers] = fp.hexdigest() return cache[include_headers]
首先我们要实现用户的大规模爬取。这里采用的爬取方式是,以微博的几个大 V 为起始点,爬取他们各自的粉丝和关注列表,然后获取粉丝和关注列表的粉丝和关注列表,以此类推,这样下去就可以实现递归爬取。如果一个用户与其他用户有社交网络上的关联,那他们的信息就会被爬虫抓取到,这样我们就可以做到对所有用户的爬取。通过这种方式,我们可以得到用户的唯一 ID,再根据 ID 获取每个用户发布的微博即可。
def parse_time(self, date): if re.match(' 刚刚 ', date): date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time())) if re.match('d + 分钟前 ', date): minute = re.match('(d+)', date).group(1) date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time() - float(minute) * 60)) if re.match('d + 小时前 ', date): hour = re.match('(d+)', date).group(1) date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time() - float(hour) * 60 * 60)) if re.match(' 昨天.*', date): date = re.match(' 昨天 (.*)', date).group(1).strip() date = time.strftime('% Y-% m-% d', time.localtime() - 24 * 60 * 60) + ' ' + date if re.match('d{2}-d{2}', date): date = time.strftime('% Y-', time.localtime()) + date + ' 00:00' returndate
我们用正则来提取一些关键数字,用 time 库来实现标准时间的转换。 以 X 分钟前的处理为例,爬取的时间会赋值为 created_at 字段。我们首先用正则匹配这个时间,表达式写作 d + 分钟前,如果提取到的时间符合这个表达式,那么就提取出其中的数字,这样就可以获取分钟数了。接下来使用 time 模块的 strftime() 方法,第一个参数传入要转换的时间格式,第二个参数就是时间戳。这里我们用当前的时间戳减去此分钟数乘以 60 就是当时的时间戳,这样我们就可以得到格式化后的正确时间了。 然后 Pipeline 可以实现如下处理:
1 2 3 4 5 6
class WeiboPipeline(): def process_item(self, item, spider): if isinstance(item, WeiboItem): ifitem.get('created_at'): item['created_at'] = item['created_at'].strip() item['created_at'] = self.parse_time(item.get('created_at'))
注明:突发性t5实例,别看到价格比较便宜就直接购买,里面很多套路,购买页面有提示:限制20%性能基线。释义:依靠CPU 积分来提升 CPU 性能,满足业务需求。当实例实际工作性能高于基准 CPU 计算性能时,会把服务器 CPU 的性能限制在 20%以下,如果这时20%CPU性能满足不了业务需求,云服务器CPU会跑满100%,到那时候你以为是被某大佬攻击了,很有可能是你突发性t5实例CPU 积分消耗完了。笔者建议:如果用户业务对 CPU 要求高的,可以直接略过,选择t5实例(无限制CPU性能)、n4共享型、通用型mn4。以下笔者建议爆款:
import sys from scrapy.utils.project import get_project_settings from scrapyuniversal.spiders.universal import UniversalSpider from scrapyuniversal.utils import get_config from scrapy.crawler import CrawlerProcess
from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule from scrapyuniversal.utils import get_config from scrapyuniversal.rules import rules
yield SplashRequest(url, self.parse_result, args={ # optional; parameters passed to Splash HTTP API 'wait': 0.5, # 'url' is prefilled from request url # 'http_method' is setto'POST'for POST requests # 'body' is setto request body for POST requests }, endpoint='render.json', # optional; default is render.html splash_url='<url>', # optional; overrides SPLASH_URL )
from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from scrapy.http import HtmlResponse from logging import getLogger
@classmethod deffrom_crawler(cls, crawler): o = cls(crawler.settings['USER_AGENT']) crawler.signals.connect(o.spider_opened, signal=signals.spider_opened) return o
ADSL(Asymmetric Digital Subscriber Line,非对称数字用户环路),它的上行和下行带宽不对称,它采用频分复用技术把普通的电话线分成了电话、上行和下行 3 个相对独立的信道,从而避免了相互之间的干扰。 ADSL 通过拨号的方式上网,需要输入 ADSL 账号和密码,每次拨号就更换一个 IP。IP 分布在多个 A 段,如果 IP 都能使用,则意味着 IP 量级可达千万。如果我们将 ADSL 主机作为代理,每隔一段时间主机拨号就换一个 IP,这样可以有效防止 IP 被封禁。另外,主机的稳定性很好,代理响应速度很快。
接下来要做的就是拨号,并把新的 IP 保存到 Redis 散列表里。 首先是拨号定时,它分为定时拨号和非定时拨号两种选择。 非定时拨号:最好的方法就是向该主机发送一个信号,然后主机就启动拨号,但这样做的话,我们首先要搭建一个重新拨号的接口,如搭建一个 Web 接口,请求该接口即进行拨号,但开始拨号之后,此时主机的状态就从在线转为离线,而此时的 Web 接口也就相应失效了,拨号过程无法再连接,拨号之后接口的 IP 也变了,所以我们无法通过接口来方便地控制拨号过程和获取拨号结果,下次拨号还得改变拨号请求接口,所以非定时拨号的开销还是比较大的。 定时拨号:我们只需要在拨号主机上运行定时脚本即可,每隔一段时间拨号一次,更新 IP,然后将 IP 在 Redis 散列表中更新即可,非常简单易用,另外可以适当将拨号频率调高一点,减少短时间内 IP 被封的可能性。 在这里选择定时拨号。 接下来就是获取 IP。获取拨号后的 IP 非常简单,只需要调用 ifconfig 命令,然后解析出对应网卡的 IP 即可。 获取了 IP 之后,我们还需要进行有效性检测。拨号主机可以自己检测,比如可以利用 requests 设置自身的代理请求外网,如果成功,那么证明代理可用,然后再修改 Redis 散列表,更新代理。 需要注意,由于在拨号的间隙拨号主机是离线状态,而此时 Redis 散列表中还存留了上次的代理,一旦这个代理被取用了,该代理是无法使用的。为了避免这个情况,每台主机在拨号之前还需要将自身的代理从 Redis 散列表中移除。 这样基本的流程就理顺了,我们用如下代码实现:
class Sender(): def get_ip(self, ifname=ADSL_IFNAME): """ 获取本机 IP :param ifname: 网卡名称 :return: """ (status, output) = subprocess.getstatusoutput('ifconfig') if status == 0: pattern = re.compile(ifname + '.*?inet.*?(d+.d+.d+.d+).*?netmask', re.S) result = re.search(pattern, output) if result: ip = result.group(1) return ip
目前为止,我们已经成功实时更新拨号主机的代理。不过还缺少一个模块,那就是接口模块。像之前的代理池一样,我们也定义一些接口来获取代理,如 random 获取随机代理、count 获取代理个数等。 我们选用 Tornado 来实现,利用 Tornado 的 Server 模块搭建 Web 接口服务,示例如下:
defget(self, api=''): ifnot api: links = ['random', 'proxies', 'names', 'all', 'count'] self.write('<h4>Welcome to ADSL Proxy API</h4>') for link in links: self.write('<a href=' + link + '>' + link + '</a><br>')
if api == 'random': result = self.redis.random() if result: self.write(result)
if api == 'names': result = self.redis.names() if result: self.write(json.dumps(result))
if api == 'proxies': result = self.redis.proxies() if result: self.write(json.dumps(result))
if api == 'all': result = self.redis.all() if result: self.write(json.dumps(result))
if api == 'count': self.write(str(self.redis.count()))
这里首先利用选择器选取所有的 quote,并将其赋值为 quotes 变量,然后利用 for 循环对每个 quote 遍历,解析每个 quote 的内容。 对 text 来说,观察到它的 class 为 text,所以可以用.text 选择器来选取,这个结果实际上是整个带有标签的节点,要获取它的正文内容,可以加::text 来获取。这时的结果是长度为 1 的列表,所以还需要用 extract_first() 方法来获取第一个元素。而对于 tags 来说,由于我们要获取所有的标签,所以用 extract() 方法获取整个列表即可。 以第一个 quote 的结果为例,各个选择方法及结果的说明如下内容。 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
<div class="quote" itemscope=""itemtype="http://schema.org/CreativeWork"> <span class="text" itemprop="text">“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”</span> <span>by <small class="author" itemprop="author">Albert Einstein</small> <a href="/author/Albert-Einstein">(about)</a> </span> <div class="tags"> Tags: <meta class="keywords" itemprop="keywords" content="change,deep-thoughts,thinking,world"> <a class="tag" href="/tag/change/page/1/">change</a> <a class="tag" href="/tag/deep-thoughts/page/1/">deep-thoughts</a> <a class="tag" href="/tag/thinking/page/1/">thinking</a> <a class="tag" href="/tag/world/page/1/">world</a> </div> </div>
不同选择器的返回结果如下。
quote.css(‘.text’)
1
[<Selector xpath="descendant-or-self::*[@class and contains(concat(' ', normalize-space(@class), ' '), ' text ')]"data='<span class="text"itemprop="text">“The '>]
quote.css(‘.text::text’)
1
[<Selector xpath="descendant-or-self::*[@class and contains(concat(' ', normalize-space(@class), ' '), ' text ')]/text()"data='“The world as we have created it is a pr'>]
quote.css(‘.text’).extract()
1
['<span class="text"itemprop="text">“The world as we have created itis a process of our thinking. It cannot be changed without changing our thinking.”</span>']
quote.css(‘.text::text’).extract()
1
['“The world as we have created itis a process of our thinking. It cannot be changed without changing our thinking.”']
quote.css(‘.text::text’).extract_first()
1
“The world as we have created it is aprocessof our thinking. It cannot be changed without changing our thinking.”
defindex_page(self, response): for item in response.doc('.item').items(): self.crawl(item.find('a').attr.url, callback=self.detail_page, itag=item.find('.update-time').text())
@config(age=10 * 24 * 60 * 60) defindex_page(self, response): for each in response.doc('a[href^="http"]').items(): self.crawl(each.attr.href, callback=self.detail_page)
defindex_page(self, response): for each in response.doc('li> .tit > a').items(): self.crawl(each.attr.href, callback=self.detail_page, fetch_type='js') next = response.doc('.next').attr.href self.crawl(next, callback=self.index_page)
def response(flow): url = 'cdnware.m.jd.com' if url in flow.request.url: text = flow.response.text data = json.loads(text) if data.get('wareInfo') and data.get('wareInfo').get('basicInfo'): info = data.get('wareInfo').get('basicInfo') id = info.get('wareId') name = info.get('name') images = info.get('wareImage') print(id, name, images)
# 提取评论数据 url = 'api.m.jd.com/client.action' if url in flow.request.url: pattern = re.compile('sku".*?"(d+)"') # Request 请求参数中包含商品 ID body = unquote(flow.request.text) # 提取商品 ID id = re.search(pattern, body).group(1) if re.search(pattern, body) else None # 提取 Response Body text = flow.response.text data = json.loads(text) comments = data.get('commentInfoList') or [] # 提取评论数据 for comment in comments: if comment.get('commentInfo') and comment.get('commentInfo').get('commentData'): info = comment.get('commentInfo') text = info.get('commentData') date = info.get('commentDate') nickname = info.get('userNickName') pictures = info.get('pictureInfoList') print(id, nickname, text, date, pictures)
这里指定了接口的部分链接内容,以判断当前请求的 URL 是不是获取评论的 URL。如果满足条件,那么就提取商品的 ID 和评论信息。
商品的 ID 实际上隐藏在请求中,我们需要提取请求的表单内容来提取商品的 ID,这里直接用了正则表达式。
商品的评论信息在响应中,我们像刚才一样提取了响应的内容,然后对 JSON 进行解析,最后提取出商品评论人的昵称、评论正文、评论日期和图片信息。这些信息和商品的 ID 组合起来,形成一条评论数据。
from appium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from time import sleep