DataFileStreamProcessor
- class openmsistream.DataFileStreamProcessor(config_file, topic_name, **kwargs)
Bases:
DataFileStreamHandler
,DataFileChunkProcessor
,ABC
Consumes
DataFileChunk
messages into memory and performs some operation(s) when entire files are available. This is a base class that cannot be instantiated on its own.
- Parameters:
config_file (
pathlib.Path
) – Path to the config file to use in defining the Broker connection and Consumers
topic_name (str) – Name of the topic to which the Consumers should be subscribed
output_dir (
pathlib.Path
, optional) – Path to the directory where the log and csv registry files should be kept (if None, a default will be created in the current directory)
mode (str, optional) – a string flag determining whether reconstructed data files should have their contents stored only in “memory” (the default, and the fastest), only on “disk” (in the output directory, to reduce the memory footprint), or “both” (for flexibility in processing)
datafile_type (
DownloadDataFile
, optional) – the type of data file that recognized files should be reconstructed as. Default options are set automatically depending on the “mode” argument. (must be a subclass of
DownloadDataFile
)
n_threads (int, optional) – the number of threads/consumers to run
consumer_group_id (str, optional) – the group ID under which each consumer should be created
filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed
- Raises:
ValueError – if datafile_type is not a subclass of
DownloadDataFileToMemory
, or more specific as determined by the “mode” argument
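The effect of the filepath_regex filter can be pictured with a small standard-library sketch. It assumes the pattern is a compiled regular expression applied to the file path as a string, with "search" semantics; both points are assumptions made for illustration rather than confirmed library behaviour, and the example paths and the helper is_selected() are hypothetical.

```python
import re
from pathlib import PurePosixPath

# Hypothetical pattern: keep only .csv files inside a "run_<digits>" directory.
filepath_regex = re.compile(r"run_\d+/.*\.csv$")

# Hypothetical relative paths of files whose chunks arrive on the topic.
candidate_paths = [
    PurePosixPath("run_001/sample_a.csv"),
    PurePosixPath("run_001/notes.txt"),
    PurePosixPath("calibration/sample_b.csv"),
]


def is_selected(path, pattern):
    """Return True if the path (as a POSIX string) matches the pattern.

    Using search() instead of match() is an assumption of this sketch.
    """
    return pattern.search(path.as_posix()) is not None


selected = [p for p in candidate_paths if is_selected(p, filepath_regex)]
print([p.as_posix() for p in selected])
```

Only the first path satisfies both the directory and the extension constraint, so it is the only one that would be kept under these assumptions.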
- process_files_as_read()
Consumes messages and stores their data in memory. Uses several parallel threads to consume messages and calls
_process_downloaded_data_file()
for fully read files. Runs until the user inputs a command to shut it down.
- Returns:
the total number of messages consumed
- Return type:
int
- Returns:
the total number of messages processed (registered in memory)
- Return type:
int
- Returns:
the number of files successfully processed during the run
- Return type:
int
- Returns:
the paths of up to 50 most recent files successfully processed during the run
- Return type:
List
- _process_message(lock, msg, rootdir_to_set=None)
Process a single message to add it to a file being held in memory until all messages are received.
If the message failed to be decrypted, this method calls
_undecryptable_message_callback()
and returns.
If the message is the first one consumed for a particular file, or any message other than the last one needed, it registers the file as ‘in_progress’ in the .csv file.
If the message is the last message needed for a file and its contents match the original hash of the file on disk, this method calls
_process_downloaded_data_file()
.
If the call to
_process_downloaded_data_file()
returns None (success), this method moves the file to the ‘successfully processed’ .csv file, and returns.
If the call to
_process_downloaded_data_file()
returns an Exception, this method calls
_failed_processing_callback()
, registers the file as ‘failed’ in the .csv file, and returns.
If the message is the last one needed but the contents differ from the original file on disk, this method calls
_mismatched_hash_callback()
, registers the file as ‘mismatched_hash’ in the .csv file, and returns.
- Parameters:
lock (
threading.Lock
) – Acquiring this
threading.Lock
object ensures that only one instance of
_process_message()
is running at once
msg (
confluent_kafka.KafkaMessage
or
kafkacrypto.Message
) – The received
confluent_kafka.KafkaMessage
object, or an undecrypted KafkaCrypto message
rootdir_to_set (
pathlib.Path
) – Path to a directory that should be set as the “root” for reconstructed data files (default is the output directory)
- Returns:
True if processing the message was successful (file in progress or successfully processed), False otherwise
- Return type:
bool
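The branches described for _process_message() can be sketched with standard-library stand-ins. Nothing below is the library's implementation: ToyFile, handle_chunk(), the in-memory registry dictionary (standing in for the .csv files), and the choice of SHA-512 are all hypothetical and serve only to show the order of the checks and the documented True/False return value.

```python
import hashlib
from dataclasses import dataclass, field


@dataclass
class ToyFile:
    """Hypothetical stand-in for a file being reconstructed in memory."""

    name: str
    n_chunks: int
    expected_hash: bytes
    chunks: dict = field(default_factory=dict)

    @property
    def complete(self):
        return len(self.chunks) == self.n_chunks

    @property
    def contents(self):
        # Reassemble the chunks in index order.
        return b"".join(self.chunks[i] for i in sorted(self.chunks))


def handle_chunk(toy_file, index, data, registry, process, decrypted=True):
    """Mirror the documented branches for one message.

    Returns True if the file is in progress or was processed successfully,
    False otherwise.
    """
    if not decrypted:
        # The documented method calls _undecryptable_message_callback() here.
        return False
    toy_file.chunks[index] = data
    if not toy_file.complete:
        registry[toy_file.name] = "in_progress"
        return True
    if hashlib.sha512(toy_file.contents).digest() != toy_file.expected_hash:
        # The documented method calls _mismatched_hash_callback() here.
        registry[toy_file.name] = "mismatched_hash"
        return False
    result = process(toy_file)  # None on success, an Exception on failure
    if result is None:
        registry[toy_file.name] = "success"
        return True
    # The documented method calls _failed_processing_callback() here.
    registry[toy_file.name] = "failed"
    return False


payload = [b"hello ", b"world"]
digest = hashlib.sha512(b"".join(payload)).digest()
registry = {}
toy = ToyFile("example.dat", n_chunks=2, expected_hash=digest)

first = handle_chunk(toy, 0, payload[0], registry, process=lambda f: None)
state_after_first = registry["example.dat"]
second = handle_chunk(toy, 1, payload[1], registry, process=lambda f: None)
print(state_after_first, registry["example.dat"])
```

In this toy run the first chunk leaves the file "in_progress", and the second chunk completes it, passes the hash check, and is recorded as a success.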
- abstract _process_downloaded_data_file(datafile, lock)
Perform some arbitrary operation(s) on a given data file that has been fully read from the stream. Can optionally lock other threads using the given lock.
Not implemented in the base class.
- Parameters:
datafile (
DownloadDataFileToMemory
) – A
DownloadDataFileToMemory
object that has received all of its messages from the topic
lock (
threading.Lock
) – Acquiring this
threading.Lock
object would ensure that only one instance of
_process_downloaded_data_file()
is running at once
- Returns:
None if processing was successful, an Exception otherwise
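The return contract (None on success, an Exception object on failure) and the optional use of the lock can be illustrated without the library. In this sketch the class LineCountingSketch is hypothetical and is not a real subclass of DataFileStreamProcessor, and the datafile attributes filename and bytestring are assumed names, imitated here with SimpleNamespace objects.

```python
import threading
from types import SimpleNamespace


class LineCountingSketch:
    """Hypothetical holder for the method a real subclass would implement."""

    def __init__(self):
        self.line_counts = {}  # shared state, guarded by the lock below

    def _process_downloaded_data_file(self, datafile, lock):
        try:
            # 'bytestring' and 'filename' are assumed attribute names.
            n_lines = len(datafile.bytestring.splitlines())
            with lock:  # hold the lock only while touching shared state
                self.line_counts[datafile.filename] = n_lines
        except Exception as exc:
            # Return the exception instead of raising it, per the contract.
            return exc
        return None


processor = LineCountingSketch()
lock = threading.Lock()

good = SimpleNamespace(filename="a.txt", bytestring=b"one\ntwo\n")
bad = SimpleNamespace(filename="b.txt", bytestring=None)  # will fail

ok_result = processor._process_downloaded_data_file(good, lock)
bad_result = processor._process_downloaded_data_file(bad, lock)
print(ok_result, type(bad_result).__name__, processor.line_counts)
```

Returning the caught exception rather than raising it lets the caller decide what to do next, which matches the documented hand-off to _failed_processing_callback().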
- _failed_processing_callback(datafile, lock)
Called when
_process_downloaded_data_file()
returns an Exception, providing an opportunity for fallback/backup processing in the case of failure.
Does nothing in the base class.
- Parameters:
datafile (
DownloadDataFileToMemory
) – A
DownloadDataFileToMemory
object that has received all of its messages from the topic
lock (
threading.Lock
) – Acquiring this
threading.Lock
object would ensure that only one instance of
_failed_processing_callback()
is running at once
- classmethod get_command_line_arguments()
Anything extending this class should be able to access the “treat_undecryptable_as_plaintext” flag
- ARGUMENT_PARSER_TYPE
alias of
OpenMSIStreamArgumentParser
- _mismatched_hash_callback(datafile, lock)
Called when a file reconstructed in memory doesn’t match the hash of its contents originally on disk, providing an opportunity for fallback/backup processing in the case of failure.
Does nothing in the base class.
- Parameters:
datafile (
DownloadDataFileToMemory
) – A
DownloadDataFileToMemory
object that has received all of its messages from the topic
lock (
threading.Lock
) – Acquiring this
threading.Lock
object would ensure that only one instance of
_mismatched_hash_callback()
is running at once
- _undecryptable_message_callback(msg)
This function is called when a message that could not be decrypted is found. If it is called, the file the chunk comes from likely cannot be processed.
In the base class, this logs a warning.
- Parameters:
msg (
kafkacrypto.Message
) – the
kafkacrypto.Message
object with undecrypted
kafkacrypto.KafkaCryptoMessage
objects for its key and/or value
- close()
Wrapper around
kafkacrypto.KafkaCrypto.close()
.
- classmethod get_init_args_kwargs(parsed_args)
Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.
- Parameters:
parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class
- Returns:
A list of init args
- Returns:
A dictionary of init kwargs
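The args/kwargs pattern described here can be sketched with argparse alone. ToyProcessor and its argument names are hypothetical, and returning the list and dictionary together as a pair is an assumption based on the two "Returns" entries above.

```python
import argparse


class ToyProcessor:
    """Hypothetical class following the documented args/kwargs pattern."""

    def __init__(self, config_file, topic_name, n_threads=1):
        self.config_file = config_file
        self.topic_name = topic_name
        self.n_threads = n_threads

    @classmethod
    def get_init_args_kwargs(cls, parsed_args):
        """Translate a parsed namespace into (init args, init kwargs)."""
        args = [parsed_args.config, parsed_args.topic_name]
        kwargs = {"n_threads": parsed_args.n_threads}
        return args, kwargs


# Hypothetical command line: two positionals and one optional flag.
parser = argparse.ArgumentParser()
parser.add_argument("config")
parser.add_argument("topic_name")
parser.add_argument("--n_threads", type=int, default=2)

parsed = parser.parse_args(["my.config", "my_topic", "--n_threads", "4"])
init_args, init_kwargs = ToyProcessor.get_init_args_kwargs(parsed)
toy_processor = ToyProcessor(*init_args, **init_kwargs)
print(toy_processor.config_file, toy_processor.topic_name, toy_processor.n_threads)
```

Keeping the translation from namespace to constructor arguments in one classmethod means a command-line entry point can build any subclass the same way.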
- property other_datafile_kwargs
Overload this in child classes to define additional keyword arguments that should go to the specific datafile constructor