532 lines
18 KiB
Python
532 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, cast, overload
|
|
|
|
import gi
|
|
|
|
from .file_model import FileItem, FileType
|
|
|
|
gi.require_version("Gdk", "4.0")
|
|
gi.require_version("Gtk", "4.0")
|
|
gi.require_version("Gst", "1.0")
|
|
gi.require_version("Pango", "1.0")
|
|
from gi.repository import Gdk, Gio, Gst, Gtk, Pango # NOQA: E402
|
|
|
|
|
|
class MainWindow(Gtk.ApplicationWindow):
|
|
file_info_label: Gtk.Label
|
|
stack: Gtk.Stack
|
|
list_view: Gtk.ListView
|
|
list_store: Gio.ListStore
|
|
selection_model: Gtk.SingleSelection
|
|
video_widget: Gtk.Picture
|
|
pipeline: Gst.Pipeline
|
|
playbin: Gst.Element
|
|
overlay_tick_callback_id: int
|
|
overlay_label: Gtk.Label
|
|
overlay_hide_time: float
|
|
last_position_save: float
|
|
|
|
def __init__(self, *args: Any, **kwargs: Any):
|
|
super().__init__(*args, **kwargs)
|
|
|
|
# For overlay text timeout
|
|
self.overlay_hide_time = 0.0
|
|
self.last_position_save = 0.0
|
|
self.overlay_label = Gtk.Label()
|
|
self.overlay_label.set_name("overlay-text")
|
|
self.overlay_label.set_valign(Gtk.Align.CENTER)
|
|
self.overlay_label.set_halign(Gtk.Align.CENTER)
|
|
self.overlay_label.set_visible(False)
|
|
self.overlay_label.set_wrap(True)
|
|
self.overlay_label.set_wrap_mode(Pango.WrapMode.WORD_CHAR)
|
|
|
|
def update_overlay(widget: Gtk.Widget, frame_clock: Gdk.FrameClock, data: Any) -> bool:
|
|
current_time = frame_clock.get_frame_time() / 1_000_000
|
|
if current_time >= self.overlay_hide_time:
|
|
self.overlay_label.set_visible(False)
|
|
return True
|
|
|
|
self.overlay_tick_callback_id = self.add_tick_callback(update_overlay, None)
|
|
|
|
# Make window fullscreen and borderless
|
|
self.set_decorated(False)
|
|
self.fullscreen()
|
|
|
|
# Setup key event controller
|
|
key_controller = Gtk.EventControllerKey()
|
|
key_controller.connect("key-pressed", self._on_key_pressed)
|
|
self.add_controller(key_controller)
|
|
|
|
# Main horizontal box to split the screen
|
|
main_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
|
|
|
|
# Create stack to hold our widgets
|
|
self.stack = Gtk.Stack()
|
|
|
|
# Create video widget and overlay
|
|
self.video_widget = Gtk.Picture()
|
|
self.video_widget.set_can_shrink(True)
|
|
self.video_widget.set_keep_aspect_ratio(True)
|
|
self.video_widget.set_vexpand(True)
|
|
self.video_widget.set_hexpand(True)
|
|
|
|
overlay_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
|
|
overlay_box.set_name("black-overlay")
|
|
overlay_box.set_vexpand(True)
|
|
overlay_box.set_hexpand(True)
|
|
|
|
# Create an overlay container
|
|
overlay = Gtk.Overlay()
|
|
overlay.set_child(self.video_widget)
|
|
overlay.add_overlay(self.overlay_label)
|
|
|
|
overlay_box.append(overlay)
|
|
|
|
# Setup GStreamer pipeline
|
|
self.pipeline = Gst.Pipeline.new("video-player")
|
|
|
|
playbin = Gst.ElementFactory.make("playbin", "playbin")
|
|
if not playbin:
|
|
raise RuntimeError("Failed to create playbin element")
|
|
|
|
self.playbin = playbin
|
|
|
|
video_sink = Gst.ElementFactory.make("gtk4paintablesink", "gtk4paintablesink")
|
|
if not video_sink:
|
|
raise RuntimeError("Failed to create gtk4paintablesink element")
|
|
|
|
self.playbin.set_property("video-sink", video_sink)
|
|
self.pipeline.add(self.playbin)
|
|
|
|
# Link video widget to sink
|
|
paintable = video_sink.get_property("paintable")
|
|
self.video_widget.set_paintable(paintable)
|
|
|
|
# Add both main menu and overlay to stack
|
|
self.stack.add_named(main_box, "menu")
|
|
self.stack.add_named(overlay_box, "overlay")
|
|
self.stack.set_visible_child_name("menu")
|
|
|
|
# Create a grid to handle the 1:2 ratio
|
|
grid = Gtk.Grid()
|
|
grid.set_column_homogeneous(True)
|
|
grid.set_hexpand(True)
|
|
|
|
# Left third (1/3 width)
|
|
left_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
|
|
left_box.set_valign(Gtk.Align.CENTER)
|
|
left_box.set_halign(Gtk.Align.FILL)
|
|
self.file_info_label = Gtk.Label(label="")
|
|
self.file_info_label.set_wrap(True)
|
|
left_box.append(self.file_info_label)
|
|
|
|
# Right two-thirds (2/3 width)
|
|
right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
|
|
right_box.set_hexpand(True)
|
|
|
|
# Attach boxes to grid with specific column spans
|
|
grid.attach(left_box, 0, 0, 1, 1)
|
|
grid.attach(right_box, 1, 0, 2, 1)
|
|
|
|
main_box.append(grid)
|
|
|
|
# Create list store and view
|
|
self.list_store = Gio.ListStore(item_type=FileItem)
|
|
self.list_view = Gtk.ListView()
|
|
self.selection_model = Gtk.SingleSelection.new(self.list_store)
|
|
self.selection_model.connect("selection-changed", self._on_selection_changed)
|
|
self.list_view.set_model(self.selection_model)
|
|
self.list_view.set_vexpand(True)
|
|
|
|
def on_activate(widget: Gtk.ListView, index: int):
|
|
selected_item = self.selection_model.get_item(index)
|
|
if selected_item:
|
|
file_item = cast(FileItem, selected_item)
|
|
|
|
if file_item.file_type == FileType.DIRECTORY:
|
|
os.chdir(file_item.full_path)
|
|
self._populate_file_list()
|
|
return
|
|
|
|
position = self._load_attribute("position", 0)
|
|
|
|
# Start playing the video
|
|
full_path = os.path.abspath(file_item.full_path)
|
|
self.playbin.set_property("uri", f"file://{full_path}")
|
|
|
|
track = self._load_attribute("subtitle_track", -2)
|
|
|
|
if track >= 0:
|
|
flags = self.playbin.get_property("flags")
|
|
flags |= 0x00000004 # TEXT flag
|
|
self.playbin.set_property("flags", flags)
|
|
self.playbin.set_property("current-text", track)
|
|
elif track == -1:
|
|
flags = self.playbin.get_property("flags")
|
|
flags &= ~0x00000004 # TEXT flag
|
|
self.playbin.set_property("flags", flags)
|
|
|
|
if position:
|
|
# Pause and wait for it to complete.
|
|
self.pipeline.set_state(Gst.State.PAUSED)
|
|
self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
|
|
|
|
# Seek to saved position.
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME,
|
|
Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
|
|
position,
|
|
)
|
|
|
|
# Now start playing
|
|
self.pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
self.stack.set_visible_child_name("overlay")
|
|
self.show_overlay_text(f"Playing: {file_item.name}")
|
|
|
|
self.list_view.connect("activate", on_activate)
|
|
|
|
# Factory for list items
|
|
factory = Gtk.SignalListItemFactory()
|
|
factory.connect("setup", self._setup_list_item)
|
|
factory.connect("bind", self._bind_list_item)
|
|
self.list_view.set_factory(factory)
|
|
|
|
# Populate the list store
|
|
self._populate_file_list()
|
|
|
|
# Add list view to a scrolled window
|
|
scrolled = Gtk.ScrolledWindow()
|
|
scrolled.set_child(self.list_view)
|
|
right_box.append(scrolled)
|
|
|
|
self.set_child(self.stack)
|
|
|
|
@property
|
|
def currently_playing(self):
|
|
selected_item = self.selection_model.get_selected_item()
|
|
|
|
if not selected_item:
|
|
return None
|
|
|
|
file_item = cast(FileItem, selected_item)
|
|
|
|
if file_item.file_type == FileType.DIRECTORY:
|
|
return None
|
|
|
|
return file_item.full_path
|
|
|
|
def _setup_list_item(self, factory: Gtk.SignalListItemFactory, list_item: Gtk.ListItem):
|
|
# Create horizontal box to hold icon and label
|
|
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
|
|
box.set_spacing(8)
|
|
|
|
# Create icon placeholder
|
|
icon = Gtk.Box()
|
|
icon.set_css_classes(["file-icon"])
|
|
box.append(icon)
|
|
|
|
# Create label
|
|
label = Gtk.Label()
|
|
label.set_halign(Gtk.Align.START)
|
|
box.append(label)
|
|
|
|
list_item.set_child(box)
|
|
|
|
def _bind_list_item(self, factory: Gtk.SignalListItemFactory, list_item: Gtk.ListItem):
|
|
box = cast(Gtk.Box, list_item.get_child())
|
|
icon = cast(Gtk.Box, box.get_first_child())
|
|
label = cast(Gtk.Label, box.get_last_child())
|
|
item = cast(FileItem, list_item.get_item())
|
|
|
|
# Make icon transparent for directories
|
|
icon.set_opacity(0.0 if item.file_type == FileType.DIRECTORY else 1.0)
|
|
label.set_text(item.name)
|
|
|
|
def _on_selection_changed(
|
|
self,
|
|
selection_model: Gtk.SingleSelection,
|
|
position: int,
|
|
n_items: int,
|
|
):
|
|
if selection_model.get_selected() == Gtk.INVALID_LIST_POSITION:
|
|
self.file_info_label.set_text("")
|
|
else:
|
|
selected_item = selection_model.get_selected_item()
|
|
if selected_item:
|
|
file_item = cast(FileItem, selected_item)
|
|
self.file_info_label.set_text(file_item.full_path)
|
|
|
|
def _toggle_play_pause(self) -> None:
|
|
"""Toggle between play and pause states"""
|
|
_, state, _ = self.pipeline.get_state(0)
|
|
if state == Gst.State.PLAYING:
|
|
self.pipeline.set_state(Gst.State.PAUSED)
|
|
else:
|
|
self.pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
def _on_video_key_pressed(
|
|
self,
|
|
keyval: int,
|
|
keycode: int,
|
|
state: Gdk.ModifierType,
|
|
) -> bool:
|
|
if keyval == Gdk.keyval_from_name("space"):
|
|
self._toggle_play_pause()
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("Escape"):
|
|
self._save_position()
|
|
self.pipeline.set_state(Gst.State.NULL)
|
|
self.stack.set_visible_child_name("menu")
|
|
self.list_view.grab_focus()
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("Left"):
|
|
self._seek_relative(-10)
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("Right"):
|
|
self._seek_relative(10)
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("Up"):
|
|
self._seek_relative(60)
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("Down"):
|
|
self._seek_relative(-60)
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("Home"):
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, 0
|
|
)
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("End"):
|
|
success, duration = self.pipeline.query_duration(Gst.Format.TIME)
|
|
if success:
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, duration - 1
|
|
)
|
|
return True
|
|
|
|
elif keyval == Gdk.keyval_from_name("j"):
|
|
self._cycle_subtitles()
|
|
return True
|
|
|
|
return False
|
|
|
|
def _on_key_pressed(
|
|
self,
|
|
controller: Gtk.EventControllerKey,
|
|
keyval: int,
|
|
keycode: int,
|
|
state: Gdk.ModifierType,
|
|
) -> bool:
|
|
# If we're showing video, handle keys differently
|
|
if self.stack.get_visible_child_name() == "overlay":
|
|
return self._on_video_key_pressed(keyval, keycode, state)
|
|
return False
|
|
|
|
def _seek_relative(self, offset: int) -> None:
|
|
"""Seek relative to current position by offset seconds"""
|
|
|
|
# Query current position
|
|
success, current = self.pipeline.query_position(Gst.Format.TIME)
|
|
if not success:
|
|
return
|
|
|
|
# Convert offset to nanoseconds and add to current
|
|
new_pos = current + (offset * Gst.SECOND)
|
|
|
|
# Ensure we don't seek before start
|
|
if new_pos < 0:
|
|
new_pos = 0
|
|
|
|
# Perform seek
|
|
self.pipeline.seek_simple(
|
|
Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, new_pos
|
|
)
|
|
|
|
def _get_subtitle_info(self, track_index: int) -> str:
|
|
"""Get subtitle track info including language if available"""
|
|
|
|
# Query the subtitle track's tags
|
|
caps: Gst.TagList | None = self.playbin.emit("get-text-tags", track_index)
|
|
if not caps:
|
|
return str(track_index)
|
|
|
|
found, lang = caps.get_string("language-code")
|
|
return f"{track_index} ({lang})" if found else str(track_index)
|
|
|
|
def _save_position(self) -> None:
|
|
"""Save current playback position as xattr"""
|
|
|
|
success, position = self.pipeline.query_position(Gst.Format.TIME)
|
|
success2, duration = self.pipeline.query_duration(Gst.Format.TIME)
|
|
|
|
if success and success2:
|
|
self._save_attribute("position", position)
|
|
self._save_attribute("duration", duration)
|
|
|
|
def _save_attribute(self, name: str, value: str | float | int | None):
|
|
path = self.currently_playing
|
|
|
|
if path is None:
|
|
return
|
|
|
|
try:
|
|
if value is None:
|
|
os.removexattr(path, f"user.lazy_player.{name}")
|
|
else:
|
|
os.setxattr(path, f"user.lazy_player.{name}", str(value).encode("utf8"))
|
|
except OSError as err:
|
|
print(err, file=sys.stderr)
|
|
|
|
@overload
|
|
def _load_attribute(self, name: str, dfl: str) -> str: ...
|
|
|
|
@overload
|
|
def _load_attribute(self, name: str, dfl: int) -> int: ...
|
|
|
|
def _load_attribute(self, name: str, dfl: str | int) -> str | int:
|
|
path = self.currently_playing
|
|
|
|
if path is None:
|
|
return dfl
|
|
|
|
try:
|
|
strval = os.getxattr(path, f"user.lazy_player.{name}")
|
|
return type(dfl)(strval)
|
|
except OSError as err:
|
|
print(err, file=sys.stderr)
|
|
return dfl
|
|
|
|
def _cycle_subtitles(self) -> None:
|
|
"""Cycle through available subtitle tracks, including off state"""
|
|
|
|
# Get current flags and subtitle track
|
|
flags = self.playbin.get_property("flags")
|
|
current = self.playbin.get_property("current-text")
|
|
n_text = self.playbin.get_property("n-text")
|
|
|
|
if n_text == 0:
|
|
self.show_overlay_text("No subtitles available")
|
|
return
|
|
|
|
# If subtitles are disabled, enable them and set to first track
|
|
if not (flags & 0x00000004): # TEXT flag
|
|
flags |= 0x00000004
|
|
self.playbin.set_property("flags", flags)
|
|
self.playbin.set_property("current-text", 0)
|
|
track_info = self._get_subtitle_info(0)
|
|
self.show_overlay_text(f"Subtitle track: {track_info}")
|
|
self._save_attribute("subtitle_track", 0)
|
|
return
|
|
|
|
# If we're on the last track, disable subtitles
|
|
if current >= n_text - 1:
|
|
flags &= ~0x00000004 # TEXT flag
|
|
self.playbin.set_property("flags", flags)
|
|
self.show_overlay_text("Subtitles: Off")
|
|
self._save_attribute("subtitle_track", -1)
|
|
return
|
|
|
|
# Otherwise cycle to next track
|
|
next_track = current + 1
|
|
self.playbin.set_property("current-text", next_track)
|
|
track_info = self._get_subtitle_info(next_track)
|
|
self.show_overlay_text(f"Subtitle track: {track_info}")
|
|
self._save_attribute("subtitle_track", next_track)
|
|
|
|
def show_overlay_text(self, text: str, timeout_seconds: float = 1.0) -> None:
|
|
"""Show text in a centered overlay that disappears after timeout"""
|
|
self.overlay_label.set_text(text)
|
|
self.overlay_label.set_visible(True)
|
|
|
|
# Set absolute time when overlay should hide
|
|
frame_clock = self.get_frame_clock()
|
|
if frame_clock is None:
|
|
return
|
|
|
|
frame_time = frame_clock.get_frame_time() / 1_000_000 # Convert to seconds
|
|
self.overlay_hide_time = frame_time + timeout_seconds
|
|
|
|
# Save position every 60 seconds
|
|
if frame_time - self.last_position_save >= 60.0:
|
|
self._save_position()
|
|
self.last_position_save = frame_time
|
|
|
|
def _populate_file_list(self) -> None:
|
|
# TODO: Implement proper version sort (strverscmp equivalent)
|
|
|
|
items: list[FileItem] = []
|
|
|
|
# Add parent directory
|
|
items.append(FileItem("..", FileType.DIRECTORY, "../"))
|
|
|
|
with os.scandir(".") as it:
|
|
for entry in it:
|
|
if entry.name != ".." and not entry.name.startswith("."):
|
|
try:
|
|
if entry.is_dir():
|
|
items.append(FileItem(entry.name, FileType.DIRECTORY, entry.name + "/"))
|
|
else:
|
|
file_item = FileItem.from_path(entry.name)
|
|
items.append(file_item)
|
|
except ValueError:
|
|
# Skip unsupported file types
|
|
continue
|
|
|
|
# Sort directories first, then files, both alphabetically
|
|
items.sort(key=lambda x: (x.file_type != FileType.DIRECTORY, x.name.lower()))
|
|
|
|
while self.list_store.get_n_items():
|
|
self.list_store.remove(0)
|
|
|
|
for item in items:
|
|
self.list_store.append(item)
|
|
|
|
if items:
|
|
self.file_info_label.set_text(items[0].full_path)
|
|
|
|
|
|
class App(Gtk.Application):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
# Initialize GStreamer
|
|
Gst.init(None)
|
|
|
|
# Load CSS
|
|
css_provider = Gtk.CssProvider()
|
|
css_file = Path(__file__).parent / "style.css"
|
|
css_provider.load_from_path(str(css_file))
|
|
|
|
display = Gdk.Display.get_default()
|
|
if display is None:
|
|
raise RuntimeError("No display available")
|
|
|
|
Gtk.StyleContext.add_provider_for_display(
|
|
display,
|
|
css_provider,
|
|
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
|
|
)
|
|
|
|
def do_activate(self):
|
|
win = MainWindow(application=self)
|
|
win.present()
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) >= 2:
|
|
os.chdir(sys.argv[1])
|
|
|
|
app = App()
|
|
app.run(None)
|