一个 Flickr 爬虫

课程 Project 中,需要实现一篇论文。论文中提到 “从 Flickr 上抓取了 1 million 张图片”。这真真是极好的:Flickr 上的图片质量还是不错的,而且一般没有 NSFW 类型的图片出现。

本来想用之前的几个小爬虫来完成这个任务,但是发现改动量会比较大,那就重新写一个。

想法

使用 Flickr 的 flickr.interestingness.getList 接口,爬取每天最受欢迎的图片。

由于此网站在墙外,如果出现一些未知问题,需要能再次启动,启动后能从停下来的地方继续跑。

分析

特别想用 Redis 当队列。听说那它可以实现分布式?

想了一下,大致使用 “生产者 - 消费者” 模式来完成这个任务。生产者是一个 for 循环,从今天开始向前遍历每一天,从接口里面获取每日图片列表,拿到返回值,解析 Json/XML,放到一个下载队列里面;消费者从下载队列里面获取到图片的 url,开始下载。

Flickr 才诞生多长时间啊?就算它 10 年,10 年 3650 天,一会儿就能遍历完毕了。所以生产者使用单线程就可以了。消费者呢?没啥说的,绝对的多线程。当然,如果喜欢单线程我也不拦着。

图片存放的时候直接使用图片 ID 就可以了。虽然可以按照 Flickr 的规则,使用 farm/server/id 的形式进行存放——无所谓啦~

除了使用 Redis 做下载队列之外,为了达到 “能从摔倒的地方爬起来”,我们还要将生产者当前的 “时间” 计数器再保存到 Redis 中。每次启动的时候读取这个计数器。

实现

首先,不考虑上面的 “跌倒之后再爬起来” 的需求,先实现一个版本,然后再加上 Redis,透明替换原来的 Queue 和 Counter。

注意:调试的时候需要挂全局梯子(Console 挂梯子好像需要 TUN?)或者改 hosts。CN 以外的国家请无视。

import time
import redis
import json
import datetime
import os
from threading import Thread
from queue import Queue
import pickle
from urllib import request
from urllib.parse import urlencode


class Counter:
    def __init__(self):
        self.__date = datetime.date.today()

    def pre(self):
        self.__date -= datetime.timedelta(1)

    def next(self):
        self.__date += datetime.timedelta(1)

    def get_date(self):
        return self.__date.isoformat()


class RedisCounter:
    def __init__(self, name, namespace='counter', **redis_kwargs):
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' % (namespace, name)
        self.__date = self.__db.get(self.key)
        if self.__date is None:
            self.__date = datetime.date.today()
        else:
            self.__date = pickle.loads(self.__date)

    def pre(self):
        self.__date -= datetime.timedelta(1)
        self.__update_to_redis()

    def next(self):
        self.__date += datetime.timedelta(1)
        self.__update_to_redis()

    def __update_to_redis(self):
        self.__db.set(self.key, pickle.dumps(self.__date))

    def get_date(self):
        return self.__date.isoformat()


class RedisQueue:
    """Simple Queue with Redis Backend"""

    def __init__(self, name, namespace='queue', **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0"""
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        """Return the approximate size of the queue."""
        return self.__db.llen(self.key)

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return self.qsize() == 0

    def put(self, item):
        """Put item into the queue."""
        self.__db.rpush(self.key, pickle.dumps(item))

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self.__db.blpop(self.key, timeout=timeout)
        else:
            item = self.__db.lpop(self.key)

        if item:
            item = item[1]
        return pickle.loads(item)

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)


class ImageFinder(Thread):
    def __init__(self, date_start=None, database=None, page_per_day = 5):
        super(ImageFinder, self).__init__()
        self.base_url = "https://api.flickr.com/services/rest/"
        self.__date = date_start
        self.__db = database
        self.__page_per_day = page_per_day

    def run(self):
        while True:
            if self.__db.qsize() < 10:
                try:
                    for page_num in range(self.__page_per_day):
                        self.__parse_page(self.__date.get_date(), page_num)
                except Exception as e:
                    pass
                self.__date.pre()

    def __parse_page(self, day, page_num):
        header, param = self.__class__.__param_gen(day, 
                                                    page = page_num,
                                                    per_page = 500)
        print(self.base_url + "?" + param.decode('utf-8'))
        data = json.loads(request.urlopen(self.base_url, param, timeout=5).read().decode('utf-8'))
        if "photos" in data.keys():
            for item in data["photos"]["photo"]:
                image_url = "https://farm{farm}.staticflickr.com/{server}/{id}_{secret}_b.jpg".format(
                    farm=item['farm'],
                    server=item['server'],
                    id=item['id'],
                    secret=item['secret']
                )
                self.__db.put({"id": item['id'], "url": image_url})

    @classmethod
    def __param_gen(cls, day, **kwargs):
        headers = {
            "Connection": "keep-alive",
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate, sdch",
            "DNT": "1",
            "Accept-Language": "zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4"
        }
        request_parameter = {
            "format": "json",
            "method": "flickr.interestingness.getList",
            "api_key": "ab9bdd8345f528d08adf806d41712de0",
            "nojsoncallback": "1"
        }
        request_parameter["date"] = day
        for key, value in kwargs:
            request_parameter[key] = value
        return headers, urlencode(request_parameter).encode()


class DataSaver(Thread):
    def __init__(self, database=None, base_dir=""):
        super(DataSaver, self).__init__()
        self.base_dir = base_dir
        self.__db = database

    def run(self):
        while True:
            record = self.__db.get()
            try:
                request.urlretrieve(record["url"], os.path.join("result", record["id"] + ".jpg"))
            except Exception as e:
                print(e)


def main():
    counter = RedisCounter('flickr_counter')
    queue = RedisQueue('flickr_queue')
    # counter = Counter()
    # queue = Queue()
    image_finder = ImageFinder(counter, queue)
    image_finder.start()
    for i in range(5):
        ds = DataSaver(queue)
        ds.start()


if '__main__' == __name__:
    main()

API KEY 的获取

我们没有必要为了这个需求再去注册一个 Flickr 应用。在这个 API 调用示例页面里面就可以获取到。

点击 “Call Method”,下方会有一个 URL,找到 api_key,复制出来。

结果

1 个生产者和 5 个消费者,经过一晚上的艰苦爬行,墙外服务器上的程序顺利把硬盘撑爆(20G 硬盘)报错挂掉。墙内我的工作电脑顺利从 2016 年 12 月 1 日枚举到了 1986 年某月某日,收获图片 10W 余张,总大小 26G 左右。

这下有素材了。可以进行后续的分类和聚类工作了。

自我批评

上面的爬虫有一个先天缺陷:没有对 ID 进行判重,导致一些图片可能会被下载多次。不想改了,完成任务就好。是吧?

而且,通过看 API 文档,我们可以发现,每日不仅仅有 500 张图片。其实每日推荐图片有 500×10 张。这也就是论文中说 1 million 的缘故吧。~~懒得再加一层循环了……~~循环已加入,但未验证。

参考资料

  1. Automatic Content-Aware Color and Tone Stylization, CVPR 2016
  2. Flickr 服務:Flickr API: flickr.interestingness.getList
  3. Python3 Redis 使用说明

《一个 Flickr 爬虫》有3条留言

  1. 博主,您好,首先感谢你的无私共享,我运行你的代码,这句 if item: 出现语法错误,没改明白怎么回事,我想爬去的不是照片,想爬照片的点赞数,评论数,地理位置等附属标签信息,我是一名研二的学生,非计算机专业,想爬去数据做一些学术研究,但是苦于爬虫写不出来,基础数据都弄不到,不知道您是否有时间指导一下我,非常感谢。

    回复
  2. 博主您好,我用您的代码有一些错误:urllib 这个包里面没有 requests,请问如果用 urllib2 的话应该怎么写啊。
    我也是用 flickr 上面的图片来做东西,需要 20w 左右的图片。目前正在学习怎么爬取 flickr 的图片,如果可以的话您能不能稍微指导我一下啊。
    我目前是广州广东工业大学的一名研究生,非常感谢您。

    回复
    • 由于我自己是一个 “追新族”,所以不到万不得已,一般不会使用老版本。你可以看一下这个文档,开头有一些说明:

      The urllib module has been split into parts and renamed in Python 3 to urllib.request, urllib.parse, and urllib.error. The 2to3 tool will automatically adapt imports when converting your sources to Python 3. Also note that the urllib.request.urlopen() function in Python 3 is equivalent to urllib2.urlopen() and that urllib.urlopen() has been removed.

      我的代码仅仅是我的需求,所以你用的话可能没有那么复杂。首先,关于 Redis 的东西都可以去除,然后你说需要 20W 左右的图片,那么可以加一重循环,每天多爬几页。

留下评论