diff --git a/lazy_player/__init__.py b/lazy_player/__init__.py index 1dc9c99..cca5f73 100644 --- a/lazy_player/__init__.py +++ b/lazy_player/__init__.py @@ -29,7 +29,6 @@ def main(): app = Application(thumbnailer=thumbnailer) try: - thumbnailer.start() app.run(None) finally: - thumbnailer.stop() + thumbnailer.shutdown() diff --git a/lazy_player/thumbnailer.py b/lazy_player/thumbnailer.py index 866a549..a446468 100644 --- a/lazy_player/thumbnailer.py +++ b/lazy_player/thumbnailer.py @@ -1,8 +1,8 @@ from __future__ import annotations +import multiprocessing import sys -import threading -from queue import Empty, Full, LifoQueue +from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING from gi.repository import GLib, Gst @@ -22,15 +22,18 @@ DEFAULT_SEEK_FLAGS = ( __all__ = ["Thumbnailer", "generate_thumbnail_sync"] -class Thumbnailer(threading.Thread): - queue: LifoQueue[FileItem | None] +MAX_WORKERS = max(1, multiprocessing.cpu_count() // 2) + +class Thumbnailer(ThreadPoolExecutor): def __init__(self): - super().__init__(daemon=True) - self.queue = LifoQueue(maxsize=20) + super().__init__( + thread_name_prefix="Thumbnailer", + max_workers=MAX_WORKERS, + ) def generate_thumbnail(self, file_item: FileItem): - """Add a file item to the thumbnail queue""" + """Schedule thumbnail generation""" if not file_item.full_path.is_file(): return @@ -38,41 +41,19 @@ class Thumbnailer(threading.Thread): if file_item.attempted_thumbnail: return - try: - self.queue.put_nowait(file_item) - except Full: - pass + if self._work_queue.qsize() >= MAX_WORKERS: + # print("[thumbnail] Ignoring:", file_item.full_path.name) + return + self.submit(generate_thumbnail_sync, file_item) file_item.attempted_thumbnail = True - def stop(self): - """Stop the thumbnailer thread""" - - try: - # Drop all pending items - while True: - self.queue.get_nowait() - except Empty: - pass - - self.queue.put_nowait(None) - self.join() - - def run(self) -> None: - """Process items from the queue continuously""" - - while True: - file_item = self.queue.get(block=True) - - if file_item is None: - break - - generate_thumbnail_sync(file_item) - def generate_thumbnail_sync(file_item: FileItem): """Generate thumbnail for a single file""" + # print("[thumbnailer] Generate:", file_item.full_path.name) + pipeline_str = ( "uridecodebin name=uridecodebin ! " "videoconvert ! " @@ -95,13 +76,14 @@ def generate_thumbnail_sync(file_item: FileItem): try: # Set pipeline to PAUSED to get duration pipeline.set_state(Gst.State.PAUSED) - pipeline.get_state(Gst.SECOND) + pipeline.get_state(Gst.CLOCK_TIME_NONE) - # Seek to 1/3 of duration + # Obtain total duration success, duration = pipeline.query_duration(Gst.Format.TIME) if not success: - return + raise RuntimeError("Failed to query duration") + # Seek to 1/3 of duration seek_pos = duration // 3 pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, seek_pos) @@ -110,16 +92,16 @@ def generate_thumbnail_sync(file_item: FileItem): sample = sink.emit("pull-sample") if not sample: - return + raise RuntimeError("Failed to pull sample") # Extract image data buffer = sample.get_buffer() if not buffer: - return + raise RuntimeError("Failed to get buffer") success, map_info = buffer.map(Gst.MapFlags.READ) if not success: - return + raise RuntimeError("Failed to map buffer") try: thumbnail = bytes(map_info.data) @@ -133,7 +115,8 @@ def generate_thumbnail_sync(file_item: FileItem): GLib.idle_add(set_thumbnail) except Exception as err: - print("[thumbnailer] Error:", file_item.full_path, err, file=sys.stderr) + print("[thumbnailer] Error:", file_item.full_path.name, file=sys.stderr) + print("[thumbnailer]", err, file=sys.stderr) finally: pipeline.set_state(Gst.State.NULL)