183 lines
6.1 KiB
Python
183 lines
6.1 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from gi.repository import GObject, Gst, Gtk
|
|
|
|
|
|
class VideoPlayer(GObject.Object):
|
|
pipeline: Gst.Pipeline
|
|
playbin: Gst.Element
|
|
|
|
__gtype_name__ = "VideoPlayer"
|
|
|
|
is_playing = GObject.Property(type=bool, default=False)
|
|
is_paused = GObject.Property(type=bool, default=True)
|
|
|
|
def __init__(self, picture: Gtk.Picture):
|
|
super().__init__()
|
|
|
|
self.pipeline = Gst.Pipeline.new("video-player")
|
|
|
|
playbin = Gst.ElementFactory.make("playbin", "playbin")
|
|
if not playbin:
|
|
raise RuntimeError("Failed to create playbin element")
|
|
|
|
self.playbin = playbin
|
|
|
|
video_sink = Gst.ElementFactory.make("gtk4paintablesink", "gtk4paintablesink")
|
|
if not video_sink:
|
|
raise RuntimeError("Failed to create gtk4paintablesink element")
|
|
|
|
self.playbin.set_property("video-sink", video_sink)
|
|
self.pipeline.add(self.playbin)
|
|
|
|
# Link picture to sink
|
|
paintable = video_sink.get_property("paintable")
|
|
picture.set_paintable(paintable)
|
|
|
|
def play(
|
|
self,
|
|
file_path: Path | str,
|
|
position: int = 0,
|
|
subtitle_track: int = -2,
|
|
audio_track: int = 0,
|
|
) -> None:
|
|
"""Start playing a video file"""
|
|
|
|
self.playbin.set_property("uri", Gst.filename_to_uri(str(file_path)))
|
|
|
|
if subtitle_track >= 0:
|
|
flags = self.playbin.get_property("flags")
|
|
flags |= 0x00000004 # TEXT flag
|
|
self.playbin.set_property("flags", flags)
|
|
self.playbin.set_property("current-text", subtitle_track)
|
|
elif subtitle_track == -1:
|
|
flags = self.playbin.get_property("flags")
|
|
flags &= ~0x00000004 # TEXT flag
|
|
self.playbin.set_property("flags", flags)
|
|
|
|
self.playbin.set_property("current-audio", audio_track)
|
|
|
|
# Pause and wait for it to complete
|
|
self.pipeline.set_state(Gst.State.PAUSED)
|
|
self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
|
|
|
|
if position:
|
|
# Seek to saved position
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME,
|
|
Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
|
|
position,
|
|
)
|
|
|
|
# Start playing
|
|
self.pipeline.set_state(Gst.State.PLAYING)
|
|
self.set_property("is-playing", True)
|
|
self.set_property("is-paused", False)
|
|
|
|
def stop(self) -> None:
|
|
"""Stop playback and release resources"""
|
|
self.pipeline.set_state(Gst.State.NULL)
|
|
self.set_property("is-paused", True)
|
|
self.set_property("is-playing", False)
|
|
|
|
def toggle_play_pause(self) -> None:
|
|
"""Toggle between play and pause states"""
|
|
_, state, _ = self.pipeline.get_state(0)
|
|
if state == Gst.State.PLAYING:
|
|
self.pipeline.set_state(Gst.State.PAUSED)
|
|
self.set_property("is-paused", True)
|
|
else:
|
|
self.pipeline.set_state(Gst.State.PLAYING)
|
|
self.set_property("is-paused", False)
|
|
|
|
def seek_relative(self, offset: float) -> None:
|
|
"""Seek relative to current position by offset seconds"""
|
|
success, current = self.pipeline.query_position(Gst.Format.TIME)
|
|
if not success:
|
|
return
|
|
|
|
new_pos = current + int(offset * Gst.SECOND)
|
|
if new_pos < 0:
|
|
new_pos = 0
|
|
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME,
|
|
Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT,
|
|
new_pos,
|
|
)
|
|
|
|
def seek_start(self):
|
|
"""Seek to the start of the video."""
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME,
|
|
Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT,
|
|
0,
|
|
)
|
|
|
|
def seek_end(self):
|
|
"""Seek to the end of the video."""
|
|
duration = self.get_duration()
|
|
if duration:
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME,
|
|
Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT,
|
|
duration,
|
|
)
|
|
|
|
def get_position(self) -> int | None:
|
|
"""Get current playback position in nanoseconds"""
|
|
success, position = self.pipeline.query_position(Gst.Format.TIME)
|
|
return position if success else None
|
|
|
|
def get_duration(self) -> int | None:
|
|
"""Get total duration in nanoseconds"""
|
|
success, duration = self.pipeline.query_duration(Gst.Format.TIME)
|
|
return duration if success else None
|
|
|
|
def cycle_subtitles(self) -> tuple[bool, int, str]:
|
|
"""Cycle through available subtitle tracks, including off state"""
|
|
|
|
flags = self.playbin.get_property("flags")
|
|
current = self.playbin.get_property("current-text")
|
|
n_text = self.playbin.get_property("n-text")
|
|
|
|
if n_text == 0:
|
|
return False, 0, ""
|
|
|
|
if not (flags & 0x00000004): # TEXT flag
|
|
flags |= 0x00000004
|
|
self.playbin.set_property("flags", flags)
|
|
self.playbin.set_property("current-text", 0)
|
|
return True, 1, self._get_track_lang("text", 0)
|
|
|
|
if current >= n_text - 1:
|
|
flags &= ~0x00000004 # TEXT flag
|
|
self.playbin.set_property("flags", flags)
|
|
return True, 0, ""
|
|
|
|
next_track = current + 1
|
|
self.playbin.set_property("current-text", next_track)
|
|
return True, next_track + 1, self._get_track_lang("text", next_track)
|
|
|
|
def cycle_audio(self) -> tuple[bool, int, str]:
|
|
"""Cycle through available audio tracks"""
|
|
|
|
current = self.playbin.get_property("current-audio")
|
|
n_audio = self.playbin.get_property("n-audio")
|
|
|
|
if n_audio == 0:
|
|
return False, 0, ""
|
|
|
|
next_track = (current + 1) % n_audio
|
|
self.playbin.set_property("current-audio", next_track)
|
|
return True, next_track + 1, self._get_track_lang("audio", next_track)
|
|
|
|
def _get_track_lang(self, track_type: str, track_index: int) -> str:
|
|
caps: Gst.TagList | None = self.playbin.emit(f"get-{track_type}-tags", track_index)
|
|
if not caps:
|
|
return str(track_index)
|
|
|
|
found, lang = caps.get_string("language-code")
|
|
return lang if found else "???"
|