| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- import AgoraRTM, { RTMClient, RTMConfig, ChannelType, MetadataItem } from 'agora-rtm'
- import { AGEventEmitter } from '../core/event-emitter'
- import { SttError } from '../core/stt-error'
- import type {
- IRtmManagerAdapter,
- RtmManagerConfig,
- RtmEventMap,
- RtmUserInfo,
- RtmChannelMetadata,
- } from '../types'
- import { generateAgoraToken } from '../utils/token-utils'
// RTM client constructor, taken from the SDK's default export.
const RTM = AgoraRTM.RTM

// Channel type handed to the storage, lock and presence calls made in this module.
const CHANNEL_TYPE: ChannelType = 'MESSAGE'

// Name of the channel lock used by the STT lock helpers.
const LOCK_STT = 'lock_stt'
/**
 * Adapter around an Agora RTM client that joins a single channel, mirrors the
 * channel's presence into a user list, and reads/writes channel metadata and
 * the STT lock.
 *
 * Results are surfaced both through the returned promises and through events
 * (`connecting`, `connected`, `userListChanged`, `languagesChanged`,
 * `sttDataChanged`, `error`, ...).
 */
export class RtmManagerAdapter extends AGEventEmitter<RtmEventMap> implements IRtmManagerAdapter {
  private _joined = false
  private _config?: RtmManagerConfig
  private _userId: string = ''
  private _channel: string = ''
  private _userMap: Map<string, RtmUserInfo> = new Map()
  private _client?: RTMClient
  private _appId: string = ''
  private _certificate: string = ''
  private _rtmConfig: RTMConfig = {}
  // Error objects already forwarded to 'error' listeners; prevents a nested
  // public call (updateLanguages -> updateSttData) from reporting one failure twice.
  private _reportedErrors: WeakSet<object> = new WeakSet()

  /**
   * @param appId - Agora App ID; must be non-empty by the time `join` is called.
   * @param certificate - App certificate handed to `generateAgoraToken`; must be
   *   non-empty by the time `join` is called.
   */
  constructor(appId?: string, certificate?: string) {
    super()
    if (appId) {
      this._appId = appId
    }
    if (certificate) {
      this._certificate = certificate
    }
  }

  /**
   * Logs in, subscribes to the channel (with presence and metadata), publishes
   * the local user's info, ensures the STT lock exists and emits `connected`.
   *
   * If any step fails after validation, the partially created session is rolled
   * back (best-effort logout + state reset) so that `join` can be retried.
   *
   * @throws SttError `INVALID_CONFIG`, `ALREADY_JOINED`, `APP_ID_REQUIRED` or
   *   `CERTIFICATE_REQUIRED` on failed validation; otherwise whatever the token
   *   helper or the RTM client rejects with. Every failure is also emitted as `error`.
   */
  async join(config: RtmManagerConfig): Promise<void> {
    let client: RTMClient | undefined
    // Becomes true once validation passed and this call starts mutating state;
    // a rejected call (e.g. ALREADY_JOINED) must not tear down a live session.
    let started = false
    try {
      const { channel, userId, userName } = config
      if (!channel || !userId || !userName) {
        throw new SttError('INVALID_CONFIG', 'Missing required configuration parameters')
      }
      if (this._joined) {
        throw new SttError('ALREADY_JOINED', 'RTM manager is already joined to a channel')
      }
      if (!this._appId) {
        throw new SttError('APP_ID_REQUIRED', 'App ID is required for RTM connection')
      }
      if (!this._certificate) {
        throw new SttError(
          'CERTIFICATE_REQUIRED',
          'Certificate is required for RTM token generation'
        )
      }

      started = true
      this._userId = userId
      this._channel = channel
      this._config = config
      this.emit('connecting', { channel, userId })

      // Fetch the RTM token.
      const token = await generateAgoraToken({
        appId: this._appId,
        appCertificate: this._certificate,
        uid: userId,
        channel,
      })
      if (token) {
        this._rtmConfig.token = token
      }

      // Create the RTM client and attach listeners before logging in.
      client = new RTM(this._appId, userId, this._rtmConfig)
      this._client = client
      this._listenRtmEvents()

      await client.login()
      // Set before the remaining steps: the presence helpers below are no-ops
      // while `_joined` is false.
      this._joined = true

      await client.subscribe(channel, {
        withPresence: true,
        withMetadata: true,
      })
      await this._checkHost()
      await this._updateUserInfo(userName)
      await this._setLock()

      // Add the local user to the user list.
      this._userMap.set(userId, { userId, userName })
      this.emit('connected', { channel, userId })
      this._emitUserListChanged()
    } catch (error) {
      if (started) {
        await this._rollbackJoin(client)
      }
      this._emitError(error)
      throw error
    }
  }

  /**
   * Writes every defined field of `data` to the channel metadata, each value
   * JSON-encoded under its own key. Emits `metadataUpdating` / `metadataUpdated`.
   *
   * @throws SttError `NOT_JOINED` when called before a successful `join`;
   *   otherwise whatever the RTM client rejects with (also emitted as `error`).
   */
  async updateSttData(data: RtmChannelMetadata): Promise<void> {
    this._assertJoined('updating STT data')
    try {
      this.emit('metadataUpdating', { data })
      const metadataItems: MetadataItem[] = Object.entries(data)
        .filter(([, value]) => value !== undefined)
        .map(([key, value]) => ({ key, value: JSON.stringify(value) }))
      if (metadataItems.length > 0 && this._client) {
        await this._client.storage.setChannelMetadata(this._channel, CHANNEL_TYPE, metadataItems)
      }
      this.emit('metadataUpdated', { data })
    } catch (error) {
      this._emitError(error)
      throw error
    }
  }

  /**
   * Publishes up to two language configurations to the channel metadata as
   * `transcribe1` / `translate1List` and `transcribe2` / `translate2List`.
   * Slots without a configuration are written as `''` / `[]`.
   *
   * @param languages - Items are read as `{ source, target }`; `target` may be
   *   an array or a single value.
   * @throws SttError `NOT_JOINED` when called before a successful `join`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- signature kept as declared for existing callers
  async updateLanguages(languages: any[]): Promise<void> {
    this._assertJoined('updating languages')
    try {
      this.emit('languagesUpdating', { languages })

      // Format the language data.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const message: Record<string, any> = {
        transcribe1: '',
        translate1List: [],
        transcribe2: '',
        translate2List: [],
      }
      const slots = [
        ['transcribe1', 'translate1List'],
        ['transcribe2', 'translate2List'],
      ] as const
      slots.forEach(([sourceKey, targetKey], index) => {
        const language = languages?.[index]
        if (language?.source) {
          message[sourceKey] = language.source
        }
        if (language?.target) {
          // A bare string must be kept whole rather than spread into characters.
          const targets = Array.isArray(language.target) ? language.target : [language.target]
          message[targetKey].push(...targets)
        }
      })

      // Update the channel metadata.
      await this.updateSttData(message)
      this.emit('languagesUpdated', { languages })
    } catch (error) {
      this._emitError(error)
      throw error
    }
  }

  /**
   * Acquires the STT lock on the joined channel. Emits `lockAcquiring` / `lockAcquired`.
   *
   * @throws SttError `NOT_JOINED` when called before a successful `join`.
   */
  async acquireLock(): Promise<void> {
    this._assertJoined('acquiring lock')
    try {
      this.emit('lockAcquiring')
      if (this._client) {
        await this._client.lock.acquireLock(this._channel, CHANNEL_TYPE, LOCK_STT)
      }
      this.emit('lockAcquired')
    } catch (error) {
      this._emitError(error)
      throw error
    }
  }

  /**
   * Releases the STT lock on the joined channel. Emits `lockReleasing` / `lockReleased`.
   *
   * @throws SttError `NOT_JOINED` when called before a successful `join`.
   */
  async releaseLock(): Promise<void> {
    this._assertJoined('releasing lock')
    try {
      this.emit('lockReleasing')
      if (this._client) {
        await this._client.lock.releaseLock(this._channel, CHANNEL_TYPE, LOCK_STT)
      }
      this.emit('lockReleased')
    } catch (error) {
      this._emitError(error)
      throw error
    }
  }

  /**
   * Logs out, clears all session state, emits `destroyed` and then removes
   * every listener registered on this adapter. If logout rejects, the state is
   * left untouched, `error` is emitted and the rejection is rethrown.
   */
  async destroy(): Promise<void> {
    try {
      this.emit('destroying')
      if (this._client) {
        await this._client.logout()
      }
      this._resetState()
      this.emit('destroyed')
      this.removeAllEventListeners()
    } catch (error) {
      this._emitError(error)
      throw error
    }
  }

  /** Whether `join` has logged in and the session has not been torn down. */
  get isJoined(): boolean {
    return this._joined
  }

  /** Configuration passed to the current `join`, if any. */
  get config(): RtmManagerConfig | undefined {
    return this._config
  }

  /** User id of the current session, or `''` when there is none. */
  get userId(): string {
    return this._userId
  }

  /** Channel name of the current session, or `''` when there is none. */
  get channel(): string {
    return this._channel
  }

  /** Snapshot copy of the users currently tracked for the channel. */
  get userList(): RtmUserInfo[] {
    return Array.from(this._userMap.values())
  }

  // ---- Private helpers ----

  /** Throws `NOT_JOINED` unless a session is active; `action` completes the message. */
  private _assertJoined(action: string): void {
    if (!this._joined) {
      throw new SttError(
        'NOT_JOINED',
        `RTM manager must be joined to a channel before ${action}`
      )
    }
  }

  /** Emits `error` once per error object, however many layers report it. */
  private _emitError(error: unknown): void {
    if (typeof error === 'object' && error !== null) {
      if (this._reportedErrors.has(error)) return
      this._reportedErrors.add(error)
    }
    this.emit('error', error as Error)
  }

  /** Clears all per-session state. */
  private _resetState(): void {
    this._joined = false
    this._config = undefined
    this._userId = ''
    this._channel = ''
    this._userMap.clear()
    this._client = undefined
  }

  /** Undoes a failed `join`: best-effort logout of `client`, then state reset. */
  private async _rollbackJoin(client?: RTMClient): Promise<void> {
    if (client) {
      try {
        await client.logout()
      } catch {
        // Ignored on purpose: the failure that triggered the rollback is the
        // one reported to the caller.
      }
    }
    // Only reset if the adapter still refers to the client this call created.
    if (this._client === client) {
      this._resetState()
    }
  }

  /** Wires the client's `status`, `presence` and `storage` events to this adapter. */
  private _listenRtmEvents(): void {
    const client = this._client
    if (!client) return

    client.addEventListener('status', (res) => {
      this.emit('status', res)
    })

    client.addEventListener('presence', (res) => {
      const { channelName, eventType, snapshot = [], stateChanged, publisher } = res
      if (channelName !== this._channel) return
      switch (eventType) {
        case 'SNAPSHOT':
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          this._dealPresenceSnapshot(snapshot as any[])
          break
        case 'REMOTE_STATE_CHANGED':
          this._dealPresenceRemoteState(stateChanged)
          break
        case 'REMOTE_LEAVE':
        case 'REMOTE_TIMEOUT':
          if (this._userMap.has(publisher)) {
            this._userMap.delete(publisher)
            this._emitUserListChanged()
          }
          break
      }
    })

    client.addEventListener('storage', (res) => {
      const { eventType, data, channelName } = res
      if (channelName !== this._channel) return
      if (eventType === 'SNAPSHOT' || eventType === 'UPDATE') {
        this._dealStorageDataChanged(data?.metadata)
      }
    })
  }

  /** Applies a remote user's `UserInfo` presence state to the user list. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- SDK payload shape is not visible here
  private _dealPresenceRemoteState(stateChanged: any): void {
    if (stateChanged?.type !== 'UserInfo') return
    const userInfo = {
      userName: stateChanged.userName,
      userId: stateChanged.userId,
    }
    if (userInfo.userId) {
      this._userMap.set(userInfo.userId, userInfo)
      this._emitUserListChanged()
    }
  }

  /** Merges every `UserInfo` entry of a presence snapshot into the user list. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- SDK payload shape is not visible here
  private _dealPresenceSnapshot(snapshot?: any[]): void {
    if (!snapshot?.length) return
    let changed = false
    for (const entry of snapshot) {
      const states = entry?.states
      if (states?.type !== 'UserInfo' || !states.userId) continue
      this._userMap.set(states.userId, {
        userName: states.userName,
        userId: states.userId,
      })
      changed = true
    }
    if (changed) {
      this._emitUserListChanged()
    }
  }

  /**
   * Translates a channel-metadata snapshot/update into `languagesChanged` and
   * `sttDataChanged` events. Malformed JSON in the metadata is reported through
   * `error` instead of throwing inside the SDK's event callback.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- SDK payload shape is not visible here
  private _dealStorageDataChanged(metadata: any): void {
    if (!metadata) return

    // Language changes.
    const languages = this._tryParse(() => this._extractLanguages(metadata))
    if (languages) {
      this.emit('languagesChanged', languages)
    }

    // STT data changes.
    const sttData = this._tryParse(() => this._extractSttData(metadata))
    if (sttData) {
      this.emit('sttDataChanged', sttData)
    }
  }

  /** Runs `parse`; on failure emits `error` and returns `undefined`. */
  private _tryParse<T>(parse: () => T): T | undefined {
    try {
      return parse()
    } catch (error) {
      this._emitError(error)
      return undefined
    }
  }

  /** Builds the `languagesChanged` payload, or `undefined` when no transcribe language is set. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private _extractLanguages(metadata: any) {
    const { transcribe1, translate1List, transcribe2, translate2List } = metadata
    if (!transcribe1?.value && !transcribe2?.value) return undefined
    return {
      transcribe1: this._parseMetadataValue(transcribe1, ''),
      translate1List: this._parseMetadataValue(translate1List, []),
      transcribe2: this._parseMetadataValue(transcribe2, ''),
      translate2List: this._parseMetadataValue(translate2List, []),
    }
  }

  /** Builds the `sttDataChanged` payload, or `undefined` when no status is set. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private _extractSttData(metadata: any) {
    const { status, taskId, token, startTime, duration } = metadata
    if (!status?.value) return undefined
    return {
      status: this._parseMetadataValue(status),
      taskId: this._parseMetadataValue(taskId),
      token: this._parseMetadataValue(token),
      startTime: this._parseMetadataValue(startTime),
      duration: this._parseMetadataValue(duration),
    }
  }

  /**
   * JSON-decodes `item.value`; returns `fallback` when the item or its value is
   * missing or empty. Throws on malformed JSON.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- decoded values feed project-typed event payloads
  private _parseMetadataValue(item: any, fallback?: any): any {
    return item?.value ? JSON.parse(item.value) : fallback
  }

  /** Emits `userListChanged` with a fresh copy of the user list. */
  private _emitUserListChanged(): void {
    this.emit('userListChanged', Array.from(this._userMap.values()))
  }

  /** Publishes the local user's id and name as a `UserInfo` presence state. */
  private async _updateUserInfo(userName: string): Promise<void> {
    if (!this._joined) return
    await this._setPresenceState({
      type: 'UserInfo',
      userId: this._userId,
      userName,
    })
  }

  /** Sets the local presence state; non-string values are JSON-encoded first. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private async _setPresenceState(attr: Record<string, any>): Promise<void> {
    if (!this._joined) return
    const state: Record<string, string> = {}
    for (const [key, value] of Object.entries(attr)) {
      state[key] = typeof value === 'string' ? value : JSON.stringify(value)
    }
    await this._client?.presence.setState(this._channel, CHANNEL_TYPE, state)
  }

  /**
   * Clears the channel metadata when `whoNow` reports exactly one occupant
   * (presumably the local user being first in the channel - confirm against
   * the SDK's presence semantics).
   */
  private async _checkHost(): Promise<void> {
    const result = await this._client?.presence.whoNow(this._channel, CHANNEL_TYPE)
    if (result?.totalOccupancy === 1) {
      await this._removeChannelMetadata()
    }
  }

  /**
   * Calls `removeChannelMetadata` for the channel. When `metadata` has entries
   * they are passed as `options.data` (values JSON-encoded); otherwise an empty
   * options object is sent.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private async _removeChannelMetadata(metadata?: Record<string, any>): Promise<void> {
    const data: MetadataItem[] = Object.entries(metadata ?? {}).map(([key, value]) => ({
      key,
      value: JSON.stringify(value),
    }))
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const options: any = {}
    if (data.length) {
      options.data = data
    }
    await this._client?.storage.removeChannelMetadata(this._channel, CHANNEL_TYPE, options)
  }

  /** Creates the STT lock on the channel unless `getLock` already lists it. */
  private async _setLock(): Promise<void> {
    const { lockDetails = [] } =
      (await this._client?.lock.getLock(this._channel, CHANNEL_TYPE)) || {}
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    if (!lockDetails.find((v: any) => v.lockName === LOCK_STT)) {
      await this._client?.lock.setLock(this._channel, CHANNEL_TYPE, LOCK_STT)
    }
  }
}
|