HUAWEI DLS 解压缩数据集实录

实体机多好啊…… 为啥要用云平台…… 怨念 ing

HUAWEI 的深度学习训练都迁移到了云平台上了。和普通的服务器差别不太大,无非就是多了几个概念。其中,最大的变化就是把文件读写的接口改掉了,同时对于训练用的机器来说,磁盘大小有比较强的限制。所有数据均放在 S3 上,做到 “数据和代码分离”。

OK,你这一分离,我们就痛苦了…… 怎么说呢,文件地址都是 s3://开头的,只能在 Python 里面使用,操作系统是不认得的。对于开发环境,为了做测试,需要将一部分(注意,是一部分,而不是全部)数据从 S3 同步到开发环境中,稍微大一点就同步不过去了。

好吧,说了这么多,因为我刚刚好碰到了上面所说的所有限制:

  1. 从 S3 里面同步 20 个大小为 1G 的分卷压缩文件到开发环境里
  2. 要解压缩它们,然后再传回 S3

上个月搞过一次,失败掉了。这次趁有时间,可以好好折腾一次。

首先,建立一个 GPU 开发环境。必须是 GPU 开发环境,因为它有 600G 的/tmp 文件夹可以用。CPU 开发环境只有 5G 的/可以用。随便挂载一个桶目录即可。

然后,我写了一个 Python 文件。下面按照 notebook 的格式,一点一点拆开来看。

首先是 import 一大堆东西:

import hashlib
import os
import tarfile
import moxing as mox
import shutil

from pathlib import Path

mox.file.shift('os', 'mox')
print(os.getcwd())

其中 moxing 是 HUAWEI 自己搞的模型。我们这里只用它的文件读写相关功能。 里面那句 mox.file.shift('os', 'mox')就是一键替换文件读写后端的,之后就可以直接使用 Python 的语法来读写文件了。

然后设置路径:

s3_path = Path("s3://bucket-84110000/Data/Something_Something")
local_path = Path("/tmp/something")
compress_path = local_path / "compressed"
md5_path = local_path / "md5"
extract_path = local_path / "video"

这里需要注意的是,上面说了,挂载的时候随便挂载一个桶,之后在这里,知道知道文件的完整地址,就能取文件,而不用管是不是在相同的桶里面、是不是被挂载了。另外,我个人喜欢用 Path 这个类。

下面是将文件取回本地。

shutil.copytree(str(s3_path), str(local_path))

使用了多线程的取回整个文件夹的功能。当然也可以自己写循环一个一个取。 公司内部应该是千兆网,20G 的东西几分钟不到就 OK 了。如果建立的是 CPU 开发环境,由于空间不足……它会进入一个死循环:发现装不下了,删除这个文件,然后继续尝试拉取。所以,一定要建立 GPU 开发环境,保证空间充足。

下面是计算一下文件的 MD5 值,看看上传到桶里面的东西是不是有问题:

def md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
for x in compress_path.iterdir():
    md5_file, filename = (md5_path / (x.name + ".md5")).open().read().strip().split()
    md5_value = md5(str(x))
    print("{} {} {} {}".format(filename, md5_file, md5_value, md5_file == md5_value))
print("OK")

然后是解压缩这些文件:

tar_file = local_path / "something.tar"
with tar_file.open("wb") as f:
    files = sorted(list(compress_path.iterdir()))
    for x in files:
        print(x)
        f.write(Path(x).open("rb").read())
tar = tarfile.open(str(tar_file))
tar.extractall(str(local_path))
tar.close()
(local_path/"20bn-something-something-v2").rename(extract_path)
print(len(list(extract_path.iterdir())))

这里,官方给的做法是 cat 20bn-something-something-v2-?? | tar zx,但是这样的话就需要先切换路径,再做一大堆东西,麻烦。或者可以用类似原理,使用 subprocess,将两个线程通过管道联系在一起。但是我没有成功哎。所以呢,就使用了比较原始的办法,先将所有文件进行合并,然后对合并过的文件解压缩。

最后,将所有解压缩后的文件传回 S3 里:

shutil.copytree(str(extract_path), str(s3_path / "video"))

妈妈说要再打扫一下战场:

shutil.rmtree(str(local_path))

好了,大致就这么多了。

其他细节,可以看这个文档

参考资料

留下评论