From 28f4bbd04c6ccebcdffa66ceedbe7ed2d49fb656 Mon Sep 17 00:00:00 2001 From: "zhangpeng.spin" Date: Mon, 18 Jul 2022 15:36:35 +0800 Subject: [PATCH 1/5] fix: remove some unused --- README.md | 4 +- datarangers-sdk-core/pom.xml | 89 +++++++++++++++++++++++++++++++++ datarangers-sdk-starter/pom.xml | 89 +++++++++++++++++++++++++++++++++ pom.xml | 56 +++++++++++++++++++++ 4 files changed, 236 insertions(+), 2 deletions(-) create mode 100644 datarangers-sdk-core/pom.xml create mode 100644 datarangers-sdk-starter/pom.xml create mode 100644 pom.xml diff --git a/README.md b/README.md index ee935b4..db17bf4 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ version是sdk的版本号,当前最新的版本为1.5.3-release。 ### 2. 配置SDK DataRangers SDK需要进行一定的参数配置才能够使用,具体需要配置的参数为: -* domain:datarangers的域名或者ip,支持http和https,例如为 http://www.datarangers.com,在私有化环境中,需要修改为对应的sdk上报域名或者使用DataRangers服务器的ip地址。在saas环境中需要修改成对应的域名: +* domain:datarangers的域名或者ip,支持http和https,例如为 https://www.xxx.com,在私有化环境中,需要修改为对应的sdk上报域名或者使用DataRangers服务器的ip地址。在saas环境中需要修改成对应的域名: * 中国区:https://mcs.ctobsnssdk.com * sg(新加坡): https://mcs.tobsnssdk.com * va(美东): https://mcs.itobsnssdk.com @@ -108,7 +108,7 @@ datarangers.sdk.headers.Host=host # datarangers.sdk.appKeys.${appId}=xxx # 如果是在saas环境中,需要配置openapi, 私有化环境中可以不配置 -# openapi的domain, 国内: https://analytics.volcengineapi.com,国际是: https://datarangers.com +# openapi的domain, 国内: https://analytics.volcengineapi.com,国际是: https://analytics.byteplusapi.com # datarangers.sdk.openapiConfig.domain=xxx # openapi的ak, sk diff --git a/datarangers-sdk-core/pom.xml b/datarangers-sdk-core/pom.xml new file mode 100644 index 0000000..dc82031 --- /dev/null +++ b/datarangers-sdk-core/pom.xml @@ -0,0 +1,89 @@ + + + + datarangers-sdk + com.datarangers + 1.5.3-release + + 4.0.0 + jar + datarangers-server-sdk-core + A SDK for datarangers user + https://github.com/volcengine/datarangers-sdk-java + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + 
datarangers-opensource + datarangers-opensource@bytedance.com + DataFinder + https://www.volcengine.com/product/datafinder + + + com.datarangers + datarangers-sdk-core + 1.5.3-release + + + + + org.apache.httpcomponents.client5 + httpclient5 + 5.0.1 + + + com.fasterxml.jackson.core + jackson-core + 2.11.4 + + + com.fasterxml.jackson.core + jackson-annotations + 2.11.4 + + + com.fasterxml.jackson.core + jackson-databind + 2.11.4 + + + org.apache.kafka + kafka-clients + 0.10.2.1 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + UTF-8 + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + + + \ No newline at end of file diff --git a/datarangers-sdk-starter/pom.xml b/datarangers-sdk-starter/pom.xml new file mode 100644 index 0000000..5ac4d7d --- /dev/null +++ b/datarangers-sdk-starter/pom.xml @@ -0,0 +1,89 @@ + + + 4.0.0 + 1.5.3-release + jar + com.datarangers + datarangers-sdk-starter + datarangers-server-sdk-starter + A SDK for datarangers user + https://github.com/volcengine/datarangers-sdk-java + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + + com.datarangers + datarangers-sdk-core + ${project.version} + + + org.springframework.boot + spring-boot-autoconfigure + 2.3.4.RELEASE + + + org.apache.httpcomponents.client5 + httpclient5 + 5.0.1 + + + org.springframework.boot + spring-boot-starter-test + 2.3.4.RELEASE + test + + + log4j-to-slf4j + org.apache.logging.log4j + + + + + org.springframework.boot + spring-boot-test + 2.3.4.RELEASE + test + + + log4j-to-slf4j + org.apache.logging.log4j + 2.15.0 + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + UTF-8 + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..874a472 --- /dev/null +++ b/pom.xml @@ 
-0,0 +1,56 @@ + + + 4.0.0 + + 1.5.3-release + + + com.datarangers + datarangers-sdk + pom + 1.5.3-release + + datarangers-sdk-core + datarangers-sdk-starter + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + UTF-8 + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + + From 172305cd4fb9c229e5f852122fbc2c27ecb8ac5c Mon Sep 17 00:00:00 2001 From: "zhangpeng.spin" Date: Wed, 3 Aug 2022 17:55:36 +0800 Subject: [PATCH 2/5] feature: support batch --- README.md | 19 +- datarangers-sdk-core/pom.xml | 4 +- .../asynccollector/CollectorContainer.java | 4 + .../asynccollector/CollectorQueue.java | 10 + .../datarangers/asynccollector/Consumer.java | 220 ++++++++++-------- .../asynccollector/RangersCollectorQueue.java | 21 ++ .../com/datarangers/collector/Collector.java | 10 +- .../com/datarangers/config/Constants.java | 3 +- .../DataRangersSDKConfigProperties.java | 28 +++ .../com/datarangers/config/EventConfig.java | 10 + .../java/com/datarangers/sender/Callback.java | 17 +- .../com/datarangers/sender/MessageSender.java | 11 + .../sender/PrivatizationMessageSender.java | 10 + .../sender/callback/LoggingCallback.java | 27 ++- .../java/com/datarangers/util/HttpUtils.java | 17 +- datarangers-sdk-starter/pom.xml | 2 +- pom.xml | 4 +- 17 files changed, 291 insertions(+), 126 deletions(-) diff --git a/README.md b/README.md index db17bf4..5f8036f 100644 --- a/README.md +++ b/README.md @@ -13,11 +13,11 @@ datarangers-sdk-java是 [DataFinder](https://www.volcengine.com/product/datafind com.datarangers datarangers-sdk-core - 1.5.3-release + 1.5.4-release ``` -version是sdk的版本号,当前最新的版本为1.5.3-release。 +version是sdk的版本号,当前最新的版本为1.5.4-release。 火山引擎仓库地址: ```xml @@ -86,7 +86,7 @@ DataRangers SDK需要进行一定的参数配置才能够使用,具体需要 com.datarangers datarangers-sdk-starter - 1.5.3-release + 1.5.4-release ``` @@ -115,13 +115,24 @@ datarangers.sdk.headers.Host=host # 
datarangers.sdk.openapiConfig.ak=xxx # datarangers.sdk.openapiConfig.sk=xxx - # 是否保存到本地,如果需要配合logagent使用需要将其定义为true datarangers.sdk.save=true # 异步方式的发送线程数量,如果为logagent模式请设置为1 datarangers.sdk.threadCount=4 +# 异步方式的发送核心线程数量,建议corePoolSize 跟threadCount 配置成一样 +datarangers.sdk.corePoolSize=4 # 异步方式队列长度 datarangers.sdk.queueSize=102400 + +# 是否使用批量发送,默认为false +#datarangers.sdk.sendBatch=true + +# 批量发送的大小 +#datarangers.sdk.batchSize=16 + +# 批量的等待时间,当批量达到batchSize,或者等待时间超过waitTimeMs,就立刻发送 +#datarangers.sdk.waitTimeMs=100 + # 保存日志文件路径 datarangers.sdk.eventSavePath=logs/ # 保存日志文件名 diff --git a/datarangers-sdk-core/pom.xml b/datarangers-sdk-core/pom.xml index dc82031..6896b6a 100644 --- a/datarangers-sdk-core/pom.xml +++ b/datarangers-sdk-core/pom.xml @@ -5,7 +5,7 @@ datarangers-sdk com.datarangers - 1.5.3-release + 1.5.4-release 4.0.0 jar @@ -28,7 +28,7 @@ com.datarangers datarangers-sdk-core - 1.5.3-release + 1.5.4-release diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java index 6fd53ae..c8f96c6 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java @@ -43,6 +43,10 @@ public List consume(int waitTimeMs) throws InterruptedException { return handleMessage(messageQueue.poll(waitTimeMs)); } + public List consume(int size, int waitTimeMs) throws InterruptedException { + return handleMessage(messageQueue.poll(size, waitTimeMs)); + } + public int size() { return messageQueue.size(); } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java index 8edf846..bbae533 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java +++ 
b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java @@ -23,7 +23,17 @@ public interface CollectorQueue { * @date: 2021/2/7 15:51 */ List take() throws InterruptedException; + List poll(int waitTimeMs) throws InterruptedException; + + /** + * 每次等待waitTimeMs,获取数据最多size的数据 + * @param size + * @param waitTimeMs + * @return + * @throws InterruptedException + */ + List poll(int size, int waitTimeMs) throws InterruptedException; /** * 功能描述: 发送一个Message到队列中 * diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java index 6d1f198..5961d95 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java @@ -13,125 +13,143 @@ import com.datarangers.message.AppMessage; import com.datarangers.message.Message; import com.datarangers.sender.MessageSenderFactory; - -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class Consumer implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(Consumer.class); - private static RangersLoggerWriterPool pool; - private CollectorContainer collectorContainer; - private DataRangersSDKConfigProperties sdkConfigProperties; - - public Consumer(CollectorContainer collectorContainer, - DataRangersSDKConfigProperties sdkConfigProperties) { - this.collectorContainer = collectorContainer; - this.sdkConfigProperties = sdkConfigProperties; - } - - public static void setWriterPool(final List targetPrefixes, String targetNames, - int maxSize) { - if (pool == null) { - synchronized (Consumer.class) { + private static final Logger logger = LoggerFactory.getLogger(Consumer.class); + private static RangersLoggerWriterPool pool; + private CollectorContainer collectorContainer; + private 
DataRangersSDKConfigProperties sdkConfigProperties; + + public Consumer(CollectorContainer collectorContainer, + DataRangersSDKConfigProperties sdkConfigProperties) { + this.collectorContainer = collectorContainer; + this.sdkConfigProperties = sdkConfigProperties; + } + + public static void setWriterPool(final List targetPrefixes, String targetNames, + int maxSize) { if (pool == null) { - pool = RangersLoggerWriterPool.getInstance(targetPrefixes, targetNames, maxSize); + synchronized (Consumer.class) { + if (pool == null) { + pool = RangersLoggerWriterPool.getInstance(targetPrefixes, targetNames, maxSize); + } + } } - } } - } - - private void send() throws Exception { - while (true) { - try { - List messages = collectorContainer.consume(); - if (messages != null) { - messages.forEach(message -> { - doSend(message); - }); + + private void send() throws Exception { + while (true) { + try { + List messages = collectorContainer.consume(); + if (messages != null) { + messages.forEach(message -> { + doSend(message); + }); + } + } catch (Throwable e) { + e.printStackTrace(); + logger.error("consumer send error", e); + } } - } catch (Throwable e) { - e.printStackTrace(); - logger.error("consumer send error", e); - } } - } - - private void write() throws Exception { - while (true) { - try { - List messages = collectorContainer.consume(); - if (messages != null) { - messages.forEach(message -> { - doWrite(message); - }); + + private void sendBatch() throws Exception { + while (true) { + try { + List messages = collectorContainer.consume(sdkConfigProperties.getBatchSize(), + sdkConfigProperties.getWaitTimeMs()); + if (messages != null && messages.size() > 0) { + MessageSenderFactory.getMessageSender(messages.get(0)) + .sendBatch(messages, this.sdkConfigProperties); + } + } catch (Throwable e) { + e.printStackTrace(); + logger.error("consumer send error", e); + } } - } catch (Throwable e) { - logger.error("consumer write error", e); - } + } + private void write() throws 
Exception { + while (true) { + try { + List messages = collectorContainer.consume(); + if (messages != null) { + messages.forEach(message -> { + doWrite(message); + }); + } + } catch (Throwable e) { + logger.error("consumer write error", e); + } + + } } - } - - @Override - public void run() { - try { - if (EventConfig.saveFlag) { - write(); - } else { - send(); - } - } catch (Exception e) { - e.printStackTrace(); - logger.error("consumer run error", e); + + @Override + public void run() { + try { + if (EventConfig.saveFlag) { + write(); + } else if (sdkConfigProperties.isSendBatch()) { + sendBatch(); + } else { + send(); + } + } catch (Exception e) { + e.printStackTrace(); + logger.error("consumer run error", e); + } } - } - - public void flush() { - try { - System.out.println("flush message start"); - logger.info("flush message start"); - int count = 0; - if (collectorContainer.getMessageQueue() != null) { - Message message = collectorContainer.getMessageQueue().poll(); - while (message != null) { - count++; - message = collectorContainer.handleMessage(message); - if (EventConfig.saveFlag) { + + public void flush() { + try { + System.out.println("flush message start"); + logger.info("flush message start"); + int count = 0; + if (collectorContainer.getMessageQueue() != null) { + Message message = collectorContainer.getMessageQueue().poll(); + while (message != null) { + count++; + message = collectorContainer.handleMessage(message); + if (EventConfig.saveFlag) { + doWrite(message); + } else { + doSend(message); + } + message = collectorContainer.getMessageQueue().poll(); + } + } + + logger.info("flush message success. size: {}", count); + System.out.println("flush message success. 
size: " + count); + } catch (Exception e) { + e.printStackTrace(); + logger.error("flush message error", e); + } + } + + public void flush(Message message) { + message = collectorContainer.handleMessage(message); + if (EventConfig.saveFlag) { doWrite(message); - } else { + } else { doSend(message); - } - message = collectorContainer.getMessageQueue().poll(); } - } + } - logger.info("flush message success. size: {}", count); - System.out.println("flush message success. size: " + count); - } catch (Exception e) { - e.printStackTrace(); - logger.error("flush message error", e); + private void doSend(Message message) { + MessageSenderFactory.getMessageSender(message) + .send(message, this.sdkConfigProperties); } - } - - public void flush(Message message) { - message = collectorContainer.handleMessage(message); - if (EventConfig.saveFlag) { - doWrite(message); - } else { - doSend(message); + + private void doWrite(Message message) { + AppMessage appMessage = message.getAppMessage(); + pool.getWriter(appMessage.getUserUniqueId()) + .write(RangersJSONConfig.getInstance().toJson(appMessage) + "\n"); } - } - - private void doSend(Message message) { - MessageSenderFactory.getMessageSender(message) - .send(message, this.sdkConfigProperties); - } - - private void doWrite(Message message) { - AppMessage appMessage = message.getAppMessage(); - pool.getWriter(appMessage.getUserUniqueId()) - .write(RangersJSONConfig.getInstance().toJson(appMessage) + "\n"); - } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java index deced6c..39793a8 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java @@ -2,6 +2,7 @@ import com.datarangers.message.Message; +import java.util.ArrayList; import 
java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -57,6 +58,26 @@ public List poll(int waitTimeMs) throws InterruptedException { return null; } + @Override + public List poll(int size, int waitTimeMs) throws InterruptedException { + List messages = new ArrayList<>(); + Message msg = queue.poll(waitTimeMs, TimeUnit.MILLISECONDS); + if(msg != null){ + messages.add(msg); + } + + // 只有 + while(messages.size() < size){ + msg = queue.poll(waitTimeMs, TimeUnit.MILLISECONDS); + if(msg == null){ + // 退出循环 + break; + } + messages.add(msg); + } + return messages; + } + @Override public void put(Message t) throws InterruptedException { queue.put(t); diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java index fc69e3a..9a9e60d 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java @@ -119,14 +119,14 @@ private void sendByKafka(AppMessage appMessage) { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); } } }); } catch (Exception e) { e.printStackTrace(); logger.error(String.format("kafka send message error. 
value: \r\n %s", sendMessage), e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); } } @@ -136,7 +136,7 @@ private void syncSendMessage(Message message, String sendMessage) { } catch (Exception e) { e.printStackTrace(); logger.error("sync send message error", e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); } } @@ -147,11 +147,11 @@ private void asyncSendMessage(Message message, String sendMessage) { } catch (Exception e) { e.printStackTrace(); logger.error("async send message error", e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e)); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); } } else { logger.error("getMessageQueue is null"); - getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null")); + getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null", false)); } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java index 8dae554..cc4ac23 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java @@ -11,10 +11,11 @@ import java.util.GregorianCalendar; public class Constants { - public static final String SDK_VERSION = "datarangers_sdk_1.5.3-release"; + public static final String SDK_VERSION = "datarangers_sdk_1.5.4-release"; public static DateTimeFormatter FULL_HOUR = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"); public static DateTimeFormatter FULL_DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static final String APP_LOG_PATH = "/sdk/log"; + public static final String APP_LIST_PATH = "/sdk/list"; public static final String 
DEFAULT_USER = "__rangers"; public static final String INIT_ERROR = "sdk config must not be null"; diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java index ada2c0b..751fd7b 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java @@ -58,6 +58,34 @@ public class DataRangersSDKConfigProperties { public int queueSize = 10240; public boolean send = true; + private boolean sendBatch = false; + private int batchSize = 20; + private int waitTimeMs = 100; + + public boolean isSendBatch() { + return sendBatch; + } + + public void setSendBatch(boolean sendBatch) { + this.sendBatch = sendBatch; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public int getWaitTimeMs() { + return waitTimeMs; + } + + public void setWaitTimeMs(int waitTimeMs) { + this.waitTimeMs = waitTimeMs; + } + public boolean isEnable() { return enable; } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java index 7f6e555..9244bfe 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java @@ -12,6 +12,7 @@ public class EventConfig { public static String appUrl; + public static String appListUrl; public static Header[] headers; public static Map SEND_HEADER; public static boolean saveFlag = false; @@ -19,6 +20,7 @@ public class EventConfig { public static void setUrl(String url) { setAppUrl(url + Constants.APP_LOG_PATH); + setAppListUrl(url + Constants.APP_LIST_PATH); } public static String getAppUrl() 
{ @@ -28,4 +30,12 @@ public static String getAppUrl() { public static void setAppUrl(String appUrl) { EventConfig.appUrl = appUrl; } + + public static String getAppListUrl() { + return EventConfig.appListUrl; + } + + public static void setAppListUrl(String appUrl) { + EventConfig.appListUrl = appUrl; + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java index 824662f..2628a59 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java @@ -13,14 +13,15 @@ public interface Callback { class FailedData { - public FailedData(String message, String cause) { - this(message, cause, null); + public FailedData(String message, String cause, boolean listable) { + this(message, cause, null, listable); } - public FailedData(String message, String cause, Exception exception) { + public FailedData(String message, String cause, Exception exception, boolean listable) { this.message = message; this.cause = cause; this.exception = exception; + this.listable = listable; } /** @@ -38,6 +39,8 @@ public FailedData(String message, String cause, Exception exception) { */ private Exception exception; + private boolean listable; + public String getMessage() { return message; } @@ -61,5 +64,13 @@ public Exception getException() { public void setException(Exception exception) { this.exception = exception; } + + public boolean isListable() { + return listable; + } + + public void setListable(boolean listable) { + this.listable = listable; + } } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java index 2305771..53f3f00 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java +++ 
b/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java @@ -3,6 +3,8 @@ import com.datarangers.config.DataRangersSDKConfigProperties; import com.datarangers.message.Message; +import java.util.List; + /** * @Author zhangpeng.spin@bytedance.com * @Date 2021-07-22 @@ -15,4 +17,13 @@ public interface MessageSender { * @param sdkConfigProperties */ void send(Message message, DataRangersSDKConfigProperties sdkConfigProperties); + + /** + * 使用批量上报 + * @param message + * @param sdkConfigProperties + */ + default void sendBatch(List message, DataRangersSDKConfigProperties sdkConfigProperties) { + throw new UnsupportedOperationException("Not support batch"); + }; } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java index 89f1396..c69156e 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java @@ -3,9 +3,13 @@ import com.datarangers.config.DataRangersSDKConfigProperties; import com.datarangers.config.EventConfig; import com.datarangers.config.RangersJSONConfig; +import com.datarangers.message.AppMessage; import com.datarangers.message.Message; import com.datarangers.util.HttpUtils; +import java.util.List; +import java.util.stream.Collectors; + /** * @Author zhangpeng.spin@bytedance.com * @Date 2021-07-22 @@ -17,4 +21,10 @@ public void send(Message message, DataRangersSDKConfigProperties sdkConfigProper Object sendMessage = message.getAppMessage(); HttpUtils.post(EventConfig.getAppUrl(), RangersJSONConfig.getInstance().toJson(sendMessage), EventConfig.SEND_HEADER); } + + @Override + public void sendBatch(List message, DataRangersSDKConfigProperties sdkConfigProperties) { + List sendMessages = message.stream().map(n -> n.getAppMessage()).collect(Collectors.toList()); + 
HttpUtils.post(EventConfig.getAppListUrl(), RangersJSONConfig.getInstance().toJson(sendMessages), EventConfig.SEND_HEADER); + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java index 5e90356..bc66da4 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java @@ -1,7 +1,13 @@ package com.datarangers.sender.callback; +import com.datarangers.config.RangersJSONConfig; import com.datarangers.logger.RangersLoggerWriter; import com.datarangers.sender.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; /** * @Author zhangpeng.spin@bytedance.com @@ -9,6 +15,8 @@ */ public class LoggingCallback implements Callback { + public static final Logger logger = LoggerFactory.getLogger(LoggingCallback.class); + private static RangersLoggerWriter writer; private static final Object lock = new Object(); @@ -19,7 +27,24 @@ public LoggingCallback(final String targetPrefix, final String targetName, int m @Override public void onFailed(FailedData failedData) { - writeFailedMessage(failedData.getMessage()); + if (failedData.getMessage() == null) { + return; + } + if(!failedData.isListable()){ + writeFailedMessage(failedData.getMessage()); + }else{ + try { + List list = RangersJSONConfig.getInstance().fromJson(failedData.getMessage(), List.class); + list.forEach(n-> { + writeFailedMessage(RangersJSONConfig.getInstance().toJson(n)); + }); + } catch (IOException e) { + e.printStackTrace(); + logger.error("json error", e); + writeFailedMessage(failedData.getMessage()); + } + } + } private void writeFailedMessage(String message) { diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java 
b/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java index 279f681..8e8ded5 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java @@ -7,9 +7,7 @@ package com.datarangers.util; import com.datarangers.config.HttpConfig; -import com.datarangers.config.EventConfig; import com.datarangers.config.RangersJSONConfig; -import com.datarangers.logger.RangersLoggerWriter; import com.datarangers.sender.Callback; import com.datarangers.sender.Callback.FailedData; @@ -27,7 +25,6 @@ import javax.net.ssl.X509TrustManager; import org.apache.hc.client5.http.classic.HttpClient; -import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy; @@ -202,12 +199,12 @@ public static void request(String method, String url, String body, Map 2) { logger.error(String.format("request error, io error: %s, resultStr: %s", e.getMessage(), resultStr), e); - callback.onFailed(new FailedData(body, e.toString(), e)); + callback.onFailed(new FailedData(body, e.toString(), e, isListable(url))); } else { count++; post(url, body, headers, count); @@ -217,7 +214,7 @@ public static void request(String method, String url, String body, Map 4.0.0 - 1.5.3-release + 1.5.4-release jar com.datarangers datarangers-sdk-starter diff --git a/pom.xml b/pom.xml index 874a472..4de759c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,13 +4,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - 1.5.3-release + 1.5.4-release com.datarangers datarangers-sdk pom - 1.5.3-release + 1.5.4-release datarangers-sdk-core datarangers-sdk-starter From c83a579e677d7b7b8c79ef52a6369abddebcae59 Mon Sep 17 00:00:00 2001 From: "zhangpeng.spin" Date: Mon, 8 Aug 2022 21:44:43 
+0800 Subject: [PATCH 3/5] fix: fix write log bug --- README.md | 6 +++--- datarangers-sdk-core/pom.xml | 4 ++-- .../src/main/java/com/datarangers/config/Constants.java | 2 +- .../java/com/datarangers/logger/RangersLoggerWriter.java | 4 ++-- datarangers-sdk-starter/pom.xml | 2 +- pom.xml | 4 ++-- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 5f8036f..fad4f13 100644 --- a/README.md +++ b/README.md @@ -13,11 +13,11 @@ datarangers-sdk-java是 [DataFinder](https://www.volcengine.com/product/datafind com.datarangers datarangers-sdk-core - 1.5.4-release + 1.5.5-release ``` -version是sdk的版本号,当前最新的版本为1.5.4-release。 +version是sdk的版本号,当前最新的版本为1.5.5-release。 火山引擎仓库地址: ```xml @@ -86,7 +86,7 @@ DataRangers SDK需要进行一定的参数配置才能够使用,具体需要 com.datarangers datarangers-sdk-starter - 1.5.4-release + 1.5.5-release ``` diff --git a/datarangers-sdk-core/pom.xml b/datarangers-sdk-core/pom.xml index 6896b6a..00c4021 100644 --- a/datarangers-sdk-core/pom.xml +++ b/datarangers-sdk-core/pom.xml @@ -5,7 +5,7 @@ datarangers-sdk com.datarangers - 1.5.4-release + 1.5.5-release 4.0.0 jar @@ -28,7 +28,7 @@ com.datarangers datarangers-sdk-core - 1.5.4-release + 1.5.5-release diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java index cc4ac23..add87dc 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java @@ -11,7 +11,7 @@ import java.util.GregorianCalendar; public class Constants { - public static final String SDK_VERSION = "datarangers_sdk_1.5.4-release"; + public static final String SDK_VERSION = "datarangers_sdk_1.5.5-release"; public static DateTimeFormatter FULL_HOUR = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"); public static DateTimeFormatter FULL_DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static final String 
APP_LOG_PATH = "/sdk/log"; diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java b/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java index 07fca3b..d3b1614 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java @@ -101,7 +101,7 @@ public RangersLoggerWriter(final String targetPrefix, final String targetName, i File parent = new File(targetPrefix); if (!parent.exists()) parent.mkdirs(); this.currentIndex = setCurrentIndex(); - if (this.currentIndex == 0) this.currentIndex++; + this.currentIndex++; fullTarget = this.targetPrefix + "/" + this.targetName; this.output = new File(fullTarget); currentName = targetName + "." + LocalDateTime.now().format(Constants.FULL_HOUR); @@ -120,7 +120,7 @@ public RangersLoggerWriter(final String targetPrefix, final String targetName) { private int setCurrentIndex() { String current = LocalDateTime.now().format(Constants.FULL_HOUR); - String full = targetName + "-" + current + "-"; + String full = targetName + "." 
+ current + "."; int number = 0; for (File f : new File(targetPrefix).listFiles()) { if (f.getName().contains(full)) { diff --git a/datarangers-sdk-starter/pom.xml b/datarangers-sdk-starter/pom.xml index b9f232b..2af176a 100644 --- a/datarangers-sdk-starter/pom.xml +++ b/datarangers-sdk-starter/pom.xml @@ -3,7 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - 1.5.4-release + 1.5.5-release jar com.datarangers datarangers-sdk-starter diff --git a/pom.xml b/pom.xml index 4de759c..6c43407 100644 --- a/pom.xml +++ b/pom.xml @@ -4,13 +4,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - 1.5.4-release + 1.5.5-release com.datarangers datarangers-sdk pom - 1.5.4-release + 1.5.5-release datarangers-sdk-core datarangers-sdk-starter From c50a565a10c39916d0501bd6e93dc91fb9665786 Mon Sep 17 00:00:00 2001 From: qifeng Date: Mon, 22 Aug 2022 16:04:27 +0800 Subject: [PATCH 4/5] perf: config, log --- datarangers-sdk-core/pom.xml | 6 + .../asynccollector/CollectorCounter.java | 2 +- .../collector/AppEventCollector.java | 10 +- .../com/datarangers/collector/Collector.java | 472 ++++++++++++------ .../DataRangersSDKConfigProperties.java | 230 ++------- .../com/datarangers/config/HttpConfig.java | 9 + .../com/datarangers/config/KafkaConfig.java | 21 + .../main/java/test/AppEventCollectorTest.java | 38 ++ 8 files changed, 435 insertions(+), 353 deletions(-) create mode 100644 datarangers-sdk-core/src/main/java/test/AppEventCollectorTest.java diff --git a/datarangers-sdk-core/pom.xml b/datarangers-sdk-core/pom.xml index 00c4021..e3e96e7 100644 --- a/datarangers-sdk-core/pom.xml +++ b/datarangers-sdk-core/pom.xml @@ -57,6 +57,12 @@ kafka-clients 0.10.2.1 + + junit + junit + 4.12 + compile + diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java 
b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java index c3eed78..493c5c2 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java @@ -34,7 +34,7 @@ public void run() { FileOutputStream stream = null; try { stream = new FileOutputStream(output, true); - Map status = new HashMap() {{ + Map status = new HashMap(2) {{ put("history", CollectorContainer.SEND_HISTORY); put("queue_length", Collector.collectorContainer.size()); }}; diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java index 3cf9e12..927d4d0 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java @@ -32,15 +32,6 @@ public AppEventCollector(String appType, DataRangersSDKConfigProperties properti public AppEventCollector(String appType, DataRangersSDKConfigProperties properties, Callback cb) { super(appType, properties, cb); - if (properties != null) { - properties.init(); - - // 设置同步发送的consumer,队列满的时候使用 - setConsumer(new Consumer(Collector.collectorContainer, this.properties)); - - } else { - System.out.println(Constants.INIT_ERROR); - } } @Override @@ -227,6 +218,7 @@ private void sendEvents(Header header, List events, MessageType messageTy appMessage.setAppType(getAppType()); appMessage.setHeader(header); appMessage.addEvents(events); + appMessage.setTraceId(UUID.randomUUID().toString()); message.setAppMessage(appMessage); send(message); diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java index 9a9e60d..6b38a0d 100644 --- 
a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java @@ -6,185 +6,343 @@ */ package com.datarangers.collector; -import com.datarangers.asynccollector.CollectorContainer; -import com.datarangers.asynccollector.Consumer; +import com.datarangers.asynccollector.*; import com.datarangers.config.*; +import com.datarangers.logger.RangersFileCleaner; import com.datarangers.message.AppMessage; import com.datarangers.message.Message; import com.datarangers.message.MessageEnv; import com.datarangers.sender.Callback; import com.datarangers.sender.Callback.FailedData; +import com.datarangers.sender.callback.LoggingCallback; +import com.datarangers.util.HttpUtils; +import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.message.BasicHeader; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * @author taojian */ public abstract class Collector implements EventCollector { - private String appType; - public static final Logger logger = LoggerFactory.getLogger("DatarangersLog"); - public static ExecutorService httpRequestPool = null; - public static ScheduledExecutorService scheduled = null; - public static CollectorContainer collectorContainer; - - private boolean enable; - protected DataRangersSDKConfigProperties properties; - protected Callback callback; - protected Consumer consumer = null; - protected KafkaProducer kafkaProducer; - - public 
Collector(String appType, DataRangersSDKConfigProperties properties, Callback cb) { - this.appType = appType; - this.enable = properties.isEnable(); - this.properties = properties; - this.callback = cb; - this.properties.setCallback(this.getCallback()); - this.initKafkaProducer(); - } - - private void initKafkaProducer(){ - if(SdkMode.KAFKA != this.properties.getMode()){ - return; - } - // 设置过了就不需要再自己创建 - if(kafkaProducer != null){ - return; - } - kafkaProducer = createProducer(this.properties.getKafka()); - } - - private KafkaProducer createProducer(KafkaConfig kafkaConfig) { - Properties props = new Properties(); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("bootstrap.servers", kafkaConfig.getBootstrapServers()); - Map map = kafkaConfig.getProperties(); - if (map != null && (!map.isEmpty())) { - props.putAll(map); - } - return new KafkaProducer<>(props); - } - public String getAppType() { - return appType; - } - - public Collector setAppType(String appType) { - this.appType = appType; - return this; - } - - - public void send(Message message) { - sendMessage(message); - } - - protected void sendMessage(Message message) { - if (!enable) { - return; - } - message.merge(); - String sendMessage; - - validate(message); - if(kafkaProducer != null){ - // 使用kafka的方式 - sendByKafka(message.getAppMessage()); - return; - } - sendMessage = RangersJSONConfig.getInstance().toJson(message.getAppMessage()); - if (this.properties.isSync()) { - syncSendMessage(message, sendMessage); - } else { - asyncSendMessage(message, sendMessage); - } - } - - private void sendByKafka(AppMessage appMessage) { - // kafka sender,header 添加固定的头 - appMessage.getHeader().setSource(Constants.SDK_SERVER); - String sendMessage = RangersJSONConfig.getInstance().toJson(appMessage); - try { - ProducerRecord producerRecord = new 
ProducerRecord<>(properties.getKafka().getTopic(), sendMessage); - kafkaProducer.send(producerRecord, new org.apache.kafka.clients.producer.Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { + private String appType; + public static final Logger logger = LoggerFactory.getLogger("DatarangersLog"); + public static ExecutorService httpRequestPool = null; + public static ScheduledExecutorService scheduled = null; + public static CollectorContainer collectorContainer; + private boolean enable; + protected DataRangersSDKConfigProperties properties; + protected Callback callback; + protected Consumer consumer = null; + protected KafkaProducer kafkaProducer; + private static volatile Boolean IS_INIT = false; + + public Collector(String appType, DataRangersSDKConfigProperties properties, Callback cb) { + logger.info("sdk config properties: {}", properties); + System.out.println("sdk config properties: " + properties.toString()); + this.appType = appType; + this.enable = properties.isEnable(); + this.properties = properties; + this.callback = cb; + this.properties.setCallback(this.getCallback()); + this.init(); + } + + public String getAppType() { + return appType; + } + + public void setAppType(String appType) { + this.appType = appType; + } + + public Callback getCallback() { + return callback; + } + + public void setCallback(Callback callback) { + this.callback = callback; + } + + public Consumer getConsumer() { + return consumer; + } + + public void setConsumer(Consumer consumer) { + this.consumer = consumer; + } + + + private void initKafkaProducer() { + logger.info("init kafka producer"); + if (SdkMode.KAFKA != this.properties.getMode()) { + return; + } + // 设置过了就不需要再自己创建 + if (kafkaProducer != null) { + return; + } + kafkaProducer = createProducer(this.properties.getKafka()); + } + + private KafkaProducer createProducer(KafkaConfig kafkaConfig) { + Properties props = new Properties(); + 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("bootstrap.servers", kafkaConfig.getBootstrapServers()); + Map map = kafkaConfig.getProperties(); + if (map != null && (!map.isEmpty())) { + props.putAll(map); + } + return new KafkaProducer<>(props); + } + + public void send(Message message) { + sendMessage(message); + } + + private void sendMessage(Message message) { + if (!enable) { + return; + } + message.merge(); + String sendMessage; + + validate(message); + if (kafkaProducer != null) { + // 使用kafka的方式 + sendByKafka(message.getAppMessage()); + return; + } + sendMessage = RangersJSONConfig.getInstance().toJson(message.getAppMessage()); + if (this.properties.isSync()) { + syncSendMessage(message, sendMessage); + } else { + asyncSendMessage(message, sendMessage); + } + } + + private void sendByKafka(AppMessage appMessage) { + // kafka sender,header 添加固定的头 + appMessage.getHeader().setSource(Constants.SDK_SERVER); + String sendMessage = RangersJSONConfig.getInstance().toJson(appMessage); + try { + ProducerRecord producerRecord = new ProducerRecord<>(properties.getKafka().getTopic(), sendMessage); + kafkaProducer.send(producerRecord, new org.apache.kafka.clients.producer.Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); + } + } + }); + } catch (Exception e) { + e.printStackTrace(); logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e); getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); - } - } - }); - } catch (Exception e) { - e.printStackTrace(); - logger.error(String.format("kafka send message error. 
value: \r\n %s", sendMessage), e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); - } - } - - private void syncSendMessage(Message message, String sendMessage) { - try { - this.consumer.flush(message); - } catch (Exception e) { - e.printStackTrace(); - logger.error("sync send message error", e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); - } - } - - private void asyncSendMessage(Message message, String sendMessage) { - if (collectorContainer.getMessageQueue() != null) { - try { - collectorContainer.produce(message); - } catch (Exception e) { - e.printStackTrace(); - logger.error("async send message error", e); - getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); - } - } else { - logger.error("getMessageQueue is null"); - getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null", false)); - } - } - - /** - * message 检查 - */ - private void validate(Message message) { - // 当前只有saas需要校验下appkey - if (this.properties.getMessageEnv() != MessageEnv.SAAS) { - return; - } - - Integer appId = message.getAppMessage().getAppId(); - Map appKeys = this.properties.getAppKeys(); - String appKey = appKeys.get(appId); - if (appKey == null) { - throw new IllegalArgumentException("App key cannot be empty. 
app_id: " + appId); - } - } - - public Callback getCallback() { - return callback; - } - - public void setCallback(Callback callback) { - this.callback = callback; - } - - public Consumer getConsumer() { - return consumer; - } - - public void setConsumer(Consumer consumer) { - this.consumer = consumer; - } + } + } + + private void syncSendMessage(Message message, String sendMessage) { + try { + this.consumer.flush(message); + } catch (Exception e) { + e.printStackTrace(); + logger.error("sync send message error", e); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); + } + } + + private void asyncSendMessage(Message message, String sendMessage) { + if (collectorContainer.getMessageQueue() != null) { + try { + collectorContainer.produce(message); + } catch (Exception e) { + e.printStackTrace(); + logger.error("async send message error", e); + getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false)); + } + } else { + logger.error("getMessageQueue is null"); + getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null", false)); + } + } + + /** + * message 检查 + */ + private void validate(Message message) { + // 当前只有saas需要校验下appkey + if (this.properties.getMessageEnv() != MessageEnv.SAAS) { + return; + } + + Integer appId = message.getAppMessage().getAppId(); + Map appKeys = this.properties.getAppKeys(); + String appKey = appKeys.get(appId); + if (appKey == null) { + throw new IllegalArgumentException("App key cannot be empty. 
app_id: " + appId); + } + } + + /** + * 配置初始化 + */ + private void init() { + if (!IS_INIT) { + synchronized (Collector.class) { + if (!IS_INIT) { + initLogger(); + initCommon(); + initKafkaProducer(); + initConsumer(); + initHook(); + IS_INIT = true; + } + } + } + } + + /** + * 日志pool初始化 + */ + private void initLogger() { + logger.info("init log writer pool"); + List eventFilePaths = properties.getEventFilePaths(); + String eventSaveName = properties.getEventSaveName(); + int eventSaveMaxFileSize = properties.getEventSaveMaxFileSize(); + String eventSavePath = properties.getEventSavePath(); + + Consumer.setWriterPool(eventFilePaths, eventSaveName, eventSaveMaxFileSize); + if (properties.getCallback() == null) { + properties.setCallback(new LoggingCallback(eventSavePath, "error-" + eventSaveName, + eventSaveMaxFileSize)); + } + } + + /** + * eventConfig,httpclient,EventConfig 初始化 + */ + private void initCommon() { + HttpConfig httpConfig = properties.getHttpConfig(); + HttpClient httpClient = properties.getCustomHttpClient(); + Callback callback = properties.getCallback(); + int httpTimeOut = properties.getHttpTimeout(); + + //EventConfig配置 + EventConfig.saveFlag = SdkMode.FILE == properties.getMode(); + EventConfig.sendFlag = SdkMode.HTTP == properties.getMode(); + EventConfig.setUrl(properties.getDomain()); + + if (EventConfig.sendFlag) { + if (httpConfig.getMaxPerRoute() < properties.getCorePoolSize()) { + httpConfig.setMaxPerRoute(properties.getCorePoolSize()); + } + if (httpConfig.getMaxTotal() < httpConfig.getMaxPerRoute()) { + httpConfig.setMaxTotal(httpConfig.getMaxPerRoute()); + } + // 老版本配置做兼容 + httpConfig.initTimeOut(httpTimeOut); + //httpclient 初始化 + HttpUtils.createHttpClient(httpConfig, httpClient, callback); + + //EventConfig 初始化 + if (EventConfig.SEND_HEADER == null) { + EventConfig.SEND_HEADER = properties.getHeaders(); + EventConfig.SEND_HEADER.put("User-Agent", "DataRangers Java SDK"); + EventConfig.SEND_HEADER.put("Content-Type", 
"application/json"); + List
headerList = new ArrayList<>(); + EventConfig.SEND_HEADER + .forEach((key, value) -> headerList.add(new BasicHeader(key, value))); + EventConfig.headers = headerList.toArray(new Header[0]); + } + } + } + + /** + * 初始化消费者, httpRequestPool, 日志记录和清理任务 + */ + private void initConsumer() { + logger.info("init consumer"); + int threadCount = this.properties.getCorePoolSize(); + CollectorQueue userQueue = this.properties.getUserQueue(); + + if (EventConfig.saveFlag) { + threadCount = 1; + logger.info("Start LogAgent Mode"); + } else { + logger.info("Start Http Mode"); + } + + // 如果客户自定义了queue,则需要替换为客户自定义queue,否则使用默认的队列 + if (userQueue == null) { + collectorContainer = new CollectorContainer( + RangersCollectorQueue.getInstance(this.properties.getQueueSize())); + } else { + collectorContainer = new CollectorContainer(userQueue); + } + + boolean isSync = this.properties.isSync(); + boolean hasConsumer = this.properties.isHasConsumer(); + boolean hasProducer = this.properties.isHasProducer(); + String eventSavePath = this.properties.getEventSavePath(); + List eventFilePaths = properties.getEventFilePaths(); + String eventSaveName = this.properties.getEventSaveName(); + int eventSaveMaxDays = this.properties.getEventSaveMaxDays(); + // 设置同步发送的consumer,队列满的时候使用 + if (isSync) { + setConsumer(new Consumer(Collector.collectorContainer, this.properties)); + } + // 异步起多个消费者 + if (!isSync && hasConsumer && httpRequestPool == null) { + httpRequestPool = Executors.newFixedThreadPool(this.properties.getCorePoolSize()); + for (int i = 0; i < threadCount; i++) { + //必须全部消费同一个队列 + httpRequestPool.execute(new Consumer(collectorContainer, properties)); + } + } + if ((!isSync) && hasProducer) { + //定时记录日志的条数 + scheduled = Executors.newSingleThreadScheduledExecutor(); + scheduled + .scheduleAtFixedRate(new CollectorCounter(eventSavePath), 0, 2, TimeUnit.MINUTES); + if (EventConfig.saveFlag) { + // 清理日志文件定时任务, 每隔12小时清理一次 + scheduled.scheduleAtFixedRate( + new RangersFileCleaner(eventFilePaths, 
eventSaveName, eventSaveMaxDays), + 0, 12, TimeUnit.HOURS); + logger.info("Start DataRangers Cleaner/Record Thread"); + } + } + } + + /** + * jvm关闭时,一些需要做的清理任务 + */ + private void initHook() { + logger.info("init hook"); + Runtime.getRuntime().addShutdownHook(new Thread( + () -> { + if (Collector.httpRequestPool != null) { + Collector.httpRequestPool.shutdown(); + } + + new Consumer(Collector.collectorContainer, properties).flush(); + })); + } } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java index 751fd7b..b02d6a4 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java @@ -7,34 +7,20 @@ package com.datarangers.config; import com.datarangers.asynccollector.*; -import com.datarangers.collector.Collector; -import com.datarangers.logger.RangersFileCleaner; import com.datarangers.message.MessageEnv; import com.datarangers.sender.Callback; -import com.datarangers.sender.callback.LoggingCallback; -import com.datarangers.util.HttpUtils; import org.apache.hc.client5.http.classic.HttpClient; -import org.apache.hc.core5.http.Header; -import org.apache.hc.core5.http.message.BasicHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.ZoneOffset; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; /** * @author hTangle */ public class DataRangersSDKConfigProperties { - public static final Logger logger = LoggerFactory.getLogger(DataRangersSDKConfigProperties.class); public Map headers; public String domain; - public int threadPoolCount = 1; - public int maxPoolSize = 8; public int corePoolSize = 4; /** @@ -54,7 +40,6 @@ public class 
DataRangersSDKConfigProperties { */ @Deprecated public boolean save = false; - public int threadCount = 1; public int queueSize = 10240; public boolean send = true; @@ -62,45 +47,13 @@ public class DataRangersSDKConfigProperties { private int batchSize = 20; private int waitTimeMs = 100; - public boolean isSendBatch() { - return sendBatch; - } - - public void setSendBatch(boolean sendBatch) { - this.sendBatch = sendBatch; - } - - public int getBatchSize() { - return batchSize; - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - public int getWaitTimeMs() { - return waitTimeMs; - } - - public void setWaitTimeMs(int waitTimeMs) { - this.waitTimeMs = waitTimeMs; - } - - public boolean isEnable() { - return enable; - } - - public void setEnable(boolean enable) { - this.enable = enable; - } - public boolean enable = true; public String eventSavePath = "logs/"; public List eventFilePaths; public String eventSaveName = "datarangers.log"; - public int eventSaveMaxHistory = 20; public int eventSaveMaxFileSize = 100; + // 日志清理时间 public int eventSaveMaxDays = 5; public CollectorQueue userQueue; @@ -120,8 +73,8 @@ public void setEnable(boolean enable) { private String env = "privatization"; private List SAAS_DOMAIN_URLS = Arrays.asList( - "https://mcs.ctobsnssdk.com", - "https://mcs.tobsnssdk.com"); + "https://mcs.ctobsnssdk.com", + "https://mcs.tobsnssdk.com"); /** * saas openapi 配置地址 @@ -138,6 +91,38 @@ public void setEnable(boolean enable) { private SdkMode mode; private KafkaConfig kafka; + public boolean isSendBatch() { + return sendBatch; + } + + public void setSendBatch(boolean sendBatch) { + this.sendBatch = sendBatch; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public int getWaitTimeMs() { + return waitTimeMs; + } + + public void setWaitTimeMs(int waitTimeMs) { + this.waitTimeMs = waitTimeMs; + } + + public boolean isEnable() { + 
return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + public KafkaConfig getKafka() { return kafka; } @@ -243,15 +228,6 @@ public DataRangersSDKConfigProperties setEventSaveName(String eventSaveName) { return this; } - public int getEventSaveMaxHistory() { - return eventSaveMaxHistory; - } - - public DataRangersSDKConfigProperties setEventSaveMaxHistory(int eventSaveMaxHistory) { - this.eventSaveMaxHistory = eventSaveMaxHistory; - return this; - } - public int getEventSaveMaxFileSize() { return eventSaveMaxFileSize; } @@ -270,14 +246,6 @@ public DataRangersSDKConfigProperties setSend(boolean send) { return this; } - public int getThreadCount() { - return threadCount; - } - - public void setThreadCount(int threadCount) { - this.threadCount = threadCount; - } - public int getQueueSize() { return queueSize; } @@ -286,14 +254,6 @@ public void setQueueSize(int queueSize) { this.queueSize = queueSize; } - public int getMaxPoolSize() { - return maxPoolSize; - } - - public void setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; - } - public int getCorePoolSize() { return corePoolSize; } @@ -321,14 +281,6 @@ public void setDomain(String domain) { this.domain = domain; } - public int getThreadPoolCount() { - return threadPoolCount; - } - - public void setThreadPoolCount(int threadPoolCount) { - this.threadPoolCount = threadPoolCount; - } - public ZoneOffset getTimeOffset() { return timeOffset; } @@ -356,98 +308,6 @@ public void setTimeZone(String timeZone) { this.timeZone = timeZone; } - public void setLogger() { - Consumer.setWriterPool(getEventFilePaths(), getEventSaveName(), getEventSaveMaxFileSize()); - if (callback == null) { - setCallback(new LoggingCallback(getEventSavePath(), "error-" + getEventSaveName(), - getEventSaveMaxFileSize())); - } - } - - public void setCommon() { - EventConfig.saveFlag = SdkMode.FILE == getMode(); - EventConfig.sendFlag = SdkMode.HTTP == getMode(); - if (EventConfig.sendFlag) { - 
httpConfig = this.getHttpConfig(); - if (httpConfig.getMaxPerRoute() < this.getThreadCount()) { - httpConfig.setMaxPerRoute(this.getThreadCount()); - } - if (httpConfig.getMaxTotal() < httpConfig.getMaxPerRoute()) { - httpConfig.setMaxTotal(httpConfig.getMaxPerRoute()); - } - httpConfig.initTimeOut(getHttpTimeout()); - HttpUtils - .createHttpClient(this.getHttpConfig(), this.getCustomHttpClient(), this.getCallback()); - if (EventConfig.SEND_HEADER == null) { - EventConfig.SEND_HEADER = getHeaders(); - EventConfig.SEND_HEADER.put("User-Agent", "DataRangers Java SDK"); - EventConfig.SEND_HEADER.put("Content-Type", "application/json"); - List
headerList = new ArrayList<>(); - EventConfig.SEND_HEADER - .forEach((key, value) -> headerList.add(new BasicHeader(key, value))); - EventConfig.headers = headerList.toArray(new Header[0]); - } - } - setConsumer(getThreadCount()); - EventConfig.setUrl(getDomain()); - } - - - public ExecutorService setThreadPool() { - return Executors.newFixedThreadPool(getCorePoolSize()); - } - - public void setConsumer(int threadCount) { - if (EventConfig.saveFlag) { - threadCount = 1; - logger.info("Start LogAgent Mode"); - } else { - logger.info("Start Http Mode"); - } - if (userQueue == null) { - Collector.collectorContainer = new CollectorContainer( - RangersCollectorQueue.getInstance(getQueueSize())); - } else { - //如果客户自定义了queue,则需要替换为客户自定义queue - Collector.collectorContainer = new CollectorContainer(userQueue); - } - if ((!sync) && hasConsumer && Collector.httpRequestPool == null) { - //有消费者才初始化消费者 - Collector.httpRequestPool = setThreadPool(); - for (int i = 0; i < threadCount; i++) {//必须全部消费同一个队列 - Collector.httpRequestPool.execute(new Consumer(Collector.collectorContainer, this)); - } - } - if ((!sync) && hasProducer) { - //有生产者才需要记录 - Collector.scheduled = Executors.newSingleThreadScheduledExecutor(); - Collector.scheduled - .scheduleAtFixedRate(new CollectorCounter(getEventSavePath()), 0, 2, TimeUnit.MINUTES); - if (EventConfig.saveFlag) { - Collector.scheduled.scheduleAtFixedRate( - new RangersFileCleaner(getEventFilePaths(), getEventSaveName(), getEventSaveMaxDays()), - 0, 12, TimeUnit.HOURS); - } - } - logger.info("Start DataRangers Cleaner/Record Thread"); - } - - - public static volatile Boolean IS_INIT = false; - - public void init() { - if (!IS_INIT) { - synchronized (DataRangersSDKConfigProperties.class) { - if (!IS_INIT) { - setLogger(); - setCommon(); - setHook(); - IS_INIT = true; - } - } - } - } - public CollectorQueue getUserQueue() { return userQueue; } @@ -504,17 +364,15 @@ public void setAppKeys(Map appKeys) { this.appKeys = appKeys; } - private void 
setHook() { - DataRangersSDKConfigProperties properties = this; - Runtime.getRuntime().addShutdownHook(new Thread( - () -> { - if(Collector.httpRequestPool != null){ - Collector.httpRequestPool.shutdown(); - } - - new Consumer(Collector.collectorContainer, properties).flush(); - })); + @Override + public String toString() { + return " domain:" + domain + " corePoolSize:" + corePoolSize + " httpTimeout:" + httpTimeout + + " timeZone:" + timeZone + " timeOffset:" + timeOffset + " save:" + save + " queueSize:" + queueSize + + " send:" + send + " sendBatch:" + sendBatch + " batchSize:" + batchSize + " waitTimeMs:" + waitTimeMs + + " enable:" + enable + " eventSavePath:" + eventSavePath + " eventSaveName:" + eventSaveName + + " eventSaveMaxFileSize:" + eventSaveMaxFileSize + " eventSaveMaxDays:" + eventSaveMaxDays + + " userQueue:" + userQueue + " hasConsumer:" + hasConsumer + " hasProducer:" + hasProducer + + " env:" + env + " sync:" + sync + " mode:" + mode + " kafka:" + kafka + " httpConfig:" + httpConfig; } - } diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java index d69348d..d8bf3d2 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java @@ -161,4 +161,13 @@ public Integer getKeepAliveTimeout() { public void setKeepAliveTimeout(Integer keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; } + + @Override + public String toString() { + return " requestTimeout:" + requestTimeout + "connectTimeout:" + connectTimeout + + " socketTimeout:" + socketTimeout + " keepAliveTimeout:" + keepAliveTimeout + + " maxTotal:" + maxTotal + " maxPerRoute" + maxPerRoute + + " keyMaterialPath" + keyMaterialPath + " keyPassword" + keyPassword + + " storePassword" + storePassword + " trustMaterialPath" + trustMaterialPath; + } } diff --git 
a/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java index 846e8a1..a1516cf 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java @@ -34,4 +34,25 @@ public Map getProperties() { public void setProperties(Map properties) { this.properties = properties; } + + @Override + public String toString() { + StringBuilder kafkaSb = new StringBuilder(); + + kafkaSb.append(" topic:") + .append(topic) + .append(" bootstrapServers:") + .append(bootstrapServers); + + if (properties != null) { + kafkaSb.append("properties:"); + for (Map.Entry entry:properties.entrySet()) { + kafkaSb.append(entry.getKey()) + .append(":") + .append(entry.getValue()); + } + } + + return kafkaSb.toString(); + } } diff --git a/datarangers-sdk-core/src/main/java/test/AppEventCollectorTest.java b/datarangers-sdk-core/src/main/java/test/AppEventCollectorTest.java new file mode 100644 index 0000000..bc20200 --- /dev/null +++ b/datarangers-sdk-core/src/main/java/test/AppEventCollectorTest.java @@ -0,0 +1,38 @@ +package test; + +import com.datarangers.collector.AppEventCollector; +import com.datarangers.config.DataRangersSDKConfigProperties; +import com.datarangers.config.KafkaConfig; +import com.datarangers.config.SdkMode; +import org.junit.Test; + +/** + * @author qifeng.64343747@bytedance.com + * @date 2022-08-18 + */ +public class AppEventCollectorTest { + + @Test + public void initAppEventCollectorTest() { + //kafka + DataRangersSDKConfigProperties dataRangersSDKConfigProperties = new DataRangersSDKConfigProperties(); + dataRangersSDKConfigProperties.setMode(SdkMode.KAFKA); + KafkaConfig kafkaConfig = new KafkaConfig(); + kafkaConfig.setBootstrapServers("servers"); + dataRangersSDKConfigProperties.setKafka(kafkaConfig); + AppEventCollector appEventCollector = new AppEventCollector("app", 
dataRangersSDKConfigProperties, null); + + // http + dataRangersSDKConfigProperties.setMode(SdkMode.HTTP); + appEventCollector = new AppEventCollector("app", dataRangersSDKConfigProperties, null); + //file + dataRangersSDKConfigProperties.setMode(SdkMode.FILE); + appEventCollector = new AppEventCollector("app", dataRangersSDKConfigProperties, null); + + } + + @Test + public void sendEvent() { + + } +} From 544dc0ff421ce30003544d36bfec2e4138a9bc47 Mon Sep 17 00:00:00 2001 From: qifeng Date: Wed, 28 Sep 2022 16:57:50 +0800 Subject: [PATCH 5/5] fix httpConfig toString --- .../src/main/java/com/datarangers/config/HttpConfig.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java index d8bf3d2..2f4bb13 100644 --- a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java +++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java @@ -166,8 +166,8 @@ public void setKeepAliveTimeout(Integer keepAliveTimeout) { public String toString() { return " requestTimeout:" + requestTimeout + "connectTimeout:" + connectTimeout + " socketTimeout:" + socketTimeout + " keepAliveTimeout:" + keepAliveTimeout + - " maxTotal:" + maxTotal + " maxPerRoute" + maxPerRoute + - " keyMaterialPath" + keyMaterialPath + " keyPassword" + keyPassword + - " storePassword" + storePassword + " trustMaterialPath" + trustMaterialPath; + " maxTotal:" + maxTotal + " maxPerRoute:" + maxPerRoute + + " keyMaterialPath:" + keyMaterialPath + " keyPassword:" + keyPassword + + " storePassword:" + storePassword + " trustMaterialPath:" + trustMaterialPath; } }