# blob: 4b35c111f834df31aaec9f218e75fa8970a460f5 [file] [log] [blame]
import logging
from urllib.parse import urlencode
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db.transaction import atomic
from django.template import Context
from django.urls import reverse
from rest_framework import serializers
from django_airavata.apps.auth import iam_admin_client
from . import models, utils
logger = logging.getLogger(__name__)
class PendingEmailChangeSerializer(serializers.ModelSerializer):
    """Expose a pending email change as its target address and creation date."""

    class Meta:
        # Only these two model fields are part of the serialized payload.
        model = models.PendingEmailChange
        fields = [
            'email_address',
            'created_date',
        ]
class UserSerializer(serializers.ModelSerializer):
    """Serialize the portal user plus status flags read from ``user_profile``.

    Besides the plain user model fields, the output carries:

    * ``pending_email_change`` - the requesting user's unverified pending
      email change (serialized), or ``None`` when there is none.
    * ``complete``, ``username_valid``, ``ext_user_profile_valid`` - values of
      the corresponding ``is_*`` attributes on ``instance.user_profile``.

    The serializer expects ``context['request']`` to be set.
    """

    pending_email_change = serializers.SerializerMethodField()
    complete = serializers.SerializerMethodField()
    username_valid = serializers.SerializerMethodField()
    ext_user_profile_valid = serializers.SerializerMethodField()

    class Meta:
        model = get_user_model()
        fields = ['id', 'username', 'first_name', 'last_name', 'email',
                  'pending_email_change', 'complete', 'username_valid',
                  'ext_user_profile_valid']
        read_only_fields = ('username',)

    def get_pending_email_change(self, instance):
        """Return the requesting user's unverified email change, or ``None``."""
        request = self.context['request']
        pending_email_change = models.PendingEmailChange.objects.filter(
            user=request.user, verified=False).first()
        if pending_email_change is None:
            return None
        serializer = PendingEmailChangeSerializer(
            instance=pending_email_change, context=self.context)
        return serializer.data

    def get_complete(self, instance):
        """Return ``instance.user_profile.is_complete``."""
        return instance.user_profile.is_complete

    def get_username_valid(self, instance):
        """Return ``instance.user_profile.is_username_valid``."""
        return instance.user_profile.is_username_valid

    def get_ext_user_profile_valid(self, instance):
        """Return ``instance.user_profile.is_ext_user_profile_valid``."""
        return instance.user_profile.is_ext_user_profile_valid

    @atomic
    def update(self, instance, validated_data):
        """Update the user's name and start an email change if requested.

        Keys missing from ``validated_data`` (optional fields left out of the
        payload, or a partial update) leave the corresponding attribute
        unchanged instead of raising ``KeyError``.

        The email address itself is never written here: a differing address
        creates a ``PendingEmailChange`` and sends a verification link. The
        name change is then mirrored to the Airavata user profile when one
        exists, otherwise to the Keycloak user store.

        Returns the updated ``instance``.
        """
        request = self.context['request']
        instance.first_name = validated_data.get('first_name', instance.first_name)
        instance.last_name = validated_data.get('last_name', instance.last_name)
        new_email = validated_data.get('email', instance.email)
        if instance.email != new_email:
            # Delete any unverified pending email changes
            models.PendingEmailChange.objects.filter(
                user=request.user, verified=False).delete()
            # Email doesn't get updated until it is verified. Create a pending
            # email change record in the meantime
            pending_email_change = models.PendingEmailChange.objects.create(
                user=request.user, email_address=new_email)
            # NOTE(review): the link is sent before the remote profile update
            # below; if that update raises, the DB changes roll back but the
            # message has presumably already gone out - confirm acceptable.
            self._send_email_verification_link(request, pending_email_change)
        instance.save()

        # save in the user profile service too
        user_profile_client = request.profile_service['user_profile']
        # update the Airavata profile if it exists
        if user_profile_client.doesUserExist(request.authz_token,
                                             request.user.username,
                                             settings.GATEWAY_ID):
            airavata_user_profile = user_profile_client.getUserProfileById(
                request.authz_token, request.user.username, settings.GATEWAY_ID)
            airavata_user_profile.firstName = instance.first_name
            airavata_user_profile.lastName = instance.last_name
            user_profile_client.updateUserProfile(request.authz_token, airavata_user_profile)
        # otherwise, update in Keycloak user store
        else:
            iam_admin_client.update_user(request.user.username,
                                         first_name=instance.first_name,
                                         last_name=instance.last_name)
        return instance

    def _send_email_verification_link(self, request, pending_email_change):
        """Send the verification link for ``pending_email_change``.

        The link is the absolute URL of the ``user_profile`` view with the
        pending change's ``verification_code`` as the ``code`` query
        parameter.
        """
        verification_uri = (
            request.build_absolute_uri(reverse('django_airavata_auth:user_profile')) +
            '?' + urlencode({"code": pending_email_change.verification_code}))
        logger.debug("verification_uri=%s", verification_uri)

        context = Context({
            "username": pending_email_change.user.username,
            "email": pending_email_change.email_address,
            "first_name": pending_email_change.user.first_name,
            "last_name": pending_email_change.user.last_name,
            "portal_title": settings.PORTAL_TITLE,
            "url": verification_uri,
        })
        utils.send_email_to_user(models.VERIFY_EMAIL_CHANGE_TEMPLATE, context)
class ExtendedUserProfileFieldChoiceSerializer(serializers.Serializer):
    """Plain serializer for one selectable choice of a choice-type field."""

    # Optional so that choices without an id can be submitted.
    id = serializers.IntegerField(required=False)
    display_text = serializers.CharField()
    order = serializers.IntegerField()
class ExtendedUserProfileFieldLinkSerializer(serializers.Serializer):
    """Plain serializer for a link attached to an extended profile field."""

    # Optional so that links without an id can be submitted.
    id = serializers.IntegerField(required=False)
    label = serializers.CharField()
    url = serializers.URLField()
    order = serializers.IntegerField()
    # Display flags: shown by default, not inline by default.
    display_link = serializers.BooleanField(default=True)
    display_inline = serializers.BooleanField(default=False)
class ExtendedUserProfileFieldSerializer(serializers.ModelSerializer):
    """Serialize an extended user profile field of any supported type.

    ``field_type`` selects which concrete model is created and which
    type-specific attributes apply:

    * ``single_choice`` / ``multi_choice`` - ``other`` and ``choices``
    * ``user_agreement`` - ``checkbox_label``
    * ``text`` - no extra attributes

    Every field type can carry ``links``. ``create`` and ``update`` run inside
    a transaction so a failure while writing choices or links does not leave a
    partially written field behind. The serializer expects
    ``context['request']`` to be set.
    """

    field_type = serializers.ChoiceField(choices=["text", "single_choice", "multi_choice", "user_agreement"])
    other = serializers.BooleanField(required=False)
    choices = ExtendedUserProfileFieldChoiceSerializer(required=False, many=True)
    checkbox_label = serializers.CharField(allow_blank=True, required=False)
    links = ExtendedUserProfileFieldLinkSerializer(required=False, many=True)
    userHasWriteAccess = serializers.SerializerMethodField()

    class Meta:
        model = models.ExtendedUserProfileField
        fields = ['id', 'name', 'help_text', 'order', 'created_date',
                  'updated_date', 'field_type', 'other', 'choices', 'checkbox_label', 'links', 'required', 'userHasWriteAccess']
        read_only_fields = ('created_date', 'updated_date')

    def to_representation(self, instance):
        """Add the type-specific attributes and ordered links to the output.

        Choices are limited to those not soft-deleted and are sorted by
        ``order``; links are sorted by ``order``.
        """
        result = super().to_representation(instance)
        if instance.field_type == 'single_choice':
            result['other'] = instance.single_choice.other
            result['choices'] = ExtendedUserProfileFieldChoiceSerializer(
                instance.single_choice.choices.filter(deleted=False).order_by('order'), many=True).data
        if instance.field_type == 'multi_choice':
            result['other'] = instance.multi_choice.other
            result['choices'] = ExtendedUserProfileFieldChoiceSerializer(
                instance.multi_choice.choices.filter(deleted=False).order_by('order'), many=True).data
        if instance.field_type == 'user_agreement':
            result['checkbox_label'] = instance.user_agreement.checkbox_label
        result['links'] = ExtendedUserProfileFieldLinkSerializer(instance.links.order_by('order'), many=True).data
        return result

    @atomic
    def create(self, validated_data):
        """Create the concrete field model for ``field_type`` with its children.

        Any ``id`` supplied on a choice or link is discarded; new rows are
        always created.

        Raises:
            Exception: if ``field_type`` is not one of the supported types.
        """
        field_type = validated_data.pop('field_type')
        other = validated_data.pop('other', False)
        choices = validated_data.pop('choices', [])
        checkbox_label = validated_data.pop('checkbox_label', '')
        links = validated_data.pop('links', [])
        if field_type == 'text':
            instance = models.ExtendedUserProfileTextField.objects.create(**validated_data)
        elif field_type == 'single_choice':
            instance = models.ExtendedUserProfileSingleChoiceField.objects.create(**validated_data, other=other)
            # add choices
            self._create_related(instance.choices, choices)
        elif field_type == 'multi_choice':
            instance = models.ExtendedUserProfileMultiChoiceField.objects.create(**validated_data, other=other)
            # add choices
            self._create_related(instance.choices, choices)
        elif field_type == 'user_agreement':
            instance = models.ExtendedUserProfileAgreementField.objects.create(
                **validated_data, checkbox_label=checkbox_label)
        else:
            raise Exception(f"Unrecognized field type: {field_type}")
        # create links
        self._create_related(instance.links, links)
        return instance

    @atomic
    def update(self, instance, validated_data):
        """Update the common attributes, type-specific data and links.

        ``name``, ``help_text``, ``order`` and ``required`` keep their current
        value when absent from ``validated_data``. The field type of an
        existing instance is never changed.

        Returns the updated ``instance``.
        """
        instance.name = validated_data.get('name', instance.name)
        instance.help_text = validated_data.get('help_text', instance.help_text)
        instance.order = validated_data.get('order', instance.order)
        instance.required = validated_data.get('required', instance.required)
        if instance.field_type == 'single_choice':
            instance.single_choice.other = validated_data.get('other', instance.single_choice.other)
            self._sync_choices(
                instance.single_choice,
                models.ExtendedUserProfileSingleChoiceFieldChoice,
                "single_choice_field",
                validated_data.pop('choices', None),
            )
            instance.single_choice.save()
        elif instance.field_type == 'multi_choice':
            instance.multi_choice.other = validated_data.get('other', instance.multi_choice.other)
            self._sync_choices(
                instance.multi_choice,
                models.ExtendedUserProfileMultiChoiceFieldChoice,
                "multi_choice_field",
                validated_data.pop('choices', None),
            )
            instance.multi_choice.save()
        elif instance.field_type == 'user_agreement':
            instance.user_agreement.checkbox_label = validated_data.pop(
                'checkbox_label', instance.user_agreement.checkbox_label)
            instance.user_agreement.save()

        # update links
        # NOTE(review): unlike choices, an omitted 'links' key is treated as
        # an empty list, so every existing link is deleted - confirm intended.
        links = validated_data.pop('links', [])
        link_ids = [link['id'] for link in links if 'id' in link]
        instance.links.exclude(id__in=link_ids).delete()
        for link in links:
            link_id = link.pop('id', None)
            link['field'] = instance
            models.ExtendedUserProfileFieldLink.objects.update_or_create(
                id=link_id,
                defaults=link,
            )

        instance.save()
        return instance

    def get_userHasWriteAccess(self, extendedUserProfileField):
        """Return ``request.is_gateway_admin`` for the current request."""
        request = self.context['request']
        return request.is_gateway_admin

    @staticmethod
    def _create_related(manager, items):
        """Create one related row per dict in ``items``, dropping any ``id``."""
        for item in items:
            item.pop('id', None)
            manager.create(**item)

    @staticmethod
    def _sync_choices(choice_field, choice_model, fk_name, choices):
        """Soft-delete choices missing from ``choices`` and upsert the rest.

        ``choice_field`` is the concrete single/multi choice field,
        ``choice_model`` its choice model, and ``fk_name`` the name of the
        choice model's foreign key back to ``choice_field``.
        """
        # NOTE(review): an empty list is handled like an omitted key, so the
        # last remaining choice cannot be removed here - confirm intended.
        if not choices:
            return
        choice_ids = [choice['id'] for choice in choices if 'id' in choice]
        # Soft delete any choices that are not in the list
        choice_field.choices.exclude(id__in=choice_ids).update(deleted=True)
        for choice in choices:
            choice_id = choice.pop('id', None)
            choice_model.objects.update_or_create(
                id=choice_id,
                defaults={**choice, fk_name: choice_field},
            )
class ExtendedUserProfileValueSerializer(serializers.ModelSerializer):
    """Serialize a user's value for an extended user profile field.

    Which inputs are used depends on the type of the referenced field:

    * ``text`` - ``text_value``
    * ``single_choice`` - at most one id in ``choices``, or ``other_value``
    * ``multi_choice`` - any number of ids in ``choices``, plus ``other_value``
    * ``user_agreement`` - ``agreement_value``

    New values are always created for the requesting user's profile, and the
    create-or-update shortcut in ``create`` only touches values owned by that
    profile. The serializer expects ``context['request']`` to be set.
    """

    id = serializers.IntegerField(label='ID', required=False)
    text_value = serializers.CharField(required=False, allow_blank=True)
    # choices must be write_only so that DRF does not try to serialize this
    # related field itself; serialization is handled explicitly in
    # to_representation, see below
    choices = serializers.ListField(child=serializers.IntegerField(), required=False, write_only=True)
    other_value = serializers.CharField(required=False, allow_blank=True)
    agreement_value = serializers.BooleanField(required=False)

    class Meta:
        model = models.ExtendedUserProfileValue
        fields = ['id', 'value_type', 'ext_user_profile_field', 'text_value',
                  'choices', 'other_value', 'agreement_value', 'valid', 'value_display']
        read_only_fields = ['value_type', 'value_display']

    def to_representation(self, instance):
        """Add the value attributes that apply to ``instance.value_type``."""
        result = super().to_representation(instance)
        if instance.value_type == 'text':
            result['text_value'] = instance.text.text_value
        elif instance.value_type == 'single_choice':
            # Always a list, so single and multi choice share one shape.
            choices = []
            if instance.single_choice.choice is not None:
                choices.append(instance.single_choice.choice)
            result['choices'] = choices
            result['other_value'] = instance.single_choice.other_value
        elif instance.value_type == 'multi_choice':
            result['choices'] = [c.value for c in instance.multi_choice.choices.all()]
            result['other_value'] = instance.multi_choice.other_value
        elif instance.value_type == 'user_agreement':
            result['agreement_value'] = instance.user_agreement.agreement_value
        return result

    @atomic
    def create(self, validated_data):
        """Create a value for the requesting user, or update one by ``id``.

        Raises:
            models.ExtendedUserProfileValue.DoesNotExist: if ``id`` is given
                but no value with that id belongs to the requesting user.
            Exception: if the referenced field has an unrecognized type.
        """
        request = self.context['request']
        user = request.user
        user_profile = user.user_profile

        # Support create/update in the many=True situation. When many=True and
        # .save() is called, .create() will be called on each value. Here we
        # need to see if there is an id and if so call .update() instead.
        if "id" in validated_data:
            # The id comes from the client, so restrict the lookup to values
            # owned by the requester; an id belonging to someone else is
            # treated exactly like an unknown id.
            instance = models.ExtendedUserProfileValue.objects.get(
                id=validated_data["id"], user_profile=user_profile)
            return self.update(instance, validated_data)

        ext_user_profile_field = validated_data.pop('ext_user_profile_field')
        if ext_user_profile_field.field_type == 'text':
            # text_value is optional in the payload; treat omitted as blank
            text_value = validated_data.pop('text_value', '')
            return models.ExtendedUserProfileTextValue.objects.create(
                ext_user_profile_field=ext_user_profile_field,
                user_profile=user_profile,
                text_value=text_value)
        elif ext_user_profile_field.field_type == 'single_choice':
            choices = validated_data.pop('choices', [])
            choice = choices[0] if choices else None
            other_value = validated_data.pop('other_value', '')
            return models.ExtendedUserProfileSingleChoiceValue.objects.create(
                ext_user_profile_field=ext_user_profile_field,
                user_profile=user_profile,
                choice=choice,
                other_value=other_value,
            )
        elif ext_user_profile_field.field_type == 'multi_choice':
            choices = validated_data.pop('choices', [])
            other_value = validated_data.pop('other_value', '')
            value = models.ExtendedUserProfileMultiChoiceValue.objects.create(
                ext_user_profile_field=ext_user_profile_field,
                user_profile=user_profile,
                other_value=other_value,
            )
            for choice in choices:
                models.ExtendedUserProfileMultiChoiceValueChoice.objects.create(
                    value=choice,
                    multi_choice_value=value
                )
            return value
        elif ext_user_profile_field.field_type == 'user_agreement':
            agreement_value = validated_data.get('agreement_value')
            return models.ExtendedUserProfileAgreementValue.objects.create(
                ext_user_profile_field=ext_user_profile_field,
                user_profile=user_profile,
                agreement_value=agreement_value
            )
        # Fail loudly rather than returning None for an unknown type.
        raise Exception(f"Unrecognized field type: {ext_user_profile_field.field_type}")

    @atomic
    def update(self, instance, validated_data):
        """Update ``instance`` according to its ``value_type``.

        An omitted ``text_value`` or ``agreement_value`` keeps the stored
        value. For choice values, omitted ``choices`` / ``other_value`` are
        treated as empty, i.e. the payload replaces the stored selection.

        Returns the updated ``instance``.
        """
        if instance.value_type == 'text':
            text_value = validated_data.pop('text_value', instance.text.text_value)
            instance.text.text_value = text_value
            instance.text.save()
        elif instance.value_type == 'single_choice':
            choices = validated_data.pop('choices', [])
            choice = choices[0] if choices else None
            other_value = validated_data.pop('other_value', '')
            instance.single_choice.choice = choice
            instance.single_choice.other_value = other_value
            instance.single_choice.save()
        elif instance.value_type == 'multi_choice':
            choices = validated_data.pop('choices', [])
            other_value = validated_data.pop('other_value', '')
            # Delete any that are no longer in the set
            instance.multi_choice.choices.exclude(value__in=choices).delete()
            # Create records as needed for new entries
            for choice in choices:
                models.ExtendedUserProfileMultiChoiceValueChoice.objects.update_or_create(
                    value=choice, multi_choice_value=instance.multi_choice)
            instance.multi_choice.other_value = other_value
            instance.multi_choice.save()
        elif instance.value_type == 'user_agreement':
            agreement_value = validated_data.pop(
                'agreement_value', instance.user_agreement.agreement_value)
            instance.user_agreement.agreement_value = agreement_value
            instance.user_agreement.save()
        instance.save()
        return instance

    def validate(self, attrs):
        """Validate ``choices`` / ``other_value`` against the referenced field.

        Every submitted choice id must be a non-deleted choice of the field.
        A single choice value must supply exactly one of a choice or a
        non-empty ``other_value``.

        Raises:
            serializers.ValidationError: when one of the rules above fails.
        """
        ext_user_profile_field = attrs['ext_user_profile_field']
        if ext_user_profile_field.field_type == 'single_choice':
            choices = attrs.get('choices', [])
            other_value = attrs.get('other_value', '')
            # Check that choices are valid
            for choice in choices:
                if not ext_user_profile_field.single_choice.choices.filter(id=choice, deleted=False).exists():
                    raise serializers.ValidationError({'choices': 'Invalid choice.'})
            if len(choices) > 1:
                raise serializers.ValidationError({'choices': "Must specify only a single choice."})
            if len(choices) == 1 and other_value != '':
                raise serializers.ValidationError("Must specify only a single choice or the other choice, but not both.")
            if len(choices) == 0 and other_value == '':
                raise serializers.ValidationError("Must specify one of a single choice or the other choice (but not both).")
        elif ext_user_profile_field.field_type == 'multi_choice':
            choices = attrs.get('choices', [])
            # Check that choices are valid
            for choice in choices:
                if not ext_user_profile_field.multi_choice.choices.filter(id=choice, deleted=False).exists():
                    raise serializers.ValidationError({'choices': 'Invalid choice.'})
        return attrs