Source code for slam_domain.models

"""
This module provide model for domains. There are 2 models
  - Domain: which represent a DNS domain like example.com
  - DomainEntry: which represent a named entry like www.example.com
"""
# As we use meta class from django that not require any public method, we disable pylint
# for R0903 (too-few-public-methods)
# As we use django models.Model, pylint fail to find objects method. We must disable pylint
# test E1101 (no-member)
# pylint: disable=E1101,R0903
from django.db import models
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from django.db.utils import IntegrityError

from slam_core.utils import error_message, name_validator

DOMAIN_FIELD = [
    'description',
    'dns_master',
    'contact'
]


[docs]class Domain(models.Model): """ Domain class represent a fqdn domain like example.com - name: immutable name of the domain (example.com) - description: a short description of the domain - dns_master: IP of DNS master (used to push data in production) - contact: a contact email for the domain - creation_date: when domain has been created """ name = models.CharField(max_length=50, unique=True, validators=[name_validator]) description = models.CharField(max_length=120, blank=True, null=True) dns_master = models.GenericIPAddressField() contact = models.EmailField(blank=True, null=True) creation_date = models.DateTimeField(auto_now_add=True, null=True)
[docs] def show(self, key=False, short=False): """ This method return a dict construction of the object. We have 3 types of output, - standard: all information about object it-self, short information about associated objects (like ForeignKey and ManyToManyField) - short: some basic information about object it-self, primary key of associated objects - key: primary key of the object :param short: if set to True, method return a short output :param key: if set to True, method return a key output. It will overwrite short param :return: """ if key: result = { 'name': self.name } elif short: result = { 'name': self.name, 'description': self.description, 'entries_count': DomainEntry.objects.filter(domain=self).exclude(type='PTR').count() } else: result_entries = [] entries = DomainEntry.objects.filter(domain=self).exclude(type='PTR') for entry in entries: result_entries.append(entry.show(key=True)) result = { 'name': self.name, 'description': self.description, 'dns_master': self.dns_master, 'contact': self.contact, 'creation_date': self.creation_date, 'entries': result_entries } return result
[docs] @staticmethod def create(name, args=None): """ A custom way to create a domain. :param name: the DNS name :param args: some optional information about a domain :return: """ try: domain = Domain(name=name) if args is not None: for arg in args: if arg in DOMAIN_FIELD: setattr(domain, arg, args[arg]) domain.full_clean() except (IntegrityError, ValidationError) as err: return error_message('domain', name, err) domain.save() result = domain.show() result['status'] = 'done' return result
[docs] @staticmethod def update(name, args=None): """ A custom method to update a domain. :param name: the name of the domain :param args: field that must be updated :return: """ try: # First, we get the domain domain = Domain.objects.get(name=name) except ObjectDoesNotExist as err: return error_message('domain', name, err) if args is not None: for arg in args: if arg in DOMAIN_FIELD: setattr(domain, arg, args[arg]) try: domain.full_clean() except (ValidationError, IntegrityError) as err: error_message('domain', name, err) domain.save() result = domain.show() result['status'] = 'done' return result
[docs] @staticmethod def remove(name): """ This method is a custom way to delete a domain. As models.Model already have a method called delete(), we must use another name for our method. :param name: domain name :return: """ try: domain = Domain.objects.get(name=name) except ObjectDoesNotExist as err: return error_message('domain', name, err) domain.delete() return { 'domain': name, 'status': 'done' }
[docs] @staticmethod def get(name, short=False): """ A custom way to get a domain. :param name: the name of the domain :param short: Return a short version of the object :return: """ try: domain = Domain.objects.get(name=name) except ObjectDoesNotExist as err: return error_message('domain', name, err) result = domain.show(short=short) return result
[docs] @staticmethod def search(filters=None): """ This is a custom way to get all domains that match the filters :param filters: a dict of field / regex :return: """ if filters is None: domains = Domain.objects.all() else: # We suppose filters as been construct outside models class. domains = Domain.objects.filter(**filters) result = [] for domain in domains: result.append(domain.show(short=True)) return result
[docs]class DomainEntry(models.Model): """ Domain entry is a name in domain like www.example.com - name: the name of the entry - domain: the domain associated (fqdn is name.domain) - type: the DNS entry type (A, CNAME, NS, ...). AAAA entries are marked as A type - entries: In some cases (CNAME, NS, ...) entry refered to another entry - description: a short description of the entry - creation_date: when entry as been created """ name = models.CharField(max_length=50, validators=[name_validator]) domain = models.ForeignKey(Domain, on_delete=models.PROTECT) type = models.CharField(max_length=5, default='A') entries = models.ManyToManyField('self') description = models.CharField(max_length=150, blank=True, default='', null=True) creation_date = models.DateField(auto_now_add=True, null=True) class Meta: """ DomainEntry is unique for a specific domain (ie we only have one www.example.com but we can have www.example.org) """ unique_together = ('name', 'domain', 'type')
[docs] def show(self, key=False, short=False): """ :param key: :param short: :return: """ result_entries = [] result_addresses = [] if key: for sub_entry in self.entries.all(): result_entries.append({ 'name': '{}.{} ({})'.format(sub_entry.name, sub_entry.domain.name, sub_entry.type) }) for address in self.address_set.all(): result_addresses.append(address.show(key=True)) result = { 'name': self.name, 'domain': self.domain.show(key=True), 'type': self.type, 'entries': result_entries, 'addresses': result_addresses } elif short: for sub_entry in self.entries.all(): result_entries.append(sub_entry.show(key=True)) for address in self.address_set.all(): result_addresses.append(address.show(key=True)) result = { 'name': self.name, 'domain': self.domain.show(key=True), 'type': self.type, 'description': self.description, 'creation_date': self.creation_date, 'entries': result_entries, 'addresses': result_addresses } else: for sub_entry in self.entries.all(): result_entries.append(sub_entry.show(short=True)) for address in self.address_set.all(): result_addresses.append(address.show(key=True)) result = { 'name': self.name, 'domain': self.domain.show(short=True), 'type': self.type, 'description': self.description, 'creation_date': self.creation_date, 'entries': result_entries, 'addresses': result_addresses } return result
[docs] @staticmethod def create(name, domain, ns_type='A', sub_entry=None, description=None): """ This method is a custom way to create NS entry. :param name: the name of the entry :param domain: the domain :param ns_type: the NS type of the entry :param sub_entry: :param description: A short description of the entry :return: """ sub_entry_obj = None if ' ' in name: return error_message('domain', name, 'Space not allowed in name') try: # First, we get the domain entry_domain = Domain.objects.get(name=domain) except ObjectDoesNotExist as err: return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) if sub_entry is not None: try: sub_entry_domain = Domain.objects.get(name=sub_entry['domain']) sub_entry_obj = DomainEntry.objects.get(name=sub_entry['name'], domain=sub_entry_domain, type=sub_entry['type']) except ObjectDoesNotExist as err: return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) # We must check # - if it s a A record, no similar CNAME # - if it s a CNAME, no similar A record if ns_type == 'A': try: DomainEntry.objects.get(name=name, domain=entry_domain, type='CNAME') return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), 'A similar CNAME record exist !') except ObjectDoesNotExist: pass elif ns_type == 'CNAME': try: DomainEntry.objects.get(name=name, domain=entry_domain, type='A') return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), 'A similar A record exist !') except ObjectDoesNotExist: pass try: entry = DomainEntry(name=name, domain=entry_domain, type=ns_type, description=description) entry.full_clean() except (IntegrityError, ValidationError) as err: return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) entry.save() if sub_entry_obj is not None: entry.entries.add(sub_entry_obj) return { 'entry': '{}.{} {}'.format(name, domain, ns_type), 'status': 'done' }
[docs] @staticmethod def update(name, domain, ns_type='A', sub_entry=None, description=None): """ :param name: :param domain: :param ns_type: :param sub_entry: :param description: :return: """ try: entry = DomainEntry(name=name, domain=domain, type=ns_type) except ObjectDoesNotExist as err: error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) if description is not None: entry.description = description if sub_entry is not None: if sub_entry['name']: return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), 'Space not allowed in name') sub_entry_obj = DomainEntry(name=sub_entry['name'], domain=sub_entry['domain'], type=sub_entry['type']) sub_entry_obj.full_clean() sub_entry_obj.save() entry.entries.add(sub_entry_obj) return { 'entry': '{}.{} {}'.format(name, domain, ns_type), 'status': 'done' }
[docs] @staticmethod def exclude(name, domain, ns_type='A', sub_entry=None): """ :param name: :param domain: :param ns_type: :param sub_entry: :return: """ try: entry = DomainEntry(name=name, domain=domain, type=ns_type) except ObjectDoesNotExist as err: error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) if sub_entry is not None: sub_entry_obj = DomainEntry.get(name=sub_entry['name'], domain=sub_entry['domain'], type=sub_entry['type']) entry.entries.remove(sub_entry_obj) return { 'entry': '{}.{} {}'.format(name, domain, ns_type), 'status': 'done' }
[docs] @staticmethod def remove(name, domain, ns_type='A'): """ A custom way to delete a entry. As a entry is unique from name/domain/ns_type, we need to have all this information to delete the right entry. :param name: name of entry :param domain: domain of entry :param ns_type: NS type of entry :return: """ try: entry_domain = Domain.objects.get(name=domain) entry = DomainEntry.objects.get(name=name, domain=entry_domain, type=ns_type) except ObjectDoesNotExist as err: return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) entry.delete() return { 'entry': '{}.{} {}'.format(name, domain, ns_type), 'status': 'done' }
[docs] @staticmethod def get(name, domain, ns_type='A'): """ A custom way to get a entry :param name: name of the entry :param domain: domain of the entry :param ns_type: NS type of the entry :return: """ try: domain_entry = Domain.objects.get(name=domain) except ObjectDoesNotExist as err: return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) try: entry = DomainEntry.objects.get(name=name, domain=domain_entry, type=ns_type) except ObjectDoesNotExist as err: return error_message('entry', '{}.{} {}'.format(name, domain, ns_type), err) return entry.show()
[docs] @staticmethod def search(filters=None): """ This is a custom method to get all entries :param filters: the filter we will use :return: """ result = [] if filters is None: entries = DomainEntry.objects.all() else: entries = DomainEntry.objects.filter(**filters) for entry in entries: result.append(entry.show(short=True)) return result