« get me outta code hell

synchronize playing new tracks across sockets - mtui - Music Text User Interface - user-friendly command line music player
about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorrie <towerofnix@gmail.com>2020-07-11 17:37:54 -0300
committerFlorrie <towerofnix@gmail.com>2020-07-11 17:42:22 -0300
commit07fb4b2d4a59319ab5ba03d842274aba8182c412 (patch)
tree6dc82202e30a67b064c2ea2c05e9fb13c802c46b
parent991b2f0a8280c31b93ad91d6a215b74183417352 (diff)
synchronize playing new tracks across sockets
-rw-r--r--backend.js16
-rw-r--r--general-util.js4
-rw-r--r--socket.js176
-rw-r--r--ui.js8
4 files changed, 151 insertions, 53 deletions
diff --git a/backend.js b/backend.js
index dc819ab..573a4fc 100644
--- a/backend.js
+++ b/backend.js
@@ -69,6 +69,7 @@ class QueuePlayer extends EventEmitter {
     this.playingTrack = null
     this.queueGrouplike = {name: 'Queue', isTheQueue: true, items: []}
     this.pauseNextTrack = false
+    this.alwaysStartPaused = false
 
     this.playedTrackToEnd = false
     this.timeData = null
@@ -383,7 +384,7 @@ class QueuePlayer extends EventEmitter {
   }
 
 
-  async play(item, forceStartPaused = false) {
+  async play(item, forceStartPaused) {
     if (this.player === null) {
       throw new Error('Attempted to play before a player was loaded')
     }
@@ -431,10 +432,11 @@ class QueuePlayer extends EventEmitter {
       this.timeData = null
       this.time = null
       this.playingTrack = item
+      this.emit('playing details', this.playingTrack, oldTrack, this)
       this.emit('playing', this.playingTrack, oldTrack, this)
 
       await this.player.kill()
-      if (forceStartPaused) {
+      if (this.alwaysStartPaused || forceStartPaused) {
         this.player.setPause(true)
       } else if (this.playedTrackToEnd) {
         this.player.setPause(this.pauseNextTrack)
@@ -637,6 +639,7 @@ class Backend extends EventEmitter {
     }
 
     this.queuePlayers = []
+    this.alwaysStartPaused = false
 
     this.recordStore = new RecordStore()
     this.throttleMetadata = throttlePromise(10)
@@ -668,6 +671,8 @@ class Backend extends EventEmitter {
       return error
     }
 
+    queuePlayer.alwaysStartPaused = this.alwaysStartPaused
+
     this.queuePlayers.push(queuePlayer)
     this.emit('added queue player', queuePlayer)
 
@@ -793,6 +798,13 @@ class Backend extends EventEmitter {
     return {seconds, string, noticedMissingMetadata, approxSymbol}
   }
 
+  setAlwaysStartPaused(value) {
+    this.alwaysStartPaused = !!value
+    for (const queuePlayer of this.queuePlayers) {
+      queuePlayer.alwaysStartPaused = !!value
+    }
+  }
+
   async stopPlayingAll() {
     for (const queuePlayer of this.queuePlayers) {
       await queuePlayer.stopPlaying()
diff --git a/general-util.js b/general-util.js
index e352960..0f5bdd5 100644
--- a/general-util.js
+++ b/general-util.js
@@ -311,7 +311,7 @@ parseOptions.handleDashless = Symbol()
 
 module.exports.parseOptions = parseOptions
 
-module.exports.silenceEvents = function(emitter, eventsToSilence, callback) {
+module.exports.silenceEvents = async function(emitter, eventsToSilence, callback) {
   const oldEmit = emitter.emit
 
   emitter.emit = function(event, ...data) {
@@ -320,7 +320,7 @@ module.exports.silenceEvents = function(emitter, eventsToSilence, callback) {
     }
   }
 
-  callback()
+  await callback()
 
   emitter.emit = oldEmit
 }
diff --git a/socket.js b/socket.js
index a972f0c..fe48063 100644
--- a/socket.js
+++ b/socket.js
@@ -114,6 +114,11 @@ function validateCommand(command) {
               )
             ))
           )
+        case 'play':
+          return (
+            typeof command.queuePlayer === 'string' &&
+            isItemRef(command.track)
+          )
         case 'queue':
           return (
             typeof command.queuePlayer === 'string' &&
@@ -143,10 +148,20 @@ function validateCommand(command) {
         case 'set-pause':
           return (
             typeof command.queuePlayer === 'string' &&
-            typeof command.paused === 'boolean'
+            typeof command.paused === 'boolean' &&
+            (
+              typeof command.startingTrack === 'boolean' &&
+              command.sender === 'server'
+            ) || !command.startingTrack
           )
         case 'status':
-          return typeof command.status === 'string'
+          return (
+            (
+              command.status === 'ready-to-resume' &&
+              typeof command.queuePlayer === 'string'
+            ) ||
+            command.status === 'sync-playback'
+          )
         case 'unqueue':
           return (
             typeof command.queuePlayer === 'string' &&
@@ -173,57 +188,86 @@ function makeSocketServer() {
 
   server.canonicalBackend = null
 
+  // readyToResume -> queue player id -> array: socket
+  const readyToResume = {}
+
   function receivedData(socket, data) {
     // Parse data as a command and validate it. If invalid, drop this data.
 
-    let command
-    try {
-      command = deserializeDataToCommand(data)
-    } catch (error) {
-      return
-    }
+    for (const line of data.toString().trim().split('\n')) {
+      let command
+      try {
+        command = deserializeDataToCommand(line)
+      } catch (error) {
+        return
+      }
 
-    command.sender = 'client'
+      command.sender = 'client'
 
-    if (!validateCommand(command)) {
-      return
-    }
+      if (!validateCommand(command)) {
+        return
+      }
 
-    // If it's a status command, respond appropriately, and return so that it
-    // is not relayed.
-
-    if (command.code === 'status') {
-      switch (command.status) {
-        case 'sync-playback':
-          for (const QP of server.canonicalBackend.queuePlayers) {
-            if (QP.timeData) {
-              socket.write(JSON.stringify({
-                sender: 'server',
-                code: 'seek-to',
-                queuePlayer: QP.id,
-                time: QP.timeData.curSecTotal
-              }) + '\n')
-              socket.write(JSON.stringify({
-                sender: 'server',
-                code: 'set-pause',
-                queuePlayer: QP.id,
-                paused: QP.player.isPaused
-              }))
+      // If it's a status command, respond appropriately, and return so that it
+      // is not relayed.
+
+      if (command.code === 'status') {
+        switch (command.status) {
+          case 'ready-to-resume': {
+            const readySockets = readyToResume[command.queuePlayer]
+            if (readySockets && !readySockets.includes(socket)) {
+              readySockets.push(socket)
+              if (readySockets.length === sockets.length) {
+                for (const socket of sockets) {
+                  socket.write(JSON.stringify({
+                    sender: 'server',
+                    code: 'set-pause',
+                    queuePlayer: command.queuePlayer,
+                    startingTrack: true,
+                    paused: false
+                  }) + '\n')
+                }
+                delete readyToResume[command.queuePlayer]
+              }
             }
+            break
           }
-
-          break
+          case 'sync-playback':
+            for (const QP of server.canonicalBackend.queuePlayers) {
+              if (QP.timeData) {
+                socket.write(JSON.stringify({
+                  sender: 'server',
+                  code: 'seek-to',
+                  queuePlayer: QP.id,
+                  time: QP.timeData.curSecTotal
+                }) + '\n')
+                socket.write(JSON.stringify({
+                  sender: 'server',
+                  code: 'set-pause',
+                  queuePlayer: QP.id,
+                  startingTrack: true,
+                  paused: QP.player.isPaused
+                }) + '\n')
+              }
+            }
+            break
+        }
+        return
       }
 
-      return
-    }
+      // If it's a 'play' command, set up a new readyToResume array.
+
+      if (command.code === 'play') {
+        readyToResume[command.queuePlayer] = []
+      }
 
-    // Relay the command to client sockets besides the sender.
+      // Relay the command to client sockets besides the sender.
 
-    const otherSockets = sockets.filter(s => s !== socket)
+      const otherSockets = sockets.filter(s => s !== socket)
 
-    for (const socket of otherSockets) {
-      socket.write(JSON.stringify(command))
+      for (const socket of otherSockets) {
+        socket.write(JSON.stringify(command) + '\n')
+      }
     }
   }
 
@@ -250,7 +294,7 @@ function makeSocketServer() {
       sender: 'server',
       code: 'initialize-backend',
       backend: savedBackend
-    }))
+    }) + '\n')
   })
 
   return server
@@ -266,14 +310,14 @@ function makeSocketClient() {
 
   client.sendCommand = function(command) {
     const data = serializeCommandToData(command)
-    client.socket.write(data)
+    client.socket.write(data + '\n')
   }
 
   client.socket.on('data', data => {
     // Same sort of "guarding" deserialization/validation as in the server
     // code, because it's possible the client and server backends mismatch.
 
-    for (const line of data.toString().split('\n')) {
+    for (const line of data.toString().trim().split('\n')) {
       let command
       try {
         command = deserializeDataToCommand(line)
@@ -298,6 +342,8 @@ function attachBackendToSocketClient(backend, client, {
   // All actual logic for instances of the mtui backend interacting with each
   // other through commands lives here.
 
+  backend.setAlwaysStartPaused(true)
+
   client.on('command', async command => {
     switch (command.sender) {
       case 'server':
@@ -343,6 +389,20 @@ function attachBackendToSocketClient(backend, client, {
               }
             ))
             return
+          case 'play':
+            if (QP) {
+              QP.once('received time data', data => {
+                client.sendCommand({
+                  code: 'status',
+                  status: 'ready-to-resume',
+                  queuePlayer: QP.id
+                })
+              })
+              silenceEvents(QP, ['playing'], () => QP.play(
+                restoreNewItem(command.track, getPlaylistSources())
+              ))
+            }
+            return
           case 'queue':
             if (QP) silenceEvents(QP, ['queue'], () => QP.queue(
               restoreNewItem(command.topItem, getPlaylistSources()),
@@ -361,9 +421,18 @@ function attachBackendToSocketClient(backend, client, {
           case 'seek-to':
             if (QP) silenceEvents(QP, ['seek-to'], () => QP.seekTo(command.time))
             return
-          case 'set-pause':
-            if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused))
+          case 'set-pause': {
+            let playingThisTrack = true
+            QP.once('playing new track', () => {
+              playingThisTrack = false
+            })
+            setTimeout(() => {
+              if (playingThisTrack) {
+                if (QP) silenceEvents(QP, ['set-pause'], () => QP.setPause(command.paused))
+              }
+            }, command.startingTrack ? 500 : 0)
             return
+          }
           case 'unqueue':
             if (QP) silenceEvents(QP, ['unqueue'], () => QP.unqueue(
               restoreNewItem(command.topItem, getPlaylistSources())
@@ -406,6 +475,23 @@ function attachBackendToSocketClient(backend, client, {
     })
   })
 
+  backend.on('playing', (queuePlayer, track) => {
+    if (track) {
+      client.sendCommand({
+        code: 'play',
+        queuePlayer: queuePlayer.id,
+        track: saveItemReference(track)
+      })
+      queuePlayer.once('received time data', data => {
+        client.sendCommand({
+          code: 'status',
+          status: 'ready-to-resume',
+          queuePlayer: queuePlayer.id
+        })
+      })
+    }
+  })
+
   backend.on('queue', (queuePlayer, topItem, afterItem, opts) => {
     client.sendCommand({
       code: 'queue',
diff --git a/ui.js b/ui.js
index 1e48877..c9e30e5 100644
--- a/ui.js
+++ b/ui.js
@@ -425,7 +425,7 @@ class AppElement extends FocusElement {
 
   bindListeners() {
     for (const key of [
-      'handlePlaying',
+      'handlePlayingDetails',
       'handleReceivedTimeData',
       'handleProcessMetadataProgress',
       'handleQueueUpdated',
@@ -463,7 +463,7 @@ class AppElement extends FocusElement {
     PIE.on('toggle pause', () => PIE.queuePlayer.togglePause())
 
     queuePlayer.on('received time data', this.handleReceivedTimeData)
-    queuePlayer.on('playing', this.handlePlaying)
+    queuePlayer.on('playing details', this.handlePlayingDetails)
     queuePlayer.on('queue updated', this.handleQueueUpdated)
   }
 
@@ -492,7 +492,7 @@ class AppElement extends FocusElement {
     }
 
     queuePlayer.removeListener('receivedTimeData', this.handleReceivedTimeData)
-    queuePlayer.removeListener('playing', this.handlePlaying)
+    queuePlayer.removeListener('playing details', this.handlePlayingDetails)
     queuePlayer.removeListener('queue updated', this.handleQueueUpdated)
     queuePlayer.stopPlaying()
   }
@@ -520,7 +520,7 @@ class AppElement extends FocusElement {
     }
   }
 
-  async handlePlaying(track, oldTrack, queuePlayer) {
+  async handlePlayingDetails(track, oldTrack, queuePlayer) {
     const PIE = this.getPlaybackInfoElementForQueuePlayer(queuePlayer)
     if (PIE) {
       PIE.updateTrack()