Google Cloud Big Data and Machine Learning Blog

Innovation in data processing and machine learning technology

Using Apache Beam and Cloud Dataflow to integrate SAP HANA and BigQuery

Monday, November 13, 2017

By Babu Prasad, Technical Lead, Big Data and Mark Shalda, Technical Program Manager

SAP HANA is an in-memory columnar database that you can use either as a persistence layer for applications in the SAP ecosystem, or as an independent enterprise database.

BigQuery is Google's fully managed, petabyte scale, low-cost enterprise data warehouse for analytics. BigQuery is serverless, so you won’t need to manage any infrastructure, and you don't need a database administrator, so you can focus on analyzing data to find meaningful insights using familiar SQL. BigQuery can scan a terabyte in seconds and a petabyte in minutes. With BigQuery, you can easily scale your database size by several orders of magnitude with little administrative or operational overhead.

In this blog post, we will talk about leveraging both SAP HANA and BigQuery for your analytics needs. This integration can help you address cost optimization, high throughput performance, and low maintenance needs for your application.

Dataflow for data integration

Google Cloud Dataflow makes it easy to integrate SAP HANA with BigQuery.

Google Cloud Dataflow is a fully-managed service for transforming and enriching data in stream (real time) and batch (historical) modes with equal reliability and expressiveness -- no more complex workarounds or compromises needed. And with its serverless approach to resource provisioning and management, you have access to virtually limitless capacity to help solve your biggest data processing challenges, while paying only for what you use.

Cloud Dataflow supports fast, simplified pipeline development via expressive Java and Python APIs in the Apache Beam SDK, which provides a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. Plus, Beam’s unique, unified development model lets you reuse more code across streaming and batch pipelines.

Apache Beam SDKs provide a JDBC implementation to read and write data from data sources. This implementation can be used to connect to and read data off of SAP HANA. In this blog, we will demonstrate code that will read data and process the data read from SAP HANA  using Google Cloud Dataflow engine and write to Google BigQuery. The code can also be customized to incrementally extract data using a timestamp column in the SAP HANA table by providing a WHERE clause (see below) in the SQL definition.

Scheduling the pipeline

We have created a process that can extract data from SAP HANA and copy it over to Google BigQuery. Once you have tested that this job runs successfully, you might automate this by scheduling it so you do not have to run it manually every time.

You can use App Engine Cron Service to achieve this. App Engine Cron Service allows you to configure and run cron jobs at regular intervals. These cron jobs are a little different from regular Linux cron jobs in that they cannot run any script or command. They can only invoke a URL defined as part of your App Engine app via HTTP GET. In return, you don’t have to worry about how or where the cron job is running. App Engine infrastructure ensures your cron job runs at your preferred, intended interval.

You should refer to this blog for additional details on how to configure a servlet and deploy it on App Engine.

Code snippets and implementation

Now that we have conceptually defined a few useful pipelines, let’s walk through the code required to build the integration.

The following code uses the built-in Apache Beam JdbcIO source class to read in data from SAP Hana. In order to do this, we had to provide the ngdbc.jar file from the SAP Hana client download library as our jdbc driver. We used the following command to install it with maven:

mvn install:install-file -Dfile=ngdbc.jar -DgroupId=sap -DartifactId=sap-hana-jdbc -Dversion=1.0 -Dpackaging=jar -DgeneratePom=true

And then used the following maven dependency in our pom.xml file:

   <dependency>
     <groupId>sap</groupId>
     <artifactId>sap-hana-jdbc</artifactId>
     <version>1.0</version>
   </dependency>

Once this was setup, we could use the necessary com.sap.db.jdbc.Driver class to access the SAP Hana instance from our Beam Java pipeline. Our implementation used a parameterized query and a row mapper function. The row mapper allows us to prepare our data to be serialized and used throughout our pipeline. In this case, we are using a custom AutoValue class to package up the column values to be used later when creating our entry into BigQuery.

The parameterized query allows us to use a parameter setter to create multiple queries to chunk our data read operation so that we can scale in a more efficient manner. 

return input
    .apply("Hana JDBC IO", JdbcIO.<String, DBRow>readAll()
        .withCoder(SerializableCoder.of(DBRow.class))
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                this.driver, this.connectionString)
            .withUsername(this.username)
            .withPassword(this.password))
        .withQuery(this.query)
        .withRowMapper(new JdbcIO.RowMapper<DBRow>() {
            public DBRow mapRow(ResultSet resultSet) throws Exception {
              ResultSetMetaData meta = resultSet.getMetaData();
              int columnCount = meta.getColumnCount();
              List<Object> values = new ArrayList<Object>();
              for (int column = 1; column <= columnCount; ++column)
              {
                String name = HanaToDBRow.this.columnNames.get(column - 1);
                Object value = resultSet.getObject(name);
                values.add(value);
              }
              return DBRow.create(values);
            }
          })
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<String>() {
          @Override
          public void setParameters(String element, PreparedStatement statement)
            throws Exception {
            String[] startAndStop = element.split(",");
            statement.setString(1, startAndStop[0]);
            statement.setString(2, startAndStop[1]);
          }
        }));

We then make whichever Beam transformations we want on our data, but in this case we will simply take our custom DBRow object and convert it to a BigQuery TableRow object that can be used by the BigQueryIO sink in Beam. So for that, we create a custom DoFn which will extract each data field in the correct order and put them in the BigQuery TableRow object and then emit the result.

public class HanaDBRowToTableRowFn extends DoFn<DBRow, TableRow> {
…

 public void processElement(ProcessContext c) {
    DBRow data = c.element();
    List<String> columnNames = c.sideInput(columnNamesCollection);
    List<Object> fields = data.fields();
    TableRow row = new TableRow();    
    for(int i = 0; i < fields.size(); i++)
    {
      Object fieldData = fields.get(i);
      String columnName = columnNames.get(i);
      if(fieldData == null)
        continue;
      String sFieldData = fieldData.toString();
      if(!sFieldData.toLowerCase().equals("null"))
        row.put(columnName, sFieldData);
    }
    c.output(row);
  }
}

Now we can create our pipeline using our new PTransform and DoFn. We first manually create a PCollection of Strings that indicate how we will chunk up our input and then apply our PTransform and DoFn in order. Lastly, we utilize the BigQueryIO class to write our PCollection of TableRows.

   pipeline
     .apply(Create.of(chunkSpots)).setCoder(StringUtf8Coder.of())
     .apply(new HanaToDBRow.Reader(options.getDriver(), options.getConnectionString())
        .withUsername(options.getUsername())
        .withPassword(options.getPassword())
        .withQuery(query)
        .withColumnNames(columnNames)
        .create())
     .apply("Hana row to BQ TableRow",ParDo.of(new HanaDBRowToTableRowFn(columnNamesCollection))
         .withSideInputs(columnNamesCollection))
     .apply(BigQueryIO.writeTableRows().to(options.getDestDataset() + "." + options.getTableName())
         .withSchema(bqHanaSchema)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
    pipeline.run();

We can also connect manually to Hana outside of the pipeline operation to grab our table schema so that we could provide this to BigQuery in case it was needed.

    Statement stmt = connection.createStatement();
     
    ResultSet resultSet = stmt.executeQuery("SELECT COLUMN_NAME,DATA_TYPE_NAME FROM TABLE_COLUMNS WHERE TABLE_NAME = '" + options.getTableName() + "'");
    Map<String, String> columns = new HashMap<String,String>();
    while(resultSet.next())
    {
      String col_name = resultSet.getString("COLUMN_NAME");
      String type = resultSet.getString("DATA_TYPE_NAME");
      columns.put(col_name, type);
    }
    TableSchema schema = new TableSchema();
    List<TableFieldSchema> tableFieldSchema = new ArrayList<TableFieldSchema>();
    for(String col_name : columns.keySet())
    {
      TableFieldSchema schemaEntry = new TableFieldSchema();
      schemaEntry.setName(col_name);
      String type = columns.get(col_name);
      if(hanaToBqTypeMap.containsKey(type))
        schemaEntry.setType(hanaToBqTypeMap.get(type));
      else
        schemaEntry.setType("STRING");
      tableFieldSchema.add(schemaEntry);
    }
    schema.setFields(tableFieldSchema); 
    return schema;

The full and most up to date version of the code is available from a public git repository here.

Final output

To run our pipeline, we use Maven to automate our build and execution. We pass some parameters to our program which specify some information regarding how and which table to access from our SAP Hana instance and where in BigQuery we should store the data when transferred. Since we are going to use the Cloud Dataflow execution engine to run our Apache Beam code, we also need to pass in some additional parameters which tell the Dataflow Runner class where it can store files for staging and temporary purposes and what project it can use to run.

$ mvn compile exec:java -Dexec.mainClass=connectors.HanaToBQ
-Dexec.args="--tempLocation=gs://my-gcs-bucket/temp/ 
--stagingLocation=gs://my-gcs-bucket/temp/staging/ 
--project=my-sap-project 
--runner=DataflowRunner 
--connectionString=jdbc:sap://x.x.x.x:30015/?databaseName=x 
--tableName=WIKIPEDIA --username=x --password=x --destDataset=my_bigquery_dataset 
--timestampColumn=timestamp --startTime=1254355200 --endTime=1257033599"
…
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 92 files. Enable logging at DEBUG level to see which files will be staged.
Nov 02, 2017 9:55:37 PM connectors.HanaToBQ main
INFO: SELECT * FROM WIKIPEDIA WHERE timestamp >= 1254355200 AND timestamp < 1257033599
Nov 02, 2017 9:55:39 PM org.apache.beam.runners.dataflow.DataflowRunner run
…
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/sap-certification-env/dataflow/job/2017-11-02_14_55_41-18137875620858588709
Submitted job: 2017-11-02_14_55_41-18137875620858588709
Nov 02, 2017 9:55:42 PM org.apache.beam.runners.dataflow.DataflowRunner run2017-11-02_14_55_41-18137875620858588709
…
[INFO] Total time: 12.965 s
[INFO] Finished at: 2017-11-02T21:55:42+00:00
[INFO] Final Memory: 39M/95M

We’ve omitted a lot of the maven details, but you can see that the program logs the simple SQL query it will run to pull data from Hana, along with some information about how to check the progress of our pipeline in our GCP Console and the execution time to deploy the pipeline.

Following that link brings us to the Cloud Dataflow UI, which gives us a visual display of our pipeline, plus useful information about its execution.

Figure 1: PIpeline job details including auto-scaling timeline

Figure 2: Graphical pipeline overview and step details

The screenshots from the UI were taken after the pipeline finished executing. From it, we can gather a lot of information like execution time, worker scaling, size of the data in and out of the pipeline and other helpful metrics.

There is a lot more work to be done to make this pipeline robust and performant but this gives an initial look at how a tool like Apache Beam and Cloud Dataflow can be used to create a cost effective and usable data environment.

Caveats

The code provided here allows you to extract data from SAP HANA and does not throttle the reads from SAP HANA. You should first test this code on your development environment and understand the impact. If you choose to run this on your production database, you should run it during periods of low activity to avoid any regression.

You should work with SAP to understand if there are any licensing implications to extracting data from SAP HANA into a third party data store like Google BigQuery.

Currently the pipeline requires a timestamp column (or really any sort of numeric value) and a start value and end value so that you can extract a portion of your data. We do envision that this will be a major use case, but also given that JdbcIO’s does not parallelize the query, any query of significant size could cause a single worker to go out of memory. Future work on the repository will aim to accept just a column that can be used to partition the data into chunks that can be parallelized across multiple workers to read in larger sets of data.

  • Big Data Solutions

  • Product deep dives, technical comparisons, how-to's and tips and tricks for using the latest data processing and machine learning technologies.

  • Learn More

12 Months FREE TRIAL

Try BigQuery, Machine Learning and other cloud products and get $300 free credit to spend over 12 months.

TRY IT FREE

Monitor your resources on the go

Get the Google Cloud Console app to help you manage your projects.