diff --git a/src/main/java/beamline/events/BEvent.java b/src/main/java/beamline/events/BEvent.java
index bc27273cf4d9d28f91268e9811d74350147c73d1..c4de45fcb3858b2013bc9390a73e1a20a9aacedc 100644
--- a/src/main/java/beamline/events/BEvent.java
+++ b/src/main/java/beamline/events/BEvent.java
@@ -6,6 +6,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.deckfour.xes.extension.std.XConceptExtension;
 import org.deckfour.xes.extension.std.XTimeExtension;
@@ -199,6 +201,36 @@ public class BEvent implements Serializable, Comparable<BEvent> {
 		return getEventTime().compareTo(o.getEventTime());
 	}
 	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+		if (obj == this) {
+			return true;
+		}
+		if (obj.getClass() != getClass()) {
+			return false;
+		}
+		BEvent other = (BEvent) obj;
+		return new EqualsBuilder()
+				.appendSuper(super.equals(obj))
+				.append(logAttributes, other.logAttributes)
+				.append(traceAttributes, other.traceAttributes)
+				.append(eventAttributes, other.eventAttributes)
+				.isEquals();
+
+	}
+	
+	@Override
+	public int hashCode() {
+		return new HashCodeBuilder(17, 37)
+				.append(logAttributes)
+				.append(traceAttributes)
+				.append(eventAttributes)
+				.toHashCode();
+	}
+	
 	//
 	// Private methods
 	//
diff --git a/src/main/java/beamline/mappers/DirectlyFollowsRelation.java b/src/main/java/beamline/mappers/DirectlyFollowsRelation.java
deleted file mode 100644
index a15690cf5f1758e464c607f41bad1087ef535552..0000000000000000000000000000000000000000
--- a/src/main/java/beamline/mappers/DirectlyFollowsRelation.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package beamline.mappers;
-
-import org.deckfour.xes.model.XEvent;
-
-/**
- * This class represents a directly follows relation as produced by
- * {@link InfiniteSizeDirectlyFollowsMapper}.
- * 
- * @author Andrea Burattin
- */
-public class DirectlyFollowsRelation {
-
-	private String caseId;
-	private XEvent first;
-	private XEvent second;
-	
-	/**
-	 * Constructor
-	 * 
-	 * @param caseId the case id
-	 * @param first the first event
-	 * @param second the second event
-	 */
-	public DirectlyFollowsRelation(String caseId, XEvent first, XEvent second) {
-		this.caseId = caseId;
-		this.first = first;
-		this.second = second;
-	}
-	
-	/**
-	 * Returns the case id this directly follows relation belongs to
-	 * 
-	 * @return the case id
-	 */
-	public String getCaseId() {
-		return caseId;
-	}
-
-	/**
-	 * Returns the first event
-	 * 
-	 * @return the first event
-	 */
-	public XEvent getFirst() {
-		return first;
-	}
-
-	/**
-	 * Returns the second event
-	 * 
-	 * @return the second event
-	 */
-	public XEvent getSecond() {
-		return second;
-	}
-}
diff --git a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java b/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java
deleted file mode 100644
index d0bd47751a71e10893a9cb9a0819ad1eb3d90f15..0000000000000000000000000000000000000000
--- a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package beamline.mappers;
-/*
-import java.util.HashMap;
-import java.util.Map;
-
-import org.deckfour.xes.extension.std.XConceptExtension;
-import org.deckfour.xes.model.XEvent;
-import org.deckfour.xes.model.XTrace;
-
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.core.Observable;
-import io.reactivex.rxjava3.core.ObservableSource;
-import io.reactivex.rxjava3.functions.Function;
-
-/**
- * This mapper transforms a stream of {@link XTrace}s into a stream of
- * {@link DirectlyFollowsRelation}s. It transforms each pair of consequent
- * events appearing in the same case as a directly follows operator (generating
- * an object with type {@link DirectlyFollowsRelation}).
- * 
- * <p>
- * This mapper is called infinite because it's memory footprint will grow as the
- * number of case ids grows as well.
- * 
- * @author Andrea Burattin
- *
-public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, ObservableSource<DirectlyFollowsRelation>> {
-
-	private Map<String, XEvent> map = new HashMap<>();
-	
-	@Override
-	public @NonNull ObservableSource<DirectlyFollowsRelation> apply(@NonNull XTrace t) throws Throwable {
-		String caseId = XConceptExtension.instance().extractName(t);
-		DirectlyFollowsRelation toRet = null;
-		
-		if (map.containsKey(caseId)) {
-			toRet = new DirectlyFollowsRelation(caseId, map.get(caseId), t.get(0));
-		}
-		
-		map.put(caseId, t.get(0));
-		
-		if (toRet == null) {
-			return Observable.empty();
-		} else {
-			return Observable.just(toRet);
-		}
-	}
-
-}
-*/
-class InfiniteSizeDirectlyFollowsMapper{}
\ No newline at end of file
diff --git a/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java b/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..6eadf941de91fb083154c5d4ff466a3029863226
--- /dev/null
+++ b/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java
@@ -0,0 +1,41 @@
+package beamline.models.algorithms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.deckfour.xes.model.XTrace;
+
+import beamline.events.BEvent;
+import beamline.models.responses.DirectlyFollowsRelation;
+
+/**
+ * This mapper transforms a stream of {@link XTrace}s into a stream of
+ * {@link DirectlyFollowsRelation}s. It transforms each pair of consequent
+ * events appearing in the same case as a directly follows operator (generating
+ * an object with type {@link DirectlyFollowsRelation}).
+ * 
+ * <p>
+ * This mapper is called infinite because it's memory footprint will grow as the
+ * number of case ids grows as well.
+ * 
+ * @author Andrea Burattin
+ */
+public class InfiniteSizeDirectlyFollowsMapper extends StreamMiningAlgorithm<DirectlyFollowsRelation> {
+
+	private static final long serialVersionUID = 9114527510820073110L;
+	private Map<String, BEvent> map = new HashMap<>();
+
+	@Override
+	public DirectlyFollowsRelation ingest(BEvent event) throws Exception {
+		String caseId = event.getTraceName();
+		DirectlyFollowsRelation toRet = null;
+		
+		if (map.containsKey(caseId)) {
+			toRet = new DirectlyFollowsRelation(map.get(caseId), event);
+		}
+		
+		map.put(caseId, event);
+		
+		return toRet;
+	}
+}
diff --git a/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java b/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java
new file mode 100644
index 0000000000000000000000000000000000000000..0b2742e1e4986d0231546011a52a0639ef2ea490
--- /dev/null
+++ b/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java
@@ -0,0 +1,89 @@
+package beamline.models.responses;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+
+import beamline.events.BEvent;
+import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper;
+
+/**
+ * This class represents a directly follows relation as produced by
+ * {@link InfiniteSizeDirectlyFollowsMapper}.
+ * 
+ * @author Andrea Burattin
+ */
+public class DirectlyFollowsRelation extends Response {
+
+	private static final long serialVersionUID = 1775695752885219490L;
+	private Pair<BEvent, BEvent> pair;
+	
+	/**
+	 * Constructor
+	 * 
+	 * @param caseId the case id
+	 * @param first the first event
+	 * @param second the second event
+	 */
+	public DirectlyFollowsRelation(BEvent from, BEvent to) {
+		if (!from.getTraceName().equals(to.getTraceName())) {
+			throw new IllegalArgumentException();
+		}
+		pair = Pair.of(from, to);
+	}
+	
+	/**
+	 * Returns the case id this directly follows relation belongs to
+	 * 
+	 * @return the case id
+	 */
+	public String getCaseId() {
+		return pair.getLeft().getTraceName();
+	}
+	
+	/**
+	 * Returns the source event
+	 * 
+	 * @return the source event
+	 */
+	public BEvent getFrom() {
+		return pair.getLeft();
+	}
+
+	/**
+	 * Returns the target event
+	 * 
+	 * @return the target event
+	 */
+	public BEvent getTo() {
+		return pair.getRight();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+		if (obj == this) {
+			return true;
+		}
+		if (obj.getClass() != getClass()) {
+			return false;
+		}
+		DirectlyFollowsRelation other = (DirectlyFollowsRelation) obj;
+		return new EqualsBuilder()
+				.appendSuper(super.equals(obj))
+				.append(getFrom(), other.getFrom())
+				.append(getTo(), other.getTo())
+				.isEquals();
+
+	}
+	
+	@Override
+	public int hashCode() {
+		return new HashCodeBuilder(17, 37)
+				.append(getFrom())
+				.append(getTo())
+				.toHashCode();
+	}
+}
diff --git a/src/main/java/beamline/sources/MQTTXesSource.java b/src/main/java/beamline/sources/MQTTXesSource.java
index d58c67cb3e3dd28a948d6f250b5dca65f6dbe411..a375f64a800dbb0f74314b65fa6c4245e4596e84 100644
--- a/src/main/java/beamline/sources/MQTTXesSource.java
+++ b/src/main/java/beamline/sources/MQTTXesSource.java
@@ -50,7 +50,7 @@ public class MQTTXesSource extends BeamlineAbstractSource {
 		this.topicBase = topicBase;
 		this.processName = processName;
 	}
-
+	
 	@Override
 	public void run(SourceContext<BEvent> ctx) throws Exception {
 		Queue<BEvent> buffer = new LinkedList<>();
@@ -91,7 +91,6 @@ public class MQTTXesSource extends BeamlineAbstractSource {
 		
 		while(isRunning()) {
 			while (isRunning() && buffer.isEmpty()) {
-				System.out.println("sleeping " + isRunning());
 				Thread.sleep(100l);
 			}
 			if (isRunning()) {
@@ -100,7 +99,6 @@ public class MQTTXesSource extends BeamlineAbstractSource {
 				}
 			}
 		}
-		System.out.println("aaa");
 		
 		if (!isRunning() && myClient.isConnected()) {
 			try {
@@ -109,13 +107,5 @@ public class MQTTXesSource extends BeamlineAbstractSource {
 				// nothing to do here
 			}
 		}
-		System.err.println("done");
-	}
-	
-	@Override
-	public void cancel() {
-		// TODO Auto-generated method stub
-		super.cancel();
-		System.out.println("closing");
 	}
 }
diff --git a/src/test/java/beamline/tests/MapperTest.java b/src/test/java/beamline/tests/MapperTest.java
index 59d758e0ba19e428f43f2f3c3adab34162daefb4..f4dc59165ea1693d26b013e6f18a6258ec3bc8b0 100644
--- a/src/test/java/beamline/tests/MapperTest.java
+++ b/src/test/java/beamline/tests/MapperTest.java
@@ -6,25 +6,43 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Test;
 
-import beamline.mappers.DirectlyFollowsRelation;
-import beamline.mappers.InfiniteSizeDirectlyFollowsMapper;
+import beamline.events.BEvent;
+import beamline.exceptions.EventException;
+import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper;
+import beamline.models.responses.DirectlyFollowsRelation;
 
 public class MapperTest {
 
 	@Test
-	public void test_infinite_size_df() {
+	public void test_infinite_size_df() throws EventException, Exception {
 		List<DirectlyFollowsRelation> results = new ArrayList<>();
-		// <K,A,B,A,C>
-		Utils.generateObservableSameCaseId()
+		// <K,A,B,A,C>, <A,B,A>
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.fromElements(
+				BEvent.create("p", "K", "c"),
+				BEvent.create("p", "A", "c2"),
+				BEvent.create("p", "A", "c"),
+				BEvent.create("p", "B", "c"),
+				BEvent.create("p", "B", "c2"),
+				BEvent.create("p", "A", "c"),
+				BEvent.create("p", "A", "c2"),
+				BEvent.create("p", "C", "c"))
+			.keyBy(BEvent::getProcessName)
 			.flatMap(new InfiniteSizeDirectlyFollowsMapper())
-			.subscribe((df) -> results.add(df));
+			.executeAndCollect().forEachRemaining((DirectlyFollowsRelation e) -> {
+				results.add(e);
+			});
 		
-		assertEquals(4, results.size());
+		assertEquals(6, results.size());
 		assertTrue(Utils.verifyDirectFollows(results.get(0), "K", "A", "c"));
 		assertTrue(Utils.verifyDirectFollows(results.get(1), "A", "B", "c"));
-		assertTrue(Utils.verifyDirectFollows(results.get(2), "B", "A", "c"));
-		assertTrue(Utils.verifyDirectFollows(results.get(3), "A", "C", "c"));
+		assertTrue(Utils.verifyDirectFollows(results.get(2), "A", "B", "c2"));
+		assertTrue(Utils.verifyDirectFollows(results.get(3), "B", "A", "c"));
+		assertTrue(Utils.verifyDirectFollows(results.get(4), "B", "A", "c2"));
+		assertTrue(Utils.verifyDirectFollows(results.get(5), "A", "C", "c"));
 	}
 }
diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java
index 211761e557c1379fffc84e9b37d6261907f603dd..315bbfe494a5c884194173aadeeaf07f375e7ef0 100644
--- a/src/test/java/beamline/tests/SourcesTest.java
+++ b/src/test/java/beamline/tests/SourcesTest.java
@@ -5,7 +5,9 @@ import static org.hamcrest.Matchers.hasItems;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 
+import java.io.File;
 import java.net.URI;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -14,12 +16,21 @@ import java.util.List;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.commons.lang3.CharSet;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -27,6 +38,7 @@ import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.io.Files;
 import com.opencsv.CSVParserBuilder;
 
 import beamline.events.BEvent;
@@ -183,62 +195,80 @@ public class SourcesTest {
 //			brokerService.start();
 //			brokerService.waitUntilStarted();
 //			
-//			List<String> acts = new LinkedList<>();
+//			final List<String> acts = new LinkedList<>();
 //			List<String> caseIds = new LinkedList<>();
 //			
 //			MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name");
 //			
-//			new Thread(new Runnable() {
-//				
-//				@Override
-//				public void run() {
-//					try {
-//						Thread.sleep(5000);
-//						System.out.println("going...");
-//						MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
-//						client.connect();
-//						
-//						publish(client, "c1", "a11");
-//						publish(client, "c2", "a21");
-//						publish(client, "c2", "a22");
-//						publish(client, "c1", "a12");
-//						publish(client, "c2", "a23");
-//						s.cancel();
-//						
-//					} catch (InterruptedException e) {
-//						e.printStackTrace();
-//					} catch (MqttException e) {
-//						e.printStackTrace();
-//					}
-//				}
-//			}).start();
+//			// create the sink
+//			File tmpFile = File.createTempFile("mqtt", "log");
+//			StreamingFileSink<BEvent> sink = StreamingFileSink.forRowFormat(Path.fromLocalFile(tmpFile), new SimpleStringEncoder<BEvent>()).build();
+////			val sink: StreamingFileSink[String] = StreamingFileSink
+////					  .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
+////					  .build()
+//
 //			
 //			// create actual source
 //			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 //			DataStream<BEvent> stream = env.addSource(s);
+//			stream.addSink(sink);
+//			JobClient job = env.executeAsync();
+//			
+////			Thread.sleep(1000);
+//			
+//			System.out.println(tmpFile);
+//			
+//			System.out.println("going");
+//			MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
+//			client.connect();
+//			publish(client, "c1", "a11");
+//			publish(client, "c2", "a21");
+//			publish(client, "c2", "a22");
+//			publish(client, "c1", "a12");
+//			publish(client, "c2", "a23");
+//			
+//			Thread.sleep(1000);
+////			job.cancel();
+//			System.out.println(job.getJobStatus().isDone());
+//			
+//			System.out.println(Files.readLines(tmpFile, Charset.defaultCharset()));
+////			System.out.println("final " + acts);
+////			
+////			
+////			
+////			System.out.println("post-final " + acts);
+////			
+//////			System.out.println("1");
+////			stream.executeAndCollect().forEachRemaining((BEvent e) -> {
+////				System.out.println(e);
+////				acts.add(e.getEventName());
+////				caseIds.add(e.getTraceName());
+////			});
+////			JobClient job = env.executeAsync();
+////
+////			Thread.sleep(1000);
+////			job.cancel();
+////			
+////			Thread.sleep(1000);
 //			
-//			stream.executeAndCollect().forEachRemaining((BEvent e) -> {
-//				System.out.println(e);
-//				acts.add(e.getEventName());
-//				caseIds.add(e.getTraceName());
-//			});
+////			assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
+////			assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
+////			
+////			System.out.println("3");
 //			
-//			System.out.println("3");
 //			
-//			assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
-//			assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
 //			
 //		} catch (Exception e) {
 //			e.printStackTrace();
 //		}
 //	}
-	
-//	@Test
-//	public void test_mqtt_2() {
-//		MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
-//		assertThrowsExactly(SourceException.class, () -> source.prepare());
-//	}
-	
+//	
+////	@Test
+////	public void test_mqtt_2() {
+////		MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
+////		assertThrowsExactly(SourceException.class, () -> source.prepare());
+////	}
+//	
 //	protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
 //		client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
 //	}
diff --git a/src/test/java/beamline/tests/Utils.java b/src/test/java/beamline/tests/Utils.java
index 035ea224bbb3d6a9c629c9b254b139e8b443d8e6..746b548c58d609571f61345893244df4b4767a8c 100644
--- a/src/test/java/beamline/tests/Utils.java
+++ b/src/test/java/beamline/tests/Utils.java
@@ -14,7 +14,7 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
 
 import beamline.events.BEvent;
 import beamline.exceptions.EventException;
-import beamline.mappers.DirectlyFollowsRelation;
+import beamline.models.responses.DirectlyFollowsRelation;
 
 public class Utils {
 
@@ -81,8 +81,8 @@ public class Utils {
 	}
 
 	public static boolean verifyDirectFollows(DirectlyFollowsRelation df, String a1, String a2, String caseId) {
-		String df_a1 = XConceptExtension.instance().extractName(df.getFirst());
-		String df_a2 = XConceptExtension.instance().extractName(df.getSecond());
+		String df_a1 = df.getFrom().getEventName();
+		String df_a2 = df.getTo().getEventName();
 		return df_a1.equals(a1) && df_a2.equals(a2) && df.getCaseId().equals(caseId);
 	}
 }