Python之ICMP解码与IP扫描器

ICMP类型

已经定义的ICMP消息类型大约由10多种,每种ICMP数据类型都被封装在一个IP数据包中。主要的ICMP消息类型包括一下几种。

响应请求

我们日常使用最多的ping,就是响应请求(Type=8)和应答(Code=0),一台主机向一个节点发送一个Type=8的ICMP报文,如果途中没有异常(例如被路由器丢弃、目标不回应ICMP或传输失败),则目标返回Type=0的ICMP报文,说明这台主机存在,更详细的tracert通过计算ICMP报文通过的节点来确定主机与目标之间的网络距离。

目标不可到达、源抑制和超时报文

这三种报文的格式是一样的,目标不可到达报文(Type=3)在路由器或主机不能传递数据报时使用,例如我们要连接对方一个不存在的系统端口(端口号小于1024)时,将返回Type=3、Code=3的ICMP报文,它要告诉我们:“嘿,别连接了,我不在家的!”,常见的不可到达类型还有网络不可到达(Code=0)、主机不可到达(Code=1)、协议不可到达(Code=2)等。源抑制则充当一个控制流量的角色,它通知主机减少数据报流量,由于ICMP没有恢复传输的报文,所以只要停止该报文,主机就会逐渐恢复传输速率。最后,无连接方式网络的问题就是数据报会丢失,或者长时间在网络游荡而找不到目标,或者拥塞导致主机在规定时间内无法重组数据报分段,这时就要触发ICMP超时报文的产生。超时报文的代码域有两种取值:Code=0表示传输超时,Code=1表示重组分段超时。

时间戳

时间戳请求报文(Type=13)和时间戳应答报文(Type=14)用于测试两台主机之间数据报来回一次的传输时间。传输时,主机填充原始时间戳,接收方收到请求后填充接收时间戳后以Type=14的报文格式返回,发送方计算这个时间差。一些系统不响应这种报文。
全部消息类型
下表显示了完整的ICMP类型:

表1 ICMP类型
TYPE CODE Description Query Error
0 0 Echo Reply——回显应答(Ping应答) x
3 0 Network Unreachable——网络不可达 x
3 1 Host Unreachable——主机不可达 x
3 2 Protocol Unreachable——协议不可达 x
3 3 Port Unreachable——端口不可达 x
3 4 Fragmentation needed but no frag. bit set——需要进行分片但设置不分片比特 x
3 5 Source routing failed——源站选路失败 x
3 6 Destination network unknown——目的网络未知 x
3 7 Destination host unknown——目的主机未知 x
3 8 Source host isolated (obsolete)——源主机被隔离(作废不用) x
3 9 Destination network administratively prohibited——目的网络被强制禁止 x
3 10 Destination host administratively prohibited——目的主机被强制禁止 x
3 11 Network unreachable for TOS——由于服务类型TOS,网络不可达 x
3 12 Host unreachable for TOS——由于服务类型TOS,主机不可达 x
3 13 Communication administratively prohibited by filtering——由于过滤,通信被强制禁止 x
3 14 Host precedence violation——主机越权 x
3 15 Precedence cutoff in effect——优先中止生效 x
4 0 Source quench——源端被关闭(基本流控制)
5 0 Redirect for network——对网络重定向
5 1 Redirect for host——对主机重定向
5 2 Redirect for TOS and network——对服务类型和网络重定向
5 3 Redirect for TOS and host——对服务类型和主机重定向
8 0 Echo request——回显请求(Ping请求) x
9 0 Router advertisement——路由器通告
10 0 Route solicitation——路由器请求
11 0 TTL equals 0 during transit——传输期间生存时间为0 x
11 1 TTL equals 0 during reassembly——在数据报组装期间生存时间为0 x
12 0 IP header bad (catchall error)——坏的IP首部(包括各种差错) x
12 1 Required options missing——缺少必需的选项 x
13 0 Timestamp request (obsolete)——时间戳请求(作废不用) x
14 Timestamp reply (obsolete)——时间戳应答(作废不用) x
15 0 Information request (obsolete)——信息请求(作废不用) x
16 0 Information reply (obsolete)——信息应答(作废不用) x
17 0 Address mask request——地址掩码请求 x
18 0 Address mask reply——地址掩码应答

PYTHON ICMP解码

#-*-coding:utf-8-*-
import socket
import os
import struct
from ctypes import *
 
 
#监听主机
host = "192.168.2.249"
 
#ip头定义
########################################################################
class IP(Structure):
 
    _fields_ = [
        ("ihl",     c_ubyte,4),
        ("version", c_ubyte,4),
        ("tos",     c_ubyte),
        ("len",     c_ushort),
        ("id",      c_ushort),
        ("offset",  c_ushort),
        ("ttl",     c_ubyte),
        ("protocol_num",c_ubyte),
        ("sum",     c_ushort),
        ("src",     c_ulong),
        ("dst",     c_ulong)
    ]
 
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)
 
    def __init__(self,socket_buffer=None):
 
        #协议字段跟协议名对应,常见的有如:
        #1 ICMP 
        #2 IGMP 
        #6 TCP 
        #17 UDP 
        #88 IGRP 
        #89 OSPF
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
        #转换下可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        #协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)
 
########################################################################
class ICMP(Structure):
 
    _fields_ = [
        ("type",c_ubyte),
        ("code",c_ubyte),
        ("checksum",c_ushort),
        ("unused",c_ushort),
        ("next_hop_mtu",c_ushort)
    ]
 
    #----------------------------------------------------------------------
    def __new__(self,socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    def __init__(self,socket_buffer):
       pass
 
 
#创建原始套接字,并绑定,跟之前的代码差不多 
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP
 
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) 
 
sniffer.bind((host, 0))
#捕获的时候包含IP头
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
 
if os.name  == "nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
 
try:
    while True:
        #读取数据包
        raw_buffer = sniffer.recvfrom(65565)[0]
        #将缓冲数据的前20字节按IP头格式解析
        ip_header = IP(raw_buffer[0:20])
        #输出双方IP
        print "Protocol:%s %s -> %s" % (ip_header.protocol,
                                        ip_header.src_address,
                                        ip_header.dst_address)
        #如果包是ICMP就进行处理
        if ip_header.protocol == "ICMP":
            #
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]
 
            ICMP_header = ICMP(buf)
 
            print "ICMP -> Type:%d Code:%d" % (ICMP_header.type,ICMP_header.code)
 
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

另扫一个终端PING一个不存在或者没上线的内网IP

root@myspuerkali:~# ping 192.168.2.55
PING 192.168.2.55 (192.168.2.55) 56(84) bytes of data.
From 192.168.2.249 icmp_seq=1 Destination Host Unreachable

得到

Protocol:ICMP 192.168.2.249 -> 192.168.2.249
ICMP -> Type:3 Code:1

根据上表我的得之数据不可达

根据上面的代码,
我们继续扩展一个内网IP扫描器,检测存活主机

IP存活扫描器

#-*-coding:utf-8-*-
import socket
import os
import struct
import threading
 
from netaddr import IPNetwork,IPAddress
from ctypes import *
 
 
#监听主机
host   = "192.168.2.249"
 
#要扫描的目标子网
subnet = "192.168.2.0/24"
 
#ICMP想要的核对魔法数据,自定义
magic_message = "MAGIC"
 
#批量发送UDP数据包
def udp_sender(subnet,magic_message):
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
    for ip in IPNetwork(subnet):
        try:
            sender.sendto(magic_message,("%s" % ip,65212))
        except:
            pass
#ip头定义
########################################################################
class IP(Structure):
 
    _fields_ = [
        ("ihl",     c_ubyte,4),
        ("version", c_ubyte,4),
        ("tos",     c_ubyte),
        ("len",     c_ushort),
        ("id",      c_ushort),
        ("offset",  c_ushort),
        ("ttl",     c_ubyte),
        ("protocol_num",c_ubyte),
        ("sum",     c_ushort),
        ("src",     c_ulong),
        ("dst",     c_ulong)
    ]
 
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)
 
    def __init__(self,socket_buffer=None):
 
        #协议字段跟协议名对应,常见的有如:
        #1 ICMP 
        #2 IGMP 
        #6 TCP 
        #17 UDP 
        #88 IGRP 
        #89 OSPF
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
        #转换下可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        #协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)
 
########################################################################
class ICMP(Structure):
 
    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort)
        ]
 
    #----------------------------------------------------------------------
    def __new__(self,socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    def __init__(self,socket_buffer):
       pass
 
 
 
#创建原始套接字,并绑定,跟之前的代码差不多 
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP
 
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) 
 
sniffer.bind((host, 0))
#捕获的时候包含IP头
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
 
if os.name  == "nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
 
#创建发送UDP信息线程
t = threading.Thread(target=udp_sender,args=(subnet,magic_message))
t.start()  
 
try:
    while True:
        #读取数据包
        raw_buffer = sniffer.recvfrom(65565)[0]
        #将缓冲数据的前20字节按IP头格式解析
        ip_header = IP(raw_buffer[0:20])
        #输出双方IP
        #print "Protocol:%s %s -> %s" % (ip_header.protocol,
        #                                ip_header.src_address,
        #                                ip_header.dst_address)
        #如果包是ICMP就进行处理
        if ip_header.protocol == "ICMP":
            #
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]
 
            icmp_header = ICMP(buf)
 
            #print "ICMP -> Type:%d Code:%d" % (icmp_header.type,icmp_header.code)
            
            #如果检测到类型跟代码都是3的
            if icmp_header.code == 3 and icmp_header.type == 3:
                #确定响应主机在我们设置的目标之内
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                    if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
                        print "Host on line:%s" % ip_header.src_address
 
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

效果:

root@myspuerkali:~/python# python scanner.py
Host on line:192.168.2.88
Host on line:192.168.2.130
Host on line:192.168.2.249
Host on line:192.168.2.254

一个简单有效的IP扫描存活器就好了

发表评论

邮箱地址不会被公开。 必填项已用*标注