Skip to content
Snippets Groups Projects
Commit f1cbd234 authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Added tests on source

parent 8b896312
Branches
Tags
No related merge requests found
/**
* This package contains some mappers that are available by default in the
* framework.
*/
package beamline.mappers;
\ No newline at end of file
package beamline.models.algorithms;
/**
* This interface defines the structure of the callback function that a
* {@link StreamMiningAlgorithm} can execute (cf.,
* {@link StreamMiningAlgorithm#setOnBeforeEvent(HookEventProcessing)} and
* {@link StreamMiningAlgorithm#setOnAfterEvent(HookEventProcessing)}).
*
* @author Andrea Burattin
*/
public interface HookEventProcessing {
/**
* The actual function to trigger
*/
public void trigger();
}
......@@ -95,7 +95,8 @@ public class MQTTXesSource extends BeamlineAbstractSource {
}
if (isRunning()) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(buffer.poll());
BEvent e = buffer.poll();
ctx.collect(e);
}
}
}
......
......@@ -2,47 +2,30 @@ package beamline.tests;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import java.io.File;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.nio.file.StandardOpenOption;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.lang3.CharSet;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.experimental.CollectSink;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;
import com.google.common.io.Files;
import com.opencsv.CSVParserBuilder;
import beamline.events.BEvent;
import beamline.exceptions.SourceException;
import beamline.sources.CSVLogSource;
import beamline.sources.MQTTXesSource;
import beamline.sources.XesLogSource;
......@@ -187,106 +170,78 @@ public class SourcesTest {
assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
}
// @Test
// public void test_mqtt_1() {
// try {
// // create mqtt broker
// BrokerService brokerService = createBroker();
// brokerService.start();
// brokerService.waitUntilStarted();
//
// final List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
//
// MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name");
//
// // create the sink
// File tmpFile = File.createTempFile("mqtt", "log");
// StreamingFileSink<BEvent> sink = StreamingFileSink.forRowFormat(Path.fromLocalFile(tmpFile), new SimpleStringEncoder<BEvent>()).build();
//// val sink: StreamingFileSink[String] = StreamingFileSink
//// .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
//// .build()
//
//
// // create actual source
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream<BEvent> stream = env.addSource(s);
// stream.addSink(sink);
@Test
public void test_mqtt_1() {
try {
// create mqtt broker
BrokerService brokerService = createBroker();
brokerService.start();
brokerService.waitUntilStarted();
MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name");
// create the sink file
final File tmpFile = File.createTempFile("mqtt", "log");
// create actual source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(s)
.keyBy(BEvent::getProcessName)
.addSink(new RichSinkFunction<BEvent>() {
private static final long serialVersionUID = -8658786866403985570L;
@Override
public void invoke(BEvent value, Context context) throws Exception {
String toWrite = value.getProcessName() + "-" + value.getTraceName() + "-" + value.getEventName() + "/";
java.nio.file.Files.write(tmpFile.toPath(), toWrite.getBytes(), StandardOpenOption.APPEND);
}
});
// JobClient job = env.executeAsync();
//
//// Thread.sleep(1000);
//
// System.out.println(tmpFile);
//
// System.out.println("going");
// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
// client.connect();
// publish(client, "c1", "a11");
// publish(client, "c2", "a21");
// publish(client, "c2", "a22");
// publish(client, "c1", "a12");
// publish(client, "c2", "a23");
//
// Thread.sleep(1000);
//// job.cancel();
// System.out.println(job.getJobStatus().isDone());
//
// System.out.println(Files.readLines(tmpFile, Charset.defaultCharset()));
//// System.out.println("final " + acts);
////
////
////
//// System.out.println("post-final " + acts);
////
////// System.out.println("1");
//// stream.executeAndCollect().forEachRemaining((BEvent e) -> {
//// System.out.println(e);
//// acts.add(e.getEventName());
//// caseIds.add(e.getTraceName());
//// });
//// JobClient job = env.executeAsync();
////
//// Thread.sleep(1000);
//// job.cancel();
////
//// Thread.sleep(1000);
//
//// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
//// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
////
//// System.out.println("3");
//
//
//
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//
//// @Test
//// public void test_mqtt_2() {
//// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
//// assertThrowsExactly(SourceException.class, () -> source.prepare());
//// }
//
// protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
// client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
// }
//
// protected BrokerService createBroker() throws Exception {
// BrokerService brokerService = new BrokerService();
// brokerService.setDeleteAllMessagesOnStartup(true);
// brokerService.setPersistent(false);
// brokerService.setAdvisorySupport(false);
// brokerService.setUseJmx(true);
// brokerService.getManagementContext().setCreateConnector(false);
// brokerService.setPopulateJMSXUserID(true);
//
// TransportConnector connector = new TransportConnector();
// connector.setUri(new URI("mqtt://localhost:9999"));
// connector.setName("mqtt");
// brokerService.addConnector(connector);
//
// return brokerService;
// }
env.executeAsync();
Thread.sleep(2000);
System.out.println("going");
MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
client.connect();
publish(client, "c1", "a11");
publish(client, "c2", "a21");
publish(client, "c2", "a22");
publish(client, "c1", "a12");
publish(client, "c2", "a23");
Thread.sleep(2000);
// job.cancel();
System.out.println("Done");
assertEquals(
"name-c1-a11/name-c2-a21/name-c2-a22/name-c1-a12/name-c2-a23/",
org.apache.commons.io.FileUtils.readFileToString(tmpFile, "utf-8"));
} catch (Exception e) {
e.printStackTrace();
assertTrue(false); // error
}
}
protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
}
protected BrokerService createBroker() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.setPopulateJMSXUserID(true);
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("mqtt://localhost:9999"));
connector.setName("mqtt");
brokerService.addConnector(connector);
return brokerService;
}
}
package beamline.tests;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.factory.XFactory;
import org.deckfour.xes.factory.XFactoryNaiveImpl;
import org.deckfour.xes.model.XLog;
import org.deckfour.xes.model.XTrace;
import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
import beamline.events.BEvent;
import beamline.exceptions.EventException;
import beamline.models.responses.DirectlyFollowsRelation;
public class Utils {
......
package beamline.tests;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.sql.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.model.XTrace;
import org.junit.jupiter.api.Test;
import beamline.exceptions.EventException;
import beamline.utils.EventUtils;
public class UtilsTest {
@Test
public void test_create_event() {
String activityName = UUID.randomUUID().toString();
String caseId = UUID.randomUUID().toString();
XTrace t = null;
try {
t = EventUtils.create(activityName, caseId);
} catch (EventException e) { }
assertNotNull(t);
assertEquals(XConceptExtension.instance().extractName(t), caseId);
assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName);
}
@Test
public void test_create_event_time() {
String activityName = UUID.randomUUID().toString();
String caseId = UUID.randomUUID().toString();
Date date = Date.valueOf("1996-01-23");
XTrace t = null;
try {
t = EventUtils.create(activityName, caseId, date);
} catch (EventException e) { }
assertNotNull(t);
assertEquals(XConceptExtension.instance().extractName(t), caseId);
assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName);
assertEquals(XTimeExtension.instance().extractTimestamp(t.get(0)), date);
}
@Test
public void test_create_event_attributes() {
String activityName = UUID.randomUUID().toString();
String caseId = UUID.randomUUID().toString();
Date date = Date.valueOf("1996-01-23");
List<Pair<String, String>> attributes = new LinkedList<Pair<String, String>>();
Map<String, String> values = new HashMap<String, String>();
for (int i = 0; i < 10; i++) {
String attributeName = "attr-" + i;
String attributeValue = UUID.randomUUID().toString();
values.put(attributeName, attributeValue);
attributes.add(Pair.of(attributeName, attributeValue));
}
XTrace t = null;
try {
t = EventUtils.create(activityName, caseId, date, attributes);
} catch (EventException e) { }
assertNotNull(t);
assertEquals(XConceptExtension.instance().extractName(t), caseId);
assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName);
assertEquals(XTimeExtension.instance().extractTimestamp(t.get(0)), date);
for(String name : t.get(0).getAttributes().keySet()) {
if (name.startsWith("attr-")) {
assertEquals(t.get(0).getAttributes().get(name).toString(), values.get(name));
}
}
}
@Test
public void test_no_activityname() {
assertThrows(EventException.class, () -> {
EventUtils.create(null, "");
});
assertThrows(EventException.class, () -> {
EventUtils.create("", null);
});
}
@Test
public void test_extract_name_case() throws EventException {
String activityName = UUID.randomUUID().toString();
String caseId = UUID.randomUUID().toString();
XTrace t = EventUtils.create(activityName, caseId);
assertEquals(activityName, EventUtils.getActivityName(t));
assertEquals(caseId, EventUtils.getCaseId(t));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment