"""Round-trip tests: write pandas objects to HDF5 and read them back."""
import datetime
import re

import numpy as np
import pytest

from pandas._libs.tslibs import Timestamp
from pandas.compat import is_platform_windows

import pandas as pd
from pandas import (
    DataFrame,
    DatetimeIndex,
    Index,
    Series,
    _testing as tm,
    bdate_range,
    date_range,
    read_hdf,
)
from pandas.tests.io.pytables.common import (
    _maybe_remove,
    ensure_clean_store,
)

from pandas.util import _test_decorators as td

pytestmark = pytest.mark.single_cpu


def test_conv_read_write():
    """Round-trip Series and DataFrame objects through to_hdf/read_hdf."""
    with tm.ensure_clean() as path:

        def roundtrip(key, obj, **kwargs):
            # Write ``obj`` under ``key`` and immediately read it back.
            obj.to_hdf(path, key=key, **kwargs)
            return read_hdf(path, key)

        o = Series(
            np.arange(10, dtype=np.float64), index=date_range("2020-01-01", periods=10)
        )
        tm.assert_series_equal(o, roundtrip("series", o))

        o = Series(range(10), dtype="float64", index=[f"i_{i}" for i in range(10)])
        tm.assert_series_equal(o, roundtrip("string_series", o))

        o = DataFrame(
            1.1 * np.arange(120).reshape((30, 4)),
            columns=Index(list("ABCD"), dtype=object),
            index=Index([f"i-{i}" for i in range(30)], dtype=object),
        )
        tm.assert_frame_equal(o, roundtrip("frame", o))

        # table
        df = DataFrame({"A": range(5), "B": range(5)})
        df.to_hdf(path, key="table", append=True)
        result = read_hdf(path, "table", where=["index>2"])
        tm.assert_frame_equal(df[df.index > 2], result)


def test_long_strings(setup_path):
    """Append and select a frame whose index and column hold 50-char strings."""
    # GH6166
    data = ["a" * 50] * 10
    df = DataFrame({"a": data}, index=data)

    with ensure_clean_store(setup_path) as store:
        store.append("df", df, data_columns=["a"])

        result = store.select("df")
        tm.assert_frame_equal(df, result)


def test_api(tmp_path, setup_path):
    """to_hdf accepts ``append`` and ``format`` together."""
    # GH4584
    # API issue when to_hdf doesn't accept append AND format args
    path = tmp_path / setup_path

    df = DataFrame(range(20))
    df.iloc[:10].to_hdf(path, key="df", append=True, format="table")
    df.iloc[10:].to_hdf(path, key="df", append=True, format="table")
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    # append to False
    df.iloc[:10].to_hdf(path, key="df", append=False, format="table")
    df.iloc[10:].to_hdf(path, key="df", append=True, format="table")
    tm.assert_frame_equal(read_hdf(path, "df"), df)


def test_api_append(tmp_path, setup_path):
    """to_hdf with ``append=True`` works when ``format`` is omitted."""
    path = tmp_path / setup_path

    df = DataFrame(range(20))
    df.iloc[:10].to_hdf(path, key="df", append=True)
    df.iloc[10:].to_hdf(path, key="df", append=True, format="table")
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    # append to False
    df.iloc[:10].to_hdf(path, key="df", append=False, format="table")
    df.iloc[10:].to_hdf(path, key="df", append=True)
    tm.assert_frame_equal(read_hdf(path, "df"), df)


def test_api_2(tmp_path, setup_path):
    """Exercise format spellings in to_hdf and append/format in store.append."""
    path = tmp_path / setup_path

    df = DataFrame(range(20))
    df.to_hdf(path, key="df", append=False, format="fixed")
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    df.to_hdf(path, key="df", append=False, format="f")
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    df.to_hdf(path, key="df", append=False)
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    df.to_hdf(path, key="df")
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    with ensure_clean_store(setup_path) as store:
        df = DataFrame(range(20))

        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=True, format="table")
        store.append("df", df.iloc[10:], append=True, format="table")
        tm.assert_frame_equal(store.select("df"), df)

        # append to False
        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=False, format="table")
        store.append("df", df.iloc[10:], append=True, format="table")
        tm.assert_frame_equal(store.select("df"), df)

        # formats
        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=False, format="table")
        store.append("df", df.iloc[10:], append=True, format="table")
        tm.assert_frame_equal(store.select("df"), df)

        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=False, format="table")
        store.append("df", df.iloc[10:], append=True, format=None)
        tm.assert_frame_equal(store.select("df"), df)


def test_api_invalid(tmp_path, setup_path):
    """Invalid append/format combinations and a missing file raise."""
    path = tmp_path / setup_path
    # Invalid.
    df = DataFrame(
        1.1 * np.arange(120).reshape((30, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=Index([f"i-{i}" for i in range(30)], dtype=object),
    )

    msg = "Can only append to Tables"

    with pytest.raises(ValueError, match=msg):
        df.to_hdf(path, key="df", append=True, format="f")

    with pytest.raises(ValueError, match=msg):
        df.to_hdf(path, key="df", append=True, format="fixed")

    msg = r"invalid HDFStore format specified \[foo\]"

    with pytest.raises(TypeError, match=msg):
        df.to_hdf(path, key="df", append=True, format="foo")

    with pytest.raises(TypeError, match=msg):
        df.to_hdf(path, key="df", append=False, format="foo")

    # File path doesn't exist
    path = ""
    msg = f"File {path} does not exist"

    with pytest.raises(FileNotFoundError, match=msg):
        read_hdf(path, "df")


def test_get(setup_path):
    """store.get matches item access, with or without a leading slash."""
    with ensure_clean_store(setup_path) as store:
        store["a"] = Series(
            np.arange(10, dtype=np.float64), index=date_range("2020-01-01", periods=10)
        )
        left = store.get("a")
        right = store["a"]
        tm.assert_series_equal(left, right)

        left = store.get("/a")
        right = store["/a"]
        tm.assert_series_equal(left, right)

        with pytest.raises(KeyError, match="'No object named b in the file'"):
            store.get("b")


def test_put_integer(setup_path):
    """Round-trip a frame with the default integer index and columns."""
    # non-date, non-string index
    df = DataFrame(np.random.default_rng(2).standard_normal((50, 100)))
    _check_roundtrip(df, tm.assert_frame_equal, setup_path)


def test_table_values_dtypes_roundtrip(setup_path):
    """Column dtypes survive an append/read cycle; mismatched appends raise."""
    with ensure_clean_store(setup_path) as store:
        df1 = DataFrame({"a": [1, 2, 3]}, dtype="f8")
        store.append("df_f8", df1)
        tm.assert_series_equal(df1.dtypes, store["df_f8"].dtypes)

        df2 = DataFrame({"a": [1, 2, 3]}, dtype="i8")
        store.append("df_i8", df2)
        tm.assert_series_equal(df2.dtypes, store["df_i8"].dtypes)

        # incompatible dtype
        msg = re.escape(
            "invalid combination of [values_axes] on appending data "
            "[name->values_block_0,cname->values_block_0,"
            "dtype->float64,kind->float,shape->(1, 3)] vs "
            "current table [name->values_block_0,"
            "cname->values_block_0,dtype->int64,kind->integer,"
            "shape->None]"
        )
        with pytest.raises(ValueError, match=msg):
            store.append("df_i8", df1)

        # check creation/storage/retrieval of float32 (a bit hacky to
        # actually create them though)
        df1 = DataFrame(np.array([[1], [2], [3]], dtype="f4"), columns=["A"])
        store.append("df_f4", df1)
        tm.assert_series_equal(df1.dtypes, store["df_f4"].dtypes)
        assert df1.dtypes.iloc[0] == "float32"

        # check with mixed dtypes
        df1 = DataFrame(
            {
                c: Series(np.random.default_rng(2).integers(5), dtype=c)
                for c in ["float32", "float64", "int32", "int64", "int16", "int8"]
            }
        )
        df1["string"] = "foo"
        df1["float322"] = 1.0
        df1["float322"] = df1["float322"].astype("float32")
        df1["bool"] = df1["float32"] > 0
        df1["time1"] = Timestamp("20130101")
        df1["time2"] = Timestamp("20130102")

        store.append("df_mixed_dtypes1", df1)
        result = store.select("df_mixed_dtypes1").dtypes.value_counts()
        result.index = [str(i) for i in result.index]
        expected = Series(
            {
                "float32": 2,
                "float64": 1,
                "int32": 1,
                "bool": 1,
                "int16": 1,
                "int8": 1,
                "int64": 1,
                "object": 1,
                "datetime64[ns]": 2,
            },
            name="count",
        )
        # Sort both sides so the comparison does not depend on count order.
        result = result.sort_index()
        expected = expected.sort_index()
        tm.assert_series_equal(result, expected)


@pytest.mark.filterwarnings("ignore::pandas.errors.PerformanceWarning")
def test_series(setup_path):
    """Round-trip Series with string, datetime and object-dtype indexes."""
    s = Series(range(10), dtype="float64", index=[f"i_{i}" for i in range(10)])
    _check_roundtrip(s, tm.assert_series_equal, path=setup_path)

    ts = Series(
        np.arange(10, dtype=np.float64), index=date_range("2020-01-01", periods=10)
    )
    _check_roundtrip(ts, tm.assert_series_equal, path=setup_path)

    ts2 = Series(ts.index, Index(ts.index, dtype=object))
    _check_roundtrip(ts2, tm.assert_series_equal, path=setup_path)

    ts3 = Series(ts.values, Index(np.asarray(ts.index, dtype=object), dtype=object))
    _check_roundtrip(
        ts3, tm.assert_series_equal, path=setup_path, check_index_type=False
    )


def test_float_index(setup_path):
    """Round-trip a Series indexed by floats."""
    # GH #454
    index = np.random.default_rng(2).standard_normal(10)
    s = Series(np.random.default_rng(2).standard_normal(10), index=index)
    _check_roundtrip(s, tm.assert_series_equal, path=setup_path)


def test_tuple_index(setup_path):
    """Round-trip a frame with a tuple index, expecting a PerformanceWarning."""
    # GH #492
    col = np.arange(10)
    idx = [(0.0, 1.0), (2.0, 3.0), (4.0, 5.0)]
    data = np.random.default_rng(2).standard_normal(30).reshape((3, 10))
    DF = DataFrame(data, index=idx, columns=col)

    with tm.assert_produces_warning(pd.errors.PerformanceWarning):
        _check_roundtrip(DF, tm.assert_frame_equal, path=setup_path)


@pytest.mark.filterwarnings("ignore::pandas.errors.PerformanceWarning")
def test_index_types(setup_path):
    """Round-trip two-element Series over a variety of index label types."""
    values = np.random.default_rng(2).standard_normal(2)

    def func(lhs, rhs):
        tm.assert_series_equal(lhs, rhs, check_index_type=True)

    # Each entry is the index for one round-trip. The first four cases were
    # previously listed twice verbatim; each distinct case now runs once.
    index_labels = [
        [0, "y"],
        [datetime.datetime.today(), 0],
        ["y", 0],
        [datetime.date.today(), "a"],
        [1.23, "b"],
        [1, 1.53],
        [1, 5],
    ]
    for labels in index_labels:
        ser = Series(values, labels)
        _check_roundtrip(ser, func, path=setup_path)

    dti = DatetimeIndex(["2012-01-01", "2012-01-02"], dtype="M8[ns]")
    ser = Series(values, index=dti)
    _check_roundtrip(ser, func, path=setup_path)

    ser.index = ser.index.as_unit("s")
    _check_roundtrip(ser, func, path=setup_path)


def test_timeseries_preepoch(setup_path, request):
    """Round-trip a business-day series covering 1940 through 1960.

    If the round trip raises OverflowError on Windows, the test is marked
    xfail before the error is re-raised; on other platforms the error simply
    propagates.
    """
    dr = bdate_range("1/1/1940", "1/1/1960")
    ts = Series(np.random.default_rng(2).standard_normal(len(dr)), index=dr)
    try:
        _check_roundtrip(ts, tm.assert_series_equal, path=setup_path)
    except OverflowError:
        if is_platform_windows():
            # The message must be passed as ``reason``: the first positional
            # argument of ``xfail`` is a condition, and a string condition is
            # evaluated as a Python expression.
            request.applymarker(
                pytest.mark.xfail(reason="known failure on some windows platforms")
            )
        raise


@pytest.mark.parametrize(
    "compression", [False, pytest.param(True, marks=td.skip_if_windows)]
)
def test_frame(compression, setup_path):
    """Round-trip DataFrames (with NaNs, datetime index, empty) via the store."""
    df = DataFrame(
        1.1 * np.arange(120).reshape((30, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=Index([f"i-{i}" for i in range(30)], dtype=object),
    )

    # put in some random NAs
    df.iloc[0, 0] = np.nan
    df.iloc[5, 3] = np.nan

    _check_roundtrip_table(
        df, tm.assert_frame_equal, path=setup_path, compression=compression
    )
    _check_roundtrip(
        df, tm.assert_frame_equal, path=setup_path, compression=compression
    )

    tdf = DataFrame(
        np.random.default_rng(2).standard_normal((10, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=date_range("2000-01-01", periods=10, freq="B"),
    )
    _check_roundtrip(
        tdf, tm.assert_frame_equal, path=setup_path, compression=compression
    )

    with ensure_clean_store(setup_path) as store:
        # not consolidated
        df["foo"] = np.random.default_rng(2).standard_normal(len(df))
        store["df"] = df
        recons = store["df"]
        assert recons._mgr.is_consolidated()

    # empty
    _check_roundtrip(df[:0], tm.assert_frame_equal, path=setup_path)


def test_empty_series_frame(setup_path):
    """Round-trip empty Series and DataFrames."""
    s0 = Series(dtype=object)
    s1 = Series(name="myseries", dtype=object)
    df0 = DataFrame()
    df1 = DataFrame(index=["a", "b", "c"])
    df2 = DataFrame(columns=["d", "e", "f"])

    _check_roundtrip(s0, tm.assert_series_equal, path=setup_path)
    _check_roundtrip(s1, tm.assert_series_equal, path=setup_path)
    _check_roundtrip(df0, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(df1, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(df2, tm.assert_frame_equal, path=setup_path)


@pytest.mark.parametrize("dtype", [np.int64, np.float64, object, "m8[ns]", "M8[ns]"])
def test_empty_series(dtype, setup_path):
    """Round-trip an empty Series of the parametrized dtype."""
    s = Series(dtype=dtype)
    _check_roundtrip(s, tm.assert_series_equal, path=setup_path)


def test_can_serialize_dates(setup_path):
    """Round-trip a frame indexed by ``datetime.date`` objects."""
    rng = [x.date() for x in bdate_range("1/1/2000", "1/30/2000")]
    frame = DataFrame(
        np.random.default_rng(2).standard_normal((len(rng), 4)), index=rng
    )

    _check_roundtrip(frame, tm.assert_frame_equal, path=setup_path)


def test_store_hierarchical(setup_path, multiindex_dataframe_random_data):
    """Round-trip the MultiIndex fixture frame, its transpose and a column."""
    frame = multiindex_dataframe_random_data

    _check_roundtrip(frame, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(frame.T, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(frame["A"], tm.assert_series_equal, path=setup_path)

    # check that the names are stored
    with ensure_clean_store(setup_path) as store:
        store["frame"] = frame
        recons = store["frame"]
        tm.assert_frame_equal(recons, frame)


@pytest.mark.parametrize(
    "compression", [False, pytest.param(True, marks=td.skip_if_windows)]
)
def test_store_mixed(compression, setup_path):
    """Round-trip mixed-dtype frames and Series taken from their columns."""

    def _make_one():
        # Build a frame holding float, object, bool and int columns.
        df = DataFrame(
            1.1 * np.arange(120).reshape((30, 4)),
            columns=Index(list("ABCD"), dtype=object),
            index=Index([f"i-{i}" for i in range(30)], dtype=object),
        )
        df["obj1"] = "foo"
        df["obj2"] = "bar"
        df["bool1"] = df["A"] > 0
        df["bool2"] = df["B"] > 0
        df["int1"] = 1
        df["int2"] = 2
        return df._consolidate()

    df1 = _make_one()
    df2 = _make_one()

    _check_roundtrip(df1, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(df2, tm.assert_frame_equal, path=setup_path)

    with ensure_clean_store(setup_path) as store:
        store["obj"] = df1
        tm.assert_frame_equal(store["obj"], df1)
        store["obj"] = df2
        tm.assert_frame_equal(store["obj"], df2)

    # check that can store Series of all of these types
    _check_roundtrip(
        df1["obj1"],
        tm.assert_series_equal,
        path=setup_path,
        compression=compression,
    )
    _check_roundtrip(
        df1["bool1"],
        tm.assert_series_equal,
        path=setup_path,
        compression=compression,
    )
    _check_roundtrip(
        df1["int1"],
        tm.assert_series_equal,
        path=setup_path,
        compression=compression,
    )


def _check_roundtrip(obj, comparator, path, compression=False, **kwargs):
    """Store ``obj`` via item assignment, read it back and compare.

    Parameters
    ----------
    obj : object
        Object assigned to the store under the key ``"obj"``.
    comparator : callable
        Called as ``comparator(retrieved, obj, **kwargs)``.
    path : str or path-like
        Forwarded to ``ensure_clean_store``.
    compression : bool, default False
        When truthy, the store is opened with ``complib="blosc"``.
    **kwargs
        Extra keyword arguments forwarded to ``comparator``.
    """
    options = {}
    if compression:
        options["complib"] = "blosc"

    with ensure_clean_store(path, "w", **options) as store:
        store["obj"] = obj
        retrieved = store["obj"]
        comparator(retrieved, obj, **kwargs)


def _check_roundtrip_table(obj, comparator, path, compression=False):
    """Store ``obj`` with ``format="table"``, read it back and compare.

    Parameters
    ----------
    obj : object
        Object written with ``store.put("obj", obj, format="table")``.
    comparator : callable
        Called as ``comparator(retrieved, obj)``.
    path : str or path-like
        Forwarded to ``ensure_clean_store``.
    compression : bool, default False
        When truthy, the store is opened with ``complib="blosc"``.
    """
    options = {}
    if compression:
        options["complib"] = "blosc"

    with ensure_clean_store(path, "w", **options) as store:
        store.put("obj", obj, format="table")
        retrieved = store["obj"]
        comparator(retrieved, obj)


def test_unicode_index(setup_path):
    """Round-trip a Series whose index labels are non-ASCII strings."""
    unicode_values = ["\u03c3", "\u03c3\u03c3"]

    s = Series(
        np.random.default_rng(2).standard_normal(len(unicode_values)),
        unicode_values,
    )
    _check_roundtrip(s, tm.assert_series_equal, path=setup_path)


def test_unicode_longer_encoded(setup_path):
    """Table-format put/get with ``encoding="utf-8"`` keeps a non-ASCII char."""
    # GH 11234
    char = "\u0394"
    df = DataFrame({"A": [char]})
    with ensure_clean_store(setup_path) as store:
        store.put("df", df, format="table", encoding="utf-8")
        result = store.get("df")
        tm.assert_frame_equal(result, df)

    df = DataFrame({"A": ["a", char], "B": ["b", "b"]})
    with ensure_clean_store(setup_path) as store:
        store.put("df", df, format="table", encoding="utf-8")
        result = store.get("df")
        tm.assert_frame_equal(result, df)


def test_store_datetime_mixed(setup_path):
    """Round-trip a frame mixing int, float, string and datetime columns."""
    df = DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0], "c": ["a", "b", "c"]})
    ts = Series(
        np.arange(10, dtype=np.float64), index=date_range("2020-01-01", periods=10)
    )
    df["d"] = ts.index[:3]
    _check_roundtrip(df, tm.assert_frame_equal, path=setup_path)


def test_round_trip_equals(tmp_path, setup_path):
    """A table-format round trip yields a frame that ``equals`` the input."""
    # GH 9330
    df = DataFrame({"B": [1, 2], "A": ["x", "y"]})

    path = tmp_path / setup_path
    df.to_hdf(path, key="df", format="table")
    other = read_hdf(path, "df")
    tm.assert_frame_equal(df, other)
    assert df.equals(other)
    assert other.equals(df)


def test_infer_string_columns(tmp_path, setup_path):
    """Table round trip under ``future.infer_string`` with a two-level index."""
    # GH#
    pytest.importorskip("pyarrow")
    path = tmp_path / setup_path
    with pd.option_context("future.infer_string", True):
        df = DataFrame(1, columns=list("ABCD"), index=list(range(10))).set_index(
            ["A", "B"]
        )
        expected = df.copy()
        df.to_hdf(path, key="df", format="table")

        result = read_hdf(path, "df")
        tm.assert_frame_equal(result, expected)