Python源码示例:django.db()
示例1
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例2
def run(self):
for db_alias in settings.DATABASES:
self.db_alias = db_alias
self.db_vendor = connections[self.db_alias].vendor
print('Benchmarking %s…' % self.db_vendor)
for cache_alias in settings.CACHES:
cache = caches[cache_alias]
self.cache_name = cache.__class__.__name__[:-5].lower()
with override_settings(CACHALOT_CACHE=cache_alias):
self.execute_benchmark()
self.df = pd.DataFrame.from_records(self.data)
if not os.path.exists(RESULTS_PATH):
os.mkdir(RESULTS_PATH)
self.df.to_csv(os.path.join(RESULTS_PATH, 'data.csv'))
self.xlim = (0, self.df['time'].max() * 1.01)
self.output('db')
self.output('cache')
示例3
def import_places(self):
# munigeo saves addresses in local db, we just create Places from them.
# note that the addresses only change daily and the import is time-consuming, so we should not run this hourly
# addresses require the municipalities to be present in the db
call_command('geo_import', 'finland', municipalities=True)
call_command('geo_import', 'helsinki', addresses=True)
queryset = Place.objects.filter(data_source=self.data_source)
if self.options.get('single', None):
obj_id = self.options['single']
obj_list = [self.pk_get('Address', obj_id)]
queryset = queryset.filter(id=obj_id)
else:
logger.info("Loading addresses...")
obj_list = self.pk_get('Address')
logger.info("%s addresses loaded" % len(obj_list))
syncher = ModelSyncher(queryset, lambda obj: obj.origin_id, delete_func=self.mark_deleted,
check_deleted_func=self.check_deleted)
for idx, obj in enumerate(obj_list):
if idx and (idx % 1000) == 0:
logger.info("%s addresses processed" % idx)
self._import_address(syncher, obj)
syncher.finish(self.options.get('remap', False))
示例4
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例5
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例6
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例7
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例8
def fetchExdbText(self, guid):
filepath = DataDir + "exdb/data/" + guid
if not os.path.exists(filepath):
url = "https://www.exploit-db.com/raw/{id}".format(id=guid)
logger.info(url)
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko'}
try:
res = requests.get(url, headers=headers)
except Exception as e:
logger.error(e)
if not res.text == '':
open(filepath, 'w').write(res.text)
if guid.endswith("pdf"):
text = ''
else:
text = open(filepath).read()
sleep(5)
return text
示例9
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例10
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例11
def get_fields_from_path(model, path):
""" Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
示例12
def execute_wrapper(wrapped, instance, args, kwargs):
"""
CursorWrapper.execute() wrapper for Django < 2.0
"""
try:
sql = _extract_sql(*args, **kwargs)
except TypeError:
sql = None
if sql is not None:
tracked_request = TrackedRequest.instance()
span = tracked_request.start_span(operation="SQL/Query")
span.tag("db.statement", sql)
try:
return wrapped(*args, **kwargs)
finally:
if sql is not None:
tracked_request.stop_span()
if tracked_request.n_plus_one_tracker.should_capture_backtrace(
sql, span.duration()
):
span.capture_backtrace()
示例13
def executemany_wrapper(wrapped, instance, args, kwargs):
"""
CursorWrapper.executemany() wrapper for Django < 2.0
"""
try:
sql, param_list = _extract_sql_param_list(*args, **kwargs)
except TypeError:
sql = None
param_list = None
if sql is not None:
tracked_request = TrackedRequest.instance()
span = tracked_request.start_span(operation="SQL/Many")
span.tag("db.statement", sql)
try:
return wrapped(*args, **kwargs)
finally:
if sql is not None:
tracked_request.stop_span()
if tracked_request.n_plus_one_tracker.should_capture_backtrace(
sql=sql, duration=span.duration(), count=len(param_list),
):
span.capture_backtrace()
示例14
def test_dates_with_aggregation(self):
"""
.dates() returns a distinct set of dates when applied to a
QuerySet with aggregation.
Refs #18056. Previously, .dates() would return distinct (date_kind,
aggregation) sets, in this case (year, num_authors), so 2008 would be
returned twice because there are books from 2008 with a different
number of authors.
"""
srv_ver = connection.get_server_version()
if (12, 0, 0, 0) <= srv_ver < (13, 0, 0, 0):
# this test fails on SQL server 2014
self.skipTest("TODO fix django.db.utils.OperationalError: ORDER BY items must appear in the select list if SELECT DISTINCT is specified.")
dates = Book.objects.annotate(num_authors=Count("authors")).dates('pubdate', 'year')
self.assertQuerysetEqual(
dates, [
"datetime.date(1991, 1, 1)",
"datetime.date(1995, 1, 1)",
"datetime.date(2007, 1, 1)",
"datetime.date(2008, 1, 1)"
]
)
示例15
def test_in_lookup_allows_F_expressions_and_expressions_for_integers(self):
# __in lookups can use F() expressions for integers.
self.skipTest("TODO fix django.db.transaction.TransactionManagementError: An error occurred in the current transaction. You can't execute queries until the end of the 'atomic' block.")
queryset = Company.objects.filter(num_employees__in=([F('num_chairs') - 10]))
self.assertQuerysetEqual(queryset, ['<Company: 5060 Ltd>'], ordered=False)
self.assertQuerysetEqual(
Company.objects.filter(num_employees__in=([F('num_chairs') - 10, F('num_chairs') + 10])),
['<Company: 5040 Ltd>', '<Company: 5060 Ltd>'],
ordered=False
)
self.assertQuerysetEqual(
Company.objects.filter(
num_employees__in=([F('num_chairs') - 10, F('num_chairs'), F('num_chairs') + 10])
),
['<Company: 5040 Ltd>', '<Company: 5050 Ltd>', '<Company: 5060 Ltd>'],
ordered=False
)
示例16
def copyCourseSetup(course_copy_from, course_copy_to, redirect_pages):
"""
copy all the activities setup from one course to another
copy numeric activities with their marking components, common problems and submission components
"""
from marking.tasks import copy_setup_pages_task
with django.db.transaction.atomic():
course_copy_from.config['redirect_pages'] = redirect_pages
course_copy_from.save()
copy_setup_base(course_copy_from, course_copy_to)
copy_setup_activities(course_copy_from, course_copy_to)
# copy pages asynchronously, since it can be slow with many pages.
copy_setup_pages_task.delay(course_copy_from.slug, course_copy_to.slug)
示例17
def close_django_connection() -> None:
from django.db import connection
connection.close()
示例18
def setup_projections_table(self, process):
from django.db import connections
with connections["default"].schema_editor() as schema_editor:
assert isinstance(schema_editor, BaseDatabaseSchemaEditor)
try:
schema_editor.delete_model(self.projection_record_class)
except django.db.utils.ProgrammingError:
pass
with connections["default"].schema_editor() as schema_editor:
assert isinstance(schema_editor, BaseDatabaseSchemaEditor)
schema_editor.create_model(self.projection_record_class)
示例19
def idempotent_transaction(func):
if django.VERSION < (1, 7,) or django.VERSION >= (2, 0) and settings.DATABASES['default']['ENGINE'] == 'django.db.backends.sqlite3':
return func
else:
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
with transaction.atomic():
sp = transaction.savepoint()
try:
func(*args, **kwargs)
transaction.savepoint_rollback(sp)
except BaseException:
raise
return func_wrapper
示例20
def bench_once(self, context, num_queries, invalidate_before=False):
for _ in range(self.n):
if invalidate_before:
invalidate(db_alias=self.db_alias)
with AssertNumQueries(num_queries, using=self.db_alias):
start = time()
self.query_function(self.db_alias)
end = time()
self.data.append(
{'query': self.query_name,
'time': end - start,
'context': context,
'db': self.db_vendor,
'cache': self.cache_name})
示例21
def commit(using=None):
"""
Possibility of calling transaction.commit() in new Django versions (in atomic block).
"""
try:
django.db.transaction.commit(using)
except django.db.transaction.TransactionManagementError:
pass
示例22
def rollback(using=None, sid=None):
"""
Possibility of calling transaction.rollback() in new Django versions (in atomic block).
Important: transaction savepoint (sid) is required for Django < 1.8
"""
if sid:
django.db.transaction.savepoint_rollback(sid)
else:
try:
django.db.transaction.rollback(using)
except django.db.transaction.TransactionManagementError:
django.db.transaction.set_rollback(True, using)
# HttpResponseBase only exists from 1.5 onwards
示例23
def setUpClass(self):
super().setUpClass()
self.runner = django.test.runner.DiscoverRunner(interactive=False)
django.test.utils.setup_test_environment()
self.old_config = self.runner.setup_databases()
self.db = Db()
示例24
def tearDownClass(self):
if 'KEEP_DATA' in os.environ:
print("\nkeeping test database: %r." % self.db.db_name, file=sys.stderr)
else:
self.db.delete_all()
self.runner.teardown_databases(self.old_config)
django.test.utils.teardown_test_environment()
super().tearDownClass()
示例25
def load(self):
self.cpp = sc2.RankingData(self.db.db_name, Enums.INFO)
self.cpp.load(self.db.ranking.id)
示例26
def process_ladder(self, load=False, save=False, region=Region.EU, fetch_time=None,
mode=Mode.TEAM_1V1, version=Version.HOTS, league=League.GOLD, season=None, tier=0,
members=None, **kwargs):
""" Update a ranking building single member with kwargs or use members if set. """
season = season or self.db.season
fetch_time = fetch_time or utcnow()
members = members or [gen_member(**kwargs)]
if not getattr(self, 'cpp', None):
self.cpp = sc2.RankingData(self.db.db_name, Enums.INFO)
if load:
self.load()
self.cpp.update_with_ladder(0, # bid
0, # source_id
region,
mode,
league,
tier,
version,
season.id,
to_unix(fetch_time),
fetch_time.date().isoformat(),
Mode.team_size(mode),
members)
if save:
self.save_to_ranking()
示例27
def save_to_ranking(self):
self.cpp.save_data(self.db.ranking.id, self.db.ranking.season_id, to_unix(utcnow()))
示例28
def mock_current_season(self, status=200, season_id=None, start_time=None, fetch_time=None):
self.bnet.fetch_current_season = \
Mock(return_value=SeasonResponse(status,
ApiSeason({'seasonId': season_id or self.db.season.id,
'startDate': to_unix(start_time or utcnow())},
'http://fake-url'),
fetch_time or utcnow(), 0))
示例29
def mock_fetch_league(self, status=200, fetch_time=None, season_id=None, t0_bids=None, t1_bids=None, t2_bids=None):
season_id = season_id or self.db.season.id
self.bnet.fetch_league = \
Mock(return_value=LeagueResponse(status,
ApiLeague({'tier': [
{'id': 0, 'division': [{'ladder_id': lid} for lid in t0_bids or []]},
{'id': 1, 'division': [{'ladder_id': lid} for lid in t1_bids or []]},
{'id': 2, 'division': [{'ladder_id': lid} for lid in t2_bids or []]},
]}, url="http://fake-url", bid=season_id * 100000),
fetch_time or utcnow(), 0))
示例30
def __init__(self, cursor, db):
self.cursor = cursor
self.db = db