--- a/src/rcldb/rcldb.cpp
+++ b/src/rcldb/rcldb.cpp
@@ -34,6 +34,7 @@
#include "xapian.h"
#include "rclconfig.h"
+#include "debuglog.h"
#include "rcldb.h"
#include "rcldb_p.h"
#include "stemdb.h"
@@ -41,7 +42,6 @@
#include "transcode.h"
#include "unacpp.h"
#include "conftree.h"
-#include "debuglog.h"
#include "pathut.h"
#include "smallut.h"
#include "utf8iter.h"
@@ -571,6 +571,12 @@
m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc);
m_config->getConfParam("idxflushmb", &m_flushMb);
}
+#ifdef IDX_THREADS
+ if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) {
+ LOGERR(("Db::Db: Worker start failed\n"));
+ return;
+ }
+#endif // IDX_THREADS
}
Db::~Db()
@@ -862,7 +868,6 @@
// fields and position jumps to separate fields
class TextSplitDb : public TextSplitP {
public:
- Xapian::WritableDatabase db;
Xapian::Document &doc; // Xapian document
// Base for document section. Gets large increment when we change
// sections, to avoid cross-section proximity matches.
@@ -875,10 +880,9 @@
// to compute the first position of the next section.
Xapian::termpos curpos;
- TextSplitDb(Xapian::WritableDatabase idb,
- Xapian::Document &d, TermProc *prc)
+ TextSplitDb(Xapian::Document &d, TermProc *prc)
: TextSplitP(prc),
- db(idb), doc(d), basepos(1), curpos(0), wdfinc(1)
+ doc(d), basepos(1), curpos(0), wdfinc(1)
{}
// Reimplement text_to_words to add start and end special terms
virtual bool text_to_words(const string &in);
@@ -1003,6 +1007,58 @@
#define RECORD_APPEND(R, NM, VAL) {R += NM + "=" + VAL + "\n";}
+#ifdef IDX_THREADS
+void *DbUpdWorker(void* vdbp)
+{
+ Db *dbp = (Db *)vdbp;
+ WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
+ DbUpdTask *tsk;
+
+ for (;;) {
+ if (!tqp->take(&tsk)) {
+ tqp->workerExit();
+ return (void*)1;
+ }
+ LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
+
+ const char *fnc = tsk->udi.c_str();
+ string ermsg;
+
+ // Add db entry or update existing entry:
+ try {
+ Xapian::docid did =
+ dbp->m_ndb->xwdb.replace_document(tsk->uniterm,
+ tsk->doc);
+ if (did < dbp->updated.size()) {
+ dbp->updated[did] = true;
+ LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
+ } else {
+ LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
+ }
+ } XCATCHERROR(ermsg);
+
+ if (!ermsg.empty()) {
+ LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
+ ermsg.erase();
+ // FIXME: is this ever actually needed?
+ try {
+ dbp->m_ndb->xwdb.add_document(tsk->doc);
+ LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
+ fnc));
+ } XCATCHERROR(ermsg);
+ if (!ermsg.empty()) {
+ LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
+ tqp->workerExit();
+ return (void*)0;
+ }
+ }
+ dbp->maybeflush(tsk->txtlen);
+
+ delete tsk;
+ }
+}
+#endif // IDX_THREADS
+
// Add document in internal form to the database: index the terms in
// the title abstract and body and add special terms for file name,
// date, mime type etc. , create the document data record (more
@@ -1039,7 +1095,7 @@
// TermProcCommongrams tpcommon(nxt, m_stops); nxt = &tpcommon;
TermProcPrep tpprep(nxt); nxt = &tpprep;
- TextSplitDb splitter(m_ndb->xwdb, newdocument, nxt);
+ TextSplitDb splitter(newdocument, nxt);
tpidx.setTSD(&splitter);
// Split and index file name as document term(s)
@@ -1271,6 +1327,13 @@
LOGDEB0(("Rcl::Db::add: new doc record:\n%s\n", record.c_str()));
newdocument.set_data(record);
+#ifdef IDX_THREADS
+ DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
+ if (!m_ndb->m_wqueue.put(tp)) {
+ LOGERR(("Db::addOrUpdate:Cant queue task\n"));
+ return false;
+ }
+#else
const char *fnc = udi.c_str();
string ermsg;
@@ -1303,6 +1366,7 @@
// Test if we're over the flush threshold (limit memory usage):
maybeflush(doc.text.length());
+#endif // IDX_THREADS
return true;
}
@@ -1454,6 +1518,10 @@
m_ndb->m_iswritable));
if (m_ndb->m_isopen == false || m_ndb->m_iswritable == false)
return false;
+
+#ifdef IDX_THREADS
+ m_ndb->m_wqueue.waitIdle();
+#endif // IDX_THREADS
// For xapian versions up to 1.0.1, deleting a non-existant
// document would trigger an exception that would discard any
@@ -1520,6 +1588,11 @@
LOGDEB(("Db:purgeFile: [%s]\n", udi.c_str()));
if (m_ndb == 0 || !m_ndb->m_iswritable)
return false;
+
+#ifdef IDX_THREADS
+ m_ndb->m_wqueue.waitIdle();
+#endif // IDX_THREADS
+
Xapian::WritableDatabase db = m_ndb->xwdb;
string uniterm = make_uniterm(udi);
string ermsg;