DownloadDataFile

class openmsistream.data_file_io.DownloadDataFile(filepath, *args, **kwargs)

Bases: DataFile, ABC

Class to represent a data file that is reconstructed from messages read from a topic.

Parameters:

filepath (pathlib.Path) – Path to the file

static get_full_filepath(dfc)

Return the full filepath of a downloaded file given one of its DataFileChunks

Parameters:

dfc (DataFileChunk) – One of the DataFileChunk objects contributing to the file

Returns:

the full path to the file

Return type:

pathlib.Path
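Conceptually, the full path is built by joining a root directory, an optional subdirectory, and the file name carried by the chunk. The sketch below illustrates that idea. It assumes the chunk exposes attributes named `rootdir`, `subdir_str`, and `filename`; these names and the `FakeChunk` stand-in are hypothetical and may not match the real DataFileChunk.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass
class FakeChunk:
    """Hypothetical stand-in for DataFileChunk; attribute names are assumptions."""
    rootdir: PurePosixPath
    subdir_str: str
    filename: str


def get_full_filepath(dfc):
    """Join the chunk's root directory, optional subdirectory, and file name."""
    if dfc.subdir_str:
        return dfc.rootdir / dfc.subdir_str / dfc.filename
    return dfc.rootdir / dfc.filename


chunk = FakeChunk(PurePosixPath("/data/reconstructed"), "run_01", "scan.dat")
print(get_full_filepath(chunk))  # /data/reconstructed/run_01/scan.dat
```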

abstract property bytestring

A bytestring of the data contained in the file. Not implemented in the base class.

abstract property check_file_hash

The hash of the data in the file after it was read. Not implemented in the base class.
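As an illustration of what a concrete subclass might provide for these two abstract properties, the sketch below keeps chunk data in memory keyed by offset. `bytestring` concatenates the data in offset order, and `check_file_hash` hashes the result. The class name, the `chunks_by_offset` attribute, and the choice of SHA-512 are assumptions made for this example, not details of openmsistream.

```python
import hashlib


class InMemoryFileSketch:
    """Hypothetical in-memory download target; not the openmsistream class."""

    def __init__(self):
        # Maps each chunk's offset in the original file to that chunk's bytes
        self.chunks_by_offset = {}

    @property
    def bytestring(self):
        # Concatenate chunk data in offset order to rebuild the file contents
        return b"".join(self.chunks_by_offset[off] for off in sorted(self.chunks_by_offset))

    @property
    def check_file_hash(self):
        # Hash the reconstructed bytes; SHA-512 is an assumption for this sketch
        return hashlib.sha512(self.bytestring).digest()


f = InMemoryFileSketch()
f.chunks_by_offset[5] = b" world"  # chunks may arrive out of order
f.chunks_by_offset[0] = b"hello"
print(f.bytestring)  # b'hello world'
```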

property relative_filepath

The path to the file, relative to its root directory

add_chunk(dfc, thread_lock=<contextlib.nullcontext object>)

Process a chunk that has been read from a topic. Returns one of several status codes describing the effect that adding the chunk had.

This function calls _on_add_chunk() with the DataFileChunk as the argument.

Parameters:
  • dfc (DataFileChunk) – the DataFileChunk object whose data should be added

  • thread_lock (threading.Lock, optional) – the lock object to acquire/release so that race conditions don’t affect reconstruction of the files (only needed if running this function asynchronously)

Returns:

an internal code indicating whether the chunk was successfully added to a file in progress, had already been received, or was the last chunk needed. In the last case, the code also indicates whether the reconstructed file's hash matches the hash of the file contents originally read from disk.

Return type:

int
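The flow described above (duplicate check, delegation to _on_add_chunk() under the lock, and a hash comparison once the last chunk arrives) can be sketched as follows. This is a simplified illustration only: the status-code constants, the `Chunk` stand-in, the in-memory storage, and the use of SHA-512 are all hypothetical, and the sketch leaves out the generation-policy and chunk-count checks.

```python
import contextlib
import hashlib
import threading
from dataclasses import dataclass

# Hypothetical status codes: the real openmsistream values are internal and may differ
CHUNK_ADDED = 0
CHUNK_ALREADY_RECEIVED = 1
FILE_RECONSTRUCTED = 2
FILE_HASH_MISMATCH = 3


@dataclass
class Chunk:
    """Hypothetical stand-in for DataFileChunk with only the fields used here."""
    offset: int
    data: bytes
    n_total_chunks: int
    file_hash: bytes


class DownloadSketch:
    """Illustrates the add_chunk flow; not the openmsistream implementation."""

    def __init__(self):
        self.chunks_by_offset = {}

    def add_chunk(self, dfc, thread_lock=contextlib.nullcontext()):
        with thread_lock:
            # A chunk whose offset is already stored is a duplicate
            if dfc.offset in self.chunks_by_offset:
                return CHUNK_ALREADY_RECEIVED
            # Only unique offsets reach _on_add_chunk, and the lock is held here
            self._on_add_chunk(dfc)
            if len(self.chunks_by_offset) < dfc.n_total_chunks:
                return CHUNK_ADDED
            # Last chunk: compare the reconstructed hash with the original file's hash
            data = b"".join(
                self.chunks_by_offset[off] for off in sorted(self.chunks_by_offset)
            )
            if hashlib.sha512(data).digest() == dfc.file_hash:
                return FILE_RECONSTRUCTED
            return FILE_HASH_MISMATCH

    def _on_add_chunk(self, dfc):
        self.chunks_by_offset[dfc.offset] = dfc.data


original = b"abcdef"
file_hash = hashlib.sha512(original).digest()
download = DownloadSketch()
lock = threading.Lock()
codes = [
    download.add_chunk(Chunk(0, b"abc", 2, file_hash), lock),
    download.add_chunk(Chunk(0, b"abc", 2, file_hash), lock),  # duplicate
    download.add_chunk(Chunk(3, b"def", 2, file_hash), lock),  # last chunk
]
print(codes)  # [0, 1, 2]
```

Passing a threading.Lock matters only when several threads feed chunks for the same file; the default nullcontext makes the `with` block a no-op for single-threaded use.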

_apply_generation_policy(dfc, thread_lock)

Check and apply the directional generation policy for a chunk.

Returns a status code if the chunk was handled by the policy (reset, skipped, or error), or None if the chunk should be accepted normally.

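Policy (n is the chunk's n_total_chunks, compared with the n_total_chunks currently expected):

  • hash matches, n matches → None (accept, same generation)

  • hash differs, n greater than expected → reset and adopt (newer generation)

  • hash differs, n less than or equal to expected → skip (stale or indeterminate)

  • hash matches, n differs → genuine corruption (ValueError)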
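The four policy rules can be expressed as a small pure function. The sketch below only decides the outcome and does not perform the reset itself. The function name, its arguments, and the two status-code constants are hypothetical illustrations, not the signature of _apply_generation_policy().

```python
from typing import Optional

# Hypothetical status codes for illustration only
RESET_TO_NEW_GENERATION = 10
SKIPPED_STALE_CHUNK = 11


def apply_generation_policy(
    expected_hash: bytes, expected_n: int, chunk_hash: bytes, chunk_n: int
) -> Optional[int]:
    """Return None to accept the chunk, or a status code if the policy handled it."""
    if chunk_hash == expected_hash:
        if chunk_n == expected_n:
            return None  # same generation: accept normally
        # Identical file hash with a different chunk count signals corruption
        raise ValueError(
            f"chunk count {chunk_n} != expected {expected_n} for the same file hash"
        )
    if chunk_n > expected_n:
        return RESET_TO_NEW_GENERATION  # newer, larger generation: reset and adopt
    return SKIPPED_STALE_CHUNK  # stale or indeterminate: skip


print(apply_generation_policy(b"h1", 4, b"h1", 4))  # None
print(apply_generation_policy(b"h1", 4, b"h2", 6))  # 10
print(apply_generation_policy(b"h1", 4, b"h2", 4))  # 11
```

Treating "hash differs, equal n" as a skip rather than a reset is what keeps the policy directional: two different uploads with the same chunk count cannot be ordered, so neither is allowed to replace the other.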

_handle_chunk_count_mismatch(dfc, thread_lock)

Handle a chunk whose n_total_chunks differs from what we expected. Returns a status code.

abstract _on_add_chunk(dfc)

A function that actually processes a new chunk being added to the file. It is called while the thread lock is held, so it never runs concurrently. Every DataFileChunk passed to this function is also guaranteed to have a unique offset.

Not implemented in the base class.

Parameters:

dfc (DataFileChunk) – the DataFileChunk object whose data should be added
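Because _on_add_chunk() runs with the lock held and never sees a repeated offset, a subclass can write each chunk directly at its offset without extra synchronization. The sketch below shows one such implementation that writes into a partial file on disk. The `DiskFileSketch` class and the two-field `Chunk` stand-in are hypothetical, written only to illustrate the idea.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class Chunk:
    """Hypothetical stand-in for DataFileChunk with only the fields used here."""
    offset: int
    data: bytes


class DiskFileSketch:
    """Hypothetical subclass-like object that writes chunks straight to disk."""

    def __init__(self, filepath):
        self.filepath = Path(filepath)

    def _on_add_chunk(self, dfc):
        # No locking here: the caller holds the lock and offsets are unique
        mode = "r+b" if self.filepath.exists() else "wb"
        with open(self.filepath, mode) as fp:
            fp.seek(dfc.offset)  # seeking past the end leaves a zero-filled gap
            fp.write(dfc.data)


with tempfile.TemporaryDirectory() as tmp:
    target = DiskFileSketch(Path(tmp) / "out.bin")
    target._on_add_chunk(Chunk(3, b"def"))  # later chunk arrives first
    target._on_add_chunk(Chunk(0, b"abc"))
    result = target.filepath.read_bytes()

print(result)  # b'abcdef'
```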

_reset_for_new_generation()

Reset internal state to accept chunks from a new upload generation.

Called when a chunk arrives with a different file_hash and a strictly higher n_total_chunks, indicating a newer (larger) version of the same file. The directional policy ensures we only reset forward to a generation with more chunks, never backward or sideways.

Subclasses should override to clean up their own state (e.g., delete partial files on disk, clear in-memory data dicts).
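A subclass that writes partial files to disk might override this method roughly as follows: delete the partially written file from the older generation and forget which offsets were received. The class and its attributes are hypothetical. A real subclass would presumably also call the base-class implementation so that shared state is reset, but that is an assumption about the base class.

```python
import tempfile
from pathlib import Path


class PartialDiskFileSketch:
    """Hypothetical download target that keeps a partial file on disk."""

    def __init__(self, filepath):
        self.filepath = Path(filepath)
        self.received_offsets = set()

    def _reset_for_new_generation(self):
        # Delete the partial file built from the older generation's chunks
        self.filepath.unlink(missing_ok=True)
        # Forget received offsets so new-generation chunks are not seen as duplicates
        self.received_offsets.clear()


with tempfile.TemporaryDirectory() as tmp:
    target = PartialDiskFileSketch(Path(tmp) / "partial.bin")
    target.filepath.write_bytes(b"old generation data")
    target.received_offsets.update({0, 1024})
    target._reset_for_new_generation()
    file_exists_after_reset = target.filepath.exists()

print(file_exists_after_reset, target.received_offsets)  # False set()
```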