|
a |
|
b/Ming/ming/datastore.py |
|
|
1 |
from __future__ import with_statement
|
|
|
2 |
import time
|
|
|
3 |
import logging
|
|
|
4 |
|
|
|
5 |
from threading import Lock
|
|
|
6 |
|
|
|
7 |
from pymongo.connection import Connection
|
|
|
8 |
from pymongo.master_slave_connection import MasterSlaveConnection
|
|
|
9 |
|
|
|
10 |
from .utils import parse_uri
|
|
|
11 |
|
|
|
12 |
log = logging.getLogger(__name__)
|
|
|
13 |
|
|
|
14 |
class tl_property(object):
|
|
|
15 |
|
|
|
16 |
def __init__(self, tl_attr='_tl_value'):
|
|
|
17 |
self.tl_attr = tl_attr
|
|
|
18 |
self.name = self.func = None
|
|
|
19 |
|
|
|
20 |
def __call__(self, func):
|
|
|
21 |
self.func = func
|
|
|
22 |
self.name = func.__name__
|
|
|
23 |
return self
|
|
|
24 |
|
|
|
25 |
def __get__(self, obj, type=None):
|
|
|
26 |
tl = getattr(obj, self.tl_attr)
|
|
|
27 |
try:
|
|
|
28 |
value = getattr(tl, self.name)
|
|
|
29 |
except AttributeError:
|
|
|
30 |
value = self.func(obj)
|
|
|
31 |
setattr(tl, self.name, value)
|
|
|
32 |
return value
|
|
|
33 |
|
|
|
34 |
class DataStore(object):
|
|
|
35 |
"""Manages a connections to Mongo, with seprate connections per thread."""
|
|
|
36 |
|
|
|
37 |
def __init__(self, master='mongo://localhost:27017/gutenberg', slave=None,
|
|
|
38 |
connect_retry=10):
|
|
|
39 |
# self._tl_value = ThreadLocal()
|
|
|
40 |
self._conn = None
|
|
|
41 |
self._lock = Lock()
|
|
|
42 |
self._connect_retry = connect_retry
|
|
|
43 |
self.configure(master, slave)
|
|
|
44 |
|
|
|
45 |
def __repr__(self):
|
|
|
46 |
return 'DataStore(master=%r, slave=%r)' % (
|
|
|
47 |
self.master_args, self.slave_args)
|
|
|
48 |
|
|
|
49 |
def configure(self, master='mongo://localhost:27017/gutenberg', slave=None):
|
|
|
50 |
log.disabled = 0 # @%#$@ logging fileconfig disables our logger
|
|
|
51 |
if isinstance(master, basestring):
|
|
|
52 |
master = [ master ]
|
|
|
53 |
if slave is None: slave = []
|
|
|
54 |
assert master, 'You MUST supply at least one master mongo connection'
|
|
|
55 |
self.master_args = [ parse_uri(s) for s in master if s ]
|
|
|
56 |
self.slave_args = [ parse_uri(s) for s in slave if s ]
|
|
|
57 |
if len(self.master_args) > 2:
|
|
|
58 |
log.warning(
|
|
|
59 |
'Only two masters are supported at present, you specified %r',
|
|
|
60 |
master)
|
|
|
61 |
self.master_args = self.master_args[:2]
|
|
|
62 |
if len(self.master_args) > 1 and self.slave_args:
|
|
|
63 |
log.warning(
|
|
|
64 |
'Master/slave is not supported with replica pairs')
|
|
|
65 |
self.slave_Args = []
|
|
|
66 |
self.database = (self.master_args+self.slave_args)[0]['path'][1:]
|
|
|
67 |
for a in self.master_args + self.slave_args:
|
|
|
68 |
assert a['path'] == '/' + self.database, \
|
|
|
69 |
"All connections MUST use the same database"
|
|
|
70 |
|
|
|
71 |
@property
|
|
|
72 |
def conn(self):
|
|
|
73 |
for attempt in xrange(self._connect_retry+1):
|
|
|
74 |
if self._conn is not None: break
|
|
|
75 |
with self._lock:
|
|
|
76 |
if self._connect() is None:
|
|
|
77 |
time.sleep(1)
|
|
|
78 |
return self._conn
|
|
|
79 |
|
|
|
80 |
def _connect(self):
|
|
|
81 |
self._conn = None
|
|
|
82 |
try:
|
|
|
83 |
if len(self.master_args) == 2:
|
|
|
84 |
self._conn = Connection.paired(
|
|
|
85 |
(str(self.master_args[0]['host']), int(self.master_args[0]['port'])),
|
|
|
86 |
(str(self.master_args[1]['host']), int(self.master_args[1]['port'])),
|
|
|
87 |
pool_size=16)
|
|
|
88 |
else:
|
|
|
89 |
if self.master_args:
|
|
|
90 |
try:
|
|
|
91 |
master = Connection(str(self.master_args[0]['host']), int(self.master_args[0]['port']),
|
|
|
92 |
pool_size=8)
|
|
|
93 |
except:
|
|
|
94 |
if self.slave_args:
|
|
|
95 |
log.exception('Cannot connect to master: %s will use slave: %s' % (self.master_args, self.slave_args))
|
|
|
96 |
# and continue... to use the slave only
|
|
|
97 |
master = None
|
|
|
98 |
else:
|
|
|
99 |
raise
|
|
|
100 |
else:
|
|
|
101 |
log.info('No master connection specified, using slaves only: %s' % self.slave_args)
|
|
|
102 |
master = None
|
|
|
103 |
|
|
|
104 |
if self.slave_args:
|
|
|
105 |
slave = [ Connection(str(a['host']), int(a['port']), pool_size=16, slave_okay=True)
|
|
|
106 |
for a in self.slave_args ]
|
|
|
107 |
if master:
|
|
|
108 |
self._conn = MasterSlaveConnection(master, slave)
|
|
|
109 |
else:
|
|
|
110 |
self._conn = slave[0]
|
|
|
111 |
|
|
|
112 |
else:
|
|
|
113 |
self._conn = master
|
|
|
114 |
except:
|
|
|
115 |
log.exception('Cannot connect to %s %s' % (self.master_args, self.slave_args))
|
|
|
116 |
else:
|
|
|
117 |
#log.info('Connected to %s %s' % (self.master_args, self.slave_args))
|
|
|
118 |
pass
|
|
|
119 |
return self._conn
|
|
|
120 |
|
|
|
121 |
@property
|
|
|
122 |
def db(self):
|
|
|
123 |
return getattr(self.conn, self.database, None)
|