rtm.ts 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. import AgoraRTM, { ChannelType, RTMClient, RTMConfig, MetadataItem } from "agora-rtm-sdk"
  2. import { mapToArray, isString, apiGetAgoraToken, getDefaultLanguageSelect, fetchAgoraConfig } from "../../common"
  3. import { AGEventEmitter } from "../events"
  4. import {
  5. RtmEvents,
  6. ISimpleUserInfo,
  7. RtmMessageType,
  8. RtmPresenceMessageData,
  9. ValueOf,
  10. ILanguageItem,
  11. } from "./types"
  12. import { ISttData } from "../../types"
  13. import { DEFAULT_RTM_CONFIG } from "./constant"
  14. const { RTM } = AgoraRTM
  15. const CHANNEL_TYPE: ChannelType = "MESSAGE"
  16. const LOCK_STT = "lock_stt"
  17. export class RtmManager extends AGEventEmitter<RtmEvents> {
  18. client?: RTMClient
  19. private rtmConfig: RTMConfig = DEFAULT_RTM_CONFIG
  20. channel: string = ""
  21. userId: string = ""
  22. userName: string = ""
  23. private userMap: Map<string, ISimpleUserInfo> = new Map()
  24. private joined: boolean = false
  25. constructor() {
  26. super()
  27. }
  28. async join({ channel, userId, userName }: { channel: string; userId: string; userName: string }) {
  29. if (this.joined) {
  30. return
  31. }
  32. this.userId = userId
  33. this.userName = userName
  34. this.channel = channel
  35. // 获取配置
  36. await fetchAgoraConfig()
  37. // 从配置中获取appId
  38. const { agoraClient } = await import("@/client/api")
  39. const response = await agoraClient.token.$get({
  40. query: { type: 'rtm', channel, userId }
  41. })
  42. if (!response.ok) {
  43. throw new Error('配置获取失败')
  44. }
  45. const data = await response.json()
  46. if (!this.client) {
  47. this.client = new RTM(data.appId, userId, this.rtmConfig)
  48. }
  49. this._listenRtmEvents()
  50. const token = await apiGetAgoraToken({ channel: this.channel, uid: this.userId })
  51. await this.client.login({ token })
  52. this.joined = true
  53. // subscribe message channel
  54. await this.client.subscribe(channel, {
  55. withPresence: true,
  56. withMetadata: true,
  57. })
  58. // check host
  59. await this._checkHost()
  60. // update user info
  61. await this._updateUserInfo()
  62. // set lock
  63. this._setLock()
  64. }
  65. async updateSttData(data: ISttData) {
  66. return await this._setChannelMetadata(data)
  67. }
  68. async updateLanguages(languages: ILanguageItem[]) {
  69. const message: {
  70. transcribe1: string
  71. translate1List: string[]
  72. transcribe2: string
  73. translate2List: string[]
  74. } = {
  75. transcribe1: "",
  76. translate1List: [],
  77. transcribe2: "",
  78. translate2List: [],
  79. }
  80. const language1 = languages[0]
  81. if (language1.source) {
  82. message.transcribe1 = language1.source
  83. }
  84. if (language1.target) {
  85. message.translate1List.push(...language1.target)
  86. }
  87. const language2 = languages[1]
  88. if (language2) {
  89. if (language2.source) {
  90. message.transcribe2 = language2.source
  91. }
  92. if (language2.target) {
  93. message.translate2List.push(...language2.target)
  94. }
  95. }
  96. return await this._setChannelMetadata(message)
  97. }
  98. async destroy() {
  99. await this.client?.logout()
  100. this._resetData()
  101. }
  102. async acquireLock() {
  103. // if not accquire lock, will throw error
  104. return await this.client?.lock.acquireLock(this.channel, CHANNEL_TYPE, LOCK_STT)
  105. }
  106. async releaseLock() {
  107. return await this.client?.lock.releaseLock(this.channel, CHANNEL_TYPE, LOCK_STT)
  108. }
  109. // --------------------- private methods ---------------------
  110. private async _updateUserInfo() {
  111. await this._setPresenceState({
  112. type: RtmMessageType.UserInfo,
  113. userId: this.userId,
  114. userName: this.userName,
  115. })
  116. }
  117. private async _removeChannelMetadata(metadata?: Record<string, any>) {
  118. const data: MetadataItem[] = []
  119. const options: any = {}
  120. for (const key in metadata) {
  121. data.push({
  122. key,
  123. value: JSON.stringify(metadata[key]),
  124. })
  125. }
  126. if (data.length) {
  127. options.data = data
  128. }
  129. await this?.client?.storage.removeChannelMetadata(this.channel, CHANNEL_TYPE, options)
  130. }
  131. private async _setChannelMetadata(metadata?: Record<string, any>) {
  132. const data: MetadataItem[] = []
  133. for (const key in metadata) {
  134. data.push({
  135. key,
  136. value: JSON.stringify(metadata[key]),
  137. })
  138. }
  139. await this?.client?.storage.setChannelMetadata(this.channel, CHANNEL_TYPE, data)
  140. }
  141. private async _setPresenceState(attr: ValueOf<RtmPresenceMessageData>) {
  142. if (!this.joined) {
  143. throw new Error("You must join the channel first")
  144. }
  145. const state: Record<string, string> = {}
  146. for (const key in attr) {
  147. const value = attr[key as keyof typeof attr]
  148. state[key] = isString(value) ? value : JSON.stringify(value)
  149. }
  150. return await this?.client?.presence.setState(this.channel, CHANNEL_TYPE, state)
  151. }
  152. private _listenRtmEvents() {
  153. this.client?.addEventListener("status", (res) => {
  154. this.emit("status", res)
  155. })
  156. this.client?.addEventListener("presence", (res) => {
  157. console.log("[test] presence", res)
  158. const { channelName, eventType, snapshot = [], stateChanged, publisher } = res
  159. if (channelName == this.channel) {
  160. switch (eventType) {
  161. case "SNAPSHOT":
  162. this._dealPresenceSnapshot(snapshot as any[])
  163. break
  164. case "REMOTE_STATE_CHANGED":
  165. this._dealPresenceRemoteState(stateChanged)
  166. break
  167. case "REMOTE_JOIN":
  168. break
  169. case "REMOTE_LEAVE":
  170. if (this.userMap.has(publisher)) {
  171. this.userMap.delete(publisher)
  172. this._emitUserListChanged()
  173. }
  174. break
  175. case "REMOTE_TIMEOUT":
  176. if (this.userMap.has(publisher)) {
  177. this.userMap.delete(publisher)
  178. this._emitUserListChanged()
  179. }
  180. break
  181. }
  182. }
  183. })
  184. this.client?.addEventListener("storage", (res) => {
  185. console.log("[test] storage", res)
  186. const { eventType, data, channelName } = res
  187. const { metadata } = data
  188. if (channelName == this.channel) {
  189. switch (eventType) {
  190. case "SNAPSHOT":
  191. this._dealStorageDataChanged(metadata)
  192. break
  193. case "UPDATE":
  194. this._dealStorageDataChanged(metadata)
  195. break
  196. case "REMOVE":
  197. break
  198. }
  199. }
  200. })
  201. }
  202. private _dealPresenceRemoteState(stateChanged: any) {
  203. switch (stateChanged.type) {
  204. case RtmMessageType.UserInfo:
  205. const userInfo = {
  206. userName: stateChanged.userName,
  207. userId: stateChanged.userId,
  208. }
  209. if (userInfo.userId) {
  210. this.userMap.set(userInfo.userId, userInfo)
  211. this._emitUserListChanged()
  212. }
  213. break
  214. }
  215. }
  216. private _dealPresenceSnapshot(snapshot?: any[]) {
  217. if (!snapshot?.length) {
  218. return
  219. }
  220. let changed = false
  221. for (const v of snapshot) {
  222. const { states } = v
  223. switch (states.type) {
  224. case RtmMessageType.UserInfo:
  225. const userInfo = {
  226. userName: states.userName,
  227. userId: states.userId,
  228. }
  229. if (userInfo.userId && userInfo.userId != this.userId) {
  230. this.userMap.set(userInfo.userId, userInfo)
  231. changed = true
  232. }
  233. break
  234. }
  235. }
  236. if (changed) {
  237. this._emitUserListChanged()
  238. }
  239. }
  240. private _dealStorageDataChanged(metadata: any) {
  241. const {
  242. transcribe1,
  243. translate1List,
  244. transcribe2,
  245. translate2List,
  246. status,
  247. taskId,
  248. token,
  249. startTime,
  250. duration,
  251. } = metadata
  252. if (transcribe1?.value) {
  253. const parseTranscribe1 = JSON.parse(transcribe1.value)
  254. const parseTranslate1 = JSON.parse(translate1List.value)
  255. const parseTranscribe2 = JSON.parse(transcribe2.value)
  256. const parseTranslate2 = JSON.parse(translate2List.value)
  257. this.emit("languagesChanged", {
  258. transcribe1: parseTranscribe1,
  259. translate1List: parseTranslate1,
  260. transcribe2: parseTranscribe2,
  261. translate2List: parseTranslate2,
  262. })
  263. } else {
  264. this.emit("languagesChanged", getDefaultLanguageSelect())
  265. }
  266. if (status?.value) {
  267. this.emit("sttDataChanged", {
  268. status: JSON.parse(status?.value),
  269. taskId: JSON.parse(taskId?.value),
  270. token: JSON.parse(token?.value),
  271. startTime: JSON.parse(startTime?.value),
  272. duration: JSON.parse(duration?.value),
  273. })
  274. } else {
  275. this.emit("sttDataChanged", {
  276. status: "end",
  277. })
  278. }
  279. }
  280. private _emitUserListChanged() {
  281. this.emit("userListChanged", mapToArray(this.userMap))
  282. }
  283. private _resetData() {
  284. this.client = undefined
  285. this.channel = ""
  286. this.rtmConfig = {}
  287. this.userId = ""
  288. this.userName = ""
  289. this.userMap.clear()
  290. this.joined = false
  291. }
  292. private async _checkHost() {
  293. const result = await this.client?.presence.whoNow(this.channel, CHANNEL_TYPE)
  294. console.log("[test] whoNow", result)
  295. if (result?.totalOccupancy == 1) {
  296. this._removeChannelMetadata()
  297. }
  298. }
  299. private async _setLock() {
  300. const { lockDetails = [] } = (await this.client?.lock.getLock(this.channel, CHANNEL_TYPE)) || {}
  301. if (!lockDetails.find((v) => v.lockName === LOCK_STT)) {
  302. await this.client?.lock.setLock(this.channel, CHANNEL_TYPE, LOCK_STT)
  303. }
  304. }
  305. }