diff --git a/src/main/java/beamline/sources/CSVLogSource.java b/src/main/java/beamline/sources/CSVLogSource.java
index a9ff1fc589ffc4fc5f85757154c33479053b15d0..85a09f993922d96082c6f8faf714e3ab4f1e04e8 100644
--- a/src/main/java/beamline/sources/CSVLogSource.java
+++ b/src/main/java/beamline/sources/CSVLogSource.java
@@ -91,6 +91,7 @@ public class CSVLogSource implements XesSource {
 					eventWrapper.add(newEvent);
 					emitter.onNext(eventWrapper);
 				}
+				emitter.onComplete();
 			}
 		});
 	}
diff --git a/src/main/java/beamline/sources/XesLogSource.java b/src/main/java/beamline/sources/XesLogSource.java
index 5cce7095a37e0ec0ada71fd8578882707f656c16..d906900c158fd3cd2472b1ad2cb2df3121873201 100644
--- a/src/main/java/beamline/sources/XesLogSource.java
+++ b/src/main/java/beamline/sources/XesLogSource.java
@@ -67,6 +67,7 @@ public class XesLogSource implements XesSource {
 				for (XTrace wrapper : events) {
 					emitter.onNext(wrapper);
 				}
+				emitter.onComplete();
 			}
 		});
 	}