common module¶
This module contains some common functions for both folium and ipyleaflet.
The_national_map_USGS
¶
The national map is a collection of topological datasets, maintained by the USGS.
It provides an API endpoint which can be used to find downloadable links for the products offered. - A full description of the available datasets can be retrieved. This consists of metadata such as detailed descriptions and publication dates. - A wide range of data formats are available.
This class is a tiny wrapper to find and download files using the API.
More complete documentation for the API can be found at https://apps.nationalmap.gov/tnmaccess/#/
Source code in leafmap/common.py
class The_national_map_USGS:
    """
    Tiny wrapper for finding and downloading files via The National Map (TNM)
    Access API maintained by the USGS.

    The national map is a collection of topological datasets.
    It provides an API endpoint which can be used to find downloadable links
    for the products offered.
    - A full description of the available datasets can be retrieved.
      This consists of metadata such as detailed descriptions and publication dates.
    - A wide range of data formats are available.

    More complete documentation for the API can be found at
    https://apps.nationalmap.gov/tnmaccess/#/
    """

    def __init__(self):
        # Base URL shared by every API request.
        self.api_endpoint = r"https://tnmaccess.nationalmap.gov/api/v1/"
        # Fetch and cache the full dataset metadata once; the properties
        # below iterate over this cached copy.
        self.DS = self.datasets_full

    @property
    def datasets_full(self) -> list:
        """
        Full description of the datasets provided.

        Returns:
            The parsed JSON from the API, or an empty list on any failure.
        """
        link = f"{self.api_endpoint}datasets?"
        try:
            return requests.get(link).json()
        except Exception:
            print(f"Failed to load metadata from The National Map API endpoint\n{link}")
            return []

    @property
    def prodFormats(self) -> set:
        """
        Return all data formats available in any of the collections.
        Note that "All" is peculiar to only one dataset.
        """
        return {fmt["displayName"] for ds in self.DS for fmt in ds["formats"]}

    @property
    def datasets(self) -> set:
        """
        Return the dataset tags (the most common human-readable
        self-description for specific datasets).
        """
        return {tag["sbDatasetTag"] for ds in self.DS for tag in ds["tags"]}

    def parse_region(self, region, geopandas_args=None) -> list:
        """
        Translate a vector dataset to its bounding box.

        Args:
            region (str | list): A URL or file path to a vector dataset.
                Anything that is not a string (e.g. a bounds list) is
                returned unchanged.
            geopandas_args (dict, optional): Keyword arguments passed to
                geopandas.read_file() when reading a region URL/file path.
                Defaults to None (no extra arguments).

        Returns:
            list: The [minx, miny, maxx, maxy] bounds in EPSG:4326, or the
                original `region` when it is not a string.
        """
        if isinstance(region, str):
            # geopandas is only needed on the file/URL code path, so import lazily.
            import geopandas as gpd

            if region.startswith("http"):
                region = github_raw_url(region)
                region = download_file(region)
            elif not os.path.exists(region):
                raise ValueError("region must be a path or a URL to a vector dataset.")

            roi = gpd.read_file(region, **(geopandas_args or {}))
            roi = roi.to_crs(epsg=4326)
            return roi.total_bounds
        return region

    def download_tiles(
        self, region=None, out_dir=None, download_args=None, geopandas_args=None, API=None
    ) -> None:
        """
        Download the US National Elevation Datasets (NED) for a region.

        Args:
            region (str | list, optional): A URL/file path to a vector dataset,
                or a list of bounds in the form [minx, miny, maxx, maxy].
                Alternatively you could use API parameters such as polygon or bbox.
            out_dir (str, optional): The directory to download the files to.
                Defaults to None, which uses the current working directory.
            download_args (dict, optional): Keyword arguments passed to the
                download_file function. Defaults to None.
            geopandas_args (dict, optional): Keyword arguments passed to
                geopandas.read_file() when reading a region URL/file path.
                Defaults to None.
            API (dict, optional): Keyword arguments passed to self.find_details().
                Exposes most of the documented API. Defaults to None.

        Returns:
            None
        """
        # Skip all network traffic while the documentation site is being built.
        if os.environ.get("USE_MKDOCS") is not None:
            return

        out_dir = os.path.abspath(out_dir) if out_dir is not None else os.getcwd()
        download_args = download_args or {}

        tiles = self.find_tiles(
            region,
            return_type="list",
            geopandas_args=geopandas_args or {},
            API=API or {},
        )

        total = len(tiles)
        errors = 0
        done = 0
        for i, link in enumerate(tiles):
            file_name = os.path.basename(link)
            out_name = os.path.join(out_dir, file_name)
            # Throttle progress messages: every file for the first 5,
            # then every 5th up to 50, then every 20th.
            if i < 5 or (i < 50 and not (i % 5)) or not (i % 20):
                print(f"Downloading {i + 1} of {total}: {file_name}")
            try:
                download_file(link, out_name, **download_args)
                done += 1
            except KeyboardInterrupt:
                print("Cancelled download")
                break
            except Exception:
                errors += 1
                print(f"Failed to download {i + 1} of {total}: {file_name}")

        print(
            f"{done} Downloads completed, {errors} downloads failed, {total} files available"
        )
        return

    def find_tiles(self, region=None, return_type="list", geopandas_args=None, API=None):
        """
        Find a list of downloadable files.

        Args:
            region (str | list, optional): A URL/file path to a vector dataset,
                or a list of bounds in the form [minx, miny, maxx, maxy].
                Alternatively you could use API parameters such as polygon or bbox.
            return_type (str): list | dict. Defaults to list. Changes the return
                output type and content.
            geopandas_args (dict, optional): Keyword arguments passed to
                geopandas.read_file() when reading a region URL/file path.
                Defaults to None.
            API (dict, optional): Keyword arguments passed to self.find_details().
                Exposes most of the documented API parameters. Defaults to None.

        Returns:
            list: A list of download_urls.
            dict: A dictionary with urls and related metadata.
        """
        assert region or API, "Provide a region or use the API"
        # Copy the caller's dict so it (and the default) is never mutated.
        API = dict(API) if API else {}
        if region:
            API["bbox"] = self.parse_region(region, geopandas_args or {})

        results = self.find_details(**API)
        if return_type == "list":
            # Guard against responses that carry no "items" key.
            return [item["downloadURL"] for item in results.get("items", [])]
        return results

    def find_details(
        self,
        bbox: List[float] = None,
        polygon: List[Tuple[float, float]] = None,
        datasets: str = None,
        prodFormats: str = None,
        prodExtents: str = None,
        q: str = None,
        dateType: str = None,
        start: str = None,
        end: str = None,
        offset: int = 0,
        max: int = None,
        outputFormat: str = "JSON",
        polyType: str = None,
        polyCode: str = None,
        extentQuery: int = None,
    ) -> Dict:
        """
        Query the products endpoint with the given search parameters.

        Possible search parameters (kwargs) supported by the API:

        bbox           'minx, miny, maxx, maxy'
                       Geographic longitude/latitude values expressed in decimal degrees in a comma-delimited list.
        polygon        '[x,y x,y x,y x,y x,y]'
                       Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list.
        datasets       See: Datasets (Optional). Dataset tag name (sbDatasetTag)
                       From https://apps.nationalmap.gov/tnmaccess/#/product
        prodFormats    See: Product Formats (Optional). Dataset-specific format
        prodExtents    See: Product Extents (Optional). Dataset-specific extent
        q              Free text filtering product titles and text descriptions.
        dateType       dateCreated | lastUpdated | Publication
        start          'YYYY-MM-DD' start date
        end            'YYYY-MM-DD' end date (required if start date is provided)
        offset         Offset into paginated results - default=0
        max            Number of results returned
        outputFormat   JSON | CSV | pjson. Default=JSON
        polyType       state | huc2 | huc4 | huc8
                       Well Known Polygon Type, to deliver data by state or HUC
                       (hydrologic unit codes defined by the Watershed Boundary Dataset/WBD)
        polyCode       state FIPS code or huc number; coordinates with polyType.
        extentQuery    A Polygon code in the science base system, typically from an uploaded shapefile.

        Returns:
            Dict: The parsed JSON response, or an empty dict on any failure.
        """
        try:
            # call locals() BEFORE creating new locals — any variable defined
            # above this line would leak into the query parameters. Falsy
            # values (None, 0, "") are dropped, so only explicitly-set
            # parameters reach the API.
            used_locals = {k: v for k, v in locals().items() if v and k != "self"}
            # Parsing: the API wants polygons as 'x y,x y,...' and bboxes as
            # a bare comma-separated list (strip the surrounding brackets).
            if polygon:
                used_locals["polygon"] = ",".join(
                    " ".join(map(str, point)) for point in polygon
                )
            if bbox:
                used_locals["bbox"] = str(bbox)[1:-1]
            if max:
                # NOTE(review): this increment never reaches used_locals (the
                # snapshot was taken above), so it has no effect on the request
                # — kept for fidelity; confirm the original intent.
                max += 2
            # Fetch response
            response = requests.get(f"{self.api_endpoint}products?", params=used_locals)
            if response.status_code // 100 == 2:
                return response.json()
            else:
                # Parameter validation handled by API endpoint error responses
                print(response.json())
                return {}
        except Exception as e:
            print(e)
            return {}
datasets: list
property
readonly
¶
Returns a list of dataset tags (most common human readable self description for specific datasets).
datasets_full: list
property
readonly
¶
Full description of datasets provided. Returns a JSON or empty list.
prodFormats: list
property
readonly
¶
Return all datatypes available in any of the collections. Note that "All" is only peculiar to one dataset.
download_tiles(self, region=None, out_dir=None, download_args={}, geopandas_args={}, API={})
¶
Download the US National Elevation Datasets (NED) for a region.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region |
str | list |
An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy]. Alternatively you could use API parameters such as polygon or bbox. |
None |
out_dir |
str |
The directory to download the files to. Defaults to None, which uses the current working directory. |
None |
download_args |
dict |
A dictionary of arguments to pass to the download_file function. Defaults to {}. |
{} |
geopandas_args |
dict |
A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath. |
{} |
API |
dict |
A dictionary of arguments to pass to the self.find_details() function. Exposes most of the documented API. Defaults to {}. |
{} |
Returns:
Type | Description |
---|---|
None |
None |
Source code in leafmap/common.py
def download_tiles(
    self, region=None, out_dir=None, download_args=None, geopandas_args=None, API=None
) -> None:
    """
    Download the US National Elevation Datasets (NED) for a region.

    Args:
        region (str | list, optional): A URL/file path to a vector dataset,
            or a list of bounds in the form [minx, miny, maxx, maxy].
            Alternatively you could use API parameters such as polygon or bbox.
        out_dir (str, optional): The directory to download the files to.
            Defaults to None, which uses the current working directory.
        download_args (dict, optional): Keyword arguments passed to the
            download_file function. Defaults to None.
        geopandas_args (dict, optional): Keyword arguments passed to
            geopandas.read_file() when reading a region URL/file path.
            Defaults to None.
        API (dict, optional): Keyword arguments passed to self.find_details().
            Exposes most of the documented API. Defaults to None.

    Returns:
        None
    """
    # Skip all network traffic while the documentation site is being built.
    if os.environ.get("USE_MKDOCS") is not None:
        return

    # Avoid the mutable-default-argument pitfall: fresh dicts per call.
    download_args = download_args or {}
    geopandas_args = geopandas_args or {}
    API = API or {}

    out_dir = os.path.abspath(out_dir) if out_dir is not None else os.getcwd()

    tiles = self.find_tiles(
        region, return_type="list", geopandas_args=geopandas_args, API=API
    )

    total = len(tiles)
    errors = 0
    done = 0
    for i, link in enumerate(tiles):
        file_name = os.path.basename(link)
        out_name = os.path.join(out_dir, file_name)
        # Throttle progress messages: every file for the first 5,
        # then every 5th up to 50, then every 20th.
        if i < 5 or (i < 50 and not (i % 5)) or not (i % 20):
            print(f"Downloading {i + 1} of {total}: {file_name}")
        try:
            download_file(link, out_name, **download_args)
            done += 1
        except KeyboardInterrupt:
            print("Cancelled download")
            break
        except Exception:
            errors += 1
            print(f"Failed to download {i + 1} of {total}: {file_name}")

    print(
        f"{done} Downloads completed, {errors} downloads failed, {total} files available"
    )
    return
find_details(self, bbox=None, polygon=None, datasets=None, prodFormats=None, prodExtents=None, q=None, dateType=None, start=None, end=None, offset=0, max=None, outputFormat='JSON', polyType=None, polyCode=None, extentQuery=None)
¶
Possible search parameters (kwargs) support by API
Parameter Values Description
bbox 'minx, miny, maxx, maxy' Geographic longitude/latitude values expressed in decimal degrees in a comma-delimited list. polygon '[x,y x,y x,y x,y x,y]' Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list. datasets See: Datasets (Optional) Dataset tag name (sbDatasetTag) From https://apps.nationalmap.gov/tnmaccess/#/product prodFormats See: Product Formats (Optional) Dataset-specific format
prodExtents See: Product Extents (Optional) Dataset-specific extent q free text Text input which can be used to filter by product titles and text descriptions. dateType dateCreated | lastUpdated | Publication Type of date to search by. start 'YYYY-MM-DD' Start date end 'YYYY-MM-DD' End date (required if start date is provided) offset integer Offset into paginated results - default=0 max integer Number of results returned outputFormat JSON | CSV | pjson Default=JSON polyType state | huc2 | huc4 | huc8 Well Known Polygon Type. Use this parameter to deliver data by state or HUC (hydrologic unit codes defined by the Watershed Boundary Dataset/WBD) polyCode state FIPS code or huc number Well Known Polygon Code. This value needs to coordinate with the polyType parameter. extentQuery integer A Polygon code in the science base system, typically from an uploaded shapefile
Source code in leafmap/common.py
def find_details(
    self,
    bbox: List[float] = None,
    polygon: List[Tuple[float, float]] = None,
    datasets: str = None,
    prodFormats: str = None,
    prodExtents: str = None,
    q: str = None,
    dateType: str = None,
    start: str = None,
    end: str = None,
    offset: int = 0,
    max: int = None,
    outputFormat: str = "JSON",
    polyType: str = None,
    polyCode: str = None,
    extentQuery: int = None,
) -> Dict:
    """
    Query the TNM products endpoint with the given search parameters.

    Possible search parameters (kwargs) supported by the API:

    bbox           'minx, miny, maxx, maxy'
                   Geographic longitude/latitude values expressed in decimal degrees in a comma-delimited list.
    polygon        '[x,y x,y x,y x,y x,y]'
                   Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list.
    datasets       See: Datasets (Optional). Dataset tag name (sbDatasetTag)
                   From https://apps.nationalmap.gov/tnmaccess/#/product
    prodFormats    See: Product Formats (Optional). Dataset-specific format
    prodExtents    See: Product Extents (Optional). Dataset-specific extent
    q              Free text which can be used to filter by product titles and text descriptions.
    dateType       dateCreated | lastUpdated | Publication. Type of date to search by.
    start          'YYYY-MM-DD' start date
    end            'YYYY-MM-DD' end date (required if start date is provided)
    offset         integer. Offset into paginated results - default=0
    max            integer. Number of results returned
    outputFormat   JSON | CSV | pjson. Default=JSON
    polyType       state | huc2 | huc4 | huc8
                   Well Known Polygon Type. Use this parameter to deliver data by state or HUC
                   (hydrologic unit codes defined by the Watershed Boundary Dataset/WBD)
    polyCode       state FIPS code or huc number
                   Well Known Polygon Code. This value needs to coordinate with the polyType parameter.
    extentQuery    integer
                   A Polygon code in the science base system, typically from an uploaded shapefile

    Returns:
        Dict: The parsed JSON response, or an empty dict on any failure.
    """
    try:
        # call locals before creating new locals — any variable defined above
        # this line would leak into the query parameters. Falsy values
        # (None, 0, "") are dropped, so only explicitly-set params are sent.
        used_locals = {k: v for k, v in locals().items() if v and k != "self"}
        # Parsing: the API wants polygons as 'x y,x y,...' and bboxes as a
        # bare comma-separated list (strip the surrounding brackets).
        if polygon:
            used_locals["polygon"] = ",".join(
                " ".join(map(str, point)) for point in polygon
            )
        if bbox:
            used_locals["bbox"] = str(bbox)[1:-1]
        if max:
            # NOTE(review): this increment never reaches used_locals (the
            # snapshot was taken above), so it has no effect on the request
            # sent to the API — likely dead code; confirm the original intent.
            max += 2
        # Fetch response
        response = requests.get(f"{self.api_endpoint}products?", params=used_locals)
        # Any 2xx status is treated as success.
        if response.status_code // 100 == 2:
            return response.json()
        else:
            # Parameter validation handled by API endpoint error responses
            print(response.json())
            return {}
    except Exception as e:
        # Best-effort: network/JSON errors are reported, not raised.
        print(e)
        return {}
find_tiles(self, region=None, return_type='list', geopandas_args={}, API={})
¶
Find a list of downloadable files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region |
str | list |
An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy]. Alternatively you could use API parameters such as polygon or bbox. |
None |
out_dir |
str |
The directory to download the files to. Defaults to None, which uses the current working directory. |
required |
return_type |
str |
list | dict. Defaults to list. Changes the return output type and content. |
'list' |
geopandas_args |
dict |
A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath. |
{} |
API |
dict |
A dictionary of arguments to pass to the self.find_details() function. Exposes most of the documented API parameters. Defaults to {}. |
{} |
Returns:
Type | Description |
---|---|
list |
A list of download_urls. dict: A dictionary with urls and related metadata |
Source code in leafmap/common.py
def find_tiles(self, region=None, return_type="list", geopandas_args=None, API=None):
    """
    Find a list of downloadable files.

    Args:
        region (str | list, optional): A URL/file path to a vector dataset,
            or a list of bounds in the form [minx, miny, maxx, maxy].
            Alternatively you could use API parameters such as polygon or bbox.
        return_type (str): list | dict. Defaults to list. Changes the return
            output type and content.
        geopandas_args (dict, optional): Keyword arguments passed to
            geopandas.read_file() when reading a region URL/file path.
            Defaults to None.
        API (dict, optional): Keyword arguments passed to self.find_details().
            Exposes most of the documented API parameters. Defaults to None.

    Returns:
        list: A list of download_urls.
        dict: A dictionary with urls and related metadata.
    """
    assert region or API, "Provide a region or use the API"
    # Copy the caller's dict so it (and the shared default) is never mutated.
    API = dict(API) if API else {}
    if region:
        API["bbox"] = self.parse_region(region, geopandas_args or {})

    results = self.find_details(**API)
    if return_type == "list":
        # Guard against responses that carry no "items" key.
        return [item["downloadURL"] for item in results.get("items", [])]
    return results
parse_region(self, region, geopandas_args={})
¶
Translate a Vector dataset to its bounding box.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region |
str | list |
an URL|filepath to a vector dataset to a polygon |
required |
geopandas_reader_args |
dict |
A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath. |
required |
Source code in leafmap/common.py
def parse_region(self, region, geopandas_args=None) -> list:
    """
    Translate a vector dataset to its bounding box.

    Args:
        region (str | list): A URL or file path to a vector dataset.
            Anything that is not a string (e.g. a bounds list) is
            returned unchanged.
        geopandas_args (dict, optional): Keyword arguments passed to
            geopandas.read_file() when reading a region URL/file path.
            Defaults to None (no extra arguments).

    Returns:
        list: The [minx, miny, maxx, maxy] bounds in EPSG:4326, or the
            original `region` when it is not a string.
    """
    if isinstance(region, str):
        # geopandas is only needed on the file/URL code path, so import lazily.
        import geopandas as gpd

        if region.startswith("http"):
            region = github_raw_url(region)
            region = download_file(region)
        elif not os.path.exists(region):
            raise ValueError("region must be a path or a URL to a vector dataset.")

        roi = gpd.read_file(region, **(geopandas_args or {}))
        roi = roi.to_crs(epsg=4326)
        return roi.total_bounds
    return region
WhiteboxTools (WhiteboxTools)
¶
This class inherits the whitebox WhiteboxTools class.
Source code in leafmap/common.py
class WhiteboxTools(whitebox.WhiteboxTools):
    """A thin subclass of ``whitebox.WhiteboxTools``.

    Exists so that the WhiteboxTools API is exposed under this package's
    namespace; all behavior is inherited unchanged from the upstream class.
    """

    def __init__(self, **kwargs):
        # Delegate construction entirely to the upstream implementation.
        super().__init__(**kwargs)
__install_from_github(url)
¶
Install a package from a GitHub repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL of the GitHub repository. |
required |
Source code in leafmap/common.py
def __install_from_github(url):
    """Install a package from a GitHub repository.

    Downloads the repository's master-branch zip archive into ~/Downloads,
    unzips it, and runs `pip install .` inside the extracted directory.

    Args:
        url (str): The URL of the GitHub repository.
    """
    try:
        download_dir = os.path.join(os.path.expanduser("~"), "Downloads")
        if not os.path.exists(download_dir):
            os.makedirs(download_dir)

        repo_name = os.path.basename(url)
        # Build the archive URL with plain string formatting; os.path.join
        # would produce backslashes on Windows and break the URL.
        zip_url = f"{url}/archive/master.zip"
        filename = repo_name + "-master.zip"
        download_from_url(
            url=zip_url, out_file_name=filename, out_dir=download_dir, unzip=True
        )

        pkg_dir = os.path.join(download_dir, repo_name + "-master")
        work_dir = os.getcwd()
        os.chdir(pkg_dir)
        try:
            print("Installing {}...".format(repo_name))
            os.system("pip install .")
        finally:
            # Always restore the caller's working directory, even on failure.
            os.chdir(work_dir)
        print("{} has been installed successfully.".format(repo_name))
    except Exception as e:
        # Preserve the original traceback as the cause.
        raise Exception(e) from e
add_crs(filename, epsg)
¶
Add a CRS to a raster dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
The filename of the raster dataset. |
required |
epsg |
int | str |
The EPSG code of the CRS. |
required |
Source code in leafmap/common.py
def add_crs(filename, epsg):
    """Add a CRS to a raster dataset.

    Args:
        filename (str): The filename of the raster dataset.
        epsg (int | str): The EPSG code of the CRS.

    Raises:
        ImportError: If rasterio is not installed.
        ValueError: If the file does not exist or epsg has the wrong type.
    """
    try:
        import rasterio
    except ImportError:
        raise ImportError(
            "rasterio is required for adding a CRS to a raster. Please install it using 'pip install rasterio'."
        )

    # Guard clauses: validate inputs before touching the file.
    if not os.path.exists(filename):
        raise ValueError("filename must exist.")
    if not isinstance(epsg, (int, str)):
        raise ValueError("epsg must be an integer or string.")

    # Both int and str codes collapse to the same "EPSG:<code>" form.
    crs = rasterio.crs.CRS({"init": f"EPSG:{epsg}"})
    with rasterio.open(filename, mode="r+") as src:
        src.crs = crs
add_image_to_gif(in_gif, out_gif, in_image, xy=None, image_size=(80, 80), circle_mask=False)
¶
Adds an image logo to a GIF image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_gif |
str |
Input file path to the GIF image. |
required |
out_gif |
str |
Output file path to the GIF image. |
required |
in_image |
str |
Input file path to the image. |
required |
xy |
tuple |
Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None. |
None |
image_size |
tuple |
Resize image. Defaults to (80, 80). |
(80, 80) |
circle_mask |
bool |
Whether to apply a circle mask to the image. This only works with non-png images. Defaults to False. |
False |
Source code in leafmap/common.py
def add_image_to_gif(
    in_gif, out_gif, in_image, xy=None, image_size=(80, 80), circle_mask=False
):
    """Adds an image logo to a GIF image.

    Args:
        in_gif (str): Input file path to the GIF image.
        out_gif (str): Output file path to the GIF image.
        in_image (str): Input file path (or HTTP URL) to the logo image.
        xy (tuple, optional): Top left corner of the logo. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None (bottom-right corner).
        image_size (tuple, optional): Maximum size to resize the logo to. Defaults to (80, 80).
        circle_mask (bool, optional): Whether to apply a circle mask to the image. This only works with non-png images. Defaults to False.
    """
    import io
    import warnings

    from PIL import Image, ImageDraw, ImageSequence

    warnings.simplefilter("ignore")
    in_gif = os.path.abspath(in_gif)

    is_url = False
    if in_image.startswith("http"):
        is_url = True

    if not os.path.exists(in_gif):
        print("The input gif file does not exist.")
        return

    # A remote logo is fetched later; only local paths are checked here.
    if (not is_url) and (not os.path.exists(in_image)):
        print("The provided logo file does not exist.")
        return

    out_dir = check_dir((os.path.dirname(out_gif)))
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    try:
        gif = Image.open(in_gif)
    except Exception as e:
        print("An error occurred while opening the image.")
        print(e)
        return

    logo_raw_image = None
    try:
        if in_image.startswith("http"):
            logo_raw_image = open_image_from_url(in_image)
        else:
            in_image = os.path.abspath(in_image)
            logo_raw_image = Image.open(in_image)
    except Exception as e:
        print(e)

    # NOTE(review): if opening the logo failed above, logo_raw_image is still
    # None and the next line raises AttributeError — confirm intended.
    logo_raw_size = logo_raw_image.size

    # Scale factor that fits the logo inside image_size, preserving aspect ratio.
    ratio = max(
        logo_raw_size[0] / image_size[0],
        logo_raw_size[1] / image_size[1],
    )
    image_resize = (int(logo_raw_size[0] / ratio), int(logo_raw_size[1] / ratio))
    # Never enlarge the box beyond the logo's native size.
    image_size = min(logo_raw_size[0], image_size[0]), min(
        logo_raw_size[1], image_size[1]
    )

    logo_image = logo_raw_image.convert("RGBA")
    # NOTE(review): Image.ANTIALIAS was removed in Pillow 10 (use
    # Image.LANCZOS / Image.Resampling.LANCZOS instead) — confirm the
    # supported Pillow version range.
    logo_image.thumbnail(image_size, Image.ANTIALIAS)

    gif_width, gif_height = gif.size

    mask_im = None
    if circle_mask:
        # Circular alpha mask the same size as the (clamped) logo box.
        mask_im = Image.new("L", image_size, 0)
        draw = ImageDraw.Draw(mask_im)
        draw.ellipse((0, 0, image_size[0], image_size[1]), fill=255)
    if has_transparency(logo_raw_image):
        # A transparent logo supplies its own mask, overriding the circle.
        mask_im = logo_image.copy()

    if xy is None:
        # default logo location is 5% width and 5% height of the image.
        # (in practice: the bottom-right corner with a 10-pixel margin)
        delta = 10
        xy = (gif_width - image_resize[0] - delta, gif_height - image_resize[1] - delta)
        # xy = (int(0.05 * gif_width), int(0.05 * gif_height))
    elif (xy is not None) and (not isinstance(xy, tuple)) and (len(xy) == 2):
        print("xy must be a tuple, e.g., (10, 10), ('10%', '10%')")
        return
    elif all(isinstance(item, int) for item in xy) and (len(xy) == 2):
        x, y = xy
        if (x > 0) and (x < gif_width) and (y > 0) and (y < gif_height):
            pass
        else:
            print(
                "xy is out of bounds. x must be within [0, {}], and y must be within [0, {}]".format(
                    gif_width, gif_height
                )
            )
            return
    elif all(isinstance(item, str) for item in xy) and (len(xy) == 2):
        x, y = xy
        if ("%" in x) and ("%" in y):
            try:
                # Convert percentage strings to absolute pixel coordinates.
                x = int(float(x.replace("%", "")) / 100.0 * gif_width)
                y = int(float(y.replace("%", "")) / 100.0 * gif_height)
                xy = (x, y)
            except Exception:
                raise Exception(
                    "The specified xy is invalid. It must be formatted like this ('10%', '10%')"
                )
    else:
        raise Exception(
            "The specified xy is invalid. It must be formatted like this: (10, 10) or ('10%', '10%')"
        )

    try:
        frames = []
        for _, frame in enumerate(ImageSequence.Iterator(gif)):
            frame = frame.convert("RGBA")
            frame.paste(logo_image, xy, mask_im)

            # Round-trip each frame through an in-memory GIF so the palette
            # conversion matches the output format.
            b = io.BytesIO()
            frame.save(b, format="GIF")
            frame = Image.open(b)
            frames.append(frame)
        frames[0].save(out_gif, save_all=True, append_images=frames[1:])
    except Exception as e:
        print(e)
add_mask_to_image(image, mask, output, color='red')
¶
Overlay a binary mask (e.g., roads, building footprints, etc) on an image. Credits to Xingjian Shi for the sample code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
A local path or HTTP URL to an image. |
required |
mask |
str |
A local path or HTTP URL to a binary mask. |
required |
output |
str |
A local path to the output image. |
required |
color |
str |
Color of the mask. Defaults to 'red'. |
'red' |
Exceptions:
Type | Description |
---|---|
ImportError |
If rasterio and detectron2 are not installed. |
Source code in leafmap/common.py
def add_mask_to_image(image, mask, output, color="red"):
    """Overlay a binary mask (e.g., roads, building footprints, etc) on an image. Credits to Xingjian Shi for the sample code.

    Args:
        image (str): A local path or HTTP URL to an image.
        mask (str): A local path or HTTP URL to a binary mask.
        output (str): A local path to the output image.
        color (str, optional): Color of the mask. Defaults to 'red'.

    Raises:
        ImportError: If rasterio and detectron2 are not installed.
    """
    try:
        import rasterio
        from detectron2.utils.visualizer import Visualizer
        from PIL import Image
    except ImportError:
        raise ImportError(
            "Please install rasterio and detectron2 to use this function. See https://detectron2.readthedocs.io/en/latest/tutorials/install.html"
        )

    source = rasterio.open(image)
    image_bands = source.read()
    mask_bands = rasterio.open(mask).read()

    # Visualizer expects HWC ordering, while rasterio delivers CHW.
    viz = Visualizer(image_bands.transpose((1, 2, 0)))
    viz.draw_binary_mask(mask_bands[0] > 0, color=color)

    rendered = Image.fromarray(viz.get_output().get_image())
    rendered.save(output)

    # Preserve georeferencing when the source image carries a CRS.
    if source.crs is not None:
        numpy_to_cog(output, output, profile=image)
add_progress_bar_to_gif(in_gif, out_gif, progress_bar_color='blue', progress_bar_height=5, duration=100, loop=0)
¶
Adds a progress bar to a GIF image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_gif |
str |
The file path to the input GIF image. |
required |
out_gif |
str |
The file path to the output GIF image. |
required |
progress_bar_color |
str |
Color for the progress bar. Defaults to 'blue'. |
'blue' |
progress_bar_height |
int |
Height of the progress bar. Defaults to 5. |
5 |
duration |
int |
controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100. |
100 |
loop |
int |
controls how many times the animation repeats. A value of 1 means that the animation will play once and then stop (displaying the last frame). A value of 0, the default, means that the animation will repeat forever. Defaults to 0. |
0 |
Source code in leafmap/common.py
def add_progress_bar_to_gif(
    in_gif,
    out_gif,
    progress_bar_color="blue",
    progress_bar_height=5,
    duration=100,
    loop=0,
):
    """Adds a progress bar along the bottom edge of a GIF image.

    Args:
        in_gif (str): The file path to the input GIF image.
        out_gif (str): The file path to the output GIF image.
        progress_bar_color (str, optional): Color for the progress bar. Defaults to 'blue'.
        progress_bar_height (int, optional): Height of the progress bar in pixels. Defaults to 5.
        duration (int, optional): controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100.
        loop (int, optional): controls how many times the animation repeats. A value of 1 means that the animation will play once and then stop (displaying the last frame). A value of 0 means that the animation will repeat forever. Defaults to 0.
    """
    import io
    import warnings

    from PIL import Image, ImageDraw, ImageSequence

    warnings.simplefilter("ignore")

    in_gif = os.path.abspath(in_gif)
    out_gif = os.path.abspath(out_gif)

    if not os.path.exists(in_gif):
        print("The input gif file does not exist.")
        return

    if not os.path.exists(os.path.dirname(out_gif)):
        os.makedirs(os.path.dirname(out_gif))

    progress_bar_color = check_color(progress_bar_color)

    try:
        image = Image.open(in_gif)
    except Exception as e:
        # Chain the original error so the root cause is not lost.
        raise Exception("An error occurred while opening the gif.") from e

    count = image.n_frames
    W, H = image.size
    # Bar width per frame: grows linearly from W/count to the full width W.
    progress_bar_widths = [i * 1.0 / count * W for i in range(1, count + 1)]
    progress_bar_shapes = [
        [(0, H - progress_bar_height), (x, H)] for x in progress_bar_widths
    ]

    try:
        frames = []
        # Loop over each frame in the animated image
        for index, frame in enumerate(ImageSequence.Iterator(image)):
            frame = frame.convert("RGB")
            draw = ImageDraw.Draw(frame)
            draw.rectangle(progress_bar_shapes[index], fill=progress_bar_color)
            del draw

            # Round-trip each frame through an in-memory GIF so the palette
            # conversion matches the output format.
            b = io.BytesIO()
            frame.save(b, format="GIF")
            frame = Image.open(b)
            frames.append(frame)
        # https://www.pythoninformer.com/python-libraries/pillow/creating-animated-gif/
        # Save the frames as a new image
        frames[0].save(
            out_gif,
            save_all=True,
            append_images=frames[1:],
            duration=duration,
            loop=loop,
            optimize=True,
        )
    except Exception as e:
        raise Exception(e) from e
add_text_to_gif(in_gif, out_gif, xy=None, text_sequence=None, font_type='arial.ttf', font_size=20, font_color='#000000', add_progress_bar=True, progress_bar_color='white', progress_bar_height=5, duration=100, loop=0)
¶
Adds animated text to a GIF image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_gif |
str |
The file path to the input GIF image. |
required |
out_gif |
str |
The file path to the output GIF image. |
required |
xy |
tuple |
Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None. |
None |
text_sequence |
int, str, list |
Text to be drawn. It can be an integer number, a string, or a list of strings. Defaults to None. |
None |
font_type |
str |
Font type. Defaults to "arial.ttf". |
'arial.ttf' |
font_size |
int |
Font size. Defaults to 20. |
20 |
font_color |
str |
Font color. It can be a string (e.g., 'red'), rgb tuple (e.g., (255, 127, 0)), or hex code (e.g., '#ff00ff'). Defaults to '#000000'. |
'#000000' |
add_progress_bar |
bool |
Whether to add a progress bar at the bottom of the GIF. Defaults to True. |
True |
progress_bar_color |
str |
Color for the progress bar. Defaults to 'white'. |
'white' |
progress_bar_height |
int |
Height of the progress bar. Defaults to 5. |
5 |
duration |
int |
controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100. |
100 |
loop |
int |
controls how many times the animation repeats. The default, 0, means that the animation will repeat forever. A value of 1 means that the animation will play once and then stop (displaying the last frame). Defaults to 0. |
0 |
Source code in leafmap/common.py
def add_text_to_gif(
    in_gif,
    out_gif,
    xy=None,
    text_sequence=None,
    font_type="arial.ttf",
    font_size=20,
    font_color="#000000",
    add_progress_bar=True,
    progress_bar_color="white",
    progress_bar_height=5,
    duration=100,
    loop=0,
):
    """Adds animated text to a GIF image.

    Args:
        in_gif (str): The file path to the input GIF image.
        out_gif (str): The file path to the output GIF image.
        xy (tuple, optional): Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None.
        text_sequence (int, str, list, optional): Text to be drawn. It can be an integer number, a string, or a list of strings. Defaults to None.
        font_type (str, optional): Font type. Defaults to "arial.ttf".
        font_size (int, optional): Font size. Defaults to 20.
        font_color (str, optional): Font color. It can be a string (e.g., 'red'), rgb tuple (e.g., (255, 127, 0)), or hex code (e.g., '#ff00ff'). Defaults to '#000000'.
        add_progress_bar (bool, optional): Whether to add a progress bar at the bottom of the GIF. Defaults to True.
        progress_bar_color (str, optional): Color for the progress bar. Defaults to 'white'.
        progress_bar_height (int, optional): Height of the progress bar. Defaults to 5.
        duration (int, optional): Controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. Defaults to 100.
        loop (int, optional): Controls how many times the animation repeats. The default, 0, means that the animation will repeat forever. Defaults to 0.
    """
    import io
    import warnings
    import pkg_resources
    from PIL import Image, ImageDraw, ImageFont, ImageSequence

    warnings.simplefilter("ignore")
    pkg_dir = os.path.dirname(pkg_resources.resource_filename("leafmap", "leafmap.py"))
    default_font = os.path.join(pkg_dir, "data/fonts/arial.ttf")

    in_gif = os.path.abspath(in_gif)
    out_gif = os.path.abspath(out_gif)
    if not os.path.exists(in_gif):
        print("The input gif file does not exist.")
        return
    if not os.path.exists(os.path.dirname(out_gif)):
        os.makedirs(os.path.dirname(out_gif))

    # Resolve the font: the two bundled fonts first, then any installed
    # system font found by system_fonts(); fall back to the default on error.
    if font_type == "arial.ttf":
        font = ImageFont.truetype(default_font, font_size)
    elif font_type == "alibaba.otf":
        default_font = os.path.join(pkg_dir, "data/fonts/alibaba.otf")
        font = ImageFont.truetype(default_font, font_size)
    else:
        try:
            font_list = system_fonts(show_full_path=True)
            font_names = [os.path.basename(f) for f in font_list]
            if (font_type in font_list) or (font_type in font_names):
                font = ImageFont.truetype(font_type, font_size)
            else:
                print(
                    "The specified font type could not be found on your system. Using the default font instead."
                )
                font = ImageFont.truetype(default_font, font_size)
        except Exception as e:
            print(e)
            font = ImageFont.truetype(default_font, font_size)

    color = check_color(font_color)
    progress_bar_color = check_color(progress_bar_color)

    try:
        image = Image.open(in_gif)
    except Exception as e:
        print("An error occurred while opening the gif.")
        print(e)
        return

    count = image.n_frames
    W, H = image.size
    # One bar rectangle per frame, widening linearly to full width.
    progress_bar_widths = [i * 1.0 / count * W for i in range(1, count + 1)]
    progress_bar_shapes = [
        [(0, H - progress_bar_height), (x, H)] for x in progress_bar_widths
    ]

    # Validate/normalize the text anchor: a 2-sequence of ints (pixels) or of
    # strings like '10%' (fractions of the image width/height).
    if xy is None:
        # default text location is 5% width and 5% height of the image.
        xy = (int(0.05 * W), int(0.05 * H))
    elif not isinstance(xy, (tuple, list)) or len(xy) != 2:
        # Bug fix: the original condition (`not isinstance(xy, tuple) and
        # len(xy) == 2`) only rejected non-tuples of length exactly 2, so
        # other invalid values fell through (or raised TypeError on len()).
        print("xy must be a tuple, e.g., (10, 10), ('10%', '10%')")
        return
    elif all(isinstance(item, int) for item in xy):
        x, y = xy
        if (x > 0) and (x < W) and (y > 0) and (y < H):
            pass
        else:
            print(
                f"xy is out of bounds. x must be within [0, {W}], and y must be within [0, {H}]"
            )
            return
    elif all(isinstance(item, str) for item in xy):
        x, y = xy
        if ("%" in x) and ("%" in y):
            try:
                x = int(float(x.replace("%", "")) / 100.0 * W)
                y = int(float(y.replace("%", "")) / 100.0 * H)
                xy = (x, y)
            except Exception:
                raise Exception(
                    "The specified xy is invalid. It must be formatted like this ('10%', '10%')"
                )
        else:
            print(
                "The specified xy is invalid. It must be formatted like this: (10, 10) or ('10%', '10%')"
            )
            return
    else:
        # Bug fix: mixed element types (e.g., (10, '10%')) previously slipped
        # through silently and crashed later in ImageDraw.text.
        print(
            "The specified xy is invalid. It must be formatted like this: (10, 10) or ('10%', '10%')"
        )
        return

    # Build exactly one text label per frame.
    if text_sequence is None:
        text = [str(x) for x in range(1, count + 1)]
    elif isinstance(text_sequence, int):
        # Bug fix: the original range produced count + 1 labels (off by one).
        text = [str(x) for x in range(text_sequence, text_sequence + count)]
    elif isinstance(text_sequence, str):
        try:
            text_sequence = int(text_sequence)
            text = [str(x) for x in range(text_sequence, text_sequence + count)]
        except Exception:
            # Non-numeric string: repeat the same label on every frame.
            text = [text_sequence] * count
    elif isinstance(text_sequence, list) and len(text_sequence) != count:
        print(
            f"The length of the text sequence must be equal to the number ({count}) of frames in the gif."
        )
        return
    else:
        text = [str(x) for x in text_sequence]

    try:
        frames = []
        # Loop over each frame in the animated image
        for index, frame in enumerate(ImageSequence.Iterator(image)):
            # Draw the text on the frame
            frame = frame.convert("RGB")
            draw = ImageDraw.Draw(frame)
            draw.text(xy, text[index], font=font, fill=color)
            if add_progress_bar:
                draw.rectangle(progress_bar_shapes[index], fill=progress_bar_color)
            del draw

            # Round-trip through an in-memory GIF so each frame gets a GIF
            # palette before the animation is assembled.
            b = io.BytesIO()
            frame.save(b, format="GIF")
            frame = Image.open(b)
            frames.append(frame)
        # https://www.pythoninformer.com/python-libraries/pillow/creating-animated-gif/
        # Save the frames as a new image
        frames[0].save(
            out_gif,
            save_all=True,
            append_images=frames[1:],
            duration=duration,
            loop=loop,
            optimize=True,
        )
    except Exception as e:
        print(e)
adjust_longitude(in_fc)
¶
Adjusts longitude if it is less than -180 or greater than 180.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_fc |
dict |
The input dictionary containing coordinates. |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary containing the converted longitudes |
Source code in leafmap/common.py
def adjust_longitude(in_fc):
    """Adjusts longitude if it is less than -180 or greater than 180.

    Args:
        in_fc (dict): The input dictionary containing coordinates. It can be
            a GeoJSON Feature (with a "geometry" key) or a bare geometry
            (with a "type" key). The dict is modified in place.

    Returns:
        dict: A dictionary containing the converted longitudes, or None if an
            error occurred (the error is printed).
    """

    def _wrap(lon):
        # Bring a longitude outside [-180, 180] back into range by one full
        # revolution (the original code assumed at most one wrap).
        if lon < -180:
            return lon + 360
        elif lon > 180:
            return lon - 360
        return lon

    try:
        keys = in_fc.keys()
        # Locate the geometry dict: either nested under "geometry" (Feature)
        # or the input itself (bare geometry).
        if "geometry" in keys:
            geom = in_fc["geometry"]
        elif "type" in keys:
            geom = in_fc
        else:
            return in_fc

        coordinates = geom["coordinates"]
        if geom["type"] == "Point":
            geom["coordinates"][0] = _wrap(coordinates[0])
        elif geom["type"] == "Polygon":
            for i, ring in enumerate(coordinates):
                for j, point in enumerate(ring):
                    geom["coordinates"][i][j][0] = _wrap(point[0])
        elif geom["type"] == "LineString":
            for i, point in enumerate(coordinates):
                geom["coordinates"][i][0] = _wrap(point[0])
        return in_fc
    except Exception as e:
        print(e)
        return None
arc_active_map()
¶
Get the active map in ArcGIS Pro.
Returns:
Type | Description |
---|---|
arcpy.Map |
The active map in ArcGIS Pro. |
Source code in leafmap/common.py
def arc_active_map():
    """Get the active map in ArcGIS Pro.

    Returns:
        arcpy.Map: The active map in ArcGIS Pro, or None when not running
            inside ArcGIS Pro (i.e., arcpy is unavailable).
    """
    if not is_arcpy():
        return None
    import arcpy

    project = arcpy.mp.ArcGISProject("CURRENT")
    return project.activeMap
arc_active_view()
¶
Get the active view in ArcGIS Pro.
Returns:
Type | Description |
---|---|
arcpy.MapView |
The active view in ArcGIS Pro. |
Source code in leafmap/common.py
def arc_active_view():
    """Get the active view in ArcGIS Pro.

    Returns:
        arcpy.MapView: The active view in ArcGIS Pro, or None when not running
            inside ArcGIS Pro (i.e., arcpy is unavailable).
    """
    if not is_arcpy():
        return None
    import arcpy

    project = arcpy.mp.ArcGISProject("CURRENT")
    return project.activeView
arc_add_layer(url, name=None, shown=True, opacity=1.0)
¶
Add a layer to the active map in ArcGIS Pro.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL of the tile layer to add. |
required |
name |
str |
The name of the layer. Defaults to None. |
None |
shown |
bool |
Whether the layer is shown. Defaults to True. |
True |
opacity |
float |
The opacity of the layer. Defaults to 1.0. |
1.0 |
Source code in leafmap/common.py
def arc_add_layer(url, name=None, shown=True, opacity=1.0):
    """Add a layer to the active map in ArcGIS Pro.

    Args:
        url (str): The URL of the tile layer to add.
        name (str, optional): The name of the layer. Defaults to None.
        shown (bool, optional): Whether the layer is shown. Defaults to True.
        opacity (float, optional): The opacity of the layer. Defaults to 1.0.
    """
    if not is_arcpy():
        return
    active_map = arc_active_map()
    if active_map is None:
        return

    active_map.addDataFromPath(url)
    if not isinstance(name, str):
        return

    # The freshly added tiled layer is listed under this generic name;
    # rename/configure it if found.
    candidates = active_map.listLayers("Tiled service layer")
    if candidates:
        added = candidates[0]
        added.name = name
        added.visible = shown
        # arcpy uses transparency (0-100) rather than opacity (0-1).
        added.transparency = 100 - (opacity * 100)
arc_zoom_to_bounds(bounds)
¶
Zoom to a bounding box.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bounds |
list |
The bounding box to zoom to in the form [xmin, ymin, xmax, ymax] or [(ymin, xmin), (ymax, xmax)]. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If bounds is not a tuple/list of length 2 or 4. |
Source code in leafmap/common.py
def arc_zoom_to_bounds(bounds):
    """Zoom the active ArcGIS Pro view to a bounding box.

    Args:
        bounds (list): The bounding box to zoom to in the form
            [xmin, ymin, xmax, ymax] or [(ymin, xmin), (ymax, xmax)].

    Raises:
        ValueError: If bounds is not a tuple/list of length 2 or 4.
    """
    if len(bounds) == 4:
        xmin, ymin, xmax, ymax = bounds
    elif len(bounds) == 2:
        # [(south, west), (north, east)] form, e.g. from m.bounds.
        (ymin, xmin), (ymax, xmax) = bounds
    else:
        raise ValueError("bounds must be a tuple of length 2 or 4.")
    arc_zoom_to_extent(xmin, ymin, xmax, ymax)
arc_zoom_to_extent(xmin, ymin, xmax, ymax)
¶
Zoom to an extent in ArcGIS Pro.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
xmin |
float |
The minimum x value of the extent. |
required |
ymin |
float |
The minimum y value of the extent. |
required |
xmax |
float |
The maximum x value of the extent. |
required |
ymax |
float |
The maximum y value of the extent. |
required |
Source code in leafmap/common.py
def arc_zoom_to_extent(xmin, ymin, xmax, ymax):
    """Zoom to an extent in ArcGIS Pro.

    Args:
        xmin (float): The minimum x value of the extent.
        ymin (float): The minimum y value of the extent.
        xmax (float): The maximum x value of the extent.
        ymax (float): The maximum y value of the extent.
    """
    if not is_arcpy():
        return
    import arcpy

    view = arc_active_view()
    if view is None:
        return

    # Coordinates are interpreted as WGS84 (EPSG:4326).
    extent = arcpy.Extent(
        xmin,
        ymin,
        xmax,
        ymax,
        spatial_reference=arcpy.SpatialReference(4326),
    )
    view.camera.setExtent(extent)
    # if isinstance(zoom, int):
    #     scale = 156543.04 * math.cos(0) / math.pow(2, zoom)
    #     view.camera.scale = scale  # Not working properly
basemap_xyz_tiles()
¶
Returns a dictionary containing a set of basemaps that are XYZ tile layers.
Returns:
Type | Description |
---|---|
dict |
A dictionary of XYZ tile layers. |
Source code in leafmap/common.py
def basemap_xyz_tiles():
    """Returns a dictionary containing a set of basemaps that are XYZ tile layers.

    Returns:
        dict: A dictionary of XYZ tile layers (all basemaps except WMS layers).
    """
    from .leafmap import basemaps

    # Keep every basemap except the WMS ones.
    return {
        key: basemaps[key]
        for key in dict(basemaps)
        if not isinstance(basemaps[key], ipyleaflet.WMSLayer)
    }
bbox_to_gdf(bbox, crs='epsg:4326')
¶
Convert a bounding box to a GeoPandas GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox |
list |
A bounding box in the format of [minx, miny, maxx, maxy]. |
required |
crs |
str |
The CRS of the bounding box. Defaults to 'epsg:4326'. |
'epsg:4326' |
Returns:
Type | Description |
---|---|
GeoDataFrame |
A GeoDataFrame with a single polygon. |
Source code in leafmap/common.py
def bbox_to_gdf(bbox, crs="epsg:4326"):
    """Convert a bounding box to a GeoPandas GeoDataFrame.

    Args:
        bbox (list): A bounding box in the format of [minx, miny, maxx, maxy].
        crs (str, optional): The CRS of the bounding box. Defaults to 'epsg:4326'.

    Returns:
        GeoDataFrame: A GeoDataFrame with a single polygon.
    """
    import geopandas as gpd
    from shapely.geometry import Polygon

    footprint = Polygon.from_bounds(*bbox)
    return gpd.GeoDataFrame(geometry=[footprint], crs=crs)
bbox_to_geojson(bounds)
¶
Convert coordinates of a bounding box to a geojson.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bounds |
list | tuple |
A list of coordinates representing [left, bottom, right, top] or m.bounds. |
required |
Returns:
Type | Description |
---|---|
dict |
A geojson feature. |
Source code in leafmap/common.py
def bbox_to_geojson(bounds):
    """Convert coordinates of a bounding box to a geojson.

    Args:
        bounds (list | tuple): A list of coordinates representing [left, bottom, right, top] or m.bounds.

    Returns:
        dict: A geojson feature.
    """
    # m.bounds arrives as ((south, west), (north, east)); accept a list of the
    # same shape too (the original only recognized a tuple, so a 2-element
    # list crashed with IndexError below) and flatten it to
    # [west, south, east, north].
    if isinstance(bounds, (tuple, list)) and len(bounds) == 2:
        bounds = [bounds[0][1], bounds[0][0], bounds[1][1], bounds[1][0]]
    return {
        "geometry": {
            "type": "Polygon",
            "coordinates": [
                [
                    [bounds[0], bounds[3]],
                    [bounds[0], bounds[1]],
                    [bounds[2], bounds[1]],
                    [bounds[2], bounds[3]],
                    [bounds[0], bounds[3]],
                ]
            ],
        },
        "type": "Feature",
    }
bbox_to_polygon(bbox)
¶
Convert a bounding box to a shapely Polygon.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox |
list |
A bounding box in the format of [minx, miny, maxx, maxy]. |
required |
Returns:
Type | Description |
---|---|
Polygon |
A shapely Polygon. |
Source code in leafmap/common.py
def bbox_to_polygon(bbox):
    """Convert a bounding box to a shapely Polygon.

    Args:
        bbox (list): A bounding box in the format of [minx, miny, maxx, maxy].

    Returns:
        Polygon: A shapely Polygon.
    """
    from shapely import geometry

    # from_bounds builds the rectangle directly from the corner coordinates.
    return geometry.Polygon.from_bounds(*bbox)
bounds_to_xy_range(bounds)
¶
Convert bounds to x and y range to be used as input to bokeh map.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bounds |
list |
A list of bounds in the form [(south, west), (north, east)] or [xmin, ymin, xmax, ymax]. |
required |
Returns:
Type | Description |
---|---|
tuple |
A tuple of (x_range, y_range). |
Source code in leafmap/common.py
def bounds_to_xy_range(bounds):
    """Convert bounds to x and y range to be used as input to bokeh map.

    Args:
        bounds (list): A list of bounds in the form [(south, west), (north, east)] or [xmin, ymin, xmax, ymax].

    Raises:
        TypeError: If bounds is not a list or tuple.
        ValueError: If bounds does not have 2 or 4 elements.

    Returns:
        tuple: A tuple of (x_range, y_range).
    """
    if isinstance(bounds, tuple):
        bounds = list(bounds)
    elif not isinstance(bounds, list):
        raise TypeError("bounds must be a list")

    if len(bounds) == 4:
        west, south, east, north = bounds
    elif len(bounds) == 2:
        south, west = bounds[0]
        north, east = bounds[1]
    else:
        # Bug fix: the original had no else branch, so an invalid length fell
        # through and raised a confusing NameError below.
        raise ValueError("bounds must have 2 or 4 elements")

    # Project the WGS84 corners to Web Mercator meters for bokeh.
    xmin, ymin = lnglat_to_meters(west, south)
    xmax, ymax = lnglat_to_meters(east, north)
    x_range = (xmin, xmax)
    y_range = (ymin, ymax)

    return x_range, y_range
center_zoom_to_xy_range(center, zoom)
¶
Convert center and zoom to x and y range to be used as input to bokeh map.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
center |
tuple |
A tuple of (latitude, longitude). |
required |
zoom |
int |
The zoom level. |
required |
Returns:
Type | Description |
---|---|
tuple |
A tuple of (x_range, y_range). |
Source code in leafmap/common.py
def center_zoom_to_xy_range(center, zoom):
    """Convert center and zoom to x and y range to be used as input to bokeh map.

    Args:
        center (tuple): A tuple of (latitude, longitude).
        zoom (int): The zoom level.

    Raises:
        TypeError: If center is not a tuple/list or zoom is not an integer.

    Returns:
        tuple: A tuple of (x_range, y_range).
    """
    if not isinstance(center, (tuple, list)):
        raise TypeError("center must be a tuple or list")
    if not isinstance(zoom, int):
        raise TypeError("zoom must be an integer")

    latitude, longitude = center

    # Start from the full (clamped) lat/lon span and halve it per zoom level.
    full_x_span = 179 - (-179)
    full_y_span = 70 - (-70)
    span_x = full_x_span / 2 ** (zoom - 2)
    span_y = full_y_span / 2 ** (zoom - 2)

    west = longitude - span_x / 2
    east = longitude + span_x / 2
    south = latitude - span_y / 2
    north = latitude + span_y / 2

    # Project the WGS84 corners to Web Mercator meters for bokeh.
    xmin, ymin = lnglat_to_meters(west, south)
    xmax, ymax = lnglat_to_meters(east, north)
    return (xmin, xmax), (ymin, ymax)
cesium_to_streamlit(html, width=800, height=600, responsive=True, scrolling=False, token_name=None, token_value=None, **kwargs)
¶
Renders a Cesium HTML file in a Streamlit app. This method is a static Streamlit Component, meaning, no information is passed back from Leaflet on browser interaction.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html |
str |
The HTML file to render. It can be a local file path or a URL. |
required |
width |
int |
Width of the map. Defaults to 800. |
800 |
height |
int |
Height of the map. Defaults to 600. |
600 |
responsive |
bool |
Whether to make the map responsive. Defaults to True. |
True |
scrolling |
bool |
Whether to allow the map to scroll. Defaults to False. |
False |
token_name |
str |
The name of the token in the HTML file to be replaced. Defaults to None. |
None |
token_value |
str |
The value of the token to pass to the HTML file. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
streamlit.components |
components.html object. |
Source code in leafmap/common.py
def cesium_to_streamlit(
    html,
    width=800,
    height=600,
    responsive=True,
    scrolling=False,
    token_name=None,
    token_value=None,
    **kwargs,
):
    """Renders a Cesium HTML file in a Streamlit app. This method is a static Streamlit Component, meaning, no information is passed back from Leaflet on browser interaction.

    Args:
        html (str): The HTML file to render. It can be a local file path or a URL.
        width (int, optional): Width of the map. Defaults to 800.
        height (int, optional): Height of the map. Defaults to 600.
        responsive (bool, optional): Whether to make the map responsive. Defaults to True.
        scrolling (bool, optional): Whether to allow the map to scroll. Defaults to False.
        token_name (str, optional): The name of the token in the HTML file to be replaced. Defaults to None.
        token_value (str, optional): The value of the token to pass to the HTML file. Defaults to None.

    Returns:
        streamlit.components: components.html object.
    """
    if token_name is None:
        token_name = "your_access_token"
    if token_value is None:
        # Fall back to the CESIUM_TOKEN environment variable.
        token_value = os.environ.get("CESIUM_TOKEN")

    # Bug fix: return the component (the docstring promises a return value,
    # but the original dropped it) and forward any extra keyword arguments
    # instead of silently discarding **kwargs.
    return html_to_streamlit(
        html, width, height, responsive, scrolling, token_name, token_value, **kwargs
    )
check_cmap(cmap)
¶
Check the colormap and return a list of colors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cmap |
str | list | Box |
The colormap to check. |
required |
Returns:
Type | Description |
---|---|
list |
A list of colors. |
Source code in leafmap/common.py
def check_cmap(cmap):
    """Check the colormap and return a list of colors.

    Args:
        cmap (str | list | Box): The colormap to check. A string is resolved
            through leafmap's palette registry; a Box uses its "default" entry;
            a list/tuple of colors is returned as-is.

    Raises:
        Exception: If cmap is not a valid colormap.

    Returns:
        list: A list of colors.
    """
    from box import Box

    from .colormaps import get_palette

    if isinstance(cmap, str):
        try:
            return get_palette(cmap)
        except Exception as e:
            # Chain the underlying error so the root cause is not lost
            # (the original caught it and discarded it).
            raise Exception(f"{cmap} is not a valid colormap.") from e
    elif isinstance(cmap, Box):
        return list(cmap["default"])
    elif isinstance(cmap, (list, tuple)):
        return cmap
    else:
        raise Exception(f"{cmap} is not a valid colormap.")
check_color(in_color)
¶
Checks the input color and returns the corresponding hex color code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_color |
str or tuple |
It can be a string (e.g., 'red', '#ffff00', 'ffff00', 'ff0') or RGB tuple (e.g., (255, 127, 0)). |
required |
Returns:
Type | Description |
---|---|
str |
A hex color code. |
Source code in leafmap/common.py
def check_color(in_color):
    """Checks the input color and returns the corresponding hex color code.

    Args:
        in_color (str or tuple): It can be a string (e.g., 'red', '#ffff00', 'ffff00', 'ff0') or RGB tuple (e.g., (255, 127, 0)).

    Returns:
        str: A hex color code. Falls back to black ('#000000') for invalid
            input (a warning is printed).
    """
    import colour

    out_color = "#000000"  # default black color
    if isinstance(in_color, tuple) and len(in_color) == 3:
        # Rescale 0-255 integer channels to the 0-1 floats colour expects.
        if all(isinstance(item, int) for item in in_color):
            in_color = [c / 255.0 for c in in_color]
        return colour.Color(rgb=tuple(in_color)).hex_l
    else:
        # Try to guess the color system.
        try:
            return colour.Color(in_color).hex_l
        except Exception:
            pass

        # Try again by adding an extra # (GEE handles hex codes without #).
        try:
            return colour.Color(f"#{in_color}").hex_l
        except Exception as e:
            print(
                f"The provided color ({in_color}) is invalid. Using the default black color."
            )
            print(e)
            return out_color
check_dir(dir_path, make_dirs=True)
¶
Checks if a directory exists and creates it if it does not.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
The path to the directory. |
required |
make_dirs |
bool |
Whether to create the directory if it does not exist. Defaults to True. |
True |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
If the directory could not be found. |
TypeError |
If the input directory path is not a string. |
Returns:
Type | Description |
---|---|
str |
The path to the directory. |
Source code in leafmap/common.py
def check_dir(dir_path, make_dirs=True):
    """Checks if a directory exists and creates it if it does not.

    Args:
        dir_path (str): The path to the directory.
        make_dirs (bool, optional): Whether to create the directory if it does not exist. Defaults to True.

    Raises:
        FileNotFoundError: If the directory could not be found.
        TypeError: If the input directory path is not a string.

    Returns:
        str: The absolute path to the directory.
    """
    if not isinstance(dir_path, str):
        raise TypeError("The provided directory path must be a string.")

    # Expand a leading "~" to the user's home; otherwise make the path absolute.
    dir_path = (
        os.path.expanduser(dir_path)
        if dir_path.startswith("~")
        else os.path.abspath(dir_path)
    )

    if make_dirs and not os.path.exists(dir_path):
        os.makedirs(dir_path)

    if not os.path.exists(dir_path):
        raise FileNotFoundError("The provided directory could not be found.")
    return dir_path
check_file_path(file_path, make_dirs=True)
¶
Gets the absolute file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
The path to the file. |
required |
make_dirs |
bool |
Whether to create the directory if it does not exist. Defaults to True. |
True |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
If the directory could not be found. |
TypeError |
If the input directory path is not a string. |
Returns:
Type | Description |
---|---|
str |
The absolute path to the file. |
Source code in leafmap/common.py
def check_file_path(file_path, make_dirs=True):
    """Gets the absolute file path.

    Args:
        file_path (str): The path to the file.
        make_dirs (bool, optional): Whether to create the directory if it does not exist. Defaults to True.

    Raises:
        TypeError: If the input file path is not a string.

    Returns:
        str: The absolute path to the file.
    """
    if not isinstance(file_path, str):
        raise TypeError("The provided file path must be a string.")

    # Expand a leading "~" to the user's home; otherwise make the path absolute.
    if file_path.startswith("~"):
        file_path = os.path.expanduser(file_path)
    else:
        file_path = os.path.abspath(file_path)

    # Ensure the parent directory exists so the file can be written.
    parent = os.path.dirname(file_path)
    if make_dirs and not os.path.exists(parent):
        os.makedirs(parent)
    return file_path
classify(data, column, cmap=None, colors=None, labels=None, scheme='Quantiles', k=5, legend_kwds=None, classification_kwds=None)
¶
Classify a dataframe column using a variety of classification schemes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
str | pd.DataFrame | gpd.GeoDataFrame |
The data to classify. It can be a filepath to a vector dataset, a pandas dataframe, or a geopandas geodataframe. |
required |
column |
str |
The column to classify. |
required |
cmap |
str |
The name of a colormap recognized by matplotlib. Defaults to None. |
None |
colors |
list |
A list of colors to use for the classification. Defaults to None. |
None |
labels |
list |
A list of labels to use for the legend. Defaults to None. |
None |
scheme |
str |
Name of a choropleth classification scheme (requires mapclassify). Name of a choropleth classification scheme (requires mapclassify). A mapclassify.MapClassifier object will be used under the hood. Supported are all schemes provided by mapclassify (e.g. 'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled', 'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced', 'JenksCaspallSampled', 'MaxP', 'MaximumBreaks', 'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean', 'UserDefined'). Arguments can be passed in classification_kwds. |
'Quantiles' |
k |
int |
Number of classes (ignored if scheme is None or if column is categorical). Default to 5. |
5 |
legend_kwds |
dict |
Keyword arguments to pass to :func: |
None |
classification_kwds |
dict |
Keyword arguments to pass to mapclassify. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
pd.DataFrame, dict |
A pandas dataframe with the classification applied and a legend dictionary. |
Source code in leafmap/common.py
def classify(
    data,
    column,
    cmap=None,
    colors=None,
    labels=None,
    scheme="Quantiles",
    k=5,
    legend_kwds=None,
    classification_kwds=None,
):
    """Classify a dataframe column using a variety of classification schemes.

    Args:
        data (str | pd.DataFrame | gpd.GeoDataFrame): The data to classify. It can be a filepath to a vector dataset, a pandas dataframe, or a geopandas geodataframe.
        column (str): The column to classify.
        cmap (str, optional): The name of a colormap recognized by matplotlib. Defaults to None.
        colors (list, optional): A list of colors to use for the classification. Defaults to None.
        labels (list, optional): A list of labels to use for the legend. Defaults to None.
        scheme (str, optional): Name of a choropleth classification scheme (requires mapclassify).
            A mapclassify.MapClassifier object will be used
            under the hood. Supported are all schemes provided by mapclassify (e.g.
            'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled',
            'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced',
            'JenksCaspallSampled', 'MaxP', 'MaximumBreaks',
            'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean',
            'UserDefined'). Arguments can be passed in classification_kwds.
        k (int, optional): Number of classes (ignored if scheme is None or if column is categorical). Default to 5.
        legend_kwds (dict, optional): Keyword arguments to pass to :func:`matplotlib.pyplot.legend` or `matplotlib.pyplot.colorbar`. Defaults to None.
            Additional accepted keywords when `scheme` is specified:

            fmt : string
                A formatting specification for the bin edges of the classes in the
                legend. For example, to have no decimals: ``{"fmt": "{:.0f}"}``.
            labels : list-like
                A list of legend labels to override the auto-generated labels.
                Needs to have the same number of elements as the number of
                classes (`k`).
            interval : boolean (default False)
                An option to control brackets from mapclassify legend.
                If True, open/closed interval brackets are shown in the legend.
        classification_kwds (dict, optional): Keyword arguments to pass to mapclassify. Defaults to None.

    Returns:
        pd.DataFrame, dict: A pandas dataframe with the classification applied and a legend dictionary.
    """
    import warnings
    import numpy as np
    import pandas as pd
    import geopandas as gpd
    import matplotlib as mpl
    import matplotlib.pyplot as plt

    try:
        import mapclassify
    except ImportError:
        raise ImportError(
            "mapclassify is required for this function. Install with `pip install mapclassify`."
        )

    if isinstance(data, gpd.GeoDataFrame) or isinstance(data, pd.DataFrame):
        df = data
    else:
        try:
            df = gpd.read_file(data)
        except Exception:
            raise TypeError(
                "Data must be a GeoDataFrame or a path to a file that can be read by geopandas.read_file()."
            )

    if df.empty:
        warnings.warn(
            "The GeoDataFrame you are attempting to plot is "
            "empty. Nothing has been displayed.",
            UserWarning,
        )
        return

    columns = df.columns.values.tolist()
    if column not in columns:
        raise ValueError(
            f"{column} is not a column in the GeoDataFrame. It must be one of {columns}."
        )

    # Convert categorical data to numeric
    init_column = None
    value_list = None
    # Bug fix: np.object0 was removed in NumPy 1.24; np.object_ is the
    # stable alias present in all NumPy versions.
    if np.issubdtype(df[column].dtype, np.object_):
        value_list = df[column].unique().tolist()
        value_list.sort()
        df["category"] = df[column].replace(value_list, range(0, len(value_list)))
        init_column = column
        column = "category"
        k = len(value_list)

    if legend_kwds is not None:
        legend_kwds = legend_kwds.copy()

    # To accept pd.Series and np.arrays as column
    if isinstance(column, (np.ndarray, pd.Series)):
        if column.shape[0] != df.shape[0]:
            raise ValueError(
                "The dataframe and given column have different number of rows."
            )
        else:
            values = column
            # Make sure index of a Series matches index of df
            if isinstance(values, pd.Series):
                values = values.reindex(df.index)
    else:
        values = df[column]
    # Bug fix: the original unconditionally re-assigned `values = df[column]`
    # here, clobbering the array/Series branch above; that overwrite is removed.

    nan_idx = np.asarray(pd.isna(values), dtype="bool")

    if cmap is None:
        cmap = "Blues"
    # NOTE(review): plt.cm.get_cmap is deprecated in newer matplotlib
    # (mpl.colormaps[...] is the replacement) — kept for compatibility with
    # the matplotlib versions this file targets.
    cmap = plt.cm.get_cmap(cmap, k)
    if colors is None:
        colors = [mpl.colors.rgb2hex(cmap(i))[1:] for i in range(cmap.N)]
        colors = ["#" + i for i in colors]
    elif isinstance(colors, list):
        colors = [check_color(i) for i in colors]
    elif isinstance(colors, str):
        colors = [check_color(colors)] * k

    allowed_schemes = [
        "BoxPlot",
        "EqualInterval",
        "FisherJenks",
        "FisherJenksSampled",
        "HeadTailBreaks",
        "JenksCaspall",
        "JenksCaspallForced",
        "JenksCaspallSampled",
        "MaxP",
        "MaximumBreaks",
        "NaturalBreaks",
        "Quantiles",
        "Percentiles",
        "StdMean",
        "UserDefined",
    ]
    if scheme.lower() not in [s.lower() for s in allowed_schemes]:
        raise ValueError(
            f"{scheme} is not a valid scheme. It must be one of {allowed_schemes}."
        )

    if classification_kwds is None:
        classification_kwds = {}
    if "k" not in classification_kwds:
        classification_kwds["k"] = k

    # Bin the non-NaN values with mapclassify.
    binning = mapclassify.classify(
        np.asarray(values[~nan_idx]), scheme, **classification_kwds
    )
    df["category"] = binning.yb
    df["color"] = [colors[i] for i in df["category"]]

    if legend_kwds is None:
        legend_kwds = {}

    if "interval" not in legend_kwds:
        legend_kwds["interval"] = True

    if "fmt" not in legend_kwds:
        if np.issubdtype(df[column].dtype, np.floating):
            legend_kwds["fmt"] = "{:.2f}"
        else:
            legend_kwds["fmt"] = "{:.0f}"

    if labels is None:
        # set categorical to True for creating the legend
        if legend_kwds is not None and "labels" in legend_kwds:
            if len(legend_kwds["labels"]) != binning.k:
                raise ValueError(
                    "Number of labels must match number of bins, "
                    "received {} labels for {} bins".format(
                        len(legend_kwds["labels"]), binning.k
                    )
                )
            else:
                labels = list(legend_kwds.pop("labels"))
        else:
            # fmt = "{:.2f}"
            if legend_kwds is not None and "fmt" in legend_kwds:
                fmt = legend_kwds.pop("fmt")

            labels = binning.get_legend_classes(fmt)
            if legend_kwds is not None:
                show_interval = legend_kwds.pop("interval", False)
            else:
                show_interval = False
            if not show_interval:
                # Strip the bracket characters from each bin label.
                labels = [c[1:-1] for c in labels]

        if init_column is not None:
            # Categorical input: use the original category names as labels.
            labels = value_list
    elif isinstance(labels, list):
        if len(labels) != len(colors):
            raise ValueError("The number of labels must match the number of colors.")
    else:
        raise ValueError("labels must be a list or None.")

    legend_dict = dict(zip(labels, colors))
    # Shift categories to be 1-based for display.
    df["category"] = df["category"] + 1

    return df, legend_dict
clip_image(image, mask, output, to_cog=True)
¶
Clip an image by mask.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
Path to the image file in GeoTIFF format. |
required |
mask |
str | list | dict |
The mask used to extract the image. It can be a path to vector datasets (e.g., GeoJSON, Shapefile), a list of coordinates, or m.user_roi. |
required |
output |
str |
Path to the output file. |
required |
to_cog |
bool |
Flags to indicate if you want to convert the output to COG. Defaults to True. |
True |
Exceptions:
Type | Description |
---|---|
ImportError |
If the fiona or rasterio package is not installed. |
FileNotFoundError |
If the image is not found. |
ValueError |
If the mask is not a valid GeoJSON or raster file. |
FileNotFoundError |
If the mask file is not found. |
Source code in leafmap/common.py
def clip_image(image, mask, output, to_cog=True):
    """Clip an image by mask.

    Args:
        image (str): Path to the image file in GeoTIFF format.
        mask (str | list | dict): The mask used to extract the image. It can be a path to
            vector datasets (e.g., GeoJSON, Shapefile), a list of coordinates forming a
            polygon ring, or m.user_roi (a GeoJSON Feature dict).
        output (str): Path to the output file. Must end with ".tif".
        to_cog (bool, optional): Flags to indicate if you want to convert the output to COG. Defaults to True.

    Raises:
        ImportError: If the fiona or rasterio package is not installed.
        FileNotFoundError: If the image is not found.
        ValueError: If the mask is not a valid GeoJSON or raster file.
        FileNotFoundError: If the mask file is not found.
    """
    # Heavy geospatial dependencies are imported lazily so the module can be
    # imported without them installed.
    try:
        import json
        import fiona
        import rasterio
        import rasterio.mask
    except ImportError as e:
        raise ImportError(e)
    if not os.path.exists(image):
        raise FileNotFoundError(f"{image} does not exist.")
    if not output.endswith(".tif"):
        raise ValueError("Output must be a tif file.")
    output = check_file_path(output)
    if isinstance(mask, str):
        if mask.startswith("http"):
            # NOTE(review): the remote mask is downloaded with `output` as the
            # second argument, i.e. the raster output path — confirm this does
            # not clobber the output file and is the intended target.
            mask = download_file(mask, output)
        if not os.path.exists(mask):
            raise FileNotFoundError(f"{mask} does not exist.")
    elif isinstance(mask, list) or isinstance(mask, dict):
        # A coordinate list becomes a single Polygon feature; a dict is assumed
        # to already be a GeoJSON Feature. Either way the geometry is written to
        # a temporary .geojson so fiona can read it below.
        if isinstance(mask, list):
            geojson = {
                "type": "FeatureCollection",
                "features": [
                    {
                        "type": "Feature",
                        "properties": {},
                        "geometry": {"type": "Polygon", "coordinates": [mask]},
                    }
                ],
            }
        else:
            geojson = {
                "type": "FeatureCollection",
                "features": [mask],
            }
        mask = temp_file_path(".geojson")
        with open(mask, "w") as f:
            json.dump(geojson, f)
    # Collect all mask geometries, then clip the raster to them.
    with fiona.open(mask, "r") as shapefile:
        shapes = [feature["geometry"] for feature in shapefile]
    with rasterio.open(image) as src:
        out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)
        out_meta = src.meta
    # Reflect the clipped extent and transform in the output metadata.
    out_meta.update(
        {
            "driver": "GTiff",
            "height": out_image.shape[1],
            "width": out_image.shape[2],
            "transform": out_transform,
        }
    )
    with rasterio.open(output, "w", **out_meta) as dest:
        dest.write(out_image)
    if to_cog:
        # Convert the clipped GeoTIFF to a Cloud Optimized GeoTIFF in place.
        image_to_cog(output, output)
cog_validate(source, verbose=False)
¶
Validate Cloud Optimized Geotiff.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
A dataset path or URL. Will be opened in "r" mode. |
required |
verbose |
bool |
Whether to print the output of the validation. Defaults to False. |
False |
Exceptions:
Type | Description |
---|---|
ImportError |
If the rio-cogeo package is not installed. |
FileNotFoundError |
If the provided file could not be found. |
Returns:
Type | Description |
---|---|
tuple |
A tuple containing the validation results (True if src_path is a valid COG, a list of validation errors, and a list of validation warnings). |
Source code in leafmap/common.py
def cog_validate(source, verbose=False):
    """Validate a Cloud Optimized GeoTIFF.

    Args:
        source (str): A dataset path or URL. Will be opened in "r" mode.
        verbose (bool, optional): Whether to print the output of the validation. Defaults to False.

    Raises:
        ImportError: If the rio-cogeo package is not installed.
        FileNotFoundError: If the provided file could not be found.

    Returns:
        tuple: The validation results (True if src_path is a valid COG, a list of
            validation errors, and a list of validation warnings).
    """
    try:
        from rio_cogeo import cogeo
    except ImportError:
        raise ImportError(
            "The rio-cogeo package is not installed. Please install it with `pip install rio-cogeo` or `conda install rio-cogeo -c conda-forge`."
        )

    # URLs are handed straight to rio-cogeo; local paths must exist on disk.
    if not source.startswith("http"):
        source = check_file_path(source)
        if not os.path.exists(source):
            raise FileNotFoundError("The provided input file could not be found.")

    # Verbose mode returns the richer cog_info report instead of the
    # (valid, errors, warnings) tuple.
    return cogeo.cog_info(source) if verbose else cogeo.cog_validate(source)
connect_postgis(database, host='localhost', user=None, password=None, port=5432, use_env_var=False)
¶
Connects to a PostGIS database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
database |
str |
Name of the database |
required |
host |
str |
Hosting server for the database. Defaults to "localhost". |
'localhost' |
user |
str |
User name to access the database. Defaults to None. |
None |
password |
str |
Password to access the database. Defaults to None. |
None |
port |
int |
Port number to connect to at the server host. Defaults to 5432. |
5432 |
use_env_var |
bool |
Whether to use environment variables. If set to True, user and password are treated as environment variable names, with default values user="SQL_USER" and password="SQL_PASSWORD". Defaults to False. |
False |
Exceptions:
Type | Description |
---|---|
ValueError |
If user is not specified. |
ValueError |
If password is not specified. |
Returns:
Type | Description |
---|---|
[type] |
[description] |
Source code in leafmap/common.py
def connect_postgis(
    database, host="localhost", user=None, password=None, port=5432, use_env_var=False
):
    """Connects to a PostGIS database.

    Args:
        database (str): Name of the database
        host (str, optional): Hosting server for the database. Defaults to "localhost".
        user (str, optional): User name to access the database. Defaults to None.
        password (str, optional): Password to access the database. Defaults to None.
        port (int, optional): Port number to connect to at the server host. Defaults to 5432.
        use_env_var (bool, optional): Whether to use environment variables. If set to True,
            user and password are treated as environment variable names, with default
            names user="SQL_USER" and password="SQL_PASSWORD". Defaults to False.

    Raises:
        ValueError: If user is not specified.
        ValueError: If password is not specified.

    Returns:
        sqlalchemy.engine.Engine: Engine connected to the PostGIS database.
    """
    check_package(name="geopandas", URL="https://geopandas.org")
    check_package(
        name="sqlalchemy",
        URL="https://docs.sqlalchemy.org/en/14/intro.html#installation",
    )

    from sqlalchemy import create_engine

    if use_env_var:
        # Treat the given names (or the defaults) as environment variable keys.
        user = os.getenv(user) if user is not None else os.getenv("SQL_USER")
        password = (
            os.getenv(password) if password is not None else os.getenv("SQL_PASSWORD")
        )

    if user is None:
        raise ValueError("user is not specified.")
    if password is None:
        raise ValueError("password is not specified.")

    url = f"postgresql://{user}:{password}@{host}:{port}/{database}"
    return create_engine(url)
convert_lidar(source, destination=None, point_format_id=None, file_version=None, **kwargs)
¶
Converts a Las file from one point format to another. Automatically upgrades the file version if the source file version is not compatible with the new point_format_id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str | laspy.lasdatas.base.LasBase |
The source data to be converted. |
required |
destination |
str |
The destination file path. Defaults to None. |
None |
point_format_id |
int |
The new point format id (the default is None, which won't change the source format id). |
None |
file_version |
str |
The new file version. None by default which means that the file_version may be upgraded for compatibility with the new point_format. The file version will not be downgraded. |
None |
Returns:
Type | Description |
---|---|
laspy.lasdatas.base.LasBase |
The converted LasData object. |
Source code in leafmap/common.py
def convert_lidar(
    source, destination=None, point_format_id=None, file_version=None, **kwargs
):
    """Converts a Las from one point format to another. Automatically upgrades the file version
    if the source file version is not compatible with the new point_format_id.

    Args:
        source (str | laspy.lasdatas.base.LasBase): The source data to be converted. A string
            is treated as a file path and loaded with read_lidar().
        destination (str, optional): The destination file path. Defaults to None.
        point_format_id (int, optional): The new point format id (the default is None, which won't change the source format id).
        file_version (str, optional): The new file version. None by default which means that the file_version may be upgraded
            for compatibility with the new point_format. The file version will not be downgraded.
        **kwargs: Additional keyword arguments passed to write_lidar() when a destination is given.

    Returns:
        laspy.lasdatas.base.LasBase | str: The converted LasData object, or the destination
            file path when destination is not None. Returns None if laspy is not installed.
    """
    try:
        import laspy
    except ImportError:
        # Deliberate best-effort behavior: laspy is an optional dependency, so
        # print an install hint and return None instead of raising.
        print(
            "The laspy package is required for this function. Use `pip install laspy[lazrs,laszip]` to install it."
        )
        return
    if isinstance(source, str):
        source = read_lidar(source)
    las = laspy.convert(
        source, point_format_id=point_format_id, file_version=file_version
    )
    if destination is None:
        return las
    else:
        destination = check_file_path(destination)
        write_lidar(las, destination, **kwargs)
        return destination
coords_to_geojson(coords)
¶
Convert a list of bbox coordinates representing [left, bottom, right, top] to geojson FeatureCollection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
coords |
list |
A list of bbox coordinates representing [left, bottom, right, top]. |
required |
Returns:
Type | Description |
---|---|
dict |
A geojson FeatureCollection. |
Source code in leafmap/common.py
def coords_to_geojson(coords):
    """Convert a list of bbox coordinates representing [left, bottom, right, top] to geojson FeatureCollection.

    Args:
        coords (list): A list of bbox coordinates representing [left, bottom, right, top].

    Returns:
        dict: A geojson FeatureCollection.
    """
    # Each bbox becomes one Feature in the collection.
    return {
        "type": "FeatureCollection",
        "features": [bbox_to_geojson(bbox) for bbox in coords],
    }
coords_to_vector(coords, output=None, crs='EPSG:4326', **kwargs)
¶
Convert a list of coordinates to a GeoDataFrame or a vector file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
coords |
list |
A list of coordinates in the format of [(x1, y1), (x2, y2), ...]. |
required |
output |
str |
The path to the output vector file. Defaults to None. |
None |
crs |
str |
The CRS of the coordinates. Defaults to "EPSG:4326". |
'EPSG:4326' |
Returns:
Type | Description |
---|---|
gpd.GeoDataFrame |
A GeoDataFrame of the coordinates. |
Source code in leafmap/common.py
def coords_to_vector(coords, output=None, crs="EPSG:4326", **kwargs):
    """Convert a list of coordinates to a GeoDataFrame or a vector file.

    The input coordinates are assumed to be lon/lat in EPSG:4326; the result is
    reprojected to `crs` before being returned or written out.

    Args:
        coords (list): A list of coordinates in the format of [(x1, y1), (x2, y2), ...].
            A flat [x, y] pair is also accepted and treated as a single point.
        output (str, optional): The path to the output vector file. Defaults to None.
        crs (str, optional): The CRS of the output. Defaults to "EPSG:4326".
        **kwargs: Additional keyword arguments passed to gpd.GeoDataFrame.to_file().

    Raises:
        TypeError: If coords is not a list.

    Returns:
        gpd.GeoDataFrame: A GeoDataFrame of the coordinates, or None when output is given.
    """
    import geopandas as gpd
    from shapely.geometry import Point

    if not isinstance(coords, list):
        raise TypeError("coords must be a list of coordinates")

    # Promote a bare [x, y] pair to a single-point list.
    if isinstance(coords[0], (int, float)):
        coords = [(coords[0], coords[1])]

    # Build the points in EPSG:4326, then reproject to the requested CRS.
    geometry = [Point(xy) for xy in coords]
    gdf = gpd.GeoDataFrame(geometry=geometry, crs="EPSG:4326")
    gdf.to_crs(crs, inplace=True)

    if output is not None:
        gdf.to_file(output, **kwargs)
    else:
        return gdf
create_code_cell(code='', where='below')
¶
Creates a code cell in the IPython Notebook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code |
str |
Code to fill the new code cell with. Defaults to ''. |
'' |
where |
str |
Where to add the new code cell. It can be one of the following: above, below, at_bottom. Defaults to 'below'. |
'below' |
Source code in leafmap/common.py
def create_code_cell(code="", where="below"):
    """Creates a code cell in the IPython Notebook.

    Args:
        code (str, optional): Code to fill the new code cell with. Defaults to ''.
        where (str, optional): Where to add the new code cell. It can be one of the following: above, below, at_bottom. Defaults to 'below'.
    """
    import base64

    from IPython.display import Javascript, display

    # Base64-encode the code so arbitrary source text survives embedding
    # inside the JavaScript string literal.
    payload = base64.b64encode(code.encode()).decode()
    display(
        Javascript(
            """
    var code = IPython.notebook.insert_cell_{0}('code');
    code.set_text(atob("{1}"));
    """.format(
                where, payload
            )
        )
    )
create_download_link(filename, title='Click here to download: ')
¶
Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
The file path to the file to download |
required |
title |
str |
str. Defaults to "Click here to download: ". |
'Click here to download: ' |
Returns:
Type | Description |
---|---|
str |
HTML download URL. |
Source code in leafmap/common.py
def create_download_link(filename, title="Click here to download: "):
    """Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578

    Args:
        filename (str): The file path to the file to download
        title (str, optional): str. Defaults to "Click here to download: ".

    Returns:
        str: HTML download URL.
    """
    import base64

    from IPython.display import HTML

    # Use a context manager so the file handle is closed promptly.
    with open(filename, "rb") as f:
        data = f.read()
    payload = base64.b64encode(data).decode()
    basename = os.path.basename(filename)
    # Bug fix: the download attribute previously contained a literal
    # placeholder instead of the {filename} substitution, so the browser
    # saved the file under the wrong name; `filename=` was passed to
    # format() but never used.
    html = '<a download="{filename}" href="data:text/csv;base64,{payload}" style="color:#0000FF;" target="_blank">{title}</a>'
    html = html.format(payload=payload, title=title + f" {basename}", filename=basename)
    return HTML(html)
create_legend(title='Legend', labels=None, colors=None, legend_dict=None, builtin_legend=None, opacity=1.0, position='bottomright', draggable=True, output=None, style={})
¶
Create a legend in HTML format. Reference: https://bit.ly/3oV6vnH
Parameters:
Name | Type | Description | Default |
---|---|---|---|
title |
str |
Title of the legend. Defaults to "Legend". |
'Legend' |
colors |
list |
A list of legend colors. Defaults to None. |
None |
labels |
list |
A list of legend labels. Defaults to None. |
None |
legend_dict |
dict |
A dictionary containing legend items as keys and color as values. If provided, legend_keys and legend_colors will be ignored. Defaults to None. |
None |
builtin_legend |
str |
Name of the builtin legend to add to the map. Defaults to None. |
None |
opacity |
float |
The opacity of the legend. Defaults to 1.0. |
1.0 |
position |
str |
The position of the legend, can be one of the following: "topleft", "topright", "bottomleft", "bottomright". Defaults to "bottomright". |
'bottomright' |
draggable |
bool |
If True, the legend can be dragged to a new position. Defaults to True. |
True |
output |
str |
The output file path (*.html) to save the legend. Defaults to None. |
None |
style |
Additional keyword arguments to style the legend, such as position, bottom, right, z-index, border, background-color, border-radius, padding, font-size, etc. The default style is: style = { 'position': 'fixed', 'z-index': '9999', 'border': '2px solid grey', 'background-color': 'rgba(255, 255, 255, 0.8)', 'border-radius': '5px', 'padding': '10px', 'font-size': '14px', 'bottom': '20px', 'right': '5px' } |
{} |
Returns:
Type | Description |
---|---|
str |
The HTML code of the legend. |
Source code in leafmap/common.py
def create_legend(
    title="Legend",
    labels=None,
    colors=None,
    legend_dict=None,
    builtin_legend=None,
    opacity=1.0,
    position="bottomright",
    draggable=True,
    output=None,
    style={},
):
    """Create a legend in HTML format. Reference: https://bit.ly/3oV6vnH

    Args:
        title (str, optional): Title of the legend. Defaults to "Legend".
        labels (list, optional): A list of legend labels. Defaults to None.
        colors (list, optional): A list of legend colors. Defaults to None.
        legend_dict (dict, optional): A dictionary containing legend items as keys and color as values.
            If provided, legend_keys and legend_colors will be ignored. Defaults to None.
        builtin_legend (str, optional): Name of the builtin legend to add to the map. Defaults to None.
        opacity (float, optional): The opacity of the legend. Defaults to 1.0.
        position (str, optional): The position of the legend, can be one of the following:
            "topleft", "topright", "bottomleft", "bottomright". Defaults to "bottomright".
        draggable (bool, optional): If True, the legend can be dragged to a new position. Defaults to True.
        output (str, optional): The output file path (*.html) to save the legend. Defaults to None.
        style: Additional keyword arguments to style the legend, such as position, bottom, right, z-index,
            border, background-color, border-radius, padding, font-size, etc. The default style is:
            style = {
                'position': 'fixed',
                'z-index': '9999',
                'border': '2px solid grey',
                'background-color': 'rgba(255, 255, 255, 0.8)',
                'border-radius': '5px',
                'padding': '10px',
                'font-size': '14px',
                'bottom': '20px',
                'right': '5px'
            }

    Returns:
        str: The HTML code of the legend, or None if input validation fails
            or output is provided.
    """
    import pkg_resources

    from .legends import builtin_legends

    # Resolve the legend template shipped with the package: legend.txt for the
    # draggable variant, legend_style.html for the fixed-position variant.
    pkg_dir = os.path.dirname(pkg_resources.resource_filename("leafmap", "leafmap.py"))
    legend_template = os.path.join(pkg_dir, "data/template/legend_style.html")
    if draggable:
        legend_template = os.path.join(pkg_dir, "data/template/legend.txt")

    if not os.path.exists(legend_template):
        raise FileNotFoundError("The legend template does not exist.")

    # Invalid input prints a message and returns None (no exception) — this is
    # the historical behavior of the function.
    if labels is not None:
        if not isinstance(labels, list):
            print("The legend keys must be a list.")
            return
    else:
        labels = ["One", "Two", "Three", "Four", "etc"]

    # Colors may be RGB tuples or hex strings, with or without a leading '#'.
    if colors is not None:
        if not isinstance(colors, list):
            print("The legend colors must be a list.")
            return
        elif all(isinstance(item, tuple) for item in colors):
            try:
                colors = [rgb_to_hex(x) for x in colors]
            except Exception as e:
                print(e)
        elif all((item.startswith("#") and len(item) == 7) for item in colors):
            pass
        elif all((len(item) == 6) for item in colors):
            pass
        else:
            print("The legend colors must be a list of tuples.")
            return
    else:
        colors = [
            "#8DD3C7",
            "#FFFFB3",
            "#BEBADA",
            "#FB8072",
            "#80B1D3",
        ]

    if len(labels) != len(colors):
        print("The legend keys and values must be the same length.")
        return

    # A builtin legend, when requested, replaces labels/colors entirely.
    allowed_builtin_legends = builtin_legends.keys()
    if builtin_legend is not None:
        if builtin_legend not in allowed_builtin_legends:
            print(
                "The builtin legend must be one of the following: {}".format(
                    ", ".join(allowed_builtin_legends)
                )
            )
            return
        else:
            legend_dict = builtin_legends[builtin_legend]
            labels = list(legend_dict.keys())
            colors = list(legend_dict.values())

    # An explicit legend_dict likewise overrides labels/colors.
    if legend_dict is not None:
        if not isinstance(legend_dict, dict):
            print("The legend dict must be a dictionary.")
            return
        else:
            labels = list(legend_dict.keys())
            colors = list(legend_dict.values())
            if all(isinstance(item, tuple) for item in colors):
                try:
                    colors = [rgb_to_hex(x) for x in colors]
                except Exception as e:
                    print(e)

    allowed_positions = [
        "topleft",
        "topright",
        "bottomleft",
        "bottomright",
    ]
    if position not in allowed_positions:
        raise ValueError(
            "The position must be one of the following: {}".format(
                ", ".join(allowed_positions)
            )
        )

    # Translate the requested corner into CSS offsets, removing any
    # conflicting offsets the caller may have supplied in `style`.
    if position == "bottomright":
        if "bottom" not in style:
            style["bottom"] = "20px"
        if "right" not in style:
            style["right"] = "5px"
        if "left" in style:
            del style["left"]
        if "top" in style:
            del style["top"]
    elif position == "bottomleft":
        if "bottom" not in style:
            style["bottom"] = "5px"
        if "left" not in style:
            style["left"] = "5px"
        if "right" in style:
            del style["right"]
        if "top" in style:
            del style["top"]
    elif position == "topright":
        if "top" not in style:
            style["top"] = "5px"
        if "right" not in style:
            style["right"] = "5px"
        if "left" in style:
            del style["left"]
        if "bottom" in style:
            del style["bottom"]
    elif position == "topleft":
        if "top" not in style:
            style["top"] = "5px"
        if "left" not in style:
            style["left"] = "5px"
        if "right" in style:
            del style["right"]
        if "bottom" in style:
            del style["bottom"]

    # Fill in the remaining CSS defaults only where the caller didn't set them.
    if "position" not in style:
        style["position"] = "fixed"
    if "z-index" not in style:
        style["z-index"] = "9999"
    if "background-color" not in style:
        style["background-color"] = "rgba(255, 255, 255, 0.8)"
    if "padding" not in style:
        style["padding"] = "10px"
    if "border-radius" not in style:
        style["border-radius"] = "5px"
    if "font-size" not in style:
        style["font-size"] = "14px"

    content = []

    with open(legend_template) as f:
        lines = f.readlines()

        if draggable:
            # NOTE(review): the index constants (36, 39, 41) and the final
            # content[3:-1] slice are tied to the exact line layout of
            # data/template/legend.txt — verify against the bundled template
            # before changing either.
            for index, line in enumerate(lines):
                if index < 36:
                    content.append(line)
                elif index == 36:
                    line = lines[index].replace("Legend", title)
                    content.append(line)
                elif index < 39:
                    content.append(line)
                elif index == 39:
                    # Inject one <li> per legend entry at the placeholder line.
                    for i, color in enumerate(colors):
                        item = f" <li><span style='background:{check_color(color)};opacity:{opacity};'></span>{labels[i]}</li>\n"
                        content.append(item)
                elif index > 41:
                    content.append(line)
            content = content[3:-1]
        else:
            # Same layout coupling applies to legend_style.html (indices 8,
            # 17, 19, 22, 33).
            for index, line in enumerate(lines):
                if index < 8:
                    content.append(line)
                elif index == 8:
                    # Replace the template's default CSS block with the
                    # computed style dictionary.
                    for key, value in style.items():
                        content.append(
                            " {}: {};\n".format(key.replace("_", "-"), value)
                        )
                elif index < 17:
                    pass
                elif index < 19:
                    content.append(line)
                elif index == 19:
                    content.append(line.replace("Legend", title))
                elif index < 22:
                    content.append(line)
                elif index == 22:
                    # NOTE(review): this inner loop rebinds `index`, shadowing
                    # the outer enumerate variable for the remainder of this
                    # iteration; enumerate reassigns it on the next pass, so
                    # behavior is unaffected, but rename with care.
                    for index, key in enumerate(labels):
                        color = colors[index]
                        if not color.startswith("#"):
                            color = "#" + color
                        item = " <li><span style='background:{};opacity:{};'></span>{}</li>\n".format(
                            color, opacity, key
                        )
                        content.append(item)
                elif index < 33:
                    pass
                else:
                    content.append(line)

    legend_text = "".join(content)
    if output is not None:
        with open(output, "w") as f:
            f.write(legend_text)
    else:
        return legend_text
create_timelapse(images, out_gif, ext='.tif', bands=None, size=None, bbox=None, fps=5, loop=0, add_progress_bar=True, progress_bar_color='blue', progress_bar_height=5, add_text=False, text_xy=None, text_sequence=None, font_type='arial.ttf', font_size=20, font_color='black', mp4=False, quiet=True, reduce_size=False, clean_up=True, **kwargs)
¶
Creates a timelapse gif from a list of images.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
images |
Union[list, str] |
The list of images or input directory to create the gif from. For example, '/path/to/images/*.tif' or ['/path/to/image1.tif', '/path/to/image2.tif', ...] |
required |
out_gif |
str |
File path to the output gif. |
required |
ext |
str |
The extension of the images. Defaults to '.tif'. |
'.tif' |
bands |
Optional[list] |
The bands to use for the gif. For example, [0, 1, 2] for RGB, and [0] for grayscale. Defaults to None. |
None |
size |
Optional[tuple] |
The size of the gif. For example, (500, 500). Defaults to None, using the original size. |
None |
bbox |
Optional[list] |
The bounding box of the gif. For example, [xmin, ymin, xmax, ymax]. Defaults to None, using the original bounding box. |
None |
fps |
int |
The frames per second of the gif. Defaults to 5. |
5 |
loop |
int |
The number of times to loop the gif. Defaults to 0, looping forever. |
0 |
add_progress_bar |
bool |
Whether to add a progress bar to the gif. Defaults to True. |
True |
progress_bar_color |
str |
The color of the progress bar, can be color name or hex code. Defaults to 'blue'. |
'blue' |
progress_bar_height |
int |
The height of the progress bar. Defaults to 5. |
5 |
add_text |
bool |
Whether to add text to the gif. Defaults to False. |
False |
text_xy |
Optional[tuple] |
The x, y coordinates of the text. For example, ('10%', '10%'). Defaults to None, using the bottom left corner. |
None |
text_sequence |
Optional[list] |
The sequence of text to add to the gif. For example, ['year 1', 'year 2', ...]. |
None |
font_type |
str |
The font type of the text, can be 'arial.ttf' or 'alibaba.otf', or any system font. Defaults to 'arial.ttf'. |
'arial.ttf' |
font_size |
int |
The font size of the text. Defaults to 20. |
20 |
font_color |
str |
The color of the text, can be color name or hex code. Defaults to 'black'. |
'black' |
mp4 |
bool |
Whether to convert the gif to mp4. Defaults to False. |
False |
quiet |
bool |
Whether to print the progress. Defaults to True. |
True |
reduce_size |
bool |
Whether to reduce the size of the gif using ffmpeg. Defaults to False. |
False |
clean_up |
bool |
Whether to clean up the temporary files. Defaults to True. |
True |
Source code in leafmap/common.py
def create_timelapse(
    images: Union[list, str],
    out_gif: str,
    ext: str = ".tif",
    bands: Optional[list] = None,
    size: Optional[tuple] = None,
    bbox: Optional[list] = None,
    fps: int = 5,
    loop: int = 0,
    add_progress_bar: bool = True,
    progress_bar_color: str = "blue",
    progress_bar_height: int = 5,
    add_text: bool = False,
    text_xy: Optional[tuple] = None,
    text_sequence: Optional[list] = None,
    font_type: str = "arial.ttf",
    font_size: int = 20,
    font_color: str = "black",
    mp4: bool = False,
    quiet: bool = True,
    reduce_size: bool = False,
    clean_up: bool = True,
    **kwargs,
):
    """Creates a timelapse gif from a list of images.

    Args:
        images (Union[list, str]): The list of images or input directory to create the gif from.
            For example, '/path/to/images/*.tif' or ['/path/to/image1.tif', '/path/to/image2.tif', ...]
        out_gif (str): File path to the output gif.
        ext (str, optional): The extension of the images. Defaults to '.tif'.
        bands (Optional[list], optional): The bands to use for the gif. For example, [0, 1, 2] for RGB, and [0] for grayscale. Defaults to None.
        size (Optional[tuple], optional): The size of the gif. For example, (500, 500). Defaults to None, using the original size.
        bbox (Optional[list], optional): The bounding box of the gif. For example, [xmin, ymin, xmax, ymax]. Defaults to None, using the original bounding box.
        fps (int, optional): The frames per second of the gif. Defaults to 5.
        loop (int, optional): The number of times to loop the gif. Defaults to 0, looping forever.
        add_progress_bar (bool, optional): Whether to add a progress bar to the gif. Defaults to True.
        progress_bar_color (str, optional): The color of the progress bar, can be color name or hex code. Defaults to 'blue'.
        progress_bar_height (int, optional): The height of the progress bar. Defaults to 5.
        add_text (bool, optional): Whether to add text to the gif. Defaults to False.
        text_xy (Optional[tuple], optional): The x, y coordinates of the text. For example, ('10%', '10%').
            Defaults to None, using the bottom left corner.
        text_sequence (Optional[list], optional): The sequence of text to add to the gif. For example, ['year 1', 'year 2', ...].
        font_type (str, optional): The font type of the text, can be 'arial.ttf' or 'alibaba.otf', or any system font. Defaults to 'arial.ttf'.
        font_size (int, optional): The font size of the text. Defaults to 20.
        font_color (str, optional): The color of the text, can be color name or hex code. Defaults to 'black'.
        mp4 (bool, optional): Whether to convert the gif to mp4. Defaults to False.
        quiet (bool, optional): Whether to suppress the per-image progress messages. Defaults to True.
        reduce_size (bool, optional): Whether to reduce the size of the gif using ffmpeg. Defaults to False.
        clean_up (bool, optional): Whether to clean up the temporary files. Defaults to True.
    """
    import glob
    import tempfile

    # A string input is treated as a directory (or glob pattern) and expanded
    # to the list of matching image paths.
    if isinstance(images, str):
        if not images.endswith(ext):
            images = os.path.join(images, f"*{ext}")
        images = list(glob.glob(images))

    if not isinstance(images, list):
        raise ValueError("images must be a list or a path to the image directory.")

    images.sort()

    # Frames are staged in a shared temp folder; make_gif assembles them below.
    temp_dir = os.path.join(tempfile.gettempdir(), "timelapse")
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    if bbox is not None:
        # Clipped copies of each input go to a second temp folder.
        clip_dir = os.path.join(tempfile.gettempdir(), "clip")
        if not os.path.exists(clip_dir):
            os.makedirs(clip_dir)

        # A 4-element bbox is converted to a GeoJSON polygon for clip_image.
        if len(bbox) == 4:
            bbox = bbox_to_geojson(bbox)
    else:
        clip_dir = None

    # Output widget captures stdout/stderr (e.g. GDAL warnings) from helpers.
    output = widgets.Output()

    if "out_ext" in kwargs:
        out_ext = kwargs["out_ext"].lower()
    else:
        out_ext = ".jpg"

    # NOTE(review): the broad except below swallows all errors and only prints
    # them — deliberate best-effort behavior; callers get no exception.
    try:
        for index, image in enumerate(images):
            if bbox is not None:
                clip_file = os.path.join(clip_dir, os.path.basename(image))
                with output:
                    clip_image(image, mask=bbox, output=clip_file, to_cog=False)
                image = clip_file

            if "add_prefix" in kwargs:
                # Zero-padded sequence prefix keeps frames in sorted order.
                basename = (
                    str(f"{index + 1}").zfill(len(str(len(images))))
                    + "-"
                    + os.path.basename(image).replace(ext, out_ext)
                )
            else:
                basename = os.path.basename(image).replace(ext, out_ext)

            if not quiet:
                print(f"Processing {index+1}/{len(images)}: {basename} ...")

            # ignore GDAL warnings
            with output:
                numpy_to_image(
                    image, os.path.join(temp_dir, basename), bands=bands, size=size
                )
        make_gif(
            temp_dir,
            out_gif,
            ext=out_ext,
            fps=fps,
            loop=loop,
            mp4=mp4,
            clean_up=clean_up,
        )

        if clip_dir is not None:
            shutil.rmtree(clip_dir)

        # Text annotation also draws the progress bar, so the two options
        # are mutually exclusive here.
        if add_text:
            add_text_to_gif(
                out_gif,
                out_gif,
                text_xy,
                text_sequence,
                font_type,
                font_size,
                font_color,
                add_progress_bar,
                progress_bar_color,
                progress_bar_height,
                1000 / fps,
                loop,
            )
        elif add_progress_bar:
            add_progress_bar_to_gif(
                out_gif,
                out_gif,
                progress_bar_color,
                progress_bar_height,
                1000 / fps,
                loop,
            )

        if reduce_size:
            reduce_gif_size(out_gif)
    except Exception as e:
        print(e)
csv_points_to_shp(in_csv, out_shp, latitude='latitude', longitude='longitude')
¶
Converts a csv file containing points (latitude, longitude) into a shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv |
str |
File path or HTTP URL to the input csv file. For example, https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv |
required |
out_shp |
str |
File path to the output shapefile. |
required |
latitude |
str |
Column name for the latitude column. Defaults to 'latitude'. |
'latitude' |
longitude |
str |
Column name for the longitude column. Defaults to 'longitude'. |
'longitude' |
Source code in leafmap/common.py
def csv_points_to_shp(in_csv, out_shp, latitude="latitude", longitude="longitude"):
    """Converts a csv file containing points (latitude, longitude) into a shapefile.

    Args:
        in_csv (str): File path or HTTP URL to the input csv file. For example, https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv
        out_shp (str): File path to the output shapefile.
        latitude (str, optional): Column name for the latitude column. Defaults to 'latitude'.
        longitude (str, optional): Column name for the longitude column. Defaults to 'longitude'.
    """
    # Remote CSVs are fetched into the user's Downloads folder first.
    if in_csv.startswith("http") and in_csv.endswith(".csv"):
        download_dir = os.path.join(os.path.expanduser("~"), "Downloads")
        if not os.path.exists(download_dir):
            os.makedirs(download_dir)
        download_from_url(in_csv, out_dir=download_dir)
        in_csv = os.path.join(download_dir, os.path.basename(in_csv))

    wbt = whitebox.WhiteboxTools()
    in_csv = os.path.abspath(in_csv)
    out_shp = os.path.abspath(out_shp)

    if not os.path.exists(in_csv):
        raise Exception("The provided csv file does not exist.")

    # WhiteboxTools addresses the x/y columns by positional index in the CSV
    # header, so resolve the column names to indices first.
    with open(in_csv, encoding="utf-8") as csv_file:
        header = csv.DictReader(csv_file).fieldnames
    xfield = header.index(longitude)
    yfield = header.index(latitude)

    wbt.csv_points_to_vector(in_csv, out_shp, xfield=xfield, yfield=yfield, epsg=4326)
csv_to_df(in_csv, **kwargs)
¶
Converts a CSV file to pandas dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv |
str |
File path to the input CSV. |
required |
Returns:
Type | Description |
---|---|
pd.DataFrame |
pandas DataFrame |
Source code in leafmap/common.py
def csv_to_df(in_csv, **kwargs):
    """Converts a CSV file to a pandas DataFrame.

    Args:
        in_csv (str): File path to the input CSV.
        **kwargs: Additional keyword arguments passed to pd.read_csv().

    Returns:
        pd.DataFrame: pandas DataFrame
    """
    import pandas as pd

    # Previously any failure was re-wrapped in a bare Exception, destroying
    # the original exception type and traceback; let pandas' own error
    # (e.g. FileNotFoundError, ParserError) propagate instead.
    return pd.read_csv(in_csv, **kwargs)
csv_to_gdf(in_csv, latitude='latitude', longitude='longitude', encoding='utf-8')
¶
Creates points for a CSV file and converts them to a GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv |
str |
The file path to the input CSV file. |
required |
latitude |
str |
The name of the column containing latitude coordinates. Defaults to "latitude". |
'latitude' |
longitude |
str |
The name of the column containing longitude coordinates. Defaults to "longitude". |
'longitude' |
encoding |
str |
The encoding of characters. Defaults to "utf-8". |
'utf-8' |
Returns:
Type | Description |
---|---|
object |
GeoDataFrame. |
Source code in leafmap/common.py
def csv_to_gdf(in_csv, latitude="latitude", longitude="longitude", encoding="utf-8"):
    """Creates points for a CSV file and converts them to a GeoDataFrame.

    Args:
        in_csv (str): The file path to the input CSV file.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".

    Returns:
        object: GeoDataFrame.
    """
    check_package(name="geopandas", URL="https://geopandas.org")
    import geopandas as gpd

    # Round-trip through a temporary GeoJSON file in the current working
    # directory, then clean it up once the GeoDataFrame has been loaded.
    tmp_geojson = os.path.join(os.getcwd(), random_string() + ".geojson")
    csv_to_geojson(in_csv, tmp_geojson, latitude, longitude, encoding)
    result = gpd.read_file(tmp_geojson)
    os.remove(tmp_geojson)
    return result
csv_to_geojson(in_csv, out_geojson=None, latitude='latitude', longitude='longitude', encoding='utf-8')
¶
Creates points for a CSV file and exports data as a GeoJSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv |
str |
The file path to the input CSV file. |
required |
out_geojson |
str |
The file path to the exported GeoJSON. Default to None. |
None |
latitude |
str |
The name of the column containing latitude coordinates. Defaults to "latitude". |
'latitude' |
longitude |
str |
The name of the column containing longitude coordinates. Defaults to "longitude". |
'longitude' |
encoding |
str |
The encoding of characters. Defaults to "utf-8". |
'utf-8' |
Source code in leafmap/common.py
def csv_to_geojson(
    in_csv,
    out_geojson=None,
    latitude="latitude",
    longitude="longitude",
    encoding="utf-8",
):
    """Creates points for a CSV file and exports data as a GeoJSON.

    Args:
        in_csv (str): The file path to the input CSV file.
        out_geojson (str): The file path to the exported GeoJSON. Defaults to None.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".
    """
    import pandas as pd

    in_csv = github_raw_url(in_csv)
    if out_geojson is not None:
        out_geojson = check_file_path(out_geojson)

    frame = pd.read_csv(in_csv)
    feature_collection = df_to_geojson(
        frame, latitude=latitude, longitude=longitude, encoding=encoding
    )

    # Without an output path, hand the GeoJSON dictionary back to the caller.
    if out_geojson is None:
        return feature_collection
    with open(out_geojson, "w", encoding=encoding) as f:
        f.write(json.dumps(feature_collection))
csv_to_shp(in_csv, out_shp, latitude='latitude', longitude='longitude', encoding='utf-8')
¶
Converts a csv file with latlon info to a point shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv |
str |
The input csv file containing longitude and latitude columns. |
required |
out_shp |
str |
The file path to the output shapefile. |
required |
latitude |
str |
The column name of the latitude column. Defaults to 'latitude'. |
'latitude' |
longitude |
str |
The column name of the longitude column. Defaults to 'longitude'. |
'longitude' |
Source code in leafmap/common.py
def csv_to_shp(
    in_csv, out_shp, latitude="latitude", longitude="longitude", encoding="utf-8"
):
    """Converts a csv file with latlon info to a point shapefile.

    Args:
        in_csv (str): The input csv file containing longitude and latitude columns.
        out_shp (str): The file path to the output shapefile.
        latitude (str, optional): The column name of the latitude column. Defaults to 'latitude'.
        longitude (str, optional): The column name of the longitude column. Defaults to 'longitude'.
        encoding (str, optional): The encoding of characters in the CSV. Defaults to 'utf-8'.

    Raises:
        Exception: If the CSV cannot be read or the shapefile cannot be written.
    """
    import shapefile as shp

    if in_csv.startswith("http") and in_csv.endswith(".csv"):
        in_csv = github_raw_url(in_csv)
        in_csv = download_file(in_csv, quiet=True, overwrite=True)

    try:
        # Use the Writer as a context manager so the shapefile is flushed and
        # closed even if a row fails to convert (the original never closed it).
        with shp.Writer(out_shp, shapeType=shp.POINT) as points:
            with open(in_csv, encoding=encoding) as csvfile:
                csvreader = csv.DictReader(csvfile)
                header = csvreader.fieldnames
                # Register attribute fields with a plain loop; a list
                # comprehension used only for side effects is an anti-pattern.
                for field in header:
                    points.field(field)
                for row in csvreader:
                    points.point(float(row[longitude]), float(row[latitude]))
                    points.record(*tuple([row[f] for f in header]))

        # Write a companion .prj so the points are tagged as WGS84 lat/lon.
        out_prj = out_shp.replace(".shp", ".prj")
        with open(out_prj, "w") as f:
            prj_str = 'GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137,298.257223563]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]] '
            f.write(prj_str)
    except Exception as e:
        raise Exception(e) from e
csv_to_vector(in_csv, output, latitude='latitude', longitude='longitude', encoding='utf-8', **kwargs)
¶
Creates points for a CSV file and converts them to a vector dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv |
str |
The file path to the input CSV file. |
required |
output |
str |
The file path to the output vector dataset. |
required |
latitude |
str |
The name of the column containing latitude coordinates. Defaults to "latitude". |
'latitude' |
longitude |
str |
The name of the column containing longitude coordinates. Defaults to "longitude". |
'longitude' |
encoding |
str |
The encoding of characters. Defaults to "utf-8". |
'utf-8' |
Source code in leafmap/common.py
def csv_to_vector(
    in_csv,
    output,
    latitude="latitude",
    longitude="longitude",
    encoding="utf-8",
    **kwargs,
):
    """Creates points for a CSV file and converts them to a vector dataset.

    Args:
        in_csv (str): The file path to the input CSV file.
        output (str): The file path to the output vector dataset.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".
        **kwargs: Additional keyword arguments passed to GeoDataFrame.to_file().
    """
    # Build an in-memory GeoDataFrame first, then let geopandas infer the
    # output driver from the file extension.
    frame = csv_to_gdf(in_csv, latitude, longitude, encoding)
    frame.to_file(output, **kwargs)
delete_shp(in_shp, verbose=False)
¶
Deletes a shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_shp |
str |
The input shapefile to delete. |
required |
verbose |
bool |
Whether to print out descriptive text. Defaults to False. |
False |
Source code in leafmap/common.py
def delete_shp(in_shp, verbose=False):
    """Deletes a shapefile and all of its sidecar files (.dbf, .shx, .prj, etc.).

    Args:
        in_shp (str): The input shapefile to delete.
        verbose (bool, optional): Whether to print out descriptive text. Defaults to False.
    """
    from pathlib import Path

    in_shp = os.path.abspath(in_shp)
    in_dir = os.path.dirname(in_shp)
    basename = os.path.basename(in_shp).replace(".shp", "")

    # Remove every file sharing the shapefile's basename (e.g. .dbf, .shx,
    # .prj). rglob searches recursively, matching the original behavior.
    for file in Path(in_dir).rglob(basename + ".*"):
        filepath = os.path.join(in_dir, str(file))
        os.remove(filepath)
        if verbose:
            print(f"Deleted {filepath}")
df_to_geojson(df, out_geojson=None, latitude='latitude', longitude='longitude', encoding='utf-8')
¶
Creates points for a Pandas DataFrame and exports data as a GeoJSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
pandas.DataFrame |
The input Pandas DataFrame. |
required |
out_geojson |
str |
The file path to the exported GeoJSON. Default to None. |
None |
latitude |
str |
The name of the column containing latitude coordinates. Defaults to "latitude". |
'latitude' |
longitude |
str |
The name of the column containing longitude coordinates. Defaults to "longitude". |
'longitude' |
encoding |
str |
The encoding of characters. Defaults to "utf-8". |
'utf-8' |
Source code in leafmap/common.py
def df_to_geojson(
    df,
    out_geojson=None,
    latitude="latitude",
    longitude="longitude",
    encoding="utf-8",
):
    """Creates points for a Pandas DataFrame and exports data as a GeoJSON.

    Args:
        df (pandas.DataFrame): The input Pandas DataFrame.
        out_geojson (str): The file path to the exported GeoJSON. Defaults to None.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".
    """
    import json

    from geojson import Feature, FeatureCollection, Point

    if out_geojson is not None:
        parent = os.path.dirname(os.path.abspath(out_geojson))
        if not os.path.exists(parent):
            os.makedirs(parent)

    def _row_to_feature(row):
        # GeoJSON coordinates are (x, y), i.e. longitude first.
        location = Point((float(row[longitude]), float(row[latitude])))
        return Feature(geometry=location, properties=dict(row))

    collection = FeatureCollection(
        features=df.apply(_row_to_feature, axis=1).tolist()
    )

    # Return the dictionary when no output path was requested.
    if out_geojson is None:
        return collection
    with open(out_geojson, "w", encoding=encoding) as f:
        f.write(json.dumps(collection))
dict_to_json(data, file_path, indent=4)
¶
Writes a dictionary to a JSON file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
dict |
A dictionary. |
required |
file_path |
str |
The path to the JSON file. |
required |
indent |
int |
The indentation of the JSON file. Defaults to 4. |
4 |
Exceptions:
Type | Description |
---|---|
TypeError |
If the input data is not a dictionary. |
Source code in leafmap/common.py
def dict_to_json(data, file_path, indent=4):
    """Writes a dictionary to a JSON file.

    Args:
        data (dict): A dictionary.
        file_path (str): The path to the JSON file.
        indent (int, optional): The indentation of the JSON file. Defaults to 4.

    Raises:
        TypeError: If the input data is not a dictionary.
    """
    import json

    # Validate the payload before touching the filesystem: check_file_path may
    # create parent directories, which should not happen for invalid input.
    if not isinstance(data, dict):
        raise TypeError("The provided data must be a dictionary.")

    file_path = check_file_path(file_path)
    with open(file_path, "w") as f:
        json.dump(data, f, indent=indent)
disjoint(input_features, selecting_features, output=None, **kwargs)
¶
Find the features in the input_features that do not intersect the selecting_features.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_features |
str | GeoDataFrame |
The input features to select from. Can be a file path or a GeoDataFrame. |
required |
selecting_features |
str | GeoDataFrame |
The features in the Input Features parameter will be selected based on their relationship to the features from this layer. |
required |
output |
str |
The output path to save the GeoDataFrame in a vector format (e.g., shapefile). Defaults to None. |
None |
Returns:
Type | Description |
---|---|
str | GeoDataFrame |
The path to the output file or the GeoDataFrame. |
Source code in leafmap/common.py
def disjoint(input_features, selecting_features, output=None, **kwargs):
    """Find the features in the input_features that do not intersect the selecting_features.

    Args:
        input_features (str | GeoDataFrame): The input features to select from. Can be a file path or a GeoDataFrame.
        selecting_features (str | GeoDataFrame): The features in the Input Features parameter will be selected based
            on their relationship to the features from this layer.
        output (str, optional): The output path to save the GeoDataFrame in a vector format (e.g., shapefile). Defaults to None.

    Raises:
        TypeError: If input_features or selecting_features is neither a file path nor a GeoDataFrame.

    Returns:
        str | GeoDataFrame: The path to the output file or the GeoDataFrame.
    """
    import geopandas as gpd

    if isinstance(input_features, str):
        input_features = gpd.read_file(input_features, **kwargs)
    elif isinstance(input_features, gpd.GeoDataFrame):
        # Work on a copy so the temporary "savedindex" column added below does
        # not mutate the caller's GeoDataFrame.
        input_features = input_features.copy()
    else:
        raise TypeError("input_features must be a file path or a GeoDataFrame")

    if isinstance(selecting_features, str):
        selecting_features = gpd.read_file(selecting_features, **kwargs)
    elif not isinstance(selecting_features, gpd.GeoDataFrame):
        raise TypeError("selecting_features must be a file path or a GeoDataFrame")

    # Reproject the selecting layer so the spatial join runs in a common CRS.
    selecting_features = selecting_features.to_crs(input_features.crs)

    input_features["savedindex"] = input_features.index
    intersecting = selecting_features.sjoin(input_features, how="inner")["savedindex"]
    results = input_features[~input_features.savedindex.isin(intersecting)].drop(
        columns=["savedindex"]
    )

    if output is not None:
        results.to_file(output, **kwargs)
    else:
        return results
display_html(filename, width='100%', height='600px', **kwargs)
¶
Show an HTML file in a Jupyter notebook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
The path to the HTML file. |
required |
width |
str |
The width of the HTML file. Defaults to "100%". |
'100%' |
height |
str |
The height of the HTML file. Defaults to "600px". |
'600px' |
Returns:
Type | Description |
---|---|
IFrame |
An IFrame object. |
Source code in leafmap/common.py
def display_html(filename, width="100%", height="600px", **kwargs):
    """Show an HTML file in a Jupyter notebook.

    Args:
        filename (str): The path to the HTML file.
        width (str, optional): The width of the HTML file. Defaults to "100%".
        height (str, optional): The height of the HTML file. Defaults to "600px".

    Raises:
        Exception: If the file does not exist.

    Returns:
        IFrame: An IFrame object.
    """
    # Validate the path before importing IPython so a clear error is raised
    # even outside a notebook environment. The original f-string never
    # interpolated the filename; include it in the message.
    if not os.path.exists(filename):
        raise Exception(f"File {filename} does not exist")

    from IPython.display import IFrame

    return IFrame(filename, width=width, height=height, **kwargs)
download_file(url=None, output=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False)
¶
Download a file from URL, including Google Drive shared URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
Google Drive URL is also supported. Defaults to None. |
None |
output |
str |
Output filename. Default is basename of URL. |
None |
quiet |
bool |
Suppress terminal output. Default is False. |
False |
proxy |
str |
Proxy. Defaults to None. |
None |
speed |
float |
Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. |
None |
use_cookies |
bool |
Flag to use cookies. Defaults to True. |
True |
verify |
bool | str |
Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True. |
True |
id |
str |
Google Drive's file ID. Defaults to None. |
None |
fuzzy |
bool |
Fuzzy extraction of Google Drive's file Id. Defaults to False. |
False |
resume |
bool |
Resume the download from existing tmp file if possible. Defaults to False. |
False |
unzip |
bool |
Unzip the file. Defaults to True. |
True |
overwrite |
bool |
Overwrite the file if it already exists. Defaults to False. |
False |
subfolder |
bool |
Create a subfolder with the same name as the file. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
str |
The output file path. |
Source code in leafmap/common.py
def download_file(
    url=None,
    output=None,
    quiet=False,
    proxy=None,
    speed=None,
    use_cookies=True,
    verify=True,
    id=None,
    fuzzy=False,
    resume=False,
    unzip=True,
    overwrite=False,
    subfolder=False,
):
    """Download a file from URL, including Google Drive shared URL.

    Args:
        url (str, optional): Google Drive URL is also supported. Defaults to None.
        output (str, optional): Output filename. Default is basename of URL.
        quiet (bool, optional): Suppress terminal output. Default is False.
        proxy (str, optional): Proxy. Defaults to None.
        speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
        use_cookies (bool, optional): Flag to use cookies. Defaults to True.
        verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate
            is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True.
        id (str, optional): Google Drive's file ID. Defaults to None.
        fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
        resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
        unzip (bool, optional): Unzip the file. Defaults to True.
        overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
        subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.

    Returns:
        str: The output file path, or None if the download failed.
    """
    try:
        import gdown
    except ImportError:
        print(
            "The gdown package is required for this function. Use `pip install gdown` to install it."
        )
        return

    if output is None and isinstance(url, str) and url.startswith("http"):
        output = os.path.basename(url)

    # output can legitimately still be None when downloading by Google Drive
    # id; the original crashed here on os.path.dirname(None).
    if output is not None:
        out_dir = os.path.abspath(os.path.dirname(output))
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)

    if isinstance(url, str):
        if (
            output is not None
            and os.path.exists(os.path.abspath(output))
            and (not overwrite)
        ):
            print(
                f"{output} already exists. Skip downloading. Set overwrite=True to overwrite."
            )
            return os.path.abspath(output)
        else:
            url = github_raw_url(url)
            if "https://drive.google.com/file/d/" in url:
                fuzzy = True

    # Keyword arguments guard against positional drift in gdown's signature.
    output = gdown.download(
        url,
        output,
        quiet=quiet,
        proxy=proxy,
        speed=speed,
        use_cookies=use_cookies,
        verify=verify,
        id=id,
        fuzzy=fuzzy,
        resume=resume,
    )
    if output is None:
        # gdown returns None when the download fails.
        return None

    if unzip and output.endswith(".zip"):
        out_dir = os.path.abspath(os.path.dirname(output))
        with zipfile.ZipFile(output, "r") as zip_ref:
            if not quiet:
                print("Extracting files...")
            if subfolder:
                basename = os.path.splitext(os.path.basename(output))[0]
                # Extract into a directory named after the archive.
                output = os.path.join(out_dir, basename)
                if not os.path.exists(output):
                    os.makedirs(output)
                zip_ref.extractall(output)
            else:
                zip_ref.extractall(os.path.dirname(output))

    return os.path.abspath(output)
download_file_lite(url, output=None, binary=False, overwrite=False, **kwargs)
async
¶
Download a file using Pyodide. This function is only available on JupyterLite. Call the function with await, such as await download_file_lite(url).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL of the file. |
required |
output |
str |
The local path to save the file. Defaults to None. |
None |
binary |
bool |
Whether the file is binary. Defaults to False. |
False |
overwrite |
bool |
Whether to overwrite the file if it exists. Defaults to False. |
False |
Source code in leafmap/common.py
async def download_file_lite(url, output=None, binary=False, overwrite=False, **kwargs):
    """Download a file using Pyodide. This function is only available on JupyterLite.
    Call the function with await, such as await download_file_lite(url).

    Args:
        url (str): The URL of the file.
        output (str, optional): The local path to save the file. Defaults to None.
        binary (bool, optional): Whether the file is binary. Defaults to False.
        overwrite (bool, optional): Whether to overwrite the file if it exists. Defaults to False.

    Raises:
        ValueError: If not running under Pyodide.
    """
    import sys

    # Check for Pyodide BEFORE importing it; the original imported first, so
    # outside JupyterLite it raised ImportError and the guard was dead code.
    if "pyodide" not in sys.modules:
        raise ValueError("Pyodide is not available.")

    import pyodide

    if output is None:
        output = os.path.basename(url)
    output = os.path.abspath(output)

    # os.path.splitext keeps the leading dot; the original list mixed dotted
    # and dotless entries ("jpg", "zip", ...) that could never match.
    ext = os.path.splitext(output)[1]
    if ext in [".png", ".jpg", ".tif", ".tiff", ".zip", ".gz", ".bz2", ".xz"]:
        binary = True

    if os.path.exists(output) and not overwrite:
        print(f"{output} already exists, skip downloading.")
        return output

    if binary:
        response = await pyodide.http.pyfetch(url)
        with open(output, "wb") as f:
            f.write(await response.bytes())
    else:
        obj = pyodide.http.open_url(url)
        with open(output, "w") as fd:
            shutil.copyfileobj(obj, fd)

    return output
download_files(urls, out_dir=None, filenames=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False)
¶
Download files from URLs, including Google Drive shared URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
urls |
list |
The list of urls to download. Google Drive URL is also supported. |
required |
out_dir |
str |
The output directory. Defaults to None. |
None |
filenames |
list |
Output filename. Default is basename of URL. |
None |
quiet |
bool |
Suppress terminal output. Default is False. |
False |
proxy |
str |
Proxy. Defaults to None. |
None |
speed |
float |
Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. |
None |
use_cookies |
bool |
Flag to use cookies. Defaults to True. |
True |
verify |
bool | str |
Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True. |
True |
id |
str |
Google Drive's file ID. Defaults to None. |
None |
fuzzy |
bool |
Fuzzy extraction of Google Drive's file Id. Defaults to False. |
False |
resume |
bool |
Resume the download from existing tmp file if possible. Defaults to False. |
False |
unzip |
bool |
Unzip the file. Defaults to True. |
True |
overwrite |
bool |
Overwrite the file if it already exists. Defaults to False. |
False |
subfolder |
bool |
Create a subfolder with the same name as the file. Defaults to False. |
False |
Source code in leafmap/common.py
def download_files(
    urls,
    out_dir=None,
    filenames=None,
    quiet=False,
    proxy=None,
    speed=None,
    use_cookies=True,
    verify=True,
    id=None,
    fuzzy=False,
    resume=False,
    unzip=True,
    overwrite=False,
    subfolder=False,
):
    """Download files from URLs, including Google Drive shared URL.

    Args:
        urls (list): The list of urls to download. Google Drive URL is also supported.
        out_dir (str, optional): The output directory. Defaults to None.
        filenames (list, optional): Output filename. Default is basename of URL.
        quiet (bool, optional): Suppress terminal output. Default is False.
        proxy (str, optional): Proxy. Defaults to None.
        speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
        use_cookies (bool, optional): Flag to use cookies. Defaults to True.
        verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate
            is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True.
        id (str, optional): Google Drive's file ID. Defaults to None.
        fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
        resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
        unzip (bool, optional): Unzip the file. Defaults to True.
        overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
        subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.
    """
    if out_dir is None:
        out_dir = os.getcwd()
    if filenames is None:
        filenames = [None] * len(urls)

    for url, name in zip(urls, filenames):
        # Fall back to the URL basename when no filename was supplied.
        if name is None:
            name = os.path.basename(url)
        target = os.path.join(out_dir, name)
        download_file(
            url,
            target,
            quiet,
            proxy,
            speed,
            use_cookies,
            verify,
            id,
            fuzzy,
            resume,
            unzip,
            overwrite,
            subfolder,
        )
download_folder(url=None, id=None, output=None, quiet=False, proxy=None, speed=None, use_cookies=True, remaining_ok=False)
¶
Downloads the entire folder from URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
URL of the Google Drive folder. Must be of the format 'https://drive.google.com/drive/folders/{url}'. Defaults to None. |
None |
id |
str |
Google Drive's folder ID. Defaults to None. |
None |
output |
str |
String containing the path of the output folder. Defaults to current working directory. |
None |
quiet |
bool |
Suppress terminal output. Defaults to False. |
False |
proxy |
str |
Proxy. Defaults to None. |
None |
speed |
float |
Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. |
None |
use_cookies |
bool |
Flag to use cookies. Defaults to True. |
True |
remaining_ok |
bool |
Whether it is acceptable to download only the files gdown can list when the folder contains more files than can be retrieved at once. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
list |
List of files downloaded, or None if failed. |
Source code in leafmap/common.py
def download_folder(
    url=None,
    id=None,
    output=None,
    quiet=False,
    proxy=None,
    speed=None,
    use_cookies=True,
    remaining_ok=False,
):
    """Downloads the entire folder from URL.

    Args:
        url (str, optional): URL of the Google Drive folder. Must be of the format
            'https://drive.google.com/drive/folders/{url}'. Defaults to None.
        id (str, optional): Google Drive's folder ID. Defaults to None.
        output (str, optional): String containing the path of the output folder. Defaults to current working directory.
        quiet (bool, optional): Suppress terminal output. Defaults to False.
        proxy (str, optional): Proxy. Defaults to None.
        speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
        use_cookies (bool, optional): Flag to use cookies. Defaults to True.
        remaining_ok (bool, optional): Whether it is acceptable to download only the files gdown can
            list when the folder contains more files than can be retrieved at once. Defaults to False.

    Returns:
        list: List of files downloaded, or None if failed.
    """
    try:
        import gdown
    except ImportError:
        print(
            "The gdown package is required for this function. Use `pip install gdown` to install it."
        )
        return

    files = gdown.download_folder(
        url, id, output, quiet, proxy, speed, use_cookies, remaining_ok
    )
    return files
download_from_gdrive(gfile_url, file_name, out_dir='.', unzip=True, verbose=True)
¶
Download a file shared via Google Drive (e.g., https://drive.google.com/file/d/18SUo_HcDGltuWYZs1s7PpOmOq_FvFn04/view?usp=sharing)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gfile_url |
str |
The Google Drive shared file URL |
required |
file_name |
str |
The output file name to use. |
required |
out_dir |
str |
The output directory. Defaults to '.'. |
'.' |
unzip |
bool |
Whether to unzip the output file if it is a zip file. Defaults to True. |
True |
verbose |
bool |
Whether to display or not the output of the function |
True |
Source code in leafmap/common.py
def download_from_gdrive(gfile_url, file_name, out_dir=".", unzip=True, verbose=True):
    """Download a file shared via Google Drive
    (e.g., https://drive.google.com/file/d/18SUo_HcDGltuWYZs1s7PpOmOq_FvFn04/view?usp=sharing)

    Args:
        gfile_url (str): The Google Drive shared file URL
        file_name (str): The output file name to use.
        out_dir (str, optional): The output directory. Defaults to '.'.
        unzip (bool, optional): Whether to unzip the output file if it is a zip file. Defaults to True.
        verbose (bool, optional): Whether to display or not the output of the function
    """
    try:
        from google_drive_downloader import GoogleDriveDownloader as gdd
    except ImportError:
        raise ImportError(
            'Please install googledrivedownloader using "pip install googledrivedownloader"'
        )

    # The file id is the 6th path segment of a shared-link URL:
    # https://drive.google.com/file/d/<id>/view?...
    file_id = gfile_url.split("/")[5]
    if verbose:
        print("Google Drive file id: {}".format(file_id))

    out_dir = check_dir(out_dir)
    destination = os.path.join(out_dir, file_name)
    gdd.download_file_from_google_drive(file_id, destination, True, unzip)
download_from_url(url, out_file_name=None, out_dir='.', unzip=True, verbose=True)
¶
Download a file from a URL (e.g., https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The HTTP URL to download. |
required |
out_file_name |
str |
The output file name to use. Defaults to None. |
None |
out_dir |
str |
The output directory to use. Defaults to '.'. |
'.' |
unzip |
bool |
Whether to unzip the downloaded file if it is a zip file. Defaults to True. |
True |
verbose |
bool |
Whether to display or not the output of the function |
True |
Source code in leafmap/common.py
def download_from_url(url, out_file_name=None, out_dir=".", unzip=True, verbose=True):
    """Download a file from a URL (e.g., https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip)

    Args:
        url (str): The HTTP URL to download.
        out_file_name (str, optional): The output file name to use. Defaults to None.
        out_dir (str, optional): The output directory to use. Defaults to '.'.
        unzip (bool, optional): Whether to unzip the downloaded file if it is a zip file. Defaults to True.
        verbose (bool, optional): Whether to display or not the output of the function

    Raises:
        Exception: If the URL cannot be retrieved.
    """
    in_file_name = os.path.basename(url)
    out_dir = check_dir(out_dir)

    if out_file_name is None:
        out_file_name = in_file_name
    out_file_path = os.path.join(out_dir, out_file_name)

    if verbose:
        print("Downloading {} ...".format(url))

    try:
        urllib.request.urlretrieve(url, out_file_path)
    except Exception as e:
        raise Exception("The URL is invalid. Please double check the URL.") from e

    final_path = out_file_path

    if unzip:
        # if it is a zip file
        if ".zip" in out_file_name:
            if verbose:
                print("Unzipping {} ...".format(out_file_name))
            with zipfile.ZipFile(out_file_path, "r") as zip_ref:
                zip_ref.extractall(out_dir)
            final_path = os.path.join(
                os.path.abspath(out_dir), out_file_name.replace(".zip", "")
            )

        # if it is a tar file
        if ".tar" in out_file_name:
            if verbose:
                print("Unzipping {} ...".format(out_file_name))
            # Open the archive once (the original opened it twice, nested) and
            # guard against path traversal (CVE-2007-4559) before extracting.
            with tarfile.open(out_file_path, "r") as tar_ref:

                def is_within_directory(directory, target):
                    abs_directory = os.path.abspath(directory)
                    abs_target = os.path.abspath(target)
                    prefix = os.path.commonprefix([abs_directory, abs_target])
                    return prefix == abs_directory

                def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
                    for member in tar.getmembers():
                        member_path = os.path.join(path, member.name)
                        if not is_within_directory(path, member_path):
                            raise Exception("Attempted Path Traversal in Tar File")
                    tar.extractall(path, members, numeric_owner=numeric_owner)

                safe_extract(tar_ref, out_dir)

            # Fix of the original ".tart" typo, which made the replace a no-op.
            final_path = os.path.join(
                os.path.abspath(out_dir), out_file_name.replace(".tar", "")
            )

    if verbose:
        print("Data downloaded to: {}".format(final_path))
download_ned(region, out_dir=None, return_url=False, download_args={}, geopandas_args={}, query={})
¶
Download the US National Elevation Datasets (NED) for a region.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region |
str | list |
A filepath to a vector dataset or a list of bounds in the form of [minx, miny, maxx, maxy]. |
required |
out_dir |
str |
The directory to download the files to. Defaults to None, which uses the current working directory. |
None |
return_url |
bool |
Whether to return the download URLs of the files. Defaults to False. |
False |
download_args |
dict |
A dictionary of arguments to pass to the download_file function. Defaults to {}. |
{} |
geopandas_args |
dict |
A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath. |
{} |
query |
dict |
A dictionary of arguments to pass to the The_national_map_USGS.find_details() function. See https://apps.nationalmap.gov/tnmaccess/#/product for more information. |
{} |
Returns:
Type | Description |
---|---|
list |
A list of the download URLs of the files if return_url is True. |
Source code in leafmap/common.py
def download_ned(
    region,
    out_dir=None,
    return_url=False,
    download_args={},
    geopandas_args={},
    query={},
) -> Union[None, List]:
    """Download the US National Elevation Datasets (NED) for a region.

    Args:
        region (str | list): A filepath to a vector dataset or a list of bounds in the form of [minx, miny, maxx, maxy].
        out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
        return_url (bool, optional): Whether to return the download URLs of the files. Defaults to False.
        download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
        geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
            Used for reading a region URL|filepath.
        query (dict, optional): A dictionary of arguments to pass to the The_national_map_USGS.find_details() function.
            See https://apps.nationalmap.gov/tnmaccess/#/product for more information.

    Returns:
        list: A list of the download URLs of the files if return_url is True.
    """
    # Skip entirely when building documentation with mkdocs.
    if os.environ.get("USE_MKDOCS") is not None:
        return

    if not query:
        # Sensible defaults: 1/3 arc-second NED tiles delivered as GeoTIFF.
        query = {
            "datasets": "National Elevation Dataset (NED) 1/3 arc-second",
            "prodFormats": "GeoTIFF",
        }

    tnm = The_national_map_USGS()

    if return_url:
        return tnm.find_tiles(region=region, geopandas_args=geopandas_args, API=query)

    return tnm.download_tiles(
        region=region,
        out_dir=out_dir,
        download_args=download_args,
        geopandas_args=geopandas_args,
        API=query,
    )
download_tnm(region=None, out_dir=None, return_url=False, download_args={}, geopandas_args={}, API={})
¶
Download datasets from The National Map (TNM) for a region.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region |
str | list |
An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy]. Alternatively you could use API parameters such as polygon or bbox. |
None |
out_dir |
str |
The directory to download the files to. Defaults to None, which uses the current working directory. |
None |
return_url |
bool |
Whether to return the download URLs of the files. Defaults to False. |
False |
download_args |
dict |
A dictionary of arguments to pass to the download_file function. Defaults to {}. |
{} |
geopandas_args |
dict |
A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath. |
{} |
API |
dict |
A dictionary of arguments to pass to the The_national_map_USGS.find_details() function. Exposes most of the documented API. Defaults to {} |
{} |
Returns:
Type | Description |
---|---|
list |
A list of the download URLs of the files if return_url is True. |
Source code in leafmap/common.py
def download_tnm(
    region=None,
    out_dir=None,
    return_url=False,
    download_args=None,
    geopandas_args=None,
    API=None,
) -> Union[None, List]:
    """Download datasets from The National Map (TNM) for a region.

    Args:
        region (str | list, optional): An URL|filepath to a vector dataset Or a list of bounds in the form of [minx, miny, maxx, maxy].
            Alternatively you could use API parameters such as polygon or bbox.
        out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
        return_url (bool, optional): Whether to return the download URLs of the files. Defaults to False.
        download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
        geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
            Used for reading a region URL|filepath.
        API (dict, optional): A dictionary of arguments to pass to the The_national_map_USGS.find_details() function.
            Exposes most of the documented API. Defaults to {}

    Returns:
        list: A list of the download URLs of the files if return_url is True.
    """
    # Short-circuit during documentation builds so no network access happens.
    if os.environ.get("USE_MKDOCS") is not None:
        return

    # Normalize None to fresh dicts; mutable default arguments are shared
    # across calls and are a classic Python pitfall.
    download_args = {} if download_args is None else download_args
    geopandas_args = {} if geopandas_args is None else geopandas_args
    API = {} if API is None else API

    TNM = The_national_map_USGS()
    if return_url:
        return TNM.find_tiles(region=region, geopandas_args=geopandas_args, API=API)
    return TNM.download_tiles(
        region=region,
        out_dir=out_dir,
        download_args=download_args,
        geopandas_args=geopandas_args,
        API=API,
    )
edit_download_html(htmlWidget, filename, title='Click here to download: ')
¶
Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578#issuecomment-617668058
Parameters:
Name | Type | Description | Default |
---|---|---|---|
htmlWidget |
object |
The HTML widget to display the URL. |
required |
filename |
str |
File path to download. |
required |
title |
str |
Download description. Defaults to "Click here to download: ". |
'Click here to download: ' |
Source code in leafmap/common.py
def edit_download_html(htmlWidget, filename, title="Click here to download: "):
    """Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578#issuecomment-617668058

    Args:
        htmlWidget (object): The HTML widget to display the URL.
        filename (str): File path to download.
        title (str, optional): Download description. Defaults to "Click here to download: ".
    """
    import base64

    # Change widget html temporarily to a font-awesome spinner
    htmlWidget.value = '<i class="fa fa-spinner fa-spin fa-2x fa-fw"></i><span class="sr-only">Loading...</span>'

    # Read and base64-encode the file; the context manager guarantees the
    # file handle is closed even if reading fails.
    with open(filename, "rb") as f:
        data = f.read()
    payload = base64.b64encode(data).decode()

    basename = os.path.basename(filename)

    # Create and assign html to widget. The {filename} placeholder feeds the
    # anchor's "download" attribute (the suggested save-as name); the
    # format() call below already passed filename= but the template had
    # lost the placeholder.
    html = '<a download="{filename}" href="data:text/csv;base64,{payload}" target="_blank">{title}</a>'
    htmlWidget.value = html.format(
        payload=payload, title=title + basename, filename=basename
    )
explode(coords)
¶
Explode a GeoJSON geometry's coordinates object and yield coordinate tuples. As long as the input is conforming, the type of the geometry doesn't matter. From Fiona 1.4.8
Parameters:
Name | Type | Description | Default |
---|---|---|---|
coords |
list |
A list of coordinates. |
required |
Yields:
Type | Description |
---|---|
[type] |
[description] |
Source code in leafmap/common.py
def explode(coords):
    """Explode a GeoJSON geometry's coordinates object and yield
    coordinate tuples. As long as the input is conforming, the type of
    the geometry doesn't matter. From Fiona 1.4.8

    Args:
        coords (list): A list of coordinates.

    Yields:
        [type]: [description]
    """
    for element in coords:
        if isinstance(element, (float, int)):
            # A numeric item means *coords* itself is a coordinate pair.
            yield coords
            return
        # Otherwise recurse into the nested sequence.
        yield from explode(element)
filter_bounds(data, bbox, within=False, align=True, **kwargs)
¶
Filters a GeoDataFrame or GeoSeries by a bounding box.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
str | GeoDataFrame |
The input data to filter. Can be a file path or a GeoDataFrame. |
required |
bbox |
list | GeoDataFrame |
The bounding box to filter by. Can be a list of 4 coordinates or a file path or a GeoDataFrame. |
required |
within |
bool |
Whether to filter by the bounding box or the bounding box's interior. Defaults to False. |
False |
align |
bool |
If True, automatically aligns GeoSeries based on their indices. If False, the order of elements is preserved. |
True |
Returns:
Type | Description |
---|---|
GeoDataFrame |
The filtered data. |
Source code in leafmap/common.py
def filter_bounds(data, bbox, within=False, align=True, **kwargs):
    """Filters a GeoDataFrame or GeoSeries by a bounding box.

    Args:
        data (str | GeoDataFrame): The input data to filter. Can be a file path or a GeoDataFrame.
        bbox (list | GeoDataFrame): The bounding box to filter by. Can be a list of 4 coordinates or a file path or a GeoDataFrame.
        within (bool, optional): Whether to filter by the bounding box or the bounding box's interior. Defaults to False.
        align (bool, optional): If True, automatically aligns GeoSeries based on their indices. If False, the order of elements is preserved.

    Returns:
        GeoDataFrame: The filtered data.
    """
    import geopandas as gpd

    # Normalize *data* to a GeoDataFrame/GeoSeries.
    if isinstance(data, str):
        data = gpd.read_file(data, **kwargs)
    elif not isinstance(data, (gpd.GeoDataFrame, gpd.GeoSeries)):
        raise TypeError("data must be a file path or a GeoDataFrame or GeoSeries")

    # Normalize *bbox* to a GeoDataFrame.
    if isinstance(bbox, list):
        if len(bbox) != 4:
            raise ValueError("bbox must be a list of 4 coordinates")
        bbox = bbox_to_gdf(bbox)
    elif isinstance(bbox, str):
        bbox = gpd.read_file(bbox, **kwargs)

    # Dissolve the filter geometry once, then apply the chosen predicate.
    footprint = bbox.unary_union
    predicate = data.within if within else data.intersects
    return data[predicate(footprint, align=align)]
filter_date(data, start_date=None, end_date=None, date_field='date', date_args={}, **kwargs)
¶
Filters a DataFrame, GeoDataFrame or GeoSeries by a date range.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
str | DataFrame | GeoDataFrame |
The input data to filter. Can be a file path or a DataFrame or GeoDataFrame. |
required |
start_date |
str |
The start date, e.g., 2023-01-01. Defaults to None. |
None |
end_date |
str |
The end date, e.g., 2023-12-31. Defaults to None. |
None |
date_field |
str |
The name of the date field. Defaults to "date". |
'date' |
date_args |
dict |
Additional arguments for pd.to_datetime. Defaults to {}. |
{} |
Returns:
Type | Description |
---|---|
DataFrame |
The filtered data. |
Source code in leafmap/common.py
def filter_date(
    data, start_date=None, end_date=None, date_field="date", date_args=None, **kwargs
):
    """Filters a DataFrame, GeoDataFrame or GeoSeries by a date range.

    Args:
        data (str | DataFrame | GeoDataFrame): The input data to filter. Can be a file path or a DataFrame or GeoDataFrame.
        start_date (str, optional): The start date, e.g., 2023-01-01. Defaults to None.
        end_date (str, optional): The end date, e.g., 2023-12-31. Defaults to None.
        date_field (str, optional): The name of the date field. Defaults to "date".
        date_args (dict, optional): Additional arguments for pd.to_datetime. Defaults to {}.

    Raises:
        TypeError: If data is not a file path, DataFrame, or GeoDataFrame.
        ValueError: If date_field is not a column of data.

    Returns:
        DataFrame: The filtered data.
    """
    import datetime
    import pandas as pd

    # Avoid a shared mutable default argument.
    if date_args is None:
        date_args = {}

    if isinstance(data, str):
        # geopandas is only needed when reading from a file path.
        import geopandas as gpd

        data = gpd.read_file(data, **kwargs)
    elif not isinstance(data, (pd.DataFrame, pd.Series)):
        # GeoDataFrame/GeoSeries subclass DataFrame/Series, so both pass.
        raise TypeError("data must be a file path or a GeoDataFrame or GeoSeries")

    if date_field not in data.columns:
        raise ValueError(f"date_field must be one of {data.columns}")

    # Parse dates into a standalone Series instead of writing a temporary
    # column, so the caller's DataFrame is not mutated as a side effect.
    dates = pd.to_datetime(data[date_field], **date_args)

    if end_date is None:
        end_date = datetime.datetime.now().strftime("%Y-%m-%d")
    if start_date is None:
        start_date = dates.min()

    mask = (dates >= start_date) & (dates <= end_date)
    return data.loc[mask]
find_files(input_dir, ext=None, fullpath=True, recursive=True)
¶
Find files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_dir |
str |
The input directory. |
required |
ext |
str |
The file extension to match. Defaults to None. |
None |
fullpath |
bool |
Whether to return the full path. Defaults to True. |
True |
recursive |
bool |
Whether to search recursively. Defaults to True. |
True |
Returns:
Type | Description |
---|---|
list |
A list of matching files. |
Source code in leafmap/common.py
def find_files(input_dir, ext=None, fullpath=True, recursive=True):
    """Find files in a directory.

    Args:
        input_dir (str): The input directory.
        ext (str, optional): The file extension to match. Defaults to None.
        fullpath (bool, optional): Whether to return the full path. Defaults to True.
        recursive (bool, optional): Whether to search recursively. Defaults to True.

    Returns:
        list: A list of matching files.
    """
    from pathlib import Path

    # Build the glob pattern; extensions are accepted with or without a dot.
    pattern = "*" if ext is None else f"*.{ext.replace('.', '')}"

    # Pick recursive or shallow globbing up front.
    finder = Path(input_dir).rglob if recursive else Path(input_dir).glob

    if fullpath:
        return [str(p) for p in finder(pattern)]
    return [str(p.name) for p in finder(pattern)]
gdf_bounds(gdf, return_geom=False)
¶
Returns the bounding box of a GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdf |
gpd.GeoDataFrame |
A GeoDataFrame. |
required |
return_geom |
bool |
Whether to return the bounding box as a GeoDataFrame. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
list | gpd.GeoDataFrame |
A bounding box in the form of a list (minx, miny, maxx, maxy) or GeoDataFrame. |
Source code in leafmap/common.py
def gdf_bounds(gdf, return_geom=False):
    """Returns the bounding box of a GeoDataFrame.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        return_geom (bool, optional): Whether to return the bounding box as a GeoDataFrame. Defaults to False.

    Returns:
        list | gpd.GeoDataFrame: A bounding box in the form of a list (minx, miny, maxx, maxy) or GeoDataFrame.
    """
    extent = gdf.total_bounds
    # Optionally promote the raw bounds to a one-feature GeoDataFrame.
    return bbox_to_gdf(bbox=extent) if return_geom else extent
gdf_centroid(gdf, return_geom=False)
¶
Returns the centroid of a GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdf |
gpd.GeoDataFrame |
A GeoDataFrame. |
required |
return_geom |
bool |
Whether to return the bounding box as a GeoDataFrame. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
list | gpd.GeoDataFrame |
A bounding box in the form of a list (lon, lat) or GeoDataFrame. |
Source code in leafmap/common.py
def gdf_centroid(gdf, return_geom=False):
    """Returns the centroid of a GeoDataFrame.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        return_geom (bool, optional): Whether to return the centroid as a GeoDataFrame. Defaults to False.

    Returns:
        list | gpd.GeoDataFrame: The centroid in the form of a tuple (lon, lat) or GeoDataFrame.
    """
    import warnings

    # Suppress geopandas' geographic-CRS centroid warning locally instead of
    # disabling warnings process-wide (the previous filterwarnings("ignore")
    # silenced every warning for the rest of the session).
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        centroid = gdf_bounds(gdf, return_geom=True).centroid

    if return_geom:
        return centroid
    return centroid.x[0], centroid.y[0]
gdf_geom_type(gdf, first_only=True)
¶
Returns the geometry type of a GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdf |
gpd.GeoDataFrame |
A GeoDataFrame. |
required |
first_only |
bool |
Whether to return the geometry type of the first feature in the GeoDataFrame. Defaults to True. |
True |
Returns:
Type | Description |
---|---|
str |
The geometry type of the GeoDataFrame, such as Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon. For more info, see https://shapely.readthedocs.io/en/stable/manual.html |
Source code in leafmap/common.py
def gdf_geom_type(gdf, first_only=True):
    """Returns the geometry type of a GeoDataFrame.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        first_only (bool, optional): Whether to return the geometry type of the first feature in the GeoDataFrame. Defaults to True.

    Returns:
        str: The geometry type of the GeoDataFrame, such as Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon.
            For more info, see https://shapely.readthedocs.io/en/stable/manual.html
    """
    geom_types = gdf.geometry.type
    if first_only:
        # Use positional indexing: label-based type[0] raises a KeyError when
        # the GeoDataFrame has a non-default index (e.g. after filtering).
        return geom_types.iloc[0]
    return geom_types
gdf_to_bokeh(gdf)
¶
Function to convert a GeoPandas GeoDataFrame to a Bokeh ColumnDataSource object.
:param: (GeoDataFrame) gdf: GeoPandas GeoDataFrame with polygon(s) under the column name 'geometry.'
:return: ColumnDataSource for Bokeh.
Source code in leafmap/common.py
def gdf_to_bokeh(gdf):
    """
    Function to convert a GeoPandas GeoDataFrame to a Bokeh
    ColumnDataSource object.

    :param: (GeoDataFrame) gdf: GeoPandas GeoDataFrame with polygon(s) under
        the column name 'geometry.'

    :return: ColumnDataSource for Bokeh.
    """
    from bokeh.plotting import ColumnDataSource

    shape_type = gdf_geom_type(gdf)

    # Keep all attribute columns, drop the geometry itself.
    attributes = gdf.drop("geometry", axis=1).copy()

    # Extract per-feature coordinate lists for both axes (mercator=True).
    for coord_type in ("x", "y"):
        attributes[coord_type] = gdf.apply(
            get_geometry_coords,
            geom="geometry",
            coord_type=coord_type,
            shape_type=shape_type,
            mercator=True,
            axis=1,
        )

    return ColumnDataSource(attributes)
gdf_to_df(gdf, drop_geom=True)
¶
Converts a GeoDataFrame to a pandas DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdf |
gpd.GeoDataFrame |
A GeoDataFrame. |
required |
drop_geom |
bool |
Whether to drop the geometry column. Defaults to True. |
True |
Returns:
Type | Description |
---|---|
pd.DataFrame |
A pandas DataFrame containing the GeoDataFrame. |
Source code in leafmap/common.py
def gdf_to_df(gdf, drop_geom=True):
    """Converts a GeoDataFrame to a pandas DataFrame.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        drop_geom (bool, optional): Whether to drop the geometry column. Defaults to True.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the GeoDataFrame.
    """
    import pandas as pd

    # Optionally strip the geometry column before the plain-DataFrame copy.
    source = gdf.drop(columns=["geometry"]) if drop_geom else gdf
    return pd.DataFrame(source)
gdf_to_geojson(gdf, out_geojson=None, epsg=None, tuple_to_list=False, encoding='utf-8')
¶
Converts a GeoDataFrame to GeoJSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gdf |
GeoDataFrame |
A GeoPandas GeoDataFrame. |
required |
out_geojson |
str |
File path to the output GeoJSON. Defaults to None. |
None |
epsg |
str |
An EPSG string, e.g., "4326". Defaults to None. |
None |
tuple_to_list |
bool |
Whether to convert tuples to lists. Defaults to False. |
False |
encoding |
str |
The encoding to use for the GeoJSON. Defaults to "utf-8". |
'utf-8' |
Exceptions:
Type | Description |
---|---|
TypeError |
When the output file extension is incorrect. |
Exception |
When the conversion fails. |
Returns:
Type | Description |
---|---|
dict |
When the out_json is None returns a dict. |
Source code in leafmap/common.py
def gdf_to_geojson(
    gdf, out_geojson=None, epsg=None, tuple_to_list=False, encoding="utf-8"
):
    """Converts a GeoDataFrame to GeoJSON.

    Args:
        gdf (GeoDataFrame): A GeoPandas GeoDataFrame.
        out_geojson (str, optional): File path to the output GeoJSON. Defaults to None.
        epsg (str, optional): An EPSG string, e.g., "4326". Defaults to None.
        tuple_to_list (bool, optional): Whether to convert tuples to lists. Defaults to False.
        encoding (str, optional): The encoding to use for the GeoJSON. Defaults to "utf-8".

    Raises:
        TypeError: When the output file extension is incorrect.

    Returns:
        dict: When the out_geojson is None returns a dict.
    """
    check_package(name="geopandas", URL="https://geopandas.org")

    def listit(t):
        # Recursively convert nested tuples into nested lists.
        return list(map(listit, t)) if isinstance(t, (list, tuple)) else t

    # Reproject only when a target EPSG differs from the current CRS.
    if epsg is not None:
        if gdf.crs is not None and gdf.crs.to_epsg() != epsg:
            gdf = gdf.to_crs(epsg=epsg)

    geojson = gdf.__geo_interface__
    if tuple_to_list:
        for feature in geojson["features"]:
            feature["geometry"]["coordinates"] = listit(
                feature["geometry"]["coordinates"]
            )

    if out_geojson is None:
        return geojson

    ext = os.path.splitext(out_geojson)[1]
    if ext.lower() not in [".json", ".geojson"]:
        raise TypeError(
            "The output file extension must be either .json or .geojson"
        )

    out_dir = os.path.dirname(out_geojson)
    # dirname("") is returned for a bare filename; os.makedirs("") would
    # raise, so only create directories when a path component exists.
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir)
    gdf.to_file(out_geojson, driver="GeoJSON", encoding=encoding)
    # NOTE: the previous blanket ``except Exception as e: raise Exception(e)``
    # destroyed the original exception type and traceback; exceptions now
    # propagate unchanged (still caught by callers handling Exception).
geojson_to_df(in_geojson, encoding='utf-8', drop_geometry=True)
¶
Converts a GeoJSON object to a pandas DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_geojson |
str | dict |
The input GeoJSON file or dict. |
required |
encoding |
str |
The encoding of the GeoJSON object. Defaults to "utf-8". |
'utf-8' |
drop_geometry |
bool |
Whether to drop the geometry column. Defaults to True. |
True |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
If the input GeoJSON file could not be found. |
Returns:
Type | Description |
---|---|
pd.DataFrame |
A pandas DataFrame containing the GeoJSON object. |
Source code in leafmap/common.py
def geojson_to_df(in_geojson, encoding="utf-8", drop_geometry=True):
    """Converts a GeoJSON object to a pandas DataFrame.

    Args:
        in_geojson (str | dict): The input GeoJSON file or dict.
        encoding (str, optional): The encoding of the GeoJSON object. Defaults to "utf-8".
        drop_geometry (bool, optional): Whether to drop the geometry column. Defaults to True.

    Raises:
        FileNotFoundError: If the input GeoJSON file could not be found.
        TypeError: If the input is neither a str nor a dict.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the GeoJSON object.
    """
    import json
    import pandas as pd
    from urllib.request import urlopen

    if isinstance(in_geojson, str):
        if in_geojson.startswith("http"):
            with urlopen(in_geojson) as f:
                data = json.load(f)
        else:
            in_geojson = os.path.abspath(in_geojson)
            if not os.path.exists(in_geojson):
                raise FileNotFoundError("The provided GeoJSON file could not be found.")
            with open(in_geojson, encoding=encoding) as f:
                data = json.load(f)
    elif isinstance(in_geojson, dict):
        data = in_geojson
    else:
        # Previously an unsupported type fell through with *data* unbound,
        # raising a confusing NameError; fail fast with a clear message.
        raise TypeError("in_geojson must be a str or a dict.")

    df = pd.json_normalize(data["features"])
    # json_normalize prefixes property columns with "properties."; strip it.
    df.columns = [col.replace("properties.", "") for col in df.columns]

    if drop_geometry:
        # Drop every column derived from the geometry object.
        df = df[df.columns.drop(list(df.filter(regex="geometry")))]
    return df
geojson_to_gdf(in_geojson, encoding='utf-8', **kwargs)
¶
Converts a GeoJSON object to a geopandas GeoDataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_geojson |
str | dict |
The input GeoJSON file or GeoJSON object as a dict. |
required |
encoding |
str |
The encoding of the GeoJSON object. Defaults to "utf-8". |
'utf-8' |
Returns:
Type | Description |
---|---|
geopandas.GeoDataFrame |
A geopandas GeoDataFrame containing the GeoJSON object. |
Source code in leafmap/common.py
def geojson_to_gdf(in_geojson, encoding="utf-8", **kwargs):
    """Converts a GeoJSON object to a geopandas GeoDataFrame.

    Args:
        in_geojson (str | dict): The input GeoJSON file or GeoJSON object as a dict.
        encoding (str, optional): The encoding of the GeoJSON object. Defaults to "utf-8".

    Returns:
        geopandas.GeoDataFrame: A geopandas GeoDataFrame containing the GeoJSON object.
    """
    import geopandas as gpd

    # A dict is serialized to a temporary file first so geopandas can read it.
    if isinstance(in_geojson, dict):
        tmp = temp_file_path(extension="geojson")
        with open(tmp, "w") as f:
            json.dump(in_geojson, f)
        in_geojson = tmp

    return gpd.read_file(in_geojson, encoding=encoding, **kwargs)
geojson_to_gpkg(in_geojson, out_gpkg, **kwargs)
¶
Converts a GeoJSON object to GeoPackage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_geojson |
str | dict |
The input GeoJSON file or dict. |
required |
out_gpkg |
str |
The output GeoPackage path. |
required |
Source code in leafmap/common.py
def geojson_to_gpkg(in_geojson, out_gpkg, **kwargs):
    """Converts a GeoJSON object to GeoPackage.

    Args:
        in_geojson (str | dict): The input GeoJSON file or dict.
        out_gpkg (str): The output GeoPackage path.
    """
    import geopandas as gpd
    import json

    # Ensure the output carries a .gpkg extension (case-insensitive check).
    if os.path.splitext(out_gpkg)[1].lower() != ".gpkg":
        out_gpkg += ".gpkg"
    out_gpkg = check_file_path(out_gpkg)

    # A dict is serialized to a temporary file first so geopandas can read it.
    if isinstance(in_geojson, dict):
        tmp = temp_file_path(extension="geojson")
        with open(tmp, "w") as f:
            json.dump(in_geojson, f)
        in_geojson = tmp

    # The layer name is derived from the output filename.
    layer_name = os.path.splitext(os.path.basename(out_gpkg))[0]
    gpd.read_file(in_geojson, **kwargs).to_file(
        out_gpkg, layer=layer_name, driver="GPKG"
    )
geojson_to_shp(in_geojson, out_shp, **kwargs)
¶
Converts a GeoJSON object to an ESRI Shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_geojson |
str | dict |
The input GeoJSON file or dict. |
required |
out_shp |
str |
The output shapefile path. |
required |
Source code in leafmap/common.py
def geojson_to_shp(in_geojson, out_shp, **kwargs):
    """Converts a GeoJSON object to an ESRI Shapefile.

    Args:
        in_geojson (str | dict): The input GeoJSON file or dict.
        out_shp (str): The output shapefile path.
    """
    import geopandas as gpd
    import json

    ext = os.path.splitext(out_shp)[1]
    # Compare case-insensitively (consistent with geojson_to_gpkg) so a path
    # like "OUT.SHP" does not get a second ".shp" appended.
    if ext.lower() != ".shp":
        out_shp = out_shp + ".shp"
    out_shp = check_file_path(out_shp)

    # A dict is serialized to a temporary file first so geopandas can read it.
    if isinstance(in_geojson, dict):
        out_file = temp_file_path(extension="geojson")
        with open(out_file, "w") as f:
            json.dump(in_geojson, f)
        in_geojson = out_file

    gdf = gpd.read_file(in_geojson, **kwargs)
    gdf.to_file(out_shp)
geom_type(in_geojson, encoding='utf-8')
¶
Returns the geometry type of a GeoJSON object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_geojson |
dict |
A GeoJSON object. |
required |
encoding |
str |
The encoding of the GeoJSON object. Defaults to "utf-8". |
'utf-8' |
Returns:
Type | Description |
---|---|
str |
The geometry type of the GeoJSON object, such as Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon. For more info, see https://shapely.readthedocs.io/en/stable/manual.html |
Source code in leafmap/common.py
def geom_type(in_geojson, encoding="utf-8"):
    """Returns the geometry type of a GeoJSON object.

    Args:
        in_geojson (dict): A GeoJSON object.
        encoding (str, optional): The encoding of the GeoJSON object. Defaults to "utf-8".

    Raises:
        FileNotFoundError: If the input GeoJSON file could not be found.
        TypeError: If the input is neither a str nor a dict.

    Returns:
        str: The geometry type of the GeoJSON object, such as Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon.
            For more info, see https://shapely.readthedocs.io/en/stable/manual.html
    """
    import json

    if isinstance(in_geojson, str):
        if in_geojson.startswith("http"):
            data = requests.get(in_geojson).json()
        else:
            in_geojson = os.path.abspath(in_geojson)
            if not os.path.exists(in_geojson):
                raise FileNotFoundError(
                    "The provided GeoJSON file could not be found."
                )
            with open(in_geojson, encoding=encoding) as f:
                data = json.load(f)
    elif isinstance(in_geojson, dict):
        data = in_geojson
    else:
        raise TypeError("The input geojson must be a type of str or dict.")

    # The first feature's geometry type is taken as representative.
    # NOTE: the previous blanket ``except Exception as e: raise Exception(e)``
    # wrapper destroyed the exception type/traceback; errors now propagate
    # unchanged (callers catching Exception still catch them all).
    return data["features"][0]["geometry"]["type"]
geometry_bounds(geometry, decimals=4)
¶
Returns the bounds of a geometry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geometry |
dict |
A GeoJSON geometry. |
required |
decimals |
int |
The number of decimal places to round the bounds to. Defaults to 4. |
4 |
Returns:
Type | Description |
---|---|
list |
A list of bounds in the form of [minx, miny, maxx, maxy]. |
Source code in leafmap/common.py
def geometry_bounds(geometry, decimals=4):
    """Returns the bounds of a geometry.

    Args:
        geometry (dict): A GeoJSON geometry.
        decimals (int, optional): The number of decimal places to round the bounds to. Defaults to 4.

    Returns:
        list: A list of bounds in the form of [minx, miny, maxx, maxy].
    """
    if not isinstance(geometry, dict):
        raise ValueError("geometry must be a GeoJSON-like dictionary.")

    # Accept either a Feature ({"geometry": {...}}) or a bare geometry dict;
    # only the first coordinate array (the exterior ring) is considered.
    source = geometry["geometry"] if "geometry" in geometry else geometry
    ring = source["coordinates"][0]

    xs = [pt[0] for pt in ring]
    ys = [pt[1] for pt in ring]
    return [
        round(min(xs), decimals),
        round(min(ys), decimals),
        round(max(xs), decimals),
        round(max(ys), decimals),
    ]
get_3dep_dem(geometry, resolution=30, src_crs='EPSG:4326', output=None, dst_crs='EPSG:5070', to_cog=False, overwrite=False, **kwargs)
¶
Get DEM data at any resolution from 3DEP.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geometry |
Polygon | MultiPolygon | tuple |
It can be a polygon or a bounding box of form (xmin, ymin, xmax, ymax). |
required |
resolution |
int |
Target DEM source resolution in meters. Defaults to 30. |
30 |
src_crs |
str |
The spatial reference system of the input geometry. Defaults to "EPSG:4326". |
'EPSG:4326' |
output |
str |
The output GeoTIFF file. Defaults to None. |
None |
dst_crs |
str |
The spatial reference system of the output GeoTIFF file. Defaults to "EPSG:5070". |
'EPSG:5070' |
to_cog |
bool |
Convert to Cloud Optimized GeoTIFF. Defaults to False. |
False |
overwrite |
bool |
Whether to overwrite the output file if it exists. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
xarray.DataArray |
DEM at the specified resolution in meters and CRS. |
Source code in leafmap/common.py
def get_3dep_dem(
    geometry,
    resolution=30,
    src_crs="EPSG:4326",
    output=None,
    dst_crs="EPSG:5070",
    to_cog=False,
    overwrite=False,
    **kwargs,
):
    """Get DEM data at any resolution from 3DEP.

    Args:
        geometry (Polygon | MultiPolygon | tuple): It can be a polygon or a bounding
            box of form (xmin, ymin, xmax, ymax).
        resolution (int): Target DEM source resolution in meters. Defaults to 30.
        src_crs (str, optional): The spatial reference system of the input geometry. Defaults to "EPSG:4326".
        output (str, optional): The output GeoTIFF file. Defaults to None.
        dst_crs (str, optional): The spatial reference system of the output GeoTIFF file. Defaults to "EPSG:5070".
        to_cog (bool, optional): Convert to Cloud Optimized GeoTIFF. Defaults to False.
        overwrite (bool, optional): Whether to overwrite the output file if it exists. Defaults to False.

    Raises:
        ImportError: If py3dep or geopandas is not installed.

    Returns:
        xarray.DataArray: DEM at the specified resolution in meters and CRS,
            or None when the result is written to *output*.
    """
    try:
        import py3dep
    except ImportError:
        raise ImportError("py3dep is not installed. Install it with pip install py3dep")

    try:
        import geopandas as gpd
    except ImportError:
        raise ImportError(
            "geopandas is not installed. Install it with pip install geopandas"
        )

    if output is not None and os.path.exists(output) and not overwrite:
        print(f"File {output} already exists. Set overwrite=True to overwrite it")
        return

    # Dissolve a GeoDataFrame into a single (multi)polygon for py3dep.
    if isinstance(geometry, gpd.GeoDataFrame):
        geometry = geometry.geometry.unary_union

    dem = py3dep.get_dem(geometry, resolution=resolution, crs=src_crs)
    dem = dem.rio.reproject(dst_crs)

    if output is not None:
        if not output.endswith(".tif"):
            output += ".tif"
        # (Removed a leftover debug print of the output path.)
        dem.rio.to_raster(output, **kwargs)
        if to_cog:
            image_to_cog(output, output)
    else:
        return dem
get_api_key(token_name, m=None)
¶
Retrieves an API key based on a system environment variable.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token_name |
str |
The token name. |
required |
m |
ipyleaflet.Map | folium.Map |
A Map instance. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
str |
The API key. |
Source code in leafmap/common.py
def get_api_key(token_name, m=None):
    """Retrieves an API key based on a system environment variable.

    Args:
        token_name (str): The token name.
        m (ipyleaflet.Map | folium.Map, optional): A Map instance. Defaults to None.

    Returns:
        str: The API key.
    """
    # Keys stored on the map instance take precedence over the environment.
    if m is not None and token_name in m.api_keys:
        return m.api_keys[token_name]
    return os.environ.get(token_name)
get_bounds(geometry, north_up=True, transform=None)
¶
Bounding box of a GeoJSON geometry, GeometryCollection, or FeatureCollection. left, bottom, right, top not xmin, ymin, xmax, ymax If not north_up, y will be switched to guarantee the above. Source code adapted from https://github.com/mapbox/rasterio/blob/master/rasterio/features.py#L361
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geometry |
dict |
A GeoJSON dict. |
required |
north_up |
bool |
. Defaults to True. |
True |
transform |
[type] |
. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
list |
A list of coordinates representing [left, bottom, right, top] |
Source code in leafmap/common.py
def get_bounds(geometry, north_up=True, transform=None):
    """Bounding box of a GeoJSON geometry, GeometryCollection, or FeatureCollection.
    left, bottom, right, top
    *not* xmin, ymin, xmax, ymax
    If not north_up, y will be switched to guarantee the above.
    Source code adapted from https://github.com/mapbox/rasterio/blob/master/rasterio/features.py#L361

    Args:
        geometry (dict): A GeoJSON dict.
        north_up (bool, optional): If False, the y values are swapped. Defaults to True.
        transform ([type], optional): An affine transform applied to each coordinate. Defaults to None.

    Raises:
        ValueError: If the input is not GeoJSON-like.

    Returns:
        list: A list of coordinates representing [left, bottom, right, top]
    """
    # A precomputed bbox takes precedence over recomputing from coordinates.
    if "bbox" in geometry:
        return tuple(geometry["bbox"])

    # Unwrap a Feature to its geometry member.
    geometry = geometry.get("geometry") or geometry

    # geometry must be a geometry, GeometryCollection, or FeatureCollection
    if not (
        "coordinates" in geometry or "geometries" in geometry or "features" in geometry
    ):
        raise ValueError(
            "geometry must be a GeoJSON-like geometry, GeometryCollection, "
            "or FeatureCollection"
        )

    if "features" in geometry:
        # FeatureCollection: merge the bounds of every member feature.
        return _merge_bounds(
            (feature["geometry"] for feature in geometry["features"]), north_up
        )

    if "geometries" in geometry:
        # GeometryCollection: merge the bounds of every member geometry.
        return _merge_bounds(geometry["geometries"], north_up)

    # Singular geometry object.
    if transform is not None:
        xyz = list(explode(geometry["coordinates"]))
        xyz_px = [transform * point for point in xyz]
        xyz = tuple(zip(*xyz_px))
        return min(xyz[0]), max(xyz[1]), max(xyz[0]), min(xyz[1])
    xyz = tuple(zip(*list(explode(geometry["coordinates"]))))
    if north_up:
        return min(xyz[0]), min(xyz[1]), max(xyz[0]), max(xyz[1])
    return min(xyz[0]), max(xyz[1]), max(xyz[0]), min(xyz[1])


def _merge_bounds(geometries, north_up):
    """Merge the bounds of several geometries into one bounding box.

    Deduplicates the formerly copy-pasted FeatureCollection and
    GeometryCollection branches (which also shadowed the ``geometry``
    parameter with their loop variable).
    """
    boxes = [get_bounds(g) for g in geometries]
    xmins, ymins, xmaxs, ymaxs = zip(*boxes)
    if north_up:
        return min(xmins), min(ymins), max(xmaxs), max(ymaxs)
    return min(xmins), max(ymaxs), max(xmaxs), min(ymins)
get_census_dict(reset=False)
¶
Returns a dictionary of Census data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reset |
bool |
Reset the dictionary. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
dict |
A dictionary of Census data. |
Source code in leafmap/common.py
def get_census_dict(reset=False):
    """Returns a dictionary of Census data (TIGERweb WMS endpoints and layers).

    Args:
        reset (bool, optional): If True, rebuild the cached dictionary by
            querying every TIGERweb WMS service (requires network access and
            the optional owslib package) and rewrite data/census_data.json.
            Defaults to False, which reads the bundled JSON cache.

    Returns:
        dict: A dictionary mapping dataset names to {"url": ..., "layers": [...]}.
    """
    import json
    import pkg_resources  # NOTE(review): pkg_resources is deprecated; importlib.resources is the modern replacement

    # Resolve the installed leafmap package directory so the cached JSON can
    # be located next to the source files.
    pkg_dir = os.path.dirname(pkg_resources.resource_filename("leafmap", "leafmap.py"))
    census_data = os.path.join(pkg_dir, "data/census_data.json")
    if reset:
        # owslib is only needed when regenerating the cache.
        try:
            from owslib.wms import WebMapService
        except ImportError:
            raise ImportError("Please install owslib using 'pip install owslib'.")
        census_dict = {}
        # Dataset display names; each maps to one TIGERweb WMS service.
        names = [
            "Current",
            "ACS 2021",
            "ACS 2019",
            "ACS 2018",
            "ACS 2017",
            "ACS 2016",
            "ACS 2015",
            "ACS 2014",
            "ACS 2013",
            "ACS 2012",
            "ECON 2012",
            "Census 2020",
            "Census 2010",
            "Physical Features",
            "Decennial Census 2020",
            "Decennial Census 2010",
            "Decennial Census 2000",
            "Decennial Physical Features",
        ]
        links = {}
        print("Retrieving data. Please wait ...")
        for name in names:
            # "Decennial" datasets are hosted under a different ArcGIS
            # service path than the other TIGERweb datasets.
            if "Decennial" not in name:
                links[
                    name
                ] = f"https://tigerweb.geo.census.gov/arcgis/services/TIGERweb/tigerWMS_{name.replace(' ', '')}/MapServer/WMSServer"
            else:
                links[
                    name
                ] = f"https://tigerweb.geo.census.gov/arcgis/services/Census2020/tigerWMS_{name.replace('Decennial', '').replace(' ', '')}/MapServer/WMSServer"
            # Fetch the WMS capabilities document to enumerate layer names.
            wms = WebMapService(links[name], timeout=300)
            layers = list(wms.contents)
            layers.sort()
            census_dict[name] = {
                "url": links[name],
                "layers": layers,
                # "title": wms.identification.title,
                # "abstract": wms.identification.abstract,
            }
        # Persist the rebuilt dictionary back into the package data cache.
        with open(census_data, "w") as f:
            json.dump(census_dict, f, indent=4)
    else:
        # Fast path: read the bundled cache, no network access required.
        with open(census_data, "r") as f:
            census_dict = json.load(f)
    return census_dict
get_center(geometry, north_up=True, transform=None)
¶
Get the centroid of a GeoJSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geometry |
dict |
A GeoJSON dict. |
required |
north_up |
bool |
. Defaults to True. |
True |
transform |
[type] |
. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
list |
[lon, lat] |
Source code in leafmap/common.py
def get_center(geometry, north_up=True, transform=None):
    """Get the centroid of a GeoJSON.

    Args:
        geometry (dict): A GeoJSON dict.
        north_up (bool, optional): . Defaults to True.
        transform ([type], optional): . Defaults to None.

    Returns:
        list: [lon, lat]
    """
    minx, miny, maxx, maxy = get_bounds(geometry, north_up, transform)
    # Midpoint of the bounding box, returned as (lon, lat).
    # (When north_up is False the min/max y values are swapped by
    # get_bounds, but the average is unaffected.)
    return ((minx + maxx) / 2, (miny + maxy) / 2)
get_direct_url(url)
¶
Get the direct URL for a given URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to get the direct URL for. |
required |
Returns:
Type | Description |
---|---|
str |
The direct URL. |
Source code in leafmap/common.py
def get_direct_url(url, timeout=None):
    """Get the direct (final) URL for a given URL by following redirects.

    Args:
        url (str): The URL to get the direct URL for.
        timeout (float | tuple, optional): Seconds to wait for the server
            before giving up, passed through to ``requests.head``. Defaults
            to None (wait indefinitely), matching the previous behavior.
            Pass a value to avoid hanging on unresponsive servers.

    Raises:
        ValueError: If url is not a string or does not start with "http".

    Returns:
        str: The direct URL after all HTTP redirects have been resolved.
    """
    if not isinstance(url, str):
        raise ValueError("url must be a string.")

    if not url.startswith("http"):
        raise ValueError("url must start with http.")

    # HEAD avoids downloading the response body; allow_redirects makes
    # requests follow the redirect chain so r.url is the final location.
    r = requests.head(url, allow_redirects=True, timeout=timeout)
    return r.url
get_geometry_coords(row, geom, coord_type, shape_type, mercator=False)
¶
Returns the coordinates ('x' or 'y') of edges of a Polygon exterior.
:param: (GeoPandas Series) row : The row of each of the GeoPandas DataFrame. :param: (str) geom : The column name. :param: (str) coord_type : Whether it's 'x' or 'y' coordinate. :param: (str) shape_type : The geometry type, e.g., 'polygon', 'linestring', or 'point'.
Source code in leafmap/common.py
def get_geometry_coords(row, geom, coord_type, shape_type, mercator=False):
    """
    Returns the coordinates ('x' or 'y') of edges of a Polygon exterior.

    :param: (GeoPandas Series) row : The row of each of the GeoPandas DataFrame.
    :param: (str) geom : The column name.
    :param: (str) coord_type : Whether it's 'x' or 'y' coordinate.
    :param: (str) shape_type : The geometry type, e.g., 'polygon', 'linestring', or 'point'.
    """
    kind = shape_type.lower()

    if kind in ("polygon", "multipolygon"):
        # Only the exterior ring of the first polygon is considered.
        ring = row[geom].geoms[0].exterior
        if coord_type == "x":
            values = list(ring.coords.xy[0])
            if mercator:
                values = [lnglat_to_meters(v, 0)[0] for v in values]
            return values
        if coord_type == "y":
            values = list(ring.coords.xy[1])
            if mercator:
                values = [lnglat_to_meters(0, v)[1] for v in values]
            return values
    elif kind in ("linestring", "multilinestring"):
        if coord_type == "x":
            values = list(row[geom].coords.xy[0])
            if mercator:
                values = [lnglat_to_meters(v, 0)[0] for v in values]
            return values
        if coord_type == "y":
            values = list(row[geom].coords.xy[1])
            if mercator:
                values = [lnglat_to_meters(0, v)[1] for v in values]
            return values
    elif kind in ("point", "multipoint"):
        # Points yield a single scalar rather than a list.
        geometry = row[geom]
        if coord_type == "x":
            value = geometry.coords.xy[0][0]
            if mercator:
                value = lnglat_to_meters(value, 0)[0]
            return value
        if coord_type == "y":
            value = geometry.coords.xy[1][0]
            if mercator:
                value = lnglat_to_meters(0, value)[1]
            return value
get_local_tile_layer(source, port='default', debug=False, projection='EPSG:3857', band=None, palette=None, vmin=None, vmax=None, nodata=None, attribution=None, tile_format='ipyleaflet', layer_name='Local COG', return_client=False, **kwargs)
¶
Generate an ipyleaflet/folium TileLayer from a local raster dataset or remote Cloud Optimized GeoTIFF (COG). If you are using this function in JupyterHub on a remote server and the raster does not render properly, try running the following two lines before calling this function:
    import os
    os.environ['LOCALTILESERVER_CLIENT_PREFIX'] = 'proxy/{port}'
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
The path to the GeoTIFF file or the URL of the Cloud Optimized GeoTIFF. |
required |
port |
str |
The port to use for the server. Defaults to "default". |
'default' |
debug |
bool |
If True, the server will be started in debug mode. Defaults to False. |
False |
projection |
str |
The projection of the GeoTIFF. Defaults to "EPSG:3857". |
'EPSG:3857' |
band |
int |
The band to use. Band indexing starts at 1. Defaults to None. |
None |
palette |
str |
The name of the color palette from |
None |
vmin |
float |
The minimum value to use when colormapping the palette when plotting a single band. Defaults to None. |
None |
vmax |
float |
The maximum value to use when colormapping the palette when plotting a single band. Defaults to None. |
None |
nodata |
float |
The value from the band to use to interpret as not valid data. Defaults to None. |
None |
attribution |
str |
Attribution for the source raster. This defaults to a message about it being a local file. Defaults to None. |
None |
tile_format |
str |
The tile layer format. Can be either ipyleaflet or folium. Defaults to "ipyleaflet". |
'ipyleaflet' |
layer_name |
str |
The layer name to use. Defaults to None. |
'Local COG' |
return_client |
bool |
If True, the tile client will be returned. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
ipyleaflet.TileLayer | folium.TileLayer |
An ipyleaflet.TileLayer or folium.TileLayer. |
Source code in leafmap/common.py
def get_local_tile_layer(
    source,
    port="default",
    debug=False,
    projection="EPSG:3857",
    band=None,
    palette=None,
    vmin=None,
    vmax=None,
    nodata=None,
    attribution=None,
    tile_format="ipyleaflet",
    layer_name="Local COG",
    return_client=False,
    **kwargs,
):
    """Generate an ipyleaflet/folium TileLayer from a local raster dataset or remote Cloud Optimized GeoTIFF (COG).

    If you are using this function in JupyterHub on a remote server and the raster
    does not render properly, try running the following two lines before calling
    this function:

        import os
        os.environ['LOCALTILESERVER_CLIENT_PREFIX'] = 'proxy/{port}'

    Args:
        source (str): The path to the GeoTIFF file or the URL of the Cloud Optimized GeoTIFF.
        port (str, optional): The port to use for the server. Defaults to "default".
        debug (bool, optional): If True, the server will be started in debug mode. Defaults to False.
        projection (str, optional): The projection of the GeoTIFF. Defaults to "EPSG:3857".
        band (int, optional): The band to use. Band indexing starts at 1. Defaults to None.
        palette (str, optional): The name of the color palette from `palettable` to use when plotting a single band. See https://jiffyclub.github.io/palettable. Default is greyscale.
        vmin (float, optional): The minimum value to use when colormapping the palette when plotting a single band. Defaults to None.
        vmax (float, optional): The maximum value to use when colormapping the palette when plotting a single band. Defaults to None.
        nodata (float, optional): The value from the band to use to interpret as not valid data. Defaults to None.
        attribution (str, optional): Attribution for the source raster. This defaults to a message about it being a local file. Defaults to None.
        tile_format (str, optional): The tile layer format. Can be either ipyleaflet or folium. Defaults to "ipyleaflet".
        layer_name (str, optional): The layer name to use. Defaults to "Local COG"; pass None to auto-generate a name.
        return_client (bool, optional): If True, the tile client will be returned. Defaults to False.

    Raises:
        ValueError: If the source is not a string, the source path does not exist,
            or tile_format is not "ipyleaflet" or "folium".

    Returns:
        ipyleaflet.TileLayer | folium.TileLayer: An ipyleaflet.TileLayer or folium.TileLayer.
    """
    check_package(
        "localtileserver", URL="https://github.com/banesullivan/localtileserver"
    )

    # Allow zooming in beyond the native resolution of the raster.
    if "max_zoom" not in kwargs:
        kwargs["max_zoom"] = 100
    if "max_native_zoom" not in kwargs:
        kwargs["max_native_zoom"] = 100

    # Make it compatible with binder and JupyterHub
    if os.environ.get("JUPYTERHUB_SERVICE_PREFIX") is not None:
        os.environ[
            "LOCALTILESERVER_CLIENT_PREFIX"
        ] = f"{os.environ['JUPYTERHUB_SERVICE_PREFIX'].lstrip('/')}/proxy/{{port}}"

    # NOTE(review): these branches override the JupyterHub prefix set above;
    # precedence (StudioLab > AWS > explicit "prefix" kwarg) appears
    # intentional -- confirm against deployment docs.
    if is_studio_lab():
        os.environ[
            "LOCALTILESERVER_CLIENT_PREFIX"
        ] = f"studiolab/default/jupyter/proxy/{{port}}"
    elif is_on_aws():
        os.environ["LOCALTILESERVER_CLIENT_PREFIX"] = "proxy/{port}"
    elif "prefix" in kwargs:
        os.environ["LOCALTILESERVER_CLIENT_PREFIX"] = kwargs["prefix"]
        kwargs.pop("prefix")

    from localtileserver import (
        get_leaflet_tile_layer,
        get_folium_tile_layer,
        TileClient,
    )

    if "show_loading" not in kwargs:
        kwargs["show_loading"] = False

    # Resolve the source: local paths are expanded/absolutized and must exist;
    # remote URLs are normalized (e.g. GitHub blob URLs -> raw URLs).
    if isinstance(source, str):
        if not source.startswith("http"):
            if source.startswith("~"):
                source = os.path.expanduser(source)
            else:
                source = os.path.abspath(source)
            if not os.path.exists(source):
                raise ValueError("The source path does not exist.")
        else:
            source = github_raw_url(source)
    else:
        # NOTE(review): the message mentions TileClient, but only str sources
        # are accepted by this branch -- confirm intended behavior.
        raise ValueError("The source must either be a string or TileClient")

    # Convert a palettable colormap name into a list of hex colors.
    if isinstance(palette, str):
        palette = get_palette_colors(palette, hashtag=True)

    if tile_format not in ["ipyleaflet", "folium"]:
        raise ValueError("The tile format must be either ipyleaflet or folium.")

    # Only reached when the caller explicitly passes layer_name=None.
    if layer_name is None:
        if source.startswith("http"):
            layer_name = "RemoteTile_" + random_string(3)
        else:
            layer_name = "LocalTile_" + random_string(3)

    # Start (or connect to) the local tile server for this raster.
    tile_client = TileClient(source, port=port, debug=debug)

    if tile_format == "ipyleaflet":
        tile_layer = get_leaflet_tile_layer(
            tile_client,
            port=port,
            debug=debug,
            projection=projection,
            band=band,
            palette=palette,
            vmin=vmin,
            vmax=vmax,
            nodata=nodata,
            attribution=attribution,
            name=layer_name,
            **kwargs,
        )
    else:
        tile_layer = get_folium_tile_layer(
            tile_client,
            port=port,
            debug=debug,
            projection=projection,
            band=band,
            palette=palette,
            vmin=vmin,
            vmax=vmax,
            nodata=nodata,
            attr=attribution,
            overlay=True,
            name=layer_name,
            **kwargs,
        )

    if return_client:
        return tile_layer, tile_client
    else:
        return tile_layer
# center = tile_client.center()
# bounds = tile_client.bounds() # [ymin, ymax, xmin, xmax]
# bounds = (bounds[2], bounds[0], bounds[3], bounds[1]) # [minx, miny, maxx, maxy]
# if get_center and get_bounds:
# return tile_layer, center, bounds
# elif get_center:
# return tile_layer, center
# elif get_bounds:
# return tile_layer, bounds
# else:
# return tile_layer
get_nhd_basins(feature_ids, fsource='nwissite', split_catchment=False, simplified=True, **kwargs)
¶
Get NHD basins for a list of station IDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
feature_ids |
str | list |
Target feature ID(s). |
required |
fsource |
str |
The name of feature(s) source, defaults to |
'nwissite' |
split_catchment |
bool |
If True, split basins at their outlet locations |
False |
simplified |
bool |
If True, return a simplified version of basin geometries. Default to True. |
True |
Exceptions:
Type | Description |
---|---|
ImportError |
If pynhd is not installed. |
Returns:
Type | Description |
---|---|
geopandas.GeoDataFrame |
NLDI indexed basins in EPSG:4326. If some IDs don't return any features a list of missing ID(s) are returned as well. |
Source code in leafmap/common.py
def get_nhd_basins(
    feature_ids,
    fsource="nwissite",
    split_catchment=False,
    simplified=True,
    **kwargs,
):
    """Get NHD basins for a list of station IDs.

    Args:
        feature_ids (str | list): Target feature ID(s).
        fsource (str, optional): The name of feature(s) source, defaults to ``nwissite``.
            The valid sources are:
            * 'comid' for NHDPlus comid.
            * 'ca_gages' for Streamgage catalog for CA SB19
            * 'gfv11_pois' for USGS Geospatial Fabric V1.1 Points of Interest
            * 'huc12pp' for HUC12 Pour Points
            * 'nmwdi-st' for New Mexico Water Data Initiative Sites
            * 'nwisgw' for NWIS Groundwater Sites
            * 'nwissite' for NWIS Surface Water Sites
            * 'ref_gage' for geoconnex.us reference gauges
            * 'vigil' for Vigil Network Data
            * 'wade' for Water Data Exchange 2.0 Sites
            * 'WQP' for Water Quality Portal
        split_catchment (bool, optional): If True, split basins at their outlet locations
        simplified (bool, optional): If True, return a simplified version of basin geometries.
            Default to True.

    Raises:
        ImportError: If pynhd is not installed.

    Returns:
        geopandas.GeoDataFrame: NLDI indexed basins in EPSG:4326. If some IDs don't
            return any features a list of missing ID(s) are returned as well.
    """
    try:
        from pynhd import NLDI
    except ImportError:
        raise ImportError("pynhd is not installed. Install it with pip install pynhd")

    # Thin wrapper: delegate everything to pynhd's NLDI client.
    nldi = NLDI()
    return nldi.get_basins(feature_ids, fsource, split_catchment, simplified, **kwargs)
get_overlap(img1, img2, overlap, out_img1=None, out_img2=None, to_cog=True)
¶
Get overlapping area of two images.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
img1 |
str |
Path to the first image. |
required |
img2 |
str |
Path to the second image. |
required |
overlap |
str |
Path to the output overlap area in GeoJSON format. |
required |
out_img1 |
str |
Path to the cropped image of the first image. |
None |
out_img2 |
str |
Path to the cropped image of the second image. |
None |
to_cog |
bool |
Whether to convert the output images to COG. |
True |
Returns:
Type | Description |
---|---|
str |
Path to the overlap area in GeoJSON format. |
Source code in leafmap/common.py
def get_overlap(img1, img2, overlap, out_img1=None, out_img2=None, to_cog=True):
    """Get overlapping area of two images.

    Args:
        img1 (str): Path to the first image.
        img2 (str): Path to the second image.
        overlap (str): Path to the output overlap area in GeoJSON format.
        out_img1 (str, optional): Path to the cropped image of the first image.
        out_img2 (str, optional): Path to the cropped image of the second image.
        to_cog (bool, optional): Whether to convert the output images to COG.

    Returns:
        str: Path to the overlap area in GeoJSON format.
    """
    import json
    from osgeo import gdal, ogr, osr
    import geopandas as gpd

    # Build WGS84 footprint geometries for both rasters.
    footprint1 = ogr.CreateGeometryFromJson(
        json.dumps(gdal.Info(img1, format="json")["wgs84Extent"])
    )
    footprint2 = ogr.CreateGeometryFromJson(
        json.dumps(gdal.Info(img2, format="json")["wgs84Extent"])
    )
    common = footprint1.Intersection(footprint2)

    # Write the intersection polygon out as GeoJSON.
    src = gdal.OpenEx(common.ExportToJson())
    ds = gdal.VectorTranslate(
        overlap,
        srcDS=src,
        format="GeoJSON",
        layerCreationOptions=["RFC7946=YES", "WRITE_BBOX=YES"],
    )
    ds = None  # dereference to flush and close the dataset

    # Reproject the overlap polygon to the CRS of the first image.
    raster = gdal.Open(img1)
    srs = osr.SpatialReference(wkt=raster.GetProjection())
    epsg = srs.GetAttrValue("AUTHORITY", 1)
    gdf = gpd.read_file(overlap)
    gdf.to_crs(epsg=epsg, inplace=True)
    gdf.to_file(overlap)

    # Optionally crop each input image to the overlap area.
    if out_img1 is not None:
        clip_image(img1, overlap, out_img1, to_cog=to_cog)
    if out_img2 is not None:
        clip_image(img2, overlap, out_img2, to_cog=to_cog)

    return overlap
get_palettable(types=None)
¶
Get a list of palettable color palettes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
types |
list |
A list of palettable types to return, e.g., types=['matplotlib', 'cartocolors']. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
list |
A list of palettable color palettes. |
Source code in leafmap/common.py
def get_palettable(types=None):
    """Get a list of palettable color palettes.

    Args:
        types (list, optional): A list of palettable types to return,
            e.g., types=['matplotlib', 'cartocolors']. Defaults to None.

    Raises:
        ImportError: If the palettable package is not installed.
        ValueError: If types is not a list or contains an unknown type.

    Returns:
        list: A list of palettable color palettes.
    """
    try:
        import palettable
    except ImportError:
        raise ImportError(
            "Please install the palettable package using 'pip install palettable'."
        )

    if types is not None and (not isinstance(types, list)):
        raise ValueError("The types must be a list.")

    allowed_palettes = [
        "cartocolors",
        "cmocean",
        "colorbrewer",
        "cubehelix",
        "lightbartlein",
        "matplotlib",
        "mycarta",
        "scientific",
        "tableau",
        "wesanderson",
    ]

    if types is None:
        types = allowed_palettes[:]
    if not all(x in allowed_palettes for x in types):
        raise ValueError(
            "The types must be one of the following: " + ", ".join(allowed_palettes)
        )

    def _names(module, prefix, stop):
        # dir() returns sorted names; the trailing entries are module
        # internals, trimmed off with a negative stop index. The slice
        # values match the number of non-palette attributes per module.
        return [f"{prefix}.{c}" for c in dir(module)[:stop]]

    palettes = []

    if "cartocolors" in types:
        palettes += _names(palettable.cartocolors.diverging, "cartocolors.diverging", -19)
        palettes += _names(palettable.cartocolors.qualitative, "cartocolors.qualitative", -19)
        palettes += _names(palettable.cartocolors.sequential, "cartocolors.sequential", -41)

    if "cmocean" in types:
        palettes += _names(palettable.cmocean.diverging, "cmocean.diverging", -19)
        palettes += _names(palettable.cmocean.sequential, "cmocean.sequential", -19)

    if "colorbrewer" in types:
        palettes += _names(palettable.colorbrewer.diverging, "colorbrewer.diverging", -19)
        palettes += _names(palettable.colorbrewer.qualitative, "colorbrewer.qualitative", -19)
        palettes += _names(palettable.colorbrewer.sequential, "colorbrewer.sequential", -41)

    if "cubehelix" in types:
        # cubehelix palettes are enumerated explicitly rather than via dir().
        palettes += [
            f"cubehelix.{c}"
            for c in (
                "classic_16",
                "cubehelix1_16",
                "cubehelix2_16",
                "cubehelix3_16",
                "jim_special_16",
                "perceptual_rainbow_16",
                "purple_16",
                "red_16",
            )
        ]

    if "lightbartlein" in types:
        palettes += _names(palettable.lightbartlein.diverging, "lightbartlein.diverging", -19)
        palettes += _names(palettable.lightbartlein.sequential, "lightbartlein.sequential", -19)

    if "matplotlib" in types:
        palettes += _names(palettable.matplotlib, "matplotlib", -16)

    if "mycarta" in types:
        palettes += _names(palettable.mycarta, "mycarta", -16)

    if "scientific" in types:
        palettes += _names(palettable.scientific.diverging, "scientific.diverging", -19)
        palettes += _names(palettable.scientific.sequential, "scientific.sequential", -19)

    if "tableau" in types:
        palettes += _names(palettable.tableau, "tableau", -14)

    # NOTE: "wesanderson" is accepted in types but contributes no palettes,
    # matching the original behavior.
    return palettes
get_palette_colors(cmap_name=None, n_class=None, hashtag=False)
¶
Get a palette from a matplotlib colormap. See the list of colormaps at https://matplotlib.org/stable/tutorials/colors/colormaps.html.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cmap_name |
str |
The name of the matplotlib colormap. Defaults to None. |
None |
n_class |
int |
The number of colors. Defaults to None. |
None |
hashtag |
bool |
Whether to return a list of hex colors. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
list |
A list of hex colors. |
Source code in leafmap/common.py
def get_palette_colors(cmap_name=None, n_class=None, hashtag=False):
    """Get a palette from a matplotlib colormap. See the list of colormaps at
    https://matplotlib.org/stable/tutorials/colors/colormaps.html.

    Args:
        cmap_name (str, optional): The name of the matplotlib colormap. Defaults to None.
        n_class (int, optional): The number of colors. Defaults to None.
        hashtag (bool, optional): Whether to return hex colors prefixed with '#'. Defaults to False.

    Returns:
        list: A list of hex colors (without the leading '#' unless hashtag=True).
    """
    import matplotlib as mpl
    import matplotlib.pyplot as plt

    # plt.cm.get_cmap() was removed in matplotlib 3.9; plt.get_cmap() takes
    # the same (name, lut) arguments and works on both old and new versions.
    cmap = plt.get_cmap(cmap_name, n_class)
    colors = [mpl.colors.rgb2hex(cmap(i))[1:] for i in range(cmap.N)]

    if hashtag:
        colors = ["#" + i for i in colors]

    return colors
get_stac_collections(url, **kwargs)
¶
Retrieve a list of STAC collections from a URL. This function is adapted from https://github.com/mykolakozyr/stacdiscovery/blob/a5d1029aec9c428a7ce7ae615621ea8915162824/app.py#L31. Credits to Mykola Kozyr.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
A URL to a STAC catalog. |
required |
**kwargs |
Additional keyword arguments to pass to the pystac Client.open() method. See https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open |
{} |
Returns:
Type | Description |
---|---|
list |
A list of STAC collections. |
Source code in leafmap/common.py
def get_stac_collections(url, **kwargs):
    """Retrieve a list of STAC collections from a URL.

    This function is adapted from
    https://github.com/mykolakozyr/stacdiscovery/blob/a5d1029aec9c428a7ce7ae615621ea8915162824/app.py#L31.
    Credits to Mykola Kozyr.

    Args:
        url (str): A URL to a STAC catalog.
        **kwargs: Additional keyword arguments to pass to the pystac Client.open() method.
            See https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open

    Returns:
        list: A list of [id, title, description] lists, one per collection.
    """
    from pystac_client import Client

    # Expensive call: opens the catalog and fetches its collections.
    root_catalog = Client.open(url, **kwargs)

    collections_list = []
    for collection in root_catalog.get_collections():
        # Collect the main metadata for each collection. (A stray debug
        # print of the raw collection list was removed here.)
        collections_list.append(
            [collection.id, collection.title, collection.description]
        )
    return collections_list
get_stac_items(url, collection, limit=None, bbox=None, datetime=None, intersects=None, ids=None, open_args=None, **kwargs)
¶
Retrieve a list of STAC items from a URL and a collection. This function is adapted from https://github.com/mykolakozyr/stacdiscovery/blob/a5d1029aec9c428a7ce7ae615621ea8915162824/app.py#L49. Credits to Mykola Kozyr. Available parameters can be found at https://github.com/radiantearth/stac-api-spec/tree/master/item-search
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
A URL to a STAC catalog. |
required |
collection |
str |
A STAC collection ID. |
required |
limit |
int |
The maximum number of results to return (page size). Defaults to None. |
None |
bbox |
tuple |
Requested bounding box in the format of (minx, miny, maxx, maxy). Defaults to None. |
None |
datetime |
str |
Single date+time, or a range ('/' separator), formatted to RFC 3339, section 5.6. Use double dots .. for open date ranges. |
None |
intersects |
dict |
A dictionary representing a GeoJSON Geometry. Searches items by performing intersection between their geometry and provided GeoJSON geometry. All GeoJSON geometry types must be supported. |
None |
ids |
list |
A list of item ids to return. |
None |
open_args |
dict |
A dictionary of arguments to pass to the pystac Client.open() method. Defaults to None. |
None |
**kwargs |
Additional keyword arguments to pass to the Catalog.search() method. |
{} |
Returns:
Type | Description |
---|---|
geopandas.GeoDataFrame |
A GeoDataFrame with the STAC items. |
Source code in leafmap/common.py
def get_stac_items(
    url,
    collection,
    limit=None,
    bbox=None,
    datetime=None,
    intersects=None,
    ids=None,
    open_args=None,
    **kwargs,
):
    """Retrieve a list of STAC items from a URL and a collection.

    This function is adapted from
    https://github.com/mykolakozyr/stacdiscovery/blob/a5d1029aec9c428a7ce7ae615621ea8915162824/app.py#L49.
    Credits to Mykola Kozyr.
    Available parameters can be found at
    https://github.com/radiantearth/stac-api-spec/tree/master/item-search

    Args:
        url (str): A URL to a STAC catalog.
        collection (str): A STAC collection ID.
        limit (int, optional): The maximum number of results to return (page size). Defaults to None.
        bbox (tuple, optional): Requested bounding box in the format of (minx, miny, maxx, maxy). Defaults to None.
        datetime (str, optional): Single date+time, or a range ('/' separator), formatted to RFC 3339, section 5.6. Use double dots .. for open date ranges.
        intersects (dict, optional): A dictionary representing a GeoJSON Geometry. Searches items by performing intersection between their geometry and provided GeoJSON geometry. All GeoJSON geometry types must be supported.
        ids (list, optional): A list of item ids to return.
        open_args (dict, optional): A dictionary of arguments to pass to the pystac Client.open() method. Defaults to None.
        **kwargs: Additional keyword arguments to pass to the Catalog.search() method.

    Returns:
        geopandas.GeoDataFrame: A GeoDataFrame with the STAC items, or None if
            the collection has no items.
    """
    import itertools

    import geopandas as gpd
    from shapely.geometry import shape
    from pystac_client import Client

    if open_args is None:
        open_args = {}

    # Bug fix: open_args was documented but never forwarded to Client.open().
    root_catalog = Client.open(url, **open_args)

    # Collect the search parameters that were explicitly provided.
    if limit:
        kwargs["limit"] = limit
    if bbox:
        kwargs["bbox"] = bbox
    if datetime:
        kwargs["datetime"] = datetime
    if intersects:
        kwargs["intersects"] = intersects
    if ids:
        kwargs["ids"] = ids

    if kwargs:
        try:
            catalog = root_catalog.search(collections=collection, **kwargs)
        except NotImplementedError:
            # Static catalogs do not support the search endpoint.
            catalog = root_catalog
    else:
        catalog = root_catalog

    # limit=None takes all items; islice keeps the iteration lazy.
    items = list(itertools.islice(catalog.get_all_items(), limit))

    if len(items) == 0:
        # Fall back to reading the collection as a child catalog.
        try:
            catalog = root_catalog.get_child(collection)
            items = list(itertools.islice(catalog.get_all_items(), limit))
        except Exception:
            print("Ooops, it looks like this collection does not have items.")
            return None

    # Collect the main metadata for each item.
    items_list = []
    for item in items:
        geometry = shape(item.geometry)
        item_datetime = (
            item.datetime
            or item.properties["datetime"]
            or item.properties["end_datetime"]
            or item.properties["start_datetime"]
        )
        # Bug fix: reset per item so a missing "self" link does not reuse the
        # previous item's URL (or raise NameError on the first item).
        self_url = None
        for link in item.links:
            if link.rel == "self":
                self_url = link.target
        assets_list = list(item.assets)
        items_list.append([item.id, geometry, item_datetime, self_url, assets_list])

    if limit is not None:
        items_list = items_list[:limit]

    items_df = gpd.GeoDataFrame(items_list)
    items_df.columns = ["id", "geometry", "datetime", "self_url", "assets_list"]
    items_gdf = items_df.set_geometry("geometry")
    # Cast to str specifically for KeplerGL.
    # See https://github.com/keplergl/kepler.gl/issues/602
    items_gdf["datetime"] = items_gdf["datetime"].astype(str)
    items_gdf.set_crs(epsg=4326, inplace=True)
    return items_gdf
get_wms_layers(url)
¶
Returns a list of WMS layers from a WMS service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL of the WMS service. |
required |
Returns:
Type | Description |
---|---|
list |
A list of WMS layers. |
Source code in leafmap/common.py
def get_wms_layers(url):
    """Returns a list of WMS layers from a WMS service.

    Args:
        url (str): The URL of the WMS service.

    Raises:
        ImportError: If owslib is not installed.

    Returns:
        list: A sorted list of WMS layer names.
    """
    try:
        from owslib.wms import WebMapService
    except ImportError:
        raise ImportError("Please install owslib using 'pip install owslib'.")

    service = WebMapService(url)
    # service.contents maps layer names to metadata; return the names sorted.
    return sorted(service.contents)
gif_fading(in_gif, out_gif, duration=1, verbose=True)
¶
Fade in/out the gif.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_gif |
str |
The input gif file. Can be a directory path or http URL, e.g., "https://i.imgur.com/ZWSZC5z.gif" |
required |
out_gif |
str |
The output gif file. |
required |
duration |
float |
The duration of the fading. Defaults to 1. |
1 |
verbose |
bool |
Whether to print the progress. Defaults to True. |
True |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
Raise exception when the input gif does not exist. |
Exception |
Raise exception when ffmpeg is not installed. |
Source code in leafmap/common.py
def gif_fading(in_gif, out_gif, duration=1, verbose=True):
    """Fade in/out the gif.

    Args:
        in_gif (str): The input gif file. Can be a directory path or http URL, e.g., "https://i.imgur.com/ZWSZC5z.gif"
        out_gif (str): The output gif file.
        duration (float, optional): The duration of the fading. Defaults to 1.
        verbose (bool, optional): Whether to print the progress. Defaults to True.

    Raises:
        FileNotFoundError: Raise exception when the input gif does not exist.
        Exception: Raise exception when ffmpeg is not installed.
    """
    import glob
    import tempfile

    # Remember the working directory; the function chdirs into a temp dir
    # below and restores it at the end.
    # NOTE(review): an exception raised mid-way leaves the cwd changed --
    # confirm whether that is acceptable for callers.
    current_dir = os.getcwd()

    # Download remote gifs to a temporary local file first.
    if isinstance(in_gif, str) and in_gif.startswith("http"):
        ext = os.path.splitext(in_gif)[1]
        file_path = temp_file_path(ext)
        download_from_url(in_gif, file_path, verbose=verbose)
        in_gif = file_path

    in_gif = os.path.abspath(in_gif)

    if not in_gif.endswith(".gif"):
        raise Exception("in_gif must be a gif file.")

    # Paths are interpolated unquoted into the shell command below, so a
    # space in the path would split the ffmpeg arguments.
    if " " in in_gif:
        raise Exception("The filename cannot contain spaces.")

    out_gif = os.path.abspath(out_gif)
    if not os.path.exists(os.path.dirname(out_gif)):
        os.makedirs(os.path.dirname(out_gif))

    if not os.path.exists(in_gif):
        raise FileNotFoundError(f"{in_gif} does not exist.")

    # Explode the gif into numbered PNG frames inside a fresh temp dir.
    basename = os.path.basename(in_gif).replace(".gif", "")
    temp_dir = os.path.join(tempfile.gettempdir(), basename)
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    gif_to_png(in_gif, temp_dir, verbose=verbose)

    os.chdir(temp_dir)
    images = list(glob.glob(os.path.join(temp_dir, "*.png")))
    count = len(images)

    # One "-loop 1 -t <duration> -i <n>.png" input per frame (frames are
    # assumed to be named 1.png .. count.png by gif_to_png).
    files = []
    for i in range(1, count + 1):
        files.append(f"-loop 1 -t {duration} -i {i}.png")
    inputs = " ".join(files)

    # Chain ffmpeg blend filters so each frame cross-fades into the next.
    # The leading escaped quote opens the -filter_complex argument; it is
    # closed inside last_filter below.
    filters = []
    for i in range(1, count):
        if i == 1:
            filters.append(
                f"\"[1:v][0:v]blend=all_expr='A*(if(gte(T,3),1,T/3))+B*(1-(if(gte(T,3),1,T/3)))'[v0];"
            )
        else:
            filters.append(
                f"[{i}:v][{i-1}:v]blend=all_expr='A*(if(gte(T,3),1,T/3))+B*(1-(if(gte(T,3),1,T/3)))'[v{i-1}];"
            )

    # Concatenate all blended segments into the final output stream and map it.
    last_filter = ""
    for i in range(count - 1):
        last_filter += f"[v{i}]"
    last_filter += f'concat=n={count-1}:v=1:a=0[v]" -map "[v]"'
    filters.append(last_filter)
    filters = " ".join(filters)

    cmd = f"ffmpeg -y -loglevel error {inputs} -filter_complex {filters} {out_gif}"
    os.system(cmd)

    # Clean up the temporary frame directory and restore the cwd; cleanup
    # failures are reported but not fatal.
    try:
        shutil.rmtree(temp_dir)
    except Exception as e:
        print(e)
    os.chdir(current_dir)
gif_to_mp4(in_gif, out_mp4)
¶
Converts a gif to mp4.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_gif |
str |
The input gif file. |
required |
out_mp4 |
str |
The output mp4 file. |
required |
Source code in leafmap/common.py
def gif_to_mp4(in_gif, out_mp4):
    """Converts a gif to mp4.

    Args:
        in_gif (str): The input gif file.
        out_mp4 (str): The output mp4 file. The ".mp4" extension is appended
            if missing.

    Raises:
        FileNotFoundError: If the input gif does not exist.
        Exception: If ffmpeg fails to create the mp4 file.
    """
    from PIL import Image

    if not os.path.exists(in_gif):
        raise FileNotFoundError(f"{in_gif} does not exist.")

    out_mp4 = os.path.abspath(out_mp4)
    if not out_mp4.endswith(".mp4"):
        out_mp4 = out_mp4 + ".mp4"

    if not os.path.exists(os.path.dirname(out_mp4)):
        os.makedirs(os.path.dirname(out_mp4))

    if not is_tool("ffmpeg"):
        print("ffmpeg is not installed on your computer.")
        return

    # libx264 requires even frame dimensions; round odd sizes up by one pixel
    # via a scale filter. Paths are quoted so spaces do not break the command.
    width, height = Image.open(in_gif).size
    if width % 2 == 0 and height % 2 == 0:
        cmd = f'ffmpeg -loglevel error -i "{in_gif}" -vcodec libx264 -crf 25 -pix_fmt yuv420p "{out_mp4}"'
        os.system(cmd)
    else:
        width += width % 2
        height += height % 2
        cmd = f'ffmpeg -loglevel error -i "{in_gif}" -vf scale={width}:{height} -vcodec libx264 -crf 25 -pix_fmt yuv420p "{out_mp4}"'
        os.system(cmd)

    if not os.path.exists(out_mp4):
        raise Exception("Failed to create mp4 file.")
gif_to_png(in_gif, out_dir=None, prefix='', verbose=True)
¶
Converts a gif to png.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_gif |
str |
The input gif file. |
required |
out_dir |
str |
The output directory. Defaults to None. |
None |
prefix |
str |
The prefix of the output png files. Defaults to None. |
'' |
verbose |
bool |
Whether to print the progress. Defaults to True. |
True |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
Raise exception when the input gif does not exist. |
Exception |
Raise exception when ffmpeg is not installed. |
Source code in leafmap/common.py
def gif_to_png(in_gif, out_dir=None, prefix="", verbose=True):
    """Converts a gif to a sequence of png files (one per frame) using ffmpeg.

    Args:
        in_gif (str): The input gif file.
        out_dir (str, optional): The output directory. Defaults to None, which
            creates a directory named after the gif under the system temp directory.
        prefix (str, optional): The prefix of the output png files. Defaults to "".
        verbose (bool, optional): Whether to print the progress. Defaults to True.

    Raises:
        FileNotFoundError: Raise exception when the input gif does not exist.
        Exception: Raise exception when ffmpeg is not installed, the path
            contains spaces, or out_dir is not a string.
    """
    import tempfile

    in_gif = os.path.abspath(in_gif)
    if " " in in_gif:
        raise Exception("in_gif cannot contain spaces.")
    if not os.path.exists(in_gif):
        raise FileNotFoundError(f"{in_gif} does not exist.")
    # The documented contract promises an error when ffmpeg is missing; check
    # up front instead of silently producing no output from os.system().
    if shutil.which("ffmpeg") is None:
        raise Exception("ffmpeg is not installed on your computer.")

    basename = os.path.basename(in_gif).replace(".gif", "")
    if out_dir is None:
        out_dir = os.path.join(tempfile.gettempdir(), basename)
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)
    elif isinstance(out_dir, str) and not os.path.exists(out_dir):
        os.makedirs(out_dir)
    elif not isinstance(out_dir, str):
        raise Exception("out_dir must be a string.")

    out_dir = os.path.abspath(out_dir)
    # -vsync 0 extracts every frame without duplicating to a constant rate.
    cmd = f"ffmpeg -loglevel error -i {in_gif} -vsync 0 {out_dir}/{prefix}%d.png"
    os.system(cmd)
    if verbose:
        print(f"Images are saved to {out_dir}")
github_raw_url(url)
¶
Get the raw URL for a GitHub file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The GitHub URL. |
required |
Returns:
Type | Description |
---|---|
str |
The raw URL. |
Source code in leafmap/common.py
def github_raw_url(url):
    """Converts a GitHub "blob" URL into its raw.githubusercontent.com form.

    Args:
        url (str): The GitHub URL.

    Returns:
        str: The raw URL, or the input unchanged when it is not a GitHub blob URL.
    """
    is_blob_link = (
        isinstance(url, str)
        and url.startswith("https://github.com/")
        and "blob" in url
    )
    if not is_blob_link:
        return url
    raw = url.replace("github.com", "raw.githubusercontent.com")
    return raw.replace("blob/", "")
has_transparency(img)
¶
Checks whether an image has transparency.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
img |
object |
a PIL Image object. |
required |
Returns:
Type | Description |
---|---|
bool |
True if it has transparency, False otherwise. |
Source code in leafmap/common.py
def has_transparency(img):
    """Checks whether an image has transparency.

    Args:
        img (object): a PIL Image object.

    Returns:
        bool: True if it has transparency, False otherwise.
    """
    mode = img.mode
    if mode == "P":
        # Palette images mark transparency through a reserved palette index.
        transparent_index = img.info.get("transparency", -1)
        return any(index == transparent_index for _, index in img.getcolors())
    if mode == "RGBA":
        # Any alpha value below 255 means at least one non-opaque pixel.
        alpha_min = img.getextrema()[3][0]
        return alpha_min < 255
    return False
hex_to_rgb(value='FFFFFF')
¶
Converts hex color to RGB color.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
str |
Hex color code as a string. Defaults to 'FFFFFF'. |
'FFFFFF' |
Returns:
Type | Description |
---|---|
tuple |
RGB color as a tuple. |
Source code in leafmap/common.py
def hex_to_rgb(value="FFFFFF"):
    """Converts hex color to RGB color.

    Args:
        value (str, optional): Hex color code as a string. Defaults to 'FFFFFF'.

    Returns:
        tuple: RGB color as a tuple.
    """
    value = value.lstrip("#")
    step = len(value) // 3
    channels = []
    for start in range(0, len(value), step):
        channels.append(int(value[start : start + step], 16))
    return tuple(channels)
html_to_gradio(html, width='100%', height='500px', **kwargs)
¶
Converts the map to an HTML string that can be used in Gradio. Removes unsupported elements, such as attribution and any code blocks containing functions. See https://github.com/gradio-app/gradio/issues/3190
Parameters:
Name | Type | Description | Default |
---|---|---|---|
width |
str |
The width of the map. Defaults to '100%'. |
'100%' |
height |
str |
The height of the map. Defaults to '500px'. |
'500px' |
Returns:
Type | Description |
---|---|
str |
The HTML string to use in Gradio. |
Source code in leafmap/common.py
def html_to_gradio(html, width="100%", height="500px", **kwargs):
    """Converts the map to an HTML string that can be used in Gradio. Removes unsupported elements, such as
    attribution and any code blocks containing functions. See https://github.com/gradio-app/gradio/issues/3190

    Args:
        html (str | list): The HTML file path, or a list of HTML lines.
        width (str | int, optional): The width of the map. Defaults to '100%'.
        height (str | int, optional): The height of the map. Defaults to '500px'.

    Raises:
        TypeError: If html is neither a file path nor a list of strings.

    Returns:
        str: The HTML string to use in Gradio.
    """
    # Normalize numeric dimensions to CSS pixel values.
    if isinstance(width, int):
        width = f"{width}px"
    if isinstance(height, int):
        height = f"{height}px"

    if isinstance(html, str):
        with open(html, "r") as f:
            lines = f.readlines()
    elif isinstance(html, list):
        lines = html
    else:
        raise TypeError("html must be a file path or a list of strings")

    output = []
    skipped_lines = []  # indices scheduled to be dropped by an earlier multi-line match
    for index, line in enumerate(lines):
        if index in skipped_lines:
            continue
        if line.lstrip().startswith('{"attribution":'):
            # Drop the attribution block, which Gradio cannot render.
            continue
        elif "on(L.Draw.Event.CREATED, function(e)" in line:
            # NOTE(review): assumes the draw-event handler spans exactly 14 lines
            # in folium output — tied to the folium template; confirm on upgrade.
            for i in range(14):
                skipped_lines.append(index + i)
        elif "L.Control.geocoder" in line:
            # NOTE(review): assumes the geocoder block spans exactly 5 lines.
            for i in range(5):
                skipped_lines.append(index + i)
        elif "function(e)" in line:
            # Other inline functions cannot be removed automatically; warn the user.
            print(
                f"Warning: The folium plotting backend does not support functions in code blocks. Please delete line {index + 1}."
            )
        else:
            output.append(line + "\n")

    # Embed the cleaned document via srcdoc so Gradio renders it sandboxed.
    return f"""<iframe style="width: {width}; height: {height}" name="result" allow="midi; geolocation; microphone; camera;
    display-capture; encrypted-media;" sandbox="allow-modals allow-forms
    allow-scripts allow-same-origin allow-popups
    allow-top-navigation-by-user-activation allow-downloads" allowfullscreen=""
    allowpaymentrequest="" frameborder="0" srcdoc='{"".join(output)}'></iframe>"""
html_to_streamlit(filename, width=None, height=None, scrolling=False, replace_dict={})
¶
Renders an HTML file as a Streamlit component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
The filename of the HTML file. |
required |
width |
int |
Width of the map. Defaults to None. |
None |
height |
int |
Height of the map. Defaults to 600. |
None |
scrolling |
bool |
Whether to allow the map to scroll. Defaults to False. |
False |
replace_dict |
dict |
A dictionary of strings to replace in the HTML file. Defaults to {}. |
{} |
Exceptions:
Type | Description |
---|---|
ValueError |
If the filename does not exist. |
Returns:
Type | Description |
---|---|
streamlit.components |
components.html object. |
Source code in leafmap/common.py
def html_to_streamlit(
    filename, width=None, height=None, scrolling=False, replace_dict={}
):
    """Renders an HTML file as a Streamlit component.

    Args:
        filename (str): The filename of the HTML file.
        width (int, optional): Width of the map. Defaults to None.
        height (int, optional): Height of the map. Defaults to None.
        scrolling (bool, optional): Whether to allow the map to scroll. Defaults to False.
        replace_dict (dict, optional): A dictionary of strings to replace in the HTML file. Defaults to {}.

    Raises:
        ValueError: If the filename does not exist.

    Returns:
        streamlit.components: components.html object.
    """
    import streamlit.components.v1 as components

    if not os.path.exists(filename):
        raise ValueError("filename must exist.")

    # Use a context manager so the file handle is closed even if a
    # replacement below raises (the original leaked the handle in that case).
    with open(filename, "r") as f:
        html = f.read()

    for key, value in replace_dict.items():
        html = html.replace(key, value)

    return components.html(html, width=width, height=height, scrolling=scrolling)
image_bandcount(image, **kwargs)
¶
Get the number of bands in an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
int |
The number of bands in the image. |
Source code in leafmap/common.py
def image_bandcount(image, **kwargs):
    """Get the number of bands in an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        int: The number of bands in the image.
    """
    image_check(image)
    if not isinstance(image, str):
        client = image
    else:
        _, client = get_local_tile_layer(image, return_client=True, **kwargs)
    bands = client.metadata()["bands"]
    return len(bands)
image_bounds(image, **kwargs)
¶
Get the bounds of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
list |
A list of bounds in the form of [(south, west), (north, east)]. |
Source code in leafmap/common.py
def image_bounds(image, **kwargs):
    """Get the bounds of an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        list: A list of bounds in the form of [(south, west), (north, east)].
    """
    image_check(image)
    if not isinstance(image, str):
        client = image
    else:
        _, client = get_local_tile_layer(image, return_client=True, **kwargs)
    # client.bounds() yields (south, north, west, east) per the return format above.
    south, north, west, east = client.bounds()
    return [(south, west), (north, east)]
image_center(image, **kwargs)
¶
Get the center of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
tuple |
A tuple of (latitude, longitude). |
Source code in leafmap/common.py
def image_center(image, **kwargs):
    """Get the center of an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        tuple: A tuple of (latitude, longitude).
    """
    image_check(image)
    client = (
        image
        if not isinstance(image, str)
        else get_local_tile_layer(image, return_client=True, **kwargs)[1]
    )
    return client.center()
image_client(image, **kwargs)
¶
Get a LocalTileserver TileClient from an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
TileClient |
A LocalTileserver TileClient. |
Source code in leafmap/common.py
def image_client(image, **kwargs):
    """Get a LocalTileserver TileClient from an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        TileClient: A LocalTileserver TileClient.
    """
    image_check(image)
    # get_local_tile_layer returns (layer, client); only the client is needed.
    layer_and_client = get_local_tile_layer(image, return_client=True, **kwargs)
    return layer_and_client[1]
image_comparison(img1, img2, label1='1', label2='2', width=704, show_labels=True, starting_position=50, make_responsive=True, in_memory=True, out_html=None)
¶
Create a comparison slider for two images. The source code is adapted from https://github.com/fcakyon/streamlit-image-comparison. Credits to the GitHub user @fcakyon. Users can also use https://juxtapose.knightlab.com to create a comparison slider.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
img1 |
str |
Path to the first image. It can be a local file path, a URL, or a numpy array. |
required |
img2 |
str |
Path to the second image. It can be a local file path, a URL, or a numpy array. |
required |
label1 |
str |
Label for the first image. Defaults to "1". |
'1' |
label2 |
str |
Label for the second image. Defaults to "2". |
'2' |
width |
int |
Width of the component in pixels. Defaults to 704. |
704 |
show_labels |
bool |
Whether to show labels on the images. Default is True. |
True |
starting_position |
int |
Starting position of the slider as a percentage (0-100). Default is 50. |
50 |
make_responsive |
bool |
Whether to enable responsive mode. Default is True. |
True |
in_memory |
bool |
Whether to handle pillow to base64 conversion in memory without saving to local. Default is True. |
True |
out_html |
str |
The output HTML file path to save the comparison slider to. Defaults to None. |
None |
Source code in leafmap/common.py
def image_comparison(
    img1: str,
    img2: str,
    label1: str = "1",
    label2: str = "2",
    width: int = 704,
    show_labels: bool = True,
    starting_position: int = 50,
    make_responsive: bool = True,
    in_memory: bool = True,
    out_html: str = None,
):
    """Create a comparison slider for two images. The source code is adapted from
    https://github.com/fcakyon/streamlit-image-comparison. Credits to the GitHub user @fcakyon.
    Users can also use https://juxtapose.knightlab.com to create a comparison slider.

    Args:
        img1 (str): Path to the first image. It can be a local file path, a URL, or a numpy array.
        img2 (str): Path to the second image. It can be a local file path, a URL, or a numpy array.
        label1 (str, optional): Label for the first image. Defaults to "1".
        label2 (str, optional): Label for the second image. Defaults to "2".
        width (int, optional): Width of the component in pixels. Defaults to 704.
        show_labels (bool, optional): Whether to show labels on the images. Default is True.
        starting_position (int, optional): Starting position of the slider as a percentage (0-100). Default is 50.
        make_responsive (bool, optional): Whether to enable responsive mode. Default is True.
        in_memory (bool, optional): Whether to handle pillow to base64 conversion in memory without saving to local. Default is True.
        out_html (str, optional): The output HTML file path to save the comparison slider to. Defaults to None.
    """
    from PIL import Image
    import base64
    import io
    import os
    import uuid
    from typing import Union
    import requests
    import tempfile
    import numpy as np
    from IPython.display import HTML, display

    # Scratch directory for the not-in-memory path; the random suffix avoids
    # collisions between concurrent calls.
    TEMP_DIR = os.path.join(tempfile.gettempdir(), random_string(6))
    os.makedirs(TEMP_DIR, exist_ok=True)

    def exif_transpose(image: Image.Image):
        """
        Transpose a PIL image accordingly if it has an EXIF Orientation tag.
        Inplace version of https://github.com/python-pillow/Pillow/blob/master/src/PIL/ImageOps.py exif_transpose()
        :param image: The image to transpose.
        :return: An image.
        """
        exif = image.getexif()
        orientation = exif.get(0x0112, 1)  # default 1 (no rotation needed)
        if orientation > 1:
            # Map each EXIF orientation code to the Pillow transpose op that undoes it.
            method = {
                2: Image.FLIP_LEFT_RIGHT,
                3: Image.ROTATE_180,
                4: Image.FLIP_TOP_BOTTOM,
                5: Image.TRANSPOSE,
                6: Image.ROTATE_270,
                7: Image.TRANSVERSE,
                8: Image.ROTATE_90,
            }.get(orientation)
            if method is not None:
                image = image.transpose(method)
                # Remove the tag so the transposed image is not re-rotated later.
                del exif[0x0112]
                image.info["exif"] = exif.tobytes()
        return image

    def read_image_as_pil(
        image: Union[Image.Image, str, np.ndarray], exif_fix: bool = False
    ):
        """
        Loads an image as PIL.Image.Image.
        Args:
            image : Can be image path or url (str), numpy image (np.ndarray) or PIL.Image
        """
        # https://stackoverflow.com/questions/56174099/how-to-load-images-larger-than-max-image-pixels-with-pil
        Image.MAX_IMAGE_PIXELS = None
        if isinstance(image, Image.Image):
            image_pil = image.convert("RGB")
        elif isinstance(image, str):
            # read image if str image path is provided
            try:
                image_pil = Image.open(
                    requests.get(image, stream=True).raw
                    if str(image).startswith("http")
                    else image
                ).convert("RGB")
                if exif_fix:
                    image_pil = exif_transpose(image_pil)
            except:  # handle large/tiff image reading
                try:
                    import skimage.io
                except ImportError:
                    raise ImportError(
                        "Please run 'pip install -U scikit-image imagecodecs' for large image handling."
                    )
                image_sk = skimage.io.imread(image).astype(np.uint8)
                if len(image_sk.shape) == 2:  # b&w
                    image_pil = Image.fromarray(image_sk, mode="1").convert("RGB")
                elif image_sk.shape[2] == 4:  # rgba
                    image_pil = Image.fromarray(image_sk, mode="RGBA").convert("RGB")
                elif image_sk.shape[2] == 3:  # rgb
                    image_pil = Image.fromarray(image_sk, mode="RGB")
                else:
                    raise TypeError(
                        f"image with shape: {image_sk.shape[3]} is not supported."
                    )
        elif isinstance(image, np.ndarray):
            # NOTE(review): this reverses the channel order (e.g. BGR -> RGB)
            # rather than transposing CHW -> HWC; confirm the intended layout.
            if image.shape[0] < 5:  # image in CHW
                image = image[:, :, ::-1]
            image_pil = Image.fromarray(image).convert("RGB")
        else:
            raise TypeError("read image with 'pillow' using 'Image.open()'")
        return image_pil

    def pillow_to_base64(image: Image.Image) -> str:
        """
        Convert a PIL image to a base64-encoded string.
        Parameters
        ----------
        image: PIL.Image.Image
            The image to be converted.
        Returns
        -------
        str
            The base64-encoded string.
        """
        in_mem_file = io.BytesIO()
        # subsampling=0, quality=100 keeps JPEG artifacts to a minimum.
        image.save(in_mem_file, format="JPEG", subsampling=0, quality=100)
        img_bytes = in_mem_file.getvalue()  # bytes
        image_str = base64.b64encode(img_bytes).decode("utf-8")
        base64_src = f"data:image/jpg;base64,{image_str}"
        return base64_src

    def local_file_to_base64(image_path: str) -> str:
        """
        Convert a local image file to a base64-encoded string.
        Parameters
        ----------
        image_path: str
            The path to the image file.
        Returns
        -------
        str
            The base64-encoded string.
        """
        file_ = open(image_path, "rb")
        img_bytes = file_.read()
        image_str = base64.b64encode(img_bytes).decode("utf-8")
        file_.close()
        base64_src = f"data:image/jpg;base64,{image_str}"
        return base64_src

    def pillow_local_file_to_base64(image: Image.Image, temp_dir: str):
        """
        Convert a Pillow image to a base64 string, using a temporary file on disk.
        Parameters
        ----------
        image : PIL.Image.Image
            The Pillow image to convert.
        temp_dir : str
            The directory to use for the temporary file.
        Returns
        -------
        str
            A base64-encoded string representing the image.
        """
        # Create temporary file path using os.path.join()
        img_path = os.path.join(temp_dir, str(uuid.uuid4()) + ".jpg")

        # Save image to temporary file
        image.save(img_path, subsampling=0, quality=100)

        # Convert temporary file to base64 string
        base64_src = local_file_to_base64(img_path)

        return base64_src

    # Prepare images
    img1_pillow = read_image_as_pil(img1)
    img2_pillow = read_image_as_pil(img2)

    img_width, img_height = img1_pillow.size
    h_to_w = img_height / img_width
    # Derive the component height from the first image's aspect ratio.
    height = int((width * h_to_w) * 0.95)

    if in_memory:
        # Convert images to base64 strings
        img1 = pillow_to_base64(img1_pillow)
        img2 = pillow_to_base64(img2_pillow)
    else:
        # Create base64 strings from temporary files
        os.makedirs(TEMP_DIR, exist_ok=True)
        # Clear stale jpgs left behind by a previous run before writing new ones.
        for file_ in os.listdir(TEMP_DIR):
            if file_.endswith(".jpg"):
                os.remove(os.path.join(TEMP_DIR, file_))
        img1 = pillow_local_file_to_base64(img1_pillow, TEMP_DIR)
        img2 = pillow_local_file_to_base64(img2_pillow, TEMP_DIR)

    # Load CSS and JS
    cdn_path = "https://cdn.knightlab.com/libs/juxtapose/latest"
    css_block = f'<link rel="stylesheet" href="{cdn_path}/css/juxtapose.css">'
    js_block = f'<script src="{cdn_path}/js/juxtapose.min.js"></script>'

    # write html block
    htmlcode = f"""
        <html>
        <head>
        <style>body {{ margin: unset; }}</style>
        {css_block}
        {js_block}
        <div id="foo" style="height: {height}; width: {width or '100%'};"></div>
        <script>
        slider = new juxtapose.JXSlider('#foo',
            [
                {{
                    src: '{img1}',
                    label: '{label1}',
                }},
                {{
                    src: '{img2}',
                    label: '{label2}',
                }}
            ],
            {{
                animate: true,
                showLabels: {'true' if show_labels else 'false'},
                showCredits: true,
                startingPosition: "{starting_position}%",
                makeResponsive: {'true' if make_responsive else 'false'},
            }});
        </script>
        </head>
        </html>
        """

    if out_html is not None:
        with open(out_html, "w") as f:
            f.write(htmlcode)

    shutil.rmtree(TEMP_DIR)
    display(HTML(htmlcode))
image_filesize(region, cellsize, bands=1, dtype='uint8', unit='MB', source_crs='epsg:4326', dst_crs='epsg:3857', bbox=False)
¶
Calculate the size of an image in a given region and cell size.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
region |
list |
A bounding box in the format of [minx, miny, maxx, maxy]. |
required |
cellsize |
float |
The resolution of the image. |
required |
bands |
int |
Number of bands. Defaults to 1. |
1 |
dtype |
str |
Data type, such as uint8, float32. For more info, see https://numpy.org/doc/stable/user/basics.types.html. Defaults to 'uint8'. |
'uint8' |
unit |
str |
The unit of the output. Defaults to 'MB'. |
'MB' |
source_crs |
str |
The CRS of the region. Defaults to 'epsg:4326'. |
'epsg:4326' |
dst_crs |
str |
The destination CRS to calculate the area. Defaults to 'epsg:3857'. |
'epsg:3857' |
bbox |
bool |
Whether to use the bounding box of the region to calculate the area. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
float |
The size of the image in a given unit. |
Source code in leafmap/common.py
def image_filesize(
    region,
    cellsize,
    bands=1,
    dtype="uint8",
    unit="MB",
    source_crs="epsg:4326",
    dst_crs="epsg:3857",
    bbox=False,
):
    """Calculate the size of an image in a given region and cell size.

    Args:
        region (list): A bounding box in the format of [minx, miny, maxx, maxy].
        cellsize (float): The resolution of the image.
        bands (int, optional): Number of bands. Defaults to 1.
        dtype (str, optional): Data type, such as uint8, float32. For more info,
            see https://numpy.org/doc/stable/user/basics.types.html. Defaults to 'uint8'.
        unit (str, optional): The unit of the output: 'KB', 'MB', 'GB', 'TB' or 'PB';
            any other value returns bytes. Defaults to 'MB'.
        source_crs (str, optional): The CRS of the region. Defaults to 'epsg:4326'.
        dst_crs (str, optional): The destination CRS to calculate the area. Defaults to 'epsg:3857'.
        bbox (bool, optional): Whether to use the bounding box of the region to calculate the area. Defaults to False.

    Raises:
        ValueError: If the input region is not a GeoDataFrame, an existing file path, or a list.

    Returns:
        float: The size of the image in a given unit.
    """
    import numpy as np
    import geopandas as gpd

    if bbox:
        # Reproject the region to the destination CRS and take its bounding box.
        if isinstance(region, gpd.GeoDataFrame):
            region = region.to_crs(dst_crs).total_bounds.tolist()
        elif isinstance(region, str) and os.path.exists(region):
            region = gpd.read_file(region).to_crs(dst_crs).total_bounds.tolist()
        elif isinstance(region, list):
            region = (
                bbox_to_gdf(region, crs=source_crs)
                .to_crs(dst_crs)
                .total_bounds.tolist()
            )
        else:
            raise ValueError("Invalid input region.")

        # pixels-in-x * pixels-in-y * bands * bytes-per-pixel.
        # Renamed from `bytes` to avoid shadowing the builtin.
        num_bytes = (
            np.prod(
                [
                    int((region[2] - region[0]) / cellsize),
                    int((region[3] - region[1]) / cellsize),
                    bands,
                ]
            )
            * np.dtype(dtype).itemsize
        )
    else:
        if isinstance(region, list):
            region = bbox_to_gdf(region, crs=source_crs)
        # Use the exact polygon area rather than the bounding box.
        num_bytes = (
            vector_area(region, crs=dst_crs)
            / pow(cellsize, 2)
            * np.dtype(dtype).itemsize
            * bands
        )

    unit = unit.upper()
    if unit == "KB":
        return num_bytes / 1024
    elif unit == "MB":
        return num_bytes / pow(1024, 2)
    elif unit == "GB":
        return num_bytes / pow(1024, 3)
    elif unit == "TB":
        return num_bytes / pow(1024, 4)
    elif unit == "PB":
        return num_bytes / pow(1024, 5)
    else:
        return num_bytes
image_geotransform(image, **kwargs)
¶
Get the geotransform of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
list |
A list of geotransform values. |
Source code in leafmap/common.py
def image_geotransform(image, **kwargs):
    """Get the geotransform of an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        list: A list of geotransform values.
    """
    image_check(image)
    if isinstance(image, str):
        client = get_local_tile_layer(image, return_client=True, **kwargs)[1]
    else:
        client = image
    return client.metadata()["GeoTransform"]
image_metadata(image, **kwargs)
¶
Get the metadata of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of image metadata. |
Source code in leafmap/common.py
def image_metadata(image, **kwargs):
    """Get the metadata of an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        dict: A dictionary of image metadata.
    """
    image_check(image)
    client = image
    if isinstance(image, str):
        _, client = get_local_tile_layer(image, return_client=True, **kwargs)
    return client.metadata()
image_projection(image, **kwargs)
¶
Get the projection of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
str |
The projection of the image. |
Source code in leafmap/common.py
def image_projection(image, **kwargs):
    """Get the projection of an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        str: The projection of the image.
    """
    image_check(image)
    if not isinstance(image, str):
        client = image
    else:
        _, client = get_local_tile_layer(image, return_client=True, **kwargs)
    return client.metadata()["Projection"]
image_resolution(image, **kwargs)
¶
Get the resolution of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
float |
The resolution of the image. |
Source code in leafmap/common.py
def image_resolution(image, **kwargs):
    """Get the resolution of an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        float: The resolution of the image.
    """
    image_check(image)
    client = (
        image
        if not isinstance(image, str)
        else get_local_tile_layer(image, return_client=True, **kwargs)[1]
    )
    # The second geotransform entry is the pixel size along x.
    geotransform = client.metadata()["GeoTransform"]
    return geotransform[1]
image_set_crs(image, epsg)
¶
Define the CRS of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath |
required |
epsg |
int |
The EPSG code of the CRS to set. |
required |
Source code in leafmap/common.py
def image_set_crs(image, epsg):
    """Define the CRS of an image.

    Args:
        image (str): The input image filepath
        epsg (int): The EPSG code of the CRS to set.
    """
    import rasterio
    from rasterio.crs import CRS

    # Open in r+ mode so the CRS is written back to the existing file.
    with rasterio.open(image, "r+") as dataset:
        dataset.crs = CRS.from_epsg(epsg)
image_size(image, **kwargs)
¶
Get the size (width, height) of an image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
The input image filepath or URL. |
required |
Returns:
Type | Description |
---|---|
tuple |
A tuple of (width, height). |
Source code in leafmap/common.py
def image_size(image, **kwargs):
    """Get the size (width, height) of an image.

    Args:
        image (str): The input image filepath or URL.

    Returns:
        tuple: A tuple of (width, height).
    """
    image_check(image)
    if not isinstance(image, str):
        client = image
    else:
        _, client = get_local_tile_layer(image, return_client=True, **kwargs)
    meta = client.metadata()
    return meta["sourceSizeX"], meta["sourceSizeY"]
image_to_cog(source, dst_path=None, profile='deflate', **kwargs)
¶
Converts an image to a COG file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
A dataset path, URL or rasterio.io.DatasetReader object. |
required |
dst_path |
str |
An output dataset path or or PathLike object. Defaults to None. |
None |
profile |
str |
COG profile. More at https://cogeotiff.github.io/rio-cogeo/profile. Defaults to "deflate". |
'deflate' |
Exceptions:
Type | Description |
---|---|
ImportError |
If rio-cogeo is not installed. |
FileNotFoundError |
If the source file could not be found. |
Source code in leafmap/common.py
def image_to_cog(source, dst_path=None, profile="deflate", **kwargs):
    """Converts an image to a COG file.

    Args:
        source (str): A dataset path, URL or rasterio.io.DatasetReader object.
        dst_path (str, optional): An output dataset path or or PathLike object. Defaults to None.
        profile (str, optional): COG profile. More at https://cogeotiff.github.io/rio-cogeo/profile. Defaults to "deflate".

    Raises:
        ImportError: If rio-cogeo is not installed.
        FileNotFoundError: If the source file could not be found.
    """
    try:
        from rio_cogeo.cogeo import cog_translate
        from rio_cogeo.profiles import cog_profiles
    except ImportError:
        raise ImportError(
            "The rio-cogeo package is not installed. Please install it with `pip install rio-cogeo` or `conda install rio-cogeo -c conda-forge`."
        )

    # The docstring allows a DatasetReader object; only str sources can be
    # checked with startswith/os.path (the original crashed on non-str input).
    source_is_local_path = isinstance(source, str) and not source.startswith("http")

    if source_is_local_path:
        source = check_file_path(source)
        if not os.path.exists(source):
            raise FileNotFoundError("The provided input file could not be found.")

    if dst_path is None:
        if source_is_local_path:
            dst_path = os.path.splitext(source)[0] + "_cog.tif"
        else:
            dst_path = temp_file_path(extension=".tif")

    dst_path = check_file_path(dst_path)

    dst_profile = cog_profiles.get(profile)
    cog_translate(source, dst_path, dst_profile, **kwargs)
image_to_numpy(image)
¶
Converts an image to a numpy array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image |
str |
A dataset path, URL or rasterio.io.DatasetReader object. |
required |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
If the provided file could not be found. |
Returns:
Type | Description |
---|---|
np.array |
A numpy array. |
Source code in leafmap/common.py
def image_to_numpy(image):
    """Converts an image to a numpy array.

    Args:
        image (str): A dataset path, URL or rasterio.io.DatasetReader object.

    Raises:
        Exception: If the image could not be read.

    Returns:
        np.array: A numpy array.
    """
    import rasterio

    try:
        with rasterio.open(image, "r") as ds:
            return ds.read()  # read all raster values
    except Exception as e:
        # Keep the documented Exception type for callers that catch it,
        # but chain the cause so the original traceback is preserved.
        raise Exception(e) from e
is_arcpy()
¶
Check if arcpy is available.
Returns:
Type | Description |
---|---|
bool |
True if arcpy is available, False otherwise. |
Source code in leafmap/common.py
def is_arcpy():
    """Check if arcpy is available.

    Returns:
        bool: True if arcpy is available, False otherwise.
    """
    import sys

    # arcpy is considered available when it has already been imported.
    return "arcpy" in sys.modules
is_jupyterlite()
¶
Check if the current notebook is running on JupyterLite.
Returns:
Type | Description |
---|---|
bool |
True if the notebook is running on JupyterLite. |
Source code in leafmap/common.py
def is_jupyterlite():
    """Check if the current notebook is running on JupyterLite.

    Returns:
        bool: True if the notebook is running on JupyterLite.
    """
    import sys

    # JupyterLite runs on Pyodide, so its presence identifies the platform.
    return "pyodide" in sys.modules
is_on_aws()
¶
Check if the current notebook is running on AWS.
Returns:
Type | Description |
---|---|
bool |
True if the notebook is running on AWS. |
Source code in leafmap/common.py
def is_on_aws():
    """Check if the current notebook is running on AWS.

    Returns:
        bool: True if the notebook is running on AWS.
    """
    import psutil

    # Inspect the parent process command line for AWS-specific markers.
    cmdline = psutil.Process().parent().cmdline()
    return any(item.endswith(".aws") or "ec2-user" in item for item in cmdline)
is_studio_lab()
¶
Check if the current notebook is running on Studio Lab.
Returns:
Type | Description |
---|---|
bool |
True if the notebook is running on Studio Lab. |
Source code in leafmap/common.py
def is_studio_lab():
    """Check if the current notebook is running on Studio Lab.

    Returns:
        bool: True if the notebook is running on Studio Lab.
    """
    import psutil

    # Inspect the parent process command line for the Studio Lab bin path.
    cmdline = psutil.Process().parent().cmdline()
    return any("studiolab/bin" in item for item in cmdline)
is_tool(name)
¶
Check whether name
is on PATH and marked as executable.
Source code in leafmap/common.py
def is_tool(name):
    """Check whether `name` is on PATH and marked as executable."""
    # shutil.which returns the executable path or None.
    return bool(shutil.which(name))
kml_to_geojson(in_kml, out_geojson=None)
¶
Converts a KML to GeoJSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_kml |
str |
The file path to the input KML. |
required |
out_geojson |
str |
The file path to the output GeoJSON. Defaults to None. |
None |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
The input KML could not be found. |
TypeError |
The output must be a GeoJSON. |
Source code in leafmap/common.py
def kml_to_geojson(in_kml, out_geojson=None):
"""Converts a KML to GeoJSON.
Args:
in_kml (str): The file path to the input KML.
out_geojson (str): The file path to the output GeoJSON. Defaults to None.
Raises:
FileNotFoundError: The input KML could not be found.
TypeError: The output must be a GeoJSON.
"""
import warnings
warnings.filterwarnings("ignore")
in_kml = os.path.abspath(in_kml)
if not os.path