Switch to unified view

a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp
...
...
130
Db::Native::Native(Db *db) 
130
Db::Native::Native(Db *db) 
131
    : m_rcldb(db), m_isopen(false), m_iswritable(false),
131
    : m_rcldb(db), m_isopen(false), m_iswritable(false),
132
      m_noversionwrite(false)
132
      m_noversionwrite(false)
133
#ifdef IDX_THREADS
133
#ifdef IDX_THREADS
134
    , m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
134
    , m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
135
      m_totalworkns(0LL)
135
      m_loglevel(4),
136
      m_totalworkns(0LL), m_havewriteq(false)
136
#endif // IDX_THREADS
137
#endif // IDX_THREADS
137
{ 
138
{ 
138
    LOGDEB1(("Native::Native: me %p\n", this));
139
    LOGDEB1(("Native::Native: me %p\n", this));
139
}
140
}
140
141
141
Db::Native::~Native() 
142
Db::Native::~Native() 
142
{ 
143
{ 
143
    LOGDEB1(("Native::~Native: me %p\n", this));
144
    LOGDEB1(("Native::~Native: me %p\n", this));
144
#ifdef IDX_THREADS
145
#ifdef IDX_THREADS
145
    if (m_haveWriteQ) {
146
    if (m_havewriteq) {
146
    void *status = m_wqueue.setTerminateAndWait();
147
    void *status = m_wqueue.setTerminateAndWait();
147
    LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
148
    LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
148
    }
149
    }
149
#endif // IDX_THREADS
150
#endif // IDX_THREADS
150
}
151
}
...
...
184
185
185
void Db::Native::maybeStartThreads()
186
void Db::Native::maybeStartThreads()
186
{
187
{
187
    m_loglevel = DebugLog::getdbl()->getlevel();
188
    m_loglevel = DebugLog::getdbl()->getlevel();
188
189
189
    m_haveWriteQ = false;
190
    m_havewriteq = false;
190
    const RclConfig *cnf = m_rcldb->m_config;
191
    const RclConfig *cnf = m_rcldb->m_config;
191
    int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first;
192
    int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first;
192
    int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second;
193
    int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second;
193
    if (writethreads > 1) {
194
    if (writethreads > 1) {
194
    LOGINFO(("RclDb: write threads count was forced down to 1\n"));
195
    LOGINFO(("RclDb: write threads count was forced down to 1\n"));
...
...
197
    if (writeqlen >= 0 && writethreads > 0) {
198
    if (writeqlen >= 0 && writethreads > 0) {
198
    if (!m_wqueue.start(writethreads, DbUpdWorker, this)) {
199
    if (!m_wqueue.start(writethreads, DbUpdWorker, this)) {
199
        LOGERR(("Db::Db: Worker start failed\n"));
200
        LOGERR(("Db::Db: Worker start failed\n"));
200
        return;
201
        return;
201
    }
202
    }
202
    m_haveWriteQ = true;
203
    m_havewriteq = true;
203
    }
204
    }
204
    LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
205
    LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
205
        m_haveWriteQ, writeqlen, writethreads));
206
        m_havewriteq, writeqlen, writethreads));
206
}
207
}
207
208
208
#endif // IDX_THREADS
209
#endif // IDX_THREADS
209
210
210
/* See comment in class declaration: return all subdocuments of a
211
/* See comment in class declaration: return all subdocuments of a
...
...
498
    if (w) {
499
    if (w) {
499
        if (!m_ndb->m_noversionwrite)
500
        if (!m_ndb->m_noversionwrite)
500
        m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION);
501
        m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION);
501
        LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n"));
502
        LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n"));
502
    }
503
    }
504
#ifdef IDX_THREADS
505
  waitUpdIdle();
506
#endif
503
    // Used to do a flush here. Cant see why it should be necessary.
507
    // Used to do a flush here. Cant see why it should be necessary.
504
    deleteZ(m_ndb);
508
    deleteZ(m_ndb);
505
    if (w)
509
    if (w)
506
        LOGDEB(("Rcl::Db:close() xapian close done.\n"));
510
        LOGDEB(("Rcl::Db:close() xapian close done.\n"));
507
    if (final) {
511
    if (final) {
...
...
1136
1140
1137
    LOGDEB0(("Rcl::Db::add: new doc record:\n%s\n", record.c_str()));
1141
    LOGDEB0(("Rcl::Db::add: new doc record:\n%s\n", record.c_str()));
1138
    newdocument.set_data(record);
1142
    newdocument.set_data(record);
1139
1143
1140
#ifdef IDX_THREADS
1144
#ifdef IDX_THREADS
1141
    if (m_ndb->m_haveWriteQ) {
1145
    if (m_ndb->m_havewriteq) {
1142
    DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, 
1146
    DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, 
1143
                      doc.text.length());
1147
                      doc.text.length());
1144
    if (!m_ndb->m_wqueue.put(tp)) {
1148
    if (!m_ndb->m_wqueue.put(tp)) {
1145
        LOGERR(("Db::addOrUpdate:Cant queue task\n"));
1149
        LOGERR(("Db::addOrUpdate:Cant queue task\n"));
1146
        return false;
1150
        return false;
...
...
1161
    Chrono chron;
1165
    Chrono chron;
1162
    // In the case where there is a separate (single) db update
1166
    // In the case where there is a separate (single) db update
1163
    // thread, we only need to protect the update map update below
1167
    // thread, we only need to protect the update map update below
1164
    // (against interaction with threads calling needUpdate()). Else,
1168
    // (against interaction with threads calling needUpdate()). Else,
1165
    // all threads from above need to synchronize here
1169
    // all threads from above need to synchronize here
1166
    PTMutexLocker lock(m_mutex, m_haveWriteQ);
1170
    PTMutexLocker lock(m_mutex, m_havewriteq);
1167
#endif
1171
#endif
1168
1172
1169
    // Check file system full every mbyte of indexed text. It's a bit wasteful
1173
    // Check file system full every mbyte of indexed text. It's a bit wasteful
1170
    // to do this after having prepared the document, but it needs to be in
1174
    // to do this after having prepared the document, but it needs to be in
1171
    // the single-threaded section.
1175
    // the single-threaded section.
...
...
1191
    Xapian::docid did = 
1195
    Xapian::docid did = 
1192
        xwdb.replace_document(uniterm, newdocument);
1196
        xwdb.replace_document(uniterm, newdocument);
1193
#ifdef IDX_THREADS
1197
#ifdef IDX_THREADS
1194
    // Need to protect against interaction with the up-to-date checks
1198
    // Need to protect against interaction with the up-to-date checks
1195
    // which also update the existence map
1199
    // which also update the existence map
1196
    PTMutexLocker lock(m_mutex, !m_haveWriteQ);
1200
    PTMutexLocker lock(m_mutex, !m_havewriteq);
1197
#endif
1201
#endif
1198
    if (did < m_rcldb->updated.size()) {
1202
    if (did < m_rcldb->updated.size()) {
1199
        m_rcldb->updated[did] = true;
1203
        m_rcldb->updated[did] = true;
1200
        LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
1204
        LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
1201
    } else {
1205
    } else {
...
...
1227
}
1231
}
1228
1232
1229
#ifdef IDX_THREADS
1233
#ifdef IDX_THREADS
1230
void Db::waitUpdIdle()
1234
void Db::waitUpdIdle()
1231
{
1235
{
1232
    if (m_ndb->m_haveWriteQ) {
1236
    if (m_ndb->m_iswritable && m_ndb->m_havewriteq) {
1233
    Chrono chron;
1237
    Chrono chron;
1234
    m_ndb->m_wqueue.waitIdle();
1238
    m_ndb->m_wqueue.waitIdle();
1235
    // We flush here just for correct measurement of the thread work time
1239
    // We flush here just for correct measurement of the thread work time
1236
    string ermsg;
1240
    string ermsg;
1237
    try {
1241
    try {
...
...
1413
    if (m_ndb->m_isopen == false || m_ndb->m_iswritable == false) 
1417
    if (m_ndb->m_isopen == false || m_ndb->m_iswritable == false) 
1414
    return false;
1418
    return false;
1415
1419
1416
#ifdef IDX_THREADS
1420
#ifdef IDX_THREADS
1417
    // If we manage our own write queue, make sure it's drained and closed
1421
    // If we manage our own write queue, make sure it's drained and closed
1418
    if (m_ndb->m_haveWriteQ)
1422
    if (m_ndb->m_havewriteq)
1419
    m_ndb->m_wqueue.setTerminateAndWait();
1423
    m_ndb->m_wqueue.setTerminateAndWait();
1420
    // else we need to lock out other top level threads. This is just
1424
    // else we need to lock out other top level threads. This is just
1421
    // a precaution as they should have been waited for by the top
1425
    // a precaution as they should have been waited for by the top
1422
    // level actor at this point
1426
    // level actor at this point
1423
    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
1427
    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
1424
#endif // IDX_THREADS
1428
#endif // IDX_THREADS
1425
1429
1426
    // For xapian versions up to 1.0.1, deleting a non-existant
1430
    // For xapian versions up to 1.0.1, deleting a non-existant
1427
    // document would trigger an exception that would discard any
1431
    // document would trigger an exception that would discard any
1428
    // pending update. This could lose both previous added documents
1432
    // pending update. This could lose both previous added documents
...
...
1486
bool Db::docExists(const string& uniterm)
1490
bool Db::docExists(const string& uniterm)
1487
{
1491
{
1488
#ifdef IDX_THREADS
1492
#ifdef IDX_THREADS
1489
    // If we're not running our own (single) thread, need to protect
1493
    // If we're not running our own (single) thread, need to protect
1490
    // read db against multiaccess (e.g. from needUpdate(), or this method).
1494
    // read db against multiaccess (e.g. from needUpdate(), or this method).
1491
    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
1495
    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
1492
#endif
1496
#endif
1493
1497
1494
    string ermsg;
1498
    string ermsg;
1495
    try {
1499
    try {
1496
    Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
1500
    Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
...
...
1519
    *existed = exists;
1523
    *existed = exists;
1520
    if (!exists)
1524
    if (!exists)
1521
    return true;
1525
    return true;
1522
1526
1523
#ifdef IDX_THREADS
1527
#ifdef IDX_THREADS
1524
    if (m_ndb->m_haveWriteQ) {
1528
    if (m_ndb->m_havewriteq) {
1525
    Xapian::Document xdoc;
1529
    Xapian::Document xdoc;
1526
    DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
1530
    DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
1527
    if (!m_ndb->m_wqueue.put(tp)) {
1531
    if (!m_ndb->m_wqueue.put(tp)) {
1528
        LOGERR(("Db::purgeFile:Cant queue task\n"));
1532
        LOGERR(("Db::purgeFile:Cant queue task\n"));
1529
        return false;
1533
        return false;
...
...
1540
{
1544
{
1541
#if defined(IDX_THREADS) 
1545
#if defined(IDX_THREADS) 
1542
    // If we have a write queue we're called from there, and single
1546
    // If we have a write queue we're called from there, and single
1543
    // threaded, no locking.  Else need to mutex other threads from
1547
    // threaded, no locking.  Else need to mutex other threads from
1544
    // above
1548
    // above
1545
    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
1549
    PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
1546
#endif // IDX_THREADS
1550
#endif // IDX_THREADS
1547
1551
1548
    Xapian::WritableDatabase db = m_ndb->xwdb;
1552
    Xapian::WritableDatabase db = m_ndb->xwdb;
1549
    string ermsg;
1553
    string ermsg;
1550
    try {
1554
    try {