工作中查Hive标有一些限制。为了绕过限制,可以将数据直接Dump到HDFS里。但是这样做会导致查出来一大堆文件,用起来很是不便。
那么,就稍微做个封装,搞成一个假的CSV来用:
import csv
from collections import namedtuple
from pathlib import Path
from typing import Union, List
class HiveCSVReader:
def __init__(self, path: Union[str, Path], columns: Union[str, List[str], None]):
self.path = Path(path)
self._row_class = namedtuple("DataRow", columns) if columns else None
def __iter__(self):
for file in filter(lambda x: x.is_file(), self.path.rglob("*")):
if file.name == "_SUCCESS" or file.name.endswith("._COPYING_"):
continue
for row in csv.reader(file.open("r"), delimiter="\x01"):
yield self._row_class(*row) if self._row_class else row
这里用到了 namedtupl
e这个东西。如果不习惯的话,还可以想办法把它转成dict来用。
发表回复