Google Cloud Big Data and Machine Learning Blog

Innovation in data processing and machine learning technology

How to do distributed processing of Landsat data in Python

Wednesday, November 9, 2016

Posted by Lak Lakshmanan, Tech Lead for Data and ML Professional Services and Matt Hancher, Tech Lead for Google Earth Engine

One common data analysis task across the agricultural industry, as well as in academia and government (for drought studies, climate modeling, etc.), is to create a monthly vegetation index from Landsat images, now available as a public dataset on Google Cloud Platform (source of Landsat images: U.S. Geological Survey). One approach to create such a monthly vegetation index is to write a data processing script that does the following:

  1. Find all the Landsat images that cover the location in question
  2. Find the least-cloudy image for each month, making sure to correctly handle months when the study area needs multiple Landsat images for complete coverage
  3. For each image, compute the normalized difference between bands 3 and 4 or 4 and 5 depending on the type of spacecraft —  this is the vegetation index known as NDVI.

If you were the data analyst or engineer assigned to this task, you could code this workflow as a Python script, but parallelizing such a script to run on many machines in a fault-tolerant way is quite hard. Wouldn't it be great if there were a framework that would distribute this script onto many machines without you having to manage any clusters or retries? That’s exactly what Google Cloud Dataflow (which has an open source API in the form of an incubating Apache project called Apache Beam) can do.

Processing Landsat data at scale using Google Cloud Dataflow

Cloud Dataflow provides a fully-managed, autoscaling, serverless execution environment for data pipelines written in Apache Beam. While it excels as a unified batch and stream-processing framework and as an Apache Pig/Apache Spark replacement for doing data processing jobs without having to spin up (and manage) Apache Hadoop clusters, you can also use Cloud Dataflow to parallelize your scripting code across many machines, helping to bring speed, reliability and scalability to the process.

In the remainder of this post, we’ll explain how it’s done. Although the use case involved is fairly specific, the same principles will apply to many other workloads across different industries.

Main script

The Python code is quite straightforward. The Landsat dataset on Google Cloud Platform (GCP) includes an index file that's read line-by-line using beam.Read on a TextFileSource:

scenes = (p
      | 'read_index' >> beam.Read(beam.io.TextFileSource(index_file))
      | 'filter_scenes' >> beam.FlatMap(lambda line: filterScenes(line, lat, lon) )
      | 'least_cloudy' >> beam.CombinePerKey(clearest)
   )

The results from beam.Read are piped to the filterScenes method, which finds scenes that cover the lat/lon in which we're interested. We'll get multiple scenes because the spacecraft makes 2-3 passes over the area every month, so we next find the clearest image for each month.

The information about those scenes is written to an output file, and the vegetation index is computed for each scene:
 # write out info about scene
   scenes | beam.Map(lambda (yrmon, scene): scene.__dict__) | 'scene_info' >> beam.io.textio.WriteToText(output_file)

   # compute ndvi on scene
   scenes | 'compute_ndvi' >> beam.Map(lambda (yrmon, scene): ndvi.computeNdvi(scene.BASE_URL, 'gs://cloud-training-demos/landsat/'))

The code above also illustrates how you can pipe the result of a transform (i.e. scenes) to two places in Cloud Dataflow. The end result, over Reunion Island (in the Indian Ocean), looks like this:

There are three methods being invoked from the pipeline above: filterScenes, clearest, and computeNdvi. Let’s look at these methods next.

Transforms

The filterScenes method verifies that the scene contains the (lat, lon) in question.
def filterScenes(line, lat, lon):
   scene = SceneInfo(line)
   if scene.contains(lat, lon):
      yrmon = '{0}-{1}'.format(scene.DATE_ACQUIRED.year, scene.DATE_ACQUIRED.month)
      yield (yrmon, scene)

The SceneInfo class parses a single line of the comma-separated-values file and stores the values. If the scene meets the spatial criterion, the method returns a 2-tuple of the year-month and the scene. Note that filterScenes is a generator method. (In general, Map in Python Dataflow corresponds to lambda functions with one return value per input, and FlatMap corresponds to generator functions that are not one-to-one.)

The method clearest takes a list of scenes and finds the one with minimum cloud cover:

def clearest(scenes):
   if scenes:
      return min(scenes, key=lambda s: s.CLOUD_COVER)
   else:
      return None
computeNdvi consists of downloading the appropriate images, computing the normalized difference and uploading the result to Google Cloud Storage. (See the github repo for full code.)

Landsat over an area

If we're interested in a single point, it's sufficient to simply look for Landsat images that cover that one point (assuming that we're careful to disregard areas within the bounding box that were not actually scanned by the spacecraft). Here, though, we're interested in coverage of an area. This makes the problem a little more difficult since individual Landsat scenes may cover only part of the island:

In our workflow, therefore, we need to look for all Landsat images that cover any part of the island. Then, for each unique path and row of the spacecraft, we should find the clearest image during the month. These images can then be mosaiced to form the Landsat image over the island.

Fortunately, Python (and Cloud Dataflow) are sufficiently expressive to handle this task with ease. Here’s a more sophisticated version of the above script:
   lat =-21.1; lon = 55.50     # center of Reunion Island
   dlat = 0.4; dlon = 0.4

   # Read the index file and find all scenes that cover this area
   allscenes = (p
      | 'read_index' >> beam.Read(beam.io.TextFileSource(index_file))
      | 'to_scene' >> beam.Map(lambda line:  SceneInfo(line))
      | 'by_area' >> beam.FlatMap(lambda scene: filterByArea(scene,lat+dlat,lon-dlon,lat-dlat,lon+dlon) )
   )

   # for each month and spacecraft-coverage-pattern (given by the path and row), find clearest scene
   scenes = (allscenes
      | 'cov_month' >> beam.Map(lambda scene: (scene.month_path_row(), scene))
      | 'least_cloudy' >> beam.CombinePerKey(clearest)
      | 'yrmon-scene' >> beam.Map(lambda (key,scene): (scene.yrmon(), scene))
   )
The new functions in the above snippet deal with the intersection of bounding boxes and creating appropriate keys:
class SceneInfo:
   # as before ...


   def intersects(self, slat, wlon, nlat, elon):
      return (nlat > self.SOUTH_LAT) and (slat < self.NORTH_LAT) and (elon > self.WEST_LON) and (wlon < self.EAST_LON)

   def month_path_row(self):
      return '{}-{}-{}'.format(self.yrmon(), self.WRS_PATH, self.WRS_ROW)

   def yrmon(self):
      return '{}-{:02d}'.format(self.DATE_ACQUIRED.year, self.DATE_ACQUIRED.month)


def filterByArea(scene, slat, wlon, nlat, elon):
   if scene.intersects(slat, wlon, nlat, elon):
      yield scene

Parallel, distributed execution

You can run the above code locally as long as you have Python packages for GDAL (needed to read Landsat) and Cloud Dataflow installed; simply run the dfndvi.py script. Alternately, spin up a Google Compute Engine instance, install the above Python packages using install_packages.sh and run dfndvi.py. Doing that for 2015 and visualizing the resulting images as a movie shows changes in vegetation across the year:

One of the key benefits of Cloud Dataflow is not having to run it locally or on a single machine — rather, you can run it at scale on many machines to get the job completed quickly. To do that, change the pipeline runner from DirectPipelineRunner to DataflowPipelineRunner and supply your GCP project id. The run_oncloud.sh script specifies the necessary command-line parameters to lob off the computation to Google Cloud:
./dfndvi.py \
    --project=$PROJECT \
    --runner=DataflowPipelineRunner \
    --staging_location=gs://$BUCKET/landsat/staging \
    --temp_location=gs://$BUCKET/landsat/staging \
    --index_file=gs://gcp-public-data-landsat/index.csv.gz \
    --max_num_workers=10 \
    --output_file=gs://$BUCKET/landsat/output/scenes.txt \
    --output_dir=gs://$BUCKET/landsat/output \
    --job_name=monthly-landsat \
    --save_main_session \
    --setup_file=./setup.py

Running the script in distributed fashion with Cloud Dataflow also brings orchestration and failure handling to the process. If one of the workers fails, for example, a replacement worker will take over its task. If a step is taking too long, more workers will automatically be provisioned. In addition, the GCP web console allows you to monitor the task as it executes, showing progress and number of elements processed at each step:

Conclusion

In this post, you learned how to process the public Landsat dataset on Cloud Storage in a distributed manner, the concepts of which can be applied to many other use cases. The key ingredient here was Cloud Dataflow, which lets you distribute your Python scripts over lots of machines without having to configure or manage servers.

Happy coding!

Next steps:
  • Big Data Solutions

  • Product deep dives, technical comparisons, how-to's and tips and tricks for using the latest data processing and machine learning technologies.

  • Learn More

12 Months FREE TRIAL

Try BigQuery, Machine Learning and other cloud products and get $300 free credit to spend over 12 months.

TRY IT FREE