// rtm-manager-adapter.ts
  1. import AgoraRTM, { RTMClient, RTMConfig, ChannelType, MetadataItem } from 'agora-rtm'
  2. import { AGEventEmitter } from '../core/event-emitter'
  3. import { SttError } from '../core/stt-error'
  4. import type {
  5. IRtmManagerAdapter,
  6. RtmManagerConfig,
  7. RtmEventMap,
  8. RtmUserInfo,
  9. RtmChannelMetadata,
  10. } from '../types'
  11. import { generateAgoraToken } from '../utils/token-utils'
  // RTM client constructor, taken from the SDK's default export.
  const RTM = AgoraRTM.RTM

  // Channel type passed to every storage, presence and lock call in this file.
  const CHANNEL_TYPE: ChannelType = 'MESSAGE'

  // Name of the channel lock that _setLock creates and acquireLock/releaseLock operate on.
  const LOCK_STT = 'lock_stt'
  /**
   * Adapter that wraps an Agora RTM client for a single channel and re-exposes
   * its activity as typed events (see `RtmEventMap`).
   *
   * Responsibilities visible in this class:
   * - logging in / subscribing to one channel (`join`) and logging out (`destroy`);
   * - tracking channel members in an internal user map fed by presence events;
   * - writing STT and language settings to channel metadata and turning
   *   incoming storage events into `languagesChanged` / `sttDataChanged`;
   * - creating, acquiring and releasing the `LOCK_STT` channel lock.
   *
   * Every public async method reports failures by emitting `error` exactly once
   * and then rethrowing the same error to the caller.
   */
  export class RtmManagerAdapter extends AGEventEmitter<RtmEventMap> implements IRtmManagerAdapter {
    private _joined = false
    private _config?: RtmManagerConfig
    private _userId: string = ''
    private _channel: string = ''
    private _userMap: Map<string, RtmUserInfo> = new Map()
    private _client?: RTMClient
    private _appId: string = ''
    private _certificate: string = ''
    private _rtmConfig: RTMConfig = {}

    /**
     * @param appId - Agora app ID; required by the time `join` is called.
     * @param certificate - App certificate used to generate the RTM token;
     *   required by the time `join` is called.
     */
    constructor(appId?: string, certificate?: string) {
      super()
      if (appId) {
        this._appId = appId
      }
      if (certificate) {
        this._certificate = certificate
      }
    }

    /**
     * Logs in to RTM, subscribes to the channel and publishes the local user.
     *
     * Emits `connecting` before any network call, then `connected` and
     * `userListChanged` on success.
     *
     * If any step fails after this call has started mutating adapter state, the
     * partially established session is rolled back (best-effort logout plus a
     * state reset) so that `join` can be retried. Failures of the up-front
     * validation never touch an existing session.
     *
     * @param config - Must contain non-empty `channel`, `userId` and `userName`.
     * @throws SttError `INVALID_CONFIG`, `ALREADY_JOINED`, `APP_ID_REQUIRED` or
     *   `CERTIFICATE_REQUIRED` for validation failures; any error raised by token
     *   generation or the RTM client is rethrown unchanged.
     */
    async join(config: RtmManagerConfig): Promise<void> {
      // Becomes true once this call begins changing adapter state; the catch
      // block uses it to decide whether there is anything of ours to roll back.
      let sessionStarted = false
      try {
        const { channel, userId, userName } = config
        if (!channel || !userId || !userName) {
          throw new SttError('INVALID_CONFIG', 'Missing required configuration parameters')
        }
        if (this._joined) {
          throw new SttError('ALREADY_JOINED', 'RTM manager is already joined to a channel')
        }
        if (!this._appId) {
          throw new SttError('APP_ID_REQUIRED', 'App ID is required for RTM connection')
        }
        if (!this._certificate) {
          throw new SttError(
            'CERTIFICATE_REQUIRED',
            'Certificate is required for RTM token generation'
          )
        }

        sessionStarted = true
        this._userId = userId
        this._channel = channel
        this._config = config
        this.emit('connecting', { channel, userId })

        // Fetch the RTM token.
        const token = await generateAgoraToken({
          appId: this._appId,
          appCertificate: this._certificate,
          uid: userId,
          channel,
        })
        if (token) {
          this._rtmConfig.token = token
        }

        // Create the RTM client and attach listeners before logging in.
        this._client = new RTM(this._appId, userId, this._rtmConfig)
        this._listenRtmEvents()

        // Log in. `_joined` is set here (not at the end) because the presence
        // helpers called below are no-ops while it is false.
        await this._client.login()
        this._joined = true

        // Subscribe to the channel with presence and metadata notifications.
        await this._client.subscribe(channel, {
          withPresence: true,
          withMetadata: true,
        })

        await this._checkHost()
        await this._updateUserInfo(userName)
        await this._setLock()

        // Add the local user to the user list.
        this._userMap.set(userId, { userId, userName })
        this.emit('connected', { channel, userId })
        this.emit('userListChanged', Array.from(this._userMap.values()))
      } catch (error) {
        if (sessionStarted) {
          await this._rollbackJoin()
        }
        this.emit('error', error as Error)
        throw error
      }
    }

    /**
     * Writes the defined fields of `data` to the channel metadata.
     *
     * Emits `metadataUpdating` before and `metadataUpdated` after the write.
     * Fields whose value is `undefined` are skipped; if no field is defined,
     * nothing is sent but both events are still emitted.
     *
     * @throws SttError `NOT_JOINED` when called before `join` succeeded; errors
     *   from the RTM client are emitted as `error` and rethrown.
     */
    async updateSttData(data: RtmChannelMetadata): Promise<void> {
      if (!this._joined) {
        throw new SttError(
          'NOT_JOINED',
          'RTM manager must be joined to a channel before updating STT data'
        )
      }
      try {
        await this._publishMetadata(data)
      } catch (error) {
        this.emit('error', error as Error)
        throw error
      }
    }

    /**
     * Publishes up to two language configurations to the channel metadata as
     * `transcribe1` / `translate1List` / `transcribe2` / `translate2List`.
     *
     * Emits `languagesUpdating`, the metadata events of the underlying write,
     * then `languagesUpdated`.
     *
     * @param languages - Entries are read as `{ source, target }`; only the first
     *   two are used. `target` is spread into the list, so it is expected to be
     *   an array (assumption — the parameter is untyped; confirm against callers).
     * @throws SttError `NOT_JOINED` when called before `join` succeeded; other
     *   errors are emitted as `error` once and rethrown.
     */
    async updateLanguages(languages: any[]): Promise<void> {
      if (!this._joined) {
        throw new SttError(
          'NOT_JOINED',
          'RTM manager must be joined to a channel before updating languages'
        )
      }
      try {
        this.emit('languagesUpdating', { languages })

        // Build the metadata payload; absent values stay as empty defaults.
        const message: Record<string, any> = {
          transcribe1: '',
          translate1List: [],
          transcribe2: '',
          translate2List: [],
        }
        const language1 = languages[0]
        if (language1?.source) {
          message.transcribe1 = language1.source
        }
        if (language1?.target) {
          message.translate1List.push(...language1.target)
        }
        const language2 = languages[1]
        if (language2?.source) {
          message.transcribe2 = language2.source
        }
        if (language2?.target) {
          message.translate2List.push(...language2.target)
        }

        // Call the non-emitting helper rather than updateSttData so that a
        // failure results in a single `error` event, not two.
        await this._publishMetadata(message)
        this.emit('languagesUpdated', { languages })
      } catch (error) {
        this.emit('error', error as Error)
        throw error
      }
    }

    /**
     * Acquires the `LOCK_STT` channel lock.
     * Emits `lockAcquiring` before and `lockAcquired` after the call.
     *
     * @throws SttError `NOT_JOINED` when called before `join` succeeded; errors
     *   from the RTM client are emitted as `error` and rethrown.
     */
    async acquireLock(): Promise<void> {
      if (!this._joined) {
        throw new SttError(
          'NOT_JOINED',
          'RTM manager must be joined to a channel before acquiring lock'
        )
      }
      try {
        this.emit('lockAcquiring')
        if (this._client) {
          await this._client.lock.acquireLock(this._channel, CHANNEL_TYPE, LOCK_STT)
        }
        this.emit('lockAcquired')
      } catch (error) {
        this.emit('error', error as Error)
        throw error
      }
    }

    /**
     * Releases the `LOCK_STT` channel lock.
     * Emits `lockReleasing` before and `lockReleased` after the call.
     *
     * @throws SttError `NOT_JOINED` when called before `join` succeeded; errors
     *   from the RTM client are emitted as `error` and rethrown.
     */
    async releaseLock(): Promise<void> {
      if (!this._joined) {
        throw new SttError(
          'NOT_JOINED',
          'RTM manager must be joined to a channel before releasing lock'
        )
      }
      try {
        this.emit('lockReleasing')
        if (this._client) {
          await this._client.lock.releaseLock(this._channel, CHANNEL_TYPE, LOCK_STT)
        }
        this.emit('lockReleased')
      } catch (error) {
        this.emit('error', error as Error)
        throw error
      }
    }

    /**
     * Logs out and resets the adapter.
     *
     * Emits `destroying` first. Local session state is cleared whether or not
     * the logout succeeds, so the adapter never stays in a joined-looking state
     * with a dead client. On success `destroyed` is emitted and all event
     * listeners are removed; on failure `error` is emitted, the error is
     * rethrown, and listeners are kept so the failure can be observed.
     */
    async destroy(): Promise<void> {
      try {
        this.emit('destroying')
        try {
          if (this._client) {
            await this._client.logout()
          }
        } finally {
          this._clearSessionState()
        }
        this.emit('destroyed')
        this.removeAllEventListeners()
      } catch (error) {
        this.emit('error', error as Error)
        throw error
      }
    }

    /** Whether `join` has logged in and `destroy` has not been called since. */
    get isJoined(): boolean {
      return this._joined
    }

    /** Configuration passed to the current `join`, or `undefined` when not joined. */
    get config(): RtmManagerConfig | undefined {
      return this._config
    }

    /** Local user ID of the current session, or `''` when not joined. */
    get userId(): string {
      return this._userId
    }

    /** Channel name of the current session, or `''` when not joined. */
    get channel(): string {
      return this._channel
    }

    /** Snapshot array of the users currently tracked in the user map. */
    get userList(): RtmUserInfo[] {
      return Array.from(this._userMap.values())
    }

    // ---- private helpers ----

    /** Resets every per-session field to its initial value. */
    private _clearSessionState(): void {
      this._joined = false
      this._config = undefined
      this._userId = ''
      this._channel = ''
      this._userMap.clear()
      this._client = undefined
    }

    /**
     * Undoes a partially completed `join`: clears session state, then tries to
     * log the client out. Logout errors are deliberately ignored because the
     * error that aborted `join` is the one the caller needs to see.
     */
    private async _rollbackJoin(): Promise<void> {
      const client = this._client
      this._clearSessionState()
      if (!client) return
      try {
        await client.logout()
      } catch {
        // Best-effort: the client may never have finished logging in.
      }
    }

    /**
     * Emits `metadataUpdating`, writes every defined field of `data` to the
     * channel metadata (JSON-encoded), then emits `metadataUpdated`.
     * Does not emit `error`; callers own error reporting.
     */
    private async _publishMetadata(data: RtmChannelMetadata): Promise<void> {
      this.emit('metadataUpdating', { data })
      const metadataItems: MetadataItem[] = []
      for (const [key, value] of Object.entries(data)) {
        if (value !== undefined) {
          metadataItems.push({ key, value: JSON.stringify(value) })
        }
      }
      if (metadataItems.length > 0 && this._client) {
        await this._client.storage.setChannelMetadata(this._channel, CHANNEL_TYPE, metadataItems)
      }
      this.emit('metadataUpdated', { data })
    }

    /**
     * Attaches `status`, `presence` and `storage` listeners to the current
     * client. Presence and storage events for other channels are ignored.
     */
    private _listenRtmEvents(): void {
      if (!this._client) return

      this._client.addEventListener('status', (res) => {
        this.emit('status', res)
      })

      this._client.addEventListener('presence', (res) => {
        const { channelName, eventType, snapshot = [], stateChanged, publisher } = res
        if (channelName !== this._channel) return
        switch (eventType) {
          case 'SNAPSHOT':
            this._dealPresenceSnapshot(snapshot as any[])
            break
          case 'REMOTE_STATE_CHANGED':
            this._dealPresenceRemoteState(stateChanged)
            break
          case 'REMOTE_LEAVE':
          case 'REMOTE_TIMEOUT':
            if (this._userMap.has(publisher)) {
              this._userMap.delete(publisher)
              this._emitUserListChanged()
            }
            break
        }
      })

      this._client.addEventListener('storage', (res) => {
        // Metadata values are written by remote peers; a malformed value must
        // not escape into the SDK's callback, so report it as `error` instead.
        try {
          const { eventType, data, channelName } = res
          if (channelName !== this._channel) return
          if (eventType === 'SNAPSHOT' || eventType === 'UPDATE') {
            this._dealStorageDataChanged(data?.metadata)
          }
        } catch (error) {
          this.emit('error', error as Error)
        }
      })
    }

    /**
     * Stores the user described by a presence state object, if it is a
     * `UserInfo` state with a non-empty `userId`.
     *
     * @returns `true` when the user map was written to.
     */
    private _upsertUserFromState(state: any): boolean {
      if (state?.type !== 'UserInfo' || !state.userId) return false
      this._userMap.set(state.userId, {
        userName: state.userName,
        userId: state.userId,
      })
      return true
    }

    /** Handles a remote presence state change; emits `userListChanged` on update. */
    private _dealPresenceRemoteState(stateChanged: any): void {
      if (this._upsertUserFromState(stateChanged)) {
        this._emitUserListChanged()
      }
    }

    /**
     * Handles a presence snapshot: stores every `UserInfo` entry and emits a
     * single `userListChanged` if at least one user was stored.
     */
    private _dealPresenceSnapshot(snapshot?: any[]): void {
      if (!snapshot?.length) return
      let changed = false
      for (const occupant of snapshot) {
        if (this._upsertUserFromState(occupant?.states)) {
          changed = true
        }
      }
      if (changed) {
        this._emitUserListChanged()
      }
    }

    /**
     * JSON-decodes a metadata item's `value`, returning `fallback` when the item
     * is missing or its value is empty. Throws if the value is not valid JSON.
     */
    private _parseMetadataValue(item: any, fallback: unknown): any {
      return item?.value ? JSON.parse(item.value) : fallback
    }

    /**
     * Translates a channel-metadata snapshot/update into events:
     * - `languagesChanged` when either transcribe field has a value;
     * - `sttDataChanged` when `status` has a value.
     */
    private _dealStorageDataChanged(metadata: any): void {
      if (!metadata) return

      // Language settings.
      const { transcribe1, translate1List, transcribe2, translate2List } = metadata
      if (transcribe1?.value || transcribe2?.value) {
        this.emit('languagesChanged', {
          transcribe1: this._parseMetadataValue(transcribe1, ''),
          translate1List: this._parseMetadataValue(translate1List, []),
          transcribe2: this._parseMetadataValue(transcribe2, ''),
          translate2List: this._parseMetadataValue(translate2List, []),
        })
      }

      // STT task data.
      const { status, taskId, token, startTime, duration } = metadata
      if (status?.value) {
        this.emit('sttDataChanged', {
          status: JSON.parse(status.value),
          taskId: this._parseMetadataValue(taskId, undefined),
          token: this._parseMetadataValue(token, undefined),
          startTime: this._parseMetadataValue(startTime, undefined),
          duration: this._parseMetadataValue(duration, undefined),
        })
      }
    }

    /** Emits `userListChanged` with the current contents of the user map. */
    private _emitUserListChanged(): void {
      this.emit('userListChanged', Array.from(this._userMap.values()))
    }

    /** Publishes the local user's ID and name as a `UserInfo` presence state. */
    private async _updateUserInfo(userName: string): Promise<void> {
      if (!this._joined) return
      await this._setPresenceState({
        type: 'UserInfo',
        userId: this._userId,
        userName,
      })
    }

    /**
     * Sets the local presence state on the channel. String values are sent
     * as-is; every other value is JSON-encoded first. No-op when not joined.
     */
    private async _setPresenceState(attr: Record<string, any>): Promise<void> {
      if (!this._joined) return
      const state: Record<string, string> = {}
      for (const [key, value] of Object.entries(attr)) {
        state[key] = typeof value === 'string' ? value : JSON.stringify(value)
      }
      await this._client?.presence.setState(this._channel, CHANNEL_TYPE, state)
    }

    /**
     * Removes the channel metadata when `whoNow` reports exactly one occupant.
     * NOTE(review): presumably this clears stale settings when the local user is
     * the first one in the channel — confirm intent.
     */
    private async _checkHost(): Promise<void> {
      const result = await this._client?.presence.whoNow(this._channel, CHANNEL_TYPE)
      if (result?.totalOccupancy === 1) {
        await this._removeChannelMetadata()
      }
    }

    /**
     * Calls `removeChannelMetadata` on the channel. When `metadata` has entries
     * they are passed as `options.data` (values JSON-encoded); otherwise empty
     * options are passed.
     */
    private async _removeChannelMetadata(metadata?: Record<string, any>): Promise<void> {
      const options: any = {}
      const data: MetadataItem[] = Object.entries(metadata ?? {}).map(([key, value]) => ({
        key,
        value: JSON.stringify(value),
      }))
      if (data.length) {
        options.data = data
      }
      await this._client?.storage.removeChannelMetadata(this._channel, CHANNEL_TYPE, options)
    }

    /** Creates the `LOCK_STT` lock on the channel unless `getLock` already lists it. */
    private async _setLock(): Promise<void> {
      const { lockDetails = [] } =
        (await this._client?.lock.getLock(this._channel, CHANNEL_TYPE)) || {}
      const exists = lockDetails.some((v: any) => v.lockName === LOCK_STT)
      if (!exists) {
        await this._client?.lock.setLock(this._channel, CHANNEL_TYPE, LOCK_STT)
      }
    }
  }
  385. }