0001"""
0002Database URI parsing
0003"""
0004
0005import pkg_resources
0006import threading
0007
def parse_uri(uri):
    """
    Parse a database URI using the plugin registered for its scheme.

    The plugin is obtained from ``load_plugin()`` and its own
    ``parse_uri()`` does the actual parsing.  The mapping it returns
    then has its ``uri`` and ``plugin`` keys set here before being
    handed back.  The result should have the following keys:

    ``plugin``:

        The plugin for this URI; the same object that parsed it.

    ``user``:

        The username, or None when the URI does not give one.  This
        key should be left out when a username is not meaningful for
        this database.

    ``password``:

        Same rules as ``user``.

    ``host``:

        A host name.  ``''`` generally means localhost, but may also
        mean a connection through a local socket.  Should be left out
        when not applicable.

    ``dbname``:

        The name of the database.

    ``filename``:

        The name of the file, if the database is stored in a single
        file.

    ``uri``:

        The original URI, exactly as passed in.

    ``vars``:

        Any query-string parameters.  Some of them may be consumed by
        the plugin/database connector, but they should be present
        here anyway.

    Other keys are allowed.
    """
    backend = load_plugin(uri)
    parsed = backend.parse_uri(uri)
    # Assigned after the plugin has run, so these two keys always
    # describe this call even if the plugin supplied its own values.
    parsed['uri'] = uri
    parsed['plugin'] = backend
    return parsed
0060
def load_plugin(uri):
    """
    Return the plugin that handles the scheme of ``uri``.

    The scheme is everything before the first ``:``; it is passed to
    ``_find_plugin_by_scheme()`` to look the plugin up.

    Raises ``ValueError`` if ``uri`` contains no ``:`` at all.
    """
    if ':' in uri:
        # Only the first colon separates the scheme from the rest.
        return _find_plugin_by_scheme(uri.split(':', 1)[0])
    raise ValueError('Does not look like a URI (no scheme): %r' % uri)
0067
# Activated plugins that have already been looked up, keyed by scheme name.
_plugins_by_scheme = {}
# Held while a plugin is being loaded and activated, so that two threads
# asking for the same scheme do not both run the activation hooks.
_plugins_by_scheme_lock = threading.Lock()


def _find_plugin_by_scheme(name):
    """
    Return the activated plugin for the URI scheme ``name``.

    The first request for a scheme loads the plugin with
    ``_get_plugin()``, calls its ``activate()`` and
    ``activate_exceptions()`` hooks (unless the plugin is already
    flagged with ``_sqlapi_activated``), and remembers it in
    ``_plugins_by_scheme``.  Later requests are answered from that
    cache without taking the lock.

    Any exception raised by ``_get_plugin()`` or by the activation
    hooks propagates to the caller; nothing is cached in that case,
    so the next call tries again.
    """
    # Fast path: already loaded, no locking needed.
    if name in _plugins_by_scheme:
        return _plugins_by_scheme[name]
    with _plugins_by_scheme_lock:
        # Another thread may have finished loading this plugin while
        # we were waiting for the lock.
        if name in _plugins_by_scheme:
            return _plugins_by_scheme[name]
        plugin = _get_plugin(name)
        if not getattr(plugin, '_sqlapi_activated', False):
            plugin.activate()
            plugin.activate_exceptions()
            plugin._sqlapi_activated = True
        # Record the plugin only once it is fully activated.  Without
        # this store the cache stays empty and every call repeats the
        # locked entry-point lookup.
        _plugins_by_scheme[name] = plugin
        return plugin
0086
def _get_plugin(plugin_name):
    """
    Load the ``sqlapi.db_backend`` entry point for ``plugin_name``.

    The entry point name is ``plugin_name`` lower-cased.  Two lookups
    are tried in order:

    1. An installed distribution called ``plugin_name`` is asked for
       the entry point.
    2. If there is no such distribution, or it could not provide the
       entry point, all ``sqlapi.db_backend`` entry points with that
       name are collected and the first one found is loaded.

    Returns the loaded entry point object.  Raises ``ImportError`` if
    neither lookup produces a plugin.
    """
    ep_name = plugin_name.lower()
    ep = None
    try:
        dist = pkg_resources.get_distribution(plugin_name)
    except pkg_resources.DistributionNotFound:
        dist = None
    if dist is not None:
        try:
            ep = pkg_resources.load_entry_point(
                dist, 'sqlapi.db_backend', ep_name)
        except ImportError:
            # The distribution exists but did not yield the entry
            # point; fall through to the search by entry point name.
            pass
    if ep is None:
        options = list(pkg_resources.iter_entry_points(
            'sqlapi.db_backend', ep_name))
        # Guard against an empty result: indexing it would raise
        # IndexError and hide the ImportError below.
        if options:
            ep = options[0].load()
    if ep is None:
        raise ImportError(
            "Cannot find any plugin for the name %r" % plugin_name)
    return ep
0111
0112
0113
0114
0115
def remove_scheme(uri):
    """
    Strip the scheme from ``uri`` and return what is left.

    The scheme is everything up to and including the first ``:``;
    later colons are left alone.  An ``IndexError`` is raised when
    ``uri`` contains no ``:``.

    ::

        >>> remove_scheme('sqlite:table.db')
        'table.db'
    """
    scheme_and_rest = uri.split(':', 1)
    return scheme_and_rest[1]
0126
def parse_user_host(uri_wo_scheme):
    """
    Remove the user and host information.  Returns ``(user, password,
    host, rest_of_uri)``.  Expects to have ``remove_scheme()`` run
    first.

    ``user`` is None when the URI has no ``user@`` part, and
    ``password`` is None when no ``:password`` follows the user.
    ``rest_of_uri`` is everything after the first ``/`` that follows
    the host, or ``''`` if there is no such ``/``.

    Raises ``ValueError`` if ``uri_wo_scheme`` does not start with
    ``//``.

    ::

        >>> parse_user_host('//pgsql@localhost/foo')
        ('pgsql', None, 'localhost', 'foo')
        >>> parse_user_host('///test')
        (None, None, '', 'test')
        >>> parse_user_host('//test:pass@foo.com/bar')
        ('test', 'pass', 'foo.com', 'bar')
    """
    if not uri_wo_scheme.startswith('//'):
        raise ValueError(
            "URIs should have // after scheme: %r" % uri_wo_scheme)
    # Split "user:password@host" from the path at the first slash.
    host, _slash, rest = uri_wo_scheme[2:].partition('/')
    user = password = None
    if '@' in host:
        user, host = host.split('@', 1)
        # Look for a password only when there is a user part; testing
        # ``':' in user`` with ``user`` set to None raises TypeError.
        if ':' in user:
            user, password = user.split(':', 1)
    return (user, password, host, rest)
0159
def parse_query_string(uri, strict_parsing=True):
    """
    Given a URI (or a fragment, as returned by ``parse_user_host``)
    return ``(uri_without_query_string, vars)``.  ``vars`` is a
    dictionary of variables in the query string (after ``?``).  If
    there are duplicate variables, they will show up as a list of
    values; a variable given once maps to a plain string.  Blank
    values are kept as ``''``.

    ``strict_parsing`` is passed on to ``parse_qsl``.

    If ``uri`` contains no ``?``, it is returned unchanged together
    with an empty dictionary.
    """
    if '?' not in uri:
        return uri, {}
    # ``parse_qsl`` lives in ``urllib.parse`` on Python 3 and in
    # ``urlparse`` on Python 2.  It is imported here because this
    # module has no top-level import providing it.
    try:
        from urllib.parse import parse_qsl
    except ImportError:
        from urlparse import parse_qsl
    uri, qs = uri.split('?', 1)
    qs_list = parse_qsl(
        qs, keep_blank_values=True, strict_parsing=strict_parsing)
    d = {}
    for name, value in qs_list:
        if name not in d:
            d[name] = value
        elif isinstance(d[name], list):
            d[name].append(value)
        else:
            # Second occurrence: promote the single value to a list.
            d[name] = [d[name], value]
    return uri, d
0184
0185
0186
def int_or_none(v, none_text=None, name=None):
    """
    Convert ``v`` to an int, treating "empty" values as None.

    None is returned when ``v`` is None, equal to ``''``, or equal
    to ``none_text``.  Anything else is passed to ``int()``.

    If ``int()`` raises ``ValueError``, a new ``ValueError`` is
    raised whose message includes ``name`` when one was given.
    """
    is_blank = v is None or v == '' or v == none_text
    if is_blank:
        return None
    try:
        number = int(v)
    except ValueError:
        if name:
            message = 'int value expected for %s (got %r)' % (name, v)
        else:
            message = 'int value expected instead of %r' % v
        raise ValueError(message)
    return number
0197
def asbool(obj, name=None):
    """
    Interpret ``obj`` as a boolean.

    Strings are stripped and lower-cased, then matched against the
    accepted spellings: ``true``, ``yes``, ``on``, ``y``, ``t`` and
    ``1`` give True; ``false``, ``no``, ``off``, ``n``, ``f`` and
    ``0`` give False.  Any other string raises ``ValueError``; the
    message includes ``name`` when one was given.

    Non-string objects are converted with ``bool()``.
    """
    # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3.
    # Naming ``unicode`` directly raises NameError on Python 3.
    if isinstance(obj, (str, type(u''))):
        obj = obj.strip().lower()
        if obj in ('true', 'yes', 'on', 'y', 't', '1'):
            return True
        if obj in ('false', 'no', 'off', 'n', 'f', '0'):
            return False
        if name:
            raise ValueError(
                "true/false value expected for %s (got %r)"
                % (name, obj))
        raise ValueError(
            "String is not true/false: %r" % obj)
    return bool(obj)