Skip to content
Snippets Groups Projects
Commit c8298f7d authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Completed tests for XesLogSource and CSV, started working on MQTT

parent 1ef9c2da
No related branches found
No related tags found
No related merge requests found
......@@ -26,6 +26,6 @@ public abstract class BeamlineAbstractSource extends RichSourceFunction<BEvent>
@Override
public void cancel() {
running = false;
this.running = false;
}
}
\ No newline at end of file
......@@ -37,7 +37,6 @@ public class MQTTXesSource extends BeamlineAbstractSource {
private String processName;
private String brokerHost;
private String topicBase;
private transient IMqttClient myClient;
/**
* Constructs the source
......@@ -59,6 +58,7 @@ public class MQTTXesSource extends BeamlineAbstractSource {
options.setCleanSession(true);
options.setKeepAliveInterval(30);
IMqttClient myClient = null;
try {
myClient = new MqttClient(brokerHost, UUID.randomUUID().toString());
myClient.setCallback(new MqttCallback() {
......@@ -90,23 +90,32 @@ public class MQTTXesSource extends BeamlineAbstractSource {
}
while(isRunning()) {
while (buffer.isEmpty()) {
while (isRunning() && buffer.isEmpty()) {
System.out.println("sleeping " + isRunning());
Thread.sleep(100l);
}
if (isRunning()) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(buffer.poll());
}
}
}
System.out.println("aaa");
@Override
public void cancel() {
super.cancel();
if (myClient != null && myClient.isConnected()) {
if (!isRunning() && myClient.isConnected()) {
try {
myClient.disconnect();
} catch (MqttException e) {
// nothing to do here
}
}
System.err.println("done");
}
@Override
public void cancel() {
// TODO Auto-generated method stub
super.cancel();
System.out.println("closing");
}
}
package beamline.sources;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
......@@ -18,6 +20,7 @@ import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XLog;
import org.deckfour.xes.model.XTrace;
import org.deckfour.xes.out.XesXmlGZIPSerializer;
import beamline.events.BEvent;
import beamline.exceptions.EventException;
......@@ -35,7 +38,6 @@ public class XesLogSource extends BeamlineAbstractSource {
private static final long serialVersionUID = 1095855454671335981L;
private String fileName;
private transient XLog log;
private List<BEvent> events;
/**
......@@ -54,18 +56,18 @@ public class XesLogSource extends BeamlineAbstractSource {
* Constructs a source from the given log
*
* @param log the log to use as source
* @throws IOException
*/
public XesLogSource(XLog log) {
this.log = log;
public XesLogSource(XLog log) throws IOException {
File tmpFile = File.createTempFile("file", ".xes.gz");
new XesXmlGZIPSerializer().serialize(log, new FileOutputStream(tmpFile));
this.fileName = tmpFile.getAbsolutePath();
}
@Override
public void run(SourceContext<BEvent> ctx) throws Exception {
if (log == null) {
parseLog(fileName);
}
if (events == null) {
prepareStream();
prepareStream(parseLog(fileName));
}
Iterator<BEvent> i = events.iterator();
while(i.hasNext() && isRunning()) {
......@@ -82,7 +84,7 @@ public class XesLogSource extends BeamlineAbstractSource {
}
}
private void parseLog(String fileName) throws SourceException {
private XLog parseLog(String fileName) throws SourceException {
XParser[] parsers = new XParser[] {
new XesXmlGZIPParser(),
new XesXmlParser(),
......@@ -92,17 +94,16 @@ public class XesLogSource extends BeamlineAbstractSource {
for (XParser p : parsers) {
if (p.canParse(file)) {
try {
log = p.parse(file).get(0);
return p.parse(file).get(0);
} catch (Exception e) {
throw new SourceException(e.getMessage());
}
return;
}
}
throw new SourceException("XES file format not supported");
}
private void prepareStream() throws SourceException, EventException {
private void prepareStream(XLog log) throws SourceException, EventException {
if (log.isEmpty()) {
throw new SourceException("The given log is empty");
}
......
......@@ -91,62 +91,90 @@ public class SourcesTest {
});
}
// @Test
// public void test_xes_source_1() {
// XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
// assertThrowsExactly(SourceException.class, () -> s1.prepare());
//
// XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
// assertThrowsExactly(SourceException.class, () -> s2.prepare());
//
// XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
// assertThrowsExactly(SourceException.class, () -> s3.prepare());
// }
//
// @Test
// public void test_xes_source_2() {
// List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
// XesLogSource s = new XesLogSource(Utils.generteXLog());
// try {
// s.prepare();
// } catch (SourceException e) {
// e.printStackTrace();
// }
// s.getObservable().subscribe((t) -> {
// acts.add(EventUtils.getActivityName(t));
// caseIds.add(EventUtils.getCaseId(t));
// });
//
// assertEquals(9, acts.size());
// assertEquals(9, caseIds.size());
//
// assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
// assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
// }
//
// @Test
// public void test_xes_source_3() {
// List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
// XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
// try {
// s.prepare();
// } catch (SourceException e) {
// e.printStackTrace();
// }
// s.getObservable().subscribe((t) -> {
// acts.add(EventUtils.getActivityName(t));
// caseIds.add(EventUtils.getCaseId(t));
// });
//
// assertEquals(5, acts.size());
// assertEquals(5, caseIds.size());
//
// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
// }
//
@Test
public void test_xes_source_1() {
XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes");
assertThrowsExactly(JobExecutionException.class, () -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(s1).map(e -> e).print();
env.execute();
});
XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes");
assertThrowsExactly(JobExecutionException.class, () -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(s2).map(e -> e).print();
env.execute();
});
XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv");
assertThrowsExactly(JobExecutionException.class, () -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(s3).map(e -> e).print();
env.execute();
});
}
@Test
public void test_xes_source_2() throws Exception {
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
XesLogSource source = new XesLogSource(Utils.generteXLog());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<BEvent> stream = env.addSource(source);
stream.executeAndCollect().forEachRemaining((BEvent e) -> {
acts.add(e.getEventName());
caseIds.add(e.getTraceName());
});
assertEquals(9, acts.size());
assertEquals(9, caseIds.size());
assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C"));
assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1"));
}
@Test
public void test_xes_source_3() throws Exception {
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<BEvent> stream = env.addSource(s);
stream.executeAndCollect().forEachRemaining((BEvent e) -> {
acts.add(e.getEventName());
caseIds.add(e.getTraceName());
});
assertEquals(5, acts.size());
assertEquals(5, caseIds.size());
assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
}
@Test
public void test_xes_source_4() throws Exception {
List<String> acts = new LinkedList<>();
List<String> caseIds = new LinkedList<>();
XesLogSource s = new XesLogSource("src/test/resources/sources/source_2.xes");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<BEvent> stream = env.addSource(s);
stream.executeAndCollect().forEachRemaining((BEvent e) -> {
acts.add(e.getEventName());
caseIds.add(e.getTraceName());
});
assertEquals(5, acts.size());
assertEquals(5, caseIds.size());
assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
}
// @Test
// public void test_mqtt_1() {
// try {
......@@ -158,14 +186,15 @@ public class SourcesTest {
// List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
//
// // create actual source
// MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name");
// source.prepare();
// source.getObservable().subscribe((t) -> {
// acts.add(EventUtils.getActivityName(t));
// caseIds.add(EventUtils.getCaseId(t));
// });
// MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name");
//
// new Thread(new Runnable() {
//
// @Override
// public void run() {
// try {
// Thread.sleep(5000);
// System.out.println("going...");
// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
// client.connect();
//
......@@ -174,8 +203,27 @@ public class SourcesTest {
// publish(client, "c2", "a22");
// publish(client, "c1", "a12");
// publish(client, "c2", "a23");
// s.cancel();
//
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// } catch (MqttException e) {
// e.printStackTrace();
// }
// }
// }).start();
//
// // create actual source
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream<BEvent> stream = env.addSource(s);
//
// stream.executeAndCollect().forEachRemaining((BEvent e) -> {
// System.out.println(e);
// acts.add(e.getEventName());
// caseIds.add(e.getTraceName());
// });
//
// System.out.println("3");
//
// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
......@@ -184,13 +232,13 @@ public class SourcesTest {
// e.printStackTrace();
// }
// }
//
// @Test
// public void test_mqtt_2() {
// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
// assertThrowsExactly(SourceException.class, () -> source.prepare());
// }
//
// protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
// client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
// }
......
<?xml version="1.0" encoding="UTF-8"?>
<log xes.version="1849.2016" xmlns="http://www.xes-standard.org" xes.creator="Fluxicon Disco">
<extension name="Concept" prefix="concept" uri="http://www.xes-standard.org/concept.xesext"/>
<extension name="Lifecycle" prefix="lifecycle" uri="http://www.xes-standard.org/lifecycle.xesext"/>
<global scope="trace">
<string key="concept:name" value="name"/>
<string key="variant" value="string"/>
<int key="variant-index" value="0"/>
</global>
<global scope="event">
<string key="concept:name" value="name"/>
<string key="lifecycle:transition" value="transition"/>
<string key="act" value="string"/>
</global>
<classifier name="Activity" keys="act"/>
<string key="lifecycle:model" value="standard"/>
<string key="creator" value="Fluxicon Disco"/>
<string key="library" value="Fluxicon Octane"/>
<trace>
<string key="concept:name" value="c1"/>
<string key="variant" value="Variant 1"/>
<int key="variant-index" value="1"/>
<string key="creator" value="Fluxicon Disco"/>
<event>
<string key="concept:name" value="a11"/>
<string key="lifecycle:transition" value="complete"/>
<string key="act" value="a11"/>
</event>
<event>
<string key="concept:name" value="a12"/>
<string key="lifecycle:transition" value="complete"/>
<string key="act" value="a12"/>
</event>
</trace>
<trace>
<string key="concept:name" value="c2"/>
<string key="variant" value="Variant 2"/>
<int key="variant-index" value="2"/>
<string key="creator" value="Fluxicon Disco"/>
<event>
<string key="concept:name" value="a21"/>
<string key="lifecycle:transition" value="complete"/>
<string key="act" value="a21"/>
</event>
<event>
<string key="concept:name" value="a22"/>
<string key="lifecycle:transition" value="complete"/>
<string key="act" value="a22"/>
</event>
<event>
<string key="concept:name" value="a23"/>
<string key="lifecycle:transition" value="complete"/>
<string key="act" value="a23"/>
</event>
</trace>
</log>
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment