Skip to content
Snippets Groups Projects
Commit 9ba07004 authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Removed old files

parent 9891980f
No related branches found
No related tags found
No related merge requests found
package beamline.models.streams;
import java.util.Iterator;
import java.util.stream.Stream;
public interface ObservableStream<T> extends Iterable<T> {
public abstract void prepare() throws Exception;
public abstract Stream<T> stream();
}
package beamline.models.streams;
import java.util.Date;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.factory.XFactory;
import org.deckfour.xes.factory.XFactoryNaiveImpl;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XTrace;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class XesMqttSource implements XesSource {
private static XFactory xesFactory = new XFactoryNaiveImpl();
private String processName;
private String brokerHost;
private String topicBase;
private BlockingQueue<XTrace> queue;
private Stream<XTrace> stream;
public XesMqttSource(String brokerHost, String topicBase, String processName) {
this.brokerHost = brokerHost;
this.topicBase = topicBase;
this.processName = processName;
this.stream = Stream.generate(new Supplier<XTrace>() {
@Override
public XTrace get() {
try {
return queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
});
}
@Override
public Stream<XTrace> stream() {
return stream;
}
public void prepare() throws Exception {
this.queue = new ArrayBlockingQueue<XTrace>(1000);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(30);
IMqttClient myClient = new MqttClient(brokerHost, UUID.randomUUID().toString());
myClient.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
int posLastSlash = topic.lastIndexOf("/");
String partBeforeActName = topic.substring(0, posLastSlash);
String activityName = topic.substring(posLastSlash + 1);
String caseId = partBeforeActName.substring(partBeforeActName.lastIndexOf("/") + 1);
XEvent event = xesFactory.createEvent();
XConceptExtension.instance().assignName(event, activityName);
XTimeExtension.instance().assignTimestamp(event, new Date());
XTrace eventWrapper = xesFactory.createTrace();
XConceptExtension.instance().assignName(eventWrapper, caseId);
eventWrapper.add(event);
queue.add(eventWrapper);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) { }
@Override
public void connectionLost(Throwable cause) { }
});
myClient.connect(options);
myClient.subscribe(topicBase + "/" + processName + "/#");
}
public Iterator<XTrace> iterator() {
return new Iterator<XTrace>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public XTrace next() {
try {
return queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
};
}
}
package beamline.models.streams;
import org.deckfour.xes.model.XTrace;
public interface XesSource extends ObservableStream<XTrace> {
}
package beamline.models.streams;
import java.io.File;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.factory.XFactory;
import org.deckfour.xes.factory.XFactoryNaiveImpl;
import org.deckfour.xes.in.XParser;
import org.deckfour.xes.in.XesXmlGZIPParser;
import org.deckfour.xes.in.XesXmlParser;
import org.deckfour.xes.model.XAttributeMap;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XLog;
import org.deckfour.xes.model.XTrace;
public class XesStreamSource implements XesSource {
private static XFactory xesFactory = new XFactoryNaiveImpl();
private String fileName;
private XLog log;
private List<XTrace> events;
public XesStreamSource(String fileName) {
this.fileName = fileName;
}
public void prepare() throws Exception {
parseLog(fileName);
prepareStream();
}
public Stream<XTrace> stream() {
return events.stream();
}
public Iterator<XTrace> iterator() {
return events.iterator();
}
private void parseLog(String fileName) throws Exception {
XParser[] parsers = new XParser[] { new XesXmlGZIPParser(), new XesXmlParser() };
File file = new File(fileName);
for (XParser p : parsers) {
if (p.canParse(file)) {
log = p.parse(file).get(0);
return;
}
}
throw new Exception("XES file format not supported");
}
private void prepareStream() {
if (log == null) {
return;
}
// populate all events
events = new LinkedList<XTrace>();
for (XTrace t : log) {
for (XEvent e : t) {
// create the wrapping trace
XTrace eventWrapper = xesFactory.createTrace();
XAttributeMap am = t.getAttributes();
for (String key : am.keySet()) {
eventWrapper.getAttributes().put(key, am.get(key));
}
// create the actual event
XEvent newEvent = xesFactory.createEvent();
XAttributeMap amEvent = e.getAttributes();
for (String key : amEvent.keySet()) {
newEvent.getAttributes().put(key, amEvent.get(key));
}
eventWrapper.add(newEvent);
events.add(eventWrapper);
}
}
// sort events
Collections.sort(events, new Comparator<XTrace>() {
public int compare(XTrace o1, XTrace o2) {
XEvent e1 = o1.get(0);
XEvent e2 = o2.get(0);
Date d1 = XTimeExtension.instance().extractTimestamp(e1);
Date d2 = XTimeExtension.instance().extractTimestamp(e2);
if (d1 == null || d2 == null) {
return 0;
}
return d1.compareTo(d2);
}
});
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment