From c423f655debb6ba7034668bd66ec141ea7360848 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Hamal=20Dvo=C5=99=C3=A1k?= <mordae@anilinux.org>
Date: Sun, 9 Mar 2025 21:21:20 +0100
Subject: [PATCH] Use synchronous threaded thumbnailer

---
 lazy_player/__init__.py    |  22 ++--
 lazy_player/thumbnailer.py | 258 +++++++++++++------------------------
 2 files changed, 106 insertions(+), 174 deletions(-)

diff --git a/lazy_player/__init__.py b/lazy_player/__init__.py
index ff0bf32..90f073b 100644
--- a/lazy_player/__init__.py
+++ b/lazy_player/__init__.py
@@ -479,14 +479,9 @@ class MainWindow(Gtk.ApplicationWindow):
 
 
 class App(Gtk.Application):
-    def __init__(self):
+    def __init__(self, thumbnailer: Thumbnailer):
         super().__init__()
-
-        # Initialize GStreamer
-        Gst.init(None)
-
-        # Create thumbnailer
-        self.thumbnailer = Thumbnailer()
+        self.thumbnailer = thumbnailer
 
         # Load CSS
         css_provider = Gtk.CssProvider()
@@ -512,5 +507,14 @@ def main():
     if len(sys.argv) >= 2:
         os.chdir(sys.argv[1])
 
-    app = App()
-    app.run(None)
+    # Initialize GStreamer
+    Gst.init(None)
+
+    thumbnailer = Thumbnailer()
+    app = App(thumbnailer)
+
+    try:
+        thumbnailer.start()
+        app.run(None)
+    finally:
+        thumbnailer.stop()
diff --git a/lazy_player/thumbnailer.py b/lazy_player/thumbnailer.py
index 20649f9..4534547 100644
--- a/lazy_player/thumbnailer.py
+++ b/lazy_player/thumbnailer.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
-import sys
-from enum import Enum, auto
+import threading
+from queue import Empty, Queue
 
 import gi
 
@@ -11,185 +11,113 @@ gi.require_version("Gst", "1.0")
 from gi.repository import Gst  # NOQA: E402
 
 
-class State(Enum):
-    IDLE = auto()
-    INITIALIZING = auto()
-    SEEKING = auto()
-    CAPTURING = auto()
-    CLEANING_UP = auto()
-
-
-class Thumbnailer:
-    pipeline: Gst.Pipeline
-    uridecodebin: Gst.Element
-    sink: Gst.Element
-    queue: list[FileItem]
-    current_item: FileItem | None
-    state: State
+class Thumbnailer(threading.Thread):
+    queue: Queue[FileItem | None]
 
     def __init__(self):
-        self.queue = []
-        self.current_item = None
-        self.state = State.IDLE
+        super().__init__(daemon=True)
+        self.queue = Queue(maxsize=1)
+
+    def generate_thumbnail(self, file_item: FileItem):
+        """Add a file item to the thumbnail queue"""
+
+        if not file_item.full_path.is_file():
+            return
+
+        # Replace any pending item in the queue
+        try:
+            self.queue.get_nowait()
+        except Empty:
+            pass
+        self.queue.put_nowait(file_item)
+
+    def stop(self):
+        """Stop the thumbnailer thread"""
+
+        # Replace any pending items in the queue
+        try:
+            self.queue.get_nowait()
+        except Empty:
+            pass
+
+        self.queue.put_nowait(None)
+        self.join()
+
+    def run(self) -> None:
+        """Process items from the queue continuously"""
+
+        while True:
+            file_item = self.queue.get(block=True)
+
+            if file_item is None:
+                break
+
+            self._generate_thumbnail(file_item)
+
+    def _generate_thumbnail(self, file_item: FileItem):
+        """Generate thumbnail for a single file"""
 
         pipeline_str = (
             "uridecodebin name=uridecodebin ! "
             "videoconvert ! "
             "videoscale ! "
-            "videobox name=box,autocrop=true ! "
+            "videobox autocrop=true ! "
             "video/x-raw,width=384,height=216 ! "
             "jpegenc quality=85 ! "
             "appsink name=sink"
         )
 
         pipeline = Gst.parse_launch(pipeline_str)
-        if not isinstance(pipeline, Gst.Pipeline):
-            return
+        assert isinstance(pipeline, Gst.Pipeline)
 
-        self.pipeline = pipeline
-        uridecodebin = self.pipeline.get_by_name("uridecodebin")
-        assert uridecodebin is not None
-        self.uridecodebin = uridecodebin
+        sink = pipeline.get_by_name("sink")
+        assert isinstance(sink, Gst.Element)
 
-        sink = self.pipeline.get_by_name("sink")
-        assert sink is not None
-        self.sink = sink
+        uridecodebin = pipeline.get_by_name("uridecodebin")
+        assert isinstance(uridecodebin, Gst.Element)
 
-        # Configure appsink for better reliability
-        self.sink.set_property("emit-signals", True)
-        self.sink.set_property("max-buffers", 1)
-        self.sink.set_property("drop", True)
-        self.sink.connect("new-sample", self._on_new_sample)
-
-        # Set up bus message handler
-        bus = self.pipeline.get_bus()
-        bus.add_signal_watch()
-        bus.connect("message", self._on_message)
-
-    def generate_thumbnail(self, file_item: FileItem) -> None:
-        """Add a file item to the thumbnail queue"""
-
-        if not file_item.full_path.is_file():
-            return
-
-        self.queue.append(file_item)
-
-        if self.current_item is None:
-            self._process_next()
-
-    def _on_message(self, bus: Gst.Bus, message: Gst.Message) -> None:
-        if message.type == Gst.MessageType.ERROR:
-            err, _ = message.parse_error()
-            print(f"Error: {err.message}", file=sys.stderr)
-            self._cleanup()
-            return
-
-        if message.type == Gst.MessageType.STATE_CHANGED:
-            if message.src != self.pipeline:
-                return
-
-            _, new_state, _ = message.parse_state_changed()
-
-            if new_state == Gst.State.PAUSED and self.state == State.INITIALIZING:
-                self.state = State.SEEKING
-                self._on_pipeline_ready()
-            elif new_state == Gst.State.PLAYING:
-                self.state = State.CAPTURING
-            elif new_state == Gst.State.NULL:
-                if self.state == State.CAPTURING:
-                    self._on_capture_complete()
-                self.state = State.IDLE
-
-            return
-
-        if message.type == Gst.MessageType.ASYNC_DONE:
-            if self.state == State.SEEKING:
-                # Let the pipeline run to capture the frame
-                self.pipeline.set_state(Gst.State.PLAYING)
-                return
-
-        if message.type == Gst.MessageType.EOS:
-            self._on_capture_complete()
-            return
-
-    def _on_pipeline_ready(self) -> None:
-        """Called when pipeline is ready to seek"""
-
-        success, duration = self.pipeline.query_duration(Gst.Format.TIME)
-        if not success:
-            self._cleanup()
-            return
-
-        seek_pos = duration // 3
-        success = self.pipeline.seek_simple(
-            Gst.Format.TIME,
-            Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
-            seek_pos,
-        )
-
-        if not success:
-            self._cleanup()
-            return
-
-    def _on_new_sample(self, sink: Gst.Element) -> Gst.FlowReturn:
-        """Handle new sample from appsink"""
-
-        if self.state != State.CAPTURING or not self.current_item:
-            return Gst.FlowReturn.OK
-
-        sample = sink.emit("pull-sample")
-        if not sample:
-            self._cleanup()
-            return Gst.FlowReturn.ERROR
-
-        buffer = sample.get_buffer()
-        if not buffer:
-            self._cleanup()
-            return Gst.FlowReturn.ERROR
-
-        success, map_info = buffer.map(Gst.MapFlags.READ)
-        if not success:
-            self._cleanup()
-            return Gst.FlowReturn.ERROR
+        # Set file URI
+        uridecodebin.set_property("uri", Gst.filename_to_uri(str(file_item.full_path)))
 
         try:
-            self.current_item.thumbnail = bytes(map_info.data)
-            self.current_item.has_thumbnail = True
+            # Set pipeline to PAUSED to get duration
+            pipeline.set_state(Gst.State.PAUSED)
+            pipeline.get_state(Gst.SECOND)
+
+            # Seek to 1/3 of duration
+            success, duration = pipeline.query_duration(Gst.Format.TIME)
+            if not success:
+                return
+
+            seek_pos = duration // 3
+            pipeline.seek_simple(
+                Gst.Format.TIME,
+                Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT,
+                seek_pos,
+            )
+
+            # Start playing to capture frame
+            pipeline.set_state(Gst.State.PLAYING)
+
+            sample = sink.emit("pull-sample")
+            if not sample:
+                return
+
+            # Extract image data
+            buffer = sample.get_buffer()
+            if not buffer:
+                return
+
+            success, map_info = buffer.map(Gst.MapFlags.READ)
+            if not success:
+                return
+
+            try:
+                file_item.thumbnail = bytes(map_info.data)
+                file_item.has_thumbnail = True
+            finally:
+                buffer.unmap(map_info)
+        except Exception as err:
+            print("Failed:", file_item.full_path, err)
         finally:
-            buffer.unmap(map_info)
-
-        # We got our sample, clean up
-        self._cleanup()
-        return Gst.FlowReturn.OK
-
-    def _on_capture_complete(self) -> None:
-        """Called when capture is complete"""
-        self._cleanup()
-
-    def _cleanup(self) -> None:
-        """Clean up resources and process next item"""
-
-        # Ensure pipeline is stopped
-        self.pipeline.set_state(Gst.State.NULL)
-
-        # Reset state
-        self.state = State.IDLE
-        self.current_item = None
-
-        # Process next item
-        self._process_next()
-
-    def _process_next(self) -> None:
-        """Start processing the next item in the queue"""
-
-        if not self.queue:
-            self.state = State.IDLE
-            return
-
-        self.state = State.INITIALIZING
-        self.current_item = self.queue.pop(0)
-
-        # Update URI and start pipeline
-        video_uri = Gst.filename_to_uri(str(self.current_item.full_path))
-        self.uridecodebin.set_property("uri", video_uri)
-        self.pipeline.set_state(Gst.State.PLAYING)
+            pipeline.set_state(Gst.State.NULL)