Switch to side-by-side view

--- a/src/utils/execmd.cpp
+++ b/src/utils/execmd.cpp
@@ -15,7 +15,7 @@
  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
  */
 #ifndef TEST_EXECMD
-#ifdef RECOLL_DATADIR
+#ifdef BUILDING_RECOLL
 #include "autoconfig.h"
 #else
 #include "config.h"
@@ -57,9 +57,7 @@
 
 extern char **environ;
 
-bool ExecCmd::o_useVfork = false;
-
-#ifdef RECOLL_DATADIR
+#ifdef BUILDING_RECOLL
 #include "debuglog.h"
 #include "smallut.h"
 
@@ -112,7 +110,86 @@
 	}
     }
 }
-#endif // RECOLL_DATADIR
+#endif // BUILDING_RECOLL
+
+class ExecCmd::Internal {
+public:
+    Internal()
+    : m_advise(0), m_provide(0), m_timeoutMs(1000), 
+      m_rlimit_as_mbytes(0) {
+    }
+
+    static bool      o_useVfork;
+
+    std::vector<std::string>   m_env;
+    ExecCmdAdvise   *m_advise;
+    ExecCmdProvide  *m_provide;
+    bool             m_killRequest;
+    int              m_timeoutMs;
+    int              m_rlimit_as_mbytes;
+    string           m_stderrFile;
+    // Pipe for data going to the command
+    int              m_pipein[2];
+    STD_SHARED_PTR<NetconCli> m_tocmd;
+    // Pipe for data coming out
+    int              m_pipeout[2];
+    STD_SHARED_PTR<NetconCli> m_fromcmd;
+    // Subprocess id
+    pid_t            m_pid;
+    // Saved sigmask
+    sigset_t         m_blkcld;
+
+    // Reset internal state indicators. Any resources should have been
+    // previously freed
+    void reset() {
+	m_killRequest = false;
+	m_pipein[0] = m_pipein[1] = m_pipeout[0] = m_pipeout[1] = -1;
+	m_pid = -1;
+	sigemptyset(&m_blkcld);
+    }
+    // Child process code
+    inline void dochild(const std::string &cmd, const char **argv, 
+			const char **envv, bool has_input, bool has_output);
+};
+bool ExecCmd::Internal::o_useVfork = false;
+
+ExecCmd::ExecCmd()
+{
+    m = new Internal();
+    if (m)
+        m->reset();
+}
+void ExecCmd::setAdvise(ExecCmdAdvise *adv) 
+{
+    m->m_advise = adv;
+}
+void ExecCmd::setProvide(ExecCmdProvide *p) 
+{
+    m->m_provide = p;
+}
+void ExecCmd::setTimeout(int mS) 
+{
+    if (mS > 30) 
+        m->m_timeoutMs = mS;
+}
+void ExecCmd::setStderr(const std::string &stderrFile) 
+{
+    m->m_stderrFile = stderrFile;
+}
+pid_t ExecCmd::getChildPid() 
+{
+    return m->m_pid;
+}
+void ExecCmd::setKill() 
+{
+    m->m_killRequest = true;
+}
+void ExecCmd::zapChild() 
+{
+    setKill(); 
+    (void)wait();
+}
+
 
 /* From FreeBSD's which command */
 static bool exec_is_there(const char *candidate)
@@ -174,12 +251,12 @@
     // an executable file, we have a problem.
     const char *argv[] = {"/", 0};
     execve("/", (char *const *)argv, environ);
-    o_useVfork  = on;
+    Internal::o_useVfork  = on;
 }
 
 void ExecCmd::putenv(const string &ea)
 {
-    m_env.push_back(ea);
+    m->m_env.push_back(ea);
 }
 
 void  ExecCmd::putenv(const string &name, const string& value)
@@ -201,8 +278,12 @@
  *  during method executions */
 class ExecCmdRsrc {
 public:
-    ExecCmdRsrc(ExecCmd *parent) : m_parent(parent), m_active(true) {}
-    void inactivate() {m_active = false;}
+    ExecCmdRsrc(ExecCmd::Internal *parent) 
+        : m_parent(parent), m_active(true) {
+    }
+    void inactivate() {
+        m_active = false;
+    }
     ~ExecCmdRsrc() {
 	if (!m_active || !m_parent)
 	    return;
@@ -250,13 +331,15 @@
 	m_parent->reset();
     }
 private:
-    ExecCmd *m_parent;
+    ExecCmd::Internal *m_parent;
     bool    m_active;
 };
 
 ExecCmd::~ExecCmd()
 {
-    ExecCmdRsrc(this);
+    ExecCmdRsrc(this->m);
+    if (m)
+        delete m;
 }
 
 // In child process. Set up pipes and exec command. 
@@ -272,9 +355,9 @@
 // If one of the calls block, the problem manifests itself by 20mn
 // (filter timeout) of looping on "ExecCmd::doexec: selectloop
 // returned 1', because the father is waiting on the read descriptor
-inline void ExecCmd::dochild(const string &cmd, const char **argv,
-			     const char **envv,
-			     bool has_input, bool has_output)
+inline void ExecCmd::Internal::dochild(const string &cmd, const char **argv,
+                                       const char **envv,
+                                       bool has_input, bool has_output)
 {
     // Start our own process group
     if (setpgid(0, getpid())) {
@@ -384,7 +467,7 @@
 
 void ExecCmd::setrlimit_as(int mbytes)
 {
-    m_rlimit_as_mbytes = mbytes;
+    m->m_rlimit_as_mbytes = mbytes;
 }
 
 int ExecCmd::startExec(const string &cmd, const vector<string>& args,
@@ -401,13 +484,13 @@
     }
 
     // The resource manager ensures resources are freed if we return early
-    ExecCmdRsrc e(this);
-
-    if (has_input && pipe(m_pipein) < 0) {
+    ExecCmdRsrc e(this->m);
+
+    if (has_input && pipe(m->m_pipein) < 0) {
 	LOGERR(("ExecCmd::startExec: pipe(2) failed. errno %d\n", errno));
 	return -1;
     }
-    if (has_output && pipe(m_pipeout) < 0) {
+    if (has_output && pipe(m->m_pipeout) < 0) {
 	LOGERR(("ExecCmd::startExec: pipe(2) failed. errno %d\n", errno));
 	return -1;
     }
@@ -441,7 +524,7 @@
     for (envsize = 0; ; envsize++) 
 	if (environ[envsize] == 0)
 	    break;
-    envv = (Ccharp *)malloc((envsize + m_env.size() + 2) * sizeof(char *));
+    envv = (Ccharp *)malloc((envsize + m->m_env.size() + 2) * sizeof(char *));
     if (envv == 0) {
 	LOGERR(("ExecCmd::doexec: malloc() failed. errno %d\n",	errno));
         free(argv);
@@ -450,8 +533,8 @@
     int eidx;
     for (eidx = 0; eidx < envsize; eidx++)
 	envv[eidx] = environ[eidx];
-    for (vector<string>::const_iterator it = m_env.begin(); 
-	 it != m_env.end(); it++) {
+    for (vector<string>::const_iterator it = m->m_env.begin(); 
+	 it != m->m_env.end(); it++) {
 	envv[eidx++] = it->c_str();
     }
     envv[eidx] = 0;
@@ -495,27 +578,27 @@
         posix_spawn_file_actions_init(&facts);
 
         if (has_input) {
-            posix_spawn_file_actions_addclose(&facts, m_pipein[1]);
-            if (m_pipein[0] != 0) {
-                posix_spawn_file_actions_adddup2(&facts, m_pipein[0], 0);
-                posix_spawn_file_actions_addclose(&facts, m_pipein[0]);
+            posix_spawn_file_actions_addclose(&facts, m->m_pipein[1]);
+            if (m->m_pipein[0] != 0) {
+                posix_spawn_file_actions_adddup2(&facts, m->m_pipein[0], 0);
+                posix_spawn_file_actions_addclose(&facts, m->m_pipein[0]);
             }
         }
         if (has_output) {
-            posix_spawn_file_actions_addclose(&facts, m_pipeout[0]);
-            if (m_pipeout[1] != 1) {
-                posix_spawn_file_actions_adddup2(&facts, m_pipeout[1], 1);
-                posix_spawn_file_actions_addclose(&facts, m_pipeout[1]);
+            posix_spawn_file_actions_addclose(&facts, m->m_pipeout[0]);
+            if (m->m_pipeout[1] != 1) {
+                posix_spawn_file_actions_adddup2(&facts, m->m_pipeout[1], 1);
+                posix_spawn_file_actions_addclose(&facts, m->m_pipeout[1]);
             }
         }
 
         // Do we need to redirect stderr ?
-        if (!m_stderrFile.empty()) {
+        if (!m->m_stderrFile.empty()) {
             int oflags = O_WRONLY|O_CREAT;
 #ifdef O_APPEND
             oflags |= O_APPEND;
 #endif
-            posix_spawn_file_actions_addopen(&facts, 2, m_stderrFile.c_str(), 
+            posix_spawn_file_actions_addopen(&facts, 2, m->m_stderrFile.c_str(), 
                                              oflags, 0600);
         }
         LOGDEB1(("using SPAWN\n"));
@@ -527,7 +610,7 @@
             posix_spawn_file_actions_addclose(&facts, i);
         }
 
-        int ret = posix_spawn(&m_pid, exe.c_str(), &facts, &attrs, 
+        int ret = posix_spawn(&m->m_pid, exe.c_str(), &facts, &attrs, 
                               (char *const *)argv, (char *const *)envv);
         posix_spawnattr_destroy(&attrs);
         posix_spawn_file_actions_destroy(&facts);
@@ -539,22 +622,22 @@
     }
 
 #else
-    if (o_useVfork) {
+    if (Internal::o_useVfork) {
         LOGDEB1(("using VFORK\n"));
-	m_pid = vfork();
+	m->m_pid = vfork();
     } else {
         LOGDEB1(("using FORK\n"));
-	m_pid = fork();
-    }
-    if (m_pid < 0) {
+	m->m_pid = fork();
+    }
+    if (m->m_pid < 0) {
 	LOGERR(("ExecCmd::startExec: fork(2) failed. errno %d\n", errno));
 	return -1;
     }
-    if (m_pid == 0) {
+    if (m->m_pid == 0) {
 	// e.inactivate() is not needed. As we do not return, the call
 	// stack won't be unwound and destructors of local objects
 	// won't be called.
-	dochild(exe, argv, envv, has_input, has_output);
+	m->dochild(exe, argv, envv, has_input, has_output);
 	// dochild does not return. Just in case...
 	_exit(1);
     }
@@ -570,30 +653,30 @@
 
     // Set the process group for the child. This is also done in the
     // child process see wikipedia(Process_group)
-    if (setpgid(m_pid, m_pid)) {
+    if (setpgid(m->m_pid, m->m_pid)) {
         // This can fail with EACCES if the son has already done execve 
         // (linux at least)
         LOGDEB2(("ExecCmd: father setpgid(son)(%d,%d) errno %d (ok)\n",
-                 m_pid, m_pid, errno));
-    }
-
-    sigemptyset(&m_blkcld);
-    sigaddset(&m_blkcld, SIGCHLD);
-    pthread_sigmask(SIG_BLOCK, &m_blkcld, 0);
+                 m->m_pid, m->m_pid, errno));
+    }
+
+    sigemptyset(&m->m_blkcld);
+    sigaddset(&m->m_blkcld, SIGCHLD);
+    pthread_sigmask(SIG_BLOCK, &m->m_blkcld, 0);
 
     if (has_input) {
-	close(m_pipein[0]);
-	m_pipein[0] = -1;
+	close(m->m_pipein[0]);
+	m->m_pipein[0] = -1;
 	NetconCli *iclicon = new NetconCli();
-	iclicon->setconn(m_pipein[1]);
-	m_tocmd = NetconP(iclicon);
+	iclicon->setconn(m->m_pipein[1]);
+	m->m_tocmd = STD_SHARED_PTR<NetconCli>(iclicon);
     }
     if (has_output) {
-	close(m_pipeout[1]);
-	m_pipeout[1] = -1;
+	close(m->m_pipeout[1]);
+	m->m_pipeout[1] = -1;
 	NetconCli *oclicon = new NetconCli();
-	oclicon->setconn(m_pipeout[0]);
-	m_fromcmd = NetconP(oclicon);
+	oclicon->setconn(m->m_pipeout[0]);
+	m->m_fromcmd = STD_SHARED_PTR<NetconCli>(oclicon);
     }
 
     /* Don't want to undo what we just did ! */
@@ -605,28 +688,38 @@
 // Netcon callback. Send data to the command's input
 class ExecWriter : public NetconWorker {
 public:
-    ExecWriter(const string *input, ExecCmdProvide *provide) 
-	: m_input(input), m_cnt(0), m_provide(provide)
-    {}				    
+    ExecWriter(const string *input, ExecCmdProvide *provide, 
+               ExecCmd::Internal *parent)
+	: m_cmd(parent), m_input(input), m_cnt(0), m_provide(provide) {
+    }
+    void shutdown() {
+        close(m_cmd->m_pipein[1]);
+        m_cmd->m_pipein[1] = -1;
+	m_cmd->m_tocmd.reset();
+    }
     virtual int data(NetconData *con, Netcon::Event reason)
     {
-	if (!m_input) return -1;
+	if (!m_input) 
+            return -1;
 	LOGDEB1(("ExecWriter: input m_cnt %d input length %d\n", m_cnt, 
 		 m_input->length()));
 	if (m_cnt >= m_input->length()) {
-	    // Fd ready for more but we got none.
-	    if (m_provide) {
-		m_provide->newData();
-		if (m_input->empty()) {
-		    return 0;
-		} else {
-		    m_cnt = 0;
-		}
-		LOGDEB2(("ExecWriter: provide m_cnt %d input length %d\n", 
-			 m_cnt, m_input->length()));
-	    } else {
+	    // Fd ready for more but we got none. Try to get data, else
+            // shutdown;
+	    if (!m_provide) {
+                shutdown();
 		return 0;
-	    }
+            }
+            m_provide->newData();
+            if (m_input->empty()) {
+                shutdown();
+                return 0;
+            } else {
+                // Ready with new buffer, reset use count
+                m_cnt = 0;
+            }
+            LOGDEB2(("ExecWriter: provide m_cnt %d input length %d\n", 
+                     m_cnt, m_input->length()));
 	}
 	int ret = con->send(m_input->c_str() + m_cnt, 
 			    m_input->length() - m_cnt);
@@ -639,6 +732,7 @@
 	return ret;
     }
 private:
+    ExecCmd::Internal *m_cmd;
     const string   *m_input;
     unsigned int    m_cnt; // Current offset inside m_input
     ExecCmdProvide *m_provide;
@@ -679,63 +773,63 @@
     }
 
     // Cleanup in case we return early
-    ExecCmdRsrc e(this);
+    ExecCmdRsrc e(this->m);
     SelectLoop myloop;
     int ret = 0;
     if (input || output) {
         // Setup output
 	if (output) {
-	    NetconCli *oclicon = dynamic_cast<NetconCli *>(m_fromcmd.get());
+	    NetconCli *oclicon = m->m_fromcmd.get();
 	    if (!oclicon) {
 		LOGERR(("ExecCmd::doexec: no connection from command\n"));
 		return -1;
 	    }
 	    oclicon->setcallback(STD_SHARED_PTR<NetconWorker>
-				 (new ExecReader(output, m_advise)));
-	    myloop.addselcon(m_fromcmd, Netcon::NETCONPOLL_READ);
+				 (new ExecReader(output, m->m_advise)));
+	    myloop.addselcon(m->m_fromcmd, Netcon::NETCONPOLL_READ);
 	    // Give up ownership 
-	    m_fromcmd.reset();
+	    m->m_fromcmd.reset();
 	} 
         // Setup input
 	if (input) {
-	    NetconCli *iclicon = dynamic_cast<NetconCli *>(m_tocmd.get());
+	    NetconCli *iclicon = m->m_tocmd.get();
 	    if (!iclicon) {
 		LOGERR(("ExecCmd::doexec: no connection from command\n"));
 		return -1;
 	    }
 	    iclicon->setcallback(STD_SHARED_PTR<NetconWorker>
-				 (new ExecWriter(input, m_provide)));
-	    myloop.addselcon(m_tocmd, Netcon::NETCONPOLL_WRITE);
+				 (new ExecWriter(input, m->m_provide, m)));
+	    myloop.addselcon(m->m_tocmd, Netcon::NETCONPOLL_WRITE);
 	    // Give up ownership 
-	    m_tocmd.reset();
+	    m->m_tocmd.reset();
 	}
 
         // Do the actual reading/writing/waiting
-	myloop.setperiodichandler(0, 0, m_timeoutMs);
+	myloop.setperiodichandler(0, 0, m->m_timeoutMs);
 	while ((ret = myloop.doLoop()) > 0) {
 	    LOGDEB(("ExecCmd::doexec: selectloop returned %d\n", ret));
-	    if (m_advise)
-		m_advise->newData(0);
-	    if (m_killRequest) {
+	    if (m->m_advise)
+		m->m_advise->newData(0);
+	    if (m->m_killRequest) {
 		LOGINFO(("ExecCmd::doexec: cancel request\n"));
 		break;
 	    }
 	}
 	LOGDEB0(("ExecCmd::doexec: selectloop returned %d\n", ret));
         // Check for interrupt request: we won't want to waitpid()
-        if (m_advise)
-            m_advise->newData(0);
+        if (m->m_advise)
+            m->m_advise->newData(0);
 
         // The netcons don't take ownership of the fds: we have to close them
         // (have to do it before wait, this may be the signal the child is 
         // waiting for exiting).
         if (input) {
-            close(m_pipein[1]);
-            m_pipein[1] = -1;
+            close(m->m_pipein[1]);
+            m->m_pipein[1] = -1;
         }
         if (output) {
-            close(m_pipeout[0]);
-            m_pipeout[0] = -1;
+            close(m->m_pipeout[0]);
+            m->m_pipeout[0] = -1;
         }
     }
 
@@ -750,14 +844,14 @@
 
 int ExecCmd::send(const string& data)
 {
-    NetconCli *con = dynamic_cast<NetconCli *>(m_tocmd.get());
+    NetconCli *con = m->m_tocmd.get();
     if (con == 0) {
 	LOGERR(("ExecCmd::send: outpipe is closed\n"));
 	return -1;
     }
     unsigned int nwritten = 0;
     while (nwritten < data.length()) {
-	if (m_killRequest)
+	if (m->m_killRequest)
 	    break;
 	int n = con->send(data.c_str() + nwritten, data.length() - nwritten);
 	if (n < 0) {
@@ -771,7 +865,7 @@
 
 int ExecCmd::receive(string& data, int cnt)
 {
-    NetconCli *con = dynamic_cast<NetconCli *>(m_fromcmd.get());
+    NetconCli *con = m->m_fromcmd.get();
     if (con == 0) {
 	LOGERR(("ExecCmd::receive: inpipe is closed\n"));
 	return -1;
@@ -798,7 +892,7 @@
 
 int ExecCmd::getline(string& data)
 {
-    NetconCli *con = dynamic_cast<NetconCli *>(m_fromcmd.get());
+    NetconCli *con = m->m_fromcmd.get();
     if (con == 0) {
 	LOGERR(("ExecCmd::receive: inpipe is closed\n"));
 	return -1;
@@ -817,36 +911,54 @@
 }
 
 // Wait for command status and clean up all resources.
+// We would like to avoid blocking here too, but there is no simple
+// way to do this. The 2 possible approaches would be to:
+//  - Use signals (alarm), waitpid() is interruptible. but signals and
+//    threads... This would need a specialized thread, inter-thread comms etc.
+//  - Use an intermediary process when starting the command. The
+//    process forks a timer process, and the real command, then calls
+//    a blocking waitpid on all at the end, and is guaranteed to get
+//    at least the timer process status, thus yielding a select()
+//    equivalent. This is bad too, because the timeout is on the whole
+//    exec, not just the wait
+// Just calling waitpid() with WNOHANG with a sleep() between tries
+// does not work: the first waitpid() usually comes too early and
+// reaps nothing, resulting in almost always one sleep() or more.
+// 
+// So no timeout here. This has not been a problem in practise inside recoll.
+// In case of need, using a semi-busy loop with short sleeps
+// increasing from a few mS might work without creating too much
+// overhead.
 int ExecCmd::wait()
 {
-    ExecCmdRsrc e(this);
+    ExecCmdRsrc e(this->m);
     int status = -1;
-    if (!m_killRequest && m_pid > 0) {
-	if (waitpid(m_pid, &status, 0) < 0) {
+    if (!m->m_killRequest && m->m_pid > 0) {
+	if (waitpid(m->m_pid, &status, 0) < 0) {
 	    LOGERR(("ExecCmd::waitpid: returned -1 errno %d\n", errno));
 	    status = -1;
 	}
         LOGDEB(("ExecCmd::wait: got status 0x%x\n", status));
-	m_pid = -1;
-    }
-    // Let the ExecCmdRsrc cleanup
+	m->m_pid = -1;
+    }
+    // Let the ExecCmdRsrc cleanup, it will do the killing/waiting if needed
     return status;
 }
 
 bool ExecCmd::maybereap(int *status)
 {
-    ExecCmdRsrc e(this);
+    ExecCmdRsrc e(this->m);
     *status = -1;
 
-    if (m_pid <= 0) {
+    if (m->m_pid <= 0) {
 	// Already waited for ??
 	return true;
     }
 
-    pid_t pid = waitpid(m_pid, status, WNOHANG);
+    pid_t pid = waitpid(m->m_pid, status, WNOHANG);
     if (pid < 0) {
         LOGERR(("ExecCmd::maybereap: returned -1 errno %d\n", errno));
-	m_pid = -1;
+	m->m_pid = -1;
 	return true;
     } else if (pid == 0) {
 	LOGDEB1(("ExecCmd::maybereap: not exited yet\n"));
@@ -854,7 +966,7 @@
 	return false;
     } else {
         LOGDEB(("ExecCmd::maybereap: got status 0x%x\n", status));
-	m_pid = -1;
+	m->m_pid = -1;
 	return true;
     }
 }
@@ -992,133 +1104,164 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
+#include <signal.h>
 
 #include <string>
 #include <iostream>
 #include <sstream>
 #include <vector>
-using namespace std;
 
 #include "debuglog.h"
 #include "cancelcheck.h"
 #include "execmd.h"
 #include "smallut.h"
 
-// Testing with rclaudio: use an mp3 as parameter
-static const string tstcmd("/usr/share/recoll/filters/rclaudio");
-static const string mimetype("audio/mpeg");
-bool exercise_mhexecm(const string& filename)
+using namespace std;
+
+
+// Testing the rclexecm protocol outside of recoll. Here we use the
+// rcldoc.py filter, you can try with rclaudio too, adjust the file arg
+// accordingly
+bool exercise_mhexecm(const string& cmdstr, const string& mimetype, 
+                      vector<string>& files)
 {
     ExecCmd cmd;
 
-    vector<string>myparams; 
-
-    if (cmd.startExec(tstcmd, myparams, 1, 1) < 0) {
-	cerr << "startExec " << tstcmd << " failed. Missing command?\n";
+    vector<string> myparams; 
+
+    if (cmd.startExec(cmdstr, myparams, 1, 1) < 0) {
+	cerr << "startExec " << cmdstr << " failed. Missing command?\n";
 	return false;
     }
 
-    // Build request message
-    ostringstream obuf;
-    obuf << "FileName: " << filename.length() << "\n" << filename;
-    obuf << "Mimetype: " << mimetype.length() << "\n" << mimetype;
-    // Bogus parameter should be skipped by filter
-    obuf << "BogusParam: " << string("bogus").length() << "\n" << "bogus";
-    obuf << "\n";
-    cerr << "SENDING: [" << obuf.str() << "]\n";
-    // Send it 
-    if (cmd.send(obuf.str()) < 0) {
-	// The real code calls zapchild here, but we don't need it as
-	// this will be handled by ~ExecCmd
-        //cmd.zapChild();
-        cerr << "send error\n";
-        return false;
-    }
-
-    // Read answer
-    for (int loop=0;;loop++) {
-        string name, data;
-
-	// Code from mh_execm.cpp: readDataElement
-	string ibuf;
-	// Read name and length
-	if (cmd.getline(ibuf) <= 0) {
-	    cerr << "getline error\n";
-	    return false;
-	}
-	// Empty line (end of message)
-	if (!ibuf.compare("\n")) {
-	    cerr << "Got empty line\n";
-	    name.clear();
-	    return true;
-	}
-
-	// Filters will sometimes abort before entering the real protocol, ie if
-	// a module can't be loaded. Check the special filter error first word:
-	if (ibuf.find("RECFILTERROR ") == 0) {
-	    cerr << "Got RECFILTERROR\n";
-	    return false;
-	}
-
-	// We're expecting something like Name: len\n
-	vector<string> tokens;
-	stringToTokens(ibuf, tokens);
-	if (tokens.size() != 2) {
-	    cerr << "bad line in filter output: [" << ibuf << "]\n";
-	    return false;
-	}
-	vector<string>::iterator it = tokens.begin();
-	name = *it++;
-	string& slen = *it;
-	int len;
-	if (sscanf(slen.c_str(), "%d", &len) != 1) {
-	    cerr << "bad line in filter output (no len): [" << ibuf << "]\n";
-	    return false;
-	}
-	// Read element data
-	data.erase();
-	if (len > 0 && cmd.receive(data, len) != len) {
-	    cerr << "MHExecMultiple: expected " << len << 
-		" bytes of data, got " << data.length() << endl;
-	    return false;
-	}
-
-	// Empty element: end of message
-        if (name.empty())
-            break;
-	cerr << "Got name: [" << name << "] data [" << data << "]\n";
-    }
-
+    for (vector<string>::const_iterator it = files.begin();
+         it != files.end(); it++) {
+        // Build request message
+        ostringstream obuf;
+        obuf << "Filename: " << (*it).length() << "\n" << (*it);
+        obuf << "Mimetype: " << mimetype.length() << "\n" << mimetype;
+        // Bogus parameter should be skipped by filter
+        obuf << "BogusParam: " << string("bogus").length() << "\n" << "bogus";
+        obuf << "\n";
+        cerr << "SENDING: [" << obuf.str() << "]\n";
+        // Send it 
+        if (cmd.send(obuf.str()) < 0) {
+            // The real code calls zapchild here, but we don't need it as
+            // this will be handled by ~ExecCmd
+            //cmd.zapChild();
+            cerr << "send error\n";
+            return false;
+        }
+
+        // Read answer
+        for (int loop=0;;loop++) {
+            string name, data;
+
+            // Code from mh_execm.cpp: readDataElement
+            string ibuf;
+            // Read name and length
+            if (cmd.getline(ibuf) <= 0) {
+                cerr << "getline error\n";
+                return false;
+            }
+            // Empty line (end of message)
+            if (!ibuf.compare("\n")) {
+                cerr << "Got empty line\n";
+                name.clear();
+                break;
+            }
+
+            // Filters will sometimes abort before entering the real
+            // protocol, ie if a module can't be loaded. Check the
+            // special filter error first word:
+            if (ibuf.find("RECFILTERROR ") == 0) {
+                cerr << "Got RECFILTERROR\n";
+                return false;
+            }
+
+            // We're expecting something like Name: len\n
+            vector<string> tokens;
+            stringToTokens(ibuf, tokens);
+            if (tokens.size() != 2) {
+                cerr << "bad line in filter output: [" << ibuf << "]\n";
+                return false;
+            }
+            vector<string>::iterator it = tokens.begin();
+            name = *it++;
+            string& slen = *it;
+            int len;
+            if (sscanf(slen.c_str(), "%d", &len) != 1) {
+                cerr << "bad line in filter output (no len): [" << 
+                    ibuf << "]\n";
+                return false;
+            }
+            // Read element data
+            data.erase();
+            if (len > 0 && cmd.receive(data, len) != len) {
+                cerr << "MHExecMultiple: expected " << len << 
+                    " bytes of data, got " << data.length() << endl;
+                return false;
+            }
+
+            // Empty element: end of message
+            if (name.empty())
+                break;
+            cerr << "Got name: [" << name << "] data [" << data << "]\n";
+        }
+    }
     return true;
 }
 
+static char *thisprog;
+static char usage [] =
+"trexecmd [-c -r -i -o] cmd [arg1 arg2 ...]\n" 
+"   -c : test cancellation (ie: trexecmd -c sleep 1000)\n"
+"   -r : run reexec. Must be separate option.\n"
+"   -i : command takes input\n"
+"   -o : command produces output\n"
+"    If -i is set, we send /etc/group contents to whatever command is run\n"
+"    If -o is set, we print whatever comes out\n"
+"trexecmd -m <filter> <mimetype> <file> [file ...]: test execm:\n"
+"     <filter> should be the path to an execm filter\n"
+"     <mimetype> the type of the file parameters\n"
+"trexecmd -w cmd : do the 'which' thing\n"
+;
+
+static void Usage(void)
+{
+    fprintf(stderr, "%s: usage:\n%s", thisprog, usage);
+    exit(1);
+}
 
 static int     op_flags;
 #define OPT_MOINS 0x1
-#define OPT_b	  0x4 
+#define OPT_i     0x4
 #define OPT_w     0x8
 #define OPT_c     0x10
 #define OPT_r     0x20
 #define OPT_m     0x40
-
-const char *data = "Une ligne de donnees\n";
+#define OPT_o     0x80
+
+// Data sink for data coming out of the command. We also use it to set
+// a cancellation after a moment.
 class MEAdv : public ExecCmdAdvise {
 public:
-    ExecCmd *cmd;
     void newData(int cnt) {
 	if (op_flags & OPT_c) {
 	    static int  callcnt;
-	    if (callcnt++ == 3) {
-		throw CancelExcept();
+	    if (callcnt++ == 10) {
+                // Just sets the cancellation flag
+		CancelCheck::instance().setCancel();
+                // Would be called from somewhere else and throws an
+                // exception. We call it here for simplicity
+                CancelCheck::instance().checkCancel();
 	    }
 	}
 	cerr << "newData(" << cnt << ")" << endl;
-	//	CancelCheck::instance().setCancel();
-	//	CancelCheck::instance().checkCancel();
-	//	cmd->setCancel();
     }
 };
 
+// Data provider, used if the -i flag is set
 class MEPv : public ExecCmdProvide {
 public:
     FILE *m_fp;
@@ -1143,22 +1286,8 @@
 };
 
 
-static char *thisprog;
-static char usage [] =
-"trexecmd [-c|-r] cmd [arg1 arg2 ...]\n" 
-" -c : test cancellation (ie: trexecmd -c sleep 1000)\n"
-" -r : test reexec\n"
-" -m <path to mp3 file>: test execm: needs installed and working rclaudio/mutagen\n"
-"trexecmd -w cmd : do the which thing\n"
-;
-static void Usage(void)
-{
-    fprintf(stderr, "%s: usage:\n%s", thisprog, usage);
-    exit(1);
-}
 
 ReExec reexec;
-
 int main(int argc, char *argv[])
 {
     reexec.init(argc, argv);
@@ -1188,6 +1317,8 @@
 	    case 'r':	op_flags |= OPT_r; break;
 	    case 'w':	op_flags |= OPT_w; break;
 	    case 'm':	op_flags |= OPT_m; break;
+	    case 'i':	op_flags |= OPT_i; break;
+	    case 'o':	op_flags |= OPT_o; break;
 	    default: Usage();	break;
 	    }
     b1: argc--; argv++;
@@ -1201,15 +1332,22 @@
     while (argc > 0) {
 	l.push_back(*argv++); argc--;
     }
+
     DebugLog::getdbl()->setloglevel(DEBDEB1);
     DebugLog::setfilename("stderr");
     signal(SIGPIPE, SIG_IGN);
 
     if (op_flags & OPT_r) {
-	// Test reexec
+	// Test reexec. Normally only once, next time we fall through
+        // because we remove the -r option (only works if it was isolated, not like -rc
 	chdir("/");
         argv[0] = strdup("");
 	sleep(1);
+        cerr << "Calling reexec\n";
+        // We remove the -r arg from list, otherwise we are going to
+        // loop (which you can try by commenting out the following
+        // line)
+        reexec.removeArg("-r");
         reexec.reexec();
     }
 
@@ -1218,44 +1356,59 @@
 	string path;
 	if (ExecCmd::which(arg1, path)) {
 	    cout << path << endl;
-	    exit(0);
+            return 0;
 	} 
-	exit(1);
-    }
-
-    if (op_flags & OPT_m) {
-	return exercise_mhexecm(arg1) ? 0 : 1;
-    }
-
-    //////////////
-    // Default: execute command line arguments
-    ExecCmd mexec;
-    MEAdv adv;
-    adv.cmd = &mexec;
-    mexec.setAdvise(&adv);
-    mexec.setTimeout(5);
-    mexec.setStderr("/tmp/trexecStderr");
-    mexec.putenv("TESTVARIABLE1=TESTVALUE1");
-    mexec.putenv("TESTVARIABLE2=TESTVALUE2");
-    mexec.putenv("TESTVARIABLE3=TESTVALUE3");
-
-    string input, output;
-    //    input = data;
-    string *ip = 0;
-    ip = &input;
-
-    MEPv  pv(&input);
-    mexec.setProvide(&pv);
-
-    int status = -1;
-    try {
-	status = mexec.doexec(arg1, l, ip, &output);
-    } catch (CancelExcept) {
-	cerr << "CANCELLED" << endl;
-    }
-
-    fprintf(stderr, "Status: 0x%x\n", status);
-    cout << output;
-    exit (status >> 8);
+	return 1;
+    } else if (op_flags & OPT_m) {
+        if (l.size() < 2)
+            Usage();
+        string mimetype = l[0];
+        l.erase(l.begin());
+	return exercise_mhexecm(arg1, mimetype, l) ? 0 : 1;
+    } else {
+        // Default: execute command line arguments
+        ExecCmd mexec;
+
+        // Set callback to be called whenever there is new data
+        // available and at a periodic interval, to check for
+        // cancellation
+        MEAdv adv;
+        mexec.setAdvise(&adv);
+        mexec.setTimeout(5);
+
+        // Stderr output goes there
+        mexec.setStderr("/tmp/trexecStderr");
+        
+        // A few environment variables. Check with trexecmd env
+        mexec.putenv("TESTVARIABLE1=TESTVALUE1");
+        mexec.putenv("TESTVARIABLE2=TESTVALUE2");
+        mexec.putenv("TESTVARIABLE3=TESTVALUE3");
+
+        string input, output;
+        MEPv  pv(&input);
+        
+        string *ip = 0;
+        if (op_flags  & OPT_i) {
+            ip = &input;
+            mexec.setProvide(&pv);
+        }
+        string *op = 0;
+        if (op_flags & OPT_o) {
+            op = &output;
+        }
+
+        int status = -1;
+        try {
+            status = mexec.doexec(arg1, l, ip, op);
+        } catch (CancelExcept) {
+            cerr << "CANCELLED" << endl;
+        }
+
+        fprintf(stderr, "Status: 0x%x\n", status);
+        if (op_flags & OPT_o) {
+            cout << output;
+        }
+        exit (status >> 8);
+    }
 }
 #endif // TEST