die Auswahl einer PostgreSQL array unnesting funktioniert nicht wie erwartet mit SqlAlchemy
Ich habe eine SQL-Abfrage, die Duplikate einer Zeile (tatsächlich zu löschen und neue einzufügen) in einer Tabelle, während die Spaltung seiner ursprünglichen TSRANGE Feld gegeben durch ZEITSTEMPEL. Es funktioniert einwandfrei soweit ich es getestet. Unten ist eine demo, um zu verdeutlichen, was ich meine, und hier ist ein Turnschuh für die Bequemlichkeit:
-- demo initialisation
CREATE TABLE random_table (
uid VARCHAR(36) PRIMARY KEY,
id VARCHAR(20),
tsrange_field TSRANGE
);
CREATE EXTENSION pgcrypto; -- needed for `gen_random_uuid` function
INSERT INTO public.random_table (uid, id, tsrange_field)
VALUES (gen_random_uuid(), 'random_id', tsrange('2000-01-01', '2020-01-01', '[)'));
-- actual query
WITH splitters AS (
SELECT uid, datetime
FROM random_table
JOIN unnest(ARRAY['2015-04-15'::timestamp, '2016-04-15'::timestamp, '2017-01-01'::timestamp, '2017-04-15'::timestamp]) datetime
ON tsrange_field @> datetime
WHERE id = 'random_id'
AND (lower(random_table.tsrange_field) IS NULL OR lower(random_table.tsrange_field) != datetime)
AND (upper(random_table.tsrange_field) IS NULL OR upper(random_table.tsrange_field) != datetime)
), to_be_splitted AS (
DELETE FROM random_table
USING splitters
WHERE splitters.uid = random_table.uid
RETURNING random_table.uid, id, tsrange_field
)
INSERT INTO random_table (uid, id, tsrange_field)
SELECT DISTINCT ON (id, tsrange_field)
gen_random_uuid() AS uid, id,
unnest(ARRAY[
tsrange(
CASE
WHEN LAG(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime) IS NOT NULL
THEN LAG(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime)
ELSE lower(tsrange_field)
END,
splitters.datetime,
'[)'
),
tsrange(
splitters.datetime,
CASE
WHEN LEAD(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime) IS NOT NULL
THEN LEAD(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime)
ELSE upper(tsrange_field)
END,
'[)'
)
]) AS tsrange_field
FROM to_be_splitted JOIN splitters ON to_be_splitted.uid = splitters.uid
ORDER BY tsrange_field
RETURNING *;
Jetzt will ich es übersetzen in sqlalchemy
, und das ist, wo kommt mein problem. Ich produzierte Sie den folgenden code:
# pip install psycopg2, sqlalchemy
from datetime import datetime
from sqlalchemy import (and_, case, cast, column, Column, create_engine, delete, func,
insert, MetaData, or_, select, Table, VARCHAR)
from sqlalchemy.dialects.postgresql import array, TSRANGE, ARRAY
METADATA = MetaData()
RANDOM_TABLE = Table(
'random_table', METADATA,
Column('uid', VARCHAR(36), primary_key=True),
Column('id', VARCHAR(20)),
Column('tsrange_field', TSRANGE)
)
engine = create_engine('postgresql://test:test@localhost:5432/test')
def split_row(id, *datetimes):
# this function contains the translation attempt
splits = func.unnest([dt for dt in datetimes]).alias('datetime')
datetime_col = column('datetime')
splitters = (
select([RANDOM_TABLE.c.uid, datetime_col])
.select_from(RANDOM_TABLE.join(
splits,
onclause=RANDOM_TABLE.c.tsrange_field.op('@>')(datetime_col)
))
.where(and_(
RANDOM_TABLE.c.id == id,
or_(func.lower(RANDOM_TABLE.c.tsrange_field) == None,
func.lower(RANDOM_TABLE.c.tsrange_field).op('!=')(datetime_col)),
or_(func.upper(RANDOM_TABLE.c.tsrange_field) == None,
func.upper(RANDOM_TABLE.c.tsrange_field).op('!=')(datetime_col)),
))
).cte('splitters')
to_be_split = (
delete(RANDOM_TABLE)
.where(splitters.c.uid == RANDOM_TABLE.c.uid)
.returning(RANDOM_TABLE.c.uid, RANDOM_TABLE.c.tsrange_field)
).cte('to_be_split')
window_params = {'partition_by': column('uid'),
'order_by': datetime_col}
previous_splitter = func.lag(datetime_col).over(**window_params)
next_splitter = func.lead(datetime_col).over(**window_params)
lower_bound_case = case(
[(previous_splitter != None, previous_splitter)],
else_=func.lower(column('tsrange_field'))
)
upper_bound_case = case(
[(next_splitter != None, next_splitter)],
else_=func.upper(column('tsrange_field'))
)
split_tsranges = [
func.tsrange(lower_bound_case, datetime_col, '[)'),
func.tsrange(datetime_col, upper_bound_case, '[)')
]
split_query = select([
func.gen_random_uuid().label('uid'),
func.unnest(split_tsranges).alias('tsrange_field') # does not work
]).distinct(
column('tsrange_field')
).select_from(
to_be_split
.join(splitters, onclause=to_be_split.c.uid == splitters.c.uid)
).order_by(
column('tsrange_field')
)
whole_query = (
insert(RANDOM_TABLE)
.from_select([column('uid'), column('tsrange_field')], split_query)
.returning(column('uid'), column('tsrange_field'))
)
return whole_query
with engine.connect() as conn:
query = split_row('random_id', datetime.now())
import pdb; pdb.set_trace()
print(conn.execute(query).fetchall())
Nur leider schlägt es fehl, mit folgender Meldung:
Traceback (most recent call last):
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
cursor, statement, parameters, context
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
cursor.execute(statement, parameters)
psycopg2.ProgrammingError: can't adapt type 'Function'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "stuck.py", line 85, in <module>
print(conn.execute(query).fetchall())
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 988, in execute
return meth(self, multiparams, params)
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
distilled_params,
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
e, statement, parameters, cursor, context
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 383, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 128, in reraise
raise value.with_traceback(tb)
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
cursor, statement, parameters, context
File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'Function'
[SQL: WITH splitters AS
(SELECT random_table.uid AS uid, datetime
FROM random_table JOIN unnest(%(unnest_3)s) AS datetime ON random_table.tsrange_field @> datetime
WHERE random_table.id = %(id_1)s AND (lower(random_table.tsrange_field) IS NULL OR (lower(random_table.tsrange_field) != datetime)) AND (upper(random_table.tsrange_field) IS NULL OR (upper(random_table.tsrange_field) != datetime))),
to_be_split AS
(DELETE FROM random_table USING splitters WHERE splitters.uid = random_table.uid RETURNING random_table.uid, random_table.tsrange_field)
INSERT INTO random_table (uid, tsrange_field) SELECT DISTINCT ON (tsrange_field) gen_random_uuid() AS uid, tsrange_field.unnest_1
FROM unnest(%(unnest_2)s) AS tsrange_field, to_be_split JOIN splitters ON to_be_split.uid = splitters.uid ORDER BY tsrange_field RETURNING uid, tsrange_field]
[parameters: {'unnest_2': [<sqlalchemy.sql.functions.Function at 0x7fba18954a20; tsrange>, <sqlalchemy.sql.functions.Function at 0x7fba18954b00; tsrange>], 'unnest_3': [datetime.datetime(2019, 5, 3, 23, 37, 1, 773118)], 'id_1': 'random_id'}]
(Background on this error at: http://sqlalche.me/e/f405)
Blick auf das erzeugte SQL, bemerkte ich, dass die unnest(ARRAY[...]) AS tsrange_field
im SELECT
von meinem ursprünglichen SQL-Abfrage wird gemacht in die FROM
- Klausel und ich kann nicht herausfinden, warum. Zusätzlich wird die Nachricht psycopg2.ProgrammingError: can't adapt type 'Function'
nicht bietet eine große Hilfe und scheint nicht im Zusammenhang mit der unnest
misrendering.
Jede Andeutung auf das, was passiert und wie es zu beheben würde sehr geschätzt werden.