Scott Tiger Tech Blog

Blog technologiczny firmy Scott Tiger S.A.

Logstash

Autor: Piotr Karpiuk o piątek 20. Grudzień 2019

Wprowadzenie

Logstash to narzędzie będące swoistą stacją przekaźnikową i rozgałęziaczem do przetwarzania strumieni danych. Typowy potok przetwarzania (ang. pipeline) Logstasha łączy wiele strumieni wejściowych w jeden strumień wyjściowy, przy okazji uzupełniając przepływające rekordy danych o nowe informacje. Strumień danych wejściowych jest tu rozumiany bardzo szeroko i obejmuje takie źródła jak pliki dzienników (np. Apache, syslog), brokery komunikatów, usługi HTTP, skrzynka emailowa, wyniki cyklicznie uruchamianych poleceń shella lub zapytań do baz danych, kanały RSS, a nawet strumień postów z Twittera. Typowym wyjściem jest NoSQLowa baza danych Elasticsearch, ale możliwych dodatkowych miejsc docelowych jest wiele, np. możemy sobie zażyczyć aby wszystkie dane dodatkowo zrzucać do pliku lub przekazywać na wejście do jakiegoś skryptu Pythona, wysyłać do aplikacji Graphite, StatsD, itd.

Typowym przykładem transformacji danych dokonywanej przez Logstash jest konwersja rekordu dziennika Apache:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /images/kibana-search.png HTTP/1.1" 200 203023 "/logstash-monitorama-2013/" \
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

na format JSON, z jednoczesnym ustaleniem nazwy domenowej dla adresu IP i przybliżonej lokalizacji geograficznej adresu IP:

{
        "request" => "/images/kibana-search.png",
          "agent" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36",
         "offset" => 325,
           "auth" => "-",
          "ident" => "-",
           "verb" => "GET",
         "source" => "/path/to/file/logstash-tutorial.log",
        "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] GET /images/kibana-search.png HTTP/1.1 ...",
       "referrer" => "/logstash-monitorama-2013/",
     "@timestamp" => 2017-11-09T02:51:12.416Z,
       "response" => "200",
          "bytes" => "203023",
       "clientip" => "83.149.9.216",
     "client_dns" => "gprs-client-83.149.9.216.misp.ru",
          "geoip" => {
              "timezone" => "Europe/Moscow",
                    "ip" => "83.149.9.216",
              "latitude" => 55.7485,
        "continent_code" => "EU",
             "city_name" => "Moscow",
          "country_name" => "Russia",
         "country_code2" => "RU",
         "country_code3" => "RU",
           "region_name" => "Moscow",
              "location" => {
                   "lon" => 37.6184,
                   "lat" => 55.7485
              },
           "postal_code" => "101194",
           "region_code" => "MOW",
             "longitude" => 37.6184
       },
       "@version" => "1",
           "host" => "My-MacBook-Pro.local",
    "httpversion" => "1.1",
      "timestamp" => "04/Jan/2015:05:13:42 +0000"
}
    

Gwoli ścisłości, pod wejście i wyjście Logstasha możemy podpiąć tzw. kodeki które dekodują/kodują dane do określonego formatu (np. protobuf, JSON, itp.).

Uruchomienie Logstasha i konfiguracja potoku przetwarzania

Utwórzmy plik logstash.conf w bieżącym katalogu, z najprostszą możliwą konfiguracją potoku przetwarzania (strumień wejściowy to STDIN a wyjściowy to STDOUT):

input {
  stdin {
  }
}
output {
  stdout {
  }
}

Uruchomienie Logstasha w kontenerze wygląda tak:

docker run -ti --net=host --name=logstash --rm -v ${PWD}/logstash.conf:/usr/share/logstash/pipeline/logstash.conf docker.elastic.co/logstash/logstash:7.5.0

Uwaga: gdy podmapowujesz pliki z hosta przy uruchamianiu kontenera, zadbaj o to żeby plik hosta miał uprawnienia które pozwolą na odczyt z konta UID=1000.

Najczęściej spotykane opcje, jakie można dać na końcu polecenia:

-f /path/to/mypipeline.conf
ścieżka do pliku konfiguracji potoku przetwarzania; może tu być katalog, wtedy odczytane zostaną z niego wszystkie pliki *.conf w porządku alfabetycznym
-t
sprawdzenie poprawności pliku konfiguracji potoku przetwarzania i zakończenie pracy
-r
monitorowanie zmian w pliku konfiguracji (co 3 sekundy) i przeładowywanie w razie wykrycia zmian; opcjonalnie można do Logstasha wysłać sygnał SIGHUP

Istnieją jeszcze dwa pliki konfiguracyjne:

logstash.yml
(w dystrybucji kontenerowej w katalogu /usr/share/logstash/config/), gdzie przechowywane są domyślne ustawienia Logstasha. Każde z tych ustawień można nadpisać odpowiednią opcją wiersza poleceń
jvm.options
konfiguracja JVM na której chodzi Logstash; tu np. można zdefiniować minimalny i maksymalny rozmiar sterty

Przykład

Konfiguracja potoku przetwarzania dla dziennika access.log serwera Apache może wyglądać tak:

input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}"}
  }
  geoip {
    source => "clientip"
  }
}
output {
  stdout { codec => rubydebug }
}

Jak to działa? Po pierwsze rzadko kiedy Logstash będzie obecny w tym samym środowisku (kontenerze/kapsule Kubernetesa/maszynie) co pliki logów do przetworzenia. Zatem potrzebny jest jakiś proces-agent (ang. shipper), który prześle logi ze zdalnego środowiska do Logstasha po TCP. W stosie narzędzi firmy Elastic zajmują się tym wyspecjalizowane narzędzia z rodziny Beats, w naszym konkretnym przypadku (plik dziennika) będzie to Filebeat.

Beaty to temat na oddzielny artykuł, więc tutaj tylko krótka instrukcja. Załóżmy że logi Apache mamy na maszynie apache w katalogu /logs, a Logstash chodzi na maszynie logstash. Logstasha uruchamiamy z podanym wyżej plikiem konfiguracyjnym, a na maszynie apache:

  • Tworzymy plik konfiguracyjny filebeat.yml Filebeat’a:
    filebeat.inputs:
    - type: log
      paths:
        - /logs/access.log 
    output.logstash:
      hosts: ["logstash:5044"]
  • Uruchamiamy Filebeata w kontenerze:
    docker run -ti -v /logs/access.log:/logs/access.log -v $(pwd)/filebeat.yml:/usr/share/filebeat/filebeat.yml docker.elastic.co/beats/filebeat:7.5.0

Teraz Filebeat prześle logi do Logstasha i będzie oczekiwał na nowe rekordy na końcu pliku.

Wróćmy do pliku konfiguracyjnego potoku przetwarzania Logstasha. Filtry (aplikowane po kolei) mamy dwa. Pierwszy to grok, który potrafi przekształcić ponad 120 różnych typów logów na JSONa i łatwo można dodawać do niego własne wzorce przy użyciu m.in. wyrażeń regularnych.

Przykład własnej konfiguracji pluginu grok dla rekordu dziennika:

55.3.244.1 GET /index.html 15824 0.043

może wyglądać tak:

grok {
  match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}

Drugi filtr to geoip, który uzupełnia JSONa danymi o lokalizacji geograficznej ustalonego adresu IP.

Sekcja wyjściowa naszego potoku to zrzucenie generowanych zdarzeń na STDOUT z ładnym formatowaniem JSONa.

BTW: Domyślna konfiguracja potoku przetwarzania w kontenerowej wersji Logstasha to na wejściu beat, a na wyjściu STDOUT.

Więcej o konfiguracji potoku przetwarzania

Nazwy które do tej pory napotkaliśmy w pliku konfiguracyjnym, takie jak beats, grok, geoip, stdout, elasticsearch to tzw. pluginy które mogą należeć do jednej z trzech sekcji: input, filter i output. Można też pisać własne pluginy (jako moduły Ruby’ego, wykonywane w środowisku JRuby) zarządzane programem wiersza poleceń logstash-plugin:

docker run --rm docker.elastic.co/logstash/logstash:7.5.0 logstash-plugin <command> [options]

Możliwe polecenia i opcje to np.:

  • list --group output: wyświetla zainstalowane pluginy fazy output
  • install logstash-output-kafka: instaluje wskazany plugin
  • update: aktualizuje wszystkie pluginy
  • remove logstash-output-kafka: usuwa wskazany plugin

W pliku konfiguracyjnym potoku możemy używać rozgałęzień (if else) i wyrażeń. Na przykład:

output {
  if [type] == "apache" {
    if [status] =~ /^5\d\d/ {
      nagios { ...  }
    } else if [status] =~ /^4\d\d/ {
      elasticsearch { ... }
    }
    statsd { increment => "apache.%{status}" }
  }
}

Wyrażenie [nazwa_pola] odnosi się do wartości pola zdarzenia. Gdy sięgamy do pola zagnieżdżonego, trzeba użyć konstrukcji [pole_nadrzędne][pole_podrzędne]. W przypadku odwołania do pola pierwszego poziomu, nawiasy kwadratowe można pominąć.

Zawartość pól można rozwijać w łańcuchach, jak np. "/var/log/%{type}.%{+yyyy.MM.dd.HH}", gdzie drugie rozwinięcie formatuje pole @timestamp zdarzenia.

Inne przykłady zastosowania filtrów (tutaj mutate):

filter {
  if [action] == "login" { 
    mutate { remove_field => "secret" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
}

Można też stosować operatory logiczne or i and.

Wyrażenie if [foo] zwraca fałsz gdy zajdzie jeden z warunków:

  • zdarzenie nie ma pola [foo],
  • pole [foo] ma wartość false,
  • pole [foo] ma wartość null

Inne przydatne rozwinięcia jakich można używać w pliku konfiguracyjnym potoku przetwarzania:

  • ${var} – wartość zmiennej środowiskowej
  • ${var:default_val} – wartość zmiennej środowiskowej lub wskazana wartość gdy zmienna jest niezdefiniowana
  • ścieżki plików mogą zawierać znane z shella konstrukcje, np. *, ** (rekurencyjne zejście po katalogu), {a..z}

@metadata to specjalne pole (typu słownik), które nie pojawi się w generowanym przez Logstasha wyjściu. Można mu nadawać wartości aby ich używać w instrukcjach warunkowych.

Pod maską

Każdy strumień wejściowy ma swój własny wątek systemu operacyjnego, który odbiera dane, generuje na ich podstawie zdarzenia i zapisuje je w kolejce (domyślnie kolejka jest w pamięci RAM, ale na życzenie może być to kolejka trwała na dysku – ma to znaczenie jeśli nie chcemy utracić danych w razie nagłego przerwania pracy Logstasha). Wątki robocze potoku przetwarzania (ang. pipeline worker threads) – których zwykle jest tyle ile rdzeni procesora ma maszyna na której chodzi Logstash – wyciągają zdarzenia w paczkach (domyślnie po 125, maksymalny czas oczekiwania na zebranie paczki to domyślnie 50ms), transformują zawarte w nich dane za pomocą filtrów, a następnie przesyłają na wyjście.

Łatwo zauważyć, że filtrowanie może łatwo stać się wąskim gardłem jeśli będzie wymagało np. dostępu do jakichś baz danych lub zewnętrznych usług. W takiej sytuacji może się okazać potrzebne użycie wielu instancji Logstasha i stojącego przed nimi równoważnika obciążenia (NginX, HAProxy, Kafka). Zaleca się stawiać min. 2 instancje dla lepszej odporności na awarię.

Typowy odbiorca logów, Elasticsearch, może być również skalowany horyzontalnie – wtedy dobrze jest rozrzucać wyjściowe pakiety równomiernie między dostępne instancje. Konfiguracja wyjścia Logstasha może na taką okoliczność wyglądać tak:

output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
}

Logstash zapamiętuje (w systemie plików) bieżącą pozycję w przetwarzanym strumieniu wejściowym, aby nie przetwarzać ponownie już raz odczytanych rekordów po ponownym uruchomieniu narzędzia, a jedynie doczytać nowe.

Eleganckie zakończenie pracy przez Logstasha czeka aż zostanie opróżniona kolejka zdarzeń. Jeśli nie zależy nam na tym, możemy uruchomić narzędzie z opcją --pipeline.unsafe_shutdown.

Dla ciekawskich

Domyślnie wyłączona Dead Letter Queue (do włączenia w pliku konfiguracyjnym logstash.yml) pozwala przechować w sposób trwały na dysku zdarzenia których Logstash z jakichś powodów nie mógł wysłać do Elasticsearcha (kod odpowiedzi 400 lub 404). Możesz łatwo ponownie przetworzyć takie zdarzenia używając plugina wejściowego dead_letter_queue.

Logstash działa na maszynie wirtualnej Javy (JVM). Twórcy zalecają, aby minimalny i maksymalny rozmiar sterty maszyny ustawić na tę samą wartość, ponieważ skalowanie sterty jest operacją kosztowną czasowo.

Potoki przetwarzania można tworzyć nie tylko ręcznie jak to robiliśmy w tym artykule, ale również z poziomu interfejsu użytkownika (Kibana). Po zdecydowaniu się na drugą opcję pierwsza staje się niedostępna.

Działanie Logstasha można monitorować i jest do tego API REST, domyślnie na porcie 9600 ale można wskazać inny opcją wiersza poleceń --http.port.

Przykład wywołania Monitoring API:

curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'

Uważny czytelnik może zapytać: czy Filebeat nie mógłby przesyłać logów bezpośrednio do Elasticsearcha z pominięciem Logstasha? Otóż mógłby, ale pośrednictwo Logstasha daje dodatkowe możliwości:

  • radzenie sobie z chwilowymi dużymi przyrostami danych wejściowych – poprzez zapisanie ich w kolejce (jeśli zażyczymy to na dysku) redukowane jest tzw. ciśnienie wsteczne (ang. backpressure),
  • pozwala łączyć dane z różnych strumieni, nie tylko pliki z logami (bazy danych, AWS S3, kolejki komunikatów),
  • pozwala wysyłać dane do wielu różnych miejsc docelowych, nie tylko do Elasticsearcha (S3, HDFS, do pliku itp.),
  • filtry i wyrażenia warunkowe pozwalają zaimplementować dość złożone scenariusze przetwarzania.

Sympatyczny poradnik na temat opcji wdrażania Logstasha jest tu: Deploying and scaling.

Share and Enjoy:
  • del.icio.us
  • Facebook
  • Google Bookmarks
  • Śledzik
  • Blip
  • Blogger.com
  • Gadu-Gadu Live
  • LinkedIn
  • MySpace
  • Wykop

Zostaw komentarz

XHTML: Możesz użyć następujących tagów: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>