Warning: Can't synchronize with repository "(default)" (Unsupported version control system "svn": No module named svn). Look in the Trac log for more information.

Ticket #1713: match-ipv6.patch

File match-ipv6.patch, 6.7 KB (added by chrisz, 4 years ago)

This patch makes the match_ip function IPv6-aware

  • turbogears/identity/conditions.py

     
    11import cherrypy 
    2 import socket 
    3 import struct 
    4 import threading 
    52import types 
    63import turbogears 
    74from turbogears.identity.exceptions import * 
    85from turbogears.identity import * 
    96from turbogears.decorator import weak_signature_decorator 
     7from turbogears.util import match_ip 
    108 
     9 
    1110class Predicate(object): 
    1211    ''' 
    1312    Generic base class for testing true or false for a condition. 
     
    177176        return "" 
    178177 
    179178 
    180 def _match_ip(cidr, ip): 
    181     if not '/' in cidr: 
    182         return cidr == ip 
    183  
    184     else: 
    185         try: 
    186             b,m = cidr.split('/') 
    187             shift = 32 - int(m) 
    188             a1 = struct.unpack('!L', socket.inet_aton(b))[0] >> shift 
    189             a2 = struct.unpack('!L', socket.inet_aton(ip))[0] >> shift 
    190             return a1 == a2 
    191  
    192         except: 
    193             return False 
    194  
    195  
    196179class from_host(Predicate, IdentityPredicateHelper): 
    197180    ''' 
    198181    Predicate for checking whether the visitor's host is an allowed host. 
     
    209192        Match the visitor's host against the criteria. 
    210193        ''' 
    211194        ip = _remoteHost() 
    212         if _match_ip(self.host, ip): 
     195        if match_ip(self.host, ip): 
    213196            return True 
    214197 
    215198        self.append_error_message(errors) 
  • turbogears/tests/test_util.py

     
    9090        == 'Chip & Chap' 
    9191    assert util.fixentities('<&quot;&lt;&copy;&gt;&quot;>') \ 
    9292        == '<&quot;&lt;&#169;&gt;&quot;>' 
     93 
     94 
     95def test_match_ip(): 
     96    m = util.match_ip 
     97    assert m('1.2.3.4', '1.2.3.4') 
     98    assert not m('1.2.3.4', '1.2.3.5') 
     99    assert not m('4.3.2.1', '5.3.2.1') 
     100    assert m('1.2.3.4', '::ffff:102:304') 
     101    assert not m('1.2.3.4', '::102:304') 
     102    assert m('127.0.0.1', '::ffff:127.0.0.1') 
     103    assert m('127.0.0.1', '::ffff:7f00:0001') 
     104    assert m('::ffff:127.0.0.1', '127.0.0.1') 
     105    assert m('192.168.42.76/32', '192.168.42.76') 
     106    assert m('192.168.42.77/32', '192.168.42.77') 
     107    assert not m('192.168.42.76/32', '192.168.42.77') 
     108    assert not m('192.168.42.77/32', '192.168.42.76') 
     109    assert m('192.168.42.76/31', '192.168.42.76') 
     110    assert m('192.168.42.76/31', '192.168.42.77') 
     111    assert m('224.0.0.0/3', '224.1.2.3') 
     112    assert m('224.0.0.0/3', '255.255.255.255') 
     113    assert not m('224.0.0.0/3', '192.0.0.0') 
     114    assert not m('192.168.42.76/31', '192.168.42.73') 
     115    assert m('1:2:3::', '1:02:0003:0:0::0') 
     116    assert m('1:2:3::/48', '1:2:3:4::') 
     117    assert not m('1:2:3::/48', '1:2:4:::') 
     118    assert m('1:2:3::8', '1:2:3:0:0:0:0:8') 
     119    assert m('2001:0db8:85a3:08d3:1319:8a2e:0370:7334', 
     120        '2001:db8:85a3:8d3:1319:8a2e:3.112.115.52') 
     121    assert m('2001:db8:85a3:8d3:1300::/72', 
     122        '2001:0db8:85a3:08d3:1319:8a2e:0370:7334') 
     123    assert m('2001:db8:85a3:8d3:1300::/72', 
     124        '2001:0db8:85a3:08d3:1399:8a2e:0370:7334') 
     125    assert not m('2001:db8:85a3:8d3:1300::/72', 
     126        '2001:0db8:85a3:08d3:1219:8a2e:0370:7334') 
  • turbogears/util.py

     
    66import logging 
    77import warnings 
    88import htmlentitydefs 
     9import socket 
     10import struct 
    911from inspect import getargspec, getargvalues 
    1012from itertools import izip, islice, chain, imap 
    1113from operator import isSequenceType 
     
    539541    return re.sub("&(\w+);?", repl, htmltext) 
    540542 
    541543 
     544if hasattr(socket, 'inet_pton') and hasattr(socket, 'AF_INET6'): 
     545 
     546    def inet6_aton(addr): 
     547        """Convert IP6 standard hex notation to IP6 address.""" 
     548        return socket.inet_pton(socket.AF_INET6, addr) 
     549 
     550else: # Windows etc. 
     551 
     552    import string 
     553    _inet6_chars = string.hexdigits + ':.' 
     554 
     555    def inet6_aton(addr): 
     556        """Convert IPv6 standard hex notation to IPv6 address. 
     557 
     558        Inspired by http://twistedmatrix.com/trac/. 
     559 
     560        """ 
     561        faulty = addr.lstrip(_inet6_chars) 
     562        if faulty: 
     563            raise ValueError("Illegal character '%c' in IPv6 address" % faulty[0]) 
     564        parts = addr.split(':') 
     565        elided = parts.count('') 
     566        extenso = '.' in parts[-1] and 7 or 8 
     567        if len(parts) > extenso or elided > 3: 
     568            raise ValueError("Syntactically invalid IPv6 address") 
     569        if elided == 3: 
     570            return '\x00' * 16 
     571        if elided: 
     572            zeros = ['0'] * (extenso - len(parts) + elided) 
     573            if addr.startswith('::'): 
     574                parts[:2] = zeros 
     575            elif addr.endswith('::'): 
     576                parts[-2:] = zeros 
     577            else: 
     578                idx = parts.index('') 
     579                parts[idx:idx+1] = zeros 
     580        if len(parts) != extenso: 
     581            raise ValueError("Syntactically invalid IPv6 address") 
     582        if extenso == 7: 
     583            ipv4 = parts.pop() 
     584            if ipv4.count('.') != 3: 
     585                raise ValueError("Syntactically invalid IPv6 address") 
     586            parts = [int(x, 16) for x in parts] 
     587            return struct.pack('!6H', *parts) + socket.inet_aton(ipv4) 
     588        else: 
     589            parts = [int(x, 16) for x in parts] 
     590            return struct.pack('!8H', *parts) 
     591 
     592 
     593def inet_aton(addr): 
     594    """Convert IPv4 or IPv6 notation to IPv6 address.""" 
     595    if ':' in addr: 
     596        return inet6_aton(addr) 
     597    else: 
     598        return struct.pack('!QL', 0, 0xffff) + socket.inet_aton(addr) 
     599 
     600 
     601def _inet_prefix(addr, masked): 
     602    """Remove the number of masked bits from the IPV6 address.""" 
     603    hi, lo = struct.unpack("!QQ", addr) 
     604    return (hi << 64 | lo) >> masked 
     605 
     606 
     607def match_ip(cidr, ip): 
     608    """Check whether IP address matches CIDR IP address block.""" 
     609    if '/' in cidr: 
     610        cidr, prefix = cidr.split('/', 1) 
     611        masked = (':' in cidr and 128 or 32) - int(prefix) 
     612    else: 
     613        masked = None 
     614    cidr = inet_aton(cidr) 
     615    ip = inet_aton(ip) 
     616    if masked: 
     617        cidr = _inet_prefix(cidr, masked) 
     618        ip = _inet_prefix(ip, masked) 
     619    return ip == cidr 
     620 
     621 
    542622__all__ = ["Bunch", "DictObj", "DictWrapper", "Enum", "setlike", 
    543623           "get_package_name", "get_model", "load_project_config", 
    544624           "ensure_sequence", "has_arg", "to_kw", "from_kw", 
     
    548628           "flatten_sequence", "load_class", "parse_http_accept_header", 
    549629           "to_unicode", "to_utf8", "quote_cookie", "unquote_cookie", 
    550630           "get_template_encoding_default", "find_precision", 
    551            "copy_if_mutable", "deprecated"] 
     631           "copy_if_mutable", "match_ip", "deprecated"]