通过PID锁定PipeWire ID,以解决PipeWire元数据无法很好定位不同软件的问题
通过PID锁定PipeWire ID,以解决PipeWire元数据无法很好定位不同软件的问题
你是否曾希望能够用一个简单的工具来控制正在后台播放的音乐?如果你是 Linux 用户,那么这篇文章将带你探索一种强大的方法,利用 pactl
和 D-Bus
这两个工具,来获取播放状态、调整音量,甚至进行播放控制。
这篇文章的核心亮点,在于我们不依赖于播放器固定的接口名称,而是通过查找进程 ID (PID),进而获取PipeWire ID,从而实现对应用程序音量的精准控制。
为什么选择这种方法?问题的根源
许多现代 Linux 应用程序通过 PipeWire(或旧的 PulseAudio)来管理音频。PipeWire 为每个正在播放音频的应用程序创建一个“sink input”,并附加元信息来描述它。然而,现实情况并不总是那么完美。
问题的根源在于,并非所有应用程序的元信息都清晰或唯一。例如,某些应用可能使用一个通用的名称(如“Electron”或“Chromium”),导致难以区分不同的应用实例;有些应用的 application.name
元信息可能根本不存在或为空。
因此,仅仅依赖 PipeWire 的元信息并不总是可靠。这就是我们选择先通过 PID(进程 ID) 来锁定目标的原因。
在 Linux 系统中,PID 是一个进程最可靠的唯一身份标识。通过 pgrep
,我们可以精确地找到一个正在运行的应用程序的所有相关进程,无论它的 PipeWire 元信息有多么混乱。一旦我们获得了这个唯一的 PID,我们再反过来去 pactl
的输出中寻找带有这个 PID 的音频流。
这种“先锁定进程,再反向查找音频流”的策略,就像是先找到了一个人的身份证号,再去茫茫人海中找到他的音频流ID,从而完美绕过了元信息不准确或缺失的问题。
实现思路:核心功能解析与代码
接下来,让我们通过一个实际的 Python 代码来展示这一技术思路是如何实现的。
1. 获取 PipeWire ID
这个步骤是音量控制功能的核心。它将 pgrep
和 pactl
的命令行输出进行解析,实现了 PID 到 PipeWire ID 的映射。
import subprocess
import re
from typing import Optional
def get_pipewire_id_by_process_name(app_name: str) -> Optional[str]:
"""
通过应用程序进程名查找其 PipeWire ID。
"""
try:
# 步骤1: 使用 pgrep 获取应用的 PID
cmd = f"pgrep -if {app_name}"
result = subprocess.check_output(cmd, shell=True, text=True)
pids = result.strip().splitlines()
if not pids:
return None
# 步骤2: 获取所有 PipeWire sink inputs
cmd = "pactl list sink-inputs"
result = subprocess.check_output(cmd, shell=True, text=True)
# 按 "Sink Input #" 分割,方便处理每个 sink input
sink_sections = result.split("Sink Input #")
# 步骤3: 遍历并匹配 PID,提取 PipeWire ID
for pid in pids:
for section in sink_sections[1:]: # 第一个元素是空的
if f'application.process.id = "{pid}"' in section:
id_match = re.match(r'(\d+)', section)
if id_match:
pipewire_id = id_match.group(1)
return pipewire_id
return None
except subprocess.CalledProcessError:
# pgrep 或 pactl 命令失败
return None
except Exception:
# 其他未知错误
return None
2. 控制播放与音量
有了 PipeWire ID,我们就可以选择不同的工具进行控制:
音量控制:
利用 pactl 命令。将要设置的音量值转换为百分比,然后执行 pactl set-sink-input-volume <pipewire_id> <音量百分比>% 命令,就可以精准地调整音量。
播放/暂停、切换歌曲:
这些操作更适合使用 D-Bus。因为许多应用支持标准的 MPRIS 接口,我们可以通过发送 dbus-send 命令来调用播放器的相应方法,例如 org.mpris.MediaPlayer2.Player.PlayPause。
下面是一个异步函数,展示了如何根据操作类型选择不同的控制方法。
import asyncio
from typing import Any
async def control_player_by_name(app_name: str, action: str, value: Any = None):
"""
通过应用名称控制音乐播放器。
Args:
app_name (str): 应用程序的进程名(例如 'yesplaymusic')。
action (str): 控制动作,如 'playpause', 'next', 'set_volume'。
value (Any, optional): 动作需要的参数。
"""
if action == 'set_volume':
pipewire_id = get_pipewire_id_by_process_name(app_name)
if not pipewire_id:
return
try:
volume_percent = int(float(value) * 100)
cmd = f"pactl set-sink-input-volume {pipewire_id} {volume_percent}%"
await asyncio.create_subprocess_shell(cmd)
except Exception:
pass
else:
# D-Bus 控制需要知道应用程序的服务名
dbus_service = f"org.mpris.MediaPlayer2.{app_name}"
dbus_command = ""
# 构建 D-Bus 命令
if action in ['playpause', 'next', 'previous']:
command_map = {
'playpause': 'PlayPause',
'next': 'Next',
'previous': 'Previous'
}
command = command_map.get(action)
dbus_command = (
f"dbus-send --print-reply --dest={dbus_service} "
f"/org/mpris/MediaPlayer2 org.mpris.MediaPlayer2.Player.{command}"
)
if dbus_command:
try:
await asyncio.create_subprocess_shell(dbus_command)
except Exception:
pass
这段代码展示了如何将“通过 PID 查找 PipeWire ID”的思路整合到实际的控制逻辑中,实现了对任何遵循 MPRIS 和使用 PipeWire 的应用程序的通用控制。
相关业务代码:
# -*- coding: utf-8 -*-
import asyncio
import logging
import platform
import re
from typing import Any, Optional
import subprocess
def get_yesplaymusic_pipewire_id():
"""
获取 YesPlayMusic 的 PipeWire ID
返回: str 或 None - 找到时返回 PipeWire ID,未找到时返回 None
"""
try:
# 使用 pgrep 获取 YesPlayMusic 进程的 PID
# -i: 忽略大小写, -f: 匹配完整命令行
cmd = "pgrep -if yesplaymusic"
result = subprocess.check_output(cmd, shell=True, text=True)
pids = result.strip().splitlines()
if not pids:
logging.debug("未找到 YesPlayMusic 进程。")
return None
# 获取 PipeWire sink-inputs
cmd = "pactl list sink-inputs"
result = subprocess.check_output(cmd, shell=True, text=True)
# 按 "Sink Input #" 分割,方便处理每个 sink input
sink_sections = result.split("Sink Input #")
for pid in pids:
for section in sink_sections[1:]: # 第一个元素是空的
# 检查进程ID是否在 sink input 的属性中
if f'application.process.id = "{pid}"' in section:
# 从 section 开头提取 sink input ID
id_match = re.match(r'(\d+)', section)
if id_match:
pipewire_id = id_match.group(1)
logging.debug(f"找到 YesPlayMusic 的 PipeWire ID: {pipewire_id} (PID: {pid})")
return pipewire_id
logging.debug("已找到 YesPlayMusic 进程,但未找到关联的 PipeWire sink input。可能没有在播放。")
return None
except subprocess.CalledProcessError:
logging.debug("查找 YesPlayMusic PipeWire ID 时出错 (pgrep 或 pactl 命令失败)。")
return None
except Exception as e:
logging.error(f"查找 PipeWire ID 时发生未知错误: {e}")
return None
async def get_player_volume() -> Optional[float]:
"""
通过 pactl 获取播放器的当前音量。
此功能仅限于Linux系统。
Returns:
Optional[float]: 返回播放器音量(0.0到1.0之间),如果获取失败则返回None。
"""
if platform.system() != "Linux":
logging.info("非Linux系统,跳过获取音量。")
return None
pipewire_id = get_yesplaymusic_pipewire_id()
if not pipewire_id:
logging.warning("无法获取 YesPlayMusic 的 PipeWire ID,无法获取音量。可能没有在播放。")
return None
try:
# 使用 pactl 获取指定 sink input 的音量
cmd = (
f'pactl list sink-inputs | grep -A20 "Sink Input #{pipewire_id}" | '
f"grep 'Volume:' | head -n1 | grep -oP '\\d{{1,3}}%' | head -n1"
)
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0 and stdout:
volume_str = stdout.decode().strip().replace('%', '')
volume = float(volume_str) / 100.0
logging.info(f"成功获取到当前音量: {volume}")
return volume
else:
error_message = stderr.decode().strip()
# 如果没有输出,也认为是错误
if not error_message and not stdout:
error_message = "pactl 命令没有返回音量信息。"
logging.error(f"获取音量失败: {error_message}")
return None
except Exception as e:
logging.error(f"获取音量时发生错误: {e}")
return None
async def get_player_status() -> Optional[str]:
"""
通过 D-Bus 获取播放器的当前播放状态。
此功能仅限于Linux系统。
Returns:
Optional[str]: 返回播放状态(例如 "Playing", "Paused"),如果获取失败则返回None。
"""
if platform.system() != "Linux":
logging.debug("非Linux系统,跳过获取播放状态。")
return None
dbus_command = (
"dbus-send --print-reply --dest=org.mpris.MediaPlayer2.yesplaymusic "
"/org/mpris/MediaPlayer2 org.freedesktop.DBus.Properties.Get "
"string:'org.mpris.MediaPlayer2.Player' string:'PlaybackStatus'"
)
try:
process = await asyncio.create_subprocess_shell(
dbus_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0 and stdout:
output = stdout.decode().strip()
match = re.search(r'string "(Playing|Paused|Stopped)"', output)
if match:
status = match.group(1)
logging.debug(f"成功获取到播放状态: {status}")
return status
else:
logging.warning(f"无法从DBus输出中解析播放状态: {output}")
return None
else:
error_message = stderr.decode().strip()
if "was not provided by any .service files" in error_message:
logging.debug(f"获取播放状态失败,可能是播放器未运行: {error_message}")
else:
logging.error(f"获取播放状态失败: {error_message}")
return None
except Exception as e:
logging.error(f"获取播放状态时发生错误: {e}")
return None
async def control_player(action: str, value: Any = None):
"""
通过D-Bus或pactl控制音乐播放器。
支持播放/暂停、下一首、上一首、跳转进度和设置音量。
此功能仅限于Linux系统。
Args:
action (str): 控制动作,如 'playpause', 'next', 'previous', 'seek', 'set_volume'。
value (Any, optional): 动作需要的参数。例如,'seek'需要进度(毫秒),'set_volume'需要音量(0.0-1.0)。
"""
if platform.system() != "Linux":
logging.info(f"接收到 '{action}' 指令,但当前系统 ({platform.system()}) 不支持控制。")
return
# 音量控制使用 pactl
if action == 'set_volume':
pipewire_id = get_yesplaymusic_pipewire_id()
if not pipewire_id:
logging.warning("无法获取 YesPlayMusic 的 PipeWire ID,无法设置音量。可能没有在播放。")
return
try:
# 将 0.0-1.0 的音量转换为百分比
volume_percent = int(float(value) * 100)
cmd = f"pactl set-sink-input-volume {pipewire_id} {volume_percent}%"
logging.info(f"执行 pactl 命令: {cmd}")
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
logging.info(f"成功执行 'set_volume' 操作。")
else:
logging.error(f"执行 pactl 命令失败: {stderr.decode().strip()}")
except Exception as e:
logging.error(f"执行 pactl 命令时出错: {e}")
return
# 其他控制继续使用 D-Bus
dbus_command = ""
if action in ['playpause', 'next', 'previous']:
command_map = {
'playpause': 'PlayPause',
'next': 'Next',
'previous': 'Previous'
}
command = command_map.get(action)
dbus_command = (
f"dbus-send --print-reply "
f"--dest=org.mpris.MediaPlayer2.yesplaymusic "
f"/org/mpris/MediaPlayer2 "
f"org.mpris.MediaPlayer2.Player.{command}"
)
elif action == 'seek':
position_micro = int(value * 1000)
dbus_command = (
f"dbus-send --print-reply "
f"--dest=org.mpris.MediaPlayer2.yesplaymusic "
f"/org/mpris/MediaPlayer2 "
f"org.mpris.MediaPlayer2.Player.SetPosition "
f"objpath:/not/used int64:{position_micro}"
)
else:
logging.warning(f"未知的播放器控制动作: {action}")
return
try:
logging.info(f"执行DBus命令: {dbus_command}")
process = await asyncio.create_subprocess_shell(
dbus_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
logging.info(f"成功执行 '{action}' 操作。")
else:
logging.error(f"执行DBus命令失败: {stderr.decode().strip()}")
except Exception as e:
logging.error(f"执行DBus命令时出错: {e}")