rtm.ts 9.0 KB


  1. import AgoraRTM, { ChannelType, RTMClient, RTMConfig, MetadataItem } from "agora-rtm-sdk"
  2. import { mapToArray, isString, getDefaultLanguageSelect } from "../../common"
  3. import { AGEventEmitter } from "../events"
  4. import {
  5. RtmEvents,
  6. ISimpleUserInfo,
  7. RtmMessageType,
  8. RtmPresenceMessageData,
  9. ValueOf,
  10. ILanguageItem,
  11. } from "./types"
  12. import { ISttData } from "../../types"
  13. import { DEFAULT_RTM_CONFIG } from "./constant"
  14. const { RTM } = AgoraRTM
  15. const CHANNEL_TYPE: ChannelType = "MESSAGE"
  16. const LOCK_STT = "lock_stt"
  17. export class RtmManager extends AGEventEmitter<RtmEvents> {
  18. client?: RTMClient
  19. private rtmConfig: RTMConfig = DEFAULT_RTM_CONFIG
  20. channel: string = ""
  21. userId: string = ""
  22. userName: string = ""
  23. private userMap: Map<string, ISimpleUserInfo> = new Map()
  24. private joined: boolean = false
  25. constructor() {
  26. super()
  27. }
  28. async join({ channel, userId, userName }: { channel: string; userId: string; userName: string }) {
  29. if (this.joined) {
  30. return
  31. }
  32. this.userId = userId
  33. this.userName = userName
  34. this.channel = channel
  35. // 获取RTM配置和Token
  36. const { agoraClient } = await import("@/client/api")
  37. const response = await agoraClient.token.$get({
  38. query: { type: 'rtm', channel, userId }
  39. })
  40. if (!response.ok) {
  41. throw new Error('RTM配置获取失败')
  42. }
  43. const data = await response.json()
  44. if (!this.client) {
  45. this.client = new RTM(data.appId, userId, this.rtmConfig)
  46. }
  47. this._listenRtmEvents()
  48. // 直接使用API返回的token,避免重复调用
  49. await this.client.login({ token: data.token })
  50. this.joined = true
  51. // subscribe message channel
  52. await this.client.subscribe(channel, {
  53. withPresence: true,
  54. withMetadata: true,
  55. })
  56. // check host
  57. await this._checkHost()
  58. // update user info
  59. await this._updateUserInfo()
  60. // set lock
  61. this._setLock()
  62. }
  63. async updateSttData(data: ISttData) {
  64. return await this._setChannelMetadata(data)
  65. }
  66. async updateLanguages(languages: ILanguageItem[]) {
  67. const message: {
  68. transcribe1: string
  69. translate1List: string[]
  70. transcribe2: string
  71. translate2List: string[]
  72. } = {
  73. transcribe1: "",
  74. translate1List: [],
  75. transcribe2: "",
  76. translate2List: [],
  77. }
  78. const language1 = languages[0]
  79. if (language1.source) {
  80. message.transcribe1 = language1.source
  81. }
  82. if (language1.target) {
  83. message.translate1List.push(...language1.target)
  84. }
  85. const language2 = languages[1]
  86. if (language2) {
  87. if (language2.source) {
  88. message.transcribe2 = language2.source
  89. }
  90. if (language2.target) {
  91. message.translate2List.push(...language2.target)
  92. }
  93. }
  94. return await this._setChannelMetadata(message)
  95. }
  96. async destroy() {
  97. await this.client?.logout()
  98. this._resetData()
  99. }
  100. async acquireLock() {
  101. // if not accquire lock, will throw error
  102. return await this.client?.lock.acquireLock(this.channel, CHANNEL_TYPE, LOCK_STT)
  103. }
  104. async releaseLock() {
  105. return await this.client?.lock.releaseLock(this.channel, CHANNEL_TYPE, LOCK_STT)
  106. }
  107. // --------------------- private methods ---------------------
  108. private async _updateUserInfo() {
  109. await this._setPresenceState({
  110. type: RtmMessageType.UserInfo,
  111. userId: this.userId,
  112. userName: this.userName,
  113. })
  114. }
  115. private async _removeChannelMetadata(metadata?: Record<string, any>) {
  116. const data: MetadataItem[] = []
  117. const options: any = {}
  118. for (const key in metadata) {
  119. data.push({
  120. key,
  121. value: JSON.stringify(metadata[key]),
  122. })
  123. }
  124. if (data.length) {
  125. options.data = data
  126. }
  127. await this?.client?.storage.removeChannelMetadata(this.channel, CHANNEL_TYPE, options)
  128. }
  129. private async _setChannelMetadata(metadata?: Record<string, any>) {
  130. const data: MetadataItem[] = []
  131. for (const key in metadata) {
  132. data.push({
  133. key,
  134. value: JSON.stringify(metadata[key]),
  135. })
  136. }
  137. await this?.client?.storage.setChannelMetadata(this.channel, CHANNEL_TYPE, data)
  138. }
  139. private async _setPresenceState(attr: ValueOf<RtmPresenceMessageData>) {
  140. if (!this.joined) {
  141. throw new Error("You must join the channel first")
  142. }
  143. const state: Record<string, string> = {}
  144. for (const key in attr) {
  145. const value = attr[key as keyof typeof attr]
  146. state[key] = isString(value) ? value : JSON.stringify(value)
  147. }
  148. return await this?.client?.presence.setState(this.channel, CHANNEL_TYPE, state)
  149. }
  150. private _listenRtmEvents() {
  151. this.client?.addEventListener("status", (res) => {
  152. this.emit("status", res)
  153. })
  154. this.client?.addEventListener("presence", (res) => {
  155. console.log("[test] presence", res)
  156. const { channelName, eventType, snapshot = [], stateChanged, publisher } = res
  157. if (channelName == this.channel) {
  158. switch (eventType) {
  159. case "SNAPSHOT":
  160. this._dealPresenceSnapshot(snapshot as any[])
  161. break
  162. case "REMOTE_STATE_CHANGED":
  163. this._dealPresenceRemoteState(stateChanged)
  164. break
  165. case "REMOTE_JOIN":
  166. break
  167. case "REMOTE_LEAVE":
  168. if (this.userMap.has(publisher)) {
  169. this.userMap.delete(publisher)
  170. this._emitUserListChanged()
  171. }
  172. break
  173. case "REMOTE_TIMEOUT":
  174. if (this.userMap.has(publisher)) {
  175. this.userMap.delete(publisher)
  176. this._emitUserListChanged()
  177. }
  178. break
  179. }
  180. }
  181. })
  182. this.client?.addEventListener("storage", (res) => {
  183. console.log("[test] storage", res)
  184. const { eventType, data, channelName } = res
  185. const { metadata } = data
  186. if (channelName == this.channel) {
  187. switch (eventType) {
  188. case "SNAPSHOT":
  189. this._dealStorageDataChanged(metadata)
  190. break
  191. case "UPDATE":
  192. this._dealStorageDataChanged(metadata)
  193. break
  194. case "REMOVE":
  195. break
  196. }
  197. }
  198. })
  199. }
  200. private _dealPresenceRemoteState(stateChanged: any) {
  201. switch (stateChanged.type) {
  202. case RtmMessageType.UserInfo:
  203. const userInfo = {
  204. userName: stateChanged.userName,
  205. userId: stateChanged.userId,
  206. }
  207. if (userInfo.userId) {
  208. this.userMap.set(userInfo.userId, userInfo)
  209. this._emitUserListChanged()
  210. }
  211. break
  212. }
  213. }
  214. private _dealPresenceSnapshot(snapshot?: any[]) {
  215. if (!snapshot?.length) {
  216. return
  217. }
  218. let changed = false
  219. for (const v of snapshot) {
  220. const { states } = v
  221. switch (states.type) {
  222. case RtmMessageType.UserInfo:
  223. const userInfo = {
  224. userName: states.userName,
  225. userId: states.userId,
  226. }
  227. if (userInfo.userId && userInfo.userId != this.userId) {
  228. this.userMap.set(userInfo.userId, userInfo)
  229. changed = true
  230. }
  231. break
  232. }
  233. }
  234. if (changed) {
  235. this._emitUserListChanged()
  236. }
  237. }
  238. private _dealStorageDataChanged(metadata: any) {
  239. const {
  240. transcribe1,
  241. translate1List,
  242. transcribe2,
  243. translate2List,
  244. status,
  245. taskId,
  246. token,
  247. startTime,
  248. duration,
  249. } = metadata
  250. if (transcribe1?.value) {
  251. const parseTranscribe1 = JSON.parse(transcribe1.value)
  252. const parseTranslate1 = JSON.parse(translate1List.value)
  253. const parseTranscribe2 = JSON.parse(transcribe2.value)
  254. const parseTranslate2 = JSON.parse(translate2List.value)
  255. this.emit("languagesChanged", {
  256. transcribe1: parseTranscribe1,
  257. translate1List: parseTranslate1,
  258. transcribe2: parseTranscribe2,
  259. translate2List: parseTranslate2,
  260. })
  261. } else {
  262. this.emit("languagesChanged", getDefaultLanguageSelect())
  263. }
  264. if (status?.value) {
  265. this.emit("sttDataChanged", {
  266. status: JSON.parse(status?.value),
  267. taskId: JSON.parse(taskId?.value),
  268. token: JSON.parse(token?.value),
  269. startTime: JSON.parse(startTime?.value),
  270. duration: JSON.parse(duration?.value),
  271. })
  272. } else {
  273. this.emit("sttDataChanged", {
  274. status: "end",
  275. })
  276. }
  277. }
  278. private _emitUserListChanged() {
  279. this.emit("userListChanged", mapToArray(this.userMap))
  280. }
  281. private _resetData() {
  282. this.client = undefined
  283. this.channel = ""
  284. this.rtmConfig = {}
  285. this.userId = ""
  286. this.userName = ""
  287. this.userMap.clear()
  288. this.joined = false
  289. }
  290. private async _checkHost() {
  291. const result = await this.client?.presence.whoNow(this.channel, CHANNEL_TYPE)
  292. console.log("[test] whoNow", result)
  293. if (result?.totalOccupancy == 1) {
  294. this._removeChannelMetadata()
  295. }
  296. }
  297. private async _setLock() {
  298. const { lockDetails = [] } = (await this.client?.lock.getLock(this.channel, CHANNEL_TYPE)) || {}
  299. if (!lockDetails.find((v) => v.lockName === LOCK_STT)) {
  300. await this.client?.lock.setLock(this.channel, CHANNEL_TYPE, LOCK_STT)
  301. }
  302. }
  303. }