From 07fb4b2d4a59319ab5ba03d842274aba8182c412 Mon Sep 17 00:00:00 2001 From: Florrie Date: Sat, 11 Jul 2020 17:37:54 -0300 Subject: synchronize playing new tracks across sockets --- backend.js | 16 +++++- general-util.js | 4 +- socket.js | 176 +++++++++++++++++++++++++++++++++++++++++--------------- ui.js | 8 +-- 4 files changed, 151 insertions(+), 53 deletions(-) diff --git a/backend.js b/backend.js index dc819ab..573a4fc 100644 --- a/backend.js +++ b/backend.js @@ -69,6 +69,7 @@ class QueuePlayer extends EventEmitter { this.playingTrack = null this.queueGrouplike = {name: 'Queue', isTheQueue: true, items: []} this.pauseNextTrack = false + this.alwaysStartPaused = false this.playedTrackToEnd = false this.timeData = null @@ -383,7 +384,7 @@ class QueuePlayer extends EventEmitter { } - async play(item, forceStartPaused = false) { + async play(item, forceStartPaused) { if (this.player === null) { throw new Error('Attempted to play before a player was loaded') } @@ -431,10 +432,11 @@ class QueuePlayer extends EventEmitter { this.timeData = null this.time = null this.playingTrack = item + this.emit('playing details', this.playingTrack, oldTrack, this) this.emit('playing', this.playingTrack, oldTrack, this) await this.player.kill() - if (forceStartPaused) { + if (this.alwaysStartPaused || forceStartPaused) { this.player.setPause(true) } else if (this.playedTrackToEnd) { this.player.setPause(this.pauseNextTrack) @@ -637,6 +639,7 @@ class Backend extends EventEmitter { } this.queuePlayers = [] + this.alwaysStartPaused = false this.recordStore = new RecordStore() this.throttleMetadata = throttlePromise(10) @@ -668,6 +671,8 @@ class Backend extends EventEmitter { return error } + queuePlayer.alwaysStartPaused = this.alwaysStartPaused + this.queuePlayers.push(queuePlayer) this.emit('added queue player', queuePlayer) @@ -793,6 +798,13 @@ class Backend extends EventEmitter { return {seconds, string, noticedMissingMetadata, approxSymbol} } + setAlwaysStartPaused(value) { + this.alwaysStartPaused = !!value + for (const queuePlayer of this.queuePlayers) { + queuePlayer.alwaysStartPaused = !!value + } + } + async stopPlayingAll() { for (const queuePlayer of this.queuePlayers) { await queuePlayer.stopPlaying() diff --git a/general-util.js b/general-util.js index e352960..0f5bdd5 100644 --- a/general-util.js +++ b/general-util.js @@ -311,7 +311,7 @@ parseOptions.handleDashless = Symbol() module.exports.parseOptions = parseOptions -module.exports.silenceEvents = function(emitter, eventsToSilence, callback) { +module.exports.silenceEvents = async function(emitter, eventsToSilence, callback) { const oldEmit = emitter.emit emitter.emit = function(event, ...data) { @@ -320,7 +320,7 @@ module.exports.silenceEvents = function(emitter, eventsToSilence, callback) { } } - callback() + await callback() emitter.emit = oldEmit } diff --git a/socket.js b/socket.js index a972f0c..fe48063 100644 --- a/socket.js +++ b/socket.js @@ -114,6 +114,11 @@ function validateCommand(command) { ) )) ) + case 'play': + return ( + typeof command.queuePlayer === 'string' && + isItemRef(command.track) + ) case 'queue': return ( typeof command.queuePlayer === 'string' && @@ -143,10 +148,20 @@ function validateCommand(command) { case 'set-pause': return ( typeof command.queuePlayer === 'string' && - typeof command.paused === 'boolean' + typeof command.paused === 'boolean' && + ( + typeof command.startingTrack === 'boolean' && + command.sender === 'server' + ) || !command.startingTrack ) case 'status': - return typeof command.status === 'string' + return ( + ( + command.status === 'ready-to-resume' && + typeof command.queuePlayer === 'string' + ) || + command.status === 'sync-playback' + ) case 'unqueue': return ( typeof command.queuePlayer === 'string' && @@ -173,57 +188,86 @@ function makeSocketServer() { server.canonicalBackend = null + // readyToResume -> queue player id -> array: socket + const readyToResume = {} + function receivedData(socket, data) { // Parse data as a command and validate it. If invalid, drop this data. - let command - try { - command = deserializeDataToCommand(data) - } catch (error) { - return - } + for (const line of data.toString().trim().split('\n')) { + let command + try { + command = deserializeDataToCommand(line) + } catch (error) { + return + } - command.sender = 'client' + command.sender = 'client' - if (!validateCommand(command)) { - return - } + if (!validateCommand(command)) { + return + } - // If it's a status command, respond appropriately, and return so that it - // is not relayed. - - if (command.code === 'status') { - switch (command.status) { - case 'sync-playback': - for (const QP of server.canonicalBackend.queuePlayers) { - if (QP.timeData) { - socket.write(JSON.stringify({ - sender: 'server', - code: 'seek-to', - queuePlayer: QP.id, - time: QP.timeData.curSecTotal - }) + '\n') - socket.write(JSON.stringify({ - sender: 'server', - code: 'set-pause', - queuePlayer: QP.id, - paused: QP.player.isPaused - })) + // If it's a status command, respond appropriately, and return so that it + // is not relayed. + + if (command.code === 'status') { + switch (command.status) { + case 'ready-to-resume': { + const readySockets = readyToResume[command.queuePlayer] + if (readySockets && !readySockets.includes(socket)) { + readySockets.push(socket) + if (readySockets.length === sockets.length) { + for (const socket of sockets) { + socket.write(JSON.stringify({ + sender: 'server', + code: 'set-pause', + queuePlayer: command.queuePlayer, + startingTrack: true, + paused: false + }) + '\n') + } + delete readyToResume[command.queuePlayer] + } } + break } - - break + case 'sync-playback': + for (const QP of server.canonicalBackend.queuePlayers) { + if (QP.timeData) { + socket.write(JSON.stringify({ + sender: 'server', + code: 'seek-to', + queuePlayer: QP.id, + time: QP.timeData.curSecTotal + }) + '\n') + socket.write(JSON.stringify({ + sender: 'server', + code: 'set-pause', + queuePlayer: QP.id, + startingTrack: true, + paused: QP.player.isPaused + }) + '\n') + } + } + break + } + return } - return - } + // If it's a 'play' command, set up a new readyToResume array. + + if (command.code === 'play') { + readyToResume[command.queuePlayer] = [] + } - // Relay the command to client sockets besides the sender. + // Relay the command to client sockets besides the sender. - const otherSockets = sockets.filter(s => s !== socket) + const otherSockets = sockets.filter(s => s !== socket) - for (const socket of otherSockets) { - socket.write(JSON.stringify(command)) + for (const socket of otherSockets) { + socket.write(JSON.stringify(command) + '\n') + } } } @@ -250,7 +294,7 @@ function makeSocketServer() { sender: 'server', code: 'initialize-backend', backend: savedBackend - })) + }) + '\n') }) return server @@ -266,14 +310,14 @@ function makeSocketClient() { client.sendCommand = function(command) { const data = serializeCommandToData(command) - client.socket.write(data) + client.socket.write(data + '\n') } client.socket.on('data', data => { // Same sort of "guarding" deserialization/validation as in the server // code, because it's possible the client and server backends mismatch. - for (const line of data.toString().split('\n')) { + for (const line of data.toString().trim().split('\n')) { let command try { command = deserializeDataToCommand(line) @@ -298,6 +342,8 @@ function attachBackendToSocketClient(backend, client, { // All actual logic for instances of the mtui backend interacting with each // other through commands lives here. + backend.setAlwaysStartPaused(true) + client.on('command', async command => { switch (command.sender) { case 'server': @@ -343,6 +389,20 @@ function attachBackendToSocketClient(backend, client, { } )) return + case 'play': + if (QP) { + QP.once('received time data', data => { + client.sendCommand({ + code: 'status', + status: 'ready-to-resume', + queuePlayer: QP.id + }) + }) + silenceEvents(QP, ['playing'], () => QP.play( + restoreNewItem(command.track, getPlaylistSources()) + )) + } + return case 'queue': if (QP) silenceEvents(QP, ['queue'], () => QP.queue( restoreNewItem(command.topItem, getPlaylistSources()), @@ -361,9 +421,18 @@ function attachBackendToSocketClient(backend, client, { case 'seek-to': if (QP) silenceEvents(QP, ['seek-to'], () => QP.seekTo(command.time)) return - case 'set-pause': - if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused)) + case 'set-pause': { + let playingThisTrack = true + QP.once('playing new track', () => { + playingThisTrack = false + }) + setTimeout(() => { + if (playingThisTrack) { + if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused)) + } + }, command.startingTrack ? 500 : 0) return + } case 'unqueue': if (QP) silenceEvents(QP, ['unqueue'], () => QP.unqueue( restoreNewItem(command.topItem, getPlaylistSources()) @@ -406,6 +475,23 @@ function attachBackendToSocketClient(backend, client, { }) }) + backend.on('playing', (queuePlayer, track) => { + if (track) { + client.sendCommand({ + code: 'play', + queuePlayer: queuePlayer.id, + track: saveItemReference(track) + }) + queuePlayer.once('received time data', data => { + client.sendCommand({ + code: 'status', + status: 'ready-to-resume', + queuePlayer: queuePlayer.id + }) + }) + } + }) + backend.on('queue', (queuePlayer, topItem, afterItem, opts) => { client.sendCommand({ code: 'queue', diff --git a/ui.js b/ui.js index 1e48877..c9e30e5 100644 --- a/ui.js +++ b/ui.js @@ -425,7 +425,7 @@ class AppElement extends FocusElement { bindListeners() { for (const key of [ - 'handlePlaying', + 'handlePlayingDetails', 'handleReceivedTimeData', 'handleProcessMetadataProgress', 'handleQueueUpdated', @@ -463,7 +463,7 @@ class AppElement extends FocusElement { PIE.on('toggle pause', () => PIE.queuePlayer.togglePause()) queuePlayer.on('received time data', this.handleReceivedTimeData) - queuePlayer.on('playing', this.handlePlaying) + queuePlayer.on('playing details', this.handlePlayingDetails) queuePlayer.on('queue updated', this.handleQueueUpdated) } @@ -492,7 +492,7 @@ class AppElement extends FocusElement { } queuePlayer.removeListener('receivedTimeData', this.handleReceivedTimeData) - queuePlayer.removeListener('playing', this.handlePlaying) + queuePlayer.removeListener('playing details', this.handlePlayingDetails) queuePlayer.removeListener('queue updated', this.handleQueueUpdated) queuePlayer.stopPlaying() } @@ -520,7 +520,7 @@ class AppElement extends FocusElement { } } - async handlePlaying(track, oldTrack, queuePlayer) { + async handlePlayingDetails(track, oldTrack, queuePlayer) { const PIE = this.getPlaybackInfoElementForQueuePlayer(queuePlayer) if (PIE) { PIE.updateTrack() -- cgit 1.3.0-6-gf8a5