Source code for scdatatools.forge

__all__ = ["DataCoreBinary"]

import ctypes
import fnmatch
import json
import mmap
import sys
import typing
from pathlib import Path

from scdatatools.engine.cryxml.utils import pprint_xml_tree
from scdatatools.forge import dftypes
from scdatatools.forge.dftypes.enums import DataTypes
from scdatatools.forge.utils import read_and_seek
from scdatatools.utils import dict_to_etree
from .dftypes import Record


# from benedict import benedict


class DataCoreBinaryMMap(mmap.mmap):
    def __new__(cls, filename_or_file, *args, **kwargs):
        # accept either an already-open file object or a path to open ourselves
        if hasattr(filename_or_file, "fileno"):
            f = filename_or_file
        else:
            f = open(filename_or_file, "rb+")
        instance = super().__new__(cls, *args, fileno=f.fileno(), length=0, **kwargs)
        instance.file = f
        return instance

    def close(self, *args, **kwargs):
        try:
            super().close(*args, **kwargs)
        finally:
            self.file.close()

    def seek(self, *args, **kwargs):
        # make this work like normal seek() where you get the offset after the seek
        super().seek(*args, **kwargs)
        return self.tell()
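
    # Usage sketch (path is hypothetical): the wrapper owns its file handle, so
    # it can be built straight from a filename and used like any other mmap.
    #
    #   buf = DataCoreBinaryMMap("Data/Game.dcb")
    #   pos = buf.seek(4)        # returns the new offset, like io objects do
    #   data = buf.read(16)
    #   buf.close()              # also closes the underlying file handle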


class DataCoreBinary:
    def __init__(self, filename_or_data: typing.Union[str, Path, bytes, bytearray]):
        if isinstance(filename_or_data, bytes):
            self.raw_data = bytearray(filename_or_data)
        elif isinstance(filename_or_data, bytearray):
            self.raw_data = filename_or_data
        elif isinstance(filename_or_data, str):
            filename = Path(filename_or_data)
            if not filename.is_file():
                raise ValueError(f"Expected bytes or filename, not: {filename_or_data}")
            self.raw_data = bytearray(filename.open("rb").read())
        else:
            self.raw_data = bytearray(filename_or_data.read())
        self.raw_data = memoryview(self.raw_data)

        # used to track position while reading the header
        offset = 0

        def _read_and_seek(data_type):
            nonlocal offset
            r = data_type.from_buffer(self.raw_data, offset)
            setattr(r, "_dcb", self)
            offset += ctypes.sizeof(r)
            return r

        self.header = _read_and_seek(dftypes.DataCoreHeader)
        self.structure_definitions = _read_and_seek(
            dftypes.StructureDefinition * self.header.structure_definition_count
        )
        self.property_definitions = _read_and_seek(
            dftypes.PropertyDefinition * self.header.property_definition_count
        )
        self.enum_definitions = _read_and_seek(
            dftypes.EnumDefinition * self.header.enum_definition_count
        )
        if self.header.version >= 5:
            self.data_mapping_definitions = _read_and_seek(
                dftypes.DataMappingDefinition32 * self.header.data_mapping_definition_count
            )
        else:
            self.data_mapping_definitions = _read_and_seek(
                dftypes.DataMappingDefinition16 * self.header.data_mapping_definition_count
            )
        self.records = _read_and_seek(dftypes.Record * self.header.record_definition_count)
        self.values = {
            DataTypes.Int8: _read_and_seek(ctypes.c_int8 * self.header.int8_count),
            DataTypes.Int16: _read_and_seek(ctypes.c_int16 * self.header.int16_count),
            DataTypes.Int32: _read_and_seek(ctypes.c_int32 * self.header.int32_count),
            DataTypes.Int64: _read_and_seek(ctypes.c_int64 * self.header.int64_count),
            DataTypes.UInt8: _read_and_seek(ctypes.c_uint8 * self.header.uint8_count),
            DataTypes.UInt16: _read_and_seek(ctypes.c_uint16 * self.header.uint16_count),
            DataTypes.UInt32: _read_and_seek(ctypes.c_uint32 * self.header.uint32_count),
            DataTypes.UInt64: _read_and_seek(ctypes.c_uint64 * self.header.uint64_count),
            DataTypes.Boolean: _read_and_seek(ctypes.c_bool * self.header.boolean_count),
            DataTypes.Float: _read_and_seek(ctypes.c_float * self.header.float_count),
            DataTypes.Double: _read_and_seek(ctypes.c_double * self.header.double_count),
            DataTypes.GUID: _read_and_seek(dftypes.GUID * self.header.guid_count),
            DataTypes.StringRef: _read_and_seek(dftypes.StringReference * self.header.string_count),
            DataTypes.Locale: _read_and_seek(dftypes.LocaleReference * self.header.locale_count),
            DataTypes.EnumChoice: _read_and_seek(dftypes.EnumChoice * self.header.enum_count),
            DataTypes.StrongPointer: _read_and_seek(
                dftypes.StrongPointer * self.header.strong_value_count
            ),
            DataTypes.WeakPointer: _read_and_seek(
                dftypes.WeakPointer * self.header.weak_value_count
            ),
            DataTypes.Reference: _read_and_seek(dftypes.Reference * self.header.reference_count),
            DataTypes.EnumValueName: _read_and_seek(
                dftypes.StringReference * self.header.enum_option_name_count
            ),
        }
        self.text_offset = offset
        offset += self.header.text_length

        self.structure_instances = {}
        self.structure_instances_by_offset = {}
        for mapping in self.data_mapping_definitions:
            struct_def = self.structure_definitions[mapping.structure_index]
            struct_size = struct_def.calculated_data_size
            for i in range(mapping.structure_count):
                self.structure_instances.setdefault(mapping.structure_index, []).append(offset)
                offset += struct_size
        assert offset == len(self.raw_data)

        self._string_cache = {}
        self.records_by_guid = {}
        self.record_types = set()
        self.entities: dict[str, Record] = {}
        for r in self.records:
            if r.type == "EntityClassDefinition":
                self.entities[r.name] = r
            self.records_by_guid[r.id.value] = r
            self.record_types.add(r.type)
        # self._records_by_path = benedict(keypath_separator='/')
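
    # Usage sketch (hypothetical path): parsing happens eagerly in __init__, so
    # construction gives immediate access to the record indexes built above.
    #
    #   dcb = DataCoreBinary("Data/Game.dcb")
    #   print(len(dcb.records), "records of", len(dcb.record_types), "types")
    #   jav = dcb.entities.get("Javelin")   # "Javelin" is an assumed entity name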

    def get_structure_instance_from_offset(self, structure_index, offset):
        if offset not in self.structure_instances_by_offset.setdefault(structure_index, {}):
            struct_def = self.structure_definitions[structure_index]
            self.structure_instances_by_offset[structure_index][offset] = dftypes.StructureInstance(
                self, offset, struct_def
            )
        return self.structure_instances_by_offset[structure_index][offset]

    def get_structure_instance(self, structure_index, instance):
        if not isinstance(
            self.structure_instances[structure_index][instance],
            dftypes.StructureInstance,
        ):
            offset = self.structure_instances[structure_index][instance]
            self.structure_instances[structure_index][
                instance
            ] = self.get_structure_instance_from_offset(structure_index, offset)
            # self.structure_instances[structure_index][
            #     instance
            # ] = dftypes.StructureInstance(
            #     self,
            #     self.raw_data[offset: offset + size],
            #     self.structure_definitions[structure_index],
            # )
        return self.structure_instances[structure_index][instance]
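
    # Note: `structure_instances` initially holds raw byte offsets; the two
    # getters above swap each offset for a cached dftypes.StructureInstance on
    # first access, so repeated lookups return the same object:
    #
    #   inst = dcb.get_structure_instance(0, 0)     # built and cached here
    #   assert inst is dcb.get_structure_instance(0, 0)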

    def string_for_offset(self, offset: int, encoding="UTF-8") -> str:
        if offset not in self._string_cache:
            try:
                if offset >= self.header.text_length:
                    raise IndexError(f'Text offset "{offset}" is out of range')
                end = self.raw_data.obj.index(
                    0x00,
                    self.text_offset + offset,
                    self.text_offset + self.header.text_length,
                )
                self._string_cache[offset] = bytes(
                    self.raw_data[self.text_offset + offset : end]
                ).decode(encoding)
            except ValueError:
                sys.stderr.write(f"Invalid string offset: {offset}\n")
                return ""
        return self._string_cache[offset]
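
    # Example: offsets index into the NUL-terminated string blob that starts at
    # `self.text_offset`; offset 0 is the first string, and results are cached.
    #
    #   s = dcb.string_for_offset(0)
    #   assert dcb.string_for_offset(0) is s   # served from _string_cache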

    def record_to_dict(self, record, depth=100):
        d = {}
        refd = set()

        def _add_props(base, r, cur_depth):
            rid = ""
            if hasattr(r, "id"):
                base["__id"] = r.id.value
                rid = r.id.value
            if hasattr(r, "filename"):
                base["__path"] = r.filename
            if getattr(r, "structure_definition", None) is not None:
                if r.structure_definition.parent is not None:
                    base["__type"] = r.structure_definition.parent.name
                    base["__polymorphicType"] = r.structure_definition.name
                else:
                    base["__type"] = r.structure_definition.name
            if hasattr(r, "instance_index"):
                rid = f"{r.name}:{r.instance_index}"
            if rid:
                refd.add(rid)

            for name, prop in r.properties.items():
                if isinstance(prop, dftypes.Reference) and prop.value.value in self.records_by_guid:
                    prop = self.records_by_guid[prop.value.value]

                def _handle_prop(p, pname=""):
                    if isinstance(
                        p,
                        (
                            dftypes.StructureInstance,
                            dftypes.ClassReference,
                            dftypes.Record,
                            dftypes.StrongPointer,
                        ),
                    ):
                        b = {}
                        pid = ""
                        if hasattr(p, "id"):
                            pid = p.id.value
                        elif hasattr(p, "instance_index"):
                            pid = f"{p.name}:{p.instance_index}"
                        if cur_depth > 0:
                            # NextState/parent tends to lead to infinite loops
                            if pname.lower() in ["nextstate", "parent"] or (pid and pid in refd):
                                nextdepth = 0
                            else:
                                nextdepth = cur_depth - 1
                            _add_props(b, p, nextdepth)
                        else:
                            # depth exhausted: fall back to string representations
                            if hasattr(p, "properties"):
                                b = [str(_) for _ in p.properties]
                            else:
                                b = [str(_) for _ in p] if isinstance(p, list) else str(p)
                        return b
                    return getattr(p, "value", p)

                if isinstance(prop, list):
                    base[name] = [
                        {p.name: _handle_prop(p, p.name)} if hasattr(p, "name") else _handle_prop(p)
                        for p in prop
                    ]
                else:
                    base[name] = _handle_prop(prop, name)

        _add_props(d, record, depth)
        return d
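
    # Usage sketch (`record` assumed to come from search_filename() or
    # records_by_guid); `depth` bounds recursion into nested structures:
    #
    #   d = dcb.record_to_dict(record, depth=2)
    #   d["__type"], d["__id"]   # metadata keys injected by _add_props above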

    def record_to_etree(self, record, depth=100):
        return dict_to_etree({f"{record.type}.{record.name}": self.record_to_dict(record, depth)})

    def dump_record_xml(self, record, indent=2, *args, **kwargs):
        # forward extra args (e.g. depth) to record_to_etree, matching dump_record_json
        return pprint_xml_tree(self.record_to_etree(record, *args, **kwargs), indent)

    def dump_record_json(self, record, indent=2, *args, **kwargs):
        return json.dumps(
            self.record_to_dict(record, *args, **kwargs),
            indent=indent,
            default=str,
            sort_keys=True,
        )
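
    # Dump helpers usage (assuming `record` was looked up earlier; extra args
    # such as depth are forwarded through to record_to_dict/record_to_etree):
    #
    #   Path("record.json").write_text(dcb.dump_record_json(record, depth=3))
    #   print(dcb.dump_record_xml(record))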

    def search_filename(
        self, file_filter, ignore_case=True, mode="fnmatch"
    ) -> typing.List[dftypes.Record]:
        """
        Search the datacore for objects by filename.

        :param file_filter: Filename, prefix/suffix, substring, or glob pattern to
            match, depending on `mode`.
        :param ignore_case: Perform the match case-insensitively. Defaults to `True`.
        :param mode: Method of performing a match. Valid values are:

            `fnmatch`: Treats `file_filter` as a glob pattern - `fnmatch.fnmatch(filename, file_filter)`
            `startswith`: Uses the string `startswith` function - `filename.startswith(file_filter)`
            `endswith`: Uses the string `endswith` function - `filename.endswith(file_filter)`
            `in`: Performs an `in` check - `file_filter in filename`
        :return: List of :class:`Record` objects that matched the filter
        """
        # normalize path slashes from windows to posix
        file_filter = "/".join(file_filter.split("\\"))
        if ignore_case:
            file_filter = file_filter.lower()

        if mode == "fnmatch":
            if ignore_case:
                return [_ for _ in self.records if fnmatch.fnmatch(_.filename.lower(), file_filter)]
            return [_ for _ in self.records if fnmatch.fnmatchcase(_.filename, file_filter)]
        elif mode == "startswith":
            if ignore_case:
                return [_ for _ in self.records if _.filename.lower().startswith(file_filter)]
            return [_ for _ in self.records if _.filename.startswith(file_filter)]
        elif mode == "endswith":
            if ignore_case:
                return [_ for _ in self.records if _.filename.lower().endswith(file_filter)]
            return [_ for _ in self.records if _.filename.endswith(file_filter)]
        elif mode == "in":
            if ignore_case:
                return [_ for _ in self.records if file_filter in _.filename.lower()]
            return [_ for _ in self.records if file_filter in _.filename]
        raise AttributeError(f"Invalid search mode: {mode}")
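
    # Usage sketch (patterns are hypothetical record paths):
    #
    #   dcb.search_filename("libs/foundry/records/entities/*")      # glob
    #   dcb.search_filename("libs/foundry/", mode="startswith")
    #   dcb.search_filename("javelin", mode="in")                    # substring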

    # @property
    # def records_by_path(self):
    #     if not self._records_by_path:
    #         for r in self.records:
    #             path = r.filename
    #             if path in self._records_by_path:
    #                 if not isinstance(self._records_by_path[path], list):
    #                     self._records_by_path[path] = [self._records_by_path[path]]
    #                 self._records_by_path[path].append(r)
    #             else:
    #                 self._records_by_path[path] = r
    #     return self._records_by_path