|
a/src/rcldb/rcldb.cpp |
|
b/src/rcldb/rcldb.cpp |
|
... |
|
... |
278 |
if (m_config) {
|
278 |
if (m_config) {
|
279 |
m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc);
|
279 |
m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc);
|
280 |
m_config->getConfParam("idxflushmb", &m_flushMb);
|
280 |
m_config->getConfParam("idxflushmb", &m_flushMb);
|
281 |
}
|
281 |
}
|
282 |
#ifdef IDX_THREADS
|
282 |
#ifdef IDX_THREADS
|
|
|
283 |
if (m_ndb) {
|
|
|
284 |
m_ndb->m_loglevel = DebugLog::getdbl()->getlevel();
|
283 |
if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) {
|
285 |
if (!m_ndb->m_wqueue.start(1, DbUpdWorker, this)) {
|
284 |
LOGERR(("Db::Db: Worker start failed\n"));
|
286 |
LOGERR(("Db::Db: Worker start failed\n"));
|
285 |
return;
|
287 |
return;
|
|
|
288 |
}
|
286 |
}
|
289 |
}
|
287 |
#endif // IDX_THREADS
|
290 |
#endif // IDX_THREADS
|
288 |
}
|
291 |
}
|
289 |
|
292 |
|
290 |
Db::~Db()
|
293 |
Db::~Db()
|
|
... |
|
... |
459 |
{
|
462 |
{
|
460 |
int res = -1;
|
463 |
int res = -1;
|
461 |
if (!m_ndb || !m_ndb->m_isopen)
|
464 |
if (!m_ndb || !m_ndb->m_isopen)
|
462 |
return -1;
|
465 |
return -1;
|
463 |
|
466 |
|
464 |
XAPTRY(res = m_ndb->xdb().get_doccount(), m_ndb->xrdb, m_reason);
|
467 |
XAPTRY(res = m_ndb->xrdb.get_doccount(), m_ndb->xrdb, m_reason);
|
465 |
|
468 |
|
466 |
if (!m_reason.empty()) {
|
469 |
if (!m_reason.empty()) {
|
467 |
LOGERR(("Db::docCnt: got error: %s\n", m_reason.c_str()));
|
470 |
LOGERR(("Db::docCnt: got error: %s\n", m_reason.c_str()));
|
468 |
return -1;
|
471 |
return -1;
|
469 |
}
|
472 |
}
|
|
... |
|
... |
786 |
#ifdef IDX_THREADS
|
789 |
#ifdef IDX_THREADS
|
787 |
void *DbUpdWorker(void* vdbp)
|
790 |
void *DbUpdWorker(void* vdbp)
|
788 |
{
|
791 |
{
|
789 |
Db *dbp = (Db *)vdbp;
|
792 |
Db *dbp = (Db *)vdbp;
|
790 |
WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
|
793 |
WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
|
|
|
794 |
DebugLog::getdbl()->setloglevel(dbp->m_ndb->m_loglevel);
|
|
|
795 |
|
791 |
DbUpdTask *tsk;
|
796 |
DbUpdTask *tsk;
|
792 |
|
|
|
793 |
for (;;) {
|
797 |
for (;;) {
|
794 |
if (!tqp->take(&tsk)) {
|
798 |
if (!tqp->take(&tsk)) {
|
795 |
tqp->workerExit();
|
799 |
tqp->workerExit();
|
796 |
return (void*)1;
|
800 |
return (void*)1;
|
797 |
}
|
801 |
}
|
798 |
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
|
802 |
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
|
799 |
|
803 |
if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm,
|
800 |
const char *fnc = tsk->udi.c_str();
|
804 |
tsk->doc, tsk->txtlen)) {
|
801 |
string ermsg;
|
805 |
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
|
802 |
|
|
|
803 |
// Add db entry or update existing entry:
|
|
|
804 |
try {
|
|
|
805 |
Xapian::docid did =
|
|
|
806 |
dbp->m_ndb->xwdb.replace_document(tsk->uniterm,
|
|
|
807 |
tsk->doc);
|
|
|
808 |
if (did < dbp->updated.size()) {
|
|
|
809 |
dbp->updated[did] = true;
|
|
|
810 |
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
|
|
|
811 |
} else {
|
|
|
812 |
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
|
|
|
813 |
}
|
|
|
814 |
} XCATCHERROR(ermsg);
|
|
|
815 |
|
|
|
816 |
if (!ermsg.empty()) {
|
|
|
817 |
LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
|
|
|
818 |
ermsg.erase();
|
|
|
819 |
// FIXME: is this ever actually needed?
|
|
|
820 |
try {
|
|
|
821 |
dbp->m_ndb->xwdb.add_document(tsk->doc);
|
|
|
822 |
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
|
|
|
823 |
fnc));
|
|
|
824 |
} XCATCHERROR(ermsg);
|
|
|
825 |
if (!ermsg.empty()) {
|
|
|
826 |
LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
|
|
|
827 |
tqp->workerExit();
|
806 |
tqp->workerExit();
|
|
|
807 |
delete tsk;
|
828 |
return (void*)0;
|
808 |
return (void*)0;
|
829 |
}
|
|
|
830 |
}
|
809 |
}
|
831 |
dbp->maybeflush(tsk->txtlen);
|
|
|
832 |
|
|
|
833 |
delete tsk;
|
810 |
delete tsk;
|
834 |
}
|
811 |
}
|
835 |
}
|
812 |
}
|
836 |
#endif // IDX_THREADS
|
813 |
#endif // IDX_THREADS
|
837 |
|
814 |
|
838 |
// Add document in internal form to the database: index the terms in
|
815 |
// Add document in internal form to the database: index the terms in
|
839 |
// the title abstract and body and add special terms for file name,
|
816 |
// the title abstract and body and add special terms for file name,
|
840 |
// date, mime type etc. , create the document data record (more
|
817 |
// date, mime type etc. , create the document data record (more
|
841 |
// metadata), and update database
|
818 |
// metadata), and update database
|
842 |
bool Db::addOrUpdate(const string &udi, const string &parent_udi,
|
819 |
bool Db::addOrUpdate(RclConfig *config, const string &udi,
|
843 |
Doc &doc)
|
820 |
const string &parent_udi, Doc &doc)
|
844 |
{
|
821 |
{
|
845 |
LOGDEB(("Db::add: udi [%s] parent [%s]\n",
|
822 |
LOGDEB(("Db::add: udi [%s] parent [%s]\n",
|
846 |
udi.c_str(), parent_udi.c_str()));
|
823 |
udi.c_str(), parent_udi.c_str()));
|
847 |
if (m_ndb == 0)
|
824 |
if (m_ndb == 0)
|
848 |
return false;
|
825 |
return false;
|
849 |
// Check file system full every mbyte of indexed text.
|
|
|
850 |
if (m_maxFsOccupPc > 0 &&
|
|
|
851 |
(m_occFirstCheck || (m_curtxtsz - m_occtxtsz) / MB >= 1)) {
|
|
|
852 |
LOGDEB(("Db::add: checking file system usage\n"));
|
|
|
853 |
int pc;
|
|
|
854 |
m_occFirstCheck = 0;
|
|
|
855 |
if (fsocc(m_basedir, &pc) && pc >= m_maxFsOccupPc) {
|
|
|
856 |
LOGERR(("Db::add: stop indexing: file system "
|
|
|
857 |
"%d%% full > max %d%%\n", pc, m_maxFsOccupPc));
|
|
|
858 |
return false;
|
|
|
859 |
}
|
|
|
860 |
m_occtxtsz = m_curtxtsz;
|
|
|
861 |
}
|
|
|
862 |
|
826 |
|
863 |
Xapian::Document newdocument;
|
827 |
Xapian::Document newdocument;
|
864 |
|
828 |
|
865 |
// The term processing pipeline:
|
829 |
// The term processing pipeline:
|
866 |
TermProcIdx tpidx;
|
830 |
TermProcIdx tpidx;
|
|
... |
|
... |
1080 |
cstr_nc);
|
1044 |
cstr_nc);
|
1081 |
}
|
1045 |
}
|
1082 |
if (!doc.meta[Doc::keyabs].empty())
|
1046 |
if (!doc.meta[Doc::keyabs].empty())
|
1083 |
RECORD_APPEND(record, Doc::keyabs, doc.meta[Doc::keyabs]);
|
1047 |
RECORD_APPEND(record, Doc::keyabs, doc.meta[Doc::keyabs]);
|
1084 |
|
1048 |
|
1085 |
const set<string>& stored = m_config->getStoredFields();
|
1049 |
const set<string>& stored = config->getStoredFields();
|
1086 |
for (set<string>::const_iterator it = stored.begin();
|
1050 |
for (set<string>::const_iterator it = stored.begin();
|
1087 |
it != stored.end(); it++) {
|
1051 |
it != stored.end(); it++) {
|
1088 |
string nm = m_config->fieldCanon(*it);
|
1052 |
string nm = config->fieldCanon(*it);
|
1089 |
if (!doc.meta[*it].empty()) {
|
1053 |
if (!doc.meta[*it].empty()) {
|
1090 |
string value =
|
1054 |
string value =
|
1091 |
neutchars(truncate_to_word(doc.meta[*it], 150), cstr_nc);
|
1055 |
neutchars(truncate_to_word(doc.meta[*it], 150), cstr_nc);
|
1092 |
RECORD_APPEND(record, nm, value);
|
1056 |
RECORD_APPEND(record, nm, value);
|
1093 |
}
|
1057 |
}
|
|
... |
|
... |
1123 |
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
|
1087 |
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
|
1124 |
if (!m_ndb->m_wqueue.put(tp)) {
|
1088 |
if (!m_ndb->m_wqueue.put(tp)) {
|
1125 |
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
|
1089 |
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
|
1126 |
return false;
|
1090 |
return false;
|
1127 |
}
|
1091 |
}
|
|
|
1092 |
return true;
|
1128 |
#else
|
1093 |
#else
|
|
|
1094 |
return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument,
|
|
|
1095 |
doc.text.length());
|
|
|
1096 |
#endif // IDX_THREADS
|
|
|
1097 |
}
|
|
|
1098 |
|
|
|
1099 |
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
|
|
|
1100 |
Xapian::Document& newdocument, size_t textlen)
|
|
|
1101 |
{
|
|
|
1102 |
// Check file system full every mbyte of indexed text. It's a bit wasteful
|
|
|
1103 |
// to do this after having prepared the document, but it needs to be in
|
|
|
1104 |
// the single-threaded section.
|
|
|
1105 |
if (m_rcldb->m_maxFsOccupPc > 0 &&
|
|
|
1106 |
(m_rcldb->m_occFirstCheck ||
|
|
|
1107 |
(m_rcldb->m_curtxtsz - m_rcldb->m_occtxtsz) / MB >= 1)) {
|
|
|
1108 |
LOGDEB(("Db::add: checking file system usage\n"));
|
|
|
1109 |
int pc;
|
|
|
1110 |
m_rcldb->m_occFirstCheck = 0;
|
|
|
1111 |
if (fsocc(m_rcldb->m_basedir, &pc) && pc >= m_rcldb->m_maxFsOccupPc) {
|
|
|
1112 |
LOGERR(("Db::add: stop indexing: file system "
|
|
|
1113 |
"%d%% full > max %d%%\n", pc, m_rcldb->m_maxFsOccupPc));
|
|
|
1114 |
return false;
|
|
|
1115 |
}
|
|
|
1116 |
m_rcldb->m_occtxtsz = m_rcldb->m_curtxtsz;
|
|
|
1117 |
}
|
|
|
1118 |
|
1129 |
const char *fnc = udi.c_str();
|
1119 |
const char *fnc = udi.c_str();
|
1130 |
string ermsg;
|
1120 |
string ermsg;
|
1131 |
|
1121 |
|
1132 |
// Add db entry or update existing entry:
|
1122 |
// Add db entry or update existing entry:
|
1133 |
try {
|
1123 |
try {
|
1134 |
Xapian::docid did =
|
1124 |
Xapian::docid did =
|
1135 |
m_ndb->xwdb.replace_document(uniterm, newdocument);
|
1125 |
xwdb.replace_document(uniterm, newdocument);
|
1136 |
if (did < updated.size()) {
|
1126 |
if (did < m_rcldb->updated.size()) {
|
1137 |
updated[did] = true;
|
1127 |
m_rcldb->updated[did] = true;
|
1138 |
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
|
1128 |
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
|
1139 |
} else {
|
1129 |
} else {
|
1140 |
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
|
1130 |
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
|
1141 |
}
|
1131 |
}
|
1142 |
} XCATCHERROR(ermsg);
|
1132 |
} XCATCHERROR(ermsg);
|
|
... |
|
... |
1144 |
if (!ermsg.empty()) {
|
1134 |
if (!ermsg.empty()) {
|
1145 |
LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
|
1135 |
LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
|
1146 |
ermsg.erase();
|
1136 |
ermsg.erase();
|
1147 |
// FIXME: is this ever actually needed?
|
1137 |
// FIXME: is this ever actually needed?
|
1148 |
try {
|
1138 |
try {
|
1149 |
m_ndb->xwdb.add_document(newdocument);
|
1139 |
xwdb.add_document(newdocument);
|
1150 |
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
|
1140 |
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
|
1151 |
fnc));
|
1141 |
fnc));
|
1152 |
} XCATCHERROR(ermsg);
|
1142 |
} XCATCHERROR(ermsg);
|
1153 |
if (!ermsg.empty()) {
|
1143 |
if (!ermsg.empty()) {
|
1154 |
LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
|
1144 |
LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
|
1155 |
return false;
|
1145 |
return false;
|
1156 |
}
|
1146 |
}
|
1157 |
}
|
1147 |
}
|
1158 |
|
1148 |
|
1159 |
// Test if we're over the flush threshold (limit memory usage):
|
1149 |
// Test if we're over the flush threshold (limit memory usage):
|
1160 |
maybeflush(doc.text.length());
|
1150 |
return m_rcldb->maybeflush(textlen);
|
|
|
1151 |
}
|
|
|
1152 |
|
1161 |
#endif // IDX_THREADS
|
1153 |
#ifdef IDX_THREADS
|
1162 |
return true;
|
1154 |
void Db::waitUpdIdle()
|
|
|
1155 |
{
|
|
|
1156 |
m_ndb->m_wqueue.waitIdle();
|
1163 |
}
|
1157 |
}
|
|
|
1158 |
#endif
|
1164 |
|
1159 |
|
1165 |
// Flush when idxflushmbs is reached
|
1160 |
// Flush when idxflushmbs is reached
|
1166 |
bool Db::maybeflush(off_t moretext)
|
1161 |
bool Db::maybeflush(off_t moretext)
|
1167 |
{
|
1162 |
{
|
1168 |
if (m_flushMb > 0) {
|
1163 |
if (m_flushMb > 0) {
|
|
... |
|
... |
1231 |
|
1226 |
|
1232 |
// Up to date.
|
1227 |
// Up to date.
|
1233 |
|
1228 |
|
1234 |
// Set the uptodate flag for doc / pseudo doc
|
1229 |
// Set the uptodate flag for doc / pseudo doc
|
1235 |
if (m_mode != DbRO) {
|
1230 |
if (m_mode != DbRO) {
|
|
|
1231 |
#warning we need a lock here !
|
1236 |
updated[*docid] = true;
|
1232 |
updated[*docid] = true;
|
1237 |
|
1233 |
|
1238 |
// Set the existence flag for all the subdocs (if any)
|
1234 |
// Set the existence flag for all the subdocs (if any)
|
1239 |
vector<Xapian::docid> docids;
|
1235 |
vector<Xapian::docid> docids;
|
1240 |
if (!m_ndb->subDocs(udi, docids)) {
|
1236 |
if (!m_ndb->subDocs(udi, docids)) {
|
|
... |
|
... |
1242 |
return true;
|
1238 |
return true;
|
1243 |
}
|
1239 |
}
|
1244 |
for (vector<Xapian::docid>::iterator it = docids.begin();
|
1240 |
for (vector<Xapian::docid>::iterator it = docids.begin();
|
1245 |
it != docids.end(); it++) {
|
1241 |
it != docids.end(); it++) {
|
1246 |
if (*it < updated.size()) {
|
1242 |
if (*it < updated.size()) {
|
1247 |
LOGDEB2(("Db::needUpdate: set flag for docid %d\n", *it));
|
1243 |
LOGDEB2(("Db::needUpdate: docid %d set\n", *it));
|
1248 |
updated[*it] = true;
|
1244 |
updated[*it] = true;
|
1249 |
}
|
1245 |
}
|
1250 |
}
|
1246 |
}
|
1251 |
}
|
1247 |
}
|
1252 |
return false;
|
1248 |
return false;
|