Skip to content
Snippets Groups Projects
Commit 1ef9c2da authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Added tests for CSVLogSource

parent 0b7c0f01
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@ package beamline.sources;
import java.io.IOException;
import java.io.Reader;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
......@@ -10,22 +11,24 @@ import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.ICSVParser;
import beamline.events.BEvent;
import beamline.exceptions.SourceException;
/**
* This implementation of a {@link BeamlineAbstractSource} produces events according to
* the events contained in a CSV file.
* This implementation of a {@link BeamlineAbstractSource} produces events
* according to the events contained in a CSV file.
*
* @author Andrea Burattin
*/
public class CSVLogSource extends BeamlineAbstractSource {
private static final long serialVersionUID = 205574514393782145L;
private transient CSVParser parser;
private CSVLogSource.ParserConfiguration parserConfiguration;
private String filename;
private int caseIdColumn;
private int activityNameColumn;
......@@ -33,27 +36,19 @@ public class CSVLogSource extends BeamlineAbstractSource {
/**
* Constructs the source by providing a CSV parser.
*
* <p>
* A parser can be produced, for example with the following code:
* <pre>
* CSVParser parser = new CSVParserBuilder()
* .withSeparator(',')
* .withIgnoreQuotations(true)
* .build();
* </pre>
*
* @param filename the absolute path of the CSV file
* @param caseIdColumn the id of the column containing the case id (counting
* starts from 0)
* @param activityNameColumn the id of the column containing the activity
* name (counting starts from 0)
* @param parser the parser to be used for parsing the CSV file
* @param parserConfiguration the parser configuration to be used for
* parsing the CSV file
*/
public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn, CSVParser parser) {
public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn, CSVLogSource.ParserConfiguration parserConfiguration) {
this.filename = filename;
this.caseIdColumn = caseIdColumn;
this.activityNameColumn = activityNameColumn;
this.parser = parser;
this.parserConfiguration = parserConfiguration;
}
/**
......@@ -66,20 +61,21 @@ public class CSVLogSource extends BeamlineAbstractSource {
* name (counting starts from 0)
*/
public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn) {
this(filename, caseIdColumn, activityNameColumn, null);
this(filename, caseIdColumn, activityNameColumn, new CSVLogSource.ParserConfiguration());
}
@Override
public void run(SourceContext<BEvent> ctx) throws Exception {
Reader reader = null;
CSVReader csvReader = null;
try {
CSVParser parser = new CSVParserBuilder()
.withSeparator(parserConfiguration.separator)
.build();
reader = Files.newBufferedReader(Paths.get(filename));
if (parser == null) {
csvReader = new CSVReader(reader);
} else {
csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build();
}
csvReader = new CSVReaderBuilder(reader)
.withCSVParser(parser)
.build();
String[] line;
while ((line = csvReader.readNext()) != null && isRunning()) {
......@@ -87,7 +83,9 @@ public class CSVLogSource extends BeamlineAbstractSource {
for (int i = 0; i < line.length; i++) {
attributes.add(Pair.of("attribute_" + i, line[i]));
}
ctx.collect(BEvent.create(filename, line[activityNameColumn], line[caseIdColumn], null, attributes));
synchronized (ctx.getCheckpointLock()) {
ctx.collect(BEvent.create(filename, line[activityNameColumn], line[caseIdColumn], null, attributes));
}
}
} catch (IOException e) {
throw new SourceException(e.getMessage());
......@@ -97,4 +95,24 @@ public class CSVLogSource extends BeamlineAbstractSource {
}
}
}
/**
*
* @author Andrea Burattin
*/
public static class ParserConfiguration implements Serializable {
private static final long serialVersionUID = 375203248074405954L;
char separator = ICSVParser.DEFAULT_SEPARATOR;
/**
*
* @param separator
* @return
*/
public ParserConfiguration withSeparator(char separator) {
this.separator = separator;
return this;
}
}
}
......@@ -71,9 +71,13 @@ public class XesLogSource extends BeamlineAbstractSource {
while(i.hasNext() && isRunning()) {
BEvent event = i.next();
if (event.getEventTime() != null) {
ctx.collectWithTimestamp(event, event.getEventTime().getTime());
synchronized (ctx.getCheckpointLock()) {
ctx.collectWithTimestamp(event, event.getEventTime().getTime());
}
} else {
ctx.collect(i.next());
synchronized (ctx.getCheckpointLock()) {
ctx.collect(i.next());
}
}
}
}
......
......@@ -7,11 +7,20 @@ import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.experimental.CollectSink;
import org.apache.flink.util.Collector;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
......@@ -20,27 +29,25 @@ import org.junit.jupiter.api.Test;
import com.opencsv.CSVParserBuilder;
import beamline.events.BEvent;
import beamline.exceptions.SourceException;
import beamline.sources.CSVLogSource;
import beamline.sources.MQTTXesSource;
import beamline.sources.XesLogSource;
import beamline.utils.EventUtils;
public class SourcesTest {
@Test
public void test_csv_source_1() {
public void test_csv_source_1() throws Exception {
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
CSVLogSource s = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1);
try {
s.prepare();
} catch (SourceException e) {
e.printStackTrace();
}
s.getObservable().subscribe((t) -> {
acts.add(EventUtils.getActivityName(t));
caseIds.add(EventUtils.getCaseId(t));
CSVLogSource source = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<BEvent> stream = env.addSource(source);
stream.executeAndCollect().forEachRemaining((BEvent e) -> {
acts.add(e.getEventName());
caseIds.add(e.getTraceName());
});
assertEquals(5, acts.size());
......@@ -51,22 +58,20 @@ public class SourcesTest {
}
@Test
public void test_csv_source_2() {
public void test_csv_source_2() throws Exception {
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
CSVLogSource s = new CSVLogSource(
CSVLogSource source = new CSVLogSource(
"src/test/resources/sources/source_2.csv",
0,
1,
new CSVParserBuilder().withSeparator('|').build());
try {
s.prepare();
} catch (SourceException e) {
e.printStackTrace();
}
s.getObservable().subscribe((t) -> {
acts.add(EventUtils.getActivityName(t));
caseIds.add(EventUtils.getCaseId(t));
new CSVLogSource.ParserConfiguration().withSeparator('|'));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<BEvent> stream = env.addSource(source);
stream.executeAndCollect().forEachRemaining((BEvent e) -> {
acts.add(e.getEventName());
caseIds.add(e.getTraceName());
});
assertEquals(5, acts.size());
......@@ -78,128 +83,132 @@ public class SourcesTest {
@Test
public void test_csv_source_3() {
CSVLogSource s = new CSVLogSource("DOESNT_EXIST", 0, 1);
assertThrowsExactly(SourceException.class, () -> s.prepare());
}
@Test
public void test_xes_source_1() {
XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
assertThrowsExactly(SourceException.class, () -> s1.prepare());
XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
assertThrowsExactly(SourceException.class, () -> s2.prepare());
XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
assertThrowsExactly(SourceException.class, () -> s3.prepare());
}
@Test
public void test_xes_source_2() {
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
XesLogSource s = new XesLogSource(Utils.generteXLog());
try {
s.prepare();
} catch (SourceException e) {
e.printStackTrace();
}
s.getObservable().subscribe((t) -> {
acts.add(EventUtils.getActivityName(t));
caseIds.add(EventUtils.getCaseId(t));
});
assertEquals(9, acts.size());
assertEquals(9, caseIds.size());
assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
}
@Test
public void test_xes_source_3() {
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
try {
s.prepare();
} catch (SourceException e) {
e.printStackTrace();
}
s.getObservable().subscribe((t) -> {
acts.add(EventUtils.getActivityName(t));
caseIds.add(EventUtils.getCaseId(t));
CSVLogSource source = new CSVLogSource("DOESNT_EXIST", 0, 1);
assertThrowsExactly(JobExecutionException.class, () -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(source).map(e -> e).print();
env.execute();
});
assertEquals(5, acts.size());
assertEquals(5, caseIds.size());
assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
}
@Test
public void test_mqtt_1() {
try {
// create mqtt broker
BrokerService brokerService = createBroker();
brokerService.start();
brokerService.waitUntilStarted();
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
// create actual source
MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name");
source.prepare();
source.getObservable().subscribe((t) -> {
acts.add(EventUtils.getActivityName(t));
caseIds.add(EventUtils.getCaseId(t));
});
MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
client.connect();
publish(client, "c1", "a11");
publish(client, "c2", "a21");
publish(client, "c2", "a22");
publish(client, "c1", "a12");
publish(client, "c2", "a23");
Thread.sleep(100);
assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void test_mqtt_2() {
MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
assertThrowsExactly(SourceException.class, () -> source.prepare());
}
protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
}
protected BrokerService createBroker() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.setPopulateJMSXUserID(true);
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("mqtt://localhost:9999"));
connector.setName("mqtt");
brokerService.addConnector(connector);
return brokerService;
}
// @Test
// public void test_xes_source_1() {
// XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
// assertThrowsExactly(SourceException.class, () -> s1.prepare());
//
// XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
// assertThrowsExactly(SourceException.class, () -> s2.prepare());
//
// XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
// assertThrowsExactly(SourceException.class, () -> s3.prepare());
// }
//
// @Test
// public void test_xes_source_2() {
// List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
// XesLogSource s = new XesLogSource(Utils.generteXLog());
// try {
// s.prepare();
// } catch (SourceException e) {
// e.printStackTrace();
// }
// s.getObservable().subscribe((t) -> {
// acts.add(EventUtils.getActivityName(t));
// caseIds.add(EventUtils.getCaseId(t));
// });
//
// assertEquals(9, acts.size());
// assertEquals(9, caseIds.size());
//
// assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
// assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
// }
//
// @Test
// public void test_xes_source_3() {
// List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
// XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
// try {
// s.prepare();
// } catch (SourceException e) {
// e.printStackTrace();
// }
// s.getObservable().subscribe((t) -> {
// acts.add(EventUtils.getActivityName(t));
// caseIds.add(EventUtils.getCaseId(t));
// });
//
// assertEquals(5, acts.size());
// assertEquals(5, caseIds.size());
//
// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
// }
//
// @Test
// public void test_mqtt_1() {
// try {
// // create mqtt broker
// BrokerService brokerService = createBroker();
// brokerService.start();
// brokerService.waitUntilStarted();
//
// List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
//
// // create actual source
// MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name");
// source.prepare();
// source.getObservable().subscribe((t) -> {
// acts.add(EventUtils.getActivityName(t));
// caseIds.add(EventUtils.getCaseId(t));
// });
//
// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
// client.connect();
//
// publish(client, "c1", "a11");
// publish(client, "c2", "a21");
// publish(client, "c2", "a22");
// publish(client, "c1", "a12");
// publish(client, "c2", "a23");
//
// Thread.sleep(100);
//
// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
//
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void test_mqtt_2() {
// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
// assertThrowsExactly(SourceException.class, () -> source.prepare());
// }
//
// protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
// client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
// }
//
// protected BrokerService createBroker() throws Exception {
// BrokerService brokerService = new BrokerService();
// brokerService.setDeleteAllMessagesOnStartup(true);
// brokerService.setPersistent(false);
// brokerService.setAdvisorySupport(false);
// brokerService.setUseJmx(true);
// brokerService.getManagementContext().setCreateConnector(false);
// brokerService.setPopulateJMSXUserID(true);
//
// TransportConnector connector = new TransportConnector();
// connector.setUri(new URI("mqtt://localhost:9999"));
// connector.setName("mqtt");
// brokerService.addConnector(connector);
//
// return brokerService;
// }
}
......@@ -48,10 +48,6 @@ public class Utils {
// return Observable.fromArray(events);
// }
public static DataStream<BEvent> generateObservableSameCaseId(ExecutionEnvironment env) {
DataSet<BEvent> ds = CollectionDataSets.getCustomTypeDataSet(env);
}
/*
* c1: <K,A,B,A,C>
* c2: <O,A,I,C>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment