Source code for scdatatools.cli.commands.forge

import logging
import sys
import typing
from collections import OrderedDict
from itertools import chain
from pathlib import Path

from nubia import command, argument
from nubia.internal.typing import _ArgDecoratorSpec, _init_attr
from rich import get_console, table

from scdatatools.forge import DataCoreBinary
from . import common
from ..utils import track

if typing.TYPE_CHECKING:
    pass


logger = logging.getLogger(__name__)


def _dump_record(dcb, record, output, guid, guid_if_exists, xml):
    if output == "-":
        if xml:
            sys.stdout.write(dcb.dump_record_xml(record) + "\n")
        else:
            sys.stdout.write(dcb.dump_record_json(record) + "\n")
    else:
        if output.is_dir():
            output = output / Path(record.filename)
        output.parent.mkdir(parents=True, exist_ok=True)
        suffix = ".xml" if xml else ".json"
        if guid or (guid_if_exists and output.is_file()):
            output = output.parent / f"{output.stem}.{record.id.value}{suffix}"
        else:
            output = output.parent / f"{output.stem}{suffix}"
        logger.info(str(output))
        try:
            with open(output, "w") as target:
                if xml:
                    target.write(dcb.dump_record_xml(record))
                else:
                    target.write(dcb.dump_record_json(record))
        except ValueError as e:
            print(f"ERROR: Error processing {record.filename}: {e}")


[docs]def forge_common_args(function): _init_attr(function, "__annotations__", OrderedDict()) _init_attr(function, "__arguments_decorator_specs", {}) function.__annotations__.setdefault("forge_file", str) function.__arguments_decorator_specs["forge_file"] = _ArgDecoratorSpec( arg="forge_file", name="forge_file", aliases=[], positional=True, choices=[], description="DataForge (.dcb) file to extract data from. (or Data.p4k or Star Citizen installation directory)", ) function.__annotations__.setdefault("filter", typing.List[str]) function.__arguments_decorator_specs["filter"] = _ArgDecoratorSpec( arg="filter", name="filter", choices=[], aliases=["-f"], positional=False, description="Posix style file filter of which files to extract", ) return function
[docs]def parse_forge_args(forge_file, filter): forge_file = Path(forge_file).expanduser() filters = [_.strip("'").strip('"') for _ in filter] if forge_file.suffix.casefold() == ".p4k" or forge_file.is_dir(): logger.info(f"Opening DataCore from {forge_file}") dcb = common.open_sc_dir(forge_file).datacore else: logger.info(f"Opening DataForge file: {forge_file}") dcb = DataCoreBinary(str(forge_file)) # using the dict like this ends up removing duplicated but keeping the order of insertion records = dict(chain((str(r.id), r) for f in filters for r in dcb.search_filename(f))) return dcb, filters, records
[docs]@command class forge: """Tools for interacting with the Dataforge database"""
[docs] @command @forge_common_args def ls(self, forge_file: str, filter: typing.List[str] = ("*",)): """List records in the Data Core""" dcb, filters, records = parse_forge_args(forge_file, filter) logger.info(f"{filters = }") # TODO: add more ls type formatting flags t = table.Table(box=None) t.add_column("GUID") t.add_column("filename") for r in records.values(): t.add_row(str(r.id), r.filename) get_console().print(t)
[docs] @command(exclusive_arguments=("xml", "json"), aliases=["x"]) @forge_common_args @argument("single", description="Extract first matching file only", aliases=["-1"]) @argument( "guid", aliases=["-g"], description="Include the GUID in the filename (avoids overwriting from records with the same 'filename') " "(Default: False)", ) @argument( "guid_if_exists", aliases=["-G"], description="Include the GUID in the filename only if the output file already exists. (Default: True)", ) @argument("xml", aliases=["-x"], description="Convert to XML (Default)") @argument("json", aliases=["-j"], description="Convert to JSON") @argument( "output", description="The output directory to extract files into or the output path if --single. " "Defaults to current directory. Use '-' to output a single file to the stdout", aliases=["-o"], ) def extract( self, forge_file: typing.Text, filter: typing.List[str] = ("*",), output: typing.Text = ".", guid: bool = False, guid_if_exists: bool = True, xml: bool = True, json: bool = False, single: bool = False, ): """Extracts DataCore records and converts them to a given format (xml/json). Use the `--filter` argument to down-select which records to extract, by default it will extract all of them to the `--output` directory.""" dcb, filters, records = parse_forge_args(forge_file, filter) output = Path(output).absolute() if output != "-" else output if single: print(f"Extracting first match for filters '{','.join(filters)}' to {output}") print("=" * 120) if not records: sys.stderr.write(f"No files found for filter") sys.exit(2) _dump_record(dcb, next(iter(records.values())), output, guid, guid_if_exists, not json) else: print(f"Extracting files into {output} with filter '{filters}'") print("=" * 120) try: if output == "-": # don't output the progress bar if we're dumping to the console for record in records.values(): _dump_record(dcb, record, output, guid, guid_if_exists, not json) else: output = Path(output) output.mkdir(parents=True, exist_ok=True) for record in track( records.values(), description="Extracting records", unit="records" ): _dump_record(dcb, record, output, guid, guid_if_exists, not json) except KeyboardInterrupt: pass