Kafka Connect has an extension mechanism based on plugins. By implementing certain interfaces, or extending certain classes, you can create one of eight types of extensions:
Connector: for piping data into or out of Kafka via external systems
Converter: for translating between Kafka Connect's runtime data format and byte[]
HeaderConverter: the same, but for headers
Transformation: for modifying messages as they move through a connector
Predicate: used to conditionally apply a Transformation
ConfigProvider: for integrating a source of key/value properties used in configuration
ConnectRestExtension: for adding your own JAX-RS resources (filters, REST endpoints, etc.) to the Kafka Connect API
ConnectorClientConfigOverridePolicy: for enforcing a policy on overriding of client configs via the connector configs
Some of these, like the client config override, are pretty obscure, but connectors and transformations are the meat and potatoes of Kafka Connect, and a custom config provider is like magic for supplying secrets (e.g. passwords) to your configuration.
Adding Clojure to the Mix
The Kafka Connect extension mechanism is pretty great, but if your language of choice is Clojure, can you still create plugins for Kafka Connect? You can! There are a couple of hoops to jump through, which I will describe.
Example Code: Shouting Transform
This example is not something you would really use (unless you LOVE ALL CAPS), but it shows all the parts and configuration for a Kafka Connect transformer (also referred to as a single message transform, or SMT).
This example will examine the value of string messages, and convert certain words to all caps, whichever words you specify in the configuration to the plugin.
e.g., if you set the shouting-words property to "love,nice,shouting" then this message:
Hello, nice world! Do you love shouting?
will be transformed into:
Hello, NICE world! Do you LOVE SHOUTING?
Use gen-class
Clojure has other ways to implement a Java interface, like reify and proxy, but we need gen-class because Kafka Connect configuration expects us to supply it with a class name for it to instantiate. gen-class allows us to create a class with the name of our choice. Here's the full code of the shouting transformer:
(ns fizzy.plugins.shouting
  (:require [clojure.string :refer [replace upper-case]])
  (:import [org.apache.kafka.common.config ConfigDef ConfigDef$Type ConfigDef$Importance])
  (:gen-class
   :name fizzy.plugins.ShoutingTransform
   :extends fizzy.plugins.ClassLoaderImposition
   :implements [org.apache.kafka.connect.transforms.Transformation]))
;; One config map per plugin instance, keyed by the instance itself,
;; since multiple instances of the plugin can run at the same time.
(def config (atom {}))

(defn -configure [this cfg]
  (swap! config assoc this cfg))
(defn -config [_this]
  (let [field-name "shouting-words"
        value-type ConfigDef$Type/LIST
        importance ConfigDef$Importance/HIGH
        default-value '()
        docstring "Provides a list of words that SHOULD BE SHOUTED"]
    (.define (ConfigDef.) field-name value-type default-value importance docstring)))
(defn- shout
  "Takes a string value and a string to match and makes the match string upper case"
  [v match]
  (replace v (re-pattern (str "(?i)" match)) (upper-case match)))
(defn -apply [this record]
  (let [topic (.topic record)
        partition (.kafkaPartition record)
        key-schema (.keySchema record)
        key (.key record)
        value-schema (.valueSchema record)
        value (.value record)
        shout-value (if (string? value)
                      (reduce shout value (get-in @config [this "shouting-words"]))
                      value)
        timestamp (.timestamp record)
        headers (.headers record)]
    (.newRecord record topic partition key-schema key value-schema shout-value timestamp headers)))
(defn -close [_this]
  nil)
A Tour of the Code
A transformation plugin is any class that implements org.apache.kafka.connect.transforms.Transformation. The methods we need are:
void configure(Map<String, ?> configs): accepts a map of configuration keys/values
ConfigDef config(): returns an object which describes the expected configs
ConnectRecord apply(ConnectRecord record): the actual transformation to perform
void close(): called on shutdown, for any resources that need to be cleaned up
Our Clojure code stores the configuration map in an atom, with a separate config stored for each instance of the class, since you could have multiple simultaneous instances of the plugin running. The only configuration option we use in this example is "shouting-words", a list of strings that we will convert to all caps in any message where we find them.
The apply function passes all the fields of the ConnectRecord along unmodified except the value, which is transformed by the shout function, but only if the value is a string (a message coming through a connector could be virtually anything). Our close function is a no-op, because we don't have anything to shut down.
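For example, here is how the shout helper behaves at the REPL (the sample strings are just for illustration):
;; Shout a single word:
(shout "Hello, nice world!" "nice")
;; => "Hello, NICE world!"
;; Shout each configured word in turn, as apply does:
(reduce shout "Do you love shouting?" ["love" "shouting"])
;; => "Do you LOVE SHOUTING?"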
A Class Loading Issue
Kafka Connect provides a dedicated class loader for each plugin, so that all the plugins can have isolated classes and not interfere with each other. This turns out to be a bit of a problem for Clojure, which loads itself using the context class loader rather than the plugin class loader. Furthermore, it uses static initializer blocks to load, so there's only a narrow window of opportunity to intervene. See the Clojure implementation for the details.
If your eyes are very sharp, you may have noticed that our gen-class extends another class, fizzy.plugins.ClassLoaderImposition. The sole purpose of this class is to set the context class loader when the class loads. Here is the implementation of that class. Putting the code in a static initializer block allows us to set the context class loader before Clojure needs to access it.
package fizzy.plugins;

public class ClassLoaderImposition {
    // Runs as soon as this class is loaded, before any Clojure static
    // initializers, so Clojure loads via the plugin class loader.
    static {
        Thread.currentThread().setContextClassLoader(ClassLoaderImposition.class.getClassLoader());
    }
}
To put it bluntly, this is a hack to get Clojure to load in the plugin context, but it works!
Packaging It Up
Kafka Connect has a (configurable) directory where plugins are loaded from. If you're using a local installation of Confluent Platform, the default location is $CONFLUENT_HOME/share/java. In the official Confluent Docker container, it's /usr/share/confluent-hub-components. You can either make an uberjar with your code in it (plus all of its dependencies), or create a subdirectory of the plugins directory with all the jars in it. Note that you should not include jars that come with Kafka Connect, like kafka-clients, slf4j-api, or snappy. Creating jars from your project is beyond the scope of this article, but you can use tools.build or Leiningen to do it. Remember that once you have copied your plugin files to Connect, you must stop and restart the server. It's a good idea to tail the logs, so you can see any errors that pop up if something goes wrong with loading your plugin.
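As a rough sketch of the tools.build route (assuming a deps.edn project with Clojure sources in src and the Java class in java; the paths and jar name here are illustrative), a build.clj might look like this:
;; build.clj: a minimal uberjar sketch using tools.build
(ns build
  (:require [clojure.tools.build.api :as b]))

(def class-dir "target/classes")
(def basis (b/create-basis {:project "deps.edn"}))
(def uber-file "target/shouting-transform-standalone.jar")

(defn uber [_]
  (b/delete {:path "target"})
  ;; Compile the ClassLoaderImposition Java class.
  (b/javac {:src-dirs ["java"] :class-dir class-dir :basis basis})
  ;; AOT-compile the Clojure namespaces so the gen-class classes
  ;; actually end up in the jar.
  (b/compile-clj {:basis basis :src-dirs ["src"] :class-dir class-dir})
  (b/uber {:class-dir class-dir :uber-file uber-file :basis basis}))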
Adding Your Transformer to a Connector
Once you have a plugin installed in a running Kafka Connect cluster, you use it by specifying its configuration in the connector properties. Here is how our example would be enabled within connector properties:
{
  "name": "my-connector",
  "config": {
    // more config redacted
    "transforms": "shouting",
    "transforms.shouting.type": "fizzy.plugins.ShoutingTransform",
    "transforms.shouting.shouting-words": "hello,world,love,shouting"
  }
}
Making a ConfigProvider
We were very excited to build a custom config provider, because it meant we could get our secrets from Vault instead of passing them around in files or the environment. It is very similar to writing a transformer, with a couple of wrinkles. The interface is:
ConfigData get(String path)
ConfigData get(String path, Set<String> keys)
void configure(Map<String, ?> configs)
void close()
And there are optional methods for subscribing to changes to a given key.
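To make that concrete, here is a minimal sketch of a Clojure config provider (the namespace and the hard-coded secrets map are illustrative; a real implementation would fetch values from Vault or a similar store):
(ns fizzy.plugins.secrets
  (:import [org.apache.kafka.common.config ConfigData])
  (:gen-class
   :name fizzy.plugins.SecretsConfigProvider
   :extends fizzy.plugins.ClassLoaderImposition
   :implements [org.apache.kafka.common.config.provider.ConfigProvider]))

;; Stand-in for a real secrets store.
(def secrets
  {"SASL_JAAS_CONFIG_USER" "alice"
   "SASL_JAAS_CONFIG_PASS" "s3cret"})

(defn -configure [_this _cfg]
  nil)

(defn -get
  ;; get(String path): return everything under the path
  ([_this _path]
   (ConfigData. secrets))
  ;; get(String path, Set<String> keys): return only the requested keys
  ([_this _path keys]
   (ConfigData. (select-keys secrets keys))))

(defn -close [_this]
  nil)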
One of the differences with the ConfigProvider (also with ConnectRestExtension, if you decide to create one) is that plugins of this type are found using the java.util.ServiceLoader mechanism. This means that your jar file must have a META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider file. The content of this file is a single line with the fully-qualified name of your class which implements ConfigProvider.
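For the provider sketched above, which matches the class name used in the environment below, that file would contain just:
fizzy.plugins.SecretsConfigProvider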
When your config provider is installed, you will be able to add properties to Kafka Connect that reference your provider to supply a value. For example, your environment could include the following:
CONNECT_CONFIG_PROVIDERS=fizzysecrets
CONNECT_CONFIG_PROVIDERS_FIZZYSECRETS_CLASS=fizzy.plugins.SecretsConfigProvider
CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='\${fizzysecrets:SASL_JAAS_CONFIG_USER}' password='\${fizzysecrets:SASL_JAAS_CONFIG_PASS}';"
One More Hurdle
Getting secrets from a config provider is great, but there is one final hoop to jump through if you're running Kafka Connect with the Confluent Docker image. Part of the startup sequence is running a tool called cub to ensure Kafka is ready before starting Kafka Connect. Because the tool also needs access to your config providers, you must copy your config provider code (along with dependencies) to /usr/share/java/cp-base-new. That way, any secrets you need to reach Kafka will also be available to cub.
Clojure + Kafka Connect
Clojure is a fun and productive way to extend Kafka Connect. One thing we haven't tried yet is writing our own connector, but that should be a breeze. Good luck!