from hypothesis import ( given, strategies as st, ) import numpy as np import pytest import pytz from pandas._libs import lib from pandas._libs.tslibs import ( NaT, OutOfBoundsDatetime, Timedelta, Timestamp, iNaT, to_offset, ) from pandas._libs.tslibs.dtypes import NpyDatetimeUnit from pandas._libs.tslibs.period import INVALID_FREQ_ERR_MSG import pandas._testing as tm class TestTimestampRound: def test_round_division_by_zero_raises(self): ts = Timestamp("2016-01-01") msg = "Division by zero in rounding" with pytest.raises(ValueError, match=msg): ts.round("0ns") @pytest.mark.parametrize( "timestamp, freq, expected", [ ("20130101 09:10:11", "D", "20130101"), ("20130101 19:10:11", "D", "20130102"), ("20130201 12:00:00", "D", "20130202"), ("20130104 12:00:00", "D", "20130105"), ("2000-01-05 05:09:15.13", "D", "2000-01-05 00:00:00"), ("2000-01-05 05:09:15.13", "h", "2000-01-05 05:00:00"), ("2000-01-05 05:09:15.13", "s", "2000-01-05 05:09:15"), ], ) def test_round_frequencies(self, timestamp, freq, expected): dt = Timestamp(timestamp) result = dt.round(freq) expected = Timestamp(expected) assert result == expected def test_round_tzaware(self): dt = Timestamp("20130101 09:10:11", tz="US/Eastern") result = dt.round("D") expected = Timestamp("20130101", tz="US/Eastern") assert result == expected dt = Timestamp("20130101 09:10:11", tz="US/Eastern") result = dt.round("s") assert result == dt def test_round_30min(self): # round dt = Timestamp("20130104 12:32:00") result = dt.round("30Min") expected = Timestamp("20130104 12:30:00") assert result == expected def test_round_subsecond(self): # GH#14440 & GH#15578 result = Timestamp("2016-10-17 12:00:00.0015").round("ms") expected = Timestamp("2016-10-17 12:00:00.002000") assert result == expected result = Timestamp("2016-10-17 12:00:00.00149").round("ms") expected = Timestamp("2016-10-17 12:00:00.001000") assert result == expected ts = Timestamp("2016-10-17 12:00:00.0015") for freq in ["us", "ns"]: assert ts == ts.round(freq) result = Timestamp("2016-10-17 12:00:00.001501031").round("10ns") expected = Timestamp("2016-10-17 12:00:00.001501030") assert result == expected def test_round_nonstandard_freq(self): with tm.assert_produces_warning(False): Timestamp("2016-10-17 12:00:00.001501031").round("1010ns") def test_round_invalid_arg(self): stamp = Timestamp("2000-01-05 05:09:15.13") with pytest.raises(ValueError, match=INVALID_FREQ_ERR_MSG): stamp.round("foo") @pytest.mark.parametrize( "test_input, rounder, freq, expected", [ ("2117-01-01 00:00:45", "floor", "15s", "2117-01-01 00:00:45"), ("2117-01-01 00:00:45", "ceil", "15s", "2117-01-01 00:00:45"), ( "2117-01-01 00:00:45.000000012", "floor", "10ns", "2117-01-01 00:00:45.000000010", ), ( "1823-01-01 00:00:01.000000012", "ceil", "10ns", "1823-01-01 00:00:01.000000020", ), ("1823-01-01 00:00:01", "floor", "1s", "1823-01-01 00:00:01"), ("1823-01-01 00:00:01", "ceil", "1s", "1823-01-01 00:00:01"), ("NaT", "floor", "1s", "NaT"), ("NaT", "ceil", "1s", "NaT"), ], ) def test_ceil_floor_edge(self, test_input, rounder, freq, expected): dt = Timestamp(test_input) func = getattr(dt, rounder) result = func(freq) if dt is NaT: assert result is NaT else: expected = Timestamp(expected) assert result == expected @pytest.mark.parametrize( "test_input, freq, expected", [ ("2018-01-01 00:02:06", "2s", "2018-01-01 00:02:06"), ("2018-01-01 00:02:00", "2min", "2018-01-01 00:02:00"), ("2018-01-01 00:04:00", "4min", "2018-01-01 00:04:00"), ("2018-01-01 00:15:00", "15min", "2018-01-01 00:15:00"), ("2018-01-01 00:20:00", "20min", "2018-01-01 00:20:00"), ("2018-01-01 03:00:00", "3h", "2018-01-01 03:00:00"), ], ) @pytest.mark.parametrize("rounder", ["ceil", "floor", "round"]) def test_round_minute_freq(self, test_input, freq, expected, rounder): # Ensure timestamps that shouldn't round dont! # GH#21262 dt = Timestamp(test_input) expected = Timestamp(expected) func = getattr(dt, rounder) result = func(freq) assert result == expected @pytest.mark.parametrize("unit", ["ns", "us", "ms", "s"]) def test_ceil(self, unit): dt = Timestamp("20130101 09:10:11").as_unit(unit) result = dt.ceil("D") expected = Timestamp("20130102") assert result == expected assert result._creso == dt._creso @pytest.mark.parametrize("unit", ["ns", "us", "ms", "s"]) def test_floor(self, unit): dt = Timestamp("20130101 09:10:11").as_unit(unit) result = dt.floor("D") expected = Timestamp("20130101") assert result == expected assert result._creso == dt._creso @pytest.mark.parametrize("method", ["ceil", "round", "floor"]) @pytest.mark.parametrize( "unit", ["ns", "us", "ms", "s"], ) def test_round_dst_border_ambiguous(self, method, unit): # GH 18946 round near "fall back" DST ts = Timestamp("2017-10-29 00:00:00", tz="UTC").tz_convert("Europe/Madrid") ts = ts.as_unit(unit) # result = getattr(ts, method)("h", ambiguous=True) assert result == ts assert result._creso == getattr(NpyDatetimeUnit, f"NPY_FR_{unit}").value result = getattr(ts, method)("h", ambiguous=False) expected = Timestamp("2017-10-29 01:00:00", tz="UTC").tz_convert( "Europe/Madrid" ) assert result == expected assert result._creso == getattr(NpyDatetimeUnit, f"NPY_FR_{unit}").value result = getattr(ts, method)("h", ambiguous="NaT") assert result is NaT msg = "Cannot infer dst time" with pytest.raises(pytz.AmbiguousTimeError, match=msg): getattr(ts, method)("h", ambiguous="raise") @pytest.mark.parametrize( "method, ts_str, freq", [ ["ceil", "2018-03-11 01:59:00-0600", "5min"], ["round", "2018-03-11 01:59:00-0600", "5min"], ["floor", "2018-03-11 03:01:00-0500", "2h"], ], ) @pytest.mark.parametrize( "unit", ["ns", "us", "ms", "s"], ) def test_round_dst_border_nonexistent(self, method, ts_str, freq, unit): # GH 23324 round near "spring forward" DST ts = Timestamp(ts_str, tz="America/Chicago").as_unit(unit) result = getattr(ts, method)(freq, nonexistent="shift_forward") expected = Timestamp("2018-03-11 03:00:00", tz="America/Chicago") assert result == expected assert result._creso == getattr(NpyDatetimeUnit, f"NPY_FR_{unit}").value result = getattr(ts, method)(freq, nonexistent="NaT") assert result is NaT msg = "2018-03-11 02:00:00" with pytest.raises(pytz.NonExistentTimeError, match=msg): getattr(ts, method)(freq, nonexistent="raise") @pytest.mark.parametrize( "timestamp", [ "2018-01-01 0:0:0.124999360", "2018-01-01 0:0:0.125000367", "2018-01-01 0:0:0.125500", "2018-01-01 0:0:0.126500", "2018-01-01 12:00:00", "2019-01-01 12:00:00", ], ) @pytest.mark.parametrize( "freq", [ "2ns", "3ns", "4ns", "5ns", "6ns", "7ns", "250ns", "500ns", "750ns", "1us", "19us", "250us", "500us", "750us", "1s", "2s", "3s", "1D", ], ) def test_round_int64(self, timestamp, freq): # check that all rounding modes are accurate to int64 precision # see GH#22591 dt = Timestamp(timestamp).as_unit("ns") unit = to_offset(freq).nanos # test floor result = dt.floor(freq) assert result._value % unit == 0, f"floor not a {freq} multiple" assert 0 <= dt._value - result._value < unit, "floor error" # test ceil result = dt.ceil(freq) assert result._value % unit == 0, f"ceil not a {freq} multiple" assert 0 <= result._value - dt._value < unit, "ceil error" # test round result = dt.round(freq) assert result._value % unit == 0, f"round not a {freq} multiple" assert abs(result._value - dt._value) <= unit // 2, "round error" if unit % 2 == 0 and abs(result._value - dt._value) == unit // 2: # round half to even assert result._value // unit % 2 == 0, "round half to even error" def test_round_implementation_bounds(self): # See also: analogous test for Timedelta result = Timestamp.min.ceil("s") expected = Timestamp(1677, 9, 21, 0, 12, 44) assert result == expected result = Timestamp.max.floor("s") expected = Timestamp.max - Timedelta(854775807) assert result == expected msg = "Cannot round 1677-09-21 00:12:43.145224193 to freq=" with pytest.raises(OutOfBoundsDatetime, match=msg): Timestamp.min.floor("s") with pytest.raises(OutOfBoundsDatetime, match=msg): Timestamp.min.round("s") msg = "Cannot round 2262-04-11 23:47:16.854775807 to freq=" with pytest.raises(OutOfBoundsDatetime, match=msg): Timestamp.max.ceil("s") with pytest.raises(OutOfBoundsDatetime, match=msg): Timestamp.max.round("s") @given(val=st.integers(iNaT + 1, lib.i8max)) @pytest.mark.parametrize( "method", [Timestamp.round, Timestamp.floor, Timestamp.ceil] ) def test_round_sanity(self, val, method): cls = Timestamp err_cls = OutOfBoundsDatetime val = np.int64(val) ts = cls(val) def checker(ts, nanos, unit): # First check that we do raise in cases where we should if nanos == 1: pass else: div, mod = divmod(ts._value, nanos) diff = int(nanos - mod) lb = ts._value - mod assert lb <= ts._value # i.e. no overflows with python ints ub = ts._value + diff assert ub > ts._value # i.e. no overflows with python ints msg = "without overflow" if mod == 0: # We should never be raising in this pass elif method is cls.ceil: if ub > cls.max._value: with pytest.raises(err_cls, match=msg): method(ts, unit) return elif method is cls.floor: if lb < cls.min._value: with pytest.raises(err_cls, match=msg): method(ts, unit) return elif mod >= diff: if ub > cls.max._value: with pytest.raises(err_cls, match=msg): method(ts, unit) return elif lb < cls.min._value: with pytest.raises(err_cls, match=msg): method(ts, unit) return res = method(ts, unit) td = res - ts diff = abs(td._value) assert diff < nanos assert res._value % nanos == 0 if method is cls.round: assert diff <= nanos / 2 elif method is cls.floor: assert res <= ts elif method is cls.ceil: assert res >= ts nanos = 1 checker(ts, nanos, "ns") nanos = 1000 checker(ts, nanos, "us") nanos = 1_000_000 checker(ts, nanos, "ms") nanos = 1_000_000_000 checker(ts, nanos, "s") nanos = 60 * 1_000_000_000 checker(ts, nanos, "min") nanos = 60 * 60 * 1_000_000_000 checker(ts, nanos, "h") nanos = 24 * 60 * 60 * 1_000_000_000 checker(ts, nanos, "D")