# 215 lines, 6.7 KiB, Python
from __future__ import annotations
|
|
|
|
from time import time
|
|
|
|
from gi.repository import GObject, Gst, Gtk
|
|
|
|
from .file_model import FileItem
|
|
from .reactive import Ref
|
|
|
|
# Seek-flag presets handed to ``seek_simple``. All of them flush the pipeline
# so the new position takes effect immediately.

# Keyframe seek that snaps to the keyframe at or after the requested position.
SEEK_FORWARD = (
    Gst.SeekFlags.FLUSH
    | Gst.SeekFlags.KEY_UNIT
    | Gst.SeekFlags.SNAP_AFTER
)

# Keyframe seek that snaps to the keyframe at or before the requested position.
SEEK_BACKWARD = (
    Gst.SeekFlags.FLUSH
    | Gst.SeekFlags.KEY_UNIT
    | Gst.SeekFlags.SNAP_BEFORE
)

# Exact seek to the requested position (no keyframe snapping).
SEEK_ACCURATE = (
    Gst.SeekFlags.FLUSH
    | Gst.SeekFlags.ACCURATE
)
|
|
|
|
|
|
class VideoPlayer(GObject.Object):
    """Video player built on a GStreamer ``playbin`` rendering into a ``Gtk.Picture``.

    The constructor creates a pipeline containing a single ``playbin`` whose
    video sink is a ``gtk4paintablesink``; the sink's paintable is attached to
    the supplied picture widget. Playback state is mirrored into ``Ref``
    attributes:

    * ``file_item`` -- the item currently loaded, or ``None``.
    * ``is_playing`` -- ``True`` between ``play()`` and ``stop()``.
    * ``is_paused`` -- ``True`` while playback is paused or nothing is playing.
    * ``last_user_input`` -- ``time()`` stamp of the latest user-driven action.
    """

    picture: Gtk.Picture
    pipeline: Gst.Pipeline
    playbin: Gst.Element

    __gtype_name__ = "VideoPlayer"

    # Bit in playbin's "flags" property that enables subtitle (text) rendering.
    # Kept as a literal, exactly as the original code spelled it.
    _PLAY_FLAG_TEXT = 0x00000004

    file_item: Ref[FileItem | None]
    is_playing: Ref[bool]
    is_paused: Ref[bool]

    last_user_input: Ref[float]

    def __init__(self, picture: Gtk.Picture):
        """Build the pipeline and attach its video output to ``picture``.

        Raises:
            RuntimeError: if the ``playbin`` or ``gtk4paintablesink`` element
                cannot be created.
        """
        super().__init__()

        self.picture = picture

        self.file_item = Ref(None)
        self.is_playing = Ref(False)
        self.is_paused = Ref(True)
        self.last_user_input = Ref(time())

        self.pipeline = Gst.Pipeline.new("video-player")

        playbin = Gst.ElementFactory.make("playbin", "playbin")
        if not playbin:
            raise RuntimeError("Failed to create playbin element")

        self.playbin = playbin

        video_sink = Gst.ElementFactory.make("gtk4paintablesink", "gtk4paintablesink")
        if not video_sink:
            raise RuntimeError("Failed to create gtk4paintablesink element")

        self.playbin.set_property("video-sink", video_sink)
        self.pipeline.add(self.playbin)

        # Link picture to sink
        paintable = video_sink.get_property("paintable")
        self.picture.set_paintable(paintable)

    def _set_subtitles_enabled(self, enabled: bool) -> None:
        """Set or clear the TEXT bit in playbin's ``flags`` property."""
        flags = self.playbin.get_property("flags")
        if enabled:
            flags |= self._PLAY_FLAG_TEXT
        else:
            flags &= ~self._PLAY_FLAG_TEXT
        self.playbin.set_property("flags", flags)

    def play(
        self,
        file_item: FileItem,
        position: int = 0,
        subtitle_track: int = -2,
        audio_track: int = 0,
    ) -> None:
        """Load ``file_item`` and start playing it.

        Args:
            file_item: Item to play; its ``full_path`` is converted to a URI.
            position: Start position passed to an accurate ``Gst.Format.TIME``
                seek. ``0`` (the default) skips the seek.
            subtitle_track: ``>= 0`` enables subtitles and selects that track,
                ``-1`` disables subtitles, any other value (default ``-2``)
                leaves the subtitle settings untouched.
            audio_track: Index assigned to playbin's ``current-audio``.
        """
        self.file_item.value = file_item

        uri = Gst.filename_to_uri(str(file_item.full_path))
        assert uri is not None

        self.playbin.set_property("uri", uri)

        if subtitle_track >= 0:
            self._set_subtitles_enabled(True)
            self.playbin.set_property("current-text", subtitle_track)
        elif subtitle_track == -1:
            self._set_subtitles_enabled(False)

        self.playbin.set_property("current-audio", audio_track)

        # Pause and wait for the state change to complete so the seek below
        # operates on a prerolled pipeline. This blocks without a timeout.
        self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.get_state(Gst.CLOCK_TIME_NONE)

        if position:
            # Seek to saved position
            self.pipeline.seek_simple(Gst.Format.TIME, SEEK_ACCURATE, position)

        # Start playing
        self.pipeline.set_state(Gst.State.PLAYING)
        self.is_playing.value = True
        self.is_paused.value = False
        self.last_user_input.value = time()

    def stop(self) -> None:
        """Stop playback, release pipeline resources and reset the state refs."""
        self.pipeline.set_state(Gst.State.NULL)
        self.file_item.value = None

        # Nothing is loaded any more: go back to the same idle state that
        # __init__ establishes (not playing, paused).
        self.is_playing.value = False
        self.is_paused.value = True
        self.last_user_input.value = time()

    def toggle_play_pause(self) -> None:
        """Toggle between play and pause states."""
        self.last_user_input.value = time()

        # Zero timeout: read the current state without waiting for any
        # pending state change.
        _, state, _ = self.pipeline.get_state(0)

        if state == Gst.State.PLAYING:
            self.pipeline.set_state(Gst.State.PAUSED)
            self.is_paused.value = True
        else:
            self.pipeline.set_state(Gst.State.PLAYING)
            self.is_paused.value = False

    def seek_relative(self, offset: float) -> None:
        """Seek relative to the current position by ``offset`` seconds.

        Negative offsets seek backwards; the target is clamped at 0. Does
        nothing beyond recording the user input when the current position
        cannot be queried.
        """
        self.last_user_input.value = time()

        success, current = self.pipeline.query_position(Gst.Format.TIME)
        if not success:
            return

        new_pos = max(0, current + int(offset * Gst.SECOND))

        self.pipeline.seek_simple(
            Gst.Format.TIME,
            SEEK_FORWARD if offset >= 0 else SEEK_BACKWARD,
            new_pos,
        )

    def seek_start(self) -> None:
        """Seek to the start of the video."""
        self.last_user_input.value = time()
        self.pipeline.seek_simple(Gst.Format.TIME, SEEK_BACKWARD, 0)

    def seek_end(self) -> None:
        """Seek to the end of the video, if the duration is known and non-zero."""
        self.last_user_input.value = time()

        if duration := self.get_duration():
            self.pipeline.seek_simple(Gst.Format.TIME, SEEK_FORWARD, duration)

    def get_position(self) -> int | None:
        """Return the current playback position in nanoseconds, or ``None``
        when the position query fails."""
        success, position = self.pipeline.query_position(Gst.Format.TIME)
        return position if success else None

    def get_duration(self) -> int | None:
        """Return the total duration in nanoseconds, or ``None`` when the
        duration query fails."""
        success, duration = self.pipeline.query_duration(Gst.Format.TIME)
        return duration if success else None

    def cycle_subtitles(self) -> tuple[bool, int, str]:
        """Cycle through available subtitle tracks, including an "off" state.

        Returns:
            ``(changed, number, language)``. ``changed`` is ``False`` when no
            file is loaded or the stream has no subtitle tracks. ``number`` is
            the 1-based number of the selected track, or ``0`` when subtitles
            are off. ``language`` is the track's language code, ``"???"`` if
            unknown, or ``""`` when subtitles are off.
        """
        if not self.file_item.value:
            return False, 0, ""

        flags = self.playbin.get_property("flags")
        current = self.playbin.get_property("current-text")
        n_text = self.playbin.get_property("n-text")

        if not n_text:
            return False, 0, ""

        if not (flags & self._PLAY_FLAG_TEXT):
            # Subtitles are currently off: turn them on at the first track.
            self._set_subtitles_enabled(True)
            self.playbin.set_property("current-text", 0)
            return True, 1, self._get_track_lang("text", 0)

        if current >= n_text - 1:
            # Past the last track: the next step in the cycle is "off".
            self._set_subtitles_enabled(False)
            return True, 0, ""

        next_track = current + 1
        self.playbin.set_property("current-text", next_track)
        return True, next_track + 1, self._get_track_lang("text", next_track)

    def cycle_audio(self) -> tuple[bool, int, str]:
        """Cycle through available audio tracks, wrapping after the last one.

        Returns:
            ``(changed, number, language)``. ``changed`` is ``False`` when
            there are no audio tracks; otherwise ``number`` is the 1-based
            number of the newly selected track and ``language`` its language
            code (``"???"`` if unknown).
        """
        current = self.playbin.get_property("current-audio")
        n_audio = self.playbin.get_property("n-audio")

        if not n_audio:
            return False, 0, ""

        next_track = (current + 1) % n_audio
        self.playbin.set_property("current-audio", next_track)
        return True, next_track + 1, self._get_track_lang("audio", next_track)

    def _get_track_lang(self, track_type: str, track_index: int) -> str:
        """Return the ``language-code`` tag of a track, or ``"???"`` if absent.

        ``track_type`` is interpolated into playbin's ``get-<type>-tags``
        action signal name (this class uses ``"text"`` and ``"audio"``).
        """
        tags: Gst.TagList | None = self.playbin.emit(f"get-{track_type}-tags", track_index)
        if not tags:
            return "???"

        found, lang = tags.get_string("language-code")
        return lang if found else "???"
|