#
# Module to record our understanding of everything on disk. This
# includes the raw DVDs, the ripped videos, and the Plex libraries.
#
# We use asyncinotify to monitor changes to the disk, updating the
# internal structures.
#

import os.path
import logging
import errno
import asyncio

import easydict
import asyncinotify

LOGGER = logging.getLogger(__name__)

# The video extensions we'll look for.
VIDEO_EXT = {
    '.mp4',
    '.m4v',
    '.mkv',
    }

# Flags for changes on the disk.
_DEPARTED = (
    asyncinotify.Mask.DELETE
    | asyncinotify.Mask.MOVED_FROM
    )
_ARRIVED = (
    asyncinotify.Mask.CREATE
    | asyncinotify.Mask.MOVED_TO
    )


class DVDRoot:
    "A directory whose subdirectories are DVDs; watched for DVDs coming and going."

    def __init__(self, rootdir):
        assert os.path.isabs(rootdir)
        self.rootdir = rootdir

        self.dvds = { }  # dirpath: DVDDirectory

        self.inotify = asyncinotify.Inotify()
        self.watches = { }  # path: asyncinotify.Watch

        # Watch the root video directory, to see new DVDs arriving.
        self.watches[rootdir] = self.inotify.add_watch(rootdir, _DEPARTED | _ARRIVED)
        LOGGER.info(f'Watching "{rootdir}" for changes')

        with os.scandir(self.rootdir) as entries:
            for entry in entries:
                if entry.is_dir():
                    self.dvds[entry.path] = DVDDirectory(entry.name, entry.path)
                    self.watches[entry.path] = self.inotify.add_watch(entry.path, _DEPARTED | _ARRIVED)
        LOGGER.info(f'Watching {len(self.dvds)} DVD subdirs for changes')

    async def watch_dirs(self):
        """Run forever, to watch for changes in the root and DVD dirs.

        A failure while handling one event is logged and skipped, so that a
        single transient problem (e.g. a directory vanishing before it could
        be watched or scanned) does not stop all monitoring.
        """
        try:
            with self.inotify:
                async for event in self.inotify:
                    try:
                        self._handle_event(event)
                    except Exception:
                        LOGGER.exception(f'Failed to handle event at: "{event.path}"')
        except Exception:
            LOGGER.exception(f'Watcher failed for "{self.rootdir}"')
        LOGGER.error(f'EXIT: watch_dirs() for "{self.rootdir}"')

    def _handle_event(self, event):
        "Dispatch one inotify EVENT to the root-level or per-DVD handler."
        path = str(event.path)  # pathlib.Path instance to str()
        LOGGER.debug(f'Event {event.mask:x} at: "{path}"')

        if event.mask & asyncinotify.Mask.IGNORED:
            # We don't need to see these; messes with the logic.
            return

        if event.watch is self.watches[self.rootdir]:
            self._root_event(event, path)
        else:
            self._dvd_event(event, path)

    def _root_event(self, event, path):
        "Handle a change in the root directory (the set of DVDs)."
        name = os.path.basename(path)

        if not (event.mask & asyncinotify.Mask.ISDIR):
            # We only care about directories in the root.
            LOGGER.warning(f'Non-directory appeared in DVDROOT: "{name}"')
            return

        if event.mask & _ARRIVED:
            LOGGER.info(f'New DVD: "{name}"')
            # Watch before scanning, so nothing created in between is missed.
            # Keep the watch in SELF.WATCHES, like those made in __init__.
            self.watches[path] = self.inotify.add_watch(path, _DEPARTED | _ARRIVED)
            self.dvds[path] = DVDDirectory(name, path)
        elif event.mask & _DEPARTED:
            LOGGER.info(f'DVD directory removed: "{name}"')
            watch = self.watches.pop(path, None)
            # A deleted directory has its watch dropped by the kernel. A
            # directory moved out of the root still exists, so its watch
            # stays live and must be removed explicitly.
            if watch is not None and event.mask & asyncinotify.Mask.MOVED_FROM:
                try:
                    self.inotify.rm_watch(watch)
                except OSError:
                    LOGGER.debug(f'Could not remove watch for "{path}"')
            try:
                dvd = self.dvds.pop(path)
            except KeyError:
                # Just keep going, if this is missing.
                LOGGER.exception(f'where did "{path}" go?')
                return
            dvd.dvd_removed()

    def _dvd_event(self, event, path):
        "Handle a change within one DVD's directory."
        dirname, name = os.path.split(path)
        dvd = self.dvds.get(dirname)
        if dvd is None:
            LOGGER.error(f'where did "{dirname}" go??')
            return

        if event.mask & _ARRIVED:
            dvd.name_arrive(name)
        elif event.mask & _DEPARTED:
            dvd.name_depart(name)

    def valid_for(self, lib):
        "Return set of stream basenames, across all DVDs here, that belong in LIB."
        ### return DVDs instead of streams??
        valid = set()
        for d in self.dvds.values():
            if lib in PLEX.libs_for(d):
                valid.update(d.streams)
        return valid


class DVDDirectory:
    "One DVD's directory: the Plex library markers and video streams it holds."

    def __init__(self, name, dirpath):
        self.name = name  # basename
        self.dvddir = dirpath

        self.libs = set()  # which Plex libraries to link to this DVD
        self.streams = set()  # what streams are found here (BASENAMEs)
        self.tssize = None  # size of VIDEO_TS directory

        # Load initial data. Scan the DVDDIR to gather data.
        #
        # Set our libs first, so each stream is linked straight into the
        # right libraries rather than into the default and then moved.
        fnames = set(os.listdir(self.dvddir))
        self.libs = PLEX.libs & fnames
        for fname in fnames - PLEX.libs:
            self.name_arrive(fname)

    def _relink(self, before):
        "After SELF.LIBS changed, move stream links from libraries BEFORE to the current set."
        after = set(PLEX.libs_for(self))
        # Gaining a first marker leaves the default library, and losing the
        # last marker returns to it; only libraries that differ need work.
        for lib in before - after:
            PLEX.library_removed(self, lib)
        for lib in after - before:
            PLEX.library_added(self, lib)

    def name_arrive(self, name):
        "A new NAME (library marker, stream, or VIDEO_TS) has arrived in this DVD."
        if name in PLEX.libs:
            before = set(PLEX.libs_for(self))
            self.libs.add(name)
            self._relink(before)
        elif is_video(os.path.join(self.dvddir, name)):
            self.streams.add(name)
            PLEX.stream_added(self, name)
        elif name == 'VIDEO_TS':
            ### is this an offloaded symlink? if not, get total disk
            ### space used for a potential offload feature.
            pass

    def name_depart(self, name):
        "A NAME (library marker, stream, or VIDEO_TS) has departed from this DVD."
        if name in PLEX.libs:
            before = set(PLEX.libs_for(self))
            self.libs.discard(name)
            self._relink(before)
        elif is_video(os.path.join(self.dvddir, name)):
            self.streams.discard(name)
            PLEX.stream_removed(self, name)
        elif name == 'VIDEO_TS':
            ### is this an offloaded symlink? if not, get total disk
            ### space used for a potential offload feature.
            pass

    def dvd_removed(self):
        "The DVD directory has been removed, so remove all streams."
        # Note: .name_depart() will modify STREAMS as we iterate over it,
        # so we need to use a copy.
        for sname in self.streams.copy():
            self.name_depart(sname)


def is_video(fname):
    "Return True if FNAME has one of the VIDEO_EXT extensions."
    return os.path.splitext(fname)[1] in VIDEO_EXT


def format_size(size):
    "Return SIZE (in bytes) as a short human-readable string using decimal units."
    if size >= 1e12:
        return f'{size/1e12:.1f}T'
    if size >= 1e9:
        return f'{size/1e9:.1f}G'
    if size >= 1e6:
        return f'{size/1e6:.1f}M'
    if size >= 1e3:
        return f'{size/1e3:.1f}k'
    return f'{size} bytes'


def create_symlink(stream, link):
    "Create LINK pointing to STREAM, retaining mtime."
    stat = os.stat(stream)
    os.symlink(stream, link)
    os.utime(link, ns=(stat.st_atime_ns, stat.st_mtime_ns), follow_symlinks=False)


class PlexLibraries:
    "The set of Plex library directories, populated with symlinks to DVD streams."

    # Status values for a symlink in a Plex library.
    MISSING = 'missing'  # nothing is present
    MATCHES = 'matches'  # a link pointing to expected target
    DIFFERS = 'differs'  # a link pointing to unexpected target
    NOTLINK = 'notlink'  # something besides a link is present

    def __init__(self, rootdir, libs):
        # Parent directory for each library directory.
        assert os.path.isabs(rootdir)
        self.rootdir = rootdir

        # Stash away the library information.
        self.default_lib = libs[0]
        self.libs = set(libs)

        # Strong references to background tasks, so they are not
        # garbage-collected before they finish.
        self._tasks = set()

    def scrub_libs(self, dvdroots):
        """Schedule a background review of every Plex library for stray entries.

        Each library is checked by .clean_lib() against DVDROOTS in its own
        task on the current event loop. Returns the list of scheduled tasks.
        """
        async def bg_clean(lib):
            try:
                self.clean_lib(lib, dvdroots)
            except Exception:
                LOGGER.exception(f'Failed to clean library "{lib}"')

        loop = asyncio.get_event_loop()
        tasks = [ loop.create_task(bg_clean(lib)) for lib in self.libs ]
        for task in tasks:
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
        return tasks

    def clean_lib(self, lib, dvdroots):
        "Clean out LIB for anything not in DVDROOTS."
        # Materialize the listing first, since entries may be unlinked below.
        with os.scandir(os.path.join(self.rootdir, lib)) as it:
            entries = list(it)

        check_names = set()  # links to check further
        for entry in entries:
            if not entry.is_symlink():
                LOGGER.warning(f'Not a symlink: "{lib}/{entry.name}"')
                continue
            if not entry.is_file():
                # Follows symlinks. Should point to a file (eg. .mp4)
                LOGGER.warning(f'REMOVED: stale OR not-file: "{lib}/{entry.name}"')
                # Safe to remove.
                os.unlink(entry.path)
                continue

            # Possibly okay. Needs more scrutiny.
            check_names.add(entry.name)

        valid = set()
        for root in dvdroots:
            valid.update(root.valid_for(lib))

        # Review all symlinks, to find invalids.
        for name in check_names:
            if name in valid:
                continue
            LOGGER.warning(f'INVALID: "{lib}/{name}" should not be here')
            ### do something?

    def path_status(self, lib, name, expect):
        """Classify LIB/NAME against the absolute stream path EXPECT.

        Returns (path, status) where PATH is the full path of LIB/NAME and
        STATUS is one of MISSING, MATCHES, DIFFERS, or NOTLINK.
        """
        assert os.path.isabs(expect)
        path = os.path.join(self.rootdir, lib, name)

        try:
            target = os.readlink(path)
        except FileNotFoundError:
            return path, self.MISSING
        except OSError as e:
            if e.errno == errno.EINVAL:
                LOGGER.warning(f'Not a symlink: "{lib}/{name}"')
                return path, self.NOTLINK
            raise

        # A relative TARGET is relative to the link's own directory. Resolve
        # both sides the same way, so symlinks anywhere in the DVD root path
        # (or a stream that is itself a symlink) do not cause false DIFFERS.
        target = os.path.realpath(os.path.join(self.rootdir, lib, target))
        if target == os.path.realpath(expect):
            return path, self.MATCHES

        LOGGER.warning(f'Unexpected: "{lib}/{name}" should point to "{expect}"')
        return path, self.DIFFERS

    def libs_for(self, dvd):
        "Return iterable of libraries this DVD belongs to."
        return dvd.libs or (self.default_lib,)

    def _link(self, lib, dvd, sname):
        "Ensure LIB holds a symlink named SNAME pointing at that stream in DVD."
        expect = os.path.join(dvd.dvddir, sname)
        plexpath, status = self.path_status(lib, sname, expect)

        # Other status values are okay: logged or matching.
        if status != self.MISSING:
            return

        # Create the symlink: PLEXPATH -> EXPECT
        create_symlink(expect, plexpath)
        LOGGER.info(f'Added: PLEX/{os.path.relpath(plexpath, self.rootdir)} -> .../{sname}')

    def _unlink(self, lib, dvd, sname):
        "Remove LIB/SNAME if it is a symlink pointing at that stream in DVD."
        expect = os.path.join(dvd.dvddir, sname)
        plexpath, status = self.path_status(lib, sname, expect)

        # Other status values are okay: logged or missing.
        if status != self.MATCHES:
            return

        os.unlink(plexpath)
        LOGGER.info(f'Removed: PLEX/{os.path.relpath(plexpath, self.rootdir)}')

    def library_added(self, dvd, lib):
        "Link all of DVD's streams into LIB."
        LOGGER.info(f'Linking DVD at "{dvd.dvddir}" into library "{lib}"')
        for sname in dvd.streams:
            self._link(lib, dvd, sname)

    def library_removed(self, dvd, lib):
        "Remove the links to all of DVD's streams from LIB."
        LOGGER.info(f'Unlinking DVD at "{dvd.dvddir}" from library "{lib}"')
        for sname in dvd.streams:
            self._unlink(lib, dvd, sname)

    def stream_added(self, dvd, stream):
        "In DVD, a new STREAM was added; link it into each of DVD's libraries."
        LOGGER.info(f'New stream at "{os.path.join(dvd.dvddir, stream)}"')
        for lib in self.libs_for(dvd):
            self._link(lib, dvd, stream)

    def stream_removed(self, dvd, stream):
        "In DVD, a STREAM was removed; unlink it from each of DVD's libraries."
        LOGGER.info(f'Removed stream at "{os.path.join(dvd.dvddir, stream)}"')
        for lib in self.libs_for(dvd):
            self._unlink(lib, dvd, stream)


# Singleton, to manage all the Plex libraries.
# This will be injected by main.py based on the configuration.
#PLEX = PlexLibraries(...)


if __name__ == '__main__':
    import sys
    # DVDRoot requires an absolute path.
    root = DVDRoot(os.path.abspath(sys.argv[1]))