From 1ef9c2da89bcfed82abe15bde445a7efa7d1a7d7 Mon Sep 17 00:00:00 2001
From: Andrea Burattin <andrea.burattin@gmail.com>
Date: Mon, 21 Mar 2022 09:53:25 +0100
Subject: [PATCH] Added tests for CSVLogSource

---
 .../java/beamline/sources/CSVLogSource.java   |  64 ++--
 .../java/beamline/sources/XesLogSource.java   |   8 +-
 src/test/java/beamline/tests/SourcesTest.java | 295 +++++++++---------
 src/test/java/beamline/tests/Utils.java       |   4 -
 4 files changed, 199 insertions(+), 172 deletions(-)

diff --git a/src/main/java/beamline/sources/CSVLogSource.java b/src/main/java/beamline/sources/CSVLogSource.java
index bcba48c..b97d3bb 100644
--- a/src/main/java/beamline/sources/CSVLogSource.java
+++ b/src/main/java/beamline/sources/CSVLogSource.java
@@ -2,6 +2,7 @@ package beamline.sources;
 
 import java.io.IOException;
 import java.io.Reader;
+import java.io.Serializable;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.LinkedList;
@@ -10,22 +11,24 @@ import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 
 import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
 import com.opencsv.CSVReader;
 import com.opencsv.CSVReaderBuilder;
+import com.opencsv.ICSVParser;
 
 import beamline.events.BEvent;
 import beamline.exceptions.SourceException;
 
 /**
- * This implementation of a {@link BeamlineAbstractSource} produces events according to
- * the events contained in a CSV file.
+ * This implementation of a {@link BeamlineAbstractSource} produces events
+ * according to the events contained in a CSV file.
  * 
  * @author Andrea Burattin
  */
 public class CSVLogSource extends BeamlineAbstractSource {
 
 	private static final long serialVersionUID = 205574514393782145L;
-	private transient CSVParser parser;
+	private CSVLogSource.ParserConfiguration parserConfiguration;
 	private String filename;
 	private int caseIdColumn;
 	private int activityNameColumn;
@@ -33,27 +36,19 @@ public class CSVLogSource extends BeamlineAbstractSource {
 	/**
 	 * Constructs the source by providing a CSV parser.
 	 * 
-	 * <p>
-	 * A parser can be produced, for example with the following code:
-	 * <pre>
-	 * CSVParser parser = new CSVParserBuilder()
-	 *     .withSeparator(',')
-	 *     .withIgnoreQuotations(true)
-	 *     .build();
-	 * </pre>
-	 * 
 	 * @param filename the absolute path of the CSV file
 	 * @param caseIdColumn the id of the column containing the case id (counting
 	 * starts from 0)
 	 * @param activityNameColumn the id of the column containing the activity
 	 * name (counting starts from 0)
-	 * @param parser the parser to be used for parsing the CSV file
+	 * @param parserConfiguration the parser configuration to be used for
+	 * parsing the CSV file
 	 */
-	public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn, CSVParser parser) {
+	public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn, CSVLogSource.ParserConfiguration parserConfiguration) {
 		this.filename = filename;
 		this.caseIdColumn = caseIdColumn;
 		this.activityNameColumn = activityNameColumn;
-		this.parser = parser;
+		this.parserConfiguration = parserConfiguration;
 	}
 	
 	/**
@@ -66,20 +61,21 @@ public class CSVLogSource extends BeamlineAbstractSource {
 	 * name (counting starts from 0)
 	 */
 	public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn) {
-		this(filename, caseIdColumn, activityNameColumn, null);
+		this(filename, caseIdColumn, activityNameColumn, new CSVLogSource.ParserConfiguration());
 	}
-
+	
 	@Override
 	public void run(SourceContext<BEvent> ctx) throws Exception {
 		Reader reader = null;
 		CSVReader csvReader = null;
 		try {
+			CSVParser parser = new CSVParserBuilder()
+					.withSeparator(parserConfiguration.separator)
+					.build();
 			reader = Files.newBufferedReader(Paths.get(filename));
-			if (parser == null) {
-				csvReader = new CSVReader(reader);
-			} else  {
-				csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build();
-			}
+			csvReader = new CSVReaderBuilder(reader)
+					.withCSVParser(parser)
+					.build();
 			
 			String[] line;
 			while ((line = csvReader.readNext()) != null && isRunning()) {
@@ -87,7 +83,9 @@ public class CSVLogSource extends BeamlineAbstractSource {
 				for (int i = 0; i < line.length; i++) {
 					attributes.add(Pair.of("attribute_" + i, line[i]));
 				}
-				ctx.collect(BEvent.create(filename, line[activityNameColumn], line[caseIdColumn], null, attributes));
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(BEvent.create(filename, line[activityNameColumn], line[caseIdColumn], null, attributes));
+				}
 			}
 		} catch (IOException e) {
 			throw new SourceException(e.getMessage());
@@ -97,4 +95,24 @@ public class CSVLogSource extends BeamlineAbstractSource {
 			}
 		}
 	}
+	
+	/**
+	 * 
+	 * @author Andrea Burattin
+	 */
+	public static class ParserConfiguration implements Serializable {
+		
+		private static final long serialVersionUID = 375203248074405954L;
+		char separator = ICSVParser.DEFAULT_SEPARATOR;
+		
+		/**
+		 * 
+		 * @param separator
+		 * @return
+		 */
+		public ParserConfiguration withSeparator(char separator) {
+			this.separator = separator;
+			return this;
+		}
+	}
 }
diff --git a/src/main/java/beamline/sources/XesLogSource.java b/src/main/java/beamline/sources/XesLogSource.java
index 7b90e80..b155f14 100644
--- a/src/main/java/beamline/sources/XesLogSource.java
+++ b/src/main/java/beamline/sources/XesLogSource.java
@@ -71,9 +71,13 @@ public class XesLogSource extends BeamlineAbstractSource {
 		while(i.hasNext() && isRunning()) {
 			BEvent event = i.next();
 			if (event.getEventTime() != null) {
-				ctx.collectWithTimestamp(event, event.getEventTime().getTime());
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collectWithTimestamp(event, event.getEventTime().getTime());
+				}
 			} else {
-				ctx.collect(i.next());
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(i.next());
+				}
 			}
 		}
 	}
diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java
index 3fff905..5f131d1 100644
--- a/src/test/java/beamline/tests/SourcesTest.java
+++ b/src/test/java/beamline/tests/SourcesTest.java
@@ -7,11 +7,20 @@ import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.util.Collector;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
@@ -20,27 +29,25 @@ import org.junit.jupiter.api.Test;
 
 import com.opencsv.CSVParserBuilder;
 
+import beamline.events.BEvent;
 import beamline.exceptions.SourceException;
 import beamline.sources.CSVLogSource;
 import beamline.sources.MQTTXesSource;
 import beamline.sources.XesLogSource;
-import beamline.utils.EventUtils;
 
 public class SourcesTest {
 
 	@Test
-	public void test_csv_source_1() {
+	public void test_csv_source_1() throws Exception {
 		List<String> acts = new LinkedList<>();
 		List<String> caseIds = new LinkedList<>();
-		CSVLogSource s = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1);
-		try {
-			s.prepare();
-		} catch (SourceException e) {
-			e.printStackTrace();
-		}
-		s.getObservable().subscribe((t) -> {
-			acts.add(EventUtils.getActivityName(t));
-			caseIds.add(EventUtils.getCaseId(t));
+		CSVLogSource source = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<BEvent> stream = env.addSource(source);
+		stream.executeAndCollect().forEachRemaining((BEvent e) -> {
+			acts.add(e.getEventName());
+			caseIds.add(e.getTraceName());
 		});
 		
 		assertEquals(5, acts.size());
@@ -51,22 +58,20 @@ public class SourcesTest {
 	}
 	
 	@Test
-	public void test_csv_source_2() {
+	public void test_csv_source_2() throws Exception {
 		List<String> acts = new LinkedList<>();
 		List<String> caseIds = new LinkedList<>();
-		CSVLogSource s = new CSVLogSource(
+		CSVLogSource source = new CSVLogSource(
 				"src/test/resources/sources/source_2.csv",
 				0,
 				1,
-				new CSVParserBuilder().withSeparator('|').build());
-		try {
-			s.prepare();
-		} catch (SourceException e) {
-			e.printStackTrace();
-		}
-		s.getObservable().subscribe((t) -> {
-			acts.add(EventUtils.getActivityName(t));
-			caseIds.add(EventUtils.getCaseId(t));
+				new CSVLogSource.ParserConfiguration().withSeparator('|'));
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<BEvent> stream = env.addSource(source);
+		stream.executeAndCollect().forEachRemaining((BEvent e) -> {
+			acts.add(e.getEventName());
+			caseIds.add(e.getTraceName());
 		});
 		
 		assertEquals(5, acts.size());
@@ -78,128 +83,132 @@ public class SourcesTest {
 	
 	@Test
 	public void test_csv_source_3() {
-		CSVLogSource s = new CSVLogSource("DOESNT_EXIST", 0, 1);
-		assertThrowsExactly(SourceException.class, () -> s.prepare());
-	}
-	
-	@Test
-	public void test_xes_source_1() {
-		XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
-		assertThrowsExactly(SourceException.class, () -> s1.prepare());
-		
-		XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
-		assertThrowsExactly(SourceException.class, () -> s2.prepare());
-
-		XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
-		assertThrowsExactly(SourceException.class, () -> s3.prepare());
-	}
-	
-	@Test
-	public void test_xes_source_2() {
-		List<String> acts = new LinkedList<>();
-		List<String> caseIds = new LinkedList<>();
-		XesLogSource s = new XesLogSource(Utils.generteXLog());
-		try {
-			s.prepare();
-		} catch (SourceException e) {
-			e.printStackTrace();
-		}
-		s.getObservable().subscribe((t) -> {
-			acts.add(EventUtils.getActivityName(t));
-			caseIds.add(EventUtils.getCaseId(t));
-		});
-		
-		assertEquals(9, acts.size());
-		assertEquals(9, caseIds.size());
-		
-		assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
-		assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
-	}
-	
-	@Test
-	public void test_xes_source_3() {
-		List<String> acts = new LinkedList<>();
-		List<String> caseIds = new LinkedList<>();
-		XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
-		try {
-			s.prepare();
-		} catch (SourceException e) {
-			e.printStackTrace();
-		}
-		s.getObservable().subscribe((t) -> {
-			acts.add(EventUtils.getActivityName(t));
-			caseIds.add(EventUtils.getCaseId(t));
+		CSVLogSource source = new CSVLogSource("DOESNT_EXIST", 0, 1);
+		assertThrowsExactly(JobExecutionException.class, () -> {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.addSource(source).map(e -> e).print();
+			env.execute();
 		});
-		
-		assertEquals(5, acts.size());
-		assertEquals(5, caseIds.size());
-		
-		assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
-		assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
-	}
-
-	@Test
-	public void test_mqtt_1() {
-		try {
-			// create mqtt broker
-			BrokerService brokerService = createBroker();
-			brokerService.start();
-			brokerService.waitUntilStarted();
-			
-			List<String> acts = new LinkedList<>();
-			List<String> caseIds = new LinkedList<>();
-			
-			// create actual source
-			MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name");
-			source.prepare();
-			source.getObservable().subscribe((t) -> {
-				acts.add(EventUtils.getActivityName(t));
-				caseIds.add(EventUtils.getCaseId(t));
-			});
-
-			MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
-			client.connect();
-			
-			publish(client, "c1", "a11");
-			publish(client, "c2", "a21");
-			publish(client, "c2", "a22");
-			publish(client, "c1", "a12");
-			publish(client, "c2", "a23");
-			
-			Thread.sleep(100);
-			
-			assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
-			assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
-			
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
 	}
 	
-	@Test
-	public void test_mqtt_2() {
-		MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
-		assertThrowsExactly(SourceException.class, () -> source.prepare());
-	}
-	
-	protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
-		client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
-	}
-	
-	protected BrokerService createBroker() throws Exception {
-		BrokerService brokerService = new BrokerService();
-		brokerService.setDeleteAllMessagesOnStartup(true);
-		brokerService.setPersistent(false);
-		brokerService.setAdvisorySupport(false);
-		brokerService.setUseJmx(true);
-		brokerService.getManagementContext().setCreateConnector(false);
-		brokerService.setPopulateJMSXUserID(true);
-
-		TransportConnector connector = new TransportConnector();
-		connector.setUri(new URI("mqtt://localhost:9999"));
-		connector.setName("mqtt");
-		brokerService.addConnector(connector);
-
-		return brokerService;
-	}
+//	@Test
+//	public void test_xes_source_1() {
+//		XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
+//		assertThrowsExactly(SourceException.class, () -> s1.prepare());
+//		
+//		XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
+//		assertThrowsExactly(SourceException.class, () -> s2.prepare());
+//
+//		XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
+//		assertThrowsExactly(SourceException.class, () -> s3.prepare());
+//	}
+//	
+//	@Test
+//	public void test_xes_source_2() {
+//		List<String> acts = new LinkedList<>();
+//		List<String> caseIds = new LinkedList<>();
+//		XesLogSource s = new XesLogSource(Utils.generteXLog());
+//		try {
+//			s.prepare();
+//		} catch (SourceException e) {
+//			e.printStackTrace();
+//		}
+//		s.getObservable().subscribe((t) -> {
+//			acts.add(EventUtils.getActivityName(t));
+//			caseIds.add(EventUtils.getCaseId(t));
+//		});
+//		
+//		assertEquals(9, acts.size());
+//		assertEquals(9, caseIds.size());
+//		
+//		assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
+//		assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
+//	}
+//	
+//	@Test
+//	public void test_xes_source_3() {
+//		List<String> acts = new LinkedList<>();
+//		List<String> caseIds = new LinkedList<>();
+//		XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
+//		try {
+//			s.prepare();
+//		} catch (SourceException e) {
+//			e.printStackTrace();
+//		}
+//		s.getObservable().subscribe((t) -> {
+//			acts.add(EventUtils.getActivityName(t));
+//			caseIds.add(EventUtils.getCaseId(t));
+//		});
+//		
+//		assertEquals(5, acts.size());
+//		assertEquals(5, caseIds.size());
+//		
+//		assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
+//		assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
+//	}
+//
+//	@Test
+//	public void test_mqtt_1() {
+//		try {
+//			// create mqtt broker
+//			BrokerService brokerService = createBroker();
+//			brokerService.start();
+//			brokerService.waitUntilStarted();
+//			
+//			List<String> acts = new LinkedList<>();
+//			List<String> caseIds = new LinkedList<>();
+//			
+//			// create actual source
+//			MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name");
+//			source.prepare();
+//			source.getObservable().subscribe((t) -> {
+//				acts.add(EventUtils.getActivityName(t));
+//				caseIds.add(EventUtils.getCaseId(t));
+//			});
+//
+//			MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
+//			client.connect();
+//			
+//			publish(client, "c1", "a11");
+//			publish(client, "c2", "a21");
+//			publish(client, "c2", "a22");
+//			publish(client, "c1", "a12");
+//			publish(client, "c2", "a23");
+//			
+//			Thread.sleep(100);
+//			
+//			assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
+//			assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
+//			
+//		} catch (Exception e) {
+//			e.printStackTrace();
+//		}
+//	}
+//	
+//	@Test
+//	public void test_mqtt_2() {
+//		MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
+//		assertThrowsExactly(SourceException.class, () -> source.prepare());
+//	}
+//	
+//	protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
+//		client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
+//	}
+//	
+//	protected BrokerService createBroker() throws Exception {
+//		BrokerService brokerService = new BrokerService();
+//		brokerService.setDeleteAllMessagesOnStartup(true);
+//		brokerService.setPersistent(false);
+//		brokerService.setAdvisorySupport(false);
+//		brokerService.setUseJmx(true);
+//		brokerService.getManagementContext().setCreateConnector(false);
+//		brokerService.setPopulateJMSXUserID(true);
+//
+//		TransportConnector connector = new TransportConnector();
+//		connector.setUri(new URI("mqtt://localhost:9999"));
+//		connector.setName("mqtt");
+//		brokerService.addConnector(connector);
+//
+//		return brokerService;
+//	}
 }
diff --git a/src/test/java/beamline/tests/Utils.java b/src/test/java/beamline/tests/Utils.java
index 497fd6c..035ea22 100644
--- a/src/test/java/beamline/tests/Utils.java
+++ b/src/test/java/beamline/tests/Utils.java
@@ -48,10 +48,6 @@ public class Utils {
 //		return Observable.fromArray(events);
 //	}
 	
-	public static DataStream<BEvent> generateObservableSameCaseId(ExecutionEnvironment env) {
-		DataSet<BEvent> ds = CollectionDataSets.getCustomTypeDataSet(env);
-	}
-	
 	/*
 	 * c1: <K,A,B,A,C>
 	 * c2: <O,A,I,C>
-- 
GitLab