--- a/src/rcldb/rcldb.cpp
+++ b/src/rcldb/rcldb.cpp
@@ -280,9 +280,12 @@
m_config->getConfParam("idxflushmb", &m_flushMb);
}
#ifdef IDX_THREADS
- if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) {
- LOGERR(("Db::Db: Worker start failed\n"));
- return;
+ if (m_ndb) {
+ m_ndb->m_loglevel = DebugLog::getdbl()->getlevel();
+ if (!m_ndb->m_wqueue.start(1, DbUpdWorker, this)) {
+ LOGERR(("Db::Db: Worker start failed\n"));
+ return;
+ }
}
#endif // IDX_THREADS
}
@@ -461,7 +464,7 @@
if (!m_ndb || !m_ndb->m_isopen)
return -1;
- XAPTRY(res = m_ndb->xdb().get_doccount(), m_ndb->xrdb, m_reason);
+ XAPTRY(res = m_ndb->xrdb.get_doccount(), m_ndb->xrdb, m_reason);
if (!m_reason.empty()) {
LOGERR(("Db::docCnt: got error: %s\n", m_reason.c_str()));
@@ -788,48 +791,22 @@
{
Db *dbp = (Db *)vdbp;
WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
+ DebugLog::getdbl()->setloglevel(dbp->m_ndb->m_loglevel);
+
DbUpdTask *tsk;
-
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
-
- const char *fnc = tsk->udi.c_str();
- string ermsg;
-
- // Add db entry or update existing entry:
- try {
- Xapian::docid did =
- dbp->m_ndb->xwdb.replace_document(tsk->uniterm,
- tsk->doc);
- if (did < dbp->updated.size()) {
- dbp->updated[did] = true;
- LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
- } else {
- LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
- }
- } XCATCHERROR(ermsg);
-
- if (!ermsg.empty()) {
- LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
- ermsg.erase();
- // FIXME: is this ever actually needed?
- try {
- dbp->m_ndb->xwdb.add_document(tsk->doc);
- LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
- fnc));
- } XCATCHERROR(ermsg);
- if (!ermsg.empty()) {
- LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
- tqp->workerExit();
- return (void*)0;
- }
- }
- dbp->maybeflush(tsk->txtlen);
-
+ if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm,
+ tsk->doc, tsk->txtlen)) {
+ LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
+ tqp->workerExit();
+ delete tsk;
+ return (void*)0;
+ }
delete tsk;
}
}
@@ -839,26 +816,13 @@
// the title abstract and body and add special terms for file name,
// date, mime type etc. , create the document data record (more
// metadata), and update database
-bool Db::addOrUpdate(const string &udi, const string &parent_udi,
- Doc &doc)
+bool Db::addOrUpdate(RclConfig *config, const string &udi,
+ const string &parent_udi, Doc &doc)
{
LOGDEB(("Db::add: udi [%s] parent [%s]\n",
udi.c_str(), parent_udi.c_str()));
if (m_ndb == 0)
return false;
- // Check file system full every mbyte of indexed text.
- if (m_maxFsOccupPc > 0 &&
- (m_occFirstCheck || (m_curtxtsz - m_occtxtsz) / MB >= 1)) {
- LOGDEB(("Db::add: checking file system usage\n"));
- int pc;
- m_occFirstCheck = 0;
- if (fsocc(m_basedir, &pc) && pc >= m_maxFsOccupPc) {
- LOGERR(("Db::add: stop indexing: file system "
- "%d%% full > max %d%%\n", pc, m_maxFsOccupPc));
- return false;
- }
- m_occtxtsz = m_curtxtsz;
- }
Xapian::Document newdocument;
@@ -1082,10 +1046,10 @@
if (!doc.meta[Doc::keyabs].empty())
RECORD_APPEND(record, Doc::keyabs, doc.meta[Doc::keyabs]);
- const set<string>& stored = m_config->getStoredFields();
+ const set<string>& stored = config->getStoredFields();
for (set<string>::const_iterator it = stored.begin();
it != stored.end(); it++) {
- string nm = m_config->fieldCanon(*it);
+ string nm = config->fieldCanon(*it);
if (!doc.meta[*it].empty()) {
string value =
neutchars(truncate_to_word(doc.meta[*it], 150), cstr_nc);
@@ -1125,16 +1089,42 @@
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
return false;
}
+ return true;
#else
+ return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument,
+ doc.text.length());
+#endif // IDX_THREADS
+}
+
+bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
+ Xapian::Document& newdocument, size_t textlen)
+{
+ // Check file system full every mbyte of indexed text. It's a bit wasteful
+ // to do this after having prepared the document, but it needs to be in
+ // the single-threaded section.
+ if (m_rcldb->m_maxFsOccupPc > 0 &&
+ (m_rcldb->m_occFirstCheck ||
+ (m_rcldb->m_curtxtsz - m_rcldb->m_occtxtsz) / MB >= 1)) {
+ LOGDEB(("Db::add: checking file system usage\n"));
+ int pc;
+ m_rcldb->m_occFirstCheck = 0;
+ if (fsocc(m_rcldb->m_basedir, &pc) && pc >= m_rcldb->m_maxFsOccupPc) {
+ LOGERR(("Db::add: stop indexing: file system "
+ "%d%% full > max %d%%\n", pc, m_rcldb->m_maxFsOccupPc));
+ return false;
+ }
+ m_rcldb->m_occtxtsz = m_rcldb->m_curtxtsz;
+ }
+
const char *fnc = udi.c_str();
string ermsg;
// Add db entry or update existing entry:
try {
Xapian::docid did =
- m_ndb->xwdb.replace_document(uniterm, newdocument);
- if (did < updated.size()) {
- updated[did] = true;
+ xwdb.replace_document(uniterm, newdocument);
+ if (did < m_rcldb->updated.size()) {
+ m_rcldb->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
@@ -1146,7 +1136,7 @@
ermsg.erase();
// FIXME: is this ever actually needed?
try {
- m_ndb->xwdb.add_document(newdocument);
+ xwdb.add_document(newdocument);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
@@ -1157,10 +1147,15 @@
}
// Test if we're over the flush threshold (limit memory usage):
- maybeflush(doc.text.length());
-#endif // IDX_THREADS
- return true;
-}
+ return m_rcldb->maybeflush(textlen);
+}
+
+#ifdef IDX_THREADS
+void Db::waitUpdIdle()
+{
+ m_ndb->m_wqueue.waitIdle();
+}
+#endif
// Flush when idxflushmbs is reached
bool Db::maybeflush(off_t moretext)
@@ -1233,6 +1228,7 @@
// Set the uptodate flag for doc / pseudo doc
if (m_mode != DbRO) {
+#warning we need a lock here !
updated[*docid] = true;
// Set the existence flag for all the subdocs (if any)
@@ -1244,7 +1240,7 @@
for (vector<Xapian::docid>::iterator it = docids.begin();
it != docids.end(); it++) {
if (*it < updated.size()) {
- LOGDEB2(("Db::needUpdate: set flag for docid %d\n", *it));
+ LOGDEB2(("Db::needUpdate: docid %d set\n", *it));
updated[*it] = true;
}
}