Introduction
I’m running a dedicated server to host some internet services. The server runs Debian, and I’ve installed logstash on it to do a bit of monitoring of my system logs and of suricata.
I’ve built a set of dashboards. The screenshot below shows part of the one dedicated to suricata:
Setup
My data sources were the following:
- System logs
- Apache logs
- Suricata full JSON logs (should be available in suricata 2.0)
System logs
The setup was mostly really easy. I’ve just added grok patterns to detect successful and unsuccessful connections on the ssh server.
input {
  file {
    type => "linux-syslog"
    path => [ "/var/log/daemon.log", "/var/log/auth.log", "/var/log/mail.info" ]
  }
}

filter {
  if [type] == "linux-syslog" {
    grok {
      match => { "message" => "Accepted %{WORD:auth_method} for %{USER:username} from %{IP:src_ip} port %{INT:src_port} ssh2" }
    }
    grok {
      match => { "message" => "Invalid user %{USER:username} from %{IP:src_ip}" }
    }
  }
}
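For reference, here is the kind of auth.log lines these two patterns match (host name, user names and IPs are invented for the example). The first pattern extracts auth_method, username, src_ip and src_port; the second extracts username and src_ip:

Jan 12 09:35:01 myserver sshd[1234]: Accepted publickey for eric from 192.0.2.10 port 54321 ssh2
Jan 12 09:36:12 myserver sshd[1240]: Invalid user admin from 198.51.100.23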
Apache logs
For apache, access.log was even easier:
input {
  file {
    path => [ "/var/log/apache2/*access.log" ]
    type => "apache-access"
  }
  file {
    type => "apache-error"
    path => "/var/log/apache2/error.log"
  }
}

filter {
  if [type] == "apache-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  if [type] == "apache-error" {
    grok {
      match => { "message" => "%{APACHEERRORLOG}" }
      patterns_dir => ["/var/lib/logstash/etc/grok"]
    }
  }
}
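COMBINEDAPACHELOG is a stock grok pattern, so access.log lines in the standard combined format get split into clientip, verb, request, response and so on. A synthetic example line it matches (IP and request invented):

192.0.2.10 - - [27/Dec/2013:15:32:52 +0100] "GET /index.html HTTP/1.1" 200 3102 "-" "Mozilla/5.0 (X11; Linux x86_64)"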
For the error log, I’ve created a grok pattern to extract the client IP. So I’ve created a file in the grok directory with:
HTTPERRORDATE %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}
APACHEERRORLOG \[%{HTTPERRORDATE:timestamp}\] \[%{WORD:severity}\] \[client %{IPORHOST:clientip}\] %{GREEDYDATA:message_remainder}
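For example, an error.log line in the Apache 2.2 format (with an invented IP) that APACHEERRORLOG matches, yielding timestamp, severity, clientip and message_remainder:

[Fri Dec 27 15:32:52 2013] [error] [client 192.0.2.10] File does not exist: /var/www/favicon.ico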
Netfilter logs
For Netfilter logs, I’ve decided to play it the old way and parse the kernel log instead of using ulogd:
input {
  file {
    type => "kern-log"
    path => "/var/log/kern.log"
  }
}

filter {
  if [type] == "kern-log" {
    grok {
      match => { "message" => "%{IPTABLES}" }
      patterns_dir => ["/var/lib/logstash/etc/grok"]
    }
  }
}
with IPTABLES being defined in a file placed in the grok directory and containing:
NETFILTERMAC %{COMMONMAC:dst_mac}:%{COMMONMAC:src_mac}:%{ETHTYPE:ethtype}
ETHTYPE (?:(?:[A-Fa-f0-9]{2}):(?:[A-Fa-f0-9]{2}))
IPTABLES1 (?:IN=%{WORD:in_device} OUT=(%{WORD:out_device})? MAC=%{NETFILTERMAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip}.*(TTL=%{INT:ttl})?.*PROTO=%{WORD:proto}?.*SPT=%{INT:src_port}?.*DPT=%{INT:dst_port}?.*)
IPTABLES2 (?:IN=%{WORD:in_device} OUT=(%{WORD:out_device})? MAC=%{NETFILTERMAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip}.*(TTL=%{INT:ttl})?.*PROTO=%{INT:proto}?.*)
IPTABLES (?:%{IPTABLES1}|%{IPTABLES2})
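To illustrate, a typical Netfilter LOG line from kern.log (host name and addresses invented) of the kind the IPTABLES1 pattern matches:

Jan 12 10:02:21 myserver kernel: IN=eth0 OUT= MAC=00:0c:29:aa:bb:cc:00:50:56:dd:ee:ff:08:00 SRC=198.51.100.23 DST=192.0.2.10 LEN=60 TOS=0x00 PREC=0x00 TTL=52 ID=0 DF PROTO=TCP SPT=43210 DPT=22 WINDOW=5840 RES=0x00 SYN URGP=0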
Exim logs
This part was complicated because exim logs are multiline: a single delivery produces several log lines. I found a page explaining how to match, at least, the logs for delivered mail. It uses the multiline filter to merge those lines into one event.
Then I added a series of grok filters to extract more information. Each match only gets a part of the message, so I’ve set break_on_match to false so that processing does not stop when one of the matches succeeds.
input {
  file {
    type => "exim-log"
    path => "/var/log/exim4/mainlog"
  }
}

filter {
  if [type] == "exim-log" {
    multiline {
      pattern => "%{DATE} %{TIME} %{HOSTNAME:msgid} (=>|Completed)"
      what => "previous"
    }
    grok {
      break_on_match => false
      match => [ "message", "<= %{NOTSPACE:from} H=%{NOTSPACE:server} \[%{IP:src_ip}\]" ]
    }
    grok {
      break_on_match => false
      match => [ "message", "=> %{USERNAME:username} <%{NOTSPACE:dest}> R=%{WORD:transport}" ]
    }
    grok {
      break_on_match => false
      match => [ "message", "=> %{NOTSPACE:dest} R=%{WORD:transport}" ]
    }
    grok {
      break_on_match => false
      match => [ "message", "%{DATE} %{TIME} H=%{NOTSPACE:server}%{GREEDYDATA} \[%{IP:src_ip}\] F=<%{NOTSPACE:mail_to}> temporarily rejected RCPT <%{NOTSPACE:dest}>: greylisted" ]
    }
  }
}
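To make the multiline logic concrete, here is what a delivery looks like in mainlog (message id and addresses invented). The => and Completed lines get appended to the preceding <= line, so the whole transaction becomes one logstash event:

2013-12-27 14:02:21 1VwXyZ-0001xy-AB <= bob@example.com H=mail.example.com [203.0.113.5] P=esmtp S=2604
2013-12-27 14:02:22 1VwXyZ-0001xy-AB => alice <alice@example.org> R=dnslookup T=remote_smtp
2013-12-27 14:02:22 1VwXyZ-0001xy-AB Completed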
Suricata
Suricata’s full JSON output is already JSON, so the configuration on the logstash side is trivial:
input {
  file {
    path => [ "/var/log/suricata/eve.json" ]
    codec => json
  }
}
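Each line of eve.json is a self-contained event, so the json codec turns it straight into fields. An abridged, made-up alert event looks roughly like this, with src_ip, dest_ip, alert.signature and friends directly usable in dashboards:

{"timestamp":"2013-12-27T15:32:01.155327","event_type":"alert","src_ip":"198.51.100.23","src_port":4455,"dest_ip":"192.0.2.10","dest_port":22,"proto":"TCP","alert":{"signature_id":2001219,"signature":"ET SCAN Potential SSH Scan","category":"Attempted Information Leak","severity":2}}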
You can download a sample Suricata dashboard to use in your logstash installation.
The full configuration
Below is the full configuration. There is only one thing I did not mention above: for most source IPs, I use the geoip filter to get an idea of where the IP is located. The three geoip blocks cover the different field names (src_ip, clientip, srcip) produced by the various log sources.
input {
  file {
    type => "linux-syslog"
    path => [ "/var/log/daemon.log", "/var/log/auth.log", "/var/log/mail.info" ]
  }
  file {
    path => [ "/var/log/apache2/*access.log" ]
    type => "apache-access"
  }
  file {
    type => "apache-error"
    path => "/var/log/apache2/error.log"
  }
  file {
    type => "exim-log"
    path => "/var/log/exim4/mainlog"
  }
  file {
    type => "kern-log"
    path => "/var/log/kern.log"
  }
  file {
    path => [ "/var/log/suricata/eve.json" ]
    codec => json
  }
}

filter {
  if [type] == "apache-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  if [type] == "linux-syslog" {
    grok {
      match => { "message" => "Accepted %{WORD:auth_method} for %{USER:username} from %{IP:src_ip} port %{INT:src_port} ssh2" }
    }
  }
  if [type] == "apache-error" {
    grok {
      match => { "message" => "%{APACHEERRORLOG}" }
      patterns_dir => ["/var/lib/logstash/etc/grok"]
    }
  }
  if [type] == "exim-log" {
    multiline {
      pattern => "%{DATE} %{TIME} %{HOSTNAME:msgid} (=>|Completed)"
      what => "previous"
    }
    grok {
      break_on_match => false
      match => [ "message", "<= %{NOTSPACE:from} H=%{NOTSPACE:server} \[%{IP:src_ip}\]" ]
    }
    grok {
      break_on_match => false
      match => [ "message", "=> %{USERNAME:username} <%{NOTSPACE:dest}> R=%{WORD:transport}" ]
    }
    grok {
      break_on_match => false
      match => [ "message", "=> %{NOTSPACE:dest} R=%{WORD:transport}" ]
    }
    grok {
      break_on_match => false
      match => [ "message", "%{DATE} %{TIME} H=%{NOTSPACE:server}%{GREEDYDATA} \[%{IP:src_ip}\] F=<%{NOTSPACE:mail_to}> temporarily rejected RCPT <%{NOTSPACE:dest}>: greylisted" ]
    }
  }
  if [type] == "kern-log" {
    grok {
      match => { "message" => "%{IPTABLES}" }
      patterns_dir => ["/var/lib/logstash/etc/grok"]
    }
  }
  if [src_ip] {
    geoip {
      source => "src_ip"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
  if [clientip] {
    geoip {
      source => "clientip"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
  if [srcip] {
    geoip {
      source => "srcip"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch { embedded => true }
}
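With the geoip filter in place, matching events carry a geoip object, and [geoip][coordinates] is built as a [longitude, latitude] pair of floats that map panels can consume. A trimmed rubydebug sketch of the result, with invented values:

  "geoip" => {
      "ip" => "198.51.100.23",
      "country_code2" => "FR",
      "latitude" => 48.86,
      "longitude" => 2.35,
      "coordinates" => [ 2.35, 48.86 ]
  }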