diff --git a/.gitignore b/.gitignore index e7d27bb67fdff0929aeb422002614b2590ebdee6..cd63b81984fc1c999b80f8d8425b66b3fb0ddf7d 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ *.tar.gz *.rar *.idea +/target # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* diff --git a/src/main/java/com/jm/AsyncExecutor.java b/src/main/java/com/jm/AsyncExecutor.java index 1903df39ea4476147235a50397f8f34e1b046238..b4e3b94441aeee35f98222f861bd69110629620a 100644 --- a/src/main/java/com/jm/AsyncExecutor.java +++ b/src/main/java/com/jm/AsyncExecutor.java @@ -11,12 +11,12 @@ import java.util.concurrent.Executors; public class AsyncExecutor { - private ExecutorService executorService = Executors.newCachedThreadPool(); + private ExecutorService executorService; private SourceDataStream sourceDataStream; public static AsyncExecutor getInstance() { - return new AsyncExecutor(); + return getInstance(Executors.newCachedThreadPool()); } public static AsyncExecutor getInstance(ExecutorService executorService) { @@ -42,19 +42,18 @@ public class AsyncExecutor { return dataStream; } - @SuppressWarnings({"unchecked", "rawtypes"}) public void execute() { if (sourceDataStream == null) { return; } - DataStream dataStream = sourceDataStream; + DataStream dataStream = sourceDataStream; CollectorCoordinator coordinator; do { if (!(dataStream instanceof SinkDataStream)) { dataStream.setPartitioner(new Partitioner<>(dataStream.next().getParallelism())); } coordinator = new CollectorCoordinator(executorService, - dataStream.pref(), dataStream); + dataStream.prev(), dataStream); coordinator.start(); } while ((dataStream = dataStream.next()) != null); coordinator.waitFinished(); diff --git a/src/main/java/com/jm/memory/CollectorCoordinator.java b/src/main/java/com/jm/memory/CollectorCoordinator.java index a0621a515deb7aa9fd99361eec4dd5a0d8d3d119..2518709f9ec63a07fcb5bcffeabdc74bef6aded3 100644 --- a/src/main/java/com/jm/memory/CollectorCoordinator.java +++ b/src/main/java/com/jm/memory/CollectorCoordinator.java @@ -16,12 +16,13 @@ public class CollectorCoordinator { private final JsonMapper jsonMapper = new JsonMapper(); - private final DataStream sourceDataStream; + private final DataStream sourceDataStream; + @SuppressWarnings({"rawtypes"}) private final DataStream nextDataStream; - public CollectorCoordinator(ExecutorService executorService, DataStream sourceDataStream, - DataStream nextDataStream) { + public CollectorCoordinator(ExecutorService executorService, DataStream sourceDataStream, + DataStream nextDataStream) { this.executorService = executorService; this.sourceDataStream = sourceDataStream; this.nextDataStream = nextDataStream; @@ -37,7 +38,7 @@ public class CollectorCoordinator { nextDataStream.setFutures(futures); } - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"unchecked"}) private void consumerData(int index) { if (sourceDataStream == null) { nextDataStream.run(null); @@ -50,7 +51,7 @@ public class CollectorCoordinator { long dataSize = 0; try { Class inClass = Class.forName(nextDataStream.getInputClassName()); - Collector collector = sourceDataStream.getPartitioner().getCollector(index); + Collector collector = sourceDataStream.getPartitioner().getCollector(index); while (!sourceDataStream.isDone()) { MemorySegment memorySegment = collector.emitElement(); if (memorySegment == null) { @@ -75,12 +76,13 @@ public class CollectorCoordinator { } @SuppressWarnings({"unchecked"}) - private void emitElement(Class inClass, MemorySegment memorySegment, Collector collector) + private void emitElement(Class inClass, MemorySegment memorySegment, + Collector collector) throws IOException { try { byte[] bytes; while ((bytes = memorySegment.get()) != null) { - Object value = jsonMapper.readValue(bytes, inClass); + IN value = jsonMapper.readValue(bytes, inClass); nextDataStream.run(value); } } finally { diff --git a/src/main/java/com/jm/stream/DataStream.java b/src/main/java/com/jm/stream/DataStream.java index bb7c439c31363106b62d76fa2b570b885be77672..6a6f82e24ed5abe059170c62c041c8b6557ea979 100644 --- a/src/main/java/com/jm/stream/DataStream.java +++ b/src/main/java/com/jm/stream/DataStream.java @@ -4,7 +4,6 @@ import com.jm.func.FilterFunction; import com.jm.func.FlatMapFunction; import com.jm.func.MapFunction; import com.jm.func.SinkFunction; -import com.jm.memory.Collector; import com.jm.partition.Partitioner; import java.util.List; import java.util.concurrent.ExecutionException; @@ -18,7 +17,7 @@ public abstract class DataStream { protected String inputClassName; protected String name; private int parallelism = 1; - private DataStream pref; + private DataStream prev; private DataStream next; private List> futures; @@ -70,21 +69,21 @@ public abstract class DataStream { public DataStream map(MapFunction mapFunction) { MapOutputStream mapOutputStream = new MapOutputStream<>(mapFunction); next = mapOutputStream; - mapOutputStream.setPref(this); + mapOutputStream.setPrev(this); return mapOutputStream; } public DataStream flatMap(FlatMapFunction flatMapFunction) { FlatMapOutputStream flatMapOutputStream = new FlatMapOutputStream<>(flatMapFunction); next = flatMapOutputStream; - flatMapOutputStream.setPref(this); + flatMapOutputStream.setPrev(this); return flatMapOutputStream; } public DataStream filter(FilterFunction filterFunction) { FilterOutputStream filterOutputStream = new FilterOutputStream<>(filterFunction); next = filterOutputStream; - filterOutputStream.setPref(this); + filterOutputStream.setPrev(this); return filterOutputStream; } @@ -92,7 +91,7 @@ public abstract class DataStream { public DataStream sink(SinkFunction sinkFunction) { SinkDataStream sinkDataStream = new SinkDataStream<>(sinkFunction); next = sinkDataStream; - sinkDataStream.setPref(this); + sinkDataStream.setPrev(this); return sinkDataStream; } @@ -100,12 +99,12 @@ public abstract class DataStream { return next; } - public void setPref(DataStream pref) { - this.pref = pref; + public void setPrev(DataStream prev) { + this.prev = prev; } - public DataStream pref() { - return pref; + public DataStream prev() { + return prev; }