import sys
import subprocess
import time
import urllib
import hashlib
import os
import locale
from gi.repository import Unity, GObject, Gio
# rclconfig is optional: when it is available the scope also searches the
# extra databases listed in the recoll configuration.
try:
    from recoll import rclconfig
    hasrclconfig = True
except ImportError:
    hasrclconfig = False
# As a temporary measure, we also look for rclconfig as a bare
# module. This is so that the intermediate releases of the lens can
# ship and use rclconfig.py with the lens code
if not hasrclconfig:
    try:
        import rclconfig
        hasrclconfig = True
    except ImportError:
        pass
# The recoll module is either the 'recoll' submodule of the 'recoll'
# package, or a bare top-level module. The two cases are kept apart so
# that a missing rclextract does not rebind 'recoll' to the package
# object instead of the submodule.
try:
    from recoll import recoll
except ImportError:
    import recoll
    hasextract = False
else:
    try:
        from recoll import rclextract
        hasextract = True
    except ImportError:
        hasextract = False
# D-Bus path handed to Unity.Scope as its dbus_path
BUS_PATH = "/org/recoll/unitylensrecoll/scope/main"
# Base cache directory. The XDG base directory specification names the
# variable XDG_CACHE_HOME and says that an unset *or empty* value means
# ~/.cache. XDG_CACHE_DIR is not part of the specification; it is still
# honoured as a fallback for setups which relied on the old behaviour.
XDGCACHE = (os.getenv('XDG_CACHE_HOME')
            or os.getenv('XDG_CACHE_DIR')
            or os.path.expanduser("~/.cache"))
# Thumbnail storage locations, "new style" first, then "old style"
THUMBDIRS = [os.path.join(XDGCACHE, "thumbnails"),
             os.path.expanduser("~/.thumbnails")]
def _get_thumbnail_path(url):
    """Return the path of an existing thumbnail for 'url', or None.

    The thumbnail is looked up according to the freedesktop thumbnail
    storage standard. The input 'url' is expected to begin with file://
    and to be unencoded: it is percent-encoded here, and the md5 of the
    encoded url gives the thumbnail file name. No thumbnail is ever
    generated, we only report one which already exists.

    As a side effect, the module-level THUMBDIRS list is reduced to its
    first entry as soon as that directory is seen to exist.
    """
    global THUMBDIRS

    # Compute the thumbnail file name by encoding and hashing the url
    # string. Only a leading scheme is stripped, so that a "file://"
    # sequence occurring further inside the path is left alone.
    scheme = "file://"
    if url.startswith(scheme):
        path = url[len(scheme):]
    else:
        path = url
    try:
        encoded = scheme + urllib.quote(path)
    except Exception:
        # Best effort: a url which can't be encoded just has no thumbnail
        return None
    thumbname = hashlib.md5(encoded).hexdigest() + ".png"

    # If the "new style" directory exists, we should stop looking in
    # the "old style" one (there might be interesting files in there,
    # but they may be stale, so it's best to not touch them). We do
    # this semi-dynamically so that we catch things up if the
    # directory gets created while we are running.
    if os.path.exists(THUMBDIRS[0]):
        THUMBDIRS = THUMBDIRS[0:1]

    # Check in appropriate directories to see if the thumbnail file exists
    for topdir in THUMBDIRS:
        for size in ("large", "normal"):
            tpath = os.path.join(topdir, size, thumbname)
            if os.path.exists(tpath):
                return tpath
    return None
# Icon names for the recoll mime types for which the normal lookup
# method does not yield a standard icon.
SPEC_MIME_ICONS = {
    'application/x-fsdirectory': 'gnome-fs-directory.svg',
    'inode/directory': 'gnome-fs-directory.svg',
    'message/rfc822': 'mail-read',
    'application/x-recoll': 'recoll',
}

# Result lists are truncated to this many entries.
MAX_RESULTS = 30

# Category ids must match the order in which the categories are added
# to the lens.
CATEGORY_ALL = 0

# Typing timeout: we may not want to start a search for every character
# typed. Unity does some batching on its side, but we might want more
# control. It is not clear that this does any good on a moderate size
# index; it is kept around because it might still be useful with a very
# big one. Set to 0 to not use it (default).
TYPING_TIMEOUT = 0
class Scope (Unity.Scope):
    """Unity scope which runs recoll queries and fills the result models.

    The class supports both the Unity 4.0 signal set (property
    notifications) and the Unity 5.0+ one ("search-changed"); the choice
    is made from Unity._version, both when connecting the signals and
    when defining _on_search_changed().
    """

    def __init__ (self):
        """Create the scope, connect the Unity signals, set initial state."""
        Unity.Scope.__init__ (self, dbus_path=BUS_PATH)

        # Listen for changes and requests
        self.connect ("activate-uri", self.activate_uri)
        if Unity._version == "4.0":
            # Unity 4.0: searches are reported as property notifications
            self.connect ("notify::active-search",
                          self._on_search_changed)
            self.connect ("notify::active-global-search",
                          self._on_global_search_changed)
            self.connect ("filters-changed",
                          self._on_search_changed)
        else:
            # Unity 5.0+
            self.connect ("search-changed", self._on_search_changed)
            self.connect ("filters-changed",
                          self._on_filters_changed)

        # Time of the last db connection attempt (see _really_do_search)
        self.last_connect_time = 0
        # GObject source id of the pending typing timeout, if any
        self.timeout_id = None

        # The charset is used to decode the search string. It may be
        # reported as None (e.g. no locale set), in which case decoding
        # would fail for every search: fall back to utf-8.
        try:
            charset = locale.getdefaultlocale()[1]
        except ValueError:
            charset = None
        self.localecharset = charset or "utf-8"

        if hasrclconfig:
            self.config = rclconfig.RclConfig()

    def _connect_db (self):
        """(Re)connect to the recoll index, storing the result in self.db.

        self.db is left as None if the connection fails. When rclconfig
        is available, the active extra databases are searched too.
        """
        self.db = None
        dblist = []
        if hasrclconfig:
            extradbs = rclconfig.RclExtraDbs(self.config)
            dblist = extradbs.getActDbs()
        try:
            self.db = recoll.connect(extra_dbs=dblist)
            self.db.setAbstractParams(maxchars=200, contextwords=4)
        except Exception as err:
            sys.stderr.write("recoll-lens: Error connecting to db: %s\n"
                             % (err,))
            return

    def get_search_string (self):
        """Return the string of the active search, None if there is none."""
        search = self.props.active_search
        return search.props.search_string if search else None

    def get_global_search_string (self):
        """Return the string of the active global search, or None."""
        search = self.props.active_global_search
        return search.props.search_string if search else None

    def search_finished (self):
        """Emit "finished" on the active search, if there is one."""
        search = self.props.active_search
        if search:
            search.emit("finished")

    def global_search_finished (self):
        """Emit "finished" on the active global search, if there is one."""
        search = self.props.active_global_search
        if search:
            search.emit("finished")

    def reset (self):
        """Empty both the normal and the global result models."""
        self._do_browse (self.props.results_model)
        self._do_browse (self.props.global_results_model)

    def _on_filters_changed (self, scope):
        """Signal handler: the filters changed, ask for a new search."""
        self.queue_search_changed(Unity.SearchType.DEFAULT)

    # The signature of the search change handler depends on the Unity
    # version, so the appropriate variant is chosen at class creation time.
    if Unity._version == "4.0":
        def _on_search_changed (self, scope, param_spec=None):
            """Unity 4.0 handler: update the results for the active search."""
            search_string = self.get_search_string()
            results = scope.props.results_model
            self._update_results_model (search_string, results)
    else:
        def _on_search_changed (self, scope, search, search_type, cancellable):
            """Unity 5.0+ handler: search, or clear the model if no string."""
            search_string = search.props.search_string
            results = search.props.results_model
            if search_string:
                self._update_results_model (search_string, results)
            else:
                search.props.results_model.clear()

    def _on_global_search_changed (self, scope, param_spec):
        """Handler for global search changes: update the global results."""
        search = self.get_global_search_string()
        results = scope.props.global_results_model
        self._update_results_model (search, results)

    def _update_results_model (self, search_string, model):
        """Search if there is a search string, else just empty the model."""
        if search_string:
            self._do_search (search_string, model)
        else:
            self._do_browse (model)

    def _do_browse (self, model):
        """Cancel any pending search, empty 'model', signal completion."""
        if self.timeout_id is not None:
            GObject.source_remove(self.timeout_id)
            # Forget the id: it must not be removed a second time by a
            # later call to _do_browse() or _do_search().
            self.timeout_id = None
        model.clear ()
        if model is self.props.results_model:
            self.search_finished()
        else:
            self.global_search_finished()

    def _on_timeout (self, search_string, model):
        """Typing timeout callback: run the delayed search.

        Returns False so that the timeout source, which just fired, is
        removed and does not fire again.
        """
        self.timeout_id = None
        self._really_do_search(search_string, model)
        if model is self.props.results_model:
            self.search_finished()
        else:
            self.global_search_finished()
        return False

    def _do_search (self, search_string, model):
        """Run the search at once, or after TYPING_TIMEOUT if it is set.

        Returns True when the search was run immediately, None when it
        was scheduled. A previously scheduled search is cancelled.
        """
        if TYPING_TIMEOUT == 0:
            # NOTE(review): unlike _on_timeout(), this immediate path does
            # not signal the end of the search - confirm this is intended.
            self._really_do_search(search_string, model)
            return True

        if self.timeout_id is not None:
            GObject.source_remove(self.timeout_id)
        self.timeout_id = GObject.timeout_add(TYPING_TIMEOUT,
                                              self._on_timeout,
                                              search_string, model)

    def _icon_name_for (self, mimetype):
        """Return an icon name for 'mimetype', None if none is found."""
        if mimetype in SPEC_MIME_ICONS:
            return SPEC_MIME_ICONS[mimetype]
        icon = Gio.content_type_get_icon(mimetype)
        if icon:
            # At least on Quantal, get_names() sometimes returns
            # a list with '(null)' in the first position...
            for iname in icon.get_names():
                if iname != '(null)':
                    return iname
        return None

    def _really_do_search (self, search_string, model):
        """Execute the recoll query and store the results in 'model'.

        The model is emptied first. At most MAX_RESULTS rows are added.
        The database connection is renewed if it is more than 10 seconds
        old. If there is no usable database, a single row telling the
        user to create the index is added instead of results.
        """
        model.clear ()
        if search_string == "":
            return True

        current_time = time.time()
        if current_time - self.last_connect_time > 10:
            self._connect_db()
            self.last_connect_time = current_time

        if not self.db:
            model.append ("",
                          "error",
                          CATEGORY_ALL,
                          "text/plain",
                          "You need to use the recoll GUI to create the index first !",
                          "",
                          "")
            return

        # Restrict the query to the categories selected in the filter
        fcat = self.get_filter("rclcat")
        if fcat is not None:
            for option in fcat.options:
                if option.props.active:
                    search_string += " rclcat:" + option.props.id

        # Do the recoll thing
        try:
            query = self.db.query()
            nres = query.execute(search_string.decode(self.localecharset))
        except Exception as err:
            sys.stderr.write("recoll-lens: query failed: %s\n" % (err,))
            return

        # Every result which is fetched without error yields one row, so
        # bounding the loop is the same as counting the rows added.
        for _ in range(min(nres, MAX_RESULTS)):
            try:
                doc = query.fetchone()
            except Exception:
                break

            # Results with an ipath get a special mime type so that they
            # get opened by starting a recoll instance.
            thumbnail = None
            if doc.ipath != "":
                mimetype = "application/x-recoll"
                url = doc.url + "#" + doc.ipath
            else:
                mimetype = doc.mimetype
                url = doc.url
                # doc.url is a unicode string which is badly wrong.
                # Retrieve the binary path for thumbnail access.
                thumbnail = _get_thumbnail_path(doc.getbinurl())

            titleorfilename = doc.title
            if titleorfilename == "":
                titleorfilename = doc.filename

            # Prefer an existing thumbnail, else an icon for the mime type
            iconname = thumbnail or self._icon_name_for(doc.mimetype)

            try:
                abstract = self.db.makeDocAbstract(doc, query).encode('utf-8')
            except Exception:
                break

            model.append (url,
                          iconname,
                          CATEGORY_ALL,
                          mimetype,
                          titleorfilename,
                          abstract,
                          doc.url)

    # If we just return from the activation handler, the caller gets an
    # error:
    #   Warning: g_object_get_qdata: assertion `G_IS_OBJECT (object)' failed
    # This is a known bug, see:
    #   https://bugs.launchpad.net/unity/+bug/893688
    # The default url activation then takes place, which is not at all
    # what we want.
    # First workaround:
    #   In the recoll case, we just exit, the lens will be restarted.
    #   In the regular case, we return, and activation works exactly once
    #   for 2 calls on oneiric and mostly for precise...
    # New workaround, suggested somewhere on the net and kept: use the
    # other construction method for the response (keyword arguments).
    def activate_uri (self, scope, uri):
        """Activation handler for uri.

        An uri without a fragment is handed back to the desktop handler.
        Any other one is opened by starting recoll, after which both
        result models are emptied and the dash is asked to hide.
        Returns a Unity.ActivationResponse in both cases.
        """
        # Pass all uri without fragments to the desktop handler
        if "#" not in uri:
            # Reset browsing state when an app is launched
            if Unity._version == "4.0":
                self.reset ()
            return Unity.ActivationResponse(
                handled=Unity.HandledType.NOT_HANDLED, goto_uri=uri)

        # Pass all others to recoll
        try:
            subprocess.Popen(["recoll", uri])
        except OSError as err:
            sys.stderr.write("recoll-lens: can't start recoll: %s\n"
                             % (err,))

        scope.props.results_model.clear()
        scope.props.global_results_model.clear()

        # Old workaround was to call sys.exit(0) here.
        # New and better: building the response with the goto_uri keyword
        # is a workaround suggested somewhere instead of using
        # Unity.ActivationResponse.new(Unity.HandledType.HIDE_DASH, '').
        # It does fix the issue.
        return Unity.ActivationResponse(
            handled=Unity.HandledType.HIDE_DASH, goto_uri='')