Using Main Programs

Details on programs

The main functional programs provided in OpenMSIStream are accessible through special “console entry point” commands. The pages linked below describe how to run these programs from the command line and what they do.

OpenMSIStream also provides some useful base classes that can be extended to create customized Kafka streaming workflows for specific lab use cases. Those base classes are described in detail on the pages linked below, and they are also documented in the API reference.

The sections below provide further information needed to work with or extend any of these main programs.

Configuration files

Working with any of the programs in OpenMSIStream requires creating at least one configuration file that tells the Kafka backend how to connect to the broker and how to configure the main Producer(s)/Consumer(s) it uses. Information in configuration files is also used to point a Consumer to the correct S3 endpoint and connect it to a bucket there, and to supply the information needed to work with data encrypted using KafkaCrypto.

Location and Usage

Programs accept the “--config” command line argument to specify the configuration file they should use. The value of that argument can either be the path to a file, or the name of a file in the default location. The default location is initially set to the openmsistream/kafka_wrapper/config directory in the installed repository, but you can change it by setting the OPENMSISTREAM_CONFIG_FILE_DIR environment variable on your system. (Note that setting this variable without copying the files from the default location will break the tutorials and CI tests on your system.)
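The lookup rule can be sketched with the standard library. This is a minimal sketch, not OpenMSIStream's own implementation: the helper name resolve_config_path is hypothetical, the fallback directory is passed in explicitly instead of being discovered from the installed package, and it assumes that a bare name like “test” maps to a file called “test.config” (as the default value described below suggests).

```python
import os
from pathlib import Path


def resolve_config_path(config_arg, fallback_dir):
    """Hypothetical helper: turn a --config value into an existing file path.

    An existing path is used as-is. Otherwise the value is treated as a file
    name in the default directory, which is read from the
    OPENMSISTREAM_CONFIG_FILE_DIR environment variable when it is set.
    """
    candidate = Path(config_arg)
    if candidate.is_file():
        return candidate.resolve()
    default_dir = Path(os.environ.get("OPENMSISTREAM_CONFIG_FILE_DIR", fallback_dir))
    # Try the name exactly as given, then with a ".config" extension (assumption)
    for name in (config_arg, f"{config_arg}.config"):
        path = default_dir / name
        if path.is_file():
            return path.resolve()
    raise FileNotFoundError(f"No config file found for --config={config_arg!r}")
```

Checking the environment variable before the fallback directory mirrors the override behavior described above.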

You can see what the default config file location is at any time by running, for example:

UploadDataFile -h

and checking the output message for the “--config” argument, if you’d like to put files there and reference them by name instead of by their path in some different location.

The default value of the “--config” argument is “test” which points to the test.config file that comes installed with OpenMSIStream. That file includes “[broker]”, “[producer]”, and “[consumer]” sections with reasonable all-around values in them, and environment variable references to specify the broker connection. If your broker has plaintext SASL authentication enabled and you set values for the “KAFKA_TEST_CLUSTER_BOOTSTRAP_SERVERS”, “KAFKA_TEST_CLUSTER_USERNAME”, and “KAFKA_TEST_CLUSTER_PASSWORD” environment variables, then you can use the default configuration file to get started moving files right away (without encryption or interaction with an S3 bucket).

In practice, however, different setups benefit from using bespoke configuration settings based on the sizes of files involved and the type of program being run. You can use the example files as starting points to adjust your configuration as necessary, pointing the programs you run to your custom file using the “--config” command line argument. In addition to the information below, Confluent has released a white paper about optimizing Kafka deployments for throughput, latency, durability, and availability that includes configuration recommendations that you could try out in your own use cases.

Structure

In general, a configuration file is a text file with one or more distinct and named sections. Comments can be added by using lines starting with “#”, and other whitespace in general is ignored. Each section begins with a heading line like “[section_name]” (with square brackets included), and beneath that heading different parameters are supplied using lines like “key = value”. The structure of the files is that expected by the Python configparser module; check the docs there for more information if you’d like.
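As a quick illustration, the snippet below parses a small configuration in this format with configparser directly. The section contents are placeholders chosen for the example, not recommended settings.

```python
import configparser

# A small configuration in the format described above (placeholder values)
EXAMPLE_CONFIG = """
# Lines starting with "#" are comments
[broker]
bootstrap.servers = localhost:9092

[producer]
linger.ms = 100
"""

parser = configparser.ConfigParser()
parser.read_string(EXAMPLE_CONFIG)

# Each section becomes a mapping of "key = value" pairs; all values are strings
for section in parser.sections():
    print(f"[{section}]", dict(parser[section]))
```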

If any parameter’s value begins with the “$” character, the configuration file parser will attempt to expand it as an environment variable. This is a useful way to, for example, store secrets like usernames or passwords as environment variables instead of writing them in the file as plain text. You can set these environment variables in a shell .rc or .profile file if running on Linux or macOS. On Windows you can set them as machine environment variables using PowerShell commands like:

[Environment]::SetEnvironmentVariable("NAME","VALUE",[EnvironmentVariableTarget]::Machine)
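The “$” rule can be sketched in a few lines of Python. This is a hypothetical helper, not the parser OpenMSIStream ships: accepting both “$NAME” and “${NAME}” and raising an error for an unset variable are assumptions made for the sketch.

```python
import os


def expand_env_value(value):
    """Hypothetical sketch: expand a config value that begins with "$".

    Both "$NAME" and "${NAME}" are accepted here (an assumption), and a
    missing variable raises an error instead of passing "$NAME" through.
    """
    if not value.startswith("$"):
        return value
    name = value[1:]
    if name.startswith("{") and name.endswith("}"):
        name = name[1:-1]
    if name not in os.environ:
        raise KeyError(f"Environment variable {name!r} referenced in config is not set")
    return os.environ[name]


# Set here only for the demonstration; normally this comes from your shell profile
os.environ["KAFKA_TEST_CLUSTER_USERNAME"] = "example-user"
print(expand_env_value("$KAFKA_TEST_CLUSTER_USERNAME"))  # example-user
print(expand_env_value("PLAIN"))  # PLAIN (values without "$" are unchanged)
```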

“[broker]” section

Options listed under the [broker] section heading configure which Kafka broker a program should use and how to connect to it. You can add any parameters recognized by Kafka brokers either in Confluent Platform or in the librdkafka backend here, but common parameters include:

  • bootstrap.servers to give the address of the server on which the broker is hosted

  • sasl.mechanism and security.protocol to describe how programs are authenticated to interact with the broker

  • sasl.username and sasl.password to provide the key and secret of an API key created for the broker
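For example, a “[broker]” section for a broker that uses SASL/PLAIN authentication might look like the one embedded below, with the connection details pulled from the environment variables mentioned above. This is an illustrative sketch: the SASL_SSL protocol value is an assumption about the broker's setup, and the snippet only parses the section (it does not connect to anything).

```python
import configparser

# Illustrative [broker] section; "$..." values are environment variable references
BROKER_SECTION = """
[broker]
bootstrap.servers = $KAFKA_TEST_CLUSTER_BOOTSTRAP_SERVERS
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
sasl.username = $KAFKA_TEST_CLUSTER_USERNAME
sasl.password = $KAFKA_TEST_CLUSTER_PASSWORD
"""

parser = configparser.ConfigParser()
parser.read_string(BROKER_SECTION)
broker_configs = dict(parser["broker"])

# configparser leaves the "$" references untouched; expansion happens afterward
print(sorted(broker_configs))
```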

“[producer]” section

Options in the [producer] section configure the Producer (or group of Producers) used by a program. You can add any parameters recognized by Kafka Producers either in Confluent Platform or in the librdkafka backend here, but some of the most useful are:

  • message.max.bytes to configure the maximum size of individual messages

  • batch.size to control the maximum size (in bytes) of each batch of messages sent to the broker

  • linger.ms to change how long a batch of messages should wait to become as full as possible before being sent to the broker

  • compression.type to add or change how batches of messages are compressed before being produced (and decompressed afterward)

Additionally, the key.serializer and value.serializer configs allow users to change methods used to convert message keys and values (respectively) to byte arrays. The OpenMSIStream code provides an additional option called DataFileChunkSerializer that you’ll want to use if you’re producing chunks of data files.
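A “[producer]” section for sending file chunks might combine these settings as in the sketch below. The numbers are placeholders that show the syntax rather than tuned recommendations, and the serializer entries assume that serializers are referenced by class name in the configuration file.

```python
import configparser

# Placeholder values chosen only to illustrate the syntax
PRODUCER_SECTION = """
[producer]
# Upper bound on the size of a single message, in bytes
message.max.bytes = 2000000
# Upper bound on the size of one batch, in bytes
batch.size = 1000000
# Wait up to 100 ms for a batch to fill before sending it
linger.ms = 100
compression.type = lz4
key.serializer = StringSerializer
value.serializer = DataFileChunkSerializer
"""

parser = configparser.ConfigParser()
parser.read_string(PRODUCER_SECTION)
producer_configs = dict(parser["producer"])

# configparser returns strings; numeric settings can be converted where needed
linger_ms = parser.getint("producer", "linger.ms")
print(producer_configs["compression.type"], linger_ms)
```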

“[consumer]” section

Options in the [consumer] section configure the Consumer (or group of Consumers) used by a program. Again, any parameters recognized by Kafka Consumers either in Confluent Platform or in the librdkafka backend are valid here, but some of the most useful/important for OpenMSIStream are:

  • group.id to group Consumers amongst one another. Giving “create_new” for this parameter will create a new group ID every time the code is run. (This config may be overridden by a value from the command line in some cases.)

  • auto.offset.reset to tell the Consumer where in the log to start consuming messages if no previously-committed offset for the consumer group can be found. “earliest” will start at the beginning of the topic and “latest” will start at the end. Giving “none” for this parameter will remove it from the configs, and an error will be thrown if no previously-committed offset for the consumer group can be found.

  • enable.auto.commit to tell the Consumer whether or not to automatically commit offsets. Some portions of the code manually commit offsets, and if this config is left as its default value (True) a Warning will be logged stating that the “at least once” guarantee is not valid unless you set enable.auto.commit = False.

  • fetch.min.bytes to change how many bytes must accumulate before a batch of messages is consumed from the topic. Consuming batches of messages is also subject to a timeout, so changing this parameter only adjusts the tradeoff between throughput and latency; it will not prevent messages from being consumed in general.

Additionally, the key.deserializer and value.deserializer configs allow users to change methods used to convert message keys and values (respectively) from byte arrays to objects. The OpenMSIStream code provides an additional option called DataFileChunkDeserializer that you’ll want to use if you’re consuming messages that are chunks of data files produced by OpenMSIStream.
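The special values described in the list above can be sketched as a small normalization step. The function name normalize_consumer_configs is hypothetical, and this sketch uses Python's warnings module where OpenMSIStream logs a warning instead.

```python
import uuid
import warnings


def normalize_consumer_configs(configs):
    """Hypothetical sketch of the special [consumer] values described above.

    Returns a new dict: a "create_new" group ID becomes a fresh UUID, an
    auto.offset.reset of "none" is removed, and a warning is issued if
    offsets would still be committed automatically.
    """
    configs = dict(configs)
    if configs.get("group.id") == "create_new":
        configs["group.id"] = str(uuid.uuid4())
    if configs.get("auto.offset.reset") == "none":
        del configs["auto.offset.reset"]
    # enable.auto.commit defaults to True when it is not given
    if str(configs.get("enable.auto.commit", True)).lower() != "false":
        warnings.warn('"at least once" is not guaranteed unless enable.auto.commit = False')
    return configs


configs = normalize_consumer_configs(
    {"group.id": "create_new", "auto.offset.reset": "none", "enable.auto.commit": "False"}
)
print(configs)  # group.id is now a random UUID string; auto.offset.reset is gone
```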

“Heartbeat” messages for long-running programs (optional)

Many of the programs provided by OpenMSIStream are designed to run for long periods of time on remote systems. OpenMSIStream therefore provides an option to periodically send “heartbeat” messages to a Kafka topic to give users insight into which of their programs are still running, even remotely.

“Heartbeat” messages have a consistent structure and are produced at a constant (configurable) interval. By default, each message has a string key like “[program_ID]_heartbeat” (where program_ID is set by the user when the program is first started) and a json-formatted string value with a “timestamp” field (timestamps are given in ISO format in the program-local timezone). Additional fields in the heartbeat message values are detailed on the individual program/base class pages linked above, but they generally include the number of messages and bytes produced/read/processed since the last heartbeat was sent.
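The key/value layout can be sketched as follows. This hypothetical helper only builds the pair (it does not produce anything to Kafka), and the n_msgs field name in the usage line is an illustrative stand-in for the program-specific fields documented on each program's page.

```python
import json
from datetime import datetime


def make_heartbeat(program_id, **counts):
    """Hypothetical sketch of one heartbeat message as a (key, value) pair.

    The value always carries an ISO-format timestamp in the local timezone;
    extra fields (e.g. message/byte counts) are passed in by the caller,
    since their names differ between programs.
    """
    key = f"{program_id}_heartbeat"
    value = {"timestamp": datetime.now().astimezone().isoformat(), **counts}
    return key, json.dumps(value)


key, value = make_heartbeat("lab_uploader_01", n_msgs=12)
print(key)  # lab_uploader_01_heartbeat
```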

Command line arguments

All long-running programs accept the following three command line arguments to configure how heartbeat messages are sent:

  • --heartbeat_topic_name: the name of the topic to which heartbeat messages should be produced. This parameter must be included on the command line to produce heartbeat messages.

  • --heartbeat_program_id: the “ID” of the long-running program that should uniquely identify it amongst any programs producing heartbeat messages to the given topic (this is what goes in the key of each message). If this value isn’t given on the command line, the message keys will include the hex code of the heartbeat producer’s location in memory instead, which WILL change if the program is shut down and restarted (it’s also not particularly identifiable or meaningful).

  • --heartbeat_interval_secs: how often (in seconds) the “heartbeat” messages should be sent to the topic. NOTE: this parameter (and the discussion of “heartbeats” in this section in general) is completely independent of the “heartbeat.interval.ms” configuration parameter for Kafka consumers (all Kafka consumers send periodic “heartbeats” of their own to the broker so that brokers know which consumers are still part of the group and which have stopped listening to topics).
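How the three flags fit together can be sketched with argparse. This is not OpenMSIStream's argument parser: the function heartbeat_settings is hypothetical, and the 60-second default interval is a placeholder (check a program's -h output for its real default). The fallback ID uses hex(id(...)), which in CPython is the object's memory address.

```python
import argparse


def heartbeat_settings(argv, producer_obj):
    """Hypothetical sketch: return (topic, message key, interval) or None."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--heartbeat_topic_name", default=None)
    parser.add_argument("--heartbeat_program_id", default=None)
    # Placeholder default, not necessarily the one OpenMSIStream uses
    parser.add_argument("--heartbeat_interval_secs", type=float, default=60.0)
    args, _ = parser.parse_known_args(argv)
    if args.heartbeat_topic_name is None:
        return None  # no topic given: heartbeats are disabled
    # Without an explicit ID, fall back to the producer's memory address in hex,
    # which changes whenever the program restarts
    program_id = args.heartbeat_program_id or hex(id(producer_obj))
    return args.heartbeat_topic_name, f"{program_id}_heartbeat", args.heartbeat_interval_secs


print(heartbeat_settings(
    ["--heartbeat_topic_name", "heartbeats", "--heartbeat_program_id", "uploader_01"], object()
))  # ('heartbeats', 'uploader_01_heartbeat', 60.0)
```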

Broker/producer configuration

The broker to which the heartbeat messages are sent (and the producer used to send them) are configured from the main program configuration file. By default, the broker is configured using the regular “[broker]” section of the main program configuration file and the heartbeat producer uses the default Kafka Producer configurations.

Heartbeats can be sent to a different broker by including that broker's configurations in the optional “[heartbeat]” section of the main configuration file. If the file includes a “[heartbeat]” section with a bootstrap.servers parameter, the “[heartbeat]” section must also include any other authentication/configuration parameters necessary to use that broker.

The “[heartbeat]” section in the configuration file can also contain any of the parameters allowed in the “[producer]” section. These parameters will be used to configure the producer that sends the heartbeat messages.
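The selection rules in the three paragraphs above can be sketched as follows. The function heartbeat_producer_configs is hypothetical and the broker addresses are placeholders; note that settings from “[producer]” are deliberately not carried over, since the heartbeat producer otherwise uses default Kafka Producer configurations.

```python
import configparser

CONFIG = """
[broker]
bootstrap.servers = main-broker:9092

[producer]
linger.ms = 100

[heartbeat]
# Producer settings only: heartbeats still go to the [broker] broker
linger.ms = 0
"""


def heartbeat_producer_configs(parser):
    """Hypothetical sketch of the [heartbeat] rules described above."""
    hb = dict(parser["heartbeat"]) if parser.has_section("heartbeat") else {}
    if "bootstrap.servers" in hb:
        # A separate broker: [heartbeat] must carry all of its connection settings
        return hb
    # Otherwise reuse the [broker] connection and add any heartbeat producer settings
    return {**dict(parser["broker"]), **hb}


parser = configparser.ConfigParser()
parser.read_string(CONFIG)
print(heartbeat_producer_configs(parser))  # {'bootstrap.servers': 'main-broker:9092', 'linger.ms': '0'}
```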

Delivery guarantees

Heartbeat messages are not sent with any callbacks registered. No guarantees are made with respect to their delivery to the registered broker beyond a best-effort attempt by the producer sending them. Users should not assume that heartbeat messages accurately reflect, for example, the amount of data stored in a particular topic, and should only use heartbeat messages as approximate metrics and for general health checks.

Consuming heartbeat messages

Heartbeat messages have string keys and values. The value strings are json-formatted and parseable to dictionaries. The following Python code is an example of a program that can run in an OpenMSIStream-configured computing environment to read heartbeat messages:

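import json
import uuid

from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer
from openmsistream.kafka_wrapper import OpenMSIStreamConsumer

configs = {
    # Unauthenticated broker on localhost; replace with your broker's settings
    "bootstrap.servers": "localhost:9092",
    # Heartbeat keys and values are plain strings
    "key.deserializer": StringDeserializer(),
    "value.deserializer": StringDeserializer(),
    # A random group ID, so every run reads the topic independently
    "group.id": str(uuid.uuid4()),
    # Start from the oldest heartbeat still retained in the topic
    "auto.offset.reset": "earliest",
}
consumer = OpenMSIStreamConsumer(DeserializingConsumer, configs)
# Use the same topic name given to --heartbeat_topic_name
consumer.subscribe(["heartbeats"])
while True:
    # Poll with a 5-second timeout; None means no message arrived in time
    msg = consumer.get_next_message(5)
    if msg is not None:
        # The value is a JSON string; parse it into a dictionary
        value_dict = json.loads(msg.value())
        print(f"[{msg.key()}]: {value_dict}")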

(The example is reading from a broker with no user authentication running on localhost port 9092, using a random group ID for the consumer.)
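Because heartbeats are meant for general health checks, one way to use the (key, value) strings read by a loop like the one above is to keep the newest timestamp per key and flag programs that have gone quiet. The helper stale_programs is hypothetical; it assumes timestamps carry a UTC offset and treats any that do not as local time on the reading machine.

```python
import json
from datetime import datetime, timedelta


def stale_programs(messages, max_age, now=None):
    """Hypothetical health check over consumed heartbeat (key, value) strings.

    Keeps the newest timestamp seen for each key and returns the keys whose
    last heartbeat is older than max_age (a timedelta). Delivery is
    best-effort, so treat the result as a hint rather than proof of failure.
    """
    latest = {}
    for key, value in messages:
        ts = datetime.fromisoformat(json.loads(value)["timestamp"])
        if ts.tzinfo is None:
            ts = ts.astimezone()  # assume naive timestamps are local to this machine
        if key not in latest or ts > latest[key]:
            latest[key] = ts
    if now is None:
        now = datetime.now().astimezone()
    return sorted(key for key, ts in latest.items() if now - ts > max_age)


now = datetime.fromisoformat("2024-05-01T12:10:00+00:00")
msgs = [
    ("a_heartbeat", json.dumps({"timestamp": "2024-05-01T12:09:30+00:00"})),
    ("b_heartbeat", json.dumps({"timestamp": "2024-05-01T11:50:00+00:00"})),
]
print(stale_programs(msgs, timedelta(minutes=5), now=now))  # ['b_heartbeat']
```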

Producing program logs via Kafka (optional)

Many of the programs provided by OpenMSIStream run on remote systems. OpenMSIStream therefore provides an option to send messages containing each node's OpenMSIStream logs to a Kafka topic.

“Log” messages are produced at a constant (configurable) interval. By default, each message has a string key like “[program_ID]_log” (where program_ID is set by the user when the program is first started) and a json-formatted string value containing a “timestamp” field (in Unix epoch seconds) and a “messages” array of log entries.
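The log message layout can be sketched the same way as a heartbeat. This hypothetical helper only builds the (key, value) pair; representing each log entry as a plain string, and the timestamp as a float, are assumptions made for the sketch.

```python
import json
import time


def make_log_message(program_id, entries):
    """Hypothetical sketch of one log message as a (key, value) pair."""
    key = f"{program_id}_log"
    # "timestamp" is Unix epoch seconds; "messages" holds the collected entries
    value = {"timestamp": time.time(), "messages": list(entries)}
    return key, json.dumps(value)


key, value = make_log_message("lab_uploader_01", ["[INFO] started", "[INFO] uploaded 3 files"])
print(key)  # lab_uploader_01_log
```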

Command line arguments

All long-running programs accept the following three command line arguments to configure how log messages are sent:

  • --log_topic_name: the name of the topic to which log messages should be produced. This parameter must be included on the command line to produce log messages.

  • --log_program_id: the “ID” of the long-running program that should uniquely identify it amongst any programs producing log messages to the given topic (this is what goes in the key of each message). If this value isn’t given on the command line, the message keys will include the hex code of the log producer’s location in memory instead, which WILL change if the program is shut down and restarted (it’s also not particularly identifiable or meaningful).

  • --log_interval_secs: how often (in seconds) the “log” messages should be sent to the topic.

Broker/producer configuration

The broker to which the log messages are sent (and the producer used to send them) are configured from the main program configuration file. By default, the broker is configured using the regular “[broker]” section of the main program configuration file and the log producer uses the default Kafka Producer configurations.

Logs can be sent to a different broker by including that broker's configurations in the optional “[log]” section of the main configuration file. If the file includes a “[log]” section with a bootstrap.servers parameter, the “[log]” section must also include any other authentication/configuration parameters necessary to use that broker.

The “[log]” section in the configuration file can also contain any of the parameters allowed in the “[producer]” section. These parameters will be used to configure the producer that sends the log messages.
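Because a “[log]” section with its own bootstrap.servers does not reuse the “[broker]” connection settings, a quick heuristic check can flag authentication keys you may have forgotten to copy. The function missing_log_broker_keys and the key prefixes it inspects are hypothetical; a different broker may legitimately need different settings, so treat the result as a list to double-check.

```python
import configparser

CONFIG = """
[broker]
bootstrap.servers = main-broker:9092
security.protocol = SASL_SSL
sasl.mechanism = PLAIN
sasl.username = $MAIN_USERNAME
sasl.password = $MAIN_PASSWORD

[log]
# A different broker: its connection settings are NOT copied from [broker]
bootstrap.servers = log-broker:9092
"""


def missing_log_broker_keys(parser):
    """Hypothetical check: [broker] auth keys absent from a standalone [log] section."""
    if not parser.has_section("log") or "bootstrap.servers" not in parser["log"]:
        return []  # logs go to the main broker, so nothing needs copying
    auth_keys = [k for k in parser["broker"] if k.startswith(("sasl.", "security.", "ssl."))]
    return sorted(k for k in auth_keys if k not in parser["log"])


parser = configparser.ConfigParser()
parser.read_string(CONFIG)
print(missing_log_broker_keys(parser))
# ['sasl.mechanism', 'sasl.password', 'sasl.username', 'security.protocol']
```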

Delivery guarantees

Log messages are not sent with any callbacks registered. No guarantees are made with respect to their delivery to the registered broker beyond a best-effort attempt by the producer sending them. Users should not assume that log messages contain complete logs (although they aim to).

Consuming log messages

Log messages have string keys and values. The value strings are json-formatted and parseable to dictionaries. The following Python code is an example of a program that can run in an OpenMSIStream-configured computing environment to read log messages:

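import json
import uuid

from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer
from openmsistream.kafka_wrapper import OpenMSIStreamConsumer

configs = {
    # Unauthenticated broker on localhost; replace with your broker's settings
    "bootstrap.servers": "localhost:9092",
    # Log message keys and values are plain strings
    "key.deserializer": StringDeserializer(),
    "value.deserializer": StringDeserializer(),
    # A random group ID, so every run reads the topic independently
    "group.id": str(uuid.uuid4()),
    # Start from the oldest log message still retained in the topic
    "auto.offset.reset": "earliest",
}
consumer = OpenMSIStreamConsumer(DeserializingConsumer, configs)
# Use the same topic name given to --log_topic_name
consumer.subscribe(["logs-topic"])
while True:
    # Poll with a 5-second timeout; None means no message arrived in time
    msg = consumer.get_next_message(5)
    if msg is not None:
        # The value is a JSON string with "timestamp" and "messages" fields
        value_dict = json.loads(msg.value())
        print(f"[{msg.key()}]: {value_dict}")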

(The example is reading from a broker with no user authentication running on localhost port 9092, using a random group ID for the consumer, and no message encryption.)
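Instead of printing whole dictionaries, a consumer like the one above could expand each message's “messages” array into one line per entry. The helper format_log_value is hypothetical; it converts the epoch-seconds timestamp to a UTC datetime and makes no assumption about the entries beyond their being printable.

```python
import json
from datetime import datetime, timezone


def format_log_value(key, value):
    """Hypothetical helper: turn one consumed log message into printable lines."""
    payload = json.loads(value)
    # "timestamp" is Unix epoch seconds; show it as an ISO-format UTC time
    sent_at = datetime.fromtimestamp(payload["timestamp"], tz=timezone.utc)
    return [f"{key} @ {sent_at.isoformat()}: {entry}" for entry in payload["messages"]]


example = json.dumps({"timestamp": 0, "messages": ["first entry", "second entry"]})
for line in format_log_value("lab_uploader_01_log", example):
    print(line)  # e.g. lab_uploader_01_log @ 1970-01-01T00:00:00+00:00: first entry
```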

Installing programs as Services or daemons

Instances of each of the main programs can also be installed as Windows Services or Linux daemons to keep them running persistently. OpenMSIStream includes some wrapper/utility functions to facilitate working with programs installed as Services or daemons, and is designed to make the process as transparent as possible to the end user by giving them the same command line arguments to use for running programs on the command line and installing them persistently. Please see the Services/Daemons page for more information on how to set up a Service or daemon on your system.

Encrypting data (optional)

The messages sent and received by the main programs above (running either interactively or as Services/daemons) can optionally be encrypted while stored on the broker, so that only the Producer/Consumer endpoints need to be trusted. OpenMSIStream includes a wrapper around KafkaCrypto to facilitate this encryption. Please see the page on message encryption for more information, including how to provision nodes and set up configuration files to encrypt messages. Note that “heartbeat” messages (as described above) cannot be encrypted.