--- a/src/index/fsindexer.cpp
+++ b/src/index/fsindexer.cpp
@@ -53,32 +53,78 @@
#define RCL_STTIME st_mtime
#endif // RCL_USE_XATTR
-#ifndef NO_NAMESPACES
using namespace std;
-#endif /* NO_NAMESPACES */
-
-#ifndef deleteZ
-#define deleteZ(X) {delete X;X = 0;}
-#endif
+
+#ifdef IDX_THREADS
+class DbUpdTask {
+public:
+ DbUpdTask(RclConfig *cnf, const string& u, const string& p,
+ const Rcl::Doc& d)
+ : udi(u), parent_udi(p), doc(d), config(cnf)
+ {}
+ string udi;
+ string parent_udi;
+ Rcl::Doc doc;
+ RclConfig *config;
+};
+extern void *FsIndexerDbUpdWorker(void*);
+
+class InternfileTask {
+public:
+ InternfileTask(const std::string &f, const struct stat *i_stp)
+ : fn(f), statbuf(*i_stp)
+ {}
+ string fn;
+ struct stat statbuf;
+};
+extern void *FsIndexerInternfileWorker(void*);
+#endif // IDX_THREADS
+
+// Thread safe variation of the "missing helpers" storage. Only the
+// addMissing method needs protection, the rest are called from the
+// main thread either before or after the exciting part
+class FSIFIMissingStore : public FIMissingStore {
+#ifdef IDX_THREADS
+ PTMutexInit m_mutex;
+#endif
+public:
+ virtual void addMissing(const string& prog, const string& mt)
+ {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
+ FIMissingStore::addMissing(prog, mt);
+ }
+};
FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
- : m_config(cnf), m_db(db), m_updater(updfunc), m_missing(new FIMissingStore)
-#ifdef IDX_THREADS
- , m_wqueue(10)
+ : m_config(cnf), m_db(db), m_updater(updfunc),
+ m_missing(new FSIFIMissingStore)
+#ifdef IDX_THREADS
+ , m_iwqueue("Internfile", 2), m_dwqueue("Split", 2)
#endif // IDX_THREADS
{
m_havelocalfields = m_config->hasNameAnywhere("localfields");
#ifdef IDX_THREADS
- if (!m_wqueue.start(FsIndexerIndexWorker, this)) {
+ m_loglevel = DebugLog::getdbl()->getlevel();
+ if (!m_iwqueue.start(4, FsIndexerInternfileWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
return;
}
-#endif // IDX_THREADS
-}
+ if (!m_dwqueue.start(2, FsIndexerDbUpdWorker, this)) {
+ LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
+ return;
+ }
+#endif // IDX_THREADS
+}
+
FsIndexer::~FsIndexer() {
#ifdef IDX_THREADS
- void *status = m_wqueue.setTerminateAndWait();
- LOGERR(("FsIndexer: worker status: %ld\n", long(status)));
+ void *status = m_iwqueue.setTerminateAndWait();
+ LOGINFO(("FsIndexer: internfile wrker status: %ld (1->ok)\n",
+ long(status)));
+ status = m_dwqueue.setTerminateAndWait();
+ LOGINFO(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status)));
#endif // IDX_THREADS
delete m_missing;
}
@@ -98,10 +144,14 @@
// Recursively index each directory in the topdirs:
bool FsIndexer::index()
{
+ Chrono chron;
if (!init())
return false;
if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
m_updater->status.reset();
m_updater->status.dbtotdocs = m_db->docCnt();
}
@@ -138,15 +188,21 @@
}
#ifdef IDX_THREADS
- m_wqueue.waitIdle();
-#endif // IDX_THREADS
- string missing;
- FileInterner::getMissingDescription(m_missing, missing);
- if (!missing.empty()) {
- LOGINFO(("FsIndexer::index missing helper program(s):\n%s\n",
- missing.c_str()));
- }
- m_config->storeMissingHelperDesc(missing);
+ m_iwqueue.waitIdle();
+ m_dwqueue.waitIdle();
+ m_db->waitUpdIdle();
+#endif // IDX_THREADS
+
+ if (m_missing) {
+ string missing;
+ m_missing->getMissingDescription(missing);
+ if (!missing.empty()) {
+ LOGINFO(("FsIndexer::index missing helper program(s):\n%s\n",
+ missing.c_str()));
+ }
+ m_config->storeMissingHelperDesc(missing);
+ }
+ LOGERR(("fsindexer index time: %d mS\n", chron.ms()));
return true;
}
@@ -303,22 +359,51 @@
}
#ifdef IDX_THREADS
-void *FsIndexerIndexWorker(void * fsp)
+void *FsIndexerDbUpdWorker(void * fsp)
{
FsIndexer *fip = (FsIndexer*)fsp;
- WorkQueue<IndexingTask*> *tqp = &fip->m_wqueue;
- IndexingTask *tsk;
+ WorkQueue<DbUpdTask*> *tqp = &fip->m_dwqueue;
+ DebugLog::getdbl()->setloglevel(fip->m_loglevel);
+
+ DbUpdTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
- LOGDEB(("FsIndexerIndexWorker: got task, ql %d\n", int(tqp->size())));
- if (!fip->m_db->addOrUpdate(tsk->udi, tsk->parent_udi, tsk->doc)) {
- tqp->setTerminateAndWait();
+ LOGDEB(("FsIndexerDbUpdWorker: got task, ql %d\n", int(tqp->size())));
+ if (!fip->m_db->addOrUpdate(tsk->config, tsk->udi, tsk->parent_udi,
+ tsk->doc)) {
+ LOGERR(("FsIndexerDbUpdWorker: addOrUpdate failed\n"));
tqp->workerExit();
return (void*)0;
}
+ delete tsk;
+ }
+}
+
+void *FsIndexerInternfileWorker(void * fsp)
+{
+ FsIndexer *fip = (FsIndexer*)fsp;
+ WorkQueue<InternfileTask*> *tqp = &fip->m_iwqueue;
+ DebugLog::getdbl()->setloglevel(fip->m_loglevel);
+ TempDir tmpdir;
+ RclConfig *myconf = new RclConfig(*(fip->m_config));
+
+ InternfileTask *tsk;
+ for (;;) {
+ if (!tqp->take(&tsk)) {
+ tqp->workerExit();
+ return (void*)1;
+ }
+ LOGDEB1(("FsIndexerInternfileWorker: fn %s\n", tsk->fn.c_str()));
+ if (fip->processonefile(myconf, tmpdir, tsk->fn, &tsk->statbuf) !=
+ FsTreeWalker::FtwOk) {
+ LOGERR(("FsIndexerInternfileWorker: processone failed\n"));
+ tqp->workerExit();
+ return (void*)0;
+ }
+ LOGDEB1(("FsIndexerInternfileWorker: done fn %s\n", tsk->fn.c_str()));
delete tsk;
}
}
@@ -339,8 +424,13 @@
FsIndexer::processone(const std::string &fn, const struct stat *stp,
FsTreeWalker::CbFlag flg)
{
- if (m_updater && !m_updater->update()) {
- return FsTreeWalker::FtwStop;
+ if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
+ if (!m_updater->update()) {
+ return FsTreeWalker::FtwStop;
+ }
}
// If we're changing directories, possibly adjust parameters (set
@@ -363,6 +453,26 @@
if (flg == FsTreeWalker::FtwDirReturn)
return FsTreeWalker::FtwOk;
}
+
+#ifdef IDX_THREADS
+ InternfileTask *tp = new InternfileTask(fn, stp);
+ if (!m_iwqueue.put(tp))
+ return FsTreeWalker::FtwError;
+ return FsTreeWalker::FtwOk;
+#else
+ return processonefile(m_config, m_tmpdir, fn, stp);
+#endif // IDX_THREADS
+}
+
+
+FsTreeWalker::Status
+FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir,
+ const std::string &fn, const struct stat *stp)
+{
+
+#ifdef IDX_THREADS
+ config->setKeyDir(path_getfather(fn));
+#endif
////////////////////
// Check db up to date ? Doing this before file type
@@ -379,9 +489,20 @@
makesig(stp, sig);
string udi;
make_udi(fn, cstr_null, udi);
- if (!m_db->needUpdate(udi, sig)) {
+ bool needupdate;
+ {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
+ needupdate = m_db->needUpdate(udi, sig);
+ }
+
+ if (!needupdate) {
LOGDEB0(("processone: up to date: %s\n", fn.c_str()));
if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
// Status bar update, abort request etc.
m_updater->status.fn = fn;
++(m_updater->status.filesdone);
@@ -395,7 +516,7 @@
LOGDEB0(("processone: processing: [%s] %s\n",
displayableBytes(stp->st_size).c_str(), fn.c_str()));
- FileInterner interner(fn, stp, m_config, m_tmpdir, FileInterner::FIF_none);
+ FileInterner interner(fn, stp, config, tmpdir, FileInterner::FIF_none);
if (!interner.ok()) {
// no indexing whatsoever in this case. This typically means that
// indexallfilenames is not set
@@ -482,17 +603,23 @@
make_udi(fn, doc.ipath, udi);
#ifdef IDX_THREADS
- IndexingTask *tp = new IndexingTask(udi, doc.ipath.empty() ?
- cstr_null : parent_udi, doc);
- if (!m_wqueue.put(tp))
+ DbUpdTask *tp = new DbUpdTask(config, udi, doc.ipath.empty() ?
+ cstr_null : parent_udi, doc);
+ if (!m_dwqueue.put(tp)) {
+ LOGERR(("processonefile: wqueue.put failed\n"));
return FsTreeWalker::FtwError;
+ }
#else
- if (!m_db->addOrUpdate(udi, doc.ipath.empty() ? cstr_null : parent_udi, doc))
+ if (!m_db->addOrUpdate(config, udi, doc.ipath.empty() ? cstr_null :
+ parent_udi, doc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
// Tell what we are doing and check for interrupt request
if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
++(m_updater->status.docsdone);
if (m_updater->status.dbtotdocs < m_updater->status.docsdone)
m_updater->status.dbtotdocs = m_updater->status.docsdone;
@@ -522,11 +649,11 @@
// Document signature for up to date checks.
makesig(stp, fileDoc.sig);
#ifdef IDX_THREADS
- IndexingTask *tp = new IndexingTask(parent_udi, cstr_null, fileDoc);
- if (!m_wqueue.put(tp))
+ DbUpdTask *tp = new DbUpdTask(config, parent_udi, cstr_null, fileDoc);
+ if (!m_dwqueue.put(tp))
return FsTreeWalker::FtwError;
#else
- if (!m_db->addOrUpdate(parent_udi, cstr_null, fileDoc))
+ if (!m_db->addOrUpdate(config, parent_udi, cstr_null, fileDoc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
}