From f88d7c4959e7603714a5bb8f25807889a25304ee Mon Sep 17 00:00:00 2001 From: Florrie Date: Sat, 11 Jul 2020 17:37:54 -0300 Subject: synchronize playing new tracks across sockets --- socket.js | 176 ++++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 131 insertions(+), 45 deletions(-) (limited to 'socket.js') diff --git a/socket.js b/socket.js index be337e2..61d3a62 100644 --- a/socket.js +++ b/socket.js @@ -114,6 +114,11 @@ function validateCommand(command) { ) )) ) + case 'play': + return ( + typeof command.queuePlayer === 'string' && + isItemRef(command.track) + ) case 'queue': return ( typeof command.queuePlayer === 'string' && @@ -143,10 +148,20 @@ function validateCommand(command) { case 'set-pause': return ( typeof command.queuePlayer === 'string' && - typeof command.paused === 'boolean' + typeof command.paused === 'boolean' && + ( + typeof command.startingTrack === 'boolean' && + command.sender === 'server' + ) || !command.startingTrack ) case 'status': - return typeof command.status === 'string' + return ( + ( + command.status === 'ready-to-resume' && + typeof command.queuePlayer === 'string' + ) || + command.status === 'sync-playback' + ) case 'unqueue': return ( typeof command.queuePlayer === 'string' && @@ -173,57 +188,86 @@ export function makeSocketServer() { server.canonicalBackend = null + // readyToResume -> queue player id -> array: socket + const readyToResume = {} + function receivedData(socket, data) { // Parse data as a command and validate it. If invalid, drop this data. - let command - try { - command = deserializeDataToCommand(data) - } catch (error) { - return - } + for (const line of data.toString().trim().split('\n')) { + let command + try { + command = deserializeDataToCommand(line) + } catch (error) { + return + } - command.sender = 'client' + command.sender = 'client' - if (!validateCommand(command)) { - return - } + if (!validateCommand(command)) { + return + } - // If it's a status command, respond appropriately, and return so that it - // is not relayed. - - if (command.code === 'status') { - switch (command.status) { - case 'sync-playback': - for (const QP of server.canonicalBackend.queuePlayers) { - if (QP.timeData) { - socket.write(JSON.stringify({ - sender: 'server', - code: 'seek-to', - queuePlayer: QP.id, - time: QP.timeData.curSecTotal - }) + '\n') - socket.write(JSON.stringify({ - sender: 'server', - code: 'set-pause', - queuePlayer: QP.id, - paused: QP.player.isPaused - })) + // If it's a status command, respond appropriately, and return so that it + // is not relayed. + + if (command.code === 'status') { + switch (command.status) { + case 'ready-to-resume': { + const readySockets = readyToResume[command.queuePlayer] + if (readySockets && !readySockets.includes(socket)) { + readySockets.push(socket) + if (readySockets.length === sockets.length) { + for (const socket of sockets) { + socket.write(JSON.stringify({ + sender: 'server', + code: 'set-pause', + queuePlayer: command.queuePlayer, + startingTrack: true, + paused: false + }) + '\n') + } + delete readyToResume[command.queuePlayer] + } } + break } - - break + case 'sync-playback': + for (const QP of server.canonicalBackend.queuePlayers) { + if (QP.timeData) { + socket.write(JSON.stringify({ + sender: 'server', + code: 'seek-to', + queuePlayer: QP.id, + time: QP.timeData.curSecTotal + }) + '\n') + socket.write(JSON.stringify({ + sender: 'server', + code: 'set-pause', + queuePlayer: QP.id, + startingTrack: true, + paused: QP.player.isPaused + }) + '\n') + } + } + break + } + return } - return - } + // If it's a 'play' command, set up a new readyToResume array. + + if (command.code === 'play') { + readyToResume[command.queuePlayer] = [] + } - // Relay the command to client sockets besides the sender. + // Relay the command to client sockets besides the sender. - const otherSockets = sockets.filter(s => s !== socket) + const otherSockets = sockets.filter(s => s !== socket) - for (const socket of otherSockets) { - socket.write(JSON.stringify(command)) + for (const socket of otherSockets) { + socket.write(JSON.stringify(command) + '\n') + } } } @@ -250,7 +294,7 @@ export function makeSocketServer() { sender: 'server', code: 'initialize-backend', backend: savedBackend - })) + }) + '\n') }) return server @@ -266,14 +310,14 @@ export function makeSocketClient() { client.sendCommand = function(command) { const data = serializeCommandToData(command) - client.socket.write(data) + client.socket.write(data + '\n') } client.socket.on('data', data => { // Same sort of "guarding" deserialization/validation as in the server // code, because it's possible the client and server backends mismatch. - for (const line of data.toString().split('\n')) { + for (const line of data.toString().trim().split('\n')) { let command try { command = deserializeDataToCommand(line) @@ -298,6 +342,8 @@ export function attachBackendToSocketClient(backend, client, { // All actual logic for instances of the mtui backend interacting with each // other through commands lives here. + backend.setAlwaysStartPaused(true) + client.on('command', async command => { switch (command.sender) { case 'server': @@ -343,6 +389,20 @@ export function attachBackendToSocketClient(backend, client, { } )) return + case 'play': + if (QP) { + QP.once('received time data', data => { + client.sendCommand({ + code: 'status', + status: 'ready-to-resume', + queuePlayer: QP.id + }) + }) + silenceEvents(QP, ['playing'], () => QP.play( + restoreNewItem(command.track, getPlaylistSources()) + )) + } + return case 'queue': if (QP) silenceEvents(QP, ['queue'], () => QP.queue( restoreNewItem(command.topItem, getPlaylistSources()), @@ -361,9 +421,18 @@ export function attachBackendToSocketClient(backend, client, { case 'seek-to': if (QP) silenceEvents(QP, ['seek-to'], () => QP.seekTo(command.time)) return - case 'set-pause': - if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused)) + case 'set-pause': { + let playingThisTrack = true + QP.once('playing new track', () => { + playingThisTrack = false + }) + setTimeout(() => { + if (playingThisTrack) { + if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused)) + } + }, command.startingTrack ? 500 : 0) return + } case 'unqueue': if (QP) silenceEvents(QP, ['unqueue'], () => QP.unqueue( restoreNewItem(command.topItem, getPlaylistSources()) @@ -406,6 +475,23 @@ export function attachBackendToSocketClient(backend, client, { }) }) + backend.on('playing', (queuePlayer, track) => { + if (track) { + client.sendCommand({ + code: 'play', + queuePlayer: queuePlayer.id, + track: saveItemReference(track) + }) + queuePlayer.once('received time data', data => { + client.sendCommand({ + code: 'status', + status: 'ready-to-resume', + queuePlayer: queuePlayer.id + }) + }) + } + }) + backend.on('queue', (queuePlayer, topItem, afterItem, opts) => { client.sendCommand({ code: 'queue', -- cgit 1.3.0-6-gf8a5