Skip to content
Snippets Groups Projects
Commit 8b896312 authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Fixed infinite size directly follows mappers

parent 51de3428
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,8 @@ import java.util.Date; ...@@ -6,6 +6,8 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.deckfour.xes.extension.std.XConceptExtension; import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension; import org.deckfour.xes.extension.std.XTimeExtension;
...@@ -199,6 +201,36 @@ public class BEvent implements Serializable, Comparable<BEvent> { ...@@ -199,6 +201,36 @@ public class BEvent implements Serializable, Comparable<BEvent> {
return getEventTime().compareTo(o.getEventTime()); return getEventTime().compareTo(o.getEventTime());
} }
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
BEvent other = (BEvent) obj;
return new EqualsBuilder()
.appendSuper(super.equals(obj))
.append(logAttributes, other.logAttributes)
.append(traceAttributes, other.traceAttributes)
.append(eventAttributes, other.eventAttributes)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(logAttributes)
.append(traceAttributes)
.append(eventAttributes)
.toHashCode();
}
// //
// Private methods // Private methods
// //
......
package beamline.mappers;
import org.deckfour.xes.model.XEvent;
/**
* This class represents a directly follows relation as produced by
* {@link InfiniteSizeDirectlyFollowsMapper}.
*
* @author Andrea Burattin
*/
public class DirectlyFollowsRelation {
private String caseId;
private XEvent first;
private XEvent second;
/**
* Constructor
*
* @param caseId the case id
* @param first the first event
* @param second the second event
*/
public DirectlyFollowsRelation(String caseId, XEvent first, XEvent second) {
this.caseId = caseId;
this.first = first;
this.second = second;
}
/**
* Returns the case id this directly follows relation belongs to
*
* @return the case id
*/
public String getCaseId() {
return caseId;
}
/**
* Returns the first event
*
* @return the first event
*/
public XEvent getFirst() {
return first;
}
/**
* Returns the second event
*
* @return the second event
*/
public XEvent getSecond() {
return second;
}
}
package beamline.mappers; package beamline.models.algorithms;
/*
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XTrace; import org.deckfour.xes.model.XTrace;
import io.reactivex.rxjava3.annotations.NonNull; import beamline.events.BEvent;
import io.reactivex.rxjava3.core.Observable; import beamline.models.responses.DirectlyFollowsRelation;
import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.functions.Function;
/** /**
* This mapper transforms a stream of {@link XTrace}s into a stream of * This mapper transforms a stream of {@link XTrace}s into a stream of
...@@ -23,29 +19,23 @@ import io.reactivex.rxjava3.functions.Function; ...@@ -23,29 +19,23 @@ import io.reactivex.rxjava3.functions.Function;
* number of case ids grows as well. * number of case ids grows as well.
* *
* @author Andrea Burattin * @author Andrea Burattin
* */
public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, ObservableSource<DirectlyFollowsRelation>> { public class InfiniteSizeDirectlyFollowsMapper extends StreamMiningAlgorithm<DirectlyFollowsRelation> {
private Map<String, XEvent> map = new HashMap<>(); private static final long serialVersionUID = 9114527510820073110L;
private Map<String, BEvent> map = new HashMap<>();
@Override @Override
public @NonNull ObservableSource<DirectlyFollowsRelation> apply(@NonNull XTrace t) throws Throwable { public DirectlyFollowsRelation ingest(BEvent event) throws Exception {
String caseId = XConceptExtension.instance().extractName(t); String caseId = event.getTraceName();
DirectlyFollowsRelation toRet = null; DirectlyFollowsRelation toRet = null;
if (map.containsKey(caseId)) { if (map.containsKey(caseId)) {
toRet = new DirectlyFollowsRelation(caseId, map.get(caseId), t.get(0)); toRet = new DirectlyFollowsRelation(map.get(caseId), event);
} }
map.put(caseId, t.get(0)); map.put(caseId, event);
if (toRet == null) { return toRet;
return Observable.empty();
} else {
return Observable.just(toRet);
} }
} }
}
*/
class InfiniteSizeDirectlyFollowsMapper{}
\ No newline at end of file
package beamline.models.responses;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.tuple.Pair;
import beamline.events.BEvent;
import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper;
/**
* This class represents a directly follows relation as produced by
* {@link InfiniteSizeDirectlyFollowsMapper}.
*
* @author Andrea Burattin
*/
public class DirectlyFollowsRelation extends Response {
private static final long serialVersionUID = 1775695752885219490L;
private Pair<BEvent, BEvent> pair;
/**
* Constructor
*
* @param caseId the case id
* @param first the first event
* @param second the second event
*/
public DirectlyFollowsRelation(BEvent from, BEvent to) {
if (!from.getTraceName().equals(to.getTraceName())) {
throw new IllegalArgumentException();
}
pair = Pair.of(from, to);
}
/**
* Returns the case id this directly follows relation belongs to
*
* @return the case id
*/
public String getCaseId() {
return pair.getLeft().getTraceName();
}
/**
* Returns the source event
*
* @return the source event
*/
public BEvent getFrom() {
return pair.getLeft();
}
/**
* Returns the target event
*
* @return the target event
*/
public BEvent getTo() {
return pair.getRight();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
DirectlyFollowsRelation other = (DirectlyFollowsRelation) obj;
return new EqualsBuilder()
.appendSuper(super.equals(obj))
.append(getFrom(), other.getFrom())
.append(getTo(), other.getTo())
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(getFrom())
.append(getTo())
.toHashCode();
}
}
...@@ -91,7 +91,6 @@ public class MQTTXesSource extends BeamlineAbstractSource { ...@@ -91,7 +91,6 @@ public class MQTTXesSource extends BeamlineAbstractSource {
while(isRunning()) { while(isRunning()) {
while (isRunning() && buffer.isEmpty()) { while (isRunning() && buffer.isEmpty()) {
System.out.println("sleeping " + isRunning());
Thread.sleep(100l); Thread.sleep(100l);
} }
if (isRunning()) { if (isRunning()) {
...@@ -100,7 +99,6 @@ public class MQTTXesSource extends BeamlineAbstractSource { ...@@ -100,7 +99,6 @@ public class MQTTXesSource extends BeamlineAbstractSource {
} }
} }
} }
System.out.println("aaa");
if (!isRunning() && myClient.isConnected()) { if (!isRunning() && myClient.isConnected()) {
try { try {
...@@ -109,13 +107,5 @@ public class MQTTXesSource extends BeamlineAbstractSource { ...@@ -109,13 +107,5 @@ public class MQTTXesSource extends BeamlineAbstractSource {
// nothing to do here // nothing to do here
} }
} }
System.err.println("done");
}
@Override
public void cancel() {
// TODO Auto-generated method stub
super.cancel();
System.out.println("closing");
} }
} }
...@@ -6,25 +6,43 @@ import static org.junit.jupiter.api.Assertions.assertTrue; ...@@ -6,25 +6,43 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import beamline.mappers.DirectlyFollowsRelation; import beamline.events.BEvent;
import beamline.mappers.InfiniteSizeDirectlyFollowsMapper; import beamline.exceptions.EventException;
import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper;
import beamline.models.responses.DirectlyFollowsRelation;
public class MapperTest { public class MapperTest {
@Test @Test
public void test_infinite_size_df() { public void test_infinite_size_df() throws EventException, Exception {
List<DirectlyFollowsRelation> results = new ArrayList<>(); List<DirectlyFollowsRelation> results = new ArrayList<>();
// <K,A,B,A,C> // <K,A,B,A,C>, <A,B,A>
Utils.generateObservableSameCaseId() StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(
BEvent.create("p", "K", "c"),
BEvent.create("p", "A", "c2"),
BEvent.create("p", "A", "c"),
BEvent.create("p", "B", "c"),
BEvent.create("p", "B", "c2"),
BEvent.create("p", "A", "c"),
BEvent.create("p", "A", "c2"),
BEvent.create("p", "C", "c"))
.keyBy(BEvent::getProcessName)
.flatMap(new InfiniteSizeDirectlyFollowsMapper()) .flatMap(new InfiniteSizeDirectlyFollowsMapper())
.subscribe((df) -> results.add(df)); .executeAndCollect().forEachRemaining((DirectlyFollowsRelation e) -> {
results.add(e);
});
assertEquals(4, results.size()); assertEquals(6, results.size());
assertTrue(Utils.verifyDirectFollows(results.get(0), "K", "A", "c")); assertTrue(Utils.verifyDirectFollows(results.get(0), "K", "A", "c"));
assertTrue(Utils.verifyDirectFollows(results.get(1), "A", "B", "c")); assertTrue(Utils.verifyDirectFollows(results.get(1), "A", "B", "c"));
assertTrue(Utils.verifyDirectFollows(results.get(2), "B", "A", "c")); assertTrue(Utils.verifyDirectFollows(results.get(2), "A", "B", "c2"));
assertTrue(Utils.verifyDirectFollows(results.get(3), "A", "C", "c")); assertTrue(Utils.verifyDirectFollows(results.get(3), "B", "A", "c"));
assertTrue(Utils.verifyDirectFollows(results.get(4), "B", "A", "c2"));
assertTrue(Utils.verifyDirectFollows(results.get(5), "A", "C", "c"));
} }
} }
...@@ -5,7 +5,9 @@ import static org.hamcrest.Matchers.hasItems; ...@@ -5,7 +5,9 @@ import static org.hamcrest.Matchers.hasItems;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import java.io.File;
import java.net.URI; import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
...@@ -14,12 +16,21 @@ import java.util.List; ...@@ -14,12 +16,21 @@ import java.util.List;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.lang3.CharSet;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.experimental.CollectSink; import org.apache.flink.streaming.experimental.CollectSink;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
...@@ -27,6 +38,7 @@ import org.eclipse.paho.client.mqttv3.MqttPersistenceException; ...@@ -27,6 +38,7 @@ import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.google.common.io.Files;
import com.opencsv.CSVParserBuilder; import com.opencsv.CSVParserBuilder;
import beamline.events.BEvent; import beamline.events.BEvent;
...@@ -183,62 +195,80 @@ public class SourcesTest { ...@@ -183,62 +195,80 @@ public class SourcesTest {
// brokerService.start(); // brokerService.start();
// brokerService.waitUntilStarted(); // brokerService.waitUntilStarted();
// //
// List<String> acts = new LinkedList<>(); // final List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>(); // List<String> caseIds = new LinkedList<>();
// //
// MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name"); // MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name");
// //
// new Thread(new Runnable() { // // create the sink
// File tmpFile = File.createTempFile("mqtt", "log");
// StreamingFileSink<BEvent> sink = StreamingFileSink.forRowFormat(Path.fromLocalFile(tmpFile), new SimpleStringEncoder<BEvent>()).build();
//// val sink: StreamingFileSink[String] = StreamingFileSink
//// .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
//// .build()
// //
// @Override //
// public void run() { // // create actual source
// try { // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Thread.sleep(5000); // DataStream<BEvent> stream = env.addSource(s);
// System.out.println("going..."); // stream.addSink(sink);
// JobClient job = env.executeAsync();
//
//// Thread.sleep(1000);
//
// System.out.println(tmpFile);
//
// System.out.println("going");
// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); // MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
// client.connect(); // client.connect();
//
// publish(client, "c1", "a11"); // publish(client, "c1", "a11");
// publish(client, "c2", "a21"); // publish(client, "c2", "a21");
// publish(client, "c2", "a22"); // publish(client, "c2", "a22");
// publish(client, "c1", "a12"); // publish(client, "c1", "a12");
// publish(client, "c2", "a23"); // publish(client, "c2", "a23");
// s.cancel();
// //
// } catch (InterruptedException e) { // Thread.sleep(1000);
// e.printStackTrace(); //// job.cancel();
// } catch (MqttException e) { // System.out.println(job.getJobStatus().isDone());
// e.printStackTrace();
// }
// }
// }).start();
// //
// // create actual source // System.out.println(Files.readLines(tmpFile, Charset.defaultCharset()));
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //// System.out.println("final " + acts);
// DataStream<BEvent> stream = env.addSource(s); ////
////
////
//// System.out.println("post-final " + acts);
////
////// System.out.println("1");
//// stream.executeAndCollect().forEachRemaining((BEvent e) -> {
//// System.out.println(e);
//// acts.add(e.getEventName());
//// caseIds.add(e.getTraceName());
//// });
//// JobClient job = env.executeAsync();
////
//// Thread.sleep(1000);
//// job.cancel();
////
//// Thread.sleep(1000);
// //
// stream.executeAndCollect().forEachRemaining((BEvent e) -> { //// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
// System.out.println(e); //// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
// acts.add(e.getEventName()); ////
// caseIds.add(e.getTraceName()); //// System.out.println("3");
// });
// //
// System.out.println("3");
// //
// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
// //
// } catch (Exception e) { // } catch (Exception e) {
// e.printStackTrace(); // e.printStackTrace();
// } // }
// } // }
//
// @Test //// @Test
// public void test_mqtt_2() { //// public void test_mqtt_2() {
// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name"); //// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
// assertThrowsExactly(SourceException.class, () -> source.prepare()); //// assertThrowsExactly(SourceException.class, () -> source.prepare());
// } //// }
//
// protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException { // protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
// client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false); // client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
// } // }
......
...@@ -14,7 +14,7 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl; ...@@ -14,7 +14,7 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
import beamline.events.BEvent; import beamline.events.BEvent;
import beamline.exceptions.EventException; import beamline.exceptions.EventException;
import beamline.mappers.DirectlyFollowsRelation; import beamline.models.responses.DirectlyFollowsRelation;
public class Utils { public class Utils {
...@@ -81,8 +81,8 @@ public class Utils { ...@@ -81,8 +81,8 @@ public class Utils {
} }
public static boolean verifyDirectFollows(DirectlyFollowsRelation df, String a1, String a2, String caseId) { public static boolean verifyDirectFollows(DirectlyFollowsRelation df, String a1, String a2, String caseId) {
String df_a1 = XConceptExtension.instance().extractName(df.getFirst()); String df_a1 = df.getFrom().getEventName();
String df_a2 = XConceptExtension.instance().extractName(df.getSecond()); String df_a2 = df.getTo().getEventName();
return df_a1.equals(a1) && df_a2.equals(a2) && df.getCaseId().equals(caseId); return df_a1.equals(a1) && df_a2.equals(a2) && df.getCaseId().equals(caseId);
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment