# BASE
>>> spark.sql("""
... CREATE OR REPLACE TEMP VIEW alltypes AS
... SELECT
...   CAST(v AS INT) AS int_col,
...   CAST(v % 3 AS BIGINT) AS bigint_col
... FROM (SELECT explode(sequence(0, 9)) AS v)
... """)
DataFrame[]

>>> spark.sql("""
... CREATE OR REPLACE TEMP VIEW t_base AS
... SELECT
...   int_col,
...   bigint_col,
...   CASE WHEN int_col IN (0, 9) THEN NULL ELSE int_col END AS by_col,
...   int_col * 10 AS val_col
... FROM alltypes
... """)
DataFrame[]

>>> spark.sql("SELECT * FROM t_base").show()
+-------+----------+------+-------+
|int_col|bigint_col|by_col|val_col|
+-------+----------+------+-------+
|      0|         0|  NULL|      0|
|      1|         1|     1|     10|
|      2|         2|     2|     20|
|      3|         0|     3|     30|
|      4|         1|     4|     40|
|      5|         2|     5|     50|
|      6|         0|     6|     60|
|      7|         1|     7|     70|
|      8|         2|     8|     80|
|      9|         0|  NULL|     90|
+-------+----------+------+-------+
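
# A minimal DataFrame-API sketch of the same fixture (both views combined),
# assuming only the stock pyspark.sql.functions import; the SQL views above
# stay the source of truth.
>>> from pyspark.sql import functions as F
>>> df = (spark.range(10).withColumnRenamed("id", "v")
...       .select(F.col("v").cast("int").alias("int_col"),
...               (F.col("v") % 3).cast("bigint").alias("bigint_col"))
...       .withColumn("by_col", F.when(F.col("int_col").isin(0, 9), None)
...                              .otherwise(F.col("int_col")))
...       .withColumn("val_col", F.col("int_col") * 10))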

# 1) argmin, unfiltered, non-null  -> expected: 10
>>> spark.sql("SELECT min_by(val_col, by_col) AS result FROM t_base").show()
+------+
|result|
+------+
|    10|
+------+
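
# (Sketch) The same aggregate through the DataFrame API; F.min_by is available
# in PySpark 3.3+.
>>> spark.table("t_base").agg(F.min_by("val_col", "by_col").alias("result")).show()
+------+
|result|
+------+
|    10|
+------+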

# 2) argmin, unfiltered, null_result=True  -> expected: NULL
>>> spark.sql("""
... SELECT min_by(val_col, by_col) AS result
... FROM (SELECT CASE WHEN val_col = 10 THEN NULL ELSE val_col END AS val_col, by_col FROM t_base)
... """).show()
+------+
|result|
+------+
|  NULL|
+------+
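
# Why test 2 yields NULL: min_by skips rows whose ordering key is NULL, but a
# NULL value column propagates. A minimal VALUES sketch of both behaviors:
>>> spark.sql("SELECT min_by(v, k) AS result FROM VALUES (NULL, 1), (10, NULL) AS t(v, k)").show()
+------+
|result|
+------+
|  NULL|
+------+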

# 3) argmin, filtered (excludes int_col=1), non-null  -> expected: 20
>>> spark.sql("""
... SELECT min_by(val_col, by_col) AS result
... FROM t_base
... WHERE int_col <> 1
... """).show()
+------+
|result|
+------+
|    20|
+------+

# 4) argmin, filtered (excludes int_col=1) + null_result=True -> expected: NULL
>>> spark.sql("""
... SELECT min_by(val_col, by_col) AS result
... FROM (SELECT CASE WHEN val_col = 20 THEN NULL ELSE val_col END AS val_col, by_col, int_col FROM t_base)
... WHERE int_col <> 1
... """).show()
+------+
|result|
+------+
|  NULL|
+------+
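
# (Sketch) The null-injection subqueries map to F.when in the DataFrame API;
# this mirrors test 4.
>>> (spark.table("t_base")
...  .where(F.col("int_col") != 1)
...  .withColumn("val_col",
...              F.when(F.col("val_col") == 20, None).otherwise(F.col("val_col")))
...  .agg(F.min_by("val_col", "by_col").alias("result"))
...  .show())
+------+
|result|
+------+
|  NULL|
+------+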

# 5) argmax, unfiltered, non-null  -> expected: 80
>>> spark.sql("SELECT max_by(val_col, by_col) AS result FROM t_base").show()
+------+
|result|
+------+
|    80|
+------+

# 6) argmax, unfiltered, null_result=True  -> expected: NULL
>>> spark.sql("""
... SELECT max_by(val_col, by_col) AS result
... FROM (SELECT CASE WHEN val_col = 80 THEN NULL ELSE val_col END AS val_col, by_col FROM t_base)
... """).show()
+------+
|result|
+------+
|  NULL|
+------+

# 7) argmax, filtered (excludes int_col=8), non-null  -> expected: 70
>>> spark.sql("""
... SELECT max_by(val_col, by_col) AS result
... FROM t_base
... WHERE int_col <> 8
... """).show()
+------+
|result|
+------+
|    70|
+------+

# 8) argmax, filtered (excludes int_col=8) + null_result=True -> expected: NULL
>>> spark.sql("""
... SELECT max_by(val_col, by_col) AS result
... FROM (SELECT CASE WHEN val_col = 70 THEN NULL ELSE val_col END AS val_col, by_col, int_col FROM t_base)
... WHERE int_col <> 8
... """).show()
+------+
|result|
+------+
|  NULL|
+------+
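
# A compact harness sketch replaying all eight cases; run_case and its keyword
# names are illustrative, not part of the suite above.
>>> def run_case(fn, null_out=None, exclude=None):
...     d = spark.table("t_base")
...     if exclude is not None:
...         d = d.where(F.col("int_col") != exclude)
...     if null_out is not None:
...         d = d.withColumn("val_col",
...                          F.when(F.col("val_col") == null_out, None)
...                           .otherwise(F.col("val_col")))
...     return d.agg(fn("val_col", "by_col").alias("result")).first()["result"]
>>> [run_case(F.min_by), run_case(F.min_by, null_out=10),
...  run_case(F.min_by, exclude=1), run_case(F.min_by, null_out=20, exclude=1)]
[10, None, 20, None]
>>> [run_case(F.max_by), run_case(F.max_by, null_out=80),
...  run_case(F.max_by, exclude=8), run_case(F.max_by, null_out=70, exclude=8)]
[80, None, 70, None]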
