From b90beda0b2dd0ccc24131c1d285053e113364efb Mon Sep 17 00:00:00 2001
From: Andrea Burattin <andrea.burattin@gmail.com>
Date: Sun, 20 Mar 2022 11:47:34 +0100
Subject: [PATCH] Started the conversion to Apache Flink

---
 .classpath                                    |   2 +-
 .settings/org.eclipse.jdt.core.prefs          |   6 +-
 pom.xml                                       |  70 +++++-
 src/main/java/beamline/events/BEvent.java     | 219 ++++++++++++++++++
 .../filters/ExcludeActivitiesFilter.java      |   8 +-
 .../ExcludeOnCaseAttributeEqualityFilter.java |  14 +-
 ...ExcludeOnEventAttributeEqualityFilter.java |  14 +-
 .../filters/RetainActivitiesFilter.java       |   8 +-
 .../RetainOnCaseAttributeEqualityFilter.java  |  14 +-
 .../RetainOnEventAttributeEqualityFilter.java |  15 +-
 .../InfiniteSizeDirectlyFollowsMapper.java    |   6 +-
 .../algorithms/StreamMiningAlgorithm.java     | 110 ++++++---
 .../sources/BeamlineAbstractSource.java       |  31 +++
 .../java/beamline/sources/CSVLogSource.java   |  64 +++--
 .../java/beamline/sources/MQTTXesSource.java  |  48 ++--
 src/main/java/beamline/sources/Source.java    |  36 ---
 .../java/beamline/sources/XesLogSource.java   | 119 +++++-----
 src/main/java/beamline/sources/XesSource.java |  13 --
 src/main/java/beamline/utils/EventUtils.java  | 109 ---------
 .../java/beamline/utils/package-info.java     |   4 -
 .../java/beamline/tests/AlgorithmTest.java    | 121 +++++++---
 src/test/java/beamline/tests/FiltersTest.java |  24 +-
 src/test/java/beamline/tests/Utils.java       |  47 ++--
 23 files changed, 685 insertions(+), 417 deletions(-)
 create mode 100644 src/main/java/beamline/events/BEvent.java
 create mode 100644 src/main/java/beamline/sources/BeamlineAbstractSource.java
 delete mode 100644 src/main/java/beamline/sources/Source.java
 delete mode 100644 src/main/java/beamline/sources/XesSource.java
 delete mode 100644 src/main/java/beamline/utils/EventUtils.java
 delete mode 100644 src/main/java/beamline/utils/package-info.java

diff --git a/.classpath b/.classpath
index 0fb79cf..002ad57 100644
--- a/.classpath
+++ b/.classpath
@@ -24,7 +24,7 @@
 			<attribute name="test" value="true"/>
 		</attributes>
 	</classpathentry>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
 		<attributes>
 			<attribute name="maven.pomderived" value="true"/>
 		</attributes>
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
index 2af1e7b..2f5cc74 100644
--- a/.settings/org.eclipse.jdt.core.prefs
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -1,8 +1,8 @@
 eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
-org.eclipse.jdt.core.compiler.compliance=11
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
 org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
 org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
 org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
 org.eclipse.jdt.core.compiler.release=disabled
-org.eclipse.jdt.core.compiler.source=11
+org.eclipse.jdt.core.compiler.source=1.8
diff --git a/pom.xml b/pom.xml
index 3035e9f..2356186 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,6 +11,9 @@
 
 		<sonar.organization>beamline</sonar.organization>
 		<sonar.host.url>https://sonarcloud.io</sonar.host.url>
+
+		<flink.version>1.14.3</flink.version>
+		<log4j.version>2.17.2</log4j.version>
 	</properties>
 
 	<repositories>
@@ -34,7 +37,7 @@
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
-			<version>29.0-jre</version>
+			<version>31.1-jre</version>
 		</dependency>
 		<dependency>
 			<groupId>org.deckfour</groupId>
@@ -46,16 +49,44 @@
 			<artifactId>spex</artifactId>
 			<version>1.0</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>${log4j.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>${log4j.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-slf4j-impl</artifactId>
+			<version>${log4j.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.11</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_2.11</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>org.eclipse.paho</groupId>
 			<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
 			<version>1.2.0</version>
 		</dependency>
-		<dependency>
-			<groupId>io.reactivex.rxjava3</groupId>
-			<artifactId>rxjava</artifactId>
-			<version>3.1.3</version>
-		</dependency>
 		<dependency>
 			<groupId>com.github.beamline</groupId>
 			<artifactId>graphviz</artifactId>
@@ -66,7 +97,8 @@
 			<artifactId>opencsv</artifactId>
 			<version>5.6</version>
 		</dependency>
-		
+
+
 		<!-- For testing only -->
 		<dependency>
 			<groupId>org.junit.jupiter</groupId>
@@ -74,6 +106,30 @@
 			<version>5.8.2</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.11</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.11</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+			<classifier>tests</classifier>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.12</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+		</dependency>
 		<dependency>
 			<groupId>org.hamcrest</groupId>
 			<artifactId>hamcrest</artifactId>
diff --git a/src/main/java/beamline/events/BEvent.java b/src/main/java/beamline/events/BEvent.java
new file mode 100644
index 0000000..bc27273
--- /dev/null
+++ b/src/main/java/beamline/events/BEvent.java
@@ -0,0 +1,219 @@
+package beamline.events;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.deckfour.xes.extension.std.XConceptExtension;
+import org.deckfour.xes.extension.std.XTimeExtension;
+import org.deckfour.xes.model.XAttribute;
+import org.deckfour.xes.model.XAttributeBoolean;
+import org.deckfour.xes.model.XAttributeContinuous;
+import org.deckfour.xes.model.XAttributeDiscrete;
+import org.deckfour.xes.model.XAttributeLiteral;
+import org.deckfour.xes.model.XAttributeTimestamp;
+import org.deckfour.xes.model.XTrace;
+
+import beamline.exceptions.EventException;
+
+/**
+ * 
+ * @author Andrea Burattin
+ */
+public class BEvent implements Serializable, Comparable<BEvent> {
+
+	private static final long serialVersionUID = -7300189277034528917L;
+	
+	private Map<String, Serializable> eventAttributes;
+	private Map<String, Serializable> traceAttributes;
+	private Map<String, Serializable> logAttributes;
+	
+	public BEvent() {
+		this.eventAttributes = new HashMap<>();
+		this.traceAttributes = new HashMap<>();
+		this.logAttributes = new HashMap<>();
+	}
+	
+	//
+	// Factories
+	//
+	/**
+	 * Creates a new {@link XTrace} referring to one event
+	 * 
+	 * @param activityName the name of the activity
+	 * @param caseId the identifier of the process instance
+	 * @param time the time when the event has happened
+	 * @param eventAttributes a collection of string attributes for the event
+	 * @return the new event
+	 * @throws EventException this exception is thrown is incomplete information
+	 * is provided
+	 */
+	public static BEvent create(
+			String processName,
+			String activityName,
+			String caseId,
+			Date time,
+			Collection<Pair<String, String>> eventAttributes) throws EventException {
+		if (processName == null || activityName == null || caseId == null) {
+			throw new EventException("Activity name or case id missing");
+		}
+		
+		BEvent event = new BEvent();
+		event.setProcessName(processName);
+		event.setTraceName(caseId);
+		event.setEventName(activityName);
+		if (time == null) {
+			event.setTimestamp(new Date());
+		} else {
+			event.setTimestamp(time);
+		}
+		
+		if (eventAttributes != null) {
+			for(Pair<String, String> a : eventAttributes) {
+				event.setEventAttribute(a.getLeft(), a.getRight());
+			}
+		}
+		return event;
+	}
+	
+	/**
+	 * Creates a new {@link XTrace} referring to one event
+	 * 
+	 * @param activityName the name of the activity
+	 * @param caseId the identifier of the process instance
+	 * @param time the time when the event has happened
+	 * @return the new event
+	 * @throws EventException this exception is thrown is incomplete information
+	 * is provided
+	 */
+	public static BEvent create(String processName, String activityName, String caseId, Date time) throws EventException {
+		return create(processName, activityName, caseId, time, null);
+	}
+	
+	/**
+	 * Creates a new {@link XTrace} referring to one event. The time of the
+	 * event is set to the current time
+	 * 
+	 * @param activityName the name of the activity
+	 * @param caseId the identifier of the process instance
+	 * @return the new event
+	 * @throws EventException this exception is thrown is incomplete information
+	 * is provided
+	 */
+	public static BEvent create(String processName, String activityName, String caseId) throws EventException {
+		return create(processName, activityName, caseId, null, null);
+	}
+	
+	//
+	// Specific methods
+	//
+	public void setProcessName(String name) {
+		setLogAttribute(XConceptExtension.KEY_NAME, name);
+	}
+	
+	public String getProcessName() {
+		return (String) logAttributes.get(XConceptExtension.KEY_NAME);
+	}
+	
+	public void setTraceName(String name) {
+		setTraceAttribute(XConceptExtension.KEY_NAME, name);
+	}
+	
+	public String getTraceName() {
+		return (String) traceAttributes.get(XConceptExtension.KEY_NAME);
+	}
+	
+	public void setEventName(String name) {
+		setEventAttribute(XConceptExtension.KEY_NAME, name);
+	}
+	
+	public String getEventName() {
+		return (String) eventAttributes.get(XConceptExtension.KEY_NAME);
+	}
+	
+	public void setTimestamp(Date timestamp) {
+		setEventAttribute(XTimeExtension.KEY_TIMESTAMP, timestamp);
+	}
+	
+	public Date getEventTime() {
+		return (Date) eventAttributes.get(XTimeExtension.KEY_TIMESTAMP);
+	}
+	
+	//
+	// General methods
+	//
+	
+	public Map<String, Serializable> getEventAttributes() {
+		return eventAttributes;
+	}
+	
+	public Map<String, Serializable> getTraceAttributes() {
+		return traceAttributes;
+	}
+	
+	public Map<String, Serializable> getLogAttributes() {
+		return logAttributes;
+	}
+	
+	public void setEventAttribute(String name, Serializable value) {
+		eventAttributes.put(name, value);
+	}
+	
+	public void setEventAttribute(String name, XAttribute value) {
+		setAttributeFromXAttribute(eventAttributes, name, value);
+	}
+	
+	public void setTraceAttribute(String name, Serializable value) {
+		traceAttributes.put(name, value);
+	}
+	
+	public void setTraceAttribute(String name, XAttribute value) {
+		setAttributeFromXAttribute(traceAttributes, name, value);
+	}
+	
+	public void setLogAttribute(String name, Serializable value) {
+		logAttributes.put(name, value);
+	}
+	
+	public void setLogAttribute(String name, XAttribute value) {
+		setAttributeFromXAttribute(logAttributes, name, value);
+	}
+	
+	//
+	// Overrides
+	//
+	
+	@Override
+	public String toString() {
+		return logAttributes.toString() + " - " + traceAttributes.toString() + " - " + eventAttributes.toString();
+	}
+
+	@Override
+	public int compareTo(BEvent o) {
+		if (getEventTime() == null || o.getEventTime() == null) {
+			return 0;
+		}
+		return getEventTime().compareTo(o.getEventTime());
+	}
+	
+	//
+	// Private methods
+	//
+	
+	private void setAttributeFromXAttribute(Map<String, Serializable> map, String name, XAttribute value) {
+		if (value instanceof XAttributeBoolean) {
+			map.put(name, ((XAttributeBoolean) value).getValue());
+		} else if (value instanceof XAttributeContinuous) {
+			map.put(name, ((XAttributeContinuous) value).getValue());
+		} else if (value instanceof XAttributeDiscrete) {
+			map.put(name, ((XAttributeDiscrete) value).getValue());
+		} else if (value instanceof XAttributeLiteral) {
+			map.put(name, ((XAttributeLiteral) value).getValue());
+		} else if (value instanceof XAttributeTimestamp) {
+			map.put(name, ((XAttributeTimestamp) value).getValue());
+		}
+	}
+}
diff --git a/src/main/java/beamline/filters/ExcludeActivitiesFilter.java b/src/main/java/beamline/filters/ExcludeActivitiesFilter.java
index 4886ad0..9f08a59 100644
--- a/src/main/java/beamline/filters/ExcludeActivitiesFilter.java
+++ b/src/main/java/beamline/filters/ExcludeActivitiesFilter.java
@@ -1,8 +1,6 @@
 package beamline.filters;
 
 import org.deckfour.xes.extension.std.XConceptExtension;
-import org.deckfour.xes.model.XAttributeLiteral;
-import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
 
 /**
  * A specific instance of the {@link ExcludeOnEventAttributeEqualityFilter} that
@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
  * @author Andrea Burattin
  *
  */
-public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<XAttributeLiteral> {
+public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<String> {
+
+	private static final long serialVersionUID = -5319332746992005641L;
 
 	/**
 	 * Constructors
@@ -22,7 +22,7 @@ public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilt
 		super(XConceptExtension.KEY_NAME);
 		
 		for (String activity : activities) {
-			addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity));
+			addValue(activity);
 		}
 	}
 }
diff --git a/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java b/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java
index 32efc97..87c2cec 100644
--- a/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java
+++ b/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java
@@ -1,14 +1,13 @@
 package beamline.filters;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.deckfour.xes.model.XAttribute;
-import org.deckfour.xes.model.XTrace;
+import org.apache.flink.api.common.functions.FilterFunction;
 
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.functions.Predicate;
+import beamline.events.BEvent;
 
 /**
  * This filter excludes events based on the equality of a certain trace
@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
  *
  * @param <T> the type of the attribute
  */
-public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
+public class ExcludeOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
 
+	private static final long serialVersionUID = 371257881178171433L;
 	private String attributeName;
 	private Set<T> attributeValues;
 	
@@ -38,8 +38,8 @@ public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implemen
 	}
 	
 	@Override
-	public boolean test(@NonNull XTrace t) throws Throwable {
-		return !attributeValues.contains(t.getAttributes().get(attributeName));
+	public boolean filter(BEvent t) {
+		return !attributeValues.contains(t.getTraceAttributes().get(attributeName));
 	}
 
 }
diff --git a/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java b/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java
index 1e090c4..94e8fb8 100644
--- a/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java
+++ b/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java
@@ -1,14 +1,13 @@
 package beamline.filters;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.deckfour.xes.model.XAttribute;
-import org.deckfour.xes.model.XTrace;
+import org.apache.flink.api.common.functions.FilterFunction;
 
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.functions.Predicate;
+import beamline.events.BEvent;
 
 /**
  * This filter excludes events based on the equality of a certain event
@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
  *
  * @param <T> the type of the attribute
  */
-public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
+public class ExcludeOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
 
+	private static final long serialVersionUID = 1193680203608634150L;
 	private String attributeName;
 	private Set<T> attributeValues;
 	
@@ -47,8 +47,8 @@ public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> impleme
 	}
 	
 	@Override
-	public boolean test(@NonNull XTrace t) throws Throwable {
-		return !attributeValues.contains(t.get(0).getAttributes().get(attributeName));
+	public boolean filter(BEvent t) {
+		return !attributeValues.contains(t.getEventAttributes().get(attributeName));
 	}
 
 }
diff --git a/src/main/java/beamline/filters/RetainActivitiesFilter.java b/src/main/java/beamline/filters/RetainActivitiesFilter.java
index 8e0a525..9324916 100644
--- a/src/main/java/beamline/filters/RetainActivitiesFilter.java
+++ b/src/main/java/beamline/filters/RetainActivitiesFilter.java
@@ -1,8 +1,6 @@
 package beamline.filters;
 
 import org.deckfour.xes.extension.std.XConceptExtension;
-import org.deckfour.xes.model.XAttributeLiteral;
-import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
 
 /**
  * A specific instance of the {@link RetainOnEventAttributeEqualityFilter} that
@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
  * @author Andrea Burattin
  *
  */
-public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<XAttributeLiteral> {
+public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<String> {
+
+	private static final long serialVersionUID = 102039300555271213L;
 
 	/**
 	 * Constructors
@@ -22,7 +22,7 @@ public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter
 		super(XConceptExtension.KEY_NAME);
 		
 		for (String activity : activities) {
-			addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity));
+			addValue(activity);
 		}
 	}
 }
diff --git a/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java b/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java
index 091d1aa..d781227 100644
--- a/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java
+++ b/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java
@@ -1,14 +1,13 @@
 package beamline.filters;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.deckfour.xes.model.XAttribute;
-import org.deckfour.xes.model.XTrace;
+import org.apache.flink.api.common.functions.FilterFunction;
 
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.functions.Predicate;
+import beamline.events.BEvent;
 
 /**
  * This filter retains events based on the equality of a certain trace
@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
  *
  * @param <T> the type of the attribute
  */
-public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
+public class RetainOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
 
+	private static final long serialVersionUID = 1225284800265650317L;
 	private String attributeName;
 	private Set<T> attributeValues;
 	
@@ -38,8 +38,8 @@ public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implement
 	}
 	
 	@Override
-	public boolean test(@NonNull XTrace t) throws Throwable {
-		return attributeValues.contains(t.getAttributes().get(attributeName));
+	public boolean filter(BEvent t) {
+		return attributeValues.contains(t.getTraceAttributes().get(attributeName));
 	}
 
 }
diff --git a/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java b/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java
index f923054..1375404 100644
--- a/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java
+++ b/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java
@@ -1,14 +1,13 @@
 package beamline.filters;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.deckfour.xes.model.XAttribute;
-import org.deckfour.xes.model.XTrace;
+import org.apache.flink.api.common.functions.FilterFunction;
 
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.functions.Predicate;
+import beamline.events.BEvent;
 
 /**
  * This filter retains events based on the equality of a certain event
@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
  *
  * @param <T> the type of the attribute
  */
-public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
+public class RetainOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
 
+	private static final long serialVersionUID = -720485056040728235L;
 	private String attributeName;
 	private Set<T> attributeValues;
 	
@@ -47,8 +47,7 @@ public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implemen
 	}
 	
 	@Override
-	public boolean test(@NonNull XTrace t) throws Throwable {
-		return attributeValues.contains(t.get(0).getAttributes().get(attributeName));
+	public boolean filter(BEvent event) {
+		return attributeValues.contains(event.getEventAttributes().get(attributeName));
 	}
-
 }
diff --git a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java b/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java
index f2c8b64..d0bd477 100644
--- a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java
+++ b/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java
@@ -1,5 +1,5 @@
 package beamline.mappers;
-
+/*
 import java.util.HashMap;
 import java.util.Map;
 
@@ -23,7 +23,7 @@ import io.reactivex.rxjava3.functions.Function;
  * number of case ids grows as well.
  * 
  * @author Andrea Burattin
- */
+ *
 public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, ObservableSource<DirectlyFollowsRelation>> {
 
 	private Map<String, XEvent> map = new HashMap<>();
@@ -47,3 +47,5 @@ public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, Obser
 	}
 
 }
+*/
+class InfiniteSizeDirectlyFollowsMapper{}
\ No newline at end of file
diff --git a/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java b/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java
index 5f2c812..bd90c53 100644
--- a/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java
+++ b/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java
@@ -1,24 +1,48 @@
 package beamline.models.algorithms;
 
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.functions.Consumer;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+
+import beamline.events.BEvent;
 
 /**
  * This abstract class defines the root of the mining algorithms hierarchy. It
- * is a {@link Consumer} of elements with type <code>T</code> that is capable of
- * producing responses of a certain type <code>K</code>.
+ * is a {@link MapFunction} of elements with type {@link BEvent} that is capable
+ * of producing responses of type {@link Response}.
  * 
  * @author Andrea Burattin
- *
- * @param <T> the type of the consumed events
- * @param <K> the type of the responses produced by the mining algorithm
  */
-public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
+public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Serializable> {
 
-	private int processedEvents = 0;
-	private K latestResponse;
-	private HookEventProcessing onBeforeEvent = null;
-	private HookEventProcessing onAfterEvent = null;
+	private static final long serialVersionUID = 10170817098305999L;
+	private transient ValueState<Long> processedEvents;
+	private transient ValueState<Serializable> latestResponse;
+	private transient HookEventProcessing onBeforeEvent = null;
+	private transient HookEventProcessing onAfterEvent = null;
+	
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processede vents", Long.class));
+		latestResponse = getRuntimeContext().getState(new ValueStateDescriptor<>("latest response", Serializable.class));
+	}
+	
+	@Override
+	public Serializable map(BEvent t) throws Exception {
+		if (onBeforeEvent != null) {
+			onBeforeEvent.trigger();
+		}
+		process(t);
+		if (onAfterEvent != null) {
+			onAfterEvent.trigger();
+		}
+		return getLatestResponse();
+	}
 	
 	/**
 	 * This abstract method is what each derive class is expected to implement.
@@ -28,15 +52,23 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
 	 * @param event the new event being observed
 	 * @return the result of the mining of the event
 	 */
-	public abstract K ingest(T event);
+	public abstract Serializable ingest(BEvent event);
 	
 	/**
 	 * Returns the total number of events processed so far
 	 * 
 	 * @return the total number of events processed so far
 	 */
-	public int getProcessedEvents() {
-		return processedEvents;
+	public long getProcessedEvents() {
+		try {
+			if (processedEvents == null || processedEvents.value() == null) {
+				return 0l;
+			}
+			return processedEvents.value().longValue();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return -1;
 	}
 	
 	/**
@@ -44,8 +76,13 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
 	 * 
 	 * @return the latest result of the mining
 	 */
-	public K getLatestResponse() {
-		return latestResponse;
+	public Serializable getLatestResponse() {
+		try {
+			return latestResponse.value();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return null;
 	}
 	
 	/**
@@ -68,24 +105,31 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
 		this.onAfterEvent = onAfterEvent;
 	}
 	
-	protected void process(T event) {
-		this.processedEvents++;
-		latestResponse = ingest(event);
-	}
-	
-	protected K setLatestResponse(K latestResponse) {
-		this.latestResponse = latestResponse;
-		return latestResponse;
+	/*
+	 * The internal processor in charge of updating the internal status of the
+	 * map.
+	 */
+	protected void process(BEvent event) {
+		try {
+			long value = 1;
+			if (processedEvents.value() != null) {
+				value = processedEvents.value() + 1;
+			}
+			processedEvents.update(value);
+			latestResponse.update(ingest(event));
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
 	}
 	
-	@Override
-	public void accept(@NonNull T t) throws Throwable {
-		if (onBeforeEvent != null) {
-			onBeforeEvent.trigger();
-		}
-		process(t);
-		if (onAfterEvent != null) {
-			onAfterEvent.trigger();
+	/*
+	 * Setter of the latest response onto the status.
+	 */
+	protected void setLatestResponse(Serializable latestResponse) {
+		try {
+			this.latestResponse.update(latestResponse);
+		} catch (IOException e) {
+			e.printStackTrace();
 		}
 	}
 }
diff --git a/src/main/java/beamline/sources/BeamlineAbstractSource.java b/src/main/java/beamline/sources/BeamlineAbstractSource.java
new file mode 100644
index 0000000..24eea49
--- /dev/null
+++ b/src/main/java/beamline/sources/BeamlineAbstractSource.java
@@ -0,0 +1,31 @@
+package beamline.sources;
+
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import beamline.events.BEvent;
+
+/**
+ * This interface is supposed just to bind the type of {@link SourceFunction} to
+ * {@link BEvent}.
+ * 
+ * @author Andrea Burattin
+ */
+public abstract class BeamlineAbstractSource extends RichSourceFunction<BEvent> {
+
+	private static final long serialVersionUID = 1072198158533070679L;
+	private boolean running = true;
+	
+	/**
+	 * 
+	 * @return
+	 */
+	public boolean isRunning() {
+		return running;
+	}
+	
+	@Override
+	public void cancel() {
+		running = false;
+	}
+}
\ No newline at end of file
diff --git a/src/main/java/beamline/sources/CSVLogSource.java b/src/main/java/beamline/sources/CSVLogSource.java
index 9083f5f..bcba48c 100644
--- a/src/main/java/beamline/sources/CSVLogSource.java
+++ b/src/main/java/beamline/sources/CSVLogSource.java
@@ -8,32 +8,27 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.deckfour.xes.model.XTrace;
 
 import com.opencsv.CSVParser;
 import com.opencsv.CSVReader;
 import com.opencsv.CSVReaderBuilder;
 
+import beamline.events.BEvent;
 import beamline.exceptions.SourceException;
-import beamline.utils.EventUtils;
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.core.Observable;
-import io.reactivex.rxjava3.core.ObservableEmitter;
-import io.reactivex.rxjava3.core.ObservableOnSubscribe;
 
 /**
- * This implementation of a {@link XesSource} produces events according to the
- * events contained in a CSV file. This source produces a cold observable.
+ * This implementation of a {@link BeamlineAbstractSource} produces events according to
+ * the events contained in a CSV file.
  * 
  * @author Andrea Burattin
  */
-public class CSVLogSource implements XesSource {
+public class CSVLogSource extends BeamlineAbstractSource {
 
-	private CSVReader csvReader;
+	private static final long serialVersionUID = 205574514393782145L;
+	private transient CSVParser parser;
 	private String filename;
 	private int caseIdColumn;
 	private int activityNameColumn;
-	private CSVParser parser;
 	
 	/**
 	 * Constructs the source by providing a CSV parser.
@@ -75,36 +70,31 @@ public class CSVLogSource implements XesSource {
 	}
 
 	@Override
-	public Observable<XTrace> getObservable() {
-		return Observable.create(new ObservableOnSubscribe<XTrace>() {
-			@Override
-			public void subscribe(@NonNull ObservableEmitter<@NonNull XTrace> emitter) throws Throwable {
-				String[] line;
-				while ((line = csvReader.readNext()) != null) {
-					List<Pair<String, String>> attributes = new LinkedList<>();
-					for (int i = 0; i < line.length; i++) {
-						attributes.add(Pair.of("attribute_" + i, line[i]));
-					}
-					emitter.onNext(EventUtils.create(line[activityNameColumn], line[caseIdColumn], null, attributes));
-				}
-				emitter.onComplete();
-			}
-		});
-	}
-
-	@Override
-	public void prepare() throws SourceException {
-		Reader reader;
+	public void run(SourceContext<BEvent> ctx) throws Exception {
+		Reader reader = null;
+		CSVReader csvReader = null;
 		try {
 			reader = Files.newBufferedReader(Paths.get(filename));
+			if (parser == null) {
+				csvReader = new CSVReader(reader);
+			} else  {
+				csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build();
+			}
+			
+			String[] line;
+			while ((line = csvReader.readNext()) != null && isRunning()) {
+				List<Pair<String, String>> attributes = new LinkedList<>();
+				for (int i = 0; i < line.length; i++) {
+					attributes.add(Pair.of("attribute_" + i, line[i]));
+				}
+				ctx.collect(BEvent.create(filename, line[activityNameColumn], line[caseIdColumn], null, attributes));
+			}
 		} catch (IOException e) {
 			throw new SourceException(e.getMessage());
-		}
-		if (parser == null) {
-			csvReader = new CSVReader(reader);
-		} else {
-			csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build();
+		} finally {
+			if (csvReader != null) {
+				csvReader.close();
+			}
 		}
 	}
-
 }
diff --git a/src/main/java/beamline/sources/MQTTXesSource.java b/src/main/java/beamline/sources/MQTTXesSource.java
index 40ee221..d19e907 100644
--- a/src/main/java/beamline/sources/MQTTXesSource.java
+++ b/src/main/java/beamline/sources/MQTTXesSource.java
@@ -1,8 +1,9 @@
 package beamline.sources;
 
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.UUID;
 
-import org.deckfour.xes.model.XTrace;
 import org.eclipse.paho.client.mqttv3.IMqttClient;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -11,14 +12,12 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
+import beamline.events.BEvent;
 import beamline.exceptions.SourceException;
-import beamline.utils.EventUtils;
-import io.reactivex.rxjava3.core.Observable;
-import io.reactivex.rxjava3.subjects.PublishSubject;
 
 /**
- * This implementation of a {@link XesSource} produces events as they are
- * observed in an MQTT-XES broker. This source produces a hot observable.
+ * This implementation of a {@link BeamlineAbstractSource} produces events as
+ * they are observed in an MQTT-XES broker.
  * 
  * <p>
  * Example of usage:
@@ -32,12 +31,13 @@ import io.reactivex.rxjava3.subjects.PublishSubject;
  * 
  * @author Andrea Burattin
  */
-public class MQTTXesSource implements XesSource {
+public class MQTTXesSource extends BeamlineAbstractSource {
 
+	private static final long serialVersionUID = 7849358403852399322L;
 	private String processName;
 	private String brokerHost;
 	private String topicBase;
-	private PublishSubject<XTrace> ps;
+	private transient IMqttClient myClient;
 	
 	/**
 	 * Constructs the source
@@ -50,21 +50,15 @@ public class MQTTXesSource implements XesSource {
 		this.brokerHost = brokerHost;
 		this.topicBase = topicBase;
 		this.processName = processName;
-		this.ps = PublishSubject.create();
-	}
-	
-	@Override
-	public Observable<XTrace> getObservable() {
-		return ps;
 	}
 
 	@Override
-	public void prepare() throws SourceException {
+	public void run(SourceContext<BEvent> ctx) throws Exception {
+		Queue<BEvent> buffer = new LinkedList<>();
 		MqttConnectOptions options = new MqttConnectOptions();
 		options.setCleanSession(true);
 		options.setKeepAliveInterval(30);
 
-		IMqttClient myClient;
 		try {
 			myClient = new MqttClient(brokerHost, UUID.randomUUID().toString());
 			myClient.setCallback(new MqttCallback() {
@@ -75,7 +69,8 @@ public class MQTTXesSource implements XesSource {
 					String partBeforeActName = topic.substring(0, posLastSlash);
 					String activityName = topic.substring(posLastSlash + 1);
 					String caseId = partBeforeActName.substring(partBeforeActName.lastIndexOf("/") + 1);
-					ps.onNext(EventUtils.create(activityName, caseId));
+					BEvent b = BEvent.create(processName, activityName, caseId);
+					buffer.add(b);
 				}
 				
 				@Override
@@ -93,6 +88,25 @@ public class MQTTXesSource implements XesSource {
 		} catch (MqttException e) {
 			throw new SourceException(e.getMessage());
 		}
+		
+		while(isRunning()) {
+			while (buffer.isEmpty()) {
+				Thread.sleep(100l);
+			}
+			ctx.collect(buffer.poll());
+		}
+	}
+	
+	@Override
+	public void cancel() {
+		super.cancel();
+		if (myClient != null && myClient.isConnected()) {
+			try {
+				myClient.disconnect();
+			} catch (MqttException e) {
+				// nothing to do here
+			}
+		}
 	}
 
 }
diff --git a/src/main/java/beamline/sources/Source.java b/src/main/java/beamline/sources/Source.java
deleted file mode 100644
index 5c0772f..0000000
--- a/src/main/java/beamline/sources/Source.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package beamline.sources;
-
-import beamline.exceptions.SourceException;
-import io.reactivex.rxjava3.core.Observable;
-
-/**
- * This interface is the base type that should be extended by all sources to be
- * used in the framework. When using a source implementing this type, the method
- * {@link #prepare()} should be called <strong>before</strong>
- * {@link #getObservable()}.
- * 
- * @author Andrea Burattin
- *
- * @param <T> the type of observable objects this interface will produce.
- */
-public interface Source<T> {
-
-	/**
-	 * This method returns the observable created by the source. Before calling
-	 * this method, it is important to prepare the source by calling the
-	 * {@link #prepare()} method.
-	 * 
-	 * @return the {@link Observable}
-	 */
-	public Observable<T> getObservable();
-	
-	/**
-	 * This method is supposed to be called before the {@link #getObservable()}
-	 * one: it is in charge of preparing the source to be processed.
-	 * 
-	 * @throws SourceException while preparing the source, it is important to be
-	 * aware that some sources may generate specific exceptions (e.g., file not
-	 * found, network problems).
-	 */
-	public void prepare() throws SourceException;
-}
diff --git a/src/main/java/beamline/sources/XesLogSource.java b/src/main/java/beamline/sources/XesLogSource.java
index 4308ca8..7b90e80 100644
--- a/src/main/java/beamline/sources/XesLogSource.java
+++ b/src/main/java/beamline/sources/XesLogSource.java
@@ -2,43 +2,41 @@ package beamline.sources;
 
 import java.io.File;
 import java.util.Collections;
-import java.util.Date;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.deckfour.xes.extension.std.XConceptExtension;
 import org.deckfour.xes.extension.std.XTimeExtension;
-import org.deckfour.xes.factory.XFactory;
-import org.deckfour.xes.factory.XFactoryNaiveImpl;
+import org.deckfour.xes.in.XMxmlGZIPParser;
+import org.deckfour.xes.in.XMxmlParser;
 import org.deckfour.xes.in.XParser;
 import org.deckfour.xes.in.XesXmlGZIPParser;
 import org.deckfour.xes.in.XesXmlParser;
 import org.deckfour.xes.model.XAttribute;
-import org.deckfour.xes.model.XAttributeMap;
 import org.deckfour.xes.model.XEvent;
 import org.deckfour.xes.model.XLog;
 import org.deckfour.xes.model.XTrace;
 
+import beamline.events.BEvent;
+import beamline.exceptions.EventException;
 import beamline.exceptions.SourceException;
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.core.Observable;
-import io.reactivex.rxjava3.core.ObservableEmitter;
-import io.reactivex.rxjava3.core.ObservableOnSubscribe;
 
 /**
- * This implementation of a {@link XesSource} produces events according to the
- * events contained in an {@link XLog}. The events are first sorted according to
- * their timestamp and then sent. This source produces a cold observable.
+ * This implementation of a {@link BeamlineAbstractSource} produces events according to
+ * the events contained in an {@link XLog}. The events are first sorted
+ * according to their timestamp and then sent.
  * 
  * @author Andrea Burattin
  */
-public class XesLogSource implements XesSource {
+public class XesLogSource extends BeamlineAbstractSource {
+
+	private static final long serialVersionUID = 1095855454671335981L;
 
-	private static XFactory xesFactory = new XFactoryNaiveImpl();
-	
 	private String fileName;
-	private XLog log;
-	private List<XTrace> events;
+	private transient XLog log;
+	private List<BEvent> events;
 	
 	/**
 	 * Constructs a source from the path of a log
@@ -62,28 +60,30 @@ public class XesLogSource implements XesSource {
 	}
 	
 	@Override
-	public Observable<XTrace> getObservable() {
-		return Observable.create(new ObservableOnSubscribe<XTrace>() {
-			@Override
-			public void subscribe(@NonNull ObservableEmitter<@NonNull XTrace> emitter) throws Throwable {
-				for (XTrace wrapper : events) {
-					emitter.onNext(wrapper);
-				}
-				emitter.onComplete();
-			}
-		});
-	}
-	
-	@Override
-	public void prepare() throws SourceException {
+	public void run(SourceContext<BEvent> ctx) throws Exception {
 		if (log == null) {
 			parseLog(fileName);
 		}
-		prepareStream();
+		if (events == null) {
+			prepareStream();
+		}
+		Iterator<BEvent> i = events.iterator();
+		while(i.hasNext() && isRunning()) {
+			BEvent event = i.next();
+			if (event.getEventTime() != null) {
+				ctx.collectWithTimestamp(event, event.getEventTime().getTime());
+			} else {
+				ctx.collect(i.next());
+			}
+		}
 	}
 	
 	private void parseLog(String fileName) throws SourceException {
-		XParser[] parsers = new XParser[] { new XesXmlGZIPParser(), new XesXmlParser() };
+		XParser[] parsers = new XParser[] {
+				new XesXmlGZIPParser(),
+				new XesXmlParser(),
+				new XMxmlParser(),
+				new XMxmlGZIPParser() };
 		File file = new File(fileName);
 		for (XParser p : parsers) {
 			if (p.canParse(file)) {
@@ -98,41 +98,50 @@ public class XesLogSource implements XesSource {
 		throw new SourceException("XES file format not supported");
 	}
 	
-	private void prepareStream() throws SourceException {
+	private void prepareStream() throws SourceException, EventException {
 		if (log.isEmpty()) {
 			throw new SourceException("The given log is empty");
 		}
+		
+		// construct the process name
+		String processName = XConceptExtension.instance().extractName(log);
+		if (processName == null) {
+			processName = "unnamed-xes-process";
+			if (fileName != null) {
+				processName = fileName;
+			}
+		}
+		
 		// populate all events
 		events = new LinkedList<>();
 		for (XTrace t : log) {
 			for (XEvent e : t) {
-				// create the wrapping trace
-				XTrace eventWrapper = xesFactory.createTrace();
-				XAttributeMap am = t.getAttributes();
-				for (Map.Entry<String, XAttribute> v : am.entrySet()) {
-					eventWrapper.getAttributes().put(v.getKey(), v.getValue());
+				BEvent be = BEvent.create(
+					processName,
+					XConceptExtension.instance().extractName(e),
+					XConceptExtension.instance().extractName(t),
+					XTimeExtension.instance().extractTimestamp(e));
+				
+				// log attributes
+				for (Map.Entry<String, XAttribute> v : log.getAttributes().entrySet()) {
+					be.setLogAttribute(v.getKey(), v.getValue());
 				}
-				// create the actual event
-				XEvent newEvent = xesFactory.createEvent();
-				XAttributeMap amEvent = e.getAttributes();
-				for (Map.Entry<String, XAttribute> v : amEvent.entrySet()) {
-					newEvent.getAttributes().put(v.getKey(), v.getValue());
+				
+				// trace attributes
+				for (Map.Entry<String, XAttribute> v : t.getAttributes().entrySet()) {
+					be.setTraceAttribute(v.getKey(), v.getValue());
 				}
-				eventWrapper.add(newEvent);
-				events.add(eventWrapper);
+				
+				// event attributes
+				for (Map.Entry<String, XAttribute> v : e.getAttributes().entrySet()) {
+					be.setEventAttribute(v.getKey(), v.getValue());
+				}
+				
+				events.add(be);
 			}
 		}
 		
 		// sort events
-		Collections.sort(events, (XTrace o1, XTrace o2) -> {
-			XEvent e1 = o1.get(0);
-			XEvent e2 = o2.get(0);
-			Date d1 = XTimeExtension.instance().extractTimestamp(e1);
-			Date d2 = XTimeExtension.instance().extractTimestamp(e2);
-			if (d1 == null || d2 == null) {
-				return 0;
-			}
-			return d1.compareTo(d2);
-		});
+		Collections.sort(events);
 	}
 }
diff --git a/src/main/java/beamline/sources/XesSource.java b/src/main/java/beamline/sources/XesSource.java
deleted file mode 100644
index 6ba5c65..0000000
--- a/src/main/java/beamline/sources/XesSource.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package beamline.sources;
-
-import org.deckfour.xes.model.XTrace;
-
-/**
- * This interface is supposed just to bind the type of {@link Source} to
- * {@link XTrace}.
- * 
- * @author Andrea Burattin
- */
-public interface XesSource extends Source<XTrace> {
-
-}
\ No newline at end of file
diff --git a/src/main/java/beamline/utils/EventUtils.java b/src/main/java/beamline/utils/EventUtils.java
deleted file mode 100644
index 1e3037f..0000000
--- a/src/main/java/beamline/utils/EventUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package beamline.utils;
-
-import java.util.Collection;
-import java.util.Date;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.deckfour.xes.extension.std.XConceptExtension;
-import org.deckfour.xes.extension.std.XTimeExtension;
-import org.deckfour.xes.factory.XFactory;
-import org.deckfour.xes.factory.XFactoryNaiveImpl;
-import org.deckfour.xes.model.XEvent;
-import org.deckfour.xes.model.XTrace;
-import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
-
-import beamline.exceptions.EventException;
-
-/**
- * This class contains some utility methods useful to handle and process events.
- * 
- * @author Andrea Burattin
- */
-public class EventUtils {
-
-	private static final XFactory xesFactory = new XFactoryNaiveImpl();
-	
-	private EventUtils() { }
-	
-	/**
-	 * Creates a new {@link XTrace} referring to one event
-	 * 
-	 * @param activityName the name of the activity
-	 * @param caseId the identifier of the process instance
-	 * @param time the time when the event has happened
-	 * @param eventAttributes a collection of string attributes for the event
-	 * @return the new event
-	 * @throws EventException this exception is thrown is incomplete information
-	 * is provided
-	 */
-	public static XTrace create(String activityName, String caseId, Date time, Collection<Pair<String, String>> eventAttributes) throws EventException {
-		if (activityName == null || caseId == null) {
-			throw new EventException("Activity name or case id missing");
-		}
-		
-		XEvent event = xesFactory.createEvent();
-		XConceptExtension.instance().assignName(event, activityName);
-		if (time == null) {
-			XTimeExtension.instance().assignTimestamp(event, new Date());
-		} else {
-			XTimeExtension.instance().assignTimestamp(event, time);
-		}
-		if (eventAttributes != null) {
-			for(Pair<String, String> a : eventAttributes) {
-				event.getAttributes().put(a.getLeft(), new XAttributeLiteralImpl(a.getLeft(), a.getRight()));
-			}
-		}
-		XTrace eventWrapper = xesFactory.createTrace();
-		XConceptExtension.instance().assignName(eventWrapper, caseId);
-		eventWrapper.add(event);
-		return eventWrapper;
-	}
-	
-	/**
-	 * Creates a new {@link XTrace} referring to one event
-	 * 
-	 * @param activityName the name of the activity
-	 * @param caseId the identifier of the process instance
-	 * @param time the time when the event has happened
-	 * @return the new event
-	 * @throws EventException this exception is thrown is incomplete information
-	 * is provided
-	 */
-	public static XTrace create(String activityName, String caseId, Date time) throws EventException {
-		return create(activityName, caseId, time, null);
-	}
-	
-	/**
-	 * Creates a new {@link XTrace} referring to one event. The time of the
-	 * event is set to the current time
-	 * 
-	 * @param activityName the name of the activity
-	 * @param caseId the identifier of the process instance
-	 * @return the new event
-	 * @throws EventException this exception is thrown is incomplete information
-	 * is provided
-	 */
-	public static XTrace create(String activityName, String caseId) throws EventException {
-		return create(activityName, caseId, null, null);
-	}
-	
-	/**
-	 * Extracts the activity name
-	 * 
-	 * @param event the event
-	 * @return the activity name
-	 */
-	public static String getActivityName(XTrace event) {
-		return XConceptExtension.instance().extractName(event.get(0));
-	}
-	
-	/**
-	 * Extracts the case id
-	 * 
-	 * @param event the event
-	 * @return the case id
-	 */
-	public static String getCaseId(XTrace event) {
-		return XConceptExtension.instance().extractName(event);
-	}
-}
diff --git a/src/main/java/beamline/utils/package-info.java b/src/main/java/beamline/utils/package-info.java
deleted file mode 100644
index 6db5f83..0000000
--- a/src/main/java/beamline/utils/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * This package contains utility classes to be used throughout the framework.
- */
-package beamline.utils;
\ No newline at end of file
diff --git a/src/test/java/beamline/tests/AlgorithmTest.java b/src/test/java/beamline/tests/AlgorithmTest.java
index 4ce3dee..5ab6263 100644
--- a/src/test/java/beamline/tests/AlgorithmTest.java
+++ b/src/test/java/beamline/tests/AlgorithmTest.java
@@ -4,62 +4,109 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasItems;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import beamline.events.BEvent;
 import beamline.models.algorithms.StreamMiningAlgorithm;
-import io.reactivex.rxjava3.core.Observable;
+import beamline.models.responses.Response;
 
 public class AlgorithmTest {
 
-	Observable<Integer> o = Observable.just(3, 7, 11, 13);
-	
-	@Test
-	public void test_result() {
-		StreamMiningAlgorithm<Integer, Integer> m = new StreamMiningAlgorithm<Integer, Integer>() {
-			public Integer product = 1;
-			
-			@Override
-			public Integer ingest(Integer event) {
-				product *= event;
-				setLatestResponse(-product);
-				return product;
-			}
-		};
-		
-		o.subscribe(m);
-		assertEquals(4, m.getProcessedEvents());
-		assertEquals(3003, m.getLatestResponse());
-	}
-	
+//	private OneInputStreamOperatorTestHarness<String, Long> testHarness;
+	private StreamMiningAlgorithm statefulFlatMapFunction;
+
+//	@BeforeEach
+//	public void setupTestHarness() throws Exception {
+//
+//		// instantiate user-defined function
+//		statefulFlatMapFunction = new StatefulFlatMapFunction();
+//
+//		// wrap user defined function into a the corresponding operator
+//		testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
+//
+//		// optionally configured the execution environment
+//		testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+//
+//		// open the test harness (will also call open() on RichFunctions)
+//		testHarness.open();
+//	}
+
 	@Test
-	public void test_hooks() {
-		StreamMiningAlgorithm<Integer, Integer> m = new StreamMiningAlgorithm<Integer, Integer>() {
-			public Integer product = 1;
+	public void test_result() throws Exception {
+		StreamMiningAlgorithm m = new StreamMiningAlgorithm() {
+			private static final long serialVersionUID = 3268754545347297698L;
 			
 			@Override
-			public Integer ingest(Integer event) {
-				product *= event;
+			public Serializable ingest(BEvent event) {
+				int product = 1;
+				if (getLatestResponse() != null) {
+					product = (int) getLatestResponse();
+				}
+				product *= Integer.parseInt(event.getEventName());
 				setLatestResponse(-product);
 				return product;
 			}
 		};
 		
-		List<Integer> resultsBefore = new ArrayList<Integer>();
-		m.setOnBeforeEvent(() -> {
-			resultsBefore.add(m.getProcessedEvents());
-		});
+//		private OneInputStreamOperatorTestHarness<BEvent, Serializable> testHarness = new OneInputStreamOperatorTestHarness<BEvent, Serializable>(m);
+//		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+//		env.fromElements(
+//			BEvent.create("p", "3", "c1"),
+//			BEvent.create("p", "7", "c1"),
+//			BEvent.create("p", "11", "c1"),
+//			BEvent.create("p", "13", "c1")).keyBy(BEvent::getProcessName).map(m).print();
+//		env.execute();
 		
-		List<Integer> resultsAfter = new ArrayList<Integer>();
-		m.setOnAfterEvent(() -> {
-			resultsAfter.add(m.getProcessedEvents());
-		});
+//		Collector<BEvent> stream = mock
 		
-		o.subscribe(m);
+		System.out.println(m.getProcessedEvents());
 		
-		assertThat(resultsBefore, hasItems(0,1,2,3));
-		assertThat(resultsAfter, hasItems(1,2,3,4));
+		assertEquals(4l, m.getProcessedEvents());
+		assertEquals(3003, m.getLatestResponse());
 	}
+
+//	@Test
+//	public void test_hooks() throws Exception {
+//		StreamMiningAlgorithm<Integer, Integer> m = new StreamMiningAlgorithm<Integer, Integer>() {
+//			public Integer product = 1;
+//			
+//			@Override
+//			public Integer ingest(Integer event) {
+//				product *= event;
+//				setLatestResponse(-product);
+//				return product;
+//			}
+//		};
+//		
+//		List<Long> resultsBefore = new ArrayList<>();
+//		m.setOnBeforeEvent(() -> {
+//			resultsBefore.add(m.getProcessedEvents());
+//		});
+//		
+//		List<Long> resultsAfter = new ArrayList<>();
+//		m.setOnAfterEvent(() -> {
+//			resultsAfter.add(m.getProcessedEvents());
+//		});
+//		
+//		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+//		env.fromElements(3, 7, 11, 13).map(m);
+//		env.execute();
+//		
+//		assertThat(resultsBefore, hasItems(0l,1l,2l,3l));
+//		assertThat(resultsAfter, hasItems(1l,2l,3l,4l));
+//	}
 }
diff --git a/src/test/java/beamline/tests/FiltersTest.java b/src/test/java/beamline/tests/FiltersTest.java
index c8fa5c0..d4ff385 100644
--- a/src/test/java/beamline/tests/FiltersTest.java
+++ b/src/test/java/beamline/tests/FiltersTest.java
@@ -7,6 +7,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
 import org.junit.jupiter.api.Test;
 
@@ -20,12 +23,21 @@ public class FiltersTest {
 
 	@Test
 	public void test_exclude_activities_on_name_filter() {
-		List<String> results = new ArrayList<String>();
-		Utils.generateObservableSameCaseId()
-			.filter(new ExcludeActivitiesFilter("A"))
-			.subscribe((t) -> results.add(EventUtils.getActivityName(t)));
-		assertEquals(3, results.size());
-		assertThat(results, hasItems("K","B","C"));
+		
+		FilterFunction f = new ExcludeActivitiesFilter("A");
+		
+		List<String> out = new ArrayList<>();
+		ListCollector<String> listCollector = new ListCollector<>(out);
+		
+		f.fil
+		
+		
+//		List<String> results = new ArrayList<String>();
+//		Utils.generateObservableSameCaseId()
+//			.filter(new ExcludeActivitiesFilter("A"))
+//			.subscribe((t) -> results.add(EventUtils.getActivityName(t)));
+//		assertEquals(3, results.size());
+//		assertThat(results, hasItems("K","B","C"));
 	}
 	
 	@Test
diff --git a/src/test/java/beamline/tests/Utils.java b/src/test/java/beamline/tests/Utils.java
index 123f1ea..497fd6c 100644
--- a/src/test/java/beamline/tests/Utils.java
+++ b/src/test/java/beamline/tests/Utils.java
@@ -1,5 +1,9 @@
 package beamline.tests;
 
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.deckfour.xes.extension.std.XConceptExtension;
 import org.deckfour.xes.extension.std.XTimeExtension;
 import org.deckfour.xes.factory.XFactory;
@@ -8,10 +12,9 @@ import org.deckfour.xes.model.XLog;
 import org.deckfour.xes.model.XTrace;
 import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
 
+import beamline.events.BEvent;
 import beamline.exceptions.EventException;
 import beamline.mappers.DirectlyFollowsRelation;
-import beamline.utils.EventUtils;
-import io.reactivex.rxjava3.core.Observable;
 
 public class Utils {
 
@@ -25,24 +28,28 @@ public class Utils {
 	 * - A
 	 * - C / trace attribute: (a1,v4)
 	 */
-	public static Observable<XTrace> generateObservableSameCaseId() {
-		XTrace[] events = null;
-		try {
-			events = new XTrace[] {
-				EventUtils.create("K", "c"),
-				EventUtils.create("A", "c"),
-				EventUtils.create("B", "c"),
-				EventUtils.create("A", "c"),
-				EventUtils.create("C", "c")
-			};
-		} catch (EventException e) {
-			e.printStackTrace();
-		}
-		events[1].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v1"));
-		events[2].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v3"));
-		events[3].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v2"));
-		events[4].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v4"));
-		return Observable.fromArray(events);
+//	public static Observable<XTrace> generateObservableSameCaseId() {
+//		XTrace[] events = null;
+//		try {
+//			events = new XTrace[] {
+//				EventUtils.create("K", "c"),
+//				EventUtils.create("A", "c"),
+//				EventUtils.create("B", "c"),
+//				EventUtils.create("A", "c"),
+//				EventUtils.create("C", "c")
+//			};
+//		} catch (EventException e) {
+//			e.printStackTrace();
+//		}
+//		events[1].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v1"));
+//		events[2].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v3"));
+//		events[3].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v2"));
+//		events[4].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v4"));
+//		return Observable.fromArray(events);
+//	}
+	
+	public static DataStream<BEvent> generateObservableSameCaseId(ExecutionEnvironment env) {
+		DataSet<BEvent> ds = CollectionDataSets.getCustomTypeDataSet(env);
 	}
 	
 	/*
-- 
GitLab