Switch to side-by-side view

--- a/src/rcldb/rcldb.cpp
+++ b/src/rcldb/rcldb.cpp
@@ -127,6 +127,29 @@
     return pterm;
 }
 
+Db::Native::Native(Db *db) // Starts closed, not writable, and without a write queue
+    : m_rcldb(db), m_isopen(false), m_iswritable(false),
+      m_noversionwrite(false)
+#ifdef IDX_THREADS
+    , m_wqueue("DbUpd", // Queue size comes from the ThrDbWrite configuration
+	       m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
+      m_totalworkns(0LL), m_haveWriteQ(false) // The destructor tests m_haveWriteQ
+#endif // IDX_THREADS
+{
+    LOGDEB1(("Native::Native: me %p\n", this));
+}
+
+Db::Native::~Native() // Stops the db update worker if one is flagged as running
+{ 
+    LOGDEB1(("Native::~Native: me %p\n", this));
+#ifdef IDX_THREADS
+    if (m_haveWriteQ) { // Set to true by maybeStartThreads() once the worker started
+	void *status = m_wqueue.setTerminateAndWait();
+	LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
+    }
+#endif // IDX_THREADS
+}
+
 #ifdef IDX_THREADS
 void *DbUpdWorker(void* vdbp)
 {
@@ -143,8 +166,14 @@
 	    return (void*)1;
 	}
 	LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
-	if (!ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm, 
-				   tsk->doc, tsk->txtlen)) {
+	bool status;
+	if (tsk->txtlen == (size_t)-1) {
+	    status = ndbp->m_rcldb->purgeFileWrite(tsk->udi, tsk->uniterm);
+	} else {
+	    status = ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm, 
+					    tsk->doc, tsk->txtlen);
+	}
+	if (!status) {
 	    LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
 	    tqp->workerExit();
 	    delete tsk;
@@ -153,35 +182,31 @@
 	delete tsk;
     }
 }
+
+void Db::Native::maybeStartThreads() // Start the db update worker if configured
+{
+    m_loglevel = DebugLog::getdbl()->getlevel(); // Record the current log level
+
+    m_haveWriteQ = false; // Stays false unless the worker is started below
+    const RclConfig *cnf = m_rcldb->m_config;
+    int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first;
+    int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second;
+    if (writethreads > 1) { // At most one db update thread is ever used
+	LOGINFO(("RclDb: write threads count was forced down to 1\n"));
+	writethreads = 1;
+    }
+    if (writeqlen >= 0 && writethreads > 0) { // Else: no queue, no worker
+	if (!m_wqueue.start(writethreads, DbUpdWorker, this)) {
+	    LOGERR(("Db::Db: Worker start failed\n"));
+	    return; // m_haveWriteQ remains false after a failed start
+	}
+	m_haveWriteQ = true;
+    }
+    LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
+	    m_haveWriteQ, writeqlen, writethreads));
+}
+
 #endif // IDX_THREADS
-
-Db::Native::Native(Db *db) 
-    : m_rcldb(db), m_isopen(false), m_iswritable(false),
-      m_noversionwrite(false)
-#ifdef IDX_THREADS
-    , m_wqueue("DbUpd", 2), m_totalworkns(0LL)
-#endif // IDX_THREADS
-{ 
-    LOGDEB1(("Native::Native: me %p\n", this));
-#ifdef IDX_THREADS
-    m_loglevel = DebugLog::getdbl()->getlevel();
-    if (!m_wqueue.start(1, DbUpdWorker, this)) {
-	LOGERR(("Db::Db: Worker start failed\n"));
-	return;
-    }
-#endif // IDX_THREADS
-}
-
-Db::Native::~Native() 
-{ 
-    LOGDEB1(("Native::~Native: me %p\n", this));
-#ifdef IDX_THREADS
-    if (m_iswritable) {
-	void *status = m_wqueue.setTerminateAndWait();
-	LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
-    }
-#endif // IDX_THREADS
-}
 
 /* See comment in class declaration: return all subdocuments of a
  * document given by its unique id. 
@@ -313,11 +338,11 @@
 
 bool Db::o_inPlaceReset;
 
-Db::Db(RclConfig *cfp)
-    : m_ndb(0), m_config(cfp), m_idxAbsTruncLen(250), m_synthAbsLen(250),
-      m_synthAbsWordCtxLen(4), m_flushMb(-1), 
-      m_curtxtsz(0), m_flushtxtsz(0), m_occtxtsz(0), m_occFirstCheck(1),
-      m_maxFsOccupPc(0), m_mode(Db::DbRO)
+Db::Db(const RclConfig *cfp)
+    : m_ndb(0), m_config(cfp), m_mode(Db::DbRO), m_curtxtsz(0), m_flushtxtsz(0),
+      m_occtxtsz(0), m_occFirstCheck(1),
+      m_idxAbsTruncLen(250), m_synthAbsLen(250), m_synthAbsWordCtxLen(4), 
+      m_flushMb(-1), m_maxFsOccupPc(0)
 {
 #ifndef RCL_INDEX_STRIPCHARS
     if (start_of_field_term.empty()) {
@@ -390,6 +415,7 @@
                     m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, 
                                              cstr_RCL_IDX_VERSION);
 		m_ndb->m_iswritable = true;
+		m_ndb->maybeStartThreads();
 		// We open a readonly object in all cases (possibly in
 		// addition to the r/w one) because some operations
 		// are faster when performed through a Database: no
@@ -424,8 +450,8 @@
 
 	// Check index format version. Must not try to check a just created or
 	// truncated db
-	if (mode != DbTrunc && m_ndb->xdb().get_doccount() > 0) {
-	    string version = m_ndb->xdb().get_metadata(cstr_RCL_IDX_VERSION_KEY);
+	if (mode != DbTrunc && m_ndb->xrdb.get_doccount() > 0) {
+	    string version = m_ndb->xrdb.get_metadata(cstr_RCL_IDX_VERSION_KEY);
 	    if (version.compare(cstr_RCL_IDX_VERSION)) {
 		m_ndb->m_noversionwrite = true;
 		LOGERR(("Rcl::Db::open: file index [%s], software [%s]\n",
@@ -541,7 +567,7 @@
 	return 0;
     }
 
-    XAPTRY(res = m_ndb->xdb().get_termfreq(term), m_ndb->xrdb, m_reason);
+    XAPTRY(res = m_ndb->xrdb.get_termfreq(term), m_ndb->xrdb, m_reason);
 
     if (!m_reason.empty()) {
         LOGERR(("Db::termDocCnt: got error: %s\n", m_reason.c_str()));
@@ -1110,16 +1136,20 @@
     newdocument.set_data(record);
 
 #ifdef IDX_THREADS
-    DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
-    if (!m_ndb->m_wqueue.put(tp)) {
-	LOGERR(("Db::addOrUpdate:Cant queue task\n"));
-	return false;
-    }
-    return true;
-#else
+    if (m_ndb->m_haveWriteQ) {
+	DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, 
+				      doc.text.length());
+	if (!m_ndb->m_wqueue.put(tp)) {
+	    LOGERR(("Db::addOrUpdate:Cant queue task\n"));
+	    return false;
+	} else {
+	    return true;
+	}
+    }
+#endif
+
     return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument, 
 				   doc.text.length());
-#endif // IDX_THREADS
 }
 
 bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, 
@@ -1127,7 +1157,13 @@
 {
 #ifdef IDX_THREADS
     Chrono chron;
+    // In the case where there is a separate (single) db update
+    // thread, we only need to protect the update map update below
+    // (against interaction with threads calling needUpdate()). Else,
+    // all threads from above need to synchronize here
+    PTMutexLocker lock(m_mutex, m_haveWriteQ);
 #endif
+
     // Check file system full every mbyte of indexed text. It's a bit wasteful
     // to do this after having prepared the document, but it needs to be in
     // the single-threaded section.
@@ -1155,7 +1191,7 @@
 #ifdef IDX_THREADS
 	// Need to protect against interaction with the up-to-date checks
 	// which also update the existence map
-	PTMutexLocker lock(m_rcldb->m_ndb->m_mutex);
+	PTMutexLocker lock(m_mutex, !m_haveWriteQ);
 #endif
 	if (did < m_rcldb->updated.size()) {
 	    m_rcldb->updated[did] = true;
@@ -1191,18 +1227,21 @@
 #ifdef IDX_THREADS
 void Db::waitUpdIdle()
 {
-    Chrono chron;
-    m_ndb->m_wqueue.waitIdle();
-    string ermsg;
-    try {
-	m_ndb->xwdb.flush();
-    } XCATCHERROR(ermsg);
-    if (!ermsg.empty()) {
-	LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str()));
-    }
-    m_ndb->m_totalworkns += chron.nanos();
-    LOGDEB(("Db::waitUpdIdle: total work %lld mS\n",
-	    m_ndb->m_totalworkns/1000000));
+    if (m_ndb->m_haveWriteQ) { // Does nothing when no write queue was started
+	Chrono chron;
+	m_ndb->m_wqueue.waitIdle(); // Wait on the update queue before flushing
+	// We flush here just for correct measurement of the thread work time
+	string ermsg;
+	try {
+	    m_ndb->xwdb.flush();
+	} XCATCHERROR(ermsg);
+	if (!ermsg.empty()) { // A flush failure is logged but not returned
+	    LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str()));
+	}
+	m_ndb->m_totalworkns += chron.nanos(); // Add the wait + flush time
+	LOGDEB(("Db::waitUpdIdle: total work %lld mS\n",
+		m_ndb->m_totalworkns/1000000));
+    }
 }
 #endif
 
@@ -1243,6 +1282,13 @@
     string uniterm = make_uniterm(udi);
     string ermsg;
 
+#ifdef IDX_THREADS
+    // Need to protect against interaction with the doc update/insert
+    // thread which also updates the existence map, and even multiple
+    // accesses to the readonly Xapian::Database are not allowed
+    // anyway
+    PTMutexLocker lock(m_ndb->m_mutex);
+#endif
     // We look up the document indexed by the uniterm. This is either
     // the actual document file, or, for a multi-document file, the
     // pseudo-doc we create to stand for the file itself.
@@ -1277,12 +1323,6 @@
 
 	    // Set the uptodate flag for doc / pseudo doc
 	    if (m_mode 	!= DbRO) {
-#ifdef IDX_THREADS
-		// Need to protect against interaction with the doc
-		// update/insert thread which also updates the
-		// existence map
-		PTMutexLocker lock(m_ndb->m_mutex);
-#endif
 		updated[*docid] = true;
 
 		// Set the existence flag for all the subdocs (if any)
@@ -1372,7 +1412,13 @@
 	return false;
 
 #ifdef IDX_THREADS
-    m_ndb->m_wqueue.waitIdle();
+    // If we manage our own write queue, make sure it's drained and closed
+    if (m_ndb->m_haveWriteQ)
+	m_ndb->m_wqueue.setTerminateAndWait();
+    // else we need to lock out other top level threads. This is just
+    // a precaution as they should have been waited for by the top
+    // level actor at this point
+    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
 #endif // IDX_THREADS
 
     // For xapian versions up to 1.0.1, deleting a non-existant
@@ -1390,8 +1436,6 @@
 
     // Walk the document array and delete any xapian document whose
     // flag is not set (we did not see its source during indexing).
-    // Threads: we do not need a mutex here as the indexing threads
-    // are necessarily done at this point.
     int purgecount = 0;
     for (Xapian::docid docid = 1; docid < updated.size(); ++docid) {
 	if (!updated[docid]) {
@@ -1436,6 +1480,30 @@
     return true;
 }
 
+// Test for doc existence: true if uniterm has at least one posting in the
+// read-only index. Xapian errors are logged and reported as false.
+bool Db::docExists(const string& uniterm)
+{
+#ifdef IDX_THREADS
+    // Always lock, as needUpdate() does: the read-only db must not be
+    // accessed concurrently, and a write queue does not serialize the
+    // threads calling this method or needUpdate().
+    PTMutexLocker lock(m_ndb->m_mutex);
+#endif
+
+    string ermsg;
+    try {
+	Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
+	// An empty posting list means that no document carries the term
+	return docid != m_ndb->xrdb.postlist_end(uniterm);
+    } XCATCHERROR(ermsg);
+    // The try block always returns: we only get here after an exception
+    if (!ermsg.empty()) {
+	LOGERR(("Db::docExists(%s) %s\n", uniterm.c_str(), ermsg.c_str()));
+    }
+    return false;
+}
+
 /* Delete document(s) for given unique identifier (doc and descendents) */
 bool Db::purgeFile(const string &udi, bool *existed)
 {
@@ -1443,21 +1511,44 @@
     if (m_ndb == 0 || !m_ndb->m_iswritable)
 	return false;
 
+    string uniterm = make_uniterm(udi);
+    bool exists = docExists(uniterm);
+    if (existed)
+	*existed = exists;
+    if (!exists)
+	return true;
+
 #ifdef IDX_THREADS
-    m_ndb->m_wqueue.waitIdle();
+    if (m_ndb->m_haveWriteQ) {
+	Xapian::Document xdoc;
+	DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
+	if (!m_ndb->m_wqueue.put(tp)) {
+	    LOGERR(("Db::purgeFile:Cant queue task\n"));
+	    return false;
+	} else {
+	    return true;
+	}
+    }
+#endif
+
+    return purgeFileWrite(udi, uniterm);
+}
+
+bool Db::purgeFileWrite(const string& udi, const string& uniterm)
+{
+#if defined(IDX_THREADS) 
+    // If we have a write queue we're called from there, and single threaded, no locking.
+    // Else need to mutex other threads from above
+    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
 #endif // IDX_THREADS
 
     Xapian::WritableDatabase db = m_ndb->xwdb;
-    string uniterm = make_uniterm(udi);
     string ermsg;
     try {
 	Xapian::PostingIterator docid = db.postlist_begin(uniterm);
 	if (docid == db.postlist_end(uniterm)) {
-            if (existed)
-                *existed = false;
 	    return true;
         }
-        *existed = true;
 	LOGDEB(("purgeFile: delete docid %d\n", *docid));
 	if (m_flushMb > 0) {
 	    Xapian::termcount trms = m_ndb->xwdb.get_doclength(*docid);
@@ -1613,7 +1704,7 @@
 {
     if (!m_ndb || !m_ndb->m_isopen)
 	return false;
-    Xapian::Database xdb = m_ndb->xdb();
+    Xapian::Database xdb = m_ndb->xrdb;
 
     XAPTRY(res.dbdoccount = xdb.get_doccount();
            res.dbavgdoclen = xdb.get_avlength(), xdb, m_reason);
@@ -1769,7 +1860,7 @@
 	return 0;
     TermIter *tit = new TermIter;
     if (tit) {
-	tit->db = m_ndb->xdb();
+	tit->db = m_ndb->xrdb;
         XAPTRY(tit->it = tit->db.allterms_begin(), tit->db, m_reason);
 	if (!m_reason.empty()) {
 	    LOGERR(("Db::termWalkOpen: xapian error: %s\n", m_reason.c_str()));
@@ -1804,7 +1895,7 @@
     if (!m_ndb || !m_ndb->m_isopen)
 	return 0;
 
-    XAPTRY(if (!m_ndb->xdb().term_exists(word)) return false,
+    XAPTRY(if (!m_ndb->xrdb.term_exists(word)) return false,
            m_ndb->xrdb, m_reason);
 
     if (!m_reason.empty()) {