Python源码示例:django.contrib.admin.models.CHANGE
示例1
def change(request, object, message_or_fields):
    """
    Record an admin ``CHANGE`` log entry for *object*.

    *message_or_fields* is either a ready-made change message (a ``str``) or
    a sequence of modified field names, which is turned into a message by
    ``get_change_message``.
    """
    message = (
        message_or_fields
        if isinstance(message_or_fields, str)
        else get_change_message(message_or_fields)
    )
    # Collect the entry's fields first so the call itself stays short.
    entry_fields = dict(
        user_id=request.user.pk,
        content_type_id=ContentType.objects.get_for_model(object).pk,
        object_id=object.pk,
        object_repr=force_str(object),
        action_flag=models.CHANGE,
        change_message=message,
    )
    models.LogEntry.objects.log_action(**entry_fields)
示例2
def log_change(self, request, object, message):
    """
    Log that an object has been successfully changed.

    The default implementation creates an admin LogEntry object with the
    ``CHANGE`` action flag and hands back whatever ``log_action`` returns,
    so callers can inspect the entry that was written. Callers that ignored
    the previous ``None`` result are unaffected.
    """
    # Function-scope import kept from the original implementation.
    from django.contrib.admin.models import LogEntry, CHANGE
    return LogEntry.objects.log_action(
        user_id=request.user.pk,
        content_type_id=get_content_type_for_model(object).pk,
        object_id=object.pk,
        object_repr=force_text(object),
        action_flag=CHANGE,
        change_message=message,
    )
示例3
def test_log_actions(self):
    """
    Each ``ModelAdmin.log_*`` helper must return the entry it stored, and
    that entry must carry the expected flag, content type, object id, user,
    change message and object repr.
    """
    ma = ModelAdmin(Band, self.site)
    mock_request = MockRequest()
    mock_request.user = User.objects.create(username='bill')
    content_type = get_content_type_for_model(self.band)
    # (logging helper, expected action flag, message handed to the helper)
    cases = (
        (ma.log_addition, ADDITION, {'added': {}}),
        (ma.log_change, CHANGE, {'changed': {'fields': ['name', 'bio']}}),
        (ma.log_deletion, DELETION, str(self.band)),
    )
    for log_method, action_flag, message in cases:
        with self.subTest(name=log_method.__name__):
            created = log_method(mock_request, self.band, message)
            fetched = LogEntry.objects.filter(action_flag=action_flag).latest('id')
            self.assertEqual(created, fetched)
            self.assertEqual(fetched.action_flag, action_flag)
            self.assertEqual(fetched.content_type, content_type)
            self.assertEqual(fetched.object_id, str(self.band.pk))
            self.assertEqual(fetched.user, mock_request.user)
            # For deletions the third argument is the object repr and the
            # stored change message is empty; otherwise it is the message.
            if action_flag == DELETION:
                expected_message, expected_repr = '', message
            else:
                expected_message, expected_repr = str(message), str(self.band)
            self.assertEqual(fetched.change_message, expected_message)
            self.assertEqual(fetched.object_repr, expected_repr)
示例4
def lookups(self, request, model_admin):
    """
    Return the ``(value, label)`` pairs offered by this filter: one per
    admin action flag (addition, change, deletion).
    """
    added = (admin_models.ADDITION, 'Added')
    changed = (admin_models.CHANGE, 'Changed')
    deleted = (admin_models.DELETION, 'Deleted')
    return (added, changed, deleted)
示例5
def log_change(self, request, object, message):
    """
    Record a ``CHANGE`` entry in the admin log for *object*.

    Returns the result of ``LogEntry.objects.log_action``.
    """
    from django.contrib.admin.models import LogEntry, CHANGE
    entry_fields = dict(
        user_id=request.user.pk,
        content_type_id=get_content_type_for_model(object).pk,
        object_id=object.pk,
        object_repr=str(object),
        action_flag=CHANGE,
        change_message=message,
    )
    return LogEntry.objects.log_action(**entry_fields)
示例6
def disable_action(modeladmin, request, queryset):
    """
    Admin action: set ``is_active=False`` on every row of *queryset*, write
    one ``CHANGE`` log entry per row, and report how many rows were handled.
    """
    # Snapshot the rows *before* the update. The queryset is re-evaluated
    # each time it is iterated or counted, so if it is filtered on
    # ``is_active`` (e.g. through a changelist filter) the rows would stop
    # matching once the flag is flipped, and both the audit loop and the
    # reported count would see nothing.
    entries = list(queryset)
    queryset.update(is_active=False)
    ct = ContentType.objects.get_for_model(queryset.model)
    for entry in entries:
        LogEntry.objects.log_action(user_id=request.user.id,
                                    content_type_id=ct.pk,
                                    object_id=entry.pk,
                                    object_repr=entry.username,
                                    action_flag=CHANGE,
                                    change_message=_("Disabled"))
    messages.add_message(request, messages.INFO, '%d disabled' % len(entries))
示例7
def enable_action(modeladmin, request, queryset):
    """
    Admin action: set ``is_active=True`` on every row of *queryset*, write
    one ``CHANGE`` log entry per row, and report how many rows were handled.
    """
    # Snapshot the rows *before* the update. The queryset is re-evaluated
    # each time it is iterated or counted, so if it is filtered on
    # ``is_active`` (e.g. through a changelist filter) the rows would stop
    # matching once the flag is flipped, and both the audit loop and the
    # reported count would see nothing.
    entries = list(queryset)
    queryset.update(is_active=True)
    ct = ContentType.objects.get_for_model(queryset.model)
    for entry in entries:
        LogEntry.objects.log_action(user_id=request.user.id,
                                    content_type_id=ct.pk,
                                    object_id=entry.pk,
                                    object_repr=entry.username,
                                    action_flag=CHANGE,
                                    change_message=_("Enabled"))
    messages.add_message(request, messages.INFO, '%d enabled' % len(entries))
示例8
def log_change(self, request, object, message):
    """
    Write an admin log entry flagged ``CHANGE`` for *object* and return the
    value produced by ``LogEntry.objects.log_action``.
    """
    from django.contrib.admin.models import LogEntry, CHANGE
    # Gather the pieces in the same order the call consumes them.
    acting_user_id = request.user.pk
    content_type_id = get_content_type_for_model(object).pk
    target_id = object.pk
    target_repr = str(object)
    return LogEntry.objects.log_action(
        user_id=acting_user_id,
        content_type_id=content_type_id,
        object_id=target_id,
        object_repr=target_repr,
        action_flag=CHANGE,
        change_message=message,
    )
示例9
def log_change(self, request, object, message):
    """
    Record a ``CHANGE`` entry in the admin log for *object*.

    The object's text form is taken with ``force_text``; the result of
    ``LogEntry.objects.log_action`` is returned.
    """
    from django.contrib.admin.models import LogEntry, CHANGE
    entry_fields = dict(
        user_id=request.user.pk,
        content_type_id=get_content_type_for_model(object).pk,
        object_id=object.pk,
        object_repr=force_text(object),
        action_flag=CHANGE,
        change_message=message,
    )
    return LogEntry.objects.log_action(**entry_fields)
示例10
def log_change(self, request, object, message):
    """
    Log that an object has been successfully changed.

    The default implementation creates an admin LogEntry object with the
    ``CHANGE`` action flag and hands back whatever ``log_action`` returns,
    so callers can inspect the entry that was written. Callers that ignored
    the previous ``None`` result are unaffected.
    """
    # Function-scope import kept from the original implementation.
    from django.contrib.admin.models import LogEntry, CHANGE
    return LogEntry.objects.log_action(
        user_id=request.user.pk,
        content_type_id=get_content_type_for_model(object).pk,
        object_id=object.pk,
        object_repr=force_text(object),
        action_flag=CHANGE,
        change_message=message,
    )
示例11
def log_change(self, request, object, message):
    """
    Write an admin log entry flagged ``CHANGE`` for *object* and return the
    value produced by ``LogEntry.objects.log_action``.
    """
    from django.contrib.admin.models import LogEntry, CHANGE
    # Gather the pieces in the same order the call consumes them.
    acting_user_id = request.user.pk
    content_type_id = get_content_type_for_model(object).pk
    target_id = object.pk
    target_repr = force_text(object)
    return LogEntry.objects.log_action(
        user_id=acting_user_id,
        content_type_id=content_type_id,
        object_id=target_id,
        object_repr=target_repr,
        action_flag=CHANGE,
        change_message=message,
    )
示例12
def perform_action(self, request, action, queryset):
    """
    Apply the ``Subscription`` method named *action* to every row of
    *queryset*, saving and logging each row that succeeds.

    Rows for which the method raises ``TransitionNotAllowed`` are counted as
    failures. A summary message is sent to the user: an error if *action* is
    not an attribute of ``Subscription`` or if every row failed, a warning
    on partial failure, and a plain message otherwise.
    """
    try:
        transition = getattr(Subscription, action)
    except AttributeError:
        self.message_user(request, 'Illegal action.', level=messages.ERROR)
        return

    failures = 0
    total = queryset.count()
    # Human-readable form of the action name, e.g. "some_action" -> "Some action".
    action_label = action.replace('_', ' ').strip().capitalize()

    for entry in queryset:
        try:
            transition(entry)
            entry.save()
            LogEntry.objects.log_action(
                user_id=request.user.id,
                content_type_id=ContentType.objects.get_for_model(entry).pk,
                object_id=entry.id,
                object_repr=force_text(entry),
                action_flag=CHANGE,
                change_message='{action} action initiated by user.'.format(
                    action=action_label
                ),
            )
        except TransitionNotAllowed:
            failures += 1

    if not failures:
        self.message_user(request, 'Successfully changed %d state(s).' % total)
        return
    if failures == total:
        self.message_user(request, 'Illegal state change attempt.',
                          level=messages.ERROR)
        return
    self.message_user(
        request,
        '%d state(s) changed (%d failed).' % (total - failures, failures),
        level=messages.WARNING,
    )
示例13
def _call_method_on_queryset(self, request, method, queryset, action):
    """
    Call *method* on every document in *queryset* and collect the outcomes.

    Returns a dict keyed by document; each value is a dict with a boolean
    ``'success'`` and a ``'result'`` (the method's return value on success,
    the error text for ``TransitionNotAllowed``/``ValueError``, or an empty
    string for ``AttributeError``). Successful documents are saved and get a
    ``CHANGE`` log entry.
    """
    results = {}
    for document in queryset:
        outcome = results[document] = {}
        try:
            # Marked successful up front; the handlers below flip it back.
            outcome['success'] = True
            outcome['result'] = method(document)
            document.save()
            LogEntry.objects.log_action(
                user_id=request.user.id,
                content_type_id=ContentType.objects.get_for_model(document).pk,
                object_id=document.id,
                object_repr=force_text(document),
                action_flag=CHANGE,
                change_message='{action} action initiated by user.'.format(
                    action=action.replace('_', ' ').strip().capitalize()
                ),
            )
        except TransitionNotAllowed as error:
            outcome.update(success=False, result=mark_safe(error))
        except ValueError as error:
            outcome.update(success=False, result=force_text(error))
        except AttributeError:
            outcome.update(success=False, result="")
    return results
示例14
def setUp(self):
    """
    Create a superuser, a site and one article, log a ``CHANGE`` entry for
    that article, and log the test client in as the superuser.
    """
    self.user = User.objects.create_superuser(
        username='super',
        password='secret',
        email='super@example.com',
    )
    self.site = Site.objects.create(domain='example.org')
    self.a1 = Article.objects.create(
        site=self.site, title="Title", created=datetime(2008, 3, 12, 11, 54),
    )
    article_content_type = ContentType.objects.get_for_model(Article)
    LogEntry.objects.log_action(
        self.user.pk,
        article_content_type.pk,
        self.a1.pk,
        repr(self.a1),
        CHANGE,
        change_message='Changed something',
    )
    self.client.force_login(self.user)
示例15
def test_logentry_unicode(self):
    """``str(LogEntry)`` must start with a prefix matching the action flag."""
    log_entry = LogEntry()
    expected_prefixes = (
        (ADDITION, 'Added '),
        (CHANGE, 'Changed '),
        (DELETION, 'Deleted '),
    )
    for flag, prefix in expected_prefixes:
        log_entry.action_flag = flag
        self.assertTrue(str(log_entry).startswith(prefix))
    # Make sure custom action_flags works
    log_entry.action_flag = 4
    self.assertEqual(str(log_entry), 'LogEntry Object')
示例16
def test_log_action(self):
    """``log_action`` must return the entry that is now the latest one."""
    article_content_type = ContentType.objects.get_for_model(Article)
    created_entry = LogEntry.objects.log_action(
        self.user.pk,
        article_content_type.pk,
        self.a1.pk,
        repr(self.a1),
        CHANGE,
        change_message='Changed something else',
    )
    latest_entry = LogEntry.objects.latest('id')
    self.assertEqual(created_entry, latest_entry)
示例17
def test_proxy_model_content_type_is_used_for_log_entries(self):
    """
    Log entries for proxy models should have the proxy model's contenttype
    (#21084).
    """
    proxy_ct = ContentType.objects.get_for_model(ArticleProxy, for_concrete_model=False)
    form_data = {
        'site': self.site.pk,
        'title': "Foo",
        'hist': "Bar",
        'created_0': '2015-12-25',
        'created_1': '00:00',
    }
    changelist_url = reverse('admin:admin_utils_articleproxy_changelist')

    # Step 1: add through the proxy admin and inspect the newest entry.
    add_url = reverse('admin:admin_utils_articleproxy_add')
    response = self.client.post(add_url, form_data)
    self.assertRedirects(response, changelist_url)
    addition_entry = LogEntry.objects.latest('id')
    self.assertEqual(addition_entry.action_flag, ADDITION)
    self.assertEqual(addition_entry.content_type, proxy_ct)

    # Step 2: change the object that was just added.
    article_id = addition_entry.object_id
    change_url = reverse('admin:admin_utils_articleproxy_change', args=(article_id,))
    form_data['title'] = 'New'
    response = self.client.post(change_url, form_data)
    self.assertRedirects(response, changelist_url)
    change_entry = LogEntry.objects.latest('id')
    self.assertEqual(change_entry.action_flag, CHANGE)
    self.assertEqual(change_entry.content_type, proxy_ct)

    # Step 3: delete it, confirming the deletion form.
    delete_url = reverse('admin:admin_utils_articleproxy_delete', args=(article_id,))
    response = self.client.post(delete_url, {'post': 'yes'})
    self.assertRedirects(response, changelist_url)
    deletion_entry = LogEntry.objects.latest('id')
    self.assertEqual(deletion_entry.action_flag, DELETION)
    self.assertEqual(deletion_entry.content_type, proxy_ct)
示例18
def setUp(self):
    """
    Prepare the fixtures: a superuser, a site, an article with one
    ``CHANGE`` log entry, and a test client logged in as the superuser.
    """
    superuser_fields = dict(
        username='super', password='secret', email='super@example.com',
    )
    self.user = User.objects.create_superuser(**superuser_fields)
    self.site = Site.objects.create(domain='example.org')
    self.a1 = Article.objects.create(
        site=self.site,
        title="Title",
        created=datetime(2008, 3, 12, 11, 54),
    )
    ct_pk = ContentType.objects.get_for_model(Article).pk
    LogEntry.objects.log_action(
        self.user.pk, ct_pk, self.a1.pk, repr(self.a1), CHANGE,
        change_message='Changed something',
    )
    self.client.force_login(self.user)
示例19
def test_logentry_unicode(self):
    """
    The string form of a LogEntry begins with a prefix chosen by its action
    flag, and falls back to ``'LogEntry Object'`` for an unknown flag.
    """
    log_entry = LogEntry()
    prefix_by_flag = {
        ADDITION: 'Added ',
        CHANGE: 'Changed ',
        DELETION: 'Deleted ',
    }
    for flag, prefix in prefix_by_flag.items():
        log_entry.action_flag = flag
        rendered = str(log_entry)
        self.assertTrue(rendered.startswith(prefix))
    # Make sure custom action_flags works
    log_entry.action_flag = 4
    self.assertEqual(str(log_entry), 'LogEntry Object')
示例20
def test_log_action(self):
    """The entry returned by ``log_action`` is the most recent LogEntry."""
    ct_pk = ContentType.objects.get_for_model(Article).pk
    positional = (self.user.pk, ct_pk, self.a1.pk, repr(self.a1), CHANGE)
    log_entry = LogEntry.objects.log_action(
        *positional, change_message='Changed something else'
    )
    self.assertEqual(log_entry, LogEntry.objects.latest('id'))
示例21
def test_proxy_model_content_type_is_used_for_log_entries(self):
    """
    Log entries for proxy models should have the proxy model's contenttype
    (#21084).
    """
    expected_ct = ContentType.objects.get_for_model(ArticleProxy, for_concrete_model=False)
    payload = {
        'site': self.site.pk, 'title': "Foo", 'hist': "Bar",
        'created_0': '2015-12-25', 'created_1': '00:00',
    }
    list_url = reverse('admin:admin_utils_articleproxy_changelist')

    # add
    resp = self.client.post(reverse('admin:admin_utils_articleproxy_add'), payload)
    self.assertRedirects(resp, list_url)
    added = LogEntry.objects.latest('id')
    self.assertEqual(added.action_flag, ADDITION)
    self.assertEqual(added.content_type, expected_ct)

    # change (targets the object created by the add step)
    article_id = added.object_id
    change_url = reverse('admin:admin_utils_articleproxy_change', args=(article_id,))
    payload['title'] = 'New'
    resp = self.client.post(change_url, payload)
    self.assertRedirects(resp, list_url)
    changed = LogEntry.objects.latest('id')
    self.assertEqual(changed.action_flag, CHANGE)
    self.assertEqual(changed.content_type, expected_ct)

    # delete
    delete_url = reverse('admin:admin_utils_articleproxy_delete', args=(article_id,))
    resp = self.client.post(delete_url, {'post': 'yes'})
    self.assertRedirects(resp, list_url)
    deleted = LogEntry.objects.latest('id')
    self.assertEqual(deleted.action_flag, DELETION)
    self.assertEqual(deleted.content_type, expected_ct)
示例22
def disable_action(modeladmin, request, queryset):
    """
    Admin action: set ``is_active=False`` on every row of *queryset*, write
    one ``CHANGE`` log entry per row, and report how many rows were handled.
    """
    # Snapshot the rows *before* the update. The queryset is re-evaluated
    # each time it is iterated or counted, so if it is filtered on
    # ``is_active`` (e.g. through a changelist filter) the rows would stop
    # matching once the flag is flipped, and both the audit loop and the
    # reported count would see nothing.
    entries = list(queryset)
    queryset.update(is_active=False)
    ct = ContentType.objects.get_for_model(queryset.model)
    for entry in entries:
        LogEntry.objects.log_action(
            user_id=request.user.id,
            content_type_id=ct.pk,
            object_id=entry.pk,
            object_repr=entry.username,
            action_flag=CHANGE,
            change_message=_('Disabled'),
        )
    messages.add_message(request, messages.INFO, '%d disabled' % len(entries))
示例23
def enable_action(modeladmin, request, queryset):
    """
    Admin action: set ``is_active=True`` on every row of *queryset*, write
    one ``CHANGE`` log entry per row, and report how many rows were handled.
    """
    # Snapshot the rows *before* the update. The queryset is re-evaluated
    # each time it is iterated or counted, so if it is filtered on
    # ``is_active`` (e.g. through a changelist filter) the rows would stop
    # matching once the flag is flipped, and both the audit loop and the
    # reported count would see nothing.
    entries = list(queryset)
    queryset.update(is_active=True)
    ct = ContentType.objects.get_for_model(queryset.model)
    for entry in entries:
        LogEntry.objects.log_action(
            user_id=request.user.id,
            content_type_id=ct.pk,
            object_id=entry.pk,
            object_repr=entry.username,
            action_flag=CHANGE,
            change_message=_('Enabled'),
        )
    messages.add_message(request, messages.INFO, '%d enabled' % len(entries))
示例24
def post(self, request, *args, **kwargs):
    """
    Run the import the user has just confirmed.

    Reads the uploaded data back from temporary storage, imports it through
    the resource, optionally writes one admin log entry per imported row,
    flashes a summary message, removes the temporary file, sends
    ``post_import`` and redirects to the model's changelist.
    """
    resource_class = self.get_import_resource_class()
    resource = resource_class(**self.get_import_resource_kwargs(request, *args, **kwargs))

    confirm_form = ConfirmImportForm(request.POST)
    if not confirm_form.is_valid():
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this assumes the whole import lives under the validity
        # check, so an invalid form yields no response object. Confirm.
        return None

    cleaned = confirm_form.cleaned_data
    format_class = self.get_import_formats()[int(cleaned['input_format'])]
    input_format = format_class()
    tmp_storage = self.get_tmp_storage_class()(name=cleaned['import_file_name'])
    data = tmp_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and self.from_encoding:
        data = force_text(data, self.from_encoding)
    dataset = input_format.create_dataset(data)

    result = resource.import_data(
        dataset,
        dry_run=False,
        raise_errors=True,
        file_name=cleaned['original_file_name'],
        user=request.user,
    )

    if not self.get_skip_admin_log():
        # Add imported objects to LogEntry
        flag_by_import_type = {
            RowResult.IMPORT_TYPE_NEW: ADDITION,
            RowResult.IMPORT_TYPE_UPDATE: CHANGE,
            RowResult.IMPORT_TYPE_DELETE: DELETION,
        }
        content_type_id = ContentType.objects.get_for_model(self.model).pk
        for row in result:
            # Rows that errored or were skipped are not logged.
            if row.import_type == row.IMPORT_TYPE_ERROR or row.import_type == row.IMPORT_TYPE_SKIP:
                continue
            LogEntry.objects.log_action(
                user_id=request.user.pk,
                content_type_id=content_type_id,
                object_id=row.object_id,
                object_repr=row.object_repr,
                action_flag=flag_by_import_type[row.import_type],
                change_message="%s through import_export" % row.import_type,
            )

    summary_parts = [
        str(_(u'Import finished')),
        str(_(u'Add')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_NEW],
        str(_(u'Update')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_UPDATE],
    ]
    success_message = ' , '.join(summary_parts)
    messages.success(request, success_message)
    tmp_storage.remove()

    post_import.send(sender=None, model=self.model)
    model_info = (self.opts.app_label, self.opts.model_name)
    url = reverse('xadmin:%s_%s_changelist' % model_info,
                  current_app=self.admin_site.name)
    return HttpResponseRedirect(url)
示例25
def post(self, request, *args, **kwargs):
    """
    Run the import the user has just confirmed.

    Reads the uploaded data back from temporary storage, imports it through
    the resource, optionally writes one admin log entry per imported row,
    flashes a summary message, removes the temporary file, sends
    ``post_import`` and redirects to the model's changelist.
    """
    resource_class = self.get_import_resource_class()
    resource = resource_class(**self.get_import_resource_kwargs(request, *args, **kwargs))

    confirm_form = ConfirmImportForm(request.POST)
    if not confirm_form.is_valid():
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this assumes the whole import lives under the validity
        # check, so an invalid form yields no response object. Confirm.
        return None

    cleaned = confirm_form.cleaned_data
    format_class = self.get_import_formats()[int(cleaned['input_format'])]
    input_format = format_class()
    tmp_storage = self.get_tmp_storage_class()(name=cleaned['import_file_name'])
    data = tmp_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and self.from_encoding:
        data = force_text(data, self.from_encoding)
    dataset = input_format.create_dataset(data)

    result = resource.import_data(
        dataset,
        dry_run=False,
        raise_errors=True,
        file_name=cleaned['original_file_name'],
        user=request.user,
    )

    if not self.get_skip_admin_log():
        # Add imported objects to LogEntry
        flag_by_import_type = {
            RowResult.IMPORT_TYPE_NEW: ADDITION,
            RowResult.IMPORT_TYPE_UPDATE: CHANGE,
            RowResult.IMPORT_TYPE_DELETE: DELETION,
        }
        content_type_id = ContentType.objects.get_for_model(self.model).pk
        for row in result:
            # Rows that errored or were skipped are not logged.
            if row.import_type == row.IMPORT_TYPE_ERROR or row.import_type == row.IMPORT_TYPE_SKIP:
                continue
            LogEntry.objects.log_action(
                user_id=request.user.pk,
                content_type_id=content_type_id,
                object_id=row.object_id,
                object_repr=row.object_repr,
                action_flag=flag_by_import_type[row.import_type],
                change_message="%s through import_export" % row.import_type,
            )

    summary_parts = [
        str(_(u'Import finished')),
        str(_(u'Add')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_NEW],
        str(_(u'Update')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_UPDATE],
    ]
    success_message = ' , '.join(summary_parts)
    messages.success(request, success_message)
    tmp_storage.remove()

    post_import.send(sender=None, model=self.model)
    model_info = (self.opts.app_label, self.opts.model_name)
    url = reverse('xadmin:%s_%s_changelist' % model_info,
                  current_app=self.admin_site.name)
    return HttpResponseRedirect(url)
示例26
def post(self, request, *args, **kwargs):
    """
    Run the import the user has just confirmed.

    Reads the uploaded data back from temporary storage, imports it through
    the resource, optionally writes one admin log entry per imported row,
    flashes a summary message, removes the temporary file, sends
    ``post_import`` and redirects to the model's changelist.
    """
    resource_class = self.get_import_resource_class()
    resource = resource_class(**self.get_import_resource_kwargs(request, *args, **kwargs))

    confirm_form = ConfirmImportForm(request.POST)
    if not confirm_form.is_valid():
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this assumes the whole import lives under the validity
        # check, so an invalid form yields no response object. Confirm.
        return None

    cleaned = confirm_form.cleaned_data
    format_class = self.get_import_formats()[int(cleaned['input_format'])]
    input_format = format_class()
    tmp_storage = self.get_tmp_storage_class()(name=cleaned['import_file_name'])
    data = tmp_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and self.from_encoding:
        data = force_text(data, self.from_encoding)
    dataset = input_format.create_dataset(data)

    result = resource.import_data(
        dataset,
        dry_run=False,
        raise_errors=True,
        file_name=cleaned['original_file_name'],
        user=request.user,
    )

    if not self.get_skip_admin_log():
        # Add imported objects to LogEntry
        flag_by_import_type = {
            RowResult.IMPORT_TYPE_NEW: ADDITION,
            RowResult.IMPORT_TYPE_UPDATE: CHANGE,
            RowResult.IMPORT_TYPE_DELETE: DELETION,
        }
        content_type_id = ContentType.objects.get_for_model(self.model).pk
        for row in result:
            # Rows that errored or were skipped are not logged.
            if row.import_type == row.IMPORT_TYPE_ERROR or row.import_type == row.IMPORT_TYPE_SKIP:
                continue
            LogEntry.objects.log_action(
                user_id=request.user.pk,
                content_type_id=content_type_id,
                object_id=row.object_id,
                object_repr=row.object_repr,
                action_flag=flag_by_import_type[row.import_type],
                change_message="%s through import_export" % row.import_type,
            )

    summary_parts = [
        str(_(u'Import finished')),
        str(_(u'Add')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_NEW],
        str(_(u'Update')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_UPDATE],
    ]
    success_message = ' , '.join(summary_parts)
    messages.success(request, success_message)
    tmp_storage.remove()

    post_import.send(sender=None, model=self.model)
    model_info = (self.opts.app_label, self.opts.model_name)
    url = reverse('xadmin:%s_%s_changelist' % model_info,
                  current_app=self.admin_site.name)
    return HttpResponseRedirect(url)
示例27
def post(self, request, *args, **kwargs):
    """
    Run the import the user has just confirmed.

    Reads the uploaded data back from temporary storage, imports it through
    the resource, optionally writes one admin log entry per imported row,
    flashes a summary message, removes the temporary file, sends
    ``post_import`` and redirects to the model's changelist.
    """
    resource_class = self.get_import_resource_class()
    resource = resource_class(**self.get_import_resource_kwargs(request, *args, **kwargs))

    confirm_form = ConfirmImportForm(request.POST)
    if not confirm_form.is_valid():
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this assumes the whole import lives under the validity
        # check, so an invalid form yields no response object. Confirm.
        return None

    cleaned = confirm_form.cleaned_data
    format_class = self.get_import_formats()[int(cleaned['input_format'])]
    input_format = format_class()
    tmp_storage = self.get_tmp_storage_class()(name=cleaned['import_file_name'])
    data = tmp_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and self.from_encoding:
        data = force_text(data, self.from_encoding)
    dataset = input_format.create_dataset(data)

    result = resource.import_data(
        dataset,
        dry_run=False,
        raise_errors=True,
        file_name=cleaned['original_file_name'],
        user=request.user,
    )

    if not self.get_skip_admin_log():
        # Add imported objects to LogEntry
        flag_by_import_type = {
            RowResult.IMPORT_TYPE_NEW: ADDITION,
            RowResult.IMPORT_TYPE_UPDATE: CHANGE,
            RowResult.IMPORT_TYPE_DELETE: DELETION,
        }
        content_type_id = ContentType.objects.get_for_model(self.model).pk
        for row in result:
            # Rows that errored or were skipped are not logged.
            if row.import_type == row.IMPORT_TYPE_ERROR or row.import_type == row.IMPORT_TYPE_SKIP:
                continue
            LogEntry.objects.log_action(
                user_id=request.user.pk,
                content_type_id=content_type_id,
                object_id=row.object_id,
                object_repr=row.object_repr,
                action_flag=flag_by_import_type[row.import_type],
                change_message="%s through import_export" % row.import_type,
            )

    summary_parts = [
        str(_(u'Import finished')),
        str(_(u'Add')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_NEW],
        str(_(u'Update')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_UPDATE],
    ]
    success_message = ' , '.join(summary_parts)
    messages.success(request, success_message)
    tmp_storage.remove()

    post_import.send(sender=None, model=self.model)
    model_info = (self.opts.app_label, self.opts.model_name)
    url = reverse('xadmin:%s_%s_changelist' % model_info,
                  current_app=self.admin_site.name)
    return HttpResponseRedirect(url)
示例28
def post(self, request, *args, **kwargs):
    """
    Run the import the user has just confirmed.

    Reads the uploaded data back from temporary storage, imports it through
    the resource, optionally writes one admin log entry per imported row,
    flashes a summary message, removes the temporary file, sends
    ``post_import`` and redirects to the model's changelist.
    """
    resource_class = self.get_import_resource_class()
    resource = resource_class(**self.get_import_resource_kwargs(request, *args, **kwargs))

    confirm_form = ConfirmImportForm(request.POST)
    if not confirm_form.is_valid():
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this assumes the whole import lives under the validity
        # check, so an invalid form yields no response object. Confirm.
        return None

    cleaned = confirm_form.cleaned_data
    format_class = self.get_import_formats()[int(cleaned['input_format'])]
    input_format = format_class()
    tmp_storage = self.get_tmp_storage_class()(name=cleaned['import_file_name'])
    data = tmp_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and self.from_encoding:
        data = force_text(data, self.from_encoding)
    dataset = input_format.create_dataset(data)

    result = resource.import_data(
        dataset,
        dry_run=False,
        raise_errors=True,
        file_name=cleaned['original_file_name'],
        user=request.user,
    )

    if not self.get_skip_admin_log():
        # Add imported objects to LogEntry
        flag_by_import_type = {
            RowResult.IMPORT_TYPE_NEW: ADDITION,
            RowResult.IMPORT_TYPE_UPDATE: CHANGE,
            RowResult.IMPORT_TYPE_DELETE: DELETION,
        }
        content_type_id = ContentType.objects.get_for_model(self.model).pk
        for row in result:
            # Rows that errored or were skipped are not logged.
            if row.import_type == row.IMPORT_TYPE_ERROR or row.import_type == row.IMPORT_TYPE_SKIP:
                continue
            LogEntry.objects.log_action(
                user_id=request.user.pk,
                content_type_id=content_type_id,
                object_id=row.object_id,
                object_repr=row.object_repr,
                action_flag=flag_by_import_type[row.import_type],
                change_message="%s through import_export" % row.import_type,
            )

    summary_parts = [
        str(_(u'Import finished')),
        str(_(u'Add')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_NEW],
        str(_(u'Update')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_UPDATE],
    ]
    success_message = ' , '.join(summary_parts)
    messages.success(request, success_message)
    tmp_storage.remove()

    post_import.send(sender=None, model=self.model)
    model_info = (self.opts.app_label, self.opts.model_name)
    url = reverse('xadmin:%s_%s_changelist' % model_info,
                  current_app=self.admin_site.name)
    return HttpResponseRedirect(url)
示例29
def post(self, request, *args, **kwargs):
    """
    Run the import the user has just confirmed.

    Reads the uploaded data back from temporary storage, imports it through
    the resource, optionally writes one admin log entry per imported row,
    flashes a summary message, removes the temporary file, sends
    ``post_import`` and redirects to the model's changelist.
    """
    resource_class = self.get_import_resource_class()
    resource = resource_class(**self.get_import_resource_kwargs(request, *args, **kwargs))

    confirm_form = ConfirmImportForm(request.POST)
    if not confirm_form.is_valid():
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this assumes the whole import lives under the validity
        # check, so an invalid form yields no response object. Confirm.
        return None

    cleaned = confirm_form.cleaned_data
    format_class = self.get_import_formats()[int(cleaned['input_format'])]
    input_format = format_class()
    tmp_storage = self.get_tmp_storage_class()(name=cleaned['import_file_name'])
    data = tmp_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and self.from_encoding:
        data = force_text(data, self.from_encoding)
    dataset = input_format.create_dataset(data)

    result = resource.import_data(
        dataset,
        dry_run=False,
        raise_errors=True,
        file_name=cleaned['original_file_name'],
        user=request.user,
    )

    if not self.get_skip_admin_log():
        # Add imported objects to LogEntry
        flag_by_import_type = {
            RowResult.IMPORT_TYPE_NEW: ADDITION,
            RowResult.IMPORT_TYPE_UPDATE: CHANGE,
            RowResult.IMPORT_TYPE_DELETE: DELETION,
        }
        content_type_id = ContentType.objects.get_for_model(self.model).pk
        for row in result:
            # Rows that errored or were skipped are not logged.
            if row.import_type == row.IMPORT_TYPE_ERROR or row.import_type == row.IMPORT_TYPE_SKIP:
                continue
            LogEntry.objects.log_action(
                user_id=request.user.pk,
                content_type_id=content_type_id,
                object_id=row.object_id,
                object_repr=row.object_repr,
                action_flag=flag_by_import_type[row.import_type],
                change_message="%s through import_export" % row.import_type,
            )

    summary_parts = [
        str(_(u'Import finished')),
        str(_(u'Add')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_NEW],
        str(_(u'Update')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_UPDATE],
    ]
    success_message = ' , '.join(summary_parts)
    messages.success(request, success_message)
    tmp_storage.remove()

    post_import.send(sender=None, model=self.model)
    model_info = (self.opts.app_label, self.opts.model_name)
    url = reverse('xadmin:%s_%s_changelist' % model_info,
                  current_app=self.admin_site.name)
    return HttpResponseRedirect(url)
示例30
def post(self, request, *args, **kwargs):
    """
    Run the import the user has just confirmed.

    Reads the uploaded data back from temporary storage, imports it through
    the resource, optionally writes one admin log entry per imported row,
    flashes a summary message, removes the temporary file, sends
    ``post_import`` and redirects to the model's changelist.
    """
    resource_class = self.get_import_resource_class()
    resource = resource_class(**self.get_import_resource_kwargs(request, *args, **kwargs))

    confirm_form = ConfirmImportForm(request.POST)
    if not confirm_form.is_valid():
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this assumes the whole import lives under the validity
        # check, so an invalid form yields no response object. Confirm.
        return None

    cleaned = confirm_form.cleaned_data
    format_class = self.get_import_formats()[int(cleaned['input_format'])]
    input_format = format_class()
    tmp_storage = self.get_tmp_storage_class()(name=cleaned['import_file_name'])
    data = tmp_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and self.from_encoding:
        data = force_text(data, self.from_encoding)
    dataset = input_format.create_dataset(data)

    result = resource.import_data(
        dataset,
        dry_run=False,
        raise_errors=True,
        file_name=cleaned['original_file_name'],
        user=request.user,
    )

    if not self.get_skip_admin_log():
        # Add imported objects to LogEntry
        flag_by_import_type = {
            RowResult.IMPORT_TYPE_NEW: ADDITION,
            RowResult.IMPORT_TYPE_UPDATE: CHANGE,
            RowResult.IMPORT_TYPE_DELETE: DELETION,
        }
        content_type_id = ContentType.objects.get_for_model(self.model).pk
        for row in result:
            # Rows that errored or were skipped are not logged.
            if row.import_type == row.IMPORT_TYPE_ERROR or row.import_type == row.IMPORT_TYPE_SKIP:
                continue
            LogEntry.objects.log_action(
                user_id=request.user.pk,
                content_type_id=content_type_id,
                object_id=row.object_id,
                object_repr=row.object_repr,
                action_flag=flag_by_import_type[row.import_type],
                change_message="%s through import_export" % row.import_type,
            )

    summary_parts = [
        str(_(u'Import finished')),
        str(_(u'Add')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_NEW],
        str(_(u'Update')) + ' : %d' % result.totals[RowResult.IMPORT_TYPE_UPDATE],
    ]
    success_message = ' , '.join(summary_parts)
    messages.success(request, success_message)
    tmp_storage.remove()

    post_import.send(sender=None, model=self.model)
    model_info = (self.opts.app_label, self.opts.model_name)
    url = reverse('xadmin:%s_%s_changelist' % model_info,
                  current_app=self.admin_site.name)
    return HttpResponseRedirect(url)