package net.timbusproject.extractors.provenance;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import net.sf.taverna.raven.appconfig.ApplicationUserHome;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.provenance.api.ProvenanceAccess;
import net.sf.taverna.t2.provenance.api.ProvenanceConnectorType;
import net.sf.taverna.t2.provenance.connector.ProvenanceConnector;
import net.sf.taverna.t2.provenance.lineageservice.utils.PortBinding;
import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor;
import net.sf.taverna.t2.provenance.lineageservice.utils.WorkflowRun;
import net.sf.taverna.t2.reference.ExternalReferenceSPI;
import net.sf.taverna.t2.reference.Identified;
import net.sf.taverna.t2.reference.ReferenceServiceException;
import net.sf.taverna.t2.reference.ReferenceSet;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.reference.ValueCarryingExternalReference;
import net.timbusproject.extractors.CSVEntry;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import uk.gov.nationalarchives.droid.command.action.CommandLineException;
import au.com.bytecode.opencsv.CSVReader;
import au.com.bytecode.opencsv.bean.ColumnPositionMappingStrategy;
import au.com.bytecode.opencsv.bean.CsvToBean;
/**
* Class for accesing provenance data generated by a Taverna Workflow Run, which is stored in the standard Derby
* database.
*
* Tested with Taverna 2.4.0.
*
* @author munterberger
*
*/
public class ProvenanceAccessClient {
private static Logger LOGGER = LogManager.getLogger("ProvenanceAccessClient");
private final String TAVERNA_VERSION = "2.4.0";
private final String defaultPathToDerbyDatabase = new ApplicationUserHome("taverna-" + TAVERNA_VERSION)
.getAppUserHome() + File.separator + "t2-database";
private String t2Databasepath;
private Map<Integer, ProvenanceAccessEntry> workflowMap = new HashMap<Integer, ProvenanceAccessEntry>();
private DerbyProvenanceClient c;
private InvocationContext ic;
private ProvenanceAccess pa;
private ProvenanceConnector pc;
private boolean setupAlreadyCalled = false;
public ProvenanceAccessClient() {
LOGGER.setLevel(Level.DEBUG);
this.t2Databasepath = defaultPathToDerbyDatabase;
}
public ProvenanceAccessClient(String t2Databasepath) {
this();
this.t2Databasepath = t2Databasepath;
}
// Setup routine
public void setUp() {
if (setupAlreadyCalled) {
return;
}
c = new DerbyProvenanceClient(t2Databasepath);
try {
c.setUp(ProvenanceConnectorType.DERBY);
ic = c.getInvocationContext();
pa = c.getProvenanceAccess();
pc = pa.getProvenanceConnector();
} catch (Exception e) {
LOGGER.error("ERROR: " + e.getMessage());
e.printStackTrace();
}
setupAlreadyCalled = true;
}
public ProvenanceAccess getProvenanceAccess() {
return pa;
}
public void processWorkflow(Integer id, final String droidOutputFilePath) throws ProvenanceAccessException,
ReferenceServiceException, SQLException, IOException {
if (id == null) {
throw new IllegalArgumentException("ID must not be null");
}
if (droidOutputFilePath == null || droidOutputFilePath.isEmpty()) {
throw new IllegalArgumentException("Problems accessing the DROID output file at " + droidOutputFilePath);
}
final File droidOutputFile = new File(droidOutputFilePath);
if (!droidOutputFile.exists()) {
droidOutputFile.createNewFile();
LOGGER.debug("DROID profile file successfully created! " + droidOutputFile.exists());
} else {
LOGGER.debug("DROID profile file already exists! File will be overwritten.");
}
// call the setup routine
setUp();
// get internal ID.
ProvenanceAccessEntry entry = workflowMap.get(id);
if (entry == null) {
// remove file
droidOutputFile.delete();
throw new ProvenanceAccessException("ERROR: No entry found for ID " + id + ".");
}
String workflowRunID = entry.getWorkflowRunId();
String workflowID = entry.getWorkflowId();
LOGGER.info("Get provenance data for workflow [ID = " + workflowID + "] and run [ID = " + workflowRunID + "].");
String fileName = new String();
final Path tempDir = Files.createTempDirectory("workflowRunID-" + workflowRunID + "-");
// all processors
List<ProvenanceProcessor> provenanceProcesor = pa.getProcessorsForWorkflowID(workflowID);
LOGGER.debug("Following processor are found");
for (ProvenanceProcessor processor : provenanceProcesor) {
LOGGER.debug("\t '" + processor.getProcessorName() + "'");
}
HashMap<String, String> queryConstraints = new HashMap<String, String>();
queryConstraints.put("V.workflowId", workflowID);
for (PortBinding binding : pa.getPortBindings(queryConstraints)) {
LOGGER.debug("Found binding for port= " + binding.getPortName());
// Only interested in results from a specific workflow run
if (binding.getWorkflowRunId().equals(workflowRunID)) {
T2Reference ref = null;
try {
ref = ic.getReferenceService().referenceFromString(binding.getValue());
} catch (ReferenceServiceException rse) {
LOGGER.debug("\t Error trying to fetch T2Reference (" + binding.getValue() + ") from t2-database ("
+ t2Databasepath + "). No further treatment.");
break; // no further treatment!
}
Identified identified = pc.getReferenceService().resolveIdentifier(ref, null, ic);
if (identified instanceof ReferenceSet) {
ReferenceSet referenceSet = (ReferenceSet) identified;
Set<ExternalReferenceSPI> externalReferences = referenceSet.getExternalReferences();
for (ExternalReferenceSPI externalReference : externalReferences) {
if (externalReference instanceof ValueCarryingExternalReference<?>) {
ValueCarryingExternalReference<?> vcer = (ValueCarryingExternalReference<?>) externalReference;
// string ? filepath or URL
if (String.class.isAssignableFrom(vcer.getValueType())) {
String possibleFilePath = (String) vcer.getValue();
// valid file ?
File file = isFile(possibleFilePath);
if (file != null) {
LOGGER.debug("\t Found file (" + file.getPath() + ") for port= "
+ binding.getPortName());
fileName += "[" + binding.getPortName() + "]-";
File tmpFile = File.createTempFile(fileName.toString(), ".file", tempDir.toFile());
FileUtils.copyFile(file, tmpFile);
fileName = "";
}
// valid URL ?
URL url = isURL(possibleFilePath);
if (url != null) {
LOGGER.debug("\t Found valid URL (" + possibleFilePath + ") for port = "
+ binding.getPortName());
// download file and copy it to temp directory
fileName += "[" + binding.getPortName() + "]-";
File content = File.createTempFile(fileName, ".url", tempDir.toFile());
FileUtils.copyURLToFile(url, content);
LOGGER.debug("\t Created temporary file (" + content.getAbsolutePath() + ")");
fileName = "";
}
}
// file as byte array ?
if (byte[].class.isAssignableFrom(vcer.getValueType())) {
LOGGER.debug("\t Found byte[] for port= " + binding.getPortName());
fileName += "[" + binding.getPortName() + "]-";
byte[] content = (byte[]) vcer.getValue();
File tmpFile = File.createTempFile(fileName.toString(), ".binary", tempDir.toFile());
FileUtils.writeByteArrayToFile(tmpFile, content);
LOGGER.debug("\t Create " + fileName + " file for port= " + binding.getPortName());
// reset file name
fileName = "";
}
}
}
}
}
}
// DROID identification process
// identifying all files from the temp directory.
String path = null;
File droidJar = new File("droid-command-line-6.1.3.jar");
if (droidJar.exists()) {
path = droidJar.getAbsolutePath();
LOGGER.info("Start DROID identification process for directory " + tempDir.toString() + " ...");
final String dir = tempDir.toString();
LOGGER.info("DROID process file can be found under " + droidOutputFilePath);
LOGGER.info("Add files > process them > Start creating profile ... (can take a few seconds)");
// Run DROID CommandLineTool in a separate system process
final String command = "java -jar " + path + " -a " + dir + " -p " + droidOutputFilePath;
LOGGER.debug("Running DROID command (" + command + ")");
Process proc = Runtime.getRuntime().exec(command);
try {
if (0 == proc.waitFor()) {
LOGGER.info("Everything fine!");
} else {
LOGGER.error("ERROR " + getStream(proc.getErrorStream()));
}
} catch (InterruptedException e) {
LOGGER.error("ERROR " + e.getMessage());
e.printStackTrace();
}
} else {
LOGGER.error("Can't find DROID jar under following location (" + droidJar.getAbsolutePath() + ").");
}
}
/**
* Generates a CSV report out of a DROID profile file
*
* @param droidProfile
* @throws CommandLineException
*/
public void generateReport(File droidProfile) {
if (droidProfile == null || !droidProfile.exists()) {
throw new IllegalArgumentException("Can't find DROID file (" + droidProfile.getAbsolutePath() + ")");
}
String path = null;
File droidJar = new File("droid-command-line-6.1.3.jar");
if (droidJar.exists()) {
path = droidJar.getAbsolutePath();
final String dir = droidProfile.getParentFile().getAbsolutePath();
final String fileName = droidProfile.getName();
LOGGER.debug("Directory of the DROID profile file = " + dir);
File resultCSV = new File(dir + File.separator + fileName + ".csv");
LOGGER.info("Generate CSV-report [" + resultCSV.getAbsolutePath() + "] out of profile ["
+ droidProfile.getAbsolutePath() + "].");
try {
final String command = "java -jar " + path + " -p " + droidProfile.getAbsolutePath() + " -e "
+ resultCSV.getAbsolutePath();
LOGGER.debug("Running DROID command (" + command + ")");
Process proc = Runtime.getRuntime().exec(command);
try {
if (0 == proc.waitFor()) {
LOGGER.info("Everything fine!");
} else {
LOGGER.error("ERROR " + getStream(proc.getErrorStream()));
}
} catch (InterruptedException e) {
LOGGER.error("ERROR " + e.getMessage());
e.printStackTrace();
}
} catch (IOException e) {
LOGGER.error("ERROR " + e.getMessage());
e.printStackTrace();
}
} else {
LOGGER.error("Can't find DROID jar under following location (" + droidJar.getAbsolutePath() + ").");
}
}
private String getStream(InputStream stream) {
if (stream != null) {
byte b[];
try {
b = new byte[stream.available()];
stream.read(b, 0, b.length);
return new String(b);
} catch (IOException e) {
// nothing to print
}
}
return "NOTHING TO PRINT";
}
/**
* Read a DROID generated CSV report and convert each row in the report to a CSVEntry.
*
* @param CSVReport
* file
* @return List with a CSVEntry for each row in the DROID report.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public List<CSVEntry> readCSV(File CSVReport) {
if (CSVReport == null) {
LOGGER.error("ERROR: CSVReport must not be null.");
throw new IllegalArgumentException("ERROR: CSVReport must not be null.");
}
CSVReader reader = null;
try {
reader = new CSVReader(new InputStreamReader(new FileInputStream(CSVReport.getAbsolutePath()), "UTF-8"),
',', '\'', 1);
} catch (UnsupportedEncodingException e){
LOGGER.error("ERROR: File not found " + e.getMessage());
return Collections.EMPTY_LIST;
}
catch (FileNotFoundException e){
LOGGER.error("ERROR: File not found " + e.getMessage());
return Collections.EMPTY_LIST;
}
try {
ColumnPositionMappingStrategy strat = new ColumnPositionMappingStrategy();
strat.setType(CSVEntry.class);
strat.setColumnMapping(new String[] { "ID", "PARENT_ID", "URI", "FILE_PATH", "NAME", "METHOD", "STATUS",
"SIZE", "TYPE", "EXT", "LAST_MODIFIED", "EXTENSION_MISMATCH", "MD5_HASH", "FORMAT_COUNT", "PUID",
"MIME_TYPE", "FORMAT_NAME", "FORMAT_VERSION" });
CsvToBean csv = new CsvToBean();
List<CSVEntry> list = csv.parse(strat, reader);
// step over line1 in CSV
return list.subList(1, list.size());
} finally {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public List<CSVEntry> getCSVEntriesByPortname(List<CSVEntry> all, String portName) {
List<CSVEntry> entries = new ArrayList<CSVEntry>();
for (CSVEntry entry : all) {
if (entry.getNAME().equals(portName)) {
entries.add(entry);
}
}
return entries;
}
/**
* Make sure that each entry which has the same name and PUID only exists once.
*
* @return List without any double entries
*/
public List<CSVEntry> filterEntries(List<CSVEntry> origignal) {
List<CSVEntry> sublist = new ArrayList<CSVEntry>();
for (CSVEntry entry : origignal) {
if (!sublist.contains(entry)) {
sublist.add(entry);
}
}
return sublist;
}
public Map<Integer, ProvenanceAccessEntry> listAllWorkflowsReadable() {
setUp();
Integer interalID = 0;
for (WorkflowRun workflowRun : pa.getAllWorkflowIDs()) {
ProvenanceAccessEntry entry = new ProvenanceAccessEntry(workflowRun.getWorkflowId(),
workflowRun.getWorkflowRunId(), workflowRun.getWorkflowExternalName(), workflowRun.getTimestamp());
workflowMap.put(interalID, entry);
interalID++;
}
return workflowMap;
}
/**
* Dirty way of detecting if a string is a representation of a file path.
*
* @param possibleFilePath
* @return file if exists
*/
private File isFile(String possibleFilePath) {
String regexPath = "([a-zA-Z]:)?(\\\\[a-zA-Z0-9_.-]+)+\\\\?";
if (possibleFilePath != null && possibleFilePath.length() > 5 && possibleFilePath.contains(File.separator)
&& Pattern.matches(regexPath, possibleFilePath)) {
LOGGER.debug("\t Detect if file path (" + possibleFilePath + ") point to a file. ");
try {
// can we create a file of the possibleFilePath
File file = new File(possibleFilePath);
if (!file.isDirectory()) {
return file;
}
} catch (Exception e) {
LOGGER.debug("\t File can not be detected. " + e.getMessage());
return null;
}
}
return null;
}
private URL isURL(String possibleURL) {
String regexURL = "\\b(https?|ftp|file)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]";
if (possibleURL != null && possibleURL.length() > 5 && Pattern.matches(regexURL, possibleURL)) {
LOGGER.debug("\t Detect if (" + possibleURL + ") is a valid URL.");
try {
return new URL(possibleURL);
} catch (MalformedURLException e) {
LOGGER.debug("\t Not a valid URL.");
}
}
return null;
}
}