Apache Kafka as Target - DSS 6 | Data Source Solutions Documentation


Apache Kafka as Target

Data Source Solutions (DSS) supports integrating changes into a Kafka location. This section describes the configuration requirements for integrating changes into a Kafka location using Integrate and Refresh. For the list of supported Kafka versions into which DSS can integrate changes, see Integrate changes into location in Capabilities.

DSS uses the Apache Kafka client library (librdkafka) to send data packages to the Kafka message bus during Continuous Integrate and Bulk Refresh.

Kafka Message Format

By default, DSS sends messages in JSON format to a Kafka location. This behavior is equivalent to defining the action FileFormat with parameter JsonMode=SCHEMA_PAYLOAD and the location property MESSAGE BUNDLING (Kafka_Message_Bundling)=ROW.

If Schema Registry is used, the Kafka message format must be set to a compact AVRO-based or JSON-based format, which is achieved by defining the location property SCHEMA REGISTRY FORMAT (Kafka_Schema_Registry_Format). Note that the AVRO-based format is not a true AVRO because each message is not a valid AVRO file (e.g. no file header). Instead, each message is considered a 'micro AVRO' that comprises fragments of data encoded using the AVRO data type serialization format. Both the 'micro AVRO' and JSON formats, irrespective of whether Schema Registry is utilized, adhere to Confluent's 'Kafka Connect' message format standard. These formats are compatible with any implementation of Kafka sink connectors.
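For illustration, a Kafka Connect style SCHEMA_PAYLOAD message pairs a 'schema' section describing the columns with a 'payload' section carrying one row. Below is a sketch with invented column names; the exact field layout DSS emits may differ:

```python
import json

# Illustrative Kafka Connect style JSON message (JsonMode=SCHEMA_PAYLOAD):
# the "schema" part describes the columns, the "payload" part carries one row.
# Column names and types here are invented for the example.
message = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "int32", "optional": False},
            {"field": "name", "type": "string", "optional": True},
        ],
    },
    "payload": {"id": 42, "name": "example"},
}

encoded = json.dumps(message).encode("utf-8")  # bytes sent as the Kafka message value
decoded = json.loads(encoded)                  # what a consumer would parse back
```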

To use the Cloudera Schema Registry, you must use it in Confluent-compatible mode. This is achieved by specifying the URL in the following format: http://FQDN:PORT/api/v1/confluent, where FQDN:PORT is the address of the Cloudera Schema Registry. Specify this URL in the location property SCHEMA REGISTRY URL (Kafka_Schema_Registry) while creating a location or by editing the existing location's Source and Target Properties.

When a Kafka location is configured with the location property SCHEMA REGISTRY URL (Kafka_Schema_Registry), the parameters Json, Xml, Csv, AvroCompression, and Parquet in action FileFormat cannot be used. When no Schema Registry is defined, these parameters can be used to send messages in other formats. If parameter Avro is chosen without defining the location property SCHEMA REGISTRY URL, then each message will be a valid AVRO file (including a header with the schema and column information), rather than Kafka Connect's more compact AVRO-based format.

Metadata for Messages

In order to process messages from DSS, a Kafka consumer often requires metadata about the replicated table, such as table and column names, data types, and more. If the location is defined with the SCHEMA REGISTRY URL, the consumer can read this metadata directly from the registry. When using the JSON format with the default mode (JsonMode=SCHEMA_PAYLOAD), each message already contains this information. Alternatively, you can include metadata in each message by defining the action ColumnProperties with parameters Extra and IntegrateExpression, allowing you to add values like {dss_tbl_name} and {dss_op}.

Kafka Message Bundling and Size

By default, each Kafka message contains just one row, regardless of the format chosen. Multiple rows can be bundled into a single message using the location property MESSAGE BUNDLING (Kafka_Message_Bundling). This property can be defined while creating a Kafka location or by editing the existing location's Source and Target Properties.

If the location property SCHEMA REGISTRY URL (Kafka_Schema_Registry) is defined, you must use only the ROW mode in MESSAGE BUNDLING (Kafka_Message_Bundling). Using any other mode is unsupported and may lead to data integrity issues.

Although bundling of multiple rows can be combined with the Kafka Connect compatible formats (JSON with default mode SCHEMA_PAYLOAD), the resulting (longer) messages no longer conform to Confluent's 'Kafka Connect' standard.

For bundling modes TRANSACTION and THRESHOLD, the number of rows in each message is affected by the location property MESSAGE THRESHOLD (Kafka_Message_Bundling_Threshold). For those bundling modes, rows continue to be bundled into the same message until after this threshold is exceeded. After that happens, the message is sent and new rows are bundled into the next message. The property MESSAGE THRESHOLD (Kafka_Message_Bundling_Threshold) has no effect on the bundling modes ROW or CHANGE.
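One plausible reading of the threshold behavior above can be sketched as follows. For illustration the threshold is treated as a simple row count (DSS may measure it differently, e.g. in bytes); this is a sketch of the documented semantics, not DSS internals:

```python
def bundle_rows(rows, threshold):
    """Group rows into messages: rows keep being added to the current
    message until the threshold is exceeded, then that message is 'sent'
    and new rows go into the next message (TRANSACTION/THRESHOLD modes)."""
    messages, current = [], []
    for row in rows:
        current.append(row)
        if len(current) > threshold:  # threshold exceeded: close this message
            messages.append(current)
            current = []
    if current:                       # flush the final partial message
        messages.append(current)
    return messages

msgs = bundle_rows(list(range(7)), threshold=3)
```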

By default, the minimum size of a Kafka message sent by DSS is 4096 bytes, and the maximum size is 1,000,000 bytes. DSS will not send a message exceeding the maximum size; instead, it gives a fatal error. If the location property MESSAGE COMPRESS (Kafka_Message_Compress) is used, this error is raised by a Kafka broker. You can change the maximum Kafka message size that DSS will send by defining the environment variable DSS_KAFKA_MSG_MAX_BYTES, but ensure it does not exceed the maximum message size configured on the Kafka broker (setting message.max.bytes). If the message size exceeds that broker limit, the message will be lost.

DSS_KAFKA_MSG_MAX_BYTES works in two ways:

  • checks the size of each message and raises a DSS error if the limit is exceeded, even before transmitting the message to a Kafka broker.
  • checks the maximum size of compressed messages inside the Kafka transport protocol.

If the message is too big to be sent because it contains multiple rows, then less bundling (e.g. MESSAGE BUNDLING=ROW) or a lower MESSAGE THRESHOLD can help reduce the number of rows in each message. Otherwise, the number of bytes used for each row must be lowered, either with a more compact message format or by truncating a column value (by adding action ColumnProperties with parameter TrimDatatype to the capture location).
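The pre-send size check described above can be sketched as follows, assuming the documented 1,000,000-byte default and the DSS_KAFKA_MSG_MAX_BYTES override; the function name is illustrative, not part of DSS:

```python
import json
import os

# Documented default maximum message size, overridable via the environment.
MAX_BYTES = int(os.environ.get("DSS_KAFKA_MSG_MAX_BYTES", "1000000"))

def check_message_size(value: bytes, max_bytes: int = MAX_BYTES) -> bytes:
    """Raise an error instead of sending an oversized message, mirroring
    the documented behavior of refusing to transmit past the limit."""
    if len(value) > max_bytes:
        raise ValueError(
            f"message of {len(value)} bytes exceeds limit of {max_bytes}")
    return value

small = check_message_size(json.dumps({"id": 1}).encode("utf-8"))
```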

Syncing Kafka, Interruption of Message Sending, and Consuming Messages with Idempotence

A DSS integrate job performs a sync of messages sent into Kafka at the end of each integrate cycle, rather than after each individual message. This means that if the job is interrupted while sending messages, then when it is restarted, it may resend multiple rows from the interrupted cycle. Programs consuming Kafka messages must be able to cope with this repetition; this is called being 'idempotent'. One technique for idempotence is to track an increasing sequence in each message and use it to detect which messages have already been processed. A column with such an increasing sequence can be defined using the following action:

Group Table Action Parameter(s)
KAFKA * ColumnProperties Name=integ_key, Extra, Datatype=varchar, Length=45, IntegrateExpression="{dss_integ_seq}"

If DSS resends a message, its contents will be identical each time, including this sequence number.
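A minimal sketch of an idempotent consumer built on such a sequence column, assuming the integ_key field from the action above; the watermark handling here is illustrative (in practice it would be persisted):

```python
def process_once(messages, handler):
    """Idempotent consumption sketch: each message carries an increasing
    sequence (here in field 'integ_key'); messages at or below the last
    processed sequence are skipped, so messages re-sent after a job
    restart are ignored rather than processed twice."""
    last_seq = ""  # watermark; would be stored durably in a real consumer
    for msg in messages:
        seq = msg["integ_key"]
        if seq <= last_seq:   # already processed: skip the duplicate
            continue
        handler(msg)
        last_seq = seq
    return last_seq

seen = []
# The repeated "0002" simulates a resend after an interrupted cycle.
stream = [{"integ_key": "0001"}, {"integ_key": "0002"},
          {"integ_key": "0002"}, {"integ_key": "0003"}]
final = process_once(stream, seen.append)
```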

Kafka Message Keys and Partitioning

Kafka messages can contain a 'key' that Kafka uses to assign messages to partitions, so that consumption can be parallelized. DSS typically puts a key into each message containing a hash computed from the values in the 'distribution key' column of each row. This key is present only if the messages are in JSON or AVRO format. It is not present when the location property MESSAGE BUNDLING (Kafka_Message_Bundling) is set to TRANSACTION or THRESHOLD.
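The key-to-partition mapping can be illustrated as follows. Real Kafka clients use their own hash functions (e.g. murmur2), so this is only a sketch of the principle that equal keys always land in the same partition:

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Sketch of key-based partitioning: messages with the same key hash
    to the same partition, preserving per-key ordering while allowing
    parallel consumption across partitions. md5 is used here only for
    illustration; it is not what Kafka clients actually use."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p1 = partition_for(b"customer-42", 6)
p2 = partition_for(b"customer-42", 6)  # same key -> same partition
```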

Customize Integrate

By default, for Kafka, DSS does not replicate delete operations performed at the source location. To integrate delete operations, an extra time-key column needs to be added in the target location.

For this, action ColumnProperties may be defined with the following parameters:

Group Table Action Parameter(s)
KAFKA * ColumnProperties Name=op_val, Datatype=integer, IntegrateExpression="{dss_op}", Extra
KAFKA * ColumnProperties Name=integ_seq, Datatype=varchar, Length=45, IntegrateExpression="{dss_integ_seq}", TimeKey, Extra
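With the two extra columns defined above, a consumer can replay the change stream, including deletes. A minimal sketch, assuming for illustration that {dss_op} yields 0 for a delete and non-zero for insert/update (check the DSS documentation for the actual operation codes):

```python
def apply_change(state: dict, msg: dict) -> dict:
    """Replay sketch using the extra columns defined above: 'op_val'
    carries the operation type and 'integ_seq' orders the changes.
    Operation codes (0 = delete, otherwise upsert) are assumed here
    for illustration only."""
    key = msg["id"]
    if msg["op_val"] == 0:   # delete: remove the row from target state
        state.pop(key, None)
    else:                    # insert/update: upsert the row
        state[key] = msg
    return state

state = {}
changes = [{"id": 1, "op_val": 1, "integ_seq": "01"},   # insert row 1
           {"id": 2, "op_val": 1, "integ_seq": "02"},   # insert row 2
           {"id": 1, "op_val": 0, "integ_seq": "03"}]   # delete row 1
for m in changes:
    apply_change(state, m)
```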

Case Sensitive Table Names

By default, DSS stores all table names in lowercase. To store the table names in uppercase on Kafka target location, define the following:

  • When creating a source location, select the option Case Sensitive Names at step 4 of the process.
  • When creating the target Kafka location, select the option Default Topic and change the default value {dss_tbl_name} to {dss_tbl_base_name}.

Azure Event Hubs

When using Azure Event Hubs (a Kafka broker implementation) as a target location, you may get a Kafka policy violation error caused by a request/reply timeout. In this case, we recommend setting the DSS_KAFKA_PROPERTY environment variable for the Azure Event Hubs location. Here is an example configuration for the DSS_KAFKA_PROPERTY environment variable.

Group Tables Action Parameter
Apache Kafka * Environment Name=DSS_KAFKA_PROPERTY, Value="queue.buffering.max.messages=2000;request.timeout.ms=100000"
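The value of DSS_KAFKA_PROPERTY is a semicolon-separated list of property=value pairs, as shown in the table above. A small sketch of setting and parsing such a value:

```python
import os

# Set the variable exactly as in the example configuration above.
os.environ["DSS_KAFKA_PROPERTY"] = (
    "queue.buffering.max.messages=2000;request.timeout.ms=100000")

# Parse the semicolon-separated list back into individual properties,
# splitting each entry on the first '=' only.
props = dict(item.split("=", 1)
             for item in os.environ["DSS_KAFKA_PROPERTY"].split(";"))
```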

Additionally, the following environment variables can be configured in certain cases.

  • DSS_KAFKA_MSG_DELIVERY_TIMEOUT
    This parameter sets the message delivery timeout. The default value is 5 minutes, but it can be increased if throughput is low while the queue.buffering.max.messages value is large. For example, if queue.buffering.max.messages=100000 and throughput is 1 unit, DSS_KAFKA_MSG_DELIVERY_TIMEOUT can be set to 16 minutes.
  • DSS_KAFKA_RATE_LIMIT
    This parameter limits the rate of outgoing messages. It can resolve cases with strict consumer-side limitations (for example, a consumer that cannot process more than 100 messages per second). The values for DSS_KAFKA_RATE_LIMIT can be 1, 10, 100, or 1000 (messages per second).
  • In a channel that has tables of different sizes, it is recommended to refresh each table separately.
  • For tables on Azure Event Hubs with minimal throughput units, it is recommended to run a refresh without parallelism, because parallelism in that situation can result in an error.

The following table provides the recommended settings for the queue.buffering.max.messages and request.timeout.ms configuration properties.

Property Recommended Values Throughput 
queue.buffering.max.messages 1000 (2000 is the maximum) 1 unit
queue.buffering.max.messages 2000 (4000 is the maximum) 2 units
queue.buffering.max.messages 40000 (80000 is the maximum) 40 units
request.timeout.ms up to 30000 ms

For details on connecting to Azure Event Hubs, see section Location Connection for Apache Kafka.