The Python implementation of Dataflow to transfer Datastore entities to BigQuery

Hiroki Fujino
Python in Plain English
5 min read · Jun 29, 2020

In my previous article, I showed how to delete Google Cloud Datastore entities in bulk with Dataflow. In this article, I’m going to show how to transfer Google Cloud Datastore entities into BigQuery in bulk with Dataflow.

The complete code is available in my GitHub repository.

Motivation

Datastore is a scalable NoSQL database that supports ACID-compliant transactions, indexing, and more. It is therefore well suited to managing general application data such as user profiles and sessions. More information about Datastore use cases is available at this link.

BigQuery, on the other hand, is a data analytics product. It provides flexible SQL along with modules that support data analytics, so you can extract data with SQL and analyze it with modules such as BigQuery ML.

In an application, Datastore can serve as the application database and BigQuery as the analytics platform. To analyze the application data in BigQuery, the Datastore entities have to be transferred to BigQuery regularly, for example every day or every week.

There are a couple of ways to implement this data transfer:

  1. ds2bq
  2. Google Cloud Dataflow

ds2bq is a Golang library that combines Datastore Export and BigQuery Load. Google Cloud Dataflow is a fully managed service that executes Apache Beam pipelines on Google Cloud Platform. Dataflow helps us develop scalable data pipelines across GCP services such as Datastore, BigQuery and Cloud Storage. In this article, I’m going to introduce the Dataflow approach.

Development Environment

The sample code in this article uses the following Python packages.

REQUIRED_PACKAGES = [
    'apache-beam[gcp]==2.19.0',
    'datetime==4.3.0'
]

Transfer entities with Beam

The pipeline that transfers the entities runs in the following steps:

  1. Get all entities of Datastore
  2. Load all entities into BigQuery through Google Cloud Storage

entities = (p
            | 'Get Kinds' >> GetKinds(project_id)
            | 'Create Query' >> beam.ParDo(CreateQuery(project_id))
            | 'Get Entity' >> beam.ParDo(ReadFromDatastore._QueryFn()))

_ = (entities
     | 'Convert Entity' >> beam.Map(convert)
     | 'BigQuery Load' >> BigQueryBatchFileLoads(
         destination=lambda row, table_dict: table_dict[row["__key__"]["kind"]],
         custom_gcs_temp_location=gcs_dir,
         write_disposition='WRITE_TRUNCATE',
         table_side_inputs=(table_names_dict,),
         additional_bq_parameters=get_partition_conf,
         schema='SCHEMA_AUTODETECT')
     )

The entire pipeline

1. Get all entities of Datastore

Get all Kind names

Ideally, I want to transfer all Datastore entities into BigQuery dynamically, so that the code does not have to change whenever a new Kind is added. To get all entities dynamically, we first have to get all Kind names. The GetKinds class below does this: it queries Datastore with the datastore module and returns all Kind names wrapped in a PCollection.

class GetKinds(PTransform):
    def __init__(self, project_id):
        self.project_id = project_id

    def expand(self, pcoll):
        from google.cloud import datastore
        from apache_beam import Create
        # The special __kind__ kind lists every Kind in the project.
        query = datastore.Client(self.project_id).query(kind='__kind__')
        query.keys_only()
        # Each result's key name is the name of a Kind.
        kinds = [entity.key.id_or_name for entity in query.fetch()]
        return pcoll.pipeline | 'Kind' >> Create(kinds)
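
Datastore reserves Kind names that begin with two underscores, and a `__kind__` query may return such Kinds (for example statistics Kinds) alongside your own. If you only want application data in BigQuery, one option is to filter the list before it is passed to Create. The helper name `user_kinds` and the sample Kind names below are hypothetical and not part of the original code; this is a minimal sketch in plain Python.

```python
def user_kinds(kind_names):
    """Return only the Kinds that do not use Datastore's reserved '__' prefix."""
    return [name for name in kind_names if not name.startswith("__")]


# Example input resembling what a __kind__ query could return (made-up names).
all_kinds = ["User", "__Stat_Kind__", "Session", "__Stat_Total__"]
filtered = user_kinds(all_kinds)
print(filtered)
```

Inside GetKinds, this could be applied to the `kinds` list right before the `Create(kinds)` call.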

All entities of each Kind

After getting all Kind names, the code below builds a query for each Kind and gets all entities that match it. This step runs in parallel, one query per Kind.

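| 'Create Query' >> beam.ParDo(CreateQuery(project_id))
# _QueryFn is a private helper of ReadFromDatastore, so it may change between Beam versions.
| 'Get Entity' >> beam.ParDo(ReadFromDatastore._QueryFn()))

class CreateQuery(DoFn):
    def __init__(self, project_id):
        self.project_id = project_id

    def process(self, element):
        from apache_beam.io.gcp.datastore.v1new.types import Query
        # element is a Kind name; emit one query that selects every entity of that Kind.
        return [Query(kind=element, project=self.project_id)]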

2. Load all entities into BigQuery

BigQueryBatchFileLoads in the apache_beam.io.gcp.bigquery_file_loads module is suited to loading entities into BigQuery. This class uploads the entities to GCS as files in a scalable way, then loads those files into BigQuery.

Convert entity object into JSON format

There are various ways to load data into BigQuery. BigQueryBatchFileLoads loads data from Cloud Storage in JSON format, so each entity object has to be converted into a form that can be written to GCS as JSON. The code below transforms an entity object into a Python dictionary. Each key of the dictionary becomes a column name and each value becomes the column value in BigQuery.

entities
| 'Convert Entity' >> beam.Map(convert)

def convert(entity):
    return dict({k: v for k, v in entity.properties.items()},
                __key__={
                    'kind': entity.key.path_elements[0],
                    'name': entity.key.path_elements[1]
                })
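
The convert function above reads the Kind and name from the first two path elements and passes property values through unchanged. The sketch below is a variation, not the author's code, under two assumptions: that a key may contain ancestors (so the entity's own Kind and name or ID sit at the end of the path), and that datetime values should become ISO 8601 strings before the row is written as JSON. `FakeKey`, `FakeEntity`, `to_json_safe` and `convert_last_path_element` are hypothetical names, and the stand-in classes only mimic the `key.path_elements` and `properties` attributes that convert uses.

```python
import json
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-ins exposing only the attributes that convert() reads.
FakeKey = namedtuple("FakeKey", ["path_elements"])
FakeEntity = namedtuple("FakeEntity", ["key", "properties"])


def to_json_safe(value):
    """Recursively turn datetimes into ISO 8601 strings; leave other values alone."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {k: to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    return value


def convert_last_path_element(entity):
    """Like convert(), but reads the Kind and name/ID from the end of the key path."""
    row = {k: to_json_safe(v) for k, v in entity.properties.items()}
    row["__key__"] = {
        "kind": entity.key.path_elements[-2],
        "name": entity.key.path_elements[-1],
    }
    return row


# A Session entity stored under a User ancestor (made-up data).
child = FakeEntity(
    key=FakeKey(path_elements=("User", "alice", "Session", 42)),
    properties={"created": datetime(2020, 6, 29, 12, 0, 0), "active": True},
)
row = convert_last_path_element(child)
line = json.dumps(row, sort_keys=True)  # one line of newline-delimited JSON
print(line)
```

For keys without ancestors, the last two path elements are the same as the first two, so both versions produce the same `__key__` value.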

BigQuery load of each entity into each table

BigQueryBatchFileLoads can resolve the destination table dynamically for each row. The destination argument decides which table a row goes to, based on the Kind of the entity, and table_side_inputs supplies the mapping from Kind names to table names.

In the code below, BigQueryBatchFileLoads takes a callable for destination and a side input for table_side_inputs. The destination argument takes a lambda function that reads the Kind from the Python dictionary created by the convert function. The table_side_inputs argument takes a dictionary built from a PCollection with beam.pvalue.AsDict. This dictionary holds key-value pairs of Kind names and table names. By looking up this dictionary, BigQueryBatchFileLoads can determine the table each entity should be loaded into. The schema argument takes ‘SCHEMA_AUTODETECT’. This option lets BigQuery decide the schema of a table automatically.

table_names_dict = beam.pvalue.AsDict(
    p | "Get BigQuery Table Map" >> GetBqTableMap(project_id, options.dataset)
)

'BigQuery Load' >> BigQueryBatchFileLoads(
    destination=lambda row, table_dict: table_dict[row["__key__"]["kind"]],
    custom_gcs_temp_location=gcs_dir,
    write_disposition='WRITE_TRUNCATE',
    table_side_inputs=(table_names_dict,),
    additional_bq_parameters=get_partition_conf,
    schema='SCHEMA_AUTODETECT')

class GetBqTableMap(PTransform):
    def __init__(self, project_id, dataset_opt):
        self.project_id = project_id
        self.dataset_opt = dataset_opt

    def expand(self, pbegin):
        from google.cloud import datastore
        from apache_beam.transforms import Impulse
        query = datastore.Client(self.project_id).query(kind='__kind__')
        query.keys_only()
        kinds = [entity.key.id_or_name for entity in query.fetch()]
        return (pbegin
                | Impulse()
                | beam.FlatMap(lambda _: [(kind, "{}.{}".format(self.dataset_opt.get(), kind)) for kind in kinds])
                )
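
The lookup done by the destination lambda can be tried outside a pipeline with a plain dictionary. The sketch below does that and also includes a guess at `get_partition_conf`, which the pipeline passes as additional_bq_parameters but which is not shown in this article. Beam accepts a callable there that receives the destination and returns a dictionary of load options; the day-based `timePartitioning` setting used here is only an assumed example, and `build_table_map`, `route_to_table` and the `analytics` dataset name are hypothetical.

```python
def build_table_map(dataset, kinds):
    """Map each Kind name to a 'dataset.Kind' table name, like GetBqTableMap emits."""
    return {kind: "{}.{}".format(dataset, kind) for kind in kinds}


def route_to_table(row, table_dict):
    """Same lookup as the destination lambda: pick the table from the row's Kind."""
    return table_dict[row["__key__"]["kind"]]


def get_partition_conf(destination):
    """Assumed shape only: request day-based partitioning for every destination."""
    return {"timePartitioning": {"type": "DAY"}}


table_dict = build_table_map("analytics", ["User", "Session"])
row = {"email": "a@example.com", "__key__": {"kind": "User", "name": "alice"}}
print(route_to_table(row, table_dict))  # analytics.User
```

A row whose Kind is missing from the dictionary raises a KeyError in this lookup, which is one reason the table map and the list of Kinds are both built from the same `__kind__` query.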

Execute the pipeline

Once the job is created, it will look like this in the Dataflow UI.

The pipeline to transfer all entities

Pros and Cons

To summarize, here are the pros and cons of this approach.

Pros

  • Scalability

One of the most essential features of Dataflow is scalability. So Dataflow can transfer the entities efficiently, even if the data size is enormous.

Parallelization and Distribution. Dataflow automatically partitions your data and distributes your worker code to Compute Engine instances for parallel processing.

  • Connection to a subsequent process

At times, we may want to run a subsequent process after the transfer succeeds. With this approach, you can do so by adding a step at the end of the pipeline, as the Beam model behind Dataflow covers MapReduce-style processing.

Cons

  • It’s not a backup process

The transfer process explained in this article is not a backup process, because the data written to GCS cannot easily be imported back into Datastore. A metadata file created by an export operation can be imported into Datastore from the Datastore Import page in the Google Cloud Console, as described in this link, but a JSON file cannot.

If you want to transfer entities of Datastore into BigQuery as a backup, ds2bq would be a good solution.

I hope this article helps your work.

Thank you for reading!
