import os

import numpy as np
import pytest

from pandas.compat import (
    PY311,
    is_ci_environment,
    is_platform_linux,
    is_platform_little_endian,
)
from pandas.errors import (
    ClosedFileError,
    PossibleDataLossError,
)

from pandas import (
    DataFrame,
    HDFStore,
    Index,
    Series,
    _testing as tm,
    date_range,
    read_hdf,
)
from pandas.tests.io.pytables.common import (
    _maybe_remove,
    ensure_clean_store,
    tables,
)

from pandas.io import pytables
from pandas.io.pytables import Term

pytestmark = pytest.mark.single_cpu


def _assert_leaf_filters(path, where, complevel, complib):
    """
    Assert the compression filters of every Leaf node below ``where``.

    Parameters
    ----------
    path : path-like
        HDF5 file to open read-only with ``tables.open_file``.
    where : str
        Group path whose Leaf nodes are walked (e.g. ``"/df"``).
    complevel : int
        Expected ``filters.complevel`` of each leaf.
    complib : str or None
        Expected ``filters.complib`` of each leaf; ``None`` is checked
        by identity.
    """
    with tables.open_file(path, mode="r") as h5file:
        for node in h5file.walk_nodes(where=where, classname="Leaf"):
            assert node.filters.complevel == complevel
            if complib is None:
                assert node.filters.complib is None
            else:
                assert node.filters.complib == complib


@pytest.mark.parametrize("mode", ["r", "r+", "a", "w"])
def test_mode(setup_path, tmp_path, mode):
    """
    Exercise each file mode through HDFStore, DataFrame.to_hdf and read_hdf.

    Read-only style modes ("r", "r+") must raise OSError while the file
    does not exist yet; mode "w" must be rejected by read_hdf.
    """
    df = DataFrame(
        np.random.default_rng(2).standard_normal((10, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=date_range("2000-01-01", periods=10, freq="B"),
    )
    msg = r"[\S]* does not exist"
    path = tmp_path / setup_path
    needs_existing_file = mode in ["r", "r+"]

    # constructor
    if needs_existing_file:
        with pytest.raises(OSError, match=msg):
            HDFStore(path, mode=mode)
    else:
        with HDFStore(path, mode=mode) as store:
            assert store._handle.mode == mode

    # context
    if needs_existing_file:
        with pytest.raises(OSError, match=msg):
            with HDFStore(path, mode=mode) as store:
                pass
    else:
        with HDFStore(path, mode=mode) as store:
            assert store._handle.mode == mode

    # conv write
    if needs_existing_file:
        with pytest.raises(OSError, match=msg):
            df.to_hdf(path, key="df", mode=mode)
        # create the file so the read step below has something to open
        df.to_hdf(path, key="df", mode="w")
    else:
        df.to_hdf(path, key="df", mode=mode)

    # conv read
    if mode in ["w"]:
        msg = (
            "mode w is not allowed while performing a read. "
            r"Allowed modes are r, r\+ and a."
        )
        with pytest.raises(ValueError, match=msg):
            read_hdf(path, "df", mode=mode)
    else:
        result = read_hdf(path, "df", mode=mode)
        tm.assert_frame_equal(result, df)


def test_default_mode(tmp_path, setup_path):
    """Round-trip a frame through read_hdf without passing ``mode``."""
    # read_hdf uses default mode
    df = DataFrame(
        np.random.default_rng(2).standard_normal((10, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=date_range("2000-01-01", periods=10, freq="B"),
    )
    path = tmp_path / setup_path
    df.to_hdf(path, key="df", mode="w")
    result = read_hdf(path, "df")
    tm.assert_frame_equal(result, df)


def test_reopen_handle(tmp_path, setup_path):
    """
    Re-open an existing HDFStore object in different modes.

    Re-opening an open store with mode "w" must raise
    PossibleDataLossError; once closed, "w" truncates, while "r" and "a"
    keep the stored key.
    """
    path = tmp_path / setup_path

    # open/close are called explicitly here because they are what is under test
    store = HDFStore(path, mode="a")
    store["a"] = Series(
        np.arange(10, dtype=np.float64), index=date_range("2020-01-01", periods=10)
    )

    msg = (
        r"Re-opening the file \[[\S]*\] with mode \[a\] will delete the "
        "current file!"
    )
    # invalid mode change
    with pytest.raises(PossibleDataLossError, match=msg):
        store.open("w")

    store.close()
    assert not store.is_open

    # truncation ok here
    store.open("w")
    assert store.is_open
    assert len(store) == 0
    store.close()
    assert not store.is_open

    store = HDFStore(path, mode="a")
    store["a"] = Series(
        np.arange(10, dtype=np.float64), index=date_range("2020-01-01", periods=10)
    )

    # reopen as read
    store.open("r")
    assert store.is_open
    assert len(store) == 1
    assert store._mode == "r"
    store.close()
    assert not store.is_open

    # reopen as append
    store.open("a")
    assert store.is_open
    assert len(store) == 1
    assert store._mode == "a"
    store.close()
    assert not store.is_open

    # reopen as append (again)
    store.open("a")
    assert store.is_open
    assert len(store) == 1
    assert store._mode == "a"
    store.close()
    assert not store.is_open


def test_open_args(setup_path):
    """Open a store with the H5FD_CORE driver and no backing store."""
    with tm.ensure_clean(setup_path) as path:
        df = DataFrame(
            1.1 * np.arange(120).reshape((30, 4)),
            columns=Index(list("ABCD"), dtype=object),
            index=Index([f"i-{i}" for i in range(30)], dtype=object),
        )

        # create an in memory store; the context manager guarantees the
        # handle is closed even when one of the assertions fails
        with HDFStore(
            path, mode="a", driver="H5FD_CORE", driver_core_backing_store=0
        ) as store:
            store["df"] = df
            store.append("df2", df)

            tm.assert_frame_equal(store["df"], df)
            tm.assert_frame_equal(store["df2"], df)

    # the file should not have actually been written
    # NOTE(review): this check runs after the ensure_clean context has
    # exited; confirm its cleanup does not make the assertion vacuous.
    assert not os.path.exists(path)


def test_flush(setup_path):
    """Call HDFStore.flush with and without ``fsync`` on an open store."""
    with ensure_clean_store(setup_path) as store:
        store["a"] = Series(range(5))
        store.flush()
        store.flush(fsync=True)


def test_complibs_default_settings(tmp_path, setup_path):
    """Check the complib/complevel combinations recorded by to_hdf."""
    # GH15943
    df = DataFrame(
        1.1 * np.arange(120).reshape((30, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=Index([f"i-{i}" for i in range(30)], dtype=object),
    )
    tmpfile = tmp_path / setup_path

    # Set complevel and check if complib is automatically set to
    # default value
    df.to_hdf(tmpfile, key="df", complevel=9)
    result = read_hdf(tmpfile, "df")
    tm.assert_frame_equal(result, df)
    _assert_leaf_filters(tmpfile, "/df", complevel=9, complib="zlib")

    # Set complib and check to see if compression is disabled
    df.to_hdf(tmpfile, key="df", complib="zlib")
    result = read_hdf(tmpfile, "df")
    tm.assert_frame_equal(result, df)
    _assert_leaf_filters(tmpfile, "/df", complevel=0, complib=None)

    # Check if not setting complib or complevel results in no compression
    df.to_hdf(tmpfile, key="df")
    result = read_hdf(tmpfile, "df")
    tm.assert_frame_equal(result, df)
    _assert_leaf_filters(tmpfile, "/df", complevel=0, complib=None)


def test_complibs_default_settings_override(tmp_path, setup_path):
    """Check that compression passed to ``append`` applies to that table only."""
    # Check if file-defaults can be overridden on a per table basis
    df = DataFrame(
        1.1 * np.arange(120).reshape((30, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=Index([f"i-{i}" for i in range(30)], dtype=object),
    )
    tmpfile = tmp_path / setup_path
    with HDFStore(tmpfile) as store:
        store.append("dfc", df, complevel=9, complib="blosc")
        store.append("df", df)

    _assert_leaf_filters(tmpfile, "/df", complevel=0, complib=None)
    _assert_leaf_filters(tmpfile, "/dfc", complevel=9, complib="blosc")


@pytest.mark.parametrize("lvl", range(10))
@pytest.mark.parametrize("lib", tables.filters.all_complibs)
@pytest.mark.filterwarnings("ignore:object name is not a valid")
@pytest.mark.skipif(
    not PY311 and is_ci_environment() and is_platform_linux(),
    # with xfail, would sometimes raise UnicodeDecodeError
    # invalid state byte
    reason="Segfaulting in a CI environment",
)
def test_complibs(tmp_path, lvl, lib, request):
    """Round-trip a frame for every compression library and level."""
    # GH14478
    if PY311 and is_platform_linux() and lib == "blosc2" and lvl != 0:
        request.applymarker(
            pytest.mark.xfail(reason=f"Fails for {lib} on Linux and PY > 3.11")
        )

    # Skip lzo / bzip2 only when that library is the one being exercised
    # and it is not available on this platform; every other compressor
    # must still be tested.
    if lib in ("lzo", "bzip2") and not tables.which_lib_version(lib):
        pytest.skip(f"{lib} not available")

    df = DataFrame(
        np.ones((30, 4)), columns=list("ABCD"), index=np.arange(30).astype(np.str_)
    )

    tmpfile = tmp_path / f"{lvl}_{lib}.h5"
    gname = f"{lvl}_{lib}"

    # Write and read file to see if data is consistent
    df.to_hdf(tmpfile, key=gname, complib=lib, complevel=lvl)
    result = read_hdf(tmpfile, gname)
    tm.assert_frame_equal(result, df)

    # Open file and check metadata for correct amount of compression;
    # at level 0 no compression library is expected to be recorded
    expected_lib = None if lvl == 0 else lib
    _assert_leaf_filters(tmpfile, "/" + gname, complevel=lvl, complib=expected_lib)


@pytest.mark.skipif(
    not is_platform_little_endian(), reason="reason platform is not little endian"
)
def test_encoding(setup_path):
    """Append with ``encoding="ascii"`` and select with an encoded Term."""
    with ensure_clean_store(setup_path) as store:
        df = DataFrame({"A": "foo", "B": "bar"}, index=range(5))
        df.loc[2, "A"] = np.nan
        df.loc[3, "B"] = np.nan
        _maybe_remove(store, "df")
        store.append("df", df, encoding="ascii")
        tm.assert_frame_equal(store["df"], df)

        expected = df.reindex(columns=["A"])
        result = store.select("df", Term("columns=A", encoding="ascii"))
        tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize(
    "val",
    [
        [b"E\xc9, 17", b"", b"a", b"b", b"c"],
        [b"E\xc9, 17", b"a", b"b", b"c"],
        [b"EE, 17", b"", b"a", b"b", b"c"],
        [b"E\xc9, 17", b"\xf8\xfc", b"a", b"b", b"c"],
        [b"", b"a", b"b", b"c"],
        [b"\xf8\xfc", b"a", b"b", b"c"],
        [b"A\xf8\xfc", b"", b"a", b"b", b"c"],
        [np.nan, b"", b"b", b"c"],
        [b"A\xf8\xfc", np.nan, b"", b"b", b"c"],
    ],
)
@pytest.mark.parametrize("dtype", ["category", object])
def test_latin_encoding(tmp_path, setup_path, dtype, val):
    """Round-trip latin-1 strings with ``nan_rep=""`` in table format."""
    enc = "latin-1"
    nan_rep = ""
    key = "data"

    val = [x.decode(enc) if isinstance(x, bytes) else x for x in val]
    ser = Series(val, dtype=dtype)

    store = tmp_path / setup_path
    ser.to_hdf(store, key=key, format="table", encoding=enc, nan_rep=nan_rep)
    retr = read_hdf(store, key)

    # TODO:(3.0): once Categorical replace deprecation is enforced,
    #  we may be able to re-simplify the construction of s_nan
    if dtype == "category":
        if nan_rep in ser.cat.categories:
            s_nan = ser.cat.remove_categories([nan_rep])
        else:
            s_nan = ser
    else:
        s_nan = ser.replace(nan_rep, np.nan)

    tm.assert_series_equal(s_nan, retr)


def test_multiple_open_close(tmp_path, setup_path):
    """
    Open and close stores on the same file several times.

    Also checks that operations on a closed store raise ClosedFileError.
    """
    # gh-4409: open & close multiple times
    path = tmp_path / setup_path

    df = DataFrame(
        1.1 * np.arange(120).reshape((30, 4)),
        columns=Index(list("ABCD"), dtype=object),
        index=Index([f"i-{i}" for i in range(30)], dtype=object),
    )
    df.to_hdf(path, key="df", mode="w", format="table")

    # single
    store = HDFStore(path)
    assert "CLOSED" not in store.info()
    assert store.is_open

    store.close()
    assert "CLOSED" in store.info()
    assert not store.is_open

    if pytables._table_file_open_policy_is_strict:
        # multiples
        store1 = HDFStore(path)
        # NOTE(review): confirm the whitespace after "opened\." matches the
        # message actually raised by the library.
        msg = (
            r"The file [\S]* is already opened\. Please close it before "
            r"reopening in write mode\."
        )
        with pytest.raises(ValueError, match=msg):
            HDFStore(path)

        store1.close()
    else:
        # multiples
        store1 = HDFStore(path)
        store2 = HDFStore(path)

        assert "CLOSED" not in store1.info()
        assert "CLOSED" not in store2.info()
        assert store1.is_open
        assert store2.is_open

        store1.close()
        assert "CLOSED" in store1.info()
        assert not store1.is_open
        assert "CLOSED" not in store2.info()
        assert store2.is_open

        store2.close()
        assert "CLOSED" in store1.info()
        assert "CLOSED" in store2.info()
        assert not store1.is_open
        assert not store2.is_open

        # nested close
        store = HDFStore(path, mode="w")
        store.append("df", df)

        store2 = HDFStore(path)
        store2.append("df2", df)
        store2.close()
        assert "CLOSED" in store2.info()
        assert not store2.is_open

        store.close()
        assert "CLOSED" in store.info()
        assert not store.is_open

        # double closing
        store = HDFStore(path, mode="w")
        store.append("df", df)

        store2 = HDFStore(path)
        store.close()
        assert "CLOSED" in store.info()
        assert not store.is_open

        store2.close()
        assert "CLOSED" in store2.info()
        assert not store2.is_open

    # ops on a closed store: rewrite the file so it holds "df" regardless
    # of which branch ran above
    df.to_hdf(path, key="df", mode="w", format="table")

    store = HDFStore(path)
    store.close()

    msg = r"[\S]* file is not open!"
    with pytest.raises(ClosedFileError, match=msg):
        store.keys()

    with pytest.raises(ClosedFileError, match=msg):
        # bare expression on purpose: only the raised error matters
        "df" in store

    with pytest.raises(ClosedFileError, match=msg):
        len(store)

    with pytest.raises(ClosedFileError, match=msg):
        store["df"]

    with pytest.raises(ClosedFileError, match=msg):
        store.select("df")

    with pytest.raises(ClosedFileError, match=msg):
        store.get("df")

    with pytest.raises(ClosedFileError, match=msg):
        store.append("df2", df)

    with pytest.raises(ClosedFileError, match=msg):
        store.put("df3", df)

    with pytest.raises(ClosedFileError, match=msg):
        store.get_storer("df2")

    with pytest.raises(ClosedFileError, match=msg):
        store.remove("df2")

    msg = "'HDFStore' object has no attribute 'df'"
    with pytest.raises(AttributeError, match=msg):
        store.df


def test_fspath():
    """Check that ``os.fspath`` of an open store equals the path it was given."""
    with tm.ensure_clean("foo.h5") as path:
        with HDFStore(path) as store:
            assert os.fspath(store) == str(path)