diff --git a/docs/CP_MSG_AUDIT_SDK_SAFE_USAGE.md b/docs/CP_MSG_AUDIT_SDK_SAFE_USAGE.md index b3a3ea1d3..b64e4612b 100644 --- a/docs/CP_MSG_AUDIT_SDK_SAFE_USAGE.md +++ b/docs/CP_MSG_AUDIT_SDK_SAFE_USAGE.md @@ -1,5 +1,7 @@ # 企业微信会话存档SDK安全使用指南 +## 说明 +该方案已废弃,请使用新版本:[CP_MSG_AUDIT_THREADLOCAL_LIFECYCLE_REFACTOR.md](CP_MSG_AUDIT_THREADLOCAL_LIFECYCLE_REFACTOR.md) ## 问题背景 在使用企业微信会话存档功能时,部分开发者遇到了JVM崩溃的问题。典型错误信息如下: diff --git a/docs/CP_MSG_AUDIT_THREADLOCAL_LIFECYCLE_REFACTOR.md b/docs/CP_MSG_AUDIT_THREADLOCAL_LIFECYCLE_REFACTOR.md new file mode 100644 index 000000000..072ceefd0 --- /dev/null +++ b/docs/CP_MSG_AUDIT_THREADLOCAL_LIFECYCLE_REFACTOR.md @@ -0,0 +1,204 @@ +# 会话存档SDK生命周期重构方案 + +## Context + +当前实现(4.8.x)通过"共享SDK + 引用计数 + 7200秒过期"来管理会话存档SDK生命周期。 +该方案存在以下核心问题: + +1. **频繁初始化/销毁**:每次调用 `releaseSdk()` 后引用计数归零即销毁SDK。对于"拉取→解密→下载媒体"这类典型串行调用链,每步操作都会触发重新初始化。 +2. **7200秒过期规则无依据**:官方文档FAQ明确说"不需要每次new/init sdk,可以在多次拉取中复用同一个sdk",无任何7200秒过期说明。 +3. **线程安全问题**:企微技术人员建议"一个线程一个SDK实例",当前设计多线程共享同一SDK实例,存在并发安全隐患。 + +--- + +## 推荐方案:ThreadLocal SDK 模式 + +> **核心原则**:每个线程拥有独立SDK实例,懒初始化,生命周期与线程绑定。 + +### 设计要点 + +- 使用 `ThreadLocal` 为每个线程持有独立SDK +- SDK在线程首次调用时初始化,后续所有操作复用(无需重复初始化) +- 移除7200秒过期机制 +- 移除引用计数机制(每线程独占,无需计数) +- 提供显式清理接口:`closeThreadLocalSdk()`(线程结束时调)、`closeAllSdks()`(应用关闭时调) + +### 生命周期示意 + +``` +Thread A: init SDK_A → getChatRecords → getDecryptChatData → downloadMediaFile → [任务结束后调closeThreadLocalSdk] +Thread B: init SDK_B → getChatRecords → getDecryptChatData → downloadMediaFile → ... +Thread C: init SDK_C → ... +``` + +--- + +## 涉及文件 + +| 文件 | 变更类型 | +|------|--------| +| `weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpMsgAuditServiceImpl.java` | 主要重构 | +| `weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMsgAuditService.java` | 新增接口方法 | +| `weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/WxCpConfigStorage.java` | 废弃旧SDK管理方法 | +| `weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/impl/WxCpDefaultConfigImpl.java` | 废弃旧字段/方法 | +| `weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/WxCpMsgAuditTest.java` | 补充测试 | +| `docs/CP_MSG_AUDIT_SDK_SAFE_USAGE.md` | 更新文档 | + +--- + +## 详细变更 + +### 1. WxCpMsgAuditServiceImpl(主要变更) + +**新增字段:** +```java +/** 每个线程持有独立SDK实例 */ +private final ThreadLocal threadLocalSdk = new ThreadLocal<>(); + +/** 跟踪所有已创建SDK,用于统一清理 */ +private final Set managedSdks = ConcurrentHashMap.newKeySet(); +``` + +**废弃字段/方法:** +- 废弃常量 `SDK_EXPIRES_TIME = 7200`(无官方依据) +- 废弃 `initSdk()`(由 `getOrInitThreadLocalSdk()` 替代) +- 废弃 `acquireSdk()` / `releaseSdk()`(由ThreadLocal模式替代) + +**新增核心方法:** + +```java +/** + * 获取当前线程的SDK,不存在则创建。SDK在线程内跨调用复用,无需每次重新初始化。 + */ +private long getOrInitThreadLocalSdk() throws WxErrorException { + Long sdk = threadLocalSdk.get(); + if (sdk != null && sdk > 0) { + return sdk; + } + long newSdk = createSdk(); + threadLocalSdk.set(newSdk); + managedSdks.add(newSdk); + log.info("线程 [{}] 初始化会话存档SDK成功,sdk={}", Thread.currentThread().getName(), newSdk); + return newSdk; +} + +/** + * 创建并初始化一个新SDK(私有,只在当前线程无SDK时调用) + */ +private long createSdk() throws WxErrorException { + WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage(); + // ... 与现有 initSdk() 内的库加载+Finance.NewSdk()+Finance.Init() 逻辑一致 ... + // 注意:Finance.loadingLibraries() 是幂等的(System.load内部防重复),多线程调用安全 +} + +/** + * 关闭当前线程持有的SDK,释放本地资源。 + * 在线程任务结束时调用(如定时任务finally块,或线程池线程销毁时)。 + */ +public void closeThreadLocalSdk() { + Long sdk = threadLocalSdk.get(); + if (sdk != null && sdk > 0) { + Finance.DestroySdk(sdk); + managedSdks.remove(sdk); + threadLocalSdk.remove(); + log.info("线程 [{}] 关闭会话存档SDK,sdk={}", Thread.currentThread().getName(), sdk); + } +} + +/** + * 关闭所有线程持有的SDK。应用关闭时调用(如Spring @PreDestroy / Shutdown Hook)。 + */ +public void closeAllSdks() { + managedSdks.forEach(sdk -> { + Finance.DestroySdk(sdk); + log.info("关闭会话存档SDK,sdk={}", sdk); + }); + managedSdks.clear(); + threadLocalSdk.remove(); +} +``` + +**更新新API方法(getChatRecords / getDecryptChatData / getChatRecordPlainText / downloadMediaFile):** +- 调用 `getOrInitThreadLocalSdk()` 替代 `acquireSdk()` +- 移除 try-finally 中的 `releaseSdk(sdk)` 调用(SDK不再每次释放) +- 方法变得更简洁:直接使用sdk,无需包装计数 + +**保留旧API方法不变(getChatDatas / getDecryptData / getChatPlainText / getMediaFile):** +- 保持 @Deprecated 标注 +- 内部调用改为 `getOrInitThreadLocalSdk()` 以保持一致性(旧方法也受益于ThreadLocal) +- 移除对 `initSdk()` 的依赖 + +### 2. WxCpMsgAuditService(接口新增) + +```java +/** + * 关闭当前线程持有的SDK,释放native资源。 + * Finance.DestroySdk() 不会随线程结束自动执行,无论线程池还是独立线程, + * 均应在任务结束的finally块中调用本方法,防止native内存、连接等资源泄漏。 + */ +void closeThreadLocalSdk(); + +/** + * 关闭所有会话存档SDK实例。 + * 适用于应用关闭时(如Spring Bean销毁阶段)统一释放资源。 + */ +void closeAllSdks(); +``` + +### 3. WxCpConfigStorage(废弃旧SDK管理API) + +对以下方法标记 `@Deprecated`(保留实现,不做破坏性删除): +- `getMsgAuditSdk()` / `updateMsgAuditSdk()` / `expireMsgAuditSdk()` / `isMsgAuditSdkExpired()` +- `acquireMsgAuditSdk()` / `releaseMsgAuditSdk()` +- `incrementMsgAuditSdkRefCount()` / `decrementMsgAuditSdkRefCount()` / `getMsgAuditSdkRefCount()` + +### 4. WxCpDefaultConfigImpl(废弃旧字段) + +- 将 `msgAuditSdk`、`msgAuditSdkExpiresTime`、`msgAuditSdkRefCount` 字段标记 `@Deprecated` +- 对应的 getter/setter/acquire/release 方法标记 `@Deprecated` +- 保留实现,确保向后兼容 + +--- + +## 使用示例(更新文档) + +```java +// ✅ 典型用法(一次任务中串行调用,SDK在同线程内复用,无重复初始化) +WxCpMsgAuditService msgAuditService = wxCpService.getMsgAuditService(); + +try { + List records = msgAuditService.getChatRecords(seq, 100L, null, null, 30L); + for (WxCpChatDatas.WxCpChatData record : records) { + WxCpChatModel model = msgAuditService.getDecryptChatData(record, 2); + if ("image".equals(model.getMsgType())) { + msgAuditService.downloadMediaFile(model.getImage().getSdkFileId(), null, null, 30L, "/tmp/img.jpg"); + } + } +} finally { + // 无论线程池还是独立线程,均建议在 finally 中显式调用。 + // Finance.DestroySdk() 不会随线程结束自动执行,依赖 closeAllSdks() 兜底会造成 + // native 内存/连接资源的延迟泄漏,对定时任务等长期运行场景尤其有害。 + msgAuditService.closeThreadLocalSdk(); +} + +// 应用关闭时(Spring @PreDestroy 或 Shutdown Hook) +// msgAuditService.closeAllSdks(); +``` + +--- + +## 注意事项 + +1. **线程池场景下必须调用 `closeThreadLocalSdk()`**:线程池中线程会被复用,如不主动清理,下次任务仍会使用旧线程的SDK。对于计划任务/批处理,建议在 finally 块中调用。 +2. **独立线程同样建议显式关闭**:`Finance.DestroySdk()` 是 native 调用,不会随线程结束自动执行,JVM GC 也不会触发它。依赖 `closeAllSdks()` 兜底意味着 native 内存、网络连接等资源在整个应用运行期间一直持有,对定时任务等高频场景会持续积累,建议统一在 finally 块中调用 `closeThreadLocalSdk()`。 +3. **多企业(多CorpId)场景**:`threadLocalSdk` 是实例字段(非static),不同 `WxCpMsgAuditServiceImpl` 实例(不同企业)的ThreadLocal独立,互不影响。 +4. **库加载幂等性**:`Finance.loadingLibraries()` 底层调用 `System.load()`,JVM保证同一库不重复加载,多线程并发调用安全。 + +--- + +## 验证方式 + +1. **单元测试**:在 `WxCpMsgAuditTest` 中添加测试,验证同线程多次调用不触发重新初始化(可通过日志或mock Finance验证) +2. **多线程压测**:多线程并发调用 `getChatRecords` + `getDecryptChatData`,观察无JVM崩溃 +3. **线程池复用测试**:使用固定线程池多次提交任务,验证 `closeThreadLocalSdk()` 后下次任务能正确重新初始化SDK +4. **应用关闭测试**:调用 `closeAllSdks()`,验证所有线程的SDK被正确销毁 diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMsgAuditService.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMsgAuditService.java index b754e32b7..5e8811953 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMsgAuditService.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpMsgAuditService.java @@ -215,4 +215,20 @@ void downloadMediaFile(@NonNull String sdkfileid, String proxy, String passwd, @ */ WxCpAgreeInfo checkSingleAgree(@NonNull WxCpCheckAgreeRequest checkAgreeRequest) throws WxErrorException; + /** + * 关闭当前线程持有的SDK,释放本地资源。 + *

+ * 在线程池场景下,任务结束后必须在 finally 块中调用此方法,防止SDK实例随线程复用而泄漏。 + * 独立线程或一次性任务也建议调用,以主动释放原生资源。 + */ + void closeThreadLocalSdk(); + + /** + * 关闭所有会话存档SDK实例,释放全部原生资源。 + *

+ * 适用于应用关闭阶段(如 Spring Bean 销毁阶段 {@code @PreDestroy} 或 Shutdown Hook)。 + * 调用后,所有线程的SDK均不可再使用。 + */ + void closeAllSdks(); + } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpMsgAuditServiceImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpMsgAuditServiceImpl.java index 8ddd9f878..be6588bc7 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpMsgAuditServiceImpl.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpMsgAuditServiceImpl.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import static me.chanjar.weixin.cp.constant.WxCpApiPathConsts.MsgAudit.*; @@ -37,16 +39,17 @@ public class WxCpMsgAuditServiceImpl implements WxCpMsgAuditService { private final WxCpService cpService; - /** - * SDK初始化有效期,根据企微文档为7200秒 - */ - private static final int SDK_EXPIRES_TIME = 7200; + /** 每个线程持有独立 SDK 实例,懒初始化,线程内跨调用复用 */ + private final ThreadLocal threadLocalSdk = new ThreadLocal<>(); + + /** 跟踪所有已创建的 SDK,用于 closeAllSdks() 统一清理 */ + private final Set managedSdks = ConcurrentHashMap.newKeySet(); @Override public WxCpChatDatas getChatDatas(long seq, @NonNull long limit, String proxy, String passwd, @NonNull long timeout) throws Exception { - // 获取或初始化SDK - long sdk = this.initSdk(); + // 旧版 API:每次调用创建新 SDK,由调用方负责通过 Finance.DestroySdk(chatDatas.getSdk()) 释放 + long sdk = this.createSdk(); long slice = Finance.NewSlice(); long ret = Finance.GetChatData(sdk, seq, limit, proxy, passwd, timeout, slice); @@ -68,23 +71,39 @@ public WxCpChatDatas getChatDatas(long seq, @NonNull long limit, String proxy, S } /** - * 获取或初始化SDK,如果SDK已过期则重新初始化 + * 获取当前线程的 SDK,不存在则初始化。 + * SDK 在线程内跨调用复用,无需每次重新初始化。 * * @return sdk id * @throws WxErrorException 初始化失败时抛出异常 */ - private synchronized long initSdk() throws WxErrorException { - WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage(); - - // 检查SDK是否已缓存且未过期 - if (!configStorage.isMsgAuditSdkExpired()) { - long cachedSdk = configStorage.getMsgAuditSdk(); - if (cachedSdk > 0) { - return cachedSdk; + private long getOrInitThreadLocalSdk() throws WxErrorException { + Long sdk = threadLocalSdk.get(); + if (sdk != null && sdk > 0) { + // 校验句柄是否仍受管理:closeAllSdks() 后其他线程 ThreadLocal 可能保留已销毁的 id + if (managedSdks.contains(sdk)) { + return sdk; } + log.warn("线程 [{}] 发现已失效的会话存档SDK句柄 sdk={},请检查调用逻辑", Thread.currentThread().getName(), sdk); + threadLocalSdk.remove(); + throw new WxErrorException("线程 [" + Thread.currentThread().getName() + "] 获取会话存档SDK失败,请检查是否已调用 closeAllSdks()"); } + long newSdk = createSdk(); + threadLocalSdk.set(newSdk); + managedSdks.add(newSdk); + log.info("线程 [{}] 初始化会话存档SDK成功,sdk={}", Thread.currentThread().getName(), newSdk); + return newSdk; + } + + /** + * 创建并初始化一个新的会话存档 SDK 实例。 + *

通常通过 {@link #getOrInitThreadLocalSdk()} 间接调用以复用 ThreadLocal 中的实例; + * 旧版直接暴露 sdk 的 API(如 {@link #getChatDatas})也会直接调用本方法,此时 SDK 由调用方自行管理。

+ *

Finance.loadingLibraries() 底层依赖 System.load(),JVM 保证同一库不重复加载,多线程并发调用安全。

+ */ + private long createSdk() throws WxErrorException { + WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage(); - // SDK未初始化或已过期,需要重新初始化 String configPath = configStorage.getMsgAuditLibPath(); if (StringUtils.isEmpty(configPath)) { throw new WxErrorException("请配置会话存档sdk文件的路径,不要配错了!!"); @@ -130,55 +149,31 @@ private synchronized long initSdk() throws WxErrorException { Finance.DestroySdk(sdk); throw new WxErrorException("init sdk err ret " + ret); } - - // 缓存SDK - configStorage.updateMsgAuditSdk(sdk, SDK_EXPIRES_TIME); - log.debug("初始化会话存档SDK成功,sdk={}", sdk); - return sdk; } - /** - * 获取SDK并增加引用计数(原子操作) - * 如果SDK未初始化或已过期,会自动初始化 - * - * @return sdk id - * @throws WxErrorException 初始化失败时抛出异常 - */ - private long acquireSdk() throws WxErrorException { - WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage(); - - // 尝试获取现有的有效SDK并增加引用计数(原子操作) - long sdk = configStorage.acquireMsgAuditSdk(); - - if (sdk > 0) { - // 成功获取到有效的SDK - return sdk; - } - - // SDK未初始化或已过期,需要初始化 - // initSdk()方法已经是synchronized的,确保只有一个线程初始化 - sdk = this.initSdk(); - - // 初始化后增加引用计数 - int refCount = configStorage.incrementMsgAuditSdkRefCount(sdk); - if (refCount < 0) { - // SDK已经被替换,需要重新获取 - return acquireSdk(); + @Override + public void closeThreadLocalSdk() { + Long sdk = threadLocalSdk.get(); + // 先从 managedSdks 摘除,摘除成功才调 DestroySdk,防止与 closeAllSdks() 并发时 double-free + if (sdk != null && managedSdks.remove(sdk)) { + Finance.DestroySdk(sdk); + log.info("线程 [{}] 关闭会话存档SDK,sdk={}", Thread.currentThread().getName(), sdk); } - - return sdk; + threadLocalSdk.remove(); } - /** - * 释放SDK引用计数 - * - * @param sdk sdk id - */ - private void releaseSdk(long sdk) { - if (sdk > 0) { - cpService.getWxCpConfigStorage().releaseMsgAuditSdk(sdk); + @Override + public void closeAllSdks() { + // 逐一 remove 后再 Destroy,防止与 closeThreadLocalSdk() 并发时 double-free + Long[] sdks = managedSdks.toArray(new Long[0]); + for (Long sdk : sdks) { + if (managedSdks.remove(sdk)) { + Finance.DestroySdk(sdk); + log.info("关闭会话存档SDK,sdk={}", sdk); + } } + threadLocalSdk.remove(); } @Override @@ -240,17 +235,18 @@ public void getMediaFile(@NonNull long sdk, @NonNull String sdkfileid, String pr * 为空字符串,拉取后续分片时直接填入上次返回的indexbuf即可。 */ File targetFile = new File(targetFilePath); - if (!targetFile.getParentFile().exists()) { - targetFile.getParentFile().mkdirs(); + File parentDir = targetFile.getParentFile(); + if (parentDir != null && !parentDir.exists()) { + parentDir.mkdirs(); } this.getMediaFile(sdk, sdkfileid, proxy, passwd, timeout, i -> { try { // 大于512k的文件会分片拉取,此处需要使用追加写,避免后面的分片覆盖之前的数据。 - FileOutputStream outputStream = new FileOutputStream(targetFile, true); - outputStream.write(i); - outputStream.close(); + try (FileOutputStream outputStream = new FileOutputStream(targetFile, true)) { + outputStream.write(i); + } } catch (Exception e) { - e.printStackTrace(); + log.error("写入媒体文件分片失败,targetFilePath={}", targetFilePath, e); } }); } @@ -280,7 +276,7 @@ public void getMediaFile(@NonNull long sdk, @NonNull String sdkfileid, String pr // 大于512k的文件会分片拉取,此处需要使用追加写,避免后面的分片覆盖之前的数据。 action.accept(Finance.GetData(mediaData)); } catch (Exception e) { - e.printStackTrace(); + log.error("处理媒体文件分片失败,sdkfileid={}", sdkfileid, e); } if (Finance.IsMediaDataFinish(mediaData) == 1) { @@ -327,69 +323,48 @@ public WxCpAgreeInfo checkSingleAgree(@NonNull WxCpCheckAgreeRequest checkAgreeR @Override public List getChatRecords(long seq, @NonNull long limit, String proxy, String passwd, @NonNull long timeout) throws Exception { - // 获取SDK并自动增加引用计数(原子操作) - long sdk = this.acquireSdk(); - - try { - long slice = Finance.NewSlice(); - long ret = Finance.GetChatData(sdk, seq, limit, proxy, passwd, timeout, slice); - if (ret != 0) { - Finance.FreeSlice(slice); - throw new WxErrorException("getchatdata err ret " + ret); - } + long sdk = this.getOrInitThreadLocalSdk(); - // 拉取会话存档 - String content = Finance.GetContentFromSlice(slice); + long slice = Finance.NewSlice(); + long ret = Finance.GetChatData(sdk, seq, limit, proxy, passwd, timeout, slice); + if (ret != 0) { Finance.FreeSlice(slice); - WxCpChatDatas chatDatas = WxCpChatDatas.fromJson(content); - if (chatDatas.getErrCode().intValue() != 0) { - throw new WxErrorException(chatDatas.toJson()); - } + throw new WxErrorException("getchatdata err ret " + ret); + } - List chatDataList = chatDatas.getChatData(); - return chatDataList != null ? chatDataList : Collections.emptyList(); - } finally { - // 释放SDK引用计数(原子操作) - this.releaseSdk(sdk); + // 拉取会话存档 + String content = Finance.GetContentFromSlice(slice); + Finance.FreeSlice(slice); + WxCpChatDatas chatDatas = WxCpChatDatas.fromJson(content); + if (chatDatas.getErrCode().intValue() != 0) { + throw new WxErrorException(chatDatas.toJson()); } + + List chatDataList = chatDatas.getChatData(); + return chatDataList != null ? chatDataList : Collections.emptyList(); } @Override public WxCpChatModel getDecryptChatData(@NonNull WxCpChatDatas.WxCpChatData chatData, @NonNull Integer pkcs1) throws Exception { - // 获取SDK并自动增加引用计数(原子操作) - long sdk = this.acquireSdk(); - - try { - String plainText = this.decryptChatData(sdk, chatData, pkcs1); - return WxCpChatModel.fromJson(plainText); - } finally { - // 释放SDK引用计数(原子操作) - this.releaseSdk(sdk); - } + long sdk = this.getOrInitThreadLocalSdk(); + String plainText = this.decryptChatData(sdk, chatData, pkcs1); + return WxCpChatModel.fromJson(plainText); } @Override public String getChatRecordPlainText(@NonNull WxCpChatDatas.WxCpChatData chatData, @NonNull Integer pkcs1) throws Exception { - // 获取SDK并自动增加引用计数(原子操作) - long sdk = this.acquireSdk(); - - try { - return this.decryptChatData(sdk, chatData, pkcs1); - } finally { - // 释放SDK引用计数(原子操作) - this.releaseSdk(sdk); - } + long sdk = this.getOrInitThreadLocalSdk(); + return this.decryptChatData(sdk, chatData, pkcs1); } @Override public void downloadMediaFile(@NonNull String sdkfileid, String proxy, String passwd, @NonNull long timeout, @NonNull String targetFilePath) throws WxErrorException { - // 获取SDK并自动增加引用计数(原子操作) long sdk; try { - sdk = this.acquireSdk(); + sdk = this.getOrInitThreadLocalSdk(); } catch (Exception e) { throw new WxErrorException(e); } @@ -397,54 +372,43 @@ public void downloadMediaFile(@NonNull String sdkfileid, String proxy, String pa // 使用AtomicReference捕获Lambda中的异常,以便在执行完后抛出 final java.util.concurrent.atomic.AtomicReference exceptionHolder = new java.util.concurrent.atomic.AtomicReference<>(); - try { - File targetFile = new File(targetFilePath); - if (!targetFile.getParentFile().exists()) { - targetFile.getParentFile().mkdirs(); + File targetFile = new File(targetFilePath); + File parentDir = targetFile.getParentFile(); + if (parentDir != null && !parentDir.exists()) { + parentDir.mkdirs(); + } + this.getMediaFile(sdk, sdkfileid, proxy, passwd, timeout, i -> { + // 如果之前已经发生异常,不再继续处理 + if (exceptionHolder.get() != null) { + return; } - this.getMediaFile(sdk, sdkfileid, proxy, passwd, timeout, i -> { - // 如果之前已经发生异常,不再继续处理 - if (exceptionHolder.get() != null) { - return; - } - try { - // 大于512k的文件会分片拉取,此处需要使用追加写,避免后面的分片覆盖之前的数据。 - FileOutputStream outputStream = new FileOutputStream(targetFile, true); + try { + // 大于512k的文件会分片拉取,此处需要使用追加写,避免后面的分片覆盖之前的数据。 + try (FileOutputStream outputStream = new FileOutputStream(targetFile, true)) { outputStream.write(i); - outputStream.close(); - } catch (Exception e) { - exceptionHolder.set(e); } - }); - - // 检查是否发生异常,如果有则抛出 - Exception caughtException = exceptionHolder.get(); - if (caughtException != null) { - throw new WxErrorException(caughtException); + } catch (Exception e) { + exceptionHolder.set(e); } - } finally { - // 释放SDK引用计数(原子操作) - this.releaseSdk(sdk); + }); + + // 检查是否发生异常,如果有则抛出 + Exception caughtException = exceptionHolder.get(); + if (caughtException != null) { + throw new WxErrorException(caughtException); } } @Override public void downloadMediaFile(@NonNull String sdkfileid, String proxy, String passwd, @NonNull long timeout, @NonNull Consumer action) throws WxErrorException { - // 获取SDK并自动增加引用计数(原子操作) long sdk; try { - sdk = this.acquireSdk(); + sdk = this.getOrInitThreadLocalSdk(); } catch (Exception e) { throw new WxErrorException(e); } - - try { - this.getMediaFile(sdk, sdkfileid, proxy, passwd, timeout, action); - } finally { - // 释放SDK引用计数(原子操作) - this.releaseSdk(sdk); - } + this.getMediaFile(sdk, sdkfileid, proxy, passwd, timeout, action); } } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/WxCpConfigStorage.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/WxCpConfigStorage.java index f716f9cd8..fe6acf12d 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/WxCpConfigStorage.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/WxCpConfigStorage.java @@ -300,18 +300,24 @@ public interface WxCpConfigStorage { void updateMsgAuditAccessToken(String accessToken, int expiresInSeconds); /** - * 获取会话存档SDK - * 会话存档SDK初始化后有效期为7200秒,无需每次重新初始化 + * 获取会话存档SDK(历史接口)。 + *

历史实现中,会话存档 SDK 初始化后有效期为 7200 秒,由 ConfigStorage 负责维护; + * 该语义现已废弃,不再保证。

* - * @return sdk id,如果未初始化或已过期返回0 + * @return sdk id;历史实现中如果未初始化或已过期返�� 0,当前实现仅为兼容旧代码保留此方法 + * @deprecated SDK 生命周期已改由 {@link me.chanjar.weixin.cp.api.WxCpMsgAuditService} 内部的 ThreadLocal + * 模式管理,不再依赖 ConfigStorage 缓存。请迁移至新接口。 */ + @Deprecated long getMsgAuditSdk(); /** * 检查会话存档SDK是否已过期 * * @return true: 已过期, false: 未过期 + * @deprecated SDK 生命周期已改由 ThreadLocal 模式管理,过期检查不再必要。 */ + @Deprecated boolean isMsgAuditSdkExpired(); /** @@ -319,12 +325,17 @@ public interface WxCpConfigStorage { * * @param sdk sdk id * @param expiresInSeconds 过期时间(秒) + * @deprecated SDK 生命周期已改由 ThreadLocal 模式管理,无需通过 ConfigStorage 更新。 */ + @Deprecated void updateMsgAuditSdk(long sdk, int expiresInSeconds); /** * 使会话存档SDK过期 + * + * @deprecated SDK 生命周期已改由 ThreadLocal 模式管理,此方法已无实际作用。 */ + @Deprecated void expireMsgAuditSdk(); /** @@ -333,7 +344,9 @@ public interface WxCpConfigStorage { * * @param sdk sdk id * @return 增加后的引用计数,如果SDK不匹配返回-1 + * @deprecated 引用计数机制已废弃,由 ThreadLocal 模式替代。 */ + @Deprecated int incrementMsgAuditSdkRefCount(long sdk); /** @@ -342,7 +355,9 @@ public interface WxCpConfigStorage { * * @param sdk sdk id * @return 减少后的引用计数,如果返回0表示SDK已被销毁,如果SDK不匹配返回-1 + * @deprecated 引用计数机制已废弃,由 ThreadLocal 模式替代。 */ + @Deprecated int decrementMsgAuditSdkRefCount(long sdk); /** @@ -350,7 +365,9 @@ public interface WxCpConfigStorage { * * @param sdk sdk id * @return 当前引用计数,如果SDK不匹配返回-1 + * @deprecated 引用计数机制已废弃,由 ThreadLocal 模式替代。 */ + @Deprecated int getMsgAuditSdkRefCount(long sdk); /** @@ -359,7 +376,9 @@ public interface WxCpConfigStorage { * 此方法用于在获取SDK后立即增加引用计数,避免并发问题 * * @return 当前有效的SDK id并已增加引用计数,如果SDK无效返回0 + * @deprecated 引用计数机制已废弃,由 ThreadLocal 模式替代。 */ + @Deprecated long acquireMsgAuditSdk(); /** @@ -367,6 +386,8 @@ public interface WxCpConfigStorage { * 此方法确保引用计数递减和SDK检查在同一个同步块内完成 * * @param sdk sdk id + * @deprecated 引用计数机制已废弃,由 ThreadLocal 模式替代。 */ + @Deprecated void releaseMsgAuditSdk(long sdk); } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/impl/WxCpDefaultConfigImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/impl/WxCpDefaultConfigImpl.java index 86ede8241..c7b300ba4 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/impl/WxCpDefaultConfigImpl.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/config/impl/WxCpDefaultConfigImpl.java @@ -61,12 +61,21 @@ public class WxCpDefaultConfigImpl implements WxCpConfigStorage, Serializable { protected transient Lock msgAuditAccessTokenLock = new ReentrantLock(); /** * 会话存档SDK及其过期时间 + * + * @deprecated SDK 生命周期已改由 {@link me.chanjar.weixin.cp.api.impl.WxCpMsgAuditServiceImpl} 内部的 + * ThreadLocal 模式管理,此字段保留仅为向后兼容。 */ + @Deprecated private volatile long msgAuditSdk; + /** @deprecated 同 msgAuditSdk */ + @Deprecated private volatile long msgAuditSdkExpiresTime; /** - * 会话存档SDK引用计数,用于多线程安全的生命周期管理 + * 会话存档SDK引用计数 + * + * @deprecated 引用计数机制已废弃,由 ThreadLocal 模式替代。 */ + @Deprecated private volatile int msgAuditSdkRefCount; private volatile String oauth2redirectUri; private volatile String httpProxyHost; @@ -500,16 +509,19 @@ public synchronized void updateMsgAuditAccessToken(String accessToken, int expir } @Override + @Deprecated public long getMsgAuditSdk() { return this.msgAuditSdk; } @Override + @Deprecated public boolean isMsgAuditSdkExpired() { return System.currentTimeMillis() > this.msgAuditSdkExpiresTime; } @Override + @Deprecated public synchronized void updateMsgAuditSdk(long sdk, int expiresInSeconds) { // 如果有旧的SDK且不同于新的SDK,需要销毁旧的SDK if (this.msgAuditSdk > 0 && this.msgAuditSdk != sdk) { @@ -525,11 +537,13 @@ public synchronized void updateMsgAuditSdk(long sdk, int expiresInSeconds) { } @Override + @Deprecated public void expireMsgAuditSdk() { this.msgAuditSdkExpiresTime = 0; } @Override + @Deprecated public synchronized int incrementMsgAuditSdkRefCount(long sdk) { if (this.msgAuditSdk == sdk && sdk > 0) { return ++this.msgAuditSdkRefCount; @@ -538,6 +552,7 @@ public synchronized int incrementMsgAuditSdkRefCount(long sdk) { } @Override + @Deprecated public synchronized int decrementMsgAuditSdkRefCount(long sdk) { if (this.msgAuditSdk == sdk && this.msgAuditSdkRefCount > 0) { int newCount = --this.msgAuditSdkRefCount; @@ -554,6 +569,7 @@ public synchronized int decrementMsgAuditSdkRefCount(long sdk) { } @Override + @Deprecated public synchronized int getMsgAuditSdkRefCount(long sdk) { if (this.msgAuditSdk == sdk && sdk > 0) { return this.msgAuditSdkRefCount; @@ -562,6 +578,7 @@ public synchronized int getMsgAuditSdkRefCount(long sdk) { } @Override + @Deprecated public synchronized long acquireMsgAuditSdk() { // 检查SDK是否有效(已初始化且未过期) if (this.msgAuditSdk > 0 && !isMsgAuditSdkExpired()) { @@ -572,6 +589,7 @@ public synchronized long acquireMsgAuditSdk() { } @Override + @Deprecated public synchronized void releaseMsgAuditSdk(long sdk) { if (this.msgAuditSdk == sdk && this.msgAuditSdkRefCount > 0) { int newCount = --this.msgAuditSdkRefCount; diff --git a/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/WxCpMsgAuditTest.java b/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/WxCpMsgAuditTest.java index a1ea40f3f..7a7fbd137 100644 --- a/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/WxCpMsgAuditTest.java +++ b/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/WxCpMsgAuditTest.java @@ -756,80 +756,79 @@ public void testGetMediaFile() throws Exception { /** * 测试新的安全API方法(推荐使用) - * 这些方法不需要手动管理SDK生命周期,更加安全 + * 这些方法不需要手动管理SDK生命周期,SDK由框架 ThreadLocal 模式统一管理。 + * Finance.DestroySdk() 不会随线程结束自动执行,无论线程池还是独立线程, + * 均应在 finally 块中调用 closeThreadLocalSdk() 以释放 native 资源。 */ @Test public void testNewSafeApi() throws Exception { WxCpMsgAuditService msgAuditService = cpService.getMsgAuditService(); - - // 测试新的getChatRecords方法 - 不暴露SDK - List chatRecords = msgAuditService.getChatRecords(0L, 10L, null, null, 1000L); - log.info("获取到 {} 条聊天记录", chatRecords.size()); - - for (WxCpChatDatas.WxCpChatData chatData : chatRecords) { - // 测试新的getDecryptChatData方法 - 不需要传入SDK - WxCpChatModel decryptData = msgAuditService.getDecryptChatData(chatData, 2); - log.info("解密数据:{}", decryptData.toJson()); - - // 测试新的getChatRecordPlainText方法 - 不需要传入SDK - String plainText = msgAuditService.getChatRecordPlainText(chatData, 2); - log.info("明文数据:{}", plainText); - - // 如果是媒体消息,测试新的downloadMediaFile方法 - String msgType = decryptData.getMsgType(); - if ("image".equals(msgType) || "voice".equals(msgType) || "video".equals(msgType) || "file".equals(msgType)) { - String suffix = ""; - String md5Sum = ""; - String sdkFileId = ""; - - switch (msgType) { - case "image": - suffix = ".jpg"; - md5Sum = decryptData.getImage().getMd5Sum(); - sdkFileId = decryptData.getImage().getSdkFileId(); - break; - case "voice": - suffix = ".amr"; - md5Sum = decryptData.getVoice().getMd5Sum(); - sdkFileId = decryptData.getVoice().getSdkFileId(); - break; - case "video": - suffix = ".mp4"; - md5Sum = decryptData.getVideo().getMd5Sum(); - sdkFileId = decryptData.getVideo().getSdkFileId(); - break; - case "file": - md5Sum = decryptData.getFile().getMd5Sum(); - suffix = "." + decryptData.getFile().getFileExt(); - sdkFileId = decryptData.getFile().getSdkFileId(); - break; - default: - // 未知消息类型,跳过处理 - continue; - } - - // 测试新的downloadMediaFile方法 - 不需要传入SDK - String path = Thread.currentThread().getContextClassLoader().getResource("").getPath(); - String targetPath = path + "testfile-new/" + md5Sum + suffix; - File file = new File(targetPath); - - // 确保父目录存在 - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - - // 删除已存在的文件 - if (file.exists()) { - file.delete(); + + try { + // 测试新的getChatRecords方法 - 不暴露SDK + List chatRecords = msgAuditService.getChatRecords(0L, 10L, null, null, 1000L); + log.info("获取到 {} 条聊天记录", chatRecords.size()); + + for (WxCpChatDatas.WxCpChatData chatData : chatRecords) { + // 测试新的getDecryptChatData方法 - 不需要传入SDK + WxCpChatModel decryptData = msgAuditService.getDecryptChatData(chatData, 2); + log.info("解密数据:{}", decryptData.toJson()); + + // 测试新的getChatRecordPlainText方法 - 不需要传入SDK + String plainText = msgAuditService.getChatRecordPlainText(chatData, 2); + log.info("明文数据:{}", plainText); + + // 如果是媒体消息,测试新的downloadMediaFile方法 + String msgType = decryptData.getMsgType(); + if ("image".equals(msgType) || "voice".equals(msgType) || "video".equals(msgType) || "file".equals(msgType)) { + String suffix = ""; + String md5Sum = ""; + String sdkFileId = ""; + + switch (msgType) { + case "image": + suffix = ".jpg"; + md5Sum = decryptData.getImage().getMd5Sum(); + sdkFileId = decryptData.getImage().getSdkFileId(); + break; + case "voice": + suffix = ".amr"; + md5Sum = decryptData.getVoice().getMd5Sum(); + sdkFileId = decryptData.getVoice().getSdkFileId(); + break; + case "video": + suffix = ".mp4"; + md5Sum = decryptData.getVideo().getMd5Sum(); + sdkFileId = decryptData.getVideo().getSdkFileId(); + break; + case "file": + md5Sum = decryptData.getFile().getMd5Sum(); + suffix = "." + decryptData.getFile().getFileExt(); + sdkFileId = decryptData.getFile().getSdkFileId(); + break; + default: + continue; + } + + // 测试新的downloadMediaFile方法 - 不需要传入SDK + String path = Thread.currentThread().getContextClassLoader().getResource("").getPath(); + String targetPath = path + "testfile-new/" + md5Sum + suffix; + File file = new File(targetPath); + if (!file.getParentFile().exists()) { + file.getParentFile().mkdirs(); + } + if (file.exists()) { + file.delete(); + } + + msgAuditService.downloadMediaFile(sdkFileId, null, null, 1000L, targetPath); + log.info("媒体文件下载成功:{}", targetPath); } - - // 使用新的API下载媒体文件 - msgAuditService.downloadMediaFile(sdkFileId, null, null, 1000L, targetPath); - log.info("媒体文件下载成功:{}", targetPath); } + } finally { + // 必须显式调用:Finance.DestroySdk() 不会自动执行,不调用将导致 native 资源泄漏 + msgAuditService.closeThreadLocalSdk(); } - - // 注意:使用新API无需手动调用 Finance.DestroySdk(),SDK由框架自动管理 } // 测试Uint64类型