Technical Exploration: Analyzing QQ Voice Call Packets with Python and Correlating Geolocation

Introduction

In modern network communication, instant messaging (IM) applications have become indispensable, and voice calling is one of their most common features. For ordinary users, however, the network traffic behind these features is largely a "black box". This article explores a technical approach for analyzing locally captured network packets (PCAP files) in order to identify traffic belonging to a specific application (QQ voice calls, as the example) and to correlate the source IP addresses with geolocation information. The study is purely a technical exploration intended to deepen understanding of the network protocol stack and of data analysis.

Core Techniques and Methodology

1. Packet Capture and Parsing (pyshark)

Packet capture is usually performed with tools such as Wireshark or tcpdump, which produce standard .pcap or .pcapng files; these files are the starting point of our analysis. The Python library pyshark, a wrapper around TShark (the command-line edition of Wireshark), lets us parse them directly from Python. It decodes each packet layer by layer (link, network, transport, and application layers) and extracts key fields such as the timestamp, source/destination IP addresses, source/destination ports, transport protocol (TCP/UDP), and the application-layer payload.
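As a minimal sketch (assuming a local TShark installation and a placeholder capture file named capture.pcapng), the snippet below shows how pyshark walks a capture and pulls out the basic fields that the full PacketParser.parse_pcap implementation later relies on:

# Minimal pyshark sketch: iterate over a capture file and print basic fields.
# "capture.pcapng" is a placeholder name, not part of the original project.
import pyshark

cap = pyshark.FileCapture("capture.pcapng")
for packet in cap:
    if hasattr(packet, "ip"):            # skip non-IP frames such as ARP
        print(
            packet.sniff_timestamp,      # capture time
            packet.transport_layer,      # 'TCP', 'UDP' or None
            packet.ip.src, "->", packet.ip.dst,
            packet.length,               # frame length in bytes
        )
cap.close()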

2. Identifying Application-Specific Traffic

Isolating every packet related to QQ voice calls is the foundation of the later analysis. We use a multi-pronged identification strategy:

  • Port and protocol filtering: QQ voice traffic typically uses a known set of ports (e.g., 8000-8009, 9000-9009, 10000, 17000-17009) or follows RTP (Real-time Transport Protocol), which is commonly used for audio/video transport and is assumed here to run on ports in the 5000-65535 range. We therefore first select UDP packets whose ports fall within these ranges.
  • Application-layer signature matching: more precise identification relies on application-layer features. Based on published references and our own analysis, the hexadecimal byte sequence "02:00:48" appears frequently in QQ voice call packets and can be treated as a protocol marker. We therefore invoke TShark's command-line interface with a -Y display filter (frame contains 02:00:48) and JSON output, searching the raw packet bytes for this signature to pinpoint QQ voice traffic; see the sketch after this list.
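A hedged sketch of that signature search, modeled on the filter_qq_voice_by_hex_pattern method in the listing below (the exact display-filter combination and the file name are illustrative assumptions):

# Sketch: ask TShark for UDP packets whose raw bytes contain 02:00:48 and parse its JSON output.
# Assumes tshark is on PATH; "capture.pcapng" is a placeholder file name.
import json
import subprocess

cmd = [
    "tshark",
    "-r", "capture.pcapng",
    "-Y", "udp && frame contains 02:00:48",  # display filter: UDP plus the byte signature
    "-T", "json",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
packets = json.loads(result.stdout) if result.returncode == 0 and result.stdout.strip() else []
print(f"{len(packets)} candidate QQ voice packets matched the signature")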

3. Obtaining Geolocation Information (requests & APIs)

Mapping IP addresses to geographic locations is the key step of this analysis. Two API services are combined to improve coverage and accuracy:

  • Internal API (ip-api.com): a free lookup service; a plain HTTP GET request returns the country, city, latitude/longitude, ISP, and other details for an IP. It serves as the baseline query (the code labels its results as "internal").
  • External API (ipgeolocation.io): for higher accuracy, or as a fallback, the paid ipgeolocation.io API is integrated. It requires a valid API key in the configuration file. HTTPS requests issued through the requests library return detailed location data.

The script queries both APIs and merges their results to keep the final report as complete as possible.
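For reference, a minimal lookup against the free ip-api.com endpoint used by the script could look like the sketch below (the sample IP is a placeholder; real lookups use addresses extracted from the capture):

# Sketch: query one IP via ip-api.com, mirroring IPLocator.get_ip_location below.
import requests

def query_ip_api(ip: str) -> dict:
    resp = requests.get(f"http://ip-api.com/json/{ip}", timeout=10)
    data = resp.json()
    # ip-api.com signals failures with a status other than "success"
    return data if data.get("status") == "success" else {}

print(query_ip_api("8.8.8.8"))  # placeholder IP, for demonstration only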

4. Identifying the Call Peer

In a two-way voice call we do not just want to see every packet that passes through; we want to identify the other party of the call. Two algorithms are used to infer the peer IP:

  • Traffic-pattern analysis (find_qq_voice_peer_ip): count the packets and total bytes exchanged between each source/destination IP pair. The two call endpoints should dominate the traffic, so the highest-volume public IP addresses are most likely the call participants.
  • Signature-pattern analysis (find_qq_voice_peer_ip_by_hex_pattern): among the packets containing the "02:00:48" signature, count how often each source and destination IP appears. The most frequent public IP is very likely the call peer; a simplified sketch follows this list.
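The traffic-pattern idea reduces to a few lines. The standalone sketch below is a simplification (the helper name guess_peer_ip and its arguments are assumptions, not part of the project's API): packets is the list of dicts returned by PacketParser.parse_pcap, and is_public_ip can reuse IPLocator.is_public_ip.

# Sketch: treat the public IP that accounts for the most bytes as the likely call peer.
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional

def guess_peer_ip(packets: List[Dict[str, Any]],
                  is_public_ip: Callable[[str], bool]) -> Optional[str]:
    bytes_by_ip: Dict[str, int] = defaultdict(int)
    for pkt in packets:
        for ip in (pkt.get("source_ip"), pkt.get("destination_ip")):
            if ip and is_public_ip(ip):
                bytes_by_ip[ip] += pkt.get("length") or 0
    # The heaviest public IP is the best candidate for the other call endpoint.
    return max(bytes_by_ip, key=bytes_by_ip.get) if bytes_by_ip else None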

5. Data Processing and Report Generation (json & pandas)

  • Data storage: analysis results are stored as structured JSON, including the raw packet information, the identified voice traffic, geolocation data, and the inferred peer IPs.
  • Report generation: the JSON results are then rendered into an easy-to-read Markdown summary report that lays out the analysis steps and conclusions.
  • DataFrame interface (pandas): although the core logic does not use it directly, the results can be exported to a pandas.DataFrame for more advanced analysis and visualization (see the example below).
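A short usage sketch of that DataFrame export (the file name is a placeholder; PacketParser is defined in the listing that follows):

# Usage sketch: export parsed packets to a pandas DataFrame and aggregate per protocol.
parser = PacketParser("capture.pcapng")   # placeholder capture file
parser.parse_pcap()
df = parser.export_to_dataframe()

# Packet count and total bytes per transport protocol
print(df.groupby("protocol")["length"].agg(["count", "sum"]))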

Implementation Architecture and Code Overview

The code follows a modular design. The main entry point, analyze_pcap_file, coordinates the subtasks; the get_ip_location family of methods handles IP geolocation queries; the find_qq_voice_peer_ip* functions perform peer-IP identification; generate_summary_report produces the final report; and the main function parses command-line arguments and supports both single-file and batch modes.
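A typical single-file invocation would look like the sketch below (paths are placeholders):

# Usage sketch: run the full pipeline on one capture file.
analyze_pcap_file(
    "capture.pcapng",            # capture to analyze
    output_dir="./data/output",  # JSON and Markdown reports are written here
    tshark_path=None,            # set this if tshark is not on PATH
)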

Potential Applications and Limitations

Application Scenarios

  • Digital forensics: analyzing the communication patterns and geolocation of a specific application in authorized network-forensic work.
  • Network security research: understanding the network behavior of a particular IM application helps in designing targeted security policies.
  • Network performance analysis: characterizing application traffic to optimize network resource allocation.

Limitations and Challenges

  • Protocol changes: the application's communication protocol may be updated at any time, invalidating the ports or signatures this method relies on.
  • Privacy and legality: analyzing other people's network traffic without authorization is illegal; this method applies only to your own devices and data.
  • Location accuracy: IP geolocation is inherently imprecise, especially with dynamic IPs and mobile networks.
  • Network environment: NAT, proxies, and VPNs can alter or hide the real source IP address.

Conclusion

This article laid out a technical path for in-depth analysis of a specific application's (QQ voice) network traffic using Python and related libraries. By combining protocol analysis, signature matching, and geolocation APIs, captured packets are linked to geographic information. The exercise shows what network data analysis can do, and it is also a reminder that such exploration must strictly comply with laws, regulations, and ethical guidelines.


The code in this article is provided for learning and research purposes only. Always use these techniques within legal and compliant boundaries.

# -*- coding: utf-8 -*-
"""
QQ语音通话位置与数据包捕获分析工具 (单文件完整版)
集成了数据包解析、位置分析、语音流量分析等所有功能。
"""

import os
import sys
import json
import logging
import random
import datetime
import argparse
import threading
import queue
import subprocess
import tempfile
import configparser
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from typing import List, Dict, Any, Optional

# 第三方库依赖
try:
    import requests
    import pandas as pd
    import pyshark
    from geopy.geocoders import Nominatim
except ImportError as e:
    print(f"缺少必要的依赖库: {e}")
    print("请运行: pip install requests pandas pyshark geopy")
    sys.exit(1)

# ==================== utils/logger.py ====================

class ColoredLogger:
    """
    彩色日志输出类
    """
    
    # ANSI颜色码
    COLORS = {
        'BLUE': '\033[34m',      # 蓝色
        'YELLOW': '\033[33m',    # 黄色
        'RED': '\033[31m',       # 红色
        'RESET': '\033[0m'       # 重置颜色
    }
    
    @staticmethod
    def _get_current_time():
        """获取当前时间,格式为 HH:MM:SS"""
        return datetime.datetime.now().strftime("%H:%M:%S")
    
    @classmethod
    def info(cls, message):
        """
        输出蓝色信息日志
        
        Args:
            message: 日志消息
        """
        timestamp = cls._get_current_time()
        colored_message = f"{cls.COLORS['BLUE']}{timestamp} {message}{cls.COLORS['RESET']}"
        print(colored_message)
    
    @classmethod
    def warning(cls, message):
        """
        输出黄色警告日志
        
        Args:
            message: 警告消息
        """
        timestamp = cls._get_current_time()
        colored_message = f"{cls.COLORS['YELLOW']}{timestamp} {message}{cls.COLORS['RESET']}"
        print(colored_message)
    
    @classmethod
    def error(cls, message):
        """
        输出红色错误日志
        
        Args:
            message: 错误消息
        """
        timestamp = cls._get_current_time()
        colored_message = f"{cls.COLORS['RED']}{timestamp} {message}{cls.COLORS['RESET']}"
        print(colored_message)


# 兼容Windows系统的颜色输出
def init_color_support():
    """
    初始化颜色支持,兼容Windows系统
    """
    if sys.platform == "win32":
        try:
            import ctypes
            kernel32 = ctypes.windll.kernel32
            kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
        except Exception:
            # 如果无法设置控制台模式,则忽略
            pass

# 初始化颜色支持
init_color_support()


# ==================== utils/tools.py ====================

# User-Agent集合
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:97.0) Gecko/20100101 Firefox/97.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:97.0) Gecko/20100101 Firefox/97.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:91.0) Gecko/20100101 Firefox/91.0"
]

# 设置日志格式
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_ip_geolocation(ip_address: str, key: str, language: str = "en") -> dict:
    """
    使用 ipgeolocation.io 查询 IP 的地理位置信息。
    """
    url = "https://api.ipgeolocation.io/v2/ipgeo"
    params = {"apiKey": key, "ip": ip_address, "lang": language}
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        # logging.error(f"ipgeolocation.io 请求失败:{e}")
        return {}


def load_api_key(config_path: str) -> str:
    """从INI配置文件中加载API密钥"""
    try:
        config = configparser.ConfigParser()
        abs_config_path = os.path.abspath(config_path)
        config.read(abs_config_path, encoding='utf-8')

        # 从[ip_location]节获取api_key
        return config.get('ip_location', 'api_key', fallback='')
    except Exception as e:
        logging.error(f"读取配置文件时出错:{e}")
        return ''


def merge_api_results(result1: dict, result2: dict) -> dict:
    """合并两个 API 的结果"""
    merged = {}
    merged.update(result1)
    for key, value in result2.items():
        if key not in merged or not merged[key]:
            merged[key] = value
    return merged


# ==================== ip_locator/ip_locator.py ====================

class IPLocator:
    """
    IP地址定位器类
    """
    
    def __init__(self):
        """
        初始化IP定位器
        """
        self.geolocator = Nominatim(user_agent="qq_voice_location_analyzer")
        
    def get_ip_location(self, ip_address: str) -> Optional[Dict[str, Any]]:
        """
        通过IP地址获取地理位置信息
        
        Args:
            ip_address: IP地址
            
        Returns:
            地理位置信息字典
        """
        try:
            # 使用免费的IP地理位置API (ip-api.com)
            response = requests.get(f"http://ip-api.com/json/{ip_address}", timeout=10)
            if response.status_code == 200:
                data = response.json()
                if data.get('status') == 'success':
                    return {
                        'ip': ip_address,
                        'country': data.get('country'),
                        'region': data.get('regionName'),
                        'city': data.get('city'),
                        'lat': data.get('lat'),
                        'lon': data.get('lon'),
                        'isp': data.get('isp'),
                        'org': data.get('org'),
                        'timezone': data.get('timezone')
                    }
            return None
        except Exception:
            # 静默失败,不输出任何错误日志
            return None
    
    def get_multiple_ips_location(self, ip_addresses: List[str]) -> List[Dict[str, Any]]:
        """
        批量获取IP地址的地理位置信息
        
        Args:
            ip_addresses: IP地址列表
            
        Returns:
            地理位置信息列表
        """
        locations = []
        for ip in ip_addresses:
            location = self.get_ip_location(ip)
            if location:
                locations.append(location)
        return locations
    
    def is_public_ip(self, ip: str) -> bool:
        """
        检查IP是否为公网IP
        
        Args:
            ip: IP地址
            
        Returns:
            如果是公网IP则返回True
        """
        if not ip:
            return False

        # 检查是否为私有IP地址范围
        parts = ip.split('.')
        if len(parts) != 4:
            return True  # 不是IPv4地址,假设是公网

        # 检查是否为私有IP
        if ip.startswith('10.'):
            return False
        elif ip.startswith('192.168.'):
            return False
        elif ip.startswith('172.') and 16 <= int(parts[1]) <= 31:
            return False
        elif ip.startswith('127.'):
            return False

        # 默认认为是公网IP
        return True


# ==================== pcap_analyzer/packet_parser.py ====================

class PacketParser:
    """
    数据包解析器类
    """
    
    def __init__(self, pcap_file_path: str, tshark_path: str = None):
        """
        初始化数据包解析器
        
        Args:
            pcap_file_path: PCAP文件路径
            tshark_path: TShark可执行文件路径(可选)
        """
        self.pcap_file_path = pcap_file_path
        self.tshark_path = tshark_path
        self.captured_packets = []
        
    def parse_pcap(self) -> List[Dict[str, Any]]:
        """
        解析PCAP文件
        
        Returns:
            解析后的数据包列表
        """
        packets_data = []
        
        # 使用pyshark读取PCAP文件,指定TShark路径(如果有)
        if self.tshark_path:
            cap = pyshark.FileCapture(self.pcap_file_path, tshark_path=self.tshark_path)
        else:
            cap = pyshark.FileCapture(self.pcap_file_path)
        
        for packet in cap:
            packet_info = {
                'timestamp': float(packet.sniff_timestamp) if hasattr(packet, 'sniff_timestamp') else None,
                'source_ip': packet.ip.src if hasattr(packet, 'ip') else None,
                'destination_ip': packet.ip.dst if hasattr(packet, 'ip') else None,
                'source_port': getattr(getattr(packet, 'tcp', None), 'srcport', 
                                      getattr(getattr(packet, 'udp', None), 'srcport', None)),
                'destination_port': getattr(getattr(packet, 'tcp', None), 'dstport',
                                          getattr(getattr(packet, 'udp', None), 'dstport', None)),
                'protocol': packet.transport_layer if hasattr(packet, 'transport_layer') else None,
                'length': int(packet.length) if hasattr(packet, 'length') else None,
                'layers': [layer.layer_name for layer in packet.layers] if hasattr(packet, 'layers') else []
            }
            
            # 添加详细信息
            if hasattr(packet, 'tcp'):
                tcp_layer = packet.tcp
                packet_info.update({
                    'tcp_flags': getattr(tcp_layer, 'flags', None),
                    'tcp_seq': getattr(tcp_layer, 'seq', None),
                    'tcp_ack': getattr(tcp_layer, 'ack', None)
                })
                
            if hasattr(packet, 'udp'):
                udp_layer = packet.udp
                packet_info.update({
                    'udp_length': getattr(udp_layer, 'length', None)
                })
                
            packets_data.append(packet_info)
            
        self.captured_packets = packets_data
        return packets_data
    
    def filter_voice_traffic(self) -> List[Dict[str, Any]]:
        """
        过滤出可能的QQ语音流量(基于端口、协议和QQ特有的特征)
        
        Returns:
            语音流量数据包列表
        """
        voice_packets = []
        
        for packet in self.captured_packets:
            # 检查是否有基本的网络信息
            src_ip = packet.get('source_ip')
            dst_ip = packet.get('destination_ip')
            src_port = packet.get('source_port')
            dst_port = packet.get('destination_port')
            protocol = packet.get('protocol')
            
            if not (src_ip and dst_ip):
                continue
                
            # 检查是否有端口号
            if not (src_port and dst_port):
                continue
                
            # QQ语音常用端口
            qq_voice_ports = [8000, 8001, 8002, 8003, 8004, 8005, 8006, 8007, 8008, 8009, 
                             9000, 9001, 9002, 9003, 9004, 9005, 9006, 9007, 9008, 9009,
                             10000, 10001, 17000, 17001, 17002, 17003, 17004, 17005, 17006, 17007, 17008, 17009]
            
            # 检查端口是否为QQ语音常用端口
            has_qq_port = (src_port and int(src_port) in qq_voice_ports) or (dst_port and int(dst_port) in qq_voice_ports)
            
            # 检查是否为UDP协议(QQ语音通常使用UDP)
            is_udp = protocol and protocol.upper() == 'UDP'
            
            # 检查是否为RTP协议端口范围(实时传输协议,用于语音)
            rtp_port_range = 5000 <= int(src_port) <= 65535 if src_port else False
            rtp_dst_port_range = 5000 <= int(dst_port) <= 65535 if dst_port else False
            
            # 检查是否为常见的语音通信端口
            common_voice_port = (src_port and int(src_port) in [80, 443, 53, 1935, 3478, 5060, 5061]) or \
                               (dst_port and int(dst_port) in [80, 443, 53, 1935, 3478, 5060, 5061])
            
            # 综合判断条件:UDP协议 + QQ语音端口 或者 UDP + RTP端口范围
            if (is_udp and (has_qq_port or rtp_port_range or rtp_dst_port_range)) or \
               (is_udp and common_voice_port):
                voice_packets.append(packet)
                    
        return voice_packets
    
    def get_connection_pairs(self) -> List[Dict[str, Any]]:
        """
        获取连接对信息
        
        Returns:
            连接对列表
        """
        connections = {}
        
        for packet in self.captured_packets:
            src_ip = packet.get('source_ip')
            dst_ip = packet.get('destination_ip')
            src_port = packet.get('source_port')
            dst_port = packet.get('destination_port')
            
            if src_ip and dst_ip:
                # 创建双向连接标识
                forward_key = f"{src_ip}:{src_port}-{dst_ip}:{dst_port}" if src_port and dst_port else f"{src_ip}-*-{dst_ip}-*"
                reverse_key = f"{dst_ip}:{dst_port}-{src_ip}:{src_port}" if dst_port and src_port else f"{dst_ip}-*-{src_ip}-*"
                
                # 如果反向连接已存在,则使用该连接
                if reverse_key in connections:
                    key = reverse_key
                else:
                    key = forward_key
                    
                if key not in connections:
                    connections[key] = {
                        'source_ip': src_ip,
                        'source_port': src_port,
                        'destination_ip': dst_ip,
                        'destination_port': dst_port,
                        'packets_count': 0,
                        'total_bytes': 0,
                        'protocols': set(),
                        'timestamps': []
                    }
                
                conn = connections[key]
                conn['packets_count'] += 1
                if packet.get('length'):
                    conn['total_bytes'] += packet['length']
                if packet.get('protocol'):
                    conn['protocols'].add(packet['protocol'])
                if packet.get('timestamp'):
                    conn['timestamps'].append(packet['timestamp'])
        
        # 转换protocols为列表以便JSON序列化
        for conn in connections.values():
            conn['protocols'] = list(conn['protocols'])
            
        return list(connections.values())
    
    def filter_qq_voice_by_hex_pattern(self) -> List[Dict[str, Any]]:
        """
        通过十六进制模式过滤QQ语音通话数据包
        根据Wireshark中搜索"020048"字符串的方法来识别QQ语音通话数据包
        
        Returns:
            符合QQ语音通话特征的数据包列表
        """
        try:
            # 使用tshark命令行工具直接搜索十六进制模式
            cmd = [
                self.tshark_path if self.tshark_path else "tshark",
                "-r", self.pcap_file_path,  # 输入PCAP文件
                "-Y", "frame contains 02:00:48",  # 搜索包含十六进制模式的数据包
                "-T", "json"  # 输出JSON格式
            ]
            
            # 执行tshark命令
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
            
            if result.returncode == 0 and result.stdout.strip():
                try:
                    # 解析tshark的JSON输出
                    packets_json = json.loads(result.stdout)
                    
                    qq_voice_packets = []
                    
                    for pkt in packets_json:
                        if '_source' in pkt and 'layers' in pkt['_source']:
                            layers = pkt['_source']['layers']
                            
                            # 提取关键信息
                            frame_info = layers.get('frame', {})
                            ip_info = layers.get('ip', {})
                            tcp_info = layers.get('tcp', {})
                            udp_info = layers.get('udp', {})
                            
                            # 获取时间戳
                            time_relative = frame_info.get('frame.time_relative', frame_info.get('frame.time', None))
                            if time_relative:
                                # 尝试转换时间戳格式
                                try:
                                    timestamp = float(time_relative.replace('s', ''))
                                except:
                                    timestamp = None
                            else:
                                timestamp = None
                            
                            # 获取IP信息
                            src_ip = ip_info.get('ip.src', ip_info.get('ipv4.src'))
                            dst_ip = ip_info.get('ip.dst', ip_info.get('ipv4.dst'))
                            
                            # 获取端口信息
                            src_port = tcp_info.get('tcp.srcport', tcp_info.get('tcp.port')) if tcp_info else None
                            if not src_port:
                                src_port = udp_info.get('udp.srcport', udp_info.get('udp.port'))
                            
                            dst_port = tcp_info.get('tcp.dstport', tcp_info.get('tcp.port')) if tcp_info else None
                            if not dst_port:
                                dst_port = udp_info.get('udp.dstport', udp_info.get('udp.port'))
                            
                            # 获取协议
                            protocol = frame_info.get('frame.protocols', '')
                            if ':' in protocol:
                                # 获取最后一个协议作为主要协议
                                protocol = protocol.split(':')[-1].upper()
                            else:
                                protocol = protocol.upper()
                            
                            # 获取长度
                            length = frame_info.get('frame.len')
                            if length:
                                try:
                                    length = int(length)
                                except:
                                    length = None
                            
                            # 构建包信息
                            packet_info = {
                                'timestamp': timestamp,
                                'source_ip': src_ip,
                                'destination_ip': dst_ip,
                                'source_port': src_port,
                                'destination_port': dst_port,
                                'protocol': protocol if protocol else 'UNKNOWN',
                                'length': length,
                                'layers': list(layers.keys()),
                                'raw_data_contains_020048': True
                            }
                            
                            # 添加TCP/UDP特定信息
                            if tcp_info:
                                packet_info.update({
                                    'tcp_flags': tcp_info.get('tcp.flags', None),
                                    'tcp_seq': tcp_info.get('tcp.seq', None),
                                    'tcp_ack': tcp_info.get('tcp.ack', None)
                                })
                            
                            if udp_info:
                                packet_info.update({
                                    'udp_length': udp_info.get('udp.length', None)
                                })
                            
                            qq_voice_packets.append(packet_info)
                    
                    # 静默记录找到的数据包数量
                    return qq_voice_packets
                except json.JSONDecodeError:
                    # 静默失败,不输出错误日志
                    pass
            else:
                # 静默失败,不输出错误日志
                pass
                
        except subprocess.TimeoutExpired:
            # 静默失败,不输出错误日志
            pass
        except Exception as e:
            # 静默失败,不输出错误日志
            pass
        
        # 如果命令行方法失败,回退到原来的UDP+RTP端口过滤方法
        return self.filter_voice_traffic()

    def analyze_packet_statistics(self) -> Dict[str, Any]:
        """
        分析数据包统计信息
        
        Returns:
            统计信息字典
        """
        if not self.captured_packets:
            return {}
            
        stats = {
            'total_packets': len(self.captured_packets),
            'protocols_distribution': {},
            'ports_distribution': {'source': {}, 'destination': {}},
            'ip_addresses': {'source': set(), 'destination': set()},
            'traffic_volume': {'upload': 0, 'download': 0},
            'time_span': {'start': None, 'end': None}
        }
        
        timestamps = []
        
        for packet in self.captured_packets:
            # 协议分布
            protocol = packet.get('protocol', 'UNKNOWN')
            stats['protocols_distribution'][protocol] = stats['protocols_distribution'].get(protocol, 0) + 1
            
            # 端口分布
            src_port = packet.get('source_port')
            dst_port = packet.get('destination_port')
            
            if src_port:
                stats['ports_distribution']['source'][src_port] = stats['ports_distribution']['source'].get(src_port, 0) + 1
            if dst_port:
                stats['ports_distribution']['destination'][dst_port] = stats['ports_distribution']['destination'].get(dst_port, 0) + 1
            
            # IP地址统计
            src_ip = packet.get('source_ip')
            dst_ip = packet.get('destination_ip')
            
            if src_ip:
                stats['ip_addresses']['source'].add(src_ip)
            if dst_ip:
                stats['ip_addresses']['destination'].add(dst_ip)
                
            # 流量统计
            length = packet.get('length', 0)
            if length:
                stats['traffic_volume']['upload'] += length
                
            # 时间跨度
            timestamp = packet.get('timestamp')
            if timestamp is not None:
                timestamps.append(timestamp)
        
        # 计算时间跨度
        if timestamps:
            stats['time_span']['start'] = min(timestamps)
            stats['time_span']['end'] = max(timestamps)
        
        # 转换sets为lists以便JSON序列化
        stats['ip_addresses']['source'] = list(stats['ip_addresses']['source'])
        stats['ip_addresses']['destination'] = list(stats['ip_addresses']['destination'])
        
        return stats
    
    def export_to_dataframe(self) -> pd.DataFrame:
        """
        将捕获的数据包导出为pandas DataFrame
        
        Returns:
            包含数据包信息的DataFrame
        """
        if not self.captured_packets:
            return pd.DataFrame()
        
        df = pd.DataFrame(self.captured_packets)
        return df


# ==================== pcap_analyzer/location_analyzer.py ====================

class LocationAnalyzer:
    """
    位置分析器类
    """
    
    def __init__(self):
        """
        初始化位置分析器
        """
        # 在单文件版本中,假定配置文件在同一目录
        base_dir = os.path.dirname(os.path.abspath(__file__))
        config_path = os.path.join(base_dir, "config.ini")
        # print(f"当前 config_path 的路径为 {config_path}")
        self.api_key = load_api_key(config_path)
        self.locator = IPLocator()
        
    def get_ip_location(self, ip_address: str) -> Optional[Dict[str, Any]]:
        """
        通过IP地址获取地理位置信息
        
        Args:
            ip_address: IP地址
            
        Returns:
            地理位置信息字典
        """
        return self.locator.get_ip_location(ip_address)

    def merge_results(self, result1: dict, result2: dict) -> dict:
        """合并两个API的结果"""
        merged = {}
        merged.update(result1)
        for key, value in result2.items():
            if key not in merged or not merged[key]:
                merged[key] = value
        return merged

    def get_ip_16location_separated(self, ip_address: str) -> Optional[Dict[str, Any]]:
        """
        通过IP地址获取地理位置信息,返回结果

        Args:
            ip_address: IP地址

        Returns:
            地理位置信息字典
        """
        result = self.locator.get_ip_location(ip_address)
        OutAPIResult = get_ip_geolocation(ip_address, self.api_key)

        # 添加来源标识
        if result:
            result['source'] = 'internal'
        if OutAPIResult:
            OutAPIResult['source'] = 'external'

        if result is None and OutAPIResult:
            # 如果内部定位失败,使用外部API结果
            # 添加地图链接信息
            external_lat = OutAPIResult.get('location', {}).get('latitude')
            external_lon = OutAPIResult.get('location', {}).get('longitude')
            if external_lat and external_lon:
                try:
                    external_lat = float(external_lat)
                    external_lon = float(external_lon)
                    OutAPIResult['map_link'] = f"https://www.google.com/maps/place/{external_lat}+{external_lon}"
                except ValueError:
                    pass
            return OutAPIResult
        elif result and OutAPIResult:
            # 如果两个结果都存在,合并它们
            merged = self.merge_results(result, OutAPIResult)
            merged['source'] = 'merged'
            merged['internal_result'] = result
            merged['external_result'] = OutAPIResult
            
            # 生成内部API地图链接
            internal_map_link = None
            if result.get('lat') is not None and result.get('lon') is not None:
                internal_map_link = f"https://www.google.com/maps/place/{result['lat']}+{result['lon']}"
            
            # 生成外部API地图链接
            external_map_link = None
            external_lat = OutAPIResult.get('location', {}).get('latitude')
            external_lon = OutAPIResult.get('location', {}).get('longitude')
            if external_lat and external_lon:
                try:
                    external_lat = float(external_lat)
                    external_lon = float(external_lon)
                    external_map_link = f"https://www.google.com/maps/place/{external_lat}+{external_lon}"
                except ValueError:
                    external_lat = OutAPIResult.get('lat')
                    external_lon = OutAPIResult.get('lon')
                    if external_lat is not None and external_lon is not None:
                        try:
                            external_map_link = f"https://www.google.com/maps/place/{external_lat}+{external_lon}"
                        except:
                            pass
            
            # 生成双API对比链接
            dual_map_link = None
            if internal_map_link and external_map_link:
                dual_map_link = f"{external_map_link} / {internal_map_link}"
            elif internal_map_link:
                dual_map_link = f"{internal_map_link} / 无外部API数据"
            elif external_map_link:
                dual_map_link = f"无内部API数据 / {external_map_link}"
            
            # 将地图链接添加到合并结果中
            merged['internal_map_link'] = internal_map_link
            merged['external_map_link'] = external_map_link
            merged['dual_map_link'] = dual_map_link
            
            return merged
        elif result:
            # 只有内部结果可用
            # 添加内部地图链接
            if result.get('lat') is not None and result.get('lon') is not None:
                result['map_link'] = f"https://www.google.com/maps/place/{result['lat']}+{result['lon']}"
            return result
        else:
            # 都不可用
            return None

    def get_16multiple_ips_location(self, ip_addresses: List[str]) -> List[Dict[str, Any]]:
        """
        批量获取IP地址的地理位置信息

        Args:
            ip_addresses: IP地址列表

        Returns:
            地理位置信息列表
        """
        locations = []
        for ip in ip_addresses:
            location = self.get_ip_16location_separated(ip)  # 已经同时使用内外部API
            if location:
                locations.append(location)
        return locations

    
    def analyze_connections_location(self, connections: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        分析连接的地理位置信息
        
        Args:
            connections: 连接信息列表
            
        Returns:
            包含地理位置分析结果的字典
        """
        all_ips = set()
        
        # 收集所有涉及的IP地址
        for conn in connections:
            if conn.get('source_ip'):
                all_ips.add(conn['source_ip'])
            if conn.get('destination_ip'):
                all_ips.add(conn['destination_ip'])
        
        # 获取所有IP的位置信息
        ip_locations = self.get_16multiple_ips_location(list(all_ips))
        
        # 统计地理位置分布
        country_stats = {}
        region_stats = {}
        
        for loc in ip_locations:
            country = loc.get('country')
            region = loc.get('region')
            
            if country:
                country_stats[country] = country_stats.get(country, 0) + 1
            if region:
                region_stats[region] = region_stats.get(region, 0) + 1
        
        return {
            'ip_locations': ip_locations,
            'country_statistics': country_stats,
            'region_statistics': region_stats,
            'total_unique_ips': len(ip_locations)
        }

    def find_qq_voice_peer_ip(self, connections: List[Dict[str, Any]], local_ip_hint: str = None) -> Dict[str, Any]:
        """
        专门用于查找QQ语音通话的对端IP地址
        
        Args:
            connections: 连接信息列表
            local_ip_hint: 本地IP提示(可选),用于区分本地和远程IP
            
        Returns:
            对端IP地址列表及位置信息
        """
        # 首先识别可能的QQ服务器IP(通常是本地IP之外的IP)
        if local_ip_hint:
            peer_connections = [conn for conn in connections 
                                if local_ip_hint not in [conn.get('source_ip'), conn.get('destination_ip')]]
        else:
            # 如果没有本地IP提示,尝试找出流量最大的非本地IP对
            # 统计IP出现次数和流量
            ip_stats = {}
            for conn in connections:
                src_ip = conn.get('source_ip')
                dst_ip = conn.get('destination_ip')
                packets_count = conn.get('packets_count', 0)
                
                if src_ip:
                    if src_ip not in ip_stats:
                        ip_stats[src_ip] = {'count': 0, 'bytes': 0}
                    ip_stats[src_ip]['count'] += packets_count
                    ip_stats[src_ip]['bytes'] += conn.get('total_bytes', 0)
                    
                if dst_ip:
                    if dst_ip not in ip_stats:
                        ip_stats[dst_ip] = {'count': 0, 'bytes': 0}
                    ip_stats[dst_ip]['count'] += packets_count
                    ip_stats[dst_ip]['bytes'] += conn.get('total_bytes', 0)
            
            # 找出流量最大的IP(排除localhost和内网IP)
            public_ips = {}
            for ip, stats in ip_stats.items():
                if self.locator.is_public_ip(ip):
                    public_ips[ip] = stats
            
            # 选择流量最大的几个IP作为可能的对端
            sorted_ips = sorted(public_ips.items(), key=lambda x: x[1]['bytes'], reverse=True)
            top_ips = [item[0] for item in sorted_ips[:5]]  # 取前5个流量最大的公网IP
            
            peer_connections = [conn for conn in connections 
                                if (conn.get('source_ip') in top_ips or 
                                    conn.get('destination_ip') in top_ips)]
        
        # 获取对端IP的位置信息
        peer_ips = set()
        for conn in peer_connections:
            if conn.get('source_ip') != local_ip_hint:
                peer_ips.add(conn['source_ip'])
            if conn.get('destination_ip') != local_ip_hint:
                peer_ips.add(conn['destination_ip'])
        
        # 过滤掉内网IP
        peer_ips = {ip for ip in peer_ips if self.locator.is_public_ip(ip)}
        
        # 获取位置信息
        peer_locations = self.get_16multiple_ips_location(list(peer_ips))
        
        return {
            'peer_ips': list(peer_ips),
            'peer_locations': peer_locations,
            'connection_details': peer_connections
        }

    
    def find_qq_voice_peer_ip_by_hex_pattern(self, parser, local_ip_hint: str = None) -> Dict[str, Any]:
        """
        使用十六进制模式"020048"查找QQ语音通话的对端IP地址
        
        Args:
            parser: PacketParser实例
            local_ip_hint: 本地IP提示(可选)
            
        Returns:
            对端IP地址列表及位置信息
        """
        # 使用十六进制模式过滤QQ语音数据包
        qq_voice_packets = parser.filter_qq_voice_by_hex_pattern()
        
        # 统计包含特定模式的数据包中的IP
        ip_stats = {}
        for packet in qq_voice_packets:
            src_ip = packet.get('source_ip')
            dst_ip = packet.get('destination_ip')
            
            if src_ip:
                if src_ip not in ip_stats:
                    ip_stats[src_ip] = {'count': 0, 'bytes': 0}
                ip_stats[src_ip]['count'] += 1
                if packet.get('length'):
                    ip_stats[src_ip]['bytes'] += packet['length']
                    
            if dst_ip:
                if dst_ip not in ip_stats:
                    ip_stats[dst_ip] = {'count': 0, 'bytes': 0}
                ip_stats[dst_ip]['count'] += 1
                if packet.get('length'):
                    ip_stats[dst_ip]['bytes'] += packet['length']
        
        # 过滤出公网IP
        public_ips = {}
        for ip, stats in ip_stats.items():
            if self.locator.is_public_ip(ip) and ip != local_ip_hint:
                public_ips[ip] = stats
        
        # 选择包含最多十六进制模式数据包的IP作为对端IP
        sorted_ips = sorted(public_ips.items(), key=lambda x: x[1]['count'], reverse=True)
        top_ips = [item[0] for item in sorted_ips[:15]]  # 取前15个最频繁的IP
        
        # 获取这些IP的位置信息
        peer_locations = self.get_16multiple_ips_location(top_ips)
        
        # 构建包含双API结果和地图链接的详细报告
        detailed_report = []
        for location in peer_locations:
            ip = location.get('ip', 'Unknown')
            source = location.get('source', 'unknown')
            
            # 提取内部API的经纬度
            internal_lat = '未知'
            internal_lon = '未知'
            if 'internal_result' in location and location['internal_result']:
                internal_data = location['internal_result']
                internal_lat = internal_data.get('lat', '未知')
                internal_lon = internal_data.get('lon', '未知')
            elif location.get('source') == 'internal' or location.get('source') == 'merged':
                internal_lat = location.get('lat', '未知')
                internal_lon = location.get('lon', '未知')
            
            # 提取外部API的经纬度
            external_lat = '未知'
            external_lon = '未知'
            if 'external_result' in location and location['external_result']:
                external_data = location['external_result']
                lat = external_data.get('location', {}).get('latitude')
                lon = external_data.get('location', {}).get('longitude')
                if lat is not None and lon is not None:
                    try:
                        external_lat = float(lat)
                        external_lon = float(lon)
                    except ValueError:
                        external_lat = lat
                        external_lon = lon
                else:
                    external_lat = external_data.get('lat', '未知')
                    external_lon = external_data.get('lon', '未知')
            elif location.get('source') == 'external':
                external_data = location
                lat = external_data.get('location', {}).get('latitude')
                lon = external_data.get('location', {}).get('longitude')
                if lat is not None and lon is not None:
                    try:
                        external_lat = float(lat)
                        external_lon = float(lon)
                    except ValueError:
                        external_lat = lat
                        external_lon = lon
                else:
                    external_lat = external_data.get('lat', '未知')
                    external_lon = external_data.get('lon', '未知')
            
            report_entry = {
                'ip': ip,
                'source': source,
                'location_data': location,
                'internal_coordinates': {
                    'lat': internal_lat,
                    'lon': internal_lon
                },
                'external_coordinates': {
                    'lat': external_lat,
                    'lon': external_lon
                }
            }
            
            # 添加地图链接信息
            if 'dual_map_link' in location:
                report_entry['map_report'] = f"谷歌地图定位点: {location['dual_map_link']}"
            elif 'internal_map_link' in location and 'external_map_link' in location:
                report_entry['map_report'] = f"谷歌地图定位点: {location['internal_map_link']} / {location['external_map_link']}"
            elif 'internal_map_link' in location:
                report_entry['map_report'] = f"谷歌地图定位点: {location['internal_map_link']} / https://www.google.com/maps/place/{external_lat}+{external_lon}"
            elif 'external_map_link' in location:
                report_entry['map_report'] = f"谷歌地图定位点: https://www.google.com/maps/place/{internal_lat}+{internal_lon} / {location['external_map_link']}"
            else:
                # 如果没有预先生成的地图链接,手动创建
                report_entry['map_report'] = f"谷歌地图定位点: https://www.google.com/maps/place/{internal_lat}+{internal_lon} / https://www.google.com/maps/place/{external_lat}+{external_lon}"
            
            detailed_report.append(report_entry)
        
        return {
            'peer_ips': top_ips,
            'peer_locations': peer_locations,
            'detailed_location_reports': detailed_report,  # 包含双API结果和地图链接的详细报告
            'packet_count_by_ip': {ip: stats['count'] for ip, stats in public_ips.items()},
            'total_qq_voice_packets': len(qq_voice_packets)
        }


class VoiceTrafficAnalyzer:
    """
    语音流量分析器类
    """
    
    def __init__(self):
        """
        初始化语音流量分析器
        """
        self.location_analyzer = LocationAnalyzer()
        
    def analyze_voice_patterns(self, packets: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        分析语音流量模式
        
        Args:
            packets: 数据包列表
            
        Returns:
            语音流量分析结果
        """
        analysis_result = {
            'total_packets': len(packets),
            'unique_source_ips': set(),
            'unique_destination_ips': set(),
            'port_distribution': {},
            'protocol_distribution': {},
            'time_analysis': {},
            'size_analysis': {}
        }
        
        packet_sizes = []
        
        for packet in packets:
            src_ip = packet.get('source_ip')
            dst_ip = packet.get('destination_ip')
            src_port = packet.get('source_port')
            protocol = packet.get('protocol')
            size = packet.get('length')
            timestamp = packet.get('timestamp')
            
            if src_ip:
                analysis_result['unique_source_ips'].add(src_ip)
            if dst_ip:
                analysis_result['unique_destination_ips'].add(dst_ip)
                
            if src_port:
                port_str = str(src_port)
                analysis_result['port_distribution'][port_str] = \
                    analysis_result['port_distribution'].get(port_str, 0) + 1
                    
            if protocol:
                analysis_result['protocol_distribution'][protocol] = \
                    analysis_result['protocol_distribution'].get(protocol, 0) + 1
                    
            if size and isinstance(size, int):
                packet_sizes.append(size)
                
            if timestamp:
                # 时间段分析(按小时)
                hour = int(timestamp % 86400 // 3600)  # 转换为一天中的小时数
                analysis_result['time_analysis'][hour] = \
                    analysis_result['time_analysis'].get(hour, 0) + 1
        
        # 计算包大小统计
        if packet_sizes:
            analysis_result['size_analysis'] = {
                'min_size': min(packet_sizes),
                'max_size': max(packet_sizes),
                'avg_size': sum(packet_sizes) / len(packet_sizes),
                'total_bytes': sum(packet_sizes)
            }
        
        # 转换集合为列表以支持JSON序列化
        analysis_result['unique_source_ips'] = list(analysis_result['unique_source_ips'])
        analysis_result['unique_destination_ips'] = list(analysis_result['unique_destination_ips'])
        
        return analysis_result


# ==================== main.py ====================

def load_config():
    """
    加载配置文件(支持INI格式)
    """
    config_path = os.path.join(os.path.dirname(__file__), 'config.ini')
    if os.path.exists(config_path):
        config = configparser.ConfigParser()
        config.read(config_path, encoding='utf-8')
        
        # 从配置中获取TShark路径
        if 'tshark' in config and config.has_option('tshark', 'tshark_path'):
            return {'tshark_path': config.get('tshark', 'tshark_path')}
    
    return {}


def format_dict_as_markdown_list(data_dict: dict, indent_level: int = 0) -> str:
    """
    将字典格式化为Markdown列表字符串
    
    Args:
        data_dict: 要格式化的字典
        indent_level: 缩进级别
        
    Returns:
        格式化的字符串
    """
    indent = "  " * indent_level
    if not data_dict:
        return f"{indent}* 无数据\n"
    
    result = ""
    for key, value in data_dict.items():
        result += f"{indent}- {key}: {value}\n"
    return result

def format_dict_as_markdown_list_global(data_dict, indent_level):
    """
    将字典格式化为Markdown列表字符串(全局版本,用于多进程)
    """
    return format_dict_as_markdown_list(data_dict, indent_level)

def generate_summary_report(results: dict) -> str:
    """
    生成摘要报告
    
    Args:
        results: 分析结果字典
        
    Returns:
        摘要报告字符串
    """
    metadata = results['metadata']
    location_analysis = results['location_analysis']
    peer_analysis = results.get('peer_analysis', {})
    hex_peer_analysis = results.get('hex_peer_analysis', {})
    voice_analysis = results['voice_analysis']
    
    report = f"""# QQ语音通话数据分析摘要报告

## 基本信息

- 输入文件: {metadata['input_file']}
- 总数据包数: {metadata['total_packets']:,}
- 语音数据包数: {metadata['voice_packets']:,}
- 连接对数量: {metadata['connections']:,}

## QQ语音通话对端IP分析(基于流量模式)

以下为基于流量模式识别出的QQ语音通话对端IP地址:

"""
    
    # 添加基于流量模式的对端IP详情
    peer_locations = peer_analysis.get('peer_locations', [])
    if peer_locations:
        for ip_location in peer_locations:
            report += f"- **对端IP地址**: {ip_location.get('ip', '未知')}\n"
            report += f"  - 国家: {ip_location.get('country', '未知')}\n"
            report += f"  - 区域: {ip_location.get('region', '未知')}\n"
            report += f"  - 城市: {ip_location.get('city', '未知')}\n"
            report += f"  - ISP: {ip_location.get('isp', '未知')}\n"
            report += f"  - 组织: {ip_location.get('org', '未知')}\n"
            report += f"  - 时区: {ip_location.get('timezone', '未知')}\n"
            report += "\n"
    else:
        report += "- 未找到明确的对端IP地址\n\n"
    
    report += f"""## QQ语音通话对端IP分析(基于十六进制模式"020048")

以下为基于十六进制模式"020048"识别出的QQ语音通话对端IP地址:

"""
    
    # 添加十六进制模式识别的对端IP详情
    hex_peer_locations = hex_peer_analysis.get('peer_locations', [])
    if hex_peer_locations:
        for ip_location in hex_peer_locations:
            report += f"- **对端IP地址**: {ip_location.get('ip', '未知')}\n"
            
            # 显示双API的国家信息
            internal_country = '未知'
            external_country = '未知'
            if 'internal_result' in ip_location and ip_location['internal_result']:
                internal_country = ip_location['internal_result'].get('country', '未知')
            if 'external_result' in ip_location and ip_location['external_result']:
                external_data = ip_location['external_result']
                # 外部API的国家信息在location子字典中
                if 'location' in external_data and external_data['location']:
                    external_country = external_data['location'].get('country_name', '未知')
                else:
                    external_country = external_data.get('country', '未知')
            report += f"  - 国家: {internal_country} / {external_country}\n"

            # 显示双API的区域信息
            internal_region = '未知'
            external_region = '未知'
            if 'internal_result' in ip_location and ip_location['internal_result']:
                internal_region = ip_location['internal_result'].get('region', '未知')
            if 'external_result' in ip_location and ip_location['external_result']:
                external_data = ip_location['external_result']
                # 外部API的区域信息在location子字典中
                if 'location' in external_data and external_data['location']:
                    external_region = external_data['location'].get('state_prov', '未知')
                else:
                    external_region = external_data.get('region', '未知')
            report += f"  - 区域: {internal_region} / {external_region}\n"

            # 显示双API的城市信息
            internal_city = '未知'
            external_city = '未知'
            if 'internal_result' in ip_location and ip_location['internal_result']:
                internal_city = ip_location['internal_result'].get('city', '未知')
            if 'external_result' in ip_location and ip_location['external_result']:
                external_data = ip_location['external_result']
                # 外部API的城市信息在location子字典中
                if 'location' in external_data and external_data['location']:
                    external_city = external_data['location'].get('city', '未知')
                else:
                    external_city = external_data.get('city', '未知')
            report += f"  - 城市: {internal_city} / {external_city}\n"

            # 显示双API的ISP信息
            internal_isp = '未知'
            external_isp = '未知'
            if 'internal_result' in ip_location and ip_location['internal_result']:
                internal_isp = ip_location['internal_result'].get('isp', '未知')
            if 'external_result' in ip_location and ip_location['external_result']:
                external_isp = ip_location['external_result'].get('isp', '未知')
            report += f"  - ISP: {internal_isp} / {external_isp}\n"

            # 显示双API的组织信息
            internal_org = '未知'
            external_org = '未知'
            if 'internal_result' in ip_location and ip_location['internal_result']:
                internal_org = ip_location['internal_result'].get('org', '未知')
            if 'external_result' in ip_location and ip_location['external_result']:
                external_org = ip_location['external_result'].get('org', '未知')
            report += f"  - 组织: {internal_org} / {external_org}\n"

            # 显示双API的时区信息
            internal_timezone = '未知'
            external_timezone = '未知'
            if 'internal_result' in ip_location and ip_location['internal_result']:
                internal_timezone = ip_location['internal_result'].get('timezone', '未知')
            if 'external_result' in ip_location and ip_location['external_result']:
                external_timezone = ip_location['external_result'].get('timezone', '未知')
            report += f"  - 时区: {internal_timezone} / {external_timezone}\n"

            # 显示双API的经纬度信息
            internal_coords = '未知'
            external_coords = '未知'
            if 'internal_result' in ip_location and ip_location['internal_result']:
                internal_lat = ip_location['internal_result'].get('lat', '未知')
                internal_lon = ip_location['internal_result'].get('lon', '未知')
                if internal_lat != '未知' and internal_lon != '未知':
                    internal_coords = f"{internal_lon},{internal_lat}"
                else:
                    internal_coords = '未知'
            if 'external_result' in ip_location and ip_location['external_result']:
                external_data = ip_location['external_result']
                external_lat = '未知'
                external_lon = '未知'
                # 外部API的经纬度信息在location子字典中
                if 'location' in external_data and external_data['location']:
                    lat_val = external_data['location'].get('latitude', '未知')
                    lon_val = external_data['location'].get('longitude', '未知')
                    if lat_val != '未知' and lon_val != '未知':
                        external_coords = f"{lon_val},{lat_val}"
                    else:
                        external_coords = '未知'
                else:
                    external_coords = '未知'
            report += f"  - 坐标: {internal_coords} / {external_coords}\n"

            # 添加包数量信息
            ip_addr = ip_location.get('ip', '')
            if ip_addr in hex_peer_analysis.get('packet_count_by_ip', {}):
                pkt_count = hex_peer_analysis['packet_count_by_ip'][ip_addr]
                report += f"  - 包数量: {pkt_count}\n"

            # 根据记忆,对于十六进制模式识别的IP,输出双API地图链接
            if 'dual_map_link' in ip_location:
                report += f"  - 谷歌地图定位点: {ip_location['dual_map_link']}\n"
            elif 'internal_map_link' in ip_location and 'external_map_link' in ip_location:
                report += f"  - 谷歌地图定位点: {ip_location['internal_map_link']} / {ip_location['external_map_link']}\n"
            elif 'internal_map_link' in ip_location:
                # 尝试从外部API获取坐标
                external_lat = '未知'
                external_lon = '未知'
                if 'external_result' in ip_location and ip_location['external_result']:
                    external_data = ip_location['external_result']
                    lat = external_data.get('location', {}).get('latitude')
                    lon = external_data.get('location', {}).get('longitude')
                    if lat is not None and lon is not None:
                        try:
                            external_lat = float(lat)
                            external_lon = float(lon)
                        except ValueError:
                            external_lat = lat
                            external_lon = lon
                    else:
                        external_lat = external_data.get('lat', '未知')
                        external_lon = external_data.get('lon', '未知')
                report += f"  - 谷歌地图定位点: {ip_location['internal_map_link']} / https://www.google.com/maps/place/{external_lat}+{external_lon}\n"
            elif 'external_map_link' in ip_location:
                # 尝试从内部API获取坐标
                internal_lat = '未知'
                internal_lon = '未知'
                if 'internal_result' in ip_location and ip_location['internal_result']:
                    internal_data = ip_location['internal_result']
                    internal_lat = internal_data.get('lat', '未知')
                    internal_lon = internal_data.get('lon', '未知')
                elif ip_location.get('source') == 'internal' or ip_location.get('source') == 'merged':
                    internal_lat = ip_location.get('lat', '未知')
                    internal_lon = ip_location.get('lon', '未知')
                report += f"  - 谷歌地图定位点: https://www.google.com/maps/place/{internal_lat}+{internal_lon} / {ip_location['external_map_link']}\n"
            report += "\n"
    else:
        report += "- 未找到包含十六进制模式'020048'的对端IP地址\n\n"
    
    if 'total_qq_voice_packets' in hex_peer_analysis:
        report += f"通过十六进制模式识别到的QQ语音数据包总数: {hex_peer_analysis['total_qq_voice_packets']}\n\n"
    
    report += f"""## 全部IP地理位置分析

- 唯一IP地址数: {location_analysis['total_unique_ips']}
- 定位成功的IP数: {len(location_analysis['ip_locations'])}

### IP地址定位详情

以下为所有识别到的IP地址及其位置信息:

"""
    
    # 添加所有IP地址定位详情
    for ip_location in location_analysis['ip_locations']:
        report += f"- **IP地址**: {ip_location.get('ip', '未知')}\n"
        report += f"  - 国家: {ip_location.get('country', '未知')}\n"
        report += f"  - 区域: {ip_location.get('region', '未知')}\n"
        report += f"  - 城市: {ip_location.get('city', '未知')}\n"
        report += f"  - ISP: {ip_location.get('isp', '未知')}\n"
        report += f"  - 组织: {ip_location.get('org', '未知')}\n"
        report += f"  - 时区: {ip_location.get('timezone', '未知')}\n"
        report += "\n"
    
    report += f"""### 国家分布统计
{format_dict_as_markdown_list(location_analysis['country_statistics'], 1)}

### 区域分布统计
{format_dict_as_markdown_list(location_analysis['region_statistics'], 1)}

## 语音流量分析

- 总语音包数: {voice_analysis['total_packets']:,}
- 唯一源IP数: {len(voice_analysis['unique_source_ips'])}
- 唯一目标IP数: {len(voice_analysis['unique_destination_ips'])}

### 协议分布
{format_dict_as_markdown_list(voice_analysis['protocol_distribution'], 1)}

### 端口分布 (Top 10)
{format_dict_as_markdown_list(dict(list(voice_analysis['port_distribution'].items())[:10]), 1)}

## 包大小分析

- 最小包大小: {voice_analysis['size_analysis'].get('min_size', '未知')} bytes
- 最大包大小: {voice_analysis['size_analysis'].get('max_size', '未知')} bytes
- 平均包大小: {voice_analysis['size_analysis'].get('avg_size', 0):,.2f} bytes
- 总传输字节数: {voice_analysis['size_analysis'].get('total_bytes', 0):,}
"""
    
    return report

def generate_summary_report_global(results):
    """
    生成摘要报告(全局版本,用于多进程)
    """
    return generate_summary_report(results)

def analyze_pcap_file(pcap_file_path: str, output_dir: str = "./data/output", tshark_path: str = None):
    """
    分析PCAP文件的主要函数
    
    Args:
        pcap_file_path: PCAP文件路径
        output_dir: 输出目录
        tshark_path: TShark可执行文件路径(可选)
    """
    ColoredLogger.info(f"开始分析PCAP文件: {pcap_file_path}")
    
    # 创建输出目录
    os.makedirs(output_dir, exist_ok=True)
    
    # 初始化数据包解析器
    parser = PacketParser(pcap_file_path, tshark_path=tshark_path)
    
    # 解析PCAP文件
    ColoredLogger.info("正在解析数据包...")
    packets = parser.parse_pcap()
    ColoredLogger.info(f"共解析到 {len(packets)} 个数据包")
    
    # 获取连接对信息
    ColoredLogger.info("正在分析连接对...")
    connections = parser.get_connection_pairs()
    ColoredLogger.info(f"共发现 {len(connections)} 个连接对")
    
    # 过滤语音流量
    ColoredLogger.info("正在过滤语音流量...")
    voice_packets = parser.filter_voice_traffic()
    ColoredLogger.info(f"共发现 {len(voice_packets)} 个可能的语音数据包")
    
    # 初始化位置分析器
    location_analyzer = LocationAnalyzer()
    
    # 分析连接的地理位置
    ColoredLogger.info("正在分析地理位置信息...")
    location_analysis = location_analyzer.analyze_connections_location(connections)
    ColoredLogger.info(f"共定位到 {location_analysis['total_unique_ips']} 个IP的位置")
    
    # 专门分析QQ语音通话的对端IP
    ColoredLogger.info("正在查找QQ语音通话的对端IP...")
    peer_analysis = location_analyzer.find_qq_voice_peer_ip(connections)
    ColoredLogger.info(f"找到 {len(peer_analysis['peer_ips'])} 个可能的对端IP")
    
    # 使用十六进制模式分析QQ语音通话对端IP
    ColoredLogger.info("正在使用十六进制模式查找QQ语音通话的对端IP...")
    hex_peer_analysis = location_analyzer.find_qq_voice_peer_ip_by_hex_pattern(parser)
    ColoredLogger.info(f"通过十六进制模式找到 {len(hex_peer_analysis['peer_ips'])} 个可能的对端IP")
    
    # 初始化语音流量分析器
    voice_analyzer = VoiceTrafficAnalyzer()
    
    # 分析语音流量模式
    ColoredLogger.info("正在分析语音流量模式...")
    voice_analysis = voice_analyzer.analyze_voice_patterns(voice_packets)
    
    # 生成输出文件名
    base_filename = os.path.splitext(os.path.basename(pcap_file_path))[0]
    
    # 保存分析结果
    results = {
        'metadata': {
            'input_file': pcap_file_path,
            'total_packets': len(packets),
            'voice_packets': len(voice_packets),
            'connections': len(connections),
            'analysis_time': datetime.datetime.now().isoformat()
        },
        'packets': packets[:100],  # 只保存前100个包的信息以避免文件过大
        'connections': connections,
        'voice_packets': voice_packets[:100],  # 只保存前100个语音包的信息
        'location_analysis': location_analysis,
        'peer_analysis': peer_analysis,  # 添加对端IP分析结果
        'hex_peer_analysis': hex_peer_analysis,  # 添加十六进制模式分析结果
        'voice_analysis': voice_analysis
    }
    
    # 保存完整结果
    output_file = os.path.join(output_dir, f"{base_filename}_analysis.json")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    
    # 生成摘要报告
    summary = generate_summary_report(results)
    summary_file = os.path.join(output_dir, f"{base_filename}_summary.md")
    with open(summary_file, 'w', encoding='utf-8') as f:
        f.write(summary)
    
    ColoredLogger.info(f"分析完成!结果已保存到:")
    ColoredLogger.info(f"  详细结果: {output_file}")
    ColoredLogger.info(f"  摘要报告: {summary_file}")


def analyze_pcap_file_global(pcap_file_path, output_dir="./data/output", tshark_path=None):
    """
    分析PCAP文件的主要函数(全局版本,用于多进程)
    处理流程与 analyze_pcap_file 完全一致,这里直接复用,避免维护两份相同的代码;
    保留此模块级包装函数是为了供多进程任务按名称调用。
    """
    analyze_pcap_file(pcap_file_path, output_dir, tshark_path=tshark_path)
    return True

def analyze_single_file_with_progress_global(args):
    """全局函数:分析单个文件的函数,接收参数元组"""
    
    i, pcap_file, total_files, input_dir, output_dir, tshark_path = args

    # 打印开始信息
    ColoredLogger.info(f"\n[{i}/{total_files}] 正在分析文件: {pcap_file}")

    try:
        # 在新进程中执行分析
        pcap_path = os.path.join(input_dir, pcap_file)
        analyze_pcap_file_global(pcap_path, output_dir, tshark_path)

        # 打印完成信息
        ColoredLogger.info(f"[{i}/{total_files}] 完成分析: {pcap_file}")

        return (i, pcap_file, True, "")
    except Exception as e:
        # 打印错误信息
        ColoredLogger.error(f"[{i}/{total_files}] 分析文件 {pcap_file} 时出现错误: {str(e)}")

        return (i, pcap_file, False, str(e))
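
# 设计说明:上面的 *_global 函数必须定义在模块顶层,因为 ProcessPoolExecutor 会对
# 提交的函数及其参数进行 pickle 序列化,嵌套函数和 lambda 无法被序列化;
# 参数元组 (i, pcap_file, total_files, input_dir, output_dir, tshark_path)
# 与下方 analyze_all_pcaps_in_directory 中构造的 file_tuples 顺序一一对应。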

def analyze_all_pcaps_in_directory(input_dir: str = "./data/input", output_dir: str = "./data/output", tshark_path: str = None, num_threads: int = 1):
    """
    分析指定目录下的所有PCAP文件
    
    Args:
        input_dir: 输入目录路径,默认为"./data/input"
        output_dir: 输出目录路径
        tshark_path: TShark可执行文件路径(可选)
        num_threads: 并行任务数,默认为1(串行执行);大于1时使用进程池并行分析
    """
    multiprocessing.set_start_method('spawn', force=True)  # 设置启动方法以避免某些平台上的问题
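    # 说明:pyshark 内部依赖 asyncio 事件循环,在多个线程中复用同一事件循环容易冲突,
    # 因此并行分析时采用进程池;'spawn' 使子进程从全新的解释器启动,
    # 不会继承父进程中已创建的事件循环等状态。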
    
    print_lock = threading.Lock()  # 用于同步打印输出
    
    def log_print(message):
        """线程安全的打印函数"""
        with print_lock:
            ColoredLogger.info(message)
            
    ColoredLogger.info(f"开始分析目录 {input_dir} 下的所有PCAP文件...")
    ColoredLogger.info(f"使用 {num_threads} 个线程进行分析")
    
    # 确保输入目录存在
    if not os.path.exists(input_dir):
        ColoredLogger.error(f"错误: 输入目录 {input_dir} 不存在")
        return
        
    # 获取目录下所有PCAP文件
    pcap_extensions = ('.pcap', '.pcapng', '.cap', '.pcapng.gz', '.pcap.gz')
    pcap_files = [f for f in os.listdir(input_dir) 
                  if f.lower().endswith(pcap_extensions)]
    
    if not pcap_files:
        ColoredLogger.warning(f"在目录 {input_dir} 中未找到PCAP文件")
        return
        
    ColoredLogger.info(f"找到 {len(pcap_files)} 个PCAP文件")
    
    # 如果只使用单线程,保持原有行为
    if num_threads == 1:
        # 逐个分析PCAP文件
        for i, pcap_file in enumerate(pcap_files, 1):
            pcap_path = os.path.join(input_dir, pcap_file)
            ColoredLogger.info(f"\n[{i}/{len(pcap_files)}] 正在分析文件: {pcap_file}")
            
            try:
                analyze_pcap_file(pcap_path, output_dir, tshark_path=tshark_path)
                ColoredLogger.info(f"[{i}/{len(pcap_files)}] 完成分析: {pcap_file}")
            except Exception as e:
                ColoredLogger.error(f"[{i}/{len(pcap_files)}] 分析文件 {pcap_file} 时出现错误: {str(e)}")
                continue
    else:
        # 使用进程池而不是线程池来避免事件循环冲突
        
        # 创建文件索引元组列表
        file_tuples = [(i, pcap_file, len(pcap_files), input_dir, output_dir, tshark_path) 
                      for i, pcap_file in enumerate(pcap_files, 1)]
        
        # 使用进程池执行分析,避免事件循环冲突
        with ProcessPoolExecutor(max_workers=num_threads) as executor:
            # 提交所有任务
            futures = [executor.submit(analyze_single_file_with_progress_global, file_tuple) 
                      for file_tuple in file_tuples]
            
            # 等待所有任务完成
            for future in as_completed(futures):
                future.result()  # 获取结果,这也会抛出异常(如果有的话)

    ColoredLogger.info(f"\n已完成分析目录 {input_dir} 下的所有PCAP文件")


def main():
    """
    主函数
    """
    parser = argparse.ArgumentParser(description='QQ语音通话位置与数据包捕获分析工具 (单文件版)')
    parser.add_argument('pcap_file', nargs='?', help='要分析的PCAP文件路径')
    parser.add_argument('-o', '--output', default='./data/output', help='输出目录路径')
    parser.add_argument('--list-pcaps', action='store_true', help='列出当前目录下的所有PCAP文件')
    parser.add_argument('--tshark-path', help='TShark可执行文件路径(可选)')
    parser.add_argument('--batch-analyze', action='store_true', help='批量分析data/input目录下的所有PCAP文件')
    parser.add_argument('--input-dir', default='./data/input', help='批量分析的输入目录路径,默认为./data/input')
    parser.add_argument('-t', '--threads', type=int, default=1, help='指定并行分析的任务数(进程数),默认为1(仅在使用--batch-analyze时生效)')
    
    args = parser.parse_args()
    
    # 验证命令行参数
    if args.threads != 1 and not args.batch_analyze:
        ColoredLogger.error("错误: -t/--threads 选项只能与 --batch-analyze 选项一起使用")
        sys.exit(1)
    
    # 如果用户选择了批量分析选项
    if args.batch_analyze:
        # 如果没有通过命令行参数指定TShark路径,则从配置文件加载
        tshark_path = args.tshark_path
        if not tshark_path:
            config = load_config()
            tshark_path = config.get('tshark_path')
        
        analyze_all_pcaps_in_directory(args.input_dir, args.output, tshark_path=tshark_path, num_threads=args.threads)
        return
    
    # 如果用户要求列出PCAP文件
    if args.list_pcaps:
        pcap_files = [f for f in os.listdir('.') if f.lower().endswith(('.pcap', '.pcapng'))]
        if pcap_files:
            ColoredLogger.info("当前目录下的PCAP文件:")
            for f in pcap_files:
                ColoredLogger.info(f"  - {f}")
        else:
            ColoredLogger.warning("未找到PCAP文件")
        return
    
    # 如果没有提供PCAP文件,尝试自动检测
    if not args.pcap_file:
        pcap_files = [f for f in os.listdir('.') if f.lower().endswith(('.pcap', '.pcapng'))]
        if not pcap_files:
            ColoredLogger.error("错误: 当前目录下未找到任何PCAP文件")
            ColoredLogger.error("请提供一个PCAP文件作为参数或确保当前目录包含PCAP文件")
            sys.exit(1)
        
        # 使用最近修改的PCAP文件
        latest_pcap = max(pcap_files, key=lambda x: os.path.getmtime(x))
        ColoredLogger.info(f"自动选择最新的PCAP文件: {latest_pcap}")
        args.pcap_file = latest_pcap
    
    # 验证PCAP文件是否存在
    if not os.path.exists(args.pcap_file):
        ColoredLogger.error(f"错误: 文件 {args.pcap_file} 不存在")
        sys.exit(1)
    
    # 如果没有通过命令行参数指定TShark路径,则从配置文件加载
    tshark_path = args.tshark_path
    if not tshark_path:
        config = load_config()
        tshark_path = config.get('tshark_path')
    
    # 开始分析
    try:
        analyze_pcap_file(args.pcap_file, args.output, tshark_path=tshark_path)
    except Exception as e:
        ColoredLogger.error(f"分析过程中出现错误: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
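
# ------------------------------------------------------------------
# 命令行使用示例(仅为示意,文件名与路径请替换为本机实际值;
# 此处假设脚本保存为 qq_voice_analyzer.py):
#   单文件分析:   python qq_voice_analyzer.py capture.pcapng -o ./data/output
#   自动选择文件: python qq_voice_analyzer.py          (分析当前目录下最新的PCAP文件)
#   列出PCAP文件: python qq_voice_analyzer.py --list-pcaps
#   批量分析:     python qq_voice_analyzer.py --batch-analyze --input-dir ./data/input -t 4
#   指定TShark:   python qq_voice_analyzer.py capture.pcapng --tshark-path /usr/bin/tshark
# ------------------------------------------------------------------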