Use synchronous threaded thumbnailer
This commit is contained in:
parent
fe00f49b5d
commit
c423f655de
2 changed files with 106 additions and 174 deletions
lazy_player
|
@ -479,14 +479,9 @@ class MainWindow(Gtk.ApplicationWindow):
|
||||||
|
|
||||||
|
|
||||||
class App(Gtk.Application):
|
class App(Gtk.Application):
|
||||||
def __init__(self):
|
def __init__(self, thumbnailer: Thumbnailer):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
self.thumbnailer = thumbnailer
|
||||||
# Initialize GStreamer
|
|
||||||
Gst.init(None)
|
|
||||||
|
|
||||||
# Create thumbnailer
|
|
||||||
self.thumbnailer = Thumbnailer()
|
|
||||||
|
|
||||||
# Load CSS
|
# Load CSS
|
||||||
css_provider = Gtk.CssProvider()
|
css_provider = Gtk.CssProvider()
|
||||||
|
@ -512,5 +507,14 @@ def main():
|
||||||
if len(sys.argv) >= 2:
|
if len(sys.argv) >= 2:
|
||||||
os.chdir(sys.argv[1])
|
os.chdir(sys.argv[1])
|
||||||
|
|
||||||
app = App()
|
# Initialize GStreamer
|
||||||
app.run(None)
|
Gst.init(None)
|
||||||
|
|
||||||
|
thumbnailer = Thumbnailer()
|
||||||
|
app = App(thumbnailer)
|
||||||
|
|
||||||
|
try:
|
||||||
|
thumbnailer.start()
|
||||||
|
app.run(None)
|
||||||
|
finally:
|
||||||
|
thumbnailer.stop()
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import sys
|
import threading
|
||||||
from enum import Enum, auto
|
from queue import Empty, Queue
|
||||||
|
|
||||||
import gi
|
import gi
|
||||||
|
|
||||||
|
@ -11,185 +11,113 @@ gi.require_version("Gst", "1.0")
|
||||||
from gi.repository import Gst # NOQA: E402
|
from gi.repository import Gst # NOQA: E402
|
||||||
|
|
||||||
|
|
||||||
class State(Enum):
|
class Thumbnailer(threading.Thread):
|
||||||
IDLE = auto()
|
queue: Queue[FileItem | None]
|
||||||
INITIALIZING = auto()
|
|
||||||
SEEKING = auto()
|
|
||||||
CAPTURING = auto()
|
|
||||||
CLEANING_UP = auto()
|
|
||||||
|
|
||||||
|
|
||||||
class Thumbnailer:
|
|
||||||
pipeline: Gst.Pipeline
|
|
||||||
uridecodebin: Gst.Element
|
|
||||||
sink: Gst.Element
|
|
||||||
queue: list[FileItem]
|
|
||||||
current_item: FileItem | None
|
|
||||||
state: State
|
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.queue = []
|
super().__init__(daemon=True)
|
||||||
self.current_item = None
|
self.queue = Queue(maxsize=1)
|
||||||
self.state = State.IDLE
|
|
||||||
|
def generate_thumbnail(self, file_item: FileItem):
|
||||||
|
"""Add a file item to the thumbnail queue"""
|
||||||
|
|
||||||
|
if not file_item.full_path.is_file():
|
||||||
|
return
|
||||||
|
|
||||||
|
# Replace any pending item in the queue
|
||||||
|
try:
|
||||||
|
self.queue.get_nowait()
|
||||||
|
except Empty:
|
||||||
|
pass
|
||||||
|
self.queue.put_nowait(file_item)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop the thumbnailer thread"""
|
||||||
|
|
||||||
|
# Replace any pending items in the queue
|
||||||
|
try:
|
||||||
|
self.queue.get_nowait()
|
||||||
|
except Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.queue.put_nowait(None)
|
||||||
|
self.join()
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
"""Process items from the queue continuously"""
|
||||||
|
|
||||||
|
while True:
|
||||||
|
file_item = self.queue.get(block=True)
|
||||||
|
|
||||||
|
if file_item is None:
|
||||||
|
break
|
||||||
|
|
||||||
|
self._generate_thumbnail(file_item)
|
||||||
|
|
||||||
|
def _generate_thumbnail(self, file_item: FileItem):
|
||||||
|
"""Generate thumbnail for a single file"""
|
||||||
|
|
||||||
pipeline_str = (
|
pipeline_str = (
|
||||||
"uridecodebin name=uridecodebin ! "
|
"uridecodebin name=uridecodebin ! "
|
||||||
"videoconvert ! "
|
"videoconvert ! "
|
||||||
"videoscale ! "
|
"videoscale ! "
|
||||||
"videobox name=box,autocrop=true ! "
|
"videobox autocrop=true ! "
|
||||||
"video/x-raw,width=384,height=216 ! "
|
"video/x-raw,width=384,height=216 ! "
|
||||||
"jpegenc quality=85 ! "
|
"jpegenc quality=85 ! "
|
||||||
"appsink name=sink"
|
"appsink name=sink"
|
||||||
)
|
)
|
||||||
|
|
||||||
pipeline = Gst.parse_launch(pipeline_str)
|
pipeline = Gst.parse_launch(pipeline_str)
|
||||||
if not isinstance(pipeline, Gst.Pipeline):
|
assert isinstance(pipeline, Gst.Pipeline)
|
||||||
return
|
|
||||||
|
|
||||||
self.pipeline = pipeline
|
sink = pipeline.get_by_name("sink")
|
||||||
uridecodebin = self.pipeline.get_by_name("uridecodebin")
|
assert isinstance(sink, Gst.Element)
|
||||||
assert uridecodebin is not None
|
|
||||||
self.uridecodebin = uridecodebin
|
|
||||||
|
|
||||||
sink = self.pipeline.get_by_name("sink")
|
uridecodebin = pipeline.get_by_name("uridecodebin")
|
||||||
assert sink is not None
|
assert isinstance(uridecodebin, Gst.Element)
|
||||||
self.sink = sink
|
|
||||||
|
|
||||||
# Configure appsink for better reliability
|
# Set file URI
|
||||||
self.sink.set_property("emit-signals", True)
|
uridecodebin.set_property("uri", Gst.filename_to_uri(str(file_item.full_path)))
|
||||||
self.sink.set_property("max-buffers", 1)
|
|
||||||
self.sink.set_property("drop", True)
|
|
||||||
self.sink.connect("new-sample", self._on_new_sample)
|
|
||||||
|
|
||||||
# Set up bus message handler
|
|
||||||
bus = self.pipeline.get_bus()
|
|
||||||
bus.add_signal_watch()
|
|
||||||
bus.connect("message", self._on_message)
|
|
||||||
|
|
||||||
def generate_thumbnail(self, file_item: FileItem) -> None:
|
|
||||||
"""Add a file item to the thumbnail queue"""
|
|
||||||
|
|
||||||
if not file_item.full_path.is_file():
|
|
||||||
return
|
|
||||||
|
|
||||||
self.queue.append(file_item)
|
|
||||||
|
|
||||||
if self.current_item is None:
|
|
||||||
self._process_next()
|
|
||||||
|
|
||||||
def _on_message(self, bus: Gst.Bus, message: Gst.Message) -> None:
|
|
||||||
if message.type == Gst.MessageType.ERROR:
|
|
||||||
err, _ = message.parse_error()
|
|
||||||
print(f"Error: {err.message}", file=sys.stderr)
|
|
||||||
self._cleanup()
|
|
||||||
return
|
|
||||||
|
|
||||||
if message.type == Gst.MessageType.STATE_CHANGED:
|
|
||||||
if message.src != self.pipeline:
|
|
||||||
return
|
|
||||||
|
|
||||||
_, new_state, _ = message.parse_state_changed()
|
|
||||||
|
|
||||||
if new_state == Gst.State.PAUSED and self.state == State.INITIALIZING:
|
|
||||||
self.state = State.SEEKING
|
|
||||||
self._on_pipeline_ready()
|
|
||||||
elif new_state == Gst.State.PLAYING:
|
|
||||||
self.state = State.CAPTURING
|
|
||||||
elif new_state == Gst.State.NULL:
|
|
||||||
if self.state == State.CAPTURING:
|
|
||||||
self._on_capture_complete()
|
|
||||||
self.state = State.IDLE
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
if message.type == Gst.MessageType.ASYNC_DONE:
|
|
||||||
if self.state == State.SEEKING:
|
|
||||||
# Let the pipeline run to capture the frame
|
|
||||||
self.pipeline.set_state(Gst.State.PLAYING)
|
|
||||||
return
|
|
||||||
|
|
||||||
if message.type == Gst.MessageType.EOS:
|
|
||||||
self._on_capture_complete()
|
|
||||||
return
|
|
||||||
|
|
||||||
def _on_pipeline_ready(self) -> None:
|
|
||||||
"""Called when pipeline is ready to seek"""
|
|
||||||
|
|
||||||
success, duration = self.pipeline.query_duration(Gst.Format.TIME)
|
|
||||||
if not success:
|
|
||||||
self._cleanup()
|
|
||||||
return
|
|
||||||
|
|
||||||
seek_pos = duration // 3
|
|
||||||
success = self.pipeline.seek_simple(
|
|
||||||
Gst.Format.TIME,
|
|
||||||
Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
|
|
||||||
seek_pos,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not success:
|
|
||||||
self._cleanup()
|
|
||||||
return
|
|
||||||
|
|
||||||
def _on_new_sample(self, sink: Gst.Element) -> Gst.FlowReturn:
|
|
||||||
"""Handle new sample from appsink"""
|
|
||||||
|
|
||||||
if self.state != State.CAPTURING or not self.current_item:
|
|
||||||
return Gst.FlowReturn.OK
|
|
||||||
|
|
||||||
sample = sink.emit("pull-sample")
|
|
||||||
if not sample:
|
|
||||||
self._cleanup()
|
|
||||||
return Gst.FlowReturn.ERROR
|
|
||||||
|
|
||||||
buffer = sample.get_buffer()
|
|
||||||
if not buffer:
|
|
||||||
self._cleanup()
|
|
||||||
return Gst.FlowReturn.ERROR
|
|
||||||
|
|
||||||
success, map_info = buffer.map(Gst.MapFlags.READ)
|
|
||||||
if not success:
|
|
||||||
self._cleanup()
|
|
||||||
return Gst.FlowReturn.ERROR
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.current_item.thumbnail = bytes(map_info.data)
|
# Set pipeline to PAUSED to get duration
|
||||||
self.current_item.has_thumbnail = True
|
pipeline.set_state(Gst.State.PAUSED)
|
||||||
|
pipeline.get_state(Gst.SECOND)
|
||||||
|
|
||||||
|
# Seek to 1/3 of duration
|
||||||
|
success, duration = pipeline.query_duration(Gst.Format.TIME)
|
||||||
|
if not success:
|
||||||
|
return
|
||||||
|
|
||||||
|
seek_pos = duration // 3
|
||||||
|
pipeline.seek_simple(
|
||||||
|
Gst.Format.TIME,
|
||||||
|
Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT,
|
||||||
|
seek_pos,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start playing to capture frame
|
||||||
|
pipeline.set_state(Gst.State.PLAYING)
|
||||||
|
|
||||||
|
sample = sink.emit("pull-sample")
|
||||||
|
if not sample:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Extract image data
|
||||||
|
buffer = sample.get_buffer()
|
||||||
|
if not buffer:
|
||||||
|
return
|
||||||
|
|
||||||
|
success, map_info = buffer.map(Gst.MapFlags.READ)
|
||||||
|
if not success:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
file_item.thumbnail = bytes(map_info.data)
|
||||||
|
file_item.has_thumbnail = True
|
||||||
|
finally:
|
||||||
|
buffer.unmap(map_info)
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed:", file_item.full_path, err)
|
||||||
finally:
|
finally:
|
||||||
buffer.unmap(map_info)
|
pipeline.set_state(Gst.State.NULL)
|
||||||
|
|
||||||
# We got our sample, clean up
|
|
||||||
self._cleanup()
|
|
||||||
return Gst.FlowReturn.OK
|
|
||||||
|
|
||||||
def _on_capture_complete(self) -> None:
|
|
||||||
"""Called when capture is complete"""
|
|
||||||
self._cleanup()
|
|
||||||
|
|
||||||
def _cleanup(self) -> None:
|
|
||||||
"""Clean up resources and process next item"""
|
|
||||||
|
|
||||||
# Ensure pipeline is stopped
|
|
||||||
self.pipeline.set_state(Gst.State.NULL)
|
|
||||||
|
|
||||||
# Reset state
|
|
||||||
self.state = State.IDLE
|
|
||||||
self.current_item = None
|
|
||||||
|
|
||||||
# Process next item
|
|
||||||
self._process_next()
|
|
||||||
|
|
||||||
def _process_next(self) -> None:
|
|
||||||
"""Start processing the next item in the queue"""
|
|
||||||
|
|
||||||
if not self.queue:
|
|
||||||
self.state = State.IDLE
|
|
||||||
return
|
|
||||||
|
|
||||||
self.state = State.INITIALIZING
|
|
||||||
self.current_item = self.queue.pop(0)
|
|
||||||
|
|
||||||
# Update URI and start pipeline
|
|
||||||
video_uri = Gst.filename_to_uri(str(self.current_item.full_path))
|
|
||||||
self.uridecodebin.set_property("uri", video_uri)
|
|
||||||
self.pipeline.set_state(Gst.State.PLAYING)
|
|
||||||
|
|
Loading…
Reference in a new issue