diff options
author | Florrie <towerofnix@gmail.com> | 2020-07-11 17:37:54 -0300 |
---|---|---|
committer | (quasar) nebula <qznebula@protonmail.com> | 2024-05-16 19:03:04 -0300 |
commit | f88d7c4959e7603714a5bb8f25807889a25304ee (patch) | |
tree | 5ea79ed92134e25d59e49bbfc82dd19161a565b6 | |
parent | 2c7e3c8fb279f20da3d1b4f5610e65dc43a22ac2 (diff) |
synchronize playing new tracks across sockets
-rw-r--r-- | backend.js | 15 | ||||
-rw-r--r-- | general-util.js | 4 | ||||
-rw-r--r-- | socket.js | 176 | ||||
-rw-r--r-- | ui.js | 8 |
4 files changed, 150 insertions, 53 deletions
diff --git a/backend.js b/backend.js index c59dfdf..6e669d9 100644 --- a/backend.js +++ b/backend.js @@ -67,6 +67,7 @@ class QueuePlayer extends EventEmitter { this.queueGrouplike = {name: 'Queue', isTheQueue: true, items: []} this.pauseNextTrack = false this.queueEndMode = 'end' // end, loop, shuffle + this.alwaysStartPaused = false this.playedTrackToEnd = false this.timeData = null @@ -369,7 +370,6 @@ class QueuePlayer extends EventEmitter { this.clearPlayingTrack() } - async play(item, startTime = 0, forceStartPaused = false) { if (this.player === null) { throw new Error('Attempted to play before a player was loaded') @@ -418,10 +418,11 @@ class QueuePlayer extends EventEmitter { this.timeData = null this.time = null this.playingTrack = item + this.emit('playing details', this.playingTrack, oldTrack, startTime, this) this.emit('playing', this.playingTrack, oldTrack, startTime, this) await this.player.kill() - if (forceStartPaused) { + if (this.alwaysStartPaused || forceStartPaused) { this.player.setPause(true) } else if (this.playedTrackToEnd) { this.player.setPause(this.pauseNextTrack) @@ -687,6 +688,7 @@ export default class Backend extends EventEmitter { } this.queuePlayers = [] + this.alwaysStartPaused = false this.recordStore = new RecordStore() this.throttleMetadata = throttlePromise(10) @@ -718,6 +720,8 @@ export default class Backend extends EventEmitter { return error } + queuePlayer.alwaysStartPaused = this.alwaysStartPaused + this.queuePlayers.push(queuePlayer) this.emit('added queue player', queuePlayer) @@ -845,6 +849,13 @@ export default class Backend extends EventEmitter { return {seconds, string, noticedMissingMetadata, approxSymbol} } + setAlwaysStartPaused(value) { + this.alwaysStartPaused = !!value + for (const queuePlayer of this.queuePlayers) { + queuePlayer.alwaysStartPaused = !!value + } + } + async stopPlayingAll() { for (const queuePlayer of this.queuePlayers) { await queuePlayer.stopPlaying() diff --git a/general-util.js b/general-util.js index b4491de..4bfd491 100644 --- a/general-util.js +++ b/general-util.js @@ -336,7 +336,7 @@ export async function parseOptions(options, optionDescriptorMap) { parseOptions.handleDashless = Symbol() -export function silenceEvents(emitter, eventsToSilence, callback) { +export async function silenceEvents(emitter, eventsToSilence, callback) { const oldEmit = emitter.emit emitter.emit = function(event, ...data) { @@ -345,7 +345,7 @@ export function silenceEvents(emitter, eventsToSilence, callback) { } } - callback() + await callback() emitter.emit = oldEmit } diff --git a/socket.js b/socket.js index be337e2..61d3a62 100644 --- a/socket.js +++ b/socket.js @@ -114,6 +114,11 @@ function validateCommand(command) { ) )) ) + case 'play': + return ( + typeof command.queuePlayer === 'string' && + isItemRef(command.track) + ) case 'queue': return ( typeof command.queuePlayer === 'string' && @@ -143,10 +148,20 @@ function validateCommand(command) { case 'set-pause': return ( typeof command.queuePlayer === 'string' && - typeof command.paused === 'boolean' + typeof command.paused === 'boolean' && + ( + typeof command.startingTrack === 'boolean' && + command.sender === 'server' + ) || !command.startingTrack ) case 'status': - return typeof command.status === 'string' + return ( + ( + command.status === 'ready-to-resume' && + typeof command.queuePlayer === 'string' + ) || + command.status === 'sync-playback' + ) case 'unqueue': return ( typeof command.queuePlayer === 'string' && @@ -173,57 +188,86 @@ export function makeSocketServer() { server.canonicalBackend = null + // readyToResume -> queue player id -> array: socket + const readyToResume = {} + function receivedData(socket, data) { // Parse data as a command and validate it. If invalid, drop this data. - let command - try { - command = deserializeDataToCommand(data) - } catch (error) { - return - } + for (const line of data.toString().trim().split('\n')) { + let command + try { + command = deserializeDataToCommand(line) + } catch (error) { + return + } - command.sender = 'client' + command.sender = 'client' - if (!validateCommand(command)) { - return - } + if (!validateCommand(command)) { + return + } - // If it's a status command, respond appropriately, and return so that it - // is not relayed. - - if (command.code === 'status') { - switch (command.status) { - case 'sync-playback': - for (const QP of server.canonicalBackend.queuePlayers) { - if (QP.timeData) { - socket.write(JSON.stringify({ - sender: 'server', - code: 'seek-to', - queuePlayer: QP.id, - time: QP.timeData.curSecTotal - }) + '\n') - socket.write(JSON.stringify({ - sender: 'server', - code: 'set-pause', - queuePlayer: QP.id, - paused: QP.player.isPaused - })) + // If it's a status command, respond appropriately, and return so that it + // is not relayed. + + if (command.code === 'status') { + switch (command.status) { + case 'ready-to-resume': { + const readySockets = readyToResume[command.queuePlayer] + if (readySockets && !readySockets.includes(socket)) { + readySockets.push(socket) + if (readySockets.length === sockets.length) { + for (const socket of sockets) { + socket.write(JSON.stringify({ + sender: 'server', + code: 'set-pause', + queuePlayer: command.queuePlayer, + startingTrack: true, + paused: false + }) + '\n') + } + delete readyToResume[command.queuePlayer] + } } + break } - - break + case 'sync-playback': + for (const QP of server.canonicalBackend.queuePlayers) { + if (QP.timeData) { + socket.write(JSON.stringify({ + sender: 'server', + code: 'seek-to', + queuePlayer: QP.id, + time: QP.timeData.curSecTotal + }) + '\n') + socket.write(JSON.stringify({ + sender: 'server', + code: 'set-pause', + queuePlayer: QP.id, + startingTrack: true, + paused: QP.player.isPaused + }) + '\n') + } + } + break + } + return } - return - } + // If it's a 'play' command, set up a new readyToResume array. + + if (command.code === 'play') { + readyToResume[command.queuePlayer] = [] + } - // Relay the command to client sockets besides the sender. + // Relay the command to client sockets besides the sender. - const otherSockets = sockets.filter(s => s !== socket) + const otherSockets = sockets.filter(s => s !== socket) - for (const socket of otherSockets) { - socket.write(JSON.stringify(command)) + for (const socket of otherSockets) { + socket.write(JSON.stringify(command) + '\n') + } } } @@ -250,7 +294,7 @@ export function makeSocketServer() { sender: 'server', code: 'initialize-backend', backend: savedBackend - })) + }) + '\n') }) return server @@ -266,14 +310,14 @@ export function makeSocketClient() { client.sendCommand = function(command) { const data = serializeCommandToData(command) - client.socket.write(data) + client.socket.write(data + '\n') } client.socket.on('data', data => { // Same sort of "guarding" deserialization/validation as in the server // code, because it's possible the client and server backends mismatch. - for (const line of data.toString().split('\n')) { + for (const line of data.toString().trim().split('\n')) { let command try { command = deserializeDataToCommand(line) @@ -298,6 +342,8 @@ export function attachBackendToSocketClient(backend, client, { // All actual logic for instances of the mtui backend interacting with each // other through commands lives here. + backend.setAlwaysStartPaused(true) + client.on('command', async command => { switch (command.sender) { case 'server': @@ -343,6 +389,20 @@ export function attachBackendToSocketClient(backend, client, { } )) return + case 'play': + if (QP) { + QP.once('received time data', data => { + client.sendCommand({ + code: 'status', + status: 'ready-to-resume', + queuePlayer: QP.id + }) + }) + silenceEvents(QP, ['playing'], () => QP.play( + restoreNewItem(command.track, getPlaylistSources()) + )) + } + return case 'queue': if (QP) silenceEvents(QP, ['queue'], () => QP.queue( restoreNewItem(command.topItem, getPlaylistSources()), @@ -361,9 +421,18 @@ export function attachBackendToSocketClient(backend, client, { case 'seek-to': if (QP) silenceEvents(QP, ['seek-to'], () => QP.seekTo(command.time)) return - case 'set-pause': - if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused)) + case 'set-pause': { + let playingThisTrack = true + QP.once('playing new track', () => { + playingThisTrack = false + }) + setTimeout(() => { + if (playingThisTrack) { + if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused)) + } + }, command.startingTrack ? 500 : 0) return + } case 'unqueue': if (QP) silenceEvents(QP, ['unqueue'], () => QP.unqueue( restoreNewItem(command.topItem, getPlaylistSources()) @@ -406,6 +475,23 @@ export function attachBackendToSocketClient(backend, client, { }) }) + backend.on('playing', (queuePlayer, track) => { + if (track) { + client.sendCommand({ + code: 'play', + queuePlayer: queuePlayer.id, + track: saveItemReference(track) + }) + queuePlayer.once('received time data', data => { + client.sendCommand({ + code: 'status', + status: 'ready-to-resume', + queuePlayer: queuePlayer.id + }) + }) + } + }) + backend.on('queue', (queuePlayer, topItem, afterItem, opts) => { client.sendCommand({ code: 'queue', diff --git a/ui.js b/ui.js index dd1123b..f9f8e5f 100644 --- a/ui.js +++ b/ui.js @@ -459,7 +459,7 @@ export default class AppElement extends FocusElement { bindListeners() { for (const key of [ - 'handlePlaying', + 'handlePlayingDetails', 'handleReceivedTimeData', 'handleProcessMetadataProgress', 'handleQueueUpdated', @@ -498,7 +498,7 @@ export default class AppElement extends FocusElement { PIE.on('toggle pause', () => PIE.queuePlayer.togglePause()) queuePlayer.on('received time data', this.handleReceivedTimeData) - queuePlayer.on('playing', this.handlePlaying) + queuePlayer.on('playing details', this.handlePlayingDetails) queuePlayer.on('queue updated', this.handleQueueUpdated) } @@ -527,7 +527,7 @@ export default class AppElement extends FocusElement { } queuePlayer.removeListener('receivedTimeData', this.handleReceivedTimeData) - queuePlayer.removeListener('playing', this.handlePlaying) + queuePlayer.removeListener('playing details', this.handlePlayingDetails) queuePlayer.removeListener('queue updated', this.handleQueueUpdated) queuePlayer.stopPlaying() } @@ -561,7 +561,7 @@ export default class AppElement extends FocusElement { this.updateQueueLengthLabel() } - async handlePlaying(track, oldTrack, startTime, queuePlayer) { + async handlePlayingDetails(track, oldTrack, startTime, queuePlayer) { const PIE = this.getPlaybackInfoElementForQueuePlayer(queuePlayer) if (PIE) { PIE.updateTrack() |