Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ datarangers-sdk-java 是 [DataFinder](https://www.volcengine.com/product/datafin
<dependency>
<groupId>com.datarangers</groupId>
<artifactId>datarangers-sdk-core</artifactId>
<version>1.5.12-release</version>
<version>1.5.13-release</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion datarangers-sdk-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>datarangers-sdk</artifactId>
<groupId>com.datarangers</groupId>
<version>1.5.12-release</version>
<version>1.5.13-release</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Consumer implements Runnable {

Expand Down Expand Up @@ -64,8 +66,18 @@ private void sendBatch() throws Exception {
List<Message> messages = collectorContainer.consume(sdkConfigProperties.getBatchSize(),
sdkConfigProperties.getWaitTimeMs());
if (messages != null && messages.size() > 0) {
MessageSenderFactory.getMessageSender(messages.get(0))
.sendBatch(messages, this.sdkConfigProperties);
// 根据appId 进行分组
Map<Integer, List<Message>> messagesMap =
messages.stream().collect(Collectors.groupingBy(n -> n.getAppMessage().getAppId()));
// 上报
for (Map.Entry<Integer, List<Message>> entry : messagesMap.entrySet()) {
List<Message> messageList = entry.getValue();
if(messageList != null && messageList.size() > 0) {
MessageSenderFactory.getMessageSender(messageList.get(0))
.sendBatch(messageList, this.sdkConfigProperties);
}
}

}
} catch (Throwable e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.GregorianCalendar;

public class Constants {
public static final String SDK_VERSION = "datarangers_sdk_1.5.12-release";
public static final String SDK_VERSION = "datarangers_sdk_1.5.13-release";
public static DateTimeFormatter FULL_HOUR = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
public static DateTimeFormatter FULL_DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static final String APP_LOG_PATH = "/sdk/log";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class DataRangersSDKConfigProperties {
public String domain;

private String path;
private String batchPath;

private int threadCount = 20;

Expand Down Expand Up @@ -293,6 +294,14 @@ public void setPath(String path) {
this.path = path;
}

public String getBatchPath() {
return batchPath;
}

public void setBatchPath(String batchPath) {
this.batchPath = batchPath;
}

public ZoneOffset getTimeOffset() {
return timeOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public interface MessageSender {

/**
* 使用批量上报
* @param message
* @param messages
* @param sdkConfigProperties
*/
default void sendBatch(List<Message> message, DataRangersSDKConfigProperties sdkConfigProperties) {
default void sendBatch(List<Message> messages, DataRangersSDKConfigProperties sdkConfigProperties) {
throw new UnsupportedOperationException("Not support batch");
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.datarangers.message.MessageEnv;
import com.datarangers.message.MessageType;
import com.datarangers.sender.saas.SaasItemAppMessageSender;
import com.datarangers.sender.saas.SaasMessageSender;
import com.datarangers.sender.saas.SaasProfileAppMessageSender;
import com.datarangers.sender.saas.SaasServerAppMessageSender;
import com.datarangers.sender.saasnative.SaasNativeMessageSender;
Expand All @@ -25,16 +26,7 @@ public static MessageSender getMessageSender(Message message) {
if (MessageEnv.SAAS_NATIVE == messageEnv) {
return new SaasNativeMessageSender();
}
MessageType messageType = message.getMessageType();
switch (messageType) {
case EVENT:
return new SaasServerAppMessageSender();
case ITEM:
return new SaasItemAppMessageSender();
case PROFILE:
return new SaasProfileAppMessageSender();
default:
throw new IllegalArgumentException("Not support message: " + messageType);
}
// saas
return new SaasMessageSender();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public void send(Message message, DataRangersSDKConfigProperties sdkConfigProper
}

@Override
public void sendBatch(List<Message> message, DataRangersSDKConfigProperties sdkConfigProperties) {
List<AppMessage> sendMessages = message.stream().map(n -> n.getAppMessage()).collect(Collectors.toList());
public void sendBatch(List<Message> messages, DataRangersSDKConfigProperties sdkConfigProperties) {
List<AppMessage> sendMessages = messages.stream().map(n -> n.getAppMessage()).collect(Collectors.toList());
HttpUtils.post(EventConfig.getAppListUrl(), RangersJSONConfig.getInstance().toJson(sendMessages), EventConfig.SEND_HEADER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ public void send(Message message, DataRangersSDKConfigProperties sdkConfigProper

}
}

@Override
public void sendBatch(List<Message> message, DataRangersSDKConfigProperties sdkConfigProperties) {
// 接口不支持批量,这里循环处理
for (Message m : message) {
send(m, sdkConfigProperties);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.datarangers.sender.saas;

import com.datarangers.config.DataRangersSDKConfigProperties;
import com.datarangers.message.Message;
import com.datarangers.message.MessageType;
import com.datarangers.sender.MessageSender;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @author zhangpeng.spin
*/
public class SaasMessageSender implements MessageSender {
private Map<MessageType, MessageSender> senders = new HashMap<>();

public SaasMessageSender() {
senders.put(MessageType.EVENT, new SaasServerAppMessageSender());
senders.put(MessageType.PROFILE, new SaasProfileAppMessageSender());
senders.put(MessageType.ITEM, new SaasItemAppMessageSender());
}

@Override
public void send(Message message, DataRangersSDKConfigProperties sdkConfigProperties) {
MessageSender messageSender = doGetSender(message.getMessageType());
messageSender.send(message, sdkConfigProperties);
}

@Override
public void sendBatch(List<Message> messages, DataRangersSDKConfigProperties sdkConfigProperties) {
// 根据类型进行分组
Map<MessageType, List<Message>> messageListMap =
messages.stream().collect(Collectors.groupingBy(Message::getMessageType));
for (Map.Entry<MessageType, List<Message>> entry : messageListMap.entrySet()) {
List<Message> messageList = entry.getValue();
if (messageList == null || messageList.isEmpty()) {
continue;
}
MessageSender messageSender = doGetSender(entry.getKey());
messageSender.sendBatch(messageList, sdkConfigProperties);
}
}

protected MessageSender doGetSender(MessageType messageType) {
MessageSender messageSender = senders.get(messageType);
if (messageSender == null) {
throw new IllegalArgumentException("Not support message: " + messageType);
}
return messageSender;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.datarangers.util.AuthUtils;
import com.datarangers.util.HttpUtils;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -42,4 +43,12 @@ public void send(Message message, DataRangersSDKConfigProperties sdkConfigProper

}

@Override
public void sendBatch(List<Message> message, DataRangersSDKConfigProperties sdkConfigProperties) {
// 接口不支持批量,这里循环处理
for (Message m : message) {
send(m, sdkConfigProperties);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,64 @@
import com.datarangers.message.saas.SaasServerAppMessage;
import com.datarangers.sender.MessageSender;
import com.datarangers.util.HttpUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @Author zhangpeng.spin@bytedance.com
* @Date 2021-07-22
*/
public class SaasServerAppMessageSender implements MessageSender {

private static final String path = "/v2/event/json";

@Override
public void send(Message message, DataRangersSDKConfigProperties sdkConfigProperties) {
Object sendMessage = new SaasServerAppMessage(message);
String url = sdkConfigProperties.getDomain() + getPath(sdkConfigProperties);
Map<String, String> headers = new HashMap<>();
headers.putAll(EventConfig.SEND_HEADER);
Integer appId = message.getAppMessage().getAppId();
Map<Integer, String> appKeys = sdkConfigProperties.getAppKeys();
String appKey = appKeys.get(appId);
if(appKey == null){
throw new IllegalArgumentException("App key cannot be empty. app_id: " + appId);
private static final String PATH = "/v2/event/json";

private static final String BATCH_PATH = "/v2/event/list";

@Override
public void send(Message message, DataRangersSDKConfigProperties sdkConfigProperties) {
Object sendMessage = new SaasServerAppMessage(message);
String url = sdkConfigProperties.getDomain() + getPath(sdkConfigProperties.getPath(), PATH);
Map<String, String> headers = new HashMap<>();
headers.putAll(EventConfig.SEND_HEADER);
Integer appId = message.getAppMessage().getAppId();
Map<Integer, String> appKeys = sdkConfigProperties.getAppKeys();
String appKey = appKeys.get(appId);
if (appKey == null) {
throw new IllegalArgumentException("App key cannot be empty. app_id: " + appId);
}
headers.put(Constants.APP_KEY, appKey);
HttpUtils.post(url, RangersJSONConfig.getInstance().toJson(sendMessage), headers);
}
headers.put(Constants.APP_KEY, appKey);
HttpUtils.post(url, RangersJSONConfig.getInstance().toJson(sendMessage), headers);
}

protected String getPath(DataRangersSDKConfigProperties sdkConfigProperties) {
String domainPath = sdkConfigProperties.getPath();
if (domainPath == null || domainPath.trim().length() == 0) {
return path;

@Override
public void sendBatch(List<Message> messages, DataRangersSDKConfigProperties sdkConfigProperties) {
List<SaasServerAppMessage> sendMessages = messages.stream().map(n -> new SaasServerAppMessage(n))
.collect(Collectors.toList());
String url = sdkConfigProperties.getDomain() + getPath(sdkConfigProperties.getBatchPath(), BATCH_PATH);

Map<String, String> headers = new HashMap<>();
headers.putAll(EventConfig.SEND_HEADER);
Integer appId = messages.get(0).getAppMessage().getAppId();
Map<Integer, String> appKeys = sdkConfigProperties.getAppKeys();
String appKey = appKeys.get(appId);
if (appKey == null) {
throw new IllegalArgumentException("App key cannot be empty. app_id: " + appId);
}
headers.put(Constants.APP_KEY, appKey);
String body = RangersJSONConfig.getInstance().toJson(sendMessages);
HttpUtils.post(url, body, headers);
}
if (!domainPath.startsWith("/")) {
domainPath = "/" + domainPath;

protected String getPath(String domainPath, String defaultPath) {
if (domainPath == null || domainPath.trim().length() == 0) {
return defaultPath;
}
if (!domainPath.startsWith("/")) {
domainPath = "/" + domainPath;
}
return domainPath;
}
return domainPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public void send(Message message, DataRangersSDKConfigProperties sdkConfigProper
}

@Override
public void sendBatch(List<Message> message, DataRangersSDKConfigProperties sdkConfigProperties) {
Map<String, String> headers = getHeaders(message.get(0), sdkConfigProperties);
List<AppMessage> sendMessages = message.stream().map(n -> getSassNativeMessage(n)).collect(Collectors.toList());
public void sendBatch(List<Message> messages, DataRangersSDKConfigProperties sdkConfigProperties) {
Map<String, String> headers = getHeaders(messages.get(0), sdkConfigProperties);
List<AppMessage> sendMessages =
messages.stream().map(n -> getSassNativeMessage(n)).collect(Collectors.toList());
HttpUtils.post(EventConfig.getAppListUrl(), RangersJSONConfig.getInstance().toJson(sendMessages), headers);
}

Expand Down
2 changes: 1 addition & 1 deletion datarangers-sdk-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>datarangers-sdk</artifactId>
<groupId>com.datarangers</groupId>
<version>1.5.12-release</version>
<version>1.5.13-release</version>
</parent>
<groupId>com.datarangers</groupId>
<artifactId>datarangers-sdk-example</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,12 @@ public void sendItemProfile(int appId) {
.setName("Java")
.setPrice(100)
.setPublishDate("2010-10-11")
.setAuthors(Arrays.asList("zhangsan", "lisi"))
.setCategory("1"));
items.add(
new BookItem("1002", "book")
.setName("PHP")
.setPrice(100)
.setPublishDate("2021-07-20")
.setAuthors(Arrays.asList("zhangsan", "wanger"))
.setCategory("2"));

appEventCollector.itemSet(appId, "book", items);
Expand Down
Loading