Switch to unified view

a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp
...
...
278
    if (m_config) {
278
    if (m_config) {
279
    m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc);
279
    m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc);
280
    m_config->getConfParam("idxflushmb", &m_flushMb);
280
    m_config->getConfParam("idxflushmb", &m_flushMb);
281
    }
281
    }
282
#ifdef IDX_THREADS
282
#ifdef IDX_THREADS
283
    if (m_ndb) {
284
  m_ndb->m_loglevel = DebugLog::getdbl()->getlevel();
283
    if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) {
285
  if (!m_ndb->m_wqueue.start(1, DbUpdWorker, this)) {
284
    LOGERR(("Db::Db: Worker start failed\n"));
286
        LOGERR(("Db::Db: Worker start failed\n"));
285
    return;
287
        return;
288
  }
286
    }
289
    }
287
#endif // IDX_THREADS
290
#endif // IDX_THREADS
288
}
291
}
289
292
290
Db::~Db()
293
Db::~Db()
...
...
459
{
462
{
460
    int res = -1;
463
    int res = -1;
461
    if (!m_ndb || !m_ndb->m_isopen)
464
    if (!m_ndb || !m_ndb->m_isopen)
462
        return -1;
465
        return -1;
463
466
464
    XAPTRY(res = m_ndb->xdb().get_doccount(), m_ndb->xrdb, m_reason);
467
    XAPTRY(res = m_ndb->xrdb.get_doccount(), m_ndb->xrdb, m_reason);
465
468
466
    if (!m_reason.empty()) {
469
    if (!m_reason.empty()) {
467
        LOGERR(("Db::docCnt: got error: %s\n", m_reason.c_str()));
470
        LOGERR(("Db::docCnt: got error: %s\n", m_reason.c_str()));
468
        return -1;
471
        return -1;
469
    }
472
    }
...
...
786
#ifdef IDX_THREADS
789
#ifdef IDX_THREADS
787
void *DbUpdWorker(void* vdbp)
790
void *DbUpdWorker(void* vdbp)
788
{
791
{
789
    Db *dbp = (Db *)vdbp;
792
    Db *dbp = (Db *)vdbp;
790
    WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
793
    WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
794
    DebugLog::getdbl()->setloglevel(dbp->m_ndb->m_loglevel);
795
791
    DbUpdTask *tsk;
796
    DbUpdTask *tsk;
792
793
    for (;;) {
797
    for (;;) {
794
    if (!tqp->take(&tsk)) {
798
    if (!tqp->take(&tsk)) {
795
        tqp->workerExit();
799
        tqp->workerExit();
796
        return (void*)1;
800
        return (void*)1;
797
    }
801
    }
798
    LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
802
    LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
799
803
  if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm, 
800
  const char *fnc = tsk->udi.c_str();
804
                 tsk->doc, tsk->txtlen)) {
801
  string ermsg;
805
      LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
802
803
  // Add db entry or update existing entry:
804
  try {
805
      Xapian::docid did = 
806
      dbp->m_ndb->xwdb.replace_document(tsk->uniterm, 
807
                        tsk->doc);
808
      if (did < dbp->updated.size()) {
809
      dbp->updated[did] = true;
810
      LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
811
      } else {
812
      LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
813
      }
814
  } XCATCHERROR(ermsg);
815
816
  if (!ermsg.empty()) {
817
      LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
818
      ermsg.erase();
819
      // FIXME: is this ever actually needed?
820
      try {
821
      dbp->m_ndb->xwdb.add_document(tsk->doc);
822
      LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n", 
823
          fnc));
824
      } XCATCHERROR(ermsg);
825
      if (!ermsg.empty()) {
826
      LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
827
      tqp->workerExit();
806
        tqp->workerExit();
807
      delete tsk;
828
      return (void*)0;
808
        return (void*)0;
829
      }
830
    }
809
    }
831
  dbp->maybeflush(tsk->txtlen);
832
833
    delete tsk;
810
    delete tsk;
834
    }
811
    }
835
}
812
}
836
#endif // IDX_THREADS
813
#endif // IDX_THREADS
837
814
838
// Add document in internal form to the database: index the terms in
815
// Add document in internal form to the database: index the terms in
839
// the title, abstract and body, add special terms for file name,
816
// the title, abstract and body, add special terms for file name,
840
// date, mime type, etc., create the document data record (more
817
// date, mime type, etc., create the document data record (more
841
// metadata), and update the database.
818
// metadata), and update the database.
842
bool Db::addOrUpdate(const string &udi, const string &parent_udi,
819
bool Db::addOrUpdate(RclConfig *config, const string &udi, 
843
           Doc &doc)
820
           const string &parent_udi, Doc &doc)
844
{
821
{
845
    LOGDEB(("Db::add: udi [%s] parent [%s]\n", 
822
    LOGDEB(("Db::add: udi [%s] parent [%s]\n", 
846
         udi.c_str(), parent_udi.c_str()));
823
         udi.c_str(), parent_udi.c_str()));
847
    if (m_ndb == 0)
824
    if (m_ndb == 0)
848
    return false;
825
    return false;
849
    // Check file system full every mbyte of indexed text.
850
    if (m_maxFsOccupPc > 0 && 
851
  (m_occFirstCheck || (m_curtxtsz - m_occtxtsz) / MB >= 1)) {
852
  LOGDEB(("Db::add: checking file system usage\n"));
853
  int pc;
854
  m_occFirstCheck = 0;
855
  if (fsocc(m_basedir, &pc) && pc >= m_maxFsOccupPc) {
856
      LOGERR(("Db::add: stop indexing: file system "
857
           "%d%% full > max %d%%\n", pc, m_maxFsOccupPc));
858
      return false;
859
  }
860
  m_occtxtsz = m_curtxtsz;
861
    }
862
826
863
    Xapian::Document newdocument;
827
    Xapian::Document newdocument;
864
828
865
    // The term processing pipeline:
829
    // The term processing pipeline:
866
    TermProcIdx tpidx;
830
    TermProcIdx tpidx;
...
...
1080
              cstr_nc);
1044
              cstr_nc);
1081
    }
1045
    }
1082
    if (!doc.meta[Doc::keyabs].empty())
1046
    if (!doc.meta[Doc::keyabs].empty())
1083
    RECORD_APPEND(record, Doc::keyabs, doc.meta[Doc::keyabs]);
1047
    RECORD_APPEND(record, Doc::keyabs, doc.meta[Doc::keyabs]);
1084
1048
1085
    const set<string>& stored = m_config->getStoredFields();
1049
    const set<string>& stored = config->getStoredFields();
1086
    for (set<string>::const_iterator it = stored.begin();
1050
    for (set<string>::const_iterator it = stored.begin();
1087
     it != stored.end(); it++) {
1051
     it != stored.end(); it++) {
1088
    string nm = m_config->fieldCanon(*it);
1052
    string nm = config->fieldCanon(*it);
1089
    if (!doc.meta[*it].empty()) {
1053
    if (!doc.meta[*it].empty()) {
1090
        string value = 
1054
        string value = 
1091
        neutchars(truncate_to_word(doc.meta[*it], 150), cstr_nc);
1055
        neutchars(truncate_to_word(doc.meta[*it], 150), cstr_nc);
1092
        RECORD_APPEND(record, nm, value);
1056
        RECORD_APPEND(record, nm, value);
1093
    }
1057
    }
...
...
1123
    DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
1087
    DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
1124
    if (!m_ndb->m_wqueue.put(tp)) {
1088
    if (!m_ndb->m_wqueue.put(tp)) {
1125
    LOGERR(("Db::addOrUpdate:Cant queue task\n"));
1089
    LOGERR(("Db::addOrUpdate:Cant queue task\n"));
1126
    return false;
1090
    return false;
1127
    }
1091
    }
1092
    return true;
1128
#else
1093
#else
1094
    return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument, 
1095
                 doc.text.length());
1096
#endif // IDX_THREADS
1097
}
1098
1099
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, 
1100
            Xapian::Document& newdocument, size_t textlen)
1101
{
1102
    // Check file system full every mbyte of indexed text. It's a bit wasteful
1103
    // to do this after having prepared the document, but it needs to be in
1104
    // the single-threaded section.
1105
    if (m_rcldb->m_maxFsOccupPc > 0 && 
1106
  (m_rcldb->m_occFirstCheck || 
1107
   (m_rcldb->m_curtxtsz - m_rcldb->m_occtxtsz) / MB >= 1)) {
1108
  LOGDEB(("Db::add: checking file system usage\n"));
1109
  int pc;
1110
  m_rcldb->m_occFirstCheck = 0;
1111
  if (fsocc(m_rcldb->m_basedir, &pc) && pc >= m_rcldb->m_maxFsOccupPc) {
1112
      LOGERR(("Db::add: stop indexing: file system "
1113
          "%d%% full > max %d%%\n", pc, m_rcldb->m_maxFsOccupPc));
1114
      return false;
1115
  }
1116
  m_rcldb->m_occtxtsz = m_rcldb->m_curtxtsz;
1117
    }
1118
1129
    const char *fnc = udi.c_str();
1119
    const char *fnc = udi.c_str();
1130
    string ermsg;
1120
    string ermsg;
1131
1121
1132
    // Add db entry or update existing entry:
1122
    // Add db entry or update existing entry:
1133
    try {
1123
    try {
1134
    Xapian::docid did = 
1124
    Xapian::docid did = 
1135
        m_ndb->xwdb.replace_document(uniterm, newdocument);
1125
        xwdb.replace_document(uniterm, newdocument);
1136
    if (did < updated.size()) {
1126
    if (did < m_rcldb->updated.size()) {
1137
        updated[did] = true;
1127
        m_rcldb->updated[did] = true;
1138
        LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
1128
        LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
1139
    } else {
1129
    } else {
1140
        LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
1130
        LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
1141
    }
1131
    }
1142
    } XCATCHERROR(ermsg);
1132
    } XCATCHERROR(ermsg);
...
...
1144
    if (!ermsg.empty()) {
1134
    if (!ermsg.empty()) {
1145
    LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
1135
    LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
1146
    ermsg.erase();
1136
    ermsg.erase();
1147
    // FIXME: is this ever actually needed?
1137
    // FIXME: is this ever actually needed?
1148
    try {
1138
    try {
1149
        m_ndb->xwdb.add_document(newdocument);
1139
        xwdb.add_document(newdocument);
1150
        LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n", 
1140
        LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n", 
1151
            fnc));
1141
            fnc));
1152
    } XCATCHERROR(ermsg);
1142
    } XCATCHERROR(ermsg);
1153
    if (!ermsg.empty()) {
1143
    if (!ermsg.empty()) {
1154
        LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
1144
        LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
1155
        return false;
1145
        return false;
1156
    }
1146
    }
1157
    }
1147
    }
1158
1148
1159
    // Test if we're over the flush threshold (limit memory usage):
1149
    // Test if we're over the flush threshold (limit memory usage):
1160
    maybeflush(doc.text.length());
1150
    return m_rcldb->maybeflush(textlen);
1151
}
1152
1161
#endif // IDX_THREADS
1153
#ifdef IDX_THREADS
1162
    return true;
1154
void Db::waitUpdIdle()
1155
{
1156
    m_ndb->m_wqueue.waitIdle();
1163
}
1157
}
1158
#endif
1164
1159
1165
// Flush when the idxflushmb threshold is reached
1160
// Flush when the idxflushmb threshold is reached
1166
bool Db::maybeflush(off_t moretext)
1161
bool Db::maybeflush(off_t moretext)
1167
{
1162
{
1168
    if (m_flushMb > 0) {
1163
    if (m_flushMb > 0) {
...
...
1231
1226
1232
        // Up to date. 
1227
        // Up to date. 
1233
1228
1234
        // Set the uptodate flag for doc / pseudo doc
1229
        // Set the uptodate flag for doc / pseudo doc
1235
        if (m_mode  != DbRO) {
1230
        if (m_mode  != DbRO) {
1231
#warning we need a lock here !
1236
        updated[*docid] = true;
1232
        updated[*docid] = true;
1237
1233
1238
        // Set the existence flag for all the subdocs (if any)
1234
        // Set the existence flag for all the subdocs (if any)
1239
        vector<Xapian::docid> docids;
1235
        vector<Xapian::docid> docids;
1240
        if (!m_ndb->subDocs(udi, docids)) {
1236
        if (!m_ndb->subDocs(udi, docids)) {
...
...
1242
            return true;
1238
            return true;
1243
        }
1239
        }
1244
        for (vector<Xapian::docid>::iterator it = docids.begin();
1240
        for (vector<Xapian::docid>::iterator it = docids.begin();
1245
             it != docids.end(); it++) {
1241
             it != docids.end(); it++) {
1246
            if (*it < updated.size()) {
1242
            if (*it < updated.size()) {
1247
            LOGDEB2(("Db::needUpdate: set flag for docid %d\n", *it));
1243
            LOGDEB2(("Db::needUpdate: docid %d set\n", *it));
1248
            updated[*it] = true;
1244
            updated[*it] = true;
1249
            }
1245
            }
1250
        }
1246
        }
1251
        }
1247
        }
1252
        return false;
1248
        return false;