OpenMSIStreamConsumer

class openmsistream.kafka_wrapper.OpenMSIStreamConsumer(consumer_type, configs, kafkacrypto=None, message_key_regex=None, filter_new_message_keys=False, starting_offsets=None, filepath_regex=None, **kwargs)

Bases: LogOwner

Wrapper for working with a Consumer of some type. Expects message values that are DataFileChunk objects by default; other message value types can be accommodated by setting “value.deserializer” in the config file.

Parameters:
  • consumer_type (confluent_kafka.DeserializingConsumer or kafkacrypto.KafkaConsumer) – The type of underlying Consumer that should be used.

  • configs (dict) – A dictionary of configuration names and parameters to use in instantiating the underlying Consumer.

  • kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate the Consumer. Only needed if consumer_type is kafkacrypto.KafkaConsumer.

  • message_key_regex (re.compile() or None, optional) – A regular expression used to filter messages based on their keys. Only messages whose keys match this regex will be returned by get_next_message(). This parameter only has an effect if restart_at_beginning is True and a consumer group with the given ID has previously consumed messages from the topic, or if filter_new_message_keys is True. Messages whose keys are not strings are always consumed, with a warning logged for each such message that would otherwise have been filtered.

  • filter_new_message_keys (bool, optional) – If False (the default), message_key_regex is only used to filter messages from before each partition’s currently committed location. This is useful when you want to process only some messages from earlier in the topic, but every new message that has never been read. To filter every consumed message, rather than only previously-consumed ones, set this to True.

  • starting_offsets (list(confluent_kafka.TopicPartition) or None, optional) – A list of confluent_kafka.TopicPartition objects listing the initial starting offsets for each partition in the topic for Consumers with this group ID. If filter_new_message_keys is False, messages with offsets greater than or equal to these will NOT be filtered using message_key_regex.

  • filepath_regex (re.compile() or None, optional) – A regular expression to filter messages based on the path to the file with which they’re associated (relative to the “root” upload directory). Only messages matching this regex will be returned by get_next_message(). Messages that cannot be deserialized to DataFileChunk objects are always consumed, with a warning logged for each such message that would otherwise have been filtered.

  • kwargs (dict) – Any extra keyword arguments are added to the configuration dict for the Consumer, with underscores in their names replaced by dots.

Raises:
  • ValueError – if consumer_type is not confluent_kafka.DeserializingConsumer or kafkacrypto.KafkaConsumer.

  • ValueError – if consumer_type is kafkacrypto.KafkaConsumer and kafkacrypto is None.
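As a minimal construction sketch (the broker address, group ID, and key pattern are hypothetical; configs are more typically read from a config file via from_file() below):

    import re

    from confluent_kafka import DeserializingConsumer

    from openmsistream.kafka_wrapper import OpenMSIStreamConsumer

    consumer = OpenMSIStreamConsumer(
        DeserializingConsumer,  # consumer_type
        {
            "bootstrap.servers": "localhost:9092",  # hypothetical broker
            "group.id": "my_group",  # hypothetical consumer group ID
            "auto.offset.reset": "earliest",
        },
        message_key_regex=re.compile(r"^my_files/.*"),  # hypothetical key pattern
        filter_new_message_keys=True,  # apply the regex to every consumed message
    )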

static get_consumer_args_kwargs(config_file_path, logger=None, kafkacrypto=None, treat_undecryptable_as_plaintext=False, **kwargs)

Return the list of positional arguments and the dictionary of keyword arguments that should be used to instantiate OpenMSIStreamConsumer objects based on the given config file. Useful for sharing a single OpenMSIStreamKafkaCrypto instance across several Consumers.

Parameters:
  • config_file_path (pathlib.Path) – Path to the config file to use in defining Consumers.

  • logger (Logger) – The Logger object to use for each of the OpenMSIStreamConsumer objects.

  • kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate Consumers. Only needed if a single specific OpenMSIStreamKafkaCrypto instance should be shared.

  • treat_undecryptable_as_plaintext (bool, optional) – If True, the KafkaCrypto Deserializers will immediately return, as binary data, any keys/values that cannot possibly be decrypted. This allows faster handling of messages that will never be decryptable, such as when encryption is being enabled or disabled across a platform, or when unencrypted messages are mixed with encrypted messages in a topic.

  • kwargs (dict) – Any extra keyword arguments are added to the configuration dict for the Consumers, with underscores in their names replaced by dots.

Returns:

ret_args, the list of arguments to create new OpenMSIStreamConsumer objects based on the config file and arguments to this method

Return type:

list

Returns:

ret_kwargs, the dictionary of keyword arguments to create new OpenMSIStreamConsumer objects based on the config file and arguments to this method

Return type:

dict
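For example, one set of arguments/keyword arguments can be reused to create several Consumers that share a single underlying configuration (and any single OpenMSIStreamKafkaCrypto instance it contains). A sketch, with a hypothetical config file path:

    import pathlib

    from openmsistream.kafka_wrapper import OpenMSIStreamConsumer

    ret_args, ret_kwargs = OpenMSIStreamConsumer.get_consumer_args_kwargs(
        pathlib.Path("my_config.config")  # hypothetical config file
    )
    # Each Consumer is built from the same args/kwargs, so all of them share
    # the same configuration (and kafkacrypto instance, if encryption is enabled)
    consumers = [OpenMSIStreamConsumer(*ret_args, **ret_kwargs) for _ in range(3)]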

classmethod from_file(*args, **kwargs)

Wrapper around get_consumer_args_kwargs() and the OpenMSIStreamConsumer constructor that returns a single OpenMSIStreamConsumer for a given config file and set of arguments. Arguments are the same as for get_consumer_args_kwargs().

Returns:

An OpenMSIStreamConsumer object based on the given config file/arguments

Return type:

OpenMSIStreamConsumer
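A minimal usage sketch (the config file path is hypothetical):

    import pathlib

    from openmsistream.kafka_wrapper import OpenMSIStreamConsumer

    consumer = OpenMSIStreamConsumer.from_file(pathlib.Path("my_config.config"))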

get_next_message(*poll_args, **poll_kwargs)

Call poll() for the underlying consumer and return the consumed message, if any.

If this Consumer’s message_key_regex or filepath_regex is not None, this method may return None even though a message was consumed.

For encrypted messages, this may return a kafkacrypto.Message object with kafkacrypto.KafkaCryptoMessages for its key and/or value if the message fails to be decrypted within a certain amount of time.

All arguments/keyword arguments are passed through to the underlying Consumer’s poll() function.

Returns:

a single consumed confluent_kafka.Message object, an undecrypted kafkacrypto.Message object, or None

Return type:

confluent_kafka.Message, kafkacrypto.Message, or None

Raises:

Exception – for any error encountered in calling poll(). An error-level message, including the Exception’s stack trace, will be logged.
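A sketch of a typical consume loop for a confluent_kafka-based Consumer that has already subscribed to a topic (see subscribe() below); the 1.0-second poll timeout is an arbitrary choice:

    while True:
        msg = consumer.get_next_message(1.0)  # passed through to poll() as its timeout
        if msg is None:
            # nothing consumed within the timeout, or the message was filtered out
            continue
        # for unencrypted messages this is a confluent_kafka.Message
        print(f"consumed a message with key {msg.key()}")
        consumer.commit(message=msg)  # see commit() below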

subscribe(*args, **kwargs)

A wrapper around the underlying Consumer’s subscribe() method.
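Because the arguments pass through to the underlying Consumer, subscribing looks the same as it would for a plain confluent_kafka Consumer (the topic name here is hypothetical):

    consumer.subscribe(["my_topic"])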

commit(message=None, offsets=None, asynchronous=True)

A wrapper around the underlying Consumer’s commit() method. Exactly one of message and offsets must be given.

Parameters:
  • message (confluent_kafka.Message or kafkacrypto.Message, optional) – The message whose offset should be committed

  • offsets (list(confluent_kafka.TopicPartition), optional) – The list of topic+partitions+offsets to commit

  • asynchronous (bool) – If True, the Consumer.commit call will return immediately and run asynchronously instead of blocking

Raises:

ValueError – if message and offsets are both given
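A sketch of both commit styles, following the “exactly one of message and offsets” rule above (msg is a previously consumed message; the topic/partition/offset values are hypothetical):

    from confluent_kafka import TopicPartition

    # commit the offset of a single consumed message, asynchronously (the default)
    consumer.commit(message=msg)

    # or commit explicit offsets, blocking until the commit completes
    consumer.commit(
        offsets=[TopicPartition("my_topic", 0, 42)],  # topic, partition, offset
        asynchronous=False,
    )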

close(*args, **kwargs)

Combined wrapper around the underlying Consumer’s close() method and kafkacrypto.KafkaCrypto.close().
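Since close() also shuts down any kafkacrypto instance in use, it is worth calling even when consumption ends with an error; a sketch:

    try:
        msg = consumer.get_next_message(1.0)
    finally:
        consumer.close()  # also closes the kafkacrypto instance, if one is in use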

message_consumed_before = <methodtools._LruCacheWire object>

A memoized helper method (wrapped with methodtools.lru_cache) used internally when filtering previously-consumed messages, i.e. to determine whether a given message falls before its partition’s starting offset.