Kafka Connect has an extension mechanism based on plugins. By implementing certain interfaces, or extending certain classes, you can create one of eight types of extensions:

  • Connector for piping data between Kafka and external systems
  • Converter for translating between Kafka Connect's runtime data format and byte[]
  • HeaderConverter same, but for headers
  • Transformation for modifying messages as they move through a connector
  • Predicate used to conditionally apply a Transformation
  • ConfigProvider for integrating a source of key/value properties for use in configuration
  • ConnectRestExtension for adding your own JAX-RS resources (filters, REST endpoints, etc.) to the Kafka Connect API
  • ConnectorClientConfigOverridePolicy for enforcing a policy on overriding of client configs via the connector configs

Some of these, like the client config override, are pretty obscure, but connectors and transformations are the meat and potatoes of Kafka Connect, and a custom config provider is like magic for supplying secrets (e.g. passwords) to your configuration.

Adding Clojure to the Mix

The Kafka Connect extension mechanism is pretty great, but if your language of choice is Clojure, can you still create plugins for Kafka Connect? You can! There are a couple of hoops to jump through, which I will describe.

Example Code: Shouting Transform

This example is not something you would really use (unless you LOVE ALL CAPS), but it shows all the parts and configuration for a Kafka Connect transformer (also referred to as a single message transform, or SMT).

This example examines the value of string messages and converts certain words to all caps: whichever words you specify in the plugin's configuration.

e.g., if you set the shouting-words property to "love,nice,shouting" then this message:

Hello, nice world! Do you love shouting?

will be transformed into:

Hello, NICE world! Do you LOVE SHOUTING?

Use gen-class

Clojure has other ways to implement a Java interface, like reify and proxy, but we need gen-class because Kafka Connect configuration expects us to supply it with a class name for it to instantiate. gen-class allows us to create a class with the name of our choice. Here's the full code of the shouting transformer:

  (ns fizzy.plugins.shouting
    (:require [clojure.string :refer [replace upper-case]])
    (:import [org.apache.kafka.common.config ConfigDef ConfigDef$Type ConfigDef$Importance]
             [org.apache.kafka.connect.data Schema$Type])
    (:gen-class
     :name fizzy.plugins.ShoutingTransform
     :extends fizzy.plugins.ClassLoaderImposition
     :implements [org.apache.kafka.connect.transforms.Transformation]))

  (def config (atom {}))

  (defn -configure [this cfg]
    (swap! config assoc this cfg))

  (defn -config [_this]
    (let [field-name "shouting-words"
          value-type ConfigDef$Type/LIST
          importance ConfigDef$Importance/HIGH
          default-value '()
          docstring "Provides a list of words that SHOULD BE SHOUTED"]
      (.define (ConfigDef.) field-name value-type default-value importance docstring)))

  (defn- shout
    "Takes a string value and a string to match and makes the match string upper case"
    [v match]
    (replace v (re-pattern (str "(?i)" match)) (upper-case match)))

  (defn -apply [this record]
    (let [topic (.topic record)
          partition (.kafkaPartition record)
          key-schema (.keySchema record)
          key (.key record)
          value-schema (.valueSchema record)
          value (.value record)
          shout-value (if (string? value)
                        (reduce shout value (get-in @config [this "shouting-words"]))
                        value)
          timestamp (.timestamp record)
          headers (.headers record)]
      (.newRecord record topic partition key-schema key value-schema shout-value timestamp headers)))

  (defn -close [_this]
    nil)

A Tour of the Code

A transformation plugin is any class that implements org.apache.kafka.connect.transforms.Transformation. The methods we need are:

  • void configure(Map<String, ?> configs) accepts a map of configuration keys/values
  • ConfigDef config() returns an object which describes the expected configs
  • ConnectRecord apply(ConnectRecord record) the actual transformation to perform
  • void close() called on shutdown, for any resources that need to be cleaned up

Our Clojure code stores the configuration map in an atom, with a separate config stored for each instance of the class, since you could have multiple simultaneous instances of the plugin running. The only configuration option we use in this example is "shouting-words", a list of strings that we will convert to all caps in any message where we find them.

The apply function passes all the fields of the ConnectRecord along unmodified except the value, which is transformed by the shout function, but only if the value is a string (a message coming through a connector could be virtually anything).
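To see the shout helper in isolation, here is what it does at the REPL (a sketch, evaluated inside the fizzy.plugins.shouting namespace since shout is private):

  ;; in the fizzy.plugins.shouting namespace
  (shout "Do you love shouting?" "love")
  ;; => "Do you LOVE shouting?"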

Our close function is a no-op, because we don't have anything to shut down.

A Class Loading Issue

Kafka Connect provides a dedicated class loader for each plugin, so that all the plugins can have isolated classes and not interfere with each other. This turns out to be a bit of a problem for Clojure, which loads itself using the context class loader rather than the plugin class loader. Furthermore, it uses static initializer blocks to load, so there's only a narrow window of opportunity to intervene. See the Clojure implementation details here.

If your eyes are very sharp, you may have noticed that our gen-class extends another class, fizzy.plugins.ClassLoaderImposition. The sole purpose of this class is to set the context class loader when the class loads. Here is the implementation of that class. Putting the code in a static initializer block allows us to set the context class loader before Clojure needs to access it.

  package fizzy.plugins;

  public class ClassLoaderImposition {
      static {
          Thread.currentThread().setContextClassLoader(ClassLoaderImposition.class.getClassLoader());
      }
  }

To put it bluntly, this is a hack to get Clojure to load in the plugin context, but it works!

Packaging It Up

Kafka Connect has a (configurable) directory from which plugins are loaded. If you're using a local installation of Confluent Platform, the default location is $CONFLUENT_HOME/share/java. In the official Confluent Docker image, it's /usr/share/confluent-hub-components. You can either make an uberjar with your code in it (plus all of its dependencies), or create a subdirectory of the plugins directory containing all the jars. Note that you should not include jars that already ship with Kafka Connect, like kafka-clients, slf4j-api, and snappy. Creating jars from your project is beyond the scope of this article, but you can use tools.build or Leiningen to do it (a rough sketch follows below).

Remember that once you have copied your plugin files to Connect, you must stop and restart the server. It's a good idea to tail the logs so you can see any errors that pop up if something goes wrong with loading your plugin.
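As an illustration, here is a minimal tools.build script that AOT-compiles the plugin and produces an uberjar. This is a sketch, not a definitive build: the source paths, the jar name, and the assumption that the Java shim lives in a java directory are all placeholders for your own project layout.

  ;; build.clj -- a minimal sketch; paths and jar name are assumptions
  (ns build
    (:require [clojure.tools.build.api :as b]))

  (def class-dir "target/classes")
  (def uber-file "target/fizzy-plugins-standalone.jar")

  (defn uber [_]
    (let [basis (b/create-basis {:project "deps.edn"})]
      (b/delete {:path "target"})
      ;; compile the Java class-loader shim first, then AOT-compile the
      ;; Clojure code so gen-class actually emits fizzy.plugins.ShoutingTransform
      (b/javac {:src-dirs ["java"] :class-dir class-dir :basis basis})
      (b/compile-clj {:basis basis :src-dirs ["src"] :class-dir class-dir})
      (b/uber {:class-dir class-dir :uber-file uber-file :basis basis})))

With a :build alias pointing at this script, clojure -T:build uber produces a single jar you can drop into the plugins directory.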

Adding Your Transformer to a Connector

Once you have a plugin installed in a running Kafka Connect cluster, you use it by specifying its configuration in the connector properties. Here is how our example would be enabled within connector properties:

  {
      "name": "my-connector",
      "config": {
          // more config redacted
          "transforms": "shouting",
          "transforms.shouting.type": "fizzy.plugins.ShoutingTransform",
          "transforms.shouting.shouting-words": "hello,world,love,shouting"
      }
  }

Making a ConfigProvider

We were very excited to build a custom config provider, because it meant we could get our secrets from Vault instead of passing them around in files or the environment. It is very similar to writing a transformer, with a couple of wrinkles. The interface is:

  • ConfigData get(String path)
  • ConfigData get(String path, Set<String> keys)
  • void configure(Map<String, ?> configs)
  • void close()

And there are optional methods for subscribing to changes to a given key.
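To make this concrete, here is a minimal sketch of what a Clojure ConfigProvider could look like. The fizzy.plugins.secrets namespace and the lookup-secret helper are hypothetical placeholders; a real implementation would call out to Vault (or whatever secret store you use) instead of the environment:

  (ns fizzy.plugins.secrets
    (:import [org.apache.kafka.common.config ConfigData])
    (:gen-class
     :name fizzy.plugins.SecretsConfigProvider
     :extends fizzy.plugins.ClassLoaderImposition   ; same class-loading hack as before
     :implements [org.apache.kafka.common.config.provider.ConfigProvider]))

  ;; placeholder: a real provider would query Vault or similar here
  (defn- lookup-secret [path k]
    (System/getenv (str path "_" k)))

  (defn -configure [_this _cfg]
    nil)

  (defn -get
    ;; get(path) and get(path, keys) both return a ConfigData
    ([_this _path]
     (ConfigData. {}))   ; this sketch returns nothing for a bare path lookup
    ([_this path ks]
     (ConfigData. (into {} (map (fn [k] [k (lookup-secret path k)]) ks)))))

  (defn -close [_this]
    nil)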

One of the differences with ConfigProvider (also with ConnectRestExtension, if you decide to create one) is that plugins of this type are found using the java.util.ServiceLoader mechanism. This means that your jar file must have a META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider file. The content of this file is a single line with the fully qualified name of your class which implements ConfigProvider.
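For the provider sketched above, that file would contain just:

  fizzy.plugins.SecretsConfigProvider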

When your config provider is installed, you will be able to add properties to Kafka Connect that reference your provider to supply a value. For example, your ENVIRONMENT could include the following:

  CONNECT_CONFIG_PROVIDERS=fizzysecrets
  CONNECT_CONFIG_PROVIDERS_FIZZYSECRETS_CLASS=fizzy.plugins.SecretsConfigProvider
  CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='\${fizzysecrets:SASL_JAAS_CONFIG_USER}' password='\${fizzysecrets:SASL_JAAS_CONFIG_PASS}';"

One More Hurdle

Getting secrets from a config provider is great, but there is one final hoop to jump through if you're running Kafka Connect with the Confluent Docker image. Part of the startup sequence is running a tool called cub to ensure Kafka is ready before starting Kafka Connect. Because the tool also needs access to your config providers, you must copy your config provider code (along with dependencies) to /usr/share/java/cp-base-new. That way, any secrets you need to reach Kafka will also be available to cub.
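For example, a Dockerfile built on the Confluent image might copy the same jar to both locations (a sketch; the image tag and jar name are assumptions):

  FROM confluentinc/cp-kafka-connect:7.4.0
  # make the provider visible to Kafka Connect's plugin loader
  COPY target/fizzy-plugins-standalone.jar /usr/share/confluent-hub-components/fizzy-plugins/
  # ...and to the cub readiness tool that runs before Connect starts
  COPY target/fizzy-plugins-standalone.jar /usr/share/java/cp-base-new/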

Clojure + Kafka Connect

Clojure is a fun and productive way to extend Kafka Connect. One thing we haven't tried yet is writing our own connector, but that should be a breeze. Good luck!