ELK(elasticsearch、kibana、logstash)に基づく攻撃の視覚化

良い一日。 多数のサーバーにサービスを提供するプロセスでは、マシンの動物園全体の管理を一元化する必要があります。また、ログ、ログ分析の一元化されたコレクションは、異常、エラー、一般統計を特定する必要があります。

Rsyslogは集中ログコレクションとして使用され、elasticsearch + kibanaはログの構造化と視覚化に使用されます。 すべては問題ありませんが、接続されているマシンの数が増えると、データの処理と分析に時間がかかる(時間がかかる)データが非常に多くなります。 他の興味深いことに加えて、私は常に自分のセキュリティセンターを組織したいと考えていました。 マップ、グラフなどを備えた一種のマルチモニター統計。

この記事では、SSSに対する攻撃の統計モニターを作成した経験を説明します。 プログラマや他の人は非標準のポートに接続したり、証明書を使用したりできない可能性があるため、この点で保護については考慮しません。

kibanaで既にElasticを展開しているため、システムをそのベースに基づいて構築します。

そのため、 dockerdocker-composeをすでにインストールしているので、その上でサービスを上げます。

弾性:

elasticsearch: build: elasticsearch:2.3.4 container_name: elastic command: elasticsearch -Des.network.host=0.0.0.0 net: host ports: - "9200:9200" - "9300:9300" volumes: - "/srv/docker/elastic/etc:/usr/share/elasticsearch/config" - "/srv/docker/elastic/db:/usr/share/elasticsearch/data" - "/etc/localtime:/etc/localtime:ro" restart: always environment: - ES_HEAP_SIZE=2g 

/srv/docker/elastic/elasticsearch.yml:

 cluster.name: Prod node.name: "central-syslog" http.port: 9200 network.host: _non_loopback_ discovery.zen.ping.multicast.enabled: false discovery.zen.ping.unicast.hosts: [ ] transport.publish_host: 0.0.0.0 #transport.publish_port: 9300 http.cors.enabled : true http.cors.allow-origin : "*" http.cors.allow-methods : OPTIONS, HEAD, GET, POST, PUT, DELETE http.cors.allow-headers : X-Requested-With,X-Auth-Token,Content-Type, Content-Length script.engine.groovy.inline.aggs: on 

/srv/docker/elastic/logging.yml:

 logger: action: DEBUG com.amazonaws: WARN appender: console: type: console layout: type: consolePattern conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n" 

きばな:

 kibana: image: kibana restart: always container_name: kibana environment: SERVICE_NAME: 'kibana' ELASTICSEARCH_URL: "http://xxxx:9200" ports: - "4009:5601" volumes: - "/etc/localtime:/etc/localtime:ro" 

logstash:

 logstash: image: logstash:latest restart: always container_name: logstash hostname: logstash ports: - "1025:1025" - "1026:1026" volumes: - "/srv/docker/logstash/logstash.conf:/etc/logstash.conf:ro" - "/srv/docker/logstash/ssh-map.json:/etc/ssh-map.json:ro" command: "logstash -f /etc/logstash.conf" 

だから。 エラスティックとキバナを起動しましたが、外部サーバーからのログを処理するためにlogsteshを準備するために残っています。 私はこのスキームを実装しました:

rsyslog→logstash→エラスティック→kibana。

rsyslogに組み込まれているコネクタをエラスティックに直接使用することは可能ですが、統計情報のためにフィールドとgeoipのデータが必要です。

監視に接続されたサーバーで、次のエントリをrsyslog構成(通常は/etc/rsyslog.d/50-default.conf)に追加します。

 auth,authpriv.* @@xxxx:1026 

このエントリを使用して、すべての承認イベントをリモートサーバー(logstash)に送信します。

次に、logsteshが受信したログを処理および処理する必要があります。 これを行うには、出力(/srv/docker/logstash/ssh-map.json)で作業しやすいようにフィールドのマッピングを作成します。

 { "template": "logstash-*", "mappings": { "ssh": { "properties": { "@timestamp": { "type": "date", "format": "strict_date_optional_time||epoch_millis" }, "@version": { "type": "string" }, "username": { "type": "string" }, "src_ip": { "type": "string" }, "port": { "type": "long" }, } } } } 

マッピングの作成中に、logstashで1つのバグが発生しました。つまり、フィールドに値geo_pointを割り当てる(インデックスを作成するときにgeoip.location-floatの値が設定されます)。 このバグは登録されており、回避策としてlogstash- *標準インデックステンプレートを使用する必要がありました。

したがって、マッピングがあります。 ここで、着信データをフィルタリングし、必要な形式(/srv/docker/logstash/logstash.conf)でエラスティックに送信するように、logstash構成を準備する必要があります。

 input { tcp { port => 1026 type => "security" } } filter { grok { match => ["message", "Failed password for (invalid user |)%{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"] add_tag => "ssh_brute_force_attack" } grok { match => ["message", "Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"] add_tag => "ssh_sucessful_login" } geoip { source => "src_ip" } } output { if "ssh_brute_force_attack" in [tags] { elasticsearch { hosts => ["xxxx:9200"] index => "logstash-%{+YYYY.MM.dd}" manage_template => true template_name => "ssh" template => "/etc/ssh-map.json" template_overwrite => true } } } 

設定は読みやすく、理解できるので、コメントする必要はないと思います。

だから。 Sssログは、logsteshに入り、処理して、エラスティックインデックスに送信します。 視覚化を構成するためにのみ残ります。

-xxxxでWebインターフェースを開きます:4009 /
-[設定]に移動し、インデックスを使用して作業を追加します(logstash- *)

次に、検索クエリ、視覚化、ダッシュボードをkibanaで作成する必要があります。

[検出]タブで、キバナにインデックスを追加した後、レコードが表示されます。すべてを正しく設定しました。

左側の列には、フィルタリング用のフィールドのリストがあり、それらを操作します。

最初のフィルターは、攻撃されたサーバーのリストです。

-ホストフィールドの近くで、追加をクリックします
-検索をssh-brute-servers-under-attackとして保存します(名前は可変です)

2番目のフィルターは、攻撃国のリストです。

-geoip.country_nameフィールドの近くにある[追加]をクリックします
-ssh-brute-countriesとして保存(名前は可変)

3番目のフィルターは、攻撃の総数です。

-[検出]タブに移動します
-ssh-brute-allとして保存

したがって、最終画面では、4つの異なるパラメーターを表示します。

1.攻撃の総数
2.攻撃国
3.攻撃されたサーバー
4.攻撃ホストへのポインターを使用したマップ

攻撃の総数:

-[視覚化]タブに移動します
-視覚化メトリックのタイプを選択します
-保存済み検索から-ssh-brute-all
-メトリックを開き、フィールドの値をに変更します-攻撃の総数
-ビジュアライゼーションを保存する

攻撃国:

-[視覚化]タブに移動します
-視覚化データテーブルのタイプを選択します
-保存された検索から-ssh-brute-countries
-メトリックを開き、フィールドの値を-に変更します
-ここで、フィールドを相互に関連付けて、テーブル「ユニーク」でカウントする必要があります。 行の分割をクリックします
-集約-用語
-フィールド-geoip.country_name.raw
-カスタムラベル-国

すべてを正しく入力すると、緑色の再生ボタンが点灯します。クリックすると、次のようなものが表示されます。

画像

-ビジュアライゼーションを保存する

攻撃されたサーバー:
-[視覚化]タブに移動します
-視覚化データテーブルのタイプを選択します
-保存された検索から-ssh-brute-servers-under-attack
-メトリックを開き、フィールドの値を-に変更します
-ここで、フィールドを相互に関連付けて、テーブル「ユニーク」でカウントする必要があります。 行の分割をクリックします
-集約-用語
-フィールド-host.raw
-カスタムラベル-サーバー

すべてを正しく入力すると、緑色の再生ボタンが点灯します。クリックすると、次のようなものが表示されます。

画像

-ビジュアライゼーションを保存する

攻撃ホストへのポインターを使用したマップ(最も興味深い)

-[視覚化]タブに移動します
-視覚化タイルマップのタイプを選択します
-新しい検索から-インデックスパターンを選択-logstash- *
-地理座標を開きます。 すべてのステップが正しければ、Fieldフィールドは自動的にgeoip.locationを埋めます
-オプションに移動
-カードのホスティングを変更します(MapRequestが条件を変更したため、トークンを取得して他の操作を行う必要があるため)。 Dawを入れる-WMS準拠のマップサーバー
-すべてのフィールドをパラメーターに追加します。

WMS Url-basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer
WMSレイヤー*-0
WMSバージョン*-1.3.0
WMS形式*-画像/ PNG
WMS属性-USGSが提供するマップ
WMSスタイル*-空

その結果、攻撃マップが表示されるはずです。

画像

-ビジュアライゼーションを保存する

これで、ダッシュボードを作成するためのすべてが揃いました。

[ダッシュボード]タブに移動し、[視覚エフェクトの追加](右上の円内)をクリックして、保存した視覚エフェクトを画面に追加し、画面を保存します。 Drag'n'Dropでサポートされています。 その結果、次のような画面が表示されました。

画像

これで、新しいホストをシステムに接続するには、承認ログのlogstashサーバーへの転送を示すだけで十分です。イベントが発生すると、ホストと情報が画面とエラスティックに追加されます。

必要に応じて、いつでもブルートフォースコントロールパネルのログを追加し、より詳細な統計を収集して、この記事と同様に画面に追加できます。

Source: https://habr.com/ru/post/J324760/


All Articles