课程 Project 中,需要实现一篇论文。论文中提到“从 Flickr 上抓取了 1 million 张图片”。这真真是极好的:Flickr 上的图片质量还是不错的,而且一般没有 NSFW 类型的图片出现。
本来想用之前的几个小爬虫来完成这个任务,但是发现改动量会比较大,那就重新写一个。
想法
使用 Flickr 的 flickr.interestingness.getList
接口,爬取每天最受欢迎的图片。
由于此网站在墙外,如果出现一些未知问题,需要能再次启动,启动后能从停下来的地方继续跑。
分析
特别想用 Redis 当队列。听说那它可以实现分布式?
想了一下,大致使用 “生产者 – 消费者” 模式来完成这个任务。生产者是一个 for 循环,从今天开始向前遍历每一天,从接口里面获取每日图片列表,拿到返回值,解析 Json/XML,放到一个下载队列里面;消费者从下载队列里面获取到图片的 url,开始下载。
Flickr 才诞生多长时间啊?就算它 10 年,10 年 3650 天,一会儿就能遍历完毕了。所以生产者使用单线程就可以了。消费者呢?没啥说的,绝对的多线程。当然,如果喜欢单线程我也不拦着。
图片存放的时候直接使用图片 ID 就可以了。虽然可以按照 Flickr 的规则,使用 farm/server/id
的形式进行存放——无所谓啦~
除了使用 Redis 做下载队列之外,为了达到 “能从摔倒的地方爬起来”,我们还要将生产者当前的“时间” 计数器再保存到 Redis 中。每次启动的时候读取这个计数器。
实现
首先,不考虑上面的 “跌倒之后再爬起来” 的需求,先实现一个版本,然后再加上 Redis,透明替换原来的 Queue 和 Counter。
注意:调试的时候需要挂全局梯子(Console 挂梯子好像需要 TUN?)或者改 hosts。CN 以外的国家请无视。
import time
import redis
import json
import datetime
import os
from threading import Thread
from queue import Queue
import pickle
from urllib import request
from urllib.parse import urlencode
class Counter:
def __init__(self):
self.__date = datetime.date.today()
def pre(self):
self.__date -= datetime.timedelta(1)
def next(self):
self.__date += datetime.timedelta(1)
def get_date(self):
return self.__date.isoformat()
class RedisCounter:
def __init__(self, name, namespace='counter', **redis_kwargs):
self.__db = redis.Redis(**redis_kwargs)
self.key = '%s:%s' % (namespace, name)
self.__date = self.__db.get(self.key)
if self.__date is None:
self.__date = datetime.date.today()
else:
self.__date = pickle.loads(self.__date)
def pre(self):
self.__date -= datetime.timedelta(1)
self.__update_to_redis()
def next(self):
self.__date += datetime.timedelta(1)
self.__update_to_redis()
def __update_to_redis(self):
self.__db.set(self.key, pickle.dumps(self.__date))
def get_date(self):
return self.__date.isoformat()
class RedisQueue:
"""Simple Queue with Redis Backend"""
def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db = redis.Redis(**redis_kwargs)
self.key = '%s:%s' % (namespace, name)
def qsize(self):
"""Return the approximate size of the queue."""
return self.__db.llen(self.key)
def empty(self):
"""Return True if the queue is empty, False otherwise."""
return self.qsize() == 0
def put(self, item):
"""Put item into the queue."""
self.__db.rpush(self.key, pickle.dumps(item))
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args block is true and timeout is None (the default), block
if necessary until an item is available."""
if block:
item = self.__db.blpop(self.key, timeout=timeout)
else:
item = self.__db.lpop(self.key)
if item:
item = item[1]
return pickle.loads(item)
def get_nowait(self):
"""Equivalent to get(False)."""
return self.get(False)
class ImageFinder(Thread):
def __init__(self, date_start=None, database=None, page_per_day = 5):
super(ImageFinder, self).__init__()
self.base_url = "https://api.flickr.com/services/rest/"
self.__date = date_start
self.__db = database
self.__page_per_day = page_per_day
def run(self):
while True:
if self.__db.qsize() < 10:
try:
for page_num in range(self.__page_per_day):
self.__parse_page(self.__date.get_date(), page_num)
except Exception as e:
pass
self.__date.pre()
def __parse_page(self, day, page_num):
header, param = self.__class__.__param_gen(day,
page = page_num,
per_page = 500)
print(self.base_url + "?" + param.decode('utf-8'))
data = json.loads(request.urlopen(self.base_url, param, timeout=5).read().decode('utf-8'))
if "photos" in data.keys():
for item in data["photos"]["photo"]:
image_url = "https://farm{farm}.staticflickr.com/{server}/{id}_{secret}_b.jpg".format(
farm=item['farm'],
server=item['server'],
id=item['id'],
secret=item['secret']
)
self.__db.put({"id": item['id'], "url": image_url})
@classmethod
def __param_gen(cls, day, **kwargs):
headers = {
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, sdch",
"DNT": "1",
"Accept-Language": "zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4"
}
request_parameter = {
"format": "json",
"method": "flickr.interestingness.getList",
"api_key": "ab9bdd8345f528d08adf806d41712de0",
"nojsoncallback": "1"
}
request_parameter["date"] = day
for key, value in kwargs:
request_parameter[key] = value
return headers, urlencode(request_parameter).encode()
class DataSaver(Thread):
def __init__(self, database=None, base_dir=""):
super(DataSaver, self).__init__()
self.base_dir = base_dir
self.__db = database
def run(self):
while True:
record = self.__db.get()
try:
request.urlretrieve(record["url"], os.path.join("result", record["id"] + ".jpg"))
except Exception as e:
print(e)
def main():
counter = RedisCounter('flickr_counter')
queue = RedisQueue('flickr_queue')
# counter = Counter()
# queue = Queue()
image_finder = ImageFinder(counter, queue)
image_finder.start()
for i in range(5):
ds = DataSaver(queue)
ds.start()
if '__main__' == __name__:
main()
API KEY 的获取
我们没有必要为了这个需求再去注册一个Flickr应用。在这个API调用示例页面里面就可以获取到。
点击“Call Method”,下方会有一个URL,找到api_key
,复制出来。
结果
1 个生产者和 5 个消费者,经过一晚上的艰苦爬行,墙外服务器上的程序顺利把硬盘撑爆(20G 硬盘)报错挂掉。墙内我的工作电脑顺利从 2016 年 12 月 1 日枚举到了 1986 年某月某日,收获图片 10W 余张,总大小 26G 左右。
这下有素材了。可以进行后续的分类和聚类工作了。
自我批评
上面的爬虫有一个先天缺陷:没有对 ID 进行判重,导致一些图片可能会被下载多次。不想改了,完成任务就好。是吧?
而且,通过看 API 文档,我们可以发现,每日不仅仅有 500 张图片。其实每日推荐图片有 500×10 张。这也就是论文中说 1 million
的缘故吧。~~懒得再加一层循环了……~~循环已加入,但未验证。
参考资料
- Automatic Content-Aware Color and Tone Stylization, CVPR 2016
- Flickr 服務:Flickr API: flickr.interestingness.getList
- Python3 Redis 使用说明
发表回复