Python源码示例:sqlalchemy.dialects.postgresql.ARRAY
示例1
def upgrade():
    """Add ``domains.allowed_docker_registries`` and backfill every row.

    The column is created nullable, all existing rows are populated, and
    only then is the NOT NULL constraint applied.

    The backfill value always starts with ``index.docker.io``; any
    comma-separated names found in the ``ALLOWED_DOCKER_REGISTRIES``
    environment variable are appended (spaces removed, empty entries
    skipped).
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('domains',
                  sa.Column('allowed_docker_registries',
                            postgresql.ARRAY(sa.String()), nullable=True))
    # ### end Alembic commands ###

    print('\nSet default allowed_docker_registries.')
    registries = ['index.docker.io']
    extra_registries = os.environ.get('ALLOWED_DOCKER_REGISTRIES', None)
    if extra_registries:
        # Skip empty entries ("a,,b", trailing commas, whitespace-only
        # values): they would otherwise yield a malformed array literal.
        registries.extend(
            name
            for name in extra_registries.replace(' ', '').split(',')
            if name
        )

    connection = op.get_bind()
    # Bind the list as a typed ARRAY parameter instead of formatting it into
    # the SQL text, so quotes or braces in the environment variable cannot
    # break (or inject into) the statement.
    query = sa.text(
        'UPDATE domains SET allowed_docker_registries = :registries'
    ).bindparams(
        sa.bindparam('registries', value=registries,
                     type_=postgresql.ARRAY(sa.String()))
    )
    connection.execute(query)

    op.alter_column('domains', column_name='allowed_docker_registries',
                    nullable=False)
示例2
def upgrade():
    """Add a non-null ``allowed_vfolder_hosts`` array to ``domains`` and ``groups``.

    Each column is created nullable, every row is set to the empty array
    literal, and the NOT NULL constraint is applied last.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    domains_column = sa.Column('allowed_vfolder_hosts',
                               postgresql.ARRAY(sa.String()), nullable=True)
    op.add_column('domains', domains_column)
    groups_column = sa.Column('allowed_vfolder_hosts',
                              postgresql.ARRAY(sa.String()), nullable=True)
    op.add_column('groups', groups_column)
    # ### end Alembic commands ###

    print("\nSet domain and group's allowed_vfolder_hosts with empty array.")
    bind = op.get_bind()
    bind.execute("UPDATE domains SET allowed_vfolder_hosts = '{}';")
    bind.execute("UPDATE groups SET allowed_vfolder_hosts = '{}';")

    # Rows are populated now, so the columns can become NOT NULL.
    for table_name in ('domains', 'groups'):
        op.alter_column(table_name, column_name='allowed_vfolder_hosts',
                        nullable=False)
示例3
def preprocess_column_and_value(self):
    """ Prepare the column and value expressions used by operators

    Some comparisons only work when both sides are cast to compatible
    types; the casts are applied here and the results are stored on
    `self.column_expression` and `self.value_expression`.
    """
    column_expr = self.column
    value_expr = self.value

    # Array column compared with an array value: cast the value to an
    # ARRAY[] of the column's item type so that Postgres can handle both.
    if self.is_column_array() and self.is_value_array():
        array_literal = pg.array(value_expr)
        array_type = pg.ARRAY(column_expr.type.item_type)
        value_expr = cast(array_literal, array_type)

    # JSON column: cast the column to the type SQLAlchemy suggests for the
    # value being compared (HACKY: relies on sqlalchemy type coercion).
    if self.is_column_json():
        suggested_type = column_expr.type.coerce_compared_value('=', value_expr)
        column_expr = cast(column_expr, suggested_type)

    self.column_expression = column_expr
    self.value_expression = value_expr
示例4
def languages_column(column_name) -> sa.Column:
    """Build a non-null ``TEXT[]`` column that must contain at least one item.

    :param column_name: the name of the column, interpolated into the
        CHECK constraint expression
    :returns: a SQLAlchemy Column of type ARRAY(TEXT, as_tuple=True),
        ``nullable=False``, defaulting to ``['English']``
    """
    array_type = pg.ARRAY(pg.TEXT, as_tuple=True)
    # COALESCE turns the NULL that ARRAY_LENGTH yields for an empty array
    # into 0, so empty arrays fail the check.
    non_empty_sql = 'COALESCE(ARRAY_LENGTH({}, 1), 0) > 0'.format(column_name)
    non_empty_check = sa.CheckConstraint(non_empty_sql)
    return sa.Column(
        array_type,
        non_empty_check,
        nullable=False,
        default=['English'],
    )
示例5
def test_arrays_pg(self, connection):
    """Round-trip one row through ``postgresql.ARRAY`` columns of numeric types."""
    meta = self.metadata
    element_types = (
        ("x", Float),
        ("y", REAL),
        ("z", postgresql.DOUBLE_PRECISION),
        ("q", Numeric),
    )
    array_columns = [
        Column(col_name, postgresql.ARRAY(item_type))
        for col_name, item_type in element_types
    ]
    table = Table("t", meta, *array_columns)
    meta.create_all()

    insert_stmt = table.insert()
    connection.execute(
        insert_stmt, x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")]
    )

    fetched = connection.execute(table.select()).first()
    expected = ([5], [5], [6], [decimal.Decimal("6.4")])
    eq_(fetched, expected)
示例6
def test_arrays_base(self, connection):
    """Round-trip one row through generic ``sqltypes.ARRAY`` columns of numeric types."""
    meta = self.metadata
    element_types = (
        ("x", Float),
        ("y", REAL),
        ("z", postgresql.DOUBLE_PRECISION),
        ("q", Numeric),
    )
    array_columns = [
        Column(col_name, sqltypes.ARRAY(item_type))
        for col_name, item_type in element_types
    ]
    table = Table("t", meta, *array_columns)
    meta.create_all()

    insert_stmt = table.insert()
    connection.execute(
        insert_stmt, x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")]
    )

    fetched = connection.execute(table.select()).first()
    expected = ([5], [5], [6], [decimal.Decimal("6.4")])
    eq_(fetched, expected)
示例7
def test_array_literal_getitem_multidim(self):
    """Check the SQL compiled for a 2-D array literal and for indexing into it."""
    first_row = postgresql.array([1, 2])
    second_row = postgresql.array([3, 4])
    nested = postgresql.array([first_row, second_row])

    # SQL of the bare literal; indexed forms wrap it in parentheses.
    literal_sql = (
        "ARRAY[ARRAY[%(param_1)s, %(param_2)s], "
        "ARRAY[%(param_3)s, %(param_4)s]]"
    )
    wrapped_sql = "(" + literal_sql + ")"

    self.assert_compile(nested, literal_sql)
    self.assert_compile(nested[1], wrapped_sql + "[%(param_5)s]")
    self.assert_compile(
        nested[1][0], wrapped_sql + "[%(param_5)s][%(param_6)s]"
    )
示例8
def test_array_getitem_slice_type(self):
    """Slicing an ARRAY column keeps the ARRAY affinity and the PG ARRAY type."""
    metadata = MetaData()
    arrtable = Table(
        "arrtable",
        metadata,
        Column("intarr", postgresql.ARRAY(Integer)),
        Column("strarr", postgresql.ARRAY(String)),
    )
    array_columns = (arrtable.c.intarr, arrtable.c.strarr)

    # type affinity is Array...
    for array_column in array_columns:
        is_(array_column[1:3].type._type_affinity, ARRAY)

    # but the slice returns the actual type
    for array_column in array_columns:
        assert isinstance(array_column[1:3].type, postgresql.ARRAY)
示例9
def test_array_plus_native_enum_create(self):
    """Creating/dropping a table with ARRAY-of-enum columns creates/drops the enums."""
    metadata = MetaData()
    labels = ("a", "b", "c")
    native_enum = postgresql.ENUM(*labels, name="my_enum_1")
    generic_enum = types.Enum(*labels, name="my_enum_2")
    table = Table(
        "t",
        metadata,
        Column("data_1", self.ARRAY(native_enum)),
        Column("data_2", self.ARRAY(generic_enum)),
    )

    table.create(testing.db)
    created_names = {
        enum_info["name"] for enum_info in inspect(testing.db).get_enums()
    }
    eq_(created_names, {"my_enum_1", "my_enum_2"})

    table.drop(testing.db)
    remaining_enums = inspect(testing.db).get_enums()
    eq_(remaining_enums, [])
示例10
def SQLARRAY(item_type):
    """Return a PostgreSQL ARRAY of `item_type` with a ``DefaultARRAY`` variant for sqlite."""
    pg_array = postgresql.ARRAY(item_type)
    sqlite_variant = DefaultARRAY(item_type)
    return pg_array.with_variant(sqlite_variant, "sqlite")
示例11
def _lookup_operator(self, column_is_array, operator):
    """ Find the implementation of an operator, falling back to extra operators

    :param column_is_array: Is the column an ARRAY column?
        When true, only array operators are searched; otherwise only
        scalar operators are.
    :param operator: Operator string
    :return: lambda
    :raises: KeyError
    """
    if column_is_array:
        found = self._operators_array.get(operator)
        # Missing (or falsy) entry: consult the extras, which raises KeyError
        return found if found else self._extra_array_ops[operator]

    found = self._operators_scalar.get(operator)
    return found if found else self._extra_scalar_ops[operator]
示例12
def is_column_array(self, name: str) -> bool:
    """ Tell whether the named column is an ARRAY column

    This implementation has no logic of its own and always raises.

    :param name: Column name
    :raises: NotImplementedError
    """
    raise NotImplementedError()
示例13
def _is_column_array(col: MapperProperty) -> bool:
    """ Tell whether the column's type is a PostgreSql ARRAY """
    column_type = _get_column_type(col)
    return isinstance(column_type, pg.ARRAY)
示例14
def build_trafaret(sa_type, **kwargs):
    """Return the trafaret validator matching a SQLAlchemy column type.

    Extra keyword arguments are forwarded to the trafaret constructor for
    the scalar types; the JSON and ARRAY branches do not use them.

    Raises NotImplementedError for any type that has no branch here.
    """
    sqltypes = sa.sql.sqltypes

    if isinstance(sa_type, sqltypes.Enum):
        return t.Enum(*sa_type.enums, **kwargs)

    # check for Text should be before String
    if isinstance(sa_type, sqltypes.Text):
        return t.String(**kwargs)

    if isinstance(sa_type, sqltypes.String):
        return t.String(max_length=sa_type.length, **kwargs)

    if isinstance(sa_type, sqltypes.Integer):
        return t.ToInt(**kwargs)

    if isinstance(sa_type, sqltypes.Float):
        return t.ToFloat(**kwargs)

    # Both DateTime and Date columns are validated with DateTime (RFC3339)
    if isinstance(sa_type, (sqltypes.DateTime, sqltypes.Date)):
        return DateTime(**kwargs)

    if isinstance(sa_type, sqltypes.Boolean):
        return t.ToBool(**kwargs)

    # PG related JSON: a dict or a list of dicts
    if isinstance(sa_type, postgresql.JSON):
        return AnyDict | t.List(AnyDict)

    # PG related ARRAY: a list validated by the item type's trafaret
    if isinstance(sa_type, postgresql.ARRAY):
        element_trafaret = build_trafaret(sa_type.item_type)
        return t.List(element_trafaret)

    type_name = str(sa_type)
    raise NotImplementedError(
        'Validator for type {} not implemented'.format(type_name)
    )
示例15
def table():
    """Build and return the ``post`` table on a fresh ``MetaData``.

    The table has scalar columns plus a PostgreSQL JSON column
    (``pictures``) and an integer ARRAY column (``tags``), with a primary
    key on ``id`` named ``post_id_pkey``.
    """
    meta = sa.MetaData()
    post = sa.Table(
        'post', meta,
        sa.Column('id', sa.Integer, nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('body', sa.Text, nullable=False),
        sa.Column('views', sa.Integer, nullable=False),
        sa.Column('average_note', sa.Float, nullable=False),
        sa.Column('pictures', postgresql.JSON, server_default='{}'),
        sa.Column('published_at', sa.Date, nullable=False),
        # '{}' is the PostgreSQL literal for an empty array; the previous
        # '[]' is JSON syntax and is rejected as a malformed array literal.
        sa.Column('tags', postgresql.ARRAY(sa.Integer), server_default='{}'),

        # Indexes #
        sa.PrimaryKeyConstraint('id', name='post_id_pkey'))
    return post
示例16
def _get_adapted_type(self, coltype, bind):
    """Adapt `coltype` towards a more generic class from its own MRO.

    Walks the MRO of the column type's class and adapts the type to each
    public class that defines ``__visit_name__``, keeping an adaptation
    only while it compiles to the same string as the original on
    ``bind.dialect`` (Float is exempt from that comparison). For ARRAY
    types the item type is adapted recursively.

    Returns the last accepted type, or `coltype` itself if none was.
    """
    original_rendering = coltype.compile(bind.dialect)

    for candidate in coltype.__class__.__mro__:
        if candidate.__name__.startswith("_") or not hasattr(candidate, "__visit_name__"):
            continue

        # Hack to fix adaptation of the Enum class which is broken since SQLAlchemy 1.2
        overrides = {}
        if candidate is Enum:
            overrides["name"] = coltype.name

        adapted = coltype.adapt(candidate)
        for attr_name, attr_value in overrides.items():
            setattr(adapted, attr_name, attr_value)

        # If the adapted column type does not render the same as the original,
        # don't substitute it. Float is the exception: at least on PostgreSQL
        # it can accurately represent both REAL and DOUBLE_PRECISION.
        renders_differently = adapted.compile(bind.dialect) != original_rendering
        if renders_differently and not isinstance(adapted, Float):
            break

        if isinstance(coltype, ARRAY):
            adapted.item_type = self._get_adapted_type(adapted.item_type, bind)

        # Stop on the first valid non-uppercase column type class
        coltype = adapted
        if candidate.__name__ != candidate.__name__.upper():
            break

    return coltype
示例17
def add_imports(self, collector):
    """Register with `collector` every import needed to render ``self.table``.

    Covers the column types (including ARRAY item types), ``text`` for
    server defaults, constraints, and multi-column indexes.
    """
    table = self.table

    if table.columns:
        collector.add_import(Column)

    for col in table.columns:
        col_type = col.type
        collector.add_import(col_type)
        if col.server_default:
            collector.add_literal_import("sqlalchemy", "text")
        if isinstance(col_type, ARRAY):
            collector.add_import(col_type.item_type.__class__)

    for constraint in sorted(table.constraints, key=_get_constraint_sort_key):
        if isinstance(constraint, ForeignKeyConstraint):
            # Multi-column foreign keys need the constraint class itself
            fk_import = (
                "ForeignKeyConstraint"
                if len(constraint.columns) > 1
                else "ForeignKey"
            )
            collector.add_literal_import("sqlalchemy", fk_import)
        elif isinstance(constraint, UniqueConstraint):
            if len(constraint.columns) > 1:
                collector.add_literal_import("sqlalchemy", "UniqueConstraint")
        elif not isinstance(constraint, PrimaryKeyConstraint):
            collector.add_import(constraint)

    for index in table.indexes:
        if len(index.columns) > 1:
            collector.add_import(index)
示例18
def test_compare_array_of_integer_text(self):
    """Default round-trip for ARRAY(Integer) with an empty-array text default."""
    column_type = ARRAY(Integer)
    default_expr = text("(ARRAY[]::integer[])")
    self._compare_default_roundtrip(column_type, default_expr)
示例19
def test_postgresql_array_type(self):
    """PostgreSQL ARRAY types render as ``postgresql.ARRAY(...)`` and add the dialect import."""
    cases = [
        (ARRAY(Integer), "postgresql.ARRAY(sa.Integer())"),
        (
            ARRAY(DateTime(timezone=True)),
            "postgresql.ARRAY(sa.DateTime(timezone=True))",
        ),
        (
            ARRAY(BYTEA, as_tuple=True, dimensions=2),
            "postgresql.ARRAY(postgresql.BYTEA(), "
            "as_tuple=True, dimensions=2)",
        ),
    ]
    for array_type, expected_repr in cases:
        rendered = autogenerate.render._repr_type(
            array_type, self.autogen_context
        )
        eq_ignore_whitespace(rendered, expected_repr)

    dialect_import = "from sqlalchemy.dialects import postgresql"
    assert dialect_import in self.autogen_context.imports
示例20
def test_generic_array_type(self):
    """Generic ARRAY renders as ``sa.ARRAY(...)``; the dialect import appears only for PG item types."""
    dialect_import = "from sqlalchemy.dialects import postgresql"

    generic_cases = [
        (types.ARRAY(Integer), "sa.ARRAY(sa.Integer())"),
        (
            types.ARRAY(DateTime(timezone=True)),
            "sa.ARRAY(sa.DateTime(timezone=True))",
        ),
    ]
    for array_type, expected_repr in generic_cases:
        rendered = autogenerate.render._repr_type(
            array_type, self.autogen_context
        )
        eq_ignore_whitespace(rendered, expected_repr)

    # Nothing rendered so far needed the postgresql dialect
    assert dialect_import not in self.autogen_context.imports

    bytea_array = types.ARRAY(BYTEA, as_tuple=True, dimensions=2)
    rendered = autogenerate.render._repr_type(
        bytea_array, self.autogen_context
    )
    eq_ignore_whitespace(
        rendered,
        "sa.ARRAY(postgresql.BYTEA(), as_tuple=True, dimensions=2)",
    )

    # The BYTEA item type pulls the dialect import in
    assert dialect_import in self.autogen_context.imports
示例21
def test_array_type_user_defined_inner(self):
    """A ``render_item`` hook for String is applied to the ARRAY's item type."""

    def repr_type(typestring, object_, autogen_context):
        # Returning False leaves rendering to the default implementation
        is_string_type = typestring == "type" and isinstance(object_, String)
        return "foobar.MYVARCHAR" if is_string_type else False

    self.autogen_context.opts.update(render_item=repr_type)

    rendered = autogenerate.render._repr_type(
        ARRAY(String), self.autogen_context
    )
    eq_ignore_whitespace(rendered, "postgresql.ARRAY(foobar.MYVARCHAR)")
示例22
def candidate_classes(cls) -> Column:
    """Return a non-nullable PostgreSQL ``ARRAY(String)`` column.

    NOTE(review): the previous docstring read "List of strings of each Key
    name."; presumably this stores candidate class names instead - confirm.
    """
    string_array = postgresql.ARRAY(String)
    return Column(string_array, nullable=False)
示例23
def keys(cls) -> Column:
    """Return the non-nullable ``ARRAY(String)`` column listing each Key name."""
    string_array = postgresql.ARRAY(String)
    return Column(string_array, nullable=False)
# Every annotation is with respect to a candidate
示例24
def upgrade():
    """Create the ``ictrp`` table.

    The primary key is composite: ``register`` + ``main_id``.
    """
    # Meta
    meta_columns = [
        sa.Column('meta_uuid', sa.Text),
        sa.Column('meta_source', sa.Text),
        sa.Column('meta_created', sa.DateTime(timezone=True)),
        sa.Column('meta_updated', sa.DateTime(timezone=True)),
    ]

    # Main
    main_columns = [
        sa.Column('register', sa.Text, primary_key=True),
        sa.Column('last_refreshed_on', sa.Date),
        sa.Column('main_id', sa.Text, primary_key=True),
        sa.Column('date_of_registration', sa.Text),
        sa.Column('primary_sponsor', sa.Text),
        sa.Column('public_title', sa.Text),
        sa.Column('scientific_title', sa.Text),
        sa.Column('date_of_first_enrollment', sa.Text),
        sa.Column('target_sample_size', sa.Integer),
        sa.Column('recruitment_status', sa.Text),
        sa.Column('url', sa.Text),
        sa.Column('study_type', sa.Text),
        sa.Column('study_design', sa.Text),
        sa.Column('study_phase', sa.Text),
    ]

    # Additional
    additional_columns = [
        sa.Column('countries_of_recruitment', ARRAY(sa.Text)),
        sa.Column('contacts', JSONB),
        sa.Column('key_inclusion_exclusion_criteria', sa.Text),
        sa.Column('health_conditions_or_problems_studied', ARRAY(sa.Text)),
        sa.Column('interventions', ARRAY(sa.Text)),
        sa.Column('primary_outcomes', ARRAY(sa.Text)),
        sa.Column('secondary_outcomes', ARRAY(sa.Text)),
        sa.Column('secondary_ids', ARRAY(sa.Text)),
        sa.Column('sources_of_monetary_support', ARRAY(sa.Text)),
        sa.Column('secondary_sponsors', ARRAY(sa.Text)),
    ]

    all_columns = meta_columns + main_columns + additional_columns
    op.create_table('ictrp', *all_columns)
示例25
def downgrade():
    """Create the ``trials`` table as part of reverting this migration."""
    leading_columns = [
        sa.Column('uuid', UUID, primary_key=True),
        sa.Column('updated', sa.DateTime(timezone=True), nullable=False),
        sa.Column('records', ARRAY(sa.Text), nullable=False, unique=True),
    ]
    # Nullable text columns, each with a unique constraint
    unique_text_columns = [
        sa.Column(column_name, sa.Text, unique=True)
        for column_name in (
            'nct_id', 'euctr_id', 'isrctn_id', 'scientific_title',
        )
    ]
    op.create_table('trials', *(leading_columns + unique_text_columns))
示例26
def upgrade():
    """Create the ``trials`` table."""
    leading_columns = [
        sa.Column('uuid', UUID, primary_key=True),
        sa.Column('updated', sa.DateTime(timezone=True), nullable=False),
        sa.Column('records', ARRAY(sa.Text), nullable=False, unique=True),
    ]
    # Nullable text columns, each with a unique constraint
    unique_text_columns = [
        sa.Column(column_name, sa.Text, unique=True)
        for column_name in (
            'nct_id', 'euctr_id', 'isrctn_id', 'scientific_title',
        )
    ]
    op.create_table('trials', *(leading_columns + unique_text_columns))
示例27
def __init__(self, field=None, **params):
    """Set up the array field and derive its ARRAY column type.

    :param field: field describing the array items; a ``Text()`` field
        is used when omitted
    :param params: forwarded unchanged to the parent initializer
    """
    super(Array, self).__init__(**params)
    item_field = Text() if field is None else field
    self.__field = item_field
    self.__column_type = ARRAY(item_field.column_type)
示例28
def upgrade():
    """Create the ``licenses`` table with its primary key and unique ``customer_id``."""
    columns = [
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('customer_id', sa.Text(), nullable=True),
        sa.Column('expiration_date', UTCDateTime(), nullable=True),
        sa.Column('license_edition', sa.String(length=255), nullable=True),
        sa.Column('trial', sa.Boolean(), nullable=False),
        sa.Column('cloudify_version', sa.Text(), nullable=True),
        sa.Column('capabilities', postgresql.ARRAY(sa.Text()), nullable=True),
        sa.Column('signature', sa.LargeBinary(), nullable=True),
    ]
    primary_key = sa.PrimaryKeyConstraint('id', name=op.f('licenses_pkey'))
    unique_customer = sa.UniqueConstraint(
        'customer_id', name=op.f('licenses_customer_id_key'))

    op.create_table('licenses', *columns, primary_key, unique_customer)
示例29
def _eq_filter(field: 'Column', value: 'Any') -> 'Any':
    """Build the ``field == value`` expression.

    For PostgreSQL ARRAY columns the value is first cast to the column's
    own type; for every other column it is compared as given.
    """
    if not isinstance(field.type, postgresql.ARRAY):
        return field == value
    return field == cast(value, field.type)
示例30
def test_should_postgresql_array_convert():
    """A PostgreSQL ARRAY(Integer) converts to a graphene List of Int."""
    array_type = postgresql.ARRAY(types.Integer)
    converted_type = get_field(array_type).type
    assert isinstance(converted_type, graphene.List)
    assert converted_type.of_type == graphene.Int