""" test feather-format compat """ import numpy as np import pytest import pandas as pd import pandas._testing as tm from pandas.core.arrays import ( ArrowStringArray, StringArray, ) from pandas.io.feather_format import read_feather, to_feather # isort:skip pytestmark = pytest.mark.filterwarnings( "ignore:Passing a BlockManager to DataFrame:DeprecationWarning" ) pa = pytest.importorskip("pyarrow") @pytest.mark.single_cpu class TestFeather: def check_error_on_write(self, df, exc, err_msg): # check that we are raising the exception # on writing with pytest.raises(exc, match=err_msg): with tm.ensure_clean() as path: to_feather(df, path) def check_external_error_on_write(self, df): # check that we are raising the exception # on writing with tm.external_error_raised(Exception): with tm.ensure_clean() as path: to_feather(df, path) def check_round_trip(self, df, expected=None, write_kwargs={}, **read_kwargs): if expected is None: expected = df.copy() with tm.ensure_clean() as path: to_feather(df, path, **write_kwargs) result = read_feather(path, **read_kwargs) tm.assert_frame_equal(result, expected) def test_error(self): msg = "feather only support IO with DataFrames" for obj in [ pd.Series([1, 2, 3]), 1, "foo", pd.Timestamp("20130101"), np.array([1, 2, 3]), ]: self.check_error_on_write(obj, ValueError, msg) def test_basic(self): df = pd.DataFrame( { "string": list("abc"), "int": list(range(1, 4)), "uint": np.arange(3, 6).astype("u1"), "float": np.arange(4.0, 7.0, dtype="float64"), "float_with_null": [1.0, np.nan, 3], "bool": [True, False, True], "bool_with_null": [True, np.nan, False], "cat": pd.Categorical(list("abc")), "dt": pd.DatetimeIndex( list(pd.date_range("20130101", periods=3)), freq=None ), "dttz": pd.DatetimeIndex( list(pd.date_range("20130101", periods=3, tz="US/Eastern")), freq=None, ), "dt_with_null": [ pd.Timestamp("20130101"), pd.NaT, pd.Timestamp("20130103"), ], "dtns": pd.DatetimeIndex( list(pd.date_range("20130101", periods=3, freq="ns")), freq=None ), } ) df["periods"] = pd.period_range("2013", freq="M", periods=3) df["timedeltas"] = pd.timedelta_range("1 day", periods=3) df["intervals"] = pd.interval_range(0, 3, 3) assert df.dttz.dtype.tz.zone == "US/Eastern" expected = df.copy() expected.loc[1, "bool_with_null"] = None self.check_round_trip(df, expected=expected) def test_duplicate_columns(self): # https://github.com/wesm/feather/issues/53 # not currently able to handle duplicate columns df = pd.DataFrame(np.arange(12).reshape(4, 3), columns=list("aaa")).copy() self.check_external_error_on_write(df) def test_read_columns(self): # GH 24025 df = pd.DataFrame( { "col1": list("abc"), "col2": list(range(1, 4)), "col3": list("xyz"), "col4": list(range(4, 7)), } ) columns = ["col1", "col3"] self.check_round_trip(df, expected=df[columns], columns=columns) def test_read_columns_different_order(self): # GH 33878 df = pd.DataFrame({"A": [1, 2], "B": ["x", "y"], "C": [True, False]}) expected = df[["B", "A"]] self.check_round_trip(df, expected, columns=["B", "A"]) def test_unsupported_other(self): # mixed python objects df = pd.DataFrame({"a": ["a", 1, 2.0]}) self.check_external_error_on_write(df) def test_rw_use_threads(self): df = pd.DataFrame({"A": np.arange(100000)}) self.check_round_trip(df, use_threads=True) self.check_round_trip(df, use_threads=False) def test_path_pathlib(self): df = pd.DataFrame( 1.1 * np.arange(120).reshape((30, 4)), columns=pd.Index(list("ABCD"), dtype=object), index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), ).reset_index() result = tm.round_trip_pathlib(df.to_feather, read_feather) tm.assert_frame_equal(df, result) def test_path_localpath(self): df = pd.DataFrame( 1.1 * np.arange(120).reshape((30, 4)), columns=pd.Index(list("ABCD"), dtype=object), index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), ).reset_index() result = tm.round_trip_localpath(df.to_feather, read_feather) tm.assert_frame_equal(df, result) def test_passthrough_keywords(self): df = pd.DataFrame( 1.1 * np.arange(120).reshape((30, 4)), columns=pd.Index(list("ABCD"), dtype=object), index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), ).reset_index() self.check_round_trip(df, write_kwargs={"version": 1}) @pytest.mark.network @pytest.mark.single_cpu def test_http_path(self, feather_file, httpserver): # GH 29055 expected = read_feather(feather_file) with open(feather_file, "rb") as f: httpserver.serve_content(content=f.read()) res = read_feather(httpserver.url) tm.assert_frame_equal(expected, res) def test_read_feather_dtype_backend(self, string_storage, dtype_backend): # GH#50765 df = pd.DataFrame( { "a": pd.Series([1, np.nan, 3], dtype="Int64"), "b": pd.Series([1, 2, 3], dtype="Int64"), "c": pd.Series([1.5, np.nan, 2.5], dtype="Float64"), "d": pd.Series([1.5, 2.0, 2.5], dtype="Float64"), "e": [True, False, None], "f": [True, False, True], "g": ["a", "b", "c"], "h": ["a", "b", None], } ) if string_storage == "python": string_array = StringArray(np.array(["a", "b", "c"], dtype=np.object_)) string_array_na = StringArray(np.array(["a", "b", pd.NA], dtype=np.object_)) elif dtype_backend == "pyarrow": from pandas.arrays import ArrowExtensionArray string_array = ArrowExtensionArray(pa.array(["a", "b", "c"])) string_array_na = ArrowExtensionArray(pa.array(["a", "b", None])) else: string_array = ArrowStringArray(pa.array(["a", "b", "c"])) string_array_na = ArrowStringArray(pa.array(["a", "b", None])) with tm.ensure_clean() as path: to_feather(df, path) with pd.option_context("mode.string_storage", string_storage): result = read_feather(path, dtype_backend=dtype_backend) expected = pd.DataFrame( { "a": pd.Series([1, np.nan, 3], dtype="Int64"), "b": pd.Series([1, 2, 3], dtype="Int64"), "c": pd.Series([1.5, np.nan, 2.5], dtype="Float64"), "d": pd.Series([1.5, 2.0, 2.5], dtype="Float64"), "e": pd.Series([True, False, pd.NA], dtype="boolean"), "f": pd.Series([True, False, True], dtype="boolean"), "g": string_array, "h": string_array_na, } ) if dtype_backend == "pyarrow": from pandas.arrays import ArrowExtensionArray expected = pd.DataFrame( { col: ArrowExtensionArray(pa.array(expected[col], from_pandas=True)) for col in expected.columns } ) tm.assert_frame_equal(result, expected) def test_int_columns_and_index(self): df = pd.DataFrame({"a": [1, 2, 3]}, index=pd.Index([3, 4, 5], name="test")) self.check_round_trip(df) def test_invalid_dtype_backend(self): msg = ( "dtype_backend numpy is invalid, only 'numpy_nullable' and " "'pyarrow' are allowed." ) df = pd.DataFrame({"int": list(range(1, 4))}) with tm.ensure_clean("tmp.feather") as path: df.to_feather(path) with pytest.raises(ValueError, match=msg): read_feather(path, dtype_backend="numpy") def test_string_inference(self, tmp_path): # GH#54431 path = tmp_path / "test_string_inference.p" df = pd.DataFrame(data={"a": ["x", "y"]}) df.to_feather(path) with pd.option_context("future.infer_string", True): result = read_feather(path) expected = pd.DataFrame(data={"a": ["x", "y"]}, dtype="string[pyarrow_numpy]") tm.assert_frame_equal(result, expected)