Source code for flask_restplus.inputs

# -*- coding: utf-8 -*-
'''
This module provide some helpers for advanced types parsing.

You can define you own parser using the same pattern:

.. code-block:: python

    def my_type(value):
        if not condition:
            raise ValueError('This is not my type')
        return parse(value)

    # Swagger documntation
    my_type.__schema__ = {'type': 'string', 'format': 'my-custom-format'}

The last line allows you to document properly the type in the Swagger documentation.
'''
from __future__ import unicode_literals

import re
import socket

from datetime import datetime, time, timedelta
from email.utils import parsedate_tz, mktime_tz
from six.moves.urllib.parse import urlparse

import aniso8601
import pytz

# Constants for upgrading date-based intervals to full datetimes.
START_OF_DAY = time(0, 0, 0, tzinfo=pytz.UTC)
END_OF_DAY = time(23, 59, 59, 999999, tzinfo=pytz.UTC)


netloc_regex = re.compile(
    r'(?:(?P<auth>[^:@]+?(?::[^:@]*?)?)@)?'  # basic auth
    r'(?:'
    r'(?P<localhost>localhost)|'  # localhost...
    r'(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|'  # ...or ipv4
    r'(?:\[?(?P<ipv6>[A-F0-9]*:[A-F0-9:]+)\]?)|'  # ...or ipv6
    r'(?P<domain>(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?))'  # domain...
    r')'
    r'(?::(?P<port>\d+))?'  # optional port
    r'$', re.IGNORECASE)


email_regex = re.compile(
    r'^'
    '(?P<local>[^@]*[^@.])'
    r'@'
    r'(?P<server>[^@]+(?:\.[^@]+)*)'
    r'$', re.IGNORECASE)

time_regex = re.compile(r'\d{2}:\d{2}')


[docs]def ipv4(value): '''Validate an IPv4 address''' try: socket.inet_aton(value) if value.count('.') == 3: return value except socket.error: pass raise ValueError('{0} is not a valid ipv4 address'.format(value))
ipv4.__schema__ = {'type': 'string', 'format': 'ipv4'}
[docs]def ipv6(value): '''Validate an IPv6 address''' try: socket.inet_pton(socket.AF_INET6, value) return value except socket.error: raise ValueError('{0} is not a valid ipv4 address'.format(value))
ipv6.__schema__ = {'type': 'string', 'format': 'ipv6'}
[docs]def ip(value): '''Validate an IP address (both IPv4 and IPv6)''' try: return ipv4(value) except ValueError: pass try: return ipv6(value) except ValueError: raise ValueError('{0} is not a valid ip'.format(value))
ip.__schema__ = {'type': 'string', 'format': 'ip'}
[docs]class URL(object): ''' Validate an URL. Example:: parser = reqparse.RequestParser() parser.add_argument('url', type=inputs.URL(schemes=['http', 'https'])) Input to the ``URL`` argument will be rejected if it does not match an URL with specified constraints. If ``check`` is True it will also be rejected if the domain does not exists. :param bool check: Check the domain exists (perform a DNS resolution) :param bool ip: Allow IP (both ipv4/ipv6) as domain :param bool local: Allow localhost (both string or ip) as domain :param bool port: Allow a port to be present :param bool auth: Allow authentication to be present :param list|tuple schemes: Restrict valid schemes to this list :param list|tuple domains: Restrict valid domains to this list :param list|tuple exclude: Exclude some domains ''' def __init__(self, check=False, ip=False, local=False, port=False, auth=False, schemes=None, domains=None, exclude=None): self.check = check self.ip = ip self.local = local self.port = port self.auth = auth self.schemes = schemes self.domains = domains self.exclude = exclude def error(self, value, details=None): msg = '{0} is not a valid URL' if details: msg = '. '.join((msg, details)) raise ValueError(msg.format(value)) def __call__(self, value): parsed = urlparse(value) netloc_match = netloc_regex.match(parsed.netloc) if not all((parsed.scheme, parsed.netloc)): if netloc_regex.match(parsed.netloc or parsed.path.split('/', 1)[0].split('?', 1)[0]): self.error(value, 'Did you mean: http://{0}') self.error(value) if parsed.scheme and self.schemes and parsed.scheme not in self.schemes: self.error(value, 'Protocol is not allowed') if not netloc_match: self.error(value) data = netloc_match.groupdict() if data['ipv4'] or data['ipv6']: if not self.ip: self.error(value, 'IP is not allowed') else: try: ip(data['ipv4'] or data['ipv6']) except ValueError as e: self.error(value, str(e)) if not self.local: if data['ipv4'] and data['ipv4'].startswith('127.'): self.error(value, 'Localhost is not allowed') elif data['ipv6'] == '::1': self.error(value, 'Localhost is not allowed') if self.check: pass if data['auth'] and not self.auth: self.error(value, 'Authentication is not allowed') if data['localhost'] and not self.local: self.error(value, 'Localhost is not allowed') if data['port']: if not self.port: self.error(value, 'Custom port is not allowed') else: port = int(data['port']) if not 0 < port < 65535: self.error(value, 'Port is out of range') if data['domain']: if self.domains and data['domain'] not in self.domains: self.error(value, 'Domain is not allowed') elif self.exclude and data['domain'] in self.exclude: self.error(value, 'Domain is not allowed') if self.check: try: socket.getaddrinfo(data['domain'], None) except socket.error: self.error(value, 'Domain does not exists') return value @property def __schema__(self): return { 'type': 'string', 'format': 'url', }
#: Validate an URL #: #: Legacy validator, allows, auth, port, ip and local #: Only allows schemes 'http', 'https', 'ftp' and 'ftps' url = URL(ip=True, auth=True, port=True, local=True, schemes=('http', 'https', 'ftp', 'ftps'))
[docs]class email(object): ''' Validate an email. Example:: parser = reqparse.RequestParser() parser.add_argument('email', type=inputs.email(dns=True)) Input to the ``email`` argument will be rejected if it does not match an email and if domain does not exists. :param bool check: Check the domain exists (perform a DNS resolution) :param bool ip: Allow IP (both ipv4/ipv6) as domain :param bool local: Allow localhost (both string or ip) as domain :param list|tuple domains: Restrict valid domains to this list :param list|tuple exclude: Exclude some domains ''' def __init__(self, check=False, ip=False, local=False, domains=None, exclude=None): self.check = check self.ip = ip self.local = local self.domains = domains self.exclude = exclude def error(self, value, msg=None): msg = msg or '{0} is not a valid email' raise ValueError(msg.format(value)) def is_ip(self, value): try: ip(value) return True except ValueError: return False def __call__(self, value): match = email_regex.match(value) if not match or '..' in value: self.error(value) server = match.group('server') if self.check: try: socket.getaddrinfo(server, None) except socket.error: self.error(value) if self.domains and server not in self.domains: self.error(value, '{0} does not belong to the authorized domains') if self.exclude and server in self.exclude: self.error(value, '{0} belongs to a forbidden domain') if not self.local and (server in ('localhost', '::1') or server.startswith('127.')): self.error(value) if self.is_ip(server) and not self.ip: self.error(value) return value @property def __schema__(self): return { 'type': 'string', 'format': 'email', }
[docs]class regex(object): ''' Validate a string based on a regular expression. Example:: parser = reqparse.RequestParser() parser.add_argument('example', type=inputs.regex('^[0-9]+$')) Input to the ``example`` argument will be rejected if it contains anything but numbers. :param str pattern: The regular expression the input must match ''' def __init__(self, pattern): self.pattern = pattern self.re = re.compile(pattern) def __call__(self, value): if not self.re.search(value): message = 'Value does not match pattern: "{0}"'.format(self.pattern) raise ValueError(message) return value def __deepcopy__(self, memo): return regex(self.pattern) @property def __schema__(self): return { 'type': 'string', 'pattern': self.pattern, }
def _normalize_interval(start, end, value): ''' Normalize datetime intervals. Given a pair of datetime.date or datetime.datetime objects, returns a 2-tuple of tz-aware UTC datetimes spanning the same interval. For datetime.date objects, the returned interval starts at 00:00:00.0 on the first date and ends at 00:00:00.0 on the second. Naive datetimes are upgraded to UTC. Timezone-aware datetimes are normalized to the UTC tzdata. Params: - start: A date or datetime - end: A date or datetime ''' if not isinstance(start, datetime): start = datetime.combine(start, START_OF_DAY) end = datetime.combine(end, START_OF_DAY) if start.tzinfo is None: start = pytz.UTC.localize(start) end = pytz.UTC.localize(end) else: start = start.astimezone(pytz.UTC) end = end.astimezone(pytz.UTC) return start, end def _expand_datetime(start, value): if not isinstance(start, datetime): # Expand a single date object to be the interval spanning # that entire day. end = start + timedelta(days=1) else: # Expand a datetime based on the finest resolution provided # in the original input string. time = value.split('T')[1] time_without_offset = re.sub('[+-].+', '', time) num_separators = time_without_offset.count(':') if num_separators == 0: # Hour resolution end = start + timedelta(hours=1) elif num_separators == 1: # Minute resolution: end = start + timedelta(minutes=1) else: # Second resolution end = start + timedelta(seconds=1) return end def _parse_interval(value): ''' Do some nasty try/except voodoo to get some sort of datetime object(s) out of the string. ''' try: return sorted(aniso8601.parse_interval(value)) except ValueError: try: return aniso8601.parse_datetime(value), None except ValueError: return aniso8601.parse_date(value), None
[docs]def iso8601interval(value, argument='argument'): ''' Parses ISO 8601-formatted datetime intervals into tuples of datetimes. Accepts both a single date(time) or a full interval using either start/end or start/duration notation, with the following behavior: - Intervals are defined as inclusive start, exclusive end - Single datetimes are translated into the interval spanning the largest resolution not specified in the input value, up to the day. - The smallest accepted resolution is 1 second. - All timezones are accepted as values; returned datetimes are localized to UTC. Naive inputs and date inputs will are assumed UTC. Examples:: "2013-01-01" -> datetime(2013, 1, 1), datetime(2013, 1, 2) "2013-01-01T12" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 13) "2013-01-01/2013-02-28" -> datetime(2013, 1, 1), datetime(2013, 2, 28) "2013-01-01/P3D" -> datetime(2013, 1, 1), datetime(2013, 1, 4) "2013-01-01T12:00/PT30M" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 12, 30) "2013-01-01T06:00/2013-01-01T12:00" -> datetime(2013, 1, 1, 6), datetime(2013, 1, 1, 12) :param str value: The ISO8601 date time as a string :return: Two UTC datetimes, the start and the end of the specified interval :rtype: A tuple (datetime, datetime) :raises ValueError: if the interval is invalid. ''' if not value: raise ValueError('Expected a valid ISO8601 date/time interval.') try: start, end = _parse_interval(value) if end is None: end = _expand_datetime(start, value) start, end = _normalize_interval(start, end, value) except ValueError: msg = 'Invalid {arg}: {value}. {arg} must be a valid ISO8601 date/time interval.' raise ValueError(msg.format(arg=argument, value=value)) return start, end
iso8601interval.__schema__ = {'type': 'string', 'format': 'iso8601-interval'}
[docs]def date(value): '''Parse a valid looking date in the format YYYY-mm-dd''' date = datetime.strptime(value, "%Y-%m-%d") return date
date.__schema__ = {'type': 'string', 'format': 'date'} def _get_integer(value): try: return int(value) except (TypeError, ValueError): raise ValueError('{0} is not a valid integer'.format(value))
[docs]def natural(value, argument='argument'): '''Restrict input type to the natural numbers (0, 1, 2, 3...)''' value = _get_integer(value) if value < 0: msg = 'Invalid {arg}: {value}. {arg} must be a non-negative integer' raise ValueError(msg.format(arg=argument, value=value)) return value
natural.__schema__ = {'type': 'integer', 'minimum': 0}
[docs]def positive(value, argument='argument'): '''Restrict input type to the positive integers (1, 2, 3...)''' value = _get_integer(value) if value < 1: msg = 'Invalid {arg}: {value}. {arg} must be a positive integer' raise ValueError(msg.format(arg=argument, value=value)) return value
positive.__schema__ = {'type': 'integer', 'minimum': 0, 'exclusiveMinimum': True}
[docs]class int_range(object): '''Restrict input to an integer in a range (inclusive)''' def __init__(self, low, high, argument='argument'): self.low = low self.high = high self.argument = argument def __call__(self, value): value = _get_integer(value) if value < self.low or value > self.high: msg = 'Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}' raise ValueError(msg.format(arg=self.argument, val=value, lo=self.low, hi=self.high)) return value @property def __schema__(self): return { 'type': 'integer', 'minimum': self.low, 'maximum': self.high, }
[docs]def boolean(value): ''' Parse the string ``"true"`` or ``"false"`` as a boolean (case insensitive). Also accepts ``"1"`` and ``"0"`` as ``True``/``False`` (respectively). If the input is from the request JSON body, the type is already a native python boolean, and will be passed through without further parsing. :raises ValueError: if the boolean value is invalid ''' if isinstance(value, bool): return value if value is None: raise ValueError('boolean type must be non-null') elif not value: return False value = str(value).lower() if value in ('true', '1', 'on',): return True if value in ('false', '0',): return False raise ValueError('Invalid literal for boolean(): {0}'.format(value))
boolean.__schema__ = {'type': 'boolean'}
[docs]def datetime_from_rfc822(value): ''' Turns an RFC822 formatted date into a datetime object. Example:: inputs.datetime_from_rfc822('Wed, 02 Oct 2002 08:00:00 EST') :param str value: The RFC822-complying string to transform :return: The parsed datetime :rtype: datetime :raises ValueError: if value is an invalid date literal ''' raw = value if not time_regex.search(value): value = ' '.join((value, '00:00:00')) try: timetuple = parsedate_tz(value) timestamp = mktime_tz(timetuple) if timetuple[-1] is None: return datetime.fromtimestamp(timestamp).replace(tzinfo=pytz.utc) else: return datetime.fromtimestamp(timestamp, pytz.utc) except Exception: raise ValueError('Invalid date literal "{0}"'.format(raw))
[docs]def datetime_from_iso8601(value): ''' Turns an ISO8601 formatted date into a datetime object. Example:: inputs.datetime_from_iso8601("2012-01-01T23:30:00+02:00") :param str value: The ISO8601-complying string to transform :return: A datetime :rtype: datetime :raises ValueError: if value is an invalid date literal ''' try: try: return aniso8601.parse_datetime(value) except ValueError: date = aniso8601.parse_date(value) return datetime(date.year, date.month, date.day) except Exception: raise ValueError('Invalid date literal "{0}"'.format(value))
datetime_from_iso8601.__schema__ = {'type': 'string', 'format': 'date-time'}
[docs]def date_from_iso8601(value): ''' Turns an ISO8601 formatted date into a date object. Example:: inputs.date_from_iso8601("2012-01-01") :param str value: The ISO8601-complying string to transform :return: A date :rtype: date :raises ValueError: if value is an invalid date literal ''' return datetime_from_iso8601(value).date()
date_from_iso8601.__schema__ = {'type': 'string', 'format': 'date'}