import fnmatch
import io
import logging
import os
import re
import shutil
import struct
import sys
import typing
import zipfile
from pathlib import Path
import zstandard as zstd
from Crypto.Cipher import AES
from scdatatools import plugins
from scdatatools.utils import norm_path
logger = logging.getLogger(__name__)
ZIP_ZSTD = 100
p4kFileHeader = b"PK\x03\x14"
DEFAULT_P4K_KEY = b"\x5E\x7A\x20\x02\x30\x2E\xEB\x1A\x3B\xB6\x17\xC3\x0F\xDE\x1E\x47"
DEFAULT_CRYXML_CONVERT_FMT = "xml"
compressor_names = zipfile.compressor_names
compressor_names[100] = "zstd"
[docs]def monitor_msg_from_info(info):
return f'{compressor_names[info.compress_type]} | {"Crypt" if info.is_encrypted else "Plain"} | {info.filename}'
def _p4k_extract_monitor(msg="", progress=None, total=None, level=logging.INFO, exc_info=None):
"""Default monitor for P4KFiles extract methods"""
print(msg)
class _SharedFile(zipfile._SharedFile):
def tell(self):
# fix multi-threaded reading of zipfiles: https://bugs.python.org/issue42369
return self._pos
def _P4KDecrypter(key):
cipher = AES.new(key, AES.MODE_CBC, b"\x00" * 16)
def decrypter(data):
return cipher.decrypt(data)
return decrypter
[docs]class ZStdDecompressor:
def __init__(self):
dctx = zstd.ZstdDecompressor()
self._decomp = dctx.decompressobj()
self.eof = False
[docs] def decompress(self, data):
result = b""
try:
result = self._decomp.decompress(data)
except zstd.ZstdError:
self.eof = True
return result
[docs]class P4KExtFile(zipfile.ZipExtFile):
MIN_READ_SIZE = 65536
def __init__(self, fileobj, mode, p4kinfo, decrypter=None, close_fileobj=False):
self.p4kinfo = p4kinfo
self._is_encrypted = p4kinfo.is_encrypted
self._decompressor = ZStdDecompressor()
self._fileobj = fileobj
self._decrypter = decrypter
self._close_fileobj = close_fileobj
self._compress_type = p4kinfo.compress_type
self._compress_left = p4kinfo.compress_size
self._left = p4kinfo.file_size
self._eof = False
self._readbuffer = b""
self._offset = 0
self.newlines = None
self.mode = mode
self.name = p4kinfo.filename
if hasattr(p4kinfo, "CRC"):
self._expected_crc = p4kinfo.CRC
self._running_crc = zipfile.crc32(b"")
else:
self._expected_crc = None
# TODO: the CRCs don't match, but im getting the same outputs as unp4k - we should figure out what exactly is
# going into calculating the CRC for P4K entry
self._expected_crc = None
self._seekable = False
try:
if fileobj.seekable():
self._orig_compress_start = fileobj.tell()
self._orig_compress_size = p4kinfo.compress_size
self._orig_file_size = p4kinfo.file_size
self._orig_start_crc = self._running_crc
self._seekable = True
except AttributeError:
pass
[docs]class P4KInfo(zipfile.ZipInfo):
def __init__(
self,
filename,
date_time=(1980, 1, 1, 0, 0, 0),
p4k=None,
subinfo=None,
archive=None,
):
# ensure posix file paths as specified by the Zip format
# super().__init__(*args, **kwargs)
self.orig_filename = filename # Original file name in archive
self.filename = norm_path(filename)
self.date_time = date_time # year, month, day, hour, min, sec
# Standard values:
self.compress_type = zipfile.ZIP_STORED # Type of compression for the file
self._compresslevel = None # Level for the compressor
self.comment = b"" # Comment for each file
self.extra = b"" # ZIP extra data
if sys.platform == "win32":
self.create_system = 0 # System which created ZIP archive
else:
# Assume everything else is unix-y
self.create_system = 3 # System which created ZIP archive
self.create_version = zipfile.DEFAULT_VERSION # Version which created ZIP archive
self.extract_version = zipfile.DEFAULT_VERSION # Version needed to extract archive
self.reserved = 0 # Must be zero
self.flag_bits = 0 # ZIP flag bits
self.volume = 0 # Volume number of file header
self.internal_attr = 0 # Internal attributes
self.external_attr = 0 # External file attributes
self.compress_size = 0 # Size of the compressed file
self.file_size = 0 # Size of the uncompressed file
# Other attributes are set by class ZipFile:
# header_offset Byte offset to the file header
# CRC CRC-32 of the uncompressed file
self.p4k = p4k
self.archive = archive
self.subinfo = subinfo
self.filelist = []
if self.subinfo is not None and self.subinfo.is_dir():
self.filename += "/" # Make sure still tracked as directory
self.is_encrypted = False
[docs] def open(self, *args, **kwargs):
return self.p4k.open(self, *args, **kwargs)
def _decodeExtra(self):
# Try to decode the extra field.
self.is_encrypted = len(self.extra) >= 168 and self.extra[168] > 0x00
# The following is the default ZipInfo decode, minus a few steps that would mark an encrypted it as invalid
# TODO: only do this if self.is_encrypted, otherwise call super?
offset = 0
total = len(self.extra)
while (offset + 4) < total:
tp, ln = struct.unpack("<HH", self.extra[offset : offset + 4])
if tp == 0x0001:
if ln >= 24:
counts = struct.unpack("<QQQ", self.extra[offset + 4 : offset + 28])
elif ln == 16:
counts = struct.unpack("<QQ", self.extra[offset + 4 : offset + 20])
elif ln == 8:
counts = struct.unpack("<Q", self.extra[offset + 4 : offset + 12])
elif ln == 0:
counts = ()
else:
raise zipfile.BadZipFile("Corrupt extra field %04x (size=%d)" % (tp, ln))
idx = 0
# ZIP64 extension (large files and/or large archives)
if self.file_size in (0xFFFFFFFFFFFFFFFF, 0xFFFFFFFF):
self.file_size = counts[idx]
idx += 1
if self.compress_size == 0xFFFFFFFF:
self.compress_size = counts[idx]
idx += 1
if self.header_offset == 0xFFFFFFFF:
self.header_offset = counts[idx]
idx += 1
offset += ln + 4
[docs]class P4KFile(zipfile.ZipFile):
filelist: typing.List[P4KInfo]
NameToInfo: typing.Dict[typing.Text, P4KInfo]
def __init__(self, file, mode="r", key=DEFAULT_P4K_KEY, load_monitor=None):
# Using ZIP_STORED to bypass the get_compressor/get_decompressor logic in zipfile. Our P4KExtFile will always
# use zstd
self.key = key
self.NameToInfoLower = {}
self.BaseNameToInfo = {}
self.subarchives = {}
self.file_tree = {}
if load_monitor is None:
self._load_monitor = lambda *a, **kw: True
else:
self._load_monitor = load_monitor
super().__init__(file, mode, compression=zipfile.ZIP_STORED)
# TODO: this doubles the time of opening a pack, so add some logic to delay expansion and add functionality to
# expand on the fly
# open up sub-archives and add them to the file list
# for filename in self.subarchives.keys():
self._update_file_tree()
def _update_file_tree(self):
for path in self.filelist:
parent = self.file_tree
for part in path.filename.split('/'):
parent = parent.setdefault(part, {})
[docs] def expand_subarchives(self):
"""Ensures all sub-archives have been expanded"""
for filename, info in self.subarchives.items():
if info is None:
self._expand_subarchive(filename)
self._update_file_tree()
def _expand_subarchive(self, filename):
if self.subarchives[filename] is not None:
return # already been opened
file_ext = f"{filename.split('.', maxsplit=1)[-1]}"
info = self.NameToInfo[filename]
self.subarchives[filename] = SUB_ARCHIVES[file_ext](self.open(info))
info.subarchive = self.subarchives[filename]
base_p4k_path = Path(filename[: -1 * len(file_ext) - 1])
archive_name = base_p4k_path.name
for si in info.subarchive.filelist:
# Create ZipInfo instance to store file information
# parent extension gets removed
# e.g. `100i_interior.socpak` -> `100i_interior`
sub_path = Path(si.filename)
if sub_path.parts[0] == archive_name:
# Paths in socpaks are referenced slightly strangely.
sub_path = Path(*sub_path.parts[1:])
x = P4KInfo(
str(base_p4k_path / sub_path),
p4k=self,
subinfo=si,
archive=self.subarchives[filename],
)
x.extra = si.extra
x.comment = si.comment
x.header_offset = si.header_offset
x.volume, x.internal_attr, x.external_attr = (
si.volume,
si.internal_attr,
si.external_attr,
)
x._raw_time = si._raw_time
x.date_time = si.date_time
x.create_version = si.create_version
x.create_system = si.create_system
x.extract_version = si.extract_version
x.reserved = si.reserved
x.flag_bits = si.flag_bits
x.compress_type = si.compress_type
x.CRC = si.CRC
x.compress_size = si.compress_size
x.file_size = si.file_size
info.filelist.append(x)
self.filelist.append(x)
self.NameToInfo[x.filename] = x
self.NameToInfoLower[x.filename.casefold()] = x
self.BaseNameToInfo.setdefault(
x.filename.split(".", maxsplit=1)[0].casefold(), []
).append(x)
def _RealGetContents(self):
"""Read in the table of contents for the ZIP file."""
fp = self.fp
try:
endrec = zipfile._EndRecData(fp)
except OSError:
raise zipfile.BadZipFile("File is not a valid p4k file")
if not endrec:
raise zipfile.BadZipFile("File is not a valid p4k file")
size_cd = endrec[zipfile._ECD_SIZE] # bytes in central directory
offset_cd = endrec[zipfile._ECD_OFFSET] # offset of central directory
self._comment = endrec[zipfile._ECD_COMMENT] # archive comment
# "concat" is zero, unless zip was concatenated to another file
concat = endrec[zipfile._ECD_LOCATION] - size_cd - offset_cd
if endrec[zipfile._ECD_SIGNATURE] == zipfile.stringEndArchive64:
# If Zip64 extension structures are present, account for them
concat -= zipfile.sizeEndCentDir64 + zipfile.sizeEndCentDir64Locator
# self.start_dir: Position of start of central directory
self.start_dir = offset_cd + concat
fp.seek(self.start_dir, 0)
data = io.BytesIO(fp.read(size_cd))
while data.tell() < size_cd:
centdir = data.read(zipfile.sizeCentralDir)
if len(centdir) != zipfile.sizeCentralDir:
raise zipfile.BadZipFile("Truncated central directory")
centdir = struct.unpack(zipfile.structCentralDir, centdir)
if centdir[zipfile._CD_SIGNATURE] != zipfile.stringCentralDir:
raise zipfile.BadZipFile("Bad magic number for central directory")
filename = data.read(centdir[zipfile._CD_FILENAME_LENGTH])
flags = centdir[5]
if flags & 0x800:
# UTF-8 file names extension
filename = filename.decode("utf-8")
else:
# Historical ZIP filename encoding
filename = filename.decode("cp437")
if self._load_monitor is not None:
self._load_monitor(msg=filename, progress=data.tell(), total=size_cd)
# Create ZipInfo instance to store file information
x = P4KInfo(filename, p4k=self, archive=self)
x.extra = data.read(centdir[zipfile._CD_EXTRA_FIELD_LENGTH])
x.comment = data.read(centdir[zipfile._CD_COMMENT_LENGTH])
x.header_offset = centdir[zipfile._CD_LOCAL_HEADER_OFFSET]
(
x.create_version,
x.create_system,
x.extract_version,
x.reserved,
x.flag_bits,
x.compress_type,
t,
d,
x.CRC,
x.compress_size,
x.file_size,
) = centdir[1:12]
if x.extract_version > zipfile.MAX_EXTRACT_VERSION:
raise NotImplementedError("zip file version %.1f" % (x.extract_version / 10))
# x.volume, x.internal_attr, x.external_attr = centdir[15:18]
# Convert date/time code to (year, month, day, hour, min, sec)
x._raw_time = t
x.date_time = (
(d >> 9) + 1980,
(d >> 5) & 0xF,
d & 0x1F,
t >> 11,
(t >> 5) & 0x3F,
(t & 0x1F) * 2,
)
x._decodeExtra()
x.header_offset = x.header_offset + concat
self.filelist.append(x)
self.NameToInfo[x.filename] = x
self.NameToInfoLower[x.filename.casefold()] = x
self.BaseNameToInfo.setdefault(
x.filename.split(".", maxsplit=1)[0].casefold(), []
).append(x)
# Add sub-archives to be opened later (.pak/.sockpak,etc)
file_ext = x.filename.split(".", maxsplit=1)[-1]
if file_ext.casefold() in SUB_ARCHIVES:
if x.filename not in self.subarchives:
self.subarchives[x.filename] = None
[docs] def open(self, name, mode="r", pwd=None, *, force_zip64=False):
"""Return file-like object for 'name'.
name is a string for the file name within the ZIP file, or a ZipInfo
object.
mode should be 'r' to read a file already in the ZIP file, or 'w' to
write to a file newly added to the archive.
pwd is the password to decrypt files (only used for reading).
When writing, if the file size is not known in advance but may exceed
2 GiB, pass force_zip64 to use the ZIP64 format, which can handle large
files. If the size is known in advance, it is best to pass a ZipInfo
instance for name, with zinfo.file_size set.
"""
mode = mode.strip("b")
if mode not in {"r", "w"}:
raise ValueError('open() requires mode "r" or "w"')
if pwd and not isinstance(pwd, bytes):
raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__)
if pwd and (mode == "w"):
raise ValueError("pwd is only supported for reading files")
if not self.fp:
raise ValueError("Attempt to use ZIP archive that was already closed")
# Make sure we have an info object
if isinstance(name, P4KInfo):
# 'name' is already an info object
zinfo = name
elif mode == "w":
zinfo = P4KInfo(name, p4k=self, archive=self)
zinfo.compress_type = self.compression
zinfo._compresslevel = self.compresslevel
else:
# Get info object for name
zinfo = self.getinfo(name)
# if file is in a sub-archive, let that archive handle it
if zinfo.subinfo is not None:
return zinfo.archive.open(zinfo.subinfo, mode=mode, pwd=pwd, force_zip64=force_zip64)
if mode == "w":
return self._open_to_write(zinfo, force_zip64=force_zip64)
if self._writing:
raise ValueError(
"Can't read from the ZIP file while there "
"is an open writing handle on it. "
"Close the writing handle before trying to read."
)
# Open for reading:
self._fileRefCnt += 1
zef_file = _SharedFile(
self.fp,
zinfo.header_offset,
self._fpclose,
self._lock,
lambda: self._writing,
)
try:
# Skip the file header:
fheader = zef_file.read(zipfile.sizeFileHeader)
if len(fheader) != zipfile.sizeFileHeader:
raise zipfile.BadZipFile("Truncated file header")
fheader = struct.unpack(zipfile.structFileHeader, fheader)
if (
fheader[zipfile._FH_SIGNATURE] != p4kFileHeader
and fheader[zipfile._FH_SIGNATURE] != zipfile.stringFileHeader
):
raise zipfile.BadZipFile("Bad magic number for file header")
fname = zef_file.read(fheader[zipfile._FH_FILENAME_LENGTH])
if fheader[zipfile._FH_EXTRA_FIELD_LENGTH]:
zef_file.read(fheader[zipfile._FH_EXTRA_FIELD_LENGTH])
if zinfo.flag_bits & 0x20:
# Zip 2.7: compressed patched data
raise NotImplementedError("compressed patched data (flag bit 5)")
if zinfo.flag_bits & 0x40:
# strong encryption
raise NotImplementedError("strong encryption (flag bit 6)")
if zinfo.flag_bits & 0x800:
# UTF-8 filename
fname_str = fname.decode("utf-8")
else:
fname_str = fname.decode("cp437")
if fname_str != zinfo.orig_filename:
raise zipfile.BadZipFile(
"File name in directory %r and header %r differ." % (zinfo.orig_filename, fname)
)
zd = None
if self.key and zinfo.is_encrypted:
zd = _P4KDecrypter(self.key)
return P4KExtFile(zef_file, mode, zinfo, zd, True)
except:
zef_file.close()
raise
[docs] def getinfo(self, name, case_insensitive=True):
name = norm_path(name)
try:
info = super().getinfo(name)
except KeyError:
if not case_insensitive:
raise
info = self.NameToInfoLower.get(name.casefold())
if info is None:
raise KeyError("There is no item named %r in the archive" % name)
return info
[docs] def search(
self,
file_filters: typing.Union[list, tuple, set, str],
exclude: typing.List[str] = None,
ignore_case: bool = True,
expand_subarchives: bool = False,
mode: str = "re",
) -> typing.List[P4KInfo]:
"""
Search the filelist by path
:param file_filters:
:param ignore_case: Match string case or not
:param expand_subarchives: Automatically expand sub-archives and search them as well.
:param exclude: List of filenames that should be excluded from the results.
This must be an exact match (although honors ignore_case).
:param mode: Method of performing a match. Valid values are:
`re`: File matching glob compiled into a regular expression - `re.match(filename)`
`startswith`: Uses the string `startswith` function - if any(filename.startswith(_) for _ in file_filters)
`endswith`: Uses the string `startswith` function - if any(filename.endswith(_) for _ in file_filters)
`in`: Performs and `in` check - filename in file_filters
`in_strip`: Performs an `in` check, but strips the file extension before performing the `in` check
:return:
"""
if not isinstance(file_filters, (list, tuple, set)):
file_filters = [file_filters]
# normalize path slashes from windows to posix
file_filters = [norm_path(_) for _ in file_filters]
exclude = exclude or []
if ignore_case:
file_filters = [_.casefold() for _ in file_filters]
exclude = [_.casefold() for _ in exclude]
def walk_items():
nonlocal ignore_case, expand_subarchives
i = 0
while i < len(self.filelist):
f = self.filelist[i]
if f.filename in self.subarchives and expand_subarchives:
self._expand_subarchive(f.filename)
if ignore_case:
yield f.filename.casefold(), f
else:
yield f.filename, f
i += 1
if mode == "re":
def in_exclude(f):
nonlocal ignore_case, exclude
return f.casefold() in exclude if ignore_case else f in exclude
r = re.compile(
"|".join(f"({fnmatch.translate(_)})" for _ in file_filters),
flags=re.IGNORECASE if ignore_case else 0,
)
return [info for fn, info in walk_items() if r.match(fn) and not in_exclude(fn)]
elif mode == "startswith":
return [
info
for fn, info in walk_items()
if any(fn.startswith(_) for _ in file_filters) and fn not in exclude
]
elif mode == "endswith":
return [
info
for fn, info in walk_items()
if any(fn.endswith(_) for _ in file_filters) and fn not in exclude
]
elif mode == "in":
return [
info
for fn, info in walk_items()
if any(_ in fn for _ in file_filters) and fn not in exclude
]
elif mode == "in_strip":
return [
info
for fn, info in walk_items()
if fn.split(".", maxsplit=1)[0] in file_filters and fn not in exclude
]
raise AttributeError(f"Invalid search mode: {mode}")
[docs] def save_to(self, member, *args, **kwargs):
"""Extract a member into `path`. This will no recreate the archive directory structure, it will place the
extracted file directly into `path` which must exist. Use `extract`"""
return self.extractall(members=[member], *args, **kwargs)
[docs] def close(self) -> None:
for archive in self.subarchives.values():
if archive is not None:
archive.close()
super().close()
def _extract_member(self, member, targetpath, pwd=None, save_to=False, overwrite=True):
"""Extract the ZipInfo object 'member' to a physical file on the path targetpath."""
if not isinstance(member, P4KInfo):
member = self.getinfo(member)
if save_to:
outpath = Path(targetpath) / Path(member.filename).name
else:
outpath = Path(targetpath) / Path(member.filename)
if outpath.is_file() and not overwrite:
return None
if save_to:
with self.open(member, pwd=pwd) as source, outpath.open("wb") as target:
shutil.copyfileobj(source, target)
else:
outpath = super()._extract_member(member, targetpath, pwd)
return outpath
class _ZipFileWithFlexibleFilenames(zipfile.ZipFile):
"""A regular ZipFile that doesn't enforce filenames perfectly matching the CD filename"""
def open(self, name, mode="r", pwd=None, *, force_zip64=False):
if mode not in {"r", "w"}:
raise ValueError('open() requires mode "r" or "w"')
if pwd and not isinstance(pwd, bytes):
raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__)
if pwd and (mode == "w"):
raise ValueError("pwd is only supported for reading files")
if not self.fp:
raise ValueError("Attempt to use ZIP archive that was already closed")
# Make sure we have an info object
if isinstance(name, zipfile.ZipInfo):
# 'name' is already an info object
zinfo = name
elif mode == "w":
zinfo = zipfile.ZipInfo(name)
zinfo.compress_type = self.compression
zinfo._compresslevel = self.compresslevel
else:
# Get info object for name
zinfo = self.getinfo(name)
if mode == "w":
return self._open_to_write(zinfo, force_zip64=force_zip64)
if self._writing:
raise ValueError(
"Can't read from the ZIP file while there "
"is an open writing handle on it. "
"Close the writing handle before trying to read."
)
# Open for reading:
self._fileRefCnt += 1
zef_file = _SharedFile(
self.fp,
zinfo.header_offset,
self._fpclose,
self._lock,
lambda: self._writing,
)
try:
# Skip the file header:
fheader = zef_file.read(zipfile.sizeFileHeader)
if len(fheader) != zipfile.sizeFileHeader:
raise zipfile.BadZipFile("Truncated file header")
fheader = struct.unpack(zipfile.structFileHeader, fheader)
if fheader[zipfile._FH_SIGNATURE] != zipfile.stringFileHeader:
raise zipfile.BadZipFile("Bad magic number for file header")
fname = zef_file.read(fheader[zipfile._FH_FILENAME_LENGTH])
if fheader[zipfile._FH_EXTRA_FIELD_LENGTH]:
zef_file.read(fheader[zipfile._FH_EXTRA_FIELD_LENGTH])
if zinfo.flag_bits & 0x20:
# Zip 2.7: compressed patched data
raise NotImplementedError("compressed patched data (flag bit 5)")
if zinfo.flag_bits & 0x40:
# strong encryption
raise NotImplementedError("strong encryption (flag bit 6)")
# check for encrypted flag & handle password
is_encrypted = zinfo.flag_bits & 0x1
if is_encrypted:
if not pwd:
pwd = self.pwd
if not pwd:
raise RuntimeError(
"File %r is encrypted, password " "required for extraction" % name
)
else:
pwd = None
return zipfile.ZipExtFile(zef_file, mode, zinfo, pwd, True)
except:
zef_file.close()
raise
[docs]class SOCPak(_ZipFileWithFlexibleFilenames):
pass
[docs]class Pak(_ZipFileWithFlexibleFilenames):
pass
# Down here so the can reference local classes
SUB_ARCHIVES = {"pak": Pak, "socpak": SOCPak}