Skip to content
Snippets Groups Projects
Unverified Commit 67515def authored by Andrea Burattin's avatar Andrea Burattin Committed by GitHub
Browse files

Merge pull request #2 from beamline/flink

Conversion to the Apache Flink underlying library
parents 06159b98 622484ce
No related branches found
No related tags found
No related merge requests found
Showing
with 614 additions and 187 deletions
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
<attribute name="test" value="true"/> <attribute name="test" value="true"/>
</attributes> </attributes>
</classpathentry> </classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11"> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes> <attributes>
<attribute name="maven.pomderived" value="true"/> <attribute name="maven.pomderived" value="true"/>
</attributes> </attributes>
......
eclipse.preferences.version=1 eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=11 org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=11 org.eclipse.jdt.core.compiler.source=1.8
...@@ -11,6 +11,9 @@ ...@@ -11,6 +11,9 @@
<sonar.organization>beamline</sonar.organization> <sonar.organization>beamline</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url> <sonar.host.url>https://sonarcloud.io</sonar.host.url>
<flink.version>1.14.3</flink.version>
<log4j.version>2.17.2</log4j.version>
</properties> </properties>
<repositories> <repositories>
...@@ -34,7 +37,7 @@ ...@@ -34,7 +37,7 @@
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>29.0-jre</version> <version>31.1-jre</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.deckfour</groupId> <groupId>org.deckfour</groupId>
...@@ -46,16 +49,44 @@ ...@@ -46,16 +49,44 @@
<artifactId>spex</artifactId> <artifactId>spex</artifactId>
<version>1.0</version> <version>1.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.eclipse.paho</groupId> <groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version> <version>1.2.0</version>
</dependency> </dependency>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.3</version>
</dependency>
<dependency> <dependency>
<groupId>com.github.beamline</groupId> <groupId>com.github.beamline</groupId>
<artifactId>graphviz</artifactId> <artifactId>graphviz</artifactId>
...@@ -67,6 +98,7 @@ ...@@ -67,6 +98,7 @@
<version>5.6</version> <version>5.6</version>
</dependency> </dependency>
<!-- For testing only --> <!-- For testing only -->
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
...@@ -74,6 +106,30 @@ ...@@ -74,6 +106,30 @@
<version>5.8.2</version> <version>5.8.2</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.hamcrest</groupId> <groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId> <artifactId>hamcrest</artifactId>
......
package beamline.events;
import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XAttributeBoolean;
import org.deckfour.xes.model.XAttributeContinuous;
import org.deckfour.xes.model.XAttributeDiscrete;
import org.deckfour.xes.model.XAttributeLiteral;
import org.deckfour.xes.model.XAttributeTimestamp;
import org.deckfour.xes.model.XTrace;
import beamline.exceptions.EventException;
/**
*
* @author Andrea Burattin
*/
public class BEvent implements Serializable, Comparable<BEvent> {
private static final long serialVersionUID = -7300189277034528917L;
private Map<String, Serializable> eventAttributes;
private Map<String, Serializable> traceAttributes;
private Map<String, Serializable> logAttributes;
public BEvent() {
this.eventAttributes = new HashMap<>();
this.traceAttributes = new HashMap<>();
this.logAttributes = new HashMap<>();
}
//
// Factories
//
/**
* Creates a new {@link XTrace} referring to one event
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @param time the time when the event has happened
* @param eventAttributes a collection of string attributes for the event
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static BEvent create(
String processName,
String activityName,
String caseId,
Date time,
Collection<Pair<String, String>> eventAttributes) throws EventException {
if (processName == null || activityName == null || caseId == null) {
throw new EventException("Activity name or case id missing");
}
BEvent event = new BEvent();
event.setProcessName(processName);
event.setTraceName(caseId);
event.setEventName(activityName);
if (time == null) {
event.setTimestamp(new Date());
} else {
event.setTimestamp(time);
}
if (eventAttributes != null) {
for(Pair<String, String> a : eventAttributes) {
event.setEventAttribute(a.getLeft(), a.getRight());
}
}
return event;
}
/**
* Creates a new {@link XTrace} referring to one event
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @param time the time when the event has happened
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static BEvent create(String processName, String activityName, String caseId, Date time) throws EventException {
return create(processName, activityName, caseId, time, null);
}
/**
* Creates a new {@link XTrace} referring to one event. The time of the
* event is set to the current time
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static BEvent create(String processName, String activityName, String caseId) throws EventException {
return create(processName, activityName, caseId, null, null);
}
//
// Specific methods
//
public void setProcessName(String name) {
setLogAttribute(XConceptExtension.KEY_NAME, name);
}
public String getProcessName() {
return (String) logAttributes.get(XConceptExtension.KEY_NAME);
}
public void setTraceName(String name) {
setTraceAttribute(XConceptExtension.KEY_NAME, name);
}
public String getTraceName() {
return (String) traceAttributes.get(XConceptExtension.KEY_NAME);
}
public void setEventName(String name) {
setEventAttribute(XConceptExtension.KEY_NAME, name);
}
public String getEventName() {
return (String) eventAttributes.get(XConceptExtension.KEY_NAME);
}
public void setTimestamp(Date timestamp) {
setEventAttribute(XTimeExtension.KEY_TIMESTAMP, timestamp);
}
public Date getEventTime() {
return (Date) eventAttributes.get(XTimeExtension.KEY_TIMESTAMP);
}
//
// General methods
//
public Map<String, Serializable> getEventAttributes() {
return eventAttributes;
}
public Map<String, Serializable> getTraceAttributes() {
return traceAttributes;
}
public Map<String, Serializable> getLogAttributes() {
return logAttributes;
}
public void setEventAttribute(String name, Serializable value) {
eventAttributes.put(name, value);
}
public void setEventAttribute(String name, XAttribute value) {
setAttributeFromXAttribute(eventAttributes, name, value);
}
public void setTraceAttribute(String name, Serializable value) {
traceAttributes.put(name, value);
}
public void setTraceAttribute(String name, XAttribute value) {
setAttributeFromXAttribute(traceAttributes, name, value);
}
public void setLogAttribute(String name, Serializable value) {
logAttributes.put(name, value);
}
public void setLogAttribute(String name, XAttribute value) {
setAttributeFromXAttribute(logAttributes, name, value);
}
//
// Overrides
//
@Override
public String toString() {
return logAttributes.toString() + " - " + traceAttributes.toString() + " - " + eventAttributes.toString();
}
@Override
public int compareTo(BEvent o) {
if (getEventTime() == null || o.getEventTime() == null) {
return 0;
}
return getEventTime().compareTo(o.getEventTime());
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
BEvent other = (BEvent) obj;
return new EqualsBuilder()
.appendSuper(super.equals(obj))
.append(logAttributes, other.logAttributes)
.append(traceAttributes, other.traceAttributes)
.append(eventAttributes, other.eventAttributes)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(logAttributes)
.append(traceAttributes)
.append(eventAttributes)
.toHashCode();
}
//
// Private methods
//
private void setAttributeFromXAttribute(Map<String, Serializable> map, String name, XAttribute value) {
if (value instanceof XAttributeBoolean) {
map.put(name, ((XAttributeBoolean) value).getValue());
} else if (value instanceof XAttributeContinuous) {
map.put(name, ((XAttributeContinuous) value).getValue());
} else if (value instanceof XAttributeDiscrete) {
map.put(name, ((XAttributeDiscrete) value).getValue());
} else if (value instanceof XAttributeLiteral) {
map.put(name, ((XAttributeLiteral) value).getValue());
} else if (value instanceof XAttributeTimestamp) {
map.put(name, ((XAttributeTimestamp) value).getValue());
}
}
}
package beamline.filters; package beamline.filters;
import org.deckfour.xes.extension.std.XConceptExtension; import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.model.XAttributeLiteral;
import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
/** /**
* A specific instance of the {@link ExcludeOnEventAttributeEqualityFilter} that * A specific instance of the {@link ExcludeOnEventAttributeEqualityFilter} that
...@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl; ...@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
* @author Andrea Burattin * @author Andrea Burattin
* *
*/ */
public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<XAttributeLiteral> { public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<String> {
private static final long serialVersionUID = -5319332746992005641L;
/** /**
* Constructors * Constructors
...@@ -22,7 +22,7 @@ public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilt ...@@ -22,7 +22,7 @@ public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilt
super(XConceptExtension.KEY_NAME); super(XConceptExtension.KEY_NAME);
for (String activity : activities) { for (String activity : activities) {
addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity)); addValue(activity);
} }
} }
} }
package beamline.filters; package beamline.filters;
import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.deckfour.xes.model.XAttribute; import org.apache.flink.api.common.functions.FilterFunction;
import org.deckfour.xes.model.XTrace;
import io.reactivex.rxjava3.annotations.NonNull; import beamline.events.BEvent;
import io.reactivex.rxjava3.functions.Predicate;
/** /**
* This filter excludes events based on the equality of a certain trace * This filter excludes events based on the equality of a certain trace
...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; ...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
* *
* @param <T> the type of the attribute * @param <T> the type of the attribute
*/ */
public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { public class ExcludeOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = 371257881178171433L;
private String attributeName; private String attributeName;
private Set<T> attributeValues; private Set<T> attributeValues;
...@@ -38,8 +38,8 @@ public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implemen ...@@ -38,8 +38,8 @@ public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implemen
} }
@Override @Override
public boolean test(@NonNull XTrace t) throws Throwable { public boolean filter(BEvent t) {
return !attributeValues.contains(t.getAttributes().get(attributeName)); return !attributeValues.contains(t.getTraceAttributes().get(attributeName));
} }
} }
package beamline.filters; package beamline.filters;
import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.deckfour.xes.model.XAttribute; import org.apache.flink.api.common.functions.FilterFunction;
import org.deckfour.xes.model.XTrace;
import io.reactivex.rxjava3.annotations.NonNull; import beamline.events.BEvent;
import io.reactivex.rxjava3.functions.Predicate;
/** /**
* This filter excludes events based on the equality of a certain event * This filter excludes events based on the equality of a certain event
...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; ...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
* *
* @param <T> the type of the attribute * @param <T> the type of the attribute
*/ */
public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { public class ExcludeOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = 1193680203608634150L;
private String attributeName; private String attributeName;
private Set<T> attributeValues; private Set<T> attributeValues;
...@@ -47,8 +47,8 @@ public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> impleme ...@@ -47,8 +47,8 @@ public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> impleme
} }
@Override @Override
public boolean test(@NonNull XTrace t) throws Throwable { public boolean filter(BEvent t) {
return !attributeValues.contains(t.get(0).getAttributes().get(attributeName)); return !attributeValues.contains(t.getEventAttributes().get(attributeName));
} }
} }
package beamline.filters; package beamline.filters;
import org.deckfour.xes.extension.std.XConceptExtension; import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.model.XAttributeLiteral;
import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
/** /**
* A specific instance of the {@link RetainOnEventAttributeEqualityFilter} that * A specific instance of the {@link RetainOnEventAttributeEqualityFilter} that
...@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl; ...@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
* @author Andrea Burattin * @author Andrea Burattin
* *
*/ */
public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<XAttributeLiteral> { public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<String> {
private static final long serialVersionUID = 102039300555271213L;
/** /**
* Constructors * Constructors
...@@ -22,7 +22,7 @@ public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter ...@@ -22,7 +22,7 @@ public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter
super(XConceptExtension.KEY_NAME); super(XConceptExtension.KEY_NAME);
for (String activity : activities) { for (String activity : activities) {
addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity)); addValue(activity);
} }
} }
} }
package beamline.filters; package beamline.filters;
import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.deckfour.xes.model.XAttribute; import org.apache.flink.api.common.functions.FilterFunction;
import org.deckfour.xes.model.XTrace;
import io.reactivex.rxjava3.annotations.NonNull; import beamline.events.BEvent;
import io.reactivex.rxjava3.functions.Predicate;
/** /**
* This filter retains events based on the equality of a certain trace * This filter retains events based on the equality of a certain trace
...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; ...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
* *
* @param <T> the type of the attribute * @param <T> the type of the attribute
*/ */
public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { public class RetainOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = 1225284800265650317L;
private String attributeName; private String attributeName;
private Set<T> attributeValues; private Set<T> attributeValues;
...@@ -38,8 +38,8 @@ public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implement ...@@ -38,8 +38,8 @@ public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implement
} }
@Override @Override
public boolean test(@NonNull XTrace t) throws Throwable { public boolean filter(BEvent t) {
return attributeValues.contains(t.getAttributes().get(attributeName)); return attributeValues.contains(t.getTraceAttributes().get(attributeName));
} }
} }
package beamline.filters; package beamline.filters;
import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.deckfour.xes.model.XAttribute; import org.apache.flink.api.common.functions.FilterFunction;
import org.deckfour.xes.model.XTrace;
import io.reactivex.rxjava3.annotations.NonNull; import beamline.events.BEvent;
import io.reactivex.rxjava3.functions.Predicate;
/** /**
* This filter retains events based on the equality of a certain event * This filter retains events based on the equality of a certain event
...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; ...@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
* *
* @param <T> the type of the attribute * @param <T> the type of the attribute
*/ */
public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { public class RetainOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = -720485056040728235L;
private String attributeName; private String attributeName;
private Set<T> attributeValues; private Set<T> attributeValues;
...@@ -47,8 +47,7 @@ public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implemen ...@@ -47,8 +47,7 @@ public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implemen
} }
@Override @Override
public boolean test(@NonNull XTrace t) throws Throwable { public boolean filter(BEvent event) {
return attributeValues.contains(t.get(0).getAttributes().get(attributeName)); return attributeValues.contains(event.getEventAttributes().get(attributeName));
} }
} }
package beamline.mappers;
import org.deckfour.xes.model.XEvent;
/**
* This class represents a directly follows relation as produced by
* {@link InfiniteSizeDirectlyFollowsMapper}.
*
* @author Andrea Burattin
*/
public class DirectlyFollowsRelation {
private String caseId;
private XEvent first;
private XEvent second;
/**
* Constructor
*
* @param caseId the case id
* @param first the first event
* @param second the second event
*/
public DirectlyFollowsRelation(String caseId, XEvent first, XEvent second) {
this.caseId = caseId;
this.first = first;
this.second = second;
}
/**
* Returns the case id this directly follows relation belongs to
*
* @return the case id
*/
public String getCaseId() {
return caseId;
}
/**
* Returns the first event
*
* @return the first event
*/
public XEvent getFirst() {
return first;
}
/**
* Returns the second event
*
* @return the second event
*/
public XEvent getSecond() {
return second;
}
}
/**
* This package contains some mappers that are available by default in the
* framework.
*/
package beamline.mappers;
\ No newline at end of file
package beamline.models.algorithms;
/**
* This interface defines the structure of the callback function that a
* {@link StreamMiningAlgorithm} can execute (cf.,
* {@link StreamMiningAlgorithm#setOnBeforeEvent(HookEventProcessing)} and
* {@link StreamMiningAlgorithm#setOnAfterEvent(HookEventProcessing)}).
*
* @author Andrea Burattin
*/
public interface HookEventProcessing {
/**
* The actual function to trigger
*/
public void trigger();
}
package beamline.mappers; package beamline.models.algorithms;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XTrace; import org.deckfour.xes.model.XTrace;
import io.reactivex.rxjava3.annotations.NonNull; import beamline.events.BEvent;
import io.reactivex.rxjava3.core.Observable; import beamline.models.responses.DirectlyFollowsRelation;
import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.functions.Function;
/** /**
* This mapper transforms a stream of {@link XTrace}s into a stream of * This mapper transforms a stream of {@link XTrace}s into a stream of
...@@ -24,26 +20,22 @@ import io.reactivex.rxjava3.functions.Function; ...@@ -24,26 +20,22 @@ import io.reactivex.rxjava3.functions.Function;
* *
* @author Andrea Burattin * @author Andrea Burattin
*/ */
public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, ObservableSource<DirectlyFollowsRelation>> { public class InfiniteSizeDirectlyFollowsMapper extends StreamMiningAlgorithm<DirectlyFollowsRelation> {
private Map<String, XEvent> map = new HashMap<>(); private static final long serialVersionUID = 9114527510820073110L;
private Map<String, BEvent> map = new HashMap<>();
@Override @Override
public @NonNull ObservableSource<DirectlyFollowsRelation> apply(@NonNull XTrace t) throws Throwable { public DirectlyFollowsRelation ingest(BEvent event) throws Exception {
String caseId = XConceptExtension.instance().extractName(t); String caseId = event.getTraceName();
DirectlyFollowsRelation toRet = null; DirectlyFollowsRelation toRet = null;
if (map.containsKey(caseId)) { if (map.containsKey(caseId)) {
toRet = new DirectlyFollowsRelation(caseId, map.get(caseId), t.get(0)); toRet = new DirectlyFollowsRelation(map.get(caseId), event);
} }
map.put(caseId, t.get(0)); map.put(caseId, event);
if (toRet == null) { return toRet;
return Observable.empty();
} else {
return Observable.just(toRet);
} }
} }
}
package beamline.models.algorithms; package beamline.models.algorithms;
import io.reactivex.rxjava3.annotations.NonNull; import java.io.IOException;
import io.reactivex.rxjava3.functions.Consumer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import beamline.events.BEvent;
import beamline.models.responses.Response;
/** /**
* This abstract class defines the root of the mining algorithms hierarchy. It * This abstract class defines the root of the mining algorithms hierarchy. It
* is a {@link Consumer} of elements with type <code>T</code> that is capable of * is a {@link MapFunction} of elements with type {@link BEvent} that is capable
* producing responses of a certain type <code>K</code>. * of producing responses of type {@link Response}.
* *
* @author Andrea Burattin * @author Andrea Burattin
*
* @param <T> the type of the consumed events
* @param <K> the type of the responses produced by the mining algorithm
*/ */
public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> { public abstract class StreamMiningAlgorithm<T extends Response> extends RichFlatMapFunction<BEvent, T> {
private static final long serialVersionUID = 10170817098305999L;
private transient ValueState<Long> processedEvents;
@Override
public void open(Configuration parameters) throws Exception {
processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processed-events", Long.class));
}
private int processedEvents = 0; @Override
private K latestResponse; public void flatMap(BEvent event, Collector<T> out) throws Exception {
private HookEventProcessing onBeforeEvent = null; T latestResponse = process(event);
private HookEventProcessing onAfterEvent = null; if (latestResponse != null) {
out.collect(latestResponse);
}
}
/** /**
* This abstract method is what each derive class is expected to implement. * This abstract method is what each derive class is expected to implement.
...@@ -27,65 +44,46 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> { ...@@ -27,65 +44,46 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
* *
* @param event the new event being observed * @param event the new event being observed
* @return the result of the mining of the event * @return the result of the mining of the event
* @throws Exception
*/ */
public abstract K ingest(T event); public abstract T ingest(BEvent event) throws Exception;
/** /**
* Returns the total number of events processed so far * Returns the total number of events processed so far
* *
* @return the total number of events processed so far * @return the total number of events processed so far
*/ */
public int getProcessedEvents() { public long getProcessedEvents() {
return processedEvents; try {
if (processedEvents == null || processedEvents.value() == null) {
return -1;
} }
return processedEvents.value().longValue();
/** } catch (IOException e) {
* Returns the latest result of the mining e.printStackTrace();
*
* @return the latest result of the mining
*/
public K getLatestResponse() {
return latestResponse;
} }
return -1;
/**
* This method can be used to set a hook to a callback function to be
* executed before an event is processed
*
* @param onBeforeEvent the callback function
*/
public void setOnBeforeEvent(HookEventProcessing onBeforeEvent) {
this.onBeforeEvent = onBeforeEvent;
} }
/**
* This method can be used to set a hook to a callback function to be
* executed after an event is processed
*
* @param onAfterEvent the callback function
*/
public void setOnAfterEvent(HookEventProcessing onAfterEvent) {
this.onAfterEvent = onAfterEvent;
}
protected void process(T event) {
this.processedEvents++;
latestResponse = ingest(event);
}
protected K setLatestResponse(K latestResponse) { /*
this.latestResponse = latestResponse; * The internal processor in charge of updating the internal status of the
return latestResponse; * map.
*/
protected T process(BEvent event) throws Exception {
try {
long value = 1;
if (processedEvents.value() != null) {
value = processedEvents.value() + 1;
} }
processedEvents.update(value);
@Override } catch (IOException e) {
public void accept(@NonNull T t) throws Throwable { e.printStackTrace();
if (onBeforeEvent != null) {
onBeforeEvent.trigger();
} }
process(t); T tmp = ingest(event);
if (onAfterEvent != null) { if (tmp != null) {
onAfterEvent.trigger(); tmp.setProcessedEvents(getProcessedEvents());
} }
return tmp;
} }
} }
package beamline.models.responses;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.tuple.Pair;
import beamline.events.BEvent;
import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper;
/**
* This class represents a directly follows relation as produced by
* {@link InfiniteSizeDirectlyFollowsMapper}.
*
* @author Andrea Burattin
*/
public class DirectlyFollowsRelation extends Response {
private static final long serialVersionUID = 1775695752885219490L;
private Pair<BEvent, BEvent> pair;
/**
* Constructor
*
* @param caseId the case id
* @param first the first event
* @param second the second event
*/
public DirectlyFollowsRelation(BEvent from, BEvent to) {
if (!from.getTraceName().equals(to.getTraceName())) {
throw new IllegalArgumentException();
}
pair = Pair.of(from, to);
}
/**
* Returns the case id this directly follows relation belongs to
*
* @return the case id
*/
public String getCaseId() {
return pair.getLeft().getTraceName();
}
/**
* Returns the source event
*
* @return the source event
*/
public BEvent getFrom() {
return pair.getLeft();
}
/**
* Returns the target event
*
* @return the target event
*/
public BEvent getTo() {
return pair.getRight();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
DirectlyFollowsRelation other = (DirectlyFollowsRelation) obj;
return new EqualsBuilder()
.appendSuper(super.equals(obj))
.append(getFrom(), other.getFrom())
.append(getTo(), other.getTo())
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(getFrom())
.append(getTo())
.toHashCode();
}
}
...@@ -8,7 +8,9 @@ import beamline.graphviz.Dot; ...@@ -8,7 +8,9 @@ import beamline.graphviz.Dot;
* *
* @author Andrea Burattin * @author Andrea Burattin
*/ */
public interface GraphvizResponse extends Response { public abstract class GraphvizResponse extends Response {
private static final long serialVersionUID = 7232727657074096321L;
/** /**
* Returns the Dot representation of the response * Returns the Dot representation of the response
......
package beamline.models.responses; package beamline.models.responses;
import java.io.Serializable;
/** /**
* Marker interface used to define the type of the responses * Marker interface used to define the type of the responses
* *
* @author Andrea Burattin * @author Andrea Burattin
*/ */
public interface Response { public class Response implements Serializable {
private static final long serialVersionUID = 3104314256198741100L;
private Long processedEvents;
public Long getProcessedEvents() {
return processedEvents;
}
public void setProcessedEvents(Long processedEvents) {
this.processedEvents = processedEvents;
}
} }
package beamline.models.responses;
public class StringResponse extends Response {
private static final long serialVersionUID = 7863665787098981704L;
private String str;
public StringResponse(String str) {
set(str);
}
public String get() {
return str;
}
public void set(String str) {
this.str = str;
}
@Override
public String toString() {
return str;
}
}
package beamline.sources;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import beamline.events.BEvent;
/**
* This interface is supposed just to bind the type of {@link SourceFunction} to
* {@link BEvent}.
*
* @author Andrea Burattin
*/
public abstract class BeamlineAbstractSource extends RichSourceFunction<BEvent> {
private static final long serialVersionUID = 1072198158533070679L;
private boolean running = true;
/**
*
* @return
*/
public boolean isRunning() {
return running;
}
@Override
public void cancel() {
this.running = false;
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment