Bläddra i källkod

Resolve merge conflict in agi.websocket.go with v2.026

Take the v2.026 rewrite (goroutine-safe wsConn, background reader,
buffered msgChan, pump_messages/available/isClosed APIs) and apply
the logger transformation to the three new log.Println calls.
Claude 2 veckor sedan
förälder
incheckning
ce86f3ab52
1 ändrade filer med 307 tillägg och 139 borttagningar
  1. 307 139
      src/mod/agi/agi.websocket.go

+ 307 - 139
src/mod/agi/agi.websocket.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net/http"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/gorilla/websocket"
@@ -15,12 +16,37 @@ import (
 /*
 AJGI WebSocket Request Library
 
-This is a library for allowing AGI based connection upgrade to WebSocket
-Different from other agi module, this do not use the register lib interface
-deal to it special nature.
+This is a library for allowing AGI based connection upgrade to WebSocket.
+Different from other agi modules, this does not use the register lib interface
+due to its special nature.
+
+New functions exposed to AGI scripts:
+
+  websocket.upgrade(timeoutSec)
+    Upgrades the connection and overrides delay() with a message-pumping version
+    so that websocket.onMessage fires naturally during pauses.
+
+  websocket.send(text)         → bool
+  websocket.read(timeoutMs?)   → string | null | false
+    timeoutMs = 0 / omitted → block until message arrives or connection closes
+    timeoutMs > 0           → return null on timeout (connection still open)
+    returns false           → connection is closed
+
+  websocket.available()        → int
+    Number of messages currently waiting in the inbound buffer. Non-blocking.
+
+  websocket.isClosed()         → bool
+    true when the connection is no longer active.
+
+  websocket.onMessage          → assign function(msg) to receive messages
+    msg = { data: string, timestamp: int64 ms, type: int }
+    Fired inside delay() on the script's own goroutine — Otto-safe.
+
+  websocket.close()
 
 Author: tobychui
 */
+
 var upgrader = websocket.Upgrader{
 	ReadBufferSize:  1024,
 	WriteBufferSize: 1024,
@@ -28,207 +54,349 @@ var upgrader = websocket.Upgrader{
 		return true
 	},
 }
-var connections = sync.Map{}
 
-// This is a very special function to check if the connection has been updated or not
-// Return upgrade status (true for already upgraded) and connection uuid
-func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *websocket.Conn) {
-	if value, err := vm.Get("_websocket_conn_id"); err == nil {
-		//Exists!
-		//Check if this is undefined
-		if value == otto.UndefinedValue() {
-			//WebSocket connection has closed
-			return false, "", nil
-		}
+// wsMsg is a single inbound WebSocket frame delivered to the AGI script.
+type wsMsg struct {
+	Data      string // text payload
+	Timestamp int64  // arrival time as unix milliseconds
+	Type      int    // gorilla message type (1 = text, 2 = binary)
+}
 
-		//Connection is still live. Try convert it to string
-		connId, err := value.ToString()
-		if err != nil {
-			return false, "", nil
-		}
+// wsConn wraps a gorilla websocket.Conn with a buffered inbound message channel.
+// All fields shared across goroutines are accessed via atomics or the channel itself.
+// The Otto VM is NEVER touched from any goroutine other than the main script goroutine.
+type wsConn struct {
+	conn        *websocket.Conn
+	msgChan     chan wsMsg // filled by the background reader goroutine
+	closed      int32     // 1 when closed; use atomic load/store
+	lastOprTime int64     // unix seconds of last activity; use atomic load/store
+}
 
-		//Load the conenction from SyncMap
-		if c, ok := connections.Load(connId); ok {
-			//Return the conncetion object
-			return true, connId, c.(*websocket.Conn)
-		}
+func newWsConn(c *websocket.Conn) *wsConn {
+	wsc := &wsConn{
+		conn:    c,
+		msgChan: make(chan wsMsg, 128),
+	}
+	atomic.StoreInt64(&wsc.lastOprTime, time.Now().Unix())
+	return wsc
+}
+
+func (w *wsConn) isClosed() bool    { return atomic.LoadInt32(&w.closed) == 1 }
+func (w *wsConn) markClosed()       { atomic.StoreInt32(&w.closed, 1) }
+func (w *wsConn) touchLastOpr()     { atomic.StoreInt64(&w.lastOprTime, time.Now().Unix()) }
+func (w *wsConn) getLastOpr() int64 { return atomic.LoadInt64(&w.lastOprTime) }
 
-		//Connection object not found (Maybe already closed?)
+var connections = sync.Map{}
+
+// checkWebSocketConnectionUpgradeStatus returns whether the current VM has an
+// active WebSocket connection.  Returns (active, connID, *wsConn).
+func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *wsConn) {
+	value, err := vm.Get("_websocket_conn_id")
+	if err != nil || value.IsUndefined() || value.IsNull() {
 		return false, "", nil
+	}
+	connId, err := value.ToString()
+	if err != nil || connId == "" {
+		return false, "", nil
+	}
+	raw, ok := connections.Load(connId)
+	if !ok {
+		return false, "", nil
+	}
+	wsc := raw.(*wsConn)
+	if wsc.isClosed() {
+		return false, connId, nil
+	}
+	return true, connId, wsc
+}
 
+// cleanupWsConn sends a close frame, closes the raw connection, and removes the
+// connection from both the sync.Map and the VM.
+// MUST be called from the main (script) goroutine so vm.Set is safe.
+func cleanupWsConn(vm *otto.Otto, connID string, wsc *wsConn) {
+	if !wsc.isClosed() {
+		wsc.markClosed()
+		wsc.conn.WriteMessage(
+			websocket.CloseMessage,
+			websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
+		)
+		time.Sleep(150 * time.Millisecond)
+		wsc.conn.Close()
 	}
-	return false, "", nil
+	vm.Set("_websocket_conn_id", otto.UndefinedValue())
+	connections.Delete(connID)
+}
+
+// dispatchOnMessage calls websocket.onMessage(msg) on the script goroutine.
+// Passing data through vm.Set avoids the complexities of otto.Value.Call.
+// MUST be called from the main (script) goroutine.
+func dispatchOnMessage(vm *otto.Otto, msg wsMsg) {
+	vm.Set("_ws_incoming", map[string]interface{}{
+		"data":      msg.Data,
+		"timestamp": msg.Timestamp,
+		"type":      msg.Type,
+	})
+	if _, err := vm.Run(`
+		if (typeof websocket !== 'undefined' && typeof websocket.onMessage === 'function') {
+			websocket.onMessage(_ws_incoming);
+		}
+	`); err != nil {
+		agiLogger.PrintAndLog("Agi", fmt.Sprint("*AGI WebSocket* onMessage handler error:", err), nil)
+	}
+	vm.Set("_ws_incoming", otto.UndefinedValue())
 }
 
 func (g *Gateway) injectWebSocketFunctions(vm *otto.Otto, u *user.User, w http.ResponseWriter, r *http.Request) {
 
+	// ── websocket.upgrade(timeoutSeconds) ────────────────────────────────────
 	vm.Set("_websocket_upgrade", func(call otto.FunctionCall) otto.Value {
-		//Check if the user specified any timeout time in seconds
-		//Default to 5 minutes
 		timeout, err := call.Argument(0).ToInteger()
-		if err != nil {
+		if err != nil || timeout <= 0 {
 			timeout = 300
 		}
 
-		//Check if the connection has already been updated
-		connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
-		if connState {
-			//Already upgraded
-			return otto.TrueValue()
+		if connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm); connState {
+			return otto.TrueValue() // already upgraded
 		}
 
-		//Not upgraded. Upgrade it now
 		c, err := upgrader.Upgrade(w, r, nil)
 		if err != nil {
-			agiLogger.PrintAndLog("Agi", fmt.Sprint("*AGI WebSocket*  WebSocket upgrade failed:", err), nil)
+			agiLogger.PrintAndLog("Agi", fmt.Sprint("*AGI WebSocket* upgrade failed:", err), nil)
 			return otto.FalseValue()
 		}
 
-		//Generate a UUID for this connection
+		wsc := newWsConn(c)
 		connUUID := uuid.NewV4().String()
+		connections.Store(connUUID, wsc)
 		vm.Set("_websocket_conn_id", connUUID)
-		connections.Store(connUUID, c)
-
-		//Record its creation time as opr time
-		vm.Set("_websocket_conn_lastopr", time.Now().Unix())
-
-		//Create a go routine to monitor the connection status and disconnect it if timeup
-		if timeout > 0 {
-			go func() {
-				time.Sleep(1 * time.Second)
-				//Check if the last edit time > timeout time
-				connStatus, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
-				for connStatus {
-					//For this connection exists
-					if value, err := vm.Get("_websocket_conn_lastopr"); err == nil {
-						lastOprTime, err := value.ToInteger()
-						if err != nil {
-							continue
-						}
-						//log.Println(time.Now().Unix(), lastOprTime)
-						if time.Now().Unix()-lastOprTime > timeout {
-							//Timeout! Kill this socket
-							conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Timeout"))
-							time.Sleep(300)
-							conn.Close()
-
-							//Clean up the connection in sync map and vm
-							vm.Set("_websocket_conn_id", otto.UndefinedValue())
-							connections.Delete(connID)
-
-							agiLogger.PrintAndLog("Agi", "*AGI WebSocket* Closing connection due to timeout", nil)
-							break
-						}
-					}
-					time.Sleep(1 * time.Second)
-					connStatus, _, _ = checkWebSocketConnectionUpgradeStatus(vm)
-				}
 
+		// Background reader — feeds all inbound frames into msgChan.
+		// Never touches the Otto VM; only updates wsc atomics and the channel.
+		go func() {
+			defer func() {
+				wsc.markClosed()
+				close(wsc.msgChan)
+				// Do NOT call vm.Set here — Otto is not goroutine-safe.
 			}()
-		}
+			for {
+				msgType, message, err := c.ReadMessage()
+				if err != nil {
+					return // connection closed or error
+				}
+				wsc.touchLastOpr()
+				select {
+				case wsc.msgChan <- wsMsg{
+					Data:      string(message),
+					Timestamp: time.Now().UnixMilli(),
+					Type:      msgType,
+				}:
+				default:
+					agiLogger.PrintAndLog("Agi", "*AGI WebSocket* inbound buffer full, dropping frame", nil)
+				}
+			}
+		}()
+
+		// Idle-timeout watcher — closes the raw connection when no activity.
+		// Does NOT touch the VM; closing the connection causes the reader to exit.
+		go func() {
+			ticker := time.NewTicker(1 * time.Second)
+			defer ticker.Stop()
+			for range ticker.C {
+				if wsc.isClosed() {
+					return
+				}
+				if time.Now().Unix()-wsc.getLastOpr() > timeout {
+					agiLogger.PrintAndLog("Agi", "*AGI WebSocket* idle timeout — closing connection", nil)
+					c.Close()
+					return
+				}
+			}
+		}()
+
+		// Override delay() so that websocket.onMessage callbacks fire naturally
+		// inside pauses without the user needing an explicit poll() call.
+		vm.Run(`
+			var _ws_orig_delay = (typeof delay === 'function') ? delay : function(ms){};
+			delay = function(ms){ _websocket_pump_messages(ms); };
+		`)
 
 		return otto.TrueValue()
 	})
 
+	// ── websocket.send(text) ─────────────────────────────────────────────────
 	vm.Set("_websocket_send", func(call otto.FunctionCall) otto.Value {
-		//Get the content to send
 		content, err := call.Argument(0).ToString()
 		if err != nil {
 			g.RaiseError(err)
 			return otto.FalseValue()
 		}
+		connState, connID, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		if !connState {
+			return otto.FalseValue()
+		}
+		if err := wsc.conn.WriteMessage(websocket.TextMessage, []byte(content)); err != nil {
+			cleanupWsConn(vm, connID, wsc)
+			return otto.FalseValue()
+		}
+		wsc.touchLastOpr()
+		return otto.TrueValue()
+	})
 
-		//Send it
-		connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
+	// ── websocket.read(timeout ms) ───────────────────────────────────────────
+	// timeoutMs = 0 or omitted → block until a message arrives or socket closes
+	// timeoutMs > 0            → return null if no message within that many ms
+	// Returns: string on message · null on timeout · false if connection closed
+	vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
+		timeoutMs, _ := call.Argument(0).ToInteger()
+
+		connState, connID, wsc := checkWebSocketConnectionUpgradeStatus(vm)
 		if !connState {
-			//Already upgraded
-			//log.Println("*AGI WebSocket* Connection id not found in VM")
+			if connID != "" {
+				// Stale entry — tidy up from the main goroutine
+				vm.Set("_websocket_conn_id", otto.UndefinedValue())
+				connections.Delete(connID)
+			}
 			return otto.FalseValue()
 		}
 
-		err = conn.WriteMessage(1, []byte(content))
-		if err != nil {
+		var msg wsMsg
+		var ok bool
 
-			//Client connection could have been closed. Close the connection
-			conn.Close()
+		if timeoutMs > 0 {
+			select {
+			case msg, ok = <-wsc.msgChan:
+			case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
+				return otto.NullValue() // timed out; connection still open
+			}
+		} else {
+			msg, ok = <-wsc.msgChan // block until message or channel close
+		}
 
-			//Clean up the connection in sync map and vm
-			vm.Set("_websocket_conn_id", otto.UndefinedValue())
-			connections.Delete(connID)
+		if !ok {
+			// Channel closed — background reader exited (connection gone)
+			cleanupWsConn(vm, connID, wsc)
 			return otto.FalseValue()
 		}
 
-		//Write succeed
+		wsc.touchLastOpr()
+		v, err := otto.ToValue(msg.Data)
+		if err != nil {
+			return otto.NullValue()
+		}
+		return v
+	})
 
-		//Update last opr time
-		vm.Set("_websocket_conn_lastopr", time.Now().Unix())
+	// ── websocket.available() ────────────────────────────────────────────────
+	// Returns the number of messages currently waiting in the inbound buffer.
+	// Non-blocking — safe to poll in a tight loop.
+	vm.Set("_websocket_available", func(call otto.FunctionCall) otto.Value {
+		_, _, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		count := 0
+		if wsc != nil {
+			count = len(wsc.msgChan)
+		}
+		v, _ := otto.ToValue(count)
+		return v
+	})
 
+	// ── websocket.isClosed() ─────────────────────────────────────────────────
+	// Returns true when the WebSocket connection is no longer active.
+	vm.Set("_websocket_is_closed", func(call otto.FunctionCall) otto.Value {
+		connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
+		if connState {
+			return otto.FalseValue()
+		}
 		return otto.TrueValue()
 	})
 
-	vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
-		connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
-		if connState == true {
-			_, message, err := conn.ReadMessage()
-			if err != nil {
-				//Client connection could have been closed. Close the connection
-				conn.Close()
-
-				//Clean up the connection in sync map and vm
-				vm.Set("_websocket_conn_id", otto.UndefinedValue())
-				connections.Delete(connID)
+	// ── _websocket_pump_messages(ms) ─────────────────────────────────────────
+	// Replaces delay() after upgrade.  Sleeps for ms milliseconds while
+	// dispatching any queued messages to websocket.onMessage.
+	// All JS execution happens on this (main script) goroutine — Otto-safe.
+	//
+	// Important: messages are only consumed from the buffer when onMessage is
+	// actually a function.  When it is null/undefined the function falls back to
+	// a plain sleep so that websocket.available() / websocket.read() can still
+	// see the queued frames afterwards (Mode 2 / manual-read patterns).
+	vm.Set("_websocket_pump_messages", func(call otto.FunctionCall) otto.Value {
+		ms, err := call.Argument(0).ToInteger()
+		if err != nil || ms < 0 {
+			ms = 0
+		}
 
-				agiLogger.PrintAndLog("Agi", "*AGI WebSocket* Trying to read from a closed socket", nil)
-				return otto.FalseValue()
+		_, _, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		if wsc == nil {
+			// No active WebSocket — fall back to plain sleep
+			if ms > 0 {
+				time.Sleep(time.Duration(ms) * time.Millisecond)
 			}
-			//Update last opr time
-			vm.Set("_websocket_conn_lastopr", time.Now().Unix())
-
-			//Parse the incoming message
-			incomingString, err := otto.ToValue(string(message))
-			if err != nil {
-				agiLogger.PrintAndLog("Agi", fmt.Sprint(err), nil)
-				//Unable to parse to JavaScript. Something out of the scope of otto?
-				return otto.NullValue()
+			return otto.UndefinedValue()
+		}
+
+		// Check once whether a handler is registered.  We evaluate in JS so
+		// that the typeof check is unambiguous regardless of Otto internals.
+		hasHandlerVal, _ := vm.Run(`typeof websocket !== 'undefined' && typeof websocket.onMessage === 'function'`)
+		hasHandler, _ := hasHandlerVal.ToBoolean()
+		if !hasHandler {
+			// No handler — plain sleep; leave messages in the buffer untouched.
+			if ms > 0 {
+				time.Sleep(time.Duration(ms) * time.Millisecond)
 			}
+			return otto.UndefinedValue()
+		}
 
-			//Return the incoming string to the AGI script
-			return incomingString
-		} else {
-			//WebSocket not exists
-			//log.Println("*AGI WebSocket* Trying to read from a closed socket")
-			return otto.FalseValue()
+		const tickSize = 20 * time.Millisecond
+		deadline := time.Now().Add(time.Duration(ms) * time.Millisecond)
+
+		for {
+			remaining := time.Until(deadline)
+			if remaining <= 0 {
+				break
+			}
+			tick := tickSize
+			if remaining < tick {
+				tick = remaining
+			}
+
+			select {
+			case msg, ok := <-wsc.msgChan:
+				if !ok {
+					// Connection closed while waiting — stop pumping
+					return otto.UndefinedValue()
+				}
+				wsc.touchLastOpr()
+				dispatchOnMessage(vm, msg)
+
+			case <-time.After(tick):
+				// No message in this 20 ms slice — keep waiting
+			}
 		}
+		return otto.UndefinedValue()
 	})
 
+	// ── websocket.close() ────────────────────────────────────────────────────
 	vm.Set("_websocket_close", func(call otto.FunctionCall) otto.Value {
-		connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
-		if connState == true {
-			//Close the Websocket gracefully
-			conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
-			time.Sleep(300)
-			conn.Close()
-
-			//Clean up the connection in sync map and vm
-			vm.Set("_websocket_conn_id", otto.UndefinedValue())
-			connections.Delete(connID)
-
-			//Return true value
-			return otto.TrueValue()
-		} else {
-			//Connection not opened or closed already
+		connState, connID, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		if !connState {
 			return otto.FalseValue()
 		}
-
+		cleanupWsConn(vm, connID, wsc)
+		return otto.TrueValue()
 	})
 
-	//Wrap all the native code function into an imagelib class
+	// ── JS wrapper ───────────────────────────────────────────────────────────
 	vm.Run(`
 		var websocket = {};
-		websocket.upgrade = _websocket_upgrade;
-		websocket.send = _websocket_send;
-		websocket.read = _websocket_read;
-		websocket.close = _websocket_close;
-		
+		websocket.upgrade   = _websocket_upgrade;
+		websocket.send      = _websocket_send;
+		websocket.read      = _websocket_read;
+		websocket.close     = _websocket_close;
+		websocket.available = _websocket_available;
+		websocket.isClosed  = _websocket_is_closed;
+
+		// Assign a function(msg) here to receive messages asynchronously.
+		// msg = { data: string, timestamp: number, type: number }
+		// The handler fires inside delay() after websocket.upgrade() is called.
+		websocket.onMessage = null;
 	`)
 }