#!/usr/bin/python import sys, os import argparse import subprocess from enum import Enum import tomllib as toml # Simple progress display, in a similar interface to the Progress library. class ProgressBar: def __init__(self, text, max): self.max = max self.text = text self.current = 0 def next(self): self.current += 1 print(f"\r{self.text} {self.current}/{self.max}", flush=True, end="") def finish(self): print(flush=True) try: from progress.bar import Bar as Progress HAS_PROGRESS = True except: HAS_PROGRESS = False Progress = ProgressBar class Processor(Enum): FFMPEG = 1 IMAGEMAGICK = 2 class Profile: def __init__(self, profile_dict): self.name = profile_dict.get("name", "") self.desc = profile_dict.get("desc", "") self.processor = Processor[profile_dict.get("processor", "IMAGEMAGICK")] self.command = profile_dict.get("command", "") self.output_ext = profile_dict.get("output_ext", "") @classmethod def get(cls, name): for p in PROFILES: if p["name"] == name: return cls(p) def __str__(self): return f"{self.name} ({self.processor.name}): {self.desc}\n" \ f"{self.command}" def getConfigFiles(): files = [] f = os.path.join(os.path.dirname(os.path.realpath(__file__)), "ffmagick.toml") files.append((f, os.path.exists(f))) f = "/etc/ffmagick.toml" files.append((f, os.path.exists(f))) if "HOME" in os.environ: home = os.environ["HOME"] if os.name == "nt" and home.endswith(':') and home.find('/') == -1 \ and home.find('\\') == -1: # $HOME is a drive letter (e.g. “D:”) on Windows. f = os.path.join(home, os.sep, ".config", "ffmagick.toml") else: f = os.path.join(home, ".config", "ffmagick.toml") files.append((f, os.path.exists(f))) return files class Configuration: def __init__(self): self._imagemagick_bin = None self._ffmpeg_bin = None self.profiles = [] # Get configuration from files in the system. @classmethod def get(cls): conf = cls() for f in getConfigFiles(): if f[1]: conf.merge(Configuration.readFromFile(f[0])) return conf @property def imagemagick_bin(self): if self._imagemagick_bin is None: return "magick" else: return self._imagemagick_bin @property def ffmpeg_bin(self): if self._ffmpeg_bin is None: return "ffmpeg" else: return self._ffmpeg_bin @classmethod def readFromFile(cls, filename): with open(filename, 'rb') as f: data = toml.load(f) result = cls() for key in data: item = data[key] if isinstance(item, dict): p = Profile(item) p.name = key result.profiles.append(p) else: if key == "imagemagick_bin": result._imagemagick_bin = item elif key == "ffmpeg_bin": result._ffmpeg_bin = item else: print("ERROR: invalid configuration key:", key) return result def merge(self, conf): if conf._imagemagick_bin is not None: self._imagemagick_bin = conf._imagemagick_bin if conf._ffmpeg_bin is not None: self._ffmpeg_bin = conf._ffmpeg_bin self.profiles = conf.profiles + self.profiles def getProfile(self, name): for p in self.profiles: if p.name == name: return p def runOnce(config, profile, input_file, output_dir): if output_dir is None: output_file = os.path.splitext(input_file)[0] + "." + profile.output_ext else: output_file = os.path.join(output_dir, os.path.basename(input_file)) output_file = os.path.splitext(output_file)[0] + "." + \ profile.output_ext if not os.path.exists(output_dir): os.makedirs(output_dir) if profile.processor == Processor.FFMPEG: cmd = f"\"{config.ffmpeg_bin}\" -y -i \"{input_file}\"" \ f" {profile.command} \"{output_file}\"" elif profile.processor == Processor.IMAGEMAGICK: cmd = f"\"{config.imagemagick_bin}\" \"{input_file}\""\ f" {profile.command} \"{output_file}\"" return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) def runAll(config, profile, input_files, output_dir, ignore_error=False): bar = Progress("Processing", max=len(input_files)) failed = [] for f in input_files: bar.next() result = runOnce(config, profile, f, output_dir) if result.returncode != 0: failed.append(f) if ignore_error: continue bar.finish() print("ERROR: failed to process {}. Log:\n") print(result.stdout) return failed bar.finish() return failed # Return a list of files found in paths, which is a list of files or # directories. For each element in “paths”, if it is a file, add it to the # result; if it is a directory, add all the files (not recursively) in it to the # result. def getFilesFromPaths(paths): files = [] for p in paths: if os.path.isdir(p): for entry in os.listdir(p): f = os.path.join(p, entry) if os.path.isfile(f): files.append(f) else: files.append(p) return files def main(): import argparse parser = argparse.ArgumentParser( prog='FFmagick', description='What the program does') parser.add_argument('files', metavar="FILE", nargs='*', help="File to process. If FILE is a directory, " "all files in it are processed (but not recursively).") parser.add_argument('-p', "--profile", metavar="PROFILE", dest="profile", help="Specify the profile to process the files") parser.add_argument("-l", "--list-profiles", action="store_true", dest="should_list_profiles", help="List all profiles") parser.add_argument("--list-config-files", action="store_true", dest="should_list_conf_files", help="List all config files found") parser.add_argument("-o", "--output-dir", metavar="DIR", dest="output_dir", help="Write the processed files into directory DIR. " "Create if not exists. Default: same directory as the " "input files.") args = parser.parse_args() config = Configuration.get() if args.should_list_profiles: for p in config.profiles: print(p) print() return 0 if args.should_list_conf_files: for f in getConfigFiles(): print("✅" if f[1] else "❌", f[0]) return 0 if "profile" not in args: print("Profile needed") return 1 output_dir = None if "output_dir" in args: output_dir = args.output_dir input_files = getFilesFromPaths(args.files) ignore_error = False if len(input_files) > len(args.files): ignore_error = True failed = runAll(config, config.getProfile(args.profile), input_files, output_dir, ignore_error) if failed: print("Failed files:") for f in failed: print(f) return 1 else: return 0 if __name__ == "__main__": sys.exit(main())