refactor: use pathlib instead of os.path

This commit is contained in:
RedDeadDepresso
2024-09-25 00:00:07 +01:00
parent 8d3af9996b
commit 8ec39fe30d
8 changed files with 206 additions and 224 deletions

View File

@@ -1,5 +1,6 @@
from PySide6.QtCore import QThread, QProcess, Signal, QObject
import os
from PySide6.QtCore import QProcess, Signal, QObject
class ScriptManager(QObject):
def __init__(self, signalBus):

View File

@@ -1,11 +1,9 @@
import os
class CreateBackup:
def __init__(self, config, file_manager):
"""Initializes the Bounty module.
"""Initializes the CreateBackup module.
Args:
config (Config): BAAuto Config instance
config (Config): KKAFIO Config instance
"""
self.config = config
self.file_manager = file_manager
@@ -15,8 +13,8 @@ class CreateBackup:
self.filename = self.config.create_backup["Filename"]
self.output_path = self.config.create_backup["OutputPath"]
def logic_wrapper(self):
output_path = os.path.join(self.output_path, self.filename)
def run(self):
output_path = self.output_path / self.filename
self.file_manager.create_archive(self.folders, output_path)

View File

@@ -1,43 +1,43 @@
import os
import re as regex
import codecs
import shutil
from pathlib import Path
from util.logger import logger
class FilterConvertKKS:
def __init__(self, config, file_manager):
"""Initializes the Bounty module.
"""Initializes the FilterConvertKKS module.
Args:
config (Config): BAAuto Config instance
config (Config): KKAFIO Config instance
"""
self.config = config
self.file_manager = file_manager
self.convert = self.config.fc_kks["Convert"]
def get_list(self, folder_path):
new_list = []
for root, dirs, files in os.walk(folder_path):
for filename in files:
if regex.match(r".*(\.png)$", filename):
new_list.append(os.path.join(root, filename))
return new_list
"""Get list of PNG files in the folder."""
folder = Path(folder_path)
return [str(file) for file in folder.rglob("*.png")]
def check_png(self, card_path):
with codecs.open(card_path, "rb") as card:
"""Check the PNG file and return its type."""
card_path = Path(card_path)
with card_path.open("rb") as card:
data = card.read()
card_type = 0
if data.find(b"KoiKatuChara") != -1:
if b"KoiKatuChara" in data:
card_type = 1
if data.find(b"KoiKatuCharaSP") != -1:
if b"KoiKatuCharaSP" in data:
card_type = 2
elif data.find(b"KoiKatuCharaSun") != -1:
elif b"KoiKatuCharaSun" in data:
card_type = 3
logger.info(f"[{card_type}]", f"{card_path}")
return card_type
def convert_kk(self, card_name, card_path, destination_path):
with codecs.open(card_path, mode="rb") as card:
"""Convert KKS card to KK."""
card_path = Path(card_path) # Convert to Path object
with card_path.open(mode="rb") as card:
data = card.read()
replace_list = [
@@ -46,20 +46,20 @@ class FilterConvertKKS:
[b"version\xa50.0.6\xa3sex", b"version\xa50.0.5\xa3sex"],
]
for text in replace_list:
data = data.replace(text[0], text[1])
for old_text, new_text in replace_list:
data = data.replace(old_text, new_text)
new_file_path = os.path.normpath(os.path.join(destination_path, f"KKS2KK_{card_name}"))
# print(f"new_file_path {new_file_path}")
new_file_path = Path(destination_path) / f"KKS2KK_{card_name}"
with codecs.open(new_file_path, "wb") as new_card:
with new_file_path.open("wb") as new_card:
new_card.write(data)
def logic_wrapper(self):
path = self.config.fc_kks["InputPath"]
def run(self):
"""Main logic for processing the KKS to KK conversion."""
path = Path(self.config.fc_kks["InputPath"])
kks_card_list = []
kks_folder = "_KKS_card_"
kks_folder2 = "_KKS_to_KK_"
kks_folder = path / "_KKS_card_"
kks_folder2 = path / "_KKS_to_KK_"
png_list = self.get_list(path)
@@ -70,38 +70,34 @@ class FilterConvertKKS:
if self.check_png(png) == 3:
kks_card_list.append(png)
else:
logger.success("SCRIPT", f"no PNG found")
logger.success("SCRIPT", "No PNG files found")
return
count = len(kks_card_list)
if count > 0:
print(kks_card_list)
target_folder = os.path.normpath(os.path.join(path, kks_folder))
target_folder2 = os.path.normpath(os.path.join(path, kks_folder2))
if not os.path.isdir(target_folder):
os.mkdir(target_folder)
# Create target directories if they don't exist
kks_folder.mkdir(exist_ok=True)
if self.convert:
logger.info("SCRIPT", f"Conversion to KK is [{self.convert}]")
if not os.path.isdir(target_folder2):
os.mkdir(target_folder2)
kks_folder2.mkdir(exist_ok=True)
for card_path in kks_card_list:
source = card_path
card = os.path.basename(card_path)
target = os.path.normpath(os.path.join(target_folder, card))
source = Path(card_path)
target = kks_folder / source.name
# copy & convert before move
# Copy & convert before moving
if self.convert:
self.convert_kk(card, source, target_folder2)
self.convert_kk(source.name, source, kks_folder2)
# move file
shutil.move(source, target)
# Move file
shutil.move(str(source), str(target))
if self.convert:
logger.success("SCRIPT", f"[{count}] cards moved to [{kks_folder}] folder, converted and save to [{kks_folder2}] folder")
logger.success("SCRIPT", f"[{count}] cards moved to [{kks_folder}] folder, converted and saved to [{kks_folder2}] folder")
else:
logger.success("SCRIPT", f"[{count}] cards moved to [{kks_folder}] folder")
else:
logger.success("SCRIPT", f"no KKS card found")
logger.success("SCRIPT: No KKS cards found")

View File

@@ -1,40 +1,43 @@
import os
from pathlib import Path
import codecs
from util.logger import logger
class InstallChara:
def __init__(self, config, file_manager):
"""Initializes the Bounty module.
"""Initializes the InstallChara module.
Args:
config (Config): BAAuto Config instance
config (Config): KKAFIO Config instance
"""
self.config = config
self.file_manager = file_manager
self.game_path = self.config.game_path
self.input_path = self.config.install_chara["InputPath"]
self.input_path = Path(self.config.install_chara["InputPath"]) # Using Path for input path
def resolve_png(self, image_path):
with codecs.open(image_path[0], "rb") as card:
data = card.read()
if data.find(b"KoiKatuChara") != -1:
if data.find(b"KoiKatuCharaSP") != -1 or data.find(b"KoiKatuCharaSun") != -1:
basename = os.path.basename(image_path[0])
if b"KoiKatuChara" in data:
if b"KoiKatuCharaSP" in data or b"KoiKatuCharaSun" in data:
basename = Path(image_path[0]).name # Use Path's .name to get the basename
logger.error("CHARA", f"{basename} is a KKS card")
return
self.file_manager.copy_and_paste("CHARA", image_path, self.game_path["chara"])
elif data.find(b"KoiKatuClothes") != -1:
elif b"KoiKatuClothes" in data:
self.file_manager.copy_and_paste("COORD", image_path, self.game_path["coordinate"])
else:
self.file_manager.copy_and_paste("OVERLAYS", image_path, self.game_path["Overlays"])
def logic_wrapper(self, folder_path=None):
def run(self, folder_path=None):
if folder_path is None:
folder_path = self.input_path
foldername = os.path.basename(folder_path)
folder_path = Path(folder_path)
foldername = folder_path.name
logger.line()
logger.info("FOLDER", foldername)
file_list, compressed_file_list = self.file_manager.find_all_files(folder_path)
file_list, archive_list = self.file_manager.find_all_files(folder_path)
for file in file_list:
file_extension = file[2]
@@ -44,14 +47,11 @@ class InstallChara:
case ".png":
self.resolve_png(file)
case _:
basename = os.path.basename(file[0])
logger.error("UKNOWN", f"Cannot classify {basename}")
basename = Path(file[0]).name
logger.error("UNKNOWN", f"Cannot classify {basename}")
logger.line()
for compressed in compressed_file_list:
extract_path = self.file_manager.extract_archive(compressed[0])
for archive in archive_list:
extract_path = self.file_manager.extract_archive(archive[0])
if extract_path is not None:
self.logic_wrapper(extract_path)
self.run(extract_path)

View File

@@ -1,14 +1,13 @@
import os
import codecs
from util.logger import logger
class RemoveChara:
def __init__(self, config, file_manager):
"""Initializes the Bounty module.
"""Initializes the RemoveChara module.
Args:
config (Config): BAAuto Config instance
config (Config): KKAFIO Config instance
"""
self.config = config
self.file_manager = file_manager
@@ -18,23 +17,24 @@ class RemoveChara:
def resolve_png(self, image_path):
with codecs.open(image_path[0], "rb") as card:
data = card.read()
if data.find(b"KoiKatuChara") != -1:
if data.find(b"KoiKatuCharaSP") != -1 or data.find(b"KoiKatuCharaSun") != -1:
if b"KoiKatuChara" in data:
if b"KoiKatuCharaSP" in data or b"KoiKatuCharaSun" in data:
return
self.file_manager.find_and_remove("CHARA", image_path, self.game_path["chara"])
elif data.find(b"KoiKatuClothes") != -1:
elif b"KoiKatuClothes" in data:
self.file_manager.find_and_remove("COORD", image_path, self.game_path["coordinate"])
else:
self.file_manager.find_and_remove("OVERLAYS", image_path, self.game_path["Overlays"])
def logic_wrapper(self):
foldername = os.path.basename(self.input_path)
def run(self):
foldername = self.input_path.name
logger.info("FOLDER", foldername)
file_list, archive_list = self.file_manager.find_all_files(self.input_path)
for file in file_list:
file_extension = file[2]
match file_extension:
extension = file[2]
match extension:
case ".zipmod":
self.file_manager.find_and_remove("MODS", file, self.game_path["mods"])
case ".png":
@@ -42,7 +42,3 @@ class RemoveChara:
case _:
pass
logger.line()

View File

@@ -21,38 +21,34 @@ try:
Args:
config (Config): BAAuto Config instance
"""
logger.logger_signal = None
self.config = config
self.file_manager = file_manager
self.modules = {
'InstallChara': None,
'RemoveChara': None,
'CreateBackup': None,
'FilterConvertKKS': None,
self.task_to_module = {
'CreateBackup': CreateBackup,
'FilterConvertKKS': FilterConvertKKS,
'InstallChara': InstallChara,
'RemoveChara': RemoveChara,
}
if self.config.install_chara['Enable']:
self.modules['InstallChara'] = InstallChara(self.config, self.file_manager)
if self.config.remove_chara['Enable']:
self.modules['RemoveChara'] = RemoveChara(self.config, self.file_manager)
if self.config.create_backup['Enable']:
self.modules['CreateBackup'] = CreateBackup(self.config, self.file_manager)
if self.config.fc_kks["Enable"]:
self.modules['FilterConvertKKS'] = FilterConvertKKS(self.config, self.file_manager)
def run(self):
for task in self.config.tasks:
if self.modules[task]:
for task, module in self.task_to_module.items():
if not self.config.config_data[task]["Enable"]:
continue
logger.info("SCRIPT", f'Start Task: {task}')
try:
self.modules[task].logic_wrapper()
module(self.config, self.file_manager).run()
except:
logger.error("SCRIPT", f'Task error: {task}. For more info, check the traceback.log file.')
self.write_traceback(task)
sys.exit(1)
sys.exit(0)
def write_traceback(self, task):
with open('traceback.log', 'a') as f:
f.write(f'[{task}]\n')
traceback.print_exc(None, f, True)
f.write('\n')
sys.exit(1)
sys.exit(0)
except:
print(f'[ERROR] Script Initialisation Error. For more info, check the traceback.log file.')

View File

@@ -1,8 +1,9 @@
import sys
import json
import os
from pathlib import Path
from util.logger import logger
class Config:
def __init__(self, config_file):
logger.info("SCRIPT", "Initializing config module")
@@ -10,15 +11,12 @@ class Config:
self.ok = False
self.initialized = False
self.config_data = None
self.changed = False
self.read()
def read(self):
backup_config = self._deepcopy_dict(self.__dict__)
try:
with open(self.config_file, 'r') as json_file:
self.config_data = json.load(json_file)
with open(self.config_file, 'r') as f:
self.config_data = json.load(f)
except FileNotFoundError:
logger.error("SCRIPT", f"Config file '{self.config_file}' not found.")
sys.exit(1)
@@ -31,60 +29,54 @@ class Config:
if self.ok and not self.initialized:
logger.info("SCRIPT", "Starting KKAFIO!")
self.initialized = True
self.changed = True
elif not self.ok and not self.initialized:
logger.error("SCRIPT", "Invalid config. Please check your config file.")
sys.exit(1)
elif not self.ok and self.initialized:
logger.warning("SCRIPT", "Config change detected, but with problems. Rolling back config.")
self._rollback_config(backup_config)
elif self.ok and self.initialized:
if backup_config != self.__dict__:
logger.warning("SCRIPT", "Config change detected. Hot-reloading.")
self.changed = True
def validate(self):
logger.info("SCRIPT", "Validating config")
self.ok = True
self.tasks = ["CreateBackup", "FilterConvertKKS", "InstallChara", "RemoveChara"]
self.create_gamepath()
self.validate_gamepath()
self.validate_tasks()
for task in self.tasks:
if self.config_data[task]["Enable"]:
if "InputPath" in self.config_data[task]:
path = self.config_data[task]["InputPath"]
elif "OutputPath" in self.config_data[task]:
path = self.config_data[task]["OutputPath"]
if not os.path.exists(path):
logger.error("SCRIPT", f"Path invalid for task {task}")
raise Exception()
self.install_chara = self.config_data.get("InstallChara", {})
self.create_backup = self.config_data.get("CreateBackup", {})
self.remove_chara = self.config_data.get("RemoveChara", {})
self.fc_kks = self.config_data.get("FilterConvertKKS", {})
def create_gamepath(self):
base = self.config_data["Core"]["GamePath"]
def validate_gamepath(self):
base = Path(self.config_data["Core"]["GamePath"])
self.game_path = {
"base": base,
"UserData": os.path.join(base, "UserData"),
"BepInEx": os.path.join(base, "BepInEx"),
"mods": os.path.join(base, "mods"),
"chara": os.path.join(base, "UserData\\chara\\female"),
"coordinate": os.path.join(base, "UserData\\coordinate"),
"Overlays": os.path.join(base, "UserData\\Overlays")
"UserData": base / "UserData",
"BepInEx": base / "BepInEx",
"mods": base / "mods",
"chara": base / "UserData" / "chara" / "female",
"coordinate": base / "UserData" / "coordinate",
"Overlays": base / "UserData" / "Overlays"
}
for path in self.game_path.values():
if not os.path.exists(path):
if not path.exists():
logger.error("SCRIPT", "Game path not valid")
raise Exception(f"Game path not valid: {path}")
def validate_tasks(self):
tasks = ["CreateBackup", "FilterConvertKKS", "InstallChara", "RemoveChara"]
for task in tasks:
task_config = self.config_data[task]
if not task_config["Enable"]:
continue
if "InputPath" in task_config:
path_obj = Path(task_config["InputPath"])
task_config["InputPath"] = path_obj
elif "OutputPath" in task_config:
path_obj = Path(task_config["OutputPath"])
task_config["OutputPath"] = path_obj
if not path_obj.exists():
logger.error("SCRIPT", f"Path invalid for task {task}")
raise Exception()
def _deepcopy_dict(self, dictionary):
from copy import deepcopy
return deepcopy(dictionary)
def _rollback_config(self, config):
for key, value in config.items():
setattr(self, key, value)
self.create_backup = self.config_data["CreateBackup"]
self.fc_kks = self.config_data["FilterConvertKKS"]
self.install_chara = self.config_data["InstallChara"]
self.remove_chara = self.config_data["RemoveChara"]

View File

@@ -1,44 +1,47 @@
import os
import shutil
import datetime
import patoolib
import customtkinter
import subprocess
import time
from pathlib import Path
from util.logger import logger
class FileManager:
def __init__(self, config):
self.config = config
def find_all_files(self, directory):
"""Find all files in the given directory."""
directory = Path(directory)
file_list = []
compressed_file_list = []
compressed_extensions = [".rar", ".zip", ".7z"]
archive_list = []
archive_extensions = {".rar", ".zip", ".7z"}
for root, dirs, files in os.walk(directory):
for file in files:
for file_path in directory.rglob('*'):
file_size = file_path.stat().st_size
file_extension = file_path.suffix
file_path = os.path.join(root, file)
file_size = os.path.getsize(file_path)
_, file_extension = os.path.splitext(file_path)
if file_extension in compressed_extensions:
compressed_file_list.append((file_path, file_size, file_extension))
if file_extension in archive_extensions:
archive_list.append((file_path, file_size, file_extension))
else:
file_list.append((file_path, file_size, file_extension))
file_list.sort(key=lambda x: x[1])
compressed_file_list.sort(key=lambda x: x[1])
return file_list, compressed_file_list
archive_list.sort(key=lambda x: x[1])
return file_list, archive_list
def copy_and_paste(self, type, source_path, destination_folder):
source_path = source_path[0]
base_name = os.path.basename(source_path)
destination_path = os.path.join(destination_folder, base_name)
"""Copy file from source to destination, handling file conflicts."""
source_path = Path(source_path[0])
destination_folder = Path(destination_folder)
base_name = source_path.name
destination_path = destination_folder / base_name
conflicts = self.config.install_chara["FileConflicts"]
already_exists = os.path.exists(destination_path)
already_exists = destination_path.exists()
if already_exists and conflicts == "Skip":
logger.skipped(type, base_name)
@@ -51,26 +54,19 @@ class FileManager:
max_retries = 3
for attempt in range(max_retries):
try:
filename, file_extension = os.path.splitext(source_path)
new_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')
new_source_path = f"{filename}_{new_name}{file_extension}"
os.rename(source_path, new_source_path)
source_path = new_source_path
logger.renamed(type, base_name)
filename, file_extension = os.path.splitext(destination_path)
destination_path = f"{filename}_{new_name}{file_extension}"
break # Exit the loop if renaming is successful
new_name = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f')
destination_path = destination_path.with_stem(f"{destination_path.stem}_{new_name}")
break
except PermissionError:
if attempt < max_retries - 1:
time.sleep(1) # Wait for 1 second before retrying
time.sleep(1)
else:
logger.error(type, f"Failed to rename {base_name} after {max_retries} attempts.")
return
try:
shutil.copy(source_path, destination_path)
print(f"File copied successfully from {source_path} to {destination_path}")
if not already_exists:
logger.success(type, base_name)
except FileNotFoundError:
@@ -80,70 +76,78 @@ class FileManager:
except Exception as e:
logger.error(type, f"An error occurred: {e}")
def find_and_remove(self, type, source_path, destination_folder):
source_path = source_path[0]
base_name = os.path.basename(source_path)
destination_path = os.path.join(destination_folder, base_name)
if os.path.exists(destination_path):
def find_and_remove(self, type, source_path, destination_folder_path):
"""Remove file if it exists at the destination."""
source_path = Path(source_path[0])
destination_folder_path = Path(destination_folder_path)
base_name = source_path.name
destination_path = destination_folder_path / base_name
if destination_path.exists():
try:
os.remove(destination_path)
destination_path.unlink()
logger.removed(type, base_name)
except OSError as e:
logger.error(type, base_name)
def create_archive(self, folders, archive_path):
"""Create an archive of the given folders using 7zip."""
# Specify the full path to the 7zip executable
path_to_7zip = patoolib.util.find_program("7z")
if not path_to_7zip:
logger.error("SCRIPT", "7zip not found. Unable to create backup")
raise Exception()
if os.path.exists(archive_path+".7z"):
os.remove(archive_path+".7z")
archive_path = Path(archive_path)
archive_file = archive_path.with_suffix(".7z")
if archive_file.exists():
archive_file.unlink()
path_to_7zip = f'"{path_to_7zip}"'
archive_path = f'"{archive_path}"'
exclude_folders = [
'"Sideloader Modpack"',
'"Sideloader Modpack - Studio"',
'"Sideloader Modpack - KK_UncensorSelector"',
'"Sideloader Modpack - Maps"',
'"Sideloader Modpack - KK_MaterialEditor"',
'"Sideloader Modpack - Fixes"',
'"Sideloader Modpack - Exclusive KK KKS"',
'"Sideloader Modpack - Exclusive KK"',
'"Sideloader Modpack - Animations"'
"Sideloader Modpack",
"Sideloader Modpack - Studio",
"Sideloader Modpack - KK_UncensorSelector",
"Sideloader Modpack - Maps",
"Sideloader Modpack - KK_MaterialEditor",
"Sideloader Modpack - Fixes",
"Sideloader Modpack - Exclusive KK KKS",
"Sideloader Modpack - Exclusive KK",
"Sideloader Modpack - Animations"
]
# Create a string of folder names to exclude
exclude_string = ''
for folder in exclude_folders:
exclude_string += f'-xr!{folder} '
exclude_string = ' '.join([f'-xr!"{folder}"' for folder in exclude_folders])
# Create a string of folder names to include
include_string = ''
for folder in folders:
include_string += f'"{folder}" '
include_string = ' '.join([f'"{folder}"' for folder in folders])
# Construct the 7zip command
command = f'{path_to_7zip} a -t7z {archive_path} {include_string} {exclude_string}'
command = f'"{path_to_7zip}" a -t7z "{archive_file}" {include_string} {exclude_string}'
# Call the command
process = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Print the output
for line in process.stdout.decode().split('\n'):
if line.strip() != "":
if line.strip():
logger.info("7-Zip", line)
# Check the return code
if process.returncode not in [0, 1]:
raise Exception()
def extract_archive(self, archive_path):
try:
archive_name = os.path.basename(archive_path)
"""Extract the archive."""
archive_path = Path(archive_path)
archive_name = archive_path.name
logger.info("ARCHIVE", f"Extracting {archive_name}")
extract_path = os.path.join(f"{os.path.splitext(archive_path)[0]}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')}")
patoolib.extract_archive(archive_path, outdir=extract_path)
extract_path = archive_path.with_stem(f"{archive_path.stem}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')}")
try:
patoolib.extract_archive(str(archive_path), outdir=str(extract_path))
return extract_path
except patoolib.util.PatoolError as e:
@@ -153,8 +157,8 @@ class FileManager:
dialog = customtkinter.CTkInputDialog(text=text, title="Enter Password")
password = dialog.get_input()
if password is not None or "":
patoolib.extract_archive(archive_path, outdir=extract_path, password=password)
if password:
patoolib.extract_archive(str(archive_path), outdir=str(extract_path), password=password)
return extract_path
else:
break
@@ -162,4 +166,3 @@ class FileManager:
text = f"Wrong password or {archive_name} is corrupted. Please enter password again or click Cancel"
logger.skipped("ARCHIVE", archive_name)