134 lines
3.7 KiB
Python
134 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
import multiprocessing
|
|
import os
|
|
import sys
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from queue import LifoQueue
|
|
from typing import TYPE_CHECKING, Any, cast
|
|
|
|
from gi.repository import GLib, Gst
|
|
|
|
if TYPE_CHECKING:
|
|
from .file_model import FileItem
|
|
|
|
DEFAULT_SEEK_FLAGS = (
|
|
Gst.SeekFlags.FLUSH
|
|
| Gst.SeekFlags.KEY_UNIT
|
|
| Gst.SeekFlags.SNAP_NEAREST
|
|
| Gst.SeekFlags.TRICKMODE
|
|
| Gst.SeekFlags.TRICKMODE_KEY_UNITS
|
|
| Gst.SeekFlags.TRICKMODE_NO_AUDIO
|
|
)
|
|
|
|
__all__ = ["Thumbnailer", "generate_thumbnail_sync"]
|
|
|
|
|
|
MAX_WORKERS = max(1, multiprocessing.cpu_count() // 2)
|
|
|
|
|
|
class Thumbnailer(ThreadPoolExecutor):
|
|
def __init__(self):
|
|
super().__init__(
|
|
thread_name_prefix="Thumbnailer",
|
|
max_workers=MAX_WORKERS,
|
|
)
|
|
|
|
self._work_queue = cast(Any, LifoQueue())
|
|
|
|
def generate_thumbnail(self, file_item: FileItem):
|
|
"""Schedule thumbnail generation."""
|
|
|
|
if not file_item.full_path.is_file():
|
|
return
|
|
|
|
if file_item.attempted_thumbnail.value:
|
|
return
|
|
|
|
self.submit(generate_thumbnail_sync_nicely, file_item)
|
|
file_item.attempted_thumbnail.value = True
|
|
|
|
|
|
def generate_thumbnail_sync_nicely(file_item: FileItem):
|
|
os.nice(10)
|
|
return generate_thumbnail_sync(file_item)
|
|
|
|
|
|
def generate_thumbnail_sync(file_item: FileItem):
|
|
"""Generate thumbnail for a single file"""
|
|
|
|
# print("[thumbnailer] Generate:", file_item.full_path.name)
|
|
|
|
pipeline_str = (
|
|
"uridecodebin name=uridecodebin force-sw-decoders=true ! "
|
|
"videoconvert ! "
|
|
"jpegenc quality=85 ! "
|
|
"appsink sync=false name=sink"
|
|
)
|
|
|
|
pipeline: Gst.Pipeline | None = None
|
|
|
|
try:
|
|
pipeline = cast(Gst.Pipeline, Gst.parse_launch(pipeline_str))
|
|
|
|
sink = pipeline.get_by_name("sink")
|
|
assert isinstance(sink, Gst.Element)
|
|
|
|
uridecodebin = pipeline.get_by_name("uridecodebin")
|
|
assert isinstance(uridecodebin, Gst.Element)
|
|
|
|
# Set file URI
|
|
uridecodebin.set_property("uri", Gst.filename_to_uri(str(file_item.full_path)))
|
|
|
|
# Set pipeline to PAUSED to get duration
|
|
pipeline.set_state(Gst.State.PAUSED)
|
|
pipeline.get_state(Gst.CLOCK_TIME_NONE)
|
|
|
|
# Obtain total duration
|
|
success, duration = pipeline.query_duration(Gst.Format.TIME)
|
|
if not success:
|
|
raise RuntimeError("Failed to query duration")
|
|
|
|
def get_sample(seek_pos: int) -> bytes:
|
|
pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, seek_pos)
|
|
|
|
# Start playing to capture frame
|
|
pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
sample = sink.emit("pull-sample")
|
|
if not sample:
|
|
raise RuntimeError("Failed to pull sample")
|
|
|
|
# Extract image data
|
|
buffer = sample.get_buffer()
|
|
if not buffer:
|
|
raise RuntimeError("Failed to get buffer")
|
|
|
|
success, map_info = buffer.map(Gst.MapFlags.READ)
|
|
if not success:
|
|
raise RuntimeError("Failed to map buffer")
|
|
|
|
try:
|
|
return bytes(map_info.data)
|
|
finally:
|
|
buffer.unmap(map_info)
|
|
|
|
candidates: list[bytes] = []
|
|
|
|
for i in range(3):
|
|
candidates.append(get_sample(duration // 3 - Gst.SECOND + i * Gst.SECOND))
|
|
|
|
thumbnail = max(candidates, key=len)
|
|
|
|
def set_thumbnail():
|
|
file_item.thumbnail.value = thumbnail
|
|
|
|
GLib.idle_add(set_thumbnail)
|
|
|
|
except Exception as err:
|
|
print("[thumbnailer] Error:", file_item.full_path.name, file=sys.stderr)
|
|
print("[thumbnailer]", err, file=sys.stderr)
|
|
|
|
finally:
|
|
if pipeline:
|
|
pipeline.set_state(Gst.State.NULL)
|