From 8b896312b6f3fffa43cbe1af4e38ba567fbf8942 Mon Sep 17 00:00:00 2001 From: Andrea Burattin <andrea.burattin@gmail.com> Date: Thu, 24 Mar 2022 10:01:30 +0100 Subject: [PATCH] Fixed infinite size directly follows mappers --- src/main/java/beamline/events/BEvent.java | 32 +++++ .../mappers/DirectlyFollowsRelation.java | 56 --------- .../InfiniteSizeDirectlyFollowsMapper.java | 51 -------- .../InfiniteSizeDirectlyFollowsMapper.java | 41 +++++++ .../responses/DirectlyFollowsRelation.java | 89 ++++++++++++++ .../java/beamline/sources/MQTTXesSource.java | 12 +- src/test/java/beamline/tests/MapperTest.java | 36 ++++-- src/test/java/beamline/tests/SourcesTest.java | 110 +++++++++++------- src/test/java/beamline/tests/Utils.java | 6 +- 9 files changed, 263 insertions(+), 170 deletions(-) delete mode 100644 src/main/java/beamline/mappers/DirectlyFollowsRelation.java delete mode 100644 src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java create mode 100644 src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java create mode 100644 src/main/java/beamline/models/responses/DirectlyFollowsRelation.java diff --git a/src/main/java/beamline/events/BEvent.java b/src/main/java/beamline/events/BEvent.java index bc27273..c4de45f 100644 --- a/src/main/java/beamline/events/BEvent.java +++ b/src/main/java/beamline/events/BEvent.java @@ -6,6 +6,8 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.tuple.Pair; import org.deckfour.xes.extension.std.XConceptExtension; import org.deckfour.xes.extension.std.XTimeExtension; @@ -199,6 +201,36 @@ public class BEvent implements Serializable, Comparable<BEvent> { return getEventTime().compareTo(o.getEventTime()); } + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + BEvent other = (BEvent) obj; + return new EqualsBuilder() + .appendSuper(super.equals(obj)) + .append(logAttributes, other.logAttributes) + .append(traceAttributes, other.traceAttributes) + .append(eventAttributes, other.eventAttributes) + .isEquals(); + + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(logAttributes) + .append(traceAttributes) + .append(eventAttributes) + .toHashCode(); + } + // // Private methods // diff --git a/src/main/java/beamline/mappers/DirectlyFollowsRelation.java b/src/main/java/beamline/mappers/DirectlyFollowsRelation.java deleted file mode 100644 index a15690c..0000000 --- a/src/main/java/beamline/mappers/DirectlyFollowsRelation.java +++ /dev/null @@ -1,56 +0,0 @@ -package beamline.mappers; - -import org.deckfour.xes.model.XEvent; - -/** - * This class represents a directly follows relation as produced by - * {@link InfiniteSizeDirectlyFollowsMapper}. - * - * @author Andrea Burattin - */ -public class DirectlyFollowsRelation { - - private String caseId; - private XEvent first; - private XEvent second; - - /** - * Constructor - * - * @param caseId the case id - * @param first the first event - * @param second the second event - */ - public DirectlyFollowsRelation(String caseId, XEvent first, XEvent second) { - this.caseId = caseId; - this.first = first; - this.second = second; - } - - /** - * Returns the case id this directly follows relation belongs to - * - * @return the case id - */ - public String getCaseId() { - return caseId; - } - - /** - * Returns the first event - * - * @return the first event - */ - public XEvent getFirst() { - return first; - } - - /** - * Returns the second event - * - * @return the second event - */ - public XEvent getSecond() { - return second; - } -} diff --git a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java b/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java deleted file mode 100644 index d0bd477..0000000 --- a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java +++ /dev/null @@ -1,51 +0,0 @@ -package beamline.mappers; -/* -import java.util.HashMap; -import java.util.Map; - -import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.model.XEvent; -import org.deckfour.xes.model.XTrace; - -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.core.ObservableSource; -import io.reactivex.rxjava3.functions.Function; - -/** - * This mapper transforms a stream of {@link XTrace}s into a stream of - * {@link DirectlyFollowsRelation}s. It transforms each pair of consequent - * events appearing in the same case as a directly follows operator (generating - * an object with type {@link DirectlyFollowsRelation}). - * - * <p> - * This mapper is called infinite because it's memory footprint will grow as the - * number of case ids grows as well. - * - * @author Andrea Burattin - * -public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, ObservableSource<DirectlyFollowsRelation>> { - - private Map<String, XEvent> map = new HashMap<>(); - - @Override - public @NonNull ObservableSource<DirectlyFollowsRelation> apply(@NonNull XTrace t) throws Throwable { - String caseId = XConceptExtension.instance().extractName(t); - DirectlyFollowsRelation toRet = null; - - if (map.containsKey(caseId)) { - toRet = new DirectlyFollowsRelation(caseId, map.get(caseId), t.get(0)); - } - - map.put(caseId, t.get(0)); - - if (toRet == null) { - return Observable.empty(); - } else { - return Observable.just(toRet); - } - } - -} -*/ -class InfiniteSizeDirectlyFollowsMapper{} \ No newline at end of file diff --git a/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java b/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java new file mode 100644 index 0000000..6eadf94 --- /dev/null +++ b/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java @@ -0,0 +1,41 @@ +package beamline.models.algorithms; + +import java.util.HashMap; +import java.util.Map; + +import org.deckfour.xes.model.XTrace; + +import beamline.events.BEvent; +import beamline.models.responses.DirectlyFollowsRelation; + +/** + * This mapper transforms a stream of {@link XTrace}s into a stream of + * {@link DirectlyFollowsRelation}s. It transforms each pair of consequent + * events appearing in the same case as a directly follows operator (generating + * an object with type {@link DirectlyFollowsRelation}). + * + * <p> + * This mapper is called infinite because it's memory footprint will grow as the + * number of case ids grows as well. + * + * @author Andrea Burattin + */ +public class InfiniteSizeDirectlyFollowsMapper extends StreamMiningAlgorithm<DirectlyFollowsRelation> { + + private static final long serialVersionUID = 9114527510820073110L; + private Map<String, BEvent> map = new HashMap<>(); + + @Override + public DirectlyFollowsRelation ingest(BEvent event) throws Exception { + String caseId = event.getTraceName(); + DirectlyFollowsRelation toRet = null; + + if (map.containsKey(caseId)) { + toRet = new DirectlyFollowsRelation(map.get(caseId), event); + } + + map.put(caseId, event); + + return toRet; + } +} diff --git a/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java b/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java new file mode 100644 index 0000000..0b2742e --- /dev/null +++ b/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java @@ -0,0 +1,89 @@ +package beamline.models.responses; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.tuple.Pair; + +import beamline.events.BEvent; +import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper; + +/** + * This class represents a directly follows relation as produced by + * {@link InfiniteSizeDirectlyFollowsMapper}. + * + * @author Andrea Burattin + */ +public class DirectlyFollowsRelation extends Response { + + private static final long serialVersionUID = 1775695752885219490L; + private Pair<BEvent, BEvent> pair; + + /** + * Constructor + * + * @param caseId the case id + * @param first the first event + * @param second the second event + */ + public DirectlyFollowsRelation(BEvent from, BEvent to) { + if (!from.getTraceName().equals(to.getTraceName())) { + throw new IllegalArgumentException(); + } + pair = Pair.of(from, to); + } + + /** + * Returns the case id this directly follows relation belongs to + * + * @return the case id + */ + public String getCaseId() { + return pair.getLeft().getTraceName(); + } + + /** + * Returns the source event + * + * @return the source event + */ + public BEvent getFrom() { + return pair.getLeft(); + } + + /** + * Returns the target event + * + * @return the target event + */ + public BEvent getTo() { + return pair.getRight(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + DirectlyFollowsRelation other = (DirectlyFollowsRelation) obj; + return new EqualsBuilder() + .appendSuper(super.equals(obj)) + .append(getFrom(), other.getFrom()) + .append(getTo(), other.getTo()) + .isEquals(); + + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(getFrom()) + .append(getTo()) + .toHashCode(); + } +} diff --git a/src/main/java/beamline/sources/MQTTXesSource.java b/src/main/java/beamline/sources/MQTTXesSource.java index d58c67c..a375f64 100644 --- a/src/main/java/beamline/sources/MQTTXesSource.java +++ b/src/main/java/beamline/sources/MQTTXesSource.java @@ -50,7 +50,7 @@ public class MQTTXesSource extends BeamlineAbstractSource { this.topicBase = topicBase; this.processName = processName; } - + @Override public void run(SourceContext<BEvent> ctx) throws Exception { Queue<BEvent> buffer = new LinkedList<>(); @@ -91,7 +91,6 @@ public class MQTTXesSource extends BeamlineAbstractSource { while(isRunning()) { while (isRunning() && buffer.isEmpty()) { - System.out.println("sleeping " + isRunning()); Thread.sleep(100l); } if (isRunning()) { @@ -100,7 +99,6 @@ public class MQTTXesSource extends BeamlineAbstractSource { } } } - System.out.println("aaa"); if (!isRunning() && myClient.isConnected()) { try { @@ -109,13 +107,5 @@ public class MQTTXesSource extends BeamlineAbstractSource { // nothing to do here } } - System.err.println("done"); - } - - @Override - public void cancel() { - // TODO Auto-generated method stub - super.cancel(); - System.out.println("closing"); } } diff --git a/src/test/java/beamline/tests/MapperTest.java b/src/test/java/beamline/tests/MapperTest.java index 59d758e..f4dc591 100644 --- a/src/test/java/beamline/tests/MapperTest.java +++ b/src/test/java/beamline/tests/MapperTest.java @@ -6,25 +6,43 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.List; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; -import beamline.mappers.DirectlyFollowsRelation; -import beamline.mappers.InfiniteSizeDirectlyFollowsMapper; +import beamline.events.BEvent; +import beamline.exceptions.EventException; +import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper; +import beamline.models.responses.DirectlyFollowsRelation; public class MapperTest { @Test - public void test_infinite_size_df() { + public void test_infinite_size_df() throws EventException, Exception { List<DirectlyFollowsRelation> results = new ArrayList<>(); - // <K,A,B,A,C> - Utils.generateObservableSameCaseId() + // <K,A,B,A,C>, <A,B,A> + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .fromElements( + BEvent.create("p", "K", "c"), + BEvent.create("p", "A", "c2"), + BEvent.create("p", "A", "c"), + BEvent.create("p", "B", "c"), + BEvent.create("p", "B", "c2"), + BEvent.create("p", "A", "c"), + BEvent.create("p", "A", "c2"), + BEvent.create("p", "C", "c")) + .keyBy(BEvent::getProcessName) .flatMap(new InfiniteSizeDirectlyFollowsMapper()) - .subscribe((df) -> results.add(df)); + .executeAndCollect().forEachRemaining((DirectlyFollowsRelation e) -> { + results.add(e); + }); - assertEquals(4, results.size()); + assertEquals(6, results.size()); assertTrue(Utils.verifyDirectFollows(results.get(0), "K", "A", "c")); assertTrue(Utils.verifyDirectFollows(results.get(1), "A", "B", "c")); - assertTrue(Utils.verifyDirectFollows(results.get(2), "B", "A", "c")); - assertTrue(Utils.verifyDirectFollows(results.get(3), "A", "C", "c")); + assertTrue(Utils.verifyDirectFollows(results.get(2), "A", "B", "c2")); + assertTrue(Utils.verifyDirectFollows(results.get(3), "B", "A", "c")); + assertTrue(Utils.verifyDirectFollows(results.get(4), "B", "A", "c2")); + assertTrue(Utils.verifyDirectFollows(results.get(5), "A", "C", "c")); } } diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java index 211761e..315bbfe 100644 --- a/src/test/java/beamline/tests/SourcesTest.java +++ b/src/test/java/beamline/tests/SourcesTest.java @@ -5,7 +5,9 @@ import static org.hamcrest.Matchers.hasItems; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import java.io.File; import java.net.URI; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Iterator; @@ -14,12 +16,21 @@ import java.util.List; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.commons.lang3.CharSet; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Collector; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; @@ -27,6 +38,7 @@ import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.Test; +import com.google.common.io.Files; import com.opencsv.CSVParserBuilder; import beamline.events.BEvent; @@ -183,62 +195,80 @@ public class SourcesTest { // brokerService.start(); // brokerService.waitUntilStarted(); // -// List<String> acts = new LinkedList<>(); +// final List<String> acts = new LinkedList<>(); // List<String> caseIds = new LinkedList<>(); // // MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name"); // -// new Thread(new Runnable() { -// -// @Override -// public void run() { -// try { -// Thread.sleep(5000); -// System.out.println("going..."); -// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); -// client.connect(); -// -// publish(client, "c1", "a11"); -// publish(client, "c2", "a21"); -// publish(client, "c2", "a22"); -// publish(client, "c1", "a12"); -// publish(client, "c2", "a23"); -// s.cancel(); -// -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } catch (MqttException e) { -// e.printStackTrace(); -// } -// } -// }).start(); +// // create the sink +// File tmpFile = File.createTempFile("mqtt", "log"); +// StreamingFileSink<BEvent> sink = StreamingFileSink.forRowFormat(Path.fromLocalFile(tmpFile), new SimpleStringEncoder<BEvent>()).build(); +//// val sink: StreamingFileSink[String] = StreamingFileSink +//// .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")) +//// .build() +// // // // create actual source // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // DataStream<BEvent> stream = env.addSource(s); +// stream.addSink(sink); +// JobClient job = env.executeAsync(); +// +//// Thread.sleep(1000); +// +// System.out.println(tmpFile); +// +// System.out.println("going"); +// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); +// client.connect(); +// publish(client, "c1", "a11"); +// publish(client, "c2", "a21"); +// publish(client, "c2", "a22"); +// publish(client, "c1", "a12"); +// publish(client, "c2", "a23"); +// +// Thread.sleep(1000); +//// job.cancel(); +// System.out.println(job.getJobStatus().isDone()); +// +// System.out.println(Files.readLines(tmpFile, Charset.defaultCharset())); +//// System.out.println("final " + acts); +//// +//// +//// +//// System.out.println("post-final " + acts); +//// +////// System.out.println("1"); +//// stream.executeAndCollect().forEachRemaining((BEvent e) -> { +//// System.out.println(e); +//// acts.add(e.getEventName()); +//// caseIds.add(e.getTraceName()); +//// }); +//// JobClient job = env.executeAsync(); +//// +//// Thread.sleep(1000); +//// job.cancel(); +//// +//// Thread.sleep(1000); // -// stream.executeAndCollect().forEachRemaining((BEvent e) -> { -// System.out.println(e); -// acts.add(e.getEventName()); -// caseIds.add(e.getTraceName()); -// }); +//// assertThat(acts, hasItems("a11","a21","a22","a12","a23")); +//// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); +//// +//// System.out.println("3"); // -// System.out.println("3"); // -// assertThat(acts, hasItems("a11","a21","a22","a12","a23")); -// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); // // } catch (Exception e) { // e.printStackTrace(); // } // } - -// @Test -// public void test_mqtt_2() { -// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name"); -// assertThrowsExactly(SourceException.class, () -> source.prepare()); -// } - +// +//// @Test +//// public void test_mqtt_2() { +//// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name"); +//// assertThrowsExactly(SourceException.class, () -> source.prepare()); +//// } +// // protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException { // client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false); // } diff --git a/src/test/java/beamline/tests/Utils.java b/src/test/java/beamline/tests/Utils.java index 035ea22..746b548 100644 --- a/src/test/java/beamline/tests/Utils.java +++ b/src/test/java/beamline/tests/Utils.java @@ -14,7 +14,7 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl; import beamline.events.BEvent; import beamline.exceptions.EventException; -import beamline.mappers.DirectlyFollowsRelation; +import beamline.models.responses.DirectlyFollowsRelation; public class Utils { @@ -81,8 +81,8 @@ public class Utils { } public static boolean verifyDirectFollows(DirectlyFollowsRelation df, String a1, String a2, String caseId) { - String df_a1 = XConceptExtension.instance().extractName(df.getFirst()); - String df_a2 = XConceptExtension.instance().extractName(df.getSecond()); + String df_a1 = df.getFrom().getEventName(); + String df_a2 = df.getTo().getEventName(); return df_a1.equals(a1) && df_a2.equals(a2) && df.getCaseId().equals(caseId); } } -- GitLab