--- a/src/rcldb/rcldb.cpp
+++ b/src/rcldb/rcldb.cpp
@@ -127,6 +127,29 @@
return pterm;
}
+Db::Native::Native(Db *db) // Builds the native state for 'db'; no worker thread is started here
+    : m_rcldb(db), m_isopen(false), m_iswritable(false),
+      m_noversionwrite(false)
+#ifdef IDX_THREADS
+    , m_wqueue("DbUpd", // 2nd argument: the ThrDbWrite value that maybeStartThreads() reads as its queue length
+               db->m_config->getThrConf(RclConfig::ThrDbWrite).first), // via 'db', not m_rcldb: no reliance on member declaration order
+      m_totalworkns(0LL), m_haveWriteQ(false) // ~Native() tests m_haveWriteQ even when maybeStartThreads() never ran
+#endif // IDX_THREADS
+{
+    LOGDEB1(("Native::Native: me %p\n", this));
+}
+
+Db::Native::~Native() // Terminates the update queue (and waits) only if maybeStartThreads() enabled one
+{
+    LOGDEB1(("Native::~Native: me %p\n", this));
+#ifdef IDX_THREADS
+    if (m_haveWriteQ) {
+        void *workerStatus = m_wqueue.setTerminateAndWait();
+        LOGDEB2(("Native::~Native: worker status %ld\n", long(workerStatus)));
+    }
+#endif // IDX_THREADS
+}
+
#ifdef IDX_THREADS
void *DbUpdWorker(void* vdbp)
{
@@ -143,8 +166,14 @@
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
- if (!ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm,
- tsk->doc, tsk->txtlen)) {
+ bool status;
+ if (tsk->txtlen == (size_t)-1) {
+ status = ndbp->m_rcldb->purgeFileWrite(tsk->udi, tsk->uniterm);
+ } else {
+ status = ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm,
+ tsk->doc, tsk->txtlen);
+ }
+ if (!status) {
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
tqp->workerExit();
delete tsk;
@@ -153,35 +182,31 @@
delete tsk;
}
}
+
+void Db::Native::maybeStartThreads() // Starts the single db update worker when the ThrDbWrite config enables it
+{
+    m_loglevel = DebugLog::getdbl()->getlevel();
+    m_haveWriteQ = false; // only becomes true once the worker has been started
+    const RclConfig *config = m_rcldb->m_config;
+    int qlen = config->getThrConf(RclConfig::ThrDbWrite).first;
+    int nthreads = config->getThrConf(RclConfig::ThrDbWrite).second;
+    if (nthreads > 1) {
+        // At most one writer thread is ever used, whatever the configuration says
+        LOGINFO(("RclDb: write threads count was forced down to 1\n"));
+        nthreads = 1;
+    }
+    // A negative queue length or a thread count below 1 means: no write queue
+    const bool wantQueue = qlen >= 0 && nthreads > 0;
+    if (wantQueue && !m_wqueue.start(nthreads, DbUpdWorker, this)) {
+        LOGERR(("Db::Db: Worker start failed\n"));
+        return;
+    }
+    m_haveWriteQ = wantQueue;
+    LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
+            m_haveWriteQ, qlen, nthreads));
+}
+
#endif // IDX_THREADS
-
-Db::Native::Native(Db *db)
- : m_rcldb(db), m_isopen(false), m_iswritable(false),
- m_noversionwrite(false)
-#ifdef IDX_THREADS
- , m_wqueue("DbUpd", 2), m_totalworkns(0LL)
-#endif // IDX_THREADS
-{
- LOGDEB1(("Native::Native: me %p\n", this));
-#ifdef IDX_THREADS
- m_loglevel = DebugLog::getdbl()->getlevel();
- if (!m_wqueue.start(1, DbUpdWorker, this)) {
- LOGERR(("Db::Db: Worker start failed\n"));
- return;
- }
-#endif // IDX_THREADS
-}
-
-Db::Native::~Native()
-{
- LOGDEB1(("Native::~Native: me %p\n", this));
-#ifdef IDX_THREADS
- if (m_iswritable) {
- void *status = m_wqueue.setTerminateAndWait();
- LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
- }
-#endif // IDX_THREADS
-}
/* See comment in class declaration: return all subdocuments of a
* document given by its unique id.
@@ -313,11 +338,11 @@
bool Db::o_inPlaceReset;
-Db::Db(RclConfig *cfp)
- : m_ndb(0), m_config(cfp), m_idxAbsTruncLen(250), m_synthAbsLen(250),
- m_synthAbsWordCtxLen(4), m_flushMb(-1),
- m_curtxtsz(0), m_flushtxtsz(0), m_occtxtsz(0), m_occFirstCheck(1),
- m_maxFsOccupPc(0), m_mode(Db::DbRO)
+Db::Db(const RclConfig *cfp)
+ : m_ndb(0), m_config(cfp), m_mode(Db::DbRO), m_curtxtsz(0), m_flushtxtsz(0),
+ m_occtxtsz(0), m_occFirstCheck(1),
+ m_idxAbsTruncLen(250), m_synthAbsLen(250), m_synthAbsWordCtxLen(4),
+ m_flushMb(-1), m_maxFsOccupPc(0)
{
#ifndef RCL_INDEX_STRIPCHARS
if (start_of_field_term.empty()) {
@@ -390,6 +415,7 @@
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY,
cstr_RCL_IDX_VERSION);
m_ndb->m_iswritable = true;
+ m_ndb->maybeStartThreads();
// We open a readonly object in all cases (possibly in
// addition to the r/w one) because some operations
// are faster when performed through a Database: no
@@ -424,8 +450,8 @@
// Check index format version. Must not try to check a just created or
// truncated db
- if (mode != DbTrunc && m_ndb->xdb().get_doccount() > 0) {
- string version = m_ndb->xdb().get_metadata(cstr_RCL_IDX_VERSION_KEY);
+ if (mode != DbTrunc && m_ndb->xrdb.get_doccount() > 0) {
+ string version = m_ndb->xrdb.get_metadata(cstr_RCL_IDX_VERSION_KEY);
if (version.compare(cstr_RCL_IDX_VERSION)) {
m_ndb->m_noversionwrite = true;
LOGERR(("Rcl::Db::open: file index [%s], software [%s]\n",
@@ -541,7 +567,7 @@
return 0;
}
- XAPTRY(res = m_ndb->xdb().get_termfreq(term), m_ndb->xrdb, m_reason);
+ XAPTRY(res = m_ndb->xrdb.get_termfreq(term), m_ndb->xrdb, m_reason);
if (!m_reason.empty()) {
LOGERR(("Db::termDocCnt: got error: %s\n", m_reason.c_str()));
@@ -1110,16 +1136,20 @@
newdocument.set_data(record);
#ifdef IDX_THREADS
- DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
- if (!m_ndb->m_wqueue.put(tp)) {
- LOGERR(("Db::addOrUpdate:Cant queue task\n"));
- return false;
- }
- return true;
-#else
+ if (m_ndb->m_haveWriteQ) {
+ DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
+ doc.text.length());
+ if (!m_ndb->m_wqueue.put(tp)) {
+ LOGERR(("Db::addOrUpdate:Cant queue task\n"));
+ return false;
+ } else {
+ return true;
+ }
+ }
+#endif
+
return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument,
doc.text.length());
-#endif // IDX_THREADS
}
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
@@ -1127,7 +1157,13 @@
{
#ifdef IDX_THREADS
Chrono chron;
+ // In the case where there is a separate (single) db update
+ // thread, we only need to protect the update map update below
+ // (against interaction with threads calling needUpdate()). Else,
+ // all threads from above need to synchronize here
+ PTMutexLocker lock(m_mutex, m_haveWriteQ);
#endif
+
// Check file system full every mbyte of indexed text. It's a bit wasteful
// to do this after having prepared the document, but it needs to be in
// the single-threaded section.
@@ -1155,7 +1191,7 @@
#ifdef IDX_THREADS
// Need to protect against interaction with the up-to-date checks
// which also update the existence map
- PTMutexLocker lock(m_rcldb->m_ndb->m_mutex);
+ PTMutexLocker lock(m_mutex, !m_haveWriteQ);
#endif
if (did < m_rcldb->updated.size()) {
m_rcldb->updated[did] = true;
@@ -1191,18 +1227,21 @@
#ifdef IDX_THREADS
void Db::waitUpdIdle()
{
- Chrono chron;
- m_ndb->m_wqueue.waitIdle();
- string ermsg;
- try {
- m_ndb->xwdb.flush();
- } XCATCHERROR(ermsg);
- if (!ermsg.empty()) {
- LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str()));
- }
- m_ndb->m_totalworkns += chron.nanos();
- LOGDEB(("Db::waitUpdIdle: total work %lld mS\n",
- m_ndb->m_totalworkns/1000000));
+ if (m_ndb->m_haveWriteQ) { // without a write queue this whole method is a no-op
+ Chrono chron; // started before the wait, so the figure added below spans wait + flush
+ m_ndb->m_wqueue.waitIdle();
+ // We flush here just for correct measurement of the thread work time
+ string ermsg;
+ try {
+ m_ndb->xwdb.flush();
+ } XCATCHERROR(ermsg);
+ if (!ermsg.empty()) { // a failed flush is only logged; nothing is reported to the caller
+ LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str()));
+ }
+ m_ndb->m_totalworkns += chron.nanos(); // running total, kept in nanoseconds
+ LOGDEB(("Db::waitUpdIdle: total work %lld mS\n",
+ m_ndb->m_totalworkns/1000000)); // ns / 1e6 gives the milliseconds shown in the log
+ }
}
#endif
@@ -1243,6 +1282,13 @@
string uniterm = make_uniterm(udi);
string ermsg;
+#ifdef IDX_THREADS
+ // Need to protect against interaction with the doc update/insert
+ // thread which also updates the existence map, and even multiple
+ // accesses to the readonly Xapian::Database are not allowed
+ // anyway
+ PTMutexLocker lock(m_ndb->m_mutex);
+#endif
// We look up the document indexed by the uniterm. This is either
// the actual document file, or, for a multi-document file, the
// pseudo-doc we create to stand for the file itself.
@@ -1277,12 +1323,6 @@
// Set the uptodate flag for doc / pseudo doc
if (m_mode != DbRO) {
-#ifdef IDX_THREADS
- // Need to protect against interaction with the doc
- // update/insert thread which also updates the
- // existence map
- PTMutexLocker lock(m_ndb->m_mutex);
-#endif
updated[*docid] = true;
// Set the existence flag for all the subdocs (if any)
@@ -1372,7 +1412,13 @@
return false;
#ifdef IDX_THREADS
- m_ndb->m_wqueue.waitIdle();
+ // If we manage our own write queue, make sure it's drained and closed
+ if (m_ndb->m_haveWriteQ)
+ m_ndb->m_wqueue.setTerminateAndWait();
+ // else we need to lock out other top level threads. This is just
+ // a precaution as they should have been waited for by the top
+ // level actor at this point
+ PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
#endif // IDX_THREADS
// For xapian versions up to 1.0.1, deleting a non-existant
@@ -1390,8 +1436,6 @@
// Walk the document array and delete any xapian document whose
// flag is not set (we did not see its source during indexing).
- // Threads: we do not need a mutex here as the indexing threads
- // are necessarily done at this point.
int purgecount = 0;
for (Xapian::docid docid = 1; docid < updated.size(); ++docid) {
if (!updated[docid]) {
@@ -1436,6 +1480,30 @@
return true;
}
+// Test for doc existence: true if at least one document is indexed under
+// 'uniterm' in the read-only db, false if there is none or on Xapian error.
+bool Db::docExists(const string& uniterm)
+{
+#ifdef IDX_THREADS
+    // Always lock: purgeFile() calls this from its caller's thread even when
+    // a write queue exists, and needUpdate() reads the same readonly
+    // Xapian::Database under this mutex. Concurrent access to that object
+    // is not allowed, so the lock must not depend on m_haveWriteQ here.
+    PTMutexLocker lock(m_ndb->m_mutex);
+#endif
+
+    string ermsg;
+    try {
+        Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
+        return docid != m_ndb->xrdb.postlist_end(uniterm);
+    } XCATCHERROR(ermsg);
+    if (!ermsg.empty()) {
+        // Errors are logged and reported as "does not exist"
+        LOGERR(("Db::docExists(%s) %s\n", uniterm.c_str(), ermsg.c_str()));
+    }
+    return false;
+}
+
/* Delete document(s) for given unique identifier (doc and descendents) */
bool Db::purgeFile(const string &udi, bool *existed)
{
@@ -1443,21 +1511,44 @@
if (m_ndb == 0 || !m_ndb->m_iswritable)
return false;
+ string uniterm = make_uniterm(udi);
+ bool exists = docExists(uniterm);
+ if (existed)
+ *existed = exists;
+ if (!exists)
+ return true;
+
#ifdef IDX_THREADS
- m_ndb->m_wqueue.waitIdle();
+ if (m_ndb->m_haveWriteQ) {
+ Xapian::Document xdoc;
+ DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
+ if (!m_ndb->m_wqueue.put(tp)) {
+ LOGERR(("Db::purgeFile:Cant queue task\n"));
+ return false;
+ } else {
+ return true;
+ }
+ }
+#endif
+
+ return purgeFileWrite(udi, uniterm);
+}
+
+bool Db::purgeFileWrite(const string& udi, const string& uniterm)
+{
+#if defined(IDX_THREADS)
+ // If we have a write queue we're called from there, and single threaded, no locking.
+ // Else need to mutex other threads from above
+ PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
#endif // IDX_THREADS
Xapian::WritableDatabase db = m_ndb->xwdb;
- string uniterm = make_uniterm(udi);
string ermsg;
try {
Xapian::PostingIterator docid = db.postlist_begin(uniterm);
if (docid == db.postlist_end(uniterm)) {
- if (existed)
- *existed = false;
return true;
}
- *existed = true;
LOGDEB(("purgeFile: delete docid %d\n", *docid));
if (m_flushMb > 0) {
Xapian::termcount trms = m_ndb->xwdb.get_doclength(*docid);
@@ -1613,7 +1704,7 @@
{
if (!m_ndb || !m_ndb->m_isopen)
return false;
- Xapian::Database xdb = m_ndb->xdb();
+ Xapian::Database xdb = m_ndb->xrdb;
XAPTRY(res.dbdoccount = xdb.get_doccount();
res.dbavgdoclen = xdb.get_avlength(), xdb, m_reason);
@@ -1769,7 +1860,7 @@
return 0;
TermIter *tit = new TermIter;
if (tit) {
- tit->db = m_ndb->xdb();
+ tit->db = m_ndb->xrdb;
XAPTRY(tit->it = tit->db.allterms_begin(), tit->db, m_reason);
if (!m_reason.empty()) {
LOGERR(("Db::termWalkOpen: xapian error: %s\n", m_reason.c_str()));
@@ -1804,7 +1895,7 @@
if (!m_ndb || !m_ndb->m_isopen)
return 0;
- XAPTRY(if (!m_ndb->xdb().term_exists(word)) return false,
+ XAPTRY(if (!m_ndb->xrdb.term_exists(word)) return false,
m_ndb->xrdb, m_reason);
if (!m_reason.empty()) {