Dataproc connectors
Google BigQuery connector
You can use a Google BigQuery connector to enable programmatic read/write access to Google BigQuery. This is a preferred way to process data that is stored in BigQuery. No command-line access is exposed. The BigQuery connector is a Java library that enables Hadoop to process data from BigQuery using abstracted versions of the Apache Hadoop InputFormat and OutputFormat classes.
Pricing considerations
When using the connector, you will also be charged for any associated BigQuery usage fees. Additionally, the BigQuery connector downloads data into a Google Cloud Storage bucket before running a Hadoop job. After the Hadoop job successfully completes, the data is deleted from Cloud Storage. You are charged for storage according to Cloud Storage pricing.
To avoid excess charges, check your Cloud Storage account and make sure to remove unneeded temporary files.
The BigQuery connector is installed by default on all Google Cloud Dataproc clusters.
Google Cloud Bigtable connector
Google Cloud Bigtable is Google's NoSQL Big Data database service. It's the same database that powers many core Google services, including Search, Analytics, Maps, and Gmail. Bigtable is designed to handle massive workloads at consistent low latency and high throughput, so it's a great choice for both operational and analytical applications, including IoT, user analytics, and financial data analysis.
Cloud Bigtable is an excellent option for any Apache Spark or Hadoop uses that require Apache HBase. Cloud Bigtable supports the Apache HBase 1.0+ APIs and offers a Cloud Bigtable HBase client in Maven, so it is easy to use Cloud Bigtable with Cloud Dataproc.
Google Cloud Storage connector
The Google Cloud Storage connector lets you run Apache Hadoop or Apache Spark jobs directly on data in Cloud Storage, and offers a number of benefits over choosing the Hadoop Distributed File System (HDFS).
- Direct data access – Store your data in Cloud Storage and access it directly, with no need to transfer it into HDFS first.
- HDFS compatibility – You can easily access your data in Cloud Storage using the gs:// prefix instead of hdfs://.
- Interoperability – Storing data in Cloud Storage enables seamless interoperability between Spark, Hadoop, and Google services.
- Data accessibility – When you shut down a Hadoop cluster, you still have access to your data in Cloud Storage, unlike HDFS.
- High data availability – Data stored in Cloud Storage is highly available and globally replicated without a loss of performance.
- No storage management overhead – Unlike HDFS, Cloud Storage requires no routine maintenance such as checking the file system, upgrading or rolling back to a previous version of the file system, etc.
- Quick startup – In HDFS, a MapReduce job can't start until the NameNode is out of safe mode, a process that can take from a few seconds to many minutes depending on the size and state of your data. With Google Cloud Storage, you can start your job as soon as the task nodes start, leading to significant cost savings over time.
The Cloud Storage connector is installed by default on all Google Cloud Dataproc clusters.
There are multiple ways to access data stored in Google Cloud Storage:
- In a Spark or Hadoop application, using the gs:// prefix.
- The hadoop shell: hadoop fs -ls gs://CONFIGBUCKET/dir/file.
- The Cloud Platform Console Cloud Storage browser.
- Using the gsutil cp or gsutil rsync commands.
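Because the connector exposes Cloud Storage through the gs:// prefix, moving an existing job off HDFS is often mostly a matter of rewriting its paths. The following is a minimal sketch of that rewrite in plain Java, using only the standard library. The class name GcsPaths, the method toGcsPath, and the bucket name my-bucket are hypothetical and are not part of the connector.

```java
import java.net.URI;

/** Hypothetical helper (not part of the connector): rewrites hdfs:// paths to gs:// paths. */
final class GcsPaths {
    private GcsPaths() {}

    /**
     * Replaces the hdfs:// scheme and NameNode authority with gs:// and the given
     * bucket, keeping the path component unchanged. Paths with any other scheme
     * are returned as-is.
     */
    static String toGcsPath(String path, String bucket) {
        URI uri = URI.create(path);
        if (!"hdfs".equals(uri.getScheme())) {
            return path;
        }
        return "gs://" + bucket + uri.getPath();
    }

    public static void main(String[] args) {
        // "my-bucket" is an assumed bucket name used only for illustration.
        System.out.println(toGcsPath("hdfs://namenode:8020/logs/part-00000", "my-bucket"));
    }
}
```

The resulting string can then be passed wherever the job previously took an hdfs:// path.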
Using the BigQuery Connector with Spark
The BigQuery Connector can be used with Apache Spark to read and write data from/to Google BigQuery. This section provides example code that uses the BigQuery connector with Spark.
This example reads data from BigQuery into Spark to perform a word count using SparkContext.newAPIHadoopRDD (see the Spark documentation for more information). It writes the data back to BigQuery using PairRDDFunctions.saveAsNewAPIHadoopDataset.
Before trying this example, either create a dataset named "wordcount_dataset" or change outputDatasetId to an existing BigQuery dataset in your project. You can create the dataset with the bq command-line tool, for example:
bq mk wordcount_dataset
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable

// Marked as transient since configuration is not Serializable. This should
// only be necessary in spark-shell REPL.
@transient
val conf = sc.hadoopConfiguration

// Input parameters
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"

// Output parameters
val projectId = conf.get("fs.gs.project.id")
val outputDatasetId = "wordcount_dataset"
val outputTableId = "wordcount_output"
val outputTableSchema =
  "[{'name': 'word', 'type': 'STRING'}, {'name': 'word_count', 'type': 'INTEGER'}]"

// Use the Google Cloud Storage bucket for temporary BigQuery export data used
// by the InputFormat. This assumes the Google Cloud Storage connector for
// Hadoop is configured.
val bucket = conf.get("fs.gs.system.bucket")

// Input configuration
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)

// This temporary path is used while the InputFormat is live, and must
// be cleaned up when the job is done.
val inputTmpDir = s"gs://${bucket}/hadoop/tmp/bigquery/wordcounttmp"
conf.set(BigQueryConfiguration.TEMP_GCS_PATH_KEY, inputTmpDir)

// Output configuration
BigQueryConfiguration.configureBigQueryOutput(
  conf, projectId, outputDatasetId, outputTableId, outputTableSchema)
conf.set(
  "mapreduce.job.outputformat.class",
  classOf[BigQueryOutputFormat[_,_]].getName)

// Helper to convert JsonObjects to (word, count) tuples.
def convertToTuple(record: JsonObject) : (String, Long) = {
  val word = record.get("word").getAsString.toLowerCase
  val count = record.get("word_count").getAsLong
  (word, count)
}

// Helper to convert (word, count) tuples to JsonObjects.
def convertToJson(pair: (String, Long)) : JsonObject = {
  val word = pair._1
  val count = pair._2
  val jsonObject = new JsonObject()
  jsonObject.addProperty("word", word)
  jsonObject.addProperty("word_count", count)
  jsonObject
}

// Load data from BigQuery.
val tableData = sc.newAPIHadoopRDD(
  conf,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject]).cache

// Perform word count.
val wordCounts = (tableData
  .map(entry => convertToTuple(entry._2))
  .reduceByKey(_ + _))

// Display 10 results.
wordCounts.take(10).foreach(l => println(l))

// Write data back into a new BigQuery table.
// BigQueryOutputFormat discards keys, so set key to null.
(wordCounts
  .map(pair => (null, convertToJson(pair)))
  .saveAsNewAPIHadoopDataset(conf))

// Manually clean up the inputTmpDir, otherwise BigQuery export
// files will remain indefinitely.
val inputTmpDirPath = new Path(inputTmpDir)
inputTmpDirPath.getFileSystem(conf).delete(inputTmpDirPath, true)
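To see what the map and reduceByKey steps above compute without a cluster, here is a minimal plain-Java sketch of the same aggregation over an in-memory list. It is an illustration only, not the connector's or Spark's implementation. The class WordCountSketch, the nested Row type, and the sample rows are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the Spark word count above. */
final class WordCountSketch {
    private WordCountSketch() {}

    /** One input row: a word and the number of times it appears in one corpus. */
    static final class Row {
        final String word;
        final long wordCount;

        Row(String word, long wordCount) {
            this.word = word;
            this.wordCount = wordCount;
        }
    }

    /** Lowercases each word (the map step) and sums the counts per word (the reduceByKey step). */
    static Map<String, Long> countWords(List<Row> rows) {
        Map<String, Long> totals = new TreeMap<>();
        for (Row row : rows) {
            String key = row.word.toLowerCase(Locale.ROOT);
            totals.merge(key, row.wordCount, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up sample rows; "The" and "the" collapse into one key.
        List<Row> rows = Arrays.asList(new Row("The", 3), new Row("the", 4), new Row("king", 2));
        System.out.println(countWords(rows));
    }
}
```

Rows that differ only in case collapse into a single key, which is why the Spark helper lowercases the word before reducing.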
Writing a MapReduce Job with the BigQuery Connector
GsonBigQueryInputFormat provides Hadoop with the appropriate BigQuery objects accessible in a JsonObject format via the following primary operations:
- Using a user-specified query to select the appropriate BigQuery objects
- Splitting the results of the query evenly among the Hadoop nodes
- Parsing the splits into Java objects to pass to the mapper. The Hadoop Mapper class receives a JsonObject representation of each selected BigQuery object.
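As a rough illustration of what "splitting evenly" means, the sketch below divides a number of records into near-equal index ranges. This is an assumption-laden toy and not the connector's actual sharding logic, which this document does not describe. The class EvenSplits and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of dividing records evenly among workers. */
final class EvenSplits {
    private EvenSplits() {}

    /**
     * Returns [start, end) index ranges that divide recordCount records into
     * numSplits parts whose sizes differ by at most one.
     */
    static List<long[]> split(long recordCount, int numSplits) {
        if (recordCount < 0 || numSplits <= 0) {
            throw new IllegalArgumentException("recordCount must be >= 0 and numSplits > 0");
        }
        List<long[]> ranges = new ArrayList<>();
        long base = recordCount / numSplits;
        long remainder = recordCount % numSplits;
        long start = 0;
        for (int i = 0; i < numSplits; i++) {
            // The first `remainder` splits each take one extra record.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {start, start + size});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] range : split(10, 3)) {
            System.out.println(range[0] + " to " + range[1]);
        }
    }
}
```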
This class provides access to BigQuery records through an extension of the Hadoop InputFormat class. To use this class correctly, a few lines must be added to the main Hadoop job. In particular, several parameters must be set in the Hadoop configuration and the InputFormat class must be set to GsonBigQueryInputFormat. Below is an example of the parameters to set and the lines of code needed to correctly use GsonBigQueryInputFormat.
Input Parameters
QualifiedInputTableId: The BigQuery table to read from, in the form optional-projectId:datasetId.tableId
Example: publicdata:samples.shakespeare
projectId: The BigQuery projectId under which all of the input operations occur.
Example: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);
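The qualified table ID format described above (optional-projectId:datasetId.tableId) can be checked before a job is submitted. The following is a minimal sketch of such a parser in plain Java. The class QualifiedTableId is hypothetical and is not the connector's own parsing code. It assumes the dataset and table IDs contain no '.' characters.

```java
/** Hypothetical parser for IDs of the form optional-projectId:datasetId.tableId. */
final class QualifiedTableId {
    final String projectId; // null when the ID omits the project
    final String datasetId;
    final String tableId;

    private QualifiedTableId(String projectId, String datasetId, String tableId) {
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.tableId = tableId;
    }

    static QualifiedTableId parse(String qualifiedId) {
        // Split on the last ':' so a project ID that itself contains ':' stays intact.
        int colon = qualifiedId.lastIndexOf(':');
        String projectId = colon >= 0 ? qualifiedId.substring(0, colon) : null;
        String rest = qualifiedId.substring(colon + 1);
        int dot = rest.indexOf('.');
        boolean malformed = dot <= 0
            || dot == rest.length() - 1
            || rest.indexOf('.', dot + 1) >= 0
            || (projectId != null && projectId.isEmpty());
        if (malformed) {
            throw new IllegalArgumentException(
                "Expected [projectId:]datasetId.tableId but got: " + qualifiedId);
        }
        return new QualifiedTableId(projectId, rest.substring(0, dot), rest.substring(dot + 1));
    }

    public static void main(String[] args) {
        QualifiedTableId id = parse("publicdata:samples.shakespeare");
        System.out.println(id.projectId + " / " + id.datasetId + " / " + id.tableId);
    }
}
```

When projectId comes back null, a caller would presumably fall back to the job-level projectId set in the configuration.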
Mapper
The GsonBigQueryInputFormat class reads from BigQuery. It passes the BigQuery objects one at a time as input to the Hadoop Mapper function. The inputs take the form of a LongWritable and a JsonObject. The LongWritable tracks the record number. The JsonObject contains the JSON-formatted BigQuery record. The Mapper accepts the LongWritable and JsonObject pair as input. Here is a snippet from the Mapper for a sample WordCount job.
// The configuration key used to specify the BigQuery field name
// ("column name").
public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
"mapred.bq.samples.wordcount.word.key";
// Default value for the configuration entry specified by
// WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
// publicdata:samples.shakespeare or 'repository_name'
// in publicdata:samples.github_timeline.
public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";
/**
* The mapper function for WordCount.
*/
public static class Map
    extends Mapper<LongWritable, JsonObject, Text, LongWritable> {
  private static final LongWritable ONE = new LongWritable(1);
  private Text word = new Text();
  private String wordKey;

  @Override
  public void setup(Context context)
      throws IOException, InterruptedException {
    // Find the runtime-configured key for the field name we're looking for
    // in the map task.
    Configuration conf = context.getConfiguration();
    wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,
        WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
  }

  @Override
  public void map(LongWritable key, JsonObject value, Context context)
      throws IOException, InterruptedException {
    // Skip records that do not contain the configured field.
    JsonElement wordElement = value.get(wordKey);
    if (wordElement != null) {
      String wordInRecord = wordElement.getAsString();
      word.set(wordInRecord);
      // Write out the key, value pair (write out a value of 1, which will be
      // added to the total count for this word in the Reducer).
      context.write(word, ONE);
    }
  }
}
IndirectBigQueryOutputFormat class
IndirectBigQueryOutputFormat provides Hadoop with the ability to write JsonObject values directly into a BigQuery table. This class provides access to BigQuery records through an extension of the Hadoop OutputFormat class. To use it correctly, several parameters must be set in the Hadoop configuration, and the OutputFormat class must be set to IndirectBigQueryOutputFormat. Below is an example of the parameters to set and the lines of code needed to correctly use IndirectBigQueryOutputFormat.
Output Parameters
projectId: The BigQuery projectId under which all of the output operations occur.
Example: "my-first-cloud-project"
QualifiedOutputTableId: The BigQuery table to write the final job results to, in the form optional-projectId:datasetId.tableId. The datasetId should already be present in your project. A dataset named outputDatasetId_hadoop_temporary will be created in BigQuery for temporary results. Make sure this does not conflict with an existing dataset.
Examples:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
outputTableFieldSchema: The schema of the output BigQuery table.
GcsOutputPath: The output path to store temporary GCS data (gs://bucket/dir/)
// Define the schema we will be using for the output BigQuery table.
List<TableFieldSchema> outputTableFieldSchema = new ArrayList<>();
outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);
// Create the job and get its configuration.
Job job = Job.getInstance(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input and output.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
BigQueryOutputConfiguration.configure(
    conf,
    outputQualifiedTableId,
    outputSchema,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    TextOutputFormat.class);
// Set OutputFormat, as the description above requires.
job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);
Reducer
The IndirectBigQueryOutputFormat class writes to BigQuery. It takes a key and a JsonObject value as input and writes only the JsonObject value to BigQuery (the key is ignored). The JsonObject should contain a JSON-formatted BigQuery record. The Reducer should output a key of any type (NullWritable is used in our sample WordCount job) and JsonObject value pair. The Reducer for the sample WordCount job is shown below.
/**
 * Reducer function for WordCount.
 */
public static class Reduce
    extends Reducer<Text, LongWritable, JsonObject, NullWritable> {
  @Override
  public void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    // Add up the values to get a total number of occurrences of our word.
    long count = 0;
    for (LongWritable val : values) {
      count = count + val.get();
    }
    // Build the output record; field names match the output table schema.
    JsonObject jsonObject = new JsonObject();
    jsonObject.addProperty("Word", key.toString());
    jsonObject.addProperty("Count", count);
    // Emit the record; NullWritable fills the unused slot.
    context.write(jsonObject, NullWritable.get());
  }
}
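To make the data flow between the Mapper and Reducer concrete, here is a minimal in-memory sketch of the same map, shuffle, and reduce steps in plain Java. It uses ordinary maps in place of JsonObject and Hadoop types, so it only approximates the job above. The class MapReduceSketch and the sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the WordCount Mapper and Reducer. */
final class MapReduceSketch {
    private MapReduceSketch() {}

    static Map<String, Long> wordCount(List<Map<String, String>> records, String wordKey) {
        // Map and shuffle: emit (word, 1) per record and group the 1s by word.
        // Records without the configured field are skipped, as in the Mapper.
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map<String, String> record : records) {
            String word = record.get(wordKey);
            if (word != null) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1L);
            }
        }
        // Reduce: add up the values for each word.
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long count = 0;
            for (long one : entry.getValue()) {
                count += one;
            }
            counts.put(entry.getKey(), count);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up records; the last one lacks the "word" field.
        List<Map<String, String>> records = Arrays.asList(
            Collections.singletonMap("word", "king"),
            Collections.singletonMap("word", "king"),
            Collections.singletonMap("word", "queen"),
            Collections.singletonMap("corpus", "hamlet"));
        System.out.println(wordCount(records, "word"));
    }
}
```

Note that this counts one occurrence per record, as the Mapper does by emitting ONE, rather than summing a per-record count field.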
Clean up
After the job completes, clean up Google Cloud Storage export paths.
job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
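Since leftover export files continue to incur storage charges, one option is to put the cleanup in a finally block so that it also runs when the job fails. The sketch below shows that pattern in plain Java, with a local temporary directory standing in for the Cloud Storage export path. It is an assumption about how one might structure the driver, not part of the sample job. The class TempExportCleanup and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical illustration: always remove temporary export data, even on failure. */
final class TempExportCleanup {
    private TempExportCleanup() {}

    /** Deletes dir and everything beneath it, children before parents. */
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            List<Path> deepestFirst = walk.sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
            for (Path path : deepestFirst) {
                Files.delete(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the job and removes tempDir afterwards, whether or not the job threw. */
    static void runWithCleanup(Path tempDir, Runnable job) {
        try {
            job.run();
        } finally {
            deleteRecursively(tempDir);
        }
    }

    /** Simulates a failing job and reports whether the temporary directory was removed. */
    static boolean cleansUpAfterFailure() {
        Path tempDir;
        try {
            tempDir = Files.createTempDirectory("bq-export-");
            Files.createFile(tempDir.resolve("shard-0.json"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            runWithCleanup(tempDir, () -> {
                throw new IllegalStateException("simulated job failure");
            });
        } catch (IllegalStateException expected) {
            // The failure is expected; only the cleanup result matters here.
        }
        return !Files.exists(tempDir);
    }

    public static void main(String[] args) {
        System.out.println("cleaned up: " + cleansUpAfterFailure());
    }
}
```

In a real driver, the body of the finally block would be the cleanup call shown above rather than a local delete.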