common module¶
This module contains common functions shared by both the folium and ipyleaflet plotting backends.
The_national_map_USGS ¶

The National Map is a collection of topographic datasets maintained by the USGS. It provides an API endpoint that can be used to find download links for the products offered:

- A full description of the available datasets can be retrieved, consisting of metadata such as detailed descriptions and publication dates.
- A wide range of data formats is available.

This class is a small wrapper for finding and downloading files through the API. More complete documentation for the API can be found at https://apps.nationalmap.gov/tnmaccess/#/
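Example (a minimal usage sketch; the bounding box values are arbitrary and a network connection is required):

from leafmap.common import The_national_map_USGS

tnm = The_national_map_USGS()
print(tnm.datasets)     # available dataset tags
print(tnm.prodFormats)  # available data formats

# Find download URLs for an example bounding box [minx, miny, maxx, maxy]
links = tnm.find_tiles(region=[-122.4, 37.7, -122.3, 37.8])

# Download the corresponding files to a local directory
tnm.download_tiles(region=[-122.4, 37.7, -122.3, 37.8], out_dir="tiles")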
Source code in leafmap/common.py
class The_national_map_USGS:
"""
The national map is a collection of topological datasets, maintained by the USGS.
It provides an API endpoint which can be used to find downloadable links for the products offered.
- Full description of datasets available can retrieved.
This consists of metadata such as detail description and publication dates.
- A wide range of dataformats are available
This class is a tiny wrapper to find and download files using the API.
More complete documentation for the API can be found at
https://apps.nationalmap.gov/tnmaccess/#/
"""
def __init__(self):
self.api_endpoint = r"https://tnmaccess.nationalmap.gov/api/v1/"
self.DS = self.datasets_full
@property
def datasets_full(self) -> list:
"""
Full description of datasets provided.
Returns a JSON or empty list.
"""
link = f"{self.api_endpoint}datasets?"
try:
return requests.get(link).json()
except Exception:
print(f"Failed to load metadata from The National Map API endpoint\n{link}")
return []
@property
def prodFormats(self) -> list:
"""
Return all datatypes available in any of the collections.
Note that "All" is only peculiar to one dataset.
"""
return set(i["displayName"] for ds in self.DS for i in ds["formats"])
@property
def datasets(self) -> list:
"""
Returns a list of dataset tags (most common human readable self description for specific datasets).
"""
return set(y["sbDatasetTag"] for x in self.DS for y in x["tags"])
def parse_region(self, region, geopandas_args={}) -> list:
"""
Translate a Vector dataset to its bounding box.
Args:
region (str | list): an URL|filepath to a vector dataset to a polygon
geopandas_reader_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
"""
import geopandas as gpd
if isinstance(region, str):
if region.startswith("http"):
region = github_raw_url(region)
region = download_file(region)
elif not os.path.exists(region):
raise ValueError("region must be a path or a URL to a vector dataset.")
roi = gpd.read_file(region, **geopandas_args)
roi = roi.to_crs(epsg=4326)
return roi.total_bounds
return region
def download_tiles(
self, region=None, out_dir=None, download_args={}, geopandas_args={}, API={}
) -> None:
"""
Download the US National Elevation Datasets (NED) for a region.
Args:
region (str | list, optional): An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy].
Alternatively you could use API parameters such as polygon or bbox.
out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
Exposes most of the documented API. Defaults to {}.
Returns:
None
"""
if os.environ.get("USE_MKDOCS") is not None:
return
if out_dir is None:
out_dir = os.getcwd()
else:
out_dir = os.path.abspath(out_dir)
tiles = self.find_tiles(
region, return_type="list", geopandas_args=geopandas_args, API=API
)
T = len(tiles)
errors = 0
done = 0
for i, link in enumerate(tiles):
file_name = os.path.basename(link)
out_name = os.path.join(out_dir, file_name)
if i < 5 or (i < 50 and not (i % 5)) or not (i % 20):
print(f"Downloading {i+1} of {T}: {file_name}")
try:
download_file(link, out_name, **download_args)
done += 1
except KeyboardInterrupt:
print("Cancelled download")
break
except Exception:
errors += 1
print(f"Failed to download {i+1} of {T}: {file_name}")
print(
f"{done} Downloads completed, {errors} downloads failed, {T} files available"
)
return
def find_tiles(self, region=None, return_type="list", geopandas_args={}, API={}):
"""
Find a list of downloadable files.
Args:
region (str | list, optional): An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy].
Alternatively you could use API parameters such as polygon or bbox.
out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
return_type (str): list | dict. Defaults to list. Changes the return output type and content.
geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
Exposes most of the documented API parameters. Defaults to {}.
Returns:
list: A list of download_urls.
dict: A dictionary with urls and related metadata
"""
assert region or API, "Provide a region or use the API"
if region:
API["bbox"] = self.parse_region(region, geopandas_args)
results = self.find_details(**API)
if return_type == "list":
return [i["downloadURL"] for i in results.get("items")]
return results
def find_details(
self,
bbox: List[float] = None,
polygon: List[Tuple[float, float]] = None,
datasets: str = None,
prodFormats: str = None,
prodExtents: str = None,
q: str = None,
dateType: str = None,
start: str = None,
end: str = None,
offset: int = 0,
max: int = None,
outputFormat: str = "JSON",
polyType: str = None,
polyCode: str = None,
extentQuery: int = None,
) -> Dict:
"""
Possible search parameters (kwargs) support by API
Parameter Values
Description
---------------------------------------------------------------------------------------------------
bbox 'minx, miny, maxx, maxy'
Geographic longitude/latitude values expressed in decimal degrees in a comma-delimited list.
polygon '[x,y x,y x,y x,y x,y]'
Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list.
datasets See: Datasets (Optional)
Dataset tag name (sbDatasetTag)
From https://apps.nationalmap.gov/tnmaccess/#/product
prodFormats See: Product Formats (Optional)
Dataset-specific format
prodExtents See: Product Extents (Optional)
Dataset-specific extent
q free text
Text input which can be used to filter by product titles and text descriptions.
dateType dateCreated | lastUpdated | Publication
Type of date to search by.
start 'YYYY-MM-DD'
Start date
end 'YYYY-MM-DD'
End date (required if start date is provided)
offset integer
Offset into paginated results - default=0
max integer
Number of results returned
outputFormat JSON | CSV | pjson
Default=JSON
polyType state | huc2 | huc4 | huc8
Well Known Polygon Type. Use this parameter to deliver data by state or HUC
(hydrologic unit codes defined by the Watershed Boundary Dataset/WBD)
polyCode state FIPS code or huc number
Well Known Polygon Code. This value needs to coordinate with the polyType parameter.
extentQuery integer
A Polygon code in the science base system, typically from an uploaded shapefile
"""
try:
# call locals before creating new locals
used_locals = {k: v for k, v in locals().items() if v and k != "self"}
# Parsing
if polygon:
used_locals["polygon"] = ",".join(
" ".join(map(str, point)) for point in polygon
)
if bbox:
used_locals["bbox"] = str(bbox)[1:-1]
if max:
max += 2
# Fetch response
response = requests.get(f"{self.api_endpoint}products?", params=used_locals)
if response.status_code // 100 == 2:
return response.json()
else:
# Parameter validation handled by API endpoint error responses
print(response.json())
return {}
except Exception as e:
print(e)
return {}
datasets: list property readonly ¶

Returns a list of dataset tags (the most common human-readable self-description for specific datasets).

datasets_full: list property readonly ¶

Full description of the datasets provided. Returns JSON, or an empty list on failure.

prodFormats: list property readonly ¶

Returns all data formats available in any of the collections. Note that "All" is peculiar to a single dataset.
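Example (a quick sketch of inspecting the API offerings before building a query):

from leafmap.common import The_national_map_USGS

tnm = The_national_map_USGS()

# Dataset tags accepted by the `datasets` search parameter
print(sorted(tnm.datasets))

# Formats accepted by the `prodFormats` search parameter
print(sorted(tnm.prodFormats))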
download_tiles(self, region=None, out_dir=None, download_args={}, geopandas_args={}, API={}) ¶

Download the US National Elevation Datasets (NED) for a region.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
region | str or list | A URL or filepath to a vector dataset, or a list of bounds in the form [minx, miny, maxx, maxy]. Alternatively, you can use API parameters such as polygon or bbox. | None |
out_dir | str | The directory to download the files to. Defaults to None, which uses the current working directory. | None |
download_args | dict | A dictionary of arguments to pass to the download_file function. Defaults to {}. | {} |
geopandas_args | dict | A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL or filepath. | {} |
API | dict | A dictionary of arguments to pass to the self.find_details() function. Exposes most of the documented API. Defaults to {}. | {} |

Returns:

Type | Description |
---|---|
None | None |
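Example (a sketch of a typical call; the bounds and output directory are illustrative):

from leafmap.common import The_national_map_USGS

tnm = The_national_map_USGS()
tnm.download_tiles(
    region=[-105.6, 39.9, -105.5, 40.0],  # example bounds [minx, miny, maxx, maxy]
    out_dir="ned",
    API={"max": 5},  # extra search parameters are passed straight to find_details()
)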
Source code in leafmap/common.py
def download_tiles(
self, region=None, out_dir=None, download_args={}, geopandas_args={}, API={}
) -> None:
"""
Download the US National Elevation Datasets (NED) for a region.
Args:
region (str | list, optional): An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy].
Alternatively you could use API parameters such as polygon or bbox.
out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
Exposes most of the documented API. Defaults to {}.
Returns:
None
"""
if os.environ.get("USE_MKDOCS") is not None:
return
if out_dir is None:
out_dir = os.getcwd()
else:
out_dir = os.path.abspath(out_dir)
tiles = self.find_tiles(
region, return_type="list", geopandas_args=geopandas_args, API=API
)
T = len(tiles)
errors = 0
done = 0
for i, link in enumerate(tiles):
file_name = os.path.basename(link)
out_name = os.path.join(out_dir, file_name)
if i < 5 or (i < 50 and not (i % 5)) or not (i % 20):
print(f"Downloading {i+1} of {T}: {file_name}")
try:
download_file(link, out_name, **download_args)
done += 1
except KeyboardInterrupt:
print("Cancelled download")
break
except Exception:
errors += 1
print(f"Failed to download {i+1} of {T}: {file_name}")
print(
f"{done} Downloads completed, {errors} downloads failed, {T} files available"
)
return
find_details(self, bbox=None, polygon=None, datasets=None, prodFormats=None, prodExtents=None, q=None, dateType=None, start=None, end=None, offset=0, max=None, outputFormat='JSON', polyType=None, polyCode=None, extentQuery=None) ¶

Possible search parameters (kwargs) supported by the API:

Parameter | Values | Description |
---|---|---|
bbox | 'minx, miny, maxx, maxy' | Geographic longitude/latitude values expressed in decimal degrees in a comma-delimited list. |
polygon | '[x,y x,y x,y x,y x,y]' | Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list. |
datasets | See: Datasets (Optional) | Dataset tag name (sbDatasetTag) from https://apps.nationalmap.gov/tnmaccess/#/product |
prodFormats | See: Product Formats (Optional) | Dataset-specific format. |
prodExtents | See: Product Extents (Optional) | Dataset-specific extent. |
q | free text | Text input which can be used to filter by product titles and text descriptions. |
dateType | dateCreated, lastUpdated, or Publication | Type of date to search by. |
start | 'YYYY-MM-DD' | Start date. |
end | 'YYYY-MM-DD' | End date (required if a start date is provided). |
offset | integer | Offset into paginated results. Default: 0. |
max | integer | Number of results returned. |
outputFormat | JSON, CSV, or pjson | Default: JSON. |
polyType | state, huc2, huc4, or huc8 | Well Known Polygon Type. Use this parameter to deliver data by state or HUC (hydrologic unit codes defined by the Watershed Boundary Dataset/WBD). |
polyCode | state FIPS code or HUC number | Well Known Polygon Code. This value needs to coordinate with the polyType parameter. |
extentQuery | integer | A polygon code in the ScienceBase system, typically from an uploaded shapefile. |
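Example (a sketch of a metadata query; all parameter values are illustrative):

from leafmap.common import The_national_map_USGS

tnm = The_national_map_USGS()
results = tnm.find_details(
    bbox=[-90.0, 35.0, -89.5, 35.5],  # example bounds
    q="lidar",                        # free-text filter
    dateType="dateCreated",
    start="2020-01-01",
    end="2021-01-01",
    max=10,
)
for item in results.get("items", []):
    print(item.get("downloadURL"))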
Source code in leafmap/common.py
def find_details(
self,
bbox: List[float] = None,
polygon: List[Tuple[float, float]] = None,
datasets: str = None,
prodFormats: str = None,
prodExtents: str = None,
q: str = None,
dateType: str = None,
start: str = None,
end: str = None,
offset: int = 0,
max: int = None,
outputFormat: str = "JSON",
polyType: str = None,
polyCode: str = None,
extentQuery: int = None,
) -> Dict:
"""
Possible search parameters (kwargs) support by API
Parameter Values
Description
---------------------------------------------------------------------------------------------------
bbox 'minx, miny, maxx, maxy'
Geographic longitude/latitude values expressed in decimal degrees in a comma-delimited list.
polygon '[x,y x,y x,y x,y x,y]'
Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list.
datasets See: Datasets (Optional)
Dataset tag name (sbDatasetTag)
From https://apps.nationalmap.gov/tnmaccess/#/product
prodFormats See: Product Formats (Optional)
Dataset-specific format
prodExtents See: Product Extents (Optional)
Dataset-specific extent
q free text
Text input which can be used to filter by product titles and text descriptions.
dateType dateCreated | lastUpdated | Publication
Type of date to search by.
start 'YYYY-MM-DD'
Start date
end 'YYYY-MM-DD'
End date (required if start date is provided)
offset integer
Offset into paginated results - default=0
max integer
Number of results returned
outputFormat JSON | CSV | pjson
Default=JSON
polyType state | huc2 | huc4 | huc8
Well Known Polygon Type. Use this parameter to deliver data by state or HUC
(hydrologic unit codes defined by the Watershed Boundary Dataset/WBD)
polyCode state FIPS code or huc number
Well Known Polygon Code. This value needs to coordinate with the polyType parameter.
extentQuery integer
A Polygon code in the science base system, typically from an uploaded shapefile
"""
try:
# call locals before creating new locals
used_locals = {k: v for k, v in locals().items() if v and k != "self"}
# Parsing
if polygon:
used_locals["polygon"] = ",".join(
" ".join(map(str, point)) for point in polygon
)
if bbox:
used_locals["bbox"] = str(bbox)[1:-1]
if max:
max += 2
# Fetch response
response = requests.get(f"{self.api_endpoint}products?", params=used_locals)
if response.status_code // 100 == 2:
return response.json()
else:
# Parameter validation handled by API endpoint error responses
print(response.json())
return {}
except Exception as e:
print(e)
return {}
find_tiles(self, region=None, return_type='list', geopandas_args={}, API={}) ¶

Find a list of downloadable files.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
region | str or list | A URL or filepath to a vector dataset, or a list of bounds in the form [minx, miny, maxx, maxy]. Alternatively, you can use API parameters such as polygon or bbox. | None |
return_type | str | 'list' or 'dict'. Changes the return type and content. | 'list' |
geopandas_args | dict | A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL or filepath. | {} |
API | dict | A dictionary of arguments to pass to the self.find_details() function. Exposes most of the documented API parameters. Defaults to {}. | {} |

Returns:

Type | Description |
---|---|
list | A list of download URLs. |
dict | A dictionary with URLs and related metadata. |
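Example (a sketch; the bounds are arbitrary):

from leafmap.common import The_national_map_USGS

tnm = The_national_map_USGS()

# List form: just the download URLs
urls = tnm.find_tiles(region=[-77.1, 38.8, -77.0, 38.9])

# Dict form: the full API response with related metadata
details = tnm.find_tiles(region=[-77.1, 38.8, -77.0, 38.9], return_type="dict")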
Source code in leafmap/common.py
def find_tiles(self, region=None, return_type="list", geopandas_args={}, API={}):
"""
Find a list of downloadable files.
Args:
region (str | list, optional): An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy].
Alternatively you could use API parameters such as polygon or bbox.
out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
return_type (str): list | dict. Defaults to list. Changes the return output type and content.
geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
Exposes most of the documented API parameters. Defaults to {}.
Returns:
list: A list of download_urls.
dict: A dictionary with urls and related metadata
"""
assert region or API, "Provide a region or use the API"
if region:
API["bbox"] = self.parse_region(region, geopandas_args)
results = self.find_details(**API)
if return_type == "list":
return [i["downloadURL"] for i in results.get("items")]
return results
parse_region(self, region, geopandas_args={}) ¶

Translate a vector dataset to its bounding box.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
region | str or list | A URL or filepath to a vector dataset, or a list of bounds. | required |
geopandas_args | dict | A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL or filepath. | {} |
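Example (a sketch; the GeoJSON path is hypothetical and geopandas must be installed):

from leafmap.common import The_national_map_USGS

tnm = The_national_map_USGS()

# A vector file is reduced to its bounds [minx, miny, maxx, maxy] in EPSG:4326
bounds = tnm.parse_region("example_area.geojson")  # hypothetical file

# A list of bounds is passed through unchanged
bounds = tnm.parse_region([-120.0, 35.0, -119.0, 36.0])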
Source code in leafmap/common.py
def parse_region(self, region, geopandas_args={}) -> list:
"""
Translate a Vector dataset to its bounding box.
Args:
region (str | list): an URL|filepath to a vector dataset to a polygon
geopandas_reader_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
"""
import geopandas as gpd
if isinstance(region, str):
if region.startswith("http"):
region = github_raw_url(region)
region = download_file(region)
elif not os.path.exists(region):
raise ValueError("region must be a path or a URL to a vector dataset.")
roi = gpd.read_file(region, **geopandas_args)
roi = roi.to_crs(epsg=4326)
return roi.total_bounds
return region
WhiteboxTools (WhiteboxTools) ¶
This class inherits the whitebox WhiteboxTools class.
Source code in leafmap/common.py
class WhiteboxTools(whitebox.WhiteboxTools):
"""This class inherits the whitebox WhiteboxTools class."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
__install_from_github(url) ¶

Install a package from a GitHub repository.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
url | str | The URL of the GitHub repository. | required |
Source code in leafmap/common.py
def __install_from_github(url: str) -> None:
"""Install a package from a GitHub repository.
Args:
url (str): The URL of the GitHub repository.
"""
try:
download_dir = os.path.join(os.path.expanduser("~"), "Downloads")
if not os.path.exists(download_dir):
os.makedirs(download_dir)
repo_name = os.path.basename(url)
zip_url = os.path.join(url, "archive/master.zip")
filename = repo_name + "-master.zip"
download_from_url(
url=zip_url, out_file_name=filename, out_dir=download_dir, unzip=True
)
pkg_dir = os.path.join(download_dir, repo_name + "-master")
pkg_name = os.path.basename(url)
work_dir = os.getcwd()
os.chdir(pkg_dir)
print("Installing {}...".format(pkg_name))
cmd = "pip install ."
os.system(cmd)
os.chdir(work_dir)
print("{} has been installed successfully.".format(pkg_name))
# print("\nPlease comment out 'install_from_github()' and restart the kernel to take effect:\nJupyter menu -> Kernel -> Restart & Clear Output")
except Exception as e:
raise Exception(e)
add_crs(filename, epsg) ¶

Add a CRS to a raster dataset.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | The filename of the raster dataset. | required |
epsg | int or str | The EPSG code of the CRS. | required |
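Example (a minimal sketch; the file path is hypothetical and rasterio must be installed):

from leafmap.common import add_crs

# Assign EPSG:4326 to a raster that lacks projection information
add_crs("dem.tif", 4326)  # hypothetical local file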
Source code in leafmap/common.py
def add_crs(filename, epsg):
"""Add a CRS to a raster dataset.
Args:
filename (str): The filename of the raster dataset.
epsg (int | str): The EPSG code of the CRS.
"""
try:
import rasterio
except ImportError:
raise ImportError(
"rasterio is required for adding a CRS to a raster. Please install it using 'pip install rasterio'."
)
if not os.path.exists(filename):
raise ValueError("filename must exist.")
if isinstance(epsg, int):
epsg = f"EPSG:{epsg}"
elif isinstance(epsg, str):
epsg = "EPSG:" + epsg
else:
raise ValueError("epsg must be an integer or string.")
crs = rasterio.crs.CRS({"init": epsg})
with rasterio.open(filename, mode="r+") as src:
src.crs = crs
add_image_to_gif(in_gif, out_gif, in_image, xy=None, image_size=(80, 80), circle_mask=False) ¶

Adds an image logo to a GIF image.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
in_gif | str | Input file path to the GIF image. | required |
out_gif | str | Output file path to the GIF image. | required |
in_image | str | Input file path to the image. | required |
xy | tuple | Top left corner of the image. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None. | None |
image_size | tuple | Resize image. Defaults to (80, 80). | (80, 80) |
circle_mask | bool | Whether to apply a circle mask to the image. This only works with non-png images. Defaults to False. | False |
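Example (a sketch; the file paths are hypothetical):

from leafmap.common import add_image_to_gif

add_image_to_gif(
    "animation.gif",       # input GIF
    "animation_logo.gif",  # output GIF
    in_image="logo.png",   # logo stamped onto every frame
    image_size=(60, 60),
)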
Source code in leafmap/common.py
def add_image_to_gif(
in_gif, out_gif, in_image, xy=None, image_size=(80, 80), circle_mask=False
):
"""Adds an image logo to a GIF image.
Args:
in_gif (str): Input file path to the GIF image.
out_gif (str): Output file path to the GIF image.
in_image (str): Input file path to the image.
xy (tuple, optional): Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None.
image_size (tuple, optional): Resize image. Defaults to (80, 80).
circle_mask (bool, optional): Whether to apply a circle mask to the image. This only works with non-png images. Defaults to False.
"""
import io
from PIL import Image, ImageDraw, ImageSequence
warnings.simplefilter("ignore")
in_gif = os.path.abspath(in_gif)
is_url = False
if in_image.startswith("http"):
is_url = True
if not os.path.exists(in_gif):
print("The input gif file does not exist.")
return
if (not is_url) and (not os.path.exists(in_image)):
print("The provided logo file does not exist.")
return
out_dir = check_dir((os.path.dirname(out_gif)))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
try:
gif = Image.open(in_gif)
except Exception as e:
print("An error occurred while opening the image.")
print(e)
return
logo_raw_image = None
try:
if in_image.startswith("http"):
logo_raw_image = open_image_from_url(in_image)
else:
in_image = os.path.abspath(in_image)
logo_raw_image = Image.open(in_image)
except Exception as e:
print(e)
logo_raw_size = logo_raw_image.size
ratio = max(
logo_raw_size[0] / image_size[0],
logo_raw_size[1] / image_size[1],
)
image_resize = (int(logo_raw_size[0] / ratio), int(logo_raw_size[1] / ratio))
image_size = min(logo_raw_size[0], image_size[0]), min(
logo_raw_size[1], image_size[1]
)
logo_image = logo_raw_image.convert("RGBA")
logo_image.thumbnail(image_size, Image.ANTIALIAS)
gif_width, gif_height = gif.size
mask_im = None
if circle_mask:
mask_im = Image.new("L", image_size, 0)
draw = ImageDraw.Draw(mask_im)
draw.ellipse((0, 0, image_size[0], image_size[1]), fill=255)
if has_transparency(logo_raw_image):
mask_im = logo_image.copy()
if xy is None:
# default logo location is 5% width and 5% height of the image.
delta = 10
xy = (gif_width - image_resize[0] - delta, gif_height - image_resize[1] - delta)
# xy = (int(0.05 * gif_width), int(0.05 * gif_height))
elif (xy is not None) and (not isinstance(xy, tuple)) and (len(xy) == 2):
print("xy must be a tuple, e.g., (10, 10), ('10%', '10%')")
return
elif all(isinstance(item, int) for item in xy) and (len(xy) == 2):
x, y = xy
if (x > 0) and (x < gif_width) and (y > 0) and (y < gif_height):
pass
else:
print(
"xy is out of bounds. x must be within [0, {}], and y must be within [0, {}]".format(
gif_width, gif_height
)
)
return
elif all(isinstance(item, str) for item in xy) and (len(xy) == 2):
x, y = xy
if ("%" in x) and ("%" in y):
try:
x = int(float(x.replace("%", "")) / 100.0 * gif_width)
y = int(float(y.replace("%", "")) / 100.0 * gif_height)
xy = (x, y)
except Exception:
raise Exception(
"The specified xy is invalid. It must be formatted like this ('10%', '10%')"
)
else:
raise Exception(
"The specified xy is invalid. It must be formatted like this: (10, 10) or ('10%', '10%')"
)
try:
frames = []
for _, frame in enumerate(ImageSequence.Iterator(gif)):
frame = frame.convert("RGBA")
frame.paste(logo_image, xy, mask_im)
b = io.BytesIO()
frame.save(b, format="GIF")
frame = Image.open(b)
frames.append(frame)
frames[0].save(out_gif, save_all=True, append_images=frames[1:])
except Exception as e:
print(e)
add_mask_to_image(image, mask, output, color='red') ¶

Overlay a binary mask (e.g., roads, building footprints, etc.) on an image. Credits to Xingjian Shi for the sample code.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
image | str | A local path or HTTP URL to an image. | required |
mask | str | A local path or HTTP URL to a binary mask. | required |
output | str | A local path to the output image. | required |
color | str | Color of the mask. Defaults to 'red'. | 'red' |

Exceptions:

Type | Description |
---|---|
ImportError | If rasterio and detectron2 are not installed. |
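Example (a sketch; the paths are hypothetical and rasterio and detectron2 must be installed):

from leafmap.common import add_mask_to_image

# Overlay a building-footprint mask on an aerial image
add_mask_to_image("aerial.tif", "buildings_mask.tif", "aerial_with_mask.tif", color="red")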
Source code in leafmap/common.py
def add_mask_to_image(image, mask, output, color="red"):
"""Overlay a binary mask (e.g., roads, building footprints, etc) on an image. Credits to Xingjian Shi for the sample code.
Args:
image (str): A local path or HTTP URL to an image.
mask (str): A local path or HTTP URL to a binary mask.
output (str): A local path to the output image.
color (str, optional): Color of the mask. Defaults to 'red'.
Raises:
ImportError: If rasterio and detectron2 are not installed.
"""
try:
import rasterio
from detectron2.utils.visualizer import Visualizer
from PIL import Image
except ImportError:
raise ImportError(
"Please install rasterio and detectron2 to use this function. See https://detectron2.readthedocs.io/en/latest/tutorials/install.html"
)
ds = rasterio.open(image)
image_arr = ds.read()
mask_arr = rasterio.open(mask).read()
vis = Visualizer(image_arr.transpose((1, 2, 0)))
vis.draw_binary_mask(mask_arr[0] > 0, color=color)
out_arr = Image.fromarray(vis.get_output().get_image())
out_arr.save(output)
if ds.crs is not None:
numpy_to_cog(output, output, profile=image)
add_progress_bar_to_gif(in_gif, out_gif, progress_bar_color='blue', progress_bar_height=5, duration=100, loop=0) ¶

Adds a progress bar to a GIF image.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
in_gif | str | The file path to the input GIF image. | required |
out_gif | str | The file path to the output GIF image. | required |
progress_bar_color | str | Color for the progress bar. Defaults to 'blue'. | 'blue' |
progress_bar_height | int | Height of the progress bar. Defaults to 5. | 5 |
duration | int | Controls how long each frame is displayed, in milliseconds. It is the inverse of the frame rate: 100 milliseconds gives 10 frames per second. Decrease the duration for a smoother animation. Defaults to 100. | 100 |
loop | int | Controls how many times the animation repeats. A value of 1 plays the animation once and then stops (displaying the last frame); 0 repeats forever. Defaults to 0. | 0 |
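Example (a sketch; the file paths are hypothetical):

from leafmap.common import add_progress_bar_to_gif

# Draw a 5-pixel blue progress bar along the bottom edge of each frame
add_progress_bar_to_gif(
    "timelapse.gif",
    "timelapse_progress.gif",
    progress_bar_color="blue",
    progress_bar_height=5,
)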
Source code in leafmap/common.py
def add_progress_bar_to_gif(
in_gif,
out_gif,
progress_bar_color="blue",
progress_bar_height=5,
duration=100,
loop=0,
):
"""Adds a progress bar to a GIF image.
Args:
in_gif (str): The file path to the input GIF image.
out_gif (str): The file path to the output GIF image.
progress_bar_color (str, optional): Color for the progress bar. Defaults to 'white'.
progress_bar_height (int, optional): Height of the progress bar. Defaults to 5.
duration (int, optional): controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100.
loop (int, optional): controls how many times the animation repeats. The default, 1, means that the animation will play once and then stop (displaying the last frame). A value of 0 means that the animation will repeat forever. Defaults to 0.
"""
import io
from PIL import Image, ImageDraw, ImageSequence
warnings.simplefilter("ignore")
in_gif = os.path.abspath(in_gif)
out_gif = os.path.abspath(out_gif)
if not os.path.exists(in_gif):
print("The input gif file does not exist.")
return
if not os.path.exists(os.path.dirname(out_gif)):
os.makedirs(os.path.dirname(out_gif))
progress_bar_color = check_color(progress_bar_color)
try:
image = Image.open(in_gif)
except Exception as e:
raise Exception("An error occurred while opening the gif.")
count = image.n_frames
W, H = image.size
progress_bar_widths = [i * 1.0 / count * W for i in range(1, count + 1)]
progress_bar_shapes = [
[(0, H - progress_bar_height), (x, H)] for x in progress_bar_widths
]
try:
frames = []
# Loop over each frame in the animated image
for index, frame in enumerate(ImageSequence.Iterator(image)):
# Draw the text on the frame
frame = frame.convert("RGB")
draw = ImageDraw.Draw(frame)
# w, h = draw.textsize(text[index])
draw.rectangle(progress_bar_shapes[index], fill=progress_bar_color)
del draw
b = io.BytesIO()
frame.save(b, format="GIF")
frame = Image.open(b)
frames.append(frame)
# https://www.pythoninformer.com/python-libraries/pillow/creating-animated-gif/
# Save the frames as a new image
frames[0].save(
out_gif,
save_all=True,
append_images=frames[1:],
duration=duration,
loop=loop,
optimize=True,
)
except Exception as e:
raise Exception(e)
add_text_to_gif(in_gif, out_gif, xy=None, text_sequence=None, font_type='arial.ttf', font_size=20, font_color='#000000', add_progress_bar=True, progress_bar_color='white', progress_bar_height=5, duration=100, loop=0) ¶

Adds animated text to a GIF image.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
in_gif | str | The file path to the input GIF image. | required |
out_gif | str | The file path to the output GIF image. | required |
xy | tuple | Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None. | None |
text_sequence | int, str, or list | Text to be drawn. It can be an integer number, a string, or a list of strings. Defaults to None. | None |
font_type | str | Font type. Defaults to "arial.ttf". | 'arial.ttf' |
font_size | int | Font size. Defaults to 20. | 20 |
font_color | str | Font color. It can be a string (e.g., 'red'), an RGB tuple (e.g., (255, 127, 0)), or a hex code (e.g., '#ff00ff'). Defaults to '#000000'. | '#000000' |
add_progress_bar | bool | Whether to add a progress bar at the bottom of the GIF. Defaults to True. | True |
progress_bar_color | str | Color for the progress bar. Defaults to 'white'. | 'white' |
progress_bar_height | int | Height of the progress bar. Defaults to 5. | 5 |
duration | int | Controls how long each frame is displayed, in milliseconds. It is the inverse of the frame rate: 100 milliseconds gives 10 frames per second. Decrease the duration for a smoother animation. Defaults to 100. | 100 |
loop | int | Controls how many times the animation repeats. A value of 1 plays the animation once and then stops (displaying the last frame); 0 repeats forever. Defaults to 0. | 0 |
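Example (a sketch; the file paths are hypothetical):

from leafmap.common import add_text_to_gif

# Label each frame with a year starting at 2000, near the top-left corner
add_text_to_gif(
    "timelapse.gif",
    "timelapse_labeled.gif",
    xy=("5%", "5%"),
    text_sequence=2000,
    font_size=30,
    font_color="white",
)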
Source code in leafmap/common.py
def add_text_to_gif(
in_gif,
out_gif,
xy=None,
text_sequence=None,
font_type="arial.ttf",
font_size=20,
font_color="#000000",
add_progress_bar=True,
progress_bar_color="white",
progress_bar_height=5,
duration=100,
loop=0,
):
"""Adds animated text to a GIF image.
Args:
in_gif (str): The file path to the input GIF image.
out_gif (str): The file path to the output GIF image.
xy (tuple, optional): Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None.
text_sequence (int, str, list, optional): Text to be drawn. It can be an integer number, a string, or a list of strings. Defaults to None.
font_type (str, optional): Font type. Defaults to "arial.ttf".
font_size (int, optional): Font size. Defaults to 20.
font_color (str, optional): Font color. It can be a string (e.g., 'red'), rgb tuple (e.g., (255, 127, 0)), or hex code (e.g., '#ff00ff'). Defaults to '#000000'.
add_progress_bar (bool, optional): Whether to add a progress bar at the bottom of the GIF. Defaults to True.
progress_bar_color (str, optional): Color for the progress bar. Defaults to 'white'.
progress_bar_height (int, optional): Height of the progress bar. Defaults to 5.
duration (int, optional): controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation.. Defaults to 100.
loop (int, optional): controls how many times the animation repeats. The default, 1, means that the animation will play once and then stop (displaying the last frame). A value of 0 means that the animation will repeat forever. Defaults to 0.
"""
import io
import importlib.resources
from PIL import Image, ImageDraw, ImageFont, ImageSequence
warnings.simplefilter("ignore")
pkg_dir = os.path.dirname(importlib.resources.files("leafmap") / "leafmap.py")
default_font = os.path.join(pkg_dir, "data/fonts/arial.ttf")
in_gif = os.path.abspath(in_gif)
out_gif = os.path.abspath(out_gif)
if not os.path.exists(in_gif):
print("The input gif file does not exist.")
return
if not os.path.exists(os.path.dirname(out_gif)):
os.makedirs(os.path.dirname(out_gif))
if font_type == "arial.ttf":
font = ImageFont.truetype(default_font, font_size)
elif font_type == "alibaba.otf":
default_font = os.path.join(pkg_dir, "data/fonts/alibaba.otf")
font = ImageFont.truetype(default_font, font_size)
else:
try:
font_list = system_fonts(show_full_path=True)
font_names = [os.path.basename(f) for f in font_list]
if (font_type in font_list) or (font_type in font_names):
font = ImageFont.truetype(font_type, font_size)
else:
print(
"The specified font type could not be found on your system. Using the default font instead."
)
font = ImageFont.truetype(default_font, font_size)
except Exception as e:
print(e)
font = ImageFont.truetype(default_font, font_size)
color = check_color(font_color)
progress_bar_color = check_color(progress_bar_color)
try:
image = Image.open(in_gif)
except Exception as e:
print("An error occurred while opening the gif.")
print(e)
return
count = image.n_frames
W, H = image.size
progress_bar_widths = [i * 1.0 / count * W for i in range(1, count + 1)]
progress_bar_shapes = [
[(0, H - progress_bar_height), (x, H)] for x in progress_bar_widths
]
if xy is None:
# default text location is 5% width and 5% height of the image.
xy = (int(0.05 * W), int(0.05 * H))
elif (xy is not None) and (not isinstance(xy, tuple)) and (len(xy) == 2):
print("xy must be a tuple, e.g., (10, 10), ('10%', '10%')")
return
elif all(isinstance(item, int) for item in xy) and (len(xy) == 2):
x, y = xy
if (x > 0) and (x < W) and (y > 0) and (y < H):
pass
else:
print(
f"xy is out of bounds. x must be within [0, {W}], and y must be within [0, {H}]"
)
return
elif all(isinstance(item, str) for item in xy) and (len(xy) == 2):
x, y = xy
if ("%" in x) and ("%" in y):
try:
x = int(float(x.replace("%", "")) / 100.0 * W)
y = int(float(y.replace("%", "")) / 100.0 * H)
xy = (x, y)
except Exception:
raise Exception(
"The specified xy is invalid. It must be formatted like this ('10%', '10%')"
)
else:
print(
"The specified xy is invalid. It must be formatted like this: (10, 10) or ('10%', '10%')"
)
return
if text_sequence is None:
text = [str(x) for x in range(1, count + 1)]
elif isinstance(text_sequence, int):
text = [str(x) for x in range(text_sequence, text_sequence + count + 1)]
elif isinstance(text_sequence, str):
try:
text_sequence = int(text_sequence)
text = [str(x) for x in range(text_sequence, text_sequence + count + 1)]
except Exception:
text = [text_sequence] * count
elif isinstance(text_sequence, list) and len(text_sequence) != count:
print(
f"The length of the text sequence must be equal to the number ({count}) of frames in the gif."
)
return
else:
text = [str(x) for x in text_sequence]
try:
frames = []
# Loop over each frame in the animated image
for index, frame in enumerate(ImageSequence.Iterator(image)):
# Draw the text on the frame
frame = frame.convert("RGB")
draw = ImageDraw.Draw(frame)
# w, h = draw.textsize(text[index])
draw.text(xy, text[index], font=font, fill=color)
if add_progress_bar:
draw.rectangle(progress_bar_shapes[index], fill=progress_bar_color)
del draw
b = io.BytesIO()
frame.save(b, format="GIF")
frame = Image.open(b)
frames.append(frame)
# https://www.pythoninformer.com/python-libraries/pillow/creating-animated-gif/
# Save the frames as a new image
frames[0].save(
out_gif,
save_all=True,
append_images=frames[1:],
duration=duration,
loop=loop,
optimize=True,
)
except Exception as e:
print(e)
adjust_longitude(in_fc) ¶

Adjusts longitude if it is less than -180 or greater than 180.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
in_fc | dict | The input dictionary containing coordinates. | required |

Returns:

Type | Description |
---|---|
dict | A dictionary containing the converted longitudes. |
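Example (a minimal sketch):

from leafmap.common import adjust_longitude

# A GeoJSON-style point whose longitude falls outside [-180, 180]
feature = {"type": "Point", "coordinates": [190.0, 35.0]}
print(adjust_longitude(feature))
# {'type': 'Point', 'coordinates': [-170.0, 35.0]}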
Source code in leafmap/common.py
def adjust_longitude(in_fc):
"""Adjusts longitude if it is less than -180 or greater than 180.
Args:
in_fc (dict): The input dictionary containing coordinates.
Returns:
dict: A dictionary containing the converted longitudes
"""
try:
keys = in_fc.keys()
if "geometry" in keys:
coordinates = in_fc["geometry"]["coordinates"]
if in_fc["geometry"]["type"] == "Point":
longitude = coordinates[0]
if longitude < -180:
longitude = 360 + longitude
elif longitude > 180:
longitude = longitude - 360
in_fc["geometry"]["coordinates"][0] = longitude
elif in_fc["geometry"]["type"] == "Polygon":
for index1, item in enumerate(coordinates):
for index2, element in enumerate(item):
longitude = element[0]
if longitude < -180:
longitude = 360 + longitude
elif longitude > 180:
longitude = longitude - 360
in_fc["geometry"]["coordinates"][index1][index2][0] = longitude
elif in_fc["geometry"]["type"] == "LineString":
for index, element in enumerate(coordinates):
longitude = element[0]
if longitude < -180:
longitude = 360 + longitude
elif longitude > 180:
longitude = longitude - 360
in_fc["geometry"]["coordinates"][index][0] = longitude
elif "type" in keys:
coordinates = in_fc["coordinates"]
if in_fc["type"] == "Point":
longitude = coordinates[0]
if longitude < -180:
longitude = 360 + longitude
elif longitude > 180:
longitude = longitude - 360
in_fc["coordinates"][0] = longitude
elif in_fc["type"] == "Polygon":
for index1, item in enumerate(coordinates):
for index2, element in enumerate(item):
longitude = element[0]
if longitude < -180:
longitude = 360 + longitude
elif longitude > 180:
longitude = longitude - 360
in_fc["coordinates"][index1][index2][0] = longitude
elif in_fc["type"] == "LineString":
for index, element in enumerate(coordinates):
longitude = element[0]
if longitude < -180:
longitude = 360 + longitude
elif longitude > 180:
longitude = longitude - 360
in_fc["coordinates"][index][0] = longitude
return in_fc
except Exception as e:
print(e)
return None
arc_active_map() ¶

Get the active map in ArcGIS Pro.

Returns:

Type | Description |
---|---|
arcpy.Map | The active map in ArcGIS Pro. |
Source code in leafmap/common.py
def arc_active_map():
"""Get the active map in ArcGIS Pro.
Returns:
arcpy.Map: The active map in ArcGIS Pro.
"""
if is_arcpy():
import arcpy # pylint: disable=E0401
aprx = arcpy.mp.ArcGISProject("CURRENT")
m = aprx.activeMap
return m
else:
return None
arc_active_view() ¶

Get the active view in ArcGIS Pro.

Returns:

Type | Description |
---|---|
arcpy.MapView | The active view in ArcGIS Pro. |
Source code in leafmap/common.py
def arc_active_view():
"""Get the active view in ArcGIS Pro.
Returns:
arcpy.MapView: The active view in ArcGIS Pro.
"""
if is_arcpy():
import arcpy # pylint: disable=E0401
aprx = arcpy.mp.ArcGISProject("CURRENT")
view = aprx.activeView
return view
else:
return None
arc_add_layer(url, name=None, shown=True, opacity=1.0) ¶

Add a layer to the active map in ArcGIS Pro.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
url | str | The URL of the tile layer to add. | required |
name | str | The name of the layer. Defaults to None. | None |
shown | bool | Whether the layer is shown. Defaults to True. | True |
opacity | float | The opacity of the layer. Defaults to 1.0. | 1.0 |
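Example (a sketch for an ArcGIS Pro Python session; the tile URL and bounds are illustrative, and arcpy must be available):

from leafmap.common import arc_add_layer, arc_zoom_to_bounds

# Add an XYZ tile layer to the active map
arc_add_layer(
    "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
    name="OpenStreetMap",
    opacity=0.8,
)

# Zoom the active view to an example bounding box [xmin, ymin, xmax, ymax]
arc_zoom_to_bounds([-100.0, 30.0, -90.0, 40.0])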
Source code in leafmap/common.py
def arc_add_layer(url, name=None, shown=True, opacity=1.0):
"""Add a layer to the active map in ArcGIS Pro.
Args:
url (str): The URL of the tile layer to add.
name (str, optional): The name of the layer. Defaults to None.
shown (bool, optional): Whether the layer is shown. Defaults to True.
opacity (float, optional): The opacity of the layer. Defaults to 1.0.
"""
if is_arcpy():
m = arc_active_map()
if m is not None:
m.addDataFromPath(url)
if isinstance(name, str):
layers = m.listLayers("Tiled service layer")
if len(layers) > 0:
layer = layers[0]
layer.name = name
layer.visible = shown
layer.transparency = 100 - (opacity * 100)
arc_zoom_to_bounds(bounds) ¶

Zoom to a bounding box.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
bounds | list | The bounding box to zoom to in the form [xmin, ymin, xmax, ymax] or [(ymin, xmin), (ymax, xmax)]. | required |

Exceptions:

Type | Description |
---|---|
ValueError | If bounds is not of length 2 or 4. |
Source code in leafmap/common.py
def arc_zoom_to_bounds(bounds):
"""Zoom to a bounding box.
Args:
bounds (list): The bounding box to zoom to in the form [xmin, ymin, xmax, ymax] or [(ymin, xmin), (ymax, xmax)].
Raises:
ValueError: _description_
"""
if len(bounds) == 4:
xmin, ymin, xmax, ymax = bounds
elif len(bounds) == 2:
(ymin, xmin), (ymax, xmax) = bounds
else:
raise ValueError("bounds must be a tuple of length 2 or 4.")
arc_zoom_to_extent(xmin, ymin, xmax, ymax)
arc_zoom_to_extent(xmin, ymin, xmax, ymax) ¶

Zoom to an extent in ArcGIS Pro.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
xmin | float | The minimum x value of the extent. | required |
ymin | float | The minimum y value of the extent. | required |
xmax | float | The maximum x value of the extent. | required |
ymax | float | The maximum y value of the extent. | required |
Source code in leafmap/common.py
def arc_zoom_to_extent(xmin, ymin, xmax, ymax):
"""Zoom to an extent in ArcGIS Pro.
Args:
xmin (float): The minimum x value of the extent.
ymin (float): The minimum y value of the extent.
xmax (float): The maximum x value of the extent.
ymax (float): The maximum y value of the extent.
"""
if is_arcpy():
import arcpy # pylint: disable=E0401
view = arc_active_view()
if view is not None:
view.camera.setExtent(
arcpy.Extent(
xmin,
ymin,
xmax,
ymax,
spatial_reference=arcpy.SpatialReference(4326),
)
)
# if isinstance(zoom, int):
# scale = 156543.04 * math.cos(0) / math.pow(2, zoom)
# view.camera.scale = scale # Not working properly
array_to_image(array, output=None, source=None, dtype=None, compress='deflate', transpose=True, cellsize=None, crs=None, transform=None, driver='COG', **kwargs) ¶

Save a NumPy array as a GeoTIFF using the projection information from an existing GeoTIFF file.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
array | np.ndarray | The NumPy array to be saved as a GeoTIFF. | required |
output | str | The path to the output image. If None, a temporary file will be created. Defaults to None. | None |
source | str | The path to an existing GeoTIFF file with map projection information. Defaults to None. | None |
dtype | np.dtype | The data type of the output array. Defaults to None. | None |
compress | str | The compression method. Can be one of the following: "deflate", "lzw", "packbits", "jpeg". Defaults to "deflate". | 'deflate' |
transpose | bool | Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True. | True |
cellsize | float | The resolution of the output image in meters. Defaults to None. | None |
crs | str | The CRS of the output image. Defaults to None. | None |
transform | tuple | The affine transformation matrix, can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361). Defaults to None. | None |
driver | str | The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG". | 'COG' |
**kwargs | | Additional keyword arguments to be passed to the rasterio.open() function. | {} |
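Example (a sketch of the two main call patterns; file names are hypothetical and rasterio must be installed):

import numpy as np
from leafmap.common import array_to_image

arr = np.random.randint(0, 255, size=(256, 256), dtype=np.uint8)

# Reuse the projection and transform of an existing GeoTIFF
array_to_image(arr, output="out.tif", source="reference.tif")

# Or supply the georeferencing directly
array_to_image(arr, output="out2.tif", cellsize=30.0, crs="EPSG:3857")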
Source code in leafmap/common.py
def array_to_image(
array,
output: str = None,
source: str = None,
dtype: str = None,
compress: str = "deflate",
transpose: bool = True,
cellsize: float = None,
crs: str = None,
transform: tuple = None,
driver: str = "COG",
**kwargs,
) -> str:
"""Save a NumPy array as a GeoTIFF using the projection information from an existing GeoTIFF file.
Args:
array (np.ndarray): The NumPy array to be saved as a GeoTIFF.
output (str): The path to the output image. If None, a temporary file will be created. Defaults to None.
source (str, optional): The path to an existing GeoTIFF file with map projection information. Defaults to None.
dtype (np.dtype, optional): The data type of the output array. Defaults to None.
compress (str, optional): The compression method. Can be one of the following: "deflate", "lzw", "packbits", "jpeg". Defaults to "deflate".
transpose (bool, optional): Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True.
cellsize (float, optional): The resolution of the output image in meters. Defaults to None.
crs (str, optional): The CRS of the output image. Defaults to None.
transform (tuple, optional): The affine transformation matrix, can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361).
Defaults to None.
driver (str, optional): The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG".
**kwargs: Additional keyword arguments to be passed to the rasterio.open() function.
"""
import numpy as np
import rasterio
import xarray as xr
import rioxarray
from rasterio.transform import Affine
if output is None:
return array_to_memory_file(
array,
source,
dtype,
compress,
transpose,
cellsize,
crs=crs,
transform=transform,
driver=driver,
**kwargs,
)
if isinstance(array, xr.DataArray):
if (
hasattr(array, "rio")
and (array.rio.crs is not None)
and (array.rio.transform() is not None)
):
if "latitude" in array.dims and "longitude" in array.dims:
array = array.rename({"latitude": "y", "longitude": "x"})
elif "lat" in array.dims and "lon" in array.dims:
array = array.rename({"lat": "y", "lon": "x"})
if array.ndim == 2 and ("x" in array.dims) and ("y" in array.dims):
array = array.transpose("y", "x")
elif array.ndim == 3 and ("x" in array.dims) and ("y" in array.dims):
dims = list(array.dims)
dims.remove("x")
dims.remove("y")
array = array.transpose(dims[0], "y", "x")
if "long_name" in array.attrs:
array.attrs.pop("long_name")
array.rio.to_raster(
output, driver=driver, compress=compress, dtype=dtype, **kwargs
)
return
if array.ndim == 3 and transpose:
array = np.transpose(array, (1, 2, 0))
out_dir = os.path.dirname(os.path.abspath(output))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
if not output.endswith(".tif"):
output += ".tif"
if source is not None:
with rasterio.open(source) as src:
crs = src.crs
transform = src.transform
if compress is None:
compress = src.compression
else:
if cellsize is None:
raise ValueError("resolution must be provided if source is not provided")
if crs is None:
raise ValueError(
"crs must be provided if source is not provided, such as EPSG:3857"
)
if transform is None:
# Define the geotransformation parameters
xmin, ymin, xmax, ymax = (
0,
0,
cellsize * array.shape[1],
cellsize * array.shape[0],
)
transform = rasterio.transform.from_bounds(
xmin, ymin, xmax, ymax, array.shape[1], array.shape[0]
)
elif isinstance(transform, Affine):
pass
elif isinstance(transform, (tuple, list)):
transform = Affine(*transform)
kwargs["transform"] = transform
if dtype is None:
# Determine the minimum and maximum values in the array
min_value = np.min(array)
max_value = np.max(array)
# Determine the best dtype for the array
if min_value >= 0 and max_value <= 1:
dtype = np.float32
elif min_value >= 0 and max_value <= 255:
dtype = np.uint8
elif min_value >= -128 and max_value <= 127:
dtype = np.int8
elif min_value >= 0 and max_value <= 65535:
dtype = np.uint16
elif min_value >= -32768 and max_value <= 32767:
dtype = np.int16
else:
dtype = np.float64
# Convert the array to the best dtype
array = array.astype(dtype)
# Define the GeoTIFF metadata
metadata = {
"driver": driver,
"height": array.shape[0],
"width": array.shape[1],
"dtype": array.dtype,
"crs": crs,
"transform": transform,
}
if array.ndim == 2:
metadata["count"] = 1
elif array.ndim == 3:
metadata["count"] = array.shape[2]
if compress is not None:
metadata["compress"] = compress
metadata.update(**kwargs)
# Create a new GeoTIFF file and write the array to it
with rasterio.open(output, "w", **metadata) as dst:
if array.ndim == 2:
dst.write(array, 1)
elif array.ndim == 3:
for i in range(array.shape[2]):
dst.write(array[:, :, i], i + 1)
array_to_memory_file(array, source=None, dtype=None, compress='deflate', transpose=True, cellsize=None, crs=None, transform=None, driver='COG', **kwargs) ¶

Convert a NumPy array to a memory file.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
array | numpy.ndarray | The input NumPy array. | required |
source | str | Path to the source file to extract metadata from. Defaults to None. | None |
dtype | str | The desired data type of the array. Defaults to None. | None |
compress | str | The compression method for the output file. Defaults to "deflate". | 'deflate' |
transpose | bool | Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True. | True |
cellsize | float | The cell size of the array if source is not provided. Defaults to None. | None |
crs | str | The coordinate reference system of the array if source is not provided. Defaults to None. | None |
transform | tuple | The affine transformation matrix if source is not provided. Can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361). Defaults to None. | None |
driver | str | The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG". | 'COG' |
**kwargs | | Additional keyword arguments to be passed to the rasterio.open() function. | {} |

Returns:

Type | Description |
---|---|
rasterio.DatasetReader | The rasterio dataset reader object for the converted array. |
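Example (a minimal sketch; rasterio must be installed):

import numpy as np
from leafmap.common import array_to_memory_file

arr = np.zeros((128, 128), dtype=np.uint8)

# Returns an open rasterio dataset backed by an in-memory file
dataset = array_to_memory_file(arr, cellsize=10.0, crs="EPSG:3857")
print(dataset.crs, dataset.shape)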
Source code in leafmap/common.py
def array_to_memory_file(
array,
source: str = None,
dtype: str = None,
compress: str = "deflate",
transpose: bool = True,
cellsize: float = None,
crs: str = None,
transform: tuple = None,
driver="COG",
**kwargs,
):
"""Convert a NumPy array to a memory file.
Args:
array (numpy.ndarray): The input NumPy array.
source (str, optional): Path to the source file to extract metadata from. Defaults to None.
dtype (str, optional): The desired data type of the array. Defaults to None.
compress (str, optional): The compression method for the output file. Defaults to "deflate".
transpose (bool, optional): Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True.
cellsize (float, optional): The cell size of the array if source is not provided. Defaults to None.
crs (str, optional): The coordinate reference system of the array if source is not provided. Defaults to None.
transform (tuple, optional): The affine transformation matrix if source is not provided.
Can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361). Defaults to None
driver (str, optional): The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG".
**kwargs: Additional keyword arguments to be passed to the rasterio.open() function.
Returns:
rasterio.DatasetReader: The rasterio dataset reader object for the converted array.
"""
import rasterio
import numpy as np
import xarray as xr
from rasterio.transform import Affine
if isinstance(array, xr.DataArray):
coords = [coord for coord in array.coords]
if coords[0] == "time":
x_dim = coords[1]
y_dim = coords[2]
array = (
array.isel(time=0).rename({y_dim: "y", x_dim: "x"}).transpose("y", "x")
)
if hasattr(array, "rio"):
if hasattr(array.rio, "crs"):
crs = array.rio.crs
if transform is None and hasattr(array.rio, "transform"):
transform = array.rio.transform()
elif source is None:
if hasattr(array, "encoding"):
if "source" in array.encoding:
source = array.encoding["source"]
array = array.values
if array.ndim == 3 and transpose:
array = np.transpose(array, (1, 2, 0))
if source is not None:
with rasterio.open(source) as src:
crs = src.crs
transform = src.transform
if compress is None:
compress = src.compression
else:
if crs is None:
raise ValueError(
"crs must be provided if source is not provided, such as EPSG:3857"
)
if transform is None:
if cellsize is None:
raise ValueError("cellsize must be provided if source is not provided")
# Define the geotransformation parameters
xmin, ymin, xmax, ymax = (
0,
0,
cellsize * array.shape[1],
cellsize * array.shape[0],
)
# (west, south, east, north, width, height)
transform = rasterio.transform.from_bounds(
xmin, ymin, xmax, ymax, array.shape[1], array.shape[0]
)
elif isinstance(transform, Affine):
pass
elif isinstance(transform, (tuple, list)):
transform = Affine(*transform)
kwargs["transform"] = transform
if dtype is None:
# Determine the minimum and maximum values in the array
min_value = np.min(array)
max_value = np.max(array)
# Determine the best dtype for the array
if min_value >= 0 and max_value <= 1:
dtype = np.float32
elif min_value >= 0 and max_value <= 255:
dtype = np.uint8
elif min_value >= -128 and max_value <= 127:
dtype = np.int8
elif min_value >= 0 and max_value <= 65535:
dtype = np.uint16
elif min_value >= -32768 and max_value <= 32767:
dtype = np.int16
else:
dtype = np.float64
# Convert the array to the best dtype
array = array.astype(dtype)
# Define the GeoTIFF metadata
metadata = {
"driver": driver,
"height": array.shape[0],
"width": array.shape[1],
"dtype": array.dtype,
"crs": crs,
"transform": transform,
}
if array.ndim == 2:
metadata["count"] = 1
elif array.ndim == 3:
metadata["count"] = array.shape[2]
if compress is not None:
metadata["compress"] = compress
metadata.update(**kwargs)
# Create a new memory file and write the array to it
memory_file = rasterio.MemoryFile()
dst = memory_file.open(**metadata)
if array.ndim == 2:
dst.write(array, 1)
elif array.ndim == 3:
for i in range(array.shape[2]):
dst.write(array[:, :, i], i + 1)
dst.close()
# Read the dataset from memory
dataset_reader = rasterio.open(dst.name, mode="r")
return dataset_reader
assign_continuous_colors(df, column, cmap=None, colors=None, labels=None, scheme='Quantiles', k=5, legend_kwds=None, classification_kwds=None, to_rgb=True, return_type='array', return_legend=False) ¶

Assigns continuous colors to a DataFrame column based on a specified scheme.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | pandas.DataFrame | A pandas DataFrame. | required |
column | str | The name of the column to assign colors. | required |
cmap | str | The name of the colormap to use. | None |
colors | list | A list of custom colors. | None |
labels | list | A list of custom labels for the legend. | None |
scheme | str | The scheme for classifying the data. Default is 'Quantiles'. | 'Quantiles' |
k | int | The number of classes for classification. | 5 |
legend_kwds | dict | Additional keyword arguments for configuring the legend. | None |
classification_kwds | dict | Additional keyword arguments for configuring the classification. | None |
to_rgb | bool | Whether to convert colors to RGB values. Default is True. | True |
return_type | str | The type of the returned values. Default is 'array'. | 'array' |
return_legend | bool | Whether to return the legend. Default is False. | False |

Returns:

Type | Description |
---|---|
Union[numpy.ndarray, Tuple[numpy.ndarray, dict]] | The assigned colors as a numpy array, or a tuple containing the colors and the legend, depending on the value of return_legend. |
Source code in leafmap/common.py
def assign_continuous_colors(
df,
column: str,
cmap: str = None,
colors: list = None,
labels: list = None,
scheme: str = "Quantiles",
k: int = 5,
legend_kwds: dict = None,
classification_kwds: dict = None,
to_rgb: bool = True,
return_type: str = "array",
return_legend: bool = False,
) -> Union[np.ndarray, Tuple[np.ndarray, dict]]:
"""Assigns continuous colors to a DataFrame column based on a specified scheme.
Args:
df: A pandas DataFrame.
column: The name of the column to assign colors.
cmap: The name of the colormap to use.
colors: A list of custom colors.
labels: A list of custom labels for the legend.
scheme: The scheme for classifying the data. Default is 'Quantiles'.
k: The number of classes for classification.
legend_kwds: Additional keyword arguments for configuring the legend.
classification_kwds: Additional keyword arguments for configuring the classification.
to_rgb: Whether to convert colors to RGB values. Default is True.
return_type: The type of the returned values. Default is 'array'.
return_legend: Whether to return the legend. Default is False.
Returns:
The assigned colors as a numpy array or a tuple containing the colors and the legend, depending on the value of return_legend.
"""
import numpy as np
data = df[[column]].copy()
new_df, legend = classify(
data, column, cmap, colors, labels, scheme, k, legend_kwds, classification_kwds
)
values = new_df["color"].values.tolist()
if to_rgb:
values = [hex_to_rgb(check_color(color)) for color in values]
if return_type == "array":
values = np.array(values, dtype=np.uint8)
if return_legend:
return values, legend
else:
return values
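Example (a minimal usage sketch; the DataFrame contents are illustrative assumptions, not part of the API):
>>> import pandas as pd
>>> df = pd.DataFrame({"value": [1.0, 2.5, 3.7, 4.2, 5.9]})  # hypothetical data
>>> colors, legend = assign_continuous_colors(
...     df, "value", cmap="Blues", k=3, return_legend=True
... )
>>> # colors is an (n_rows, 3) uint8 array of RGB triplets;
>>> # legend maps class labels to hex colors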
assign_discrete_colors(df, column, cmap, to_rgb=True, return_type='array')
¶
Assigns unique colors to each category in a categorical column of a dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | pandas.DataFrame | The input dataframe. | required |
column | str | The name of the categorical column. | required |
cmap | dict | A dictionary mapping categories to colors. | required |
to_rgb | bool | Whether to convert the colors to RGB values. Defaults to True. | True |
return_type | str | The type of the returned values. Can be 'list' or 'array'. Defaults to 'array'. | 'array' |
Returns:
Type | Description |
---|---|
list | A list of colors for each category in the categorical column. |
Source code in leafmap/common.py
def assign_discrete_colors(df, column, cmap, to_rgb=True, return_type="array"):
"""
Assigns unique colors to each category in a categorical column of a dataframe.
Args:
df (pandas.DataFrame): The input dataframe.
column (str): The name of the categorical column.
cmap (dict): A dictionary mapping categories to colors.
to_rgb (bool): Whether to convert the colors to RGB values. Defaults to True.
return_type (str): The type of the returned values. Can be 'list' or 'array'. Defaults to 'array'.
Returns:
list: A list of colors for each category in the categorical column.
"""
import numpy as np
# Copy the categorical column from the original dataframe
category_column = df[column].copy()
# Map colors to the categorical values
category_column = category_column.map(cmap)
values = category_column.values.tolist()
if to_rgb:
values = [hex_to_rgb(check_color(color)) for color in values]
if return_type == "array":
values = np.array(values, dtype=np.uint8)
return values
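Example (a minimal usage sketch; the category-to-color mapping is an illustrative assumption):
>>> import pandas as pd
>>> df = pd.DataFrame({"landcover": ["water", "forest", "urban", "forest"]})
>>> cmap = {"water": "#0000ff", "forest": "#00ff00", "urban": "#ff0000"}
>>> colors = assign_discrete_colors(df, "landcover", cmap)
>>> # colors is a (4, 3) uint8 array with one RGB triplet per row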
basemap_xyz_tiles()
¶
Returns a dictionary containing a set of basemaps that are XYZ tile layers.
Returns:
Type | Description |
---|---|
dict | A dictionary of XYZ tile layers. |
Source code in leafmap/common.py
def basemap_xyz_tiles():
"""Returns a dictionary containing a set of basemaps that are XYZ tile layers.
Returns:
dict: A dictionary of XYZ tile layers.
"""
from .leafmap import basemaps
layers_dict = {}
keys = dict(basemaps).keys()
for key in keys:
if isinstance(basemaps[key], ipyleaflet.WMSLayer):
pass
else:
layers_dict[key] = basemaps[key]
return layers_dict
bbox_to_gdf(bbox, crs='epsg:4326')
¶
Convert a bounding box to a GeoPandas GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox | list | A bounding box in the format of [minx, miny, maxx, maxy]. | required |
crs | str | The CRS of the bounding box. Defaults to 'epsg:4326'. | 'epsg:4326' |
Returns:
Type | Description |
---|---|
GeoDataFrame | A GeoDataFrame with a single polygon. |
Source code in leafmap/common.py
def bbox_to_gdf(bbox, crs="epsg:4326"):
"""Convert a bounding box to a GeoPandas GeoDataFrame.
Args:
bbox (list): A bounding box in the format of [minx, miny, maxx, maxy].
crs (str, optional): The CRS of the bounding box. Defaults to 'epsg:4326'.
Returns:
GeoDataFrame: A GeoDataFrame with a single polygon.
"""
import geopandas as gpd
from shapely.geometry import Polygon
return gpd.GeoDataFrame(
geometry=[Polygon.from_bounds(*bbox)],
crs=crs,
)
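Example (a minimal usage sketch; the bounds are illustrative):
>>> gdf = bbox_to_gdf([-122.5, 37.7, -122.3, 37.9])
>>> gdf.geometry.iloc[0].bounds
(-122.5, 37.7, -122.3, 37.9)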
bbox_to_geojson(bounds)
¶
Convert coordinates of a bounding box to a geojson.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bounds | list \| tuple | A list of coordinates representing [left, bottom, right, top] or m.bounds. | required |
Returns:
Type | Description |
---|---|
dict | A geojson feature. |
Source code in leafmap/common.py
def bbox_to_geojson(bounds):
"""Convert coordinates of a bounding box to a geojson.
Args:
bounds (list | tuple): A list of coordinates representing [left, bottom, right, top] or m.bounds.
Returns:
dict: A geojson feature.
"""
if isinstance(bounds, tuple) and len(bounds) == 2:
bounds = [bounds[0][1], bounds[0][0], bounds[1][1], bounds[1][0]]
return {
"geometry": {
"type": "Polygon",
"coordinates": [
[
[bounds[0], bounds[3]],
[bounds[0], bounds[1]],
[bounds[2], bounds[1]],
[bounds[2], bounds[3]],
[bounds[0], bounds[3]],
]
],
},
"type": "Feature",
}
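Example (a minimal usage sketch; the bounds are illustrative):
>>> feature = bbox_to_geojson([-122.5, 37.7, -122.3, 37.9])  # [left, bottom, right, top]
>>> feature["geometry"]["type"]
'Polygon'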
bbox_to_polygon(bbox)
¶
Convert a bounding box to a shapely Polygon.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox | list | A bounding box in the format of [minx, miny, maxx, maxy]. | required |
Returns:
Type | Description |
---|---|
Polygon | A shapely Polygon. |
Source code in leafmap/common.py
def bbox_to_polygon(bbox):
"""Convert a bounding box to a shapely Polygon.
Args:
bbox (list): A bounding box in the format of [minx, miny, maxx, maxy].
Returns:
Polygon: A shapely Polygon.
"""
from shapely.geometry import Polygon
return Polygon.from_bounds(*bbox)
blend_images(img1, img2, alpha=0.5, output=False, show=True, figsize=(12, 10), axis='off', **kwargs)
¶
Blends two images together using the addWeighted function from the OpenCV library.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
img1 | numpy.ndarray | The first input image on top represented as a NumPy array. | required |
img2 | numpy.ndarray | The second input image at the bottom represented as a NumPy array. | required |
alpha | float | The weighting factor for the first image in the blend. By default, this is set to 0.5. | 0.5 |
output | str | The path to the output image. Defaults to False. | False |
show | bool | Whether to display the blended image. Defaults to True. | True |
figsize | tuple | The size of the figure. Defaults to (12, 10). | (12, 10) |
axis | str | The axis of the figure. Defaults to "off". | 'off' |
**kwargs | | Additional keyword arguments to pass to the cv2.addWeighted() function. | {} |
Returns:
Type | Description |
---|---|
numpy.ndarray | The blended image as a NumPy array. |
Source code in leafmap/common.py
def blend_images(
img1,
img2,
alpha=0.5,
output=False,
show=True,
figsize=(12, 10),
axis="off",
**kwargs,
):
"""
Blends two images together using the addWeighted function from the OpenCV library.
Args:
img1 (numpy.ndarray): The first input image on top represented as a NumPy array.
img2 (numpy.ndarray): The second input image at the bottom represented as a NumPy array.
alpha (float): The weighting factor for the first image in the blend. By default, this is set to 0.5.
output (str, optional): The path to the output image. Defaults to False.
show (bool, optional): Whether to display the blended image. Defaults to True.
figsize (tuple, optional): The size of the figure. Defaults to (12, 10).
axis (str, optional): The axis of the figure. Defaults to "off".
**kwargs: Additional keyword arguments to pass to the cv2.addWeighted() function.
Returns:
numpy.ndarray: The blended image as a NumPy array.
"""
import numpy as np
import matplotlib.pyplot as plt
try:
import cv2
except ImportError:
raise ImportError("The blend_images function requires the OpenCV library.")
# Resize the images to have the same dimensions
if isinstance(img1, str):
if img1.startswith("http"):
img1 = download_file(img1)
if not os.path.exists(img1):
raise ValueError(f"Input path {img1} does not exist.")
img1 = cv2.imread(img1)
if isinstance(img2, str):
if img2.startswith("http"):
img2 = download_file(img2)
if not os.path.exists(img2):
raise ValueError(f"Input path {img2} does not exist.")
img2 = cv2.imread(img2)
if img1.dtype == np.float32:
img1 = (img1 * 255).astype(np.uint8)
if img2.dtype == np.float32:
img2 = (img2 * 255).astype(np.uint8)
if img1.dtype != img2.dtype:
img2 = img2.astype(img1.dtype)
img1 = cv2.resize(img1, (img2.shape[1], img2.shape[0]))
# Blend the images using the addWeighted function
beta = 1 - alpha
blend_img = cv2.addWeighted(img1, alpha, img2, beta, 0, **kwargs)
if output:
array_to_image(blend_img, output, img2)
if show:
plt.figure(figsize=figsize)
plt.imshow(blend_img)
plt.axis(axis)
plt.show()
else:
return blend_img
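Example (a minimal usage sketch; the file names are hypothetical and opencv-python must be installed):
>>> blended = blend_images("hillshade.tif", "landcover.tif", alpha=0.4, show=False)
>>> # with show=False the blended array is returned instead of being plotted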
bounds_to_xy_range(bounds)
¶
Convert bounds to x and y range to be used as input to bokeh map.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bounds | Union[List[Union[Tuple[float, float], float]], Tuple[float, float, float, float]] | A list of bounds in the form [(south, west), (north, east)] or [xmin, ymin, xmax, ymax]. | required |
Returns:
Type | Description |
---|---|
Tuple[Tuple[float, float], Tuple[float, float]] | A tuple of (x_range, y_range). |
Source code in leafmap/common.py
def bounds_to_xy_range(
bounds: Union[
List[Union[Tuple[float, float], float]], Tuple[float, float, float, float]
]
) -> Tuple[Tuple[float, float], Tuple[float, float]]:
"""
Convert bounds to x and y range to be used as input to bokeh map.
Args:
bounds (Union[List[Union[Tuple[float, float], float]], Tuple[float, float, float, float]]):
A list of bounds in the form [(south, west), (north, east)] or [xmin, ymin, xmax, ymax].
Returns:
Tuple[Tuple[float, float], Tuple[float, float]]: A tuple of (x_range, y_range).
"""
if isinstance(bounds, tuple):
if len(bounds) != 4:
raise ValueError(
"Tuple bounds must have exactly 4 elements (xmin, ymin, xmax, ymax)."
)
west, south, east, north = bounds
elif isinstance(bounds, list):
if len(bounds) == 2 and all(
isinstance(coord, tuple) and len(coord) == 2 for coord in bounds
):
(south, west), (north, east) = bounds
elif len(bounds) == 4 and all(
isinstance(coord, (int, float)) for coord in bounds
):
west, south, east, north = bounds
else:
raise ValueError(
"List bounds must be in the form [(south, west), (north, east)] or [xmin, ymin, xmax, ymax]."
)
else:
raise TypeError("bounds must be a list or tuple")
xmin, ymin = lnglat_to_meters(west, south)
xmax, ymax = lnglat_to_meters(east, north)
x_range = (xmin, xmax)
y_range = (ymin, ymax)
return x_range, y_range
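Example (a minimal usage sketch; the bounds are illustrative):
>>> x_range, y_range = bounds_to_xy_range([-122.5, 37.7, -122.3, 37.9])
>>> # x_range and y_range are in Web Mercator meters, ready for a bokeh figure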
center_zoom_to_xy_range(center, zoom)
¶
Convert center and zoom to x and y range to be used as input to bokeh map.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
center | tuple | A tuple of (latitude, longitude). | required |
zoom | int | The zoom level. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple of (x_range, y_range). |
Source code in leafmap/common.py
def center_zoom_to_xy_range(center, zoom):
"""Convert center and zoom to x and y range to be used as input to bokeh map.
Args:
center (tuple): A tuple of (latitude, longitude).
zoom (int): The zoom level.
Returns:
tuple: A tuple of (x_range, y_range).
"""
if isinstance(center, tuple) or isinstance(center, list):
pass
else:
raise TypeError("center must be a tuple or list")
if not isinstance(zoom, int):
raise TypeError("zoom must be an integer")
latitude, longitude = center
x_range = (-179, 179)
y_range = (-70, 70)
x_full_length = x_range[1] - x_range[0]
y_full_length = y_range[1] - y_range[0]
x_length = x_full_length / 2 ** (zoom - 2)
y_length = y_full_length / 2 ** (zoom - 2)
south = latitude - y_length / 2
north = latitude + y_length / 2
west = longitude - x_length / 2
east = longitude + x_length / 2
xmin, ymin = lnglat_to_meters(west, south)
xmax, ymax = lnglat_to_meters(east, north)
x_range = (xmin, xmax)
y_range = (ymin, ymax)
return x_range, y_range
cesium_to_streamlit(html, width=800, height=600, responsive=True, scrolling=False, token_name=None, token_value=None, **kwargs)
¶
Renders a Cesium HTML file in a Streamlit app. This method is a static Streamlit Component, meaning no information is passed back from the page on browser interaction.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html | str | The HTML file to render. It can be a local file path or a URL. | required |
width | int | Width of the map. Defaults to 800. | 800 |
height | int | Height of the map. Defaults to 600. | 600 |
responsive | bool | Whether to make the map responsive. Defaults to True. | True |
scrolling | bool | Whether to allow the map to scroll. Defaults to False. | False |
token_name | str | The name of the token in the HTML file to be replaced. Defaults to None. | None |
token_value | str | The value of the token to pass to the HTML file. Defaults to None. | None |
Returns:
Type | Description |
---|---|
streamlit.components | components.html object. |
Source code in leafmap/common.py
def cesium_to_streamlit(
html,
width=800,
height=600,
responsive=True,
scrolling=False,
token_name=None,
token_value=None,
**kwargs,
):
"""Renders an cesium HTML file in a Streamlit app. This method is a static Streamlit Component, meaning, no information is passed back from Leaflet on browser interaction.
Args:
html (str): The HTML file to render. It can a local file path or a URL.
width (int, optional): Width of the map. Defaults to 800.
height (int, optional): Height of the map. Defaults to 600.
responsive (bool, optional): Whether to make the map responsive. Defaults to True.
scrolling (bool, optional): Whether to allow the map to scroll. Defaults to False.
token_name (str, optional): The name of the token in the HTML file to be replaced. Defaults to None.
token_value (str, optional): The value of the token to pass to the HTML file. Defaults to None.
Returns:
streamlit.components: components.html object.
"""
if token_name is None:
token_name = "your_access_token"
if token_value is None:
token_value = os.environ.get("CESIUM_TOKEN")
html_to_streamlit(
html, width, height, responsive, scrolling, token_name, token_value
)
check_cmap(cmap)
¶
Check the colormap and return a list of colors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cmap | str \| list \| Box | The colormap to check. | required |
Returns:
Type | Description |
---|---|
list | A list of colors. |
Source code in leafmap/common.py
def check_cmap(cmap):
"""Check the colormap and return a list of colors.
Args:
cmap (str | list | Box): The colormap to check.
Returns:
list: A list of colors.
"""
from box import Box
from .colormaps import get_palette
if isinstance(cmap, str):
try:
return get_palette(cmap)
except Exception as e:
raise Exception(f"{cmap} is not a valid colormap.")
elif isinstance(cmap, Box):
return list(cmap["default"])
elif isinstance(cmap, list) or isinstance(cmap, tuple):
return cmap
else:
raise Exception(f"{cmap} is not a valid colormap.")
check_color(in_color)
¶
Checks the input color and returns the corresponding hex color code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_color | str or tuple or list | It can be a string (e.g., 'red', '#ffff00', 'ffff00', 'ff0') or RGB tuple (e.g., (255, 127, 0)). | required |
Returns:
Type | Description |
---|---|
str | A hex color code. |
Source code in leafmap/common.py
def check_color(in_color: Union[str, Tuple]) -> str:
"""Checks the input color and returns the corresponding hex color code.
Args:
in_color (str or tuple or list): It can be a string (e.g., 'red', '#ffff00', 'ffff00', 'ff0') or RGB tuple (e.g., (255, 127, 0)).
Returns:
str: A hex color code.
"""
import colour
out_color = "#000000" # default black color
if (isinstance(in_color, tuple) or isinstance(in_color, list)) and len(
in_color
) == 3:
# rescale color if necessary
if all(isinstance(item, int) for item in in_color):
in_color = [c / 255.0 for c in in_color]
return colour.Color(rgb=tuple(in_color)).hex_l
else:
# try to guess the color system
try:
return colour.Color(in_color).hex_l
except Exception as e:
pass
# try again by adding an extra # (GEE handles hex codes without #)
try:
return colour.Color(f"#{in_color}").hex_l
except Exception as e:
print(
f"The provided color ({in_color}) is invalid. Using the default black color."
)
print(e)
return out_color
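Example (a minimal usage sketch):
>>> check_color("red")
'#ff0000'
>>> check_color((255, 127, 0))
'#ff7f00'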
check_dir(dir_path, make_dirs=True)
¶
Checks if a directory exists and creates it if it does not.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path | str | The path to the directory. | required |
make_dirs | bool | Whether to create the directory if it does not exist. Defaults to True. | True |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | If the directory could not be found. |
TypeError | If the input directory path is not a string. |
Returns:
Type | Description |
---|---|
str | The path to the directory. |
Source code in leafmap/common.py
def check_dir(dir_path, make_dirs=True):
"""Checks if a directory exists and creates it if it does not.
Args:
dir_path (str): The path to the directory.
make_dirs (bool, optional): Whether to create the directory if it does not exist. Defaults to True.
Raises:
FileNotFoundError: If the directory could not be found.
TypeError: If the input directory path is not a string.
Returns:
str: The path to the directory.
"""
if isinstance(dir_path, str):
if dir_path.startswith("~"):
dir_path = os.path.expanduser(dir_path)
else:
dir_path = os.path.abspath(dir_path)
if not os.path.exists(dir_path) and make_dirs:
os.makedirs(dir_path)
if os.path.exists(dir_path):
return dir_path
else:
raise FileNotFoundError("The provided directory could not be found.")
else:
raise TypeError("The provided directory path must be a string.")
check_file_path(file_path, make_dirs=True)
¶
Gets the absolute file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | The path to the file. | required |
make_dirs | bool | Whether to create the directory if it does not exist. Defaults to True. | True |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | If the directory could not be found. |
TypeError | If the input directory path is not a string. |
Returns:
Type | Description |
---|---|
str | The absolute path to the file. |
Source code in leafmap/common.py
def check_file_path(file_path, make_dirs=True):
"""Gets the absolute file path.
Args:
file_path (str): The path to the file.
make_dirs (bool, optional): Whether to create the directory if it does not exist. Defaults to True.
Raises:
FileNotFoundError: If the directory could not be found.
TypeError: If the input directory path is not a string.
Returns:
str: The absolute path to the file.
"""
if isinstance(file_path, str):
if file_path.startswith("~"):
file_path = os.path.expanduser(file_path)
else:
file_path = os.path.abspath(file_path)
file_dir = os.path.dirname(file_path)
if not os.path.exists(file_dir) and make_dirs:
os.makedirs(file_dir)
return file_path
else:
raise TypeError("The provided file path must be a string.")
check_html_string(html_string)
¶
Check if an HTML string contains local images and convert them to base64.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html_string | str | The HTML string. | required |
Returns:
Type | Description |
---|---|
str | The HTML string with local images converted to base64. |
Source code in leafmap/common.py
def check_html_string(html_string):
"""Check if an HTML string contains local images and convert them to base64.
Args:
html_string (str): The HTML string.
Returns:
str: The HTML string with local images converted to base64.
"""
import re
import base64
# Search for img tags with src attribute
img_regex = r'<img[^>]+src\s*=\s*["\']([^"\':]+)["\'][^>]*>'
for match in re.findall(img_regex, html_string):
with open(match, "rb") as img_file:
img_data = img_file.read()
base64_data = base64.b64encode(img_data).decode("utf-8")
html_string = html_string.replace(
'src="{}"'.format(match),
'src="data:image/png;base64,' + base64_data + '"',
)
return html_string
check_url(url)
¶
Check if an HTTP URL is working.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The URL to check. | required |
Returns:
Type | Description |
---|---|
bool | True if the URL is working (returns a 200 status code), False otherwise. |
Source code in leafmap/common.py
def check_url(url: str) -> bool:
"""Check if an HTTP URL is working.
Args:
url (str): The URL to check.
Returns:
bool: True if the URL is working (returns a 200 status code), False otherwise.
"""
try:
response = requests.get(url)
if response.status_code == 200:
return True
else:
return False
except requests.exceptions.RequestException:
return False
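Example (a minimal usage sketch; the URL is illustrative and requires network access):
>>> check_url("https://github.com")
True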
classify(data, column, cmap=None, colors=None, labels=None, scheme='Quantiles', k=5, legend_kwds=None, classification_kwds=None)
¶
Classify a dataframe column using a variety of classification schemes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | str \| pd.DataFrame \| gpd.GeoDataFrame | The data to classify. It can be a filepath to a vector dataset, a pandas dataframe, or a geopandas geodataframe. | required |
column | str | The column to classify. | required |
cmap | str | The name of a colormap recognized by matplotlib. Defaults to None. | None |
colors | list | A list of colors to use for the classification. Defaults to None. | None |
labels | list | A list of labels to use for the legend. Defaults to None. | None |
scheme | str | Name of a choropleth classification scheme (requires mapclassify). A mapclassify.MapClassifier object will be used under the hood. Supported are all schemes provided by mapclassify (e.g. 'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled', 'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced', 'JenksCaspallSampled', 'MaxP', 'MaximumBreaks', 'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean', 'UserDefined'). Arguments can be passed in classification_kwds. | 'Quantiles' |
k | int | Number of classes (ignored if scheme is None or if column is categorical). Defaults to 5. | 5 |
legend_kwds | dict | Keyword arguments to pass to matplotlib.pyplot.legend or matplotlib.pyplot.colorbar. Defaults to None. | None |
classification_kwds | dict | Keyword arguments to pass to mapclassify. Defaults to None. | None |
Returns:
Type | Description |
---|---|
pd.DataFrame, dict | A pandas dataframe with the classification applied and a legend dictionary. |
Source code in leafmap/common.py
def classify(
data,
column,
cmap=None,
colors=None,
labels=None,
scheme="Quantiles",
k=5,
legend_kwds=None,
classification_kwds=None,
):
"""Classify a dataframe column using a variety of classification schemes.
Args:
data (str | pd.DataFrame | gpd.GeoDataFrame): The data to classify. It can be a filepath to a vector dataset, a pandas dataframe, or a geopandas geodataframe.
column (str): The column to classify.
cmap (str, optional): The name of a colormap recognized by matplotlib. Defaults to None.
colors (list, optional): A list of colors to use for the classification. Defaults to None.
labels (list, optional): A list of labels to use for the legend. Defaults to None.
scheme (str, optional): Name of a choropleth classification scheme (requires mapclassify).
A mapclassify.MapClassifier object will be used
under the hood. Supported are all schemes provided by mapclassify (e.g.
'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled',
'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced',
'JenksCaspallSampled', 'MaxP', 'MaximumBreaks',
'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean',
'UserDefined'). Arguments can be passed in classification_kwds.
k (int, optional): Number of classes (ignored if scheme is None or if column is categorical). Defaults to 5.
legend_kwds (dict, optional): Keyword arguments to pass to :func:`matplotlib.pyplot.legend` or `matplotlib.pyplot.colorbar`. Defaults to None.
Additional accepted keywords when `scheme` is specified:
fmt : string
A formatting specification for the bin edges of the classes in the
legend. For example, to have no decimals: ``{"fmt": "{:.0f}"}``.
labels : list-like
A list of legend labels to override the auto-generated labels.
Needs to have the same number of elements as the number of
classes (`k`).
interval : boolean (default False)
An option to control brackets from mapclassify legend.
If True, open/closed interval brackets are shown in the legend.
classification_kwds (dict, optional): Keyword arguments to pass to mapclassify. Defaults to None.
Returns:
pd.DataFrame, dict: A pandas dataframe with the classification applied and a legend dictionary.
"""
import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib as mpl
import matplotlib.pyplot as plt
try:
import mapclassify
except ImportError:
raise ImportError(
"mapclassify is required for this function. Install with `pip install mapclassify`."
)
if (
isinstance(data, gpd.GeoDataFrame)
or isinstance(data, pd.DataFrame)
or isinstance(data, pd.Series)
):
df = data
else:
try:
df = gpd.read_file(data)
except Exception:
raise TypeError(
"Data must be a GeoDataFrame or a path to a file that can be read by geopandas.read_file()."
)
if df.empty:
warnings.warn(
"The GeoDataFrame you are attempting to plot is "
"empty. Nothing has been displayed.",
UserWarning,
)
return
columns = df.columns.values.tolist()
if column not in columns:
raise ValueError(
f"{column} is not a column in the GeoDataFrame. It must be one of {columns}."
)
# Convert categorical data to numeric
init_column = None
value_list = None
if np.issubdtype(df[column].dtype, np.object_):
value_list = df[column].unique().tolist()
value_list.sort()
df["category"] = df[column].replace(value_list, range(0, len(value_list)))
init_column = column
column = "category"
k = len(value_list)
if legend_kwds is not None:
legend_kwds = legend_kwds.copy()
# To accept pd.Series and np.arrays as column
if isinstance(column, (np.ndarray, pd.Series)):
if column.shape[0] != df.shape[0]:
raise ValueError(
"The dataframe and given column have different number of rows."
)
else:
values = column
# Make sure index of a Series matches index of df
if isinstance(values, pd.Series):
values = values.reindex(df.index)
else:
values = df[column]
nan_idx = np.asarray(pd.isna(values), dtype="bool")
if cmap is None:
cmap = "Blues"
try:
cmap = plt.get_cmap(cmap, k)
except:
cmap = plt.cm.get_cmap(cmap, k)
if colors is None:
colors = [mpl.colors.rgb2hex(cmap(i))[1:] for i in range(cmap.N)]
colors = ["#" + i for i in colors]
elif isinstance(colors, list):
colors = [check_color(i) for i in colors]
elif isinstance(colors, str):
colors = [check_color(colors)] * k
allowed_schemes = [
"BoxPlot",
"EqualInterval",
"FisherJenks",
"FisherJenksSampled",
"HeadTailBreaks",
"JenksCaspall",
"JenksCaspallForced",
"JenksCaspallSampled",
"MaxP",
"MaximumBreaks",
"NaturalBreaks",
"Quantiles",
"Percentiles",
"StdMean",
"UserDefined",
]
if scheme.lower() not in [s.lower() for s in allowed_schemes]:
raise ValueError(
f"{scheme} is not a valid scheme. It must be one of {allowed_schemes}."
)
if classification_kwds is None:
classification_kwds = {}
if "k" not in classification_kwds:
classification_kwds["k"] = k
binning = mapclassify.classify(
np.asarray(values[~nan_idx]), scheme, **classification_kwds
)
df["category"] = binning.yb
df["color"] = [colors[i] for i in df["category"]]
if legend_kwds is None:
legend_kwds = {}
if "interval" not in legend_kwds:
legend_kwds["interval"] = True
if "fmt" not in legend_kwds:
if np.issubdtype(df[column].dtype, np.floating):
legend_kwds["fmt"] = "{:.2f}"
else:
legend_kwds["fmt"] = "{:.0f}"
if labels is None:
# set categorical to True for creating the legend
if legend_kwds is not None and "labels" in legend_kwds:
if len(legend_kwds["labels"]) != binning.k:
raise ValueError(
"Number of labels must match number of bins, "
"received {} labels for {} bins".format(
len(legend_kwds["labels"]), binning.k
)
)
else:
labels = list(legend_kwds.pop("labels"))
else:
# fmt = "{:.2f}"
if legend_kwds is not None and "fmt" in legend_kwds:
fmt = legend_kwds.pop("fmt")
labels = binning.get_legend_classes(fmt)
if legend_kwds is not None:
show_interval = legend_kwds.pop("interval", False)
else:
show_interval = False
if not show_interval:
labels = [c[1:-1] for c in labels]
if init_column is not None:
labels = value_list
elif isinstance(labels, list):
if len(labels) != len(colors):
raise ValueError("The number of labels must match the number of colors.")
else:
raise ValueError("labels must be a list or None.")
legend_dict = dict(zip(labels, colors))
df["category"] = df["category"] + 1
return df, legend_dict
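Example (a minimal usage sketch; the file name and column are hypothetical):
>>> import geopandas as gpd
>>> gdf = gpd.read_file("countries.geojson")
>>> new_gdf, legend = classify(gdf, "pop_est", cmap="Blues", scheme="Quantiles", k=5)
>>> # new_gdf gains 'category' and 'color' columns; legend maps class labels to hex colors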
clip_image(image, mask, output, to_cog=True)
¶
Clip an image by mask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | str | Path to the image file in GeoTIFF format. | required |
mask | str \| list \| dict | The mask used to extract the image. It can be a path to vector datasets (e.g., GeoJSON, Shapefile), a list of coordinates, or m.user_roi. | required |
output | str | Path to the output file. | required |
to_cog | bool | Flags to indicate if you want to convert the output to COG. Defaults to True. | True |
Exceptions:
Type | Description |
---|---|
ImportError | If the fiona or rasterio package is not installed. |
FileNotFoundError | If the image is not found. |
ValueError | If the mask is not a valid GeoJSON or raster file. |
FileNotFoundError | If the mask file is not found. |
Source code in leafmap/common.py
def clip_image(image, mask, output, to_cog=True):
"""Clip an image by mask.
Args:
image (str): Path to the image file in GeoTIFF format.
mask (str | list | dict): The mask used to extract the image. It can be a path to vector datasets (e.g., GeoJSON, Shapefile), a list of coordinates, or m.user_roi.
output (str): Path to the output file.
to_cog (bool, optional): Flags to indicate if you want to convert the output to COG. Defaults to True.
Raises:
ImportError: If the fiona or rasterio package is not installed.
FileNotFoundError: If the image is not found.
ValueError: If the mask is not a valid GeoJSON or raster file.
FileNotFoundError: If the mask file is not found.
"""
try:
import json
import fiona
import rasterio
import rasterio.mask
except ImportError as e:
raise ImportError(e)
if not os.path.exists(image):
raise FileNotFoundError(f"{image} does not exist.")
if not output.endswith(".tif"):
raise ValueError("Output must be a tif file.")
output = check_file_path(output)
if isinstance(mask, str):
if mask.startswith("http"):
mask = download_file(mask, output)
if not os.path.exists(mask):
raise FileNotFoundError(f"{mask} does not exist.")
elif isinstance(mask, list) or isinstance(mask, dict):
if isinstance(mask, list):
geojson = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": {},
"geometry": {"type": "Polygon", "coordinates": [mask]},
}
],
}
else:
geojson = {
"type": "FeatureCollection",
"features": [mask],
}
mask = temp_file_path(".geojson")
with open(mask, "w") as f:
json.dump(geojson, f)
with fiona.open(mask, "r") as shapefile:
shapes = [feature["geometry"] for feature in shapefile]
with rasterio.open(image) as src:
out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)
out_meta = src.meta
out_meta.update(
{
"driver": "GTiff",
"height": out_image.shape[1],
"width": out_image.shape[2],
"transform": out_transform,
}
)
with rasterio.open(output, "w", **out_meta) as dest:
dest.write(out_image)
if to_cog:
image_to_cog(output, output)
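Example (a minimal usage sketch; the file paths are hypothetical):
>>> clip_image("dem.tif", "watershed.geojson", "dem_clipped.tif")
>>> # writes dem_clipped.tif, converted to a Cloud Optimized GeoTIFF by default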
cog_validate(source, verbose=False)
¶
Validate Cloud Optimized Geotiff.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | str | A dataset path or URL. Will be opened in "r" mode. | required |
verbose | bool | Whether to print the output of the validation. Defaults to False. | False |
Exceptions:
Type | Description |
---|---|
ImportError | If the rio-cogeo package is not installed. |
FileNotFoundError | If the provided file could not be found. |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the validation results (True if src_path is a valid COG, a list of validation errors, and a list of validation warnings). |
Source code in leafmap/common.py
def cog_validate(source, verbose=False):
"""Validate Cloud Optimized Geotiff.
Args:
source (str): A dataset path or URL. Will be opened in "r" mode.
verbose (bool, optional): Whether to print the output of the validation. Defaults to False.
Raises:
ImportError: If the rio-cogeo package is not installed.
FileNotFoundError: If the provided file could not be found.
Returns:
tuple: A tuple containing the validation results (True if src_path is a valid COG, a list of validation errors, and a list of validation warnings).
"""
try:
from rio_cogeo.cogeo import cog_validate, cog_info
except ImportError:
raise ImportError(
"The rio-cogeo package is not installed. Please install it with `pip install rio-cogeo` or `conda install rio-cogeo -c conda-forge`."
)
if not source.startswith("http"):
source = check_file_path(source)
if not os.path.exists(source):
raise FileNotFoundError("The provided input file could not be found.")
if verbose:
return cog_info(source)
else:
return cog_validate(source)
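Example (a minimal usage sketch; the file path is hypothetical):
>>> is_valid, errors, warnings = cog_validate("dem.tif")
>>> report = cog_validate("dem.tif", verbose=True)  # returns the full rio-cogeo info instead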
connect_points_as_line(gdf, sort_column=None, crs='EPSG:4326', single_line=True)
¶
Connects points in a GeoDataFrame into either a single LineString or multiple LineStrings based on a specified sort column or the index if no column is provided. The resulting GeoDataFrame will have the specified CRS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdf | GeoDataFrame | A GeoDataFrame containing point geometries. | required |
sort_column | Optional[str] | Column name to sort the points by (e.g., 'timestamp'). If None, the index is used for sorting. Defaults to None. | None |
crs | str | The coordinate reference system (CRS) for the resulting GeoDataFrame. Defaults to "EPSG:4326". | 'EPSG:4326' |
single_line | bool | If True, generates a single LineString connecting all points. If False, generates multiple LineStrings, each connecting two consecutive points. Defaults to True. | True |
Returns:
Type | Description |
---|---|
GeoDataFrame | A new GeoDataFrame containing either a single LineString or multiple LineString geometries based on the single_line parameter, with the specified CRS. |
Examples:
>>> line_gdf = connect_points_as_line(gdf, 'timestamp', crs="EPSG:3857", single_line=True)
>>> line_gdf = connect_points_as_line(gdf, single_line=False) # Uses index and defaults to EPSG:4326
Source code in leafmap/common.py
def connect_points_as_line(
gdf: "GeoDataFrame",
sort_column: Optional[str] = None,
crs: str = "EPSG:4326",
single_line: bool = True,
) -> "GeoDataFrame":
"""
Connects points in a GeoDataFrame into either a single LineString or multiple LineStrings
based on a specified sort column or the index if no column is provided. The resulting
GeoDataFrame will have the specified CRS.
Args:
gdf (GeoDataFrame): A GeoDataFrame containing point geometries.
sort_column (Optional[str]): Column name to sort the points by (e.g., 'timestamp').
If None, the index is used for sorting. Defaults to None.
crs (str): The coordinate reference system (CRS) for the resulting GeoDataFrame.
Defaults to "EPSG:4326".
single_line (bool): If True, generates a single LineString connecting all points.
If False, generates multiple LineStrings, each connecting two consecutive points.
Defaults to True.
Returns:
GeoDataFrame: A new GeoDataFrame containing either a single LineString or multiple LineString geometries
based on the single_line parameter, with the specified CRS.
Example:
>>> line_gdf = connect_points_as_line(gdf, 'timestamp', crs="EPSG:3857", single_line=True)
>>> line_gdf = connect_points_as_line(gdf, single_line=False) # Uses index and defaults to EPSG:4326
"""
from shapely.geometry import LineString
import geopandas as gpd
# Sort the GeoDataFrame by the specified column or by index if None
gdf_sorted = gdf.sort_values(by=sort_column) if sort_column else gdf.sort_index()
if single_line:
# Create a single LineString connecting all points
line = LineString(gdf_sorted.geometry.tolist())
line_gdf = gpd.GeoDataFrame(geometry=[line], crs=crs)
else:
# Generate LineStrings for each consecutive pair of points
lines = [
LineString([gdf_sorted.geometry.iloc[i], gdf_sorted.geometry.iloc[i + 1]])
for i in range(len(gdf_sorted) - 1)
]
line_gdf = gpd.GeoDataFrame(geometry=lines, crs=crs)
return line_gdf
connect_postgis(database, host='localhost', user=None, password=None, port=5432, use_env_var=False)
¶
Connects to a PostGIS database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
database | str | Name of the database. | required |
host | str | Hosting server for the database. Defaults to "localhost". | 'localhost' |
user | str | User name to access the database. Defaults to None. | None |
password | str | Password to access the database. Defaults to None. | None |
port | int | Port number to connect to at the server host. Defaults to 5432. | 5432 |
use_env_var | bool | Whether to use environment variables. If set to True, user and password are treated as environment variable names, with default names "SQL_USER" and "SQL_PASSWORD". Defaults to False. | False |
Exceptions:
Type | Description |
---|---|
ValueError | If user is not specified. |
ValueError | If password is not specified. |
Returns:
Type | Description |
---|---|
sqlalchemy.engine.Engine | A SQLAlchemy engine connected to the PostGIS database. |
Source code in leafmap/common.py
def connect_postgis(
database, host="localhost", user=None, password=None, port=5432, use_env_var=False
):
"""Connects to a PostGIS database.
Args:
database (str): Name of the database
host (str, optional): Hosting server for the database. Defaults to "localhost".
user (str, optional): User name to access the database. Defaults to None.
password (str, optional): Password to access the database. Defaults to None.
port (int, optional): Port number to connect to at the server host. Defaults to 5432.
use_env_var (bool, optional): Whether to use environment variables. If set to True, user and password are treated as environment variable names, with default names "SQL_USER" and "SQL_PASSWORD". Defaults to False.
Raises:
ValueError: If user is not specified.
ValueError: If password is not specified.
Returns:
sqlalchemy.engine.Engine: A SQLAlchemy engine connected to the PostGIS database.
"""
check_package(name="geopandas", URL="https://geopandas.org")
check_package(
name="sqlalchemy",
URL="https://docs.sqlalchemy.org/en/14/intro.html#installation",
)
from sqlalchemy import create_engine
if use_env_var:
if user is not None:
user = os.getenv(user)
else:
user = os.getenv("SQL_USER")
if password is not None:
password = os.getenv(password)
else:
password = os.getenv("SQL_PASSWORD")
if user is None:
raise ValueError("user is not specified.")
if password is None:
raise ValueError("password is not specified.")
connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
engine = create_engine(connection_string)
return engine
construct_bbox(*args, buffer=0.001, crs='EPSG:4326', return_gdf=False)
¶
Construct a bounding box (bbox) geometry based on either a centroid point or bbox.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | Union[float, Tuple[float, float, float, float]] | Coordinates for the geometry. If 2 arguments are provided, it is interpreted as a centroid (x, y) with a buffer. If 4 arguments are provided, it is interpreted as a bbox (minx, miny, maxx, maxy). | () |
buffer | float | The buffer distance around the centroid point (default is 0.001 degrees). | 0.001 |
crs | str | The coordinate reference system (default is "EPSG:4326"). | 'EPSG:4326' |
return_gdf | bool | Whether to return a GeoDataFrame (default is False). | False |
Returns:
Type | Description |
---|---|
shapely.geometry.Polygon | The constructed bounding box (Polygon). |
Source code in leafmap/common.py
def construct_bbox(
*args: Union[float, Tuple[float, float, float, float]],
buffer: float = 0.001,
crs: str = "EPSG:4326",
return_gdf: bool = False,
) -> Union["Polygon", "gpd.GeoDataFrame"]:
"""
Construct a bounding box (bbox) geometry based on either a centroid point or bbox.
Args:
*args: Coordinates for the geometry.
- If 2 arguments are provided, it is interpreted as a centroid (x, y) with a buffer.
- If 4 arguments are provided, it is interpreted as a bbox (minx, miny, maxx, maxy).
buffer (float): The buffer distance around the centroid point (default is 0.001 degrees).
crs (str): The coordinate reference system (default is "EPSG:4326").
return_gdf (bool): Whether to return a GeoDataFrame (default is False).
Returns:
shapely.geometry.Polygon: The constructed bounding box (Polygon).
"""
from shapely.geometry import box
if len(args) == 2:
# Case 1: Create a bounding box around the centroid point with a buffer
x, y = args
minx, miny = x - buffer, y - buffer
maxx, maxy = x + buffer, y + buffer
geometry = box(minx, miny, maxx, maxy)
elif len(args) == 4:
# Case 2: Create a bounding box directly from the given coordinates
geometry = box(args[0], args[1], args[2], args[3])
else:
raise ValueError(
"Provide either 2 arguments for centroid (x, y) or 4 arguments for bbox (minx, miny, maxx, maxy)."
)
if return_gdf:
return gpd.GeoDataFrame(geometry=[geometry], columns=["geometry"], crs=crs)
else:
return geometry
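Example (a minimal usage sketch; the coordinates are illustrative):
>>> poly = construct_bbox(-122.4, 37.8)  # centroid (x, y) with the default 0.001-degree buffer
>>> poly2 = construct_bbox(-122.5, 37.7, -122.3, 37.9)  # minx, miny, maxx, maxy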
convert_coordinates(x, y, source_crs, target_crs='epsg:4326')
¶
Convert coordinates from the source EPSG code to the target EPSG code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x | float | The x-coordinate of the point. | required |
y | float | The y-coordinate of the point. | required |
source_crs | str | The EPSG code of the source coordinate system. | required |
target_crs | str | The EPSG code of the target coordinate system. Defaults to 'epsg:4326' (the EPSG code for WGS84). | 'epsg:4326' |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the converted longitude and latitude. |
Source code in leafmap/common.py
def convert_coordinates(x, y, source_crs, target_crs="epsg:4326"):
"""Convert coordinates from the source EPSG code to the target EPSG code.
Args:
x (float): The x-coordinate of the point.
y (float): The y-coordinate of the point.
source_crs (str): The EPSG code of the source coordinate system.
target_crs (str, optional): The EPSG code of the target coordinate system.
Defaults to '4326' (EPSG code for WGS84).
Returns:
tuple: A tuple containing the converted longitude and latitude.
"""
import pyproj
# Create the transformer
transformer = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True)
# Perform the transformation
lon, lat = transformer.transform(x, y) # pylint: disable=E0633
# Return the converted coordinates
return lon, lat
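Example (a minimal usage sketch; the coordinates are illustrative Web Mercator meters):
>>> lon, lat = convert_coordinates(-13627361.0, 4547675.0, "epsg:3857")
>>> # lon, lat are the equivalent WGS84 coordinates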
convert_lidar(source, destination=None, point_format_id=None, file_version=None, **kwargs)
¶
Converts a LAS file from one point format to another. Automatically upgrades the file version if the source file version is not compatible with the new point_format_id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | str \| laspy.lasdatas.base.LasBase | The source data to be converted. | required |
destination | str | The destination file path. Defaults to None. | None |
point_format_id | int | The new point format id (the default is None, which won't change the source format id). | None |
file_version | str | The new file version. None by default, which means the file_version may be upgraded for compatibility with the new point_format. The file version will not be downgraded. | None |
Returns:
Type | Description |
---|---|
laspy.lasdatas.base.LasBase | The converted LasData object. |
Source code in leafmap/common.py
def convert_lidar(
source, destination=None, point_format_id=None, file_version=None, **kwargs
):
"""Converts a Las from one point format to another Automatically upgrades the file version if source file version
is not compatible with the new point_format_id
Args:
source (str | laspy.lasdatas.base.LasBase): The source data to be converted.
destination (str, optional): The destination file path. Defaults to None.
point_format_id (int, optional): The new point format id (the default is None, which won't change the source format id).
file_version (str, optional): The new file version. None by default which means that the file_version may be upgraded
for compatibility with the new point_format. The file version will not be downgraded.
Returns:
laspy.lasdatas.base.LasBase: The converted LasData object.
"""
try:
import laspy
except ImportError:
print(
"The laspy package is required for this function. Use `pip install laspy[lazrs,laszip]` to install it."
)
return
if isinstance(source, str):
source = read_lidar(source)
las = laspy.convert(
source, point_format_id=point_format_id, file_version=file_version
)
if destination is None:
return las
else:
destination = check_file_path(destination)
write_lidar(las, destination, **kwargs)
return destination
convert_to_gdf(data, geometry=None, lat=None, lon=None, crs='EPSG:4326', included=None, excluded=None, obj_to_str=False, open_args=None, **kwargs)
¶
Convert data to a GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Union[pd.DataFrame, str] | The input data, either as a DataFrame or a file path. | required |
geometry | Optional[str] | The column name containing geometry data. Defaults to None. | None |
lat | Optional[str] | The column name containing latitude data. Defaults to None. | None |
lon | Optional[str] | The column name containing longitude data. Defaults to None. | None |
crs | str | The coordinate reference system to use. Defaults to "EPSG:4326". | 'EPSG:4326' |
included | Optional[List[str]] | List of columns to include. Defaults to None. | None |
excluded | Optional[List[str]] | List of columns to exclude. Defaults to None. | None |
obj_to_str | bool | Whether to convert object dtype columns to string. Defaults to False. | False |
open_args | Optional[Dict[str, Any]] | Additional arguments for file opening functions. Defaults to None. | None |
**kwargs | Any | Additional keyword arguments for GeoDataFrame creation. | {} |
Returns:
Type | Description |
---|---|
gpd.GeoDataFrame | The converted GeoDataFrame. |
Exceptions:
Type | Description |
---|---|
ValueError | If the file format is unsupported or required columns are not provided. |
Source code in leafmap/common.py
def convert_to_gdf(
data: Union[pd.DataFrame, str],
geometry: Optional[str] = None,
lat: Optional[str] = None,
lon: Optional[str] = None,
crs: str = "EPSG:4326",
included: Optional[List[str]] = None,
excluded: Optional[List[str]] = None,
obj_to_str: bool = False,
open_args: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> "gpd.GeoDataFrame":
"""Convert data to a GeoDataFrame.
Args:
data (Union[pd.DataFrame, str]): The input data, either as a DataFrame or a file path.
geometry (Optional[str], optional): The column name containing geometry data. Defaults to None.
lat (Optional[str], optional): The column name containing latitude data. Defaults to None.
lon (Optional[str], optional): The column name containing longitude data. Defaults to None.
crs (str, optional): The coordinate reference system to use. Defaults to "EPSG:4326".
included (Optional[List[str]], optional): List of columns to include. Defaults to None.
excluded (Optional[List[str]], optional): List of columns to exclude. Defaults to None.
obj_to_str (bool, optional): Whether to convert object dtype columns to string. Defaults to False.
open_args (Optional[Dict[str, Any]], optional): Additional arguments for file opening functions. Defaults to None.
**kwargs (Any): Additional keyword arguments for GeoDataFrame creation.
Returns:
gpd.GeoDataFrame: The converted GeoDataFrame.
Raises:
ValueError: If the file format is unsupported or required columns are not provided.
"""
import geopandas as gpd
from shapely.geometry import Point, shape
if open_args is None:
open_args = {}
if not isinstance(data, pd.DataFrame):
if isinstance(data, str):
if data.endswith(".parquet"):
data = pd.read_parquet(data, **open_args)
elif data.endswith(".csv"):
data = pd.read_csv(data, **open_args)
elif data.endswith(".json"):
data = pd.read_json(data, **open_args)
elif data.endswith(".xlsx"):
data = pd.read_excel(data, **open_args)
else:
raise ValueError(
"Unsupported file format. Only Parquet, CSV, JSON, and Excel files are supported."
)
# If include_cols is specified, filter the DataFrame to include only those columns
if included:
if geometry:
included.append(geometry)
elif lat and lon:
included.append(lat)
included.append(lon)
data = data[included]
# Exclude specified columns if provided
if excluded:
data = data.drop(columns=excluded)
# Convert 'object' dtype columns to 'string' if obj_to_str is True
if obj_to_str:
data = data.astype(
{col: "string" for col in data.select_dtypes(include="object").columns}
)
# Handle the creation of geometry
if geometry:
def convert_geometry(x):
if isinstance(x, str):
try:
# Parse the string as JSON and then convert to a geometry
return shape(json.loads(x))
except (json.JSONDecodeError, TypeError) as e:
print(f"Error converting geometry: {e}")
return None
return x
data = data[data[geometry].notnull()]
data[geometry] = data[geometry].apply(convert_geometry)
elif lat and lon:
# Create a geometry column from latitude and longitude
data["geometry"] = data.apply(lambda row: Point(row[lon], row[lat]), axis=1)
geometry = "geometry"
else:
raise ValueError(
"Either geometry or both lat and lon must be provided."
)
# Convert the DataFrame to a GeoDataFrame
gdf = gpd.GeoDataFrame(data, geometry=geometry, **kwargs)
# Set CRS (assuming WGS84 by default, modify as needed)
gdf.set_crs(crs, inplace=True)
return gdf
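Example (a minimal usage sketch; the file name and column names are hypothetical):
>>> gdf = convert_to_gdf("points.csv", lat="latitude", lon="longitude")
>>> # rows gain a Point geometry built from the latitude/longitude columns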
coords_to_geojson(coords)
¶
Convert a list of bbox coordinates representing [left, bottom, right, top] to geojson FeatureCollection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
coords | list | A list of bbox coordinates representing [left, bottom, right, top]. | required |
Returns:
Type | Description |
---|---|
dict | A geojson FeatureCollection. |
Source code in leafmap/common.py
def coords_to_geojson(coords):
"""Convert a list of bbox coordinates representing [left, bottom, right, top] to geojson FeatureCollection.
Args:
coords (list): A list of bbox coordinates representing [left, bottom, right, top].
Returns:
dict: A geojson FeatureCollection.
"""
features = []
for bbox in coords:
features.append(bbox_to_geojson(bbox))
return {"type": "FeatureCollection", "features": features}
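Example (a minimal usage sketch; the bounds are illustrative):
>>> fc = coords_to_geojson([[-122.5, 37.7, -122.3, 37.9]])
>>> fc["type"]
'FeatureCollection'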
coords_to_vector(coords, output=None, crs='EPSG:4326', **kwargs)
¶
Convert a list of coordinates to a GeoDataFrame or a vector file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
coords | list | A list of coordinates in the format of [(x1, y1), (x2, y2), ...]. | required |
output | str | The path to the output vector file. Defaults to None. | None |
crs | str | The CRS of the coordinates. Defaults to "EPSG:4326". | 'EPSG:4326' |
Returns:
Type | Description |
---|---|
gpd.GeoDataFrame | A GeoDataFrame of the coordinates. |
Source code in leafmap/common.py
def coords_to_vector(coords, output=None, crs="EPSG:4326", **kwargs):
"""Convert a list of coordinates to a GeoDataFrame or a vector file.
Args:
coords (list): A list of coordinates in the format of [(x1, y1), (x2, y2), ...].
output (str, optional): The path to the output vector file. Defaults to None.
crs (str, optional): The CRS of the coordinates. Defaults to "EPSG:4326".
Returns:
gpd.GeoDataFrame: A GeoDataFrame of the coordinates.
"""
import geopandas as gpd
from shapely.geometry import Point
if not isinstance(coords, (list, tuple)):
raise TypeError("coords must be a list of coordinates")
if isinstance(coords[0], int) or isinstance(coords[0], float):
coords = [(coords[0], coords[1])]
# convert the points to a GeoDataFrame
geometry = [Point(xy) for xy in coords]
gdf = gpd.GeoDataFrame(geometry=geometry, crs="EPSG:4326")
gdf.to_crs(crs, inplace=True)
if output is not None:
gdf.to_file(output, **kwargs)
else:
return gdf
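Example (a minimal usage sketch; the coordinates and output path are illustrative):
>>> gdf = coords_to_vector([(-122.4, 37.8), (-122.3, 37.9)])
>>> coords_to_vector([(-122.4, 37.8)], output="points.geojson")  # writes to disk instead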
coords_to_xy(src_fp, coords, coord_crs='epsg:4326', request_payer='bucket-owner', env_args={}, open_args={}, **kwargs)
¶
Converts a list of coordinates to pixel coordinates, i.e., (col, row) coordinates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src_fp | str | The source raster file path. | required |
coords | list | A list of coordinates in the format of [[x1, y1], [x2, y2], ...]. | required |
coord_crs | str | The coordinate CRS of the input coordinates. Defaults to "epsg:4326". | 'epsg:4326' |
request_payer | | Specifies who pays for the download from S3. Can be "bucket-owner" or "requester". Defaults to "bucket-owner". | 'bucket-owner' |
env_args | | Additional keyword arguments to pass to rasterio.Env. | {} |
open_args | | Additional keyword arguments to pass to rasterio.open. | {} |
**kwargs | | Additional keyword arguments to pass to rasterio.transform.rowcol. | {} |
Returns:
Type | Description |
---|---|
list | A list of pixel coordinates in the format of [[x1, y1], [x2, y2], ...]. |
Source code in leafmap/common.py
def coords_to_xy(
src_fp: str,
coords: list,
coord_crs: str = "epsg:4326",
request_payer="bucket-owner",
env_args={},
open_args={},
**kwargs,
) -> list:
"""Converts a list of coordinates to pixel coordinates, i.e., (col, row) coordinates.
Args:
src_fp: The source raster file path.
coords: A list of coordinates in the format of [[x1, y1], [x2, y2], ...]
coord_crs: The coordinate CRS of the input coordinates. Defaults to "epsg:4326".
request_payer: Specifies who pays for the download from S3.
Can be "bucket-owner" or "requester". Defaults to "bucket-owner".
env_args: Additional keyword arguments to pass to rasterio.Env.
open_args: Additional keyword arguments to pass to rasterio.open.
**kwargs: Additional keyword arguments to pass to rasterio.transform.rowcol.
Returns:
A list of pixel coordinates in the format of [[x1, y1], [x2, y2], ...]
"""
import numpy as np
import rasterio
if isinstance(coords, np.ndarray):
coords = coords.tolist()
if len(coords) == 4 and all([isinstance(c, (int, float)) for c in coords]):
coords = [[coords[0], coords[1]], [coords[2], coords[3]]]
xs, ys = zip(*coords)
with rasterio.Env(AWS_REQUEST_PAYER=request_payer, **env_args):
with rasterio.open(src_fp, **open_args) as src:
width = src.width
height = src.height
if coord_crs != src.crs:
xs, ys = transform_coords(
xs, ys, coord_crs, src.crs, **kwargs
) # pylint: disable=E0633
rows, cols = rasterio.transform.rowcol(src.transform, xs, ys, **kwargs)
result = [[col, row] for col, row in zip(cols, rows)]
result = [
[x, y] for x, y in result if x >= 0 and y >= 0 and x < width and y < height
]
if len(result) == 0:
print("No valid pixel coordinates found.")
elif len(result) < len(coords):
print("Some coordinates are out of the image boundary.")
return result
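Example (a minimal usage sketch; the raster path and coordinate are hypothetical):
>>> pixels = coords_to_xy("dem.tif", [[-122.4, 37.8]])
>>> # pixels is a list of [col, row] indices for coordinates that fall inside the raster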
create_code_cell(code='', where='below')
¶
Creates a code cell in the IPython Notebook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code | str | Code to fill the new code cell with. Defaults to ''. | '' |
where | str | Where to add the new code cell. It can be one of the following: above, below, at_bottom. Defaults to 'below'. | 'below' |
Source code in leafmap/common.py
def create_code_cell(code="", where="below"):
"""Creates a code cell in the IPython Notebook.
Args:
code (str, optional): Code to fill the new code cell with. Defaults to ''.
where (str, optional): Where to add the new code cell. It can be one of the following: above, below, at_bottom. Defaults to 'below'.
"""
import base64
# try:
# import pyperclip
# except ImportError:
# install_package("pyperclip")
# import pyperclip
from IPython.display import Javascript, display
# try:
# pyperclip.copy(str(code))
# except Exception as e:
# pass
encoded_code = (base64.b64encode(str.encode(code))).decode()
display(
Javascript(
"""
var code = IPython.notebook.insert_cell_{0}('code');
code.set_text(atob("{1}"));
""".format(
where, encoded_code
)
)
)
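Example usage in a classic Jupyter Notebook (the function injects Javascript through IPython.notebook, which the classic front end provides):
import leafmap
# Insert a pre-filled code cell directly below the current one.
leafmap.create_code_cell("import leafmap\nm = leafmap.Map()", where="below")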
create_download_link(filename, title='Click here to download: ', basename=None)
¶
Creates an HTML link for downloading a file from a Voila app. Adapted from https://github.com/voila-dashboards/voila/issues/578
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | The file path to the file to download. | required |
title | str | The text of the download link. Defaults to "Click here to download: ". | 'Click here to download: ' |
basename | str | The display name of the downloaded file. Defaults to None, which uses the basename of filename. | None |
Returns:
Type | Description |
---|---|
str | HTML download URL. |
Source code in leafmap/common.py
def create_download_link(filename, title="Click here to download: ", basename=None):
"""Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578
Args:
filename (str): The file path to the file to download
title (str, optional): str. Defaults to "Click here to download: ".
Returns:
str: HTML download URL.
"""
import base64
from IPython.display import HTML
data = open(filename, "rb").read()
b64 = base64.b64encode(data)
payload = b64.decode()
if basename is None:
basename = os.path.basename(filename)
html = '<a download="{filename}" href="data:text/csv;base64,{payload}" style="color:#0000FF;" target="_blank">{title}</a>'
html = html.format(payload=payload, title=title + f" {basename}", filename=basename)
return HTML(html)
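A minimal sketch; "results.csv" stands in for any existing local file:
import leafmap
# Render a clickable link in a notebook or Voila app; the file content is
# base64-encoded into the link itself.
link = leafmap.create_download_link("results.csv", title="Download results:")
link  # display as the last expression in a cell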
create_legend(title='Legend', labels=None, colors=None, legend_dict=None, builtin_legend=None, opacity=1.0, position='bottomright', draggable=True, output=None, style={}, shape_type='rectangle')
¶
Create a legend in HTML format. Reference: https://bit.ly/3oV6vnH
Parameters:
Name | Type | Description | Default |
---|---|---|---|
title | str | Title of the legend. Defaults to 'Legend'. | 'Legend' |
colors | list | A list of legend colors. Defaults to None. | None |
labels | list | A list of legend labels. Defaults to None. | None |
legend_dict | dict | A dictionary containing legend items as keys and color as values. If provided, legend_keys and legend_colors will be ignored. Defaults to None. | None |
builtin_legend | str | Name of the builtin legend to add to the map. Defaults to None. | None |
opacity | float | The opacity of the legend. Defaults to 1.0. | 1.0 |
position | str | The position of the legend, can be one of the following: "topleft", "topright", "bottomleft", "bottomright". Defaults to "bottomright". | 'bottomright' |
draggable | bool | If True, the legend can be dragged to a new position. Defaults to True. | True |
output | str | The output file path (*.html) to save the legend. Defaults to None. | None |
style | dict | Additional keyword arguments to style the legend, such as position, bottom, right, z-index, border, background-color, border-radius, padding, font-size, etc. The default style is: {'position': 'fixed', 'z-index': '9999', 'border': '2px solid grey', 'background-color': 'rgba(255, 255, 255, 0.8)', 'border-radius': '5px', 'padding': '10px', 'font-size': '14px', 'bottom': '20px', 'right': '5px'} | {} |
shape_type | str | The shape of the legend items, one of "rectangle", "circle", or "line". Defaults to "rectangle". | 'rectangle' |
Returns:
Type | Description |
---|---|
str | The HTML code of the legend. |
Source code in leafmap/common.py
def create_legend(
title="Legend",
labels=None,
colors=None,
legend_dict=None,
builtin_legend=None,
opacity=1.0,
position="bottomright",
draggable=True,
output=None,
style={},
shape_type="rectangle",
):
"""Create a legend in HTML format. Reference: https://bit.ly/3oV6vnH
Args:
title (str, optional): Title of the legend. Defaults to 'Legend'.
colors (list, optional): A list of legend colors. Defaults to None.
labels (list, optional): A list of legend labels. Defaults to None.
legend_dict (dict, optional): A dictionary containing legend items as keys and color as values.
If provided, legend_keys and legend_colors will be ignored. Defaults to None.
builtin_legend (str, optional): Name of the builtin legend to add to the map. Defaults to None.
opacity (float, optional): The opacity of the legend. Defaults to 1.0.
position (str, optional): The position of the legend, can be one of the following:
"topleft", "topright", "bottomleft", "bottomright". Defaults to "bottomright".
draggable (bool, optional): If True, the legend can be dragged to a new position. Defaults to True.
output (str, optional): The output file path (*.html) to save the legend. Defaults to None.
style: Additional keyword arguments to style the legend, such as position, bottom, right, z-index,
border, background-color, border-radius, padding, font-size, etc. The default style is:
style = {
'position': 'fixed',
'z-index': '9999',
'border': '2px solid grey',
'background-color': 'rgba(255, 255, 255, 0.8)',
'border-radius': '5px',
'padding': '10px',
'font-size': '14px',
'bottom': '20px',
'right': '5px'
}
Returns:
str: The HTML code of the legend.
"""
import importlib.resources
from .legends import builtin_legends
pkg_dir = os.path.dirname(importlib.resources.files("leafmap") / "leafmap.py")
legend_template = os.path.join(pkg_dir, "data/template/legend_style.html")
if draggable:
legend_template = os.path.join(pkg_dir, "data/template/legend.txt")
if not os.path.exists(legend_template):
raise FileNotFoundError("The legend template does not exist.")
if labels is not None:
if not isinstance(labels, list):
print("The legend keys must be a list.")
return
else:
labels = ["One", "Two", "Three", "Four", "etc"]
if colors is not None:
if not isinstance(colors, list):
print("The legend colors must be a list.")
return
elif all(isinstance(item, tuple) for item in colors):
try:
colors = [rgb_to_hex(x) for x in colors]
except Exception as e:
print(e)
elif all((item.startswith("#") and len(item) == 7) for item in colors):
pass
elif all((len(item) == 6) for item in colors):
pass
else:
print("The legend colors must be a list of tuples.")
return
else:
colors = [
"#8DD3C7",
"#FFFFB3",
"#BEBADA",
"#FB8072",
"#80B1D3",
]
if len(labels) != len(colors):
print("The legend keys and values must be the same length.")
return
allowed_builtin_legends = builtin_legends.keys()
if builtin_legend is not None:
if builtin_legend not in allowed_builtin_legends:
print(
"The builtin legend must be one of the following: {}".format(
", ".join(allowed_builtin_legends)
)
)
return
else:
legend_dict = builtin_legends[builtin_legend]
labels = list(legend_dict.keys())
colors = list(legend_dict.values())
if legend_dict is not None:
if not isinstance(legend_dict, dict):
print("The legend dict must be a dictionary.")
return
else:
labels = list(legend_dict.keys())
colors = list(legend_dict.values())
if all(isinstance(item, tuple) for item in colors):
try:
colors = [rgb_to_hex(x) for x in colors]
except Exception as e:
print(e)
allowed_positions = [
"topleft",
"topright",
"bottomleft",
"bottomright",
]
if position not in allowed_positions:
raise ValueError(
"The position must be one of the following: {}".format(
", ".join(allowed_positions)
)
)
if position == "bottomright":
if "bottom" not in style:
style["bottom"] = "20px"
if "right" not in style:
style["right"] = "5px"
if "left" in style:
del style["left"]
if "top" in style:
del style["top"]
elif position == "bottomleft":
if "bottom" not in style:
style["bottom"] = "5px"
if "left" not in style:
style["left"] = "5px"
if "right" in style:
del style["right"]
if "top" in style:
del style["top"]
elif position == "topright":
if "top" not in style:
style["top"] = "5px"
if "right" not in style:
style["right"] = "5px"
if "left" in style:
del style["left"]
if "bottom" in style:
del style["bottom"]
elif position == "topleft":
if "top" not in style:
style["top"] = "5px"
if "left" not in style:
style["left"] = "5px"
if "right" in style:
del style["right"]
if "bottom" in style:
del style["bottom"]
if "position" not in style:
style["position"] = "fixed"
if "z-index" not in style:
style["z-index"] = "9999"
if "background-color" not in style:
style["background-color"] = "rgba(255, 255, 255, 0.8)"
if "padding" not in style:
style["padding"] = "10px"
if "border-radius" not in style:
style["border-radius"] = "5px"
if "font-size" not in style:
style["font-size"] = "14px"
content = []
with open(legend_template) as f:
lines = f.readlines()
if draggable:
for index, line in enumerate(lines):
if index < 36:
content.append(line)
elif index == 36:
line = lines[index].replace("Legend", title)
content.append(line)
elif index < 39:
content.append(line)
elif index == 39:
for i, color in enumerate(colors):
item = f" <li><span style='background:{check_color(color)};opacity:{opacity};'></span>{labels[i]}</li>\n"
content.append(item)
elif index > 41:
content.append(line)
content = content[3:-1]
else:
for index, line in enumerate(lines):
if index < 8:
content.append(line)
elif index == 8:
for key, value in style.items():
content.append(
" {}: {};\n".format(key.replace("_", "-"), value)
)
elif index < 17:
pass
elif index < 19:
content.append(line)
elif index == 19:
content.append(line.replace("Legend", title))
elif index < 22:
content.append(line)
elif index == 22:
for index, key in enumerate(labels):
color = colors[index]
if not color.startswith("#"):
color = "#" + color
item = " <li><span style='background:{};opacity:{};'></span>{}</li>\n".format(
color, opacity, key
)
content.append(item)
elif index < 33:
pass
else:
content.append(line)
legend_text = "".join(content)
if shape_type == "circle":
legend_text = legend_text.replace("width: 30px", "width: 16px")
legend_text = legend_text.replace(
"border: 1px solid #999;",
"border-radius: 50%;\n border: 1px solid #999;",
)
elif shape_type == "line":
legend_text = legend_text.replace("height: 16px", "height: 3px")
if output is not None:
with open(output, "w") as f:
f.write(legend_text)
else:
return legend_text
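A minimal sketch using a label-to-color dictionary (the categories are arbitrary examples):
import leafmap
# Returns the legend HTML as a string when output is None.
legend_html = leafmap.create_legend(
    title="Land Cover",
    legend_dict={"Forest": "#228B22", "Water": "#0000FF", "Urban": "#808080"},
    position="bottomright",
)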
create_timelapse(images, out_gif, ext='.tif', bands=None, size=None, bbox=None, fps=5, loop=0, add_progress_bar=True, progress_bar_color='blue', progress_bar_height=5, add_text=False, text_xy=None, text_sequence=None, font_type='arial.ttf', font_size=20, font_color='black', mp4=False, quiet=True, reduce_size=False, clean_up=True, **kwargs)
¶
Creates a timelapse gif from a list of images.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
images | list or str | The list of images or input directory to create the gif from. For example, '/path/to/images/*.tif' or ['/path/to/image1.tif', '/path/to/image2.tif', ...] | required |
out_gif | str | File path to the output gif. | required |
ext | str | The extension of the images. Defaults to '.tif'. | '.tif' |
bands | list | The bands to use for the gif. For example, [0, 1, 2] for RGB, and [0] for grayscale. Defaults to None. | None |
size | tuple | The size of the gif. For example, (500, 500). Defaults to None, using the original size. | None |
bbox | list | The bounding box of the gif. For example, [xmin, ymin, xmax, ymax]. Defaults to None, using the original bounding box. | None |
fps | int | The frames per second of the gif. Defaults to 5. | 5 |
loop | int | The number of times to loop the gif. Defaults to 0, looping forever. | 0 |
add_progress_bar | bool | Whether to add a progress bar to the gif. Defaults to True. | True |
progress_bar_color | str | The color of the progress bar, can be color name or hex code. Defaults to 'blue'. | 'blue' |
progress_bar_height | int | The height of the progress bar. Defaults to 5. | 5 |
add_text | bool | Whether to add text to the gif. Defaults to False. | False |
text_xy | tuple | The x, y coordinates of the text. For example, ('10%', '10%'). Defaults to None, using the bottom left corner. | None |
text_sequence | list | The sequence of text to add to the gif. For example, ['year 1', 'year 2', ...]. Defaults to None. | None |
font_type | str | The font type of the text, can be 'arial.ttf' or 'alibaba.otf', or any system font. Defaults to 'arial.ttf'. | 'arial.ttf' |
font_size | int | The font size of the text. Defaults to 20. | 20 |
font_color | str | The color of the text, can be color name or hex code. Defaults to 'black'. | 'black' |
mp4 | bool | Whether to convert the gif to mp4. Defaults to False. | False |
quiet | bool | Whether to suppress the progress output. Defaults to True. | True |
reduce_size | bool | Whether to reduce the size of the gif using ffmpeg. Defaults to False. | False |
clean_up | bool | Whether to clean up the temporary files. Defaults to True. | True |
Source code in leafmap/common.py
def create_timelapse(
images: Union[List, str],
out_gif: str,
ext: str = ".tif",
bands: Optional[List] = None,
size: Optional[Tuple] = None,
bbox: Optional[List] = None,
fps: int = 5,
loop: int = 0,
add_progress_bar: bool = True,
progress_bar_color: str = "blue",
progress_bar_height: int = 5,
add_text: bool = False,
text_xy: Optional[Tuple] = None,
text_sequence: Optional[List] = None,
font_type: str = "arial.ttf",
font_size: int = 20,
font_color: str = "black",
mp4: bool = False,
quiet: bool = True,
reduce_size: bool = False,
clean_up: bool = True,
**kwargs,
):
"""Creates a timelapse gif from a list of images.
Args:
images (list | str): The list of images or input directory to create the gif from.
For example, '/path/to/images/*.tif' or ['/path/to/image1.tif', '/path/to/image2.tif', ...]
out_gif (str): File path to the output gif.
ext (str, optional): The extension of the images. Defaults to '.tif'.
bands (list, optional): The bands to use for the gif. For example, [0, 1, 2] for RGB, and [0] for grayscale. Defaults to None.
size (tuple, optional): The size of the gif. For example, (500, 500). Defaults to None, using the original size.
bbox (list, optional): The bounding box of the gif. For example, [xmin, ymin, xmax, ymax]. Defaults to None, using the original bounding box.
fps (int, optional): The frames per second of the gif. Defaults to 5.
loop (int, optional): The number of times to loop the gif. Defaults to 0, looping forever.
add_progress_bar (bool, optional): Whether to add a progress bar to the gif. Defaults to True.
progress_bar_color (str, optional): The color of the progress bar, can be color name or hex code. Defaults to 'blue'.
progress_bar_height (int, optional): The height of the progress bar. Defaults to 5.
add_text (bool, optional): Whether to add text to the gif. Defaults to False.
text_xy (tuple, optional): The x, y coordinates of the text. For example, ('10%', '10%').
Defaults to None, using the bottom left corner.
text_sequence (list, optional): The sequence of text to add to the gif. For example, ['year 1', 'year 2', ...].
font_type (str, optional): The font type of the text, can be 'arial.ttf' or 'alibaba.otf', or any system font. Defaults to 'arial.ttf'.
font_size (int, optional): The font size of the text. Defaults to 20.
font_color (str, optional): The color of the text, can be color name or hex code. Defaults to 'black'.
mp4 (bool, optional): Whether to convert the gif to mp4. Defaults to False.
quiet (bool, optional): Whether to suppress the progress output. Defaults to True.
reduce_size (bool, optional): Whether to reduce the size of the gif using ffmpeg. Defaults to False.
clean_up (bool, optional): Whether to clean up the temporary files. Defaults to True.
"""
import glob
import tempfile
if isinstance(images, str):
if not images.endswith(ext):
images = os.path.join(images, f"*{ext}")
images = list(glob.glob(images))
if not isinstance(images, list):
raise ValueError("images must be a list or a path to the image directory.")
images.sort()
temp_dir = os.path.join(tempfile.gettempdir(), "timelapse")
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
if bbox is not None:
clip_dir = os.path.join(tempfile.gettempdir(), "clip")
if not os.path.exists(clip_dir):
os.makedirs(clip_dir)
if len(bbox) == 4:
bbox = bbox_to_geojson(bbox)
else:
clip_dir = None
output = widgets.Output()
if "out_ext" in kwargs:
out_ext = kwargs["out_ext"].lower()
else:
out_ext = ".jpg"
try:
for index, image in enumerate(images):
if bbox is not None:
clip_file = os.path.join(clip_dir, os.path.basename(image))
with output:
clip_image(image, mask=bbox, output=clip_file, to_cog=False)
image = clip_file
if "add_prefix" in kwargs:
basename = (
str(f"{index + 1}").zfill(len(str(len(images))))
+ "-"
+ os.path.basename(image).replace(ext, out_ext)
)
else:
basename = os.path.basename(image).replace(ext, out_ext)
if not quiet:
print(f"Processing {index+1}/{len(images)}: {basename} ...")
# ignore GDAL warnings
with output:
numpy_to_image(
image, os.path.join(temp_dir, basename), bands=bands, size=size
)
make_gif(
temp_dir,
out_gif,
ext=out_ext,
fps=fps,
loop=loop,
mp4=mp4,
clean_up=clean_up,
)
if clip_dir is not None:
shutil.rmtree(clip_dir)
if add_text:
add_text_to_gif(
out_gif,
out_gif,
text_xy,
text_sequence,
font_type,
font_size,
font_color,
add_progress_bar,
progress_bar_color,
progress_bar_height,
1000 / fps,
loop,
)
elif add_progress_bar:
add_progress_bar_to_gif(
out_gif,
out_gif,
progress_bar_color,
progress_bar_height,
1000 / fps,
loop,
)
if reduce_size:
reduce_gif_size(out_gif)
except Exception as e:
print(e)
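A minimal sketch; the image directory and year labels are placeholders:
import leafmap
# Turn a folder of GeoTIFFs into an annotated animated GIF.
leafmap.create_timelapse(
    "/path/to/images",
    "timelapse.gif",
    bands=[0, 1, 2],
    fps=3,
    add_text=True,
    text_sequence=["2019", "2020", "2021"],
)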
csv_points_to_shp(in_csv, out_shp, latitude='latitude', longitude='longitude')
¶
Converts a csv file containing points (latitude, longitude) into a shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv | str | File path or HTTP URL to the input csv file. For example, https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv | required |
out_shp | str | File path to the output shapefile. | required |
latitude | str | Column name for the latitude column. Defaults to 'latitude'. | 'latitude' |
longitude | str | Column name for the longitude column. Defaults to 'longitude'. | 'longitude' |
Source code in leafmap/common.py
def csv_points_to_shp(in_csv, out_shp, latitude="latitude", longitude="longitude"):
"""Converts a csv file containing points (latitude, longitude) into a shapefile.
Args:
in_csv (str): File path or HTTP URL to the input csv file. For example, https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv
out_shp (str): File path to the output shapefile.
latitude (str, optional): Column name for the latitude column. Defaults to 'latitude'.
longitude (str, optional): Column name for the longitude column. Defaults to 'longitude'.
"""
if in_csv.startswith("http") and in_csv.endswith(".csv"):
out_dir = os.path.join(os.path.expanduser("~"), "Downloads")
out_name = os.path.basename(in_csv)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
download_from_url(in_csv, out_dir=out_dir)
in_csv = os.path.join(out_dir, out_name)
wbt = whitebox.WhiteboxTools()
in_csv = os.path.abspath(in_csv)
out_shp = os.path.abspath(out_shp)
if not os.path.exists(in_csv):
raise Exception("The provided csv file does not exist.")
with open(in_csv, encoding="utf-8") as csv_file:
reader = csv.DictReader(csv_file)
fields = reader.fieldnames
xfield = fields.index(longitude)
yfield = fields.index(latitude)
wbt.csv_points_to_vector(in_csv, out_shp, xfield=xfield, yfield=yfield, epsg=4326)
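Example using the sample dataset referenced above (requires the whitebox package):
import leafmap
url = "https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv"
leafmap.csv_points_to_shp(url, "world_cities.shp")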
csv_to_df(in_csv, **kwargs)
¶
Converts a CSV file to pandas dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv | str | File path to the input CSV. | required |
**kwargs | | Additional keyword arguments to pass to pd.read_csv(). | {} |
Returns:
Type | Description |
---|---|
pd.DataFrame | pandas DataFrame |
Source code in leafmap/common.py
def csv_to_df(in_csv, **kwargs):
"""Converts a CSV file to pandas dataframe.
Args:
in_csv (str): File path to the input CSV.
Returns:
pd.DataFrame: pandas DataFrame
"""
import pandas as pd
try:
return pd.read_csv(in_csv, **kwargs)
except Exception as e:
raise Exception(e)
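A minimal sketch; extra keyword arguments are forwarded to pd.read_csv:
import leafmap
url = "https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv"
df = leafmap.csv_to_df(url)
df.head()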
csv_to_gdf(in_csv, latitude='latitude', longitude='longitude', geometry=None, crs='EPSG:4326', encoding='utf-8', **kwargs)
¶
Creates points for a CSV file and converts them to a GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv | str | The file path to the input CSV file. | required |
latitude | str | The name of the column containing latitude coordinates. Defaults to "latitude". | 'latitude' |
longitude | str | The name of the column containing longitude coordinates. Defaults to "longitude". | 'longitude' |
geometry | str | The name of the column containing geometry. Defaults to None. | None |
crs | str | The coordinate reference system. Defaults to "EPSG:4326". | 'EPSG:4326' |
encoding | str | The encoding of characters. Defaults to "utf-8". | 'utf-8' |
Returns:
Type | Description |
---|---|
object | GeoDataFrame. |
Source code in leafmap/common.py
def csv_to_gdf(
in_csv,
latitude="latitude",
longitude="longitude",
geometry=None,
crs="EPSG:4326",
encoding="utf-8",
**kwargs,
):
"""Creates points for a CSV file and converts them to a GeoDataFrame.
Args:
in_csv (str): The file path to the input CSV file.
latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
geometry (str, optional): The name of the column containing geometry. Defaults to None.
crs (str, optional): The coordinate reference system. Defaults to "EPSG:4326".
encoding (str, optional): The encoding of characters. Defaults to "utf-8".
Returns:
object: GeoDataFrame.
"""
check_package(name="geopandas", URL="https://geopandas.org")
import geopandas as gpd
import pandas as pd
from shapely import wkt
out_dir = os.getcwd()
if geometry is None:
out_geojson = os.path.join(out_dir, random_string() + ".geojson")
csv_to_geojson(in_csv, out_geojson, latitude, longitude, encoding=encoding)
gdf = gpd.read_file(out_geojson)
os.remove(out_geojson)
else:
df = pd.read_csv(in_csv, encoding=encoding)
df["geometry"] = df[geometry].apply(wkt.loads)
gdf = gpd.GeoDataFrame(df, geometry="geometry", crs=crs, **kwargs)
return gdf
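A minimal sketch, assuming the CSV has latitude and longitude columns:
import leafmap
url = "https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv"
gdf = leafmap.csv_to_gdf(url)
gdf.crs  # EPSG:4326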
csv_to_geojson(in_csv, out_geojson=None, latitude='latitude', longitude='longitude', encoding='utf-8')
¶
Creates points for a CSV file and exports data as a GeoJSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv | str | The file path to the input CSV file. | required |
out_geojson | str | The file path to the exported GeoJSON. Defaults to None. | None |
latitude | str | The name of the column containing latitude coordinates. Defaults to "latitude". | 'latitude' |
longitude | str | The name of the column containing longitude coordinates. Defaults to "longitude". | 'longitude' |
encoding | str | The encoding of characters. Defaults to "utf-8". | 'utf-8' |
Source code in leafmap/common.py
def csv_to_geojson(
in_csv,
out_geojson=None,
latitude="latitude",
longitude="longitude",
encoding="utf-8",
):
"""Creates points for a CSV file and exports data as a GeoJSON.
Args:
in_csv (str): The file path to the input CSV file.
out_geojson (str, optional): The file path to the exported GeoJSON. Defaults to None.
latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
encoding (str, optional): The encoding of characters. Defaults to "utf-8".
"""
import pandas as pd
in_csv = github_raw_url(in_csv)
if out_geojson is not None:
out_geojson = check_file_path(out_geojson)
df = pd.read_csv(in_csv)
geojson = df_to_geojson(
df, latitude=latitude, longitude=longitude, encoding=encoding
)
if out_geojson is None:
return geojson
else:
with open(out_geojson, "w", encoding=encoding) as f:
f.write(json.dumps(geojson))
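When out_geojson is None, the FeatureCollection is returned instead of written:
import leafmap
url = "https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv"
geojson = leafmap.csv_to_geojson(url)          # returns a FeatureCollection
leafmap.csv_to_geojson(url, "cities.geojson")  # or write it to disk instead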
csv_to_shp(in_csv, out_shp, latitude='latitude', longitude='longitude', encoding='utf-8')
¶
Converts a csv file with latlon info to a point shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv | str | The input csv file containing longitude and latitude columns. | required |
out_shp | str | The file path to the output shapefile. | required |
latitude | str | The column name of the latitude column. Defaults to 'latitude'. | 'latitude' |
longitude | str | The column name of the longitude column. Defaults to 'longitude'. | 'longitude' |
encoding | str | The encoding of characters. Defaults to 'utf-8'. | 'utf-8' |
Source code in leafmap/common.py
def csv_to_shp(
in_csv, out_shp, latitude="latitude", longitude="longitude", encoding="utf-8"
):
"""Converts a csv file with latlon info to a point shapefile.
Args:
in_csv (str): The input csv file containing longitude and latitude columns.
out_shp (str): The file path to the output shapefile.
latitude (str, optional): The column name of the latitude column. Defaults to 'latitude'.
longitude (str, optional): The column name of the longitude column. Defaults to 'longitude'.
"""
import shapefile as shp
if in_csv.startswith("http") and in_csv.endswith(".csv"):
in_csv = github_raw_url(in_csv)
in_csv = download_file(in_csv, quiet=True, overwrite=True)
try:
points = shp.Writer(out_shp, shapeType=shp.POINT)
with open(in_csv, encoding=encoding) as csvfile:
csvreader = csv.DictReader(csvfile)
header = csvreader.fieldnames
[points.field(field) for field in header]
for row in csvreader:
points.point((float(row[longitude])), (float(row[latitude])))
points.record(*tuple([row[f] for f in header]))
out_prj = out_shp.replace(".shp", ".prj")
with open(out_prj, "w") as f:
prj_str = 'GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137,298.257223563]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]] '
f.write(prj_str)
except Exception as e:
raise Exception(e)
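A minimal sketch (this variant uses pyshp rather than WhiteboxTools):
import leafmap
url = "https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv"
leafmap.csv_to_shp(url, "world_cities.shp")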
csv_to_vector(in_csv, output, latitude='latitude', longitude='longitude', geometry=None, crs='EPSG:4326', encoding='utf-8', **kwargs)
¶
Creates points for a CSV file and converts them to a vector dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv | str | The file path to the input CSV file. | required |
output | str | The file path to the output vector dataset. | required |
latitude | str | The name of the column containing latitude coordinates. Defaults to "latitude". | 'latitude' |
longitude | str | The name of the column containing longitude coordinates. Defaults to "longitude". | 'longitude' |
geometry | str | The name of the column containing geometry. Defaults to None. | None |
crs | str | The coordinate reference system. Defaults to "EPSG:4326". | 'EPSG:4326' |
encoding | str | The encoding of characters. Defaults to "utf-8". | 'utf-8' |
**kwargs | | Additional keyword arguments to pass to gdf.to_file(). | {} |
Source code in leafmap/common.py
def csv_to_vector(
in_csv,
output,
latitude="latitude",
longitude="longitude",
geometry=None,
crs="EPSG:4326",
encoding="utf-8",
**kwargs,
):
"""Creates points for a CSV file and converts them to a vector dataset.
Args:
in_csv (str): The file path to the input CSV file.
output (str): The file path to the output vector dataset.
latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
geometry (str, optional): The name of the column containing geometry. Defaults to None.
crs (str, optional): The coordinate reference system. Defaults to "EPSG:4326".
encoding (str, optional): The encoding of characters. Defaults to "utf-8".
**kwargs: Additional keyword arguments to pass to gdf.to_file().
"""
gdf = csv_to_gdf(in_csv, latitude, longitude, geometry, crs, encoding)
gdf.to_file(output, **kwargs)
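The output format follows the file extension, so any format supported by gdf.to_file works, e.g. GeoPackage:
import leafmap
url = "https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv"
leafmap.csv_to_vector(url, "world_cities.gpkg")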
d2s_tile(url, titiler_endpoint=None, **kwargs)
¶
Generate a D2S tile URL with optional API key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The base URL for the tile. | required |
titiler_endpoint | str | The endpoint for the titiler service. Defaults to the TITILER_ENDPOINT environment variable, or "https://titiler.xyz" if unset. | None |
**kwargs | Any | Additional keyword arguments to pass to the cog_stats function. | {} |
Returns:
Type | Description |
---|---|
str | The modified URL with the API key if required, otherwise the original URL. |
Exceptions:
Type | Description |
---|---|
ValueError | If the API key is required but not set in the environment variables. |
Source code in leafmap/common.py
def d2s_tile(url: str, titiler_endpoint: str = None, **kwargs: Any) -> str:
"""Generate a D2S tile URL with optional API key.
Args:
url (str): The base URL for the tile.
titiler_endpoint (str, optional): The endpoint for the titiler service.
Defaults to the TITILER_ENDPOINT environment variable, or "https://titiler.xyz" if unset.
**kwargs (Any): Additional keyword arguments to pass to the cog_stats function.
Returns:
str: The modified URL with the API key if required, otherwise the original URL.
Raises:
ValueError: If the API key is required but not set in the environment variables.
"""
if titiler_endpoint is None:
titiler_endpoint = os.environ.get("TITILER_ENDPOINT", "https://titiler.xyz")
stats = cog_stats(url, titiler_endpoint=titiler_endpoint, **kwargs)
if "detail" in stats:
api_key = get_api_key("D2S_API_KEY")
if api_key is None:
raise ValueError("Please set the D2S_API_KEY environment variable.")
else:
return f"{url}?API_KEY={api_key}"
else:
return url
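A minimal sketch; the COG URL is a placeholder, and the D2S_API_KEY environment variable must be set for protected assets:
import leafmap
tile_url = leafmap.d2s_tile("https://d2s.org/data/example_cog.tif")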
delete_shp(in_shp, verbose=False)
¶
Deletes a shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_shp | str | The input shapefile to delete. | required |
verbose | bool | Whether to print out descriptive text. Defaults to False. | False |
Source code in leafmap/common.py
def delete_shp(in_shp, verbose=False):
"""Deletes a shapefile.
Args:
in_shp (str): The input shapefile to delete.
verbose (bool, optional): Whether to print out descriptive text. Defaults to False.
"""
from pathlib import Path
in_shp = os.path.abspath(in_shp)
in_dir = os.path.dirname(in_shp)
basename = os.path.basename(in_shp).replace(".shp", "")
files = Path(in_dir).rglob(basename + ".*")
for file in files:
filepath = os.path.join(in_dir, str(file))
os.remove(filepath)
if verbose:
print(f"Deleted {filepath}")
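For example, this removes the .shp file along with its sidecar files such as .dbf, .shx, and .prj:
import leafmap
leafmap.delete_shp("world_cities.shp", verbose=True)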
df_to_gdf(df, geometry='geometry', src_crs='EPSG:4326', dst_crs=None, **kwargs)
¶
Converts a pandas DataFrame to a GeoPandas GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | pandas.DataFrame | The pandas DataFrame to convert. | required |
geometry | str | The name of the geometry column in the DataFrame. | 'geometry' |
src_crs | str | The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326". | 'EPSG:4326' |
dst_crs | str | The target CRS of the GeoDataFrame. Defaults to None. | None |
Returns:
Type | Description |
---|---|
geopandas.GeoDataFrame | The converted GeoPandas GeoDataFrame. |
Source code in leafmap/common.py
def df_to_gdf(df, geometry="geometry", src_crs="EPSG:4326", dst_crs=None, **kwargs):
"""
Converts a pandas DataFrame to a GeoPandas GeoDataFrame.
Args:
df (pandas.DataFrame): The pandas DataFrame to convert.
geometry (str): The name of the geometry column in the DataFrame.
src_crs (str): The coordinate reference system (CRS) of the GeoDataFrame. Default is "EPSG:4326".
dst_crs (str): The target CRS of the GeoDataFrame. Default is None
Returns:
geopandas.GeoDataFrame: The converted GeoPandas GeoDataFrame.
"""
import geopandas as gpd
from shapely import wkt
# Convert the geometry column to Shapely geometry objects
df[geometry] = df[geometry].apply(lambda x: wkt.loads(x))
# Convert the pandas DataFrame to a GeoPandas GeoDataFrame
gdf = gpd.GeoDataFrame(df, geometry=geometry, crs=src_crs, **kwargs)
if dst_crs is not None and dst_crs != src_crs:
gdf = gdf.to_crs(dst_crs)
return gdf
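A minimal sketch in which the geometry column holds WKT strings:
import pandas as pd
import leafmap
df = pd.DataFrame({"name": ["a", "b"], "geometry": ["POINT (0 0)", "POINT (1 1)"]})
gdf = leafmap.df_to_gdf(df, dst_crs="EPSG:3857")  # parse WKT, then reproject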
df_to_geojson(df, out_geojson=None, latitude='latitude', longitude='longitude', encoding='utf-8')
¶
Creates points for a Pandas DataFrame and exports data as a GeoJSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | pandas.DataFrame | The input Pandas DataFrame. | required |
out_geojson | str | The file path to the exported GeoJSON. Defaults to None. | None |
latitude | str | The name of the column containing latitude coordinates. Defaults to "latitude". | 'latitude' |
longitude | str | The name of the column containing longitude coordinates. Defaults to "longitude". | 'longitude' |
encoding | str | The encoding of characters. Defaults to "utf-8". | 'utf-8' |
Source code in leafmap/common.py
def df_to_geojson(
df,
out_geojson=None,
latitude="latitude",
longitude="longitude",
encoding="utf-8",
):
"""Creates points for a Pandas DataFrame and exports data as a GeoJSON.
Args:
df (pandas.DataFrame): The input Pandas DataFrame.
out_geojson (str, optional): The file path to the exported GeoJSON. Defaults to None.
latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
encoding (str, optional): The encoding of characters. Defaults to "utf-8".
"""
import json
from geojson import Feature, FeatureCollection, Point
if out_geojson is not None:
out_dir = os.path.dirname(os.path.abspath(out_geojson))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
features = df.apply(
lambda row: Feature(
geometry=Point((float(row[longitude]), float(row[latitude]))),
properties=dict(row),
),
axis=1,
).tolist()
geojson = FeatureCollection(features=features)
if out_geojson is None:
return geojson
else:
with open(out_geojson, "w", encoding=encoding) as f:
f.write(json.dumps(geojson))
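A minimal sketch with a two-row DataFrame:
import pandas as pd
import leafmap
df = pd.DataFrame({"latitude": [37.75, 37.78], "longitude": [-122.45, -122.40]})
fc = leafmap.df_to_geojson(df)  # returns a GeoJSON FeatureCollection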
dict_to_json(data, file_path, indent=4)
¶
Writes a dictionary to a JSON file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary. | required |
file_path | str | The path to the JSON file. | required |
indent | int | The indentation of the JSON file. Defaults to 4. | 4 |
Exceptions:
Type | Description |
---|---|
TypeError | If the input data is not a dictionary. |
Source code in leafmap/common.py
def dict_to_json(data, file_path, indent=4):
"""Writes a dictionary to a JSON file.
Args:
data (dict): A dictionary.
file_path (str): The path to the JSON file.
indent (int, optional): The indentation of the JSON file. Defaults to 4.
Raises:
TypeError: If the input data is not a dictionary.
"""
import json
file_path = check_file_path(file_path)
if isinstance(data, dict):
with open(file_path, "w") as f:
json.dump(data, f, indent=indent)
else:
raise TypeError("The provided data must be a dictionary.")
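For example:
import leafmap
leafmap.dict_to_json({"name": "leafmap", "version": 1}, "metadata.json", indent=2)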
disjoint(input_features, selecting_features, output=None, **kwargs)
¶
Find the features in the input_features that do not intersect the selecting_features.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_features | str or GeoDataFrame | The input features to select from. Can be a file path or a GeoDataFrame. | required |
selecting_features | str or GeoDataFrame | The features in the Input Features parameter will be selected based on their relationship to the features from this layer. | required |
output | str | The output path to save the GeoDataFrame in a vector format (e.g., shapefile). Defaults to None. | None |
Returns:
Type | Description |
---|---|
str or GeoDataFrame | The path to the output file or the GeoDataFrame. |
Source code in leafmap/common.py
def disjoint(input_features, selecting_features, output=None, **kwargs):
"""Find the features in the input_features that do not intersect the selecting_features.
Args:
input_features (str | GeoDataFrame): The input features to select from. Can be a file path or a GeoDataFrame.
selecting_features (str | GeoDataFrame): The features in the Input Features parameter will be selected based
on their relationship to the features from this layer.
output (str, optional): The output path to save the GeoDataFrame in a vector format (e.g., shapefile). Defaults to None.
Returns:
str | GeoDataFrame: The path to the output file or the GeoDataFrame.
"""
import geopandas as gpd
if isinstance(input_features, str):
input_features = gpd.read_file(input_features, **kwargs)
elif not isinstance(input_features, gpd.GeoDataFrame):
raise TypeError("input_features must be a file path or a GeoDataFrame")
if isinstance(selecting_features, str):
selecting_features = gpd.read_file(selecting_features, **kwargs)
elif not isinstance(selecting_features, gpd.GeoDataFrame):
raise TypeError("selecting_features must be a file path or a GeoDataFrame")
selecting_features = selecting_features.to_crs(input_features.crs)
input_features["savedindex"] = input_features.index
intersecting = selecting_features.sjoin(input_features, how="inner")["savedindex"]
results = input_features[~input_features.savedindex.isin(intersecting)].drop(
columns=["savedindex"], axis=1
)
if output is not None:
results.to_file(output, **kwargs)
else:
return results
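A minimal sketch; the file paths are placeholders for any vector datasets readable by geopandas:
import leafmap
# Keep only the parcels that do not intersect any flood zone.
result = leafmap.disjoint("parcels.shp", "flood_zones.shp")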
display_html(html, width='100%', height=500)
¶
Displays an HTML file or HTML string in a Jupyter Notebook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html | Union[str, bytes] | Path to an HTML file or an HTML string. | required |
width | str | Width of the displayed iframe. Defaults to '100%'. | '100%' |
height | int | Height of the displayed iframe. Defaults to 500. | 500 |
Returns:
Type | Description |
---|---|
None | None |
Source code in leafmap/common.py
def display_html(
html: Union[str, bytes], width: str = "100%", height: int = 500
) -> None:
"""
Displays an HTML file or HTML string in a Jupyter Notebook.
Args:
html (Union[str, bytes]): Path to an HTML file or an HTML string.
width (str, optional): Width of the displayed iframe. Default is '100%'.
height (int, optional): Height of the displayed iframe. Default is 500.
Returns:
None
"""
from IPython.display import IFrame, display
if isinstance(html, str) and html.startswith("<"):
# If the input is an HTML string
html_content = html
elif isinstance(html, str):
# If the input is a file path
with open(html, "r") as file:
html_content = file.read()
elif isinstance(html, bytes):
# If the input is a byte string
html_content = html.decode("utf-8")
else:
raise ValueError("Invalid input type. Expected a file path or an HTML string.")
display(IFrame(src=html_content, width=width, height=height))
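A minimal sketch rendering an HTML string inline:
import leafmap
leafmap.display_html("<h3>Hello, leafmap!</h3>", height=100)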
download_file(url=None, output=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False)
¶
Download a file from URL, including Google Drive shared URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The URL to download. Google Drive URL is also supported. Defaults to None. | None |
output | str | Output filename. Default is basename of URL. | None |
quiet | bool | Suppress terminal output. Default is False. | False |
proxy | str | Proxy. Defaults to None. | None |
speed | float | Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. | None |
use_cookies | bool | Flag to use cookies. Defaults to True. | True |
verify | bool or str | Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True. | True |
id | str | Google Drive's file ID. Defaults to None. | None |
fuzzy | bool | Fuzzy extraction of Google Drive's file Id. Defaults to False. | False |
resume | bool | Resume the download from existing tmp file if possible. Defaults to False. | False |
unzip | bool | Unzip the file. Defaults to True. | True |
overwrite | bool | Overwrite the file if it already exists. Defaults to False. | False |
subfolder | bool | Create a subfolder with the same name as the file. Defaults to False. | False |
Returns:
Type | Description |
---|---|
str | The output file path. |
Source code in leafmap/common.py
def download_file(
url=None,
output=None,
quiet=False,
proxy=None,
speed=None,
use_cookies=True,
verify=True,
id=None,
fuzzy=False,
resume=False,
unzip=True,
overwrite=False,
subfolder=False,
):
"""Download a file from URL, including Google Drive shared URL.
Args:
url (str, optional): Google Drive URL is also supported. Defaults to None.
output (str, optional): Output filename. Default is basename of URL.
quiet (bool, optional): Suppress terminal output. Default is False.
proxy (str, optional): Proxy. Defaults to None.
speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
use_cookies (bool, optional): Flag to use cookies. Defaults to True.
verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string,
in which case it must be a path to a CA bundle to use. Defaults to True.
id (str, optional): Google Drive's file ID. Defaults to None.
fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
unzip (bool, optional): Unzip the file. Defaults to True.
overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.
Returns:
str: The output file path.
"""
try:
import gdown
except ImportError:
print(
"The gdown package is required for this function. Use `pip install gdown` to install it."
)
return
if output is None:
if isinstance(url, str) and url.startswith("http"):
output = os.path.basename(url)
out_dir = os.path.abspath(os.path.dirname(output))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
if isinstance(url, str):
if os.path.exists(os.path.abspath(output)) and (not overwrite):
print(
f"{output} already exists. Skip downloading. Set overwrite=True to overwrite."
)
return os.path.abspath(output)
else:
url = github_raw_url(url)
if "https://drive.google.com/file/d/" in url:
fuzzy = True
output = gdown.download(
url, output, quiet, proxy, speed, use_cookies, verify, id, fuzzy, resume
)
if unzip:
if output.endswith(".zip"):
with zipfile.ZipFile(output, "r") as zip_ref:
if not quiet:
print("Extracting files...")
if subfolder:
basename = os.path.splitext(os.path.basename(output))[0]
output = os.path.join(out_dir, basename)
if not os.path.exists(output):
os.makedirs(output)
zip_ref.extractall(output)
else:
zip_ref.extractall(os.path.dirname(output))
elif output.endswith(".tar.gz") or output.endswith(".tar"):
if output.endswith(".tar.gz"):
mode = "r:gz"
else:
mode = "r"
with tarfile.open(output, mode) as tar_ref:
if not quiet:
print("Extracting files...")
if subfolder:
basename = os.path.splitext(os.path.basename(output))[0]
output = os.path.join(out_dir, basename)
if not os.path.exists(output):
os.makedirs(output)
tar_ref.extractall(output)
else:
tar_ref.extractall(os.path.dirname(output))
return os.path.abspath(output)
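A minimal sketch using the sample archive cited in download_from_url below; zip files are extracted automatically:
import leafmap
url = "https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip"
path = leafmap.download_file(url, unzip=True)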
download_file_lite(url, output=None, binary=False, overwrite=False, **kwargs)
async
¶
Download a file using Pyodide. This function is only available on JupyterLite. Call the function with await, such as await download_file_lite(url).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The URL of the file. | required |
output | str | The local path to save the file. Defaults to None. | None |
binary | bool | Whether the file is binary. Defaults to False. | False |
overwrite | bool | Whether to overwrite the file if it exists. Defaults to False. | False |
Source code in leafmap/common.py
async def download_file_lite(url, output=None, binary=False, overwrite=False, **kwargs):
"""Download a file using Pyodide. This function is only available on JupyterLite. Call the function with await, such as await download_file_lite(url).
Args:
url (str): The URL of the file.
output (str, optional): The local path to save the file. Defaults to None.
binary (bool, optional): Whether the file is binary. Defaults to False.
overwrite (bool, optional): Whether to overwrite the file if it exists. Defaults to False.
"""
import sys
import pyodide # pylint: disable=E0401
if "pyodide" not in sys.modules:
raise ValueError("Pyodide is not available.")
if output is None:
output = os.path.basename(url)
output = os.path.abspath(output)
ext = os.path.splitext(output)[1]
if ext in [".png", ".jpg", ".tif", ".tiff", ".zip", ".gz", ".bz2", ".xz"]:
binary = True
if os.path.exists(output) and not overwrite:
print(f"{output} already exists, skip downloading.")
return output
if binary:
response = await pyodide.http.pyfetch(url)
with open(output, "wb") as f:
f.write(await response.bytes())
else:
obj = pyodide.http.open_url(url)
with open(output, "w") as fd:
shutil.copyfileobj(obj, fd)
return output
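A minimal sketch (JupyterLite only; the URL is a placeholder, and Pyodide kernels support top-level await):
import leafmap
path = await leafmap.download_file_lite("https://example.com/data.csv")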
download_files(urls, out_dir=None, filenames=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False, multi_part=False)
¶
Download files from URLs, including Google Drive shared URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
urls | list | The list of urls to download. Google Drive URL is also supported. | required |
out_dir | str | The output directory. Defaults to None. | None |
filenames | list | Output filenames. Default is basename of URL. | None |
quiet | bool | Suppress terminal output. Default is False. | False |
proxy | str | Proxy. Defaults to None. | None |
speed | float | Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. | None |
use_cookies | bool | Flag to use cookies. Defaults to True. | True |
verify | bool or str | Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True. | True |
id | str | Google Drive's file ID. Defaults to None. | None |
fuzzy | bool | Fuzzy extraction of Google Drive's file Id. Defaults to False. | False |
resume | bool | Resume the download from existing tmp file if possible. Defaults to False. | False |
unzip | bool | Unzip the file. Defaults to True. | True |
overwrite | bool | Overwrite the file if it already exists. Defaults to False. | False |
subfolder | bool | Create a subfolder with the same name as the file. Defaults to False. | False |
multi_part | bool | If the file is a multi-part file. Defaults to False. | False |
Examples:
files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
base_url = "https://github.com/opengeos/datasets/releases/download/models/"
urls = [base_url + f for f in files]
leafmap.download_files(urls, out_dir="models", multi_part=True)
Source code in leafmap/common.py
def download_files(
urls,
out_dir=None,
filenames=None,
quiet=False,
proxy=None,
speed=None,
use_cookies=True,
verify=True,
id=None,
fuzzy=False,
resume=False,
unzip=True,
overwrite=False,
subfolder=False,
multi_part=False,
):
"""Download files from URLs, including Google Drive shared URL.
Args:
urls (list): The list of urls to download. Google Drive URL is also supported.
out_dir (str, optional): The output directory. Defaults to None.
filenames (list, optional): Output filename. Default is basename of URL.
quiet (bool, optional): Suppress terminal output. Default is False.
proxy (str, optional): Proxy. Defaults to None.
speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
use_cookies (bool, optional): Flag to use cookies. Defaults to True.
verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True.
id (str, optional): Google Drive's file ID. Defaults to None.
fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
unzip (bool, optional): Unzip the file. Defaults to True.
overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.
multi_part (bool, optional): If the file is a multi-part file. Defaults to False.
Examples:
files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
base_url = "https://github.com/opengeos/datasets/releases/download/models/"
urls = [base_url + f for f in files]
leafmap.download_files(urls, out_dir="models", multi_part=True)
"""
if out_dir is None:
out_dir = os.getcwd()
if filenames is None:
filenames = [None] * len(urls)
filepaths = []
for url, output in zip(urls, filenames):
if output is None:
filename = os.path.join(out_dir, os.path.basename(url))
else:
filename = os.path.join(out_dir, output)
filepaths.append(filename)
if multi_part:
unzip = False
download_file(
url,
filename,
quiet,
proxy,
speed,
use_cookies,
verify,
id,
fuzzy,
resume,
unzip,
overwrite,
subfolder,
)
if multi_part:
archive = os.path.splitext(filename)[0] + ".zip"
out_dir = os.path.dirname(filename)
extract_archive(archive, out_dir)
for file in filepaths:
os.remove(file)
download_folder(url=None, id=None, output=None, quiet=False, proxy=None, speed=None, use_cookies=True, remaining_ok=False)
¶
Downloads the entire folder from URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
URL of the Google Drive folder. Must be of the format 'https://drive.google.com/drive/folders/{url}'. Defaults to None. |
None |
id |
str |
Google Drive's folder ID. Defaults to None. |
None |
output |
str |
String containing the path of the output folder. Defaults to current working directory. |
None |
quiet |
bool |
Suppress terminal output. Defaults to False. |
False |
proxy |
str |
Proxy. Defaults to None. |
None |
speed |
float |
Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. |
None |
use_cookies |
bool |
Flag to use cookies. Defaults to True. |
True |
resume |
bool |
Resume the download from existing tmp file if possible. Defaults to False. |
required |
Returns:
Type | Description |
---|---|
list |
List of files downloaded, or None if failed. |
Source code in leafmap/common.py
def download_folder(
url=None,
id=None,
output=None,
quiet=False,
proxy=None,
speed=None,
use_cookies=True,
remaining_ok=False,
):
"""Downloads the entire folder from URL.
Args:
url (str, optional): URL of the Google Drive folder. Must be of the format 'https://drive.google.com/drive/folders/{url}'. Defaults to None.
id (str, optional): Google Drive's folder ID. Defaults to None.
output (str, optional): String containing the path of the output folder. Defaults to current working directory.
quiet (bool, optional): Suppress terminal output. Defaults to False.
proxy (str, optional): Proxy. Defaults to None.
speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
use_cookies (bool, optional): Flag to use cookies. Defaults to True.
remaining_ok (bool, optional): Passed through to gdown.download_folder; if True, proceed even when the folder contains more files than gdown can retrieve in one pass. Defaults to False.
Returns:
list: List of files downloaded, or None if failed.
"""
try:
import gdown
except ImportError:
print(
"The gdown package is required for this function. Use `pip install gdown` to install it."
)
return
files = gdown.download_folder(
url, id, output, quiet, proxy, speed, use_cookies, remaining_ok
)
return files
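A minimal sketch; the folder URL is a placeholder:
import leafmap
files = leafmap.download_folder(
    "https://drive.google.com/drive/folders/<folder-id>", output="data"
)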
download_from_url(url, out_file_name=None, out_dir='.', unzip=True, verbose=True)
¶
Download a file from a URL (e.g., https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The HTTP URL to download. | required |
out_file_name | str | The output file name to use. Defaults to None. | None |
out_dir | str | The output directory to use. Defaults to '.'. | '.' |
unzip | bool | Whether to unzip the downloaded file if it is a zip file. Defaults to True. | True |
verbose | bool | Whether to display the output of the function. Defaults to True. | True |
Source code in leafmap/common.py
def download_from_url(
url: str,
out_file_name: Optional[str] = None,
out_dir: Optional[str] = ".",
unzip: Optional[bool] = True,
verbose: Optional[bool] = True,
):
"""Download a file from a URL (e.g., https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip)
Args:
url (str): The HTTP URL to download.
out_file_name (str, optional): The output file name to use. Defaults to None.
out_dir (str, optional): The output directory to use. Defaults to '.'.
unzip (bool, optional): Whether to unzip the downloaded file if it is a zip file. Defaults to True.
verbose (bool, optional): Whether to display the output of the function. Defaults to True.
"""
in_file_name = os.path.basename(url)
out_dir = check_dir(out_dir)
if out_file_name is None:
out_file_name = in_file_name
out_file_path = os.path.join(out_dir, out_file_name)
if verbose:
print("Downloading {} ...".format(url))
try:
urllib.request.urlretrieve(url, out_file_path)
except Exception:
raise Exception("The URL is invalid. Please double check the URL.")
final_path = out_file_path
if unzip:
# if it is a zip file
if ".zip" in out_file_name:
if verbose:
print("Unzipping {} ...".format(out_file_name))
with zipfile.ZipFile(out_file_path, "r") as zip_ref:
zip_ref.extractall(out_dir)
final_path = os.path.join(
os.path.abspath(out_dir), out_file_name.replace(".zip", "")
)
# if it is a tar file
if ".tar" in out_file_name:
if verbose:
print("Unzipping {} ...".format(out_file_name))
with tarfile.open(out_file_path, "r") as tar_ref:
def is_within_directory(directory, target):
abs_directory = os.path.abspath(directory)
abs_target = os.path.abspath(target)
prefix = os.path.commonprefix([abs_directory, abs_target])
return prefix == abs_directory
def safe_extract(
tar, path=".", members=None, *, numeric_owner=False
):
for member in tar.getmembers():
member_path = os.path.join(path, member.name)
if not is_within_directory(path, member_path):
raise Exception("Attempted Path Traversal in Tar File")
tar.extractall(path, members, numeric_owner=numeric_owner)
safe_extract(tar_ref, out_dir)
final_path = os.path.join(
os.path.abspath(out_dir), out_file_name.replace(".tar", "")
)
if verbose:
print("Data downloaded to: {}".format(final_path))
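Example using the URL from the description above:
import leafmap
url = "https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip"
leafmap.download_from_url(url, out_dir="data")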
download_google_buildings(location, out_dir=None, merge_output=None, head=None, keep_geojson=False, overwrite=False, quiet=False, **kwargs)
¶
Download Google Open Building dataset for a specific location. Check the dataset links from https://sites.research.google/open-buildings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
location | str | The location name for which to download the dataset. | required |
out_dir | Optional[str] | The output directory to save the downloaded files. If not provided, the current working directory is used. | None |
merge_output | Optional[str] | Optional. The output file path for merging the downloaded files into a single GeoDataFrame. | None |
head | Optional[int] | Optional. The number of files to download. If not provided, all files will be downloaded. | None |
keep_geojson | bool | Optional. If True, the GeoJSON files will be kept after converting them to CSV files. | False |
overwrite | bool | Optional. If True, overwrite the existing files. | False |
quiet | bool | Optional. If True, suppresses the download progress messages. | False |
**kwargs | | Additional keyword arguments to be passed to the gpd.to_file function. | {} |
Returns:
Type | Description |
---|---|
List[str] | A list of file paths of the downloaded files. |
Source code in leafmap/common.py
def download_google_buildings(
location: str,
out_dir: Optional[str] = None,
merge_output: Optional[str] = None,
head: Optional[int] = None,
keep_geojson: bool = False,
overwrite: bool = False,
quiet: bool = False,
**kwargs,
) -> List[str]:
"""
Download Google Open Building dataset for a specific location. Check the dataset links from
https://sites.research.google/open-buildings.
Args:
location: The location name for which to download the dataset.
out_dir: The output directory to save the downloaded files. If not provided, the current working directory is used.
merge_output: Optional. The output file path for merging the downloaded files into a single GeoDataFrame.
head: Optional. The number of files to download. If not provided, all files will be downloaded.
keep_geojson: Optional. If True, the GeoJSON files will be kept after converting them to CSV files.
overwrite: Optional. If True, overwrite the existing files.
quiet: Optional. If True, suppresses the download progress messages.
**kwargs: Additional keyword arguments to be passed to the `gpd.to_file` function.
Returns:
A list of file paths of the downloaded files.
"""
import pandas as pd
import geopandas as gpd
from shapely import wkt
building_url = "https://sites.research.google/open-buildings/tiles.geojson"
country_url = (
"https://naciscdn.org/naturalearth/110m/cultural/ne_110m_admin_0_countries.zip"
)
if out_dir is None:
out_dir = os.getcwd()
if not os.path.exists(out_dir):
os.makedirs(out_dir)
building_gdf = gpd.read_file(building_url)
country_gdf = gpd.read_file(country_url)
country = country_gdf[country_gdf["NAME"] == location]
if len(country) == 0:
country = country_gdf[country_gdf["NAME_LONG"] == location]
if len(country) == 0:
raise ValueError(f"Could not find {location} in the Natural Earth dataset.")
gdf = building_gdf[building_gdf.intersects(country.geometry.iloc[0])]
gdf.sort_values(by="size_mb", inplace=True)
print(f"Found {len(gdf)} links for {location}.")
if head is not None:
gdf = gdf.head(head)
if len(gdf) > 0:
links = gdf["tile_url"].tolist()
download_files(links, out_dir=out_dir, quiet=quiet, **kwargs)
filenames = [os.path.join(out_dir, os.path.basename(link)) for link in links]
gdfs = []
for filename in filenames:
# Read the CSV file into a pandas DataFrame
df = pd.read_csv(filename)
# Create a geometry column from the "geometry" column in the DataFrame
df["geometry"] = df["geometry"].apply(wkt.loads)
# Convert the pandas DataFrame to a GeoDataFrame
gdf = gpd.GeoDataFrame(df, geometry="geometry")
gdf.crs = "EPSG:4326"
if keep_geojson:
gdf.to_file(
filename.replace(".csv.gz", ".geojson"), driver="GeoJSON", **kwargs
)
gdfs.append(gdf)
if merge_output:
if os.path.exists(merge_output) and not overwrite:
print(f"File {merge_output} already exists, skip merging...")
else:
if not quiet:
print("Merging GeoDataFrames ...")
gdf = gpd.GeoDataFrame(
pd.concat(gdfs, ignore_index=True), crs="EPSG:4326"
)
gdf.to_file(merge_output, **kwargs)
else:
print(f"No buildings found for {location}.")
download_ms_buildings(location, out_dir=None, merge_output=None, head=None, quiet=False, **kwargs)
¶
Download Microsoft Buildings dataset for a specific location. Check the dataset links from https://minedbuildings.blob.core.windows.net/global-buildings/dataset-links.csv.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
location | str | The location name for which to download the dataset. | required |
out_dir | Optional[str] | The output directory to save the downloaded files. If not provided, the current working directory is used. | None |
merge_output | Optional[str] | Optional. The output file path for merging the downloaded files into a single GeoDataFrame. | None |
head | | Optional. The number of files to download. If not provided, all files will be downloaded. | None |
quiet | bool | Optional. If True, suppresses the download progress messages. | False |
**kwargs | | Additional keyword arguments to be passed to the gpd.to_file function. | {} |
Returns:
Type | Description |
---|---|
List[str] | A list of file paths of the downloaded files. |
Source code in leafmap/common.py
def download_ms_buildings(
location: str,
out_dir: Optional[str] = None,
merge_output: Optional[str] = None,
head=None,
quiet: bool = False,
**kwargs,
) -> List[str]:
"""
Download Microsoft Buildings dataset for a specific location. Check the dataset links from
https://minedbuildings.blob.core.windows.net/global-buildings/dataset-links.csv.
Args:
location: The location name for which to download the dataset.
out_dir: The output directory to save the downloaded files. If not provided, the current working directory is used.
merge_output: Optional. The output file path for merging the downloaded files into a single GeoDataFrame.
head: Optional. The number of files to download. If not provided, all files will be downloaded.
quiet: Optional. If True, suppresses the download progress messages.
**kwargs: Additional keyword arguments to be passed to the `gpd.to_file` function.
Returns:
A list of file paths of the downloaded files.
"""
import pandas as pd
import geopandas as gpd
from shapely.geometry import shape
if out_dir is None:
out_dir = os.getcwd()
if not os.path.exists(out_dir):
os.makedirs(out_dir)
dataset_links = pd.read_csv(
"https://minedbuildings.blob.core.windows.net/global-buildings/dataset-links.csv"
)
country_links = dataset_links[dataset_links.Location == location]
if not quiet:
print(f"Found {len(country_links)} links for {location}")
if head is not None:
country_links = country_links.head(head)
filenames = []
i = 1
for _, row in country_links.iterrows():
if not quiet:
print(f"Downloading {i} of {len(country_links)}: {row.QuadKey}.geojson")
i += 1
filename = os.path.join(out_dir, f"{row.QuadKey}.geojson")
filenames.append(filename)
if os.path.exists(filename):
print(f"File {filename} already exists, skipping...")
continue
df = pd.read_json(row.Url, lines=True)
df["geometry"] = df["geometry"].apply(shape)
gdf = gpd.GeoDataFrame(df, crs=4326)
gdf.to_file(filename, driver="GeoJSON", **kwargs)
if merge_output is not None:
if os.path.exists(merge_output):
print(f"File {merge_output} already exists, skip merging...")
return filenames
merge_vector(filenames, merge_output, quiet=quiet)
return filenames
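A minimal usage sketch; "Bhutan" is assumed to appear in the Location column of the dataset-links.csv file, and the output paths are illustrative:
import leafmap

# Download the first two quadkey tiles for the location and merge them
# into a single GeoPackage.
files = leafmap.download_ms_buildings(
    "Bhutan", out_dir="ms_buildings", merge_output="bhutan_buildings.gpkg", head=2
)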
download_ned(region, out_dir=None, return_url=False, download_args={}, geopandas_args={}, query={})
¶
Download the US National Elevation Datasets (NED) for a region.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region | str or list | A filepath to a vector dataset or a list of bounds in the form of [minx, miny, maxx, maxy]. | required |
out_dir | str | The directory to download the files to. Defaults to None, which uses the current working directory. | None |
return_url | bool | Whether to return the download URLs of the files. Defaults to False. | False |
download_args | dict | A dictionary of arguments to pass to the download_file function. Defaults to {}. | {} |
geopandas_args | dict | A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL/filepath. | {} |
query | dict | A dictionary of arguments to pass to the The_national_map_USGS.find_details() function. See https://apps.nationalmap.gov/tnmaccess/#/product for more information. | {} |
Returns:
Type | Description |
---|---|
list | A list of the download URLs of the files if return_url is True. |
Source code in leafmap/common.py
def download_ned(
region,
out_dir=None,
return_url=False,
download_args={},
geopandas_args={},
query={},
) -> Union[None, List]:
"""Download the US National Elevation Datasets (NED) for a region.
Args:
region (str | list): A filepath to a vector dataset or a list of bounds in the form of [minx, miny, maxx, maxy].
out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
return_url (bool, optional): Whether to return the download URLs of the files. Defaults to False.
download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
query (dict, optional): A dictionary of arguments to pass to the The_national_map_USGS.find_details() function.
See https://apps.nationalmap.gov/tnmaccess/#/product for more information.
Returns:
list: A list of the download URLs of the files if return_url is True.
"""
if os.environ.get("USE_MKDOCS") is not None:
return
if not query:
query = {
"datasets": "National Elevation Dataset (NED) 1/3 arc-second",
"prodFormats": "GeoTIFF",
}
TNM = The_national_map_USGS()
if return_url:
return TNM.find_tiles(region=region, geopandas_args=geopandas_args, API=query)
return TNM.download_tiles(
region=region,
out_dir=out_dir,
download_args=download_args,
geopandas_args=geopandas_args,
API=query,
)
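A minimal usage sketch with an arbitrary bounding box (WGS84, [minx, miny, maxx, maxy] order):
import leafmap

region = [-77.1, 38.8, -77.0, 38.9]  # arbitrary example around Washington, D.C.

# Preview the GeoTIFF tile URLs without downloading anything.
urls = leafmap.download_ned(region, return_url=True)

# Download the tiles to a local folder.
leafmap.download_ned(region, out_dir="ned")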
download_nlcd(years, out_dir=None, quiet=False, **kwargs)
¶
Downloads NLCD (National Land Cover Database) files for the specified years.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
years | List[int] | A list of years for which to download the NLCD files. | required |
out_dir | str | The directory where the downloaded files will be saved. Defaults to the current working directory. | None |
quiet | bool | If True, suppresses download progress messages. Defaults to False. | False |
**kwargs | Any | Additional keyword arguments to pass to the download_file function. | {} |
Returns:
Type | Description |
---|---|
None | None |
Source code in leafmap/common.py
def download_nlcd(
years: List[int], out_dir: str = None, quiet: bool = False, **kwargs: Any
) -> None:
"""
Downloads NLCD (National Land Cover Database) files for the specified years.
Args:
years (List[int]): A list of years for which to download the NLCD files.
out_dir (str, optional): The directory where the downloaded files will be saved.
Defaults to the current working directory.
quiet (bool, optional): If True, suppresses download progress messages. Defaults to False.
**kwargs (Any): Additional keyword arguments to pass to the download_file function.
Returns:
None
"""
allow_years = list(range(1985, 2024, 1))
url = "https://s3-us-west-2.amazonaws.com/mrlc/Annual_NLCD_LndCov_{}_CU_C1V0.tif"
if out_dir is None:
out_dir = os.getcwd()
elif not os.path.exists(out_dir):
os.makedirs(out_dir)
for year in years:
if year not in allow_years:
print(f"Year {year} is not available. Skipping...")
continue
year_url = url.format(year)
basename = os.path.basename(year_url)
filepath = os.path.join(out_dir, basename)
download_file(year_url, filepath, quiet=quiet, **kwargs)
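A minimal usage sketch; the years and output directory are illustrative:
import leafmap

# Download the annual NLCD land cover GeoTIFFs for two example years.
leafmap.download_nlcd([2020, 2021], out_dir="nlcd")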
download_tnm(region=None, out_dir=None, return_url=False, download_args={}, geopandas_args={}, API={})
¶
Download USGS The National Map (TNM) datasets for a region.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region | str or list | A URL/filepath to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy]. Alternatively, you can use API parameters such as polygon or bbox. | None |
out_dir | str | The directory to download the files to. Defaults to None, which uses the current working directory. | None |
return_url | bool | Whether to return the download URLs of the files. Defaults to False. | False |
download_args | dict | A dictionary of arguments to pass to the download_file function. Defaults to {}. | {} |
geopandas_args | dict | A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL/filepath. | {} |
API | dict | A dictionary of arguments to pass to the The_national_map_USGS.find_details() function. Exposes most of the documented API. Defaults to {}. | {} |
Returns:
Type | Description |
---|---|
list | A list of the download URLs of the files if return_url is True. |
Source code in leafmap/common.py
def download_tnm(
region=None,
out_dir=None,
return_url=False,
download_args={},
geopandas_args={},
API={},
) -> Union[None, List]:
"""Download the US National Elevation Datasets (NED) for a region.
Args:
region (str | list, optional): An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy].
Alternatively you could use API parameters such as polygon or bbox.
out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
return_url (bool, optional): Whether to return the download URLs of the files. Defaults to False.
download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
Used for reading a region URL|filepath.
API (dict, optional): A dictionary of arguments to pass to the The_national_map_USGS.find_details() function.
Exposes most of the documented API. Defaults to {}
Returns:
list: A list of the download URLs of the files if return_url is True.
"""
if os.environ.get("USE_MKDOCS") is not None:
return
TNM = The_national_map_USGS()
if return_url:
return TNM.find_tiles(region=region, geopandas_args=geopandas_args, API=API)
return TNM.download_tiles(
region=region,
out_dir=out_dir,
download_args=download_args,
geopandas_args=geopandas_args,
API=API,
)
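A minimal usage sketch; the bounding box is arbitrary, and the API dictionary keys mirror the TNM Access API parameters documented at https://apps.nationalmap.gov/tnmaccess/#/:
import leafmap

region = [-77.1, 38.8, -77.0, 38.9]  # [minx, miny, maxx, maxy]

# Restrict the query to a specific dataset and format, and return the
# matching download URLs instead of downloading the files.
query = {
    "datasets": "National Elevation Dataset (NED) 1/3 arc-second",
    "prodFormats": "GeoTIFF",
}
urls = leafmap.download_tnm(region, return_url=True, API=query)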
edit_download_html(htmlWidget, filename, title='Click here to download: ')
¶
Downloads a file from Voilà. Adapted from https://github.com/voila-dashboards/voila/issues/578#issuecomment-617668058
Parameters:
Name | Type | Description | Default |
---|---|---|---|
htmlWidget | object | The HTML widget to display the URL. | required |
filename | str | File path to download. | required |
title | str | Download description. Defaults to "Click here to download: ". | 'Click here to download: ' |
Source code in leafmap/common.py
def edit_download_html(htmlWidget, filename, title="Click here to download: "):
"""Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578#issuecomment-617668058
Args:
htmlWidget (object): The HTML widget to display the URL.
filename (str): File path to download.
title (str, optional): Download description. Defaults to "Click here to download: ".
"""
# from IPython.display import HTML
# import ipywidgets as widgets
import base64
# Change widget html temporarily to a font-awesome spinner
htmlWidget.value = '<i class="fa fa-spinner fa-spin fa-2x fa-fw"></i><span class="sr-only">Loading...</span>'
# Process raw data
data = open(filename, "rb").read()
b64 = base64.b64encode(data)
payload = b64.decode()
basename = os.path.basename(filename)
# Create and assign html to widget
html = '<a download="{filename}" href="data:text/csv;base64,{payload}" target="_blank">{title}</a>'
htmlWidget.value = html.format(
payload=payload, title=title + basename, filename=basename
)
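A minimal usage sketch, assuming a Jupyter/Voilà session and a local file named data.csv (both illustrative):
import ipywidgets as widgets
import leafmap

# Display an empty HTML widget, then replace its content with a download link.
html_widget = widgets.HTML()
display(html_widget)
leafmap.edit_download_html(html_widget, "data.csv")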
ee_tile_url(ee_object=None, vis_params={}, asset_id=None, ee_initialize=False, project_id=None, **kwargs)
¶
Gets a Google Earth Engine tile layer URL based on the tile layers listed at https://github.com/opengeos/ee-tile-layers/blob/main/datasets.tsv.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ee_object | object | The Earth Engine object to display. | None |
vis_params | dict | Visualization parameters. For example, {'min': 0, 'max': 100}. | {} |
asset_id | str | The ID of the Earth Engine asset. | None |
ee_initialize | bool | Whether to initialize Earth Engine. Defaults to False. | False |
Returns:
Type | Description |
---|---|
None | None |
Source code in leafmap/common.py
def ee_tile_url(
ee_object=None,
vis_params={},
asset_id: str = None,
ee_initialize: bool = False,
project_id=None,
**kwargs,
) -> None:
"""
Adds a Google Earth Engine tile layer to the map based on the tile layer URL from
https://github.com/opengeos/ee-tile-layers/blob/main/datasets.tsv.
Args:
ee_object (object): The Earth Engine object to display.
vis_params (dict): Visualization parameters. For example, {'min': 0, 'max': 100}.
asset_id (str): The ID of the Earth Engine asset.
ee_initialize (bool, optional): Whether to initialize the Earth Engine
Returns:
None
"""
import pandas as pd
if isinstance(asset_id, str):
df = pd.read_csv(
"https://raw.githubusercontent.com/opengeos/ee-tile-layers/main/datasets.tsv",
sep="\t",
)
asset_id = asset_id.strip()
if asset_id in df["id"].values:
url = df.loc[df["id"] == asset_id, "url"].values[0]
return url
else:
print(f"The provided EE tile layer {asset_id} does not exist.")
return None
elif ee_object is not None:
try:
import geemap
from geemap.ee_tile_layers import _get_tile_url_format
if ee_initialize:
geemap.ee_initialize(project=project_id, **kwargs)
url = _get_tile_url_format(ee_object, vis_params)
return url
except Exception as e:
print(e)
return None
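A minimal usage sketch; the asset ID below is assumed to be listed in the datasets.tsv file:
import leafmap

# Look up the pre-generated tile URL for a public Earth Engine asset by ID.
url = leafmap.ee_tile_url(asset_id="USGS/SRTMGL1_003")
print(url)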
evaluate_model(df, y_col='y', y_pred_col='y_pred', metrics=None, drop_na=True, filter_nonzero=True)
¶
Evaluates the model performance on the given dataframe with customizable options.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | A pandas DataFrame with columns for actual and predicted values. | required |
y_col | str | Column name for the actual values. | 'y' |
y_pred_col | str | Column name for the predicted values. | 'y_pred' |
metrics | list | A list of metrics to calculate: 'r2' (R-squared), 'r' (Pearson correlation coefficient), 'rmse' (Root Mean Squared Error), 'mae' (Mean Absolute Error), 'mape' (Mean Absolute Percentage Error). Defaults to all metrics if None. | None |
drop_na | bool | Whether to drop rows with NaN in the actual values column. | True |
filter_nonzero | bool | Whether to filter out rows where actual values are zero. | True |
Returns:
Type | Description |
---|---|
dict | A dictionary of the selected performance metrics. |
Source code in leafmap/common.py
def evaluate_model(
df: pd.DataFrame,
y_col: str = "y",
y_pred_col: str = "y_pred",
metrics: list = None,
drop_na: bool = True,
filter_nonzero: bool = True,
) -> dict:
"""
Evaluates the model performance on the given dataframe with customizable options.
Args:
df: A pandas DataFrame with columns for actual and predicted values.
y_col: Column name for the actual values.
y_pred_col: Column name for the predicted values.
metrics: A list of metrics to calculate. Available options:
- 'r2': R-squared
- 'r': Pearson correlation coefficient
- 'rmse': Root Mean Squared Error
- 'mae': Mean Absolute Error
- 'mape': Mean Absolute Percentage Error
Defaults to all metrics if None.
drop_na: Whether to drop rows with NaN in the actual values column.
filter_nonzero: Whether to filter out rows where actual values are zero.
Returns:
A dictionary of the selected performance metrics.
"""
import math
try:
from sklearn import metrics as skmetrics
except ImportError:
raise ImportError(
"The scikit-learn package is required for this function. Install it using 'pip install scikit-learn'."
)
# Default metrics if none are provided
if metrics is None:
metrics = ["r2", "r", "rmse", "mae", "mape"]
# Data preprocessing
if drop_na:
df = df.dropna(subset=[y_col])
if filter_nonzero:
df = df[df[y_col] != 0]
# Metric calculations
results = {}
if "r2" in metrics:
results["r2"] = skmetrics.r2_score(df[y_col], df[y_pred_col])
if "r" in metrics:
results["r"] = df[y_col].corr(df[y_pred_col])
if "rmse" in metrics:
results["rmse"] = math.sqrt(
skmetrics.mean_squared_error(df[y_col], df[y_pred_col])
)
if "mae" in metrics:
results["mae"] = skmetrics.mean_absolute_error(df[y_col], df[y_pred_col])
if "mape" in metrics:
results["mape"] = skmetrics.mean_absolute_percentage_error(
df[y_col], df[y_pred_col]
)
return results
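A minimal, self-contained sketch with synthetic data:
import pandas as pd
import leafmap

# Four observed/predicted pairs; column names match the function defaults.
df = pd.DataFrame({"y": [1.0, 2.0, 3.0, 4.0], "y_pred": [1.1, 1.9, 3.2, 3.8]})
scores = leafmap.evaluate_model(df, metrics=["r2", "rmse"])
print(scores)  # a dict such as {'r2': ..., 'rmse': ...}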
execute_maplibre_notebook_dir(in_dir, out_dir, delete_html=True, replace_api_key=True, recursive=False, keep_notebook=False, index_html=True)
¶
Executes Jupyter notebooks found in a specified directory, optionally replacing API keys and deleting HTML outputs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_dir | str | The input directory containing Jupyter notebooks to be executed. | required |
out_dir | str | The output directory where the executed notebooks and their HTML outputs will be saved. | required |
delete_html | bool | If True, deletes any existing HTML files in the output directory before execution. Defaults to True. | True |
replace_api_key | bool | If True, replaces the API key in the output HTML. Set "MAPTILER_KEY" and "MAPTILER_KEY_PUBLIC" to your MapTiler API key and public key, respectively. Defaults to True. | True |
recursive | bool | If True, searches for notebooks in the input directory recursively. Defaults to False. | False |
keep_notebook | bool | If True, keeps the executed notebooks in the output directory. Defaults to False. | False |
index_html | bool | If True, generates an index.html file in the output directory listing all files. Defaults to True. | True |
Returns:
Type | Description |
---|---|
None | None |
Source code in leafmap/common.py
def execute_maplibre_notebook_dir(
in_dir: str,
out_dir: str,
delete_html: bool = True,
replace_api_key: bool = True,
recursive: bool = False,
keep_notebook: bool = False,
index_html: bool = True,
) -> None:
"""
Executes Jupyter notebooks found in a specified directory, optionally replacing API keys and deleting HTML outputs.
Args:
in_dir (str): The input directory containing Jupyter notebooks to be executed.
out_dir (str): The output directory where the executed notebooks and their HTML outputs will be saved.
delete_html (bool, optional): If True, deletes any existing HTML files in the output directory before execution. Defaults to True.
replace_api_key (bool, optional): If True, replaces the API key in the output HTML. Defaults to True.
set "MAPTILER_KEY" and "MAPTILER_KEY_PUBLIC" to your MapTiler API key and public key, respectively.
recursive (bool, optional): If True, searches for notebooks in the input directory recursively. Defaults to False.
keep_notebook (bool, optional): If True, keeps the executed notebooks in the output directory. Defaults to False.
index_html (bool, optional): If True, generates an index.html file in the output directory listing all files. Defaults to True.
Returns:
None
"""
import shutil
if not os.path.exists(out_dir):
os.makedirs(out_dir)
if replace_api_key:
os.environ["MAPTILER_REPLACE_KEY"] = "True"
if delete_html:
html_files = find_files(out_dir, "*.html", recursive=recursive)
for file in html_files:
os.remove(file)
files = find_files(in_dir, "*.ipynb", recursive=recursive)
for index, file in enumerate(files):
print(f"Processing {index + 1}/{len(files)}: {file} ...")
basename = os.path.basename(file)
out_file = os.path.join(out_dir, basename)
shutil.copy(file, out_file)
with open(out_file, "r") as f:
lines = f.readlines()
out_lines = []
for line in lines:
if line.strip() == '"m"':
title = os.path.splitext(basename)[0].replace("_", " ")
out_lines.append(line.replace("m", f"m.to_html(title='{title}')"))
else:
out_lines.append(line)
with open(out_file, "w") as f:
f.writelines(out_lines)
out_html = os.path.basename(out_file).replace(".ipynb", ".html")
os.environ["MAPLIBRE_OUTPUT"] = out_html
execute_notebook(out_file)
if not keep_notebook:
all_files = find_files(out_dir, "*", recursive=recursive)
for file in all_files:
if not file.endswith(".html"):
os.remove(file)
if index_html:
generate_index_html(out_dir)
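A minimal usage sketch; the directory names are illustrative, and the MAPTILER_KEY / MAPTILER_KEY_PUBLIC environment variables are assumed to be set when replace_api_key=True:
import leafmap

# Execute every notebook in ./notebooks and keep only the rendered HTML in ./site.
leafmap.execute_maplibre_notebook_dir("notebooks", "site", keep_notebook=False)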
execute_notebook(in_file)
¶
Executes a Jupyter notebook and saves the output cells.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_file | str | Input Jupyter notebook. | required |
Source code in leafmap/common.py
def execute_notebook(in_file):
"""Executes a Jupyter notebook and save output cells
Args:
in_file (str): Input Jupyter notebook.
"""
# command = 'jupyter nbconvert --to notebook --execute ' + in_file + ' --inplace'
command = 'jupyter nbconvert --to notebook --execute "{}" --inplace'.format(in_file)
print(os.popen(command).read().rstrip())
# os.popen(command)
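A minimal usage sketch; the notebook path is illustrative, and jupyter must be on the PATH since the function shells out to nbconvert:
import leafmap

# Run a single notebook in place, saving its output cells.
leafmap.execute_notebook("examples/demo.ipynb")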
execute_notebook_dir(in_dir)
¶
Executes all Jupyter notebooks in the given directory recursively and saves the output cells.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_dir | str | Input folder containing notebooks. | required |
Source code in leafmap/common.py
def execute_notebook_dir(in_dir):
"""Executes all Jupyter notebooks in the given directory recursively and save output cells.
Args:
in_dir (str): Input folder containing notebooks.
"""
from pathlib import Path
in_dir = os.path.abspath(in_dir)
files = list(Path(in_dir).rglob("*.ipynb"))
files.sort()
count = len(files)
if files is not None:
for index, file in enumerate(files):
in_file = str(file)
print(f"Processing {index + 1}/{count}: {file} ...")
execute_notebook(in_file)
explode(coords)
¶
Explode a GeoJSON geometry's coordinates object and yield coordinate tuples. As long as the input is conforming, the type of the geometry doesn't matter. From Fiona 1.4.8
Parameters:
Name | Type | Description | Default |
---|---|---|---|
coords | list | A list of coordinates. | required |
Yields:
Type | Description |
---|---|
list | The innermost coordinate pairs of the geometry. |
Source code in leafmap/common.py
def explode(coords):
"""Explode a GeoJSON geometry's coordinates object and yield
coordinate tuples. As long as the input is conforming, the type of
the geometry doesn't matter. From Fiona 1.4.8
Args:
coords (list): A list of coordinates.
Yields:
[type]: [description]
"""
for e in coords:
if isinstance(e, (float, int)):
yield coords
break
else:
for f in explode(e):
yield f
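A quick sketch of the generator on a polygon's nested coordinate arrays; note that it yields the innermost coordinate containers as-is (lists here, not tuples):
from leafmap.common import explode

polygon_coords = [[[0, 0], [1, 0], [1, 1], [0, 0]]]
for pair in explode(polygon_coords):
    print(pair)  # [0, 0], then [1, 0], then [1, 1], then [0, 0]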
extract_archive(archive, outdir=None, **kwargs)
¶
Extracts a multipart archive.
This function uses the patoolib library to extract a multipart archive. If the patoolib library is not installed, it attempts to install it. If the archive does not end with ".zip", it appends ".zip" to the archive name. If the extraction fails (for example, if the files already exist), it skips the extraction.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
archive | str | The path to the archive file. | required |
outdir | str | The directory where the archive should be extracted. | None |
**kwargs | | Arbitrary keyword arguments for the patoolib.extract_archive function. | {} |
Returns:
Type | Description |
---|---|
None | None |
Exceptions:
Type | Description |
---|---|
Exception | An exception is raised if the extraction fails for reasons other than the files already existing. |
Examples:
files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
base_url = "https://github.com/opengeos/datasets/releases/download/models/"
urls = [base_url + f for f in files]
leafmap.download_files(urls, out_dir="models", multi_part=True)
Source code in leafmap/common.py
def extract_archive(archive, outdir=None, **kwargs) -> None:
"""
Extracts a multipart archive.
This function uses the patoolib library to extract a multipart archive.
If the patoolib library is not installed, it attempts to install it.
If the archive does not end with ".zip", it appends ".zip" to the archive name.
If the extraction fails (for example, if the files already exist), it skips the extraction.
Args:
archive (str): The path to the archive file.
outdir (str): The directory where the archive should be extracted.
**kwargs: Arbitrary keyword arguments for the patoolib.extract_archive function.
Returns:
None
Raises:
Exception: An exception is raised if the extraction fails for reasons other than the files already existing.
Example:
files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
base_url = "https://github.com/opengeos/datasets/releases/download/models/"
urls = [base_url + f for f in files]
leafmap.download_files(urls, out_dir="models", multi_part=True)
"""
try:
import patoolib
except ImportError:
install_package("patool")
import patoolib
if not archive.endswith(".zip"):
archive = archive + ".zip"
if outdir is None:
outdir = os.path.dirname(archive)
try:
patoolib.extract_archive(archive, outdir=outdir, **kwargs)
except Exception as e:
print("The unzipped files might already exist. Skipping extraction.")
return
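A minimal usage sketch, assuming the multipart archive volumes (.zip, .z01, ...) already sit next to each other on disk:
import leafmap

# patool locates the .z01/.z02/... volumes from the .zip part automatically.
leafmap.extract_archive("models/sam_hq_vit_tiny.zip", outdir="models")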
filter_bounds(data, bbox, within=False, align=True, **kwargs)
¶
Filters a GeoDataFrame or GeoSeries by a bounding box.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | str or GeoDataFrame | The input data to filter. Can be a file path or a GeoDataFrame. | required |
bbox | list or GeoDataFrame | The bounding box to filter by. Can be a list of 4 coordinates, a file path, or a GeoDataFrame. | required |
within | bool | If True, keeps only features falling entirely within the bounding box; if False, keeps any feature intersecting it. Defaults to False. | False |
align | bool | If True, automatically aligns GeoSeries based on their indices. If False, the order of elements is preserved. | True |
Returns:
Type | Description |
---|---|
GeoDataFrame | The filtered data. |
Source code in leafmap/common.py
def filter_bounds(data, bbox, within=False, align=True, **kwargs):
"""Filters a GeoDataFrame or GeoSeries by a bounding box.
Args:
data (str | GeoDataFrame): The input data to filter. Can be a file path or a GeoDataFrame.
bbox (list | GeoDataFrame): The bounding box to filter by. Can be a list of 4 coordinates or a file path or a GeoDataFrame.
within (bool, optional): Whether to filter by the bounding box or the bounding box's interior. Defaults to False.
align (bool, optional): If True, automatically aligns GeoSeries based on their indices. If False, the order of elements is preserved.
Returns:
GeoDataFrame: The filtered data.
"""
import geopandas as gpd
if isinstance(data, str):
data = gpd.read_file(data, **kwargs)
elif not isinstance(data, (gpd.GeoDataFrame, gpd.GeoSeries)):
raise TypeError("data must be a file path or a GeoDataFrame or GeoSeries")
if isinstance(bbox, list):
if len(bbox) != 4:
raise ValueError("bbox must be a list of 4 coordinates")
bbox = bbox_to_gdf(bbox)
elif isinstance(bbox, str):
bbox = gpd.read_file(bbox, **kwargs)
if within:
result = data[data.within(bbox.unary_union, align=align)]
else:
result = data[data.intersects(bbox.unary_union, align=align)]
return result
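A minimal usage sketch; the file path and coordinates are illustrative:
import leafmap

# Keep features intersecting the box; pass within=True to keep only those
# falling entirely inside it. bbox order is [minx, miny, maxx, maxy].
subset = leafmap.filter_bounds("cities.geojson", [-74.3, 40.5, -73.7, 40.9])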
filter_date(data, start_date=None, end_date=None, date_field='date', date_args={}, **kwargs)
¶
Filters a DataFrame, GeoDataFrame or GeoSeries by a date range.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | str, DataFrame, or GeoDataFrame | The input data to filter. Can be a file path, a DataFrame, or a GeoDataFrame. | required |
start_date | str | The start date, e.g., 2023-01-01. Defaults to None. | None |
end_date | str | The end date, e.g., 2023-12-31. Defaults to None. | None |
date_field | str | The name of the date field. Defaults to "date". | 'date' |
date_args | dict | Additional arguments for pd.to_datetime. Defaults to {}. | {} |
Returns:
Type | Description |
---|---|
DataFrame | The filtered data. |
Source code in leafmap/common.py
def filter_date(
data, start_date=None, end_date=None, date_field="date", date_args={}, **kwargs
):
"""Filters a DataFrame, GeoDataFrame or GeoSeries by a date range.
Args:
data (str | DataFrame | GeoDataFrame): The input data to filter. Can be a file path or a DataFrame or GeoDataFrame.
start_date (str, optional): The start date, e.g., 2023-01-01. Defaults to None.
end_date (str, optional): The end date, e.g., 2023-12-31. Defaults to None.
date_field (str, optional): The name of the date field. Defaults to "date".
date_args (dict, optional): Additional arguments for pd.to_datetime. Defaults to {}.
Returns:
DataFrame: The filtered data.
"""
import datetime
import pandas as pd
import geopandas as gpd
if isinstance(data, str):
data = gpd.read_file(data, **kwargs)
elif not isinstance(
data, (gpd.GeoDataFrame, gpd.GeoSeries, pd.DataFrame, pd.Series)
):
raise TypeError("data must be a file path or a GeoDataFrame or GeoSeries")
if date_field not in data.columns:
raise ValueError(f"date_field must be one of {data.columns}")
new_field = f"{date_field}_temp"
data[new_field] = pd.to_datetime(data[date_field], **date_args)
if end_date is None:
end_date = datetime.datetime.now().strftime("%Y-%m-%d")
if start_date is None:
start_date = data[new_field].min()
mask = (data[new_field] >= start_date) & (data[new_field] <= end_date)
result = data.loc[mask]
return result.drop(columns=[new_field], axis=1)
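A minimal usage sketch; the file path and column name are illustrative:
import leafmap

# Keep rows whose "date" column falls within calendar year 2023.
df_2023 = leafmap.filter_date(
    "observations.geojson", start_date="2023-01-01", end_date="2023-12-31"
)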
find_files(input_dir, ext=None, fullpath=True, recursive=True)
¶
Find files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_dir | str | The input directory. | required |
ext | str | The file extension to match. Defaults to None. | None |
fullpath | bool | Whether to return the full path. Defaults to True. | True |
recursive | bool | Whether to search recursively. Defaults to True. | True |
Returns:
Type | Description |
---|---|
list | A list of matching files. |
Source code in leafmap/common.py
def find_files(input_dir, ext=None, fullpath=True, recursive=True):
"""Find files in a directory.
Args:
input_dir (str): The input directory.
ext (str, optional): The file extension to match. Defaults to None.
fullpath (bool, optional): Whether to return the full path. Defaults to True.
recursive (bool, optional): Whether to search recursively. Defaults to True.
Returns:
list: A list of matching files.
"""
from pathlib import Path
files = []
if ext is None:
ext = "*"
else:
ext = ext.replace(".", "")
ext = f"*.{ext}"
if recursive:
if fullpath:
files = [str(path.joinpath()) for path in Path(input_dir).rglob(ext)]
else:
files = [str(path.name) for path in Path(input_dir).rglob(ext)]
else:
if fullpath:
files = [str(path.joinpath()) for path in Path(input_dir).glob(ext)]
else:
files = [path.name for path in Path(input_dir).glob(ext)]
files.sort()
return files
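A minimal usage sketch; the directory name is illustrative:
import leafmap

# List all GeoTIFFs under ./data, including subdirectories.
tifs = leafmap.find_files("data", ext="tif", recursive=True)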
gdb_layer_names(gdb_path)
¶
Get a list of layer names in a File Geodatabase (GDB).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdb_path | str | The path to the File Geodatabase (GDB). | required |
Returns:
Type | Description |
---|---|
List[str] | A list of layer names in the GDB. |
Source code in leafmap/common.py
def gdb_layer_names(gdb_path: str) -> List[str]:
"""Get a list of layer names in a File Geodatabase (GDB).
Args:
gdb_path (str): The path to the File Geodatabase (GDB).
Returns:
List[str]: A list of layer names in the GDB.
"""
from osgeo import ogr
# Open the GDB
gdb_driver = ogr.GetDriverByName("OpenFileGDB")
gdb_dataset = gdb_driver.Open(gdb_path, 0)
# Get the number of layers in the GDB
layer_count = gdb_dataset.GetLayerCount()
# Iterate over the layers
layer_names = []
for i in range(layer_count):
layer = gdb_dataset.GetLayerByIndex(i)
feature_class_name = layer.GetName()
layer_names.append(feature_class_name)
# Close the GDB dataset
gdb_dataset = None
return layer_names
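A minimal usage sketch, assuming GDAL/OGR is installed with the OpenFileGDB driver; parcels.gdb is a placeholder path:
import leafmap

layers = leafmap.gdb_layer_names("parcels.gdb")
print(layers)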
gdb_to_vector(gdb_path, out_dir, layers=None, filenames=None, gdal_driver='GPKG', file_extension=None, overwrite=False, quiet=False, **kwargs)
¶
Converts layers from a File Geodatabase (GDB) to a vector format.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdb_path | str | The path to the File Geodatabase (GDB). | required |
out_dir | str | The output directory to save the converted files. | required |
layers | Optional[List[str]] | A list of layer names to convert. If None, all layers will be converted. Default is None. | None |
filenames | Optional[List[str]] | A list of output file names. If None, the layer names will be used as the file names. Default is None. | None |
gdal_driver | str | The GDAL driver name for the output vector format. Default is "GPKG". | 'GPKG' |
file_extension | Optional[str] | The file extension for the output files. If None, it will be determined automatically based on the gdal_driver. Default is None. | None |
overwrite | bool | Whether to overwrite the existing output files. Default is False. | False |
quiet | bool | If True, suppress the log output. Defaults to False. | False |
Returns:
Type | Description |
---|---|
None | None |
Source code in leafmap/common.py
def gdb_to_vector(
gdb_path: str,
out_dir: str,
layers: Optional[List[str]] = None,
filenames: Optional[List[str]] = None,
gdal_driver: str = "GPKG",
file_extension: Optional[str] = None,
overwrite: bool = False,
quiet=False,
**kwargs,
) -> None:
"""Converts layers from a File Geodatabase (GDB) to a vector format.
Args:
gdb_path (str): The path to the File Geodatabase (GDB).
out_dir (str): The output directory to save the converted files.
layers (Optional[List[str]]): A list of layer names to convert. If None, all layers will be converted. Default is None.
filenames (Optional[List[str]]): A list of output file names. If None, the layer names will be used as the file names. Default is None.
gdal_driver (str): The GDAL driver name for the output vector format. Default is "GPKG".
file_extension (Optional[str]): The file extension for the output files. If None, it will be determined automatically based on the gdal_driver. Default is None.
overwrite (bool): Whether to overwrite the existing output files. Default is False.
quiet (bool): If True, suppress the log output. Defaults to False.
Returns:
None
"""
from osgeo import ogr
# Open the GDB
gdb_driver = ogr.GetDriverByName("OpenFileGDB")
gdb_dataset = gdb_driver.Open(gdb_path, 0)
# Get the number of layers in the GDB
layer_count = gdb_dataset.GetLayerCount()
if isinstance(layers, str):
layers = [layers]
if isinstance(filenames, str):
filenames = [filenames]
if filenames is not None:
if len(filenames) != len(layers):
raise ValueError("The length of filenames must match the length of layers.")
if not os.path.exists(out_dir):
os.makedirs(out_dir)