Impala

Feature requests are unlikely to be accepted for the Impala backend, due to the maintenance burden of the components involved.
Install
Install Ibis and dependencies for the Impala backend.
With pip, use the impala extra:
pip install 'ibis-framework[impala]'
With conda:
conda install -c conda-forge ibis-impala
With mamba:
mamba install -c conda-forge ibis-impala
Then connect:
import ibis
con = ibis.impala.connect()  # adjust connection parameters as needed
Database methods
create_database
create_database(self, name, /, *, catalog=None, force=False)
drop_database
drop_database(self, name, /, *, catalog=None, force=False)
list_databases
list_databases(self, *, like=None)
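A minimal sketch of how these methods fit together; the database name is illustrative, and the force flags simply suppress errors when the database already exists or is missing:
con.create_database("staging", force=True)   # no error if it already exists
print(con.list_databases(like="staging"))    # the pattern uses Python regex syntax
con.drop_database("staging", force=True)     # no error if it does not exist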
Table methods
The Backend object itself has many helper utility methods.
table
table(self, name, /, *, database=None)
Construct a table expression from the corresponding table in the backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| database | tuple[str, str] | str | None | The database, or (catalog, database) from which to get the table. For backends that support a single-level table hierarchy, you can pass in a string like "bar". For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). If not provided, the current database (and catalog, if applicable for this backend) is used. See the Table Hierarchy Concepts Guide for more info. | None |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
Examples
>>> import ibis
>>> backend = ibis.duckdb.connect()
Get the “foo” table from the current database (and catalog, if applicable for this backend):
>>> backend.table("foo")
Get the “foo” table from the “bar” database (in DuckDB’s language they would say the “bar” schema, in SQL this would be "bar"."foo"):
>>> backend.table("foo", database="bar")
Get the “foo” table from the “bar” database, within the “baz” catalog (in DuckDB’s language they would say the “bar” schema, and “baz” database, in SQL this would be "baz"."bar"."foo"):
>>> backend.table("foo", database=("baz", "bar"))
sql
sql(self, query, /, *, schema=None, dialect=None)
Create an Ibis table expression from a SQL query.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | A SQL query string | required |
| schema | SchemaLike | None | The schema of the query. If not provided, Ibis will try to infer the schema of the query. | None |
| dialect | str | None | The SQL dialect of the query. If not provided, the backend’s dialect is assumed. This argument can be useful when the query is written in a different dialect from the backend. | None |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The table expression representing the query |
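For example, you can hand Ibis a query written in another dialect and let it transpile to Impala SQL. A sketch, assuming the functional_alltypes table used elsewhere in this page:
# a query written in DuckDB's dialect, run against this backend
expr = con.sql(
    "SELECT string_col, count(*) AS n FROM functional_alltypes GROUP BY 1",
    dialect="duckdb",
)
df = expr.to_pandas()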
raw_sql
raw_sql(self, query)
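raw_sql executes a statement verbatim and hands back the underlying cursor, which is useful for DDL or maintenance commands that have no table-expression equivalent. A sketch; the statement is illustrative, and closing the cursor is a precaution if you do not otherwise consume it:
cur = con.raw_sql("REFRESH functional_alltypes")
cur.close()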
list_tables
list_tables(self, *, like=None, database=None)
The table names that match like in the given database.
For some backends, the tables may be files in a directory, or other equivalent entities in a SQL database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| like | str | None | A pattern in Python’s regex format. | None |
| database | tuple[str, str] | str | None | The database, or (catalog, database) from which to list tables. For backends that support a single-level table hierarchy, you can pass in a string like "bar". For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). If not provided, the current database (and catalog, if applicable for this backend) is used. See the Table Hierarchy Concepts Guide for more info. | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | The list of the table names that match the pattern like. |
Examples
This example uses the DuckDB backend, but the list_tables API works the same for other backends.
>>> import ibis
>>> con = ibis.duckdb.connect()
>>> foo = con.create_table("foo", schema=ibis.schema(dict(a="int")))
>>> con.list_tables()
['foo']
>>> bar = con.create_view("bar", foo)
>>> con.list_tables()
['bar', 'foo']
>>> con.create_database("my_database")
>>> con.list_tables(database="my_database")
[]
>>> con.raw_sql("CREATE TABLE my_database.baz (a INTEGER)")
<duckdb.duckdb.DuckDBPyConnection object at 0x...>
>>> con.list_tables(database="my_database")
['baz']
drop_table
drop_table(self, name, /, *, database=None, force=False)
Drop an Impala table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| database | str | None | Database name | None |
| force | bool | If True, do not raise an error if the table does not exist | False |
Examples
>>> table = "my_table"
>>> db = "operations"
>>> con.drop_table(table, database=db, force=True)  # quartodoc: +SKIP
insert
insert(self, name, /, obj=None, *, database=None, overwrite=False, partition=None, validate=True)
Insert into an Impala table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name |  | The table name | required |
| obj |  | Table expression or DataFrame | None |
| database |  | The table database | None |
| overwrite |  | If True, will replace existing contents of table | False |
| partition |  | For partitioned tables, indicate the partition that’s being inserted into, either with an ordered list of partition keys or a dict of partition field name to value. For example, for the partition (year=2007, month=7), this can be either (2007, 7) or {'year': 2007, 'month': 7}. | None |
| validate |  | If True, do more rigorous validation that the schema of the table being inserted is compatible with the existing table | True |
Examples
Append to an existing table
>>> con.insert(table_name, table_expr)  # quartodoc: +SKIP
Completely overwrite contents
>>> con.insert(table_name, table_expr, overwrite=True)  # quartodoc: +SKIP
truncate_table
truncate_table(self, name, /, *, database=None)
Delete all rows from an existing table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| database | str | None | Database name | None |
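For example (the table and database names are illustrative):
con.truncate_table("my_table", database="operations")  # removes all rows, keeps the schema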
get_schema
get_schema(self, table_name, *, catalog=None, database=None)
Return a Schema object for the indicated table and database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Table name | required |
| catalog | str | None | Catalog name. Unused in the impala backend. | None |
| database | str | None | Database name | None |
Returns
| Name | Type | Description |
|---|---|---|
| Schema | Ibis schema |
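A short sketch, assuming the ibis_testing database used in the other examples:
schema = con.get_schema("functional_alltypes", database="ibis_testing")
print(schema.names)  # column names
print(schema.types)  # corresponding Ibis data types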
cache_table
cache_table(self, table_name, *, database=None, pool='default')
Caches a table in cluster memory in the given pool.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name |  | Table name | required |
| database |  | Database name | None |
| pool |  | The name of the pool in which to cache the table | 'default' |
Examples
>>> table = "my_table"
>>> db = "operations"
>>> pool = "op_4GB_pool"
>>> con.cache_table("my_table", database=db, pool=pool)  # quartodoc: +SKIP
Creating views
drop_table_or_view
drop_table_or_view(self, name, /, *, database=None, force=False)
Drop view or table.
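This is convenient when you do not know whether a name refers to a table or a view. A sketch with illustrative names:
con.drop_table_or_view("string_freqs", database="ibis_testing", force=True)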
create_view
create_view(self, name, /, obj, *, database=None, overwrite=False)
Create a view from an Ibis expression.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the view to create. | required |
| obj | ir.Table | The Ibis expression to create the view from. | required |
| database | str | None | The database that the view should be created in. | None |
| overwrite | bool | If True, replace an existing view with the same name. | False |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | A table expression representing the view. |
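A minimal sketch, reusing the functional_alltypes table from the other examples; the view name is illustrative:
t = con.table("functional_alltypes")
# create (or replace) a view defined by a filtered expression
view = con.create_view("positive_ints", t.filter(t.int_col > 0), overwrite=True)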
Accessing data
delimited_file
delimited_file(self, directory, schema, name=None, database=None, delimiter=',', na_rep=None, escapechar=None, lineterminator=None, external=True)
Interpret delimited text files as an Ibis table expression.
See the parquet_file method for more details on what happens under the hood.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| directory |  | Server directory containing delimited text files | required |
| schema |  | Ibis schema | required |
| name |  | Name for the table; otherwise random names are generated | None |
| database |  | Database to create the table in | None |
| delimiter |  | Character used to delimit columns | ',' |
| na_rep |  | Character used to represent NULL values | None |
| escapechar |  | Character used to escape special characters | None |
| lineterminator |  | Character used to delimit lines | None |
| external |  | Create table as EXTERNAL (data will not be deleted on drop). | True |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
parquet_file
parquet_file(self, directory, schema=None, name=None, database=None, external=True, like_file=None, like_table=None)
Create an Ibis table from the passed directory of Parquet files.
The table can be optionally named, otherwise a unique name will be generated.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| directory | str | Path | Path | required |
| schema | sch.Schema | None | If no schema is provided, and neither of the like_* arguments is passed, one will be inferred from one of the parquet files in the directory. | None |
| like_file | str | Path | None | Absolute path to Parquet file on the server to use for schema definitions. An alternative to having to supply an explicit schema | None |
| like_table | str | None | Fully scoped and escaped string to an Impala table whose schema we will use for the newly created table. | None |
| name | str | None | Name for the table; a random unique name is generated otherwise | None |
| database | str | None | Database to create the (possibly temporary) table in | None |
| external | bool | If a table is external, the referenced data will not be deleted when the table is dropped in Impala. Otherwise Impala takes ownership of the Parquet file. | True |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
avro_file
avro_file(self, directory, avro_schema, name=None, database=None, external=True)
Create a table to read a collection of Avro data.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| directory |  | Server path to directory containing avro files | required |
| avro_schema |  | The Avro schema for the data as a Python dict | required |
| name |  | Table name | None |
| database |  | Database name | None |
| external |  | Whether the table is external | True |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
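A sketch of what the call looks like; the directory, schema, and field names are all illustrative:
avro_schema = {
    "type": "record",
    "name": "event",
    "fields": [
        {"name": "id", "type": ["null", "long"]},
        {"name": "payload", "type": ["null", "string"]},
    ],
}
events = con.avro_file(
    "/path/to/avro-events",  # server-side directory containing the .avro files
    avro_schema,
    name="events_avro",
    database="ibis_testing",
    external=True,
)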
The Impala client object
To use Ibis with Impala, you first must connect to a cluster using the ibis.impala.connect function:
import ibis
client = ibis.impala.connect(host=impala_host, port=impala_port)
By default, binary transport mode is used; it is also possible to use HTTP. Depending on your configuration, additional connection arguments may need to be provided. For the full list of possible connection arguments please refer to the impyla documentation.
import ibis
client = ibis.impala.connect(
host=impala_host,
port=impala_port,
username=username,
password=password,
use_ssl=True,
auth_mechanism='LDAP',
use_http_transport=True,
http_path='cliservice',
)
All examples here use the following block of code to connect to Impala using Docker:
import ibis
client = ibis.impala.connect(host=host)
You can accomplish many tasks directly through the client object.
Table objects
table
table(self, name, /, *, database=None)
Construct a table expression from the corresponding table in the backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| database | tuple[str, str] | str | None | The database, or (catalog, database) from which to get the table. For backends that support a single-level table hierarchy, you can pass in a string like "bar". For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). If not provided, the current database (and catalog, if applicable for this backend) is used. See the Table Hierarchy Concepts Guide for more info. | None |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
Examples
>>> import ibis
>>> backend = ibis.duckdb.connect()
Get the “foo” table from the current database (and catalog, if applicable for this backend):
>>> backend.table("foo")
Get the “foo” table from the “bar” database (in DuckDB’s language they would say the “bar” schema, in SQL this would be "bar"."foo"):
>>> backend.table("foo", database="bar")
Get the “foo” table from the “bar” database, within the “baz” catalog (in DuckDB’s language they would say the “bar” schema, and “baz” database, in SQL this would be "baz"."bar"."foo"):
>>> backend.table("foo", database=("baz", "bar"))
The client’s table method allows you to create an Ibis table expression referencing a physical Impala table:
table = client.table('functional_alltypes', database='ibis_testing')
Expression execution
Ibis expressions have execution methods like to_pandas that compile and run the expressions on Impala or whichever backend is being referenced.
For example:
>>> fa = client.table('functional_alltypes')
>>> expr = fa.double_col.sum()
>>> expr.to_pandas()
331785.00000000006
For longer-running queries, Ibis will attempt to cancel the query in progress if an interrupt is received.
Creating tables
There are several ways to create new Impala tables:
- From an Ibis table expression
- Empty, from a declared schema
- Empty and partitioned
In all cases, you should use the create_table method either on the top-level client connection or a database object.
create_table
create_table(self, name, /, obj=None, *, schema=None, database=None, temp=None, overwrite=False, external=False, format='parquet', location=None, partition=None, tbl_properties=None, like_parquet=None)
Create a new table using an Ibis table expression or in-memory data.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| obj | ir.Table | pd.DataFrame | pa.Table | pl.DataFrame | pl.LazyFrame | None | If passed, creates table from select statement results | None |
| schema | sch.SchemaLike | None | Mutually exclusive with obj, creates an empty table with a particular schema | None |
| database | str | None | Database name | None |
| temp | bool | None | Whether a table is temporary | None |
| overwrite | bool | If True, replace an existing table with the same name | False |
| external | bool | Create an external table; Impala will not delete the underlying data when the table is dropped | False |
| format |  | File format | 'parquet' |
| location |  | Specify the directory location where Impala reads and writes files for the table | None |
| partition |  | Must pass a schema to use this. Cannot partition from an expression. | None |
| tbl_properties | Mapping[str, Any] | None | Table properties to set on table creation. | None |
| like_parquet |  | Can specify instead of a schema | None |
Creating tables from a table expression
If you pass an Ibis expression to create_table, Ibis issues a CREATE TABLE ... AS SELECT (CTAS) statement:
>>> table = client.table('functional_alltypes')
>>> expr = table.group_by('string_col').size()
>>> client.create_table('string_freqs', expr, format='parquet')
>>> freqs = client.table('string_freqs')
>>> freqs.to_pandas()
string_col count
0 9 730
1 3 730
2 6 730
3 4 730
4 1 730
5 8 730
6 2 730
7 7 730
8 5 730
9 0 730
>>> files = freqs.files()
>>> files
Path Size Partition
0 hdfs://impala:8020/user/hive/warehouse/ibis_te... 584B
>>> freqs.drop()
You can also choose to create an empty table and use insert (see below).
Creating an empty table
To create an empty table, you must declare an Ibis schema that will be translated to the appropriate Impala schema and data types.
Ibis types are simplified compared with Impala types; schema declarations may become more fine-grained in the future.
You can use the create_table method on the client object.
schema = ibis.schema(dict(foo='string', year='int32', month='int16'))
name = 'new_table'
client.create_table(name, schema=schema)
By default, this stores the data files in the database default location. You can force a particular path with the location option.
from getpass import getuser
schema = ibis.schema(dict(foo='string', year='int32', month='int16'))
name = 'new_table'
location = '/home/{}/new-table-data'.format(getuser())
client.create_table(name, schema=schema, location=location)
If the schema matches a known table schema, you can always use the schema method to get a schema object:
>>> t = client.table('functional_alltypes')
>>> t.schema()
ibis.Schema {
id int32
bool_col boolean
tinyint_col int8
smallint_col int16
int_col int32
bigint_col int64
float_col float32
double_col float64
date_string_col string
string_col string
timestamp_col timestamp
year int32
month int32
}
Creating a partitioned table
To create an empty partitioned table, include a list of columns to be used as the partition keys.
schema = ibis.schema(dict(foo='string', year='int32', month='int16'))
name = 'new_table'
client.create_table(name, schema=schema, partition=['year', 'month'])
Partitioned tables
Ibis enables you to manage partitioned tables in various ways. Since each partition behaves as its own "subtable" sharing a common schema, each partition can have its own file format, directory path, serialization properties, and so forth.
There are a handful of methods for adding and removing partitions and getting information about the partition schema and any existing partition data:
add_partition
add_partition(self, table_name, spec, *, database=None, location=None)
Add a new table partition.
Partition parameters can be set in a single DDL statement or you can use alter_partition to set them after the fact.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The table name. | required |
| spec | dict[str, Any] | list | The partition keys for the partition being added. | required |
| database | str | None | The database name. If not provided, the current database is used. | None |
| location | str | None | Location of the partition | None |
drop_partition
drop_partition(self, table_name, spec, *, database=None)
Drop an existing table partition.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The table name. | required |
| spec | dict[str, Any] | list | The partition keys for the partition being dropped. | required |
| database | str | None | The database name. If not provided, the current database is used. | None |
get_partition_schema
get_partition_schema(self, table_name, database=None)
Return the schema for the partition columns.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Table name | required |
| database | str | None | Database name | None |
Returns
| Name | Type | Description |
|---|---|---|
| Schema | Ibis schema for the partition columns |
list_partitions
list_partitions(self, name, database=None)
To address a specific partition in any partition-specific method, you can either use a dict with the partition key names and values, or pass a list of the partition values:
schema = ibis.schema(dict(foo='string', year='int32', month='int16'))
name = 'new_table'
client.create_table(name, schema=schema, partition=['year', 'month'])
client.add_partition(name, {'year': 2007, 'month': 4})
client.add_partition(name, [2007, 5])
client.add_partition(name, [2007, 6])
client.drop_partition(name, [2007, 6])
We’ll cover partition metadata management and data loading below.
Inserting data into tables
If the schemas are compatible, you can insert into a table directly from an Ibis table expression:
>>> t = client.table('functional_alltypes')
>>> client.create_table('insert_test', schema=t.schema())
>>> client.insert('insert_test', t[:3])
>>> client.insert('insert_test', t[:3])
>>> client.insert('insert_test', t[:3])
>>> target = client.table('insert_test')
>>> target.to_pandas()
id bool_col tinyint_col ... timestamp_col year month
0 5770 True 0 ... 2010-08-01 00:00:00.000 2010 8
1 5771 False 1 ... 2010-08-01 00:01:00.000 2010 8
2 5772 True 2 ... 2010-08-01 00:02:00.100 2010 8
3 5770 True 0 ... 2010-08-01 00:00:00.000 2010 8
4 5771 False 1 ... 2010-08-01 00:01:00.000 2010 8
5 5772 True 2 ... 2010-08-01 00:02:00.100 2010 8
6 5770 True 0 ... 2010-08-01 00:00:00.000 2010 8
7 5771 False 1 ... 2010-08-01 00:01:00.000 2010 8
8 5772 True 2 ... 2010-08-01 00:02:00.100 2010 8
[9 rows x 13 columns]
If the table is partitioned, you must indicate the partition you are inserting into:
part = {'year': 2007, 'month': 4}
client.insert(table_name, expr, partition=part)
Managing table metadata
Ibis has functions that wrap many of the DDL commands for Impala table metadata.
Detailed table metadata: DESCRIBE FORMATTED
To get a handy wrangled version of DESCRIBE FORMATTED use the describe_formatted method.
describe_formatted
describe_formatted(self, name, database=None)
Retrieve the results of a DESCRIBE FORMATTED command.
See Impala documentation for more.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name. Can be fully qualified (with database) | required |
| database | str | None | Database name | None |
>>> meta = client.describe_formatted('functional_alltypes', database='ibis_testing')
>>> meta
<class 'ibis.backends.impala.metadata.TableMetadata'>
{'info': {'CreateTime': datetime.datetime(2021, 1, 14, 21, 23, 8),
'Database': 'ibis_testing',
'LastAccessTime': 'UNKNOWN',
'Location': 'hdfs://impala:8020/__ibis/ibis-testing-data/parquet/functional_alltypes',
'Owner': 'root',
'Protect Mode': 'None',
'Retention': 0,
'Table Parameters': {'COLUMN_STATS_ACCURATE': False,
'EXTERNAL': True,
'STATS_GENERATED_VIA_STATS_TASK': True,
'numFiles': 3,
'numRows': 7300,
'rawDataSize': '-1',
'totalSize': 106278,
'transient_lastDdlTime': datetime.datetime(2021, 1, 14, 21, 23, 17)},
'Table Type': 'EXTERNAL_TABLE'},
'schema': [('id', 'int'),
('bool_col', 'boolean'),
('tinyint_col', 'tinyint'),
('smallint_col', 'smallint'),
('int_col', 'int'),
('bigint_col', 'bigint'),
('float_col', 'float'),
('double_col', 'double'),
('date_string_col', 'string'),
('string_col', 'string'),
('timestamp_col', 'timestamp'),
('year', 'int'),
('month', 'int')],
'storage info': {'Bucket Columns': '[]',
'Compressed': False,
'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
'Num Buckets': 0,
'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
'SerDe Library': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe',
'Sort Columns': '[]'}}
>>> meta.location
'hdfs://impala:8020/__ibis/ibis-testing-data/parquet/functional_alltypes'
>>> meta.create_time
datetime.datetime(2021, 1, 14, 21, 23, 8)
The show_files function is also available to see all of the physical HDFS data files backing a table:
show_files
show_files(self, name, database=None)
Retrieve results of a SHOW FILES command for a table.
See Impala documentation for more.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name. Can be fully qualified (with database) | required |
| database | str | None | Database name | None |
>>> client.show_files('store_sales', database='tpcds_parquet')[:5]
path size \
0 hdfs://localhost:20500/test-warehouse/tpcds.st... 160.61KB
1 hdfs://localhost:20500/test-warehouse/tpcds.st... 123.88KB
2 hdfs://localhost:20500/test-warehouse/tpcds.st... 139.28KB
3 hdfs://localhost:20500/test-warehouse/tpcds.st... 139.60KB
4 hdfs://localhost:20500/test-warehouse/tpcds.st... 62.84KB
partition
0 ss_sold_date_sk=2451803
1 ss_sold_date_sk=2451819
2 ss_sold_date_sk=2451772
3 ss_sold_date_sk=2451789
4 ss_sold_date_sk=2451741
Modifying table metadata
alter_partition
alter_partition(self, table_name, spec, *, database=None, location=None, format=None, tbl_properties=None, serde_properties=None)
Change settings and parameters of an existing partition.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The table name | required |
| spec | dict[str, Any] | list | The partition keys for the partition being modified | required |
| database | str | None | The database name. If not provided, the current database is used. | None |
| location | str | None | Location of the partition | None |
| format | str | None | Table format | None |
| tbl_properties | dict | None | Table properties | None |
| serde_properties | dict | None | Serialization/deserialization properties | None |
If a table is partitioned, you can modify the properties of a particular partition:
client.alter_partition(
'table_name',
{'year': 2007, 'month': 5},
location=data_dir,
format='text',
serde_properties=csv_props
)
Table statistics
Computing table and partition statistics
compute_stats
compute_stats(self, name, database=None, incremental=False)
Issue a COMPUTE STATS command for a given table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Can be fully qualified (with database name) | required |
| database | str | None | Database name | None |
| incremental | bool | If True, issue COMPUTE INCREMENTAL STATS | False |
The compute_stats method computes table, column, and partition-level statistics to assist with query planning and optimization. It is standard practice to invoke it after creating a table or loading new data:
client.compute_stats('table_name')
If you are using a recent version of Impala, you can also access the COMPUTE INCREMENTAL STATS DDL command:
client.compute_stats('table_name', incremental=True)
Seeing table and column statistics
column_stats
column_stats(self, name, database=None)
Return results of SHOW COLUMN STATS for the table name.
table_stats
table_stats(self, name, database=None)
Return results of SHOW TABLE STATS for the table name.
The column_stats and table_stats functions return the results of SHOW COLUMN STATS and SHOW TABLE STATS, respectively, and their output will depend, of course, on the last COMPUTE STATS call.
>>> client.compute_stats('store_sales', database='tpcds_parquet', incremental=True)
>>> stats = client.table_stats('store_sales', database='tpcds_parquet')
>>> stats[:5]
ss_sold_date_sk #Rows #Files Size Bytes Cached Cache Replication \
0 2450829 1071 1 78.34KB NOT CACHED NOT CACHED
1 2450846 839 1 61.83KB NOT CACHED NOT CACHED
2 2450860 747 1 54.86KB NOT CACHED NOT CACHED
3 2450874 922 1 66.74KB NOT CACHED NOT CACHED
4 2450888 856 1 63.33KB NOT CACHED NOT CACHED
Format Incremental stats \
0 PARQUET true
1 PARQUET true
2 PARQUET true
3 PARQUET true
4 PARQUET true
Location
0 hdfs://localhost:20500/test-warehouse/tpcds.st...
1 hdfs://localhost:20500/test-warehouse/tpcds.st...
2 hdfs://localhost:20500/test-warehouse/tpcds.st...
3 hdfs://localhost:20500/test-warehouse/tpcds.st...
4 hdfs://localhost:20500/test-warehouse/tpcds.st...
>>> cstats = client.column_stats('store_sales', database='tpcds_parquet')
>>> cstats
Column Type #Distinct Values #Nulls Max Size Avg Size
0 ss_sold_time_sk BIGINT 13879 -1 NaN 8
1 ss_item_sk BIGINT 17925 -1 NaN 8
2 ss_customer_sk BIGINT 15207 -1 NaN 8
3 ss_cdemo_sk BIGINT 16968 -1 NaN 8
4 ss_hdemo_sk BIGINT 6220 -1 NaN 8
5 ss_addr_sk BIGINT 14077 -1 NaN 8
6 ss_store_sk BIGINT 6 -1 NaN 8
7 ss_promo_sk BIGINT 298 -1 NaN 8
8 ss_ticket_number INT 15006 -1 NaN 4
9 ss_quantity INT 99 -1 NaN 4
10 ss_wholesale_cost DECIMAL(7,2) 10196 -1 NaN 4
11 ss_list_price DECIMAL(7,2) 19393 -1 NaN 4
12 ss_sales_price DECIMAL(7,2) 15594 -1 NaN 4
13 ss_ext_discount_amt DECIMAL(7,2) 29772 -1 NaN 4
14 ss_ext_sales_price DECIMAL(7,2) 102758 -1 NaN 4
15 ss_ext_wholesale_cost DECIMAL(7,2) 125448 -1 NaN 4
16 ss_ext_list_price DECIMAL(7,2) 141419 -1 NaN 4
17 ss_ext_tax DECIMAL(7,2) 33837 -1 NaN 4
18 ss_coupon_amt DECIMAL(7,2) 29772 -1 NaN 4
19 ss_net_paid DECIMAL(7,2) 109981 -1 NaN 4
20 ss_net_paid_inc_tax DECIMAL(7,2) 132286 -1 NaN 4
21 ss_net_profit DECIMAL(7,2) 122436 -1 NaN 4
22 ss_sold_date_sk BIGINT 120 0 NaN 8
REFRESH and INVALIDATE METADATA
These DDL commands are available as client-level methods:
invalidate_metadata
invalidate_metadata(self, name=None, database=None)
Issue an INVALIDATE METADATA command.
Optionally this applies to a specific table. See Impala documentation.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | None | Table name. Can be fully qualified (with database) | None |
| database | str | None | Database name | None |
refresh
refresh(self, name, database=None)
Reload metadata for a table.
This can be useful after ingesting data as part of an ETL pipeline, for example.
Related to INVALIDATE METADATA. See Impala documentation for more.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name. Can be fully qualified (with database) | required |
| database | str | None | Database name | None |
You can invalidate the cached metadata for a single table or for all tables using invalidate_metadata, and similarly invoke REFRESH db_name.table_name using the refresh method.
client.invalidate_metadata()
client.invalidate_metadata(table_name)
client.refresh(table_name)
These methods are often used in conjunction with the LOAD DATA commands and COMPUTE STATS. See the Impala documentation for full details.
Parquet and other session options
Ibis gives you access to Impala session-level variables that affect query execution:
get_options
get_options(self)
Return current query options for the Impala session.
set_options
set_options(self, options)
set_compression_codec
set_compression_codec(self, codec)
For example:
>>> client.get_options()
{'ABORT_ON_ERROR': '0',
'APPX_COUNT_DISTINCT': '0',
'BUFFER_POOL_LIMIT': '',
'COMPRESSION_CODEC': '',
'COMPUTE_STATS_MIN_SAMPLE_SIZE': '1073741824',
'DEFAULT_JOIN_DISTRIBUTION_MODE': '0',
'DEFAULT_SPILLABLE_BUFFER_SIZE': '2097152',
'DISABLE_ROW_RUNTIME_FILTERING': '0',
'DISABLE_STREAMING_PREAGGREGATIONS': '0',
'DISABLE_UNSAFE_SPILLS': '0',
'ENABLE_EXPR_REWRITES': '1',
'EXEC_SINGLE_NODE_ROWS_THRESHOLD': '100',
'EXEC_TIME_LIMIT_S': '0',
'EXPLAIN_LEVEL': '1',
'HBASE_CACHE_BLOCKS': '0',
'HBASE_CACHING': '0',
'IDLE_SESSION_TIMEOUT': '0',
'MAX_ERRORS': '100',
'MAX_NUM_RUNTIME_FILTERS': '10',
'MAX_ROW_SIZE': '524288',
'MEM_LIMIT': '0',
'MIN_SPILLABLE_BUFFER_SIZE': '65536',
'MT_DOP': '',
'NUM_SCANNER_THREADS': '0',
'OPTIMIZE_PARTITION_KEY_SCANS': '0',
'PARQUET_ANNOTATE_STRINGS_UTF8': '0',
'PARQUET_ARRAY_RESOLUTION': '2',
'PARQUET_DICTIONARY_FILTERING': '1',
'PARQUET_FALLBACK_SCHEMA_RESOLUTION': '0',
'PARQUET_FILE_SIZE': '0',
'PARQUET_READ_STATISTICS': '1',
'PREFETCH_MODE': '1',
'QUERY_TIMEOUT_S': '0',
'REPLICA_PREFERENCE': '0',
'REQUEST_POOL': '',
'RUNTIME_BLOOM_FILTER_SIZE': '1048576',
'RUNTIME_FILTER_MAX_SIZE': '16777216',
'RUNTIME_FILTER_MIN_SIZE': '1048576',
'RUNTIME_FILTER_MODE': '2',
'RUNTIME_FILTER_WAIT_TIME_MS': '0',
'S3_SKIP_INSERT_STAGING': '1',
'SCHEDULE_RANDOM_REPLICA': '0',
'SCRATCH_LIMIT': '-1',
'SEQ_COMPRESSION_MODE': '',
'SYNC_DDL': '0'}
To enable Snappy compression for Parquet files, you could do either of:
>>> client.set_options({'COMPRESSION_CODEC': 'snappy'})
>>> client.set_compression_codec('snappy')
>>> client.get_options()['COMPRESSION_CODEC']
'SNAPPY'
Ingesting data from pandas
Overall interoperability between the Hadoop / Spark ecosystems and pandas / the PyData stack is poor, but it will improve in time (this is a major part of the Ibis roadmap).
Ibis’s Impala tools currently interoperate with pandas in these ways:
- Ibis expressions return pandas objects (i.e. DataFrame or Series) for non-scalar expressions when calling their to_pandas method
- The create_table and insert methods can accept pandas objects. This includes inserting into partitioned tables. It currently uses CSV as the ingest route.
For example:
>>> import pandas as pd
>>> data = pd.DataFrame({'foo': [1, 2, 3, 4], 'bar': ['a', 'b', 'c', 'd']})
>>> t = client.create_table('pandas_table', data)
>>> t.to_pandas()
bar foo
0 a 1
1 b 2
2 c 3
3 d 4
>>> to_insert = client.create_table('empty_for_insert', schema=t.schema())
>>> client.insert('empty_for_insert', data)
>>> to_insert.to_pandas()
bar foo
0 a 1
1 b 2
2 c 3
3 d 4
Queries on Parquet, Avro, and Delimited files
Ibis can easily create temporary or persistent Impala tables that reference data in the following formats:
- Parquet (parquet_file)
- Avro (avro_file)
- Delimited text formats (CSV, TSV, etc.) (delimited_file)
Parquet is the easiest because the schema can be read from the data files:
>>> path = '/__ibis/ibis-testing-data/parquet/tpch_lineitem'
>>> lineitem = con.parquet_file(path)
>>> lineitem.limit(2)
l_orderkey l_partkey l_suppkey l_linenumber l_quantity l_extendedprice \
0 1 155190 7706 1 17.00 21168.23
1 1 67310 7311 2 36.00 45983.16
l_discount l_tax l_returnflag l_linestatus l_shipdate l_commitdate \
0 0.04 0.02 N O 1996-03-13 1996-02-12
1 0.09 0.06 N O 1996-04-12 1996-02-28
l_receiptdate l_shipinstruct l_shipmode \
0 1996-03-22 DELIVER IN PERSON TRUCK
1 1996-04-20 TAKE BACK RETURN MAIL
l_comment
0 egular courts above the
1 ly final dependencies: slyly bold
>>> lineitem.l_extendedprice.sum()
Decimal('229577310901.20')
If you want to query a Parquet file and also create a table in Impala that remains after your session, you can pass more information to parquet_file:
>>> table = con.parquet_file(path, name='my_parquet_table',
... database='ibis_testing',
... persist=True)
>>> table.l_extendedprice.sum()
Decimal('229577310901.20')
>>> con.table('my_parquet_table').l_extendedprice.sum()
Decimal('229577310901.20')
>>> con.drop_table('my_parquet_table')
To query delimited files, you need to write down an Ibis schema.
>>> schema = ibis.schema(dict(foo='string', bar='double', baz='int32'))
>>> table = con.delimited_file('/__ibis/ibis-testing-data/csv', schema)
>>> table.limit(10)
foo bar baz
0 63IEbRheTh 0.679389 6
1 mG4hlqnjeG 2.807106 15
2 JTPdX9SZH5 -0.155126 55
3 2jcl6FypOl 1.037878 21
4 k3TbJLaadQ -1.401908 23
5 rP5J4xvinM -0.442093 22
6 WniUylixYt -0.863748 27
7 znsDuKOB1n -0.566030 47
8 4SRP9jlo1M 0.331460 88
9 KsfjPyDf5e -0.578931 70
>>> table.bar.summary()
count nulls min max sum mean approx_nunique
0 100 0 -1.401908 2.807106 8.479978 0.0848 10
For functions like parquet_file and delimited_file, a directory must be passed and the directory must contain files all having the same schema.
Other helper functions for interacting with the database
The client object has a growing list of utility functions for interacting with an Impala cluster. The idea is that you should be able to do any database-admin-type work with Ibis without switching over to the Impala SQL shell. If there are ways we can make this more pleasant, please let us know.
Here are some of the features, with examples below:
- Listing and searching for available databases and tables
- Creating and dropping databases
- Getting table schemas
>>> con.list_databases(like='ibis*')
['ibis_testing', 'ibis_testing_tmp_db']
>>> con.list_tables(database='ibis_testing', like='tpch*')
['tpch_customer',
'tpch_lineitem',
'tpch_nation',
'tpch_orders',
'tpch_part',
'tpch_partsupp',
'tpch_region',
'tpch_region_avro',
'tpch_supplier']
>>> schema = con.get_schema('functional_alltypes')
>>> schema
ibis.Schema {
id int32
bool_col boolean
tinyint_col int8
smallint_col int16
int_col int32
bigint_col int64
float_col float32
double_col float64
date_string_col string
string_col string
timestamp_col timestamp
year int32
month int32
}
Databases can be created, too, and you can set the storage path in HDFS you want for the data files:
>>> db = 'ibis_testing2'
>>> con.create_database(db, force=True)
>>> con.create_table('example_table', con.table('functional_alltypes'),
...                  database=db, overwrite=True)
To drop a database, including all tables in it, you can use drop_database with force=True:
>>> con.drop_database(db, force=True)
User Defined functions (UDF)
Impala currently supports user-defined scalar functions (known henceforth as UDFs) and aggregate functions (respectively UDAs) via a C++ extension API.
Initial support for using C++ UDFs in Ibis came in version 0.4.0.
Using scalar functions (UDFs)
Let’s take an example to illustrate how to make a C++ UDF available to Ibis. Here is a function that computes an approximate equality between floating point values:
#include "impala_udf/udf.h"
#include <cctype>
#include <cmath>
BooleanVal FuzzyEquals(FunctionContext* ctx, const DoubleVal& x, const DoubleVal& y) {
const double EPSILON = 0.000001f;
if (x.is_null || y.is_null) return BooleanVal::null();
double delta = fabs(x.val - y.val);
return BooleanVal(delta < EPSILON);
}
You can compile this to either a shared library (a .so file) or to LLVM bitcode with clang (a .ll file). Skipping that step for now (will add some more detailed instructions here later, promise).
To make this function callable, we use ibis.impala.wrap_udf:
library = '/ibis/udfs/udftest.ll'
inputs = ['double', 'double']
output = 'boolean'
symbol = 'FuzzyEquals'
udf_db = 'ibis_testing'
udf_name = 'fuzzy_equals'
fuzzy_equals = ibis.impala.wrap_udf(
library, inputs, output, symbol, name=udf_name
)
In typical workflows, you set up a UDF in Impala once and then use it from then on. So the first time you do this, you need to create the UDF in Impala:
client.create_function(fuzzy_equals, database=udf_db)
Now, we must register this function as a new Impala operation in Ibis. This must take place each time you load your Ibis session.
fuzzy_equals.register(fuzzy_equals.name, udf_db)
The object fuzzy_equals is callable and works with Ibis expressions:
>>> t = con.tables.functional_alltypes
>>> expr = fuzzy_equals(t.float_col, t.double_col / 10)
>>> expr.to_pandas()[:10]
0 True
1 False
2 False
3 False
4 False
5 False
6 False
7 False
8 False
9 False
Name: tmp, dtype: bool
Note that the call to register on the UDF object must happen each time you use Ibis. If you have a lot of UDFs, we suggest you create a file with all of your wrapper declarations and user APIs that you load with your Ibis session to plug in all your own functions.
Working with secure clusters (Kerberos)
Ibis is compatible with Hadoop clusters that are secured with Kerberos (as well as SSL and LDAP). Note that to enable this support, you’ll also need to install the kerberos package.
$ pip install kerberos
Just like the Impala shell and ODBC/JDBC connectors, Ibis connects to Impala through the HiveServer2 interface (using the impyla client). Therefore, the connection semantics are similar to the other access methods for working with secure clusters.
Specifically, after authenticating yourself against Kerberos (e.g., by issuing the appropriate kinit command), pass auth_mechanism='GSSAPI' or auth_mechanism='LDAP' (and set kerberos_service_name if necessary, along with user and password) to the ibis.impala.connect(...) method. This method also takes arguments to configure SSL (use_ssl, ca_cert). See the documentation for the Impala shell for more details.
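For example, a Kerberos-secured connection might look like the following sketch; the host, port, service name, and certificate path are placeholders for your cluster's values:
import ibis

client = ibis.impala.connect(
    host="impala.example.com",
    port=21050,
    auth_mechanism="GSSAPI",          # after authenticating with kinit
    kerberos_service_name="impala",   # only needed for non-default service names
    use_ssl=True,
    ca_cert="/etc/ssl/certs/cluster-ca.pem",
)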