Przeglądaj źródła

Add receiver idle timeouts and room.closed handling

Implement server-side room lifecycle cleanup and notify senders before teardown.
Toby Chui 1 tydzień temu
rodzic
commit
f3b9720ba8

+ 105 - 30
src/arozcast.go

@@ -14,9 +14,30 @@ package main
 	  GET  /api/arozcast/ping?code=XXXX     → {"exists":true/false}
 	  POST /api/arozcast/publish            → "OK"  (code=, msg=)
 	  GET  /api/arozcast/ws?code=XXXX       → WebSocket upgrade
+
+	Room lifecycle / idle timeouts
+	──────────────────────────────
+	A sweep goroutine runs every 15 seconds and closes rooms that meet
+	either of the following conditions:
+
+	  1. Receiver idle — the receiver (Arozcast page) sends a
+	     'status.update' frame every 3 s.  If no such frame has been
+	     seen for acReceiverIdleTimeout (30 s) the receiver is considered
+	     gone and the room is closed.  This guard only activates after
+	     the receiver has connected at least once (lastStatusUpdate is
+	     non-zero), so a freshly-created room is not immediately culled.
+
+	  2. Empty + idle — the room has no connected clients and has been
+	     idle for acEmptyRoomTimeout (10 min).  This catches rooms whose
+	     owner never opened the Arozcast page.
+
+	Before closing, the backend broadcasts
+	{"topic":"room.closed","payload":{}} to every client still connected
+	to the room so that senders can react immediately (emit 'giveup')
+	instead of waiting for the watchdog cycle to fire.
 */
 
 import (
+	"encoding/json"
 	"fmt"
 	"math/rand"
 	"net/http"
@@ -28,6 +49,18 @@ import (
 	"imuslab.com/arozos/mod/utils"
 )
 
+// Idle-timeout constants.  Adjust here if you need different behaviour.
+const (
+	acReceiverIdleTimeout = 30 * time.Second // close room when receiver goes silent
+	acEmptyRoomTimeout    = 10 * time.Minute // clean up empty rooms
+	acSweepInterval       = 15 * time.Second // how often the sweep goroutine runs
+)
+
+// roomClosedMsg is broadcast to all senders before the room is torn down,
+// giving arozcast.js (and the built-in sender apps) a chance to emit
+// 'giveup' immediately rather than going through the full watchdog/retry cycle.
+var roomClosedMsg = []byte(`{"topic":"room.closed","payload":{}}`)
+
 // acClient is one WebSocket participant in a room.
 type acClient struct {
 	conn *websocket.Conn
@@ -41,12 +74,13 @@ func (c *acClient) safeClose() {
 
 // acRoom holds all connected clients sharing a 4-digit code.
 type acRoom struct {
-	code         string
-	owner        string
-	clients      map[*acClient]struct{}
-	mu           sync.Mutex
-	createdAt    time.Time
-	lastActivity time.Time // updated on every client join / leave
+	code             string
+	owner            string
+	clients          map[*acClient]struct{}
+	mu               sync.Mutex
+	createdAt        time.Time
+	lastActivity     time.Time // updated on every client join / leave
+	lastStatusUpdate time.Time // updated each time a 'status.update' frame is relayed
 }
 
 func (r *acRoom) add(c *acClient) {
@@ -73,7 +107,7 @@ func (r *acRoom) broadcast(msg []byte, exclude *acClient) {
 		}
 		select {
 		case c.send <- append([]byte(nil), msg...):
-		default: // drop frame if buffer is full
+		default: // drop frame if send buffer is full
 		}
 	}
 }
@@ -90,23 +124,65 @@ var acUpgrader = websocket.Upgrader{
 	CheckOrigin:     func(r *http.Request) bool { return true },
 }
 
+// acCloseRoom removes a room from the manager and disconnects all its clients.
+// It first broadcasts roomClosedMsg so senders can react before the socket drops.
+// Safe to call even if the room no longer exists.
+func acCloseRoom(mgr *acManager, code string) {
+	mgr.mu.Lock()
+	room, exists := mgr.rooms[code]
+	if exists {
+		delete(mgr.rooms, code)
+	}
+	mgr.mu.Unlock()
+
+	if !exists {
+		return
+	}
+
+	// Notify senders that the room is going away (best effort).
+	// broadcast() queues the message into each client's send channel, or
+	// drops it for a client whose send buffer is already full; the writer
+	// goroutine drains whatever was queued before exiting when the channel
+	// is closed below.
+	room.broadcast(roomClosedMsg, nil)
+
+	room.mu.Lock()
+	for c := range room.clients {
+		c.safeClose()
+	}
+	room.mu.Unlock()
+}
+
 func ArozcastInit() {
 	mgr := &acManager{rooms: make(map[string]*acRoom)}
 
-	// Sweep rooms with no active clients that have been idle for 10 minutes.
+	// ── Sweep goroutine ───────────────────────────────────────────────────
+	// Runs every acSweepInterval and closes rooms that match either idle
+	// condition.  Rooms to close are collected while holding a read-lock,
+	// then actually closed (write-lock + WS teardown) after releasing it.
 	go func() {
-		ticker := time.NewTicker(1 * time.Minute)
+		ticker := time.NewTicker(acSweepInterval)
 		defer ticker.Stop()
 		for range ticker.C {
-			mgr.mu.Lock()
+			var toClose []string
+
+			mgr.mu.RLock()
 			for code, room := range mgr.rooms {
 				room.mu.Lock()
-				if len(room.clients) == 0 && time.Since(room.lastActivity) > 10*time.Minute {
-					delete(mgr.rooms, code)
-				}
+				receiverDead := !room.lastStatusUpdate.IsZero() &&
+					time.Since(room.lastStatusUpdate) > acReceiverIdleTimeout
+				emptyIdle := len(room.clients) == 0 &&
+					time.Since(room.lastActivity) > acEmptyRoomTimeout
 				room.mu.Unlock()
+
+				if receiverDead || emptyIdle {
+					toClose = append(toClose, code)
+				}
+			}
+			mgr.mu.RUnlock()
+
+			for _, code := range toClose {
+				acCloseRoom(mgr, code)
 			}
-			mgr.mu.Unlock()
 		}
 	}()
 
@@ -154,22 +230,7 @@ func ArozcastInit() {
 			utils.SendErrorResponse(w, "Missing code")
 			return
 		}
-
-		mgr.mu.Lock()
-		room, exists := mgr.rooms[code]
-		if exists {
-			delete(mgr.rooms, code)
-		}
-		mgr.mu.Unlock()
-
-		if exists {
-			room.mu.Lock()
-			for c := range room.clients {
-				c.safeClose()
-			}
-			room.mu.Unlock()
-		}
-
+		acCloseRoom(mgr, code)
 		utils.SendOK(w)
 	})
 
@@ -249,6 +310,8 @@ func ArozcastInit() {
 		room.add(client)
 
 		// Writer goroutine: drains client.send and writes to the socket.
+		// When the send channel is closed (safeClose), the goroutine delivers
+		// any queued messages (e.g. roomClosedMsg) before closing the connection.
 		go func() {
 			defer conn.Close()
 			for msg := range client.send {
@@ -259,16 +322,28 @@ func ArozcastInit() {
 		}()
 
 		// Reader loop: relay incoming frames to all other room members.
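+		// Also inspects each frame: if its topic is 'status.update' (normally
+		// sent by the receiver; the sender's role is not verified here),
+		// refresh lastStatusUpdate so the sweep goroutine knows the receiver
+		// is still alive.
+		// NOTE(review): topicCheck is reused across iterations and
+		// json.Unmarshal leaves Topic unchanged when a frame has no "topic"
+		// key, so a stale "status.update" can carry over to later frames —
+		// consider declaring topicCheck inside the loop.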
 		defer func() {
 			room.remove(client)
 			client.safeClose()
 		}()
 
+		var topicCheck struct {
+			Topic string `json:"topic"`
+		}
+
 		for {
 			_, msg, err := conn.ReadMessage()
 			if err != nil {
 				break
 			}
+			if json.Unmarshal(msg, &topicCheck) == nil && topicCheck.Topic == "status.update" {
+				room.mu.Lock()
+				room.lastStatusUpdate = time.Now()
+				room.mu.Unlock()
+			}
 			room.broadcast(msg, client)
 		}
 	})

+ 46 - 1
src/web/Arozcast/README.md

@@ -664,6 +664,32 @@ document.addEventListener('visibilitychange', function() {
 | All retries exhausted | Call `resumeLocally()` — fall back to device playback |
 | User explicitly presses "Disconnect" | Send `media.stop`, skip retries, resume locally |
 | Page/tab closed | Send only `media.stop` if explicit disconnect; otherwise let receiver keep playing |
+| Server sends `room.closed` | Give up immediately — no retries; see [receiver idle timeout](#receiver-idle-timeout) |
+
+> **Important:** Do not reset the reconnect counter when the WebSocket opens. Only reset it after the **first `status.update` is received** — this proves the receiver is alive. Resetting on WS open alone creates an infinite loop when the room exists on the server but the receiver page is gone.
+
+```javascript
+ws.onopen = function() {
+    // Do NOT set reconnectCount = 0 here
+    pendingCode = null;
+    castWs = ws; castCode = code; castMode = true;
+    castSend('peer.hello', {});
+    startHeartbeat();
+};
+
+ws.onmessage = function(evt) {
+    var msg = JSON.parse(evt.data);
+    if (msg.topic === 'status.update') {
+        reconnectCount = 0; // receiver confirmed alive — now safe to reset
+        // ... sync playback state
+    } else if (msg.topic === 'room.closed') {
+        // Backend closed the room — stop retrying immediately
+        reconnectCount = 0; pendingCode = null;
+        clearTimeout(reconnectTimer);
+        resumeLocally();
+    }
+};
+```
 
 ---
 
@@ -712,7 +738,26 @@ Setting `repeat === 'all'` does **not** make the receiver loop automatically. In
 Setting `repeat === 'one'` sets `loop = true` on the receiver's media element, so the browser handles looping natively and `media.ended` is never fired.
 
 ### Room lifetime
-Rooms are automatically garbage-collected after **10 minutes of inactivity** (no connected clients). Always call `/api/arozcast/close` when tearing down intentionally so the slot is freed immediately.
+
+Rooms are closed automatically by the server in two cases:
+
+| Condition | Timeout | Notes |
+|-----------|---------|-------|
+| **Receiver idle** | **30 seconds** | The receiver (`index.html`) sends `status.update` every 3 s. If no `status.update` has been seen for 30 s the receiver is considered gone and the room is closed. |
+| **Empty room** | **10 minutes** | The room has no connected WebSocket clients and no client has joined or left for 10 minutes. Catches rooms whose owner never opened the Arozcast page. |
+
+The sweep runs every 15 seconds. When a room is closed — either by the sweep or by an explicit `/api/arozcast/close` call — the backend first broadcasts `{"topic":"room.closed","payload":{}}` to every connected client before dropping their sockets. Delivery is best-effort (a frame is dropped if a client's send buffer is full). A well-behaved sender should stop retrying immediately on receiving this message.
+
+Always call `/api/arozcast/close` when tearing down intentionally so the slot is freed immediately without waiting for the sweep.
+
+#### Receiver idle timeout
+
+The **30-second receiver idle** guard is the second line of defence against zombie sessions where the Arozcast iframe was force-removed from the DOM without triggering `beforeunload` (so the room was not explicitly closed via `/api/arozcast/close`). When the sweep fires:
+
+1. Room is deleted from the server's room map.
+2. `{"topic":"room.closed","payload":{}}` is broadcast to all connected senders.
+3. All WebSocket connections are closed.
+4. Any sender that receives `room.closed` should give up immediately; any sender that misses it will discover the room is gone when its next reconnect attempt gets a **404 Room not found** response.
 
 ### HTTP publish for non-WS contexts
 AGI scripts and server-side code that cannot hold a WebSocket can use `/api/arozcast/publish` to inject any message into a live room. This is useful for automation (e.g. skip to next track on a timer) without modifying the frontend.

+ 32 - 4
src/web/Arozcast/arozcast.js

@@ -469,7 +469,11 @@ class ArozCast {
 
         ws.onopen = () => {
             clearTimeout(timeout);
-            self._reconnectCount = 0;
+            // Do NOT reset _reconnectCount here — only reset it once the receiver
+            // confirms it is alive by sending 'status.update'.  Resetting on WS
+            // open alone would cause an infinite loop when the room exists on the
+            // server but the receiver is gone (WS opens → no status.update → watchdog
+            // fires → onclose → _scheduleReconnect with count reset to 0 → repeat).
             self._pendingCode    = null;
             self._ws             = ws;
             self.code            = code;
@@ -488,14 +492,38 @@ class ArozCast {
 
     /**
      * Dispatch incoming receiver messages to the event system.
-     * Only `status.update` and `media.ended` are receiver-originated.
+     *
+     * Receiver-originated topics: `status.update`, `media.ended`
+     * Backend-originated topic:   `room.closed` (server terminated the room)
      * All other topics are sender-originated and should not appear here.
      * @private
      */
     _handleIncoming(msg) {
         switch (msg.topic) {
-            case 'status.update': this._emit('status', msg.payload); break;
-            case 'media.ended':   this._emit('ended',  msg.payload); break;
+            case 'status.update':
+                // Receiver confirmed alive — safe to reset the retry counter so a
+                // subsequent real network drop gets the full backoff sequence again.
+                this._reconnectCount = 0;
+                this._emit('status', msg.payload);
+                break;
+            case 'media.ended':
+                this._emit('ended', msg.payload);
+                break;
+            case 'room.closed': {
+                // The backend closed the room (idle timeout or explicit close).
+                // Emit 'giveup' immediately and mark the close as intentional so
+                // the WS-close event that follows does not trigger reconnect attempts.
+                const savedCode = this.code;
+                this._intentional    = true;
+                this._reconnectCount = 0;
+                this._pendingCode    = null;
+                clearTimeout(this._reconnectTimer);
+                this._reconnectTimer = null;
+                this.code            = null;
+                this.connected       = false;
+                this._emit('giveup', { code: savedCode });
+                break;
+            }
             // Unknown/loopback topics are silently ignored
         }
     }

+ 6 - 1
src/web/Arozcast/arozcast.md

@@ -431,7 +431,10 @@ cast.on('reconnecting', ({ code, attempt, delay }) => {
 
 ### `giveup` — `{ code }`
 
-Fired when all reconnection attempts are exhausted. Resume local playback.
+Fired in two situations:
+
+1. **Retries exhausted** — all reconnection attempts failed (network error, room no longer exists).
+2. **Server closed the room** — the backend broadcast `room.closed`, typically because the Arozcast receiver page sent no `status.update` for more than 30 seconds (it is also sent when the room is closed explicitly). In this case `giveup` fires immediately, without going through the reconnect cycle.
 
 ```javascript
 cast.on('giveup', ({ code }) => {
@@ -440,6 +443,8 @@ cast.on('giveup', ({ code }) => {
 });
 ```
 
+> **Note:** `room.closed` is a backend-internal topic; you never need to handle it directly when using this SDK — it is converted to the `giveup` event automatically.
+
 ---
 
 ### `takeover` — `{}`

+ 2 - 1
src/web/Movie/index.html

@@ -1501,7 +1501,7 @@ function _attemptCastReconnect() {
         ws.onopen = ws.onclose = ws.onerror = null; ws.close();
         _startCastReconnect(code);
     }, 8000);
-    ws.onopen  = function() { clearTimeout(openTimer); castReconnectCount = 0; castPendingCode = null; _castDidReconnect(ws, code); };
+    ws.onopen  = function() { clearTimeout(openTimer); castPendingCode = null; _castDidReconnect(ws, code); };
     ws.onerror = function() {};   // let onclose handle it
     ws.onclose = function() { clearTimeout(openTimer); _startCastReconnect(code); };
 }
@@ -1537,6 +1537,7 @@ function _castDidReconnect(ws, code) {
 
 function _handleCastMessage(msg) {
     if (msg.topic === 'status.update') {
+        castReconnectCount = 0; // receiver confirmed alive — reset retry counter
         castCurrentTime = msg.payload.currentTime || 0;
         castDuration    = msg.payload.duration    || 0;
         castIsPlaying   = msg.payload.isPlaying   || false;

+ 2 - 1
src/web/Musicify/musicify.js

@@ -1431,7 +1431,7 @@ function musicifyApp() {
                 ws.onopen = ws.onclose = ws.onerror = null; ws.close();
                 self._startCastReconnect(code);
             }, 8000);
-            ws.onopen  = function() { clearTimeout(openTimer); self._castReconnectCount = 0; self._castPendingCode = null; self._castDidReconnect(ws, code); };
+            ws.onopen  = function() { clearTimeout(openTimer); self._castPendingCode = null; self._castDidReconnect(ws, code); };
             ws.onerror = function() {};
             ws.onclose = function() { clearTimeout(openTimer); self._startCastReconnect(code); };
         },
@@ -1448,6 +1448,7 @@ function musicifyApp() {
                 try {
                     var msg = JSON.parse(evt.data);
                     if (msg.topic === 'status.update') {
+                        self._castReconnectCount = 0; // receiver confirmed alive — reset retry counter
                         if (!self.isSeeking) self.currentTime = msg.payload.currentTime || 0;
                         self.duration = msg.payload.duration || 0;
                         self.isPlaying = msg.payload.isPlaying || false;

+ 2 - 2
src/web/Photo/photo.js

@@ -1038,7 +1038,7 @@ function _attemptPhotoCastReconnect() {
         ws.onopen = ws.onclose = ws.onerror = null; ws.close();
         _startPhotoCastReconnect(code);
     }, 8000);
-    ws.onopen  = function() { clearTimeout(openTimer); _photoCastReconnectCount = 0; _photoCastPendingCode = null; _photoCastDidReconnect(ws, code); };
+    ws.onopen  = function() { clearTimeout(openTimer); _photoCastPendingCode = null; _photoCastDidReconnect(ws, code); };
     ws.onerror = function() {};
     ws.onclose = function() { clearTimeout(openTimer); _startPhotoCastReconnect(code); };
 }
@@ -1047,7 +1047,7 @@ function _photoCastDidReconnect(ws, code) {
     _photoCastWs = ws;
     _photoCastCode = code;
     _photoCastLastSeen = Date.now();
-    ws.onmessage = function() { _photoCastLastSeen = Date.now(); };
+    ws.onmessage = function() { _photoCastLastSeen = Date.now(); _photoCastReconnectCount = 0; }; // any reply = receiver alive — reset retry counter
     ws.onclose = function() {
         clearInterval(_photoCastPingTimer); clearInterval(_photoCastWatchTimer);
         var savedCode = _photoCastCode;