對(duì)接fastbee
yml文件配置
配置kafka的服務(wù)器地址
bootstrap-servers: localhost:9092
更改為您的服務(wù)器IP地址
kafka:
bootstrap-servers: localhost:9092 # kafka 服務(wù)器集群地址,默認(rèn)為 localhost:9092
template:
default-topic: property_post #將消息發(fā)送到的默認(rèn)主題,KafkaTemplate.sendDefault
listener:
type: batch
kafka消費(fèi)者
@KafkaListener(topics = KafkaConstant.FMQ_PROPERTY_POST, groupId = KafkaConstant.FMQ_PROPERTY_POST_GROUP,
containerFactory = "propertyPostFactory", batch = "true")
public void propertyPostListen(String data) {
try {
//此處省略
} catch (Exception e) {
}
}
kafka生產(chǎn)者
public Promise<Void> transmit(ProducerVO vo) {
final Promise<Void> promise = defaultEventExecutorGroup.next().newPromise();
try {
String key = vo.getKey();
String jKey = key != null ? key : "default";
ProducerRecord<String,Object> record;
if (Objects.isNull(key)){
record = new ProducerRecord<>(vo.getTopic(), vo.getData());
}else {
record = new ProducerRecord<>(vo.getTopic(),jKey, vo.getData());
}
producer.send(record,(data ,e) -> {
if (e!= null){
promise.setFailure(e);
}else {
promise.setSuccess(null);
}
});
}catch (Exception e){
}
return promise;
}
運(yùn)行測(cè)試