Python之ICMP解码与IP扫描器

ICMP类型

已经定义的ICMP消息类型大约由10多种,每种ICMP数据类型都被封装在一个IP数据包中。主要的ICMP消息类型包括一下几种。

响应请求

我们日常使用最多的ping,就是响应请求(Type=8)和应答(Code=0),一台主机向一个节点发送一个Type=8的ICMP报文,如果途中没有异常(例如被路由器丢弃、目标不回应ICMP或传输失败),则目标返回Type=0的ICMP报文,说明这台主机存在,更详细的tracert通过计算ICMP报文通过的节点来确定主机与目标之间的网络距离。

目标不可到达、源抑制和超时报文

这三种报文的格式是一样的,目标不可到达报文(Type=3)在路由器或主机不能传递数据报时使用,例如我们要连接对方一个不存在的系统端口(端口号小于1024)时,将返回Type=3、Code=3的ICMP报文,它要告诉我们:“嘿,别连接了,我不在家的!”,常见的不可到达类型还有网络不可到达(Code=0)、主机不可到达(Code=1)、协议不可到达(Code=2)等。源抑制则充当一个控制流量的角色,它通知主机减少数据报流量,由于ICMP没有恢复传输的报文,所以只要停止该报文,主机就会逐渐恢复传输速率。最后,无连接方式网络的问题就是数据报会丢失,或者长时间在网络游荡而找不到目标,或者拥塞导致主机在规定时间内无法重组数据报分段,这时就要触发ICMP超时报文的产生。超时报文的代码域有两种取值:Code=0表示传输超时,Code=1表示重组分段超时。

时间戳

时间戳请求报文(Type=13)和时间戳应答报文(Type=14)用于测试两台主机之间数据报来回一次的传输时间。传输时,主机填充原始时间戳,接收方收到请求后填充接收时间戳后以Type=14的报文格式返回,发送方计算这个时间差。一些系统不响应这种报文。
全部消息类型
下表显示了完整的ICMP类型:

表1 ICMP类型
TYPECODEDescriptionQueryError
00Echo Reply——回显应答(Ping应答)x
30Network Unreachable——网络不可达x
31Host Unreachable——主机不可达x
32Protocol Unreachable——协议不可达x
33Port Unreachable——端口不可达x
34Fragmentation needed but no frag. bit set——需要进行分片但设置不分片比特x
35Source routing failed——源站选路失败x
36Destination network unknown——目的网络未知x
37Destination host unknown——目的主机未知x
38Source host isolated (obsolete)——源主机被隔离(作废不用)x
39Destination network administratively prohibited——目的网络被强制禁止x
310Destination host administratively prohibited——目的主机被强制禁止x
311Network unreachable for TOS——由于服务类型TOS,网络不可达x
312Host unreachable for TOS——由于服务类型TOS,主机不可达x
313Communication administratively prohibited by filtering——由于过滤,通信被强制禁止x
314Host precedence violation——主机越权x
315Precedence cutoff in effect——优先中止生效x
40Source quench——源端被关闭(基本流控制)
50Redirect for network——对网络重定向
51Redirect for host——对主机重定向
52Redirect for TOS and network——对服务类型和网络重定向
53Redirect for TOS and host——对服务类型和主机重定向
80Echo request——回显请求(Ping请求)x
90Router advertisement——路由器通告
100Route solicitation——路由器请求
110TTL equals 0 during transit——传输期间生存时间为0x
111TTL equals 0 during reassembly——在数据报组装期间生存时间为0x
120IP header bad (catchall error)——坏的IP首部(包括各种差错)x
121Required options missing——缺少必需的选项x
130Timestamp request (obsolete)——时间戳请求(作废不用)x
14Timestamp reply (obsolete)——时间戳应答(作废不用)x
150Information request (obsolete)——信息请求(作废不用)x
160Information reply (obsolete)——信息应答(作废不用)x
170Address mask request——地址掩码请求x
180Address mask reply——地址掩码应答

PYTHON ICMP解码

#-*-coding:utf-8-*-
import socket
import os
import struct
from ctypes import *
 
 
#监听主机
host = "192.168.2.249"
 
#ip头定义
########################################################################
class IP(Structure):
 
    _fields_ = [
        ("ihl",     c_ubyte,4),
        ("version", c_ubyte,4),
        ("tos",     c_ubyte),
        ("len",     c_ushort),
        ("id",      c_ushort),
        ("offset",  c_ushort),
        ("ttl",     c_ubyte),
        ("protocol_num",c_ubyte),
        ("sum",     c_ushort),
        ("src",     c_ulong),
        ("dst",     c_ulong)
    ]
 
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)
 
    def __init__(self,socket_buffer=None):
 
        #协议字段跟协议名对应,常见的有如:
        #1 ICMP 
        #2 IGMP 
        #6 TCP 
        #17 UDP 
        #88 IGRP 
        #89 OSPF
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
        #转换下可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        #协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)
 
########################################################################
class ICMP(Structure):
 
    _fields_ = [
        ("type",c_ubyte),
        ("code",c_ubyte),
        ("checksum",c_ushort),
        ("unused",c_ushort),
        ("next_hop_mtu",c_ushort)
    ]
 
    #----------------------------------------------------------------------
    def __new__(self,socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    def __init__(self,socket_buffer):
       pass
 
 
#创建原始套接字,并绑定,跟之前的代码差不多 
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP
 
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) 
 
sniffer.bind((host, 0))
#捕获的时候包含IP头
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
 
if os.name  == "nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
 
try:
    while True:
        #读取数据包
        raw_buffer = sniffer.recvfrom(65565)[0]
        #将缓冲数据的前20字节按IP头格式解析
        ip_header = IP(raw_buffer[0:20])
        #输出双方IP
        print "Protocol:%s %s -> %s" % (ip_header.protocol,
                                        ip_header.src_address,
                                        ip_header.dst_address)
        #如果包是ICMP就进行处理
        if ip_header.protocol == "ICMP":
            #
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]
 
            ICMP_header = ICMP(buf)
 
            print "ICMP -> Type:%d Code:%d" % (ICMP_header.type,ICMP_header.code)
 
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

另扫一个终端PING一个不存在或者没上线的内网IP

root@myspuerkali:~# ping 192.168.2.55
PING 192.168.2.55 (192.168.2.55) 56(84) bytes of data.
From 192.168.2.249 icmp_seq=1 Destination Host Unreachable

得到

Protocol:ICMP 192.168.2.249 -> 192.168.2.249
ICMP -> Type:3 Code:1

根据上表我的得之数据不可达

根据上面的代码,
我们继续扩展一个内网IP扫描器,检测存活主机

IP存活扫描器

#-*-coding:utf-8-*-
import socket
import os
import struct
import threading
 
from netaddr import IPNetwork,IPAddress
from ctypes import *
 
 
#监听主机
host   = "192.168.2.249"
 
#要扫描的目标子网
subnet = "192.168.2.0/24"
 
#ICMP想要的核对魔法数据,自定义
magic_message = "MAGIC"
 
#批量发送UDP数据包
def udp_sender(subnet,magic_message):
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
    for ip in IPNetwork(subnet):
        try:
            sender.sendto(magic_message,("%s" % ip,65212))
        except:
            pass
#ip头定义
########################################################################
class IP(Structure):
 
    _fields_ = [
        ("ihl",     c_ubyte,4),
        ("version", c_ubyte,4),
        ("tos",     c_ubyte),
        ("len",     c_ushort),
        ("id",      c_ushort),
        ("offset",  c_ushort),
        ("ttl",     c_ubyte),
        ("protocol_num",c_ubyte),
        ("sum",     c_ushort),
        ("src",     c_ulong),
        ("dst",     c_ulong)
    ]
 
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)
 
    def __init__(self,socket_buffer=None):
 
        #协议字段跟协议名对应,常见的有如:
        #1 ICMP 
        #2 IGMP 
        #6 TCP 
        #17 UDP 
        #88 IGRP 
        #89 OSPF
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
        #转换下可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        #协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)
 
########################################################################
class ICMP(Structure):
 
    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort)
        ]
 
    #----------------------------------------------------------------------
    def __new__(self,socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    def __init__(self,socket_buffer):
       pass
 
 
 
#创建原始套接字,并绑定,跟之前的代码差不多 
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP
 
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) 
 
sniffer.bind((host, 0))
#捕获的时候包含IP头
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
 
if os.name  == "nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
 
#创建发送UDP信息线程
t = threading.Thread(target=udp_sender,args=(subnet,magic_message))
t.start()  
 
try:
    while True:
        #读取数据包
        raw_buffer = sniffer.recvfrom(65565)[0]
        #将缓冲数据的前20字节按IP头格式解析
        ip_header = IP(raw_buffer[0:20])
        #输出双方IP
        #print "Protocol:%s %s -> %s" % (ip_header.protocol,
        #                                ip_header.src_address,
        #                                ip_header.dst_address)
        #如果包是ICMP就进行处理
        if ip_header.protocol == "ICMP":
            #
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]
 
            icmp_header = ICMP(buf)
 
            #print "ICMP -> Type:%d Code:%d" % (icmp_header.type,icmp_header.code)
            
            #如果检测到类型跟代码都是3的
            if icmp_header.code == 3 and icmp_header.type == 3:
                #确定响应主机在我们设置的目标之内
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                    if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
                        print "Host on line:%s" % ip_header.src_address
 
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

效果:

root@myspuerkali:~/python# python scanner.py
Host on line:192.168.2.88
Host on line:192.168.2.130
Host on line:192.168.2.249
Host on line:192.168.2.254

一个简单有效的IP扫描存活器就好了

发表评论

邮箱地址不会被公开。 必填项已用*标注