Pythonでハニーポットを構築する方法:セキュリティの欺ceptionの実用的なガイド
サイバーセキュリティにおいて、ハニーポットは、システムに侵入しようとする潜在的な攻撃者を引きつけて検出するように設計されたおとりシステムです。ちょうど、蜂蜜の入った壷が屋外に置かれているとハエが集まってくるのと同じです。
これらのハニーポットをシステムのセキュリティ カメラと考えてください。セキュリティ カメラが誰が建物に侵入しようとしているのか、どのように侵入しようとしているのかを理解するのに役立つのと同じように、これらのハニーポットは、誰がシステムを攻撃しようとしているのか、そして彼らがどのような手法を使用しているのかを理解するのに役立ちます。
このチュートリアルを終えると、Python でデモ ハニーポットを作成し、ハニーポットがどのように機能するかを理解できるようになります。
目次
ハニーポットの種類を理解する
-
開発環境を設定する方法
コアハニーポットの構築方法
ネットワークリスナーを実装します
ハニーポットを実行します
Honeypot Attack Simulatorを作成します
ハニーポットデータを分析する方法
セキュリティ上の考慮事項
結論
ハニーポットの種類を理解する
独自のハニーポットの設計を始める前に、そのさまざまなタイプを簡単に理解しましょう。
運用ハニーポット: これらのタイプのハニーポットは実際の運用環境に配置され、実際のセキュリティ攻撃を検出するために使用されます。通常、設計がシンプルで、保守と導入が容易で、リスクを軽減するために対話が制限されています。
-
研究ハニーポット:これらは、セキュリティ研究者によって設定されたより複雑なシステムであり、攻撃パターンを研究し、これらのパターンに関する経験的分析を実行し、マルウェアサンプルを収集し、以前に発見されていない新しい攻撃技術を理解します。彼らは、生産環境でのアプリケーションのように動作するのではなく、多くの場合、オペレーティングシステムまたはネットワーク全体をエミュレートします。
このチュートリアルでは、接続試行と攻撃者の基本的な動作を記録する中程度のインタラクション ハニーポットを構築します。
開発環境をセットアップする方法
Pythonで開発環境をセットアップすることから始めましょう。次のコマンドを実行します。
import socket
import sys
import datetime
import json
import threading
from pathlib import Path
# Configure logging directory
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)
組み込みライブラリを使用するので、外部の依存関係をインストールする必要はありません。ログは honeypot_logs
ディレクトリに保存されます。
コアハニーポットの構築方法
基本的なハニーポットは、3つのコンポーネントで構成されます。
接続を受け入れるネットワークリスナー
アクティビティを記録するためのロギングシステム
攻撃者と対話するための基本的なエミュレーションサービス
次に、コアハニーポットクラスを初期化することから始めましょう。
class Honeypot:
def __init__(self, bind_ip="0.0.0.0", ports=None):
self.bind_ip = bind_ip
self.ports = ports or [21, 22, 80, 443] # Default ports to monitor
self.active_connections = {}
self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"
def log_activity(self, port, remote_ip, data):
"""Log suspicious activity with timestamp and details"""
activity = {
"timestamp": datetime.datetime.now().isoformat(),
"remote_ip": remote_ip,
"port": port,
"data": data.decode('utf-8', errors='ignore')
}
with open(self.log_file, 'a') as f:
json.dump(activity, f)
f.write('\n')
def handle_connection(self, client_socket, remote_ip, port):
"""Handle individual connections and emulate services"""
service_banners = {
21: "220 FTP server ready\r\n",
22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
}
try:
# Send appropriate banner for the service
if port in service_banners:
client_socket.send(service_banners[port].encode())
# Receive data from attacker
while True:
data = client_socket.recv(1024)
if not data:
break
self.log_activity(port, remote_ip, data)
# Send fake response
client_socket.send(b"Command not recognized.\r\n")
except Exception as e:
print(f"Error handling connection: {e}")
finally:
client_socket.close()
このクラスには重要な情報がたくさん含まれているので、各関数を 1 つずつ見てみましょう。
__ init __
関数は、ログファイルのパス /ファイル名だけでなく、HoneypotをホストするIPおよびポート番号を記録します。また、ハニーポットに持っているアクティブな接続の総数の記録を維持します。
log_activity
関数は、IP、データ、および IP が接続を試みたポートに関する情報を受信します。次に、この情報を JSON 形式のログ ファイルに追加します。
handle_connection
関数は、さまざまなポートで実行されるこれらのサービスを模倣します。ハニーポットをポート 21、22、80、および 443 で実行します。これらのサービスは、それぞれ FTP、SSH、HTTP、HTTPS プロトコル用です。したがって、ハニーポットと対話しようとする攻撃者は、これらのポート上でこれらのサービスを期待する必要があります。
これらのサービスの動作を模倣するために、実際に使用するサービスバナーを使用します。この関数は、攻撃者が接続するときに最初に適切なバナーを送信し、次にデータを受信してログに記録します。 Honeypotは、偽の応答「コマンドが認識されていない」を攻撃者に送り返します。
ネットワークリスナーを実装する
次に、着信接続を処理するネットワークリスナーを実装しましょう。このために、簡単なソケットプログラミングを使用します。ソケットプログラミングの仕組みを知らない場合は、それに関連するいくつかの概念を説明するこの記事をご覧ください。
def start_listener(self, port):
"""Start a listener on specified port"""
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.bind_ip, port))
server.listen(5)
print(f"[*] Listening on {self.bind_ip}:{port}")
while True:
client, addr = server.accept()
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
# Handle connection in separate thread
client_handler = threading.Thread(
target=self.handle_connection,
args=(client, addr[0], port)
)
client_handler.start()
except Exception as e:
print(f"Error starting listener on port {port}: {e}")
start_listener
関数はサーバーを起動し、指定されたポートでリッスンします。 bind_ip
は 0.0.0.0
になり、サーバーがすべてのネットワーク インターフェイスでリッスンすることを示します。
ここでは、複数の攻撃者がハニーポットとの対話を試みたり、攻撃スクリプトやツールがハニーポットをスキャンしたりする可能性があるため、新しい接続をそれぞれ別のスレッドで処理します。スレッドの仕組みがわからない場合は、Python のスレッドと同時実行について説明したこの記事を参照してください。
また、この関数をCore honeypot
クラスに必ず配置してください。
ハニーポットを実行する
次に、ハニーポットを開始する main
関数を作成しましょう。
def main():
honeypot = Honeypot()
# Start listeners for each port in separate threads
for port in honeypot.ports:
listener_thread = threading.Thread(
target=honeypot.start_listener,
args=(port,)
)
listener_thread.daemon = True
listener_thread.start()
try:
# Keep main thread alive
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Shutting down honeypot...")
sys.exit(0)
if __name__ == "__main__":
main()
この関数は、 honeypot
クラスをインスタンス化し、定義された各ポート(21,22,80,443)のリスナーを別のスレッドとして起動します。これで、実際のプログラムを無限のループに入れて、実際のプログラムを生かしているメインスレッドを保持します。これをすべて一緒にスクリプトに入れて実行します。
ハニーポット攻撃シミュレーターを作成する
次に、いくつかの攻撃シナリオをシミュレートし、Honeypotをターゲットにして、JSONログファイルにデータを収集できるようにしましょう。
このシミュレーターは、ハニーポットに関するいくつかの重要な側面をデモンストレーションするのに役立ちます。
現実的な攻撃パターン:シミュレーターは、ポートスキャン、ブルートフォースの試み、サービス固有のエクスプロイトなどの一般的な攻撃パターンをシミュレートします。
可変強度:シミュレータは、シミュレーションの強度を調整して、ハニーポットがさまざまな負荷を処理する方法をテストします。
いくつかの攻撃タイプ:実際の攻撃者が試みるかもしれないさまざまなタイプの攻撃を実証し、あなたのハニーポットがそれぞれにどのように反応するかを理解するのに役立ちます。
同時接続:シミュレーターはスレッドを使用して、ハニーポットが複数の同時接続を処理する方法をテストします。
# honeypot_simulator.py
import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse
class HoneypotSimulator:
"""
A class to simulate different types of connections and attacks against our honeypot.
This helps in testing the honeypot's logging and response capabilities.
"""
def __init__(self, target_ip="127.0.0.1", intensity="medium"):
# Configuration for the simulator
self.target_ip = target_ip
self.intensity = intensity
# Common ports that attackers often probe
self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]
# Dictionary of common commands used by attackers for different services
self.attack_patterns = {
21: [ # FTP commands
"USER admin\r\n",
"PASS admin123\r\n",
"LIST\r\n",
"STOR malware.exe\r\n"
],
22: [ # SSH attempts
"SSH-2.0-OpenSSH_7.9\r\n",
"admin:password123\n",
"root:toor\n"
],
80: [ # HTTP requests
"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
"POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
"GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
]
}
# Intensity settings affect the frequency and volume of simulated attacks
self.intensity_settings = {
"low": {"max_threads": 2, "delay_range": (1, 3)},
"medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
"high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
}
def simulate_connection(self, port):
"""
Simulates a connection attempt to a specific port with realistic attack patterns
"""
try:
# Create a new socket connection
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
print(f"[*] Attempting connection to {self.target_ip}:{port}")
sock.connect((self.target_ip, port))
# Get banner if any
banner = sock.recv(1024)
print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")
# Send attack patterns based on the port
if port in self.attack_patterns:
for command in self.attack_patterns[port]:
print(f"[*] Sending command to port {port}: {command.strip()}")
sock.send(command.encode())
# Wait for response
try:
response = sock.recv(1024)
print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
except socket.timeout:
print(f"[-] No response received from port {port}")
# Add realistic delay between commands
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
sock.close()
except ConnectionRefusedError:
print(f"[-] Connection refused on port {port}")
except socket.timeout:
print(f"[-] Connection timeout on port {port}")
except Exception as e:
print(f"[-] Error connecting to port {port}: {e}")
def simulate_port_scan(self):
"""
Simulates a basic port scan across common ports
"""
print(f"\n[*] Starting port scan simulation against {self.target_ip}")
for port in self.target_ports:
self.simulate_connection(port)
time.sleep(random.uniform(0.1, 0.3))
def simulate_brute_force(self, port):
"""
Simulates a brute force attack against a specific service
"""
common_usernames = ["admin", "root", "user", "test"]
common_passwords = ["password123", "admin123", "123456", "root"]
print(f"\n[*] Starting brute force simulation against port {port}")
for username in common_usernames:
for password in common_passwords:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((self.target_ip, port))
if port == 21: # FTP
sock.send(f"USER {username}\r\n".encode())
sock.recv(1024)
sock.send(f"PASS {password}\r\n".encode())
elif port == 22: # SSH
sock.send(f"{username}:{password}\n".encode())
sock.close()
time.sleep(random.uniform(0.1, 0.3))
except Exception as e:
print(f"[-] Error in brute force attempt: {e}")
def run_continuous_simulation(self, duration=300):
"""
Runs a continuous simulation for a specified duration
"""
print(f"\n[*] Starting continuous simulation for {duration} seconds")
print(f"[*] Intensity level: {self.intensity}")
end_time = time.time() + duration
with ThreadPoolExecutor(
max_workers=self.intensity_settings[self.intensity]["max_threads"]
) as executor:
while time.time() < end_time:
# Mix of different attack patterns
simulation_choices = [
lambda: self.simulate_port_scan(),
lambda: self.simulate_brute_force(21),
lambda: self.simulate_brute_force(22),
lambda: self.simulate_connection(80)
]
# Randomly choose and execute an attack pattern
executor.submit(random.choice(simulation_choices))
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
def main():
"""
Main function to run the honeypot simulator with command-line arguments
"""
parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
parser.add_argument(
"--intensity",
choices=["low", "medium", "high"],
default="medium",
help="Simulation intensity level"
)
parser.add_argument(
"--duration",
type=int,
default=300,
help="Simulation duration in seconds"
)
args = parser.parse_args()
simulator = HoneypotSimulator(args.target, args.intensity)
try:
simulator.run_continuous_simulation(args.duration)
except KeyboardInterrupt:
print("\n[*] Simulation interrupted by user")
except Exception as e:
print(f"[-] Simulation error: {e}")
finally:
print("\n[*] Simulation complete")
if __name__ == "__main__":
main()
このシミュレーション スクリプトでは多くの処理が行われているので、1 つずつ詳しく見ていきましょう。また、コード内でもう少し読みやすくするために、すべての関数と操作にコメントを追加しました。
まず、HoneypotSimulator
というユーティリティ クラスを用意します。このクラスには、シミュレータの基本構成をセットアップする __init__
関数があります。これには、ターゲット IP アドレス (デフォルトは localhost) と強度レベル (デフォルトは「中」) の 2 つのパラメータが必要です。
また、3 つの重要なコンポーネントも定義します。プローブするターゲット ポート (FTP、SSH、HTTP などの一般的なサービス)、各サービスに固有の攻撃パターン (ログイン試行やコマンドなど)、スレッドを介したシミュレーションの攻撃性を制御する強度設定です。カウントとタイミングの遅延。
simulate_connection
関数は、個々の接続試行を特定のポートに処理します。ソケット接続を作成し、任意のサービスバナー(SSHバージョン情報など)を取得しようとし、サービスタイプに基づいて適切な攻撃コマンドを送信します。一般的なネットワークの問題にエラー処理を追加し、コマンド間に現実的な遅延を追加して、人間の相互作用を模倣しました。
simulate_port_scan
関数は偵察ツールのように機能し、ターゲット リスト内の各ポートを系統的にチェックします。これは、nmap
などのツールの動作に似ており、ポートを 1 つずつ調べて利用可能なサービスを確認します。ポートごとに simulate_connection
関数を呼び出し、小さなランダムな遅延を追加して、スキャン パターンをより自然に見せます。
simulate_brute_force
関数は、一般的なユーザー名とパスワードのリストを管理し、FTP や SSH などのサービスに対してさまざまな組み合わせを試みます。試行のたびに、新しい接続が作成され、そのサービスの正しい形式でログイン資格情報が送信され、接続が閉じられます。これは、ハニーポットがクレデンシャル スタッフィング攻撃をどの程度検出して記録するかをテストするのに役立ちます。
run_continuous_simulation
関数は、指定された期間実行され、ポート スキャン、ブルート フォース、特定のサービス攻撃などのさまざまな攻撃タイプからランダムに選択されます。 Python の ThreadPoolExecutor
を使用して、指定された強度レベルに基づいて複数の攻撃を同時に実行します。
最後に、シミュレータのコマンドラインインターフェイスを提供する main
関数があります。 honeypotsimulator
クラスのインスタンスを作成し、ユーザーの中断やエラーの適切な処理など、全体的な実行を管理します。
シミュレーターコードを別のスクリプトに配置した後、次のコマンドで実行します。
# Run with default settings (medium intensity, localhost, 5 minutes)
python honeypot_simulator.py
# Run with custom settings
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600
Honeypotと同じマシンのシミュレーターをローカルで実行しているため、ターゲットは localhost
になります。ただし、実際のシナリオでは、またはVMまたは別のマシンでハニーポットを実行している場合は別のものになる可能性があります。シミュレーターを実行する前にIPを確認してください。
ハニーポットデータを分析する方法
ハニーポットによって収集されたすべてのデータを分析できるヘルパー関数を簡単に作成してみましょう。これは JSON ログ ファイルに保存されているため、組み込みの JSON パッケージを使用して簡単に解析できます。
import datetime
import json
def analyze_logs(log_file):
"""Enhanced honeypot log analysis with temporal and behavioral patterns"""
ip_analysis = {}
port_analysis = {}
hourly_attacks = {}
data_patterns = {}
# Track session patterns
ip_sessions = {}
attack_timeline = []
with open(log_file, 'r') as f:
for line in f:
try:
activity = json.loads(line)
timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
ip = activity['remote_ip']
port = activity['port']
data = activity['data']
# Initialize IP tracking if new
if ip not in ip_analysis:
ip_analysis[ip] = {
'total_attempts': 0,
'first_seen': timestamp,
'last_seen': timestamp,
'targeted_ports': set(),
'unique_payloads': set(),
'session_count': 0
}
# Update IP statistics
ip_analysis[ip]['total_attempts'] += 1
ip_analysis[ip]['last_seen'] = timestamp
ip_analysis[ip]['targeted_ports'].add(port)
ip_analysis[ip]['unique_payloads'].add(data.strip())
# Track hourly patterns
hour = timestamp.hour
hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1
# Analyze port targeting patterns
if port not in port_analysis:
port_analysis[port] = {
'total_attempts': 0,
'unique_ips': set(),
'unique_payloads': set()
}
port_analysis[port]['total_attempts'] += 1
port_analysis[port]['unique_ips'].add(ip)
port_analysis[port]['unique_payloads'].add(data.strip())
# Track payload patterns
if data.strip():
data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1
# Track attack timeline
attack_timeline.append({
'timestamp': timestamp,
'ip': ip,
'port': port
})
except (json.JSONDecodeError, KeyError) as e:
continue
# Analysis Report Generation
print("\n=== Honeypot Analysis Report ===")
# 1. IP-based Analysis
print("\nTop 10 Most Active IPs:")
sorted_ips = sorted(ip_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)[:10]
for ip, stats in sorted_ips:
duration = stats['last_seen'] - stats['first_seen']
print(f"\nIP: {ip}")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Active Duration: {duration}")
print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 2. Port Analysis
print("\nPort Targeting Analysis:")
sorted_ports = sorted(port_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)
for port, stats in sorted_ports:
print(f"\nPort {port}:")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Unique Attackers: {len(stats['unique_ips'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 3. Temporal Analysis
print("\nHourly Attack Distribution:")
for hour in sorted(hourly_attacks.keys()):
print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")
# 4. Attack Sophistication Analysis
print("\nAttacker Sophistication Analysis:")
for ip, stats in sorted_ips:
sophistication_score = (
len(stats['targeted_ports']) * 0.4 + # Port diversity
len(stats['unique_payloads']) * 0.6 # Payload diversity
)
print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")
# 5. Common Payload Patterns
print("\nTop 10 Most Common Payloads:")
sorted_payloads = sorted(data_patterns.items(),
key=lambda x: x[1],
reverse=True)[:10]
for payload, count in sorted_payloads:
if len(payload) > 50: # Truncate long payloads
payload = payload[:50] + "..."
print(f"Count {count}: {payload}")
これを別のスクリプト ファイルに配置し、JSON ログで関数を呼び出すことができます。この機能は、収集されたデータに基づいて JSON ファイルから包括的な洞察を提供します。
私たちの分析は、データをIPベースの統計、ポートターゲティングパターン、1時間ごとの攻撃分布、ペイロード特性などのいくつかのカテゴリにグループ化することから始まります。すべてのIPについて、合計試行、最初と最後の時間、ターゲットポート、一意のペイロードを追跡しています。これは、攻撃者向けのユニークなプロファイルを構築するのに役立ちます。
また、ここでは、最も頻繁にターゲットを絞ったポートを監視するポートベースの攻撃パターン、およびユニークな攻撃者の数を調べます。また、ターゲットを絞った攻撃者を識別するのに役立つ攻撃の洗練分析も実行します。この分析は、簡単なスキャンアクティビティと洗練された攻撃を分離するために使用されます。
時間分析は、時間ごとの攻撃試行のパターンを特定するのに役立ち、攻撃のタイミングと潜在的な自動ターゲットキャンペーンのパターンを明らかにします。最後に、一般的に見られる攻撃文字列またはコマンドを特定するために、一般的に見られるペイロードを公開します。
セキュリティ上の考慮事項
このハニーポットを展開する際には、次のセキュリティ対策を必ず考慮してください。
ハニーポットを隔離された環境で実行します。通常は、VM 内、または NAT とファイアウォールの背後にあるローカル マシン上にあります。
妥協した場合にリスクを減らすために、最小限のシステム特権(通常はルートではない)でハニーポットを実行します。
マルウェアや機密情報が含まれている可能性があるため、生産グレードまたは研究ハニーポットとして展開する予定がある場合は、収集されたデータに注意してください。
堅牢な監視メカニズムを実装して、ハニーポット環境から抜け出そうとする試みを検出します。
結論
これにより、ハニーポットを構築し、ハニーポットへの攻撃をシミュレートするシミュレーターを作成し、ハニーポット ログのデータを分析していくつかの簡単な推論を行いました。これは、攻撃的セキュリティ概念と防御的セキュリティ概念の両方を理解するための優れた方法です。これを基にして、より複雑な検出システムを作成し、次のような機能を追加することを検討できます。
攻撃動作に基づく動的サービスエミュレーション
これらの収集されたハニーポットログのより良い推論分析を実行する脅威インテリジェンスシステムとの統合
高度なロギングメカニズムを介して、IP、ポート、ネットワークのデータを超えて包括的なログを収集します
機械学習機能を追加して、攻撃パターンを検出します
ハニーポットは強力なセキュリティツールであるにもかかわらず、唯一の防衛線ではなく、包括的な防御セキュリティ戦略の一部であるべきであることを忘れないでください。
ハニーポットがどのように機能するのか、その目的は何なのか、そして Python プログラミングについても少しは学んでいただけたでしょうか。