// routes_io_messages.ts
import { APIClient } from "@d8d-appcontainer/api";
import { MessageType, MessageStatus } from '../client/share/types.ts';
import { SocketWithUser, Variables } from './router_io.ts';
  4. interface MessageSendData {
  5. title: string;
  6. content: string;
  7. type: MessageType;
  8. receiver_ids: number[];
  9. }
  10. interface MessageListData {
  11. page?: number;
  12. pageSize?: number;
  13. type?: MessageType;
  14. status?: MessageStatus;
  15. }
  16. export function setupMessageEvents({ socket , apiClient }:Variables) {
  17. // 订阅频道
  18. socket.on('message:subscribe', (channel: string) => {
  19. try {
  20. socket.join(channel);
  21. socket.emit('message:subscribed', {
  22. message: `成功订阅频道: ${channel}`,
  23. channel
  24. });
  25. } catch (error) {
  26. console.error('订阅频道失败:', error);
  27. socket.emit('error', '订阅频道失败');
  28. }
  29. });
  30. // 取消订阅
  31. socket.on('message:unsubscribe', (channel: string) => {
  32. try {
  33. socket.leave(channel);
  34. socket.emit('message:unsubscribed', {
  35. message: `已取消订阅频道: ${channel}`,
  36. channel
  37. });
  38. } catch (error) {
  39. console.error('取消订阅失败:', error);
  40. socket.emit('error', '取消订阅失败');
  41. }
  42. });
  43. // 广播消息
  44. socket.on('message:broadcast', async (data: {
  45. channel?: string;
  46. title: string;
  47. content: string;
  48. type: MessageType;
  49. }) => {
  50. try {
  51. const { channel, title, content, type } = data;
  52. const user = socket.user;
  53. if (!user) {
  54. socket.emit('error', '未授权访问');
  55. return;
  56. }
  57. // 创建广播消息
  58. const [messageId] = await apiClient.database.table('messages').insert({
  59. title,
  60. content,
  61. type,
  62. sender_id: user.id,
  63. sender_name: user.username,
  64. is_broadcast: 1,
  65. created_at: apiClient.database.fn.now(),
  66. updated_at: apiClient.database.fn.now()
  67. });
  68. // 广播到所有客户端或特定频道
  69. const broadcastTarget = channel ? socket.to(channel) : socket.broadcast;
  70. broadcastTarget.emit('message:broadcasted', {
  71. id: messageId,
  72. title,
  73. content,
  74. type,
  75. sender_id: user.id,
  76. sender_name: user.username,
  77. created_at: new Date().toISOString()
  78. });
  79. socket.emit('message:broadcasted', {
  80. message: '广播消息发送成功',
  81. data: { id: messageId }
  82. });
  83. } catch (error) {
  84. console.error('广播消息失败:', error);
  85. socket.emit('error', '广播消息失败');
  86. }
  87. });
  88. // 发送消息
  89. socket.on('message:send', async (data: MessageSendData) => {
  90. try {
  91. const { title, content, type, receiver_ids } = data;
  92. if (!title || !content || !type || !receiver_ids?.length) {
  93. socket.emit('error', '缺少必要参数');
  94. return;
  95. }
  96. const user = socket.user;
  97. if (!user) {
  98. socket.emit('error', '未授权访问');
  99. return;
  100. }
  101. // 创建消息
  102. const [messageId] = await apiClient.database.table('messages').insert({
  103. title,
  104. content,
  105. type,
  106. sender_id: user.id,
  107. sender_name: user.username,
  108. created_at: apiClient.database.fn.now(),
  109. updated_at: apiClient.database.fn.now()
  110. });
  111. // 关联用户消息
  112. const userMessages = receiver_ids.map((userId: number) => ({
  113. user_id: userId,
  114. message_id: messageId,
  115. status: MessageStatus.UNREAD,
  116. created_at: apiClient.database.fn.now(),
  117. updated_at: apiClient.database.fn.now()
  118. }));
  119. await apiClient.database.table('user_messages').insert(userMessages);
  120. socket.emit('message:sent', {
  121. message: '消息发送成功',
  122. data: { id: messageId }
  123. });
  124. } catch (error) {
  125. console.error('发送消息失败:', error);
  126. socket.emit('error', '发送消息失败');
  127. }
  128. });
  129. // 获取消息列表
  130. socket.on('message:list', async (data: MessageListData) => {
  131. try {
  132. const { page = 1, pageSize = 20, type, status } = data;
  133. const user = socket.user;
  134. if (!user) {
  135. socket.emit('error', '未授权访问');
  136. return;
  137. }
  138. const query = apiClient.database.table('user_messages as um')
  139. .select('m.*', 'um.status as user_status', 'um.read_at', 'um.id as user_message_id')
  140. .leftJoin('messages as m', 'um.message_id', 'm.id')
  141. .where('um.user_id', user.id)
  142. .where('um.is_deleted', 0)
  143. .orderBy('m.created_at', 'desc')
  144. .limit(pageSize)
  145. .offset((page - 1) * pageSize);
  146. if (type) query.where('m.type', type);
  147. if (status) query.where('um.status', status);
  148. const countQuery = query.clone();
  149. const messages = await query;
  150. // 获取总数用于分页
  151. const total = await countQuery.count();
  152. const totalCount = Number(total);
  153. const totalPages = Math.ceil(totalCount / pageSize);
  154. socket.emit('message:list', {
  155. data: messages,
  156. pagination: {
  157. total: totalCount,
  158. current: page,
  159. pageSize,
  160. totalPages
  161. }
  162. });
  163. } catch (error) {
  164. console.error('获取消息列表失败:', error);
  165. socket.emit('error', '获取消息列表失败');
  166. }
  167. });
  168. // 获取消息详情
  169. socket.on('message:detail', async (messageId: number) => {
  170. try {
  171. const user = socket.user;
  172. if (!user) {
  173. socket.emit('error', '未授权访问');
  174. return;
  175. }
  176. const message = await apiClient.database.table('user_messages as um')
  177. .select('m.*', 'um.status as user_status', 'um.read_at')
  178. .leftJoin('messages as m', 'um.message_id', 'm.id')
  179. .where('um.user_id', user.id)
  180. .where('um.message_id', messageId)
  181. .first();
  182. if (!message) {
  183. socket.emit('error', '消息不存在或无权访问');
  184. return;
  185. }
  186. // 标记为已读
  187. if (message.user_status === MessageStatus.UNREAD) {
  188. await apiClient.database.table('user_messages')
  189. .where('user_id', user.id)
  190. .where('message_id', messageId)
  191. .update({
  192. status: MessageStatus.READ,
  193. read_at: apiClient.database.fn.now(),
  194. updated_at: apiClient.database.fn.now()
  195. });
  196. }
  197. socket.emit('message:detail', {
  198. message: '获取消息成功',
  199. data: message
  200. });
  201. } catch (error) {
  202. console.error('获取消息详情失败:', error);
  203. socket.emit('error', '获取消息详情失败');
  204. }
  205. });
  206. // 删除消息
  207. socket.on('message:delete', async (messageId: number) => {
  208. try {
  209. const user = socket.user;
  210. if (!user) {
  211. socket.emit('error', '未授权访问');
  212. return;
  213. }
  214. await apiClient.database.table('user_messages')
  215. .where('user_id', user.id)
  216. .where('message_id', messageId)
  217. .update({
  218. is_deleted: 1,
  219. updated_at: apiClient.database.fn.now()
  220. });
  221. socket.emit('message:deleted', { message: '消息已删除' });
  222. } catch (error) {
  223. console.error('删除消息失败:', error);
  224. socket.emit('error', '删除消息失败');
  225. }
  226. });
  227. // 获取未读消息数
  228. socket.on('message:count', async () => {
  229. try {
  230. const user = socket.user;
  231. if (!user) {
  232. socket.emit('error', '未授权访问');
  233. return;
  234. }
  235. const count = await apiClient.database.table('user_messages')
  236. .where('user_id', user.id)
  237. .where('status', MessageStatus.UNREAD)
  238. .where('is_deleted', 0)
  239. .count();
  240. socket.emit('message:count', { count: Number(count) });
  241. } catch (error) {
  242. console.error('获取未读消息数失败:', error);
  243. socket.emit('error', '获取未读消息数失败');
  244. }
  245. });
  246. // 标记消息为已读
  247. socket.on('message:read', async (messageId: number) => {
  248. try {
  249. const user = socket.user;
  250. if (!user) {
  251. socket.emit('error', '未授权访问');
  252. return;
  253. }
  254. await apiClient.database.table('user_messages')
  255. .where('user_id', user.id)
  256. .where('message_id', messageId)
  257. .update({
  258. status: MessageStatus.READ,
  259. read_at: apiClient.database.fn.now(),
  260. updated_at: apiClient.database.fn.now()
  261. });
  262. socket.emit('message:read', { message: '消息已标记为已读' });
  263. } catch (error) {
  264. console.error('标记消息为已读失败:', error);
  265. socket.emit('error', '标记消息为已读失败');
  266. }
  267. });
  268. }