Python源码示例:django.utils()
示例1
def _execute_wrapper(self, method, query, args):
"""Wrapper around execute() and executemany()"""
try:
return method(query, args)
except (PyMysqlPool.mysql.connector.ProgrammingError) as err:
six.reraise(utils.ProgrammingError,
utils.ProgrammingError(err.msg), sys.exc_info()[2])
except (PyMysqlPool.mysql.connector.IntegrityError) as err:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
except PyMysqlPool.mysql.connector.OperationalError as err:
# Map some error codes to IntegrityError, since they seem to be
# misclassified and Django would prefer the more logical place.
if err.args[0] in self.codes_for_integrityerror:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
else:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
except PyMysqlPool.mysql.connector.DatabaseError as err:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
示例2
def test_format_html(self):
    """
    Test: format_html
    url: https://github.com/django/django/blob/stable/1.8.x/tests/utils_tests/test_html.py#L44-L53

    Plain string arguments must come back HTML-escaped, while values wrapped
    in ``mark_safe`` must pass through untouched.
    """
    from django.utils import html
    from compat import format_html

    self.assertEqual(
        format_html(
            "{0} {1} {third} {fourth}",
            "< Dangerous >",
            html.mark_safe("<b>safe</b>"),
            third="< dangerous again",
            fourth=html.mark_safe("<i>safe again</i>"),
        ),
        # The unsafe arguments are escaped, so the expected value contains
        # entities instead of raw angle brackets.
        "&lt; Dangerous &gt; <b>safe</b> &lt; dangerous again <i>safe again</i>",
    )
示例3
def _execute_wrapper(self, method, query, args):
"""Wrapper around execute() and executemany()"""
try:
return method(query, args)
except (mysql.connector.ProgrammingError) as err:
six.reraise(utils.ProgrammingError,
utils.ProgrammingError(err.msg), sys.exc_info()[2])
except (mysql.connector.IntegrityError) as err:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
except mysql.connector.OperationalError as err:
# Map some error codes to IntegrityError, since they seem to be
# misclassified and Django would prefer the more logical place.
if err.args[0] in self.codes_for_integrityerror:
six.reraise(utils.IntegrityError,
utils.IntegrityError(err.msg), sys.exc_info()[2])
else:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
except mysql.connector.DatabaseError as err:
six.reraise(utils.DatabaseError,
utils.DatabaseError(err.msg), sys.exc_info()[2])
示例4
def test_dates_with_aggregation(self):
    """
    .dates() on an annotated (aggregating) QuerySet yields distinct dates.

    Refs #18056. Earlier, .dates() returned distinct (date_kind, aggregation)
    pairs -- here (year, num_authors) -- so 2008 showed up twice because
    books from 2008 have differing author counts.
    """
    server_version = connection.get_server_version()
    lower_bound, upper_bound = (12, 0, 0, 0), (13, 0, 0, 0)
    if lower_bound <= server_version < upper_bound:
        # Known failure on SQL Server 2014.
        self.skipTest("TODO fix django.db.utils.OperationalError: ORDER BY items must appear in the select list if SELECT DISTINCT is specified.")

    annotated_books = Book.objects.annotate(num_authors=Count("authors"))
    publication_years = annotated_books.dates('pubdate', 'year')
    expected_years = [
        "datetime.date(1991, 1, 1)",
        "datetime.date(1995, 1, 1)",
        "datetime.date(2007, 1, 1)",
        "datetime.date(2008, 1, 1)",
    ]
    self.assertQuerysetEqual(publication_years, expected_years)
示例5
def test_saml_ok(self):
    """
    Validate a good (ticket, service) pair, once with a ST and once with a PT.

    The username and all attributes are expected to be transmitted.
    """
    # A ServiceTicket (standard ticket) waiting for validation ...
    service_ticket = get_user_ticket_request(self.service)[1]
    # ... and a ProxyTicket waiting for validation.
    proxy_ticket = get_proxy_ticket(self.service)

    for ticket in (service_ticket, proxy_ticket):
        client = Client()
        target_url = '/samlValidate?TARGET=%s' % self.service
        request_body = self.xml_template % {
            'ticket': ticket.value,
            'request_id': utils.gen_saml_id(),
            'issue_instant': timezone.now().isoformat()
        }
        # POST the validation request; it must succeed.
        response = client.post(
            target_url,
            request_body,
            content_type="text/xml; encoding='utf-8'"
        )
        self.assert_success(
            response,
            settings.CAS_TEST_USER,
            settings.CAS_TEST_ATTRIBUTES
        )
示例6
def test_saml_ok_user_field(self):
    """Validate a good (ticket, service) pair where an attribute is sent as the username."""
    cases = [
        (self.service_field_needed_success, settings.CAS_TEST_ATTRIBUTES['alias'][0]),
        (self.service_field_needed_success_alt, settings.CAS_TEST_ATTRIBUTES['nom'])
    ]
    for service, username in cases:
        ticket = get_user_ticket_request(service)[1]
        client = Client()
        target_url = '/samlValidate?TARGET=%s' % service
        request_body = self.xml_template % {
            'ticket': ticket.value,
            'request_id': utils.gen_saml_id(),
            'issue_instant': timezone.now().isoformat()
        }
        response = client.post(
            target_url,
            request_body,
            content_type="text/xml; encoding='utf-8'"
        )
        # Success is asserted with the attribute value as username and an
        # empty attribute dict.
        self.assert_success(response, username, {})
示例7
def test_saml_bad_ticket(self):
    """Validation with a freshly generated ST and PT must fail with 'AuthnFailed'."""
    for ticket in (utils.gen_st(), utils.gen_pt()):
        client = Client()
        target_url = '/samlValidate?TARGET=%s' % self.service
        request_body = self.xml_template % {
            'ticket': ticket,
            'request_id': utils.gen_saml_id(),
            'issue_instant': timezone.now().isoformat()
        }
        response = client.post(
            target_url,
            request_body,
            content_type="text/xml; encoding='utf-8'"
        )
        expected_message = 'ticket %s not found' % ticket
        self.assert_error(response, "AuthnFailed", expected_message)
示例8
def test_saml_bad_ticket_prefix(self):
    """A ticket with an unknown prefix must be rejected with 'AuthnFailed'."""
    bad_ticket = "RANDOM-NOT-BEGINING-WITH-ST-OR-ST"
    target_url = '/samlValidate?TARGET=%s' % self.service
    client = Client()
    request_body = self.xml_template % {
        'ticket': bad_ticket,
        'request_id': utils.gen_saml_id(),
        'issue_instant': timezone.now().isoformat()
    }
    response = client.post(
        target_url,
        request_body,
        content_type="text/xml; encoding='utf-8'"
    )
    expected_message = 'ticket %s should begin with PT- or ST-' % bad_ticket
    self.assert_error(response, "AuthnFailed", expected_message)
示例9
def test_saml_bad_target(self):
    """A valid ticket presented with a different TARGET must fail validation."""
    bad_target = "https://www.example.org"
    # The ticket is issued for self.service, not for bad_target.
    ticket = get_user_ticket_request(self.service)[1]
    client = Client()
    target_url = '/samlValidate?TARGET=%s' % bad_target
    request_body = self.xml_template % {
        'ticket': ticket.value,
        'request_id': utils.gen_saml_id(),
        'issue_instant': timezone.now().isoformat()
    }
    response = client.post(
        target_url,
        request_body,
        content_type="text/xml; encoding='utf-8'"
    )
    expected_message = 'TARGET %s does not match ticket service' % bad_target
    self.assert_error(response, "AuthnFailed", expected_message)
示例10
def test_json_attributes(self):
    """Check that ``attributs`` is stored JSON-encoded in ``_attributs`` on FederatedUser."""
    provider = models.FederatedIendityProvider.objects.get(suffix="example.com")

    # Created with attributes: ``_attributs`` holds their JSON encoding.
    user_with_attributes = models.FederatedUser.objects.create(
        username=settings.CAS_TEST_USER,
        provider=provider,
        attributs=settings.CAS_TEST_ATTRIBUTES,
        ticket=""
    )
    expected_json = utils.json_encode(settings.CAS_TEST_ATTRIBUTES)
    self.assertEqual(expected_json, user_with_attributes._attributs)
    user_with_attributes.delete()

    # Created without attributes: both views of the field are None.
    user_without_attributes = models.FederatedUser.objects.create(
        username=settings.CAS_TEST_USER,
        provider=provider,
        ticket=""
    )
    self.assertIsNone(user_without_attributes._attributs)
    self.assertIsNone(user_without_attributes.attributs)
示例11
def test_json_attributes(self):
    """Check that ``attributs`` is stored JSON-encoded in ``_attributs`` on ServiceTicket."""
    # Authenticated client and the user attached to it.
    client = get_auth_client()
    user = self.get_user(client)

    # Created with attributes: ``_attributs`` holds their JSON encoding.
    ticket_with_attributes = models.ServiceTicket.objects.create(
        user=user,
        service=self.service,
        attributs=settings.CAS_TEST_ATTRIBUTES,
        service_pattern=self.service_pattern
    )
    expected_json = utils.json_encode(settings.CAS_TEST_ATTRIBUTES)
    self.assertEqual(expected_json, ticket_with_attributes._attributs)
    ticket_with_attributes.delete()

    # Created without attributes: both views of the field are None.
    ticket_without_attributes = models.ServiceTicket.objects.create(
        user=user,
        service=self.service,
        service_pattern=self.service_pattern
    )
    self.assertIsNone(ticket_without_attributes._attributs)
    self.assertIsNone(ticket_without_attributes.attributs)
示例12
def debug(key, value):
    """Write ``key: repr(value)`` in colour to stdout; do nothing for a falsy value."""
    from django.utils.termcolors import colorize

    if not value:
        return
    out = sys.stdout
    out.write(colorize(key, fg='green'))
    out.write(": ")
    out.write(colorize(repr(value), fg='white'))
    out.write("\n")
示例13
def statistics_work():
    """Rebuild the ``WORK`` hash with per-day ``Push_Mission`` counts.

    The ``WORK`` key is deleted through ``connect``. Then, for each of the
    last seven days (oldest first, today last), the missions whose
    ``create_time`` lies strictly between that day and the next are counted.
    Each count is stored under the label ``week_list[weekday]`` with
    ``connect.hset``.
    """
    # Import the submodule explicitly: a bare ``import django`` does not load
    # ``django.utils.timezone``, so ``django.utils.timezone.now()`` only
    # worked when some other module had already imported it.
    from django.utils import timezone

    connect.delete('WORK')
    today = timezone.now().date()
    one_day = datetime.timedelta(days=1)

    work_dist = {}
    for days_ago in range(6, -1, -1):
        start_day = today - datetime.timedelta(days=days_ago)
        end_day = start_day + one_day
        # NOTE(review): both bounds are exclusive, so a row stamped exactly at
        # a day boundary is counted for no day -- confirm this is intended.
        day_label = week_list[start_day.weekday()]
        work_dist[day_label] = Push_Mission.objects.filter(
            create_time__gt=start_day, create_time__lt=end_day
        ).count()

    for key, value in work_dist.items():
        connect.hset('WORK', key, value)
示例14
def pytest_configure(config):
    """Apply the ``deprecation`` and ``postgres`` options, then set up Django.

    Warning filters are installed according to the ``deprecation`` option,
    ``DATABASE_ENGINE`` is set when ``postgres`` is given, Django is
    initialised with the wagtail test settings, English is activated and the
    static/media roots of the test settings are removed.
    """
    deprecation = config.getoption('deprecation')
    only_wagtail = r'^wagtail(\.|$)'

    if deprecation == 'all':
        # Every deprecation warning, whatever package emits it.
        for category in (DeprecationWarning, PendingDeprecationWarning):
            warnings.simplefilter('default', category)
    elif deprecation == 'pending':
        # Every deprecation warning coming from wagtail.
        for category in (DeprecationWarning, PendingDeprecationWarning):
            warnings.filterwarnings('default', category=category, module=only_wagtail)
    elif deprecation == 'imminent':
        # Only the imminent deprecation warnings coming from wagtail.
        warnings.filterwarnings('default', category=DeprecationWarning, module=only_wagtail)
    # 'none' (and anything else): deprecation warnings stay ignored, which is
    # the default, so there is nothing to install.

    if config.getoption('postgres'):
        os.environ['DATABASE_ENGINE'] = 'django.db.backends.postgresql'

    # Django is set up only after the pytest arguments were processed, so the
    # environment variables above are visible to the settings module.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'wagtail.tests.settings')
    django.setup()

    # Activating a language affects the HTTP_ACCEPT_LANGUAGE header sent by
    # the Django test client.
    from django.utils import translation
    translation.activate("en")

    from wagtail.tests.settings import MEDIA_ROOT, STATIC_ROOT
    for root in (STATIC_ROOT, MEDIA_ROOT):
        shutil.rmtree(root, ignore_errors=True)
示例15
def execute(self, *args, **options):
    """
    Run this command, validating models first when required.

    Validation is controlled by ``self.requires_model_validation`` and is
    skipped when the ``skip_validation`` option is set. Any output returned
    by ``handle()`` is written to ``self.stdout``, wrapped in transaction
    statements when ``self.output_transaction`` is true.
    """
    # django-admin.py creates database content such as permissions, which
    # must not be translated, so English is activated for the run. That is
    # only possible with a working settings file, because
    # django.utils.translation requires settings.
    previous_language = None
    self.stdout = OutputWrapper(options.get('stdout', sys.stdout))
    self.stderr = OutputWrapper(options.get('stderr', sys.stderr), self.style.ERROR)

    if self.can_import_settings:
        from django.utils import translation
        previous_language = translation.get_language()
        translation.activate('en-us')

    try:
        if self.requires_model_validation and not options.get('skip_validation'):
            self.validate()

        output = self.handle(*args, **options)
        if output:
            if self.output_transaction:
                # Imported here because it relies on settings.
                from django.db import connections, DEFAULT_DB_ALIAS
                database_alias = options.get('database', DEFAULT_DB_ALIAS)
                connection = connections[database_alias]
                if connection.ops.start_transaction_sql():
                    begin_sql = connection.ops.start_transaction_sql()
                    self.stdout.write(self.style.SQL_KEYWORD(begin_sql))
            self.stdout.write(output)
            if self.output_transaction:
                commit_sql = self.style.SQL_KEYWORD("COMMIT;")
                self.stdout.write('\n' + commit_sql)
    finally:
        # Restore whatever language was active before the command ran.
        if previous_language is not None:
            translation.activate(previous_language)
示例16
def get_latest_script(script_version):
    """Downloads the latest script version to the local storage.

    :param script_version: :py:class:`~wooey.models.core.ScriptVersion`
    :return: boolean

    Returns true if a new version was downloaded, i.e. when the script was
    missing from local storage or its local checksum differs from
    ``script_version.checksum``.
    """
    script_path = script_version.script_path
    local_storage = utils.get_storage(local=True)

    if not local_storage.exists(script_path.name):
        local_storage.save(script_path.name, script_path.file)
        return True

    # The script exists locally: verify its checksum. The handle is closed
    # as soon as the contents are read instead of being left open.
    with local_storage.open(script_path.name) as local_file:
        script_contents = local_file.read()
    script_checksum = utils.get_checksum(buff=script_contents)
    if script_checksum == script_version.checksum:
        return False

    # NOTE(review): the bytes written back are the ones just read from local
    # storage (the copy whose checksum did not match), not script_path.file.
    # Confirm this is the intended way to "fetch a new one".
    with tempfile.TemporaryFile() as tf:
        tf.write(script_contents)
        tf.seek(0)
        local_storage.delete(script_path.name)
        local_storage.save(script_path.name, tf)
    return True
示例17
def cleanup_wooey_jobs(**kwargs):
    """Delete WooeyJob rows older than the configured expiration.

    ``WOOEY_JOB_EXPIRATION`` may hold an ``'anonymous'`` and a ``'user'``
    entry; each truthy entry is subtracted from the current time and jobs
    of that kind created at or before the result are deleted.
    """
    from django.utils import timezone
    from .models import WooeyJob

    expiration = wooey_settings.WOOEY_JOB_EXPIRATION
    now = timezone.now()

    # (settings key, filter selecting the jobs that key applies to)
    rules = (
        ('anonymous', {'user': None}),
        ('user', {'user__isnull': False}),
    )
    for settings_key, owner_filter in rules:
        max_age = expiration.get(settings_key)
        if max_age:
            WooeyJob.objects.filter(created_date__lte=now - max_age, **owner_filter).delete()
示例18
def utils(django_utils_function):
    """Build a decorator that pipes a function's result through a django.utils helper.

    ``django_utils_function`` is resolved on ``django_utils`` with
    ``helper.deep_getattribute`` when the decorator is applied; an
    ``Exception`` is raised if the lookup yields a ``helper.Nothing``.
    """
    def decorator(func):
        post_processor = helper.deep_getattribute(django_utils, django_utils_function)
        if isinstance(post_processor, helper.Nothing):
            message = 'Function {} not exist on django.utils module.'.format(django_utils_function)
            raise Exception(message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            return post_processor(result)

        return wrapper

    return decorator
示例19
def test_in_lookup_allows_F_expressions_and_expressions_for_datetimes(self):
    """A ``__range`` lookup on a datetime field accepts F() expressions as bounds.

    Currently skipped (see the TODO message below), so the body does not run.
    """
    self.skipTest("TODO fix django.db.utils.OperationalError: Conversion failed when converting date and/or time from character string.")
    # (0.012) QUERY = 'SELECT [expressions_result].[id], [expressions_result].[experiment_id], [expressions_result].[result_time] FROM [expressions_result] INNER JOIN [expressions_experiment] ON ([expressions_result].[experiment_id] = [expressions_experiment].[id]) WHERE [expressions_result].[result_time] BETWEEN %s AND %s' - PARAMS = ('F(experiment__start)', 'F(experiment__end)'); args=('F(experiment__start)', 'F(experiment__end)')
    start = datetime.datetime(2016, 2, 3, 15, 0, 0)
    end = datetime.datetime(2016, 2, 5, 15, 0, 0)
    experiment_1 = Experiment.objects.create(
        name='Integrity testing',
        assigned=start.date(),
        start=start,
        end=end,
        completed=end.date(),
        estimated_time=end - start,
    )
    experiment_2 = Experiment.objects.create(
        name='Taste testing',
        assigned=start.date(),
        start=start,
        end=end,
        completed=end.date(),
        estimated_time=end - start,
    )
    # One result inside the experiment window, one after it and one before it.
    Result.objects.create(
        experiment=experiment_1,
        result_time=datetime.datetime(2016, 2, 4, 15, 0, 0),
    )
    Result.objects.create(
        experiment=experiment_1,
        result_time=datetime.datetime(2016, 3, 10, 2, 0, 0),
    )
    Result.objects.create(
        experiment=experiment_2,
        result_time=datetime.datetime(2016, 1, 8, 5, 0, 0),
    )

    # The original repeated the following query and assertion verbatim a
    # second time; the duplicate added no coverage and was removed.
    within_experiment_time = [F('experiment__start'), F('experiment__end')]
    queryset = Result.objects.filter(result_time__range=within_experiment_time)
    self.assertQuerysetEqual(queryset, ["<Result: Result at 2016-02-04 15:00:00>"])
示例20
def test_invalid_operator(self):
    """Multiplying a datetime F() by a timedelta is expected to raise DatabaseError.

    Currently skipped (see the TODO message below).
    """
    self.skipTest("TODO fix django.db.utils.ProgrammingError: Incorrect syntax near '1000000'.")
    # The backend raises django.db.utils.ProgrammingError here rather than
    # DatabaseError.
    with self.assertRaises(DatabaseError):
        invalid_queryset = Experiment.objects.filter(
            start=F('start') * datetime.timedelta(0)
        )
        # Force evaluation so the query actually hits the database.
        list(invalid_queryset)
示例21
def test_durationfield_add(self):
    """Adding a duration field to a datetime field inside filter() lookups.

    Currently skipped (see the TODO message below).
    """
    self.skipTest("TODO fix django.db.utils.ProgrammingError: Incorrect syntax near '1000000'.")
    # (0.001) QUERY = 'SELECT [expressions_experiment].[id], [expressions_experiment].[name], [expressions_experiment].[assigned], [expressions_experiment].[completed], [expressions_experiment].[estimated_time], [expressions_experiment].[start], [expressions_experiment].[end] FROM [expressions_experiment] WHERE [expressions_experiment].[start] = ((DATEADD(MICROSECOND, ([expressions_experiment].[estimated_time] %% 1000000), CAST(DATEADD(SECOND, ([expressions_experiment].[estimated_time] / 1000000), CAST([expressions_experiment].[start] as datetime2)) as datetime2)))) ORDER BY [expressions_experiment].[name] ASC' - PARAMS = (); args=()

    def matching_names(**lookup):
        # Names of the experiments selected by the given filter lookup.
        return [experiment.name for experiment in Experiment.objects.filter(**lookup)]

    zeros = matching_names(start=F('start') + F('estimated_time'))
    self.assertEqual(zeros, ['e0'])

    end_less = matching_names(end__lt=F('start') + F('estimated_time'))
    self.assertEqual(end_less, ['e2'])

    delta_math = matching_names(
        end__gte=F('start') + F('estimated_time') + datetime.timedelta(hours=1)
    )
    self.assertEqual(delta_math, ['e4'])
示例22
def test_clean_deleted_sessions(self):
    """
    clean_deleted_sessions must drop FederateSLO objects whose session is gone.

    Only runs on Django 1.8 or later.
    """
    if django.VERSION < (1, 8):
        return

    client1 = Client()
    client2 = Client()
    for client in (client1, client2):
        client.get("/login")

    # Only client2's session is explicitly saved.
    session = client2.session
    session['authenticated'] = True
    session.save()

    models.FederateSLO.objects.create(
        username="test1@example.com",
        session_key=client1.session.session_key,
        ticket=utils.gen_st()
    )
    models.FederateSLO.objects.create(
        username="test2@example.com",
        session_key=client2.session.session_key,
        ticket=utils.gen_st()
    )

    count_before = len(models.FederateSLO.objects.all())
    self.assertEqual(count_before, 2)
    models.FederateSLO.clean_deleted_sessions()
    count_after = len(models.FederateSLO.objects.all())
    self.assertEqual(count_after, 1)

    # The entry attached to client1 is the one expected to be gone.
    with self.assertRaises(models.FederateSLO.DoesNotExist):
        models.FederateSLO.objects.get(username="test1@example.com")
示例23
def process(self, stack, stream):
    """Yield the tokens of ``stream`` HTML-escaped, with keywords wrapped in ``<strong>``."""
    for ttype, text in stream:
        if ttype in sqlparse.tokens.Keyword:
            # Keywords are surrounded by raw Text tokens carrying the markup.
            yield sqlparse.tokens.Text, '<strong>'
            yield ttype, django.utils.html.escape(text)
            yield sqlparse.tokens.Text, '</strong>'
        else:
            yield ttype, django.utils.html.escape(text)
示例24
def check_constraints(self, table_names=None):
    """Look for rows holding foreign keys that point at nothing.

    Every table in `table_names` (all introspected tables when it is None)
    is scanned for rows whose foreign key value is set but has no matching
    row in the referenced table. Meant to be used together with
    `disable_constraint_checking()` and `enable_constraint_checking()`, to
    find out whether invalid references were inserted while the checks
    were off.

    Raises an IntegrityError for the first invalid reference found, with
    the details of that reference in the message. Tables without a primary
    key column are skipped.

    Backends can override this method if they can apply constraint
    checking more directly (e.g. via "SET CONSTRAINTS ALL IMMEDIATE").
    """
    ref_query = """
SELECT REFERRING.`{0}`, REFERRING.`{1}` FROM `{2}` as REFERRING
LEFT JOIN `{3}` as REFERRED
ON (REFERRING.`{4}` = REFERRED.`{5}`)
WHERE REFERRING.`{6}` IS NOT NULL AND REFERRED.`{7}` IS NULL"""
    cursor = self.cursor()
    if table_names is None:
        table_names = self.introspection.table_names(cursor)

    for table_name in table_names:
        pk_column = self.introspection.get_primary_key_column(cursor, table_name)
        if not pk_column:
            continue
        foreign_keys = self.introspection.get_key_columns(cursor, table_name)
        for fk_column, target_table, target_column in foreign_keys:
            orphan_sql = ref_query.format(
                pk_column, fk_column, table_name,
                target_table,
                fk_column, target_column,
                fk_column, target_column,
            )
            cursor.execute(orphan_sql)
            for bad_row in cursor.fetchall():
                # bad_row is (primary key value, dangling foreign key value).
                msg = ("The row in table '{0}' with primary key '{1}' has "
                       "an invalid foreign key: {2}.{3} contains a value "
                       "'{4}' that does not have a corresponding value in "
                       "{5}.{6}.".format(table_name, bad_row[0],
                                         table_name, fk_column,
                                         bad_row[1], target_table,
                                         target_column))
                raise utils.IntegrityError(msg)
示例25
def execute(self, *args, **options):
    """
    Run this command, performing system checks first when required.

    Checks are controlled by ``self.requires_system_checks`` and
    ``self.requires_model_validation`` and are skipped when either the
    ``skip_validation`` or the ``skip_checks`` option is set. Any output
    returned by ``handle()`` is written to ``self.stdout``, wrapped in
    transaction statements when ``self.output_transaction`` is true.
    """
    if options.get('no_color'):
        self.style = no_style()
        self.stderr.style_func = None

    custom_stdout = options.get('stdout')
    if custom_stdout:
        self.stdout = OutputWrapper(custom_stdout)
    custom_stderr = options.get('stderr')
    if custom_stderr:
        self.stderr = OutputWrapper(custom_stderr, self.stderr.style_func)

    previous_locale = None
    if not self.leave_locale_alone:
        # Locales may only be touched when a working settings file can be
        # assumed, because django.utils.translation requires settings.
        # (Whether the i18n machinery is active is ultimately decided by
        # the USE_I18N setting.)
        if not self.can_import_settings:
            raise CommandError("Incompatible values of 'leave_locale_alone' "
                               "(%s) and 'can_import_settings' (%s) command "
                               "options." % (self.leave_locale_alone,
                                             self.can_import_settings))
        # django-admin creates database content such as permissions, which
        # must not contain translations, so translations are deactivated.
        from django.utils import translation
        previous_locale = translation.get_language()
        translation.deactivate_all()

    try:
        # `skip_validation` goes away at the end of its deprecation period.
        checks_wanted = (
            self.requires_system_checks
            and not options.get('skip_validation')
            and not options.get('skip_checks')
        )
        if checks_wanted:
            self.check()

        output = self.handle(*args, **options)
        if output:
            if self.output_transaction:
                # Imported here because it relies on settings.
                from django.db import connections, DEFAULT_DB_ALIAS
                database_alias = options.get('database', DEFAULT_DB_ALIAS)
                connection = connections[database_alias]
                if connection.ops.start_transaction_sql():
                    begin_sql = connection.ops.start_transaction_sql()
                    self.stdout.write(self.style.SQL_KEYWORD(begin_sql))
            self.stdout.write(output)
            if self.output_transaction:
                end_sql = connection.ops.end_transaction_sql()
                self.stdout.write('\n' + self.style.SQL_KEYWORD(end_sql))
    finally:
        # Restore whatever locale was active before the command ran.
        if previous_locale is not None:
            translation.activate(previous_locale)
示例26
def download(self, url):
    """
    Downloads the given URL and returns the file name.

    The file is fetched into a fresh temporary directory, which is
    registered in ``self.paths_to_remove``. The returned path carries the
    best file name that could be guessed from the response headers.
    Raises CommandError when the download fails with an IOError.
    """
    def cleanup_url(url):
        # Split the URL into its last path component and a display form.
        tmp = url.rstrip('/')
        filename = tmp.split('/')[-1]
        if url.endswith('/'):
            display_url = tmp + '/'
        else:
            display_url = url
        return filename, display_url

    prefix = 'django_%s_template_' % self.app_or_project
    tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
    self.paths_to_remove.append(tempdir)
    filename, display_url = cleanup_url(url)

    if self.verbosity >= 2:
        self.stdout.write("Downloading %s\n" % display_url)
    try:
        the_path, info = urlretrieve(url, path.join(tempdir, filename))
    except IOError as e:
        raise CommandError("couldn't download URL %s to %s: %s" %
                           (url, filename, e))

    # NOTE(review): relies on ``path`` being ``os.path``, as the existing
    # ``path.join`` calls imply.
    used_name = path.basename(the_path)

    # Trying to get better name from response headers
    content_disposition = info.get('content-disposition')
    if content_disposition:
        _, params = cgi.parse_header(content_disposition)
        # The header is controlled by the remote server. Keep only the final
        # path component so a name such as "../../x" or "/etc/x" cannot make
        # the move below write outside of tempdir.
        guessed_filename = path.basename(params.get('filename') or '') or used_name
    else:
        guessed_filename = used_name

    # Falling back to content type guessing
    ext = self.splitext(guessed_filename)[1]
    content_type = info.get('content-type')
    if not ext and content_type:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            guessed_filename += ext

    # Move the temporary file to a filename that has better
    # chances of being recognized by the archive utils
    if used_name != guessed_filename:
        guessed_path = path.join(tempdir, guessed_filename)
        shutil.move(the_path, guessed_path)
        return guessed_path

    # Giving up
    return the_path
示例27
def check_constraints(self, table_names=None):
    """Look for rows holding foreign keys that point at nothing.

    Every table in `table_names` (all introspected tables when it is None)
    is scanned for rows whose foreign key value is set but has no matching
    row in the referenced table. Meant to be used together with
    `disable_constraint_checking()` and `enable_constraint_checking()`, to
    find out whether invalid references were inserted while the checks
    were off.

    Raises an IntegrityError for the first invalid reference found, with
    the details of that reference in the message. Tables without a primary
    key column are skipped.

    Backends can override this method if they can apply constraint
    checking more directly (e.g. via "SET CONSTRAINTS ALL IMMEDIATE").
    """
    ref_query = """
SELECT REFERRING.`{0}`, REFERRING.`{1}` FROM `{2}` as REFERRING
LEFT JOIN `{3}` as REFERRED
ON (REFERRING.`{4}` = REFERRED.`{5}`)
WHERE REFERRING.`{6}` IS NOT NULL AND REFERRED.`{7}` IS NULL"""
    cursor = self.cursor()
    if table_names is None:
        table_names = self.introspection.table_names(cursor)

    for table_name in table_names:
        pk_column = self.introspection.get_primary_key_column(cursor, table_name)
        if not pk_column:
            continue
        foreign_keys = self.introspection.get_key_columns(cursor, table_name)
        for fk_column, target_table, target_column in foreign_keys:
            orphan_sql = ref_query.format(
                pk_column, fk_column, table_name,
                target_table,
                fk_column, target_column,
                fk_column, target_column,
            )
            cursor.execute(orphan_sql)
            for bad_row in cursor.fetchall():
                # bad_row is (primary key value, dangling foreign key value).
                msg = ("The row in table '{0}' with primary key '{1}' has "
                       "an invalid foreign key: {2}.{3} contains a value "
                       "'{4}' that does not have a corresponding value in "
                       "{5}.{6}.".format(table_name, bad_row[0],
                                         table_name, fk_column,
                                         bad_row[1], target_table,
                                         target_column))
                raise utils.IntegrityError(msg)
示例28
def download(self, url):
    """
    Downloads the given URL and returns the file name.

    The file is fetched into a fresh temporary directory, which is
    registered in ``self.paths_to_remove``. The returned path carries the
    best file name that could be guessed from the response headers.
    Raises CommandError when the download fails with an IOError.
    """
    def cleanup_url(url):
        # Split the URL into its last path component and a display form.
        tmp = url.rstrip('/')
        filename = tmp.split('/')[-1]
        if url.endswith('/'):
            display_url = tmp + '/'
        else:
            display_url = url
        return filename, display_url

    prefix = 'django_%s_template_' % self.app_or_project
    tempdir = tempfile.mkdtemp(prefix=prefix, suffix='_download')
    self.paths_to_remove.append(tempdir)
    filename, display_url = cleanup_url(url)

    if self.verbosity >= 2:
        self.stdout.write("Downloading %s\n" % display_url)
    try:
        the_path, info = urlretrieve(url, path.join(tempdir, filename))
    except IOError as e:
        raise CommandError("couldn't download URL %s to %s: %s" %
                           (url, filename, e))

    # NOTE(review): relies on ``path`` being ``os.path``, as the existing
    # ``path.join`` calls imply.
    used_name = path.basename(the_path)

    # Trying to get better name from response headers
    content_disposition = info.get('content-disposition')
    if content_disposition:
        _, params = cgi.parse_header(content_disposition)
        # The header is controlled by the remote server. Keep only the final
        # path component so a name such as "../../x" or "/etc/x" cannot make
        # the move below write outside of tempdir.
        guessed_filename = path.basename(params.get('filename') or '') or used_name
    else:
        guessed_filename = used_name

    # Falling back to content type guessing
    ext = self.splitext(guessed_filename)[1]
    content_type = info.get('content-type')
    if not ext and content_type:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            guessed_filename += ext

    # Move the temporary file to a filename that has better
    # chances of being recognized by the archive utils
    if used_name != guessed_filename:
        guessed_path = path.join(tempdir, guessed_filename)
        shutil.move(the_path, guessed_path)
        return guessed_path

    # Giving up
    return the_path