Switch to unified view

a b/Ming/ming/datastore.py
1
from __future__ import with_statement
2
import time
3
import logging
4
5
from threading import Lock
6
7
from pymongo.connection import Connection
8
from pymongo.master_slave_connection import MasterSlaveConnection
9
10
from .utils import parse_uri
11
12
log = logging.getLogger(__name__)
13
14
class tl_property(object):
15
16
    def __init__(self, tl_attr='_tl_value'):
17
        self.tl_attr = tl_attr
18
        self.name = self.func = None
19
20
    def __call__(self, func):
21
        self.func = func
22
        self.name = func.__name__
23
        return self
24
25
    def __get__(self, obj, type=None):
26
        tl = getattr(obj, self.tl_attr)
27
        try:
28
            value = getattr(tl, self.name)
29
        except AttributeError:
30
            value = self.func(obj)
31
            setattr(tl, self.name, value)
32
        return value
33
34
class DataStore(object):
35
    """Manages a connections to Mongo, with seprate connections per thread."""
36
37
    def __init__(self, master='mongo://localhost:27017/gutenberg', slave=None,
38
                 connect_retry=10):
39
        # self._tl_value = ThreadLocal()
40
        self._conn = None
41
        self._lock = Lock()
42
        self._connect_retry = connect_retry
43
        self.configure(master, slave)
44
45
    def __repr__(self):
46
        return 'DataStore(master=%r, slave=%r)' % (
47
            self.master_args, self.slave_args)
48
49
    def configure(self, master='mongo://localhost:27017/gutenberg', slave=None):
50
        log.disabled = 0 # @%#$@ logging fileconfig disables our logger
51
        if isinstance(master, basestring):
52
            master = [ master ]
53
        if slave is None: slave = []
54
        assert master, 'You MUST supply at least one master mongo connection'
55
        self.master_args = [ parse_uri(s) for s in master if s ]
56
        self.slave_args = [ parse_uri(s) for s in slave if s ]
57
        if len(self.master_args) > 2:
58
            log.warning(
59
                'Only two masters are supported at present, you specified %r',
60
                master)
61
            self.master_args = self.master_args[:2]
62
        if len(self.master_args) > 1 and self.slave_args:
63
            log.warning(
64
                'Master/slave is not supported with replica pairs')
65
            self.slave_Args = []
66
        self.database = (self.master_args+self.slave_args)[0]['path'][1:]
67
        for a in self.master_args + self.slave_args:
68
            assert a['path'] == '/' + self.database, \
69
                "All connections MUST use the same database"
70
71
    @property
72
    def conn(self):
73
        for attempt in xrange(self._connect_retry+1):
74
            if self._conn is not None: break
75
            with self._lock:
76
                if self._connect() is None:
77
                    time.sleep(1)
78
        return self._conn
79
80
    def _connect(self):
81
        self._conn = None
82
        try:
83
            if len(self.master_args) == 2:
84
                self._conn = Connection.paired(
85
                    (str(self.master_args[0]['host']), int(self.master_args[0]['port'])),
86
                    (str(self.master_args[1]['host']), int(self.master_args[1]['port'])),
87
                    pool_size=16)
88
            else:
89
                if self.master_args:
90
                    try:
91
                        master = Connection(str(self.master_args[0]['host']), int(self.master_args[0]['port']),
92
                                            pool_size=8)
93
                    except:
94
                        if self.slave_args:
95
                            log.exception('Cannot connect to master: %s will use slave: %s' % (self.master_args, self.slave_args))
96
                            # and continue... to use the slave only
97
                            master = None
98
                        else:
99
                            raise
100
                else:
101
                    log.info('No master connection specified, using slaves only: %s' % self.slave_args)
102
                    master = None
103
104
                if self.slave_args:
105
                    slave = [ Connection(str(a['host']), int(a['port']), pool_size=16, slave_okay=True)
106
                               for a in self.slave_args ]
107
                    if master:
108
                        self._conn = MasterSlaveConnection(master, slave)
109
                    else:
110
                        self._conn = slave[0]
111
112
                else:
113
                    self._conn = master
114
        except:
115
            log.exception('Cannot connect to %s %s' % (self.master_args, self.slave_args))
116
        else:
117
            #log.info('Connected to %s %s' % (self.master_args, self.slave_args))
118
            pass
119
        return self._conn
120
121
    @property
122
    def db(self):
123
        return getattr(self.conn, self.database, None)