import AgoraRTM, { RTMClient, RTMConfig, ChannelType, MetadataItem } from 'agora-rtm'
import { AGEventEmitter } from '../core/event-emitter'
import { SttError } from '../core/stt-error'
import type {
  IRtmManagerAdapter,
  RtmManagerConfig,
  RtmEventMap,
  RtmUserInfo,
  RtmChannelMetadata,
} from '../types'
import { generateAgoraToken } from '../utils/token-utils'

const { RTM } = AgoraRTM

/** Channel type passed to every presence / storage / lock call made by this adapter. */
const CHANNEL_TYPE: ChannelType = 'MESSAGE'
/** Name of the channel lock used to coordinate STT control. */
const LOCK_STT = 'lock_stt'

/**
 * Adapter around the Agora RTM client that tracks the users present in a
 * single channel, mirrors STT-related channel metadata, and exposes the
 * `lock_stt` channel lock.
 *
 * Every public async method emits an `error` event and then rethrows when it
 * fails, so callers can either await the promise or listen for `error`.
 */
export class RtmManagerAdapter extends AGEventEmitter<RtmEventMap> implements IRtmManagerAdapter {
  private _joined = false
  private _config?: RtmManagerConfig
  private _userId: string = ''
  private _channel: string = ''
  /** Users currently known in the channel, keyed by user id. */
  private _userMap: Map<string, RtmUserInfo> = new Map()
  private _client?: RTMClient
  private _appId: string = ''
  private _certificate: string = ''
  private _rtmConfig: RTMConfig = {}

  /**
   * @param appId - Agora App ID; required by {@link join}.
   * @param certificate - App certificate used to generate the RTM token; required by {@link join}.
   */
  constructor(appId?: string, certificate?: string) {
    super()
    if (appId) {
      this._appId = appId
    }
    if (certificate) {
      this._certificate = certificate
    }
  }

  /**
   * Logs in to RTM, subscribes to `config.channel`, publishes the local user's
   * presence state and makes sure the `lock_stt` lock exists.
   *
   * Emits `connecting`, then `connected` and `userListChanged` on success.
   * If any step after validation fails, the partially established session is
   * rolled back (best-effort logout + state reset) so that `join` can be retried.
   *
   * @throws SttError `INVALID_CONFIG`, `ALREADY_JOINED`, `APP_ID_REQUIRED` or
   *   `CERTIFICATE_REQUIRED` when validation fails; otherwise whatever the
   *   token helper or RTM client throws.
   */
  async join(config: RtmManagerConfig): Promise<void> {
    // Becomes true once this call has started mutating adapter state, so the
    // catch block knows whether there is anything to roll back.
    let started = false
    try {
      const { channel, userId, userName } = config

      if (!channel || !userId || !userName) {
        throw new SttError('INVALID_CONFIG', 'Missing required configuration parameters')
      }
      if (this._joined) {
        throw new SttError('ALREADY_JOINED', 'RTM manager is already joined to a channel')
      }
      if (!this._appId) {
        throw new SttError('APP_ID_REQUIRED', 'App ID is required for RTM connection')
      }
      if (!this._certificate) {
        throw new SttError(
          'CERTIFICATE_REQUIRED',
          'Certificate is required for RTM token generation'
        )
      }

      started = true
      this._userId = userId
      this._channel = channel
      this._config = config

      this.emit('connecting', { channel, userId })

      // Generate the RTM token.
      const token = await generateAgoraToken({
        appId: this._appId,
        appCertificate: this._certificate,
        uid: userId,
        channel,
      })
      if (token) {
        this._rtmConfig.token = token
      }

      // Create the RTM client and attach listeners before logging in.
      this._client = new RTM(this._appId, userId, this._rtmConfig)
      this._listenRtmEvents()

      await this._client.login()
      // Must be set before _updateUserInfo(): the presence helpers are no-ops
      // while `_joined` is false.
      this._joined = true

      // Subscribe to the channel with presence and metadata enabled.
      await this._client.subscribe(channel, {
        withPresence: true,
        withMetadata: true,
      })

      await this._checkHost()
      await this._updateUserInfo(userName)
      await this._setLock()

      // Add the local user to the user list.
      this._userMap.set(userId, { userId, userName })

      this.emit('connected', { channel, userId })
      this._emitUserListChanged()
    } catch (error) {
      if (started) {
        await this._rollbackJoin()
      }
      this.emit('error', error as Error)
      throw error
    }
  }

  /**
   * Writes every defined field of `data` to the channel metadata, each value
   * JSON-encoded. Emits `metadataUpdating` before and `metadataUpdated` after.
   *
   * @throws SttError `NOT_JOINED` when called before a successful {@link join}.
   */
  async updateSttData(data: RtmChannelMetadata): Promise<void> {
    if (!this._joined) {
      throw new SttError(
        'NOT_JOINED',
        'RTM manager must be joined to a channel before updating STT data'
      )
    }

    try {
      this.emit('metadataUpdating', { data })

      // Skip undefined fields; nothing is sent when no field is defined.
      const metadataItems: MetadataItem[] = []
      for (const [key, value] of Object.entries(data)) {
        if (value !== undefined) {
          metadataItems.push({ key, value: JSON.stringify(value) })
        }
      }

      if (metadataItems.length > 0 && this._client) {
        await this._client.storage.setChannelMetadata(this._channel, CHANNEL_TYPE, metadataItems)
      }

      this.emit('metadataUpdated', { data })
    } catch (error) {
      this.emit('error', error as Error)
      throw error
    }
  }

  /**
   * Publishes up to two language configurations to the channel metadata as
   * `transcribe1` / `translate1List` / `transcribe2` / `translate2List`.
   * Only `languages[0]` and `languages[1]` are read.
   *
   * @throws SttError `NOT_JOINED` when called before a successful {@link join}.
   */
  async updateLanguages(languages: any[]): Promise<void> {
    if (!this._joined) {
      throw new SttError(
        'NOT_JOINED',
        'RTM manager must be joined to a channel before updating languages'
      )
    }

    try {
      this.emit('languagesUpdating', { languages })

      // Flatten the language list into the metadata layout.
      const message: Record<string, any> = {
        transcribe1: '',
        translate1List: [],
        transcribe2: '',
        translate2List: [],
      }

      const language1 = languages[0]
      if (language1?.source) {
        message.transcribe1 = language1.source
      }
      if (language1?.target) {
        message.translate1List.push(...language1.target)
      }

      const language2 = languages[1]
      if (language2?.source) {
        message.transcribe2 = language2.source
      }
      if (language2?.target) {
        message.translate2List.push(...language2.target)
      }

      await this.updateSttData(message)

      this.emit('languagesUpdated', { languages })
    } catch (error) {
      this.emit('error', error as Error)
      throw error
    }
  }

  /**
   * Acquires the `lock_stt` channel lock. Emits `lockAcquiring` / `lockAcquired`.
   *
   * @throws SttError `NOT_JOINED` when called before a successful {@link join}.
   */
  async acquireLock(): Promise<void> {
    if (!this._joined) {
      throw new SttError(
        'NOT_JOINED',
        'RTM manager must be joined to a channel before acquiring lock'
      )
    }

    try {
      this.emit('lockAcquiring')

      if (this._client) {
        await this._client.lock.acquireLock(this._channel, CHANNEL_TYPE, LOCK_STT)
      }

      this.emit('lockAcquired')
    } catch (error) {
      this.emit('error', error as Error)
      throw error
    }
  }

  /**
   * Releases the `lock_stt` channel lock. Emits `lockReleasing` / `lockReleased`.
   *
   * @throws SttError `NOT_JOINED` when called before a successful {@link join}.
   */
  async releaseLock(): Promise<void> {
    if (!this._joined) {
      throw new SttError(
        'NOT_JOINED',
        'RTM manager must be joined to a channel before releasing lock'
      )
    }

    try {
      this.emit('lockReleasing')

      if (this._client) {
        await this._client.lock.releaseLock(this._channel, CHANNEL_TYPE, LOCK_STT)
      }

      this.emit('lockReleased')
    } catch (error) {
      this.emit('error', error as Error)
      throw error
    }
  }

  /**
   * Logs out of RTM and clears all session state. Emits `destroying`, then
   * `destroyed`, and finally removes every listener registered on this adapter.
   *
   * Session state is cleared even when `logout()` rejects, so the adapter never
   * stays "joined" with an unusable client; the logout error is still emitted
   * and rethrown.
   */
  async destroy(): Promise<void> {
    try {
      this.emit('destroying')

      try {
        if (this._client) {
          await this._client.logout()
        }
      } finally {
        this._resetState()
      }

      this.emit('destroyed')
      this.removeAllEventListeners()
    } catch (error) {
      this.emit('error', error as Error)
      throw error
    }
  }

  /** Whether {@link join} has logged in and the session has not been destroyed. */
  get isJoined(): boolean {
    return this._joined
  }

  /** Configuration passed to the current {@link join}, if any. */
  get config(): RtmManagerConfig | undefined {
    return this._config
  }

  /** Local user id, or `''` when not joined. */
  get userId(): string {
    return this._userId
  }

  /** Current channel name, or `''` when not joined. */
  get channel(): string {
    return this._channel
  }

  /** Snapshot of the users currently tracked in the channel. */
  get userList(): RtmUserInfo[] {
    return Array.from(this._userMap.values())
  }

  // ---------------------------------------------------------------------
  // Private helpers
  // ---------------------------------------------------------------------

  /** Clears every per-session field; shared by `destroy` and the join rollback. */
  private _resetState(): void {
    this._joined = false
    this._config = undefined
    this._userId = ''
    this._channel = ''
    this._userMap.clear()
    this._client = undefined
  }

  /** Undoes a partially completed `join`: resets state and logs out best-effort. */
  private async _rollbackJoin(): Promise<void> {
    const client = this._client
    this._resetState()
    try {
      await client?.logout()
    } catch {
      // Best effort only: the error that caused the rollback is the one the
      // caller needs to see, and logout may legitimately fail if login never
      // completed.
    }
  }

  /** Wires RTM `status`, `presence` and `storage` events to this adapter. */
  private _listenRtmEvents(): void {
    if (!this._client) return

    this._client.addEventListener('status', (res) => {
      this.emit('status', res)
    })

    this._client.addEventListener('presence', (res) => {
      const { channelName, eventType, snapshot = [], stateChanged, publisher } = res
      if (channelName !== this._channel) return

      switch (eventType) {
        case 'SNAPSHOT':
          this._dealPresenceSnapshot(snapshot as any[])
          break
        case 'REMOTE_STATE_CHANGED':
          this._dealPresenceRemoteState(stateChanged)
          break
        case 'REMOTE_LEAVE':
        case 'REMOTE_TIMEOUT':
          if (this._userMap.has(publisher)) {
            this._userMap.delete(publisher)
            this._emitUserListChanged()
          }
          break
      }
    })

    this._client.addEventListener('storage', (res) => {
      const { eventType, data, channelName } = res
      if (channelName !== this._channel) return

      switch (eventType) {
        case 'SNAPSHOT':
        case 'UPDATE':
          this._dealStorageDataChanged(data?.metadata)
          break
      }
    })
  }

  /** Adds or updates a remote user from a `UserInfo` presence state change. */
  private _dealPresenceRemoteState(stateChanged: any): void {
    // Ignore events that carry no state, and states published by other features.
    if (stateChanged?.type !== 'UserInfo') return

    const userInfo = {
      userName: stateChanged.userName,
      userId: stateChanged.userId,
    }
    if (userInfo.userId) {
      this._userMap.set(userInfo.userId, userInfo)
      this._emitUserListChanged()
    }
  }

  /** Merges every `UserInfo` entry of a presence snapshot into the user list. */
  private _dealPresenceSnapshot(snapshot?: any[]): void {
    if (!snapshot?.length) return

    let changed = false
    for (const entry of snapshot) {
      // Entries without presence state (or with a different state type) are skipped.
      const states = entry?.states
      if (states?.type !== 'UserInfo') continue

      const userInfo = {
        userName: states.userName,
        userId: states.userId,
      }
      if (userInfo.userId) {
        this._userMap.set(userInfo.userId, userInfo)
        changed = true
      }
    }

    if (changed) {
      this._emitUserListChanged()
    }
  }

  /**
   * Translates a channel metadata snapshot/update into `languagesChanged` and
   * `sttDataChanged` events. Values are stored JSON-encoded (see `updateSttData`).
   *
   * This runs inside an SDK event listener, so decoding failures are reported
   * through the `error` event instead of being thrown.
   */
  private _dealStorageDataChanged(metadata: any): void {
    if (!metadata) return

    try {
      // Language changes.
      const { transcribe1, translate1List, transcribe2, translate2List } = metadata
      if (transcribe1?.value || transcribe2?.value) {
        this.emit('languagesChanged', {
          transcribe1: transcribe1?.value ? JSON.parse(transcribe1.value) : '',
          translate1List: translate1List?.value ? JSON.parse(translate1List.value) : [],
          transcribe2: transcribe2?.value ? JSON.parse(transcribe2.value) : '',
          translate2List: translate2List?.value ? JSON.parse(translate2List.value) : [],
        })
      }

      // STT task data changes.
      const { status, taskId, token, startTime, duration } = metadata
      if (status?.value) {
        // Check `.value`, not just the item: JSON.parse(undefined) / JSON.parse('') throw.
        this.emit('sttDataChanged', {
          status: JSON.parse(status.value),
          taskId: taskId?.value ? JSON.parse(taskId.value) : undefined,
          token: token?.value ? JSON.parse(token.value) : undefined,
          startTime: startTime?.value ? JSON.parse(startTime.value) : undefined,
          duration: duration?.value ? JSON.parse(duration.value) : undefined,
        })
      }
    } catch (error) {
      this.emit('error', error as Error)
    }
  }

  /** Emits `userListChanged` with a fresh array of the tracked users. */
  private _emitUserListChanged(): void {
    this.emit('userListChanged', Array.from(this._userMap.values()))
  }

  /** Publishes the local user's id and name as a `UserInfo` presence state. */
  private async _updateUserInfo(userName: string): Promise<void> {
    if (!this._joined) return

    await this._setPresenceState({
      type: 'UserInfo',
      userId: this._userId,
      userName,
    })
  }

  /**
   * Sets the local presence state on the current channel. Non-string values
   * are JSON-encoded; strings are sent as-is. No-op while not joined.
   */
  private async _setPresenceState(attr: Record<string, unknown>): Promise<void> {
    if (!this._joined) return

    const state: Record<string, string> = {}
    for (const [key, value] of Object.entries(attr)) {
      state[key] = typeof value === 'string' ? value : JSON.stringify(value)
    }

    await this._client?.presence.setState(this._channel, CHANNEL_TYPE, state)
  }

  /**
   * Queries channel occupancy and, when the reported total occupancy is 1,
   * removes the channel metadata.
   */
  private async _checkHost(): Promise<void> {
    const result = await this._client?.presence.whoNow(this._channel, CHANNEL_TYPE)

    if (result?.totalOccupancy === 1) {
      await this._removeChannelMetadata()
    }
  }

  /**
   * Removes channel metadata. When `metadata` has entries they are passed as
   * `options.data`; otherwise the call is made with empty options.
   */
  private async _removeChannelMetadata(metadata?: Record<string, unknown>): Promise<void> {
    const data: MetadataItem[] = Object.entries(metadata ?? {}).map(([key, value]) => ({
      key,
      value: JSON.stringify(value),
    }))

    const options: any = {}
    if (data.length) {
      options.data = data
    }

    await this._client?.storage.removeChannelMetadata(this._channel, CHANNEL_TYPE, options)
  }

  /** Creates the `lock_stt` lock on the channel unless it is already listed. */
  private async _setLock(): Promise<void> {
    const { lockDetails = [] } =
      (await this._client?.lock.getLock(this._channel, CHANNEL_TYPE)) || {}

    const exists = lockDetails.some((v: any) => v.lockName === LOCK_STT)
    if (!exists) {
      await this._client?.lock.setLock(this._channel, CHANNEL_TYPE, LOCK_STT)
    }
  }
}