OpenMSIStreamConsumer
- class openmsistream.kafka_wrapper.OpenMSIStreamConsumer(consumer_type, configs, kafkacrypto=None, message_key_regex=None, filter_new_message_keys=False, starting_offsets=None, filepath_regex=None, **kwargs)
Bases: LogOwner
Wrapper for working with a Consumer of some type. Expects message values that are DataFileChunk objects by default; other message value types can be accommodated by setting "value.deserializer" in the config file.
- Parameters:
consumer_type (confluent_kafka.DeserializingConsumer or kafkacrypto.KafkaConsumer) – The type of underlying Consumer that should be used
configs (dict) – A dictionary of configuration names and parameters to use in instantiating the underlying Consumer
kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate the Consumer. Only needed if consumer_type is kafkacrypto.KafkaConsumer.
message_key_regex (re.compile() or None, optional) – A regular expression to filter messages based on their keys. Only messages matching this regex will be returned by get_next_message(). This parameter only has an effect if restart_at_beginning is True and a consumer group with the given ID has previously consumed messages from the topic, or if filter_new_message_keys is True. Messages with keys that are not strings will always be consumed, logging warnings if they should be filtered.
filter_new_message_keys (bool, optional) – If False (the default), the message_key_regex will only be used to filter messages from before each partition's currently committed location. This is useful if you want to process only some messages from earlier in the topic, but all new messages that have never been read. To filter every message read, instead of just previously-consumed messages, set this to True.
starting_offsets (list(confluent_kafka.TopicPartition) or None, optional) – A list of confluent_kafka.TopicPartition objects listing the initial starting offsets for each partition in the topic for Consumers with this group ID. If filter_new_message_keys is False, messages with offsets greater than or equal to these will NOT be filtered using message_key_regex.
filepath_regex (re.compile() or None, optional) – A regular expression to filter messages based on the path to the file with which they're associated (relative to the "root" upload directory). Only messages matching this regex will be returned by get_next_message(). Messages that cannot be deserialized to DataFileChunk objects will always be consumed, logging warnings if they should be filtered.
kwargs (dict) – Any extra keyword arguments are added to the configuration dict for the Consumer, with underscores in their names replaced by dots
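The underscore-to-dot translation of extra keyword arguments can be illustrated in isolation. The helper below is hypothetical (the class performs the equivalent conversion internally); the config property names are standard Kafka consumer settings used only as examples:

```python
# Hypothetical helper mirroring the documented kwarg handling:
# extra keyword arguments become Kafka config entries, with
# underscores in their names replaced by dots.
def kwargs_to_config_entries(**kwargs):
    return {name.replace("_", "."): value for name, value in kwargs.items()}

extra = kwargs_to_config_entries(max_poll_interval_ms=600000, fetch_min_bytes=1024)
# extra == {"max.poll.interval.ms": 600000, "fetch.min.bytes": 1024}
```

This is why, for example, passing `fetch_min_bytes=1024` to the constructor has the same effect as setting `fetch.min.bytes` in the config file.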
- Raises:
ValueError – if consumer_type is not confluent_kafka.DeserializingConsumer or kafkacrypto.KafkaConsumer
ValueError – if consumer_type is kafkacrypto.KafkaConsumer and kafkacrypto is None
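The key-filtering rule described above (string keys must match message_key_regex; non-string keys always pass) can be sketched in isolation. This is an illustrative stand-in for the documented behavior, not the class's actual implementation:

```python
import re

# Hypothetical sketch of the documented key-filtering rule:
# string keys must match the regex, while non-string keys are
# always consumed (the real class logs a warning in that case).
def key_passes_filter(message_key_regex, key):
    if not isinstance(key, str):
        return True  # non-string keys are always consumed
    return message_key_regex.match(key) is not None

message_key_regex = re.compile(r"run_42_.*")
assert key_passes_filter(message_key_regex, "run_42_chunk_0")      # matches: returned
assert not key_passes_filter(message_key_regex, "run_43_chunk_0")  # no match: filtered
assert key_passes_filter(message_key_regex, 12345)                 # non-string: kept
```

Whether a filtered message is dropped or only pre-committed messages are checked depends on filter_new_message_keys, as described above.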
- static get_consumer_args_kwargs(config_file_path, logger=None, kafkacrypto=None, treat_undecryptable_as_plaintext=False, **kwargs)
Return the list of arguments and dictionary of keyword arguments that should be used to instantiate OpenMSIStreamConsumer objects based on the given config file. Used to share a single OpenMSIStreamKafkaCrypto instance across several Consumers.
- Parameters:
config_file_path (pathlib.Path) – Path to the config file to use in defining Consumers
logger (Logger) – The Logger object to use for each of the OpenMSIStreamConsumer objects
kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate Consumers. Only needed if a single specific OpenMSIStreamKafkaCrypto instance should be shared.
treat_undecryptable_as_plaintext (bool, optional) – If True, the KafkaCrypto Deserializers will immediately return as binary data any keys/values that cannot possibly be decrypted. This allows faster handling of messages that will never be decryptable, such as when enabling or disabling encryption across a platform, or when unencrypted messages are mixed into a topic with encrypted messages.
kwargs (dict) – Any extra keyword arguments are added to the configuration dict for the Consumers, with underscores in their names replaced by dots
- Returns:
ret_args, the list of arguments to create new OpenMSIStreamConsumer objects based on the config file and arguments to this method
- Return type:
list
- Returns:
ret_kwargs, the dictionary of keyword arguments to create new OpenMSIStreamConsumer objects based on the config file and arguments to this method
- Return type:
dict
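The sharing pattern this method enables is to build the (args, kwargs) pair once and reuse it for several consumers, so they all hold the same OpenMSIStreamKafkaCrypto instance. The sketch below uses a stub class to show the unpacking pattern without a broker or the real API; only the *args/**kwargs reuse is the point:

```python
# Stub standing in for OpenMSIStreamConsumer, used only to show
# how one (ret_args, ret_kwargs) pair is reused across consumers.
class StubConsumer:
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

# Stand-ins for the pair the real get_consumer_args_kwargs() would
# return from a config file; the shared crypto object appears once.
shared_crypto = object()
ret_args = ("consumer_type_placeholder", {"group.id": "my_group"})
ret_kwargs = {"kafkacrypto": shared_crypto}

# Every consumer built from the same kwargs shares the crypto instance.
consumers = [StubConsumer(*ret_args, **ret_kwargs) for _ in range(3)]
assert all(c.kwargs["kafkacrypto"] is shared_crypto for c in consumers)
```

With the real classes, the equivalent call is `OpenMSIStreamConsumer(*ret_args, **ret_kwargs)` for each consumer needed.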
- classmethod from_file(*args, **kwargs)
Wrapper around get_consumer_args_kwargs() and the OpenMSIStreamConsumer constructor to return a single OpenMSIStreamConsumer from a given config file/arguments. Arguments are the same as get_consumer_args_kwargs()
- Returns:
An OpenMSIStreamConsumer object based on the given config file/arguments
- Return type:
OpenMSIStreamConsumer
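A minimal config file of the kind from_file() consumes might look like the fragment below. The section and option names here are an assumption based on standard confluent_kafka property names; consult the OpenMSIStream configuration documentation for the exact schema your installation expects:

```ini
# hypothetical minimal consumer config file
[broker]
bootstrap.servers = localhost:9092

[consumer]
group.id = my_group
auto.offset.reset = earliest
```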
- get_next_message(*poll_args, **poll_kwargs)
Call poll() for the underlying consumer and return any consumed message's value. If this Consumer's message_key_regex is not None, this method may return None even though a message was consumed. For encrypted messages, this may return a kafkacrypto.Message object with kafkacrypto.KafkaCryptoMessage objects for its key and/or value if the message fails to be decrypted within a certain amount of time. All arguments/keyword arguments are passed through to the underlying Consumer's poll() function.
- Returns:
a single consumed confluent_kafka.Message object, an undecrypted kafkacrypto.Message object, or None
- Return type:
confluent_kafka.Message, kafkacrypto.Message, or None
- Raises:
Exception – for any error encountered in calling poll(). A special error-level message will be logged, including the stack trace of the Exception.
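Because get_next_message() may return None (an empty poll, or a message that was filtered out), a consuming loop has to skip None values rather than treat them as an error. The loop below illustrates that contract with a stub consumer; the stub and its canned messages are hypothetical:

```python
# Stub illustrating the documented get_next_message() contract:
# the call may return None even when polling succeeded, so the
# consuming loop must skip None values and keep polling.
class StubConsumer:
    def __init__(self, messages):
        self._messages = list(messages)

    def get_next_message(self, timeout=0.1):
        return self._messages.pop(0) if self._messages else None

consumer = StubConsumer(["chunk-1", None, "chunk-2"])  # None = filtered/empty poll
received = []
for _ in range(5):
    msg = consumer.get_next_message()
    if msg is None:
        continue  # nothing consumed this poll, or the message was filtered
    received.append(msg)
# received == ["chunk-1", "chunk-2"]
```

With the real class, the values appended would be confluent_kafka.Message (or kafkacrypto.Message) objects rather than strings.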
- subscribe(*args, **kwargs)
A wrapper around the underlying Consumer’s subscribe() method
- commit(message=None, offsets=None, asynchronous=True)
A wrapper around the underlying Consumer’s commit() method. Exactly one of message and offsets must be given.
- Parameters:
message (confluent_kafka.Message or kafkacrypto.Message, optional) – The message whose offset should be committed
offsets (list(confluent_kafka.TopicPartition), optional) – The list of topic+partitions+offsets to commit
asynchronous (bool) – If True, the Consumer.commit call will return immediately and run asynchronously instead of blocking
- Raises:
ValueError – if message and offsets are both given
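The mutual-exclusion rule on commit()'s arguments can be sketched on its own. The validator below is a hypothetical stand-in for the check the wrapper performs, not the real implementation:

```python
# Hypothetical sketch of the documented commit() argument rule:
# message and offsets are alternatives, and supplying both raises
# a ValueError, matching the Raises entry above.
def validate_commit_args(message=None, offsets=None):
    if message is not None and offsets is not None:
        raise ValueError("message and offsets cannot both be given")
    return message if message is not None else offsets

assert validate_commit_args(message="some-message") == "some-message"
assert validate_commit_args(offsets=[("topic", 0, 5)]) == [("topic", 0, 5)]
```

In typical use you would call `consumer.commit(message=msg)` after successfully processing a consumed message.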
- close(*args, **kwargs)
Combined wrapper around the underlying Consumer's close() method and kafkacrypto.KafkaCrypto.close().
- message_consumed_before = <methodtools._LruCacheWire object>