--- a/src/rcldb/rcldb.cpp
+++ b/src/rcldb/rcldb.cpp
@@ -149,23 +149,34 @@
WorkQueue<DbUpdTask*> *tqp = &(ndbp->m_wqueue);
DebugLog::getdbl()->setloglevel(ndbp->m_loglevel);
- DbUpdTask *tsk;
+ DbUpdTask *tsk = 0;
for (;;) {
- size_t qsz;
+ size_t qsz = -1;
if (!tqp->take(&tsk, &qsz)) {
tqp->workerExit();
return (void*)1;
}
- LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
- bool status;
- if (tsk->txtlen == (size_t)-1) {
- status = ndbp->m_rcldb->purgeFileWrite(tsk->udi, tsk->uniterm);
- } else {
+ bool status = false;
+ switch (tsk->op) {
+ case DbUpdTask::AddOrUpdate:
+ LOGDEB(("DbUpdWorker: got add/update task, ql %d\n", int(qsz)));
status = ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen);
+ break;
+ case DbUpdTask::Delete:
+ LOGDEB(("DbUpdWorker: got delete task, ql %d\n", int(qsz)));
+ status = ndbp->purgeFileWrite(false, tsk->udi, tsk->uniterm);
+ break;
+ case DbUpdTask::PurgeOrphans:
+ LOGDEB(("DbUpdWorker: got orphans purge task, ql %d\n", int(qsz)));
+ status = ndbp->purgeFileWrite(true, tsk->udi, tsk->uniterm);
+ break;
+ default:
+ LOGERR(("DbUpdWorker: unknown op %d !!\n", tsk->op));
+ break;
}
if (!status) {
- LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
+ LOGERR(("DbUpdWorker: xxWrite failed\n"));
tqp->workerExit();
delete tsk;
return (void*)0;
@@ -1151,8 +1162,8 @@
#ifdef IDX_THREADS
if (m_ndb->m_havewriteq) {
- DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
- doc.text.length());
+ DbUpdTask *tp = new DbUpdTask(DbUpdTask::AddOrUpdate, udi, uniterm,
+ newdocument, doc.text.length());
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
return false;
@@ -1292,7 +1303,7 @@
}
// Test if doc given by udi has changed since last indexed (test sigs)
-bool Db::needUpdate(const string &udi, const string& sig)
+bool Db::needUpdate(const string &udi, const string& sig, bool *existed)
{
if (m_ndb == 0)
return false;
@@ -1300,8 +1311,12 @@
// If we are doing an in place or full reset, no need to
// test. Note that there is no need to update the existence map
// either, it will be done when updating the index
- if (o_inPlaceReset || m_mode == DbTrunc)
+ if (o_inPlaceReset || m_mode == DbTrunc) {
+ // For in place reset, pretend the doc existed, to enable subdoc purge
+ if (existed)
+ *existed = o_inPlaceReset;
return true;
+ }
string uniterm = make_uniterm(udi);
string ermsg;
@@ -1325,9 +1340,13 @@
if (docid == m_ndb->xrdb.postlist_end(uniterm)) {
// If no document exist with this path, we do need update
LOGDEB(("Db::needUpdate:yes (new): [%s]\n", uniterm.c_str()));
+ if (existed)
+ *existed = false;
return true;
}
Xapian::Document doc = m_ndb->xrdb.get_document(*docid);
+ if (existed)
+ *existed = true;
// Retrieve old file/doc signature from value
string osig = doc.get_value(VALUE_SIG);
@@ -1542,8 +1561,8 @@
#ifdef IDX_THREADS
if (m_ndb->m_havewriteq) {
- Xapian::Document xdoc;
- DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
+ DbUpdTask *tp = new DbUpdTask(DbUpdTask::Delete, udi, uniterm,
+ Xapian::Document(), (size_t)-1);
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::purgeFile:Cant queue task\n"));
return false;
@@ -1552,49 +1571,98 @@
}
}
#endif
-
- return purgeFileWrite(udi, uniterm);
-}
-
-bool Db::purgeFileWrite(const string& udi, const string& uniterm)
+ /* We get there is IDX_THREADS is not defined or there is no queue */
+ return m_ndb->purgeFileWrite(false, udi, uniterm);
+}
+
+/* Delete subdocs with an out of date sig. We do this to purge
+ obsolete subdocs during a partial update where no general purge
+ will be done */
+bool Db::purgeOrphans(const string &udi)
+{
+ LOGDEB(("Db:purgeOrphans: [%s]\n", udi.c_str()));
+ if (m_ndb == 0 || !m_ndb->m_iswritable)
+ return false;
+
+ string uniterm = make_uniterm(udi);
+
+#ifdef IDX_THREADS
+ if (m_ndb->m_havewriteq) {
+ DbUpdTask *tp = new DbUpdTask(DbUpdTask::PurgeOrphans, udi, uniterm,
+ Xapian::Document(), (size_t)-1);
+ if (!m_ndb->m_wqueue.put(tp)) {
+ LOGERR(("Db::purgeFile:Cant queue task\n"));
+ return false;
+ } else {
+ return true;
+ }
+ }
+#endif
+ /* We get there is IDX_THREADS is not defined or there is no queue */
+ return m_ndb->purgeFileWrite(true, udi, uniterm);
+}
+
+bool Db::Native::purgeFileWrite(bool orphansOnly, const string& udi,
+ const string& uniterm)
{
#if defined(IDX_THREADS)
// We need a mutex even if we have a write queue (so we can only
// be called by a single thread) to protect about multiple acces
// to xrdb from subDocs() which is also called from needupdate()
// (called from outside the write thread !
- PTMutexLocker lock(m_ndb->m_mutex);
+ PTMutexLocker lock(m_mutex);
#endif // IDX_THREADS
- Xapian::WritableDatabase db = m_ndb->xwdb;
string ermsg;
try {
- Xapian::PostingIterator docid = db.postlist_begin(uniterm);
- if (docid == db.postlist_end(uniterm)) {
+ Xapian::PostingIterator docid = xwdb.postlist_begin(uniterm);
+ if (docid == xwdb.postlist_end(uniterm)) {
return true;
}
- LOGDEB(("purgeFile: delete docid %d\n", *docid));
- if (m_flushMb > 0) {
- Xapian::termcount trms = m_ndb->xwdb.get_doclength(*docid);
- maybeflush(trms * 5);
- }
- db.delete_document(*docid);
+ if (m_rcldb->m_flushMb > 0) {
+ Xapian::termcount trms = xwdb.get_doclength(*docid);
+ m_rcldb->maybeflush(trms * 5);
+ }
+ string sig;
+ if (orphansOnly) {
+ Xapian::Document doc = xwdb.get_document(*docid);
+ sig = doc.get_value(VALUE_SIG);
+ if (sig.empty()) {
+ LOGINFO(("purgeFileWrite: got empty sig\n"));
+ return false;
+ }
+ } else {
+ LOGDEB(("purgeFile: delete docid %d\n", *docid));
+ xwdb.delete_document(*docid);
+ }
vector<Xapian::docid> docids;
- m_ndb->subDocs(udi, docids);
+ subDocs(udi, docids);
LOGDEB(("purgeFile: subdocs cnt %d\n", docids.size()));
for (vector<Xapian::docid>::iterator it = docids.begin();
it != docids.end(); it++) {
- LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
- if (m_flushMb > 0) {
- Xapian::termcount trms = m_ndb->xwdb.get_doclength(*it);
- maybeflush(trms * 5);
- }
- db.delete_document(*it);
+ if (m_rcldb->m_flushMb > 0) {
+ Xapian::termcount trms = xwdb.get_doclength(*it);
+ m_rcldb->maybeflush(trms * 5);
+ }
+ string subdocsig;
+ if (orphansOnly) {
+ Xapian::Document doc = xwdb.get_document(*it);
+ subdocsig = doc.get_value(VALUE_SIG);
+ if (subdocsig.empty()) {
+ LOGINFO(("purgeFileWrite: got empty sig for subdoc??\n"));
+ continue;
+ }
+ }
+
+ if (!orphansOnly || sig != subdocsig) {
+ LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
+ xwdb.delete_document(*it);
+ }
}
return true;
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
- LOGERR(("Db::purgeFile: %s\n", ermsg.c_str()));
+ LOGERR(("Db::purgeFileWrite: %s\n", ermsg.c_str()));
}
return false;
}