Streaming data into BigQuery

Instead of using a batch job to load data into BigQuery, you can stream data into BigQuery one record at a time by using the tabledata().insertAll() method. This approach avoids the delay of running a load job. There are important trade-offs to consider before choosing an approach, including streaming quotas, data availability, and data consistency.

Data availability

  • Streamed data is available for real-time analysis within a few seconds of the first streaming insertion into a table.

  • Data can take up to 90 minutes to become available for copy and export operations.

  • When streaming to a partitioned table, data in the streaming buffer has a NULL value for the _PARTITIONTIME pseudo column.

To see whether data is available for copy and export, check the tables.get response for a section named streamingBuffer. If that section is absent, your data should be available for copy or export, and should have a non-null value for the _PARTITIONTIME pseudo column.
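
For example, the following sketch checks for the streamingBuffer section by calling the REST tables.get method through the google-api-python-client library; the dataset and table names are illustrative.

    import google.auth
    from googleapiclient.discovery import build

    # Build an authenticated BigQuery API client (application default credentials).
    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)

    # tables.get returns the table resource, including streamingBuffer if present.
    table = service.tables().get(
        projectId=project_id, datasetId="mydataset", tableId="mytable"
    ).execute()

    if "streamingBuffer" not in table:
        print("No streaming buffer: data should be available for copy and export.")
    else:
        print("Rows are still in the streaming buffer; copy and export may not include them yet.")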

Data consistency

  • To help ensure data consistency, you can supply an insertId for each inserted row. BigQuery remembers this ID for at least one minute. If you try to stream the same set of rows within that time period and the insertId property is set, BigQuery uses the property to de-duplicate your data on a best-effort basis.

  • You might have to retry an insert because there's no way to determine the state of a streaming insert under certain error conditions, such as network errors or internal errors within BigQuery. If you retry an insert, use the same insertId for the same set of rows so that BigQuery can attempt to de-duplicate your data (see the sketch after this list).

  • If you are streaming data from a location outside of the dataset's location, you might experience increased latency and error rates.
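
The following sketch illustrates the insertId retry pattern with the REST tabledata.insertAll method via the google-api-python-client library; the dataset and table names, the row contents, and the retry policy are illustrative.

    import time
    import uuid

    import google.auth
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError

    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)

    # Generate the insertId once, outside the retry loop, so every retry sends
    # the same ID and BigQuery can de-duplicate on a best-effort basis.
    row = {"insertId": str(uuid.uuid4()), "json": {"field1": "value1"}}

    for attempt in range(3):
        try:
            response = service.tabledata().insertAll(
                projectId=project_id,
                datasetId="mydataset",
                tableId="mytable",
                body={"rows": [row]},
            ).execute()
            print(response.get("insertErrors", []))
            break
        except (HttpError, OSError):
            # The state of the insert is unknown; retry with the same insertId.
            time.sleep(2 ** attempt)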

Streaming into partitioned tables

  • When streaming into partitioned tables, you can stream directly to the table. In this case, rows will have a NULL value for the [_PARTITIONTIME] pseudo column. Rows are assigned a value for [_PARTITIONTIME] based on the UTC timestamp at which they leave the streaming buffer.

  • You can also stream directly to a partition of a partitioned table. For example, you can stream to the partition corresponding to 2017-03-01 for table mydataset.table by using the partition decorator (see the sketch after this list):

mydataset.table$20170301

  • When streaming using a partition decorator, you can stream to partitions within 30 days in the past and 5 days in the future relative to the current date, based on current UTC time. To write to partitions for dates outside these allowed bounds, you can use load or query jobs, as described in Restating data in a partition.
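
As a sketch of the decorator in use, the request below streams a row to the 2017-03-01 partition through the REST tabledata.insertAll method (google-api-python-client); the dataset, table, and row contents are illustrative.

    import uuid

    import google.auth
    from googleapiclient.discovery import build

    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)

    # The partition decorator in tableId targets the 2017-03-01 partition.
    response = service.tabledata().insertAll(
        projectId=project_id,
        datasetId="mydataset",
        tableId="table$20170301",
        body={"rows": [{"insertId": str(uuid.uuid4()), "json": {"field1": "value1"}}]},
    ).execute()

    print(response.get("insertErrors", []))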

Streaming data using template tables

A common usage pattern for streaming data into BigQuery is to split a logical table into many smaller tables, either for creating smaller sets of data (e.g., by date or by user ID) or for scalability (e.g., streaming more than the current limit of 100,000 rows per second).

Use BigQuery's template tables feature to split a table into many smaller tables without adding complex client-side code.

Template tables:

To use a template table via the BigQuery API, add a templateSuffix parameter to your insertAll request. For the bq command-line tool, add the template_suffix flag to your insert command. If BigQuery detects a templateSuffix parameter or a template_suffix flag, it treats the targeted table as a base template, and creates a new table that shares the same schema as the targeted table and has a name that includes the specified suffix:

<targeted_table_name> + <templateSuffix>

By using a template table, you avoid the overhead of creating each table individually and specifying the schema for each table. You need only create a single template, and supply different suffixes so that BigQuery can create the new tables for you.

BigQuery places the generated tables in the same project and dataset as the base template table.
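
A minimal sketch of an insertAll request with a templateSuffix, again using the google-api-python-client library; the dataset, template table, suffix, and row contents are illustrative. A request like this streams into a generated table named template_table_20170301, creating it first if needed.

    import uuid

    import google.auth
    from googleapiclient.discovery import build

    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)

    response = service.tabledata().insertAll(
        projectId=project_id,
        datasetId="mydataset",
        tableId="template_table",           # the base template
        body={
            "templateSuffix": "_20170301",  # generated table: template_table_20170301
            "rows": [
                {"insertId": str(uuid.uuid4()), "json": {"field1": "value1"}},
            ],
        },
    ).execute()

    print(response.get("insertErrors", []))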

Changing the template table schema:

  • Templates also make it easier to update the schema because you need only update the template table. If you change a template table schema, all subsequently generated tables will use the updated schema. Previously generated tables will not be affected, unless the existing table still has a streaming buffer. (A sketch of updating a template table's schema follows this list.)

  • For existing tables that still have a streaming buffer, if you modify the template table schema in a backward compatible way, the schema of those actively streamed generated tables will also be updated. However, if you modify the template table schema in a non-backward compatible way, any buffered data that uses the old schema will be lost. Additionally, you will not be able to stream new data to existing generated tables that use the old, but now incompatible, schema.

  • After you change a template table schema, wait until the changes have propagated before you try to insert new data or query generated tables. Requests to insert new fields should succeed within a few minutes. Attempts to query the new fields might require a longer wait of up to 90 minutes.

  • Tables created via template tables are usually available within a few seconds. On rare occasions they may take longer to become available.
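
As a sketch, the REST tables.patch method can be used to add a field to the template table (a backward-compatible change); the field and table names are illustrative, and the schema sent in the patch includes the existing fields as well as the new one.

    import google.auth
    from googleapiclient.discovery import build

    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)

    # Fetch the current schema so the patch can resend the existing fields.
    table = service.tables().get(
        projectId=project_id, datasetId="mydataset", tableId="template_table"
    ).execute()

    fields = table["schema"]["fields"]
    fields.append({"name": "new_field", "type": "STRING", "mode": "NULLABLE"})

    service.tables().patch(
        projectId=project_id,
        datasetId="mydataset",
        tableId="template_table",
        body={"schema": {"fields": fields}},
    ).execute()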

Template table details

  • Template suffix value: Must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_). The maximum combined length of the table name and the table suffix is 1024 characters.

  • Quota: The same quotas apply to all tables, whether they are based on templates or created manually.

  • Time to live: The generated table inherits its expiration time from the dataset. As with normal streaming data, generated tables cannot be copied or exported immediately.

  • Deduplication: De-duplication only happens between uniform references to a destination table.

  • Views: Template tables should not be views.

Use cases

Event logging and tracking

If you have an app that collects a large amount of data in real-time, streaming inserts can be a good choice. In principle, these types of apps share the following requirements:

  • Writes are not transactional. A high volume of records is continuously appended to the table. The app can tolerate that some duplicate data might occur or that data might be temporarily unavailable.
  • Reads perform aggregate analysis. Queries are used for trend analysis, as opposed to single or narrow record selection.

An example of an event logging application is one that tracks events generated by mobile devices, such as user interactions or application errors, and streams them into BigQuery. You could analyze this data to determine overall trends or to monitor for anomalies in real time.

Real-time data visualization

In some scenarios, streaming data into BigQuery enables real-time analysis over transactional data. Since streaming data comes with a possibility of duplicated data, ensure that you have a primary, transactional data store outside of BigQuery.

You can take a few precautions to ensure that you'll be able to perform analysis over transactional data, and also have an up-to-the-second view of your data:

  • Create two tables with an identical schema. The first table is for the reconciled data, and the second table is for the real-time, unreconciled data. The unreconciled data table might include duplicates or dropped records.
  • On the client side, maintain a transactional data store for records.
  • Send insertAll() requests for these records asynchronously. The insertAll() request should use the unreconciled table as the destination table.
  • Periodically append the reconciled data from the transactional data store and truncate the unreconciled data table.
  • For real-time visualization, you can select data from both tables (see the sketch after this list).
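
A sketch of that last step, querying both tables together with the REST jobs.query method; the table names are illustrative and assume the two tables have identical schemas, as described above.

    import google.auth
    from googleapiclient.discovery import build

    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)

    query = """
    SELECT * FROM `mydataset.reconciled_data`
    UNION ALL
    SELECT * FROM `mydataset.unreconciled_data`
    """

    # jobs.query runs the query synchronously (up to its timeout) and returns rows.
    response = service.jobs().query(
        projectId=project_id,
        body={"query": query, "useLegacySql": False},
    ).execute()

    for row in response.get("rows", []):
        print(row)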

Removing duplicates

You can use the following process to remove duplicate rows that exist after you are done streaming.

  1. Add the insertId as a column in your table schema and include the insertId value in the data for each row.
  2. After streaming has stopped, check for duplicates.
  3. Remove duplicates by running the following query (a sketch of writing its results to a destination table follows the query):

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY [ID_COLUMN]) row_number
      FROM
        `[TABLE_NAME]`)
    WHERE
      row_number = 1
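
The query above keeps one row per ID_COLUMN value; to materialize the de-duplicated result, you can run it as a query job that writes to a destination table. A sketch with the REST jobs.insert method follows; the dataset, table, and column names are illustrative, and WRITE_TRUNCATE overwrites the destination table.

    import google.auth
    from googleapiclient.discovery import build

    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)

    query = """
    SELECT * EXCEPT(row_number)
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY ID_COLUMN) row_number
      FROM `mydataset.streamed_table`)
    WHERE row_number = 1
    """

    job = service.jobs().insert(
        projectId=project_id,
        body={
            "configuration": {
                "query": {
                    "query": query,
                    "useLegacySql": False,
                    "destinationTable": {
                        "projectId": project_id,
                        "datasetId": "mydataset",
                        "tableId": "deduplicated_table",
                    },
                    "writeDisposition": "WRITE_TRUNCATE",
                }
            }
        },
    ).execute()

    print(job["jobReference"]["jobId"])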
    

API examples for streaming inserts

Java:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

// Assumes an initialized client and existing dataset/table names:
// BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response = bigquery.insertAll(InsertAllRequest.newBuilder(tableId)
    .addRow("rowId", rowContent)
    // More rows can be added in the same RPC by invoking .addRow() on the builder
    .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Python:

import json
from pprint import pprint

from google.cloud import bigquery


def stream_data(dataset_name, table_name, json_data):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset_name)
    table = dataset.table(table_name)
    data = json.loads(json_data)

    # Reload the table to get the schema.
    table.reload()

    rows = [data]
    errors = table.insert_data(rows)

    if not errors:
        print('Loaded 1 row into {}:{}'.format(dataset_name, table_name))
    else:
        print('Errors:')
        pprint(errors)

Streaming quota

The following limits apply for streaming data into BigQuery.

  • Maximum row size: 1 MB. Exceeding this value will cause invalid errors.
  • HTTP request size limit: 10 MB. Exceeding this value will cause invalid errors.
  • Maximum rows per second: 100,000 rows per second, per project. Exceeding this amount will cause quotaExceeded errors. The maximum number of rows per second per table is also 100,000. You can use all of this quota on one table or you can divide this quota among several tables in a project.
  • Maximum rows per request: 10,000 rows per request. Batching can increase performance and throughput up to a point, but at the cost of per-request latency. With too few rows per request, the overhead of each request can make ingestion inefficient; with too many rows per request, throughput may drop. We recommend about 500 rows per request, but experimentation with representative data (schema and data sizes) will help you determine the ideal batch size (see the sketch after this list).
  • Maximum bytes per second: 100 MB per second, per table. Exceeding this amount will cause quotaExceeded errors.
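
As a sketch of the 500-rows-per-request recommendation, the helper below splits a list of rows into chunks and sends one tabledata.insertAll request per chunk (google-api-python-client); the chunk size, dataset, table name, and row contents are illustrative.

    import uuid

    import google.auth
    from googleapiclient.discovery import build

    credentials, project_id = google.auth.default()
    service = build("bigquery", "v2", credentials=credentials)


    def chunked(items, size):
        """Yield successive chunks of at most `size` items."""
        for start in range(0, len(items), size):
            yield items[start:start + size]


    rows = [
        {"insertId": str(uuid.uuid4()), "json": {"field1": "value-%d" % i}}
        for i in range(2000)
    ]

    for batch in chunked(rows, 500):
        response = service.tabledata().insertAll(
            projectId=project_id,
            datasetId="mydataset",
            tableId="mytable",
            body={"rows": batch},
        ).execute()
        if "insertErrors" in response:
            print(response["insertErrors"])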
