diff --git a/src/main/java/beamline/sources/BeamlineAbstractSource.java b/src/main/java/beamline/sources/BeamlineAbstractSource.java
index 24eea49533ca3e9d9a46da256d65ae787754dece..ed39cb4172298b3607870ea3823c51fe639e8f1d 100644
--- a/src/main/java/beamline/sources/BeamlineAbstractSource.java
+++ b/src/main/java/beamline/sources/BeamlineAbstractSource.java
@@ -26,6 +26,6 @@ public abstract class BeamlineAbstractSource extends RichSourceFunction<BEvent>
 	
 	@Override
 	public void cancel() {
-		running = false;
+		this.running = false;
 	}
 }
\ No newline at end of file
diff --git a/src/main/java/beamline/sources/MQTTXesSource.java b/src/main/java/beamline/sources/MQTTXesSource.java
index d19e907dcb345ec9eb1d444a32084745b917c657..d58c67cb3e3dd28a948d6f250b5dca65f6dbe411 100644
--- a/src/main/java/beamline/sources/MQTTXesSource.java
+++ b/src/main/java/beamline/sources/MQTTXesSource.java
@@ -37,7 +37,6 @@ public class MQTTXesSource extends BeamlineAbstractSource {
 	private String processName;
 	private String brokerHost;
 	private String topicBase;
-	private transient IMqttClient myClient;
 	
 	/**
 	 * Constructs the source
@@ -58,7 +57,8 @@ public class MQTTXesSource extends BeamlineAbstractSource {
 		MqttConnectOptions options = new MqttConnectOptions();
 		options.setCleanSession(true);
 		options.setKeepAliveInterval(30);
-
+		
+		IMqttClient myClient = null;
 		try {
 			myClient = new MqttClient(brokerHost, UUID.randomUUID().toString());
 			myClient.setCallback(new MqttCallback() {
@@ -90,23 +90,32 @@ public class MQTTXesSource extends BeamlineAbstractSource {
 		}
 		
 		while(isRunning()) {
-			while (buffer.isEmpty()) {
+			while (isRunning() && buffer.isEmpty()) {
+				System.out.println("sleeping " + isRunning());
 				Thread.sleep(100l);
 			}
-			ctx.collect(buffer.poll());
+			if (isRunning()) {
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(buffer.poll());
+				}
+			}
 		}
-	}
-	
-	@Override
-	public void cancel() {
-		super.cancel();
-		if (myClient != null && myClient.isConnected()) {
+		System.out.println("aaa");
+		
+		if (!isRunning() && myClient.isConnected()) {
 			try {
 				myClient.disconnect();
 			} catch (MqttException e) {
 				// nothing to do here
 			}
 		}
+		System.err.println("done");
+	}
+	
+	@Override
+	public void cancel() {
+		// TODO Auto-generated method stub
+		super.cancel();
+		System.out.println("closing");
 	}
-
 }
diff --git a/src/main/java/beamline/sources/XesLogSource.java b/src/main/java/beamline/sources/XesLogSource.java
index b155f14b00fbc69f9f02ca824034638e55300b0f..7af0475830ee40c86fec041f10f22e5ff633c823 100644
--- a/src/main/java/beamline/sources/XesLogSource.java
+++ b/src/main/java/beamline/sources/XesLogSource.java
@@ -1,6 +1,8 @@
 package beamline.sources;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -18,6 +20,7 @@ import org.deckfour.xes.model.XAttribute;
 import org.deckfour.xes.model.XEvent;
 import org.deckfour.xes.model.XLog;
 import org.deckfour.xes.model.XTrace;
+import org.deckfour.xes.out.XesXmlGZIPSerializer;
 
 import beamline.events.BEvent;
 import beamline.exceptions.EventException;
@@ -35,7 +38,6 @@ public class XesLogSource extends BeamlineAbstractSource {
 	private static final long serialVersionUID = 1095855454671335981L;
 
 	private String fileName;
-	private transient XLog log;
 	private List<BEvent> events;
 	
 	/**
@@ -54,18 +56,18 @@ public class XesLogSource extends BeamlineAbstractSource {
 	 * Constructs a source from the given log
 	 * 
 	 * @param log the log to use as source
+	 * @throws IOException 
 	 */
-	public XesLogSource(XLog log) {
-		this.log = log;
+	public XesLogSource(XLog log) throws IOException {
+		File tmpFile = File.createTempFile("file", ".xes.gz");
+		new XesXmlGZIPSerializer().serialize(log, new FileOutputStream(tmpFile));
+		this.fileName = tmpFile.getAbsolutePath();
 	}
 	
 	@Override
 	public void run(SourceContext<BEvent> ctx) throws Exception {
-		if (log == null) {
-			parseLog(fileName);
-		}
 		if (events == null) {
-			prepareStream();
+			prepareStream(parseLog(fileName));
 		}
 		Iterator<BEvent> i = events.iterator();
 		while(i.hasNext() && isRunning()) {
@@ -82,7 +84,7 @@ public class XesLogSource extends BeamlineAbstractSource {
 		}
 	}
 	
-	private void parseLog(String fileName) throws SourceException {
+	private XLog parseLog(String fileName) throws SourceException {
 		XParser[] parsers = new XParser[] {
 				new XesXmlGZIPParser(),
 				new XesXmlParser(),
@@ -92,17 +94,16 @@ public class XesLogSource extends BeamlineAbstractSource {
 		for (XParser p : parsers) {
 			if (p.canParse(file)) {
 				try {
-					log = p.parse(file).get(0);
+					return p.parse(file).get(0);
 				} catch (Exception e) {
 					throw new SourceException(e.getMessage());
 				}
-				return;
 			}
 		}
 		throw new SourceException("XES file format not supported");
 	}
 	
-	private void prepareStream() throws SourceException, EventException {
+	private void prepareStream(XLog log) throws SourceException, EventException {
 		if (log.isEmpty()) {
 			throw new SourceException("The given log is empty");
 		}
diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java
index 5f131d19dabc36656410a81ca8413d44177c16bf..211761e557c1379fffc84e9b37d6261907f603dd 100644
--- a/src/test/java/beamline/tests/SourcesTest.java
+++ b/src/test/java/beamline/tests/SourcesTest.java
@@ -91,62 +91,90 @@ public class SourcesTest {
 		});
 	}
 	
-//	@Test
-//	public void test_xes_source_1() {
-//		XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
-//		assertThrowsExactly(SourceException.class, () -> s1.prepare());
-//		
-//		XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
-//		assertThrowsExactly(SourceException.class, () -> s2.prepare());
-//
-//		XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
-//		assertThrowsExactly(SourceException.class, () -> s3.prepare());
-//	}
-//	
-//	@Test
-//	public void test_xes_source_2() {
-//		List<String> acts = new LinkedList<>();
-//		List<String> caseIds = new LinkedList<>();
-//		XesLogSource s = new XesLogSource(Utils.generteXLog());
-//		try {
-//			s.prepare();
-//		} catch (SourceException e) {
-//			e.printStackTrace();
-//		}
-//		s.getObservable().subscribe((t) -> {
-//			acts.add(EventUtils.getActivityName(t));
-//			caseIds.add(EventUtils.getCaseId(t));
-//		});
-//		
-//		assertEquals(9, acts.size());
-//		assertEquals(9, caseIds.size());
-//		
-//		assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
-//		assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
-//	}
-//	
-//	@Test
-//	public void test_xes_source_3() {
-//		List<String> acts = new LinkedList<>();
-//		List<String> caseIds = new LinkedList<>();
-//		XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
-//		try {
-//			s.prepare();
-//		} catch (SourceException e) {
-//			e.printStackTrace();
-//		}
-//		s.getObservable().subscribe((t) -> {
-//			acts.add(EventUtils.getActivityName(t));
-//			caseIds.add(EventUtils.getCaseId(t));
-//		});
-//		
-//		assertEquals(5, acts.size());
-//		assertEquals(5, caseIds.size());
-//		
-//		assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
-//		assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
-//	}
-//
+	@Test
+	public void test_xes_source_1() {
+		XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
+		assertThrowsExactly(JobExecutionException.class, () -> {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.addSource(s1).map(e -> e).print();
+			env.execute();
+		});
+		
+		XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
+		assertThrowsExactly(JobExecutionException.class, () -> {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.addSource(s2).map(e -> e).print();
+			env.execute();
+		});
+
+		XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
+		assertThrowsExactly(JobExecutionException.class, () -> {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.addSource(s3).map(e -> e).print();
+			env.execute();
+		});
+	}
+	
+	@Test
+	public void test_xes_source_2() throws Exception {
+		List<String> acts = new LinkedList<>();
+		List<String> caseIds = new LinkedList<>();
+		XesLogSource source = new XesLogSource(Utils.generteXLog());
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<BEvent> stream = env.addSource(source);
+		stream.executeAndCollect().forEachRemaining((BEvent e) -> {
+			acts.add(e.getEventName());
+			caseIds.add(e.getTraceName());
+		});
+		
+		assertEquals(9, acts.size());
+		assertEquals(9, caseIds.size());
+		
+		assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
+		assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
+	}
+	
+	@Test
+	public void test_xes_source_3() throws Exception {
+		List<String> acts = new LinkedList<>();
+		List<String> caseIds = new LinkedList<>();
+		XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<BEvent> stream = env.addSource(s);
+		stream.executeAndCollect().forEachRemaining((BEvent e) -> {
+			acts.add(e.getEventName());
+			caseIds.add(e.getTraceName());
+		});
+		
+		assertEquals(5, acts.size());
+		assertEquals(5, caseIds.size());
+		
+		assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
+		assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
+	}
+	
+	@Test
+	public void test_xes_source_4() throws Exception {
+		List<String> acts = new LinkedList<>();
+		List<String> caseIds = new LinkedList<>();
+		XesLogSource s = new XesLogSource("src/test/resources/sources/source_2.xes");
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<BEvent> stream = env.addSource(s);
+		stream.executeAndCollect().forEachRemaining((BEvent e) -> {
+			acts.add(e.getEventName());
+			caseIds.add(e.getTraceName());
+		});
+		
+		assertEquals(5, acts.size());
+		assertEquals(5, caseIds.size());
+		
+		assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
+		assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
+	}
+
 //	@Test
 //	public void test_mqtt_1() {
 //		try {
@@ -158,24 +186,44 @@ public class SourcesTest {
 //			List<String> acts = new LinkedList<>();
 //			List<String> caseIds = new LinkedList<>();
 //			
+//			MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name");
+//			
+//			new Thread(new Runnable() {
+//				
+//				@Override
+//				public void run() {
+//					try {
+//						Thread.sleep(5000);
+//						System.out.println("going...");
+//						MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
+//						client.connect();
+//						
+//						publish(client, "c1", "a11");
+//						publish(client, "c2", "a21");
+//						publish(client, "c2", "a22");
+//						publish(client, "c1", "a12");
+//						publish(client, "c2", "a23");
+//						s.cancel();
+//						
+//					} catch (InterruptedException e) {
+//						e.printStackTrace();
+//					} catch (MqttException e) {
+//						e.printStackTrace();
+//					}
+//				}
+//			}).start();
+//			
 //			// create actual source
-//			MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name");
-//			source.prepare();
-//			source.getObservable().subscribe((t) -> {
-//				acts.add(EventUtils.getActivityName(t));
-//				caseIds.add(EventUtils.getCaseId(t));
-//			});
-//
-//			MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
-//			client.connect();
+//			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+//			DataStream<BEvent> stream = env.addSource(s);
 //			
-//			publish(client, "c1", "a11");
-//			publish(client, "c2", "a21");
-//			publish(client, "c2", "a22");
-//			publish(client, "c1", "a12");
-//			publish(client, "c2", "a23");
+//			stream.executeAndCollect().forEachRemaining((BEvent e) -> {
+//				System.out.println(e);
+//				acts.add(e.getEventName());
+//				caseIds.add(e.getTraceName());
+//			});
 //			
-//			Thread.sleep(100);
+//			System.out.println("3");
 //			
 //			assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
 //			assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
@@ -184,13 +232,13 @@ public class SourcesTest {
 //			e.printStackTrace();
 //		}
 //	}
-//	
+	
 //	@Test
 //	public void test_mqtt_2() {
 //		MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
 //		assertThrowsExactly(SourceException.class, () -> source.prepare());
 //	}
-//	
+	
 //	protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
 //		client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
 //	}
diff --git a/src/test/resources/sources/source_2.xes b/src/test/resources/sources/source_2.xes
new file mode 100644
index 0000000000000000000000000000000000000000..7e04dcb19ebbe85098f4736ea503008b37004f34
--- /dev/null
+++ b/src/test/resources/sources/source_2.xes
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<log xes.version="1849.2016" xmlns="http://www.xes-standard.org" xes.creator="Fluxicon Disco">
+	<extension name="Concept" prefix="concept" uri="http://www.xes-standard.org/concept.xesext"/>
+	<extension name="Lifecycle" prefix="lifecycle" uri="http://www.xes-standard.org/lifecycle.xesext"/>
+	<global scope="trace">
+		<string key="concept:name" value="name"/>
+		<string key="variant" value="string"/>
+		<int key="variant-index" value="0"/>
+	</global>
+	<global scope="event">
+		<string key="concept:name" value="name"/>
+		<string key="lifecycle:transition" value="transition"/>
+		<string key="act" value="string"/>
+	</global>
+	<classifier name="Activity" keys="act"/>
+	<string key="lifecycle:model" value="standard"/>
+	<string key="creator" value="Fluxicon Disco"/>
+	<string key="library" value="Fluxicon Octane"/>
+	<trace>
+		<string key="concept:name" value="c1"/>
+		<string key="variant" value="Variant 1"/>
+		<int key="variant-index" value="1"/>
+		<string key="creator" value="Fluxicon Disco"/>
+		<event>
+			<string key="concept:name" value="a11"/>
+			<string key="lifecycle:transition" value="complete"/>
+			<string key="act" value="a11"/>
+		</event>
+		<event>
+			<string key="concept:name" value="a12"/>
+			<string key="lifecycle:transition" value="complete"/>
+			<string key="act" value="a12"/>
+		</event>
+	</trace>
+	<trace>
+		<string key="concept:name" value="c2"/>
+		<string key="variant" value="Variant 2"/>
+		<int key="variant-index" value="2"/>
+		<string key="creator" value="Fluxicon Disco"/>
+		<event>
+			<string key="concept:name" value="a21"/>
+			<string key="lifecycle:transition" value="complete"/>
+			<string key="act" value="a21"/>
+		</event>
+		<event>
+			<string key="concept:name" value="a22"/>
+			<string key="lifecycle:transition" value="complete"/>
+			<string key="act" value="a22"/>
+		</event>
+		<event>
+			<string key="concept:name" value="a23"/>
+			<string key="lifecycle:transition" value="complete"/>
+			<string key="act" value="a23"/>
+		</event>
+	</trace>
+</log>
\ No newline at end of file