在 Bloom Filter 中使用位数组来辅助实现检测判断。在初始状态下,我们声明一个包含 m 位的位数组,它的所有位都是 0,如图 14-7 所示。 图 14-7 初始位数组 现在我们有了一个待检测集合,我们表示为 S={x1, x2, …, xn},我们接下来需要做的就是检测一个 x 是否已经存在于集合 S 中。在 BloomFilter 算法中首先使用 k 个相互独立的、随机的哈希函数来将这个集合 S 中的每个元素 x1、x2、…、xn 映射到这个长度为 m 的位数组上,哈希函数得到的结果记作位置索引,然后将位数组该位置索引的位置 1。例如这里我们取 k 为 3,即有三个哈希函数,x1 经过三个哈希函数映射得到的结果分别为 1、4、8,x2 经过三个哈希函数映射得到的结果分别为 4、6、10,那么就会将位数组的 1、4、6、8、10 这五位置 1,如图 14-8 所示: 图 14-8 映射后位数组 这时如果再有一个新的元素 x,我们要判断 x 是否属于 S 这个集合,我们便会将仍然用 k 个哈希函数对 x 求映射结果,如果所有结果对应的位数组位置均为 1,那么我们就认为 x 属于 S 这个集合,否则如果有一个不为 1,则 x 不属于 S 集合。 例如一个新元素 x 经过三个哈希函数映射的结果为 4、6、8,对应的位置均为 1,则判断 x 属于 S 这个集合。如果结果为 4、6、7,7 对应的位置为 0,则判定 x 不属于 S 这个集合。 注意这里 m、n、k 满足的关系是 m>nk,也就是说位数组的长度 m 要比集合元素 n 和哈希函数 k 的乘积还要大。 这样的判定方法很高效,但是也是有代价的,它可能把不属于这个集合的元素误认为属于这个集合,我们来估计一下它的错误率。当集合 S={x1, x2,…, xn} 的所有元素都被 k 个哈希函数映射到 m 位的位数组中时,这个位数组中某一位还是 0 的概率是: 因为哈希函数是随机的,所以任意一个哈希函数选中这一位的概率为 1/m,那么 1-1/m 就代表哈希函数一次没有选中这一位的概率,要把 S 完全映射到 m 位数组中,需要做 kn 次哈希运算,所以最后的概率就是 1-1/m 的 kn 次方。 一个不属于 S 的元素 x 如果要被误判定为在 S 中,那么这个概率就是 k 次哈希运算得到的结果对应的位数组位置都为 1,所以误判概率为: 根据: 可以将误判概率转化为: 在给定 m、n 时,可以求出使得 f 最小化的 k 值为: 在这里将误判概率归纳如下: 表 14-1 误判概率
m/n
最优 k
k=1
k=2
k=3
k=4
k=5
k=6
k=7
k=8
2
1.39
0.393
0.400
3
2.08
0.283
0.237
0.253
4
2.77
0.221
0.155
0.147
0.160
5
3.46
0.181
0.109
0.092
0.092
0.101
6
4.16
0.154
0.0804
0.0609
0.0561
0.0578
0.0638
7
4.85
0.133
0.0618
0.0423
0.0359
0.0347
0.0364
8
5.55
0.118
0.0489
0.0306
0.024
0.0217
0.0216
0.0229
9
6.24
0.105
0.0397
0.0228
0.0166
0.0141
0.0133
0.0135
0.0145
10
6.93
0.0952
0.0329
0.0174
0.0118
0.00943
0.00844
0.00819
0.00846
11
7.62
0.0869
0.0276
0.0136
0.00864
0.0065
0.00552
0.00513
0.00509
12
8.32
0.08
0.0236
0.0108
0.00646
0.00459
0.00371
0.00329
0.00314
13
9.01
0.074
0.0203
0.00875
0.00492
0.00332
0.00255
0.00217
0.00199
14
9.7
0.0689
0.0177
0.00718
0.00381
0.00244
0.00179
0.00146
0.00129
15
10.4
0.0645
0.0156
0.00596
0.003
0.00183
0.00128
0.001
0.000852
16
11.1
0.0606
0.0138
0.005
0.00239
0.00139
0.000935
0.000702
0.000574
17
11.8
0.0571
0.0123
0.00423
0.00193
0.00107
0.000692
0.000499
0.000394
18
12.5
0.054
0.0111
0.00362
0.00158
0.000839
0.000519
0.00036
0.000275
19
13.2
0.0513
0.00998
0.00312
0.0013
0.000663
0.000394
0.000264
0.000194
20
13.9
0.0488
0.00906
0.0027
0.00108
0.00053
0.000303
0.000196
0.00014
21
14.6
0.0465
0.00825
0.00236
0.000905
0.000427
0.000236
0.000147
0.000101
22
15.2
0.0444
0.00755
0.00207
0.000764
0.000347
0.000185
0.000112
7.46e-05
23
15.9
0.0425
0.00694
0.00183
0.000649
0.000285
0.000147
8.56e-05
5.55e-05
24
16.6
0.0408
0.00639
0.00162
0.000555
0.000235
0.000117
6.63e-05
4.17e-05
25
17.3
0.0392
0.00591
0.00145
0.000478
0.000196
9.44e-05
5.18e-05
3.16e-05
26
18
0.0377
0.00548
0.00129
0.000413
0.000164
7.66e-05
4.08e-05
2.42e-05
27
18.7
0.0364
0.0051
0.00116
0.000359
0.000138
6.26e-05
3.24e-05
1.87e-05
28
19.4
0.0351
0.00475
0.00105
0.000314
0.000117
5.15e-05
2.59e-05
1.46e-05
29
20.1
0.0339
0.00444
0.000949
0.000276
9.96e-05
4.26e-05
2.09e-05
1.14e-05
30
20.8
0.0328
0.00416
0.000862
0.000243
8.53e-05
3.55e-05
1.69e-05
9.01e-06
31
21.5
0.0317
0.0039
0.000785
0.000215
7.33e-05
2.97e-05
1.38e-05
7.16e-06
32
22.2
0.0308
0.00367
0.000717
0.000191
6.33e-05
2.5e-05
1.13e-05
5.73e-06
表 14-1 中第一列为 m/n 的值,第二列为最优 k 值,其后列为不同 k 值的误判概率,可以看到当 k 值确定时,随着 m/n 的增大,误判概率逐渐变小。当 m/n 的值确定时,当 k 越靠近最优 K 值,误判概率越小。另外误判概率总体来看都是极小的,在容忍此误判概率的情况下,大幅减小存储空间和判定速度是完全值得的。 接下来我们就将 BloomFilter 算法应用到 Scrapy-Redis 分布式爬虫的去重过程中,以解决 Redis 内存不足的问题。
classHashMap(object): def__init__(self, m, seed): self.m = m self.seed = seed
defhash(self, value): """ Hash Algorithm :param value: Value :return: Hash Value """ ret = 0 for i in range(len(value)): ret += self.seed * ret + ord(value[i]) return (self.m - 1) & ret
在这里新建了一个 HashMap 类,构造函数传入两个值,一个是 m 位数组的位数,另一个是种子值 seed,不同的哈希函数需要有不同的 seed,这样可以保证不同的哈希函数的结果不会碰撞。 在 hash() 方法的实现中,value 是要被处理的内容,在这里我们遍历了该字符的每一位并利用 ord() 方法取到了它的 ASCII 码值,然后混淆 seed 进行迭代求和运算,最终会得到一个数值。这个数值的结果就由 value 和 seed 唯一确定,然后我们再将它和 m 进行按位与运算,即可获取到 m 位数组的映射结果,这样我们就实现了一个由字符串和 seed 来确定的哈希函数。当 m 固定时,只要 seed 值相同,就代表是同一个哈希函数,相同的 value 必然会映射到相同的位置。所以如果我们想要构造几个不同的哈希函数,只需要改变其 seed 就好了,以上便是一个简易的哈希函数的实现。 接下来我们再实现 BloomFilter,BloomFilter 里面需要用到 k 个哈希函数,所以在这里我们需要对这几个哈希函数指定相同的 m 值和不同的 seed 值,在这里构造如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
BLOOMFILTER_HASH_NUMBER = 6 BLOOMFILTER_BIT = 30
class BloomFilter(object): def __init__(self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER): """ Initialize BloomFilter :param server: Redis Server :param key: BloomFilter Key :param bit: m = 2 ^ bit :param hash_number: the number of hash function """ # default to 1 << 30 = 10,7374,1824 = 2^30 = 128MB, max filter 2^30/hash_number = 1,7895,6970 fingerprints self.m = 1 << bit self.seeds = range(hash_number) self.maps = [HashMap(self.m, seed) for seed in self.seeds] self.server = server self.key = key
defexists(self, value): """ if value exists :param value: :return: """ ifnot value: returnFalse exist = 1 for map in self.maps: offset = map.hash(value) exist = exist & self.server.getbit(self.key, offset) return exist
definsert(self, value): """ add value to bloom :param value: :return: """ for f in self.maps: offset = f.hash(value) self.server.setbit(self.key, offset, 1)
首先我们先看下 insert() 方法,BloomFilter 算法中会逐个调用哈希函数对放入集合中的元素进行运算得到在 m 位位数组中的映射位置,然后将位数组对应的位置置 1,所以这里在代码中我们遍历了初始化好的哈希函数,然后调用其 hash() 方法算出映射位置 offset,再利用 Redis 的 setbit() 方法将该位置 1。 在 exists() 方法中我们就需要实现判定是否重复的逻辑了,方法参数 value 即为待判断的元素,在这里我们首先定义了一个变量 exist,然后遍历了所有哈希函数对 value 进行哈希运算,得到映射位置,然后我们用 getbit() 方法取得该映射位置的结果,依次进行与运算。这样只有每次 getbit() 得到的结果都为 1 时,最后的 exist 才为 True,即代表 value 属于这个集合。如果其中只要有一次 getbit() 得到的结果为 0,即 m 位数组中有对应的 0 位,那么最终的结果 exist 就为 False,即代表 value 不属于这个集合。这样此方法最后的返回结果就是判定重复与否的结果了。 到现在为止 BloomFilter 的实现就已经完成了,我们可以用一个实例来测试一下,代码如下:
def__len__(self): """Return the length of the queue""" return self.server.llen(self.key)
defpush(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
defpop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)
可以看到这个类继承了 Base 类,并重写了 len()、push()、pop() 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen()、lpush()、rpop() 等,那这就代表此爬取队列是使用的 Redis 的列表,序列化后的 Request 会被存入列表中,就是列表的其中一个元素,len() 方法是获取列表的长度,push() 方法中调用了 lpush() 操作,这代表从列表左侧存入数据,pop() 方法中调用了 rpop() 操作,这代表从列表右侧取出数据。 所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫做 First Input First Output,也被简称作 Fifo,而此类的名称就叫做 FifoQueue。 另外还有一个与之相反的实现类,叫做 LifoQueue,实现如下:
def__len__(self): """Return the length of the stack""" return self.server.llen(self.key)
defpush(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
defpop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key)
if data: return self._decode_request(data)
与 FifoQueue 不同的就是它的 pop() 方法,在这里使用的是 lpop() 操作,也就是从左侧出,而 push() 方法依然是使用的 lpush() 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出,英文叫做 Last In First Out,简称为 Lifo,而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。 另外在源码中还有一个子类实现,叫做 PriorityQueue,顾名思义,它叫做优先级队列,实现如下:
classPriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set"""
def__len__(self): """Return the length of the queue""" return self.server.zcard(self.key)
defpush(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data)
defpop(self, timeout=0): """ Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
classRFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger def__init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True
@classmethod deffrom_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug)
defrequest_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request)
defclose(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear()
import hashlib def request_fingerprint(request, include_headers=None): if include_headers: include_headers = tuple(to_bytes(h.lower()) for h in sorted(include_headers)) cache = _fingerprint_cache.setdefault(request, {}) if include_headers not in cache: fp = hashlib.sha1() fp.update(to_bytes(request.method)) fp.update(to_bytes(canonicalize_url(request.url))) fp.update(request.body or b'') if include_headers: for hdr in include_headers: if hdr in request.headers: fp.update(hdr) for v in request.headers.getlist(hdr): fp.update(v) cache[include_headers] = fp.hexdigest() return cache[include_headers]
首先我们要实现用户的大规模爬取。这里采用的爬取方式是,以微博的几个大 V 为起始点,爬取他们各自的粉丝和关注列表,然后获取粉丝和关注列表的粉丝和关注列表,以此类推,这样下去就可以实现递归爬取。如果一个用户与其他用户有社交网络上的关联,那他们的信息就会被爬虫抓取到,这样我们就可以做到对所有用户的爬取。通过这种方式,我们可以得到用户的唯一 ID,再根据 ID 获取每个用户发布的微博即可。
def parse_time(self, date): if re.match(' 刚刚 ', date): date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time())) if re.match('d + 分钟前 ', date): minute = re.match('(d+)', date).group(1) date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time() - float(minute) * 60)) if re.match('d + 小时前 ', date): hour = re.match('(d+)', date).group(1) date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time() - float(hour) * 60 * 60)) if re.match(' 昨天.*', date): date = re.match(' 昨天 (.*)', date).group(1).strip() date = time.strftime('% Y-% m-% d', time.localtime() - 24 * 60 * 60) + ' ' + date if re.match('d{2}-d{2}', date): date = time.strftime('% Y-', time.localtime()) + date + ' 00:00' returndate
我们用正则来提取一些关键数字,用 time 库来实现标准时间的转换。 以 X 分钟前的处理为例,爬取的时间会赋值为 created_at 字段。我们首先用正则匹配这个时间,表达式写作 d + 分钟前,如果提取到的时间符合这个表达式,那么就提取出其中的数字,这样就可以获取分钟数了。接下来使用 time 模块的 strftime() 方法,第一个参数传入要转换的时间格式,第二个参数就是时间戳。这里我们用当前的时间戳减去此分钟数乘以 60 就是当时的时间戳,这样我们就可以得到格式化后的正确时间了。 然后 Pipeline 可以实现如下处理:
1 2 3 4 5 6
class WeiboPipeline(): def process_item(self, item, spider): if isinstance(item, WeiboItem): ifitem.get('created_at'): item['created_at'] = item['created_at'].strip() item['created_at'] = self.parse_time(item.get('created_at'))
注明:突发性t5实例,别看到价格比较便宜就直接购买,里面很多套路,购买页面有提示:限制20%性能基线。释义:依靠CPU 积分来提升 CPU 性能,满足业务需求。当实例实际工作性能高于基准 CPU 计算性能时,会把服务器 CPU 的性能限制在 20%以下,如果这时20%CPU性能满足不了业务需求,云服务器CPU会跑满100%,到那时候你以为是被某大佬攻击了,很有可能是你突发性t5实例CPU 积分消耗完了。笔者建议:如果用户业务对 CPU 要求高的,可以直接略过,选择t5实例(无限制CPU性能)、n4共享型、通用型mn4。以下笔者建议爆款:
import sys from scrapy.utils.project import get_project_settings from scrapyuniversal.spiders.universal import UniversalSpider from scrapyuniversal.utils import get_config from scrapy.crawler import CrawlerProcess
from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule from scrapyuniversal.utils import get_config from scrapyuniversal.rules import rules
yield SplashRequest(url, self.parse_result, args={ # optional; parameters passed to Splash HTTP API 'wait': 0.5, # 'url' is prefilled from request url # 'http_method' is setto'POST'for POST requests # 'body' is setto request body for POST requests }, endpoint='render.json', # optional; default is render.html splash_url='<url>', # optional; overrides SPLASH_URL )
from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from scrapy.http import HtmlResponse from logging import getLogger
@classmethod deffrom_crawler(cls, crawler): o = cls(crawler.settings['USER_AGENT']) crawler.signals.connect(o.spider_opened, signal=signals.spider_opened) return o
ADSL(Asymmetric Digital Subscriber Line,非对称数字用户环路),它的上行和下行带宽不对称,它采用频分复用技术把普通的电话线分成了电话、上行和下行 3 个相对独立的信道,从而避免了相互之间的干扰。 ADSL 通过拨号的方式上网,需要输入 ADSL 账号和密码,每次拨号就更换一个 IP。IP 分布在多个 A 段,如果 IP 都能使用,则意味着 IP 量级可达千万。如果我们将 ADSL 主机作为代理,每隔一段时间主机拨号就换一个 IP,这样可以有效防止 IP 被封禁。另外,主机的稳定性很好,代理响应速度很快。
接下来要做的就是拨号,并把新的 IP 保存到 Redis 散列表里。 首先是拨号定时,它分为定时拨号和非定时拨号两种选择。 非定时拨号:最好的方法就是向该主机发送一个信号,然后主机就启动拨号,但这样做的话,我们首先要搭建一个重新拨号的接口,如搭建一个 Web 接口,请求该接口即进行拨号,但开始拨号之后,此时主机的状态就从在线转为离线,而此时的 Web 接口也就相应失效了,拨号过程无法再连接,拨号之后接口的 IP 也变了,所以我们无法通过接口来方便地控制拨号过程和获取拨号结果,下次拨号还得改变拨号请求接口,所以非定时拨号的开销还是比较大的。 定时拨号:我们只需要在拨号主机上运行定时脚本即可,每隔一段时间拨号一次,更新 IP,然后将 IP 在 Redis 散列表中更新即可,非常简单易用,另外可以适当将拨号频率调高一点,减少短时间内 IP 被封的可能性。 在这里选择定时拨号。 接下来就是获取 IP。获取拨号后的 IP 非常简单,只需要调用 ifconfig 命令,然后解析出对应网卡的 IP 即可。 获取了 IP 之后,我们还需要进行有效性检测。拨号主机可以自己检测,比如可以利用 requests 设置自身的代理请求外网,如果成功,那么证明代理可用,然后再修改 Redis 散列表,更新代理。 需要注意,由于在拨号的间隙拨号主机是离线状态,而此时 Redis 散列表中还存留了上次的代理,一旦这个代理被取用了,该代理是无法使用的。为了避免这个情况,每台主机在拨号之前还需要将自身的代理从 Redis 散列表中移除。 这样基本的流程就理顺了,我们用如下代码实现:
class Sender(): def get_ip(self, ifname=ADSL_IFNAME): """ 获取本机 IP :param ifname: 网卡名称 :return: """ (status, output) = subprocess.getstatusoutput('ifconfig') if status == 0: pattern = re.compile(ifname + '.*?inet.*?(d+.d+.d+.d+).*?netmask', re.S) result = re.search(pattern, output) if result: ip = result.group(1) return ip
目前为止,我们已经成功实时更新拨号主机的代理。不过还缺少一个模块,那就是接口模块。像之前的代理池一样,我们也定义一些接口来获取代理,如 random 获取随机代理、count 获取代理个数等。 我们选用 Tornado 来实现,利用 Tornado 的 Server 模块搭建 Web 接口服务,示例如下:
defget(self, api=''): ifnot api: links = ['random', 'proxies', 'names', 'all', 'count'] self.write('<h4>Welcome to ADSL Proxy API</h4>') for link in links: self.write('<a href=' + link + '>' + link + '</a><br>')
if api == 'random': result = self.redis.random() if result: self.write(result)
if api == 'names': result = self.redis.names() if result: self.write(json.dumps(result))
if api == 'proxies': result = self.redis.proxies() if result: self.write(json.dumps(result))
if api == 'all': result = self.redis.all() if result: self.write(json.dumps(result))
if api == 'count': self.write(str(self.redis.count()))
这里首先利用选择器选取所有的 quote,并将其赋值为 quotes 变量,然后利用 for 循环对每个 quote 遍历,解析每个 quote 的内容。 对 text 来说,观察到它的 class 为 text,所以可以用.text 选择器来选取,这个结果实际上是整个带有标签的节点,要获取它的正文内容,可以加::text 来获取。这时的结果是长度为 1 的列表,所以还需要用 extract_first() 方法来获取第一个元素。而对于 tags 来说,由于我们要获取所有的标签,所以用 extract() 方法获取整个列表即可。 以第一个 quote 的结果为例,各个选择方法及结果的说明如下内容。 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
<div class="quote" itemscope=""itemtype="http://schema.org/CreativeWork"> <span class="text" itemprop="text">“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”</span> <span>by <small class="author" itemprop="author">Albert Einstein</small> <a href="/author/Albert-Einstein">(about)</a> </span> <div class="tags"> Tags: <meta class="keywords" itemprop="keywords" content="change,deep-thoughts,thinking,world"> <a class="tag" href="/tag/change/page/1/">change</a> <a class="tag" href="/tag/deep-thoughts/page/1/">deep-thoughts</a> <a class="tag" href="/tag/thinking/page/1/">thinking</a> <a class="tag" href="/tag/world/page/1/">world</a> </div> </div>
不同选择器的返回结果如下。
quote.css(‘.text’)
1
[<Selector xpath="descendant-or-self::*[@class and contains(concat(' ', normalize-space(@class), ' '), ' text ')]"data='<span class="text"itemprop="text">“The '>]
quote.css(‘.text::text’)
1
[<Selector xpath="descendant-or-self::*[@class and contains(concat(' ', normalize-space(@class), ' '), ' text ')]/text()"data='“The world as we have created it is a pr'>]
quote.css(‘.text’).extract()
1
['<span class="text"itemprop="text">“The world as we have created itis a process of our thinking. It cannot be changed without changing our thinking.”</span>']
quote.css(‘.text::text’).extract()
1
['“The world as we have created itis a process of our thinking. It cannot be changed without changing our thinking.”']
quote.css(‘.text::text’).extract_first()
1
“The world as we have created it is aprocessof our thinking. It cannot be changed without changing our thinking.”
defindex_page(self, response): for item in response.doc('.item').items(): self.crawl(item.find('a').attr.url, callback=self.detail_page, itag=item.find('.update-time').text())
@config(age=10 * 24 * 60 * 60) defindex_page(self, response): for each in response.doc('a[href^="http"]').items(): self.crawl(each.attr.href, callback=self.detail_page)
defindex_page(self, response): for each in response.doc('li> .tit > a').items(): self.crawl(each.attr.href, callback=self.detail_page, fetch_type='js') next = response.doc('.next').attr.href self.crawl(next, callback=self.index_page)
def response(flow): url = 'cdnware.m.jd.com' if url in flow.request.url: text = flow.response.text data = json.loads(text) if data.get('wareInfo') and data.get('wareInfo').get('basicInfo'): info = data.get('wareInfo').get('basicInfo') id = info.get('wareId') name = info.get('name') images = info.get('wareImage') print(id, name, images)
# 提取评论数据 url = 'api.m.jd.com/client.action' if url in flow.request.url: pattern = re.compile('sku".*?"(d+)"') # Request 请求参数中包含商品 ID body = unquote(flow.request.text) # 提取商品 ID id = re.search(pattern, body).group(1) if re.search(pattern, body) else None # 提取 Response Body text = flow.response.text data = json.loads(text) comments = data.get('commentInfoList') or [] # 提取评论数据 for comment in comments: if comment.get('commentInfo') and comment.get('commentInfo').get('commentData'): info = comment.get('commentInfo') text = info.get('commentData') date = info.get('commentDate') nickname = info.get('userNickName') pictures = info.get('pictureInfoList') print(id, nickname, text, date, pictures)
这里指定了接口的部分链接内容,以判断当前请求的 URL 是不是获取评论的 URL。如果满足条件,那么就提取商品的 ID 和评论信息。
商品的 ID 实际上隐藏在请求中,我们需要提取请求的表单内容来提取商品的 ID,这里直接用了正则表达式。
商品的评论信息在响应中,我们像刚才一样提取了响应的内容,然后对 JSON 进行解析,最后提取出商品评论人的昵称、评论正文、评论日期和图片信息。这些信息和商品的 ID 组合起来,形成一条评论数据。
from appium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from time import sleep
from appium import webdriver from selenium.webdriver.common.byimportBy from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC
el = driver.find_element_by_id('com.tencent.mm:id/cjk')
在 Selenium 中,其他查找元素的方法同样适用,在此不再赘述。
在 Android 平台上,我们还可以使用 UIAutomator 来进行元素选择,如下所示:
1 2
el = self.driver.find_element_by_android_uiautomator('new UiSelector().description("Animation")') els = self.driver.find_elements_by_android_uiautomator('new UiSelector().clickable(true)')
在 iOS 平台上,我们可以使用 UIAutomation 来进行元素选择,如下所示:
1 2
el = self.driver.find_element_by_ios_uiautomation('.elements()[0]') els = self.driver.find_elements_by_ios_uiautomation('.elements()')
还可以使用 iOS Predicates 来进行元素选择,如下所示:
1 2
el = self.driver.find_element_by_ios_predicate('wdName == "Buttons"') els = self.driver.find_elements_by_ios_predicate('wdValue == "SearchBar" AND isWDDivisible == 1')
也可以使用 iOS Class Chain 来进行选择,如下所示:
1 2
el = self.driver.find_element_by_ios_class_chain('XCUIElementTypeWindow/XCUIElementTypeButton[3]') els = self.driver.find_elements_by_ios_class_chain('XCUIElementTypeWindow/XCUIElementTypeButton')
This document defines an HTTP extension header field that allows proxy components to disclose information lost in the proxying process, for example, the originating IP address of a request or IP address of the proxy on the user-agent-facing interface. In a path of proxying components, this makes it possible to arrange it so that each subsequent component will have access to, for example, all IP addresses used in the chain of proxied HTTP requests. This document also specifies guidelines for a proxy administrator to anonymize the origin of a request.
大体意思为本文的定义(扩展)了一个 HTTP 头域,这个字段允许代理组件披露原始 IP 地址。 从这里我们了解到 X-Forward-For 的正向用途是便于服务端识别原始 IP,并根据原始 IP 作出动态处理。例如服务端按照 IP 地址进行负载均衡时,如果能够看破 IP 代理,取得原始 IP 地址,那么就能够作出有效的负载。否则有可能造成资源分配不均,导致假负载均衡的情况出现。 Example Usage 给出了 X-Forward-For 的使用示例:
A request from a client with IP address 192.0.2.43 passes through a proxy with IP address 198.51.100.17, then through another proxy with IP address 203.0.113.60 before reaching an origin server. This could, for example, be an office client behind a corporate malware filter talking to a origin server through a reverse proxy. o The HTTP request between the client and the first proxy has no “Forwarded” header field. o The HTTP request between the first and second proxy has a “Forwarded: for=192.0.2.43” header field. o The HTTP request between the second proxy and the origin server has a “Forwarded: for=192.0.2.43, for=198.51.100.17;by=203.0.113.60;proto=http;host=example.com” header field.
假设原始 IP 为192.0.2.43,它的请求使用了地址为 198.51.100.17 的代理,在到达目标服务器 203.0.113.60 之前还使用了另外一个代理(文章假设另外一个代理为 222.111.222.111)。 这种情况下
180.137.156.168 - - [24/Nov/2019:12:41:19 +0800]"GET / HTTP/1.1"200612"-""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Safari/605.1.15""-"
服务器记录到的信息含义如下:
客户端 IP 为 180.137.156.168
客户端时间为 [24/Nov/2019:12:41:19 +0800]
请求方式为 GET / HTTP/1.1
响应状态码为 200
响应正文大小为 612
Referer 为短横线,即为空
User-Agent 显示浏览器品牌为 Safari
代理清单为短横线,即为空。
由于本次并未使用 IP 代理,那么代理清单自然就是短横线。接着我们用 Python 代码测试一下,代码片段 Python-Request 为测试代码。
这次也没有使用 IP 代理,所以代理清单依旧是短横线。现在用代理 IP 测试一下,代码片段 Forwarded-Test 中使用了 IP 代理,我们就用它进行测试即可。这里的代理服务器 IP 地址为 220.185.128.170,根据之前对 RFC7239 的了解,猜测本次请求对应的 Forwarded 记录的会是原始 IP,而客户端 IP 则是代理服务器的 IP。 代码运行后,服务器记录到对应的日志信息如下:
果然,记录中客户端 IP 对应的是 220.185.128.170,即代理服务器的 IP。Forwarded 中记录的 180.137.156.168 是 Python 程序所在的计算机 IP 地址,即原始 IP。 这与 RFC7239 的描述完全相符,服务端可以通过 Forwarded 找到原始 IP,甚至是使用过的代理服务器 IP。
调皮的 IP 代理商
刚才我们用的是普通 IP 代理,由于它很容易被识别,达不到隐匿的目的,所以 IP 代理商又推出了高匿代理。 高匿代理: 相对于普通 IP 代理而言,使用高匿代理后,原始 IP 会被隐藏得更好,服务端更难发现。 这里我使用了 芝麻代理 服务商提供的免费高匿 IP,注册后就可以领取免费 IP,简直就是开箱即用。 将代码片段 Forwarded-Test 中用于设置代理服务器 IP 和端口号的字段值改为高匿 IP 及对应的端口号即可,例如:
原始 IP 为 125.82.188.4,代理清单为短横线。细心的你可能会有疑问,为什么填写的代理 IP 是 58.218.92.132,而日志中的却不是呢? 这是代理服务商做了多一层的转移,58.218.92.132 是给用户的入口,代理商的服务端会将入口为 58.218.92.132 的请求转给地址为 125.82.188.4。其中过程我们不用深究,高匿代理和普通代理的原理会再开一篇文章进行讨论。 日志记录说明高匿 IP 能够帮助我们实现隐匿的目的。说到这里不得不提一下,芝麻代理高匿 IP 的质量真的好,听说他们的 IP 还支持高并发调用,有需求的朋友不妨去试试。
200 <!DOCTYPE html> <html> <head> <title>Welcome to nginx!</title> <style> body { width: 35em; margin: 0 auto; font-family: Tahoma, Verdana, Arial, sans-serif; } </style> </head> <body> <h1>Welcome to nginx!</h1> <p>If you see this page, the nginx web server is successfully installed and working. Further configuration is required.</p>
<p>For online documentation and support please refer to <ahref="http://nginx.org/">nginx.org</a>.<br/> Commercial support is available at <ahref="http://nginx.com/">nginx.com</a>.</p>
<p><em>Thank you for using nginx.</em></p> </body> </html>
响应状态码是 200,并且返回了 Welcome to nginx 等字样,这说明请求成功。对应的日志记录为:
def response(flow): url = 'https://dedao.igetget.com/v3/discover/bookList' if flow.request.url.startswith(url): text = flow.response.text data = json.loads(text) books = data.get('c').get('list') for book in books: ctx.log.info(str(book))
import json import pymongo from mitmproxy import ctx
client = pymongo.MongoClient('localhost') db = client['igetget'] collection = db['books']
def response(flow): global collection url = 'https://dedao.igetget.com/v3/discover/bookList' if flow.request.url.startswith(url): text = flow.response.text data = json.loads(text) books = data.get('c').get('list') for book in books: data = {'title': book.get('operating_title'), 'cover': book.get('cover'), 'summary': book.get('other_share_summary'), 'price': book.get('price') } ctx.log.info(str(data)) collection.insert(data)
Charles 是一个网络抓包工具,我们可以用它来做 App 的抓包分析,得到 App 运行过程中发生的所有网络请求和响应内容,这就和 Web 端浏览器的开发者工具 Network 部分看到的结果一致。 相比 Fiddler 来说,Charles 的功能更强大,而且跨平台支持更好。所以我们选用 Charles 作为主要的移动端抓包工具,用于分析移动 App 的数据包,辅助完成 App 数据抓取工作。
1. 本节目标
本节我们以京东 App 为例,通过 Charles 抓取 App 运行过程中的网络数据包,然后查看具体的 Request 和 Response 内容,以此来了解 Charles 的用法。
2. 准备工作
请确保已经正确安装 Charles 并开启了代理服务,手机和 Charles 处于同一个局域网下,Charles 代理和 CharlesCA 证书设置好,另外需要开启 SSL 监听,具体的配置可以参考第 1 章的说明。
3. 原理
首先 Charles 运行在自己的 PC 上,Charles 运行的时候会在 PC 的 8888 端口开启一个代理服务,这个服务实际上是一个 HTTP/HTTPS 的代理。 确保手机和 PC 在同一个局域网内,我们可以使用手机模拟器通过虚拟网络连接,也可以使用手机真机和 PC 通过无线网络连接。 设置手机代理为 Charles 的代理地址,这样手机访问互联网的数据包就会流经 Charles,Charles 再转发这些数据包到真实的服务器,服务器返回的数据包再由 Charles 转发回手机,Charles 就起到中间人的作用,所有流量包都可以捕捉到,因此所有 HTTP 请求和响应都可以捕获到。同时 Charles 还有权力对请求和响应进行修改。
4. 抓包
初始状态下 Charles 的运行界面如图 11-1 所示: 图 11-1 Charles 运行界面 Charles 会一直监听 PC 和手机发生的网络数据包,捕获到的数据包就会显示在左侧,随着时间的推移,捕获的数据包越来越多,左侧列表的内容也会越来越多。 可以看到,图中左侧显示了 Charles 抓取到的请求站点,我们点击任意一个条目便可以查看对应请求的详细信息,其中包括 Request、Response 等内容。 接下来清空 Charles 的抓取结果,点击左侧的扫帚按钮即可清空当前捕获到的所有请求。然后点击第二个监听按钮,确保监听按钮是打开的,这表示 Charles 正在监听 App 的网络数据流,如图 11-2 所示。 图 11-2 监听过程 这时打开手机京东,注意一定要提前设置好 Charles 的代理并配置好 CA 证书,否则没有效果。 打开任意一个商品,如 iPhone,然后打开它的商品评论页面,如图 11-3 所示。 图 11-3 评论页面 不断上拉加载评论,可以看到 Charles 捕获到这个过程中京东 App 内发生的所有网络请求,如图 11-4 所示。 图 11-4 监听结果 左侧列表中会出现一个 api.m.jd.com 链接,而且它在不停闪动,很可能就是当前 App 发出的获取评论数据的请求被 Charles 捕获到了。我们点击将其展开,继续上拉刷新评论。随着上拉的进行,此处又会出现一个个网络请求记录,这时新出现的数据包请求确定就是获取评论的请求。 为了验证其正确性,我们点击查看其中一个条目的详情信息。切换到 Contents 选项卡,这时我们发现一些 JSON 数据,核对一下结果,结果有 commentData 字段,其内容和我们在 App 中看到的评论内容一致,如图 11-5 所示。 图 11-5 Json 数据结果 这时可以确定,此请求对应的接口就是获取商品评论的接口。这样我们就成功捕获到了在上拉刷新的过程中发生的请求和响应内容。
5. 分析
现在分析一下这个请求和响应的详细信息。首先可以回到 Overview 选项卡,上方显示了请求的接口 URL,接着是响应状态 Status Code、请求方式 Method 等,如图 11-6 所示。 图 11-6 监听结果 这个结果和原本在 Web 端用浏览器开发者工具内捕获到的结果形式是类似的。 接下来点击 Contents 选项卡,查看该请求和响应的详情信息。 上半部分显示的是 Request 的信息,下半部分显示的是 Response 的信息。比如针对 Reqeust,我们切换到 Headers 选项卡即可看到该 Request 的 Headers 信息,针对 Response,我们切换到 JSON TEXT 选项卡即可看到该 Response 的 Body 信息,并且该内容已经被格式化,如图 11-7 所示。 图 11-7 监听结果 由于这个请求是 POST 请求,所以我们还需要关心的就是 POST 的表单信息,切换到 Form 选项卡即可查看,如图 11-8 所示。 图 11-8 监听结果 这样我们就成功抓取 App 中的评论接口的请求和响应,并且可以查看 Response 返回的 JSON 数据。 至于其他 App,我们同样可以使用这样的方式来分析。如果我们可以直接分析得到请求的 URL 和参数的规律,直接用程序模拟即可批量抓取。
6. 重发
Charles 还有一个强大功能,它可以将捕获到的请求加以修改并发送修改后的请求。点击上方的修改按钮,左侧列表就多了一个以编辑图标为开头的链接,这就代表此链接对应的请求正在被我们修改,如图 11-9 所示。 图 11-9 编辑页面 我们可以将 Form 中的某个字段移除,比如这里将 partner 字段移除,然后点击 Remove。这时我们已经对原来请求携带的 Form Data 做了修改,然后点击下方的 Execute 按钮即可执行修改后的请求,如图 11-10 所示。 图 11-10 编辑页面 可以发现左侧列表再次出现了接口的请求结果,内容仍然不变,如图 11-11 所示。 图 11-11 重新请求后结果 删除 Form 表单中的 partner 字段并没有带来什么影响,所以这个字段是无关紧要的。 有了这个功能,我们就可以方便地使用 Charles 来做调试,可以通过修改参数、接口等来测试不同请求的响应状态,就可以知道哪些参数是必要的哪些是不必要的,以及参数分别有什么规律,最后得到一个最简单的接口和参数形式以供程序模拟调用使用。
我们以新浪微博为例来实现一个 Cookies 池的搭建过程。Cookies 池中保存了许多新浪微博账号和登录后的 Cookies 信息,并且 Cookies 池还需要定时检测每个 Cookies 的有效性,如果某 Cookies 无效,那就删除该 Cookies 并模拟登录生成新的 Cookies。同时 Cookies 池还需要一个非常重要的接口,即获取随机 Cookies 的接口,Cookies 运行后,我们只需请求该接口,即可随机获得一个 Cookies 并用其爬取。 由此可见,Cookies 池需要有自动生成 Cookies、定时检测 Cookies、提供随机 Cookies 等几大核心功能。
比较重要的方法是 random(),它主要用于从 Hash 里随机选取一个 Cookies 并返回。每调用一次 random() 方法,就会获得随机的 Cookies,此方法与接口模块对接即可实现请求接口获取随机 Cookies。
生成模块
生成模块负责获取各个账号信息并模拟登录,随后生成 Cookies 并保存。我们首先获取两个 Hash 的信息,看看账户的 Hash 比 Cookies 的 Hash 多了哪些还没有生成 Cookies 的账号,然后将剩余的账号遍历,再去生成 Cookies 即可。
这里主要逻辑就是找出那些还没有对应 Cookies 的账号,然后再逐个获取 Cookies,代码如下:
```python for username in accounts_usernames: if not username in cookies_usernames: password = self.accounts_db.get(username) print(‘ 正在生成 Cookies’, ‘ 账号 ‘, username, ‘ 密码 ‘, password) result = self.new_cookies(username, password)
1 2
因为我们对接的是新浪微博,前面我们已经破解了新浪微博的四宫格验证码,在这里我们直接对接过来即可,不过现在需要加一个获取 Cookies 的方法,并针对不同的情况返回不同的结果,逻辑如下所示:
如果要扩展其他站点,只需要实现 new_cookies() 方法即可,然后按此处理规则返回对应的模拟登录结果,比如 1 代表获取成功,2 代表用户名或密码错误。 代码运行之后就会遍历一次尚未生成 Cookies 的账号,模拟登录生成新的 Cookies。
#### 检测模块
我们现在可以用生成模块来生成 Cookies,但还是免不了 Cookies 失效的问题,例如时间太长导致 Cookies 失效,或者 Cookies 使用太频繁导致无法正常请求网页。如果遇到这样的 Cookies,我们肯定不能让它继续保存在数据库里。 所以我们还需要增加一个定时检测模块,它负责遍历池中的所有 Cookies,同时设置好对应的检测链接,我们用一个个 Cookies 去请求这个链接。如果请求成功,或者状态码合法,那么该 Cookies 有效;如果请求失败,或者无法获取正常的数据,比如直接跳回登录页面或者跳到验证页面,那么此 Cookies 无效,我们需要将该 Cookies 从数据库中移除。 此 Cookies 移除之后,刚才所说的生成模块就会检测到 Cookies 的 Hash 和账号的 Hash 相比少了此账号的 Cookies,生成模块就会认为这个账号还没生成 Cookies,那么就会用此账号重新登录,此账号的 Cookies 又被重新更新。 检测模块需要做的就是检测 Cookies 失效,然后将其从数据中移除。 为了实现通用可扩展性,我们首先定义一个检测器的父类,声明一些通用组件,实现如下所示:
test() 方法首先将 Cookies 转化为字典,检测 Cookies 的格式,如果格式不正确,直接将其删除,如果格式没问题,那么就拿此 Cookies 请求被检测的 URL。test() 方法在这里检测微博,检测的 URL 可以是某个 Ajax 接口,为了实现可配置化,我们将测试 URL 也定义成字典,如下所示:
TESTURLMAP = {‘weibo’: ‘https://m.weibo.cn/'}
1 2 3 4 5 6
如果要扩展其他站点,我们可以统一在字典里添加。对微博来说,我们用 Cookies 去请求目标站点,同时禁止重定向和设置超时时间,得到响应之后检测其返回状态码。如果直接返回 200 状态码,则 Cookies 有效,否则可能遇到了 302 跳转等情况,一般会跳转到登录页面,则 Cookies 已失效。如果 Cookies 失效,我们将其从 Cookies 的 Hash 里移除即可。
#### 接口模块
生成模块和检测模块如果定时运行就可以完成 Cookies 实时检测和更新。但是 Cookies 最终还是需要给爬虫来用,同时一个 Cookies 池可供多个爬虫使用,所以我们还需要定义一个 Web 接口,爬虫访问此接口便可以取到随机的 Cookies。我们采用 Flask 来实现接口的搭建,代码如下所示:
import json from flask import Flask, g app = Flask(__name
)
def get_conn(): for website in GENERATOR_MAP: if not hasattr(g, website): setattr(g, website + ‘_cookies’, eval(‘RedisClient’ + ‘(“cookies”, “‘ + website + ‘“)’)) return g
@app.route(‘//random’) def random(website): “”” 获取随机的 Cookie, 访问地址如 /weibo/random :return: 随机 Cookie “”” g = get_conn() cookies = getattr(g, website + ‘_cookies’).random() return cookies
import time from multiprocessing import Process from cookiespool.api import app from cookiespool.config import from cookiespool.generator import from cookiespool.tester import *
class Scheduler(object): @staticmethod def valid_cookie(cycle=CYCLE): while True: print(‘Cookies 检测进程开始运行 ‘) try: for website, cls in TESTER_MAP.items(): tester = eval(cls + ‘(website=”‘ + website + ‘“)’) tester.run() print(‘Cookies 检测完成 ‘) del tester time.sleep(cycle) except Exception as e: print(e.args)
@staticmethod
def generate_cookie(cycle=CYCLE):
while True:
print('Cookies 生成进程开始运行 ')
try:
for website, cls in GENERATOR_MAP.items():
generator = eval(cls + '(website="' + website + '")')
generator.run()
print('Cookies 生成完成 ')
generator.close()
time.sleep(cycle)
except Exception as e:
print(e.args)
@staticmethod
def api():
print('API 接口开始运行 ')
app.run(host=API_HOST, port=API_PORT)
def run(self):
if API_PROCESS:
api_process = Process(target=Scheduler.api)
api_process.start()
if GENERATOR_PROCESS:
generate_process = Process(target=Scheduler.generate_cookie)
generate_process.start()
if VALID_PROCESS:
valid_process = Process(target=Scheduler.valid_cookie)
valid_process.start()
Running on http://0.0.0.0:5000/ (Press CTRL+C to quit) Cookies 生成进程开始运行 Cookies 检测进程开始运行 正在生成 Cookies 账号 14747223314 密码 asdf1129 正在测试 Cookies 用户名 14747219309 Cookies 有效 14747219309 正在测试 Cookies 用户名 14740626332 Cookies 有效 14740626332 正在测试 Cookies 用户名 14740691419 Cookies 有效 14740691419 正在测试 Cookies 用户名 14740618009 Cookies 有效 14740618009 正在测试 Cookies 用户名 14740636046 Cookies 有效 14740636046 正在测试 Cookies 用户名 14747222472 Cookies 有效 14747222472 Cookies 检测完成 验证码位置 420 580 384 544 成功匹配 拖动顺序 [1, 4, 2, 3] 成功获取到 Cookies {‘SUHB’: ‘08J77UIj4w5n_T’, ‘SCF’: ‘AimcUCUVvHjswSBmTswKh0g4kNj4K7_U9k57YzxbqFt4SFBhXq3Lx4YSNO9VuBV841BMHFIaH4ipnfqZnK7W6Qs.’, ‘SSOLoginState’: ‘1501439488’, ‘_T_WM’: ‘99b7d656220aeb9207b5db97743adc02’, ‘M_WEIBOCN_PARAMS’: ‘uicode%3D20000174’, ‘SUB’: ‘_2A250elZQDeRhGeBM6VAR8ifEzTuIHXVXhXoYrDV6PUJbkdBeLXTxkW17ZoYhhJ92N_RGCjmHpfv9TB8OJQ..’} 成功保存 Cookies ```
以上所示是程序运行的控制台输出内容,我们从中可以看到各个模块都正常启动,测试模块逐个测试 Cookies,生成模块获取尚未生成 Cookies 的账号的 Cookies,各个模块并行运行,互不干扰。 我们可以访问接口获取随机的 Cookies,如图 10-13 所示。 图 10-13 接口页面 爬虫只需要请求该接口就可以实现随机 Cookies 的获取。
人工智能技术(以下称 AI)是人类优秀的发现和创造之一,它代表着至少几十年的未来。在传统的编程中,工程师将自己的想法和业务变成代码,计算机会根据代码设定的逻辑运行。与之不同的是,AI 使计算机有了「属于自己的思想」,它就像生物一样,能够「看」、「听」、「说」、「动」、「理解」、「分辨」和「思考」。 AI 在图像识别和文本处理方面的效果尤为突出,且已经应用到人类的生活中,例如人脸识别、对话、车牌识别、城市智慧大脑项目中的目标检测和目标分类等。 接下来,我们将了解图像分类的需求、完成任务的前提条件和任务实践。
图像分类以及目标检测的需求
AI 的能力和应用都非常广泛,这里我们主要讨论的是图像分类。 图像分类,其实是对图像中主要目标的识别和归类。例如在很多张随机图片中分辨出哪一张中有直升飞机、哪一张中有狗。或者给定一张图片,让计算机分辨图像中主要目标的类别。 目标检测,指的是检测目标在图片中的位置。例如智慧交通项目中,路面监控摄像头拍摄画面中车辆的位置。目标检测涉及两种技术:分类和定位。也就是说先判定图片中是否存在指定的目标,然后还需要确定目标在图片中的位置。 这样的技术将会应用在人脸识别打卡、视频监控警报、停车场、高速收费站和城市智慧交通等项目当中。
具备以上条件后,再通过短时间(几天或一周)的学习,我们就能够完成图像分类的任务。 讨论个额外的话题,人人都能够做 AI 工程师吗? AI 的门槛是比较高的,首先得具备高等数学、统计学习和编程等基础,然后要有很强的学习能力。对于 IT 工程师来说:
编程基础是没有问题的
学习能力看个人,但花时间、下功夫肯定会有进步
高等数学基础,得好好补
统计学习基础,也得好好补
经济上无压力
如果你想要成为一名 AI 工程师,那么「高学历」几乎是必备的。无论是一线互联网企业或者新崛起的 AI 独角兽,它们为 AI 工程师设立的学历门槛都是「硕士」。除非特别优秀的、才华横溢的大专或本科生,否则是不可能有机会进入这样的企业做 AI 工程师的。 AI 在硬件、软件、数据资料和人才方面都是很费钱的,普通的 IT 工程师也就是学习了解一下,远远达不到产品商用的要求。 普通的中小企业,极少有资质和经济能力吸引高学历且优秀的 AI 工程师,这就导致了资源的聚拢和倾斜。 想要将图像分类技术商用,在让计算机经历「看」、「认识」的步骤并拥有「分辨」能力后,还要将其转换为 Web 服务。 但我只想将人脸识别或者图像分类的功能集成到我的项目当中,就那么困难吗? 我只是一个很小的企业,想要在原来普通的视频监控系统中增加「家人识别」、「陌生人警报」、「火灾警报」和「生物闯入提醒」等功能,没有上述的条件和经济投入,就不能实现了吗? 我好苦恼! 有什么好办法吗?
ModelArts 简介和条件
ModelArts 是华为云推出的产品,它是面向开发者的一站式 AI 开发平台。 它为机器学习与深度学习提供海量数据预处理及半自动化标注、大规模分布式 Training、自动化模型生成,及端-边-云模型按需部署能力,帮助用户快速创建和部署模型,管理全周期 AI 工作流。 它为用户提供了以下可选模式:
零编码经验、零 AI 经验的自动学习模式
有 AI 研发经验的全流程开发模式
同时,它将 AI 开发的整个过程都集成了进来。例如数据标注、模型训练、参数优化、服务部署、开放接口等,这就是「全周期 AI 工作流」。 还有,平台上的操作都是可视化的。 这些条件对于想要将 AI 技术应用于产品,但无奈条件不佳的个人开发者和企业提供了机会,这很重要!可以说 ModelArts) 缩短了 AI 商用的时间,降低了对应的经济成本、时间成本和人力成本。 更贴心的是,华为云 ModelArts) 为用户准备了很多的教程。即使用户没有经验,但只要按照教程指引进行操作,也能够实现自己的 AI 需求。 想想就美滋滋,太棒了! 赶紧体验一下!