diff --git a/.gitignore b/.gitignore index 4bfa550..dd18300 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea/ *.iml target/ +logs/ diff --git a/README.md b/README.md index ee935b4..f03629e 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # DataRangers ## 项目背景 -datarangers-sdk-java是 [DataFinder](https://www.volcengine.com/product/datafinder) 的用户行为采集服务端SDK。 +datarangers-sdk-java 是 [DataFinder](https://www.volcengine.com/product/datafinder) 的用户行为采集服务端SDK。 服务端埋点支持在客户的服务端进行埋点采集和上报,作为客户端埋点的补充或替代,其支持的典型场景包括: 1. 客户端埋点+服务端埋点组合:该场景下,服务端埋点一般用来弥补客户端埋点覆盖不到的部分数据,是目前最常见的使用场景。 @@ -13,12 +13,10 @@ datarangers-sdk-java是 [DataFinder](https://www.volcengine.com/product/datafind com.datarangers datarangers-sdk-core - 1.5.3-release + 1.5.6-release ``` -version是sdk的版本号,当前最新的版本为1.5.3-release。 - 火山引擎仓库地址: ```xml @@ -30,324 +28,16 @@ version是sdk的版本号,当前最新的版本为1.5.3-release。 ``` -### 2. 配置SDK -DataRangers SDK需要进行一定的参数配置才能够使用,具体需要配置的参数为: -* domain:datarangers的域名或者ip,支持http和https,例如为 http://www.datarangers.com,在私有化环境中,需要修改为对应的sdk上报域名或者使用DataRangers服务器的ip地址。在saas环境中需要修改成对应的域名: - * 中国区:https://mcs.ctobsnssdk.com - * sg(新加坡): https://mcs.tobsnssdk.com - * va(美东): https://mcs.itobsnssdk.com -* save:bool型变量,表示是否保存到文件: - * true:保存到文件,但是需要配置LogAgent完成事件上报功能,需要额外定义: - * eventSaveName:保存日志的文件名,需要保证文件的写权限。 - * eventSavePath:保存日志的文件路径,需要保证写权限和创建文件的权限。 - * eventSaveMaxFileSize:表示需要保存的日志文件的最大文件大小,单位为MB。 - * eventFilePaths:表示需要保存的日志文件的位置,为一个字符串数组,数组中的每一个值都表示一个路径,用户将日志文件写到不同的文件夹下,可以配合多个LogAgent实例使用。注意:如果定义了该数组,则eventSavePath不会生效。 - * false:使用http模式进行异步上报: - * httpTimeout:Http的超时时间,单位为ms。 - * headers:Map类型,Http请求的Header中的字段,必填为Host ,Host在DataRangers安装中进行定义 -* mode: 枚举值,支持kafka,http,file。建议使用新的该配置。当mode和save同时存在的时候,以mode为准 - * http 等同于save=false - * file 等同于save=true - * kafka,支持直接通过kafka进行上报,当使用该模式的时候,需要配置kafka的上报地址: - * bootstrapServers: kafka的地址 - * properties: 是一个map,需要配置的其他的kafka properties。kafkaProducer的参数参考:https://kafka.apache.org/0102/documentation.html#producerconfigs - -如果您使用了Spring框架,则可以参考的配置如下: -```xml - - - - - - - - - - - - - - - - - - - - - - - - - -``` - -如果您使用SpringBoot框架,我们提供了一个封装完成的的starter包,您可以在pom中通过如下方式引入: -```xml - - com.datarangers - datarangers-sdk-starter - 1.5.3-release - -``` - -并在properties文件中对sdk进行配置 -```properties -# 使能sdk功能,为false就disable sdk功能 -datarangers.sdk.enable=true - -# privatization 表示是私有化环境, saas表示是saas环境,默认是私有化环境。sdk会根据配置的datarangers.sdk.domain自动识别是否是saas环境,该配置可选 -# datarangers.sdk.env=privatization -# rangers的ip或域名 -datarangers.sdk.domain=http://domain - -# datarangers.sdk.headers为http请求中headers字段内容,在私有化环境中必须要添加Host,而在saas环境中 不能配置Host,其他如果需要设置的可以选填 -# Host的配置在安装部署的那台机器上,查看/home/datarangers/DataRangersDeploy/conf_rangers.yml中配置项sdk.report.host -datarangers.sdk.headers.Host=host - -# 如果在saas环境中,需要配置appkey -# datarangers.sdk.appKeys.${appId}=xxx - -# 如果是在saas环境中,需要配置openapi, 私有化环境中可以不配置 -# openapi的domain, 国内: https://analytics.volcengineapi.com,国际是: https://datarangers.com -# datarangers.sdk.openapiConfig.domain=xxx +### 2. 使用方式 +参考官方文档 [DataFinder](https://www.volcengine.com/docs/6285/75430) -# openapi的ak, sk -# datarangers.sdk.openapiConfig.ak=xxx -# datarangers.sdk.openapiConfig.sk=xxx - - -# 是否保存到本地,如果需要配合logagent使用需要将其定义为true -datarangers.sdk.save=true -# 异步方式的发送线程数量,如果为logagent模式请设置为1 -datarangers.sdk.threadCount=4 -# 异步方式队列长度 -datarangers.sdk.queueSize=102400 -# 保存日志文件路径 -datarangers.sdk.eventSavePath=logs/ -# 保存日志文件名 -datarangers.sdk.eventSaveName=datarangers -# 最多保存的单个日志的大小,单位MB -datarangers.sdk.eventSaveMaxFileSize=256 - -# client是否需要进行ssl证书认证,默认为false,false表示需要进行证书认证,这也是jdk自身的默认标准行为。如果访问https, 需要把证书导入到证书库里面,默认使用的是jdk的证书库,建议客户使用这种方式;如果不想导入的话,可以设置trustDisable为true,sdk会通过设置一个自定义的trustManager跳过认证 -# datarangers.sdk.httpConfig.trustDisable=false - -# 自定义证书路径和密码,false表示使用jdk自身的默认路径 -# datarangers.sdk.httpConfig.customKeyTrustEnable=false - -# 配置证书 -# datarangers.sdk.httpConfig.keyMaterialPath=xxx -# datarangers.sdk.httpConfig.keyPassword=xxx -# datarangers.sdk.httpConfig.storePassword=xxx -# datarangers.sdk.httpConfig.trustMaterialPath=xxx - -# self for selfTrustStrategy, default is all -# datarangers.sdk.httpConfig.trustStrategy=xxx - -# http 超时配置 -# http request timeout, 单位是毫秒 -# datarangers.sdk.httpConfig.requestTimeout=10000 -# http connect timeout, 单位是毫秒 -# datarangers.sdk.httpConfig.connectTimeout=10000 -# http socket timeout, 单位是毫秒 -# datarangers.sdk.httpConfig.socketTimeout=20000 -# http keep alive time, 单位是秒 -# datarangers.sdk.httpConfig.keepAliveTimeout=180 - -# http 连接配置 -# 连接池最大连接数 -# datarangers.sdk.httpConfig.maxTotal=1000 -# 每一个 host 的最大连接数 -# datarangers.sdk.httpConfig.maxPerRoute=100 - -# kafka 配置 -# 设置模式为kafka -# datarangers.sdk.mode=kafka - -# 配置发送的kafka topic,没有配置时,使用默认sdk_origin_event, -# datarangers.sdk.kafka.topic=sdk_origin_event - -# 配置发送的地址,ip需要替换成真实的ip -# datarangers.sdk.kafka.bootstrapServers={ip1}:9192,{ip2}:9192 - -# 如果有需要,配置其他的属性, 形式为:datarangers.sdk.kafka.properties.${key}=${value}, 比如配置重试次数。 -# kafkaProducer的参数参考:https://kafka.apache.org/0102/documentation.html#producerconfigs -# 重试次数 -# datarangers.sdk.kafka.properties.retries=3 -``` +### 3. Demo +参考 [datarangers-sdk-example](https://github.com/volcengine/datarangers-sdk-java/tree/main/datarangers-sdk-example) 代码样例 -### 3. 使用SDK -使用时需要先注入Bean,Bean有三种类型,如下: -```java -// App -@Resource(name = "appEventCollector") -private EventCollector appEventCollector; -// Web -@Resource(name = "webEventCollector") -private EventCollector webEventCollector; -// 小程序 -@Resource(name = "mpEventCollector") -private EventCollector mpEventCollector; -``` - -如果您已经注入完成了,则可以调用bean进行事件发送。发送的接口为: -```java -/** - * 功能描述: 异步发送事件 - * - * @param appId 应用id - * @param custom 用户自定义公共参数 - * @param eventName 事件名称 - * @param eventParams 事件参数 - * @param userUniqueId 用户uuid - * @return: void - * @date: 2020/8/26 12:24 - */ -void sendEvent(String userUniqueId, int appId, Map custom, String eventName, Map eventParams); - -/** - * 功能描述: 批量发送事件 - * - * @param header 事件的公共属性,可以通过调用HeaderV3.Builder().build()构建一个header - * @param events 事件数组 一般不推荐自己构建事件数组,我们推荐使用EventsBuilder这个类对多事件进行构造,并调用build方法生成事件数组 - * @return: void - * @date: 2020/12/25 15:57 - */ -void sendEvents(Header header, List events); - -/** - * 功能描述: 发送单条事件 - * - * @param header 事件的公共属性,可以通过调用HeaderV3.Builder().build()构建一个header - * @param eventName 事件名 - * @param eventParams 事件参数 - * @return: void - * @date: 2020/9/28 22:00 - */ -void sendEvent(Header header, String eventName, Map eventParams); - -/** - * 功能描述: 批量发送事件 - * - * @param header 事件的公共属性,可以通过调用HeaderV3.Builder().build()构建一个header - * @param eventName 事件名数组,需要与eventParams数组长度相同 - * @return: void - * @date: 2020/12/25 15:59 - */ -void sendEvent(Header header, List eventName, List> eventParams); - -/** - * 功能描述: 对userUniqueId的用户进行profile属性设置 - * - * @param appId app id - * @param userUniqueId 用户id - * @param eventParams 需要设置的用户属性 - * @return: void - * @date: 2020/12/23 10:43 - */ -void profileSet(String userUniqueId, int appId, Map eventParams); - -void profileSetOnce(String userUniqueId, int appId, Map eventParams); - -void profileIncrement(String userUniqueId, int appId, Map eventParams); - -void profileAppend(String userUniqueId, int appId, Map eventParams); - -/** - * 功能描述: 删除用户的属性 - * @param appId app id - * @param userUniqueId uuid - * @param params 需要删除的用户属性名 - * @return: void - * @date: 2020/12/25 16:11 - */ -void profileUnset(String userUniqueId, int appId, List params); - -/** - * 功能描述: 对业务对象进行设置 - * - * @param appId app id - * @param name 业务对象的名称 - * @param items 业务对象的类,需要继承Items类,注意 - * @return: void - * @date: 2020/12/23 10:47 - */ -void itemSet(int appId, String name, List items); - -/** - * 功能描述: 删除item的属性 - * @param appId - * @param id - * @param name - * @param params 需要删除的item属性 - * @return: void - * @date: 2020/12/25 16:13 - */ -void itemUnset(int appId, String id, String name, List params); -``` - -## 使用示例 -1. 发送普通事件 -```java -@Resource(name = "appEventCollector") -private EventCollector appEventCollector; - -appEventCollector.sendEvent("uuid2", 10000000, null, "test_event_java_sdk", new HashMap() {{ - put("date_time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))); - put("current_time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss"))); -}}); - -``` - -2. 设置用户属性 -```java -eventCollector.profileSet("uuid-1", 10000028, new HashMap() {{ - put("profile_1", "param_1"); - put("profile_2", "param_2"); - put("profile_3", "param_3"); - put("profile_4", "param_4"); -}}); -``` - -3. 设置Item属性 -```java -List items = new ArrayList<>(); -items.add(new BookItems("1000", "book").setName("Java").setPrice(100).setPublishDate(LocalDate.now()).setAuthors(author).setCategory("computer")); -items.add(new BookItems("1002", "book").setName("PHP").setPrice(100).setPublishDate(LocalDate.now()).setAuthors(author).setCategory("computer")); -eventCollector.itemSet(10000028, "book", items); -``` - -4. 发送携带item的事件 -```java -List items = new ArrayList<>(); -items.add(new BookItems("1000", "book")); -items.add(new BookItems("1002", "book")); -items.add(new PhoneItems("1002", "phone")); -eventCollector.sendEvent("user-001", 10000028, null, "set_items", new HashMap() {{ - put("param1", "params"); - put("param2", items.get(0)); - put("param3", items.get(1)); - put("param4", items.get(2)); -}}); -``` - -5. 使用header上报事件 -```java -// 可以设置userUniqueId和deviceId等,具体字段可以查看Header类 -Map custom = new HashMap(); -Map eventParams = new HashMap(); -Header header = new HeaderV3.Builder() - .setCustom(custom) - .setAppId(10000000) - .setUserUniqueId("uuid-1") - .setDeviceId(1231232131313123L) - .build(); - -appEventCollector.sendEvent(header, "test_event_java_sdk_header", eventParams); -``` ## 注意事项 * 当前sdk版本没有主动清理日志的功能,需要手动清理日志 +* 上报事件,需要注意下事件发生时间 ## License diff --git a/SDKTest.iml b/SDKTest.iml deleted file mode 100644 index 78b2cc5..0000000 --- a/SDKTest.iml +++ /dev/null @@ -1,2 +0,0 @@ - - \ No newline at end of file diff --git a/datarangers-sdk-core/pom.xml b/datarangers-sdk-core/pom.xml new file mode 100644 index 0000000..3d035e1 --- /dev/null +++ b/datarangers-sdk-core/pom.xml @@ -0,0 +1,94 @@ + + + + datarangers-sdk + com.datarangers + 1.5.6-release + + 4.0.0 + jar + datarangers-server-sdk-core + A SDK for datarangers user + https://github.com/volcengine/datarangers-sdk-java + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + datarangers-opensource + datarangers-opensource@bytedance.com + DataFinder + https://www.volcengine.com/product/datafinder + + + com.datarangers + datarangers-sdk-core + + + + + org.apache.httpcomponents.client5 + httpclient5 + 5.0.1 + + + com.fasterxml.jackson.core + jackson-core + 2.11.4 + + + com.fasterxml.jackson.core + jackson-annotations + 2.11.4 + + + com.fasterxml.jackson.core + jackson-databind + 2.11.4 + + + org.apache.kafka + kafka-clients + 0.10.2.1 + + + junit + junit + 4.12 + compile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + UTF-8 + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + + + \ No newline at end of file diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java index 6fd53ae..c8f96c6 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java @@ -43,6 +43,10 @@ public List consume(int waitTimeMs) throws InterruptedException { return handleMessage(messageQueue.poll(waitTimeMs)); } + public List consume(int size, int waitTimeMs) throws InterruptedException { + return handleMessage(messageQueue.poll(size, waitTimeMs)); + } + public int size() { return messageQueue.size(); } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java index c3eed78..493c5c2 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java @@ -34,7 +34,7 @@ public void run() { FileOutputStream stream = null; try { stream = new FileOutputStream(output, true); - Map status = new HashMap() {{ + Map status = new HashMap(2) {{ put("history", CollectorContainer.SEND_HISTORY); put("queue_length", Collector.collectorContainer.size()); }}; diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java index 8edf846..bbae533 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java @@ -23,7 +23,17 @@ public interface CollectorQueue { * @date: 2021/2/7 15:51 */ List take() throws InterruptedException; + List poll(int waitTimeMs) throws InterruptedException; + + /** + * 每次等待waitTimeMs,获取数据最多size的数据 + * @param size + * @param waitTimeMs + * @return + * @throws InterruptedException + */ + List poll(int size, int waitTimeMs) throws InterruptedException; /** * 功能描述: 发送一个Message到队列中 * diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java index 6d1f198..8210d27 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java @@ -7,131 +7,149 @@ package com.datarangers.asynccollector; import com.datarangers.config.DataRangersSDKConfigProperties; -import com.datarangers.config.EventConfig; import com.datarangers.config.RangersJSONConfig; +import com.datarangers.config.SdkMode; import com.datarangers.logger.RangersLoggerWriterPool; import com.datarangers.message.AppMessage; import com.datarangers.message.Message; import com.datarangers.sender.MessageSenderFactory; - -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class Consumer implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(Consumer.class); - private static RangersLoggerWriterPool pool; - private CollectorContainer collectorContainer; - private DataRangersSDKConfigProperties sdkConfigProperties; - - public Consumer(CollectorContainer collectorContainer, - DataRangersSDKConfigProperties sdkConfigProperties) { - this.collectorContainer = collectorContainer; - this.sdkConfigProperties = sdkConfigProperties; - } - - public static void setWriterPool(final List targetPrefixes, String targetNames, - int maxSize) { - if (pool == null) { - synchronized (Consumer.class) { + private static final Logger logger = LoggerFactory.getLogger(Consumer.class); + private static RangersLoggerWriterPool pool; + private CollectorContainer collectorContainer; + private DataRangersSDKConfigProperties sdkConfigProperties; + + public Consumer(CollectorContainer collectorContainer, + DataRangersSDKConfigProperties sdkConfigProperties) { + this.collectorContainer = collectorContainer; + this.sdkConfigProperties = sdkConfigProperties; + } + + public static void setWriterPool(final List targetPrefixes, String targetNames, + int maxSize) { if (pool == null) { - pool = RangersLoggerWriterPool.getInstance(targetPrefixes, targetNames, maxSize); + synchronized (Consumer.class) { + if (pool == null) { + pool = RangersLoggerWriterPool.getInstance(targetPrefixes, targetNames, maxSize); + } + } } - } } - } - - private void send() throws Exception { - while (true) { - try { - List messages = collectorContainer.consume(); - if (messages != null) { - messages.forEach(message -> { - doSend(message); - }); + + private void send() throws Exception { + while (true) { + try { + List messages = collectorContainer.consume(); + if (messages != null) { + messages.forEach(message -> { + doSend(message); + }); + } + } catch (Throwable e) { + e.printStackTrace(); + logger.error("consumer send error", e); + } } - } catch (Throwable e) { - e.printStackTrace(); - logger.error("consumer send error", e); - } } - } - - private void write() throws Exception { - while (true) { - try { - List messages = collectorContainer.consume(); - if (messages != null) { - messages.forEach(message -> { - doWrite(message); - }); + + private void sendBatch() throws Exception { + while (true) { + try { + List messages = collectorContainer.consume(sdkConfigProperties.getBatchSize(), + sdkConfigProperties.getWaitTimeMs()); + if (messages != null && messages.size() > 0) { + MessageSenderFactory.getMessageSender(messages.get(0)) + .sendBatch(messages, this.sdkConfigProperties); + } + } catch (Throwable e) { + e.printStackTrace(); + logger.error("consumer send error", e); + } } - } catch (Throwable e) { - logger.error("consumer write error", e); - } + } + private void write() throws Exception { + while (true) { + try { + List messages = collectorContainer.consume(); + if (messages != null) { + messages.forEach(message -> { + doWrite(message); + }); + } + } catch (Throwable e) { + logger.error("consumer write error", e); + } + + } } - } - - @Override - public void run() { - try { - if (EventConfig.saveFlag) { - write(); - } else { - send(); - } - } catch (Exception e) { - e.printStackTrace(); - logger.error("consumer run error", e); + + @Override + public void run() { + try { + if (SdkMode.FILE == sdkConfigProperties.getMode()) { + write(); + } else if (sdkConfigProperties.isSendBatch()) { + sendBatch(); + } else { + send(); + } + } catch (Exception e) { + e.printStackTrace(); + logger.error("consumer run error", e); + } } - } - - public void flush() { - try { - System.out.println("flush message start"); - logger.info("flush message start"); - int count = 0; - if (collectorContainer.getMessageQueue() != null) { - Message message = collectorContainer.getMessageQueue().poll(); - while (message != null) { - count++; - message = collectorContainer.handleMessage(message); - if (EventConfig.saveFlag) { + + public void flush() { + try { + System.out.println("flush message start"); + logger.info("flush message start"); + int count = 0; + if (collectorContainer.getMessageQueue() != null) { + Message message = collectorContainer.getMessageQueue().poll(); + while (message != null) { + count++; + message = collectorContainer.handleMessage(message); + if (SdkMode.FILE == sdkConfigProperties.getMode()) { + doWrite(message); + } else { + doSend(message); + } + message = collectorContainer.getMessageQueue().poll(); + } + } + + logger.info("flush message success. size: {}", count); + System.out.println("flush message success. size: " + count); + } catch (Exception e) { + e.printStackTrace(); + logger.error("flush message error", e); + } + } + + public void flush(Message message) { + message = collectorContainer.handleMessage(message); + if (SdkMode.FILE == sdkConfigProperties.getMode()) { doWrite(message); - } else { + } else { doSend(message); - } - message = collectorContainer.getMessageQueue().poll(); } - } + } - logger.info("flush message success. size: {}", count); - System.out.println("flush message success. size: " + count); - } catch (Exception e) { - e.printStackTrace(); - logger.error("flush message error", e); + private void doSend(Message message) { + MessageSenderFactory.getMessageSender(message) + .send(message, this.sdkConfigProperties); } - } - - public void flush(Message message) { - message = collectorContainer.handleMessage(message); - if (EventConfig.saveFlag) { - doWrite(message); - } else { - doSend(message); + + private void doWrite(Message message) { + AppMessage appMessage = message.getAppMessage(); + pool.getWriter(appMessage.getUserUniqueId()) + .write(RangersJSONConfig.getInstance().toJson(appMessage) + "\n"); } - } - - private void doSend(Message message) { - MessageSenderFactory.getMessageSender(message) - .send(message, this.sdkConfigProperties); - } - - private void doWrite(Message message) { - AppMessage appMessage = message.getAppMessage(); - pool.getWriter(appMessage.getUserUniqueId()) - .write(RangersJSONConfig.getInstance().toJson(appMessage) + "\n"); - } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java index deced6c..39793a8 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java @@ -2,6 +2,7 @@ import com.datarangers.message.Message; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -57,6 +58,26 @@ public List poll(int waitTimeMs) throws InterruptedException { return null; } + @Override + public List poll(int size, int waitTimeMs) throws InterruptedException { + List messages = new ArrayList<>(); + Message msg = queue.poll(waitTimeMs, TimeUnit.MILLISECONDS); + if(msg != null){ + messages.add(msg); + } + + // 只有 + while(messages.size() < size){ + msg = queue.poll(waitTimeMs, TimeUnit.MILLISECONDS); + if(msg == null){ + // 退出循环 + break; + } + messages.add(msg); + } + return messages; + } + @Override public void put(Message t) throws InterruptedException { queue.put(t); diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java index 3cf9e12..719d44a 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java @@ -6,7 +6,6 @@ */ package com.datarangers.collector; -import com.datarangers.asynccollector.Consumer; import com.datarangers.config.Constants; import com.datarangers.config.DataRangersSDKConfigProperties; import com.datarangers.config.RangersJSONConfig; @@ -16,8 +15,8 @@ import com.datarangers.message.MessageType; import com.datarangers.profile.ItemMethod; import com.datarangers.profile.ProfileMethod; - import com.datarangers.sender.Callback; + import java.io.IOException; import java.util.*; @@ -32,15 +31,6 @@ public AppEventCollector(String appType, DataRangersSDKConfigProperties properti public AppEventCollector(String appType, DataRangersSDKConfigProperties properties, Callback cb) { super(appType, properties, cb); - if (properties != null) { - properties.init(); - - // 设置同步发送的consumer,队列满的时候使用 - setConsumer(new Consumer(Collector.collectorContainer, this.properties)); - - } else { - System.out.println(Constants.INIT_ERROR); - } } @Override @@ -113,6 +103,22 @@ private void profile(String userUniqueId, int appId, ProfileMethod method, sendEvent(header, event, MessageType.PROFILE); } + private void profile(Header header, ProfileMethod method, + Map profiles) { + if (header == null || header.getAppId() == null) { + logger.error("header is null or appId is empty"); + return; + } + if (profiles == null) { + logger.error("userUniqueId={}, app_id={}, device_id={} params are null.", + header.getUserUniqueId(), header.getAppId(), header.getDeviceId()); + return; + } + Event event = new EventV3().setEvent(method.toString()).setParams(profiles) + .setUserId(header.getUserUniqueId()); + sendEvent(header, event, MessageType.PROFILE); + } + @Override public void profileSet(String userUniqueId, int appId, Map eventParams) { profile(userUniqueId, appId, ProfileMethod.SET, eventParams); @@ -135,6 +141,33 @@ public void profileUnset(String userUniqueId, int appId, List params) { profile(userUniqueId, appId, ProfileMethod.UNSET, eventParams); } + @Override + public void profileSet(Header header, Map profiles) { + profile(header, ProfileMethod.SET, profiles); + } + + @Override + public void profileSetOnce(Header header, Map profiles) { + profile(header, ProfileMethod.SET_ONCE, profiles); + } + + @Override + public void profileIncrement(Header header, Map profiles) { + profile(header, ProfileMethod.INCREMENT, profiles); + } + + @Override + public void profileAppend(Header header, Map profiles) { + profile(header, ProfileMethod.APPEND, profiles); + } + + @Override + public void profileUnset(Header header, List profiles) { + Map eventParams = new HashMap<>(); + profiles.forEach(p -> eventParams.put(p, "java")); + profile(header, ProfileMethod.UNSET, eventParams); + } + @Override public void profileAppend(String userUniqueId, int appId, Map eventParams) { profile(userUniqueId, appId, ProfileMethod.APPEND, eventParams); @@ -227,6 +260,7 @@ private void sendEvents(Header header, List events, MessageType messageTy appMessage.setAppType(getAppType()); appMessage.setHeader(header); appMessage.addEvents(events); + appMessage.setTraceId(UUID.randomUUID().toString()); message.setAppMessage(appMessage); send(message); diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java index fc69e3a..b6649e2 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java @@ -6,185 +6,354 @@ */ package com.datarangers.collector; -import com.datarangers.asynccollector.CollectorContainer; -import com.datarangers.asynccollector.Consumer; +import com.datarangers.asynccollector.*; import com.datarangers.config.*; +import com.datarangers.logger.RangersFileCleaner; import com.datarangers.message.AppMessage; import com.datarangers.message.Message; import com.datarangers.message.MessageEnv; import com.datarangers.sender.Callback; import com.datarangers.sender.Callback.FailedData; +import com.datarangers.sender.callback.LoggingCallback; +import com.datarangers.util.HttpUtils; +import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.message.BasicHeader; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * @author taojian */ public abstract class Collector implements EventCollector { - private String appType; - public static final Logger logger = LoggerFactory.getLogger("DatarangersLog"); - public static ExecutorService httpRequestPool = null; - public static ScheduledExecutorService scheduled = null; - public static CollectorContainer collectorContainer; - - private boolean enable; - protected DataRangersSDKConfigProperties properties; - protected Callback callback; - protected Consumer consumer = null; - protected KafkaProducer kafkaProducer; - - public Collector(String appType, DataRangersSDKConfigProperties properties, Callback cb) { - this.appType = appType; - this.enable = properties.isEnable(); - this.properties = properties; - this.callback = cb; - this.properties.setCallback(this.getCallback()); - this.initKafkaProducer(); - } - - private void initKafkaProducer(){ - if(SdkMode.KAFKA != this.properties.getMode()){ - return; - } - // 设置过了就不需要再自己创建 - if(kafkaProducer != null){ - return; - } - kafkaProducer = createProducer(this.properties.getKafka()); - } - - private KafkaProducer createProducer(KafkaConfig kafkaConfig) { - Properties props = new Properties(); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("bootstrap.servers", kafkaConfig.getBootstrapServers()); - Map map = kafkaConfig.getProperties(); - if (map != null && (!map.isEmpty())) { - props.putAll(map); - } - return new KafkaProducer<>(props); - } - public String getAppType() { - return appType; - } - - public Collector setAppType(String appType) { - this.appType = appType; - return this; - } - - - public void send(Message message) { - sendMessage(message); - } - - protected void sendMessage(Message message) { - if (!enable) { - return; - } - message.merge(); - String sendMessage; - - validate(message); - if(kafkaProducer != null){ - // 使用kafka的方式 - sendByKafka(message.getAppMessage()); - return; - } - sendMessage = RangersJSONConfig.getInstance().toJson(message.getAppMessage()); - if (this.properties.isSync()) { - syncSendMessage(message, sendMessage); - } else { - asyncSendMessage(message, sendMessage); - } - } - - private void sendByKafka(AppMessage appMessage) { - // kafka sender,header 添加固定的头 - appMessage.getHeader().setSource(Constants.SDK_SERVER); - String sendMessage = RangersJSONConfig.getInstance().toJson(appMessage); - try { - ProducerRecord producerRecord = new ProducerRecord<>(properties.getKafka().getTopic(), sendMessage); - kafkaProducer.send(producerRecord, new org.apache.kafka.clients.producer.Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { + private String appType; + public static final Logger logger = LoggerFactory.getLogger("DatarangersLog"); + public static ExecutorService executorService = null; + public static ScheduledExecutorService scheduled = null; + public static CollectorContainer collectorContainer; + private boolean enable; + protected DataRangersSDKConfigProperties properties; + protected Callback callback; + protected Consumer consumer = null; + protected static KafkaProducer kafkaProducer; + private static volatile Boolean IS_INIT = false; + + public Collector(String appType, DataRangersSDKConfigProperties properties, Callback cb) { + this.appType = appType; + this.enable = properties.isEnable(); + this.properties = properties; + this.callback = cb; + this.properties.setCallback(this.getCallback()); + this.init(); + } + + public String getAppType() { + return appType; + } + + public void setAppType(String appType) { + this.appType = appType; + } + + public Callback getCallback() { + return callback; + } + + public void setCallback(Callback callback) { + this.callback = callback; + } + + public Consumer getConsumer() { + return consumer; + } + + public void setConsumer(Consumer consumer) { + this.consumer = consumer; + } + + + private void initModeKafka() { + if (SdkMode.KAFKA != this.properties.getMode()) { + return; + } + logger.info("init kafka producer"); + // 设置过了就不需要再自己创建 + if (kafkaProducer != null) { + return; + } + kafkaProducer = createProducer(this.properties.getKafka()); + } + + private KafkaProducer createProducer(KafkaConfig kafkaConfig) { + Properties props = new Properties(); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("bootstrap.servers", kafkaConfig.getBootstrapServers()); + Map map = kafkaConfig.getProperties(); + if (map != null && (!map.isEmpty())) { + props.putAll(map); + } + return new KafkaProducer<>(props); + } + + public void send(Message message) { + sendMessage(message); + } + + private void sendMessage(Message message) { + if (!enable) { + return; + } + message.merge(); + String sendMessage; + + validate(message); + if (kafkaProducer != null) { + // 使用kafka的方式 + sendByKafka(message.getAppMessage()); + return; + } + sendMessage = RangersJSONConfig.getInstance().toJson(message.getAppMessage()); + if (this.properties.isSync()) { + syncSendMessage(message, sendMessage); + } else { + asyncSendMessage(message, sendMessage); + } + } + + private void sendByKafka(AppMessage appMessage) { + // kafka sender,header 添加固定的头 + appMessage.getHeader().setSource(Constants.SDK_SERVER); + String sendMessage = RangersJSONConfig.getInstance().toJson(appMessage); + try { + ProducerRecord producerRecord = new ProducerRecord<>(properties.getKafka().getTopic(), sendMessage); + kafkaProducer.send(producerRecord, new org.apache.kafka.clients.producer.Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); + } + } + }); + } catch (Exception e) { + e.printStackTrace(); logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); - } - } - }); - } catch (Exception e) { - e.printStackTrace(); - logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); - } - } - - private void syncSendMessage(Message message, String sendMessage) { - try { - this.consumer.flush(message); - } catch (Exception e) { - e.printStackTrace(); - logger.error("sync send message error", e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); - } - } - - private void asyncSendMessage(Message message, String sendMessage) { - if (collectorContainer.getMessageQueue() != null) { - try { - collectorContainer.produce(message); - } catch (Exception e) { - e.printStackTrace(); - logger.error("async send message error", e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); - } - } else { - logger.error("getMessageQueue is null"); - getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null")); - } - } - - /** - * message 检查 - */ - private void validate(Message message) { - // 当前只有saas需要校验下appkey - if (this.properties.getMessageEnv() != MessageEnv.SAAS) { - return; - } - - Integer appId = message.getAppMessage().getAppId(); - Map appKeys = this.properties.getAppKeys(); - String appKey = appKeys.get(appId); - if (appKey == null) { - throw new IllegalArgumentException("App key cannot be empty. app_id: " + appId); - } - } - - public Callback getCallback() { - return callback; - } - - public void setCallback(Callback callback) { - this.callback = callback; - } - - public Consumer getConsumer() { - return consumer; - } - - public void setConsumer(Consumer consumer) { - this.consumer = consumer; - } + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); + } + } + + private void syncSendMessage(Message message, String sendMessage) { + try { + this.consumer.flush(message); + } catch (Exception e) { + e.printStackTrace(); + logger.error("sync send message error", e); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); + } + } + + private void asyncSendMessage(Message message, String sendMessage) { + if (collectorContainer.getMessageQueue() != null) { + try { + collectorContainer.produce(message); + } catch (Exception e) { + e.printStackTrace(); + logger.error("async send message error", e); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); + } + } else { + logger.error("getMessageQueue is null"); + getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null", false)); + } + } + + /** + * message 检查 + */ + private void validate(Message message) { + // 当前只有saas需要校验下appkey + if (this.properties.getMessageEnv() != MessageEnv.SAAS) { + return; + } + + Integer appId = message.getAppMessage().getAppId(); + Map appKeys = this.properties.getAppKeys(); + String appKey = appKeys.get(appId); + if (appKey == null) { + throw new IllegalArgumentException("App key cannot be empty. app_id: " + appId); + } + } + + /** + * 配置初始化 + */ + private void init() { + if (!IS_INIT) { + synchronized (Collector.class) { + if (!IS_INIT) { + initLogger(); + initCommon(); + initSdkMode(); + initConsumer(); + initHook(); + IS_INIT = true; + logger.info("sdk config properties: \r\n{}", properties); + System.out.println("sdk config properties: \r\n" + properties); + } + } + } + } + + private void initSdkMode() { + initModeFile(); + initModeHttp(); + initModeKafka(); + } + + /** + * 日志pool初始化 + */ + private void initLogger() { + logger.info("init log writer pool"); + List eventFilePaths = properties.getEventFilePaths(); + String eventSaveName = properties.getEventSaveName(); + int eventSaveMaxFileSize = properties.getEventSaveMaxFileSize(); + String eventSavePath = properties.getEventSavePath(); + + Consumer.setWriterPool(eventFilePaths, eventSaveName, eventSaveMaxFileSize); + if (properties.getCallback() == null) { + properties.setCallback(new LoggingCallback(eventSavePath, "error-" + eventSaveName, + eventSaveMaxFileSize)); + } + } + + /** + * eventConfig,httpclient,EventConfig 初始化 + */ + private void initCommon() { + // 如果客户自定义了queue,则需要替换为客户自定义queue,否则使用默认的队列 + CollectorQueue userQueue = this.properties.getUserQueue(); + if (userQueue == null) { + collectorContainer = new CollectorContainer( + RangersCollectorQueue.getInstance(this.properties.getQueueSize())); + } else { + collectorContainer = new CollectorContainer(userQueue); + } + } + + /** + * 初始化消费者, httpRequestPool, 日志记录和清理任务 + */ + private void initConsumer() { + logger.info("init consumer"); + boolean isSync = this.properties.isSync(); + + // 同步设置 + if (isSync) { + setConsumer(new Consumer(Collector.collectorContainer, this.properties)); + return; + } + + // 异步起多个消费者 + if (executorService == null) { + int threadCount = this.properties.getThreadCount(); + executorService = Executors.newFixedThreadPool(threadCount); + + // 创建consumer + for (int i = 0; i < threadCount; i++) { + //必须全部消费同一个队列 + executorService.execute(new Consumer(collectorContainer, properties)); + } + } + } + + private void initModeHttp() { + if(SdkMode.HTTP != this.properties.getMode()){ + return; + } + HttpConfig httpConfig = properties.getHttpConfig(); + HttpClient httpClient = properties.getCustomHttpClient(); + Callback callback = properties.getCallback(); + int httpTimeOut = properties.getHttpTimeout(); + + //EventConfig配置 + EventConfig.setUrl(properties.getDomain()); + if (httpConfig.getMaxPerRoute() < properties.getThreadCount()) { + httpConfig.setMaxPerRoute(properties.getThreadCount()); + } + if (httpConfig.getMaxTotal() < httpConfig.getMaxPerRoute()) { + httpConfig.setMaxTotal(httpConfig.getMaxPerRoute()); + } + // 老版本配置做兼容 + httpConfig.initTimeOut(httpTimeOut); + //httpclient 初始化 + HttpUtils.createHttpClient(httpConfig, httpClient, callback); + + //EventConfig 初始化 + if (EventConfig.SEND_HEADER == null) { + EventConfig.SEND_HEADER = properties.getHeaders(); + EventConfig.SEND_HEADER.put("User-Agent", "DataRangers Java SDK"); + EventConfig.SEND_HEADER.put("Content-Type", "application/json"); + List
headerList = new ArrayList<>(); + EventConfig.SEND_HEADER + .forEach((key, value) -> headerList.add(new BasicHeader(key, value))); + EventConfig.headers = headerList.toArray(new Header[0]); + } + } + + private void initModeFile() { + if(SdkMode.FILE != this.properties.getMode()){ + return; + } + // thread 设置为1 + this.properties.setThreadCount(1); + + String eventSavePath = this.properties.getEventSavePath(); + List eventFilePaths = properties.getEventFilePaths(); + String eventSaveName = this.properties.getEventSaveName(); + int eventSaveMaxDays = this.properties.getEventSaveMaxDays(); + + //定时记录日志的条数 + scheduled = Executors.newSingleThreadScheduledExecutor(); + + scheduled + .scheduleAtFixedRate(new CollectorCounter(eventSavePath), 0, 2, TimeUnit.MINUTES); + + if (eventSaveMaxDays > 0) { + // 清理日志文件定时任务, 每隔12小时清理一次 + scheduled.scheduleAtFixedRate( + new RangersFileCleaner(eventFilePaths, eventSaveName, eventSaveMaxDays), + 0, 12, TimeUnit.HOURS); + logger.info("Start DataRangers Cleaner/Record Thread"); + } + } + + /** + * jvm关闭时,一些需要做的清理任务 + */ + private void initHook() { + logger.info("init hook"); + Runtime.getRuntime().addShutdownHook(new Thread( + () -> { + if (Collector.executorService != null) { + Collector.executorService.shutdown(); + } + + new Consumer(Collector.collectorContainer, properties).flush(); + })); + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/EventCollector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/EventCollector.java index 3426ddb..135b1c2 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/collector/EventCollector.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/EventCollector.java @@ -115,6 +115,31 @@ public interface EventCollector { */ void profileUnset(String userUniqueId, int appId, List params); + /** + * 功能描述: 对用户进行profile属性设置 + * + * @param header 用户相关信息 + * @param profiles 需要设置的用户属性 + * @return: void + * @date 2020/12/23 10:43 + */ + void profileSet(Header header, Map profiles); + + void profileSetOnce(Header header, Map profiles); + + void profileIncrement(Header header, Map profiles); + + void profileAppend(Header header, Map profiles); + + /** + * 功能描述: 删除用户的属性 + * @param header 用户相关信息 + * @param profiles 需要删除的用户属性名 + * @return: void + * @date 2020/12/25 16:11 + */ + void profileUnset(Header header, List profiles); + /** * 功能描述: 对业务对象进行设置 * diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java index 8dae554..73e222c 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java @@ -11,10 +11,11 @@ import java.util.GregorianCalendar; public class Constants { - public static final String SDK_VERSION = "datarangers_sdk_1.5.3-release"; + public static final String SDK_VERSION = "datarangers_sdk_1.5.6-release"; public static DateTimeFormatter FULL_HOUR = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"); public static DateTimeFormatter FULL_DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static final String APP_LOG_PATH = "/sdk/log"; + public static final String APP_LIST_PATH = "/sdk/list"; public static final String DEFAULT_USER = "__rangers"; public static final String INIT_ERROR = "sdk config must not be null"; diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java index ada2c0b..fd32b7f 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java @@ -7,41 +7,27 @@ package com.datarangers.config; import com.datarangers.asynccollector.*; -import com.datarangers.collector.Collector; -import com.datarangers.logger.RangersFileCleaner; import com.datarangers.message.MessageEnv; import com.datarangers.sender.Callback; -import com.datarangers.sender.callback.LoggingCallback; -import com.datarangers.util.HttpUtils; import org.apache.hc.client5.http.classic.HttpClient; -import org.apache.hc.core5.http.Header; -import org.apache.hc.core5.http.message.BasicHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.ZoneOffset; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; /** * @author hTangle */ public class DataRangersSDKConfigProperties { - - public static final Logger logger = LoggerFactory.getLogger(DataRangersSDKConfigProperties.class); + public boolean enable = true; public Map headers; public String domain; - public int threadPoolCount = 1; - public int maxPoolSize = 8; - public int corePoolSize = 4; + private int threadCount = 20; /** * 该配置过期,请使用httpConfig的配置 */ @Deprecated - public int httpTimeout = 500; + public int httpTimeout = 10000; public String timeZone = "+8"; public ZoneOffset timeOffset = null; @@ -54,32 +40,30 @@ public class DataRangersSDKConfigProperties { */ @Deprecated public boolean save = false; - public int threadCount = 1; - public int queueSize = 10240; - public boolean send = true; - - public boolean isEnable() { - return enable; - } - public void setEnable(boolean enable) { - this.enable = enable; - } + public int queueSize = 10240; - public boolean enable = true; + /** + * batch批量配置 + */ + private boolean sendBatch = false; + private int batchSize = 20; + private int waitTimeMs = 100; + /** + * file模式 相关配置 + */ public String eventSavePath = "logs/"; public List eventFilePaths; public String eventSaveName = "datarangers.log"; - public int eventSaveMaxHistory = 20; public int eventSaveMaxFileSize = 100; - public int eventSaveMaxDays = 5; - public CollectorQueue userQueue; + // 日志清理时间 + public int eventSaveMaxDays = -1; - public boolean hasConsumer = true; - public boolean hasProducer = true; + public CollectorQueue userQueue; + // http相关配置 private HttpConfig httpConfig = new HttpConfig(); private HttpClient customHttpClient; @@ -92,8 +76,8 @@ public void setEnable(boolean enable) { private String env = "privatization"; private List SAAS_DOMAIN_URLS = Arrays.asList( - "https://mcs.ctobsnssdk.com", - "https://mcs.tobsnssdk.com"); + "https://mcs.ctobsnssdk.com", + "https://mcs.tobsnssdk.com"); /** * saas openapi 配置地址 @@ -110,6 +94,38 @@ public void setEnable(boolean enable) { private SdkMode mode; private KafkaConfig kafka; + public boolean isSendBatch() { + return sendBatch; + } + + public void setSendBatch(boolean sendBatch) { + this.sendBatch = sendBatch; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public int getWaitTimeMs() { + return waitTimeMs; + } + + public void setWaitTimeMs(int waitTimeMs) { + this.waitTimeMs = waitTimeMs; + } + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + public KafkaConfig getKafka() { return kafka; } @@ -148,24 +164,6 @@ public void setCallback(Callback callback) { this.callback = callback; } - public boolean isHasConsumer() { - return hasConsumer; - } - - public DataRangersSDKConfigProperties setHasConsumer(boolean hasConsumer) { - this.hasConsumer = hasConsumer; - return this; - } - - public boolean isHasProducer() { - return hasProducer; - } - - public DataRangersSDKConfigProperties setHasProducer(boolean hasProducer) { - this.hasProducer = hasProducer; - return this; - } - public int getHttpTimeout() { return httpTimeout; } @@ -215,15 +213,6 @@ public DataRangersSDKConfigProperties setEventSaveName(String eventSaveName) { return this; } - public int getEventSaveMaxHistory() { - return eventSaveMaxHistory; - } - - public DataRangersSDKConfigProperties setEventSaveMaxHistory(int eventSaveMaxHistory) { - this.eventSaveMaxHistory = eventSaveMaxHistory; - return this; - } - public int getEventSaveMaxFileSize() { return eventSaveMaxFileSize; } @@ -233,23 +222,6 @@ public DataRangersSDKConfigProperties setEventSaveMaxFileSize(int eventSaveMaxFi return this; } - public boolean isSend() { - return send; - } - - public DataRangersSDKConfigProperties setSend(boolean send) { - this.send = send; - return this; - } - - public int getThreadCount() { - return threadCount; - } - - public void setThreadCount(int threadCount) { - this.threadCount = threadCount; - } - public int getQueueSize() { return queueSize; } @@ -258,20 +230,12 @@ public void setQueueSize(int queueSize) { this.queueSize = queueSize; } - public int getMaxPoolSize() { - return maxPoolSize; - } - - public void setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; - } - - public int getCorePoolSize() { - return corePoolSize; + public int getThreadCount() { + return threadCount; } - public void setCorePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; + public void setThreadCount(int threadCount) { + this.threadCount = threadCount; } public Map getHeaders() { @@ -293,14 +257,6 @@ public void setDomain(String domain) { this.domain = domain; } - public int getThreadPoolCount() { - return threadPoolCount; - } - - public void setThreadPoolCount(int threadPoolCount) { - this.threadPoolCount = threadPoolCount; - } - public ZoneOffset getTimeOffset() { return timeOffset; } @@ -328,98 +284,6 @@ public void setTimeZone(String timeZone) { this.timeZone = timeZone; } - public void setLogger() { - Consumer.setWriterPool(getEventFilePaths(), getEventSaveName(), getEventSaveMaxFileSize()); - if (callback == null) { - setCallback(new LoggingCallback(getEventSavePath(), "error-" + getEventSaveName(), - getEventSaveMaxFileSize())); - } - } - - public void setCommon() { - EventConfig.saveFlag = SdkMode.FILE == getMode(); - EventConfig.sendFlag = SdkMode.HTTP == getMode(); - if (EventConfig.sendFlag) { - httpConfig = this.getHttpConfig(); - if (httpConfig.getMaxPerRoute() < this.getThreadCount()) { - httpConfig.setMaxPerRoute(this.getThreadCount()); - } - if (httpConfig.getMaxTotal() < httpConfig.getMaxPerRoute()) { - httpConfig.setMaxTotal(httpConfig.getMaxPerRoute()); - } - httpConfig.initTimeOut(getHttpTimeout()); - HttpUtils - .createHttpClient(this.getHttpConfig(), this.getCustomHttpClient(), this.getCallback()); - if (EventConfig.SEND_HEADER == null) { - EventConfig.SEND_HEADER = getHeaders(); - EventConfig.SEND_HEADER.put("User-Agent", "DataRangers Java SDK"); - EventConfig.SEND_HEADER.put("Content-Type", "application/json"); - List
headerList = new ArrayList<>(); - EventConfig.SEND_HEADER - .forEach((key, value) -> headerList.add(new BasicHeader(key, value))); - EventConfig.headers = headerList.toArray(new Header[0]); - } - } - setConsumer(getThreadCount()); - EventConfig.setUrl(getDomain()); - } - - - public ExecutorService setThreadPool() { - return Executors.newFixedThreadPool(getCorePoolSize()); - } - - public void setConsumer(int threadCount) { - if (EventConfig.saveFlag) { - threadCount = 1; - logger.info("Start LogAgent Mode"); - } else { - logger.info("Start Http Mode"); - } - if (userQueue == null) { - Collector.collectorContainer = new CollectorContainer( - RangersCollectorQueue.getInstance(getQueueSize())); - } else { - //如果客户自定义了queue,则需要替换为客户自定义queue - Collector.collectorContainer = new CollectorContainer(userQueue); - } - if ((!sync) && hasConsumer && Collector.httpRequestPool == null) { - //有消费者才初始化消费者 - Collector.httpRequestPool = setThreadPool(); - for (int i = 0; i < threadCount; i++) {//必须全部消费同一个队列 - Collector.httpRequestPool.execute(new Consumer(Collector.collectorContainer, this)); - } - } - if ((!sync) && hasProducer) { - //有生产者才需要记录 - Collector.scheduled = Executors.newSingleThreadScheduledExecutor(); - Collector.scheduled - .scheduleAtFixedRate(new CollectorCounter(getEventSavePath()), 0, 2, TimeUnit.MINUTES); - if (EventConfig.saveFlag) { - Collector.scheduled.scheduleAtFixedRate( - new RangersFileCleaner(getEventFilePaths(), getEventSaveName(), getEventSaveMaxDays()), - 0, 12, TimeUnit.HOURS); - } - } - logger.info("Start DataRangers Cleaner/Record Thread"); - } - - - public static volatile Boolean IS_INIT = false; - - public void init() { - if (!IS_INIT) { - synchronized (DataRangersSDKConfigProperties.class) { - if (!IS_INIT) { - setLogger(); - setCommon(); - setHook(); - IS_INIT = true; - } - } - } - } - public CollectorQueue getUserQueue() { return userQueue; } @@ -476,17 +340,29 @@ public void setAppKeys(Map appKeys) { this.appKeys = appKeys; } - private void setHook() { - DataRangersSDKConfigProperties properties = this; - Runtime.getRuntime().addShutdownHook(new Thread( - () -> { - if(Collector.httpRequestPool != null){ - Collector.httpRequestPool.shutdown(); - } - - new Consumer(Collector.collectorContainer, properties).flush(); - })); + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(String.format("enable: %s \r\n", enable)); + sb.append(String.format("env: %s \r\n", getMessageEnv())); + sb.append(String.format("sync: %s \r\n", sync)); + sb.append(String.format("sdkMode: %s \r\n", mode)); + sb.append(String.format("domain: %s \r\n", domain)); + sb.append(String.format("headers: %s \r\n", headers)); + sb.append(String.format("threadCount: %s \r\n", threadCount)); + sb.append(String.format("queueSize: %s \r\n", queueSize)); + sb.append(String.format("sendBatch: %s \r\n", sendBatch)); + sb.append(String.format("batchSize: %s \r\n", batchSize)); + sb.append(String.format("waitTimeMs: %s \r\n", waitTimeMs)); + sb.append(String.format("httpConfig: %s \r\n", httpConfig)); + sb.append(String.format("eventSavePath: %s \r\n", eventSavePath)); + sb.append(String.format("eventSaveName: %s \r\n", eventSaveName)); + sb.append(String.format("eventFilePaths: %s \r\n", eventFilePaths)); + sb.append(String.format("eventSaveMaxFileSize: %s \r\n", eventSaveMaxFileSize)); + sb.append(String.format("eventSaveMaxDays: %s \r\n", eventSaveMaxDays)); + sb.append(String.format("openapiConfig: %s \r\n", openapiConfig)); + sb.append(String.format("kafka: %s", kafka)); + return sb.toString(); } - } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java index 7f6e555..6c5dcbe 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java @@ -12,13 +12,13 @@ public class EventConfig { public static String appUrl; + public static String appListUrl; public static Header[] headers; public static Map SEND_HEADER; - public static boolean saveFlag = false; - public static boolean sendFlag = true; public static void setUrl(String url) { setAppUrl(url + Constants.APP_LOG_PATH); + setAppListUrl(url + Constants.APP_LIST_PATH); } public static String getAppUrl() { @@ -28,4 +28,12 @@ public static String getAppUrl() { public static void setAppUrl(String appUrl) { EventConfig.appUrl = appUrl; } + + public static String getAppListUrl() { + return EventConfig.appListUrl; + } + + public static void setAppListUrl(String appUrl) { + EventConfig.appListUrl = appUrl; + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java index d69348d..67cf647 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java @@ -7,6 +7,8 @@ package com.datarangers.config; +import com.datarangers.util.Tools; + /** * @author bytedance * @date 2020/12/2 20:02 @@ -18,10 +20,14 @@ public class HttpConfig { private Integer maxTotal = 1000; private Integer maxPerRoute = 100; - private Integer requestTimeout; - private Integer connectTimeout; - private Integer socketTimeout; - private Integer keepAliveTimeout; + private Integer requestTimeout = 10 * 1000; + private Integer connectTimeout = 10 * 1000; + private Integer socketTimeout = 20 * 1000; + + /** + * 单位是s,默认1分钟 + */ + private Integer keepAliveTimeout = 30; /** * 是否需要自定义配置key, store 路径和密码 @@ -134,7 +140,7 @@ public void initTimeOut(int timeBase) { if(keepAliveTimeout == null) { // 3分钟 - keepAliveTimeout = 3 * 60; + keepAliveTimeout = 30; } } @@ -161,4 +167,24 @@ public Integer getKeepAliveTimeout() { public void setKeepAliveTimeout(Integer keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + + sb.append(String.format("\"requestTimeout\": %s,", requestTimeout)); + sb.append(String.format("\"connectTimeout\": %s,", connectTimeout)); + sb.append(String.format("\"socketTimeout\": %s,", socketTimeout)); + sb.append(String.format("\"keepAliveTimeout\": %s,", keepAliveTimeout)); + sb.append(String.format("\"maxTotal\": %s,", maxTotal)); + sb.append(String.format("\"maxPerRoute\": %s,", maxPerRoute)); + sb.append(String.format("\"keyMaterialPath\": \"%s\",", keyMaterialPath)); + sb.append(String.format("\"keyPassword\": \"%s\",", Tools.passwordMask(keyPassword))); + sb.append(String.format("\"trustMaterialPath\": \"%s\",", trustMaterialPath)); + sb.append(String.format("\"storePassword\": \"%s\"", Tools.passwordMask(storePassword))); + + sb.append("}"); + return sb.toString(); + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java index 846e8a1..a1516cf 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java @@ -34,4 +34,25 @@ public Map getProperties() { public void setProperties(Map properties) { this.properties = properties; } + + @Override + public String toString() { + StringBuilder kafkaSb = new StringBuilder(); + + kafkaSb.append(" topic:") + .append(topic) + .append(" bootstrapServers:") + .append(bootstrapServers); + + if (properties != null) { + kafkaSb.append("properties:"); + for (Map.Entry entry:properties.entrySet()) { + kafkaSb.append(entry.getKey()) + .append(":") + .append(entry.getValue()); + } + } + + return kafkaSb.toString(); + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/OpenapiConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/OpenapiConfig.java index c257657..74cdb87 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/OpenapiConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/OpenapiConfig.java @@ -1,5 +1,7 @@ package com.datarangers.config; +import com.datarangers.util.Tools; + /** * @Author zhangpeng.spin@bytedance.com * @Date 2021-07-23 @@ -33,4 +35,9 @@ public String getSk() { public void setSk(String sk) { this.sk = sk; } + + @Override + public String toString(){ + return String.format("{\"domain\": %s, \"ak\"=\"%s\", \"sk\"=\"%s\"}", domain, Tools.passwordMask(ak), Tools.passwordMask(sk)); + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java b/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java index 07fca3b..d3b1614 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java @@ -101,7 +101,7 @@ public RangersLoggerWriter(final String targetPrefix, final String targetName, i File parent = new File(targetPrefix); if (!parent.exists()) parent.mkdirs(); this.currentIndex = setCurrentIndex(); - if (this.currentIndex == 0) this.currentIndex++; + this.currentIndex++; fullTarget = this.targetPrefix + "/" + this.targetName; this.output = new File(fullTarget); currentName = targetName + "." + LocalDateTime.now().format(Constants.FULL_HOUR); @@ -120,7 +120,7 @@ public RangersLoggerWriter(final String targetPrefix, final String targetName) { private int setCurrentIndex() { String current = LocalDateTime.now().format(Constants.FULL_HOUR); - String full = targetName + "-" + current + "-"; + String full = targetName + "." + current + "."; int number = 0; for (File f : new File(targetPrefix).listFiles()) { if (f.getName().contains(full)) { diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java index 824662f..2628a59 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java @@ -13,14 +13,15 @@ public interface Callback { class FailedData { - public FailedData(String message, String cause) { - this(message, cause, null); + public FailedData(String message, String cause, boolean listable) { + this(message, cause, null, listable); } - public FailedData(String message, String cause, Exception exception) { + public FailedData(String message, String cause, Exception exception, boolean listable) { this.message = message; this.cause = cause; this.exception = exception; + this.listable = listable; } /** @@ -38,6 +39,8 @@ public FailedData(String message, String cause, Exception exception) { */ private Exception exception; + private boolean listable; + public String getMessage() { return message; } @@ -61,5 +64,13 @@ public Exception getException() { public void setException(Exception exception) { this.exception = exception; } + + public boolean isListable() { + return listable; + } + + public void setListable(boolean listable) { + this.listable = listable; + } } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java index 2305771..53f3f00 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java @@ -3,6 +3,8 @@ import com.datarangers.config.DataRangersSDKConfigProperties; import com.datarangers.message.Message; +import java.util.List; + /** * @Author zhangpeng.spin@bytedance.com * @Date 2021-07-22 @@ -15,4 +17,13 @@ public interface MessageSender { * @param sdkConfigProperties */ void send(Message message, DataRangersSDKConfigProperties sdkConfigProperties); + + /** + * 使用批量上报 + * @param message + * @param sdkConfigProperties + */ + default void sendBatch(List message, DataRangersSDKConfigProperties sdkConfigProperties) { + throw new UnsupportedOperationException("Not support batch"); + }; } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java index 89f1396..c69156e 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java @@ -3,9 +3,13 @@ import com.datarangers.config.DataRangersSDKConfigProperties; import com.datarangers.config.EventConfig; import com.datarangers.config.RangersJSONConfig; +import com.datarangers.message.AppMessage; import com.datarangers.message.Message; import com.datarangers.util.HttpUtils; +import java.util.List; +import java.util.stream.Collectors; + /** * @Author zhangpeng.spin@bytedance.com * @Date 2021-07-22 @@ -17,4 +21,10 @@ public void send(Message message, DataRangersSDKConfigProperties sdkConfigProper Object sendMessage = message.getAppMessage(); HttpUtils.post(EventConfig.getAppUrl(), RangersJSONConfig.getInstance().toJson(sendMessage), EventConfig.SEND_HEADER); } + + @Override + public void sendBatch(List message, DataRangersSDKConfigProperties sdkConfigProperties) { + List sendMessages = message.stream().map(n -> n.getAppMessage()).collect(Collectors.toList()); + HttpUtils.post(EventConfig.getAppListUrl(), RangersJSONConfig.getInstance().toJson(sendMessages), EventConfig.SEND_HEADER); + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java index 5e90356..bc66da4 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java @@ -1,7 +1,13 @@ package com.datarangers.sender.callback; +import com.datarangers.config.RangersJSONConfig; import com.datarangers.logger.RangersLoggerWriter; import com.datarangers.sender.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; /** * @Author zhangpeng.spin@bytedance.com @@ -9,6 +15,8 @@ */ public class LoggingCallback implements Callback { + public static final Logger logger = LoggerFactory.getLogger(LoggingCallback.class); + private static RangersLoggerWriter writer; private static final Object lock = new Object(); @@ -19,7 +27,24 @@ public LoggingCallback(final String targetPrefix, final String targetName, int m @Override public void onFailed(FailedData failedData) { - writeFailedMessage(failedData.getMessage()); + if (failedData.getMessage() == null) { + return; + } + if(!failedData.isListable()){ + writeFailedMessage(failedData.getMessage()); + }else{ + try { + List list = RangersJSONConfig.getInstance().fromJson(failedData.getMessage(), List.class); + list.forEach(n-> { + writeFailedMessage(RangersJSONConfig.getInstance().toJson(n)); + }); + } catch (IOException e) { + e.printStackTrace(); + logger.error("json error", e); + writeFailedMessage(failedData.getMessage()); + } + } + } private void writeFailedMessage(String message) { diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java b/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java index 279f681..8e8ded5 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java @@ -7,9 +7,7 @@ package com.datarangers.util; import com.datarangers.config.HttpConfig; -import com.datarangers.config.EventConfig; import com.datarangers.config.RangersJSONConfig; -import com.datarangers.logger.RangersLoggerWriter; import com.datarangers.sender.Callback; import com.datarangers.sender.Callback.FailedData; @@ -27,7 +25,6 @@ import javax.net.ssl.X509TrustManager; import org.apache.hc.client5.http.classic.HttpClient; -import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy; @@ -202,12 +199,12 @@ public static void request(String method, String url, String body, Map 2) { logger.error(String.format("request error, io error: %s, resultStr: %s", e.getMessage(), resultStr), e); - callback.onFailed(new FailedData(body, e.toString(), e)); + callback.onFailed(new FailedData(body, e.toString(), e, isListable(url))); } else { count++; post(url, body, headers, count); @@ -217,7 +214,7 @@ public static void request(String method, String url, String body, Map + + 4.0.0 + + datarangers-sdk + com.datarangers + 1.5.6-release + + com.datarangers + datarangers-sdk-example + datarangers-sdk-example + datarangers-sdk-example + + 1.8 + + + + com.datarangers + datarangers-sdk-starter + ${project.version} + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-test + test + + + com.datarangers + datarangers-sdk-core + ${project.version} + + + junit + junit + 4.13.1 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + UTF-8 + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + + + diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/AbstractSdkExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/AbstractSdkExample.java new file mode 100644 index 0000000..00e2ef3 --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/AbstractSdkExample.java @@ -0,0 +1,253 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.EventCollector; +import com.datarangers.event.*; + +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/28 + */ +public abstract class AbstractSdkExample { + /** + * 获取 app应用上报的EventCollector + * + * @return EventCollector + */ + public abstract EventCollector getAppEventCollector(); + + /** + * 获取 web应用上报的EventCollector + * + * @return EventCollector + */ + public abstract EventCollector getWebEventCollector(); + + /** + * 获取 mp应用上报的EventCollector + * + * @return EventCollector + */ + public abstract EventCollector getMpEventCollector(); + + /** + * 上报事件 + * + * @param userUniqueId 用户ID + * @param appId 应用ID + */ + public void sendEvent(String userUniqueId, int appId) { + EventCollector appEventCollector = getAppEventCollector(); + appEventCollector + .sendEvent(userUniqueId, appId, null, "test_event_java_sdk", + new HashMap() {{ + put("date_time", new SimpleDateFormat("yyyyMMdd").format(new Date())); + put("current_time", + new SimpleDateFormat("yyyyMMdd HH:mm:ss").format(new Date())); + }}); + + System.out.println("end"); + } + + /** + * 传入自定义时间戳 + * + * @param userUniqueId + * @param appId + */ + public void sendEventWithLocalTimeMs(String userUniqueId, int appId, long localTimeMs) { + EventCollector appEventCollector = getAppEventCollector(); + appEventCollector + .sendEvent(userUniqueId, appId, null, "test_event_java_sdk", + new HashMap() {{ + put("date_time", new SimpleDateFormat("yyyyMMdd").format(new Date())); + put("current_time", + new SimpleDateFormat("yyyyMMdd HH:mm:ss").format(new Date())); + }}, + localTimeMs + ); + + System.out.println("end"); + } + + /** + * 上报事件,携带ab_sdk_version + * + * @param userUniqueId + * @param appId + */ + public void senEventWithAbSdk(String userUniqueId, int appId) { + EventCollector appEventCollector = getAppEventCollector(); + Map custom = new HashMap<>(); + Map eventParams = new HashMap<>(); + Header header = new HeaderV3.Builder().setCustom(custom).setAppId(appId) + .setUserUniqueId(userUniqueId).build(); + + for (int i = 0; i < 5; i++) { + Event event1 = new EventV3().setEvent("test_ab_sdk") + .setParams(eventParams).setUserId(userUniqueId) + .setLocalTimeMs(new Date().getTime()) + .setAbSdkVersion("12345"); + Event event2 = new EventV3().setEvent("test_ab_sdk") + .setParams(eventParams).setUserId(userUniqueId) + .setLocalTimeMs(new Date().getTime()) + .setAbSdkVersion("12345"); + appEventCollector.sendEvents(header, Arrays.asList(event1, event2)); + } + + } + + /** + * 上报用户属性 + * + * @param userUniqueId 用户id + * @param appId 应用id + */ + public void sendUserProfile(String userUniqueId, int appId) { + EventCollector appEventCollector = getAppEventCollector(); + appEventCollector.profileSet(userUniqueId, appId, new HashMap() {{ + put("profile_a", "param_11"); + put("profile_b", "param_22"); + put("profile_c", "param_33"); + put("profile_d", "param_44"); + }}); + } + + /** + * 上报item属性,需要先在系统创建item + * + * @param appId 应用ID + */ + public void sendItemProfile(int appId) { + EventCollector appEventCollector = getAppEventCollector(); + + List items = new ArrayList<>(); + items.add( + new BookItem("1000", "book") + .setName("Java") + .setPrice(100) + .setPublishDate("2010-10-11") + .setAuthors(Arrays.asList("zhangsan", "lisi")) + .setCategory("1")); + items.add( + new BookItem("1002", "book") + .setName("PHP") + .setPrice(100) + .setPublishDate("2021-07-20") + .setAuthors(Arrays.asList("zhangsan", "wanger")) + .setCategory("2")); + + appEventCollector.itemSet(appId, "book", items); + } + + /** + * 在事件中,携带item + * + * @param userUniqueId 用户id + * @param appId 应用id + */ + public void sendEventWithItem(String userUniqueId, int appId) { + EventCollector appEventCollector = getAppEventCollector(); + final List items = new ArrayList<>(); + items.add(new BookItem("1000", "book")); + items.add(new BookItem("1002", "book")); + appEventCollector + .sendEvent(userUniqueId, appId, null, "set_items", new HashMap() {{ + put("param1", "params"); + put("param2", items.get(0)); + put("param3", items.get(1)); + }}); + } + + /** + * item 对象 + */ + static class BookItem implements Item { + + private String itemId; + private String itemName; + + private String name; + private int price; + private String publishDate; + private List authors; + private String category; + + public BookItem() { + + } + + public BookItem(String itemId, String itemName) { + this.itemId = itemId; + this.itemName = itemName; + } + + @Override + public String getItemId() { + return itemId; + } + + @Override + public String getItemName() { + return itemName; + } + + public String getName() { + return name; + } + + public BookItem setName(String name) { + this.name = name; + return this; + } + + public int getPrice() { + return price; + } + + public BookItem setPrice(int price) { + this.price = price; + return this; + } + + + public List getAuthors() { + return authors; + } + + public BookItem setAuthors(List authors) { + this.authors = authors; + return this; + } + + public String getCategory() { + return category; + } + + public BookItem setCategory(String category) { + this.category = category; + return this; + } + + public String getPublishDate() { + return publishDate; + } + + public BookItem setPublishDate(String publishDate) { + this.publishDate = publishDate; + return this; + } + } +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/BatchPriSdkExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/BatchPriSdkExample.java new file mode 100644 index 0000000..ca22d28 --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/BatchPriSdkExample.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.collector.EventCollector; +import com.datarangers.config.DataRangersSDKConfigProperties; +import com.datarangers.config.SdkMode; + +/** + * HTTP模式的特殊场景,当跨网络传输,http 时延比较大的场景,可以使用批量上报的方式来提高性能 + * + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/28 + */ +public class BatchPriSdkExample extends AbstractSdkExample { + private EventCollector appEventCollector; + private EventCollector webEventCollector; + private EventCollector mpEventCollector; + + BatchPriSdkExample() { + DataRangersSDKConfigProperties properties = new DataRangersSDKConfigProperties(); + // 设置模式 + properties.setMode(SdkMode.HTTP); + + // 设置domain和host,这里注意替换成真实的参数 + properties.setDomain(System.getenv("SDK_DOMAIN")); + properties.getHeaders().put("HOST", System.getenv("SDK_HOST")); + + // 设置batch + properties.setSendBatch(true); + + // 可以根据需要进行调试batchSize,一般使用默认的即可 + // properties.setBatchSize(20); + // properties.setWaitTimeMs(100); + + // 初始化collector + appEventCollector = new AppEventCollector("app", properties); + webEventCollector = new AppEventCollector("web", properties); + mpEventCollector = new AppEventCollector("mp", properties); + } + + + @Override + public EventCollector getAppEventCollector() { + return appEventCollector; + } + + @Override + public EventCollector getWebEventCollector() { + return webEventCollector; + } + + @Override + public EventCollector getMpEventCollector() { + return mpEventCollector; + } + + public static void main(String[] args) { + BatchPriSdkExample sdkExample = new BatchPriSdkExample(); + String userUniqueId = "test_sdk_user1"; + int appId = 10000000; + + // 发送事件,时间发生时间为send方法调用的时间 + sdkExample.sendEvent(userUniqueId, appId); + sdkExample.senEventWithAbSdk(userUniqueId, appId); + sdkExample.sendUserProfile(userUniqueId, appId); + + // 指定localTimeMs时间,即事件发生时间 + long localTimeMs = System.currentTimeMillis(); + sdkExample.sendEventWithLocalTimeMs(userUniqueId, appId, localTimeMs); + } +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/DataRangersSdkExampleApplication.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/DataRangersSdkExampleApplication.java new file mode 100644 index 0000000..a820848 --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/DataRangersSdkExampleApplication.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class DataRangersSdkExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(DataRangersSdkExampleApplication.class, args); + } + +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/FailCallbackExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/FailCallbackExample.java new file mode 100644 index 0000000..074711a --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/FailCallbackExample.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.collector.EventCollector; +import com.datarangers.config.DataRangersSDKConfigProperties; +import com.datarangers.config.SdkMode; +import com.datarangers.sender.Callback; + +/** + * 适用于需要自定义发送失败后处理的场景 + * + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/29 + */ +public abstract class FailCallbackExample extends AbstractSdkExample { + private EventCollector appEventCollector; + + FailCallbackExample() { + DataRangersSDKConfigProperties properties = new DataRangersSDKConfigProperties(); + // 设置模式 + properties.setMode(SdkMode.HTTP); + + // 设置domain和host,这里注意替换成真实的参数 + properties.setDomain(System.getenv("SDK_DOMAIN")); + properties.getHeaders().put("HOST", System.getenv("SDK_HOST")); + + // 自定义失败处理方式 + Callback callback = failedData -> { + System.out.println("message: " + failedData.getMessage()); + System.out.println("cause: " + failedData.getCause()); + System.out.println("exception: " + failedData.getException()); + }; + + // 初始化collector + appEventCollector = new AppEventCollector("app", properties, callback); + } + + @Override + public EventCollector getAppEventCollector() { + return appEventCollector; + } +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/FilePriSdkExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/FilePriSdkExample.java new file mode 100644 index 0000000..6a98a34 --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/FilePriSdkExample.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.collector.EventCollector; +import com.datarangers.config.DataRangersSDKConfigProperties; +import com.datarangers.config.SdkMode; + +/** + * 使用 File 文件模式,需要配合 logagent 一起使用 + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/28 + */ +public class FilePriSdkExample extends AbstractSdkExample { + private EventCollector appEventCollector; + private EventCollector webEventCollector; + private EventCollector mpEventCollector; + + FilePriSdkExample() { + DataRangersSDKConfigProperties properties = new DataRangersSDKConfigProperties(); + // 设置模式 + properties.setMode(SdkMode.FILE); + + // 设置路径 + properties.setEventSavePath("logs/"); + + // 初始化collector + appEventCollector = new AppEventCollector("app", properties); + webEventCollector = new AppEventCollector("web", properties); + mpEventCollector = new AppEventCollector("mp", properties); + } + + + @Override + public EventCollector getAppEventCollector() { + return appEventCollector; + } + + @Override + public EventCollector getWebEventCollector() { + return webEventCollector; + } + + @Override + public EventCollector getMpEventCollector() { + return mpEventCollector; + } + + public static void main(String[] args) { + FilePriSdkExample sdkExample = new FilePriSdkExample(); + String userUniqueId = "test_sdk_user1"; + int appId = 10000000; + + // 发送事件,时间发生时间为send方法调用的时间 + sdkExample.sendEvent(userUniqueId, appId); + + // 上报用户属性 + sdkExample.sendUserProfile(userUniqueId, appId); + + // 指定localTimeMs时间,即事件发生时间 + long localTimeMs = System.currentTimeMillis(); + sdkExample.sendEventWithLocalTimeMs(userUniqueId, appId, localTimeMs); + } +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/HttpPriSdkExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/HttpPriSdkExample.java new file mode 100644 index 0000000..07dc4c6 --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/HttpPriSdkExample.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.collector.EventCollector; +import com.datarangers.config.DataRangersSDKConfigProperties; +import com.datarangers.config.SdkMode; + +/** + * 在私有化场景,直接使用 Http 模式使用进行发送事件 + * + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/28 + */ +public class HttpPriSdkExample extends AbstractSdkExample { + private EventCollector appEventCollector; + private EventCollector webEventCollector; + private EventCollector mpEventCollector; + + HttpPriSdkExample() { + DataRangersSDKConfigProperties properties = new DataRangersSDKConfigProperties(); + // 设置模式 + properties.setMode(SdkMode.HTTP); + + // 设置domain和host,这里注意替换成真实的参数 + properties.setDomain(System.getenv("SDK_DOMAIN")); + properties.getHeaders().put("HOST", System.getenv("SDK_HOST")); + + // 初始化collector + appEventCollector = new AppEventCollector("app", properties); + webEventCollector = new AppEventCollector("web", properties); + mpEventCollector = new AppEventCollector("mp", properties); + } + + + @Override + public EventCollector getAppEventCollector() { + return appEventCollector; + } + + @Override + public EventCollector getWebEventCollector() { + return webEventCollector; + } + + @Override + public EventCollector getMpEventCollector() { + return mpEventCollector; + } + + public static void main(String[] args) { + HttpPriSdkExample sdkExample = new HttpPriSdkExample(); + String userUniqueId = "test_sdk_user1"; + int appId = 10000000; + + // 发送事件,时间发生时间为send方法调用的时间 + sdkExample.sendEvent(userUniqueId, appId); + + // 上报用户属性 + sdkExample.sendUserProfile(userUniqueId, appId); + + // 指定localTimeMs时间,即事件发生时间 + long localTimeMs = System.currentTimeMillis(); + sdkExample.sendEventWithLocalTimeMs(userUniqueId, appId, localTimeMs); + } +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/HttpPriSdkProfileExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/HttpPriSdkProfileExample.java new file mode 100644 index 0000000..507c76c --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/HttpPriSdkProfileExample.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.EventCollector; +import com.datarangers.event.Header; +import com.datarangers.event.HeaderV3; + +import java.util.HashMap; + +/** + * 在私有化场景,直接使用 Http 模式使用进行发送事件 + * + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/28 + */ +public class HttpPriSdkProfileExample extends HttpPriSdkExample { + + HttpPriSdkProfileExample() { + super(); + } + + public void sendUserProfileWithHeader(Header header) { + EventCollector appEventCollector = getAppEventCollector(); + appEventCollector.profileSet(header, new HashMap() {{ + put("profile_a", "param_11"); + put("profile_b", "param_22"); + put("profile_c", "param_33"); + put("profile_d", "param_55"); + }}); + } + + public static void main(String[] args) { + HttpPriSdkProfileExample sdkExample = new HttpPriSdkProfileExample(); + String userUniqueId = "test_sdk_user1"; + Long deviceId = 123456L; + int appId = 10000001; + + // 上报用户属性 + Header header = new HeaderV3.Builder() + .setUserUniqueId(userUniqueId) + .setDeviceId(deviceId) + .setAppId(appId) + .build(); + + sdkExample.sendUserProfileWithHeader(header); + + // 指定localTimeMs时间,即事件发生时间 + long localTimeMs = System.currentTimeMillis(); + sdkExample.sendEventWithLocalTimeMs(userUniqueId, appId, localTimeMs); + } + +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/KafkaPriSdkExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/KafkaPriSdkExample.java new file mode 100644 index 0000000..a171959 --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/KafkaPriSdkExample.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.collector.EventCollector; +import com.datarangers.config.DataRangersSDKConfigProperties; +import com.datarangers.config.KafkaConfig; +import com.datarangers.config.SdkMode; + +/** + * 在私有化场景,直接使用 Kafka 模式使用进行发送事件,私有化在同一个内网中,对qps有极高的要求 + * + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/28 + */ +public class KafkaPriSdkExample extends AbstractSdkExample { + private EventCollector appEventCollector; + private EventCollector webEventCollector; + private EventCollector mpEventCollector; + + KafkaPriSdkExample() { + DataRangersSDKConfigProperties properties = new DataRangersSDKConfigProperties(); + + // 设置模式 + properties.setMode(SdkMode.KAFKA); + + // 设置kafka + KafkaConfig kafkaConfig = new KafkaConfig(); + properties.setKafka(kafkaConfig); + + // 设置 BootstrapServers, etc: 127.0.0.1:9192,localhost:9192,这里注意替换成真实的参数 + String bootstrapServers = System.getenv("SDK_KAFKA_BOOTSTRAP_SERVERS"); + if(bootstrapServers == null){ + throw new IllegalArgumentException("bootstrapServers can not be empty"); + } + kafkaConfig.setBootstrapServers(bootstrapServers); + + // 如果需要设置其他属性,可以进行设置 + // Map kafkaProperties = new HashMap<>(); + // kafkaConfig.setProperties(kafkaProperties); + + // 初始化collector + webEventCollector = new AppEventCollector("web", properties); + mpEventCollector = new AppEventCollector("mp", properties); + appEventCollector = new AppEventCollector("app", properties); + } + + + @Override + public EventCollector getAppEventCollector() { + return appEventCollector; + } + + @Override + public EventCollector getWebEventCollector() { + return webEventCollector; + } + + @Override + public EventCollector getMpEventCollector() { + return mpEventCollector; + } + + public static void main(String[] args) { + KafkaPriSdkExample sdkExample = new KafkaPriSdkExample(); + String userUniqueId = "test_sdk_user1"; + int appId = 10000000; + + // 发送事件,时间发生时间为send方法调用的时间 + sdkExample.sendEvent(userUniqueId, appId); + + // 上报用户属性 + sdkExample.sendUserProfile(userUniqueId, appId); + + // 指定localTimeMs时间,即事件发生时间 + long localTimeMs = System.currentTimeMillis(); + sdkExample.sendEventWithLocalTimeMs(userUniqueId, appId, localTimeMs); + } +} diff --git a/datarangers-sdk-example/src/main/java/com/datarangers/example/SaasSdkExample.java b/datarangers-sdk-example/src/main/java/com/datarangers/example/SaasSdkExample.java new file mode 100644 index 0000000..94684cb --- /dev/null +++ b/datarangers-sdk-example/src/main/java/com/datarangers/example/SaasSdkExample.java @@ -0,0 +1,83 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.collector.EventCollector; +import com.datarangers.config.DataRangersSDKConfigProperties; +import com.datarangers.config.SdkMode; + +/** + * saas 使用场景 + * + * @Author zhangpeng.spin@bytedance.com + * @Date 2022/9/28 + */ +public class SaasSdkExample extends AbstractSdkExample { + private EventCollector appEventCollector; + private EventCollector webEventCollector; + private EventCollector mpEventCollector; + + SaasSdkExample() { + DataRangersSDKConfigProperties properties = new DataRangersSDKConfigProperties(); + // 设置模式 + properties.setMode(SdkMode.HTTP); + + // 设置domain和appKey, 不需要设置HOST + properties.setDomain("https://mcs.ctobsnssdk.com"); + + // 可以设置多个app,这里注意替换成真实的参数 + properties.getAppKeys().put(Integer.valueOf(System.getenv("SDK_APP_1")), System.getenv("SDK_APP_KEY_1")); + + // 设置openapi domain, AK,SK,这里注意替换成真实的参数 + properties.getOpenapiConfig().setDomain("https://analytics.volcengineapi.com"); + properties.getOpenapiConfig().setAk(System.getenv("OPENAPI_AK")); + properties.getOpenapiConfig().setSk(System.getenv("OPENAPI_SK")); + + + // 初始化collector + appEventCollector = new AppEventCollector("app", properties); + webEventCollector = new AppEventCollector("web", properties); + mpEventCollector = new AppEventCollector("mp", properties); + } + + + @Override + public EventCollector getAppEventCollector() { + return appEventCollector; + } + + @Override + public EventCollector getWebEventCollector() { + return webEventCollector; + } + + @Override + public EventCollector getMpEventCollector() { + return mpEventCollector; + } + + public static void main(String[] args) { + SaasSdkExample sdkExample = new SaasSdkExample(); + String userUniqueId = "test_sdk_user1"; + int appId = Integer.valueOf(System.getenv("SDK_APP_1")); + + // 发送事件,时间发生时间为send方法调用的时间 + sdkExample.sendEvent(userUniqueId, appId); + + // 上报用户属性,需要保证先在系统新增用户属性 + sdkExample.sendUserProfile(userUniqueId, appId); + + // 指定localTimeMs时间,即事件发生时间 + long localTimeMs = System.currentTimeMillis(); + sdkExample.sendEventWithLocalTimeMs(userUniqueId, appId, localTimeMs); + } +} diff --git a/datarangers-sdk-example/src/main/resources/application-prifile.properties b/datarangers-sdk-example/src/main/resources/application-prifile.properties new file mode 100644 index 0000000..c3cf1d0 --- /dev/null +++ b/datarangers-sdk-example/src/main/resources/application-prifile.properties @@ -0,0 +1,15 @@ +# \u79C1\u6709\u5316\u914D\u7F6Eexample +# \u4F7F\u7528file\u4E0A\u62A5\u6A21\u5F0F\uFF0C\u9700\u8981\u914D\u5408 loagent \u4E00\u8D77\u4F7F\u7528 +datarangers.sdk.mode=file + +# \u6587\u4EF6\u8DEF\u5F84 +#datarangers.sdk.eventSavePath=logs/ +#datarangers.sdk.eventSaveName=datarangers.log +# \u5355\u4F4D\u662FM +#datarangers.sdk.eventSaveMaxFileSize=100 +# \u5982\u679C\u6CA1\u6709\u914D\u7F6EeventFilePaths\uFF0C\u90A3\u4E48\u4F1A\u628A\u65E5\u5FD7\u6587\u4EF6\u653E\u5230eventSavePath\u76EE\u5F55\u4E0B +#datarangers.sdk.eventFilePaths=event/logs/1/,event/logs/2/,event/logs/3/,event/logs/4/,event/logs/5/,event/logs/6/ + +# \u6587\u4EF6\u6700\u5927\u4FDD\u7559\u65F6\u95F4\uFF0C\u9ED8\u8BA4\u662F-1\uFF0C\u4E00\u76F4\u4FDD\u7559 +#datarangers.sdk.eventSaveMaxDays=-1 + diff --git a/datarangers-sdk-example/src/main/resources/application-prihttp.properties b/datarangers-sdk-example/src/main/resources/application-prihttp.properties new file mode 100644 index 0000000..853ad37 --- /dev/null +++ b/datarangers-sdk-example/src/main/resources/application-prihttp.properties @@ -0,0 +1,21 @@ +# \u79C1\u6709\u5316\u914D\u7F6Eexample +# \u4F7F\u7528 http \u4E0A\u62A5\u6A21\u5F0F +datarangers.sdk.mode=http +# \u670D\u52A1\u5668ip\u6216\u57DF\u540D +datarangers.sdk.domain=${SDK_DOMAIN} +# host\uFF0C\u79C1\u6709\u5316\u73AF\u5883Host\u7684\u914D\u7F6E\u5728\u5B89\u88C5\u90E8\u7F72\u7684\u90A3\u53F0\u673A\u5668\u4E0A\uFF0C\u67E5\u770B/home/datarangers/DataRangersDeploy/conf_rangers.yml\u4E2D\u914D\u7F6E\u9879sdk.report.host +datarangers.sdk.headers.Host=${SDK_HOST} + +#\u5F02\u6B65\u7EBF\u7A0B\u6570\u91CF\uFF0C\u5F53\u5E76\u53D1\u4E0D\u591F\u7684\u65F6\u5019\u53EF\u4EE5\u8C03\u6574\u8BE5\u6570\u636E +#datarangers.sdk.threadCount=20 + +#[http config] +# \u5355\u4F4D\u662F\u6BEB\u79D2 +#datarangers.sdk.httpConfig.requestTimeout=10000 +#datarangers.sdk.httpConfig.connectTimeout=10000 +#datarangers.sdk.httpConfig.socketTimeout=20000 + +# \u5355\u4F4D\u662Fs +#datarangers.sdk.httpConfig.keepAliveTimeout=30 + + diff --git a/datarangers-sdk-example/src/main/resources/application-prihttpbatch.properties b/datarangers-sdk-example/src/main/resources/application-prihttpbatch.properties new file mode 100644 index 0000000..e1436cd --- /dev/null +++ b/datarangers-sdk-example/src/main/resources/application-prihttpbatch.properties @@ -0,0 +1,28 @@ +# \u79C1\u6709\u5316\u914D\u7F6Eexample +# \u4F7F\u7528 http \u4E0A\u62A5\u6A21\u5F0F\uFF0C\u4F7F\u7528\u6279\u91CF\u4E0A\u62A5\u7684\u65B9\u5F0F +datarangers.sdk.mode=http +# \u670D\u52A1\u5668ip\u6216\u57DF\u540D +datarangers.sdk.domain=${SDK_DOMAIN} +# host +datarangers.sdk.headers.Host=${SDK_HOST} + +# \u5F02\u6B65\u7EBF\u7A0B\u6570\u91CF\uFF0C\u5F53\u5E76\u53D1\u4E0D\u591F\u7684\u65F6\u5019\u53EF\u4EE5\u8C03\u6574\u8BE5\u6570\u636E +# datarangers.sdk.threadCount=20 + +#[http config] +# \u5355\u4F4D\u662F\u6BEB\u79D2 +#datarangers.sdk.httpConfig.requestTimeout=10000 +#datarangers.sdk.httpConfig.connectTimeout=10000 +#datarangers.sdk.httpConfig.socketTimeout=20000 +# \u5355\u4F4D\u662Fs +#datarangers.sdk.httpConfig.keepAliveTimeout=30 + +# [batch] +# \u4F7F\u7528 batch \u7684\u65B9\u5F0F +datarangers.sdk.sendBatch=true +#datarangers.sdk.batchSize=20 +#datarangers.sdk.waitTimeMs=100 + + + + diff --git a/datarangers-sdk-example/src/main/resources/application-prikafka.properties b/datarangers-sdk-example/src/main/resources/application-prikafka.properties new file mode 100644 index 0000000..8aa015f --- /dev/null +++ b/datarangers-sdk-example/src/main/resources/application-prikafka.properties @@ -0,0 +1,8 @@ +# \u79C1\u6709\u5316\u914D\u7F6Eexample +# \u4F7F\u7528kafka\u4E0A\u62A5\u7684\u6A21\u5F0F +datarangers.sdk.mode=kafka +datarangers.sdk.kafka.bootstrapServers={ip1}:9192,{ip2}:9192 + +# kafka producer\u7684 properties\u53EF\u4EE5\u5728\u8FD9\u91CC\u8FDB\u884C\u914D\u7F6E +#datarangers.sdk.kafka.properties.retries=3 + diff --git a/datarangers-sdk-example/src/main/resources/application-saas.properties b/datarangers-sdk-example/src/main/resources/application-saas.properties new file mode 100644 index 0000000..1ea80d2 --- /dev/null +++ b/datarangers-sdk-example/src/main/resources/application-saas.properties @@ -0,0 +1,29 @@ +# saas \u7F6Eexample +datarangers.sdk.env=saas +datarangers.sdk.mode=http + +# [domain] +# \u670D\u52A1\u5668ip\u6216\u57DF\u540D +# \u56FD\u5185 +datarangers.sdk.domain=https://mcs.ctobsnssdk.com +# \u56FD\u9645 +#datarangers.sdk.domain=https://mcs.tobsnssdk.com + +# [app key] +datarangers.sdk.appKeys.${SDK_APP_1}=${SDK_APP_KEY_1} + +# [openapi] +# openapi\u7684domain +# \u56FD\u5185 +datarangers.sdk.openapiConfig.domain=https://analytics.volcengineapi.com +# \u56FD\u9645 +#datarangers.sdk.openapiConfig.domain=https://analytics.byteplusapi.com + +# openapi\u7684ak, sk +datarangers.sdk.openapiConfig.ak=${OPENAPI_AK} +datarangers.sdk.openapiConfig.sk=${OPENAPI_SK} + + + + + diff --git a/datarangers-sdk-example/src/main/resources/application.properties b/datarangers-sdk-example/src/main/resources/application.properties new file mode 100644 index 0000000..e45f01e --- /dev/null +++ b/datarangers-sdk-example/src/main/resources/application.properties @@ -0,0 +1,11 @@ +#\u65E5\u5FD7\u914D\u7F6E +logging.level.root=debug +logging.pattern.console=%d{HH:mm:ss.SSS} [%t] %-5level %logger{36}:%L - %msg%n + +datarangers.sdk.enable=true + +# \u4F7F\u7528\u54EA\u4E2A\u914D\u7F6E\u6587\u4EF6 +spring.profiles.active=${PROFILE} + + + diff --git a/datarangers-sdk-example/src/test/java/com/datarangers/example/DataRangersSdkExampleApplicationTests.java b/datarangers-sdk-example/src/test/java/com/datarangers/example/DataRangersSdkExampleApplicationTests.java new file mode 100644 index 0000000..200e3b6 --- /dev/null +++ b/datarangers-sdk-example/src/test/java/com/datarangers/example/DataRangersSdkExampleApplicationTests.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class DataRangersSdkExampleApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/datarangers-sdk-example/src/test/java/com/datarangers/example/SdkExampleTest.java b/datarangers-sdk-example/src/test/java/com/datarangers/example/SdkExampleTest.java new file mode 100644 index 0000000..b381885 --- /dev/null +++ b/datarangers-sdk-example/src/test/java/com/datarangers/example/SdkExampleTest.java @@ -0,0 +1,111 @@ +/* + * Copyright 2020 Beijing Volcano Engine Technology Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package com.datarangers.example; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.collector.EventCollector; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest +@ActiveProfiles("${PROFILE}") +class SdkExampleTest extends AbstractSdkExample { + + @Autowired + @Qualifier("appEventCollector") + private AppEventCollector appEventCollector; + + @Autowired + @Qualifier("webEventCollector") + private AppEventCollector webEventCollector; + + @Autowired + @Qualifier("mpEventCollector") + private AppEventCollector mpEventCollector; + + private int appId; + private String userUniqueId; + + @BeforeEach + public void beforeEach() { + appId = 10000000; + userUniqueId = "test_sdk_user2"; + } + + @AfterEach + public void afterEach() { + // 优雅退出 + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @AfterAll + public static void clearUp() { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void sendEvent() { + sendEvent(userUniqueId, appId); + } + + @Test + public void senEventWithAbSdk() { + senEventWithAbSdk(userUniqueId, appId); + } + + @Test + public void sendUserProfile() { + sendUserProfile(userUniqueId, appId); + } + + @Test + public void sendItemProfile() { + sendEventWithItem(userUniqueId, appId); + } + + @Test + public void sendEventWithItem() { + sendEventWithItem(userUniqueId, appId); + } + + @Override + public EventCollector getAppEventCollector() { + return appEventCollector; + } + + @Override + public EventCollector getWebEventCollector() { + return webEventCollector; + } + + @Override + public EventCollector getMpEventCollector() { + return mpEventCollector; + } + +} diff --git a/datarangers-sdk-starter/pom.xml b/datarangers-sdk-starter/pom.xml new file mode 100644 index 0000000..98febdf --- /dev/null +++ b/datarangers-sdk-starter/pom.xml @@ -0,0 +1,87 @@ + + + 4.0.0 + + com.datarangers + datarangers-sdk + 1.5.6-release + + jar + datarangers-sdk-starter + datarangers-server-sdk-starter + A SDK for datarangers user + https://github.com/volcengine/datarangers-sdk-java + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + + com.datarangers + datarangers-sdk-core + ${project.version} + + + org.springframework.boot + spring-boot-autoconfigure + + + org.apache.httpcomponents.client5 + httpclient5 + + + org.springframework.boot + spring-boot-starter-test + test + + + log4j-to-slf4j + org.apache.logging.log4j + + + + + org.springframework.boot + spring-boot-test + test + + + log4j-to-slf4j + org.apache.logging.log4j + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + UTF-8 + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c4d291c --- /dev/null +++ b/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + com.datarangers + datarangers-sdk + pom + 1.5.6-release + + datarangers-sdk-core + datarangers-sdk-starter + datarangers-sdk-example + + + 2.3.4.RELEASE + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + com.datarangers + datarangers-sdk-core + ${project.version} + + + org.springframework.boot + spring-boot-autoconfigure + 2.3.4.RELEASE + + + org.apache.httpcomponents.client5 + httpclient5 + 5.0.1 + + + org.springframework.boot + spring-boot-starter + ${spring.boot.version} + + + log4j-to-slf4j + org.apache.logging.log4j + + + + + org.springframework.boot + spring-boot-starter-test + ${spring.boot.version} + + + log4j-to-slf4j + org.apache.logging.log4j + + + + + org.springframework.boot + spring-boot-test + ${spring.boot.version} + + + log4j-to-slf4j + org.apache.logging.log4j + 2.15.0 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + UTF-8 + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + +