Connectors

W tej sekcji opisane są mechanizmy związane z obsługą connectorów - tworzenie, edytowanie, config store

Typy configów

  • Kafka config - standardowy config connectora w Kafka connect reprezentowany przez Map<String,String>

    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "test-server",
    "database.user": "postgres",
    "database.dbname": "postgres",
    "database.hostname": "localhost",
    "database.password": "postgres",
    "table.include.list": "public.connector_row",
    "database.port": "5432",
    "snapshot.mode": "always",
    "plugin.name": "pgoutput"
  • CDC config - wewnętrzna reprezentacja configu, która umożliwia zapisywanie customowych properties osobno od podstawowych properties connectora (nazywanych core properties - czyli to co znajduje się w Kafka config to core properties). Dodatkowo config ten podzielony jest na grupy properties np. wszystkie związane z połączeniem z bazą danych są w jednej kategorii. CDC config jest zapisany w strukturze Map<String(GroupName), ValuesGroup>, a ValuesGroup składa się dwóch map: Map<String, String> core i Map<String, String> custom

    "Connector": {
        "core": {
             "snapshot.mode": "always"
        },
        "custom": {}
    },
    "Events": {
        "core": {
            "table.include.list": "public.connector_row"
        },
        "custom": {}
    },
    "Database": {
        "core": {
            "topic.prefix": "test-server",
            "database.user": "postgres",
            "database.dbname": "postgres",
            "database.password": "postgres",
            "database.hostname": "localhost",
            "database.port": "5432",
            "snapshot.mode": "always",
            "plugin.name": "pgoutput"
        },
        "custom": {
            "credentialName": "savedDb"
        }
    }

  • CDC config form - reprezentacja configu na potrzeby wyświetlenia go w GUI. Dla każdego property mamy strukturę pól, które pozwalają dynamicznie generować formularz tworzenia connectora

    "Connector": {
    	"core": {
    		"snapshot.mode": {
    			"name": "snapshot.mode",
    			"displayName": "Snapshot mode",
    			"fieldType": "SELECT_FIELD",
    			"order": 24,
    			"required": false,
    			"defaultValue": "initial",
    			"placeholder": null,
    			"width": 4,
    			"options": [
    				"always",
    				"never",
    				"initial_only",
    				"configuration_based",
    				"when_needed",
    				"initial",
    				"custom",
    				"no_data"
    			],
    			"description": "The criteria for running a ...",
    			"type": "STRING",
    			"group": "Connector",
    			"dependents": [],
    			"visible": true
    		}
    	},
    	"custom": {}
    },
    "Events": {
    	"core": {
    		"table.include.list": {
    			"name": "table.include.list",
    			"displayName": "Include Tables",
    			"fieldType": "INSERT_FIELD",
    			"order": 15,
    			"required": false,
    			"defaultValue": null,
    			"placeholder": null,
    			"width": 12,
    			"options": [],
    			"description": "The tables for which changes...",
    			"type": "LIST",
    			"group": "Events",
    			"dependents": [],
    			"visible": true
    		}
    	},
    	"custom": {}
    },
    "Database": {
    	"core": {
    		"topic.prefix": {
    			"name": "topic.prefix",
    			"displayName": "Topic prefix",
    			"fieldType": "INSERT_FIELD",
    			"order": 1,
    			"required": false,
    			"defaultValue": null,
    			"placeholder": null,
    			"width": 8,
    			"options": [],
    			"description": "Topic prefix that identifies ...",
    			"type": "STRING",
    			"group": "Database",
    			"dependents": [],
    			"visible": true
    		},
    		"database.user": {
    			"name": "database.user",
    			"displayName": "User",
    			"fieldType": "INSERT_FIELD",
    			"order": 4,
    			"required": false,
    			"defaultValue": null,
    			"placeholder": null,
    			"width": 4,
    			"options": [],
    			"description": "Name of the database user to ...",
    			"type": "STRING",
    			"group": "Database",
    			"dependents": [],
    			"visible": true
    		},
    		"database.dbname": {
    			"name": "database.dbname",
    			"displayName": "Database",
    			"fieldType": "INSERT_FIELD",
    			"order": 6,
    			"required": false,
    			"defaultValue": null,
    			"placeholder": null,
    			"width": 8,
    			"options": [],
    			"description": "The name of the database from...",
    			"type": "STRING",
    			"group": "Database",
    			"dependents": [],
    			"visible": true
    		},
    		"database.port": {
    			"name": "database.port",
    			"displayName": "Port",
    			"fieldType": "NUMBER_FIELD",
    			"order": 3,
    			"required": false,
    			"defaultValue": "5432",
    			"placeholder": null,
    			"width": 4,
    			"options": [],
    			"description": "Port of the database server.",
    			"type": "INT",
    			"group": "Database",
    			"dependents": [],
    			"visible": true
    		},
    		"plugin.name": {
    			"name": "plugin.name",
    			"displayName": "Plugin",
    			"fieldType": "SELECT_FIELD",
    			"order": 7,
    			"required": false,
    			"defaultValue": "decoderbufs",
    			"placeholder": null,
    			"width": 8,
    			"options": [
    				"decoderbufs",
    				"pgoutput"
    			],
    			"description": "The name of the Postgres logical ...",
    			"type": "STRING",
    			"group": "Database",
    			"dependents": [],
    			"visible": true
    		},
    		"database.hostname": {
    			"name": "database.hostname",
    			"displayName": "Hostname",
    			"fieldType": "INSERT_FIELD",
    			"order": 2,
    			"required": false,
    			"defaultValue": null,
    			"placeholder": null,
    			"width": 8,
    			"options": [],
    			"description": "Resolvable hostname or IP address...",
    			"type": "STRING",
    			"group": "Database",
    			"dependents": [],
    			"visible": true
    		},
    		"database.password": {
    			"name": "database.password",
    			"displayName": "Password",
    			"fieldType": "PASSWORD_FIELD",
    			"order": 5,
    			"required": false,
    			"defaultValue": null,
    			"placeholder": null,
    			"width": 4,
    			"options": [],
    			"description": "Password of the database user ...",
    			"type": "PASSWORD",
    			"group": "Database",
    			"dependents": [],
    			"visible": true
    		}
    	},
    	"custom": {}
    }

ConnectorConfigHelper

ConnectorConfigHelper jest klasą, która służy do interakcji z configiem connectora w formie Kafka config. Interakcje przebiegają z użyciem instancji PropertyHandler, która ma zaszytą w sobie nazwę property i pozwala pobierać, ustawiać albo usuwać property z configu.

Dodatkowo klasa ta obsługuje konwertowanie między dwoma formami configu - Kafka config i CDC config, walidacją customowych części configu oraz przetwarzaniem wyników walidacji części core i custom.

Dla każdego connector class które ma być wspierane (np. io.debezium.connector.oracle.OracleConnector) trzeba stworzyć implementację klasy ConnectorConfigHelper. W przypadku gdy trzeba obsłużyć connector class dla którego nie ma indywidualnego ConnectorConfigHelper używana jest bazowa implementacja, która obsługuje interakcję z properties wspólnymi dla wszystkich konektorów oraz podstawowe konwertowanie między configami.

Z założenia w tej klasie odbywa się cała logika specyficzna dla danego connector class.

Connector

Klasa Connector tworzy warstawę abstacji między REST Kafki Connect, a resztą kodu w ramach pojedynczego connectora - są w niej zaimplementowane metody do wyciągania configu, topicków, statusu etc. Wszystkie raz pobrane wartości są cachowane (czyli config jest wyciągany z Kafka connect tylko raz, każde kolejne pobranie zwróci zapisany config)

Dodatkowo dla connectorów dla których został zaimplementowany ConnectorConfigHelper istnieją bardziej specyficzne implementacje klasy Connector - ConnectorWithConfigHelper i dziedziczące z niej SourceConnector i TargetConnector. Korzystając z instancji ConnectorConfigHelper pozwalają one na wyciąganie dodatkowych informacji z konfiguracji connectora i przetwarzanie ich.

ConnectorConfigService

ConnectorConfigService zwiera logikę związaną z obsługą configu connectorów t.j.:

  • Generowanie struktury do formularzy do tworzenia konektorów na podstawie connector class, istniejącego connectora lub configu zapisanego w config store

  • Walidacja configu

  • Konwertowanie Kafka config i CDC config

  • Obsługa danych wrażliwych - czyszczenie properties np. z hasłami, gdy configi są zwracane z backendu

Najbardziej złożony jest proces generowania formularza configu. Składa się on z kilku kroków:

  1. Pobieranie bazowego configu z Kafka connect Kafka connect udostępnia endpoint (GET kafka_connect_url/connector-plugins/{connector class}/config), który zwraca properties z konfiguracji connectora wraz z dodatkowymi informacjami m.in. czy property jest wymagane, jaki jest jego typ, do jakiej grupy jest przypisane. Wartości te są ustawiane przez developerów connectorów i nie zawsze są one kompletne i poprawne. Dodatkowo jest jeszcze drugi endpoint (PUT kafka_connect_url/connector-plugins/{connector class}/config/validate), który służy do walidacji konektora. Zwraca on tę samą strukturę co pierwszy endpoint, ale oprócz tego zwraca też listę recommended_values, która zawiera wartości które może przyjmować property. Problem polega na tym, że te request wymaga podania w body configu connectora do walidacji i jeśli nie zawiera on wymaganych properties to Kafka connect zwraca błąd (4XX z message, a nie całą zwalidowaną strukturę). Dlatego proces pobierania bazowego configu wygląda w następujący sposób: 1. Pobieramy config helper dla klasy i pobieramy z niego zdefiniowany Kafka config z wymaganymi polami. Z takim configiem próbujemy pobrać config z endpointu Validate 2. Jeśli się nie uda to pobieramy config z pierwszego endpointu, szukamy w nim pól, które mają ustawione required na true, budujemy z nich Kafka config i próbujemy pobrać config z endpointu Validate 3. Jeśli się nie udało to zwracamy config bez recommended_values

  2. Wstępne przetwarzanie configu Na tym etapie wywołujemy metodę z ConnectorConfigHelper, która wstępnie przetwarza config. Domyślnie z configu usuwane są properties name i connector.class, które są obsługiwane w specjalny sposób.

  3. Nadpisywanie grup W tabeli connector_config_group_mapping zdefiniowane są mapowania grup dla danego connector class np. dla connector Debezium Oracle możemy zmienić nazwę grupy Oracle na Database. Jeśli nazwa decelowej grupy będzie pusta to wszystkie elementy z źródłowej grupy zostaną przeniesione do grupy ADVANCED.

  4. Nadpisywanie properties Na tym etapie nadpisywane są pojedyncze properties w configu - można edytować wszystkie pola które zostały pobrane z Kafka connect, usuwać properties lub dodawać nowe. Przy dodawaniu możemy zadecydować, czy property zostanie dodany do części core czy custom configu. Procesem tym steruje tabela connector_config_entry_override gdzie dla danego connector class i nazwy property ustalamy pola do zmiany. Wartość pola zostanie zmieniona tylko, jeśli kolumna w tabeli ma wartość inną, niż null. W kolumnie action określamy, czy pole ma być zmienione, dodane lub usunięte. Można tu też definiować dozwolone wartości dla property wraz oczekiwanym zachowaniem: -REPLACE - usuń wartości pobrane z Kafka connect i wstaw podane -APPEND - dodaj wartości na koniec -PREPEND - dodaj wartości na początek -SET_IF_EMPTY - ustaw wartości, jeśli nie ma żadnych z Kafka connect Dodatkowo przy tej operacji config dzielony jest na grupy - na wyjściu otrzymujemy format CDC config form

  5. Jeśli mamy jakieś wartości dla properties (generujemy formularz na podstawie connectora lub z config store) to na końcu wartości te są wstawiane do pól defaultValue. Gdy config jest generowany na podstawie connectora serwis sprawdza, czy w bazie jest zapisana informacja o CDC configu, z którego został stworzony (jest on zapisywany przez ConnectorLifecycleService przy tworzeniu connectora), a następnie weryfikuje, czy aktualny Kafka config jest zgodny z tym, z którego został stworzony (po konwersji z CDC config). Jeśli wszystko się zgdza, to używany jest CDC config. W przeciwnym wypadku aktualny Kafka config jest mapowany do CDC config.

Obsługa wrażliwych danych - configuracje connectorów często zawierają wrażliwe dane np. hasła do bazy danych. Gdy config jest zwracany z backendu serwis próbuje ukrywać wartości tych properties, aby następnie wstawić je z powrotem np. gdy zaktualizowany config jest przesyłany do backendu, żeby zaktualizować connector. Properties są klasyfikowane jako sensitive na dwa sposoby: w configu zwracanym z Kafka connect mają typ PASSWORD, albo zostały zdefiniowane w ConnectorConfigHelper, jak senstivie.

ConnectorLifecycleService

Ten serwis jest podzielony na dwie części - BaseConnectorLifecycleService i rozszerzający go ConnectorLifecycleService. Wynika to z cyklicznej zależności od ConnectorStoreService.

BaseConnectorLifecycleService zapewnia podstawowe funkcje do tworzenia, aktualizacji i walidacji connectora na podstawie CDC config. Przy tworzeniu i aktualizacji informacja o connectorze jest zapisywana w Kafka connect i w bazie danych - zapisywany jest nazwa connectora, connector class, CDC config, Kafka config i skrót MD5 Kafka configu do szybkiego sprawdzania, czy config się nie zmienił.

ConnectorLifecycleService dodatkowo przy tworzeniu i aktualizacja connectora tworzy w wpis configu w Config Store. Jeśli przy update podane storeName jest zduplikowane to wpis w Config Store jest aktualizowany.

Config Store

Config Store jest modułem który pozwala na budowanie szablonów do tworzenia connectorów oraz historyzację konfiguracji tworzonych connectorów.

Przy toworzeniu connectora automatycznie tworzony jest dla niego wpis w Config Store, albo z wybraną nazwą, albo z wygenerowaną, jeśli żadna nie była pobrana.

Oprócz tego można też ręcznie dodać wpis do Config Store bez tworzenia connectora, aktualizować i usuwać wpisy, dodawać do nich tagi po których można potem filtrować.

Config można wyciągnąć w formie Kafka connect jak i CDC config. W obu przypadkach properties uważane za sensitive są wymazywane.

Do tego można również stworzyć connector (wysłać go do Kafka connect) na podstawie zapisanego configu.

Last updated