diff --git a/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java b/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java index bd90c53657a2ad6210afe0c2e370dc044fb25607..2c8398c7ba12d344d08012e990907f06e46fa5a8 100644 --- a/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java +++ b/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java @@ -1,15 +1,16 @@ package beamline.models.algorithms; import java.io.IOException; -import java.io.Serializable; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; import beamline.events.BEvent; +import beamline.models.responses.Response; /** * This abstract class defines the root of the mining algorithms hierarchy. It @@ -18,30 +19,22 @@ import beamline.events.BEvent; * * @author Andrea Burattin */ -public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Serializable> { +public abstract class StreamMiningAlgorithm<T extends Response> extends RichFlatMapFunction<BEvent, T> { private static final long serialVersionUID = 10170817098305999L; private transient ValueState<Long> processedEvents; - private transient ValueState<Serializable> latestResponse; - private transient HookEventProcessing onBeforeEvent = null; - private transient HookEventProcessing onAfterEvent = null; @Override public void open(Configuration parameters) throws Exception { - processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processede vents", Long.class)); - latestResponse = getRuntimeContext().getState(new ValueStateDescriptor<>("latest response", Serializable.class)); + processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processed-events", Long.class)); } @Override - public Serializable map(BEvent t) throws Exception { - if (onBeforeEvent != null) { - onBeforeEvent.trigger(); + public void flatMap(BEvent event, Collector<T> out) throws Exception { + T latestResponse = process(event); + if (latestResponse != null) { + out.collect(latestResponse); } - process(t); - if (onAfterEvent != null) { - onAfterEvent.trigger(); - } - return getLatestResponse(); } /** @@ -51,8 +44,9 @@ public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Seri * * @param event the new event being observed * @return the result of the mining of the event + * @throws Exception */ - public abstract Serializable ingest(BEvent event); + public abstract T ingest(BEvent event) throws Exception; /** * Returns the total number of events processed so far @@ -71,65 +65,25 @@ public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Seri return -1; } - /** - * Returns the latest result of the mining - * - * @return the latest result of the mining - */ - public Serializable getLatestResponse() { - try { - return latestResponse.value(); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - - /** - * This method can be used to set a hook to a callback function to be - * executed before an event is processed - * - * @param onBeforeEvent the callback function - */ - public void setOnBeforeEvent(HookEventProcessing onBeforeEvent) { - this.onBeforeEvent = onBeforeEvent; - } - - /** - * This method can be used to set a hook to a callback function to be - * executed after an event is processed - * - * @param onAfterEvent the callback function - */ - public void setOnAfterEvent(HookEventProcessing onAfterEvent) { - this.onAfterEvent = onAfterEvent; - } /* * The internal processor in charge of updating the internal status of the * map. */ - protected void process(BEvent event) { + protected T process(BEvent event) throws Exception { try { long value = 1; if (processedEvents.value() != null) { value = processedEvents.value() + 1; } processedEvents.update(value); - latestResponse.update(ingest(event)); } catch (IOException e) { e.printStackTrace(); } - } - - /* - * Setter of the latest response onto the status. - */ - protected void setLatestResponse(Serializable latestResponse) { - try { - this.latestResponse.update(latestResponse); - } catch (IOException e) { - e.printStackTrace(); + T tmp = ingest(event); + if (tmp != null) { + tmp.setProcessedEvents(getProcessedEvents()); } + return tmp; } } diff --git a/src/main/java/beamline/models/responses/GraphvizResponse.java b/src/main/java/beamline/models/responses/GraphvizResponse.java index d91a232a1bca47dd70eac9c7ad8eaf857dfd8262..2dcef1ef42b3106c20f39d19a2e1f88e992fda58 100644 --- a/src/main/java/beamline/models/responses/GraphvizResponse.java +++ b/src/main/java/beamline/models/responses/GraphvizResponse.java @@ -8,7 +8,9 @@ import beamline.graphviz.Dot; * * @author Andrea Burattin */ -public interface GraphvizResponse extends Response { +public abstract class GraphvizResponse extends Response { + + private static final long serialVersionUID = 7232727657074096321L; /** * Returns the Dot representation of the response diff --git a/src/main/java/beamline/models/responses/Response.java b/src/main/java/beamline/models/responses/Response.java index c728c69cb7ca04379efe7af671c19bac0616301d..256972c6bccf772cca42fa67197872c44df0cce2 100644 --- a/src/main/java/beamline/models/responses/Response.java +++ b/src/main/java/beamline/models/responses/Response.java @@ -1,10 +1,24 @@ package beamline.models.responses; +import java.io.Serializable; + /** * Marker interface used to define the type of the responses * * @author Andrea Burattin */ -public interface Response { +public class Response implements Serializable { + + private static final long serialVersionUID = 3104314256198741100L; + private Long processedEvents; + + public Long getProcessedEvents() { + return processedEvents; + } + public void setProcessedEvents(Long processedEvents) { + this.processedEvents = processedEvents; + } + + } diff --git a/src/main/java/beamline/models/responses/StringResponse.java b/src/main/java/beamline/models/responses/StringResponse.java new file mode 100644 index 0000000000000000000000000000000000000000..1f27d853d665daaa4a90b48a0a1c338f9cb8cac2 --- /dev/null +++ b/src/main/java/beamline/models/responses/StringResponse.java @@ -0,0 +1,24 @@ +package beamline.models.responses; + +public class StringResponse extends Response { + + private static final long serialVersionUID = 7863665787098981704L; + private String str; + + public StringResponse(String str) { + set(str); + } + + public String get() { + return str; + } + + public void set(String str) { + this.str = str; + } + + @Override + public String toString() { + return str; + } +} diff --git a/src/test/java/beamline/tests/AlgorithmTest.java b/src/test/java/beamline/tests/AlgorithmTest.java index 5ab62633b5582a6c46dfef6172a69a7f6675fb13..69997337a9c890c37ab44bf19c201a9d5ae05d3b 100644 --- a/src/test/java/beamline/tests/AlgorithmTest.java +++ b/src/test/java/beamline/tests/AlgorithmTest.java @@ -6,10 +6,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamFlatMap; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; @@ -23,11 +25,13 @@ import org.junit.jupiter.api.Test; import beamline.events.BEvent; import beamline.models.algorithms.StreamMiningAlgorithm; import beamline.models.responses.Response; +import beamline.models.responses.StringResponse; +import beamline.sources.CSVLogSource; public class AlgorithmTest { // private OneInputStreamOperatorTestHarness<String, Long> testHarness; - private StreamMiningAlgorithm statefulFlatMapFunction; +// private StreamMiningAlgorithm statefulFlatMapFunction; // @BeforeEach // public void setupTestHarness() throws Exception { @@ -47,21 +51,62 @@ public class AlgorithmTest { @Test public void test_result() throws Exception { - StreamMiningAlgorithm m = new StreamMiningAlgorithm() { - private static final long serialVersionUID = 3268754545347297698L; - +// List<String> acts = new LinkedList<>(); +// List<String> caseIds = new LinkedList<>(); +// CSVLogSource source = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1); +// +// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// DataStream<BEvent> stream = env.addSource(source); +// stream.executeAndCollect().forEachRemaining((BEvent e) -> { +// acts.add(e.getEventName()); +// caseIds.add(e.getTraceName()); +// }); +// +// assertEquals(5, acts.size()); +// assertEquals(5, caseIds.size()); +// +// assertThat(acts, hasItems("a11","a21","a22","a12","a23")); +// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); + + StreamMiningAlgorithm<StringResponse> m = new StreamMiningAlgorithm<StringResponse>() { @Override - public Serializable ingest(BEvent event) { - int product = 1; - if (getLatestResponse() != null) { - product = (int) getLatestResponse(); - } - product *= Integer.parseInt(event.getEventName()); - setLatestResponse(-product); - return product; + public StringResponse ingest(BEvent event) throws Exception { + return new StringResponse(event.getProcessName() + event.getEventName() + event.getTraceName()); } }; + List<String> events = new LinkedList<>(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .fromElements( + BEvent.create("p", "3", "c1"), + BEvent.create("p", "7", "c1"), + BEvent.create("p", "11", "c1"), + BEvent.create("p", "13", "c1")) + .keyBy(BEvent::getProcessName) + .flatMap(m) + .executeAndCollect().forEachRemaining((StringResponse e) -> { + events.add(e.get()); + }); + + assertEquals(4, events.size()); + assertThat(events, hasItems("p3c1","p7c1","p11c1","p13c1")); + +// StreamMiningAlgorithm m = new StreamMiningAlgorithm() { +// private static final long serialVersionUID = 3268754545347297698L; +// +// @Override +// public Serializable ingest(BEvent event) { +// int product = 1; +// if (getLatestResponse() != null) { +// product = (int) getLatestResponse(); +// } +// product *= Integer.parseInt(event.getEventName()); +// setLatestResponse(-product); +// return product; +// } +// }; + // private OneInputStreamOperatorTestHarness<BEvent, Serializable> testHarness = new OneInputStreamOperatorTestHarness<BEvent, Serializable>(m); // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.fromElements( @@ -73,10 +118,10 @@ public class AlgorithmTest { // Collector<BEvent> stream = mock - System.out.println(m.getProcessedEvents()); +// System.out.println(m.getProcessedEvents()); - assertEquals(4l, m.getProcessedEvents()); - assertEquals(3003, m.getLatestResponse()); +// assertEquals(4l, m.getProcessedEvents()); +// assertEquals(3003, m.getLatestResponse()); } // @Test