diff --git a/app/common/script_manager.py b/app/common/script_manager.py index e99fdc8..99d2a1f 100644 --- a/app/common/script_manager.py +++ b/app/common/script_manager.py @@ -1,5 +1,6 @@ -from PySide6.QtCore import QThread, QProcess, Signal, QObject import os +from PySide6.QtCore import QProcess, Signal, QObject + class ScriptManager(QObject): def __init__(self, signalBus): diff --git a/modules/create_backup.py b/modules/create_backup.py index 8ef0d08..951ded3 100644 --- a/modules/create_backup.py +++ b/modules/create_backup.py @@ -1,11 +1,9 @@ -import os - class CreateBackup: def __init__(self, config, file_manager): - """Initializes the Bounty module. + """Initializes the CreateBackup module. Args: - config (Config): BAAuto Config instance + config (Config): KKAFIO Config instance """ self.config = config self.file_manager = file_manager @@ -15,8 +13,8 @@ class CreateBackup: self.filename = self.config.create_backup["Filename"] self.output_path = self.config.create_backup["OutputPath"] - def logic_wrapper(self): - output_path = os.path.join(self.output_path, self.filename) + def run(self): + output_path = self.output_path / self.filename self.file_manager.create_archive(self.folders, output_path) diff --git a/modules/fc_kks.py b/modules/fc_kks.py index ab2efeb..6a8f3b7 100644 --- a/modules/fc_kks.py +++ b/modules/fc_kks.py @@ -1,43 +1,43 @@ -import os -import re as regex -import codecs import shutil +from pathlib import Path from util.logger import logger + class FilterConvertKKS: def __init__(self, config, file_manager): - """Initializes the Bounty module. + """Initializes the FilterConvertKKS module. Args: - config (Config): BAAuto Config instance + config (Config): KKAFIO Config instance """ self.config = config self.file_manager = file_manager self.convert = self.config.fc_kks["Convert"] def get_list(self, folder_path): - new_list = [] - for root, dirs, files in os.walk(folder_path): - for filename in files: - if regex.match(r".*(\.png)$", filename): - new_list.append(os.path.join(root, filename)) - return new_list + """Get list of PNG files in the folder.""" + folder = Path(folder_path) + return [str(file) for file in folder.rglob("*.png")] def check_png(self, card_path): - with codecs.open(card_path, "rb") as card: + """Check the PNG file and return its type.""" + card_path = Path(card_path) + with card_path.open("rb") as card: data = card.read() card_type = 0 - if data.find(b"KoiKatuChara") != -1: + if b"KoiKatuChara" in data: card_type = 1 - if data.find(b"KoiKatuCharaSP") != -1: + if b"KoiKatuCharaSP" in data: card_type = 2 - elif data.find(b"KoiKatuCharaSun") != -1: + elif b"KoiKatuCharaSun" in data: card_type = 3 logger.info(f"[{card_type}]", f"{card_path}") return card_type def convert_kk(self, card_name, card_path, destination_path): - with codecs.open(card_path, mode="rb") as card: + """Convert KKS card to KK.""" + card_path = Path(card_path) # Convert to Path object + with card_path.open(mode="rb") as card: data = card.read() replace_list = [ @@ -46,20 +46,20 @@ class FilterConvertKKS: [b"version\xa50.0.6\xa3sex", b"version\xa50.0.5\xa3sex"], ] - for text in replace_list: - data = data.replace(text[0], text[1]) + for old_text, new_text in replace_list: + data = data.replace(old_text, new_text) - new_file_path = os.path.normpath(os.path.join(destination_path, f"KKS2KK_{card_name}")) - # print(f"new_file_path {new_file_path}") + new_file_path = Path(destination_path) / f"KKS2KK_{card_name}" - with codecs.open(new_file_path, "wb") as new_card: + with new_file_path.open("wb") as new_card: new_card.write(data) - def logic_wrapper(self): - path = self.config.fc_kks["InputPath"] + def run(self): + """Main logic for processing the KKS to KK conversion.""" + path = Path(self.config.fc_kks["InputPath"]) kks_card_list = [] - kks_folder = "_KKS_card_" - kks_folder2 = "_KKS_to_KK_" + kks_folder = path / "_KKS_card_" + kks_folder2 = path / "_KKS_to_KK_" png_list = self.get_list(path) @@ -70,38 +70,34 @@ class FilterConvertKKS: if self.check_png(png) == 3: kks_card_list.append(png) else: - logger.success("SCRIPT", f"no PNG found") + logger.success("SCRIPT", "No PNG files found") return count = len(kks_card_list) if count > 0: print(kks_card_list) - target_folder = os.path.normpath(os.path.join(path, kks_folder)) - target_folder2 = os.path.normpath(os.path.join(path, kks_folder2)) - if not os.path.isdir(target_folder): - os.mkdir(target_folder) + # Create target directories if they don't exist + kks_folder.mkdir(exist_ok=True) if self.convert: logger.info("SCRIPT", f"Conversion to KK is [{self.convert}]") - if not os.path.isdir(target_folder2): - os.mkdir(target_folder2) + kks_folder2.mkdir(exist_ok=True) for card_path in kks_card_list: - source = card_path - card = os.path.basename(card_path) - target = os.path.normpath(os.path.join(target_folder, card)) + source = Path(card_path) + target = kks_folder / source.name - # copy & convert before move + # Copy & convert before moving if self.convert: - self.convert_kk(card, source, target_folder2) + self.convert_kk(source.name, source, kks_folder2) - # move file - shutil.move(source, target) + # Move file + shutil.move(str(source), str(target)) if self.convert: - logger.success("SCRIPT", f"[{count}] cards moved to [{kks_folder}] folder, converted and save to [{kks_folder2}] folder") + logger.success("SCRIPT", f"[{count}] cards moved to [{kks_folder}] folder, converted and saved to [{kks_folder2}] folder") else: logger.success("SCRIPT", f"[{count}] cards moved to [{kks_folder}] folder") else: - logger.success("SCRIPT", f"no KKS card found") + logger.success("SCRIPT: No KKS cards found") diff --git a/modules/install_chara.py b/modules/install_chara.py index df3ecce..1ed679d 100644 --- a/modules/install_chara.py +++ b/modules/install_chara.py @@ -1,40 +1,43 @@ -import os +from pathlib import Path import codecs from util.logger import logger + class InstallChara: def __init__(self, config, file_manager): - """Initializes the Bounty module. + """Initializes the InstallChara module. Args: - config (Config): BAAuto Config instance + config (Config): KKAFIO Config instance """ self.config = config self.file_manager = file_manager self.game_path = self.config.game_path - self.input_path = self.config.install_chara["InputPath"] + self.input_path = Path(self.config.install_chara["InputPath"]) # Using Path for input path def resolve_png(self, image_path): with codecs.open(image_path[0], "rb") as card: data = card.read() - if data.find(b"KoiKatuChara") != -1: - if data.find(b"KoiKatuCharaSP") != -1 or data.find(b"KoiKatuCharaSun") != -1: - basename = os.path.basename(image_path[0]) + if b"KoiKatuChara" in data: + if b"KoiKatuCharaSP" in data or b"KoiKatuCharaSun" in data: + basename = Path(image_path[0]).name # Use Path's .name to get the basename logger.error("CHARA", f"{basename} is a KKS card") return self.file_manager.copy_and_paste("CHARA", image_path, self.game_path["chara"]) - elif data.find(b"KoiKatuClothes") != -1: - self.file_manager.copy_and_paste("COORD",image_path, self.game_path["coordinate"]) + elif b"KoiKatuClothes" in data: + self.file_manager.copy_and_paste("COORD", image_path, self.game_path["coordinate"]) else: self.file_manager.copy_and_paste("OVERLAYS", image_path, self.game_path["Overlays"]) - def logic_wrapper(self, folder_path=None): + def run(self, folder_path=None): if folder_path is None: folder_path = self.input_path - foldername = os.path.basename(folder_path) + folder_path = Path(folder_path) + foldername = folder_path.name logger.line() logger.info("FOLDER", foldername) - file_list, compressed_file_list = self.file_manager.find_all_files(folder_path) + + file_list, archive_list = self.file_manager.find_all_files(folder_path) for file in file_list: file_extension = file[2] @@ -44,14 +47,11 @@ class InstallChara: case ".png": self.resolve_png(file) case _: - basename = os.path.basename(file[0]) - logger.error("UKNOWN", f"Cannot classify {basename}") + basename = Path(file[0]).name + logger.error("UNKNOWN", f"Cannot classify {basename}") logger.line() - for compressed in compressed_file_list: - extract_path = self.file_manager.extract_archive(compressed[0]) + for archive in archive_list: + extract_path = self.file_manager.extract_archive(archive[0]) if extract_path is not None: - self.logic_wrapper(extract_path) - - - \ No newline at end of file + self.run(extract_path) diff --git a/modules/remove_chara.py b/modules/remove_chara.py index ba0d801..5cb15a7 100644 --- a/modules/remove_chara.py +++ b/modules/remove_chara.py @@ -1,14 +1,13 @@ -import os import codecs from util.logger import logger class RemoveChara: def __init__(self, config, file_manager): - """Initializes the Bounty module. + """Initializes the RemoveChara module. Args: - config (Config): BAAuto Config instance + config (Config): KKAFIO Config instance """ self.config = config self.file_manager = file_manager @@ -18,23 +17,24 @@ class RemoveChara: def resolve_png(self, image_path): with codecs.open(image_path[0], "rb") as card: data = card.read() - if data.find(b"KoiKatuChara") != -1: - if data.find(b"KoiKatuCharaSP") != -1 or data.find(b"KoiKatuCharaSun") != -1: + if b"KoiKatuChara" in data: + if b"KoiKatuCharaSP" in data or b"KoiKatuCharaSun" in data: return self.file_manager.find_and_remove("CHARA", image_path, self.game_path["chara"]) - elif data.find(b"KoiKatuClothes") != -1: - self.file_manager.find_and_remove("COORD",image_path, self.game_path["coordinate"]) + elif b"KoiKatuClothes" in data: + self.file_manager.find_and_remove("COORD", image_path, self.game_path["coordinate"]) else: self.file_manager.find_and_remove("OVERLAYS", image_path, self.game_path["Overlays"]) - def logic_wrapper(self): - foldername = os.path.basename(self.input_path) + def run(self): + foldername = self.input_path.name logger.info("FOLDER", foldername) + file_list, archive_list = self.file_manager.find_all_files(self.input_path) for file in file_list: - file_extension = file[2] - match file_extension: + extension = file[2] + match extension: case ".zipmod": self.file_manager.find_and_remove("MODS", file, self.game_path["mods"]) case ".png": @@ -42,7 +42,3 @@ class RemoveChara: case _: pass logger.line() - - - - \ No newline at end of file diff --git a/script.py b/script.py index 3ae88df..3fd6cba 100644 --- a/script.py +++ b/script.py @@ -21,39 +21,35 @@ try: Args: config (Config): BAAuto Config instance """ - logger.logger_signal = None self.config = config self.file_manager = file_manager - self.modules = { - 'InstallChara': None, - 'RemoveChara': None, - 'CreateBackup': None, - 'FilterConvertKKS': None, + self.task_to_module = { + 'CreateBackup': CreateBackup, + 'FilterConvertKKS': FilterConvertKKS, + 'InstallChara': InstallChara, + 'RemoveChara': RemoveChara, } - if self.config.install_chara['Enable']: - self.modules['InstallChara'] = InstallChara(self.config, self.file_manager) - if self.config.remove_chara['Enable']: - self.modules['RemoveChara'] = RemoveChara(self.config, self.file_manager) - if self.config.create_backup['Enable']: - self.modules['CreateBackup'] = CreateBackup(self.config, self.file_manager) - if self.config.fc_kks["Enable"]: - self.modules['FilterConvertKKS'] = FilterConvertKKS(self.config, self.file_manager) def run(self): - for task in self.config.tasks: - if self.modules[task]: - logger.info("SCRIPT", f'Start Task: {task}') - try: - self.modules[task].logic_wrapper() - except: - logger.error("SCRIPT", f'Task error: {task}. For more info, check the traceback.log file.') - with open('traceback.log', 'a') as f: - f.write(f'[{task}]\n') - traceback.print_exc(None, f, True) - f.write('\n') - sys.exit(1) + for task, module in self.task_to_module.items(): + if not self.config.config_data[task]["Enable"]: + continue + + logger.info("SCRIPT", f'Start Task: {task}') + try: + module(self.config, self.file_manager).run() + except: + logger.error("SCRIPT", f'Task error: {task}. For more info, check the traceback.log file.') + self.write_traceback(task) + sys.exit(1) sys.exit(0) - + + def write_traceback(self, task): + with open('traceback.log', 'a') as f: + f.write(f'[{task}]\n') + traceback.print_exc(None, f, True) + f.write('\n') + except: print(f'[ERROR] Script Initialisation Error. For more info, check the traceback.log file.') with open('traceback.log', 'w') as f: diff --git a/util/config.py b/util/config.py index f70082b..96e894d 100644 --- a/util/config.py +++ b/util/config.py @@ -1,8 +1,9 @@ import sys import json -import os +from pathlib import Path from util.logger import logger + class Config: def __init__(self, config_file): logger.info("SCRIPT", "Initializing config module") @@ -10,15 +11,12 @@ class Config: self.ok = False self.initialized = False self.config_data = None - self.changed = False self.read() def read(self): - backup_config = self._deepcopy_dict(self.__dict__) - try: - with open(self.config_file, 'r') as json_file: - self.config_data = json.load(json_file) + with open(self.config_file, 'r') as f: + self.config_data = json.load(f) except FileNotFoundError: logger.error("SCRIPT", f"Config file '{self.config_file}' not found.") sys.exit(1) @@ -31,60 +29,54 @@ class Config: if self.ok and not self.initialized: logger.info("SCRIPT", "Starting KKAFIO!") self.initialized = True - self.changed = True elif not self.ok and not self.initialized: logger.error("SCRIPT", "Invalid config. Please check your config file.") sys.exit(1) - elif not self.ok and self.initialized: - logger.warning("SCRIPT", "Config change detected, but with problems. Rolling back config.") - self._rollback_config(backup_config) - elif self.ok and self.initialized: - if backup_config != self.__dict__: - logger.warning("SCRIPT", "Config change detected. Hot-reloading.") - self.changed = True def validate(self): logger.info("SCRIPT", "Validating config") self.ok = True - self.tasks = ["CreateBackup", "FilterConvertKKS", "InstallChara", "RemoveChara"] - self.create_gamepath() + self.validate_gamepath() + self.validate_tasks() - for task in self.tasks: - if self.config_data[task]["Enable"]: - if "InputPath" in self.config_data[task]: - path = self.config_data[task]["InputPath"] - elif "OutputPath" in self.config_data[task]: - path = self.config_data[task]["OutputPath"] - if not os.path.exists(path): - logger.error("SCRIPT", f"Path invalid for task {task}") - raise Exception() - - self.install_chara = self.config_data.get("InstallChara", {}) - self.create_backup = self.config_data.get("CreateBackup", {}) - self.remove_chara = self.config_data.get("RemoveChara", {}) - self.fc_kks = self.config_data.get("FilterConvertKKS", {}) - - def create_gamepath(self): - base = self.config_data["Core"]["GamePath"] + def validate_gamepath(self): + base = Path(self.config_data["Core"]["GamePath"]) self.game_path = { "base": base, - "UserData": os.path.join(base, "UserData"), - "BepInEx": os.path.join(base, "BepInEx"), - "mods": os.path.join(base, "mods"), - "chara": os.path.join(base, "UserData\\chara\\female"), - "coordinate": os.path.join(base, "UserData\\coordinate"), - "Overlays": os.path.join(base, "UserData\\Overlays") - } + "UserData": base / "UserData", + "BepInEx": base / "BepInEx", + "mods": base / "mods", + "chara": base / "UserData" / "chara" / "female", + "coordinate": base / "UserData" / "coordinate", + "Overlays": base / "UserData" / "Overlays" + } for path in self.game_path.values(): - if not os.path.exists(path): + if not path.exists(): logger.error("SCRIPT", "Game path not valid") - raise Exception() - - def _deepcopy_dict(self, dictionary): - from copy import deepcopy - return deepcopy(dictionary) + raise Exception(f"Game path not valid: {path}") - def _rollback_config(self, config): - for key, value in config.items(): - setattr(self, key, value) + def validate_tasks(self): + tasks = ["CreateBackup", "FilterConvertKKS", "InstallChara", "RemoveChara"] + + for task in tasks: + task_config = self.config_data[task] + if not task_config["Enable"]: + continue + + if "InputPath" in task_config: + path_obj = Path(task_config["InputPath"]) + task_config["InputPath"] = path_obj + + elif "OutputPath" in task_config: + path_obj = Path(task_config["OutputPath"]) + task_config["OutputPath"] = path_obj + + if not path_obj.exists(): + logger.error("SCRIPT", f"Path invalid for task {task}") + raise Exception() + + self.create_backup = self.config_data["CreateBackup"] + self.fc_kks = self.config_data["FilterConvertKKS"] + self.install_chara = self.config_data["InstallChara"] + self.remove_chara = self.config_data["RemoveChara"] diff --git a/util/file_manager.py b/util/file_manager.py index 77f661b..422d260 100644 --- a/util/file_manager.py +++ b/util/file_manager.py @@ -1,44 +1,47 @@ -import os import shutil import datetime import patoolib import customtkinter import subprocess import time +from pathlib import Path from util.logger import logger + class FileManager: def __init__(self, config): self.config = config def find_all_files(self, directory): + """Find all files in the given directory.""" + directory = Path(directory) file_list = [] - compressed_file_list = [] - compressed_extensions = [".rar", ".zip", ".7z"] + archive_list = [] + archive_extensions = {".rar", ".zip", ".7z"} - for root, dirs, files in os.walk(directory): - for file in files: + for file_path in directory.rglob('*'): + file_size = file_path.stat().st_size + file_extension = file_path.suffix - file_path = os.path.join(root, file) - file_size = os.path.getsize(file_path) - _, file_extension = os.path.splitext(file_path) - - if file_extension in compressed_extensions: - compressed_file_list.append((file_path, file_size, file_extension)) - else: - file_list.append((file_path, file_size, file_extension)) + if file_extension in archive_extensions: + archive_list.append((file_path, file_size, file_extension)) + else: + file_list.append((file_path, file_size, file_extension)) file_list.sort(key=lambda x: x[1]) - compressed_file_list.sort(key=lambda x: x[1]) - return file_list, compressed_file_list + archive_list.sort(key=lambda x: x[1]) + return file_list, archive_list def copy_and_paste(self, type, source_path, destination_folder): - source_path = source_path[0] - base_name = os.path.basename(source_path) - destination_path = os.path.join(destination_folder, base_name) + """Copy file from source to destination, handling file conflicts.""" + source_path = Path(source_path[0]) + destination_folder = Path(destination_folder) + + base_name = source_path.name + destination_path = destination_folder / base_name conflicts = self.config.install_chara["FileConflicts"] - already_exists = os.path.exists(destination_path) + already_exists = destination_path.exists() if already_exists and conflicts == "Skip": logger.skipped(type, base_name) @@ -51,26 +54,19 @@ class FileManager: max_retries = 3 for attempt in range(max_retries): try: - filename, file_extension = os.path.splitext(source_path) - new_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f') - new_source_path = f"{filename}_{new_name}{file_extension}" - os.rename(source_path, new_source_path) - source_path = new_source_path logger.renamed(type, base_name) - - filename, file_extension = os.path.splitext(destination_path) - destination_path = f"{filename}_{new_name}{file_extension}" - break # Exit the loop if renaming is successful + new_name = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f') + destination_path = destination_path.with_stem(f"{destination_path.stem}_{new_name}") + break except PermissionError: if attempt < max_retries - 1: - time.sleep(1) # Wait for 1 second before retrying + time.sleep(1) else: logger.error(type, f"Failed to rename {base_name} after {max_retries} attempts.") return try: shutil.copy(source_path, destination_path) - print(f"File copied successfully from {source_path} to {destination_path}") if not already_exists: logger.success(type, base_name) except FileNotFoundError: @@ -80,70 +76,78 @@ class FileManager: except Exception as e: logger.error(type, f"An error occurred: {e}") - def find_and_remove(self, type, source_path, destination_folder): - source_path = source_path[0] - base_name = os.path.basename(source_path) - destination_path = os.path.join(destination_folder, base_name) - if os.path.exists(destination_path): + def find_and_remove(self, type, source_path, destination_folder_path): + """Remove file if it exists at the destination.""" + source_path = Path(source_path[0]) + destination_folder_path = Path(destination_folder_path) + + base_name = source_path.name + destination_path = destination_folder_path / base_name + + if destination_path.exists(): try: - os.remove(destination_path) + destination_path.unlink() logger.removed(type, base_name) except OSError as e: logger.error(type, base_name) def create_archive(self, folders, archive_path): + """Create an archive of the given folders using 7zip.""" # Specify the full path to the 7zip executable path_to_7zip = patoolib.util.find_program("7z") if not path_to_7zip: logger.error("SCRIPT", "7zip not found. Unable to create backup") raise Exception() - if os.path.exists(archive_path+".7z"): - os.remove(archive_path+".7z") - - path_to_7zip = f'"{path_to_7zip}"' - archive_path = f'"{archive_path}"' + archive_path = Path(archive_path) + archive_file = archive_path.with_suffix(".7z") + + if archive_file.exists(): + archive_file.unlink() + exclude_folders = [ - '"Sideloader Modpack"', - '"Sideloader Modpack - Studio"', - '"Sideloader Modpack - KK_UncensorSelector"', - '"Sideloader Modpack - Maps"', - '"Sideloader Modpack - KK_MaterialEditor"', - '"Sideloader Modpack - Fixes"', - '"Sideloader Modpack - Exclusive KK KKS"', - '"Sideloader Modpack - Exclusive KK"', - '"Sideloader Modpack - Animations"' + "Sideloader Modpack", + "Sideloader Modpack - Studio", + "Sideloader Modpack - KK_UncensorSelector", + "Sideloader Modpack - Maps", + "Sideloader Modpack - KK_MaterialEditor", + "Sideloader Modpack - Fixes", + "Sideloader Modpack - Exclusive KK KKS", + "Sideloader Modpack - Exclusive KK", + "Sideloader Modpack - Animations" ] # Create a string of folder names to exclude - exclude_string = '' - for folder in exclude_folders: - exclude_string += f'-xr!{folder} ' + exclude_string = ' '.join([f'-xr!"{folder}"' for folder in exclude_folders]) # Create a string of folder names to include - include_string = '' - for folder in folders: - include_string += f'"{folder}" ' + include_string = ' '.join([f'"{folder}"' for folder in folders]) # Construct the 7zip command - command = f'{path_to_7zip} a -t7z {archive_path} {include_string} {exclude_string}' + command = f'"{path_to_7zip}" a -t7z "{archive_file}" {include_string} {exclude_string}' + # Call the command process = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Print the output for line in process.stdout.decode().split('\n'): - if line.strip() != "": + if line.strip(): logger.info("7-Zip", line) + # Check the return code if process.returncode not in [0, 1]: raise Exception() def extract_archive(self, archive_path): + """Extract the archive.""" + archive_path = Path(archive_path) + archive_name = archive_path.name + logger.info("ARCHIVE", f"Extracting {archive_name}") + + extract_path = archive_path.with_stem(f"{archive_path.stem}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')}") + try: - archive_name = os.path.basename(archive_path) - logger.info("ARCHIVE", f"Extracting {archive_name}") - extract_path = os.path.join(f"{os.path.splitext(archive_path)[0]}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')}") - patoolib.extract_archive(archive_path, outdir=extract_path) + patoolib.extract_archive(str(archive_path), outdir=str(extract_path)) return extract_path except patoolib.util.PatoolError as e: @@ -153,8 +157,8 @@ class FileManager: dialog = customtkinter.CTkInputDialog(text=text, title="Enter Password") password = dialog.get_input() - if password is not None or "": - patoolib.extract_archive(archive_path, outdir=extract_path, password=password) + if password: + patoolib.extract_archive(str(archive_path), outdir=str(extract_path), password=password) return extract_path else: break @@ -162,4 +166,3 @@ class FileManager: text = f"Wrong password or {archive_name} is corrupted. Please enter password again or click Cancel" logger.skipped("ARCHIVE", archive_name) -