diff --git a/src/main/java/beamline/mappers/package-info.java b/src/main/java/beamline/mappers/package-info.java deleted file mode 100644 index 9e81654575586012e88a585616ee139bbaadb325..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/mappers/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -/** - * This package contains some mappers that are available by default in the - * framework. - */ -package beamline.mappers; \ No newline at end of file diff --git a/src/main/java/beamline/models/algorithms/HookEventProcessing.java b/src/main/java/beamline/models/algorithms/HookEventProcessing.java deleted file mode 100644 index 590620e23622c47b7cbe6f97f6a5bf558c537656..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/models/algorithms/HookEventProcessing.java +++ /dev/null @@ -1,17 +0,0 @@ -package beamline.models.algorithms; - -/** - * This interface defines the structure of the callback function that a - * {@link StreamMiningAlgorithm} can execute (cf., - * {@link StreamMiningAlgorithm#setOnBeforeEvent(HookEventProcessing)} and - * {@link StreamMiningAlgorithm#setOnAfterEvent(HookEventProcessing)}). - * - * @author Andrea Burattin - */ -public interface HookEventProcessing { - - /** - * The actual function to trigger - */ - public void trigger(); -} diff --git a/src/main/java/beamline/sources/MQTTXesSource.java b/src/main/java/beamline/sources/MQTTXesSource.java index a375f64a800dbb0f74314b65fa6c4245e4596e84..ef820e1f88745e31423e08bfe8705c7af84940e8 100644 --- a/src/main/java/beamline/sources/MQTTXesSource.java +++ b/src/main/java/beamline/sources/MQTTXesSource.java @@ -95,7 +95,8 @@ public class MQTTXesSource extends BeamlineAbstractSource { } if (isRunning()) { synchronized (ctx.getCheckpointLock()) { - ctx.collect(buffer.poll()); + BEvent e = buffer.poll(); + ctx.collect(e); } } } diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java index 315bbfe494a5c884194173aadeeaf07f375e7ef0..5a7dd075d7aede1c6ed749167516a900f8c057d8 100644 --- a/src/test/java/beamline/tests/SourcesTest.java +++ b/src/test/java/beamline/tests/SourcesTest.java @@ -2,47 +2,30 @@ package beamline.tests; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasItems; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import java.io.File; import java.net.URI; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Iterator; +import java.nio.file.StandardOpenOption; import java.util.LinkedList; import java.util.List; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.commons.lang3.CharSet; -import org.apache.flink.api.common.serialization.SimpleStringEncoder; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.experimental.CollectSink; -import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.Collector; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.Test; -import com.google.common.io.Files; -import com.opencsv.CSVParserBuilder; - import beamline.events.BEvent; -import beamline.exceptions.SourceException; import beamline.sources.CSVLogSource; import beamline.sources.MQTTXesSource; import beamline.sources.XesLogSource; @@ -187,106 +170,78 @@ public class SourcesTest { assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); } -// @Test -// public void test_mqtt_1() { -// try { -// // create mqtt broker -// BrokerService brokerService = createBroker(); -// brokerService.start(); -// brokerService.waitUntilStarted(); -// -// final List<String> acts = new LinkedList<>(); -// List<String> caseIds = new LinkedList<>(); -// -// MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name"); -// -// // create the sink -// File tmpFile = File.createTempFile("mqtt", "log"); -// StreamingFileSink<BEvent> sink = StreamingFileSink.forRowFormat(Path.fromLocalFile(tmpFile), new SimpleStringEncoder<BEvent>()).build(); -//// val sink: StreamingFileSink[String] = StreamingFileSink -//// .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")) -//// .build() -// -// -// // create actual source -// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -// DataStream<BEvent> stream = env.addSource(s); -// stream.addSink(sink); + @Test + public void test_mqtt_1() { + try { + // create mqtt broker + BrokerService brokerService = createBroker(); + brokerService.start(); + brokerService.waitUntilStarted(); + + MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name"); + + // create the sink file + final File tmpFile = File.createTempFile("mqtt", "log"); + + // create actual source + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .addSource(s) + .keyBy(BEvent::getProcessName) + .addSink(new RichSinkFunction<BEvent>() { + private static final long serialVersionUID = -8658786866403985570L; + + @Override + public void invoke(BEvent value, Context context) throws Exception { + String toWrite = value.getProcessName() + "-" + value.getTraceName() + "-" + value.getEventName() + "/"; + java.nio.file.Files.write(tmpFile.toPath(), toWrite.getBytes(), StandardOpenOption.APPEND); + } + }); // JobClient job = env.executeAsync(); -// -//// Thread.sleep(1000); -// -// System.out.println(tmpFile); -// -// System.out.println("going"); -// MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); -// client.connect(); -// publish(client, "c1", "a11"); -// publish(client, "c2", "a21"); -// publish(client, "c2", "a22"); -// publish(client, "c1", "a12"); -// publish(client, "c2", "a23"); -// -// Thread.sleep(1000); -//// job.cancel(); -// System.out.println(job.getJobStatus().isDone()); -// -// System.out.println(Files.readLines(tmpFile, Charset.defaultCharset())); -//// System.out.println("final " + acts); -//// -//// -//// -//// System.out.println("post-final " + acts); -//// -////// System.out.println("1"); -//// stream.executeAndCollect().forEachRemaining((BEvent e) -> { -//// System.out.println(e); -//// acts.add(e.getEventName()); -//// caseIds.add(e.getTraceName()); -//// }); -//// JobClient job = env.executeAsync(); -//// -//// Thread.sleep(1000); -//// job.cancel(); -//// -//// Thread.sleep(1000); -// -//// assertThat(acts, hasItems("a11","a21","a22","a12","a23")); -//// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); -//// -//// System.out.println("3"); -// -// -// -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -// -//// @Test -//// public void test_mqtt_2() { -//// MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name"); -//// assertThrowsExactly(SourceException.class, () -> source.prepare()); -//// } -// -// protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException { -// client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false); -// } -// -// protected BrokerService createBroker() throws Exception { -// BrokerService brokerService = new BrokerService(); -// brokerService.setDeleteAllMessagesOnStartup(true); -// brokerService.setPersistent(false); -// brokerService.setAdvisorySupport(false); -// brokerService.setUseJmx(true); -// brokerService.getManagementContext().setCreateConnector(false); -// brokerService.setPopulateJMSXUserID(true); -// -// TransportConnector connector = new TransportConnector(); -// connector.setUri(new URI("mqtt://localhost:9999")); -// connector.setName("mqtt"); -// brokerService.addConnector(connector); -// -// return brokerService; -// } + env.executeAsync(); + + Thread.sleep(2000); + + System.out.println("going"); + MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); + client.connect(); + publish(client, "c1", "a11"); + publish(client, "c2", "a21"); + publish(client, "c2", "a22"); + publish(client, "c1", "a12"); + publish(client, "c2", "a23"); + + Thread.sleep(2000); +// job.cancel(); + System.out.println("Done"); + assertEquals( + "name-c1-a11/name-c2-a21/name-c2-a22/name-c1-a12/name-c2-a23/", + org.apache.commons.io.FileUtils.readFileToString(tmpFile, "utf-8")); + + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false); // error + } + } + + protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException { + client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false); + } + + protected BrokerService createBroker() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.setUseJmx(true); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPopulateJMSXUserID(true); + + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI("mqtt://localhost:9999")); + connector.setName("mqtt"); + brokerService.addConnector(connector); + + return brokerService; + } } diff --git a/src/test/java/beamline/tests/Utils.java b/src/test/java/beamline/tests/Utils.java index 746b548c58d609571f61345893244df4b4767a8c..e72b50574eefc8c1dc13c74ac7c8a1f2d73dcc4d 100644 --- a/src/test/java/beamline/tests/Utils.java +++ b/src/test/java/beamline/tests/Utils.java @@ -1,19 +1,12 @@ package beamline.tests; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.streaming.api.datastream.DataStream; import org.deckfour.xes.extension.std.XConceptExtension; import org.deckfour.xes.extension.std.XTimeExtension; import org.deckfour.xes.factory.XFactory; import org.deckfour.xes.factory.XFactoryNaiveImpl; import org.deckfour.xes.model.XLog; import org.deckfour.xes.model.XTrace; -import org.deckfour.xes.model.impl.XAttributeLiteralImpl; -import beamline.events.BEvent; -import beamline.exceptions.EventException; import beamline.models.responses.DirectlyFollowsRelation; public class Utils { diff --git a/src/test/java/beamline/tests/UtilsTest.java b/src/test/java/beamline/tests/UtilsTest.java deleted file mode 100644 index a7fe3dce39daea414861a78254176d4f3dedfa62..0000000000000000000000000000000000000000 --- a/src/test/java/beamline/tests/UtilsTest.java +++ /dev/null @@ -1,102 +0,0 @@ -package beamline.tests; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.sql.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.commons.lang3.tuple.Pair; -import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.extension.std.XTimeExtension; -import org.deckfour.xes.model.XTrace; -import org.junit.jupiter.api.Test; - -import beamline.exceptions.EventException; -import beamline.utils.EventUtils; - -public class UtilsTest { - - @Test - public void test_create_event() { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - XTrace t = null; - try { - t = EventUtils.create(activityName, caseId); - } catch (EventException e) { } - - assertNotNull(t); - assertEquals(XConceptExtension.instance().extractName(t), caseId); - assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName); - } - - @Test - public void test_create_event_time() { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - Date date = Date.valueOf("1996-01-23"); - XTrace t = null; - try { - t = EventUtils.create(activityName, caseId, date); - } catch (EventException e) { } - - assertNotNull(t); - assertEquals(XConceptExtension.instance().extractName(t), caseId); - assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName); - assertEquals(XTimeExtension.instance().extractTimestamp(t.get(0)), date); - } - - @Test - public void test_create_event_attributes() { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - Date date = Date.valueOf("1996-01-23"); - List<Pair<String, String>> attributes = new LinkedList<Pair<String, String>>(); - Map<String, String> values = new HashMap<String, String>(); - for (int i = 0; i < 10; i++) { - String attributeName = "attr-" + i; - String attributeValue = UUID.randomUUID().toString(); - values.put(attributeName, attributeValue); - attributes.add(Pair.of(attributeName, attributeValue)); - } - XTrace t = null; - try { - t = EventUtils.create(activityName, caseId, date, attributes); - } catch (EventException e) { } - - assertNotNull(t); - assertEquals(XConceptExtension.instance().extractName(t), caseId); - assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName); - assertEquals(XTimeExtension.instance().extractTimestamp(t.get(0)), date); - for(String name : t.get(0).getAttributes().keySet()) { - if (name.startsWith("attr-")) { - assertEquals(t.get(0).getAttributes().get(name).toString(), values.get(name)); - } - } - } - - @Test - public void test_no_activityname() { - assertThrows(EventException.class, () -> { - EventUtils.create(null, ""); - }); - assertThrows(EventException.class, () -> { - EventUtils.create("", null); - }); - } - - @Test - public void test_extract_name_case() throws EventException { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - XTrace t = EventUtils.create(activityName, caseId); - assertEquals(activityName, EventUtils.getActivityName(t)); - assertEquals(caseId, EventUtils.getCaseId(t)); - } -}