Files
KKAFIO/util/file_manager.py
2024-09-26 04:01:29 +01:00

189 lines
7.3 KiB
Python

import shutil
import datetime
import patoolib
import subprocess
import time
from pathlib import Path
from util.logger import logger
from typing import Union, Literal
FileEntry = tuple[Path, int, str]
class FileManager:
def __init__(self, config):
self.config = config
def find_all_files(self, directory: Union[Path, str]) -> tuple[list[FileEntry], list[FileEntry]]:
"""Find all files and archive files in the given directory.
Returns:
Tuple containing:
- A list of regular files (path, size, extension)
- A list of archive files (path, size, extension)
"""
directory = Path(directory)
file_list: list[FileEntry] = []
archive_list: list[FileEntry] = []
archive_extensions = {".rar", ".zip", ".7z"}
for file_path in directory.glob('**/*'):
if file_path.is_file():
file_size = file_path.stat().st_size
file_extension = file_path.suffix
file_entry: FileEntry = (file_path, file_size, file_extension)
if file_extension in archive_extensions:
archive_list.append(file_entry)
else:
file_list.append(file_entry)
file_list.sort(key=lambda x: x[1])
archive_list.sort(key=lambda x: x[1])
return file_list, archive_list
def copy_and_paste(self, type: str, source_path: Union[Path, str], destination_folder: Union[str, Path]):
"""Copy file from source to destination, handling file conflicts."""
source_path = Path(source_path)
destination_folder = Path(destination_folder)
base_name = source_path.name
destination_path = destination_folder / base_name
conflicts = self.config.install_chara["FileConflicts"]
already_exists = destination_path.exists()
if already_exists and conflicts == "Skip":
logger.skipped(type, base_name)
return
elif already_exists and conflicts == "Replace":
logger.replaced(type, base_name)
elif already_exists and conflicts == "Rename":
max_retries = 3
for attempt in range(max_retries):
try:
logger.renamed(type, base_name)
new_name = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f')
destination_path = destination_path.with_stem(f"{destination_path.stem}_{new_name}")
break
except PermissionError:
if attempt < max_retries - 1:
time.sleep(1)
else:
logger.error(type, f"Failed to rename {base_name} after {max_retries} attempts.")
return
try:
shutil.copy(source_path, destination_path)
if not already_exists:
logger.success(type, base_name)
except FileNotFoundError:
logger.error(type, f"{base_name} does not exist.")
except PermissionError:
logger.error(type, f"Permission denied for {base_name}")
except Exception as e:
logger.error(type, f"An error occurred: {e}")
def find_and_remove(self, file_type: str, source_path: Union[str, Path], destination_folder: Union[str, Path]):
"""Remove file if it exists at the destination."""
source_path = Path(source_path)
destination_folder = Path(destination_folder)
base_name = source_path.name
destination_path = destination_folder / base_name
if destination_path.exists():
try:
destination_path.unlink()
logger.removed(file_type, base_name)
except OSError as e:
logger.error(file_type, base_name)
def create_archive(self, folders: list[Literal["mods", "UserData", "BepInEx"]], archive_path: Union[str, Path]):
"""Create an archive of the given folders using 7zip."""
# Specify the full path to the 7zip executable
path_to_7zip = patoolib.util.find_program("7z")
if not path_to_7zip:
logger.error("SCRIPT", "7zip not found. Unable to create backup")
raise Exception()
archive_path = Path(archive_path)
archive_file = archive_path.with_suffix(".7z")
if archive_file.exists():
archive_file.unlink()
exclude_folders = [
"Sideloader Modpack",
"Sideloader Modpack - Studio",
"Sideloader Modpack - KK_UncensorSelector",
"Sideloader Modpack - Maps",
"Sideloader Modpack - KK_MaterialEditor",
"Sideloader Modpack - Fixes",
"Sideloader Modpack - Exclusive KK KKS",
"Sideloader Modpack - Exclusive KK",
"Sideloader Modpack - Animations"
]
# Create a string of folder names to exclude
exclude_string = ' '.join([f'-xr!"{folder}"' for folder in exclude_folders])
# Create a string of folder names to include
include_string = ' '.join([f'"{folder}"' for folder in folders])
# Construct the 7zip command
command = f'"{path_to_7zip}" a -t7z "{archive_file}" {include_string} {exclude_string}'
# Call the command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Print the output
while process.poll() is None:
for line in process.stdout:
if line.strip():
logger.info("7-Zip", line)
# Check the return code
if process.returncode not in [0, 1]:
logger.error("7-Zip", f"Exited with return code: {process.returncode}")
raise Exception(f"7-zip exited with return code: {process.returncode}")
def extract_archive(self, archive_path: Union[Path, str]):
from app.components.password_dialog import password_dialog
"""Extract the archive."""
archive_path = Path(archive_path)
archive_name = archive_path.name
logger.info("ARCHIVE", f"Extracting {archive_name}")
extract_path = archive_path.with_name(f"{archive_path.stem}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')}")
try:
patoolib.extract_archive(str(archive_path), outdir=str(extract_path))
return extract_path
except:
text = f"There is an error with the archive {archive_name}, but it is impossible to detect the cause. Maybe it requires a password?"
while True:
try:
password = password_dialog('Enter Password', text)
if not password:
break
patoolib.extract_archive(str(archive_path), outdir=str(extract_path), password=password)
return extract_path
except patoolib.util.PatoolError as e:
text = f"Wrong password or {archive_name} is corrupted. Please enter password again or click Cancel."
print(f"Error: {str(e)}")
except Exception as e:
print(f"An unexpected error occurred: {str(e)}")
break
logger.skipped("ARCHIVE", archive_name)