Browse Source

Add async websocket support in agi

Toby Chui 2 weeks ago
parent
commit
13a36ec15a
3 changed files with 679 additions and 196 deletions
  1. 245 26
      src/mod/agi/README.md
  2. 307 139
      src/mod/agi/agi.websocket.go
  3. 127 31
      src/web/UnitTest/special/websocket.js

+ 245 - 26
src/mod/agi/README.md

@@ -756,13 +756,257 @@ ffmpeg.convertWithProgress("user:/in.mp4", "user:/out.gif", "tmp:/conv_progress.
 
 ## websocket API
 
+The websocket library upgrades the current HTTP connection to a WebSocket session.
+It is only available in script paths reached via a live HTTP request context
+(standard `InterfaceHandler` or token-handler routes — not `execd` children).
+
 Load:
 
 ```javascript
 requirelib("websocket");
 ```
 
-This library is only available in request handlers with active HTTP request/response context.
+> **Note on `delay()` after upgrade** — `websocket.upgrade()` replaces the global
+> `delay()` with a message-pumping version. While the script sleeps inside `delay()`,
+> any queued inbound frames are dispatched to `websocket.onMessage` (if set).
+> `delay()` is therefore the natural yield point in event-driven loops.
+> When `onMessage` is `null` the buffer is left untouched so that
+> `available()` and `read()` can still see the frames.
+
+---
+
+### `websocket.upgrade(timeoutSec)` → `bool`
+
+Upgrades the HTTP connection to WebSocket and starts the background frame reader.
+The connection is closed automatically after `timeoutSec` seconds of idle time
+(default `300`). Also installs the message-pumping `delay()` override.
+
+Returns `false` if the upgrade fails.
+
+```javascript
+requirelib("websocket");
+if (!websocket.upgrade(120)) exit();
+```
+
+---
+
+### `websocket.send(text)` → `bool`
+
+Sends a UTF-8 text frame to the client. Returns `false` if the connection is closed.
+
+```javascript
+websocket.send("Hello from server");
+```
+
+---
+
+### `websocket.read(timeoutMs?)` → `string | null | false`
+
+Reads the next inbound message from the internal buffer.
+
+| Return value | Meaning |
+|---|---|
+| `string` | Message text |
+| `null` | `timeoutMs` elapsed with no message; connection still open |
+| `false` | Connection is closed |
+
+`timeoutMs = 0` or omitted blocks indefinitely until a message arrives or the
+connection closes.
+
+```javascript
+// Block until a message arrives or connection closes
+var msg = websocket.read();
+
+// Wait at most 5 s; returns null on timeout
+var msg = websocket.read(5000);
+
+if (msg === false) { /* connection closed */ }
+if (msg === null)  { /* timed out, still open */ }
+```
+
+---
+
+### `websocket.available()` → `number`
+
+Returns the number of messages currently queued in the inbound buffer.
+Non-blocking — safe to call on every iteration of a tight loop.
+
+```javascript
+if (websocket.available() > 0) {
+    var msg = websocket.read();
+}
+```
+
+---
+
+### `websocket.isClosed()` → `bool`
+
+Returns `true` when the WebSocket connection is no longer active.
+
+```javascript
+while (!websocket.isClosed()) {
+    websocket.send("tick");
+    delay(1000);
+}
+```
+
+---
+
+### `websocket.onMessage`
+
+Assign a `function(msg)` callback to receive messages asynchronously.
+The handler fires inside `delay()` on the script's own goroutine — Otto-safe, no
+concurrent JS execution.
+
+**Message object properties:**
+
+| Property | Type | Description |
+|---|---|---|
+| `msg.data` | `string` | Text payload |
+| `msg.timestamp` | `number` | Arrival time (Unix milliseconds) |
+| `msg.type` | `number` | Frame type: `1` = text, `2` = binary |
+
+```javascript
+websocket.onMessage = function(msg) {
+    console.log("Received at " + msg.timestamp + " ms: " + msg.data);
+};
+```
+
+Set back to `null` to stop receiving callbacks and leave messages in the buffer:
+
+```javascript
+websocket.onMessage = null;
+```
+
+---
+
+### `websocket.close()`
+
+Sends a normal-closure frame and closes the connection.
+
+```javascript
+websocket.close();
+```
+
+---
+
+### Pattern 1 — blocking read with optional timeout
+
+Simplest pattern. `read(timeoutMs)` returns `null` on timeout so the loop can
+send a keep-alive or do other work without blocking forever.
+
+```javascript
+requirelib("websocket");
+if (!websocket.upgrade(120)) exit();
+
+websocket.send("Connected. Commands: echo <text> | stop");
+
+while (true) {
+    var msg = websocket.read(30000); // wait up to 30 s
+
+    if (msg === false) break;        // remote side closed
+    if (msg === null)  {             // 30-second idle timeout
+        websocket.send("Still here.");
+        continue;
+    }
+
+    msg = msg.trim();
+    if (msg === "stop") {
+        websocket.send("Bye!");
+        break;
+    } else if (msg.indexOf("echo ") === 0) {
+        websocket.send(msg.slice(5));
+    } else if (msg !== "") {
+        websocket.send("Unknown command: '" + msg + "'");
+    }
+}
+
+websocket.close();
+```
+
+---
+
+### Pattern 2 — `available()` polling (Arduino-style)
+
+Use when you want to drain all queued frames in one shot each iteration, or when
+the main loop body does other work regardless of incoming messages.
+
+`onMessage` must be `null` (the default) so that `delay()` does **not** consume
+frames behind your back.
+
+```javascript
+requirelib("websocket");
+if (!websocket.upgrade(120)) exit();
+
+websocket.send("available() polling mode.");
+
+while (true) {
+    if (websocket.isClosed()) break;
+
+    var n = websocket.available();
+    if (n > 0) {
+        // Drain all waiting frames without blocking
+        for (var i = 0; i < n; i++) {
+            var msg = websocket.read(); // data already queued, returns immediately
+            if (msg === false) break;
+            msg = msg.trim();
+            if (msg === "stop") {
+                websocket.send("Bye!");
+                websocket.close();
+                break;
+            }
+            websocket.send("Echo: " + msg);
+        }
+    } else {
+        delay(500); // sleep; buffer is untouched because onMessage is null
+    }
+}
+```
+
+---
+
+### Pattern 3 — `onMessage` callback with `delay()` pump
+
+Event-driven style. The callback fires inside `delay()` on the script goroutine.
+Use a shared variable to hand data from the callback to the main loop.
+
+```javascript
+requirelib("websocket");
+if (!websocket.upgrade(120)) exit();
+
+websocket.send("onMessage mode. Commands: echo <text> | stop");
+
+var lastMessage = "";
+
+websocket.onMessage = function(msg) {
+    // Runs on the script goroutine during delay() — safe to update shared state
+    lastMessage = msg.data;
+};
+
+while (true) {
+    if (lastMessage !== "") {
+        var msg = lastMessage.trim();
+        lastMessage = "";
+
+        if (msg === "stop") {
+            websocket.send("Bye!");
+            break;
+        } else if (msg.indexOf("echo ") === 0) {
+            websocket.send(msg.slice(5));
+        } else if (msg !== "") {
+            websocket.send("Unknown command: '" + msg + "'");
+        }
+    }
+
+    if (websocket.isClosed()) break;
+
+    // delay() pumps the inbound channel and fires onMessage for each queued frame
+    delay(100);
+}
+
+websocket.onMessage = null;
+websocket.close();
+```
 
 ## Scheduler Library (`scheduler`)
 
@@ -846,31 +1090,6 @@ The scheduler calls `cron.agi` with the permissions of the user who approved it,
 
 ## Examples
 
-### Complete File Upload Handler
-### `websocket.upgrade(timeoutSec)`
-Upgrades current HTTP request to WebSocket. Default timeout is 300 seconds.
-
-```javascript
-if (!websocket.upgrade(300)) exit();
-```
-
-### `websocket.send(text)`
-```javascript
-websocket.send("hello client");
-```
-
-### `websocket.read()`
-Returns incoming message string, or `false` when closed.
-
-```javascript
-var msg = websocket.read();
-```
-
-### `websocket.close()`
-```javascript
-websocket.close();
-```
-
 ### Background Scheduler (webapp backend)
 
 A typical webapp has three files that work together to set up a background task.

+ 307 - 139
src/mod/agi/agi.websocket.go

@@ -4,6 +4,7 @@ import (
 	"log"
 	"net/http"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/gorilla/websocket"
@@ -15,12 +16,37 @@ import (
 /*
 AJGI WebSocket Request Library
 
-This is a library for allowing AGI based connection upgrade to WebSocket
-Different from other agi module, this do not use the register lib interface
-deal to it special nature.
+This is a library for allowing AGI based connection upgrade to WebSocket.
+Different from other agi modules, this does not use the register lib interface
+due to its special nature.
+
+New functions exposed to AGI scripts:
+
+  websocket.upgrade(timeoutSec)
+    Upgrades the connection and overrides delay() with a message-pumping version
+    so that websocket.onMessage fires naturally during pauses.
+
+  websocket.send(text)         → bool
+  websocket.read(timeoutMs?)   → string | null | false
+    timeoutMs = 0 / omitted → block until message arrives or connection closes
+    timeoutMs > 0           → return null on timeout (connection still open)
+    returns false           → connection is closed
+
+  websocket.available()        → int
+    Number of messages currently waiting in the inbound buffer. Non-blocking.
+
+  websocket.isClosed()         → bool
+    true when the connection is no longer active.
+
+  websocket.onMessage          → assign function(msg) to receive messages
+    msg = { data: string, timestamp: int64 ms, type: int }
+    Fired inside delay() on the script's own goroutine — Otto-safe.
+
+  websocket.close()
 
 Author: tobychui
 */
+
 var upgrader = websocket.Upgrader{
 	ReadBufferSize:  1024,
 	WriteBufferSize: 1024,
@@ -28,207 +54,349 @@ var upgrader = websocket.Upgrader{
 		return true
 	},
 }
-var connections = sync.Map{}
 
-// This is a very special function to check if the connection has been updated or not
-// Return upgrade status (true for already upgraded) and connection uuid
-func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *websocket.Conn) {
-	if value, err := vm.Get("_websocket_conn_id"); err == nil {
-		//Exists!
-		//Check if this is undefined
-		if value == otto.UndefinedValue() {
-			//WebSocket connection has closed
-			return false, "", nil
-		}
+// wsMsg is a single inbound WebSocket frame delivered to the AGI script.
+type wsMsg struct {
+	Data      string // text payload
+	Timestamp int64  // arrival time as unix milliseconds
+	Type      int    // gorilla message type (1 = text, 2 = binary)
+}
 
-		//Connection is still live. Try convert it to string
-		connId, err := value.ToString()
-		if err != nil {
-			return false, "", nil
-		}
+// wsConn wraps a gorilla websocket.Conn with a buffered inbound message channel.
+// All fields shared across goroutines are accessed via atomics or the channel itself.
+// The Otto VM is NEVER touched from any goroutine other than the main script goroutine.
+type wsConn struct {
+	conn        *websocket.Conn
+	msgChan     chan wsMsg // filled by the background reader goroutine
+	closed      int32      // 1 when closed; use atomic load/store
+	lastOprTime int64      // unix seconds of last activity; use atomic load/store
+}
 
-		//Load the conenction from SyncMap
-		if c, ok := connections.Load(connId); ok {
-			//Return the conncetion object
-			return true, connId, c.(*websocket.Conn)
-		}
+func newWsConn(c *websocket.Conn) *wsConn {
+	wsc := &wsConn{
+		conn:    c,
+		msgChan: make(chan wsMsg, 128),
+	}
+	atomic.StoreInt64(&wsc.lastOprTime, time.Now().Unix())
+	return wsc
+}
+
+func (w *wsConn) isClosed() bool    { return atomic.LoadInt32(&w.closed) == 1 }
+func (w *wsConn) markClosed()       { atomic.StoreInt32(&w.closed, 1) }
+func (w *wsConn) touchLastOpr()     { atomic.StoreInt64(&w.lastOprTime, time.Now().Unix()) }
+func (w *wsConn) getLastOpr() int64 { return atomic.LoadInt64(&w.lastOprTime) }
 
-		//Connection object not found (Maybe already closed?)
+var connections = sync.Map{}
+
+// checkWebSocketConnectionUpgradeStatus returns whether the current VM has an
+// active WebSocket connection.  Returns (active, connID, *wsConn).
+func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *wsConn) {
+	value, err := vm.Get("_websocket_conn_id")
+	if err != nil || value.IsUndefined() || value.IsNull() {
 		return false, "", nil
+	}
+	connId, err := value.ToString()
+	if err != nil || connId == "" {
+		return false, "", nil
+	}
+	raw, ok := connections.Load(connId)
+	if !ok {
+		return false, "", nil
+	}
+	wsc := raw.(*wsConn)
+	if wsc.isClosed() {
+		return false, connId, nil
+	}
+	return true, connId, wsc
+}
 
+// cleanupWsConn sends a close frame, closes the raw connection, and removes the
+// connection from both the sync.Map and the VM.
+// MUST be called from the main (script) goroutine so vm.Set is safe.
+func cleanupWsConn(vm *otto.Otto, connID string, wsc *wsConn) {
+	if !wsc.isClosed() {
+		wsc.markClosed()
+		wsc.conn.WriteMessage(
+			websocket.CloseMessage,
+			websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
+		)
+		time.Sleep(150 * time.Millisecond)
+		wsc.conn.Close()
 	}
-	return false, "", nil
+	vm.Set("_websocket_conn_id", otto.UndefinedValue())
+	connections.Delete(connID)
+}
+
+// dispatchOnMessage calls websocket.onMessage(msg) on the script goroutine.
+// Passing data through vm.Set avoids the complexities of otto.Value.Call.
+// MUST be called from the main (script) goroutine.
+func dispatchOnMessage(vm *otto.Otto, msg wsMsg) {
+	vm.Set("_ws_incoming", map[string]interface{}{
+		"data":      msg.Data,
+		"timestamp": msg.Timestamp,
+		"type":      msg.Type,
+	})
+	if _, err := vm.Run(`
+		if (typeof websocket !== 'undefined' && typeof websocket.onMessage === 'function') {
+			websocket.onMessage(_ws_incoming);
+		}
+	`); err != nil {
+		log.Println("*AGI WebSocket* onMessage handler error:", err)
+	}
+	vm.Set("_ws_incoming", otto.UndefinedValue())
 }
 
 func (g *Gateway) injectWebSocketFunctions(vm *otto.Otto, u *user.User, w http.ResponseWriter, r *http.Request) {
 
+	// ── websocket.upgrade(timeoutSeconds) ────────────────────────────────────
 	vm.Set("_websocket_upgrade", func(call otto.FunctionCall) otto.Value {
-		//Check if the user specified any timeout time in seconds
-		//Default to 5 minutes
 		timeout, err := call.Argument(0).ToInteger()
-		if err != nil {
+		if err != nil || timeout <= 0 {
 			timeout = 300
 		}
 
-		//Check if the connection has already been updated
-		connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
-		if connState {
-			//Already upgraded
-			return otto.TrueValue()
+		if connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm); connState {
+			return otto.TrueValue() // already upgraded
 		}
 
-		//Not upgraded. Upgrade it now
 		c, err := upgrader.Upgrade(w, r, nil)
 		if err != nil {
-			log.Print("*AGI WebSocket*  WebSocket upgrade failed:", err)
+			log.Println("*AGI WebSocket* upgrade failed:", err)
 			return otto.FalseValue()
 		}
 
-		//Generate a UUID for this connection
+		wsc := newWsConn(c)
 		connUUID := uuid.NewV4().String()
+		connections.Store(connUUID, wsc)
 		vm.Set("_websocket_conn_id", connUUID)
-		connections.Store(connUUID, c)
-
-		//Record its creation time as opr time
-		vm.Set("_websocket_conn_lastopr", time.Now().Unix())
-
-		//Create a go routine to monitor the connection status and disconnect it if timeup
-		if timeout > 0 {
-			go func() {
-				time.Sleep(1 * time.Second)
-				//Check if the last edit time > timeout time
-				connStatus, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
-				for connStatus {
-					//For this connection exists
-					if value, err := vm.Get("_websocket_conn_lastopr"); err == nil {
-						lastOprTime, err := value.ToInteger()
-						if err != nil {
-							continue
-						}
-						//log.Println(time.Now().Unix(), lastOprTime)
-						if time.Now().Unix()-lastOprTime > timeout {
-							//Timeout! Kill this socket
-							conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Timeout"))
-							time.Sleep(300)
-							conn.Close()
-
-							//Clean up the connection in sync map and vm
-							vm.Set("_websocket_conn_id", otto.UndefinedValue())
-							connections.Delete(connID)
-
-							log.Println("*AGI WebSocket* Closing connection due to timeout")
-							break
-						}
-					}
-					time.Sleep(1 * time.Second)
-					connStatus, _, _ = checkWebSocketConnectionUpgradeStatus(vm)
-				}
 
+		// Background reader — feeds all inbound frames into msgChan.
+		// Never touches the Otto VM; only updates wsc atomics and the channel.
+		go func() {
+			defer func() {
+				wsc.markClosed()
+				close(wsc.msgChan)
+				// Do NOT call vm.Set here — Otto is not goroutine-safe.
 			}()
-		}
+			for {
+				msgType, message, err := c.ReadMessage()
+				if err != nil {
+					return // connection closed or error
+				}
+				wsc.touchLastOpr()
+				select {
+				case wsc.msgChan <- wsMsg{
+					Data:      string(message),
+					Timestamp: time.Now().UnixMilli(),
+					Type:      msgType,
+				}:
+				default:
+					log.Println("*AGI WebSocket* inbound buffer full, dropping frame")
+				}
+			}
+		}()
+
+		// Idle-timeout watcher — closes the raw connection when no activity.
+		// Does NOT touch the VM; closing the connection causes the reader to exit.
+		go func() {
+			ticker := time.NewTicker(1 * time.Second)
+			defer ticker.Stop()
+			for range ticker.C {
+				if wsc.isClosed() {
+					return
+				}
+				if time.Now().Unix()-wsc.getLastOpr() > timeout {
+					log.Println("*AGI WebSocket* idle timeout — closing connection")
+					c.Close()
+					return
+				}
+			}
+		}()
+
+		// Override delay() so that websocket.onMessage callbacks fire naturally
+		// inside pauses without the user needing an explicit poll() call.
+		vm.Run(`
+			var _ws_orig_delay = (typeof delay === 'function') ? delay : function(ms){};
+			delay = function(ms){ _websocket_pump_messages(ms); };
+		`)
 
 		return otto.TrueValue()
 	})
 
+	// ── websocket.send(text) ─────────────────────────────────────────────────
 	vm.Set("_websocket_send", func(call otto.FunctionCall) otto.Value {
-		//Get the content to send
 		content, err := call.Argument(0).ToString()
 		if err != nil {
 			g.RaiseError(err)
 			return otto.FalseValue()
 		}
+		connState, connID, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		if !connState {
+			return otto.FalseValue()
+		}
+		if err := wsc.conn.WriteMessage(websocket.TextMessage, []byte(content)); err != nil {
+			cleanupWsConn(vm, connID, wsc)
+			return otto.FalseValue()
+		}
+		wsc.touchLastOpr()
+		return otto.TrueValue()
+	})
 
-		//Send it
-		connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
+	// ── websocket.read(timeout ms) ───────────────────────────────────────────
+	// timeoutMs = 0 or omitted → block until a message arrives or socket closes
+	// timeoutMs > 0            → return null if no message within that many ms
+	// Returns: string on message · null on timeout · false if connection closed
+	vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
+		timeoutMs, _ := call.Argument(0).ToInteger()
+
+		connState, connID, wsc := checkWebSocketConnectionUpgradeStatus(vm)
 		if !connState {
-			//Already upgraded
-			//log.Println("*AGI WebSocket* Connection id not found in VM")
+			if connID != "" {
+				// Stale entry — tidy up from the main goroutine
+				vm.Set("_websocket_conn_id", otto.UndefinedValue())
+				connections.Delete(connID)
+			}
 			return otto.FalseValue()
 		}
 
-		err = conn.WriteMessage(1, []byte(content))
-		if err != nil {
+		var msg wsMsg
+		var ok bool
 
-			//Client connection could have been closed. Close the connection
-			conn.Close()
+		if timeoutMs > 0 {
+			select {
+			case msg, ok = <-wsc.msgChan:
+			case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
+				return otto.NullValue() // timed out; connection still open
+			}
+		} else {
+			msg, ok = <-wsc.msgChan // block until message or channel close
+		}
 
-			//Clean up the connection in sync map and vm
-			vm.Set("_websocket_conn_id", otto.UndefinedValue())
-			connections.Delete(connID)
+		if !ok {
+			// Channel closed — background reader exited (connection gone)
+			cleanupWsConn(vm, connID, wsc)
 			return otto.FalseValue()
 		}
 
-		//Write succeed
+		wsc.touchLastOpr()
+		v, err := otto.ToValue(msg.Data)
+		if err != nil {
+			return otto.NullValue()
+		}
+		return v
+	})
 
-		//Update last opr time
-		vm.Set("_websocket_conn_lastopr", time.Now().Unix())
+	// ── websocket.available() ────────────────────────────────────────────────
+	// Returns the number of messages currently waiting in the inbound buffer.
+	// Non-blocking — safe to poll in a tight loop.
+	vm.Set("_websocket_available", func(call otto.FunctionCall) otto.Value {
+		_, _, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		count := 0
+		if wsc != nil {
+			count = len(wsc.msgChan)
+		}
+		v, _ := otto.ToValue(count)
+		return v
+	})
 
+	// ── websocket.isClosed() ─────────────────────────────────────────────────
+	// Returns true when the WebSocket connection is no longer active.
+	vm.Set("_websocket_is_closed", func(call otto.FunctionCall) otto.Value {
+		connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
+		if connState {
+			return otto.FalseValue()
+		}
 		return otto.TrueValue()
 	})
 
-	vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
-		connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
-		if connState == true {
-			_, message, err := conn.ReadMessage()
-			if err != nil {
-				//Client connection could have been closed. Close the connection
-				conn.Close()
-
-				//Clean up the connection in sync map and vm
-				vm.Set("_websocket_conn_id", otto.UndefinedValue())
-				connections.Delete(connID)
+	// ── _websocket_pump_messages(ms) ─────────────────────────────────────────
+	// Replaces delay() after upgrade.  Sleeps for ms milliseconds while
+	// dispatching any queued messages to websocket.onMessage.
+	// All JS execution happens on this (main script) goroutine — Otto-safe.
+	//
+	// Important: messages are only consumed from the buffer when onMessage is
+	// actually a function.  When it is null/undefined the function falls back to
+	// a plain sleep so that websocket.available() / websocket.read() can still
+	// see the queued frames afterwards (Mode 2 / manual-read patterns).
+	vm.Set("_websocket_pump_messages", func(call otto.FunctionCall) otto.Value {
+		ms, err := call.Argument(0).ToInteger()
+		if err != nil || ms < 0 {
+			ms = 0
+		}
 
-				log.Println("*AGI WebSocket* Trying to read from a closed socket")
-				return otto.FalseValue()
+		_, _, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		if wsc == nil {
+			// No active WebSocket — fall back to plain sleep
+			if ms > 0 {
+				time.Sleep(time.Duration(ms) * time.Millisecond)
 			}
-			//Update last opr time
-			vm.Set("_websocket_conn_lastopr", time.Now().Unix())
-
-			//Parse the incoming message
-			incomingString, err := otto.ToValue(string(message))
-			if err != nil {
-				log.Println(err)
-				//Unable to parse to JavaScript. Something out of the scope of otto?
-				return otto.NullValue()
+			return otto.UndefinedValue()
+		}
+
+		// Check once whether a handler is registered.  We evaluate in JS so
+		// that the typeof check is unambiguous regardless of Otto internals.
+		hasHandlerVal, _ := vm.Run(`typeof websocket !== 'undefined' && typeof websocket.onMessage === 'function'`)
+		hasHandler, _ := hasHandlerVal.ToBoolean()
+		if !hasHandler {
+			// No handler — plain sleep; leave messages in the buffer untouched.
+			if ms > 0 {
+				time.Sleep(time.Duration(ms) * time.Millisecond)
 			}
+			return otto.UndefinedValue()
+		}
 
-			//Return the incoming string to the AGI script
-			return incomingString
-		} else {
-			//WebSocket not exists
-			//log.Println("*AGI WebSocket* Trying to read from a closed socket")
-			return otto.FalseValue()
+		const tickSize = 20 * time.Millisecond
+		deadline := time.Now().Add(time.Duration(ms) * time.Millisecond)
+
+		for {
+			remaining := time.Until(deadline)
+			if remaining <= 0 {
+				break
+			}
+			tick := tickSize
+			if remaining < tick {
+				tick = remaining
+			}
+
+			select {
+			case msg, ok := <-wsc.msgChan:
+				if !ok {
+					// Connection closed while waiting — stop pumping
+					return otto.UndefinedValue()
+				}
+				wsc.touchLastOpr()
+				dispatchOnMessage(vm, msg)
+
+			case <-time.After(tick):
+				// No message in this 20 ms slice — keep waiting
+			}
 		}
+		return otto.UndefinedValue()
 	})
 
+	// ── websocket.close() ────────────────────────────────────────────────────
 	vm.Set("_websocket_close", func(call otto.FunctionCall) otto.Value {
-		connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
-		if connState == true {
-			//Close the Websocket gracefully
-			conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
-			time.Sleep(300)
-			conn.Close()
-
-			//Clean up the connection in sync map and vm
-			vm.Set("_websocket_conn_id", otto.UndefinedValue())
-			connections.Delete(connID)
-
-			//Return true value
-			return otto.TrueValue()
-		} else {
-			//Connection not opened or closed already
+		connState, connID, wsc := checkWebSocketConnectionUpgradeStatus(vm)
+		if !connState {
 			return otto.FalseValue()
 		}
-
+		cleanupWsConn(vm, connID, wsc)
+		return otto.TrueValue()
 	})
 
-	//Wrap all the native code function into an imagelib class
+	// ── JS wrapper ───────────────────────────────────────────────────────────
 	vm.Run(`
 		var websocket = {};
-		websocket.upgrade = _websocket_upgrade;
-		websocket.send = _websocket_send;
-		websocket.read = _websocket_read;
-		websocket.close = _websocket_close;
-		
+		websocket.upgrade   = _websocket_upgrade;
+		websocket.send      = _websocket_send;
+		websocket.read      = _websocket_read;
+		websocket.close     = _websocket_close;
+		websocket.available = _websocket_available;
+		websocket.isClosed  = _websocket_is_closed;
+
+		// Assign a function(msg) here to receive messages asynchronously.
+		// msg = { data: string, timestamp: number, type: number }
+		// The handler fires inside delay() after websocket.upgrade() is called.
+		websocket.onMessage = null;
 	`)
 }

+ 127 - 31
src/web/UnitTest/special/websocket.js

@@ -1,59 +1,155 @@
 /*
     WebSocket Test Script
 
-    Supports an interactive command loop:
-      echo <text>  — sends <text> back to the client
-      stop         — closes the connection gracefully
+    Demonstrates three read patterns:
 
-    Author: tobychui
-*/
+    MODE 1 — blocking read with timeout  (default)
+      Send "mode2" or "mode3" to switch to another demo.
+      echo <text>  → echoes text back
+      stop         → closes the connection
 
-function setup() {
-    if (!requirelib("websocket")) {
-        console.log("WebSocket library load failed");
-        return false;
-    }
+    MODE 2 — available() polling (Arduino-style)
+      Uses websocket.available() + websocket.read(0) in a tight loop.
 
-    // Upgrade to WebSocket; 120-second idle timeout
-    if (!websocket.upgrade(120)) {
-        console.log("WebSocket upgrade failed");
-        return false;
-    }
+    MODE 3 — onMessage callback
+      Assigns websocket.onMessage; the handler fires inside delay().
+      echo <text>  → echoes text back
+      stop         → closes the connection
 
+    Author: tobychui
+*/
+
+if (!requirelib("websocket")) {
+    console.log("WebSocket library load failed");
+} else if (!websocket.upgrade(120)) {
+    // upgrade() also overrides delay() with a message-pumping version
+    console.log("WebSocket upgrade failed");
+} else {
     console.log("WebSocket opened");
-    return true;
+    runMode1();
+    websocket.close();
+    console.log("WebSocket closed");
 }
 
-function commandLoop() {
-    websocket.send("Connected. Commands: echo <text> | stop");
+// ── MODE 1: blocking read with optional millisecond timeout ──────────────────
+function runMode1() {
+    websocket.send("[Mode 1] Blocking read with timeout. Commands: echo <text> | stop | mode2 | mode3");
 
     while (true) {
-        var msg = websocket.read();
+        // Block up to 30 s waiting for a message; returns null on timeout, false if closed
+        var msg = websocket.read(30000);
 
-        // null means the connection was closed or timed out
-        if (msg == null) {
-            console.log("WebSocket read returned null — closing");
+        if (msg === false) {
+            // Connection was closed remotely
             break;
         }
+        if (msg === null) {
+            // 30-second idle timeout — send a ping to keep things alive
+            websocket.send("[Mode 1] Still here. 30 s idle timeout reached.");
+            continue;
+        }
 
         msg = msg.trim();
 
         if (msg === "stop") {
             websocket.send("Bye!");
-            break;
+            return;
+        } else if (msg === "mode2") {
+            websocket.send("Switching to Mode 2 (available polling)...");
+            runMode2();
+            return;
+        } else if (msg === "mode3") {
+            websocket.send("Switching to Mode 3 (onMessage callback)...");
+            runMode3();
+            return;
         } else if (msg.indexOf("echo ") === 0) {
             websocket.send(msg.slice(5));
-        } else if (msg === "") {
-            // ignore empty messages
+        } else if (msg !== "") {
+            websocket.send("[Mode 1] Unknown command: '" + msg + "'");
+        }
+    }
+}
+
+// ── MODE 2: available() + non-blocking read ───────────────────────────────────
+function runMode2() {
+    websocket.send("[Mode 2] available() polling. Commands: echo <text> | stop | mode1");
+
+    while (true) {
+        if (websocket.isClosed()) {
+            break;
+        }
+
+        var pending = websocket.available();
+        if (pending > 0) {
+            // Read all queued messages without blocking
+            for (var i = 0; i < pending; i++) {
+                var msg = websocket.read(0); // 0 = block; channel already has data
+                if (msg === false) { return; }
+                if (msg === null)  { continue; }
+
+                msg = msg.trim();
+                if (msg === "stop") {
+                    websocket.send("Bye!");
+                    return;
+                } else if (msg === "mode1") {
+                    websocket.send("Switching to Mode 1...");
+                    runMode1();
+                    return;
+                } else if (msg.indexOf("echo ") === 0) {
+                    websocket.send(msg.slice(5));
+                } else if (msg !== "") {
+                    websocket.send("[Mode 2] Unknown command: '" + msg + "'");
+                }
+            }
         } else {
-            websocket.send("Unknown command: '" + msg + "'");
+            // Nothing waiting — report queue depth and sleep briefly
+            websocket.send("[Mode 2] Queue empty (available=" + pending + "). Sleeping 500 ms...");
+            delay(500); // delay() is now message-pumping, but onMessage is null here
         }
     }
 }
 
-if (setup()) {
-    commandLoop();
-    websocket.close();
-} else {
-    console.log("WebSocket setup failed");
+// ── MODE 3: onMessage callback ────────────────────────────────────────────────
+function runMode3() {
+    websocket.send("[Mode 3] onMessage callback. Commands: echo <text> | stop | mode1");
+
+    var lastMessage = "";
+
+    // Assign the async handler — fires inside delay() on the script's goroutine
+    websocket.onMessage = function(msg) {
+        // msg = { data: string, timestamp: number, type: number }
+        lastMessage = msg.data;
+        console.log("onMessage fired at " + msg.timestamp + " ms: " + msg.data);
+    };
+
+    while (true) {
+        if (lastMessage !== "") {
+            var msg = lastMessage.trim();
+            lastMessage = "";
+
+            if (msg === "stop") {
+                websocket.send("Bye!");
+                websocket.onMessage = null;
+                return;
+            } else if (msg === "mode1") {
+                websocket.send("Switching to Mode 1...");
+                websocket.onMessage = null;
+                runMode1();
+                return;
+            } else if (msg.indexOf("echo ") === 0) {
+                websocket.send(msg.slice(5));
+            } else if (msg !== "") {
+                websocket.send("[Mode 3] Unknown command: '" + msg + "'");
+            }
+        }
+
+        if (websocket.isClosed()) {
+            break;
+        }
+
+        // delay() pumps the inbound channel and fires onMessage for each frame
+        delay(100);
+    }
+
+    websocket.onMessage = null;
 }