背景 链接到标题
在之前的一篇博客《HTTP Content-Length 学习》 中提到自己踩了一个坑,就是 content-length
与实际大小不匹配导致文件下载失败,在解决过程中用到了 zipstreamer ,今天来看看 zipstreamer 是如何工作的。
zipfile 链接到标题
Python 标准库中提供了 zipfile 用来对 Zip 文件进行操作,可以进行 Zip 的创建、写入、读取、解压等动作,但是 zipfile 只能对文件进行操作,没办法传入 stream,所以能做的操作有限。
示例 链接到标题
import zipfile
def test():
for i in range(1, 4):
f = open("file" + str(i) + ".txt", 'w')
f.write(str(i))
f.close()
f = zipfile.ZipFile('filename.zip', 'w', zipfile.ZIP_DEFLATED)
f.write('file1.txt')
f.write('file2.txt')
f.write('file3.txt')
f.close()
f = zipfile.ZipFile('filename.zip')
f.extractall()
f.close()
if __name__ == "__main__":
test()
zipstreamer 链接到标题
在提供文件下载接口时,有一个比较常见的需求是传过来一个 stream,然后我们要将 stream 作为 Zip 中的一个文件转发出去,实时下载,这时候就需要 zipstreamer 来实现了。
示例 链接到标题
z = ZipStream(files=[
ZipFile('file.txt', 4, lambda: StringIO('test'), None, None),
ZipFile('emptydir/', None, None, None, None),
ZipFile('dir/remote.txt', remote_file_size, get_remote_file, None, None),
])
size = z.size()
res = Response(z.generate(), mimetype='application/zip')
res.headers['Content-Length'] = str(size)
源码 链接到标题
在看代码之前,我们先看下 zip 文件的数据格式:
root@localhost:~/project/python-zipstreamer/zipstreamer
master ✔ $ tree .
.
├── compat.py # py2 py3 兼容
├── __init__.py # 主要逻辑
└── __version__.py # 版本信息
zipstreamer 的目录结构比较简单,所有逻辑都在 __init__.py
中,先来看看 compat.py
中做了什么,根据 Python 版本来决定 BytesIO 的引用,修改下默认的 str 定义:
#: Python 2.x?
IS_PY2 = (_VER[0] == 2)
#: Python 3.x?
IS_PY3 = (_VER[0] == 3)
if IS_PY2:
from StringIO import StringIO as BytesIO
str = unicode
elif IS_PY3:
from io import BytesIO
str = str
那么来看看 __init__.py
中都定义了哪些类,根据示例我们知道有 ZipStream
,并且它肯定实现了 generate
方法:
class ZipStream(object):
def __init__(self, files, comment=None):
def generate(self):
def size(self):
def _incr(self, buf):
def _generate_file(self, zip_file):
def _generate_dir_entry(self, entry):
def _generate_zip_file(self):
先来看看构造函数中定义了什么:
def __init__(self, files, comment=None):
if isinstance(comment, str):
raise ZipFileBytesRequired('ZIP comment should bytes')
# 所有 ZipFile 的集合
self.files = files
self.comment = comment
# 定义初始值及必要的 flag
self._dir = None
self._pos = None # 计算文件大小时用到
self._generating = False
self._calculate_size = False
最终传给 Response 的是 z.generate()
,所以先看看 generate()
:
def generate(self):
# 构造函数中定义的 flag,因为是生成器,如果已经执行过那么后续执行都会异常,所以这里提前判断
if self._generating:
raise ZipFileInProgress('ZipFile generator already in progress')
self._generating = True
try:
for chunk in self._generate_zip_file():
yield chunk
finally:
self._generating = False
继续看 self._generate_zip_file()
这里是整个 zipstreamer 中最核心的逻辑,包含了 Zip 文件内容定义,数据格式定义:
def _generate_zip_file(self):
self._dir = []
self._pos = 0
for zip_file in self.files:
# 生成文件数据
for chunk in self._generate_file(zip_file):
yield chunk
# 这里可以判断 self._generate_file 会修改 self._pos
start = self._pos
for entry in self._dir:
# 生成文件夹数据
for chunk in self._generate_dir_entry(entry):
yield chunk
# 同样 self._generate_dir_entry 也会修改 self._pos
end = self._pos
# 可以猜测在 self._generate_dir_entry 会更新 self._dir
cent_dir_count = len(self._dir)
cent_dir_size = end - start
cent_dir_offset = start
# 根据条件判断最终 Zip 文件是否为 64 位
if (cent_dir_count >= UINT16_MAX or cent_dir_size >= UINT32_MAX or cent_dir_offset >= UINT32_MAX):
zip64_end_rec = struct.pack(
STRUCT_END_ARCHIVE64, STRING_END_ARCHIVE64, 44, ZIP_VERSION_45,
ZIP_VERSION_45, 0, 0, cent_dir_count, cent_dir_count,
cent_dir_size, cent_dir_offset)
yield self._incr(zip64_end_rec)
zip64_loc_rec = struct.pack(
STRUCT_END_ARCHIVE64_LOCATOR, STRING_END_ARCHIVE64_LOCATOR, 0,
end, 1)
yield self._incr(zip64_loc_rec)
cent_dir_count = UINT16_MAX
cent_dir_size = UINT32_MAX
cent_dir_offset = UINT32_MAX
eocd_comment = b'' if self.comment is None else self.comment
endrec = struct.pack(
STRUCT_END_ARCHIVE, STRING_END_ARCHIVE, 0, 0, cent_dir_count,
cent_dir_count, cent_dir_size, cent_dir_offset, len(eocd_comment))
# 返回 Zip 格式要求的 central 部分
yield self._incr(endrec)
yield self._incr(eocd_comment)
self._generate_file
和 self._generate_dir_entry
都涉及到具体的 Zip 数据格式的定义,这里不详细展开了,需要注意的是在 self._generate_file
中的传输 chunk 大小的定义,一次返回的数据大小为 4096字节:
elif file_obj is not None:
while True:
buf = file_obj.read(4096)
if not buf:
break
文件大小计算 链接到标题
在示例代码中我们使用 z.size()
来获取打包后的 Zip 文件大小,来看看这里是怎么获取的:
def size(self):
if self._generating:
raise ZipFileInProgress(
'ZipFile generator already in progress. You need to call'
' size() before generate()')
self._calculate_size = True
try:
# 执行一次 generate() 方法
for _ in self.generate():
pass
finally:
self._calculate_size = False
# 最终返回 self._pos
return self._pos
在 zipstream 执行 yield 的时候,都是跟着 self._incr
的,这里保存这文件大小的计算逻辑:
def _incr(self, buf):
assert not isinstance(buf, str)
self._pos += len(buf)
return buf
我们只需要保证我们返回的所有数据都用 self._incr
封装了,那么最终的 self._pos
就是文件的总大小。
Zip 文件格式 链接到标题
大概了解了 zipstream 是怎么做的,回过头来看看 Zip 的文件格式,根据 yield 顺序汇总一下,可以看到与我们在看代码之前看到的 Zip 文件格式图表是一致的:
# files
yield self._incr(header)
yield self._incr(filename)
yield self._incr(extra)
yield self._incr(buf)
yield self._incr(data_descriptor)
# dirs
yield self._incr(central_dir)
yield self._incr(entry.filename)
yield self._incr(extra)
yield self._incr(entry.comment)
# zip structure
yield self._incr(zip64_end_rec)
yield self._incr(zip64_loc_rec)
yield self._incr(endrec)
yield self._incr(eocd_comment)