diff --git a/README.md b/README.md new file mode 100644 index 0000000000000000000000000000000000000000..93c32986f2b22467c0f539d41fbcc267dbab70b6 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +## 项目介绍 +Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用。 + + +### 场景说明 +Kafka作为一个中间件组件,具有与其他中间件组件通用的功能 (异步处理、系统解耦、流量削峰、日志处理), +但在某些特殊的功能方面,每个中间件拥有其独特的特性,其中Kafka作为一个具有高吞吐、高性能的中间件, +它也有其不足的地方,在某些应用场景下面要求中间件实现消息延时的功能,但Kafka本身是不具备这种能力的。 + +此DEMO项目实现了每条消息实现自定义的延时。 + +### 时序图 +![](img/Kafka消息延时时序图.png) + +### 参数指南 +本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息, +如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。 +如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。 +#### Delay.java参数详情 +1. delay:自定义延时时间,单位ms。 +2. topic_delay变量:用于临时存储消息的topic名称。 +3. topic_out变量:用于消费者拉取消息消费的topic名称。 +4. 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs + +### 调用结果 +![](./img/延时结果.png) + +### 问题反馈通道 +https://support.developer.huaweicloud.com/feedback/ \ No newline at end of file diff --git "a/img/Kafka\346\266\210\346\201\257\345\273\266\346\227\266\346\227\266\345\272\217\345\233\276.png" "b/img/Kafka\346\266\210\346\201\257\345\273\266\346\227\266\346\227\266\345\272\217\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..b4b1a03643edbcaf9f9cf52309adda8a241fa8fd Binary files /dev/null and "b/img/Kafka\346\266\210\346\201\257\345\273\266\346\227\266\346\227\266\345\272\217\345\233\276.png" differ diff --git "a/img/\345\273\266\346\227\266\347\273\223\346\236\234.png" "b/img/\345\273\266\346\227\266\347\273\223\346\236\234.png" new file mode 100644 index 0000000000000000000000000000000000000000..9920a67c8002e325846fd1d33003f49da615035b Binary files /dev/null and "b/img/\345\273\266\346\227\266\347\273\223\346\236\234.png" differ diff --git a/src/main/java/com/dms/delay/Delay.java b/src/main/java/com/dms/delay/Delay.java index d300494960f71c34e19935c6977e6b09059ec790..e9528cef867f1b2732e69c8fe7505071bcc5bcc2 100644 --- a/src/main/java/com/dms/delay/Delay.java +++ b/src/main/java/com/dms/delay/Delay.java @@ -31,7 +31,7 @@ public class Delay { public static void main( String[] args ) { //延时主题(用于控制延时缓冲) - String delay = "delay"; + String topic_delay = "topic_delay"; //输出主题(直接供消费者消费) String topic_out = "topic_out"; /* @@ -42,7 +42,7 @@ public class Delay { public void run() { //消费者配置。请根据需要自行设置Kafka配置 Properties props = new Properties(); - props.setProperty("bootstrap.servers", "192.168.0.161:9092,192.168.0.131:9092,192.168.0.132:9092"); + props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); @@ -51,7 +51,7 @@ public class Delay { //创建消费者 KafkaConsumer consumer = new KafkaConsumer<>(props); //指定消费主题 - consumer.subscribe(Arrays.asList(delay)); + consumer.subscribe(Arrays.asList(topic_delay)); while (true) { //轮询消费 ConsumerRecords records = consumer.poll(Duration.ofMillis(10)); @@ -72,7 +72,7 @@ public class Delay { public void run() { //生产者配置(请根据需求自行配置) Properties props = new Properties(); - props.put("bootstrap.servers", "192.168.0.161:9092,192.168.0.131:9092,192.168.0.132:9092"); + props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); diff --git a/src/main/resources/dms.sdk.consumer.properties b/src/main/resources/dms.sdk.consumer.properties index 716d9b1defb420a53d0a58810cc225e0b5fe4aab..4c3466512e88e33dc013f19d8364f99f7f362488 100644 --- a/src/main/resources/dms.sdk.consumer.properties +++ b/src/main/resources/dms.sdk.consumer.properties @@ -1,5 +1,5 @@ #\u5EFA\u7ACB\u4E0Ekafka\u96C6\u7FA4\u8FDE\u63A5\u7684host/port\u7EC4\uFF0C\u8BF7\u901A\u8FC7\u63A7\u5236\u53F0\u516C\u7F51\u8BBF\u95EE\u83B7\u53D6 -bootstrap.servers=192.168.0.161:9092,192.168.0.131:9092,192.168.0.132:9092 +bootstrap.servers=192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092 #\u7528\u6765\u552F\u4E00\u6807\u8BC6consumer\u8FDB\u7A0B\u6240\u5728\u7EC4\u7684\u5B57\u7B26\u4E32\uFF0C\u5982\u679C\u8BBE\u7F6E\u540C\u6837\u7684group id\uFF0C\u8868\u793A\u8FD9\u4E9Bprocesses\u90FD\u662F\u5C5E\u4E8E\u540C\u4E00\u4E2Aconsumer group group.id=test #\u952E\u7684\u5E8F\u5217\u5316\u65B9\u5F0F diff --git a/src/main/resources/dms.sdk.producer.properties b/src/main/resources/dms.sdk.producer.properties index 9f79128492d3d0569f5db3f5aeb1db44a6d3992f..9525c46b7c0780fea384df185ab648213ae4779a 100644 --- a/src/main/resources/dms.sdk.producer.properties +++ b/src/main/resources/dms.sdk.producer.properties @@ -1,7 +1,7 @@ #topic\u540D\u79F0\u5728\u5177\u4F53\u7684\u751F\u4EA7\u4E0E\u6D88\u8D39\u4EE3\u7801\u4E2D\u3002 ####################### #kafka\u5B9E\u4F8B\u7684broker\u4FE1\u606F\uFF0Cip:port\u4E3A\u5B9E\u4F8B\u7684\u8FDE\u63A5\u5730\u5740\u548C\u7AEF\u53E3\uFF0C\u53C2\u8003\u201C\u6536\u96C6\u8FDE\u63A5\u4FE1\u606F\u201D\u7AE0\u8282\u83B7\u53D6\u3002\u4E3E\u4F8B\uFF1Abootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x -bootstrap.servers=192.168.0.161:9092,192.168.0.131:9092,192.168.0.132:9092 +bootstrap.servers=192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092 #\u53D1\u9001\u786E\u8BA4\u53C2\u6570 acks=all #\u952E\u7684\u5E8F\u5217\u5316\u65B9\u5F0F diff --git a/src/test/java/com/dms/producer/DmsProducerTest.java b/src/test/java/com/dms/producer/DmsProducerTest.java index ee41bb3b08fe98cae5124de03cf85eb6776ecd23..1f123c7869e313e2c7bb65511b4cea9e68bb151f 100644 --- a/src/test/java/com/dms/producer/DmsProducerTest.java +++ b/src/test/java/com/dms/producer/DmsProducerTest.java @@ -15,7 +15,7 @@ public class DmsProducerTest { Thread.sleep(1000); String data = "生产" + (i++);//设置生产的数据 // - producer.produce("delay", data, new Callback() + producer.produce("topic_delay", data, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception)