From fe4be40d4cd56e91d98808a18332576b5a6e6d3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93?= <15335194440@163.com> Date: Wed, 12 Jul 2023 15:14:16 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=96=E7=A0=81=E8=A7=84=E8=8C=83=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + src/main/java/com/jm/AsyncExecutor.java | 9 ++++----- .../com/jm/memory/CollectorCoordinator.java | 16 +++++++++------- src/main/java/com/jm/stream/DataStream.java | 19 +++++++++---------- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index e7d27bb..cd63b81 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ *.tar.gz *.rar *.idea +/target # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* diff --git a/src/main/java/com/jm/AsyncExecutor.java b/src/main/java/com/jm/AsyncExecutor.java index 1903df3..b4e3b94 100644 --- a/src/main/java/com/jm/AsyncExecutor.java +++ b/src/main/java/com/jm/AsyncExecutor.java @@ -11,12 +11,12 @@ import java.util.concurrent.Executors; public class AsyncExecutor { - private ExecutorService executorService = Executors.newCachedThreadPool(); + private ExecutorService executorService; private SourceDataStream sourceDataStream; public static AsyncExecutor getInstance() { - return new AsyncExecutor(); + return getInstance(Executors.newCachedThreadPool()); } public static AsyncExecutor getInstance(ExecutorService executorService) { @@ -42,19 +42,18 @@ public class AsyncExecutor { return dataStream; } - @SuppressWarnings({"unchecked", "rawtypes"}) public void execute() { if (sourceDataStream == null) { return; } - DataStream dataStream = sourceDataStream; + DataStream dataStream = sourceDataStream; CollectorCoordinator coordinator; do { if (!(dataStream instanceof SinkDataStream)) { dataStream.setPartitioner(new Partitioner<>(dataStream.next().getParallelism())); } coordinator = new CollectorCoordinator(executorService, - dataStream.pref(), dataStream); + dataStream.prev(), dataStream); coordinator.start(); } while ((dataStream = dataStream.next()) != null); coordinator.waitFinished(); diff --git a/src/main/java/com/jm/memory/CollectorCoordinator.java b/src/main/java/com/jm/memory/CollectorCoordinator.java index a0621a5..2518709 100644 --- a/src/main/java/com/jm/memory/CollectorCoordinator.java +++ b/src/main/java/com/jm/memory/CollectorCoordinator.java @@ -16,12 +16,13 @@ public class CollectorCoordinator { private final JsonMapper jsonMapper = new JsonMapper(); - private final DataStream sourceDataStream; + private final DataStream sourceDataStream; + @SuppressWarnings({"rawtypes"}) private final DataStream nextDataStream; - public CollectorCoordinator(ExecutorService executorService, DataStream sourceDataStream, - DataStream nextDataStream) { + public CollectorCoordinator(ExecutorService executorService, DataStream sourceDataStream, + DataStream nextDataStream) { this.executorService = executorService; this.sourceDataStream = sourceDataStream; this.nextDataStream = nextDataStream; @@ -37,7 +38,7 @@ public class CollectorCoordinator { nextDataStream.setFutures(futures); } - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"unchecked"}) private void consumerData(int index) { if (sourceDataStream == null) { nextDataStream.run(null); @@ -50,7 +51,7 @@ public class CollectorCoordinator { long dataSize = 0; try { Class inClass = Class.forName(nextDataStream.getInputClassName()); - Collector collector = sourceDataStream.getPartitioner().getCollector(index); + Collector collector = sourceDataStream.getPartitioner().getCollector(index); while (!sourceDataStream.isDone()) { MemorySegment memorySegment = collector.emitElement(); if (memorySegment == null) { @@ -75,12 +76,13 @@ public class CollectorCoordinator { } @SuppressWarnings({"unchecked"}) - private void emitElement(Class inClass, MemorySegment memorySegment, Collector collector) + private void emitElement(Class inClass, MemorySegment memorySegment, + Collector collector) throws IOException { try { byte[] bytes; while ((bytes = memorySegment.get()) != null) { - Object value = jsonMapper.readValue(bytes, inClass); + IN value = jsonMapper.readValue(bytes, inClass); nextDataStream.run(value); } } finally { diff --git a/src/main/java/com/jm/stream/DataStream.java b/src/main/java/com/jm/stream/DataStream.java index bb7c439..6a6f82e 100644 --- a/src/main/java/com/jm/stream/DataStream.java +++ b/src/main/java/com/jm/stream/DataStream.java @@ -4,7 +4,6 @@ import com.jm.func.FilterFunction; import com.jm.func.FlatMapFunction; import com.jm.func.MapFunction; import com.jm.func.SinkFunction; -import com.jm.memory.Collector; import com.jm.partition.Partitioner; import java.util.List; import java.util.concurrent.ExecutionException; @@ -18,7 +17,7 @@ public abstract class DataStream { protected String inputClassName; protected String name; private int parallelism = 1; - private DataStream pref; + private DataStream prev; private DataStream next; private List> futures; @@ -70,21 +69,21 @@ public abstract class DataStream { public DataStream map(MapFunction mapFunction) { MapOutputStream mapOutputStream = new MapOutputStream<>(mapFunction); next = mapOutputStream; - mapOutputStream.setPref(this); + mapOutputStream.setPrev(this); return mapOutputStream; } public DataStream flatMap(FlatMapFunction flatMapFunction) { FlatMapOutputStream flatMapOutputStream = new FlatMapOutputStream<>(flatMapFunction); next = flatMapOutputStream; - flatMapOutputStream.setPref(this); + flatMapOutputStream.setPrev(this); return flatMapOutputStream; } public DataStream filter(FilterFunction filterFunction) { FilterOutputStream filterOutputStream = new FilterOutputStream<>(filterFunction); next = filterOutputStream; - filterOutputStream.setPref(this); + filterOutputStream.setPrev(this); return filterOutputStream; } @@ -92,7 +91,7 @@ public abstract class DataStream { public DataStream sink(SinkFunction sinkFunction) { SinkDataStream sinkDataStream = new SinkDataStream<>(sinkFunction); next = sinkDataStream; - sinkDataStream.setPref(this); + sinkDataStream.setPrev(this); return sinkDataStream; } @@ -100,12 +99,12 @@ public abstract class DataStream { return next; } - public void setPref(DataStream pref) { - this.pref = pref; + public void setPrev(DataStream prev) { + this.prev = prev; } - public DataStream pref() { - return pref; + public DataStream prev() { + return prev; } -- Gitee