class documentation

class IncrementalDataSet(PartitionedDataSet):

IncrementalDataSet inherits from PartitionedDataSet, which loads and saves partitioned file-like data using the underlying dataset definition. For filesystem-level operations it uses fsspec: https://github.com/intake/filesystem_spec. IncrementalDataSet also stores information about the last processed partition in a so-called checkpoint, which by default is persisted to the location of the data partitions, so that a subsequent pipeline run loads only new partitions past the checkpoint.

Example:

>>> from kedro.io import IncrementalDataSet
>>>
>>> # these credentials will be passed to:
>>> # a) 'fsspec.filesystem()' call,
>>> # b) the dataset initializer,
>>> # c) the checkpoint initializer
>>> credentials = {"key1": "secret1", "key2": "secret2"}
>>>
>>> data_set = IncrementalDataSet(
...     path="s3://bucket-name/path/to/folder",
...     dataset="pandas.CSVDataSet",
...     credentials=credentials
... )
>>> loaded = data_set.load()  # loads all available partitions
>>> # assert isinstance(loaded, dict)
>>>
>>> data_set.confirm()  # update checkpoint value to the last processed partition ID
>>> reloaded = data_set.load()  # still loads all available partitions
>>>
>>> data_set.release()  # clears load cache
>>> # returns an empty dictionary as no new partitions were added
>>> data_set.load()
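
The load/confirm/release cycle in the example above can be modelled without any filesystem access. The sketch below is a toy in-memory model, not kedro's implementation: the class name CheckpointModel and its attributes are hypothetical, and it assumes partition IDs are compared with the checkpoint using a "greater than" comparison on sorted string IDs.

```python
import operator
from typing import Callable, Dict, List, Optional


class CheckpointModel:
    """Toy in-memory model of the load/confirm/release cycle (not kedro code)."""

    def __init__(
        self,
        partitions: Dict[str, str],
        comparison_func: Callable[[str, str], bool] = operator.gt,
    ):
        self._partitions = partitions            # partition ID -> data
        self._comparison_func = comparison_func  # assumed default: "greater than"
        self._checkpoint: Optional[str] = None   # last confirmed partition ID
        self._cache: Optional[List[str]] = None  # cached partition listing

    def _list_partitions(self) -> List[str]:
        # The listing is cached until release() is called, so a load() made
        # right after confirm() still sees the previously listed partitions.
        if self._cache is None:
            self._cache = sorted(
                pid
                for pid in self._partitions
                if self._checkpoint is None
                or self._comparison_func(pid, self._checkpoint)
            )
        return self._cache

    def load(self) -> Dict[str, str]:
        return {pid: self._partitions[pid] for pid in self._list_partitions()}

    def confirm(self) -> None:
        # Move the checkpoint to the last partition that was listed.
        listed = self._list_partitions()
        if listed:
            self._checkpoint = listed[-1]

    def release(self) -> None:
        # Drop the cached listing so the next load() applies the checkpoint.
        self._cache = None


model = CheckpointModel({"2020-01-01.csv": "a", "2020-01-02.csv": "b"})
first = model.load()   # both partitions
model.confirm()        # checkpoint becomes "2020-01-02.csv"
second = model.load()  # listing is still cached, so both partitions again
model.release()
third = model.load()   # nothing is past the checkpoint
```

The cached listing is the reason the second load in the example still returns every partition: the checkpoint only takes effect once the cache is cleared.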
Method __init__ Creates a new instance of IncrementalDataSet.
Method confirm Confirm the dataset by updating the checkpoint value to the latest processed partition ID
Constant DEFAULT_CHECKPOINT_FILENAME Undocumented
Constant DEFAULT_CHECKPOINT_TYPE Undocumented
Method _list_partitions Undocumented
Method _load Undocumented
Method _parse_checkpoint_config Undocumented
Method _read_checkpoint Undocumented
Instance Variable _checkpoint_config Undocumented
Instance Variable _comparison_func Undocumented
Instance Variable _force_checkpoint Undocumented
Property _checkpoint Undocumented

Inherited from PartitionedDataSet:

Method _describe Undocumented
Method _exists Undocumented
Method _invalidate_caches Undocumented
Method _join_protocol Undocumented
Method _partition_to_path Undocumented
Method _path_to_partition Undocumented
Method _release Undocumented
Method _save Undocumented
Instance Variable _credentials Undocumented
Instance Variable _dataset_config Undocumented
Instance Variable _dataset_type Undocumented
Instance Variable _filename_suffix Undocumented
Instance Variable _filepath_arg Undocumented
Instance Variable _fs_args Undocumented
Instance Variable _load_args Undocumented
Instance Variable _overwrite Undocumented
Instance Variable _partition_cache Undocumented
Instance Variable _path Undocumented
Instance Variable _protocol Undocumented
Instance Variable _sep Undocumented
Property _filesystem Undocumented
Property _normalized_path Undocumented

Inherited from AbstractDataSet (via PartitionedDataSet):

Class Method from_config Create a data set instance using the configuration provided.
Method __str__ Undocumented
Method exists Checks whether a data set's output already exists by calling the provided _exists() method.
Method load Loads data by delegation to the provided load method.
Method release Release any cached data.
Method save Saves data by delegation to the provided save method.
Method _copy Undocumented
Property _logger Undocumented
def __init__(self, path: str, dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]], checkpoint: Union[str, Dict[str, Any]] = None, filepath_arg: str = 'filepath', filename_suffix: str = '', credentials: Dict[str, Any] = None, load_args: Dict[str, Any] = None, fs_args: Dict[str, Any] = None):

Creates a new instance of IncrementalDataSet.

Parameters
path: str
    Path to the folder containing partitioned data. If path starts with the protocol (e.g., s3://) then the corresponding fsspec concrete filesystem implementation will be used. If protocol is not specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately prior to usage of the PartitionedDataSet.
dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]]
    Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) a class that inherits from AbstractDataSet; b) a string representing a fully qualified name of such a class; c) a dictionary with a type key pointing to a string from b), where other keys are passed to the dataset initializer. Credentials for the dataset can be explicitly specified in this configuration.
checkpoint: Union[str, Dict[str, Any]]
    Optional checkpoint configuration. Accepts a dictionary with the corresponding dataset definition including filepath (unlike the dataset argument). Checkpoint configuration is described here: https://kedro.readthedocs.io/en/stable/data/kedro_io.html#checkpoint-configuration Credentials for the checkpoint can be explicitly specified in this configuration.
filepath_arg: str
    Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath".
filename_suffix: str
    If specified, only partitions that end with this string will be processed.
credentials: Dict[str, Any]
    Protocol-specific options that will be passed to fsspec.filesystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem), the dataset initializer and the checkpoint. If the dataset or the checkpoint configuration contains an explicit credentials spec, then that spec will take precedence. All possible credentials management scenarios are documented here: https://kedro.readthedocs.io/en/stable/data/kedro_io.html#partitioned-dataset-credentials
load_args: Dict[str, Any]
    Keyword arguments to be passed into the find() method of the filesystem implementation.
fs_args: Dict[str, Any]
    Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).
Raises
DataSetError
    If versioning is enabled for the underlying dataset.
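
The precedence rule for credentials described above (an explicit credentials spec in the dataset or checkpoint configuration wins over the top-level credentials argument) can be illustrated with plain dictionaries. This is a minimal sketch under stated assumptions: resolve_credentials is a hypothetical helper, not part of kedro, and the checkpoint filepath is an illustrative value built from the example path.

```python
from copy import deepcopy
from typing import Any, Dict, Optional


def resolve_credentials(
    config: Dict[str, Any], top_level: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Hypothetical helper: fill in credentials unless the config sets its own."""
    resolved = deepcopy(config)
    # An explicit "credentials" key in the config takes precedence;
    # otherwise the top-level credentials are copied in.
    if "credentials" not in resolved and top_level:
        resolved["credentials"] = deepcopy(top_level)
    return resolved


credentials = {"key1": "secret1", "key2": "secret2"}

# Format c) of the dataset argument: a dictionary with a "type" key.
dataset_config = {"type": "pandas.CSVDataSet"}

# Checkpoint configuration with its own explicit credentials spec.
checkpoint_config = {
    "type": "kedro.extras.datasets.text.TextDataSet",
    "filepath": "s3://bucket-name/path/to/folder/CHECKPOINT",  # illustrative
    "credentials": {"key1": "checkpoint-secret"},
}

resolved_dataset = resolve_credentials(dataset_config, credentials)
resolved_checkpoint = resolve_credentials(checkpoint_config, credentials)
```

Here resolved_dataset inherits the top-level credentials, while resolved_checkpoint keeps the credentials given in its own configuration.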
def confirm(self):

Confirm the dataset by updating the checkpoint value to the latest processed partition ID.

DEFAULT_CHECKPOINT_FILENAME: str = 'CHECKPOINT'

Undocumented
DEFAULT_CHECKPOINT_TYPE: str = 'kedro.extras.datasets.text.TextDataSet'

Undocumented
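
Taken together with the class description, these two defaults suggest that the checkpoint is a plain text file named CHECKPOINT stored alongside the partitions. The sketch below illustrates that idea on a local temporary directory. It is an assumption-laden model, not kedro's code: the helper functions are hypothetical, and the exact way kedro composes the checkpoint path is not stated in this page.

```python
import tempfile
from pathlib import Path
from typing import Optional

DEFAULT_CHECKPOINT_FILENAME = "CHECKPOINT"  # value taken from the constant above


def checkpoint_path(partitions_dir: Path) -> Path:
    # Assumed layout: the checkpoint file sits next to the partition files.
    return partitions_dir / DEFAULT_CHECKPOINT_FILENAME


def read_checkpoint(partitions_dir: Path) -> Optional[str]:
    # A missing checkpoint file means nothing has been confirmed yet.
    path = checkpoint_path(partitions_dir)
    return path.read_text() if path.exists() else None


def write_checkpoint(partitions_dir: Path, partition_id: str) -> None:
    # Store the last processed partition ID as plain text.
    checkpoint_path(partitions_dir).write_text(partition_id)


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    before = read_checkpoint(folder)            # no checkpoint file yet
    write_checkpoint(folder, "2020-01-02.csv")
    after = read_checkpoint(folder)
    checkpoint_name = checkpoint_path(folder).name
```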
@cachedmethod(cache=operator.attrgetter('_partition_cache'))
def _list_partitions(self) -> List[str]:

Undocumented

def _load(self) -> Dict[str, Callable[[], Any]]:

Undocumented

def _parse_checkpoint_config(self, checkpoint_config: Union[str, Dict[str, Any], None]) -> Dict[str, Any]:

Undocumented

def _read_checkpoint(self) -> Union[str, None]:

Undocumented

_checkpoint_config

Undocumented

_comparison_func

Undocumented

_force_checkpoint

Undocumented
