"""
This module provide model for networks. There are 2 models
- Network: which represent a IPv6 or IPv4 network
- Address: which represent a IPv6 or IPv5
As we use django models.Model, pylint fail to find objects method. We must disable pylint
test E1101 (no-member)
"""
# We need to remove C0103 form pylint as ip is not reconnized as a valid snake cas naming.
# pylint: disable=E1101, C0103
import ipaddress
from django.db import models
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from django.db.utils import IntegrityError
from slam_core.utils import error_message, name_validator
from slam_domain.models import DomainEntry, Domain
from slam_network.exceptions import NetworkFull
[docs]class Network(models.Model):
"""
Network class represent a IPv4 or IPv6 network
- name: The human reading name of the network
- description: A short description of the network
- address: network address (192.168.0.0)
- prefix: network prefix (/24)
- gateway: the IP of the network gateway
- dns_master: The IP of DNS master for reverse resolution (used to push data in production)
- contact: a contact email for the network
- dhcp: the IP of DHCP server (used to push data in production)
- freeradius: the IP of freeradius server (used to push data in production)
- vlan: the VLAN id of the network
"""
name = models.CharField(max_length=50, unique=True, validators=[name_validator])
ip = models.GenericIPAddressField(unique=True)
prefix = models.IntegerField(default=24)
description = models.CharField(max_length=150, default='')
gateway = models.GenericIPAddressField(blank=True, null=True)
dns_master = models.GenericIPAddressField(blank=True, null=True)
dhcp = models.GenericIPAddressField(blank=True, null=True)
radius = models.GenericIPAddressField(blank=True, null=True)
vlan = models.IntegerField(default=1)
contact = models.EmailField(blank=True, null=True)
[docs] def show(self, key=False, short=False):
"""
This method return a dict construction of the object. We have 3 types of output,
- standard: all information about object it-self, short information about associated
objects (like ForeignKey and ManyToManyField)
- short: some basic information about object it-self, primary key of associated objects
- key: primary key of the object
:param short: if set to True, method return a short output
:param key: if set to True, method return a key output. It will overwrite short param
:return:
"""
if key:
result = {
'name': self.name
}
elif short:
addresses_used = len(self.addresses())
addresses_total = ipaddress.ip_network('{}/{}'.format(self.ip,
self.prefix)).num_addresses
result = {
'name': self.name,
'address': self.ip,
'prefix': self.prefix,
'version': ipaddress.ip_address(self.ip).version,
'description': self.description,
'used_addresses': addresses_used,
'total': addresses_total
}
else:
result_addresses = []
for address in self.addresses():
result_addresses.append(address.show(short=True))
addresses_used = len(self.addresses())
addresses_total = ipaddress.ip_network('{}/{}'.format(self.ip,
self.prefix)).num_addresses
result = {
'name': self.name,
'address': self.ip,
'prefix': self.prefix,
'version': ipaddress.ip_address(self.ip).version,
'description': self.description,
'gateway': self.gateway,
'dns_master': self.dns_master,
'dhcp': self.dhcp,
'radius': self.radius,
'vlan': self.vlan,
'contact': self.contact,
'used_addresses': addresses_used,
'total': addresses_total,
'addresses': result_addresses
}
return result
[docs] def is_include(self, ip):
"""
This method check if ip is included on a network
:param ip: IP address
:return:
"""
address = ipaddress.ip_address(ip)
network = ipaddress.ip_network('{}/{}'.format(self.ip, self.prefix))
if address.version == network.version and address in network:
return True
return False
[docs] def addresses(self):
"""
This method return all addresses which are in the current network.
:return:
"""
result = self.address_set.all()
return result
[docs] def get_free_ip(self):
"""
:return:
"""
network = ipaddress.ip_network('{}/{}'.format(self.ip, self.prefix))
addresses = []
for address in self.addresses():
addresses.append(ipaddress.ip_address(address.ip))
for result_address in network.hosts():
if result_address not in addresses:
return result_address
raise NetworkFull()
[docs] def version(self):
"""
:return:
"""
return ipaddress.ip_network('{}/{}'.format(self.ip, self.prefix)).version
[docs] @staticmethod
def create(name, address, prefix, description='A short description', gateway=None,
dns_master=None, dhcp=None, radius=None, vlan=1, contact=None):
# pylint: disable=R0913
"""
This is a custom way to create a network
:param name: human reading name of the network
:param address: IPv4 or IPv6 network address
:param prefix: network prefix
:param description: A short description of the network
:param gateway: IP of the network gateway
:param dns_master: IP of DNS master
:param dhcp: IP of DHCP server
:param vlan: VLAN id of the network
:param contact: a contact email for the network
:return:
"""
try:
network = Network(name=name, ip=address, prefix=prefix, description=description,
gateway=gateway, dns_master=dns_master, dhcp=dhcp, vlan=vlan,
contact=contact, radius=radius)
network.full_clean()
except (IntegrityError, ValidationError) as err: # In case network already exist
return error_message('network', name, err)
network.full_clean()
network.save()
return {
'network': network.name,
'status': 'done'
}
[docs] @staticmethod
def update(name, description=None, gateway=None, dns_master=None, dhcp=None, vlan=None,
contact=None, radius=None):
# pylint: disable=R0913
"""
This is a custom method to update value on a existing network
:param name: human reading name of the network
:param description: A short description of the network
:param gateway: The IP of the gateway
:param dns_master: The IP of DNS master
:param dhcp: The IP of DHCP server
:param vlan: The VLAN id
:param contact: a contact email for the network
:return:
"""
try:
network = Network.objects.get(name=name)
except ObjectDoesNotExist as err:
return error_message('network', name, err)
if description is not None:
network.description = description
if gateway is not None:
network.gateway = gateway
if dns_master is not None:
network.dns_master = dns_master
if dhcp is not None:
network.dhcp = dhcp
if radius is not None:
network.radius = radius
if vlan is not None:
network.vlan = vlan
if contact is not None:
network.contact = contact
try:
network.full_clean()
except ValidationError as err:
return error_message('network', name, err)
network.save()
return {
'network': name,
'status': 'done'
}
[docs] @staticmethod
def remove(name):
"""
This is a custom method to delete a network. As delete is already used by models.Model,
we should call it with another name
:param name: name of network we want delete
:return:
"""
try:
network = Network.objects.get(name=name)
network.delete()
except (ObjectDoesNotExist, IntegrityError) as err:
return error_message('network', name, err)
return {
'network': name,
'status': 'done'
}
[docs] @staticmethod
def get(name):
"""
This is a custom method to get all information for a network
:param name: name of the network
:return:
"""
try:
network = Network.objects.get(name=name)
except ObjectDoesNotExist as err:
return error_message('network', name, err)
result = network.show()
return result
[docs] @staticmethod
def search(filters=None):
"""
This is a custom method to get all networks that match the filters
:param filters: a dict of field / regex
:return:
"""
if filters is None:
networks = Network.objects.all()
else:
networks = Network.objects.filter(**filters)
result = []
for network in networks:
result.append(network.show(short=True))
return result
[docs]class Address(models.Model):
"""
Address class represent a specific address on a network.
- ip: IPv4 or IPv6 address
- ns_entries: all other NS entries for this IP (CNAME, A, ...)
"""
ip = models.GenericIPAddressField(unique=True)
ns_entries = models.ManyToManyField(DomainEntry)
creation_date = models.DateTimeField(auto_now_add=True, null=True)
network = models.ForeignKey(Network, on_delete=models.PROTECT)
[docs] def show(self, key=False, short=True):
"""
:param key:
:param short:
:return:
"""
if key:
result = {
'ip': self.ip,
}
elif short:
result_entries = []
for entry in self.ns_entries.all():
result_entries.append(entry.show(key=True))
result = {
'ip': self.ip,
'ns_entries': result_entries,
'creation_date': self.creation_date,
'network': self.network.show(key=True)
}
else:
result_entries = []
for entry in self.ns_entries.all():
result_entries.append(entry.show(short=True))
result = {
'ip': self.ip,
'ns_entries': result_entries,
'creation_date': self.creation_date,
# 'network': self.network.show(short=True)
}
return result
[docs] def version(self):
"""
:return:
"""
return ipaddress.ip_address(self.ip).version
[docs] @staticmethod
def create(ip, network, ns_entry=None):
"""
This is a custom method to create a Address.
:param ip:
:param network:
:param ns_entry:
:return:
"""
try:
try:
network_address = Network.objects.get(name=network)
except ObjectDoesNotExist as err:
return error_message('address', ip, err)
if network_address is not None and not network_address.is_include(ip):
return error_message('address', ip, 'Address {} not in Network {}/{}'.format(
ip, network_address.address, network_address.prefix))
address = Address(ip=ip, network=network_address)
address.full_clean()
except (IntegrityError, ValueError, ValidationError) as err:
return error_message('address', ip, err)
address.save()
if ns_entry is not None:
try:
domain = Domain.objects.get(name=ns_entry['domain'])
except ObjectDoesNotExist as err:
return error_message('address', ip, err)
try:
entry = DomainEntry.objects.get(name=ns_entry['name'], domain=domain, type='A')
except ObjectDoesNotExist as err:
# If NS entry not exist, we create it.
result = DomainEntry.create(name=ns_entry['name'], domain=ns_entry['domain'])
if result['status'] != 'done':
return error_message('address', ip, result['message'])
entry = DomainEntry.objects.get(name=ns_entry['name'], domain=domain, type='A')
try:
entry_ptr = DomainEntry.objects.get(name=ns_entry['name'], domain=domain,
type='PTR')
except ObjectDoesNotExist:
result = DomainEntry.create(name=ns_entry['name'], domain=ns_entry['domain'],
ns_type='PTR')
if result['status'] != 'done':
return result
entry_ptr = DomainEntry.objects.get(name=ns_entry['name'], domain=domain,
type='PTR')
address.ns_entries.add(entry)
address.ns_entries.add(entry_ptr)
return {
'address': address.ip,
'status': 'done'
}
[docs] @staticmethod
def include(ip, network, ns_entry, ns_type='A'):
"""
This is a custom method to add a entry in a address
:param ip: IP address
:param network: network
:param ns_entry: NS entry
:param ns_type: NS entry type
:return:
"""
fqdn = ns_entry.split('.', 1)
ns = fqdn[0]
domain = fqdn[1]
try:
network_entry = Network.objects.get(name=network)
if network_entry is not None and not network_entry.is_include(ip):
return error_message('entry', ns_entry, 'Address {} not in Network {}/{}'.format(
ip, network_entry.address, network_entry.prefix))
address_entry = Address.objects.get(ip=ip)
domain_entry = Domain.objects.get(name=domain)
ns_entry_obj = DomainEntry.objects.get(name=ns, domain=domain_entry, type=ns_type)
if ns_type == 'PTR' and len(ns_entry_obj.address_set.all()) != 0:
return error_message('entry', ip, 'PTR record is used')
except ObjectDoesNotExist as err:
return error_message('entry', ns_entry, err)
address_entry.ns_entries.add(ns_entry_obj)
return {
'entry': ns_entry,
'status': 'done'
}
[docs] @staticmethod
def exclude(ip, network, ns_entry, ns_type='A'):
"""
This is a custom method to remove a NS entry from address
:param ip: IP address
:param network: network
:param ns_entry: NS entry
:param ns_type: NS type
:return:
"""
fqdn = ns_entry.split('.', 1)
ns = fqdn[0]
domain_entry = fqdn[1]
try:
address_entry = Address.objects.get(ip=ip)
domain_entry = Domain.objects.get(name=domain_entry)
ns_entry_entry = DomainEntry.objects.get(name=ns, domain=domain_entry, type=ns_type)
except ObjectDoesNotExist as err:
return error_message('entry', ns_entry, err)
address_entry.ns_entries.remove(ns_entry_entry)
return {
'entry': ns_entry,
'status': 'done'
}
[docs] @staticmethod
def remove(ip, network, ns_entry=True):
"""
This is a custom method to delete Address
:param ip: The IP address we will delete
:param network: The network name
:param ns_entry: If true, we also remove PTR and A resolution name (default True)
:return:
"""
try:
try:
network_address = Network.objects.get(name=network)
except ObjectDoesNotExist:
network_address = None
if network_address is not None and not network_address.is_include(ip):
return error_message('address', ip, 'Address {} not in Network {}/{}'.format(
ip, network_address.address, network_address.prefix))
address = Address.objects.get(ip=ip)
try:
entry_ptr = address.ns_entries.get(type='PTR')
except ObjectDoesNotExist:
entry_ptr = None
try:
entry_a = address.ns_entries.get(type='A')
except ObjectDoesNotExist:
entry_a = None
address.delete()
except (ObjectDoesNotExist, IntegrityError) as err:
return error_message('address', ip, err)
if ns_entry:
if entry_ptr is not None:
try:
if len(entry_ptr.address_set.all()) == 0:
# We only delete PTR if no other address use it. (ie it s a orphan entry)
entry_ptr.delete()
except (IntegrityError, ObjectDoesNotExist) as err:
return error_message('address', ip, err)
if entry_a is not None:
try:
if len(entry_a.address_set.all()) == 0:
# We only delete A if no other address use it. (ie it s a orphan entry)
entry_a.delete()
except (IntegrityError, ObjectDoesNotExist) as err:
return error_message('address', ip, err)
return {
'address': ip,
'status': 'done'
}
[docs] @staticmethod
def get(ip, network):
"""
This is a custom method to get information about a address
:param ip: IP address
:param network: Network
:return:
"""
try:
network = Network.objects.get(name=network)
except ObjectDoesNotExist as err:
network = None
try:
address = Address.objects.get(ip=ip)
except ObjectDoesNotExist as err:
return error_message('address', ip, err)
result = address.show()
return result
[docs] @staticmethod
def search(filters=None):
"""
This is a custom method to get all networks that match the filters
:param filters: a dict of field / regex
:return:
"""
if filters is None:
addresses = Address.objects.all()
else:
addresses = Address.objects.filter(**filters)
result = []
for address in addresses:
result.append(address.show(short=True))
return result
[docs] @staticmethod
def match_network(ip):
"""
This method return the network associated with the address
:return:
"""
networks = Network.objects.all()
for network in networks:
if network.is_include(ip):
return network
return None