Switch to unified view

a/src/index/rclmonprc.cpp b/src/index/rclmonprc.cpp
...
...
22
 * Recoll real time monitor processing. This file has the code to retrieve
22
 * Recoll real time monitor processing. This file has the code to retrieve
23
 * event from the event queue and do the database-side processing. Also the 
23
 * event from the event queue and do the database-side processing. Also the 
24
 * initialization function.
24
 * initialization function.
25
 */
25
 */
26
26
27
#include <pthread.h>
28
#include <errno.h>
27
#include <errno.h>
29
#include <fnmatch.h>
28
#include <fnmatch.h>
30
#include "safeunistd.h"
29
#include "safeunistd.h"
31
30
32
#include <cstring>
31
#include <cstring>
33
#include <cstdio>
32
#include <cstdio>
34
#include <cstdlib>
33
#include <cstdlib>
35
#include <list>
34
#include <list>
36
#include <vector>
35
#include <vector>
36
#include <thread>
37
#include <mutex>
38
#include <condition_variable>
39
#include <chrono>
40
37
using std::list;
41
using std::list;
38
using std::vector;
42
using std::vector;
39
43
40
#include "log.h"
44
#include "log.h"
41
#include "rclmon.h"
45
#include "rclmon.h"
...
...
48
#include "x11mon.h"
52
#include "x11mon.h"
49
#endif
53
#endif
50
#include "subtreelist.h"
54
#include "subtreelist.h"
51
55
52
typedef unsigned long mttcast;
56
typedef unsigned long mttcast;
53
54
static pthread_t rcv_thrid;
55
57
56
// Seconds between auxiliary db (stem, spell) updates:
58
// Seconds between auxiliary db (stem, spell) updates:
57
static const int dfltauxinterval = 60 *60;
59
static const int dfltauxinterval = 60 *60;
58
static int auxinterval = dfltauxinterval;
60
static int auxinterval = dfltauxinterval;
59
61
...
...
133
    delays_type m_delays;
135
    delays_type m_delays;
134
    // Configured intervals for path patterns, read from the configuration.
136
    // Configured intervals for path patterns, read from the configuration.
135
    vector<DelayPat> m_delaypats;
137
    vector<DelayPat> m_delaypats;
136
    RclConfig *m_config;
138
    RclConfig *m_config;
137
    bool       m_ok;
139
    bool       m_ok;
140
138
    pthread_mutex_t m_mutex;
141
    std::mutex m_mutex;
139
    pthread_cond_t m_cond;
142
    std::condition_variable m_cond;
143
140
    RclEQData() 
144
    RclEQData() 
141
    : m_config(0), m_ok(false)
145
    : m_config(0), m_ok(true)
142
    {
146
    {
143
  if (!pthread_mutex_init(&m_mutex, 0) && !pthread_cond_init(&m_cond, 0))
144
      m_ok = true;
145
    }
147
    }
146
    void readDelayPats(int dfltsecs);
148
    void readDelayPats(int dfltsecs);
147
    DelayPat searchDelayPats(const string& path)
149
    DelayPat searchDelayPats(const string& path)
148
    {
150
    {
149
    for (vector<DelayPat>::iterator it = m_delaypats.begin();
151
    for (vector<DelayPat>::iterator it = m_delaypats.begin();
...
...
190
// Insert event (as queue iterator) into delays list, in time order,
192
// Insert event (as queue iterator) into delays list, in time order,
191
// We DO NOT take care of duplicate qits. erase should be called first
193
// We DO NOT take care of duplicate qits. erase should be called first
192
// when necessary.
194
// when necessary.
193
void RclEQData::delayInsert(const queue_type::iterator &qit)
195
void RclEQData::delayInsert(const queue_type::iterator &qit)
194
{
196
{
195
    MONDEB(("RclEQData::delayInsert: minclock %lu\n", 
197
    MONDEB("RclEQData::delayInsert: minclock " << qit->second.m_minclock <<
196
            (mttcast)qit->second.m_minclock));
198
           std::endl);
197
    for (delays_type::iterator dit = m_delays.begin(); 
199
    for (delays_type::iterator dit = m_delays.begin(); 
198
     dit != m_delays.end(); dit++) {
200
     dit != m_delays.end(); dit++) {
199
    queue_type::iterator qit1 = *dit;
201
    queue_type::iterator qit1 = *dit;
200
    if ((*qit1).second.m_minclock > qit->second.m_minclock) {
202
    if ((*qit1).second.m_minclock > qit->second.m_minclock) {
201
        m_delays.insert(dit, qit);
203
        m_delays.insert(dit, qit);
...
...
220
    if (m_data)
222
    if (m_data)
221
    m_data->m_opts = opts;
223
    m_data->m_opts = opts;
222
}
224
}
223
225
224
/** Wait until there is something to process on the queue, or timeout.
226
/** Wait until there is something to process on the queue, or timeout.
225
 *  Must be called with the queue locked 
227
 *  returns a queue lock
226
 */
228
 */
227
bool RclMonEventQueue::wait(int seconds, bool *top)
229
std::unique_lock<std::mutex> RclMonEventQueue::wait(int seconds, bool *top)
228
{
230
{
229
    MONDEB(("RclMonEventQueue::wait\n"));
231
    std::unique_lock<std::mutex> lock(m_data->m_mutex);
232
    
233
    MONDEB("RclMonEventQueue::wait, seconds: " << seconds << std::endl);
230
    if (!empty()) {
234
    if (!empty()) {
231
    MONDEB(("RclMonEventQueue:: imm return\n"));
235
    MONDEB("RclMonEventQueue:: immediate return\n");
232
  return true;
236
  return lock;
233
    }
237
    }
234
238
235
    int err;
239
    int err;
236
    if (seconds > 0) {
240
    if (seconds > 0) {
237
  struct timespec to;
238
  to.tv_sec = time(0L) + seconds;
239
  to.tv_nsec = 0;
240
    if (top)
241
    if (top)
241
        *top = false;
242
        *top = false;
242
  if ((err = 
243
  if (m_data->m_cond.wait_for(lock, std::chrono::seconds(seconds)) ==
243
       pthread_cond_timedwait(&m_data->m_cond, &m_data->m_mutex, &to))) {
244
            std::cv_status::timeout) {
244
      if (err == ETIMEDOUT) {
245
            *top = true;
245
      *top = true;
246
      MONDEB(("RclMonEventQueue:: timeout\n"));
246
            MONDEB("RclMonEventQueue:: timeout\n");
247
      return true;
247
            return lock;
248
      }
248
        }
249
      LOGERR("RclMonEventQueue::wait:pthread_cond_timedwait failedwith err "  << (err) << "\n" );
250
      return false;
251
  }
252
    } else {
249
    } else {
253
  if ((err = pthread_cond_wait(&m_data->m_cond, &m_data->m_mutex))) {
250
  m_data->m_cond.wait(lock);
254
      LOGERR("RclMonEventQueue::wait: pthread_cond_wait failedwith err "  << (err) << "\n" );
255
      return false;
256
  }
257
    }
251
    }
258
    MONDEB(("RclMonEventQueue:: normal return\n"));
252
    MONDEB("RclMonEventQueue:: non-timeout return\n");
259
    return true;
253
    return lock;
260
}
261
262
bool RclMonEventQueue::lock()
263
{
264
    MONDEB(("RclMonEventQueue:: lock\n"));
265
    if (pthread_mutex_lock(&m_data->m_mutex)) {
266
  LOGERR("RclMonEventQueue::lock: pthread_mutex_lock failed\n" );
267
  return false;
268
    }
269
    MONDEB(("RclMonEventQueue:: lock return\n"));
270
    return true;
271
}
272
273
bool RclMonEventQueue::unlock()
274
{
275
    MONDEB(("RclMonEventQueue:: unlock\n"));
276
    if (pthread_mutex_unlock(&m_data->m_mutex)) {
277
  LOGERR("RclMonEventQueue::lock: pthread_mutex_unlock failed\n" );
278
  return false;
279
    }
280
    return true;
281
}
254
}
282
255
283
void RclMonEventQueue::setConfig(RclConfig *cnf)
256
void RclMonEventQueue::setConfig(RclConfig *cnf)
284
{
257
{
285
    m_data->m_config = cnf;
258
    m_data->m_config = cnf;
...
...
310
    return true;
283
    return true;
311
}
284
}
312
285
313
void RclMonEventQueue::setTerminate()
286
void RclMonEventQueue::setTerminate()
314
{
287
{
315
    MONDEB(("RclMonEventQueue:: setTerminate\n"));
288
    MONDEB("RclMonEventQueue:: setTerminate\n");
316
    lock();
289
    std::unique_lock<std::mutex> lock(m_data->m_mutex);
317
    m_data->m_ok = false;
290
    m_data->m_ok = false;
318
    pthread_cond_broadcast(&m_data->m_cond);
291
    m_data->m_cond.notify_all();
319
    unlock();
320
}
292
}
321
293
322
// Must be called with the queue locked
294
// Must be called with the queue locked
323
bool RclMonEventQueue::empty()
295
bool RclMonEventQueue::empty()
324
{
296
{
325
    if (m_data == 0) {
297
    if (m_data == 0) {
326
    MONDEB(("RclMonEventQueue::empty(): true (m_data==0)\n"));
298
    MONDEB("RclMonEventQueue::empty(): true (m_data==0)\n");
327
    return true;
299
    return true;
328
    }
300
    }
329
    if (!m_data->m_iqueue.empty()) {
301
    if (!m_data->m_iqueue.empty()) {
330
    MONDEB(("RclMonEventQueue::empty(): false (m_iqueue not empty)\n"));
302
    MONDEB("RclMonEventQueue::empty(): false (m_iqueue not empty)\n");
331
    return true;
303
    return true;
332
    }
304
    }
333
    if (m_data->m_dqueue.empty()) {
305
    if (m_data->m_dqueue.empty()) {
334
    MONDEB(("RclMonEventQueue::empty(): true (m_Xqueue both empty)\n"));
306
    MONDEB("RclMonEventQueue::empty(): true (m_Xqueue both empty)\n");
335
    return true;
307
    return true;
336
    }
308
    }
337
    // Only dqueue has events. Have to check the delays (only the
309
    // Only dqueue has events. Have to check the delays (only the
338
    // first, earliest one):
310
    // first, earliest one):
339
    queue_type::iterator qit = *(m_data->m_delays.begin());
311
    queue_type::iterator qit = *(m_data->m_delays.begin());
340
    if (qit->second.m_minclock > time(0)) {
312
    if (qit->second.m_minclock > time(0)) {
341
    MONDEB(("RclMonEventQueue::empty(): true (no delay ready %lu)\n",
313
    MONDEB("RclMonEventQueue::empty(): true (no delay ready " << 
342
      (mttcast)qit->second.m_minclock));
314
               qit->second.m_minclock << ")\n");
343
    return true;
315
    return true;
344
    }
316
    }
345
    MONDEB(("RclMonEventQueue::empty(): returning false (delay expired)\n"));
317
    MONDEB("RclMonEventQueue::empty(): returning false (delay expired)\n");
346
    return false;
318
    return false;
347
}
319
}
348
320
349
// Retrieve indexing event for processing. Returns empty event if
321
// Retrieve indexing event for processing. Returns empty event if
350
// nothing interesting is found
322
// nothing interesting is found
351
// Must be called with the queue locked
323
// Must be called with the queue locked
352
RclMonEvent RclMonEventQueue::pop()
324
RclMonEvent RclMonEventQueue::pop()
353
{
325
{
354
    time_t now = time(0);
326
    time_t now = time(0);
355
    MONDEB(("RclMonEventQueue::pop(), now %lu\n", (mttcast)now));
327
    MONDEB("RclMonEventQueue::pop(), now " << now << std::endl);
356
328
357
    // Look at the delayed events, get rid of the expired/unactive
329
    // Look at the delayed events, get rid of the expired/unactive
358
    // ones, possibly return an expired/needidx one.
330
    // ones, possibly return an expired/needidx one.
359
    while (!m_data->m_delays.empty()) {
331
    while (!m_data->m_delays.empty()) {
360
    delays_type::iterator dit = m_data->m_delays.begin();
332
    delays_type::iterator dit = m_data->m_delays.begin();
361
    queue_type::iterator qit = *dit;
333
    queue_type::iterator qit = *dit;
362
    MONDEB(("RclMonEventQueue::pop(): in delays: evt minclock %lu\n", 
334
    MONDEB("RclMonEventQueue::pop(): in delays: evt minclock " << 
363
      (mttcast)qit->second.m_minclock));
335
      qit->second.m_minclock << std::endl);
364
    if (qit->second.m_minclock <= now) {
336
    if (qit->second.m_minclock <= now) {
365
        if (qit->second.m_needidx) {
337
        if (qit->second.m_needidx) {
366
        RclMonEvent ev = qit->second;
338
        RclMonEvent ev = qit->second;
367
        qit->second.m_minclock = time(0) + qit->second.m_itvsecs;
339
        qit->second.m_minclock = time(0) + qit->second.m_itvsecs;
368
        qit->second.m_needidx = false;
340
        qit->second.m_needidx = false;
...
...
397
// older. TBVerified ?
369
// older. TBVerified ?
398
// Some conf-designated files, supposedly updated at a high rate get
370
// Some conf-designated files, supposedly updated at a high rate get
399
// special processing to limit their reindexing rate.
371
// special processing to limit their reindexing rate.
400
bool RclMonEventQueue::pushEvent(const RclMonEvent &ev)
372
bool RclMonEventQueue::pushEvent(const RclMonEvent &ev)
401
{
373
{
402
    MONDEB(("RclMonEventQueue::pushEvent for %s\n", ev.m_path.c_str()));
374
    MONDEB("RclMonEventQueue::pushEvent for " << ev.m_path << std::endl);
403
    lock();
375
    std::unique_lock<std::mutex> lock(m_data->m_mutex);
404
376
405
    DelayPat pat = m_data->searchDelayPats(ev.m_path);
377
    DelayPat pat = m_data->searchDelayPats(ev.m_path);
406
    if (pat.seconds != 0) {
378
    if (pat.seconds != 0) {
407
    // Using delayed reindex queue. Need to take care of minclock and also
379
    // Using delayed reindex queue. Need to take care of minclock and also
408
    // insert into the in-minclock-order list
380
    // insert into the in-minclock-order list
...
...
430
    // Immediate event: just insert it, erasing any previously
402
    // Immediate event: just insert it, erasing any previously
431
    // existing entry
403
    // existing entry
432
    m_data->m_iqueue[ev.m_path] = ev;
404
    m_data->m_iqueue[ev.m_path] = ev;
433
    }
405
    }
434
406
435
    pthread_cond_broadcast(&m_data->m_cond);
407
    m_data->m_cond.notify_all();
436
    unlock();
437
    return true;
408
    return true;
438
}
409
}
439
410
440
static bool checkfileanddelete(const string& fname)
411
static bool checkfileanddelete(const string& fname)
441
{
412
{
...
...
480
    if (!conf->getConfParam("monauxinterval", &auxinterval))
451
    if (!conf->getConfParam("monauxinterval", &auxinterval))
481
    auxinterval = dfltauxinterval;
452
    auxinterval = dfltauxinterval;
482
    if (!conf->getConfParam("monixinterval", &ixinterval))
453
    if (!conf->getConfParam("monixinterval", &ixinterval))
483
    ixinterval = dfltixinterval;
454
    ixinterval = dfltixinterval;
484
455
485
486
    rclEQ.setConfig(conf);
456
    rclEQ.setConfig(conf);
487
    rclEQ.setopts(opts);
457
    rclEQ.setopts(opts);
488
458
489
    if (pthread_create(&rcv_thrid, 0, &rclMonRcvRun, &rclEQ) != 0) {
459
    std::thread treceive(rclMonRcvRun, &rclEQ);
490
  LOGERR("startMonitor: cant create event-receiving thread\n" );
460
    treceive.detach();
491
  return false;
492
    }
461
    
493
494
    if (!rclEQ.lock()) {
495
  LOGERR("startMonitor: cant lock queue ???\n" );
496
  return false;
497
    }
498
    LOGDEB("start_monitoring: entering main loop\n" );
462
    LOGDEB("start_monitoring: entering main loop\n" );
499
463
500
    bool timedout;
464
    bool timedout;
501
    time_t lastauxtime = time(0);
465
    time_t lastauxtime = time(0);
502
    time_t lastixtime = lastauxtime;
466
    time_t lastixtime = lastauxtime;
503
    bool didsomething = false;
467
    bool didsomething = false;
504
    list<string> modified;
468
    list<string> modified;
505
    list<string> deleted;
469
    list<string> deleted;
506
470
471
    ;
472
    
507
    // Set a relatively short timeout for better monitoring of exit requests
473
    // Set a relatively short timeout for better monitoring of exit requests
508
    while (rclEQ.wait(2, &timedout)) {
474
    while (true) {
509
  // Queue is locked.
475
        {
476
            std::unique_lock<std::mutex> lock = rclEQ.wait(2, &timedout);
510
477
511
  // x11IsAlive() can't be called from ok() because both threads call it
478
            // x11IsAlive() can't be called from ok() because both
512
  // and Xlib is not multithreaded.
479
            // threads call it and Xlib is not multithreaded.
513
#ifndef _WIN32
480
#ifndef _WIN32
514
        bool x11dead = !(opts & RCLMON_NOX11) && !x11IsAlive();
481
            bool x11dead = !(opts & RCLMON_NOX11) && !x11IsAlive();
515
        if (x11dead)
482
            if (x11dead)
516
            LOGDEB("RclMonprc: x11 is dead\n" );
483
                LOGDEB("RclMonprc: x11 is dead\n" );
517
#else
484
#else
518
        bool x11dead = false;
485
            bool x11dead = false;
519
#endif
486
#endif
520
  if (!rclEQ.ok() || x11dead) {
487
            if (!rclEQ.ok() || x11dead) {
521
      rclEQ.unlock();
488
                break;
522
      break;
489
            }
523
  }
524
        
490
        
525
  // Process event queue
491
            // Process event queue
526
  for (;;) {
492
            for (;;) {
527
      // Retrieve event
493
                // Retrieve event
528
      RclMonEvent ev = rclEQ.pop();
494
                RclMonEvent ev = rclEQ.pop();
529
      if (ev.m_path.empty())
495
                if (ev.m_path.empty())
530
      break;
496
                    break;
531
      switch (ev.evtype()) {
497
                switch (ev.evtype()) {
532
      case RclMonEvent::RCLEVT_MODIFY:
498
                case RclMonEvent::RCLEVT_MODIFY:
533
      case RclMonEvent::RCLEVT_DIRCREATE:
499
                case RclMonEvent::RCLEVT_DIRCREATE:
534
      LOGDEB0("Monitor: Modify/Check on "  << (ev.m_path) << "\n" );
500
                    LOGDEB0("Monitor: Modify/Check on "  << ev.m_path << "\n");
535
      modified.push_back(ev.m_path);
501
                    modified.push_back(ev.m_path);
536
      break;
502
                    break;
537
      case RclMonEvent::RCLEVT_DELETE:
503
                case RclMonEvent::RCLEVT_DELETE:
538
      LOGDEB0("Monitor: Delete on "  << (ev.m_path) << "\n" );
504
                    LOGDEB0("Monitor: Delete on "  << (ev.m_path) << "\n" );
539
      // If this is for a directory (which the caller should
505
                    // If this is for a directory (which the caller should
540
      // tell us because he knows), we should purge the db
506
                    // tell us because he knows), we should purge the db
541
      // of all the subtree, because on a directory rename,
507
                    // of all the subtree, because on a directory rename,
542
      // inotify will only generate one event for the
508
                    // inotify will only generate one event for the
543
      // renamed top, not the subentries. This is relatively
509
                    // renamed top, not the subentries. This is relatively
544
      // complicated to do though, and we currently do not
510
                    // complicated to do though, and we currently do not
545
      // do it, and just wait for a restart to do a full run and
511
                    // do it, and just wait for a restart to do a full run and
546
      // purge.
512
                    // purge.
547
      deleted.push_back(ev.m_path);
513
                    deleted.push_back(ev.m_path);
548
      if (ev.evflags() & RclMonEvent::RCLEVT_ISDIR) {
514
                    if (ev.evflags() & RclMonEvent::RCLEVT_ISDIR) {
549
          vector<string> paths;
515
                        vector<string> paths;
550
          if (subtreelist(conf, ev.m_path, paths)) {
516
                        if (subtreelist(conf, ev.m_path, paths)) {
551
          deleted.insert(deleted.end(),
517
                            deleted.insert(deleted.end(),
552
                     paths.begin(), paths.end());
518
                                           paths.begin(), paths.end());
553
          }
519
                        }
554
      }
520
                    }
555
      break;
521
                    break;
556
      default:
522
                default:
557
      LOGDEB("Monitor: got Other on ["  << (ev.m_path) << "]\n" );
523
                    LOGDEB("Monitor: got Other on ["  << (ev.m_path) << "]\n" );
558
      }
524
                }
559
  }
525
            }
560
  // Unlock queue before processing lists
526
        }
561
  rclEQ.unlock();
562
527
563
    // Process. We don't do this every time but let the lists accumulate
528
    // Process. We don't do this every time but let the lists accumulate
564
        // a little, this saves processing. Start at once if list is big.
529
        // a little, this saves processing. Start at once if list is big.
565
        time_t now = time(0);
530
        time_t now = time(0);
566
        if (expeditedIndexingRequested(conf) ||
531
        if (expeditedIndexingRequested(conf) ||
...
...
606
        // change. -n was added by the reexec after the initial
571
        // change. -n was added by the reexec after the initial
607
        // pass even if it was not given on the command line
572
        // pass even if it was not given on the command line
608
        o_reexec->removeArg("-n");
573
        o_reexec->removeArg("-n");
609
        o_reexec->reexec();
574
        o_reexec->reexec();
610
    }
575
    }
611
  // Lock queue before waiting again
612
  rclEQ.lock();
613
    }
576
    }
614
    LOGDEB("Rclmonprc: calling queue setTerminate\n" );
577
    LOGDEB("Rclmonprc: calling queue setTerminate\n" );
615
    rclEQ.setTerminate();
578
    rclEQ.setTerminate();
616
579
617
    // We used to wait for the receiver thread here before returning,
580
    // We used to wait for the receiver thread here before returning,
618
    // but this is not useful and may waste time / risk problems
581
    // but this is not useful and may waste time / risk problems
619
    // during our limited time window for exiting. To be reviewed if
582
    // during our limited time window for exiting. To be reviewed if
620
    // we ever need several monitor invocations in the same process
583
    // we ever need several monitor invocations in the same process
621
    // (can't foresee any reason why we'd want to do this).
584
    // (can't foresee any reason why we'd want to do this).
622
    //   pthread_join(rcv_thrid, 0);
623
    LOGDEB("Monitor: returning\n" );
585
    LOGDEB("Monitor: returning\n" );
624
    return true;
586
    return true;
625
}
587
}
626
588
627
#endif // RCL_MONITOR
589
#endif // RCL_MONITOR