通过PID锁定PipeWire ID,以解决PipeWire元数据无法很好定位不同软件的问题

你是否曾希望能够用一个简单的工具来控制正在后台播放的音乐?如果你是 Linux 用户,那么这篇文章将带你探索一种强大的方法,利用 pactlD-Bus 这两个工具,来获取播放状态、调整音量,甚至进行播放控制。

这篇文章的核心亮点,在于我们不依赖于播放器固定的接口名称,而是通过查找进程 ID (PID),进而获取PipeWire ID,从而实现对应用程序音量的精准控制。


为什么选择这种方法?问题的根源

许多现代 Linux 应用程序通过 PipeWire(或旧的 PulseAudio)来管理音频。PipeWire 为每个正在播放音频的应用程序创建一个“sink input”,并附加元信息来描述它。然而,现实情况并不总是那么完美。

问题的根源在于,并非所有应用程序的元信息都清晰或唯一。例如,某些应用可能使用一个通用的名称(如“Electron”或“Chromium”),导致难以区分不同的应用实例;有些应用的 application.name 元信息可能根本不存在或为空。

因此,仅仅依赖 PipeWire 的元信息并不总是可靠。这就是我们选择先通过 PID(进程 ID) 来锁定目标的原因。

在 Linux 系统中,PID 是一个进程最可靠的唯一身份标识。通过 pgrep,我们可以精确地找到一个正在运行的应用程序的所有相关进程,无论它的 PipeWire 元信息有多么混乱。一旦我们获得了这个唯一的 PID,我们再反过来去 pactl 的输出中寻找带有这个 PID 的音频流。

这种“先锁定进程,再反向查找音频流”的策略,就像是先找到了一个人的身份证号,再去茫茫人海中找到他的音频流ID,从而完美绕过了元信息不准确或缺失的问题。


实现思路:核心功能解析与代码

接下来,让我们通过一个实际的 Python 代码来展示这一技术思路是如何实现的。

1. 获取 PipeWire ID

这个步骤是音量控制功能的核心。它将 pgreppactl 的命令行输出进行解析,实现了 PID 到 PipeWire ID 的映射。

import subprocess
import re
from typing import Optional

def get_pipewire_id_by_process_name(app_name: str) -> Optional[str]:
    """
    通过应用程序进程名查找其 PipeWire ID。
    """
    try:
        # 步骤1: 使用 pgrep 获取应用的 PID
        cmd = f"pgrep -if {app_name}"
        result = subprocess.check_output(cmd, shell=True, text=True)
        pids = result.strip().splitlines()

        if not pids:
            return None

        # 步骤2: 获取所有 PipeWire sink inputs
        cmd = "pactl list sink-inputs"
        result = subprocess.check_output(cmd, shell=True, text=True)
        # 按 "Sink Input #" 分割,方便处理每个 sink input
        sink_sections = result.split("Sink Input #")

        # 步骤3: 遍历并匹配 PID,提取 PipeWire ID
        for pid in pids:
            for section in sink_sections[1:]: # 第一个元素是空的
                if f'application.process.id = "{pid}"' in section:
                    id_match = re.match(r'(\d+)', section)
                    if id_match:
                        pipewire_id = id_match.group(1)
                        return pipewire_id
        
        return None
    except subprocess.CalledProcessError:
        # pgrep 或 pactl 命令失败
        return None
    except Exception:
        # 其他未知错误
        return None

2. 控制播放与音量

有了 PipeWire ID,我们就可以选择不同的工具进行控制:

  • 音量控制:

    利用 pactl 命令。将要设置的音量值转换为百分比,然后执行 pactl set-sink-input-volume <pipewire_id> <音量百分比>% 命令,就可以精准地调整音量。

  • 播放/暂停、切换歌曲:

    这些操作更适合使用 D-Bus。因为许多应用支持标准的 MPRIS 接口,我们可以通过发送 dbus-send 命令来调用播放器的相应方法,例如 org.mpris.MediaPlayer2.Player.PlayPause。

下面是一个异步函数,展示了如何根据操作类型选择不同的控制方法。

import asyncio
from typing import Any

async def control_player_by_name(app_name: str, action: str, value: Any = None):
    """
    通过应用名称控制音乐播放器。
    
    Args:
        app_name (str): 应用程序的进程名(例如 'yesplaymusic')。
        action (str): 控制动作,如 'playpause', 'next', 'set_volume'。
        value (Any, optional): 动作需要的参数。
    """
    if action == 'set_volume':
        pipewire_id = get_pipewire_id_by_process_name(app_name)
        if not pipewire_id:
            return
        
        try:
            volume_percent = int(float(value) * 100)
            cmd = f"pactl set-sink-input-volume {pipewire_id} {volume_percent}%"
            await asyncio.create_subprocess_shell(cmd)
        except Exception:
            pass
    else:
        # D-Bus 控制需要知道应用程序的服务名
        dbus_service = f"org.mpris.MediaPlayer2.{app_name}"
        dbus_command = ""
        
        # 构建 D-Bus 命令
        if action in ['playpause', 'next', 'previous']:
            command_map = {
                'playpause': 'PlayPause',
                'next': 'Next',
                'previous': 'Previous'
            }
            command = command_map.get(action)
            dbus_command = (
                f"dbus-send --print-reply --dest={dbus_service} "
                f"/org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.{command}"
            )
        
        if dbus_command:
            try:
                await asyncio.create_subprocess_shell(dbus_command)
            except Exception:
                pass

这段代码展示了如何将“通过 PID 查找 PipeWire ID”的思路整合到实际的控制逻辑中,实现了对任何遵循 MPRIS 和使用 PipeWire 的应用程序的通用控制。

相关业务代码:

# -*- coding: utf-8 -*-

import asyncio
import logging
import platform
import re
from typing import Any, Optional
import subprocess

def get_yesplaymusic_pipewire_id():
    """
    获取 YesPlayMusic 的 PipeWire ID
    返回: str 或 None - 找到时返回 PipeWire ID,未找到时返回 None
    """
    try:
        # 使用 pgrep 获取 YesPlayMusic 进程的 PID
        # -i: 忽略大小写, -f: 匹配完整命令行
        cmd = "pgrep -if yesplaymusic"
        result = subprocess.check_output(cmd, shell=True, text=True)
        pids = result.strip().splitlines()

        if not pids:
            logging.debug("未找到 YesPlayMusic 进程。")
            return None

        # 获取 PipeWire sink-inputs
        cmd = "pactl list sink-inputs"
        result = subprocess.check_output(cmd, shell=True, text=True)
        # 按 "Sink Input #" 分割,方便处理每个 sink input
        sink_sections = result.split("Sink Input #")

        for pid in pids:
            for section in sink_sections[1:]: # 第一个元素是空的
                # 检查进程ID是否在 sink input 的属性中
                if f'application.process.id = "{pid}"' in section:
                    # 从 section 开头提取 sink input ID
                    id_match = re.match(r'(\d+)', section)
                    if id_match:
                        pipewire_id = id_match.group(1)
                        logging.debug(f"找到 YesPlayMusic 的 PipeWire ID: {pipewire_id} (PID: {pid})")
                        return pipewire_id
        
        logging.debug("已找到 YesPlayMusic 进程,但未找到关联的 PipeWire sink input。可能没有在播放。")
        return None
    except subprocess.CalledProcessError:
        logging.debug("查找 YesPlayMusic PipeWire ID 时出错 (pgrep 或 pactl 命令失败)。")
        return None
    except Exception as e:
        logging.error(f"查找 PipeWire ID 时发生未知错误: {e}")
        return None

async def get_player_volume() -> Optional[float]:
    """
    通过 pactl 获取播放器的当前音量。
    此功能仅限于Linux系统。
    
    Returns:
        Optional[float]: 返回播放器音量(0.0到1.0之间),如果获取失败则返回None。
    """
    if platform.system() != "Linux":
        logging.info("非Linux系统,跳过获取音量。")
        return None

    pipewire_id = get_yesplaymusic_pipewire_id()
    if not pipewire_id:
        logging.warning("无法获取 YesPlayMusic 的 PipeWire ID,无法获取音量。可能没有在播放。")
        return None

    try:
        # 使用 pactl 获取指定 sink input 的音量
        cmd = (
            f'pactl list sink-inputs | grep -A20 "Sink Input #{pipewire_id}" | '
            f"grep 'Volume:' | head -n1 | grep -oP '\\d{{1,3}}%' | head -n1"
        )
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()

        if process.returncode == 0 and stdout:
            volume_str = stdout.decode().strip().replace('%', '')
            volume = float(volume_str) / 100.0
            logging.info(f"成功获取到当前音量: {volume}")
            return volume
        else:
            error_message = stderr.decode().strip()
            # 如果没有输出,也认为是错误
            if not error_message and not stdout:
                error_message = "pactl 命令没有返回音量信息。"
            logging.error(f"获取音量失败: {error_message}")
            return None
    except Exception as e:
        logging.error(f"获取音量时发生错误: {e}")
        return None

async def get_player_status() -> Optional[str]:
    """
    通过 D-Bus 获取播放器的当前播放状态。
    此功能仅限于Linux系统。

    Returns:
        Optional[str]: 返回播放状态(例如 "Playing", "Paused"),如果获取失败则返回None。
    """
    if platform.system() != "Linux":
        logging.debug("非Linux系统,跳过获取播放状态。")
        return None

    dbus_command = (
        "dbus-send --print-reply --dest=org.mpris.MediaPlayer2.yesplaymusic "
        "/org/mpris/MediaPlayer2 org.freedesktop.DBus.Properties.Get "
        "string:'org.mpris.MediaPlayer2.Player' string:'PlaybackStatus'"
    )

    try:
        process = await asyncio.create_subprocess_shell(
            dbus_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()

        if process.returncode == 0 and stdout:
            output = stdout.decode().strip()
            match = re.search(r'string "(Playing|Paused|Stopped)"', output)
            if match:
                status = match.group(1)
                logging.debug(f"成功获取到播放状态: {status}")
                return status
            else:
                logging.warning(f"无法从DBus输出中解析播放状态: {output}")
                return None
        else:
            error_message = stderr.decode().strip()
            if "was not provided by any .service files" in error_message:
                 logging.debug(f"获取播放状态失败,可能是播放器未运行: {error_message}")
            else:
                logging.error(f"获取播放状态失败: {error_message}")
            return None
    except Exception as e:
        logging.error(f"获取播放状态时发生错误: {e}")
        return None

async def control_player(action: str, value: Any = None):
    """
    通过D-Bus或pactl控制音乐播放器。
    支持播放/暂停、下一首、上一首、跳转进度和设置音量。
    此功能仅限于Linux系统。

    Args:
        action (str): 控制动作,如 'playpause', 'next', 'previous', 'seek', 'set_volume'。
        value (Any, optional): 动作需要的参数。例如,'seek'需要进度(毫秒),'set_volume'需要音量(0.0-1.0)。
    """
    if platform.system() != "Linux":
        logging.info(f"接收到 '{action}' 指令,但当前系统 ({platform.system()}) 不支持控制。")
        return

    # 音量控制使用 pactl
    if action == 'set_volume':
        pipewire_id = get_yesplaymusic_pipewire_id()
        if not pipewire_id:
            logging.warning("无法获取 YesPlayMusic 的 PipeWire ID,无法设置音量。可能没有在播放。")
            return
        
        try:
            # 将 0.0-1.0 的音量转换为百分比
            volume_percent = int(float(value) * 100)
            cmd = f"pactl set-sink-input-volume {pipewire_id} {volume_percent}%"
            
            logging.info(f"执行 pactl 命令: {cmd}")
            process = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            if process.returncode == 0:
                logging.info(f"成功执行 'set_volume' 操作。")
            else:
                logging.error(f"执行 pactl 命令失败: {stderr.decode().strip()}")
        except Exception as e:
            logging.error(f"执行 pactl 命令时出错: {e}")
        return

    # 其他控制继续使用 D-Bus
    dbus_command = ""
    if action in ['playpause', 'next', 'previous']:
        command_map = {
            'playpause': 'PlayPause',
            'next': 'Next',
            'previous': 'Previous'
        }
        command = command_map.get(action)
        dbus_command = (
            f"dbus-send --print-reply "
            f"--dest=org.mpris.MediaPlayer2.yesplaymusic "
            f"/org/mpris/MediaPlayer2 "
            f"org.mpris.MediaPlayer2.Player.{command}"
        )
    elif action == 'seek':
        position_micro = int(value * 1000)
        dbus_command = (
            f"dbus-send --print-reply "
            f"--dest=org.mpris.MediaPlayer2.yesplaymusic "
            f"/org/mpris/MediaPlayer2 "
            f"org.mpris.MediaPlayer2.Player.SetPosition "
            f"objpath:/not/used int64:{position_micro}"
        )
    else:
        logging.warning(f"未知的播放器控制动作: {action}")
        return

    try:
        logging.info(f"执行DBus命令: {dbus_command}")
        process = await asyncio.create_subprocess_shell(
            dbus_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            logging.info(f"成功执行 '{action}' 操作。")
        else:
            logging.error(f"执行DBus命令失败: {stderr.decode().strip()}")
    except Exception as e:
        logging.error(f"执行DBus命令时出错: {e}")