
DataSource

Utility functions at data source level

Source code in ddataflow/data_source.py
class DataSource:
    """
    Utility functions at data source level
    """

    def __init__(
        self,
        *,
        name: str,
        config: dict,
        local_data_folder: str,
        snapshot_path: str,
        size_limit,
    ):
        self._name = name
        self._local_data_folder = local_data_folder
        self._snapshot_path = snapshot_path
        self._size_limit = size_limit
        self._config = config
        self._filter = None
        self._source = None

        if "source" in self._config:
            self._source = config["source"]
        else:
            if self._config.get("file-type") == "parquet":
                self._source = lambda spark: spark.read.parquet(self._name)
            else:
                self._source = lambda spark: spark.table(self._name)

        if "filter" in self._config:
            self._filter = self._config["filter"]
        else:
            if self._config.get("default_sampling"):
                self._filter = lambda df: filter_function(df)

    def query(self):
        """
        Query the data source, applying the configured filter when one is present
        """
        df = self.query_without_filter()

        if self._filter is not None:
            print(f"Filter set for {self._name}, applying it")
            df = self._filter(df)
        else:
            print(f"No filter set for {self._name}")

        return df

    def has_filter(self) -> bool:
        return self._filter is not None

    def query_without_filter(self):
        """
        Go to the raw data source without any filtering
        """
        spark = get_or_create_spark()
        logger.debug(f"Querying without filter source: '{self._name}'")
        return self._source(spark)

    def query_locally(self):
        logger.info(f"Querying locally {self._name}")

        path = self.get_local_path()
        if not os.path.exists(path):
            raise Exception(
                f"""Data source '{self.get_name()}' does not have data in {path}.
            Consider downloading using the following command:
            ddataflow current_project download_data_sources"""
            )
        spark = get_or_create_spark()
        df = spark.read.parquet(path)

        return df

    def get_dbfs_sample_path(self) -> str:
        return os.path.join(self._snapshot_path, self._get_name_as_path())

    def get_local_path(self) -> str:
        return os.path.join(self._local_data_folder, self._get_name_as_path())

    def _get_name_as_path(self):
        """
        Converts a name containing slashes (e.g. "/mnt/events") into a single flat file name such as _mnt_events
        """
        return self.get_name().replace("/", "_")

    def get_name(self) -> str:
        return self._name

    def get_parquet_filename(self) -> str:
        return self._name + ".parquet"

    def estimate_size_and_fail_if_too_big(self):
        """
        Estimate the size of the data source identified by _name in the _config.
        Throws an exception if the estimated size is bigger than the maximum allowed in the configuration
        """

        print("Estimating size of data source: ", self.get_name())
        df = self.query()
        size_estimation = self._estimate_size(df)

        print("Estimated size of the Dataset in GB: ", size_estimation)

        if size_estimation > self._size_limit:
            raise BiggerThanMaxSize(self._name, size_estimation, self._size_limit)

        return df

    def _estimate_size(self, df: DataFrame) -> float:
        """
        Estimates the size of a dataframe in Gigabytes

        Formula:
            gigabytes = (N * V * W) / 1024^3
            where N = row count, V = column count, W = average value size in bytes (assumed 50)
        """

        print(f"Amount of rows in dataframe to estimate size: {df.count()}")
        average_variable_size_bytes = 50
        return (df.count() * len(df.columns) * average_variable_size_bytes) / (1024**3)
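
A minimal usage sketch, assuming ddataflow is installed and a Spark session is available. The data source name "events", the folder paths, and the 1 GB limit below are illustrative only, not values defined by the library:

from ddataflow.data_source import DataSource

events_source = DataSource(
    name="events",                              # table name passed to spark.table()
    config={},                                  # no "source", "filter", "file-type" or "default_sampling" overrides
    local_data_folder="/tmp/ddataflow",         # where downloaded parquet snapshots live locally
    snapshot_path="dbfs:/ddataflow/snapshots",  # where snapshots live remotely
    size_limit=1,                               # maximum estimated size in GB
)

df = events_source.query()                # spark.table("events"); no filter configured here
local_df = events_source.query_locally()  # reads the local parquet snapshot, if downloaded

With an empty config the source falls back to spark.table(name); setting "file-type": "parquet" switches it to spark.read.parquet(name) instead.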

estimate_size_and_fail_if_too_big()

Estimates the size of the data source identified by _name in the _config. Throws an exception if the estimated size is bigger than the maximum allowed in the configuration.

Source code in ddataflow/data_source.py
def estimate_size_and_fail_if_too_big(self):
    """
    Estimate the size of the data source identified by _name in the _config.
    Throws an exception if the estimated size is bigger than the maximum allowed in the configuration
    """

    print("Estimating size of data source: ", self.get_name())
    df = self.query()
    size_estimation = self._estimate_size(df)

    print("Estimated size of the Dataset in GB: ", size_estimation)

    if size_estimation > self._size_limit:
        raise BiggerThanMaxSize(self._name, size_estimation, self._size_limit)

    return df
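
As a rough worked example of the estimate above (row count times column count times an assumed 50 bytes per value), a 10-million-row dataframe with 20 columns comes out just above 9 GB, which would fail a size_limit of 1. The numbers are purely illustrative:

rows, columns, avg_value_bytes = 10_000_000, 20, 50
estimated_gb = (rows * columns * avg_value_bytes) / (1024 ** 3)
print(round(estimated_gb, 2))  # 9.31 -> bigger than a size_limit of 1, so BiggerThanMaxSize is raised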

query()

Query the data source, applying the configured filter when one is present

Source code in ddataflow/data_source.py
def query(self):
    """
    Query the data source, applying the configured filter when one is present
    """
    df = self.query_without_filter()

    if self._filter is not None:
        print(f"Filter set for {self._name}, applying it")
        df = self._filter(df)
    else:
        print(f"No filter set for {self._name}")

    return df
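
A minimal sketch of the filter hook: when the config contains a "filter" callable, query() passes the unfiltered dataframe to it and returns the result. The column name "date" and the row cap below are assumptions for illustration only:

def sample_recent(df):
    # keep only a small, recent slice of the table; "date" is a hypothetical column
    return df.where(df["date"] >= "2023-01-01").limit(10_000)

source = DataSource(
    name="events",
    config={"filter": sample_recent},
    local_data_folder="/tmp/ddataflow",
    snapshot_path="dbfs:/ddataflow/snapshots",
    size_limit=1,
)
filtered_df = source.query()  # prints "Filter set for events, applying it"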

query_without_filter()

Go to the raw data source without any filtering

Source code in ddataflow/data_source.py
def query_without_filter(self):
    """
    Go to the raw data source without any filtering
    """
    spark = get_or_create_spark()
    logger.debug(f"Querying without filter source: '{self._name}'")
    return self._source(spark)
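
Continuing the illustrative source from the sketches above, bypassing the filter is a single call; the result is the full table, so it can be far larger than what query() returns when a filter is configured:

raw_df = source.query_without_filter()  # full spark.table("events"), no sampling applied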