DataFileUploadDirectory
- class openmsistream.DataFileUploadDirectory(dirpath, config_path, upload_regex=re.compile('^((?!(\\.|.*.log))).*$'), datafile_type=<class 'openmsistream.data_file_io.entity.upload_data_file.UploadDataFile'>, watchdog_lag_time=3, use_polling_observer=False, **kwargs)
Bases: DataFileDirectory, ControlledProcessSingleThread, ProducerGroup, Runnable
Class representing a directory being watched for new files to be added. Files added to the directory are broken into chunks and uploaded as messages to a Kafka topic.
- Parameters:
dirpath (pathlib.Path) – Path to the directory that should be watched for new files
config_path (pathlib.Path) – Path to the config file that defines the broker connection and producers
upload_regex (re.Pattern, optional) – A compiled regular expression (as returned by re.compile()); only files matching it will be uploaded
datafile_type (UploadDataFile, optional) – The type of data file that recognized files should be uploaded as (must be a subclass of UploadDataFile)
watchdog_lag_time (int, optional) – Number of seconds that files must remain static (unmodified) before their uploads begin
- Raises:
ValueError – if datafile_type is not a subclass of UploadDataFile
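The two constructor arguments that decide when a file gets picked up can be illustrated with the standard library alone. The sketch below reuses the documented default `upload_regex` pattern and adds a hypothetical `is_static()` helper standing in for the `watchdog_lag_time` rule. It assumes the regex is tested against a bare file name and that "static" means the file's modification time is at least `watchdog_lag_time` seconds old; it is an illustration, not the library's implementation.

```python
import re
import time
from typing import Optional

# The default upload_regex from the signature above. The negative lookahead
# rejects names starting with "." and names containing "log" preceded by at
# least one character.
DEFAULT_UPLOAD_REGEX = re.compile(r"^((?!(\.|.*.log))).*$")


def name_matches(name: str, regex: "re.Pattern[str]" = DEFAULT_UPLOAD_REGEX) -> bool:
    """Return True if the (bare) file name passes the upload regex."""
    return regex.match(name) is not None


def is_static(mtime: float, lag_time: float = 3, now: Optional[float] = None) -> bool:
    """Hypothetical check: True if a file last modified at `mtime` has been
    unmodified for at least `lag_time` seconds (e.g. pass path.stat().st_mtime)."""
    current = time.time() if now is None else now
    return current - mtime >= lag_time


if __name__ == "__main__":
    for name in ["data.csv", ".hidden", "run.log", "catalog.txt", "logfile.txt"]:
        print(f"{name!r}: {name_matches(name)}")
```

Running the demo prints True, False, False, False, True. Note that the default pattern also skips a name like `catalog.txt`, because `.*.log` matches "log" anywhere after the first character, while `logfile.txt` is accepted.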
- ARGUMENT_PARSER_TYPE
alias of OpenMSIStreamArgumentParser
- upload_files_as_added(topic_name, n_threads=2, chunk_size=524288, max_queue_size=500, upload_existing=False)
Watch for new files to be added to the directory. Chunk and produce newly added files as messages to a Kafka topic.
- Parameters:
topic_name (str) – Name of the topic to produce messages to
n_threads (int, optional) – The number of threads to use to produce from the shared queue
chunk_size (int, optional) – The size of each file chunk in bytes
max_queue_size (int, optional) – The maximum size in MB of the internal upload queue (this is distinct from the librdkafka buffering queue)
upload_existing (bool, optional) – True if files that already exist in the directory should also be uploaded. If False (the default), only files added to the directory after startup are enqueued to the producer
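The chunk-and-enqueue flow described above can be sketched with the standard library. `chunk_spans()` splits a file of a given size into `chunk_size`-byte pieces (the last one may be shorter), and `drain_with_threads()` is a hypothetical stand-in for `n_threads` producer threads pulling from a bounded shared queue. Assumptions: the queue bound here counts items rather than megabytes (the documented `max_queue_size` is in MB), and appending to a list replaces the real Kafka produce call.

```python
import queue
import threading
from typing import Iterable, List, Tuple

Span = Tuple[int, int, int]  # (chunk_i, offset, length)


def chunk_spans(file_size: int, chunk_size: int = 524288) -> List[Span]:
    """Return (chunk_i, offset, length) for every chunk of a file of file_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [
        (i, offset, min(chunk_size, file_size - offset))
        for i, offset in enumerate(range(0, file_size, chunk_size))
    ]


def drain_with_threads(items: Iterable[Span], n_threads: int = 2, max_items: int = 500) -> List[Span]:
    """Hypothetical sketch: worker threads consume items from a bounded shared queue."""
    shared: "queue.Queue" = queue.Queue(maxsize=max_items)
    produced: List[Span] = []
    lock = threading.Lock()

    def worker() -> None:
        while True:
            item = shared.get()
            if item is None:  # sentinel: no more work for this thread
                shared.task_done()
                return
            with lock:
                produced.append(item)  # a real producer would send the chunk to Kafka here
            shared.task_done()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for item in items:
        shared.put(item)  # blocks while the queue is full, applying back-pressure
    for _ in threads:
        shared.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return produced
```

With the default `chunk_size` of 524288 bytes, a 1,200,000-byte file yields three chunks of 524288, 524288 and 151424 bytes. Because the threads run concurrently, chunks can finish in any order, which is why per-chunk acknowledgement tracking is needed.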
- producer_callback(err, msg, prodid, filename, filepath, n_total_chunks, chunk_i)
A reference to this method is given as the callback for each call to confluent_kafka.Producer.produce(). It is called for every message upon acknowledgement by the broker, and it uses the file registries in the LOGS subdirectory to keep the record of what has and hasn’t been uploaded current with what the broker has received.
If the message is the final one to be acknowledged from its corresponding UploadDataFile, the UploadDataFile is registered as “fully produced”.
- Parameters:
err (confluent_kafka.KafkaError) – The error object for the message (None if delivery succeeded)
msg (confluent_kafka.Message) – The message object
prodid (str) – The ID of the producer that produced the message (hex(id(producer)) in memory)
filename (str) – The name of the file the message is coming from
filepath (pathlib.Path) – The full path to the file the message is coming from
n_total_chunks (int) – The total number of chunks in the file this message came from
chunk_i (int) – The index of the message’s file chunk in the full list for the file
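The bookkeeping described above can be modeled without a broker. In the sketch below, the hypothetical `ChunkAckTracker` records acknowledged chunk indices per file and flags a file as fully produced once all `n_total_chunks` have arrived, in any order. The extra arguments are bound with `functools.partial`, since confluent-kafka invokes a delivery callback with just `(err, msg)`. The in-memory sets stand in for the LOGS registries and are an assumption, not the library's storage.

```python
import functools
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple


class ChunkAckTracker:
    """Hypothetical in-memory registry of acknowledged chunks per file."""

    def __init__(self) -> None:
        self.acked: Dict[Path, Set[int]] = {}
        self.fully_produced: Set[Path] = set()
        self.failed: List[Tuple[Path, int, Any]] = []

    def callback(self, err, msg, prodid, filename, filepath, n_total_chunks, chunk_i) -> None:
        if err is not None:
            # Delivery failed: leave the chunk unregistered so it could be retried
            self.failed.append((filepath, chunk_i, err))
            return
        done = self.acked.setdefault(filepath, set())
        done.add(chunk_i)  # a set tolerates out-of-order and duplicate acknowledgements
        if len(done) == n_total_chunks:
            self.fully_produced.add(filepath)


if __name__ == "__main__":
    tracker = ChunkAckTracker()
    path = Path("/data/sample.dat")
    for i in (2, 0, 1):
        on_delivery = functools.partial(
            tracker.callback, prodid="0x1", filename=path.name,
            filepath=path, n_total_chunks=3, chunk_i=i,
        )
        on_delivery(None, None)  # the client would call this as on_delivery(err, msg)
    print(path in tracker.fully_produced)
```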
- filepath_should_be_uploaded(filepath)
Check whether a given filepath should be uploaded
- Parameters:
filepath (pathlib.Path) – A candidate upload filepath
- Returns:
True if the filepath is an existing file outside of the LOGS directory whose path matches the upload regex, False otherwise
- Return type:
bool
- Raises:
TypeError – if filepath isn’t a pathlib.Path object
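The three documented conditions (existing file, outside LOGS, matching the regex) plus the TypeError can be restated as a small standalone function. `should_upload()` and its `root` and `logs_dirname` parameters are hypothetical, and matching the regex against the full path string is an assumption drawn from the wording "whose path matches"; the real method may differ in detail.

```python
import re
import tempfile
from pathlib import Path


def should_upload(filepath, root: Path, upload_regex: "re.Pattern[str]", logs_dirname: str = "LOGS") -> bool:
    """Hypothetical re-creation of the documented checks (not the library's code)."""
    if not isinstance(filepath, Path):
        raise TypeError(f"filepath {filepath!r} is a {type(filepath).__name__}, not a pathlib.Path")
    if not filepath.is_file():
        return False  # must be an existing regular file
    logs_dir = (root / logs_dirname).resolve()
    if logs_dir in filepath.resolve().parents:
        return False  # files under LOGS (the registries) are never uploaded
    # Assumption: the regex is applied to the full path string
    return upload_regex.match(str(filepath)) is not None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "LOGS").mkdir()
        (root / "LOGS" / "registry.csv").write_text("x")
        (root / "scan.csv").write_text("x")
        csv_only = re.compile(r".*\.csv$")
        print(should_upload(root / "scan.csv", root, csv_only))  # True
        print(should_upload(root / "LOGS" / "registry.csv", root, csv_only))  # False
```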
- classmethod get_command_line_arguments()
Anything extending this class should be able to access the “treat_undecryptable_as_plaintext” flag
- classmethod get_init_args_kwargs(parsed_args)
Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.
- Parameters:
parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class
- Returns:
A list of init args and a dictionary of init kwargs
- classmethod run_from_command_line(args=None)
Run a DataFileUploadDirectory directly from the command line.
Calls upload_files_as_added() on a DataFileUploadDirectory defined by command line (or given) arguments.
- Parameters:
args (List) – The list of arguments to send to the parser
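The command-line flow (parse arguments, split them into init args and kwargs, construct the object, then start uploading) can be sketched with `argparse` and a stub class. `StubUploadDirectory`, `build_parser()` and every flag name below are hypothetical; the real argument names come from `OpenMSIStreamArgumentParser` and are not listed on this page. The stub also returns its instance purely so the flow can be inspected.

```python
import argparse
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple


class StubUploadDirectory:
    """Hypothetical stand-in that records what it was given instead of uploading."""

    def __init__(self, dirpath: Path, config_path: Path, watchdog_lag_time: int = 3):
        self.dirpath = dirpath
        self.config_path = config_path
        self.watchdog_lag_time = watchdog_lag_time
        self.upload_calls: List[Dict[str, Any]] = []

    @classmethod
    def build_parser(cls) -> argparse.ArgumentParser:
        # Hypothetical flags, loosely mirroring the documented parameters
        parser = argparse.ArgumentParser()
        parser.add_argument("upload_dir", type=Path)
        parser.add_argument("--config", type=Path, default=Path("config.config"))
        parser.add_argument("--topic_name", default="test")
        parser.add_argument("--watchdog_lag_time", type=int, default=3)
        parser.add_argument("--n_threads", type=int, default=2)
        parser.add_argument("--upload_existing", action="store_true")
        return parser

    @classmethod
    def get_init_args_kwargs(cls, parsed_args: argparse.Namespace) -> Tuple[List[Any], Dict[str, Any]]:
        args = [parsed_args.upload_dir, parsed_args.config]
        kwargs = {"watchdog_lag_time": parsed_args.watchdog_lag_time}
        return args, kwargs

    def upload_files_as_added(self, topic_name: str, n_threads: int = 2, upload_existing: bool = False) -> None:
        self.upload_calls.append(
            {"topic_name": topic_name, "n_threads": n_threads, "upload_existing": upload_existing}
        )

    @classmethod
    def run_from_command_line(cls, args: Optional[List[str]] = None) -> "StubUploadDirectory":
        parsed = cls.build_parser().parse_args(args)  # args=None falls back to sys.argv
        init_args, init_kwargs = cls.get_init_args_kwargs(parsed)
        instance = cls(*init_args, **init_kwargs)
        instance.upload_files_as_added(
            parsed.topic_name, n_threads=parsed.n_threads, upload_existing=parsed.upload_existing
        )
        return instance
```

For example, `StubUploadDirectory.run_from_command_line(["/tmp/watched", "--topic_name", "my_topic", "--n_threads", "4"])` builds a stub watching `/tmp/watched` and records one `upload_files_as_added` call for `my_topic` with four threads.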
- property other_datafile_kwargs
Override this property to send extra keyword arguments to the UploadDataFile constructor for each recognized file (useful when using a custom datafile_type)
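The override pattern can be shown with small stand-in classes. Everything below is hypothetical (`BaseDirectory`, `make_datafile()`, `TaggedUploadDataFile` and its `instrument` keyword); it only assumes, as described above, that the directory passes `other_datafile_kwargs` to the data file constructor for each recognized file.

```python
from pathlib import Path
from typing import Any, Dict


class BaseUploadDataFile:
    """Stand-in for UploadDataFile."""

    def __init__(self, filepath: Path, **kwargs: Any):
        self.filepath = filepath


class TaggedUploadDataFile(BaseUploadDataFile):
    """Hypothetical custom datafile_type that needs one extra keyword argument."""

    def __init__(self, filepath: Path, *, instrument: str = "unknown", **kwargs: Any):
        super().__init__(filepath, **kwargs)
        self.instrument = instrument


class BaseDirectory:
    """Stand-in for the upload directory: builds one data file per recognized path."""

    datafile_type = BaseUploadDataFile

    @property
    def other_datafile_kwargs(self) -> Dict[str, Any]:
        return {}  # base behaviour: no extra arguments

    def make_datafile(self, filepath: Path) -> BaseUploadDataFile:
        return self.datafile_type(filepath, **self.other_datafile_kwargs)


class InstrumentDirectory(BaseDirectory):
    datafile_type = TaggedUploadDataFile

    def __init__(self, instrument: str):
        self.instrument = instrument

    @property
    def other_datafile_kwargs(self) -> Dict[str, Any]:
        # Overridden so every data file is constructed with the instrument name
        return {"instrument": self.instrument}
```

Keeping the extra arguments in a property, rather than changing the constructor call, lets a subclass compute them from its own state each time a file is recognized.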
- property status_msg
A message stating the number of files currently at each stage of progress, up to the maximum number of files
- property progress_msg
A message describing the files that are currently recognized as part of the directory
- property have_file_to_upload
True if there are any files waiting to be uploaded or in progress
- property partially_done_file_paths
A list of filepaths whose uploads are currently in progress
- property n_partially_done_files
The number of files currently in the process of being uploaded
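The progress properties above can be understood as views over a per-file stage. The toy model below is a sketch under stated assumptions: the three stage names are hypothetical (the real class tracks its own internal state), and its `status_msg` simply counts files per stage without the documented cap on the number of files listed.

```python
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List

# Hypothetical stage names, in pipeline order
STAGES = ("waiting", "in_progress", "fully_produced")


@dataclass
class UploadProgress:
    """Toy model of the progress properties, keyed by file path."""

    stages: Dict[Path, str] = field(default_factory=dict)

    @property
    def partially_done_file_paths(self) -> List[Path]:
        return [p for p, s in self.stages.items() if s == "in_progress"]

    @property
    def n_partially_done_files(self) -> int:
        return len(self.partially_done_file_paths)

    @property
    def have_file_to_upload(self) -> bool:
        # True while any file is still waiting or mid-upload
        return any(s in ("waiting", "in_progress") for s in self.stages.values())

    @property
    def status_msg(self) -> str:
        counts = Counter(self.stages.values())
        return ", ".join(f"{counts[s]} {s}" for s in STAGES)
```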