|
a/src/rcldb/rcldb.cpp |
|
b/src/rcldb/rcldb.cpp |
|
... |
|
... |
130 |
Db::Native::Native(Db *db)
|
130 |
Db::Native::Native(Db *db)
|
131 |
: m_rcldb(db), m_isopen(false), m_iswritable(false),
|
131 |
: m_rcldb(db), m_isopen(false), m_iswritable(false),
|
132 |
m_noversionwrite(false)
|
132 |
m_noversionwrite(false)
|
133 |
#ifdef IDX_THREADS
|
133 |
#ifdef IDX_THREADS
|
134 |
, m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
|
134 |
, m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
|
135 |
m_totalworkns(0LL)
|
135 |
m_loglevel(4),
|
|
|
136 |
m_totalworkns(0LL), m_havewriteq(false)
|
136 |
#endif // IDX_THREADS
|
137 |
#endif // IDX_THREADS
|
137 |
{
|
138 |
{
|
138 |
LOGDEB1(("Native::Native: me %p\n", this));
|
139 |
LOGDEB1(("Native::Native: me %p\n", this));
|
139 |
}
|
140 |
}
|
140 |
|
141 |
|
141 |
Db::Native::~Native()
|
142 |
Db::Native::~Native()
|
142 |
{
|
143 |
{
|
143 |
LOGDEB1(("Native::~Native: me %p\n", this));
|
144 |
LOGDEB1(("Native::~Native: me %p\n", this));
|
144 |
#ifdef IDX_THREADS
|
145 |
#ifdef IDX_THREADS
|
145 |
if (m_haveWriteQ) {
|
146 |
if (m_havewriteq) {
|
146 |
void *status = m_wqueue.setTerminateAndWait();
|
147 |
void *status = m_wqueue.setTerminateAndWait();
|
147 |
LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
|
148 |
LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
|
148 |
}
|
149 |
}
|
149 |
#endif // IDX_THREADS
|
150 |
#endif // IDX_THREADS
|
150 |
}
|
151 |
}
|
|
... |
|
... |
184 |
|
185 |
|
185 |
void Db::Native::maybeStartThreads()
|
186 |
void Db::Native::maybeStartThreads()
|
186 |
{
|
187 |
{
|
187 |
m_loglevel = DebugLog::getdbl()->getlevel();
|
188 |
m_loglevel = DebugLog::getdbl()->getlevel();
|
188 |
|
189 |
|
189 |
m_haveWriteQ = false;
|
190 |
m_havewriteq = false;
|
190 |
const RclConfig *cnf = m_rcldb->m_config;
|
191 |
const RclConfig *cnf = m_rcldb->m_config;
|
191 |
int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first;
|
192 |
int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first;
|
192 |
int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second;
|
193 |
int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second;
|
193 |
if (writethreads > 1) {
|
194 |
if (writethreads > 1) {
|
194 |
LOGINFO(("RclDb: write threads count was forced down to 1\n"));
|
195 |
LOGINFO(("RclDb: write threads count was forced down to 1\n"));
|
|
... |
|
... |
197 |
if (writeqlen >= 0 && writethreads > 0) {
|
198 |
if (writeqlen >= 0 && writethreads > 0) {
|
198 |
if (!m_wqueue.start(writethreads, DbUpdWorker, this)) {
|
199 |
if (!m_wqueue.start(writethreads, DbUpdWorker, this)) {
|
199 |
LOGERR(("Db::Db: Worker start failed\n"));
|
200 |
LOGERR(("Db::Db: Worker start failed\n"));
|
200 |
return;
|
201 |
return;
|
201 |
}
|
202 |
}
|
202 |
m_haveWriteQ = true;
|
203 |
m_havewriteq = true;
|
203 |
}
|
204 |
}
|
204 |
LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
|
205 |
LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
|
205 |
m_haveWriteQ, writeqlen, writethreads));
|
206 |
m_havewriteq, writeqlen, writethreads));
|
206 |
}
|
207 |
}
|
207 |
|
208 |
|
208 |
#endif // IDX_THREADS
|
209 |
#endif // IDX_THREADS
|
209 |
|
210 |
|
210 |
/* See comment in class declaration: return all subdocuments of a
|
211 |
/* See comment in class declaration: return all subdocuments of a
|
|
... |
|
... |
498 |
if (w) {
|
499 |
if (w) {
|
499 |
if (!m_ndb->m_noversionwrite)
|
500 |
if (!m_ndb->m_noversionwrite)
|
500 |
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION);
|
501 |
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION);
|
501 |
LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n"));
|
502 |
LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n"));
|
502 |
}
|
503 |
}
|
|
|
504 |
#ifdef IDX_THREADS
|
|
|
505 |
waitUpdIdle();
|
|
|
506 |
#endif
|
503 |
// Used to do a flush here. Cant see why it should be necessary.
|
507 |
// Used to do a flush here. Cant see why it should be necessary.
|
504 |
deleteZ(m_ndb);
|
508 |
deleteZ(m_ndb);
|
505 |
if (w)
|
509 |
if (w)
|
506 |
LOGDEB(("Rcl::Db:close() xapian close done.\n"));
|
510 |
LOGDEB(("Rcl::Db:close() xapian close done.\n"));
|
507 |
if (final) {
|
511 |
if (final) {
|
|
... |
|
... |
1136 |
|
1140 |
|
1137 |
LOGDEB0(("Rcl::Db::add: new doc record:\n%s\n", record.c_str()));
|
1141 |
LOGDEB0(("Rcl::Db::add: new doc record:\n%s\n", record.c_str()));
|
1138 |
newdocument.set_data(record);
|
1142 |
newdocument.set_data(record);
|
1139 |
|
1143 |
|
1140 |
#ifdef IDX_THREADS
|
1144 |
#ifdef IDX_THREADS
|
1141 |
if (m_ndb->m_haveWriteQ) {
|
1145 |
if (m_ndb->m_havewriteq) {
|
1142 |
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
|
1146 |
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
|
1143 |
doc.text.length());
|
1147 |
doc.text.length());
|
1144 |
if (!m_ndb->m_wqueue.put(tp)) {
|
1148 |
if (!m_ndb->m_wqueue.put(tp)) {
|
1145 |
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
|
1149 |
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
|
1146 |
return false;
|
1150 |
return false;
|
|
... |
|
... |
1161 |
Chrono chron;
|
1165 |
Chrono chron;
|
1162 |
// In the case where there is a separate (single) db update
|
1166 |
// In the case where there is a separate (single) db update
|
1163 |
// thread, we only need to protect the update map update below
|
1167 |
// thread, we only need to protect the update map update below
|
1164 |
// (against interaction with threads calling needUpdate()). Else,
|
1168 |
// (against interaction with threads calling needUpdate()). Else,
|
1165 |
// all threads from above need to synchronize here
|
1169 |
// all threads from above need to synchronize here
|
1166 |
PTMutexLocker lock(m_mutex, m_haveWriteQ);
|
1170 |
PTMutexLocker lock(m_mutex, m_havewriteq);
|
1167 |
#endif
|
1171 |
#endif
|
1168 |
|
1172 |
|
1169 |
// Check file system full every mbyte of indexed text. It's a bit wasteful
|
1173 |
// Check file system full every mbyte of indexed text. It's a bit wasteful
|
1170 |
// to do this after having prepared the document, but it needs to be in
|
1174 |
// to do this after having prepared the document, but it needs to be in
|
1171 |
// the single-threaded section.
|
1175 |
// the single-threaded section.
|
|
... |
|
... |
1191 |
Xapian::docid did =
|
1195 |
Xapian::docid did =
|
1192 |
xwdb.replace_document(uniterm, newdocument);
|
1196 |
xwdb.replace_document(uniterm, newdocument);
|
1193 |
#ifdef IDX_THREADS
|
1197 |
#ifdef IDX_THREADS
|
1194 |
// Need to protect against interaction with the up-to-date checks
|
1198 |
// Need to protect against interaction with the up-to-date checks
|
1195 |
// which also update the existence map
|
1199 |
// which also update the existence map
|
1196 |
PTMutexLocker lock(m_mutex, !m_haveWriteQ);
|
1200 |
PTMutexLocker lock(m_mutex, !m_havewriteq);
|
1197 |
#endif
|
1201 |
#endif
|
1198 |
if (did < m_rcldb->updated.size()) {
|
1202 |
if (did < m_rcldb->updated.size()) {
|
1199 |
m_rcldb->updated[did] = true;
|
1203 |
m_rcldb->updated[did] = true;
|
1200 |
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
|
1204 |
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
|
1201 |
} else {
|
1205 |
} else {
|
|
... |
|
... |
1227 |
}
|
1231 |
}
|
1228 |
|
1232 |
|
1229 |
#ifdef IDX_THREADS
|
1233 |
#ifdef IDX_THREADS
|
1230 |
void Db::waitUpdIdle()
|
1234 |
void Db::waitUpdIdle()
|
1231 |
{
|
1235 |
{
|
1232 |
if (m_ndb->m_haveWriteQ) {
|
1236 |
if (m_ndb->m_iswritable && m_ndb->m_havewriteq) {
|
1233 |
Chrono chron;
|
1237 |
Chrono chron;
|
1234 |
m_ndb->m_wqueue.waitIdle();
|
1238 |
m_ndb->m_wqueue.waitIdle();
|
1235 |
// We flush here just for correct measurement of the thread work time
|
1239 |
// We flush here just for correct measurement of the thread work time
|
1236 |
string ermsg;
|
1240 |
string ermsg;
|
1237 |
try {
|
1241 |
try {
|
|
... |
|
... |
1413 |
if (m_ndb->m_isopen == false || m_ndb->m_iswritable == false)
|
1417 |
if (m_ndb->m_isopen == false || m_ndb->m_iswritable == false)
|
1414 |
return false;
|
1418 |
return false;
|
1415 |
|
1419 |
|
1416 |
#ifdef IDX_THREADS
|
1420 |
#ifdef IDX_THREADS
|
1417 |
// If we manage our own write queue, make sure it's drained and closed
|
1421 |
// If we manage our own write queue, make sure it's drained and closed
|
1418 |
if (m_ndb->m_haveWriteQ)
|
1422 |
if (m_ndb->m_havewriteq)
|
1419 |
m_ndb->m_wqueue.setTerminateAndWait();
|
1423 |
m_ndb->m_wqueue.setTerminateAndWait();
|
1420 |
// else we need to lock out other top level threads. This is just
|
1424 |
// else we need to lock out other top level threads. This is just
|
1421 |
// a precaution as they should have been waited for by the top
|
1425 |
// a precaution as they should have been waited for by the top
|
1422 |
// level actor at this point
|
1426 |
// level actor at this point
|
1423 |
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
|
1427 |
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
|
1424 |
#endif // IDX_THREADS
|
1428 |
#endif // IDX_THREADS
|
1425 |
|
1429 |
|
1426 |
// For xapian versions up to 1.0.1, deleting a non-existant
|
1430 |
// For xapian versions up to 1.0.1, deleting a non-existant
|
1427 |
// document would trigger an exception that would discard any
|
1431 |
// document would trigger an exception that would discard any
|
1428 |
// pending update. This could lose both previous added documents
|
1432 |
// pending update. This could lose both previous added documents
|
|
... |
|
... |
1486 |
bool Db::docExists(const string& uniterm)
|
1490 |
bool Db::docExists(const string& uniterm)
|
1487 |
{
|
1491 |
{
|
1488 |
#ifdef IDX_THREADS
|
1492 |
#ifdef IDX_THREADS
|
1489 |
// If we're not running our own (single) thread, need to protect
|
1493 |
// If we're not running our own (single) thread, need to protect
|
1490 |
// read db against multiaccess (e.g. from needUpdate(), or this method).
|
1494 |
// read db against multiaccess (e.g. from needUpdate(), or this method).
|
1491 |
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
|
1495 |
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
|
1492 |
#endif
|
1496 |
#endif
|
1493 |
|
1497 |
|
1494 |
string ermsg;
|
1498 |
string ermsg;
|
1495 |
try {
|
1499 |
try {
|
1496 |
Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
|
1500 |
Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
|
|
... |
|
... |
1519 |
*existed = exists;
|
1523 |
*existed = exists;
|
1520 |
if (!exists)
|
1524 |
if (!exists)
|
1521 |
return true;
|
1525 |
return true;
|
1522 |
|
1526 |
|
1523 |
#ifdef IDX_THREADS
|
1527 |
#ifdef IDX_THREADS
|
1524 |
if (m_ndb->m_haveWriteQ) {
|
1528 |
if (m_ndb->m_havewriteq) {
|
1525 |
Xapian::Document xdoc;
|
1529 |
Xapian::Document xdoc;
|
1526 |
DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
|
1530 |
DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
|
1527 |
if (!m_ndb->m_wqueue.put(tp)) {
|
1531 |
if (!m_ndb->m_wqueue.put(tp)) {
|
1528 |
LOGERR(("Db::purgeFile:Cant queue task\n"));
|
1532 |
LOGERR(("Db::purgeFile:Cant queue task\n"));
|
1529 |
return false;
|
1533 |
return false;
|
|
... |
|
... |
1540 |
{
|
1544 |
{
|
1541 |
#if defined(IDX_THREADS)
|
1545 |
#if defined(IDX_THREADS)
|
1542 |
// If we have a write queue we're called from there, and single
|
1546 |
// If we have a write queue we're called from there, and single
|
1543 |
// threaded, no locking. Else need to mutex other threads from
|
1547 |
// threaded, no locking. Else need to mutex other threads from
|
1544 |
// above
|
1548 |
// above
|
1545 |
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
|
1549 |
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
|
1546 |
#endif // IDX_THREADS
|
1550 |
#endif // IDX_THREADS
|
1547 |
|
1551 |
|
1548 |
Xapian::WritableDatabase db = m_ndb->xwdb;
|
1552 |
Xapian::WritableDatabase db = m_ndb->xwdb;
|
1549 |
string ermsg;
|
1553 |
string ermsg;
|
1550 |
try {
|
1554 |
try {
|