From c8298f7d4caafc8636b21829e42eac6af122172d Mon Sep 17 00:00:00 2001 From: Andrea Burattin <andrea.burattin@gmail.com> Date: Mon, 21 Mar 2022 22:22:27 +0100 Subject: [PATCH] Completed tests for XesLogSource and CSV, started working on MQTT --- .../sources/BeamlineAbstractSource.java | 2 +- .../java/beamline/sources/MQTTXesSource.java | 31 ++- .../java/beamline/sources/XesLogSource.java | 23 ++- src/test/java/beamline/tests/SourcesTest.java | 194 +++++++++++------- src/test/resources/sources/source_2.xes | 56 +++++ 5 files changed, 210 insertions(+), 96 deletions(-) create mode 100644 src/test/resources/sources/source_2.xes diff --git a/src/main/java/beamline/sources/BeamlineAbstractSource.java b/src/main/java/beamline/sources/BeamlineAbstractSource.java index 24eea49..ed39cb4 100644 --- a/src/main/java/beamline/sources/BeamlineAbstractSource.java +++ b/src/main/java/beamline/sources/BeamlineAbstractSource.java @@ -26,6 +26,6 @@ public abstract class BeamlineAbstractSource extends RichSourceFunction<BEvent> @Override public void cancel() { - running = false; + this.running = false; } } \ No newline at end of file diff --git a/src/main/java/beamline/sources/MQTTXesSource.java b/src/main/java/beamline/sources/MQTTXesSource.java index d19e907..d58c67c 100644 --- a/src/main/java/beamline/sources/MQTTXesSource.java +++ b/src/main/java/beamline/sources/MQTTXesSource.java @@ -37,7 +37,6 @@ public class MQTTXesSource extends BeamlineAbstractSource { private String processName; private String brokerHost; private String topicBase; - private transient IMqttClient myClient; /** * Constructs the source @@ -58,7 +57,8 @@ public class MQTTXesSource extends BeamlineAbstractSource { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setKeepAliveInterval(30); - + + IMqttClient myClient = null; try { myClient = new MqttClient(brokerHost, UUID.randomUUID().toString()); myClient.setCallback(new MqttCallback() { @@ -90,23 +90,32 @@ public class MQTTXesSource extends BeamlineAbstractSource { } while(isRunning()) { - while (buffer.isEmpty()) { + while (isRunning() && buffer.isEmpty()) { + System.out.println("sleeping " + isRunning()); Thread.sleep(100l); } - ctx.collect(buffer.poll()); + if (isRunning()) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(buffer.poll()); + } + } } - } - - @Override - public void cancel() { - super.cancel(); - if (myClient != null && myClient.isConnected()) { + System.out.println("aaa"); + + if (!isRunning() && myClient.isConnected()) { try { myClient.disconnect(); } catch (MqttException e) { // nothing to do here } } + System.err.println("done"); + } + + @Override + public void cancel() { + // TODO Auto-generated method stub + super.cancel(); + System.out.println("closing"); } - } diff --git a/src/main/java/beamline/sources/XesLogSource.java b/src/main/java/beamline/sources/XesLogSource.java index b155f14..7af0475 100644 --- a/src/main/java/beamline/sources/XesLogSource.java +++ b/src/main/java/beamline/sources/XesLogSource.java @@ -1,6 +1,8 @@ package beamline.sources; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -18,6 +20,7 @@ import org.deckfour.xes.model.XAttribute; import org.deckfour.xes.model.XEvent; import org.deckfour.xes.model.XLog; import org.deckfour.xes.model.XTrace; +import org.deckfour.xes.out.XesXmlGZIPSerializer; import beamline.events.BEvent; import beamline.exceptions.EventException; @@ -35,7 +38,6 @@ public class XesLogSource extends BeamlineAbstractSource { private static final long serialVersionUID = 1095855454671335981L; private String fileName; - private transient XLog log; private List<BEvent> events; /** @@ -54,18 +56,18 @@ public class XesLogSource extends BeamlineAbstractSource { * Constructs a source from the given log * * @param log the log to use as source + * @throws IOException */ - public XesLogSource(XLog log) { - this.log = log; + public XesLogSource(XLog log) throws IOException { + File tmpFile = File.createTempFile("file", ".xes.gz"); + new XesXmlGZIPSerializer().serialize(log, new FileOutputStream(tmpFile)); + this.fileName = tmpFile.getAbsolutePath(); } @Override public void run(SourceContext<BEvent> ctx) throws Exception { - if (log == null) { - parseLog(fileName); - } if (events == null) { - prepareStream(); + prepareStream(parseLog(fileName)); } Iterator<BEvent> i = events.iterator(); while(i.hasNext() && isRunning()) { @@ -82,7 +84,7 @@ public class XesLogSource extends BeamlineAbstractSource { } } - private void parseLog(String fileName) throws SourceException { + private XLog parseLog(String fileName) throws SourceException { XParser[] parsers = new XParser[] { new XesXmlGZIPParser(), new XesXmlParser(), @@ -92,17 +94,16 @@ public class XesLogSource extends BeamlineAbstractSource { for (XParser p : parsers) { if (p.canParse(file)) { try { - log = p.parse(file).get(0); + return p.parse(file).get(0); } catch (Exception e) { throw new SourceException(e.getMessage()); } - return; } } throw new SourceException("XES file format not supported"); } - private void prepareStream() throws SourceException, EventException { + private void prepareStream(XLog log) throws SourceException, EventException { if (log.isEmpty()) { throw new SourceException("The given log is empty"); } diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java index 5f131d1..211761e 100644 --- a/src/test/java/beamline/tests/SourcesTest.java +++ b/src/test/java/beamline/tests/SourcesTest.java @@ -91,62 +91,90 @@ public class SourcesTest { }); } -// @Test -// public void test_xes_source_1() { -// XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes"); -// assertThrowsExactly(SourceException.class, () -> s1.prepare()); -// -// XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes"); -// assertThrowsExactly(SourceException.class, () -> s2.prepare()); -// -// XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv"); -// assertThrowsExactly(SourceException.class, () -> s3.prepare()); -// } -// -// @Test -// public void test_xes_source_2() { -// List<String> acts = new LinkedList<>(); -// List<String> caseIds = new LinkedList<>(); -// XesLogSource s = new XesLogSource(Utils.generteXLog()); -// try { -// s.prepare(); -// } catch (SourceException e) { -// e.printStackTrace(); -// } -// s.getObservable().subscribe((t) -> { -// acts.add(EventUtils.getActivityName(t)); -// caseIds.add(EventUtils.getCaseId(t)); -// }); -// -// assertEquals(9, acts.size()); -// assertEquals(9, caseIds.size()); -// -// assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C")); -// assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1")); -// } -// -// @Test -// public void test_xes_source_3() { -// List<String> acts = new LinkedList<>(); -// List<String> caseIds = new LinkedList<>(); -// XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz"); -// try { -// s.prepare(); -// } catch (SourceException e) { -// e.printStackTrace(); -// } -// s.getObservable().subscribe((t) -> { -// acts.add(EventUtils.getActivityName(t)); -// caseIds.add(EventUtils.getCaseId(t)); -// }); -// -// assertEquals(5, acts.size()); -// assertEquals(5, caseIds.size()); -// -// assertThat(acts, hasItems("a11","a21","a22","a12","a23")); -// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); -// } -// + @Test + public void test_xes_source_1() { + XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes"); + assertThrowsExactly(JobExecutionException.class, () -> { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(s1).map(e -> e).print(); + env.execute(); + }); + + XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes"); + assertThrowsExactly(JobExecutionException.class, () -> { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(s2).map(e -> e).print(); + env.execute(); + }); + + XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv"); + assertThrowsExactly(JobExecutionException.class, () -> { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(s3).map(e -> e).print(); + env.execute(); + }); + } + + @Test + public void test_xes_source_2() throws Exception { + List<String> acts = new LinkedList<>(); + List<String> caseIds = new LinkedList<>(); + XesLogSource source = new XesLogSource(Utils.generteXLog()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(source); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); + }); + + assertEquals(9, acts.size()); + assertEquals(9, caseIds.size()); + + assertThat(acts, hasItems("K","C","A","I","B","O","A","A","C")); + assertThat(caseIds, hasItems("c1","c2","c1","c2","c1","c2","c1","c2","c1")); + } + + @Test + public void test_xes_source_3() throws Exception { + List<String> acts = new LinkedList<>(); + List<String> caseIds = new LinkedList<>(); + XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(s); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); + }); + + assertEquals(5, acts.size()); + assertEquals(5, caseIds.size()); + + assertThat(acts, hasItems("a11","a21","a22","a12","a23")); + assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); + } + + @Test + public void test_xes_source_4() throws Exception { + List<String> acts = new LinkedList<>(); + List<String> caseIds = new LinkedList<>(); + XesLogSource s = new XesLogSource("src/test/resources/sources/source_2.xes"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(s); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); + }); + + assertEquals(5, acts.size()); + assertEquals(5, caseIds.size()); + + assertThat(acts, hasItems("a11","a21","a22","a12","a23")); + assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); + } + // @Test // public void test_mqtt_1() { // try { @@ -158,24 +186,44 @@ public class SourcesTest { // List<String> acts = new LinkedList<>(); // List<String> caseIds = new LinkedList<>(); // +// MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name"); +// +// new Thread(new Runnable() { +// +// @Override +// public void run() { +// try { +// Thread.sleep(5000); +// System.out.println("going..."); +// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); +// client.connect(); +// +// publish(client, "c1", "a11"); +// publish(client, "c2", "a21"); +// publish(client, "c2", "a22"); +// publish(client, "c1", "a12"); +// publish(client, "c2", "a23"); +// s.cancel(); +// +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } catch (MqttException e) { +// e.printStackTrace(); +// } +// } +// }).start(); +// // // create actual source -// MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name"); -// source.prepare(); -// source.getObservable().subscribe((t) -> { -// acts.add(EventUtils.getActivityName(t)); -// caseIds.add(EventUtils.getCaseId(t)); -// }); -// -// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); -// client.connect(); +// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// DataStream<BEvent> stream = env.addSource(s); // -// publish(client, "c1", "a11"); -// publish(client, "c2", "a21"); -// publish(client, "c2", "a22"); -// publish(client, "c1", "a12"); -// publish(client, "c2", "a23"); +// stream.executeAndCollect().forEachRemaining((BEvent e) -> { +// System.out.println(e); +// acts.add(e.getEventName()); +// caseIds.add(e.getTraceName()); +// }); // -// Thread.sleep(100); +// System.out.println("3"); // // assertThat(acts, hasItems("a11","a21","a22","a12","a23")); // assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); @@ -184,13 +232,13 @@ public class SourcesTest { // e.printStackTrace(); // } // } -// + // @Test // public void test_mqtt_2() { // MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name"); // assertThrowsExactly(SourceException.class, () -> source.prepare()); // } -// + // protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException { // client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false); // } diff --git a/src/test/resources/sources/source_2.xes b/src/test/resources/sources/source_2.xes new file mode 100644 index 0000000..7e04dcb --- /dev/null +++ b/src/test/resources/sources/source_2.xes @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<log xes.version="1849.2016" xmlns="http://www.xes-standard.org" xes.creator="Fluxicon Disco"> + <extension name="Concept" prefix="concept" uri="http://www.xes-standard.org/concept.xesext"/> + <extension name="Lifecycle" prefix="lifecycle" uri="http://www.xes-standard.org/lifecycle.xesext"/> + <global scope="trace"> + <string key="concept:name" value="name"/> + <string key="variant" value="string"/> + <int key="variant-index" value="0"/> + </global> + <global scope="event"> + <string key="concept:name" value="name"/> + <string key="lifecycle:transition" value="transition"/> + <string key="act" value="string"/> + </global> + <classifier name="Activity" keys="act"/> + <string key="lifecycle:model" value="standard"/> + <string key="creator" value="Fluxicon Disco"/> + <string key="library" value="Fluxicon Octane"/> + <trace> + <string key="concept:name" value="c1"/> + <string key="variant" value="Variant 1"/> + <int key="variant-index" value="1"/> + <string key="creator" value="Fluxicon Disco"/> + <event> + <string key="concept:name" value="a11"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a11"/> + </event> + <event> + <string key="concept:name" value="a12"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a12"/> + </event> + </trace> + <trace> + <string key="concept:name" value="c2"/> + <string key="variant" value="Variant 2"/> + <int key="variant-index" value="2"/> + <string key="creator" value="Fluxicon Disco"/> + <event> + <string key="concept:name" value="a21"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a21"/> + </event> + <event> + <string key="concept:name" value="a22"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a22"/> + </event> + <event> + <string key="concept:name" value="a23"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a23"/> + </event> + </trace> +</log> \ No newline at end of file -- GitLab