diff --git a/README.md b/README.md
index ee935b4..fad4f13 100644
--- a/README.md
+++ b/README.md
@@ -13,11 +13,11 @@ datarangers-sdk-java是 [DataFinder](https://www.volcengine.com/product/datafind
com.datarangers
datarangers-sdk-core
- 1.5.3-release
+ 1.5.5-release
```
-version是sdk的版本号,当前最新的版本为1.5.3-release。
+version是sdk的版本号,当前最新的版本为1.5.5-release。
火山引擎仓库地址:
```xml
@@ -32,7 +32,7 @@ version是sdk的版本号,当前最新的版本为1.5.3-release。
### 2. 配置SDK
DataRangers SDK需要进行一定的参数配置才能够使用,具体需要配置的参数为:
-* domain:datarangers的域名或者ip,支持http和https,例如为 http://www.datarangers.com,在私有化环境中,需要修改为对应的sdk上报域名或者使用DataRangers服务器的ip地址。在saas环境中需要修改成对应的域名:
+* domain:datarangers的域名或者ip,支持http和https,例如为 https://www.xxx.com,在私有化环境中,需要修改为对应的sdk上报域名或者使用DataRangers服务器的ip地址。在saas环境中需要修改成对应的域名:
* 中国区:https://mcs.ctobsnssdk.com
* sg(新加坡): https://mcs.tobsnssdk.com
* va(美东): https://mcs.itobsnssdk.com
@@ -86,7 +86,7 @@ DataRangers SDK需要进行一定的参数配置才能够使用,具体需要
com.datarangers
datarangers-sdk-starter
- 1.5.3-release
+ 1.5.5-release
```
@@ -108,20 +108,31 @@ datarangers.sdk.headers.Host=host
# datarangers.sdk.appKeys.${appId}=xxx
# 如果是在saas环境中,需要配置openapi, 私有化环境中可以不配置
-# openapi的domain, 国内: https://analytics.volcengineapi.com,国际是: https://datarangers.com
+# openapi的domain, 国内: https://analytics.volcengineapi.com,国际是: https://analytics.byteplusapi.com
# datarangers.sdk.openapiConfig.domain=xxx
# openapi的ak, sk
# datarangers.sdk.openapiConfig.ak=xxx
# datarangers.sdk.openapiConfig.sk=xxx
-
# 是否保存到本地,如果需要配合logagent使用需要将其定义为true
datarangers.sdk.save=true
# 异步方式的发送线程数量,如果为logagent模式请设置为1
datarangers.sdk.threadCount=4
+# 异步方式的发送核心线程数量,建议corePoolSize 跟threadCount 配置成一样
+datarangers.sdk.corePoolSize=4
# 异步方式队列长度
datarangers.sdk.queueSize=102400
+
+# 是否使用批量发送,默认为false
+#datarangers.sdk.sendBatch=true
+
+# 批量发送的大小
+#datarangers.sdk.batchSize=16
+
+# 批量的等待时间,当批量达到batchSize,或者等待时间超过waitTimeMs,就立刻发送
+#datarangers.sdk.waitTimeMs=100
+
# 保存日志文件路径
datarangers.sdk.eventSavePath=logs/
# 保存日志文件名
diff --git a/datarangers-sdk-core/pom.xml b/datarangers-sdk-core/pom.xml
new file mode 100644
index 0000000..e3e96e7
--- /dev/null
+++ b/datarangers-sdk-core/pom.xml
@@ -0,0 +1,95 @@
+
+
+
+ datarangers-sdk
+ com.datarangers
+ 1.5.5-release
+
+ 4.0.0
+ jar
+ datarangers-server-sdk-core
+ A SDK for datarangers user
+ https://github.com/volcengine/datarangers-sdk-java
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+ datarangers-opensource
+ datarangers-opensource@bytedance.com
+ DataFinder
+ https://www.volcengine.com/product/datafinder
+
+
+ com.datarangers
+ datarangers-sdk-core
+ 1.5.5-release
+
+
+
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ 5.0.1
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.11.4
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ 2.11.4
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.11.4
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.10.2.1
+
+
+ junit
+ junit
+ 4.12
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.0
+
+ UTF-8
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java
index 6fd53ae..c8f96c6 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorContainer.java
@@ -43,6 +43,10 @@ public List consume(int waitTimeMs) throws InterruptedException {
return handleMessage(messageQueue.poll(waitTimeMs));
}
+ public List consume(int size, int waitTimeMs) throws InterruptedException {
+ return handleMessage(messageQueue.poll(size, waitTimeMs));
+ }
+
public int size() {
return messageQueue.size();
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java
index c3eed78..493c5c2 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorCounter.java
@@ -34,7 +34,7 @@ public void run() {
FileOutputStream stream = null;
try {
stream = new FileOutputStream(output, true);
- Map status = new HashMap() {{
+ Map status = new HashMap(2) {{
put("history", CollectorContainer.SEND_HISTORY);
put("queue_length", Collector.collectorContainer.size());
}};
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java
index 8edf846..bbae533 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/CollectorQueue.java
@@ -23,7 +23,17 @@ public interface CollectorQueue {
* @date: 2021/2/7 15:51
*/
List take() throws InterruptedException;
+
List poll(int waitTimeMs) throws InterruptedException;
+
+ /**
+ * Polls up to {@code size} messages, waiting at most {@code waitTimeMs} ms per poll (total wait may reach size * waitTimeMs).
+ * @param size
+ * @param waitTimeMs
+ * @return
+ * @throws InterruptedException
+ */
+ List poll(int size, int waitTimeMs) throws InterruptedException;
/**
* 功能描述: 发送一个Message到队列中
*
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java
index 6d1f198..5961d95 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/Consumer.java
@@ -13,125 +13,143 @@
import com.datarangers.message.AppMessage;
import com.datarangers.message.Message;
import com.datarangers.sender.MessageSenderFactory;
-
-import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public class Consumer implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
- private static RangersLoggerWriterPool pool;
- private CollectorContainer collectorContainer;
- private DataRangersSDKConfigProperties sdkConfigProperties;
-
- public Consumer(CollectorContainer collectorContainer,
- DataRangersSDKConfigProperties sdkConfigProperties) {
- this.collectorContainer = collectorContainer;
- this.sdkConfigProperties = sdkConfigProperties;
- }
-
- public static void setWriterPool(final List targetPrefixes, String targetNames,
- int maxSize) {
- if (pool == null) {
- synchronized (Consumer.class) {
+ private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
+ private static RangersLoggerWriterPool pool;
+ private CollectorContainer collectorContainer;
+ private DataRangersSDKConfigProperties sdkConfigProperties;
+
+ public Consumer(CollectorContainer collectorContainer,
+ DataRangersSDKConfigProperties sdkConfigProperties) {
+ this.collectorContainer = collectorContainer;
+ this.sdkConfigProperties = sdkConfigProperties;
+ }
+
+ public static void setWriterPool(final List targetPrefixes, String targetNames,
+ int maxSize) {
if (pool == null) {
- pool = RangersLoggerWriterPool.getInstance(targetPrefixes, targetNames, maxSize);
+ synchronized (Consumer.class) {
+ if (pool == null) {
+ pool = RangersLoggerWriterPool.getInstance(targetPrefixes, targetNames, maxSize);
+ }
+ }
}
- }
}
- }
-
- private void send() throws Exception {
- while (true) {
- try {
- List messages = collectorContainer.consume();
- if (messages != null) {
- messages.forEach(message -> {
- doSend(message);
- });
+
+ private void send() throws Exception {
+ while (true) {
+ try {
+ List messages = collectorContainer.consume();
+ if (messages != null) {
+ messages.forEach(message -> {
+ doSend(message);
+ });
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ logger.error("consumer send error", e);
+ }
}
- } catch (Throwable e) {
- e.printStackTrace();
- logger.error("consumer send error", e);
- }
}
- }
-
- private void write() throws Exception {
- while (true) {
- try {
- List messages = collectorContainer.consume();
- if (messages != null) {
- messages.forEach(message -> {
- doWrite(message);
- });
+
+ private void sendBatch() throws Exception {
+ while (true) {
+ try {
+ List messages = collectorContainer.consume(sdkConfigProperties.getBatchSize(),
+ sdkConfigProperties.getWaitTimeMs());
+ if (messages != null && messages.size() > 0) {
+ MessageSenderFactory.getMessageSender(messages.get(0))
+ .sendBatch(messages, this.sdkConfigProperties);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ logger.error("consumer send error", e);
+ }
}
- } catch (Throwable e) {
- logger.error("consumer write error", e);
- }
+ }
+ private void write() throws Exception {
+ while (true) {
+ try {
+ List messages = collectorContainer.consume();
+ if (messages != null) {
+ messages.forEach(message -> {
+ doWrite(message);
+ });
+ }
+ } catch (Throwable e) {
+ logger.error("consumer write error", e);
+ }
+
+ }
}
- }
-
- @Override
- public void run() {
- try {
- if (EventConfig.saveFlag) {
- write();
- } else {
- send();
- }
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("consumer run error", e);
+
+ @Override
+ public void run() {
+ try {
+ if (EventConfig.saveFlag) {
+ write();
+ } else if (sdkConfigProperties.isSendBatch()) {
+ sendBatch();
+ } else {
+ send();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("consumer run error", e);
+ }
}
- }
-
- public void flush() {
- try {
- System.out.println("flush message start");
- logger.info("flush message start");
- int count = 0;
- if (collectorContainer.getMessageQueue() != null) {
- Message message = collectorContainer.getMessageQueue().poll();
- while (message != null) {
- count++;
- message = collectorContainer.handleMessage(message);
- if (EventConfig.saveFlag) {
+
+ public void flush() {
+ try {
+ System.out.println("flush message start");
+ logger.info("flush message start");
+ int count = 0;
+ if (collectorContainer.getMessageQueue() != null) {
+ Message message = collectorContainer.getMessageQueue().poll();
+ while (message != null) {
+ count++;
+ message = collectorContainer.handleMessage(message);
+ if (EventConfig.saveFlag) {
+ doWrite(message);
+ } else {
+ doSend(message);
+ }
+ message = collectorContainer.getMessageQueue().poll();
+ }
+ }
+
+ logger.info("flush message success. size: {}", count);
+ System.out.println("flush message success. size: " + count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("flush message error", e);
+ }
+ }
+
+ public void flush(Message message) {
+ message = collectorContainer.handleMessage(message);
+ if (EventConfig.saveFlag) {
doWrite(message);
- } else {
+ } else {
doSend(message);
- }
- message = collectorContainer.getMessageQueue().poll();
}
- }
+ }
- logger.info("flush message success. size: {}", count);
- System.out.println("flush message success. size: " + count);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("flush message error", e);
+ private void doSend(Message message) {
+ MessageSenderFactory.getMessageSender(message)
+ .send(message, this.sdkConfigProperties);
}
- }
-
- public void flush(Message message) {
- message = collectorContainer.handleMessage(message);
- if (EventConfig.saveFlag) {
- doWrite(message);
- } else {
- doSend(message);
+
+ private void doWrite(Message message) {
+ AppMessage appMessage = message.getAppMessage();
+ pool.getWriter(appMessage.getUserUniqueId())
+ .write(RangersJSONConfig.getInstance().toJson(appMessage) + "\n");
}
- }
-
- private void doSend(Message message) {
- MessageSenderFactory.getMessageSender(message)
- .send(message, this.sdkConfigProperties);
- }
-
- private void doWrite(Message message) {
- AppMessage appMessage = message.getAppMessage();
- pool.getWriter(appMessage.getUserUniqueId())
- .write(RangersJSONConfig.getInstance().toJson(appMessage) + "\n");
- }
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java
index deced6c..39793a8 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/asynccollector/RangersCollectorQueue.java
@@ -2,6 +2,7 @@
import com.datarangers.message.Message;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -57,6 +58,26 @@ public List poll(int waitTimeMs) throws InterruptedException {
return null;
}
+ @Override
+ public List poll(int size, int waitTimeMs) throws InterruptedException {
+ List messages = new ArrayList<>();
+ Message msg = queue.poll(waitTimeMs, TimeUnit.MILLISECONDS);
+ if(msg != null){
+ messages.add(msg);
+ }
+
+ // keep polling (waiting up to waitTimeMs each time) until size messages are collected
+ while(messages.size() < size){
+ msg = queue.poll(waitTimeMs, TimeUnit.MILLISECONDS);
+ if(msg == null){
+ // 退出循环
+ break;
+ }
+ messages.add(msg);
+ }
+ return messages;
+ }
+
@Override
public void put(Message t) throws InterruptedException {
queue.put(t);
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java
index 3cf9e12..927d4d0 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/AppEventCollector.java
@@ -32,15 +32,6 @@ public AppEventCollector(String appType, DataRangersSDKConfigProperties properti
public AppEventCollector(String appType, DataRangersSDKConfigProperties properties, Callback cb) {
super(appType, properties, cb);
- if (properties != null) {
- properties.init();
-
- // 设置同步发送的consumer,队列满的时候使用
- setConsumer(new Consumer(Collector.collectorContainer, this.properties));
-
- } else {
- System.out.println(Constants.INIT_ERROR);
- }
}
@Override
@@ -227,6 +218,7 @@ private void sendEvents(Header header, List events, MessageType messageTy
appMessage.setAppType(getAppType());
appMessage.setHeader(header);
appMessage.addEvents(events);
+ appMessage.setTraceId(UUID.randomUUID().toString());
message.setAppMessage(appMessage);
send(message);
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java
index fc69e3a..6b38a0d 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/collector/Collector.java
@@ -6,185 +6,343 @@
*/
package com.datarangers.collector;
-import com.datarangers.asynccollector.CollectorContainer;
-import com.datarangers.asynccollector.Consumer;
+import com.datarangers.asynccollector.*;
import com.datarangers.config.*;
+import com.datarangers.logger.RangersFileCleaner;
import com.datarangers.message.AppMessage;
import com.datarangers.message.Message;
import com.datarangers.message.MessageEnv;
import com.datarangers.sender.Callback;
import com.datarangers.sender.Callback.FailedData;
+import com.datarangers.sender.callback.LoggingCallback;
+import com.datarangers.util.HttpUtils;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* @author taojian
*/
public abstract class Collector implements EventCollector {
- private String appType;
- public static final Logger logger = LoggerFactory.getLogger("DatarangersLog");
- public static ExecutorService httpRequestPool = null;
- public static ScheduledExecutorService scheduled = null;
- public static CollectorContainer collectorContainer;
-
- private boolean enable;
- protected DataRangersSDKConfigProperties properties;
- protected Callback callback;
- protected Consumer consumer = null;
- protected KafkaProducer kafkaProducer;
-
- public Collector(String appType, DataRangersSDKConfigProperties properties, Callback cb) {
- this.appType = appType;
- this.enable = properties.isEnable();
- this.properties = properties;
- this.callback = cb;
- this.properties.setCallback(this.getCallback());
- this.initKafkaProducer();
- }
-
- private void initKafkaProducer(){
- if(SdkMode.KAFKA != this.properties.getMode()){
- return;
- }
- // 设置过了就不需要再自己创建
- if(kafkaProducer != null){
- return;
- }
- kafkaProducer = createProducer(this.properties.getKafka());
- }
-
- private KafkaProducer createProducer(KafkaConfig kafkaConfig) {
- Properties props = new Properties();
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("bootstrap.servers", kafkaConfig.getBootstrapServers());
- Map map = kafkaConfig.getProperties();
- if (map != null && (!map.isEmpty())) {
- props.putAll(map);
- }
- return new KafkaProducer<>(props);
- }
- public String getAppType() {
- return appType;
- }
-
- public Collector setAppType(String appType) {
- this.appType = appType;
- return this;
- }
-
-
- public void send(Message message) {
- sendMessage(message);
- }
-
- protected void sendMessage(Message message) {
- if (!enable) {
- return;
- }
- message.merge();
- String sendMessage;
-
- validate(message);
- if(kafkaProducer != null){
- // 使用kafka的方式
- sendByKafka(message.getAppMessage());
- return;
- }
- sendMessage = RangersJSONConfig.getInstance().toJson(message.getAppMessage());
- if (this.properties.isSync()) {
- syncSendMessage(message, sendMessage);
- } else {
- asyncSendMessage(message, sendMessage);
- }
- }
-
- private void sendByKafka(AppMessage appMessage) {
- // kafka sender,header 添加固定的头
- appMessage.getHeader().setSource(Constants.SDK_SERVER);
- String sendMessage = RangersJSONConfig.getInstance().toJson(appMessage);
- try {
- ProducerRecord producerRecord = new ProducerRecord<>(properties.getKafka().getTopic(), sendMessage);
- kafkaProducer.send(producerRecord, new org.apache.kafka.clients.producer.Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
+ private String appType;
+ public static final Logger logger = LoggerFactory.getLogger("DatarangersLog");
+ public static ExecutorService httpRequestPool = null;
+ public static ScheduledExecutorService scheduled = null;
+ public static CollectorContainer collectorContainer;
+ private boolean enable;
+ protected DataRangersSDKConfigProperties properties;
+ protected Callback callback;
+ protected Consumer consumer = null;
+ protected KafkaProducer kafkaProducer;
+ private static volatile Boolean IS_INIT = false;
+
+ public Collector(String appType, DataRangersSDKConfigProperties properties, Callback cb) {
+ logger.info("sdk config properties: {}", properties);
+ System.out.println("sdk config properties: " + properties.toString());
+ this.appType = appType;
+ this.enable = properties.isEnable();
+ this.properties = properties;
+ this.callback = cb;
+ this.properties.setCallback(this.getCallback());
+ this.init();
+ }
+
+ public String getAppType() {
+ return appType;
+ }
+
+ public void setAppType(String appType) {
+ this.appType = appType;
+ }
+
+ public Callback getCallback() {
+ return callback;
+ }
+
+ public void setCallback(Callback callback) {
+ this.callback = callback;
+ }
+
+ public Consumer getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+
+ private void initKafkaProducer() {
+ logger.info("init kafka producer");
+ if (SdkMode.KAFKA != this.properties.getMode()) {
+ return;
+ }
+ // 设置过了就不需要再自己创建
+ if (kafkaProducer != null) {
+ return;
+ }
+ kafkaProducer = createProducer(this.properties.getKafka());
+ }
+
+ private KafkaProducer createProducer(KafkaConfig kafkaConfig) {
+ Properties props = new Properties();
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("bootstrap.servers", kafkaConfig.getBootstrapServers());
+ Map map = kafkaConfig.getProperties();
+ if (map != null && (!map.isEmpty())) {
+ props.putAll(map);
+ }
+ return new KafkaProducer<>(props);
+ }
+
+ public void send(Message message) {
+ sendMessage(message);
+ }
+
+ private void sendMessage(Message message) {
+ if (!enable) {
+ return;
+ }
+ message.merge();
+ String sendMessage;
+
+ validate(message);
+ if (kafkaProducer != null) {
+ // 使用kafka的方式
+ sendByKafka(message.getAppMessage());
+ return;
+ }
+ sendMessage = RangersJSONConfig.getInstance().toJson(message.getAppMessage());
+ if (this.properties.isSync()) {
+ syncSendMessage(message, sendMessage);
+ } else {
+ asyncSendMessage(message, sendMessage);
+ }
+ }
+
+ private void sendByKafka(AppMessage appMessage) {
+ // kafka sender,header 添加固定的头
+ appMessage.getHeader().setSource(Constants.SDK_SERVER);
+ String sendMessage = RangersJSONConfig.getInstance().toJson(appMessage);
+ try {
+ ProducerRecord producerRecord = new ProducerRecord<>(properties.getKafka().getTopic(), sendMessage);
+ kafkaProducer.send(producerRecord, new org.apache.kafka.clients.producer.Callback() {
+ @Override
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+ if (e != null) {
+ logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e);
+ getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false));
+ }
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e);
- getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e));
- }
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- logger.error(String.format("kafka send message error. value: \r\n %s", sendMessage), e);
- getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e));
- }
- }
-
- private void syncSendMessage(Message message, String sendMessage) {
- try {
- this.consumer.flush(message);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("sync send message error", e);
- getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e));
- }
- }
-
- private void asyncSendMessage(Message message, String sendMessage) {
- if (collectorContainer.getMessageQueue() != null) {
- try {
- collectorContainer.produce(message);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("async send message error", e);
- getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e));
- }
- } else {
- logger.error("getMessageQueue is null");
- getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null"));
- }
- }
-
- /**
- * message 检查
- */
- private void validate(Message message) {
- // 当前只有saas需要校验下appkey
- if (this.properties.getMessageEnv() != MessageEnv.SAAS) {
- return;
- }
-
- Integer appId = message.getAppMessage().getAppId();
- Map appKeys = this.properties.getAppKeys();
- String appKey = appKeys.get(appId);
- if (appKey == null) {
- throw new IllegalArgumentException("App key cannot be empty. app_id: " + appId);
- }
- }
-
- public Callback getCallback() {
- return callback;
- }
-
- public void setCallback(Callback callback) {
- this.callback = callback;
- }
-
- public Consumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(Consumer consumer) {
- this.consumer = consumer;
- }
+ getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false));
+ }
+ }
+
+ private void syncSendMessage(Message message, String sendMessage) {
+ try {
+ this.consumer.flush(message);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("sync send message error", e);
+ getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false));
+ }
+ }
+
+ private void asyncSendMessage(Message message, String sendMessage) {
+ if (collectorContainer.getMessageQueue() != null) {
+ try {
+ collectorContainer.produce(message);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("async send message error", e);
+ getCallback().onFailed(new FailedData(sendMessage, e.getMessage(), e, false));
+ }
+ } else {
+ logger.error("getMessageQueue is null");
+ getCallback().onFailed(new FailedData(sendMessage, "getMessageQueue is null", false));
+ }
+ }
+
+ /**
+ * message 检查
+ */
+ private void validate(Message message) {
+ // 当前只有saas需要校验下appkey
+ if (this.properties.getMessageEnv() != MessageEnv.SAAS) {
+ return;
+ }
+
+ Integer appId = message.getAppMessage().getAppId();
+ Map appKeys = this.properties.getAppKeys();
+ String appKey = appKeys.get(appId);
+ if (appKey == null) {
+ throw new IllegalArgumentException("App key cannot be empty. app_id: " + appId);
+ }
+ }
+
+ /**
+ * 配置初始化
+ */
+ private void init() {
+ if (!IS_INIT) {
+ synchronized (Collector.class) {
+ if (!IS_INIT) {
+ initLogger();
+ initCommon();
+ initKafkaProducer();
+ initConsumer();
+ initHook();
+ IS_INIT = true;
+ }
+ }
+ }
+ }
+
+ /**
+ * 日志pool初始化
+ */
+ private void initLogger() {
+ logger.info("init log writer pool");
+ List eventFilePaths = properties.getEventFilePaths();
+ String eventSaveName = properties.getEventSaveName();
+ int eventSaveMaxFileSize = properties.getEventSaveMaxFileSize();
+ String eventSavePath = properties.getEventSavePath();
+
+ Consumer.setWriterPool(eventFilePaths, eventSaveName, eventSaveMaxFileSize);
+ if (properties.getCallback() == null) {
+ properties.setCallback(new LoggingCallback(eventSavePath, "error-" + eventSaveName,
+ eventSaveMaxFileSize));
+ }
+ }
+
+ /**
+ * eventConfig,httpclient,EventConfig 初始化
+ */
+ private void initCommon() {
+ HttpConfig httpConfig = properties.getHttpConfig();
+ HttpClient httpClient = properties.getCustomHttpClient();
+ Callback callback = properties.getCallback();
+ int httpTimeOut = properties.getHttpTimeout();
+
+ //EventConfig配置
+ EventConfig.saveFlag = SdkMode.FILE == properties.getMode();
+ EventConfig.sendFlag = SdkMode.HTTP == properties.getMode();
+ EventConfig.setUrl(properties.getDomain());
+
+ if (EventConfig.sendFlag) {
+ if (httpConfig.getMaxPerRoute() < properties.getCorePoolSize()) {
+ httpConfig.setMaxPerRoute(properties.getCorePoolSize());
+ }
+ if (httpConfig.getMaxTotal() < httpConfig.getMaxPerRoute()) {
+ httpConfig.setMaxTotal(httpConfig.getMaxPerRoute());
+ }
+ // 老版本配置做兼容
+ httpConfig.initTimeOut(httpTimeOut);
+ //httpclient 初始化
+ HttpUtils.createHttpClient(httpConfig, httpClient, callback);
+
+ //EventConfig 初始化
+ if (EventConfig.SEND_HEADER == null) {
+ EventConfig.SEND_HEADER = properties.getHeaders();
+ EventConfig.SEND_HEADER.put("User-Agent", "DataRangers Java SDK");
+ EventConfig.SEND_HEADER.put("Content-Type", "application/json");
+ List headerList = new ArrayList<>();
+ EventConfig.SEND_HEADER
+ .forEach((key, value) -> headerList.add(new BasicHeader(key, value)));
+ EventConfig.headers = headerList.toArray(new Header[0]);
+ }
+ }
+ }
+
+ /**
+ * 初始化消费者, httpRequestPool, 日志记录和清理任务
+ */
+ private void initConsumer() {
+ logger.info("init consumer");
+ int threadCount = this.properties.getCorePoolSize();
+ CollectorQueue userQueue = this.properties.getUserQueue();
+
+ if (EventConfig.saveFlag) {
+ threadCount = 1;
+ logger.info("Start LogAgent Mode");
+ } else {
+ logger.info("Start Http Mode");
+ }
+
+ // 如果客户自定义了queue,则需要替换为客户自定义queue,否则使用默认的队列
+ if (userQueue == null) {
+ collectorContainer = new CollectorContainer(
+ RangersCollectorQueue.getInstance(this.properties.getQueueSize()));
+ } else {
+ collectorContainer = new CollectorContainer(userQueue);
+ }
+
+ boolean isSync = this.properties.isSync();
+ boolean hasConsumer = this.properties.isHasConsumer();
+ boolean hasProducer = this.properties.isHasProducer();
+ String eventSavePath = this.properties.getEventSavePath();
+ List eventFilePaths = properties.getEventFilePaths();
+ String eventSaveName = this.properties.getEventSaveName();
+ int eventSaveMaxDays = this.properties.getEventSaveMaxDays();
+ // 设置同步发送的consumer,队列满的时候使用
+ if (isSync) {
+ setConsumer(new Consumer(Collector.collectorContainer, this.properties));
+ }
+ // 异步起多个消费者
+ if (!isSync && hasConsumer && httpRequestPool == null) {
+ httpRequestPool = Executors.newFixedThreadPool(this.properties.getCorePoolSize());
+ for (int i = 0; i < threadCount; i++) {
+ //必须全部消费同一个队列
+ httpRequestPool.execute(new Consumer(collectorContainer, properties));
+ }
+ }
+ if ((!isSync) && hasProducer) {
+ //定时记录日志的条数
+ scheduled = Executors.newSingleThreadScheduledExecutor();
+ scheduled
+ .scheduleAtFixedRate(new CollectorCounter(eventSavePath), 0, 2, TimeUnit.MINUTES);
+ if (EventConfig.saveFlag) {
+ // 清理日志文件定时任务, 每隔12小时清理一次
+ scheduled.scheduleAtFixedRate(
+ new RangersFileCleaner(eventFilePaths, eventSaveName, eventSaveMaxDays),
+ 0, 12, TimeUnit.HOURS);
+ logger.info("Start DataRangers Cleaner/Record Thread");
+ }
+ }
+ }
+
+ /**
+ * jvm关闭时,一些需要做的清理任务
+ */
+ private void initHook() {
+ logger.info("init hook");
+ Runtime.getRuntime().addShutdownHook(new Thread(
+ () -> {
+ if (Collector.httpRequestPool != null) {
+ Collector.httpRequestPool.shutdown();
+ }
+
+ new Consumer(Collector.collectorContainer, properties).flush();
+ }));
+ }
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java
index 8dae554..add87dc 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/Constants.java
@@ -11,10 +11,11 @@
import java.util.GregorianCalendar;
public class Constants {
- public static final String SDK_VERSION = "datarangers_sdk_1.5.3-release";
+ public static final String SDK_VERSION = "datarangers_sdk_1.5.5-release";
public static DateTimeFormatter FULL_HOUR = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
public static DateTimeFormatter FULL_DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static final String APP_LOG_PATH = "/sdk/log";
+ public static final String APP_LIST_PATH = "/sdk/list";
public static final String DEFAULT_USER = "__rangers";
public static final String INIT_ERROR = "sdk config must not be null";
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java
index ada2c0b..b02d6a4 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/DataRangersSDKConfigProperties.java
@@ -7,34 +7,20 @@
package com.datarangers.config;
import com.datarangers.asynccollector.*;
-import com.datarangers.collector.Collector;
-import com.datarangers.logger.RangersFileCleaner;
import com.datarangers.message.MessageEnv;
import com.datarangers.sender.Callback;
-import com.datarangers.sender.callback.LoggingCallback;
-import com.datarangers.util.HttpUtils;
import org.apache.hc.client5.http.classic.HttpClient;
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.message.BasicHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.ZoneOffset;
import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
/**
* @author hTangle
*/
public class DataRangersSDKConfigProperties {
- public static final Logger logger = LoggerFactory.getLogger(DataRangersSDKConfigProperties.class);
public Map headers;
public String domain;
- public int threadPoolCount = 1;
- public int maxPoolSize = 8;
public int corePoolSize = 4;
/**
@@ -54,25 +40,20 @@ public class DataRangersSDKConfigProperties {
*/
@Deprecated
public boolean save = false;
- public int threadCount = 1;
public int queueSize = 10240;
public boolean send = true;
- public boolean isEnable() {
- return enable;
- }
-
- public void setEnable(boolean enable) {
- this.enable = enable;
- }
+ private boolean sendBatch = false;
+ private int batchSize = 20;
+ private int waitTimeMs = 100;
public boolean enable = true;
public String eventSavePath = "logs/";
public List eventFilePaths;
public String eventSaveName = "datarangers.log";
- public int eventSaveMaxHistory = 20;
public int eventSaveMaxFileSize = 100;
+ // 日志清理时间
public int eventSaveMaxDays = 5;
public CollectorQueue userQueue;
@@ -92,8 +73,8 @@ public void setEnable(boolean enable) {
private String env = "privatization";
private List SAAS_DOMAIN_URLS = Arrays.asList(
- "https://mcs.ctobsnssdk.com",
- "https://mcs.tobsnssdk.com");
+ "https://mcs.ctobsnssdk.com",
+ "https://mcs.tobsnssdk.com");
/**
* saas openapi 配置地址
@@ -110,6 +91,38 @@ public void setEnable(boolean enable) {
private SdkMode mode;
private KafkaConfig kafka;
+ public boolean isSendBatch() {
+ return sendBatch;
+ }
+
+ public void setSendBatch(boolean sendBatch) {
+ this.sendBatch = sendBatch;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public int getWaitTimeMs() {
+ return waitTimeMs;
+ }
+
+ public void setWaitTimeMs(int waitTimeMs) {
+ this.waitTimeMs = waitTimeMs;
+ }
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
+
public KafkaConfig getKafka() {
return kafka;
}
@@ -215,15 +228,6 @@ public DataRangersSDKConfigProperties setEventSaveName(String eventSaveName) {
return this;
}
- public int getEventSaveMaxHistory() {
- return eventSaveMaxHistory;
- }
-
- public DataRangersSDKConfigProperties setEventSaveMaxHistory(int eventSaveMaxHistory) {
- this.eventSaveMaxHistory = eventSaveMaxHistory;
- return this;
- }
-
public int getEventSaveMaxFileSize() {
return eventSaveMaxFileSize;
}
@@ -242,14 +246,6 @@ public DataRangersSDKConfigProperties setSend(boolean send) {
return this;
}
- public int getThreadCount() {
- return threadCount;
- }
-
- public void setThreadCount(int threadCount) {
- this.threadCount = threadCount;
- }
-
public int getQueueSize() {
return queueSize;
}
@@ -258,14 +254,6 @@ public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
- public int getMaxPoolSize() {
- return maxPoolSize;
- }
-
- public void setMaxPoolSize(int maxPoolSize) {
- this.maxPoolSize = maxPoolSize;
- }
-
public int getCorePoolSize() {
return corePoolSize;
}
@@ -293,14 +281,6 @@ public void setDomain(String domain) {
this.domain = domain;
}
- public int getThreadPoolCount() {
- return threadPoolCount;
- }
-
- public void setThreadPoolCount(int threadPoolCount) {
- this.threadPoolCount = threadPoolCount;
- }
-
public ZoneOffset getTimeOffset() {
return timeOffset;
}
@@ -328,98 +308,6 @@ public void setTimeZone(String timeZone) {
this.timeZone = timeZone;
}
- public void setLogger() {
- Consumer.setWriterPool(getEventFilePaths(), getEventSaveName(), getEventSaveMaxFileSize());
- if (callback == null) {
- setCallback(new LoggingCallback(getEventSavePath(), "error-" + getEventSaveName(),
- getEventSaveMaxFileSize()));
- }
- }
-
- public void setCommon() {
- EventConfig.saveFlag = SdkMode.FILE == getMode();
- EventConfig.sendFlag = SdkMode.HTTP == getMode();
- if (EventConfig.sendFlag) {
- httpConfig = this.getHttpConfig();
- if (httpConfig.getMaxPerRoute() < this.getThreadCount()) {
- httpConfig.setMaxPerRoute(this.getThreadCount());
- }
- if (httpConfig.getMaxTotal() < httpConfig.getMaxPerRoute()) {
- httpConfig.setMaxTotal(httpConfig.getMaxPerRoute());
- }
- httpConfig.initTimeOut(getHttpTimeout());
- HttpUtils
- .createHttpClient(this.getHttpConfig(), this.getCustomHttpClient(), this.getCallback());
- if (EventConfig.SEND_HEADER == null) {
- EventConfig.SEND_HEADER = getHeaders();
- EventConfig.SEND_HEADER.put("User-Agent", "DataRangers Java SDK");
- EventConfig.SEND_HEADER.put("Content-Type", "application/json");
- List headerList = new ArrayList<>();
- EventConfig.SEND_HEADER
- .forEach((key, value) -> headerList.add(new BasicHeader(key, value)));
- EventConfig.headers = headerList.toArray(new Header[0]);
- }
- }
- setConsumer(getThreadCount());
- EventConfig.setUrl(getDomain());
- }
-
-
- public ExecutorService setThreadPool() {
- return Executors.newFixedThreadPool(getCorePoolSize());
- }
-
- public void setConsumer(int threadCount) {
- if (EventConfig.saveFlag) {
- threadCount = 1;
- logger.info("Start LogAgent Mode");
- } else {
- logger.info("Start Http Mode");
- }
- if (userQueue == null) {
- Collector.collectorContainer = new CollectorContainer(
- RangersCollectorQueue.getInstance(getQueueSize()));
- } else {
- //如果客户自定义了queue,则需要替换为客户自定义queue
- Collector.collectorContainer = new CollectorContainer(userQueue);
- }
- if ((!sync) && hasConsumer && Collector.httpRequestPool == null) {
- //有消费者才初始化消费者
- Collector.httpRequestPool = setThreadPool();
- for (int i = 0; i < threadCount; i++) {//必须全部消费同一个队列
- Collector.httpRequestPool.execute(new Consumer(Collector.collectorContainer, this));
- }
- }
- if ((!sync) && hasProducer) {
- //有生产者才需要记录
- Collector.scheduled = Executors.newSingleThreadScheduledExecutor();
- Collector.scheduled
- .scheduleAtFixedRate(new CollectorCounter(getEventSavePath()), 0, 2, TimeUnit.MINUTES);
- if (EventConfig.saveFlag) {
- Collector.scheduled.scheduleAtFixedRate(
- new RangersFileCleaner(getEventFilePaths(), getEventSaveName(), getEventSaveMaxDays()),
- 0, 12, TimeUnit.HOURS);
- }
- }
- logger.info("Start DataRangers Cleaner/Record Thread");
- }
-
-
- public static volatile Boolean IS_INIT = false;
-
- public void init() {
- if (!IS_INIT) {
- synchronized (DataRangersSDKConfigProperties.class) {
- if (!IS_INIT) {
- setLogger();
- setCommon();
- setHook();
- IS_INIT = true;
- }
- }
- }
- }
-
public CollectorQueue getUserQueue() {
return userQueue;
}
@@ -476,17 +364,15 @@ public void setAppKeys(Map appKeys) {
this.appKeys = appKeys;
}
- private void setHook() {
- DataRangersSDKConfigProperties properties = this;
- Runtime.getRuntime().addShutdownHook(new Thread(
- () -> {
- if(Collector.httpRequestPool != null){
- Collector.httpRequestPool.shutdown();
- }
-
- new Consumer(Collector.collectorContainer, properties).flush();
- }));
+ @Override
+ public String toString() {
+ return " domain:" + domain + " corePoolSize:" + corePoolSize + " httpTimeout:" + httpTimeout +
+ " timeZone:" + timeZone + " timeOffset:" + timeOffset + " save:" + save + " queueSize:" + queueSize +
+ " send:" + send + " sendBatch:" + sendBatch + " batchSize:" + batchSize + " waitTimeMs:" + waitTimeMs +
+ " enable:" + enable + " eventSavePath:" + eventSavePath + " eventSaveName:" + eventSaveName +
+ " eventSaveMaxFileSize:" + eventSaveMaxFileSize + " eventSaveMaxDays:" + eventSaveMaxDays +
+ " userQueue:" + userQueue + " hasConsumer:" + hasConsumer + " hasProducer:" + hasProducer +
+ " env:" + env + " sync:" + sync + " mode:" + mode + " kafka:" + kafka + " httpConfig:" + httpConfig;
}
-
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java
index 7f6e555..9244bfe 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/EventConfig.java
@@ -12,6 +12,7 @@
public class EventConfig {
public static String appUrl;
+ public static String appListUrl;
public static Header[] headers;
public static Map SEND_HEADER;
public static boolean saveFlag = false;
@@ -19,6 +20,7 @@ public class EventConfig {
public static void setUrl(String url) {
setAppUrl(url + Constants.APP_LOG_PATH);
+ setAppListUrl(url + Constants.APP_LIST_PATH);
}
public static String getAppUrl() {
@@ -28,4 +30,12 @@ public static String getAppUrl() {
public static void setAppUrl(String appUrl) {
EventConfig.appUrl = appUrl;
}
+
+ public static String getAppListUrl() {
+ return EventConfig.appListUrl;
+ }
+
+ public static void setAppListUrl(String appListUrl) {
+ EventConfig.appListUrl = appListUrl;
+ }
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java
index d69348d..2f4bb13 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/HttpConfig.java
@@ -161,4 +161,13 @@ public Integer getKeepAliveTimeout() {
public void setKeepAliveTimeout(Integer keepAliveTimeout) {
this.keepAliveTimeout = keepAliveTimeout;
}
+
+ @Override
+ public String toString() {
+ return " requestTimeout:" + requestTimeout + " connectTimeout:" + connectTimeout +
+ " socketTimeout:" + socketTimeout + " keepAliveTimeout:" + keepAliveTimeout +
+ " maxTotal:" + maxTotal + " maxPerRoute:" + maxPerRoute +
+ " keyMaterialPath:" + keyMaterialPath + " keyPassword:******" +
+ " storePassword:******" + " trustMaterialPath:" + trustMaterialPath;
+ }
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java b/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java
index 846e8a1..a1516cf 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/config/KafkaConfig.java
@@ -34,4 +34,25 @@ public Map getProperties() {
public void setProperties(Map properties) {
this.properties = properties;
}
+
+ @Override
+ public String toString() {
+ StringBuilder kafkaSb = new StringBuilder();
+
+ kafkaSb.append(" topic:")
+ .append(topic)
+ .append(" bootstrapServers:")
+ .append(bootstrapServers);
+
+ if (properties != null) {
+ kafkaSb.append(" properties:");
+ for (Map.Entry entry:properties.entrySet()) {
+ kafkaSb.append(entry.getKey())
+ .append(":")
+ .append(entry.getValue()).append(" ");
+ }
+ }
+
+ return kafkaSb.toString();
+ }
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java b/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java
index 07fca3b..d3b1614 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/logger/RangersLoggerWriter.java
@@ -101,7 +101,7 @@ public RangersLoggerWriter(final String targetPrefix, final String targetName, i
File parent = new File(targetPrefix);
if (!parent.exists()) parent.mkdirs();
this.currentIndex = setCurrentIndex();
- if (this.currentIndex == 0) this.currentIndex++;
+ this.currentIndex++;
fullTarget = this.targetPrefix + "/" + this.targetName;
this.output = new File(fullTarget);
currentName = targetName + "." + LocalDateTime.now().format(Constants.FULL_HOUR);
@@ -120,7 +120,7 @@ public RangersLoggerWriter(final String targetPrefix, final String targetName) {
private int setCurrentIndex() {
String current = LocalDateTime.now().format(Constants.FULL_HOUR);
- String full = targetName + "-" + current + "-";
+ String full = targetName + "." + current + ".";
int number = 0;
for (File f : new File(targetPrefix).listFiles()) {
if (f.getName().contains(full)) {
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java
index 824662f..2628a59 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/Callback.java
@@ -13,14 +13,15 @@ public interface Callback {
class FailedData {
- public FailedData(String message, String cause) {
- this(message, cause, null);
+ public FailedData(String message, String cause, boolean listable) {
+ this(message, cause, null, listable);
}
- public FailedData(String message, String cause, Exception exception) {
+ public FailedData(String message, String cause, Exception exception, boolean listable) {
this.message = message;
this.cause = cause;
this.exception = exception;
+ this.listable = listable;
}
/**
@@ -38,6 +39,8 @@ public FailedData(String message, String cause, Exception exception) {
*/
private Exception exception;
+ private boolean listable;
+
public String getMessage() {
return message;
}
@@ -61,5 +64,13 @@ public Exception getException() {
public void setException(Exception exception) {
this.exception = exception;
}
+
+ public boolean isListable() {
+ return listable;
+ }
+
+ public void setListable(boolean listable) {
+ this.listable = listable;
+ }
}
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java
index 2305771..53f3f00 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/MessageSender.java
@@ -3,6 +3,8 @@
import com.datarangers.config.DataRangersSDKConfigProperties;
import com.datarangers.message.Message;
+import java.util.List;
+
/**
* @Author zhangpeng.spin@bytedance.com
* @Date 2021-07-22
@@ -15,4 +17,13 @@ public interface MessageSender {
* @param sdkConfigProperties
*/
void send(Message message, DataRangersSDKConfigProperties sdkConfigProperties);
+
+ /**
+ * 使用批量上报
+ * @param message
+ * @param sdkConfigProperties
+ */
+ default void sendBatch(List message, DataRangersSDKConfigProperties sdkConfigProperties) {
+ throw new UnsupportedOperationException("Not support batch");
+ };
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java
index 89f1396..c69156e 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/PrivatizationMessageSender.java
@@ -3,9 +3,13 @@
import com.datarangers.config.DataRangersSDKConfigProperties;
import com.datarangers.config.EventConfig;
import com.datarangers.config.RangersJSONConfig;
+import com.datarangers.message.AppMessage;
import com.datarangers.message.Message;
import com.datarangers.util.HttpUtils;
+import java.util.List;
+import java.util.stream.Collectors;
+
/**
* @Author zhangpeng.spin@bytedance.com
* @Date 2021-07-22
@@ -17,4 +21,10 @@ public void send(Message message, DataRangersSDKConfigProperties sdkConfigProper
Object sendMessage = message.getAppMessage();
HttpUtils.post(EventConfig.getAppUrl(), RangersJSONConfig.getInstance().toJson(sendMessage), EventConfig.SEND_HEADER);
}
+
+ @Override
+ public void sendBatch(List message, DataRangersSDKConfigProperties sdkConfigProperties) {
+ List sendMessages = message.stream().map(n -> n.getAppMessage()).collect(Collectors.toList());
+ HttpUtils.post(EventConfig.getAppListUrl(), RangersJSONConfig.getInstance().toJson(sendMessages), EventConfig.SEND_HEADER);
+ }
}
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java b/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java
index 5e90356..bc66da4 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/sender/callback/LoggingCallback.java
@@ -1,7 +1,13 @@
package com.datarangers.sender.callback;
+import com.datarangers.config.RangersJSONConfig;
import com.datarangers.logger.RangersLoggerWriter;
import com.datarangers.sender.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
/**
* @Author zhangpeng.spin@bytedance.com
@@ -9,6 +15,8 @@
*/
public class LoggingCallback implements Callback {
+ public static final Logger logger = LoggerFactory.getLogger(LoggingCallback.class);
+
private static RangersLoggerWriter writer;
private static final Object lock = new Object();
@@ -19,7 +27,24 @@ public LoggingCallback(final String targetPrefix, final String targetName, int m
@Override
public void onFailed(FailedData failedData) {
- writeFailedMessage(failedData.getMessage());
+ if (failedData.getMessage() == null) {
+ return;
+ }
+ if(!failedData.isListable()){
+ writeFailedMessage(failedData.getMessage());
+ }else{
+ try {
+ List list = RangersJSONConfig.getInstance().fromJson(failedData.getMessage(), List.class);
+ list.forEach(n-> {
+ writeFailedMessage(RangersJSONConfig.getInstance().toJson(n));
+ });
+ } catch (IOException e) {
+ // logged via SLF4J below; avoid duplicate printStackTrace output on stderr
+ logger.error("json error", e);
+ writeFailedMessage(failedData.getMessage());
+ }
+ }
+
}
private void writeFailedMessage(String message) {
diff --git a/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java b/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java
index 279f681..8e8ded5 100644
--- a/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java
+++ b/datarangers-sdk-core/src/main/java/com/datarangers/util/HttpUtils.java
@@ -7,9 +7,7 @@
package com.datarangers.util;
import com.datarangers.config.HttpConfig;
-import com.datarangers.config.EventConfig;
import com.datarangers.config.RangersJSONConfig;
-import com.datarangers.logger.RangersLoggerWriter;
import com.datarangers.sender.Callback;
import com.datarangers.sender.Callback.FailedData;
@@ -27,7 +25,6 @@
import javax.net.ssl.X509TrustManager;
import org.apache.hc.client5.http.classic.HttpClient;
-import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
@@ -202,12 +199,12 @@ public static void request(String method, String url, String body, Map 2) {
logger.error(String.format("request error, io error: %s, resultStr: %s", e.getMessage(), resultStr), e);
- callback.onFailed(new FailedData(body, e.toString(), e));
+ callback.onFailed(new FailedData(body, e.toString(), e, isListable(url)));
} else {
count++;
post(url, body, headers, count);
@@ -217,7 +214,7 @@ public static void request(String method, String url, String body, Map
+
+ 4.0.0
+ 1.5.5-release
+ jar
+ com.datarangers
+ datarangers-sdk-starter
+ datarangers-server-sdk-starter
+ A SDK for datarangers user
+ https://github.com/volcengine/datarangers-sdk-java
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+
+
+ com.datarangers
+ datarangers-sdk-core
+ ${project.version}
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ 2.3.4.RELEASE
+
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ 5.0.1
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ 2.3.4.RELEASE
+ test
+
+
+ log4j-to-slf4j
+ org.apache.logging.log4j
+
+
+
+
+ org.springframework.boot
+ spring-boot-test
+ 2.3.4.RELEASE
+ test
+
+
+ log4j-to-slf4j
+ org.apache.logging.log4j
+ 2.15.0
+ test
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.0
+
+ UTF-8
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..6c43407
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,56 @@
+
+
+ 4.0.0
+
+ 1.5.5-release
+
+
+ com.datarangers
+ datarangers-sdk
+ pom
+ 1.5.5-release
+
+ datarangers-sdk-core
+ datarangers-sdk-starter
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.2.1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.0
+
+ UTF-8
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+