diff --git a/Lib/test/test_zipfile64.py b/Lib/test/test_zipfile64.py index 7d802d59849ce1f..e13f064f2ac4fd4 100644 --- a/Lib/test/test_zipfile64.py +++ b/Lib/test/test_zipfile64.py @@ -17,6 +17,7 @@ import sys import unittest.mock as mock +from contextlib import contextmanager from tempfile import TemporaryFile from test.support import os_helper @@ -91,176 +92,133 @@ def tearDown(self): os_helper.unlink(TESTFN2) -class TestRepack(unittest.TestCase): - def setUp(self): - # Create test data. - line_gen = ("Test of zipfile line %d." % i for i in range(1000000)) - self.data = '\n'.join(line_gen).encode('ascii') - - # It will contain enough copies of self.data to reach about 8 GiB. - self.datacount = 8*1024**3 // len(self.data) +class TestRepacker(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.largefilename = 'largefile.txt' - # memory usage should not exceed 10 MiB - self.allowed_memory = 10*1024**2 + line_gen = ("Test of zipfile line %d." % i for i in range(1000000)) + cls.chunk = '\n'.join(line_gen).encode('ascii') + + # It will contain enough copies of cls.chunk to reach about 4.1 GiB. + cls.chunkcount = int(4.1*1024**3 / len(cls.chunk)) + + cls.filename = 'file.txt' + cls.lorem = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + + # Memory usage should not exceed 10 MiB during repacking. + # This empirical threshold ensures that the internal processing + # like signature scanning, compressed block end tracing, and + # data copying are properly buffered without loading the entire + # large file into memory. + cls.allowed_memory = 10*1024**2 + + @contextmanager + def assert_memory_usage(self, threshold): + tracemalloc.start() + try: + yield + finally: + current, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + self.assertLess(peak, threshold) def _write_large_file(self, fh): next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL - for num in range(self.datacount): - fh.write(self.data) + for num in range(self.chunkcount): + fh.write(self.chunk) # Print still working message since this test can be really slow if next_time <= time.monotonic(): next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL print(( ' writing %d of %d, be patient...' % - (num, self.datacount)), file=sys.__stdout__) + (num, self.chunkcount)), file=sys.__stdout__) sys.__stdout__.flush() def test_strip_removed_large_file(self): """Should move the physical data of a file positioned after a large removed file without causing a memory issue.""" - # Try the temp file. If we do TESTFN2, then it hogs - # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file(f) - self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with zipfile.ZipFile(f, 'w') as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - zh.writestr(file, data) - - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - zh.repack() - self.assertIsNone(zh.testzip()) + with zipfile.ZipFile(f, 'w') as zh: + with zh.open(self.largefilename, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(self.filename, self.lorem) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(self.largefilename) + zh.repack() + self.assertIsNone(zh.testzip()) def test_strip_removed_file_before_large_file(self): """Should move the physical data of a large file positioned after a removed file without causing a memory issue.""" - # Try the temp file. If we do TESTFN2, then it hogs - # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_file_before_large_file(f) - self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_file_before_large_file(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with zipfile.ZipFile(f, 'w') as zh: - zh.writestr(file, data) - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file) - zh.repack() - self.assertIsNone(zh.testzip()) + with zipfile.ZipFile(f, 'w') as zh: + zh.writestr(self.filename, self.lorem) + with zh.open(self.largefilename, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(self.filename) + zh.repack() + self.assertIsNone(zh.testzip()) def test_strip_removed_large_file_with_dd(self): """Should scan for the data descriptor of a removed large file without causing a memory issue.""" - # Try the temp file. If we do TESTFN2, then it hogs - # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file_with_dd(f) - self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file_with_dd(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with zipfile.ZipFile(Unseekable(f), 'w') as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - zh.writestr(file, data) - - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - zh.repack() - self.assertIsNone(zh.testzip()) + with zipfile.ZipFile(Unseekable(f), 'w') as zh: + with zh.open(self.largefilename, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(self.filename, self.lorem) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(self.largefilename) + zh.repack() + self.assertIsNone(zh.testzip()) def test_strip_removed_large_file_with_dd_no_sig(self): - """Should scan for the data descriptor (without signature) of a removed - large file without causing a memory issue.""" + """Should scan for the unsigned data descriptor of a removed large file + without causing a memory issue.""" # Reduce data scale for this test, as it's especially slow... - self.datacount = 30*1024**2 // len(self.data) - self.allowed_memory = 200*1024 + self.chunkcount = int(30*1024**2 / len(self.chunk)) - # Try the temp file. If we do TESTFN2, then it hogs - # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file_with_dd_no_sig(f) - self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file_with_dd_no_sig(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): - with zipfile.ZipFile(Unseekable(f), 'w') as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig), \ + zipfile.ZipFile(Unseekable(f), 'w') as zh: + with zh.open(self.largefilename, 'w', force_zip64=True) as fh: self._write_large_file(fh) - zh.writestr(file, data) + zh.writestr(self.filename, self.lorem) - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - # strict_descriptor=False to scan the unsigned data descriptor - # (scanning is disabled under the strict_descriptor=True default) - zh.repack(strict_descriptor=False) - self.assertIsNone(zh.testzip()) + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(self.largefilename) + # strict_descriptor=False to scan the unsigned data descriptor + # (scanning is disabled under the strict_descriptor=True default) + zh.repack(strict_descriptor=False) + self.assertIsNone(zh.testzip()) @requires_zlib() def test_strip_removed_large_file_with_dd_no_sig_by_decompression(self): - """Should scan for the data descriptor (without signature) of a removed - large file without causing a memory issue.""" - # Try the temp file. If we do TESTFN2, then it hogs - # gigabytes of disk space for the duration of the test. + """Should scan for the unsigned data descriptor (via tracing compressed + block end) of a removed large file without causing a memory issue.""" with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file_with_dd_no_sig_by_decompression( - f, zipfile.ZIP_DEFLATED) - self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file_with_dd_no_sig_by_decompression(self, f, method): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): - with zipfile.ZipFile(Unseekable(f), 'w', compression=method) as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig), \ + zipfile.ZipFile(Unseekable(f), 'w', compression=zipfile.ZIP_DEFLATED) as zh: + with zh.open(self.largefilename, 'w', force_zip64=True) as fh: self._write_large_file(fh) - zh.writestr(file, data) - - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - # strict_descriptor=False to detect the unsigned data descriptor - # (scanning is disabled under the strict_descriptor=True default) - zh.repack(strict_descriptor=False) - self.assertIsNone(zh.testzip()) + zh.writestr(self.filename, self.lorem) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(self.largefilename) + # strict_descriptor=False to detect the unsigned data descriptor + # (scanning is disabled under the strict_descriptor=True default) + zh.repack(strict_descriptor=False) + self.assertIsNone(zh.testzip()) class OtherTests(unittest.TestCase):