PythonとRiremlitを使用してリアルタイムネットワークトラフィックダッシュボードを構築する方法
ネットワークトラフィックをリアルタイムで視覚化したいと思ったことはありませんか?このチュートリアルでは、Pythonおよび retrylit
を使用してインタラクティブネットワークトラフィック分析ダッシュボードを構築する方法を学習します。 riremlit
は、データ分析とデータ処理のためのWebアプリケーションの開発に使用できるオープンソースPythonフレームワークです。
このチュートリアルを終えると、コンピュータの NIC (ネットワーク インターフェイス カード) から生のネットワーク パケットをキャプチャし、データを処理し、リアルタイムで更新される美しい視覚エフェクトを作成する方法がわかるようになります。
目次
ネットワークトラフィック分析が重要なのはなぜですか?
前提条件
-
プロジェクトのセットアップ方法
コア機能を構築する方法
Streamlit ビジュアライゼーションの作成方法
ネットワークパケットをキャプチャする方法
すべてをまとめる
将来の機能強化
結論
ネットワークトラフィック分析が重要なのはなぜですか?
ネットワークがほぼすべてのアプリケーションとサービスのバックボーンを形成している企業では、ネットワーク トラフィック分析は重要な要件です。その中心となるのは、ネットワークの監視、すべてのトラフィック (受信および送信) のキャプチャ、およびネットワークを流れるこれらのパケットの解釈を含むネットワーク パケットの分析です。この手法を使用すると、セキュリティ パターンを特定し、異常を検出し、ネットワークのセキュリティと効率を確保できます。
このチュートリアルで取り組むこの概念実証プロジェクトは、ネットワーク アクティビティをリアルタイムで視覚化して分析するのに役立つため、特に役立ちます。これにより、エンタープライズ システムで問題のトラブルシューティング、パフォーマンスの最適化、セキュリティ分析がどのように行われるかを理解できるようになります。
前提条件
Python 3.8 以降のバージョンがシステムにインストールされていること。
-
コンピューター ネットワークの概念についての基本的な理解。
Python プログラミング言語とその広く使用されているライブラリに関する知識。
データの視覚化技術とライブラリに関する基本的な知識。
プロジェクトのセットアップ方法
開始するには、プロジェクト構造を作成し、次のコマンドを使用してPIPで必要なツールをインストールします。
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
ダッシュボードの視覚化には riremlit
、データ処理には pandas
、ネットワークパケットのキャプチャとパケット処理には scapy
、最後に<コードを使用します。 >収集されたデータでチャートをプロットするためのプロット。
コア機能の構築方法
すべてのコードを dashboard.py
という名前の単一のファイルに置きます。まず、使用するすべての要素をインポートすることから始めましょう。
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
次に、基本的なロギング構成を設定して、ロギングを構成しましょう。これは、イベントを追跡し、アプリケーションをデバッグモードで実行するために使用されます。現在、ロギングレベルを info
に設定しています。つまり、レベル情報
以上のイベントが表示されます。 Pythonでのロギングに慣れていない場合は、詳細になるこのドキュメントピースをチェックすることをお勧めします。
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
次に、パケットプロセッサを構築します。このクラスでキャプチャされたパケットを処理する機能を実装します。
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Add TCP-specific information
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Add UDP-specific information
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Keep only last 10000 packets to prevent memory issues
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
このクラスは、コア機能を構築し、パケットの処理に使用されるいくつかのユーティリティ機能を備えています。
ネットワーク パケットは、トランスポート レベルで 2 つに分類され (TCP と UDP)、ネットワーク レベルでは ICMP プロトコルに分類されます。 TCP/IP の概念に詳しくない場合は、freeCodeCamp News のこの記事をチェックすることをお勧めします。
私たちのコンストラクターは、定義したこれらの TCP/IP プロトコル タイプ バケットに分類されるすべてのパケットを追跡します。パケット キャプチャ時間、キャプチャされたデータ、キャプチャされたパケット数にも注意します。
また、スレッドロックを活用して、1つのパケットのみが1つの時間で処理されるようにします。これをさらに拡張して、プロジェクトが並列パケット処理を可能にすることができます。
get_protocol_name
ヘルパー関数は、プロトコル番号に基づいてプロトコルの正しいタイプを取得するのに役立ちます。この背景について説明すると、Internet Assigned Numbers Authority (IANA) は、ネットワーク パケット内のさまざまなプロトコルを識別するために標準化された番号を割り当てています。解析されたネットワーク パケットでこれらの数値が表示されると、現在傍受されているパケットでどのような種類のプロトコルが使用されているかがわかります。このプロジェクトの範囲では、TCP、UDP、および ICMP (Ping) のみにマッピングします。他のタイプのパケットが発生した場合は、それを OTHER(
として分類します。
process_packet
関数は、これらの個々のパケットを処理するコア機能を処理します。パケットに IP 層が含まれている場合、送信元および宛先の IP アドレス、プロトコル タイプ、パケット サイズ、およびパケット キャプチャの開始からの経過時間が記録されます。
特定のトランスポート層プロトコル (TCP や UDP など) を持つパケットの場合、TCP パケットの TCP フラグとともに送信元ポートと宛先ポートをキャプチャします。これらの抽出された詳細は、メモリ内の packet_data
リストに保存されます。また、これらのパケットが処理されたときの packet_count
も追跡します。
get_dataframe
関数は、packet_data
リストを、視覚化に使用される Pandas
データフレームに変換するのに役立ちます。
照明の視覚化を作成する方法
今度は、インタラクティブな流れのダッシュボードを構築する時が来ました。 dashboard.py
スクリプト(パケット処理クラスの外)で create_visualization
という関数を定義します。
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Protocol distribution
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Packets timeline
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Top source IPs
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
この関数は、データフレームを入力として取得し、3つのチャート /グラフをプロットするのに役立ちます。
プロトコル分布グラフ: このグラフには、キャプチャされたパケット トラフィックにおけるさまざまなプロトコル (TCP、UDP、ICMP など) の割合が表示されます。
パケットタイムラインチャート:このチャートには、期間にわたって1秒あたりの処理されたパケットの数が表示されます。
上位送信元 IP アドレス グラフ: このグラフでは、キャプチャされたトラフィックで最も多くのパケットを送信した上位 10 個の IP アドレスが強調表示されます。
プロトコル分布図は、3 つの異なるタイプ (およびその他) のプロトコル数を円グラフで表したものです。これらのグラフをプロットするには、Streamlit
および Plotly
Python ツールを使用します。パケット キャプチャが開始されてからのタイムスタンプも記録したので、このデータを使用して、キャプチャされたパケットの傾向を時間の経過とともにプロットします。
2 番目のグラフでは、データに対して groupby
操作を実行し、1 秒ごとにキャプチャされたパケット数を取得します (S
は秒を表します)。グラフをプロットします。
最後に、3 番目のグラフでは、観察された個別の送信元 IP をカウントし、上位 10 個の IP を示す IP 数のグラフをプロットします。
ネットワークパケットをキャプチャする方法
次に、機能を構築して、ネットワークパケットデータをキャプチャできるようにしましょう。
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
これは、 packetprocessor
クラスをインスタンス化し、 sniff
関数を scapy
モジュールで使用してパケットのキャプチャを開始する単純な関数です。
ここでは、メインのプログラムフローから独立してパケットをキャプチャできるようにするために、スレッドを使用します。これにより、パケットキャプチャ操作がリアルタイムでダッシュボードを更新するなど、他の操作をブロックしないようにします。また、作成された packetprocessor
インスタンスを返して、メインプログラムで使用できるようにします。
すべてをまとめる
次に、これらすべてのピースを、プログラムのドライバー関数として機能する main
関数と一緒にステッチしましょう。
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Initialize packet processor in session state
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Create dashboard layout
col1, col2 = st.columns(2)
# Get current data
df = st.session_state.processor.get_dataframe()
# Display metrics
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Display visualizations
create_visualizations(df)
# Display recent packets
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Add refresh button
if st.button('Refresh Data'):
st.rerun()
# Auto refresh
time.sleep(2)
st.rerun()
この関数は、Streamlit
ダッシュボードもインスタンス化し、すべてのコンポーネントを統合します。まず Streamlit
ダッシュボードのページ タイトルを設定し、次に PacketProcessor
を初期化します。 Streamlit
のセッション状態を使用して、パケット キャプチャのインスタンスが 1 つだけ作成され、その状態が保持されるようにします。
ここで、データが処理されるたびにセッション状態からデータフレームを動的に取得し、メトリクスと視覚エフェクトの表示を開始します。また、最近キャプチャされたパケットと、タイムスタンプ、送信元および宛先 IP、プロトコル、パケットのサイズなどの情報も表示されます。また、ユーザーがダッシュボードからデータを手動で更新できる機能も追加され、同時に 2 秒ごとにデータが自動的に更新されます。
最後に、次のコマンドを使用してプログラムを実行しましょう。
sudo streamlit run dashboard.py
パケットキャプチャ機能には管理特権が必要であるため、 sudo
でプログラムを実行する必要があることに注意してください。 Windowsにいる場合は、ターミナルを管理者として開き、 sudo
プレフィックスなしでプログラムを実行します。
プログラムがパケットのキャプチャを開始するまでしばらくお待ちください。すべてがうまくいけば、次のようなものが表示されるはずです。
これらはすべて、Streamlit
ダッシュボード プログラムに実装したばかりの視覚エフェクトです。
将来の強化
それに伴い、ダッシュボードの機能を拡張するために使用できる将来の拡張アイデアを次に示します。
異常検出のための機械学習機能を追加する
地理的IPマッピングを実装します
トラフィック分析パターンに基づいてカスタム アラートを作成する
パケットペイロード分析オプションを追加する
結論
おめでとう!これで、Python と Streamlit
を使用してリアルタイム ネットワーク トラフィック分析ダッシュボードが正常に構築されました。このプログラムは、ネットワークの動作に関する貴重な洞察を提供し、セキュリティ監視からネットワークの最適化まで、さまざまなユースケースに拡張できます。
これで、ネットワーク トラフィック分析の基本と、Python プログラミングについて少しは学習できたと思います。読んでいただきありがとうございます!