From 9ba07004849940be7c51745688e6e81c6d0e3024 Mon Sep 17 00:00:00 2001 From: Andrea Burattin <andrea.burattin@gmail.com> Date: Tue, 28 Dec 2021 16:22:22 +0100 Subject: [PATCH] Removed old files --- .../models/streams/ObservableStream.java | 11 -- .../models/streams/XesMqttSource.java | 111 ------------------ .../beamline/models/streams/XesSource.java | 7 -- .../models/streams/XesStreamSource.java | 99 ---------------- 4 files changed, 228 deletions(-) delete mode 100644 src/main/java/beamline/models/streams/ObservableStream.java delete mode 100644 src/main/java/beamline/models/streams/XesMqttSource.java delete mode 100644 src/main/java/beamline/models/streams/XesSource.java delete mode 100644 src/main/java/beamline/models/streams/XesStreamSource.java diff --git a/src/main/java/beamline/models/streams/ObservableStream.java b/src/main/java/beamline/models/streams/ObservableStream.java deleted file mode 100644 index cc2d84d..0000000 --- a/src/main/java/beamline/models/streams/ObservableStream.java +++ /dev/null @@ -1,11 +0,0 @@ -package beamline.models.streams; - -import java.util.Iterator; -import java.util.stream.Stream; - -public interface ObservableStream<T> extends Iterable<T> { - - public abstract void prepare() throws Exception; - - public abstract Stream<T> stream(); -} diff --git a/src/main/java/beamline/models/streams/XesMqttSource.java b/src/main/java/beamline/models/streams/XesMqttSource.java deleted file mode 100644 index a6385d6..0000000 --- a/src/main/java/beamline/models/streams/XesMqttSource.java +++ /dev/null @@ -1,111 +0,0 @@ -package beamline.models.streams; - -import java.util.Date; -import java.util.Iterator; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.function.Supplier; -import java.util.stream.Stream; - -import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.extension.std.XTimeExtension; -import org.deckfour.xes.factory.XFactory; -import org.deckfour.xes.factory.XFactoryNaiveImpl; -import org.deckfour.xes.model.XEvent; -import org.deckfour.xes.model.XTrace; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -public class XesMqttSource implements XesSource { - - private static XFactory xesFactory = new XFactoryNaiveImpl(); - - private String processName; - private String brokerHost; - private String topicBase; - private BlockingQueue<XTrace> queue; - private Stream<XTrace> stream; - - public XesMqttSource(String brokerHost, String topicBase, String processName) { - this.brokerHost = brokerHost; - this.topicBase = topicBase; - this.processName = processName; - this.stream = Stream.generate(new Supplier<XTrace>() { - @Override - public XTrace get() { - try { - return queue.take(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return null; - } - }); - } - - @Override - public Stream<XTrace> stream() { - return stream; - } - - public void prepare() throws Exception { - this.queue = new ArrayBlockingQueue<XTrace>(1000); - - MqttConnectOptions options = new MqttConnectOptions(); - options.setCleanSession(true); - options.setKeepAliveInterval(30); - - IMqttClient myClient = new MqttClient(brokerHost, UUID.randomUUID().toString()); - myClient.setCallback(new MqttCallback() { - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - int posLastSlash = topic.lastIndexOf("/"); - String partBeforeActName = topic.substring(0, posLastSlash); - String activityName = topic.substring(posLastSlash + 1); - String caseId = partBeforeActName.substring(partBeforeActName.lastIndexOf("/") + 1); - - XEvent event = xesFactory.createEvent(); - XConceptExtension.instance().assignName(event, activityName); - XTimeExtension.instance().assignTimestamp(event, new Date()); - XTrace eventWrapper = xesFactory.createTrace(); - XConceptExtension.instance().assignName(eventWrapper, caseId); - eventWrapper.add(event); - - queue.add(eventWrapper); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { } - - @Override - public void connectionLost(Throwable cause) { } - }); - myClient.connect(options); - myClient.subscribe(topicBase + "/" + processName + "/#"); - } - - public Iterator<XTrace> iterator() { - return new Iterator<XTrace>() { - @Override - public boolean hasNext() { - return true; - } - - @Override - public XTrace next() { - try { - return queue.take(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return null; - } - }; - } -} diff --git a/src/main/java/beamline/models/streams/XesSource.java b/src/main/java/beamline/models/streams/XesSource.java deleted file mode 100644 index 8f9cd6b..0000000 --- a/src/main/java/beamline/models/streams/XesSource.java +++ /dev/null @@ -1,7 +0,0 @@ -package beamline.models.streams; - -import org.deckfour.xes.model.XTrace; - -public interface XesSource extends ObservableStream<XTrace> { - -} diff --git a/src/main/java/beamline/models/streams/XesStreamSource.java b/src/main/java/beamline/models/streams/XesStreamSource.java deleted file mode 100644 index 4eda957..0000000 --- a/src/main/java/beamline/models/streams/XesStreamSource.java +++ /dev/null @@ -1,99 +0,0 @@ -package beamline.models.streams; - -import java.io.File; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.stream.Stream; - -import org.deckfour.xes.extension.std.XTimeExtension; -import org.deckfour.xes.factory.XFactory; -import org.deckfour.xes.factory.XFactoryNaiveImpl; -import org.deckfour.xes.in.XParser; -import org.deckfour.xes.in.XesXmlGZIPParser; -import org.deckfour.xes.in.XesXmlParser; -import org.deckfour.xes.model.XAttributeMap; -import org.deckfour.xes.model.XEvent; -import org.deckfour.xes.model.XLog; -import org.deckfour.xes.model.XTrace; - -public class XesStreamSource implements XesSource { - - private static XFactory xesFactory = new XFactoryNaiveImpl(); - - private String fileName; - private XLog log; - private List<XTrace> events; - - public XesStreamSource(String fileName) { - this.fileName = fileName; - } - - public void prepare() throws Exception { - parseLog(fileName); - prepareStream(); - } - - public Stream<XTrace> stream() { - return events.stream(); - } - - public Iterator<XTrace> iterator() { - return events.iterator(); - } - - private void parseLog(String fileName) throws Exception { - XParser[] parsers = new XParser[] { new XesXmlGZIPParser(), new XesXmlParser() }; - File file = new File(fileName); - for (XParser p : parsers) { - if (p.canParse(file)) { - log = p.parse(file).get(0); - return; - } - } - throw new Exception("XES file format not supported"); - } - - private void prepareStream() { - if (log == null) { - return; - } - // populate all events - events = new LinkedList<XTrace>(); - for (XTrace t : log) { - for (XEvent e : t) { - // create the wrapping trace - XTrace eventWrapper = xesFactory.createTrace(); - XAttributeMap am = t.getAttributes(); - for (String key : am.keySet()) { - eventWrapper.getAttributes().put(key, am.get(key)); - } - // create the actual event - XEvent newEvent = xesFactory.createEvent(); - XAttributeMap amEvent = e.getAttributes(); - for (String key : amEvent.keySet()) { - newEvent.getAttributes().put(key, amEvent.get(key)); - } - eventWrapper.add(newEvent); - events.add(eventWrapper); - } - } - - // sort events - Collections.sort(events, new Comparator<XTrace>() { - public int compare(XTrace o1, XTrace o2) { - XEvent e1 = o1.get(0); - XEvent e2 = o2.get(0); - Date d1 = XTimeExtension.instance().extractTimestamp(e1); - Date d2 = XTimeExtension.instance().extractTimestamp(e2); - if (d1 == null || d2 == null) { - return 0; - } - return d1.compareTo(d2); - } - }); - } -} -- GitLab