Switch to unified view

a/src/index/rclmonrcv.cpp b/src/index/rclmonrcv.cpp
...
...
38
class RclMonitor {
38
class RclMonitor {
39
public:
39
public:
40
    RclMonitor(){}
40
    RclMonitor(){}
41
    virtual ~RclMonitor() {}
41
    virtual ~RclMonitor() {}
42
    virtual bool addWatch(const string& path, bool isDir) = 0;
42
    virtual bool addWatch(const string& path, bool isDir) = 0;
43
    virtual bool getEvent(RclMonEvent& ev, int secs = -1) = 0;
43
    virtual bool getEvent(RclMonEvent& ev, int msecs = -1) = 0;
44
    virtual bool ok() const = 0;
44
    virtual bool ok() const = 0;
45
    // Does this monitor generate 'exist' events at startup?
45
    // Does this monitor generate 'exist' events at startup?
46
    virtual bool generatesExist() const = 0; 
46
    virtual bool generatesExist() const = 0; 
47
};
47
};
48
48
...
...
126
{
126
{
127
    RclMonEventQueue *queue = (RclMonEventQueue *)q;
127
    RclMonEventQueue *queue = (RclMonEventQueue *)q;
128
128
129
    LOGDEB(("rclMonRcvRun: running\n"));
129
    LOGDEB(("rclMonRcvRun: running\n"));
130
    recoll_threadinit();
130
    recoll_threadinit();
131
    // Make a local copy of the configuration as it doesn't like
132
    // concurrent accesses. It's ok to copy it here as the other
133
    // thread will not work before we have sent events.
134
    RclConfig lconfig(*queue->getConfig());
131
135
136
    string loglevel;
137
    lconfig.getConfParam(string("daemloglevel"), loglevel);
138
    if (loglevel.empty())
139
  lconfig.getConfParam(string("loglevel"), loglevel);
140
    if (!loglevel.empty()) {
141
  int lev = atoi(loglevel.c_str());
142
  DebugLog::getdbl()->setloglevel(lev);
143
    }
132
144
133
    // Create the fam/whatever interface object
145
    // Create the fam/whatever interface object
134
    RclMonitor *mon;
146
    RclMonitor *mon;
135
    if ((mon = makeMonitor()) == 0) {
147
    if ((mon = makeMonitor()) == 0) {
136
    LOGERR(("rclMonRcvRun: makeMonitor failed\n"));
148
    LOGERR(("rclMonRcvRun: makeMonitor failed\n"));
137
    queue->setTerminate();
149
    queue->setTerminate();
138
    return 0;
150
    return 0;
139
    }
151
    }
140
152
141
    // Make a local copy of the configuration as it doesn't like
142
    // concurrent accesses. It's ok to copy it here as the other
143
    // thread will not work before we have sent events.
144
    RclConfig lconfig(*queue->getConfig());
145
153
146
    // Get top directories from config 
154
    // Get top directories from config 
147
    list<string> tdl = lconfig.getTopdirs();
155
    list<string> tdl = lconfig.getTopdirs();
148
    if (tdl.empty()) {
156
    if (tdl.empty()) {
149
    LOGERR(("rclMonRcvRun:: top directory list (topdirs param.) not"
157
    LOGERR(("rclMonRcvRun:: top directory list (topdirs param.) not"
...
...
165
        walker.setOpts(FsTreeWalker::FtwFollow);
173
        walker.setOpts(FsTreeWalker::FtwFollow);
166
    } else {
174
    } else {
167
        walker.setOpts(FsTreeWalker::FtwOptNone);
175
        walker.setOpts(FsTreeWalker::FtwOptNone);
168
    }
176
    }
169
    LOGDEB(("rclMonRcvRun: walking %s\n", it->c_str()));
177
    LOGDEB(("rclMonRcvRun: walking %s\n", it->c_str()));
170
  walker.walk(*it, walkcb);
178
  if (walker.walk(*it, walkcb) != FsTreeWalker::FtwOk) {
179
      LOGERR(("rclMonRcvRun: tree walk failed\n"));
180
      goto terminate;
181
  }
182
    }
183
171
    }
184
    {
172
173
    bool dobeagle = false;
185
  bool dobeagle = false;
174
    lconfig.getConfParam("processbeaglequeue", &dobeagle);
186
  lconfig.getConfParam("processbeaglequeue", &dobeagle);
175
    if (dobeagle) {
187
  if (dobeagle) {
176
        string beaglequeuedir;
188
      string beaglequeuedir;
177
        if (!lconfig.getConfParam("beaglequeuedir", beaglequeuedir))
189
      if (!lconfig.getConfParam("beaglequeuedir", beaglequeuedir))
178
            beaglequeuedir = path_tildexpand("~/.beagle/ToIndex/");
190
      beaglequeuedir = path_tildexpand("~/.beagle/ToIndex/");
179
        mon->addWatch(beaglequeuedir, true);
191
      if (!mon->addWatch(beaglequeuedir, true)) {
192
      LOGERR(("rclMonRcvRun: addwatch (beaglequeuedit) failed\n"));
193
      goto terminate;
194
      }
195
  }
180
    }
196
    }
181
197
182
    // Forever wait for monitoring events and add them to queue:
198
    // Forever wait for monitoring events and add them to queue:
183
    MONDEB(("rclMonRcvRun: waiting for events. q->ok() %d\n", queue->ok()));
199
    MONDEB(("rclMonRcvRun: waiting for events. q->ok() %d\n", queue->ok()));
184
    while (queue->ok() && mon->ok()) {
200
    while (queue->ok() && mon->ok()) {
...
...
186
    // Note: I could find no way to get the select
202
    // Note: I could find no way to get the select
187
    // call to return when a signal is delivered to the process
203
    // call to return when a signal is delivered to the process
188
    // (it goes to the main thread, from which I tried to close or
204
    // (it goes to the main thread, from which I tried to close or
189
    // write to the select fd, with no effect). So set a 
205
    // write to the select fd, with no effect). So set a 
190
    // timeout so that an intr will be detected
206
    // timeout so that an intr will be detected
191
    if (mon->getEvent(ev, 2)) {
207
    if (mon->getEvent(ev, 2000)) {
192
        if (ev.m_etyp == RclMonEvent::RCLEVT_DIRCREATE) {
208
        if (ev.m_etyp == RclMonEvent::RCLEVT_DIRCREATE) {
193
        // Recursive addwatch: there may already be stuff
209
        // Recursive addwatch: there may already be stuff
194
        // inside this directory. Ie: files were quickly
210
        // inside this directory. Ie: files were quickly
195
        // created, or this is actually the target of a
211
        // created, or this is actually the target of a
196
        // directory move. This is necessary for inotify, but
212
        // directory move. This is necessary for inotify, but
...
...
199
        // deal as prc will sort/merge).
215
        // deal as prc will sort/merge).
200
        if (!walker.inSkippedNames(path_getsimple(ev.m_path)) && 
216
        if (!walker.inSkippedNames(path_getsimple(ev.m_path)) && 
201
            !walker.inSkippedPaths(ev.m_path)) {
217
            !walker.inSkippedPaths(ev.m_path)) {
202
            LOGDEB(("rclMonRcvRun: walking new dir %s\n", 
218
            LOGDEB(("rclMonRcvRun: walking new dir %s\n", 
203
                ev.m_path.c_str()));
219
                ev.m_path.c_str()));
204
          walker.walk(ev.m_path, walkcb);
220
          if (walker.walk(ev.m_path, walkcb) != FsTreeWalker::FtwOk) {
221
          LOGERR(("rclMonRcvRun: failed walking new dir %s\n", 
222
              ev.m_path.c_str()));
223
          goto terminate;
224
          }
205
        }
225
        }
206
        }
226
        }
207
227
208
        if (ev.m_etyp !=  RclMonEvent::RCLEVT_NONE)
228
        if (ev.m_etyp !=  RclMonEvent::RCLEVT_NONE)
209
        queue->pushEvent(ev);
229
        queue->pushEvent(ev);
210
    }
230
    }
211
    }
231
    }
212
232
233
terminate:
213
    queue->setTerminate();
234
    queue->setTerminate();
214
    LOGINFO(("rclMonRcvRun: monrcv thread routine returning\n"));
235
    LOGINFO(("rclMonRcvRun: monrcv thread routine returning\n"));
215
    return 0;
236
    return 0;
216
}
237
}
217
238
...
...
239
#ifdef RCL_USE_FAM
260
#ifdef RCL_USE_FAM
240
//////////////////////////////////////////////////////////////////////////
261
//////////////////////////////////////////////////////////////////////////
241
/** Fam/gamin -based monitor class */
262
/** Fam/gamin -based monitor class */
242
#include <fam.h>
263
#include <fam.h>
243
#include <sys/select.h>
264
#include <sys/select.h>
265
#include <setjmp.h>
266
#include <unistd.h>
267
#include <signal.h>
244
268
245
/** FAM based monitor class. We have to keep a record of FAM watch
269
/** FAM based monitor class. We have to keep a record of FAM watch
246
    request numbers to directory names as the event only contain the
270
    request numbers to directory names as the event only contain the
247
    request number and file name, not the full path */
271
    request number and file name, not the full path */
248
class RclFAM : public RclMonitor {
272
class RclFAM : public RclMonitor {
249
public:
273
public:
250
    RclFAM();
274
    RclFAM();
251
    virtual ~RclFAM();
275
    virtual ~RclFAM();
252
    virtual bool addWatch(const string& path, bool isdir);
276
    virtual bool addWatch(const string& path, bool isdir);
253
    virtual bool getEvent(RclMonEvent& ev, int secs = -1);
277
    virtual bool getEvent(RclMonEvent& ev, int msecs = -1);
254
    bool ok() const {return m_ok;}
278
    bool ok() const {return m_ok;}
255
    virtual bool generatesExist() const {return true;}
279
    virtual bool generatesExist() const {return true;}
256
280
257
private:
281
private:
258
    bool m_ok;
282
    bool m_ok;
...
...
303
{
327
{
304
    if (ok())
328
    if (ok())
305
    FAMClose(&m_conn);
329
    FAMClose(&m_conn);
306
}
330
}
307
331
332
static jmp_buf jbuf;
333
static void onalrm(int sig)
334
{
335
    longjmp(jbuf, 1);
336
}
308
bool RclFAM::addWatch(const string& path, bool isdir)
337
bool RclFAM::addWatch(const string& path, bool isdir)
309
{
338
{
310
    if (!ok())
339
    if (!ok())
311
    return false;
340
    return false;
341
    bool ret = false;
342
312
    MONDEB(("RclFAM::addWatch: adding %s\n", path.c_str()));
343
    MONDEB(("RclFAM::addWatch: adding %s\n", path.c_str()));
344
345
    // It happens that the following call block forever. 
346
    // We'd like to be able to at least terminate on a signal here, but
347
    // gamin forever retries its write call on EINTR, so it's not even useful
348
    // to unblock signals. SIGALRM is not used by the main thread, so at least
349
    // ensure that we exit after gamin gets stuck.
350
    if (setjmp(jbuf)) {
351
  LOGERR(("RclFAM::addWatch: timeout talking to FAM\n"));
352
  return false;
353
    }
354
    signal(SIGALRM, onalrm);
355
    alarm(20);
313
    FAMRequest req;
356
    FAMRequest req;
314
    if (isdir) {
357
    if (isdir) {
315
    if (FAMMonitorDirectory(&m_conn, path.c_str(), &req, 0) != 0) {
358
    if (FAMMonitorDirectory(&m_conn, path.c_str(), &req, 0) != 0) {
316
        LOGERR(("RclFAM::addWatch: FAMMonitorDirectory failed\n"));
359
        LOGERR(("RclFAM::addWatch: FAMMonitorDirectory failed\n"));
317
      return false;
360
      goto out;
318
    }
361
    }
319
    } else {
362
    } else {
320
    if (FAMMonitorFile(&m_conn, path.c_str(), &req, 0) != 0) {
363
    if (FAMMonitorFile(&m_conn, path.c_str(), &req, 0) != 0) {
321
        LOGERR(("RclFAM::addWatch: FAMMonitorFile failed\n"));
364
        LOGERR(("RclFAM::addWatch: FAMMonitorFile failed\n"));
322
      return false;
365
      goto out;
323
    }
366
    }
324
    }
367
    }
325
    m_idtopath[req.reqnum] = path;
368
    m_idtopath[req.reqnum] = path;
326
    return true;
369
    ret = true;
370
371
out:
372
    alarm(0);
373
    return ret;
327
}
374
}
328
375
329
// Note: return false only for queue empty or error 
376
// Note: return false only for queue empty or error 
330
// Return EVT_NONE for bad event to keep queue processing going
377
// Return EVT_NONE for bad event to keep queue processing going
331
bool RclFAM::getEvent(RclMonEvent& ev, int secs)
378
bool RclFAM::getEvent(RclMonEvent& ev, int msecs)
332
{
379
{
333
    if (!ok())
380
    if (!ok())
334
    return false;
381
    return false;
335
    MONDEB(("RclFAM::getEvent:\n"));
382
    MONDEB(("RclFAM::getEvent:\n"));
336
383
...
...
338
    int fam_fd = FAMCONNECTION_GETFD(&m_conn);
385
    int fam_fd = FAMCONNECTION_GETFD(&m_conn);
339
    FD_ZERO(&readfds);
386
    FD_ZERO(&readfds);
340
    FD_SET(fam_fd, &readfds);
387
    FD_SET(fam_fd, &readfds);
341
388
342
    MONDEB(("RclFAM::getEvent: select. fam_fd is %d\n", fam_fd));
389
    MONDEB(("RclFAM::getEvent: select. fam_fd is %d\n", fam_fd));
390
    // Fam / gamin is sometimes a bit slow to send events. Always add
391
    // a little timeout, because if we fail to retrieve enough events,
392
    // we risk deadlocking in addwatch()
393
    if (msecs == 0)
394
  msecs = 2;
343
    struct timeval timeout;
395
    struct timeval timeout;
344
    if (secs >= 0) {
396
    if (msecs >= 0) {
345
  memset(&timeout, 0, sizeof(timeout));
346
    timeout.tv_sec = secs;
397
    timeout.tv_sec = msecs / 1000;
398
  timeout.tv_usec = (msecs % 1000) * 1000;
347
    }
399
    }
348
    int ret;
400
    int ret;
349
    if ((ret=select(fam_fd+1, &readfds, 0, 0, secs >= 0 ? &timeout : 0)) < 0) {
401
    if ((ret=select(fam_fd+1, &readfds, 0, 0, msecs >= 0 ? &timeout : 0)) < 0) {
350
    LOGERR(("RclFAM::getEvent: select failed, errno %d\n", errno));
402
    LOGERR(("RclFAM::getEvent: select failed, errno %d\n", errno));
351
    close();
403
    close();
352
    return false;
404
    return false;
353
    } else if (ret == 0) {
405
    } else if (ret == 0) {
354
    // timeout
406
    // timeout
...
...
453
    {
505
    {
454
    close();
506
    close();
455
    }
507
    }
456
508
457
    virtual bool addWatch(const string& path, bool isdir);
509
    virtual bool addWatch(const string& path, bool isdir);
458
    virtual bool getEvent(RclMonEvent& ev, int secs = -1);
510
    virtual bool getEvent(RclMonEvent& ev, int msecs = -1);
459
    bool ok() const {return m_ok;}
511
    bool ok() const {return m_ok;}
460
    virtual bool generatesExist() const {return false;}
512
    virtual bool generatesExist() const {return false;}
461
513
462
private:
514
private:
463
    bool m_ok;
515
    bool m_ok;
...
...
530
    return true;
582
    return true;
531
}
583
}
532
584
533
// Note: return false only for queue empty or error 
585
// Note: return false only for queue empty or error 
534
// Return EVT_NONE for bad event to keep queue processing going
586
// Return EVT_NONE for bad event to keep queue processing going
535
bool RclIntf::getEvent(RclMonEvent& ev, int secs)
587
bool RclIntf::getEvent(RclMonEvent& ev, int msecs)
536
{
588
{
537
    if (!ok())
589
    if (!ok())
538
    return false;
590
    return false;
539
    ev.m_etyp = RclMonEvent::RCLEVT_NONE;
591
    ev.m_etyp = RclMonEvent::RCLEVT_NONE;
540
    MONDEB(("RclIntf::getEvent:\n"));
592
    MONDEB(("RclIntf::getEvent:\n"));
...
...
542
    if (m_evp == 0) {
594
    if (m_evp == 0) {
543
    fd_set readfds;
595
    fd_set readfds;
544
    FD_ZERO(&readfds);
596
    FD_ZERO(&readfds);
545
    FD_SET(m_fd, &readfds);
597
    FD_SET(m_fd, &readfds);
546
    struct timeval timeout;
598
    struct timeval timeout;
547
    if (secs >= 0) {
599
    if (msecs >= 0) {
548
      memset(&timeout, 0, sizeof(timeout));
549
        timeout.tv_sec = secs;
600
        timeout.tv_sec = msecs / 1000;
601
      timeout.tv_usec = (msecs % 1000) * 1000;
550
    }
602
    }
551
    int ret;
603
    int ret;
552
    MONDEB(("RclIntf::getEvent: select\n"));
604
    MONDEB(("RclIntf::getEvent: select\n"));
553
    if ((ret=select(m_fd + 1, &readfds, 0, 0, secs >= 0 ? &timeout : 0)) < 0) {
605
    if ((ret=select(m_fd + 1, &readfds, 0, 0, msecs >= 0 ? &timeout : 0)) < 0) {
554
        LOGERR(("RclIntf::getEvent: select failed, errno %d\n", errno));
606
        LOGERR(("RclIntf::getEvent: select failed, errno %d\n", errno));
555
        close();
607
        close();
556
        return false;
608
        return false;
557
    } else if (ret == 0) {
609
    } else if (ret == 0) {
558
        MONDEB(("RclIntf::getEvent: select timeout\n"));
610
        MONDEB(("RclIntf::getEvent: select timeout\n"));