Skip to content

common module

This module contains some common functions for both folium and ipyleaflet.

The_national_map_USGS

The national map is a collection of topological datasets, maintained by the USGS.

It provides an API endpoint which can be used to find downloadable links for the products offered. - A full description of the available datasets can be retrieved. This consists of metadata such as detailed descriptions and publication dates. - A wide range of data formats is available.

This class is a tiny wrapper to find and download files using the API.

More complete documentation for the API can be found at https://apps.nationalmap.gov/tnmaccess/#/

Source code in leafmap/common.py
class The_national_map_USGS:
    """
    The national map is a collection of topological datasets, maintained by the USGS.

    It provides an API endpoint which can be used to find downloadable links for the products offered.
        - A full description of the available datasets can be retrieved.
          This consists of metadata such as detailed descriptions and publication dates.
        - A wide range of data formats is available.

    This class is a tiny wrapper to find and download files using the API.

    More complete documentation for the API can be found at
        https://apps.nationalmap.gov/tnmaccess/#/
    """

    def __init__(self):
        self.api_endpoint = r"https://tnmaccess.nationalmap.gov/api/v1/"
        # Fetch the dataset metadata once at construction time; the
        # ``prodFormats`` and ``datasets`` properties read from this cache.
        self.DS = self.datasets_full

    @property
    def datasets_full(self) -> list:
        """
        Full description of datasets provided.

        Performs an HTTP GET on the ``datasets`` endpoint every time it is
        accessed.

        Returns:
            list: The decoded JSON response, or an empty list if the request
                or the JSON decoding fails (a message is printed in that case).
        """
        link = f"{self.api_endpoint}datasets?"
        try:
            return requests.get(link).json()
        except Exception:
            print(f"Failed to load metadata from The National Map API endpoint\n{link}")
            return []

    @property
    def prodFormats(self) -> set:
        """
        Return all datatypes available in any of the collections.
        Note that "All" is only peculiar to one dataset.

        Returns:
            set: The distinct ``displayName`` values of every format listed in
                the cached dataset metadata (``self.DS``).
        """
        return {fmt["displayName"] for ds in self.DS for fmt in ds["formats"]}

    @property
    def datasets(self) -> set:
        """
        Returns the dataset tags (most common human readable self description for specific datasets).

        Returns:
            set: The distinct ``sbDatasetTag`` values of every tag listed in
                the cached dataset metadata (``self.DS``).
        """
        return {tag["sbDatasetTag"] for ds in self.DS for tag in ds["tags"]}

    def parse_region(self, region, geopandas_args={}) -> list:
        """
        Translate a Vector dataset to its bounding box.

        Args:
            region (str | list): A URL or file path to a vector dataset. Any
                non-string value (e.g. a list of bounds) is returned unchanged.
            geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
                Used for reading a region URL|filepath.

        Returns:
            list: ``[minx, miny, maxx, maxy]`` of the dataset in EPSG:4326, or
                ``region`` itself when it is not a string.

        Raises:
            ValueError: If ``region`` is a string that is neither an HTTP(S)
                URL nor an existing path.
        """
        if not isinstance(region, str):
            return region

        if region.startswith("http"):
            region = github_raw_url(region)
            region = download_file(region)
        elif not os.path.exists(region):
            raise ValueError("region must be a path or a URL to a vector dataset.")

        # Imported lazily so that plain bounding boxes do not require geopandas.
        import geopandas as gpd

        roi = gpd.read_file(region, **(geopandas_args or {}))
        roi = roi.to_crs(epsg=4326)
        # total_bounds is a NumPy array; convert it to a plain list of floats
        # so truth tests and string formatting in find_details() work.
        return roi.total_bounds.tolist()

    def download_tiles(
        self, region=None, out_dir=None, download_args={}, geopandas_args={}, API={}
    ) -> None:
        """
        Download the US National Elevation Datasets (NED) for a region.

        Every URL returned by ``find_tiles`` is downloaded in turn; a failed
        download is counted and reported, and does not stop the remaining ones.

        Args:
            region (str | list, optional): A URL or file path to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy].
                Alternatively you could use API parameters such as polygon or bbox.
            out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
            download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
            geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
                Used for reading a region URL|filepath.
            API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
                Exposes most of the documented API. Defaults to {}.

        Returns:
            None
        """
        # Do nothing when the USE_MKDOCS environment variable is set.
        if os.environ.get("USE_MKDOCS") is not None:
            return

        out_dir = os.getcwd() if out_dir is None else os.path.abspath(out_dir)

        tiles = self.find_tiles(
            region, return_type="list", geopandas_args=geopandas_args, API=API
        )
        total = len(tiles)
        errors = 0
        done = 0
        extra_args = download_args or {}

        for i, link in enumerate(tiles):
            file_name = os.path.basename(link)
            out_name = os.path.join(out_dir, file_name)
            # Throttle progress output: every file for the first 5, every 5th
            # up to 50, then every 20th.
            if i < 5 or (i < 50 and not (i % 5)) or not (i % 20):
                print(f"Downloading {i+1} of {total}: {file_name}")
            try:
                download_file(link, out_name, **extra_args)
                done += 1
            except KeyboardInterrupt:
                print("Cancelled download")
                break
            except Exception:
                errors += 1
                print(f"Failed to download {i+1} of {total}: {file_name}")

        print(
            f"{done} Downloads completed, {errors} downloads failed, {total} files available"
        )

    def find_tiles(self, region=None, return_type="list", geopandas_args={}, API={}):
        """
        Find a list of downloadable files.

        Args:
            region (str | list, optional): A URL or file path to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy].
                Alternatively you could use API parameters such as polygon or bbox.
            return_type (str): list | dict. Defaults to list. Changes the return output type and content.
            geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
                Used for reading a region URL|filepath.
            API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
                Exposes most of the documented API parameters. Defaults to {}.
                The dictionary is not modified.

        Returns:
            list: A list of download URLs when ``return_type`` is "list"
                (empty if the query returned no items).
            dict: Otherwise, the full response of ``find_details``.
        """
        assert region or API, "Provide a region or use the API"

        # Work on a copy: writing "bbox" into the caller's dict (or into the
        # shared default dict) would leak the bounding box into later calls.
        params = dict(API or {})
        if region:
            params["bbox"] = self.parse_region(region, geopandas_args)

        results = self.find_details(**params)
        if return_type == "list":
            # find_details() returns {} on failure, so "items" may be missing.
            return [item["downloadURL"] for item in results.get("items") or []]
        return results

    def find_details(
        self,
        bbox: List[float] = None,
        polygon: List[Tuple[float, float]] = None,
        datasets: str = None,
        prodFormats: str = None,
        prodExtents: str = None,
        q: str = None,
        dateType: str = None,
        start: str = None,
        end: str = None,
        offset: int = 0,
        max: int = None,
        outputFormat: str = "JSON",
        polyType: str = None,
        polyCode: str = None,
        extentQuery: int = None,
    ) -> Dict:
        """
        Query the ``products`` endpoint and return the decoded JSON response.

        Only truthy arguments are sent as query parameters. ``bbox`` and
        ``polygon`` may be given as sequences (formatted here) or as strings
        that are already formatted (sent unchanged).

        Possible search parameters (kwargs) supported by the API

        Parameter               Values
            Description
        ---------------------------------------------------------------------------------------------------
        bbox                    'minx, miny, maxx, maxy'
            Geographic longitude/latitude values expressed in  decimal degrees in a comma-delimited list.
        polygon                 '[x,y x,y x,y x,y x,y]'
            Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list.
        datasets                See: Datasets (Optional)
            Dataset tag name (sbDatasetTag)
            From https://apps.nationalmap.gov/tnmaccess/#/product
        prodFormats             See: Product Formats (Optional)
            Dataset-specific format

        prodExtents             See: Product Extents (Optional)
            Dataset-specific extent
        q                       free text
            Text input which can be used to filter by product titles and text descriptions.
        dateType                dateCreated | lastUpdated | Publication
            Type of date to search by.
        start                   'YYYY-MM-DD'
            Start date
        end                     'YYYY-MM-DD'
            End date (required if start date is provided)
        offset                  integer
            Offset into paginated results - default=0
        max                     integer
            Number of results returned
        outputFormat            JSON | CSV | pjson
            Default=JSON
        polyType                state | huc2 | huc4 | huc8
            Well Known Polygon Type. Use this parameter to deliver data by state or HUC
            (hydrologic unit codes defined by the Watershed Boundary Dataset/WBD)
        polyCode                state FIPS code or huc number
            Well Known Polygon Code. This value needs to coordinate with the polyType parameter.
        extentQuery             integer
            A Polygon code in the science base system, typically from an uploaded shapefile

        Returns:
            dict: The decoded JSON response for a 2xx status. An empty dict is
                returned (after printing the error) for any other status or if
                an exception occurs.
        """

        try:
            # Must stay the first statement: locals() is snapshotted before any
            # other local exists, so it holds exactly the call arguments.
            used_locals = {k: v for k, v in locals().items() if v and k != "self"}

            # Sequences are serialised here; strings are assumed to be
            # pre-formatted by the caller and are passed through untouched.
            if polygon and not isinstance(polygon, str):
                used_locals["polygon"] = ",".join(
                    " ".join(map(str, point)) for point in polygon
                )
            if bbox and not isinstance(bbox, str):
                used_locals["bbox"] = ", ".join(map(str, bbox))

            # Fetch response
            response = requests.get(f"{self.api_endpoint}products?", params=used_locals)
            if response.status_code // 100 == 2:
                return response.json()
            # Parameter validation handled by API endpoint error responses
            print(response.json())
            return {}
        except Exception as e:
            print(e)
            return {}

datasets: list property readonly

Returns a list of dataset tags (most common human readable self description for specific datasets).

datasets_full: list property readonly

Full description of datasets provided. Returns a JSON or empty list.

prodFormats: list property readonly

Return all datatypes available in any of the collections. Note that "All" is only peculiar to one dataset.

download_tiles(self, region=None, out_dir=None, download_args={}, geopandas_args={}, API={})

Download the US National Elevation Datasets (NED) for a region.

Parameters:

Name Type Description Default
region str | list

A URL or file path to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy]. Alternatively, you can use API parameters such as polygon or bbox.

None
out_dir str

The directory to download the files to. Defaults to None, which uses the current working directory.

None
download_args dict

A dictionary of arguments to pass to the download_file function. Defaults to {}.

{}
geopandas_args dict

A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath.

{}
API dict

A dictionary of arguments to pass to the self.find_details() function. Exposes most of the documented API. Defaults to {}.

{}

Returns:

Type Description
None

None

Source code in leafmap/common.py
def download_tiles(
    self, region=None, out_dir=None, download_args={}, geopandas_args={}, API={}
) -> None:
    """
    Download the US National Elevation Datasets (NED) for a region.

    Every URL returned by ``self.find_tiles`` is downloaded in turn; a failed
    download is counted and reported, and does not stop the remaining ones.

    Args:
        region (str | list, optional): A URL or file path to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy].
            Alternatively you could use API parameters such as polygon or bbox.
        out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
        download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
        geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
            Used for reading a region URL|filepath.
        API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
            Exposes most of the documented API. Defaults to {}.
            The dictionary is not modified.

    Returns:
        None
    """
    # Do nothing when the USE_MKDOCS environment variable is set.
    if os.environ.get("USE_MKDOCS") is not None:
        return

    out_dir = os.getcwd() if out_dir is None else os.path.abspath(out_dir)

    # The default dicts are shared between calls, so hand copies downstream:
    # anything find_tiles() writes (e.g. a "bbox" entry) must not persist.
    tiles = self.find_tiles(
        region,
        return_type="list",
        geopandas_args=dict(geopandas_args or {}),
        API=dict(API or {}),
    )
    total = len(tiles)
    errors = 0
    done = 0
    extra_args = dict(download_args or {})

    for i, link in enumerate(tiles):
        file_name = os.path.basename(link)
        out_name = os.path.join(out_dir, file_name)
        # Throttle progress output: every file for the first 5, every 5th up
        # to 50, then every 20th.
        if i < 5 or (i < 50 and not (i % 5)) or not (i % 20):
            print(f"Downloading {i+1} of {total}: {file_name}")
        try:
            download_file(link, out_name, **extra_args)
            done += 1
        except KeyboardInterrupt:
            print("Cancelled download")
            break
        except Exception:
            errors += 1
            print(f"Failed to download {i+1} of {total}: {file_name}")

    print(
        f"{done} Downloads completed, {errors} downloads failed, {total} files available"
    )

find_details(self, bbox=None, polygon=None, datasets=None, prodFormats=None, prodExtents=None, q=None, dateType=None, start=None, end=None, offset=0, max=None, outputFormat='JSON', polyType=None, polyCode=None, extentQuery=None)

Possible search parameters (kwargs) supported by the API

Parameter Values Description


- bbox — 'minx, miny, maxx, maxy': Geographic longitude/latitude values expressed in decimal degrees in a comma-delimited list.
- polygon — '[x,y x,y x,y x,y x,y]': Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list.
- datasets — See: Datasets (Optional): Dataset tag name (sbDatasetTag). From https://apps.nationalmap.gov/tnmaccess/#/product
- prodFormats — See: Product Formats (Optional): Dataset-specific format.
- prodExtents — See: Product Extents (Optional): Dataset-specific extent.
- q — free text: Text input which can be used to filter by product titles and text descriptions.
- dateType — dateCreated | lastUpdated | Publication: Type of date to search by.
- start — 'YYYY-MM-DD': Start date.
- end — 'YYYY-MM-DD': End date (required if start date is provided).
- offset — integer: Offset into paginated results - default=0.
- max — integer: Number of results returned.
- outputFormat — JSON | CSV | pjson: Default=JSON.
- polyType — state | huc2 | huc4 | huc8: Well Known Polygon Type. Use this parameter to deliver data by state or HUC (hydrologic unit codes defined by the Watershed Boundary Dataset/WBD).
- polyCode — state FIPS code or huc number: Well Known Polygon Code. This value needs to coordinate with the polyType parameter.
- extentQuery — integer: A Polygon code in the science base system, typically from an uploaded shapefile.

Source code in leafmap/common.py
def find_details(
    self,
    bbox: List[float] = None,
    polygon: List[Tuple[float, float]] = None,
    datasets: str = None,
    prodFormats: str = None,
    prodExtents: str = None,
    q: str = None,
    dateType: str = None,
    start: str = None,
    end: str = None,
    offset: int = 0,
    max: int = None,
    outputFormat: str = "JSON",
    polyType: str = None,
    polyCode: str = None,
    extentQuery: int = None,
) -> Dict:
    """
    Query the ``products`` endpoint and return the decoded JSON response.

    Only truthy arguments are sent as query parameters. ``bbox`` and
    ``polygon`` may be given as sequences (formatted here) or as strings that
    are already formatted (sent unchanged).

    Possible search parameters (kwargs) supported by the API

    Parameter               Values
        Description
    ---------------------------------------------------------------------------------------------------
    bbox                    'minx, miny, maxx, maxy'
        Geographic longitude/latitude values expressed in  decimal degrees in a comma-delimited list.
    polygon                 '[x,y x,y x,y x,y x,y]'
        Polygon, longitude/latitude values expressed in decimal degrees in a space-delimited list.
    datasets                See: Datasets (Optional)
        Dataset tag name (sbDatasetTag)
        From https://apps.nationalmap.gov/tnmaccess/#/product
    prodFormats             See: Product Formats (Optional)
        Dataset-specific format

    prodExtents             See: Product Extents (Optional)
        Dataset-specific extent
    q                       free text
        Text input which can be used to filter by product titles and text descriptions.
    dateType                dateCreated | lastUpdated | Publication
        Type of date to search by.
    start                   'YYYY-MM-DD'
        Start date
    end                     'YYYY-MM-DD'
        End date (required if start date is provided)
    offset                  integer
        Offset into paginated results - default=0
    max                     integer
        Number of results returned
    outputFormat            JSON | CSV | pjson
        Default=JSON
    polyType                state | huc2 | huc4 | huc8
        Well Known Polygon Type. Use this parameter to deliver data by state or HUC
        (hydrologic unit codes defined by the Watershed Boundary Dataset/WBD)
    polyCode                state FIPS code or huc number
        Well Known Polygon Code. This value needs to coordinate with the polyType parameter.
    extentQuery             integer
        A Polygon code in the science base system, typically from an uploaded shapefile

    Returns:
        dict: The decoded JSON response for a 2xx status. An empty dict is
            returned (after printing the error) for any other status or if an
            exception occurs.
    """

    try:
        # Must stay the first statement: locals() is snapshotted before any
        # other local exists, so it holds exactly the call arguments.
        used_locals = {k: v for k, v in locals().items() if v and k != "self"}

        # Sequences are serialised here; strings are assumed to be
        # pre-formatted by the caller and are passed through untouched.
        if polygon and not isinstance(polygon, str):
            used_locals["polygon"] = ",".join(
                " ".join(map(str, point)) for point in polygon
            )
        if bbox and not isinstance(bbox, str):
            used_locals["bbox"] = ", ".join(map(str, bbox))

        # Fetch response
        response = requests.get(f"{self.api_endpoint}products?", params=used_locals)
        if response.status_code // 100 == 2:
            return response.json()
        # Parameter validation handled by API endpoint error responses
        print(response.json())
        return {}
    except Exception as e:
        print(e)
        return {}

find_tiles(self, region=None, return_type='list', geopandas_args={}, API={})

Find a list of downloadable files.

Parameters:

Name Type Description Default
region str | list

A URL or file path to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy]. Alternatively, you can use API parameters such as polygon or bbox.

None
out_dir str

The directory to download the files to. Defaults to None, which uses the current working directory.

required
return_type str

list | dict. Defaults to list. Changes the return output type and content.

'list'
geopandas_args dict

A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath.

{}
API dict

A dictionary of arguments to pass to the self.find_details() function. Exposes most of the documented API parameters. Defaults to {}.

{}

Returns:

Type Description
list

A list of download URLs when return_type is 'list'; otherwise a dict containing the URLs and related metadata.

Source code in leafmap/common.py
def find_tiles(self, region=None, return_type="list", geopandas_args={}, API={}):
    """
    Find a list of downloadable files.

    Args:
        region (str | list, optional): A URL or file path to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy].
            Alternatively you could use API parameters such as polygon or bbox.
        return_type (str): list | dict. Defaults to list. Changes the return output type and content.
        geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
            Used for reading a region URL|filepath.
        API (dict, optional): A dictionary of arguments to pass to the self.find_details() function.
            Exposes most of the documented API parameters. Defaults to {}.
            The dictionary is not modified.

    Returns:
        list: A list of download URLs when ``return_type`` is "list" (empty
            if the query returned no items).
        dict: Otherwise, the full response of ``self.find_details``.
    """
    assert region or API, "Provide a region or use the API"

    # Work on a copy: writing "bbox" into the caller's dict (or into the
    # shared default dict) would leak the bounding box into later calls.
    params = dict(API or {})
    if region:
        params["bbox"] = self.parse_region(region, geopandas_args)

    results = self.find_details(**params)
    if return_type == "list":
        # The response may carry no "items" entry (e.g. an empty dict).
        return [item["downloadURL"] for item in results.get("items") or []]
    return results

parse_region(self, region, geopandas_args={})

Translate a Vector dataset to its bounding box.

Parameters:

Name Type Description Default
region str | list

A URL or file path to a vector (polygon) dataset whose bounding box is computed; any non-string value is returned unchanged.

required
geopandas_args dict

A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath.

required
Source code in leafmap/common.py
def parse_region(self, region, geopandas_args={}) -> list:
    """
    Translate a Vector dataset to its bounding box.

    Args:
        region (str | list): A URL or file path to a vector dataset. Any
            non-string value (e.g. a list of bounds) is returned unchanged.
        geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
            Used for reading a region URL|filepath.

    Returns:
        list: ``[minx, miny, maxx, maxy]`` of the dataset in EPSG:4326, or
            ``region`` itself when it is not a string.

    Raises:
        ValueError: If ``region`` is a string that is neither an HTTP(S) URL
            nor an existing path.
    """
    if not isinstance(region, str):
        return region

    if region.startswith("http"):
        region = github_raw_url(region)
        region = download_file(region)
    elif not os.path.exists(region):
        raise ValueError("region must be a path or a URL to a vector dataset.")

    # Imported lazily so that plain bounding boxes do not require geopandas.
    import geopandas as gpd

    roi = gpd.read_file(region, **(geopandas_args or {}))
    roi = roi.to_crs(epsg=4326)
    # total_bounds is a NumPy array; return a plain list of floats so that
    # callers can truth-test it and format it as a comma-separated string.
    return roi.total_bounds.tolist()

WhiteboxTools (WhiteboxTools)

This class inherits the whitebox WhiteboxTools class.

Source code in leafmap/common.py
class WhiteboxTools(whitebox.WhiteboxTools):
    """This class inherits the whitebox WhiteboxTools class.

    It defines no attributes or methods of its own beyond the constructor,
    which simply delegates to the parent class.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments unchanged to the parent constructor."""
        super().__init__(**kwargs)

__install_from_github(url)

Install a package from a GitHub repository.

Parameters:

Name Type Description Default
url str

The URL of the GitHub repository.

required
Source code in leafmap/common.py
def __install_from_github(url: str):
    """Install a package from a GitHub repository.

    Downloads and unzips the ``master`` branch archive of the repository into
    ``~/Downloads`` and runs ``pip install .`` inside the extracted folder.

    Args:
        url (str): The URL of the GitHub repository.

    Raises:
        Exception: If the download fails or pip exits with a non-zero status
            (``subprocess.CalledProcessError`` in the latter case).
    """
    import subprocess
    import sys

    # A trailing slash would make the repository name come out empty.
    url = url.rstrip("/")

    download_dir = os.path.join(os.path.expanduser("~"), "Downloads")
    os.makedirs(download_dir, exist_ok=True)

    repo_name = os.path.basename(url)
    # URLs always use forward slashes; os.path.join would insert a backslash
    # on Windows.
    zip_url = f"{url}/archive/master.zip"
    filename = repo_name + "-master.zip"
    download_from_url(
        url=zip_url, out_file_name=filename, out_dir=download_dir, unzip=True
    )

    pkg_dir = os.path.join(download_dir, repo_name + "-master")
    print("Installing {}...".format(repo_name))
    # Run pip of the current interpreter inside the package folder. Using
    # ``cwd`` leaves the process working directory untouched even on failure,
    # and check_call raises when pip fails, so success is only reported when
    # the installation really succeeded.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "."], cwd=pkg_dir)
    print("{} has been installed successfully.".format(repo_name))

add_crs(filename, epsg)

Add a CRS to a raster dataset.

Parameters:

Name Type Description Default
filename str

The filename of the raster dataset.

required
epsg int | str

The EPSG code of the CRS.

required
Source code in leafmap/common.py
def add_crs(filename, epsg):
    """Add a CRS to a raster dataset.

    The file is opened in update mode ("r+") and its CRS is overwritten in
    place.

    Args:
        filename (str): The filename of the raster dataset.
        epsg (int | str): The EPSG code of the CRS, e.g. 4326, "4326" or
            "EPSG:4326".

    Raises:
        ImportError: If rasterio is not installed.
        ValueError: If ``filename`` does not exist or ``epsg`` is neither an
            integer nor a string.
    """
    try:
        import rasterio
    except ImportError:
        raise ImportError(
            "rasterio is required for adding a CRS to a raster. Please install it using 'pip install rasterio'."
        )

    if not os.path.exists(filename):
        raise ValueError("filename must exist.")

    # bool is a subclass of int but is never a meaningful EPSG code.
    if isinstance(epsg, int) and not isinstance(epsg, bool):
        epsg = f"EPSG:{epsg}"
    elif isinstance(epsg, str):
        code = epsg.strip()
        # Accept an already prefixed code instead of producing "EPSG:EPSG:…".
        if code.upper().startswith("EPSG:"):
            code = code[len("EPSG:") :]
        epsg = "EPSG:" + code
    else:
        raise ValueError("epsg must be an integer or string.")

    crs = rasterio.crs.CRS({"init": epsg})
    with rasterio.open(filename, mode="r+") as src:
        src.crs = crs

add_image_to_gif(in_gif, out_gif, in_image, xy=None, image_size=(80, 80), circle_mask=False)

Adds an image logo to a GIF image.

Parameters:

Name Type Description Default
in_gif str

Input file path to the GIF image.

required
out_gif str

Output file path to the GIF image.

required
in_image str

Input file path to the image.

required
xy tuple

Top left corner of the logo image. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None.

None
image_size tuple

Resize image. Defaults to (80, 80).

(80, 80)
circle_mask bool

Whether to apply a circle mask to the image. This only works with non-png images. Defaults to False.

False
Source code in leafmap/common.py
def add_image_to_gif(
    in_gif, out_gif, in_image, xy=None, image_size=(80, 80), circle_mask=False
):
    """Adds an image logo to a GIF image.

    The logo is shrunk to fit within ``image_size`` (aspect ratio preserved)
    and pasted onto every frame of the GIF. Problems with the input files are
    printed and the function returns without writing anything.

    Args:
        in_gif (str): Input file path to the GIF image.
        out_gif (str): Output file path to the GIF image.
        in_image (str): Input file path or HTTP URL to the logo image.
        xy (tuple, optional): Top left corner of the logo. It can be formatted like this: (10, 10) or ('15%', '25%').
            Defaults to None, which places the logo near the bottom-right corner.
        image_size (tuple, optional): Resize image. Defaults to (80, 80).
        circle_mask (bool, optional): Whether to apply a circle mask to the image. It is overridden by the
            logo's own transparency when the logo has any. Defaults to False.

    Raises:
        Exception: If ``xy`` is a 2-tuple that is neither two integers nor two
            percentage strings.
    """
    import io

    from PIL import Image, ImageDraw, ImageSequence

    warnings.simplefilter("ignore")

    in_gif = os.path.abspath(in_gif)
    is_url = in_image.startswith("http")

    if not os.path.exists(in_gif):
        print("The input gif file does not exist.")
        return

    if (not is_url) and (not os.path.exists(in_image)):
        print("The provided logo file does not exist.")
        return

    out_dir = check_dir((os.path.dirname(out_gif)))
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    try:
        gif = Image.open(in_gif)
    except Exception as e:
        print("An error occurred while opening the image.")
        print(e)
        return

    logo_raw_image = None
    try:
        if is_url:
            logo_raw_image = open_image_from_url(in_image)
        else:
            in_image = os.path.abspath(in_image)
            logo_raw_image = Image.open(in_image)
    except Exception as e:
        print(e)

    # Without a logo there is nothing to paste; stop here instead of failing
    # later on ``None.size``.
    if logo_raw_image is None:
        return

    logo_raw_size = logo_raw_image.size

    ratio = max(
        logo_raw_size[0] / image_size[0],
        logo_raw_size[1] / image_size[1],
    )
    image_resize = (int(logo_raw_size[0] / ratio), int(logo_raw_size[1] / ratio))
    image_size = min(logo_raw_size[0], image_size[0]), min(
        logo_raw_size[1], image_size[1]
    )

    logo_image = logo_raw_image.convert("RGBA")
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
    logo_image.thumbnail(image_size, Image.LANCZOS)

    gif_width, gif_height = gif.size
    mask_im = None

    if circle_mask:
        # The mask must have exactly the size of the pasted image, which after
        # thumbnail() can be smaller than image_size for non-square logos.
        mask_im = Image.new("L", logo_image.size, 0)
        draw = ImageDraw.Draw(mask_im)
        draw.ellipse((0, 0, logo_image.size[0], logo_image.size[1]), fill=255)

    if has_transparency(logo_raw_image):
        mask_im = logo_image.copy()

    invalid_percent_msg = (
        "The specified xy is invalid. It must be formatted like this ('10%', '10%')"
    )

    if xy is None:
        # Default location: bottom-right corner with a 10-pixel margin.
        delta = 10
        xy = (gif_width - image_resize[0] - delta, gif_height - image_resize[1] - delta)
    elif (not isinstance(xy, tuple)) or (len(xy) != 2):
        print("xy must be a tuple, e.g., (10, 10), ('10%', '10%')")
        return
    elif all(isinstance(item, int) for item in xy):
        x, y = xy
        if not ((0 <= x < gif_width) and (0 <= y < gif_height)):
            print(
                "xy is out of bounds. x must be within [0, {}], and y must be within [0, {}]".format(
                    gif_width, gif_height
                )
            )
            return
    elif all(isinstance(item, str) for item in xy):
        x, y = xy
        # Strings are only supported as percentages of the GIF dimensions.
        if ("%" not in x) or ("%" not in y):
            raise Exception(invalid_percent_msg)
        try:
            x = int(float(x.replace("%", "")) / 100.0 * gif_width)
            y = int(float(y.replace("%", "")) / 100.0 * gif_height)
            xy = (x, y)
        except Exception:
            raise Exception(invalid_percent_msg)
    else:
        raise Exception(
            "The specified xy is invalid. It must be formatted like this: (10, 10) or ('10%', '10%')"
        )

    try:
        frames = []
        for frame in ImageSequence.Iterator(gif):
            frame = frame.convert("RGBA")
            frame.paste(logo_image, xy, mask_im)

            # Round-trip each frame through an in-memory GIF so it is stored
            # as a GIF image before the frames are written out together.
            b = io.BytesIO()
            frame.save(b, format="GIF")
            frame = Image.open(b)
            frames.append(frame)

        frames[0].save(out_gif, save_all=True, append_images=frames[1:])
    except Exception as e:
        print(e)

add_mask_to_image(image, mask, output, color='red')

Overlay a binary mask (e.g., roads, building footprints, etc) on an image. Credits to Xingjian Shi for the sample code.

Parameters:

Name Type Description Default
image str

A local path or HTTP URL to an image.

required
mask str

A local path or HTTP URL to a binary mask.

required
output str

A local path to the output image.

required
color str

Color of the mask. Defaults to 'red'.

'red'

Exceptions:

Type Description
ImportError

If rasterio and detectron2 are not installed.

Source code in leafmap/common.py
def add_mask_to_image(image, mask, output, color="red"):
    """Overlay a binary mask (e.g., roads, building footprints, etc.) on an image.

    Credits to Xingjian Shi for the sample code.

    Args:
        image (str): A local path or HTTP URL to an image.
        mask (str): A local path or HTTP URL to a binary mask. Only the first
            band is used; pixels with a value greater than 0 are drawn.
        output (str): A local path to the output image.
        color (str, optional): Color of the mask. Defaults to 'red'.

    Raises:
        ImportError: If rasterio, detectron2 or PIL cannot be imported.
    """
    try:
        import rasterio
        from detectron2.utils.visualizer import Visualizer
        from PIL import Image
    except ImportError as e:
        raise ImportError(
            "Please install rasterio and detectron2 to use this function. See https://detectron2.readthedocs.io/en/latest/tutorials/install.html"
        ) from e

    # Read everything that is needed inside the context managers so that the
    # dataset handles are released as soon as the pixels are in memory.
    with rasterio.open(image) as ds:
        image_arr = ds.read()
        has_crs = ds.crs is not None

    with rasterio.open(mask) as mask_ds:
        mask_arr = mask_ds.read()

    # Move the band axis from the first to the last position before drawing.
    vis = Visualizer(image_arr.transpose((1, 2, 0)))
    vis.draw_binary_mask(mask_arr[0] > 0, color=color)

    out_arr = Image.fromarray(vis.get_output().get_image())

    out_arr.save(output)

    # Only georeferenced inputs are rewritten with the profile of the source image.
    if has_crs:
        numpy_to_cog(output, output, profile=image)

add_progress_bar_to_gif(in_gif, out_gif, progress_bar_color='blue', progress_bar_height=5, duration=100, loop=0)

Adds a progress bar to a GIF image.

Parameters:

Name Type Description Default
in_gif str

The file path to the input GIF image.

required
out_gif str

The file path to the output GIF image.

required
progress_bar_color str

Color for the progress bar. Defaults to 'blue'.

'blue'
progress_bar_height int

Height of the progress bar. Defaults to 5.

5
duration int

controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100.

100
loop int

Controls how many times the animation repeats. A value of 1 means that the animation will play once and then stop (displaying the last frame). A value of 0 means that the animation will repeat forever. Defaults to 0.

0
Source code in leafmap/common.py
def add_progress_bar_to_gif(
    in_gif,
    out_gif,
    progress_bar_color="blue",
    progress_bar_height=5,
    duration=100,
    loop=0,
):
    """Adds a progress bar to a GIF image.

    The bar is drawn along the bottom edge of every frame and grows linearly
    so that it spans the full width on the last frame.

    Args:
        in_gif (str): The file path to the input GIF image.
        out_gif (str): The file path to the output GIF image.
        progress_bar_color (str, optional): Color for the progress bar. Defaults to 'blue'.
        progress_bar_height (int, optional): Height of the progress bar. Defaults to 5.
        duration (int, optional): Controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100.
        loop (int, optional): Controls how many times the animation repeats. A value of 1 means that the animation will play once and then stop (displaying the last frame). A value of 0 means that the animation will repeat forever. Defaults to 0.

    Raises:
        Exception: If the input GIF cannot be opened, or if drawing/saving the frames fails.
    """
    import io

    from PIL import Image, ImageDraw, ImageSequence

    warnings.simplefilter("ignore")

    in_gif = os.path.abspath(in_gif)
    out_gif = os.path.abspath(out_gif)

    if not os.path.exists(in_gif):
        print("The input gif file does not exist.")
        return

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(os.path.dirname(out_gif), exist_ok=True)

    progress_bar_color = check_color(progress_bar_color)

    try:
        image = Image.open(in_gif)
    except Exception as e:
        raise Exception("An error occurred while opening the gif.") from e

    # The context manager closes the input file once all frames are processed.
    with image:
        count = image.n_frames
        W, H = image.size
        progress_bar_widths = [i * 1.0 / count * W for i in range(1, count + 1)]
        progress_bar_shapes = [
            [(0, H - progress_bar_height), (x, H)] for x in progress_bar_widths
        ]

        try:
            frames = []
            # Loop over each frame in the animated image
            for index, frame in enumerate(ImageSequence.Iterator(image)):
                # Draw the progress bar on the frame
                frame = frame.convert("RGB")
                draw = ImageDraw.Draw(frame)
                draw.rectangle(progress_bar_shapes[index], fill=progress_bar_color)
                del draw

                # Round-trip through an in-memory GIF so every stored frame is
                # an independent, already GIF-encoded image.
                b = io.BytesIO()
                frame.save(b, format="GIF")
                frame = Image.open(b)

                frames.append(frame)
            # https://www.pythoninformer.com/python-libraries/pillow/creating-animated-gif/
            # Save the frames as a new image

            frames[0].save(
                out_gif,
                save_all=True,
                append_images=frames[1:],
                duration=duration,
                loop=loop,
                optimize=True,
            )
        except Exception as e:
            raise Exception(e) from e

add_text_to_gif(in_gif, out_gif, xy=None, text_sequence=None, font_type='arial.ttf', font_size=20, font_color='#000000', add_progress_bar=True, progress_bar_color='white', progress_bar_height=5, duration=100, loop=0)

Adds animated text to a GIF image.

Parameters:

Name Type Description Default
in_gif str

The file path to the input GIF image.

required
out_gif str

The file path to the output GIF image.

required
xy tuple

Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None.

None
text_sequence int, str, list

Text to be drawn. It can be an integer number, a string, or a list of strings. Defaults to None.

None
font_type str

Font type. Defaults to "arial.ttf".

'arial.ttf'
font_size int

Font size. Defaults to 20.

20
font_color str

Font color. It can be a string (e.g., 'red'), rgb tuple (e.g., (255, 127, 0)), or hex code (e.g., '#ff00ff'). Defaults to '#000000'.

'#000000'
add_progress_bar bool

Whether to add a progress bar at the bottom of the GIF. Defaults to True.

True
progress_bar_color str

Color for the progress bar. Defaults to 'white'.

'white'
progress_bar_height int

Height of the progress bar. Defaults to 5.

5
duration int

Controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100.

100
loop int

Controls how many times the animation repeats. A value of 1 means that the animation will play once and then stop (displaying the last frame). A value of 0 means that the animation will repeat forever. Defaults to 0.

0
Source code in leafmap/common.py
def add_text_to_gif(
    in_gif,
    out_gif,
    xy=None,
    text_sequence=None,
    font_type="arial.ttf",
    font_size=20,
    font_color="#000000",
    add_progress_bar=True,
    progress_bar_color="white",
    progress_bar_height=5,
    duration=100,
    loop=0,
):
    """Adds animated text to a GIF image.

    Invalid arguments (missing input file, malformed xy, wrong number of
    labels) are reported with a printed message and the function returns
    without writing anything.

    Args:
        in_gif (str): The file path to the input GIF image.
        out_gif (str): The file path to the output GIF image.
        xy (tuple, optional): Top left corner of the text. It can be formatted like this: (10, 10) or ('15%', '25%'). Defaults to None, which places the text at 5% of the width and 5% of the height.
        text_sequence (int, str, list, optional): Text to be drawn. It can be an integer number (start of a counter), a string (a numeric string starts a counter, any other string is repeated on every frame), or a list with one entry per frame. Defaults to None, which numbers the frames starting at 1.
        font_type (str, optional): Font type. Defaults to "arial.ttf".
        font_size (int, optional): Font size. Defaults to 20.
        font_color (str, optional): Font color. It can be a string (e.g., 'red'), rgb tuple (e.g., (255, 127, 0)), or hex code (e.g., '#ff00ff').  Defaults to '#000000'.
        add_progress_bar (bool, optional): Whether to add a progress bar at the bottom of the GIF. Defaults to True.
        progress_bar_color (str, optional): Color for the progress bar. Defaults to 'white'.
        progress_bar_height (int, optional): Height of the progress bar. Defaults to 5.
        duration (int, optional): Controls how long each frame will be displayed for, in milliseconds. It is the inverse of the frame rate. Setting it to 100 milliseconds gives 10 frames per second. You can decrease the duration to give a smoother animation. Defaults to 100.
        loop (int, optional): Controls how many times the animation repeats. A value of 1 means that the animation will play once and then stop (displaying the last frame). A value of 0 means that the animation will repeat forever. Defaults to 0.

    Raises:
        Exception: If xy is given as percentage strings that cannot be parsed as numbers.
    """
    import io

    import pkg_resources
    from PIL import Image, ImageDraw, ImageFont, ImageSequence

    warnings.simplefilter("ignore")
    pkg_dir = os.path.dirname(pkg_resources.resource_filename("leafmap", "leafmap.py"))
    default_font = os.path.join(pkg_dir, "data/fonts/arial.ttf")

    in_gif = os.path.abspath(in_gif)
    out_gif = os.path.abspath(out_gif)

    if not os.path.exists(in_gif):
        print("The input gif file does not exist.")
        return

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(os.path.dirname(out_gif), exist_ok=True)

    if font_type == "arial.ttf":
        font = ImageFont.truetype(default_font, font_size)
    elif font_type == "alibaba.otf":
        default_font = os.path.join(pkg_dir, "data/fonts/alibaba.otf")
        font = ImageFont.truetype(default_font, font_size)
    else:
        # Any other font must be found among the system fonts; otherwise fall
        # back to the bundled default instead of failing.
        try:
            font_list = system_fonts(show_full_path=True)
            font_names = [os.path.basename(f) for f in font_list]
            if (font_type in font_list) or (font_type in font_names):
                font = ImageFont.truetype(font_type, font_size)
            else:
                print(
                    "The specified font type could not be found on your system. Using the default font instead."
                )
                font = ImageFont.truetype(default_font, font_size)
        except Exception as e:
            print(e)
            font = ImageFont.truetype(default_font, font_size)

    color = check_color(font_color)
    progress_bar_color = check_color(progress_bar_color)

    try:
        image = Image.open(in_gif)
    except Exception as e:
        print("An error occurred while opening the gif.")
        print(e)
        return

    # The context manager closes the input file on every exit path below.
    with image:
        count = image.n_frames
        W, H = image.size
        progress_bar_widths = [i * 1.0 / count * W for i in range(1, count + 1)]
        progress_bar_shapes = [
            [(0, H - progress_bar_height), (x, H)] for x in progress_bar_widths
        ]

        if xy is None:
            # default text location is 5% width and 5% height of the image.
            xy = (int(0.05 * W), int(0.05 * H))
        elif not isinstance(xy, tuple) or len(xy) != 2:
            # Checked before anything else so that scalars (no len()) and
            # wrongly sized tuples get a message instead of a crash.
            print("xy must be a tuple, e.g., (10, 10), ('10%', '10%')")
            return
        elif all(isinstance(item, int) for item in xy):
            x, y = xy
            # Bounds are inclusive, matching the message below; (0, 0) is valid.
            if not (0 <= x <= W and 0 <= y <= H):
                print(
                    f"xy is out of bounds. x must be within [0, {W}], and y must be within [0, {H}]"
                )
                return
        elif all(isinstance(item, str) and "%" in item for item in xy):
            x, y = xy
            try:
                x = int(float(x.replace("%", "")) / 100.0 * W)
                y = int(float(y.replace("%", "")) / 100.0 * H)
                xy = (x, y)
            except ValueError as e:
                raise Exception(
                    "The specified xy is invalid. It must be formatted like this ('10%', '10%')"
                ) from e
        else:
            # Includes strings without a percent sign, which previously slipped
            # through validation and only failed later while drawing.
            print(
                "The specified xy is invalid. It must be formatted like this: (10, 10) or ('10%', '10%')"
            )
            return

        if text_sequence is None:
            text = [str(x) for x in range(1, count + 1)]
        elif isinstance(text_sequence, int):
            # Exactly one label per frame.
            text = [str(x) for x in range(text_sequence, text_sequence + count)]
        elif isinstance(text_sequence, str):
            try:
                start = int(text_sequence)
            except ValueError:
                # Not a number: show the same text on every frame.
                text = [text_sequence] * count
            else:
                text = [str(x) for x in range(start, start + count)]
        elif isinstance(text_sequence, list) and len(text_sequence) != count:
            print(
                f"The length of the text sequence must be equal to the number ({count}) of frames in the gif."
            )
            return
        else:
            text = [str(x) for x in text_sequence]

        try:
            frames = []
            # Loop over each frame in the animated image
            for index, frame in enumerate(ImageSequence.Iterator(image)):
                # Draw the text on the frame
                frame = frame.convert("RGB")
                draw = ImageDraw.Draw(frame)
                draw.text(xy, text[index], font=font, fill=color)
                if add_progress_bar:
                    draw.rectangle(progress_bar_shapes[index], fill=progress_bar_color)
                del draw

                # Round-trip through an in-memory GIF so every stored frame is
                # an independent, already GIF-encoded image.
                b = io.BytesIO()
                frame.save(b, format="GIF")
                frame = Image.open(b)

                frames.append(frame)
            # https://www.pythoninformer.com/python-libraries/pillow/creating-animated-gif/
            # Save the frames as a new image

            frames[0].save(
                out_gif,
                save_all=True,
                append_images=frames[1:],
                duration=duration,
                loop=loop,
                optimize=True,
            )
        except Exception as e:
            print(e)

adjust_longitude(in_fc)

Adjusts longitude if it is less than -180 or greater than 180.

Parameters:

Name Type Description Default
in_fc dict

The input dictionary containing coordinates.

required

Returns:

Type Description
dict

A dictionary containing the converted longitudes

Source code in leafmap/common.py
def adjust_longitude(in_fc):
    """Wrap longitudes that fall outside [-180, 180] back into that range.

    The input is either a mapping with a "geometry" entry or a bare geometry
    mapping with "type" and "coordinates". Only "Point", "Polygon" and
    "LineString" geometries are processed; any other type is returned
    untouched. Coordinates are modified in place.

    Args:
        in_fc (dict): The input dictionary containing coordinates.

    Returns:
        dict: The same dictionary with the converted longitudes, or None if an
            error occurred (the error is printed).
    """

    def _wrap(lon):
        # Shift by one full turn (360 degrees) when outside the valid range.
        if lon < -180:
            return 360 + lon
        if lon > 180:
            return lon - 360
        return lon

    try:
        top_level_keys = in_fc.keys()

        # Locate the mapping that actually carries "type" and "coordinates".
        if "geometry" in top_level_keys:
            geom = in_fc["geometry"]
        elif "type" in top_level_keys:
            geom = in_fc
        else:
            return in_fc

        coords = geom["coordinates"]
        geom_type = geom["type"]

        if geom_type == "Point":
            coords[0] = _wrap(coords[0])
        elif geom_type == "Polygon":
            # A polygon is a sequence of rings, each a sequence of positions.
            for ring in coords:
                for position in ring:
                    position[0] = _wrap(position[0])
        elif geom_type == "LineString":
            for position in coords:
                position[0] = _wrap(position[0])

        return in_fc

    except Exception as err:
        print(err)
        return None

arc_active_map()

Get the active map in ArcGIS Pro.

Returns:

Type Description
arcpy.Map

The active map in ArcGIS Pro.

Source code in leafmap/common.py
def arc_active_map():
    """Return the map that is currently active in ArcGIS Pro.

    Returns:
        arcpy.Map: The active map of the "CURRENT" ArcGIS Pro project, or None
            when is_arcpy() reports that arcpy is not available.
    """
    if not is_arcpy():
        return None

    # Imported lazily so the module can be loaded without arcpy installed.
    import arcpy

    project = arcpy.mp.ArcGISProject("CURRENT")
    return project.activeMap

arc_active_view()

Get the active view in ArcGIS Pro.

Returns:

Type Description
arcpy.MapView

The active view in ArcGIS Pro.

Source code in leafmap/common.py
def arc_active_view():
    """Return the view that is currently active in ArcGIS Pro.

    Returns:
        arcpy.MapView: The active view of the "CURRENT" ArcGIS Pro project, or
            None when is_arcpy() reports that arcpy is not available.
    """
    if not is_arcpy():
        return None

    # Imported lazily so the module can be loaded without arcpy installed.
    import arcpy

    project = arcpy.mp.ArcGISProject("CURRENT")
    return project.activeView

arc_add_layer(url, name=None, shown=True, opacity=1.0)

Add a layer to the active map in ArcGIS Pro.

Parameters:

Name Type Description Default
url str

The URL of the tile layer to add.

required
name str

The name of the layer. Defaults to None.

None
shown bool

Whether the layer is shown. Defaults to True.

True
opacity float

The opacity of the layer. Defaults to 1.0.

1.0
Source code in leafmap/common.py
def arc_add_layer(url, name=None, shown=True, opacity=1.0):
    """Add a layer to the active map in ArcGIS Pro.

    Does nothing when arcpy is not available or there is no active map.

    Args:
        url (str): The URL of the tile layer to add.
        name (str, optional): The name of the layer. Defaults to None, which keeps the name assigned by ArcGIS Pro.
        shown (bool, optional): Whether the layer is shown. Defaults to True.
        opacity (float, optional): The opacity of the layer. Defaults to 1.0.
    """
    if not is_arcpy():
        return

    m = arc_active_map()
    if m is None:
        return

    m.addDataFromPath(url)

    # NOTE(review): presumably "Tiled service layer" is the name ArcGIS Pro
    # gives a freshly added tile layer, with the newest one listed first.
    layers = m.listLayers("Tiled service layer")
    if len(layers) == 0:
        return

    layer = layers[0]
    # Visibility and opacity are applied whether or not a name was supplied;
    # previously they were silently ignored when name was None.
    layer.visible = shown
    # The layer exposes transparency as a percentage, the inverse of opacity.
    layer.transparency = 100 - (opacity * 100)
    if isinstance(name, str):
        layer.name = name

arc_zoom_to_bounds(bounds)

Zoom to a bounding box.

Parameters:

Name Type Description Default
bounds list

The bounding box to zoom to in the form [xmin, ymin, xmax, ymax] or [(ymin, xmin), (ymax, xmax)].

required

Exceptions:

Type Description
ValueError

If bounds does not have a length of 2 or 4.

Source code in leafmap/common.py
def arc_zoom_to_bounds(bounds):
    """Zoom the active ArcGIS Pro view to a bounding box.

    Args:
        bounds (list): The bounding box to zoom to in the form [xmin, ymin, xmax, ymax] or [(ymin, xmin), (ymax, xmax)].

    Raises:
        ValueError: If bounds does not have a length of 2 or 4.
    """
    size = len(bounds)
    if size not in (2, 4):
        raise ValueError("bounds must be a tuple of length 2 or 4.")

    if size == 4:
        # Flat form: x comes before y.
        west, south, east, north = bounds
    else:
        # Corner-pair form: each corner is given as (y, x).
        (south, west), (north, east) = bounds

    arc_zoom_to_extent(west, south, east, north)

arc_zoom_to_extent(xmin, ymin, xmax, ymax)

Zoom to an extent in ArcGIS Pro.

Parameters:

Name Type Description Default
xmin float

The minimum x value of the extent.

required
ymin float

The minimum y value of the extent.

required
xmax float

The maximum x value of the extent.

required
ymax float

The maximum y value of the extent.

required
Source code in leafmap/common.py
def arc_zoom_to_extent(xmin, ymin, xmax, ymax):
    """Zoom the active ArcGIS Pro view to an extent.

    The extent is built with spatial reference 4326. Nothing happens when
    arcpy is not available or there is no active view.

    Args:
        xmin (float): The minimum x value of the extent.
        ymin (float): The minimum y value of the extent.
        xmax (float): The maximum x value of the extent.
        ymax (float): The maximum y value of the extent.
    """
    if not is_arcpy():
        return

    # Imported lazily so the module can be loaded without arcpy installed.
    import arcpy

    active_view = arc_active_view()
    if active_view is None:
        return

    target_extent = arcpy.Extent(
        xmin,
        ymin,
        xmax,
        ymax,
        spatial_reference=arcpy.SpatialReference(4326),
    )
    active_view.camera.setExtent(target_extent)

array_to_image(array, output=None, source=None, dtype=None, compress='deflate', transpose=True, cellsize=None, crs=None, transform=None, driver='COG', **kwargs)

Save a NumPy array as a GeoTIFF using the projection information from an existing GeoTIFF file.

Parameters:

Name Type Description Default
array np.ndarray

The NumPy array to be saved as a GeoTIFF.

required
output str

The path to the output image. If None, a temporary file will be created. Defaults to None.

None
source str

The path to an existing GeoTIFF file with map projection information. Defaults to None.

None
dtype np.dtype

The data type of the output array. Defaults to None.

None
compress str

The compression method. Can be one of the following: "deflate", "lzw", "packbits", "jpeg". Defaults to "deflate".

'deflate'
transpose bool

Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True.

True
cellsize float

The resolution of the output image in meters. Defaults to None.

None
crs str

The CRS of the output image. Defaults to None.

None
transform tuple

The affine transformation matrix, can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361). Defaults to None.

None
driver str

The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG".

'COG'
**kwargs

Additional keyword arguments to be passed to the rasterio.open() function.

{}
Source code in leafmap/common.py
def array_to_image(
    array,
    output: str = None,
    source: str = None,
    dtype: str = None,
    compress: str = "deflate",
    transpose: bool = True,
    cellsize: float = None,
    crs: str = None,
    transform: tuple = None,
    driver: str = "COG",
    **kwargs,
) -> str:
    """Save a NumPy array as a GeoTIFF using the projection information from an existing GeoTIFF file.

    Args:
        array (np.ndarray): The NumPy array (or xarray.DataArray) to be saved as a GeoTIFF.
        output (str): The path to the output image. If None, the array is handed to array_to_memory_file() instead of being written to disk. Defaults to None.
        source (str, optional): The path to an existing GeoTIFF file with map projection information. When given, its CRS and transform override crs and transform. Defaults to None.
        dtype (np.dtype, optional): The data type of the output array. If None, it is inferred from the value range of the array. Defaults to None.
        compress (str, optional): The compression method. Can be one of the following: "deflate", "lzw", "packbits", "jpeg". Defaults to "deflate".
        transpose (bool, optional): Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True.
        cellsize (float, optional): The cell size (resolution) of the output image, in the units of crs. Required when source is None. Defaults to None.
        crs (str, optional): The CRS of the output image. Required when source is None. Defaults to None.
        transform (tuple, optional): The affine transformation matrix, can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361).
            Defaults to None.
        driver (str, optional): The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG".
        **kwargs: Additional keyword arguments to be passed to the rasterio.open() function.

    Returns:
        str: The path of the file that was written (with ".tif" appended when
            it was missing). When output is None, the value returned by
            array_to_memory_file() is returned instead.

    Raises:
        ValueError: If source is None and cellsize or crs is not provided.
    """

    import numpy as np
    import rasterio
    import xarray as xr
    from rasterio.transform import Affine

    if output is None:
        return array_to_memory_file(
            array,
            source,
            dtype,
            compress,
            transpose,
            cellsize,
            crs=crs,
            transform=transform,
            driver=driver,
            **kwargs,
        )

    if isinstance(array, xr.DataArray):
        coords = [coord for coord in array.coords]
        if coords[0] == "time":
            # Keep only the first time step and normalise the dimension order.
            # NOTE(review): assumes the coordinates after "time" are ordered
            # (x, y) - confirm against the datasets passed in.
            x_dim = coords[1]
            y_dim = coords[2]
            array = (
                array.isel(time=0).rename({y_dim: "y", x_dim: "x"}).transpose("y", "x")
            )
        array = array.values

    if array.ndim == 3 and transpose:
        array = np.transpose(array, (1, 2, 0))

    out_dir = os.path.dirname(os.path.abspath(output))
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(out_dir, exist_ok=True)

    if not output.endswith(".tif"):
        output += ".tif"

    if source is not None:
        with rasterio.open(source) as src:
            crs = src.crs
            transform = src.transform
            if compress is None:
                compress = src.compression
    else:
        if cellsize is None:
            # The message names the actual parameter the caller has to pass.
            raise ValueError("cellsize must be provided if source is not provided")
        if crs is None:
            raise ValueError(
                "crs must be provided if source is not provided, such as EPSG:3857"
            )

        if transform is None:
            # Define the geotransformation parameters: the raster is anchored
            # at the origin and spans cellsize * (columns, rows).
            xmin, ymin, xmax, ymax = (
                0,
                0,
                cellsize * array.shape[1],
                cellsize * array.shape[0],
            )
            transform = rasterio.transform.from_bounds(
                xmin, ymin, xmax, ymax, array.shape[1], array.shape[0]
            )
        elif isinstance(transform, Affine):
            pass
        elif isinstance(transform, (tuple, list)):
            transform = Affine(*transform)

        kwargs["transform"] = transform

    if dtype is None:
        # Determine the minimum and maximum values in the array
        min_value = np.min(array)
        max_value = np.max(array)
        # Determine the best dtype for the array
        # NOTE(review): the choice depends on the value range only, so a float
        # array with values in (1, 255] is cast to uint8 and loses its
        # fractional part - confirm this is intended.
        if min_value >= 0 and max_value <= 1:
            dtype = np.float32
        elif min_value >= 0 and max_value <= 255:
            dtype = np.uint8
        elif min_value >= -128 and max_value <= 127:
            dtype = np.int8
        elif min_value >= 0 and max_value <= 65535:
            dtype = np.uint16
        elif min_value >= -32768 and max_value <= 32767:
            dtype = np.int16
        else:
            dtype = np.float64

    # Convert the array to the best dtype
    array = array.astype(dtype)

    # Define the GeoTIFF metadata
    metadata = {
        "driver": driver,
        "height": array.shape[0],
        "width": array.shape[1],
        "dtype": array.dtype,
        "crs": crs,
        "transform": transform,
    }

    if array.ndim == 2:
        metadata["count"] = 1
    elif array.ndim == 3:
        metadata["count"] = array.shape[2]
    if compress is not None:
        metadata["compress"] = compress

    # Caller-supplied keyword arguments take precedence over the defaults above.
    metadata.update(**kwargs)

    # Create a new GeoTIFF file and write the array to it
    with rasterio.open(output, "w", **metadata) as dst:
        if array.ndim == 2:
            dst.write(array, 1)
        elif array.ndim == 3:
            # Bands are on the last axis here; rasterio band indexes start at 1.
            for i in range(array.shape[2]):
                dst.write(array[:, :, i], i + 1)

    # Honour the declared "-> str" return type: hand back the written path.
    return output

array_to_memory_file(array, source=None, dtype=None, compress='deflate', transpose=True, cellsize=None, crs=None, transform=None, driver='COG', **kwargs)

Convert a NumPy array to a memory file.

Parameters:

Name Type Description Default
array numpy.ndarray

The input NumPy array.

required
source str

Path to the source file to extract metadata from. Defaults to None.

None
dtype str

The desired data type of the array. Defaults to None.

None
compress str

The compression method for the output file. Defaults to "deflate".

'deflate'
transpose bool

Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True.

True
cellsize float

The cell size of the array if source is not provided. Defaults to None.

None
crs str

The coordinate reference system of the array if source is not provided. Defaults to None.

None
transform tuple

The affine transformation matrix if source is not provided. Can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361). Defaults to None

None
driver str

The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG".

'COG'
**kwargs

Additional keyword arguments to be passed to the rasterio.open() function.

{}

Returns:

Type Description
rasterio.DatasetReader

The rasterio dataset reader object for the converted array.

Source code in leafmap/common.py
def array_to_memory_file(
    array,
    source: str = None,
    dtype: str = None,
    compress: str = "deflate",
    transpose: bool = True,
    cellsize: float = None,
    crs: str = None,
    transform: tuple = None,
    driver="COG",
    **kwargs,
):
    """Convert a NumPy array to a memory file.

    The array is written band by band into a ``rasterio.MemoryFile``; the writer
    is then closed and the same in-memory path is reopened in read mode.

    Georeferencing is resolved in this order:
        1. If ``array`` is an ``xarray.DataArray`` exposing a ``rio`` accessor, the
           CRS is read from ``array.rio.crs`` (replacing the ``crs`` argument) and,
           when ``transform`` is None, the transform from ``array.rio.transform()``.
           Without a ``rio`` accessor and without ``source``, the source path is
           looked up in ``array.encoding["source"]`` when present.
        2. If a ``source`` is known at that point, its CRS and transform are used
           (overriding anything set before).
        3. Otherwise ``crs`` is mandatory and the transform comes from ``transform``
           or is derived from ``cellsize`` with the lower-left corner at (0, 0).

    Args:
        array (numpy.ndarray): The input NumPy array. An xarray.DataArray is also accepted
            and converted to its underlying values.
        source (str, optional): Path to the source file to extract metadata from. Defaults to None.
        dtype (str, optional): The desired data type of the array. If None, a dtype is chosen
            from the array's minimum and maximum values. Defaults to None.
        compress (str, optional): The compression method for the output file. If None and source
            is given, the compression of the source is used. Defaults to "deflate".
        transpose (bool, optional): Whether to transpose the array from (bands, rows, columns) to (rows, columns, bands). Defaults to True.
        cellsize (float, optional): The cell size of the array if source is not provided. Defaults to None.
        crs (str, optional): The coordinate reference system of the array if source is not provided. Defaults to None.
        transform (tuple, optional): The affine transformation matrix if source is not provided.
            Can be rio.transform() or a tuple like (0.5, 0.0, -180.25, 0.0, -0.5, 83.780361). Defaults to None
        driver (str, optional): The driver to use for creating the output file, such as 'GTiff'. Defaults to "COG".
        **kwargs: Additional keyword arguments to be passed to the rasterio.open() function.
            They are merged into the dataset metadata last and therefore override it.

    Raises:
        ValueError: If no source is available and crs is None, or if neither
            transform nor cellsize is provided.

    Returns:
        rasterio.DatasetReader: The rasterio dataset reader object for the converted array.
    """
    import rasterio
    import numpy as np
    import xarray as xr
    from rasterio.transform import Affine

    if isinstance(array, xr.DataArray):
        coords = [coord for coord in array.coords]
        # Only the first time step is kept when the first coordinate is "time".
        # NOTE(review): assumes the next two coordinates are x then y -- confirm.
        if coords[0] == "time":
            x_dim = coords[1]
            y_dim = coords[2]
            array = (
                array.isel(time=0).rename({y_dim: "y", x_dim: "x"}).transpose("y", "x")
            )
        if hasattr(array, "rio"):
            # The rio accessor's CRS replaces any crs passed by the caller.
            if hasattr(array.rio, "crs"):
                crs = array.rio.crs
            if transform is None and hasattr(array.rio, "transform"):
                transform = array.rio.transform()
        elif source is None:
            # Fall back to the file recorded in the DataArray's encoding, if any.
            if hasattr(array, "encoding"):
                if "source" in array.encoding:
                    source = array.encoding["source"]
        array = array.values

    # From here on a 3D array is indexed as (rows, columns, bands).
    if array.ndim == 3 and transpose:
        array = np.transpose(array, (1, 2, 0))
    if source is not None:
        # Georeferencing from the source file overrides crs/transform set above.
        with rasterio.open(source) as src:
            crs = src.crs
            transform = src.transform
            if compress is None:
                compress = src.compression
    else:
        if crs is None:
            raise ValueError(
                "crs must be provided if source is not provided, such as EPSG:3857"
            )

        if transform is None:
            if cellsize is None:
                raise ValueError("cellsize must be provided if source is not provided")
            # Define the geotransformation parameters
            # (origin is placed at (0, 0); extent is cellsize times the array shape)
            xmin, ymin, xmax, ymax = (
                0,
                0,
                cellsize * array.shape[1],
                cellsize * array.shape[0],
            )
            # (west, south, east, north, width, height)
            transform = rasterio.transform.from_bounds(
                xmin, ymin, xmax, ymax, array.shape[1], array.shape[0]
            )
        elif isinstance(transform, Affine):
            pass
        elif isinstance(transform, (tuple, list)):
            transform = Affine(*transform)

        # kwargs are merged into the metadata last, so keep them in sync with
        # the transform resolved here.
        kwargs["transform"] = transform

    if dtype is None:
        # Determine the minimum and maximum values in the array
        min_value = np.min(array)
        max_value = np.max(array)
        # Determine the best dtype for the array
        # (the first matching range wins, so values within [0, 1] become float32)
        if min_value >= 0 and max_value <= 1:
            dtype = np.float32
        elif min_value >= 0 and max_value <= 255:
            dtype = np.uint8
        elif min_value >= -128 and max_value <= 127:
            dtype = np.int8
        elif min_value >= 0 and max_value <= 65535:
            dtype = np.uint16
        elif min_value >= -32768 and max_value <= 32767:
            dtype = np.int16
        else:
            dtype = np.float64

    # Convert the array to the best dtype
    array = array.astype(dtype)

    # Define the GeoTIFF metadata
    metadata = {
        "driver": driver,
        "height": array.shape[0],
        "width": array.shape[1],
        "dtype": array.dtype,
        "crs": crs,
        "transform": transform,
    }

    if array.ndim == 2:
        metadata["count"] = 1
    elif array.ndim == 3:
        metadata["count"] = array.shape[2]
    if compress is not None:
        metadata["compress"] = compress

    # Caller-supplied keyword arguments take precedence over the defaults above.
    metadata.update(**kwargs)

    # Create a new memory file and write the array to it
    # NOTE(review): memory_file itself is never closed here -- presumably it has
    # to outlive the returned reader; confirm.
    memory_file = rasterio.MemoryFile()
    dst = memory_file.open(**metadata)

    # rasterio band indexes are 1-based.
    if array.ndim == 2:
        dst.write(array, 1)
    elif array.ndim == 3:
        for i in range(array.shape[2]):
            dst.write(array[:, :, i], i + 1)

    # Close the writer before reopening the same in-memory path for reading.
    dst.close()

    # Read the dataset from memory
    dataset_reader = rasterio.open(dst.name, mode="r")

    return dataset_reader

assign_continuous_colors(df, column, cmap=None, colors=None, labels=None, scheme='Quantiles', k=5, legend_kwds=None, classification_kwds=None, to_rgb=True, return_type='array', return_legend=False)

Assigns continuous colors to a DataFrame column based on a specified scheme.

Parameters:

Name Type Description Default
df

A pandas DataFrame.

required
column str

The name of the column to assign colors.

required
cmap str

The name of the colormap to use.

None
colors list

A list of custom colors.

None
labels list

A list of custom labels for the legend.

None
scheme str

The scheme for classifying the data. Default is 'Quantiles'.

'Quantiles'
k int

The number of classes for classification.

5
legend_kwds dict

Additional keyword arguments for configuring the legend.

None
classification_kwds dict

Additional keyword arguments for configuring the classification.

None
to_rgb bool

Whether to convert colors to RGB values. Default is True.

True
return_type str

The type of the returned values. Default is 'array'.

'array'
return_legend bool

Whether to return the legend. Default is False.

False

Returns:

Type Description

The assigned colors as a numpy array or a tuple containing the colors and the legend, depending on the value of return_legend.

Source code in leafmap/common.py
def assign_continuous_colors(
    df,
    column: str,
    cmap: str = None,
    colors: list = None,
    labels: list = None,
    scheme: str = "Quantiles",
    k: int = 5,
    legend_kwds: dict = None,
    classification_kwds: dict = None,
    to_rgb: bool = True,
    return_type: str = "array",
    return_legend: bool = False,
):
    """Classify a DataFrame column and return the color assigned to each row.

    A copy of the single column is handed to ``classify()``; the ``"color"``
    column of its result is what this function returns.

    Args:
        df: A pandas DataFrame.
        column: The name of the column to assign colors.
        cmap: The name of the colormap to use.
        colors: A list of custom colors.
        labels: A list of custom labels for the legend.
        scheme: The scheme for classifying the data. Default is 'Quantiles'.
        k: The number of classes for classification.
        legend_kwds: Additional keyword arguments for configuring the legend.
        classification_kwds: Additional keyword arguments for configuring the classification.
        to_rgb: Whether to convert colors to RGB values. Default is True.
        return_type: The type of the returned values. With 'array' (the default) and
            to_rgb=True the colors are returned as a uint8 NumPy array; otherwise as a list.
        return_legend: Whether to return the legend. Default is False.

    Returns:
        The assigned colors, or a tuple of (colors, legend) when return_legend is True.
    """
    import numpy as np

    classified, legend = classify(
        df[[column]].copy(),
        column,
        cmap,
        colors,
        labels,
        scheme,
        k,
        legend_kwds,
        classification_kwds,
    )
    row_colors = classified["color"].values.tolist()

    if to_rgb:
        # Normalize every color to hex first, then expand it to an RGB triple.
        row_colors = [hex_to_rgb(check_color(item)) for item in row_colors]
        if return_type == "array":
            row_colors = np.array(row_colors, dtype=np.uint8)

    return (row_colors, legend) if return_legend else row_colors

assign_discrete_colors(df, column, cmap, to_rgb=True, return_type='array')

Assigns unique colors to each category in a categorical column of a dataframe.

Parameters:

Name Type Description Default
df pandas.DataFrame

The input dataframe.

required
column str

The name of the categorical column.

required
cmap dict

A dictionary mapping categories to colors.

required
to_rgb bool

Whether to convert the colors to RGB values. Defaults to True.

True
return_type str

The type of the returned values. Can be 'list' or 'array'. Defaults to 'array'.

'array'

Returns:

Type Description
list

A list of colors for each category in the categorical column.

Source code in leafmap/common.py
def assign_discrete_colors(df, column, cmap, to_rgb=True, return_type="array"):
    """
    Look up a color for every row of a categorical column.

    Args:
        df (pandas.DataFrame): The input dataframe.
        column (str): The name of the categorical column.
        cmap (dict): A dictionary mapping categories to colors.
        to_rgb (bool): Whether to convert the colors to RGB values. Defaults to True.
        return_type (str): The type of the returned values. Can be 'list' or 'array'.
            Only takes effect when to_rgb is True. Defaults to 'array'.

    Returns:
        list: The mapped colors, one per row. A uint8 NumPy array is returned
            instead when to_rgb is True and return_type is 'array'.
    """
    import numpy as np

    # Work on a copy of the column and translate each category through cmap.
    mapped = df[column].copy().map(cmap).values.tolist()

    if not to_rgb:
        return mapped

    rgb_values = [hex_to_rgb(check_color(item)) for item in mapped]
    if return_type == "array":
        return np.array(rgb_values, dtype=np.uint8)
    return rgb_values

basemap_xyz_tiles()

Returns a dictionary containing a set of basemaps that are XYZ tile layers.

Returns:

Type Description
dict

A dictionary of XYZ tile layers.

Source code in leafmap/common.py
def basemap_xyz_tiles():
    """Collect every available basemap that is not a WMS layer.

    Returns:
        dict: A dictionary of XYZ tile layers, keyed by basemap name.
    """
    from .leafmap import basemaps

    xyz_layers = {}
    for name in dict(basemaps):
        layer = basemaps[name]
        # Anything that is not a WMS layer is kept as an XYZ tile layer.
        if not isinstance(layer, ipyleaflet.WMSLayer):
            xyz_layers[name] = layer
    return xyz_layers

bbox_to_gdf(bbox, crs='epsg:4326')

Convert a bounding box to a GeoPandas GeoDataFrame.

Parameters:

Name Type Description Default
bbox list

A bounding box in the format of [minx, miny, maxx, maxy].

required
crs str

The CRS of the bounding box. Defaults to 'epsg:4326'.

'epsg:4326'

Returns:

Type Description
GeoDataFrame

A GeoDataFrame with a single polygon.

Source code in leafmap/common.py
def bbox_to_gdf(bbox, crs="epsg:4326"):
    """Build a single-row GeoDataFrame holding the polygon of a bounding box.

    Args:
        bbox (list): A bounding box in the format of [minx, miny, maxx, maxy].
        crs (str, optional): The CRS of the bounding box. Defaults to 'epsg:4326'.

    Returns:
        GeoDataFrame: A GeoDataFrame with a single polygon.
    """
    import geopandas as gpd
    from shapely.geometry import Polygon

    bbox_polygon = Polygon.from_bounds(*bbox)
    gdf = gpd.GeoDataFrame(geometry=[bbox_polygon], crs=crs)
    return gdf

bbox_to_geojson(bounds)

Convert coordinates of a bounding box to a geojson.

Parameters:

Name Type Description Default
bounds list | tuple

A list of coordinates representing [left, bottom, right, top] or m.bounds.

required

Returns:

Type Description
dict

A geojson feature.

Source code in leafmap/common.py
def bbox_to_geojson(bounds):
    """Convert coordinates of a bounding box to a geojson.

    Two input layouts are accepted:
        - a flat sequence ``[left, bottom, right, top]``;
        - two corner pairs ``((south, west), (north, east))`` as returned by
          ``m.bounds``, given either as a tuple or as a list.

    Args:
        bounds (list | tuple): A list of coordinates representing [left, bottom, right, top] or m.bounds.

    Returns:
        dict: A geojson feature whose geometry is a closed polygon ring
            starting and ending at the top-left corner.
    """

    # Corner pairs are (lat, lon); flatten them to [left, bottom, right, top].
    # Lists are accepted as well as tuples, so bounds that went through a
    # JSON round trip still work.
    if isinstance(bounds, (list, tuple)) and len(bounds) == 2:
        bounds = [bounds[0][1], bounds[0][0], bounds[1][1], bounds[1][0]]

    left, bottom, right, top = bounds[0], bounds[1], bounds[2], bounds[3]

    return {
        "geometry": {
            "type": "Polygon",
            "coordinates": [
                [
                    [left, top],
                    [left, bottom],
                    [right, bottom],
                    [right, top],
                    [left, top],
                ]
            ],
        },
        "type": "Feature",
    }

bbox_to_polygon(bbox)

Convert a bounding box to a shapely Polygon.

Parameters:

Name Type Description Default
bbox list

A bounding box in the format of [minx, miny, maxx, maxy].

required

Returns:

Type Description
Polygon

A shapely Polygon.

Source code in leafmap/common.py
def bbox_to_polygon(bbox):
    """Build the shapely Polygon of a bounding box.

    Args:
        bbox (list): A bounding box in the format of [minx, miny, maxx, maxy].

    Returns:
        Polygon: A shapely Polygon.
    """
    from shapely.geometry import Polygon

    bbox_polygon = Polygon.from_bounds(*bbox)
    return bbox_polygon

blend_images(img1, img2, alpha=0.5, output=False, show=True, figsize=(12, 10), axis='off', **kwargs)

Blends two images together using the addWeighted function from the OpenCV library.

Parameters:

Name Type Description Default
img1 numpy.ndarray

The first input image on top represented as a NumPy array.

required
img2 numpy.ndarray

The second input image at the bottom represented as a NumPy array.

required
alpha float

The weighting factor for the first image in the blend. By default, this is set to 0.5.

0.5
output str

The path to the output image. Defaults to False.

False
show bool

Whether to display the blended image. Defaults to True.

True
figsize tuple

The size of the figure. Defaults to (12, 10).

(12, 10)
axis str

The axis of the figure. Defaults to "off".

'off'
**kwargs

Additional keyword arguments to pass to the cv2.addWeighted() function.

{}

Returns:

Type Description
numpy.ndarray

The blended image as a NumPy array.

Source code in leafmap/common.py
def blend_images(
    img1,
    img2,
    alpha=0.5,
    output=False,
    show=True,
    figsize=(12, 10),
    axis="off",
    **kwargs,
):
    """
    Blend two images with OpenCV's addWeighted function.

    Inputs given as strings are treated as paths (URLs starting with "http" are
    downloaded first) and read with cv2.imread. float32 images are scaled by 255
    and converted to uint8, the second image is cast to the dtype of the first,
    and the first image is resized to the dimensions of the second.

    Args:
        img1 (numpy.ndarray): The first input image on top represented as a NumPy array.
        img2 (numpy.ndarray): The second input image at the bottom represented as a NumPy array.
        alpha (float): The weighting factor for the first image in the blend. The second
            image is weighted by 1 - alpha. By default, this is set to 0.5.
        output (str, optional): The path to the output image. Defaults to False.
        show (bool, optional): Whether to display the blended image. Defaults to True.
        figsize (tuple, optional): The size of the figure. Defaults to (12, 10).
        axis (str, optional): The axis of the figure. Defaults to "off".
        **kwargs: Additional keyword arguments to pass to the cv2.addWeighted() function.

    Raises:
        ValueError: If an input path does not exist.

    Returns:
        numpy.ndarray: The blended image as a NumPy array. Only returned when
            show is False; when the image is displayed nothing is returned.
    """
    import cv2
    import numpy as np
    import matplotlib.pyplot as plt

    def _load(image):
        """Turn a path or URL into an image array; pass arrays through."""
        if not isinstance(image, str):
            return image
        if image.startswith("http"):
            image = download_file(image)
        if not os.path.exists(image):
            raise ValueError(f"Input path {image} does not exist.")
        return cv2.imread(image)

    def _to_uint8(image):
        """Rescale a float32 image to uint8; leave other dtypes alone."""
        if image.dtype == np.float32:
            return (image * 255).astype(np.uint8)
        return image

    img1 = _load(img1)
    img2 = _load(img2)

    img1 = _to_uint8(img1)
    img2 = _to_uint8(img2)

    if img2.dtype != img1.dtype:
        img2 = img2.astype(img1.dtype)

    # cv2.resize takes (width, height); match the first image to the second.
    target_size = (img2.shape[1], img2.shape[0])
    img1 = cv2.resize(img1, target_size)

    blend_img = cv2.addWeighted(img1, alpha, img2, 1 - alpha, 0, **kwargs)

    if output:
        array_to_image(blend_img, output, img2)

    if not show:
        return blend_img

    plt.figure(figsize=figsize)
    plt.imshow(blend_img)
    plt.axis(axis)
    plt.show()

bounds_to_xy_range(bounds)

Convert bounds to x and y range to be used as input to bokeh map.

Parameters:

Name Type Description Default
bounds list

A list of bounds in the form [(south, west), (north, east)] or [xmin, ymin, xmax, ymax].

required

Returns:

Type Description
tuple

A tuple of (x_range, y_range).

Source code in leafmap/common.py
def bounds_to_xy_range(bounds):
    """Convert bounds to x and y range to be used as input to bokeh map.

    Args:
        bounds (list): A list of bounds in the form [(south, west), (north, east)] or [xmin, ymin, xmax, ymax].
            A tuple is accepted as well.

    Raises:
        TypeError: If bounds is neither a list nor a tuple.
        ValueError: If bounds does not contain exactly 2 or 4 items.

    Returns:
        tuple: A tuple of (x_range, y_range), each a (min, max) pair computed
            with lnglat_to_meters.
    """

    if isinstance(bounds, tuple):
        bounds = list(bounds)
    elif not isinstance(bounds, list):
        raise TypeError("bounds must be a list")

    if len(bounds) == 4:
        west, south, east, north = bounds
    elif len(bounds) == 2:
        south, west = bounds[0]
        north, east = bounds[1]
    else:
        # Without this branch the corner variables would be unbound and the
        # caller would get an opaque UnboundLocalError further down.
        raise ValueError(
            "bounds must be in the form [(south, west), (north, east)] "
            "or [xmin, ymin, xmax, ymax]"
        )

    xmin, ymin = lnglat_to_meters(west, south)
    xmax, ymax = lnglat_to_meters(east, north)
    x_range = (xmin, xmax)
    y_range = (ymin, ymax)
    return x_range, y_range

center_zoom_to_xy_range(center, zoom)

Convert center and zoom to x and y range to be used as input to bokeh map.

Parameters:

Name Type Description Default
center tuple

A tuple of (latitude, longitude).

required
zoom int

The zoom level.

required

Returns:

Type Description
tuple

A tuple of (x_range, y_range).

Source code in leafmap/common.py
def center_zoom_to_xy_range(center, zoom):
    """Derive the x and y ranges of a bokeh map from a center point and zoom level.

    The visible extent starts from a full span of 358 degrees of longitude and
    140 degrees of latitude and is divided by ``2 ** (zoom - 2)``.

    Args:
        center (tuple): A tuple of (latitude, longitude).
        zoom (int): The zoom level.

    Raises:
        TypeError: If center is not a tuple or list, or if zoom is not an integer.

    Returns:
        tuple: A tuple of (x_range, y_range).
    """

    if not isinstance(center, (tuple, list)):
        raise TypeError("center must be a tuple or list")

    if not isinstance(zoom, int):
        raise TypeError("zoom must be an integer")

    lat, lon = center

    # Full extents in degrees: longitude -179..179, latitude -70..70.
    lon_span = 179 - (-179)
    lat_span = 70 - (-70)

    scale = 2 ** (zoom - 2)
    half_width = (lon_span / scale) / 2
    half_height = (lat_span / scale) / 2

    xmin, ymin = lnglat_to_meters(lon - half_width, lat - half_height)
    xmax, ymax = lnglat_to_meters(lon + half_width, lat + half_height)

    return (xmin, xmax), (ymin, ymax)

cesium_to_streamlit(html, width=800, height=600, responsive=True, scrolling=False, token_name=None, token_value=None, **kwargs)

Renders a Cesium HTML file in a Streamlit app. This method is a static Streamlit Component, meaning no information is passed back from the map on browser interaction.

Parameters:

Name Type Description Default
html str

The HTML file to render. It can be a local file path or a URL.

required
width int

Width of the map. Defaults to 800.

800
height int

Height of the map. Defaults to 600.

600
responsive bool

Whether to make the map responsive. Defaults to True.

True
scrolling bool

Whether to allow the map to scroll. Defaults to False.

False
token_name str

The name of the token in the HTML file to be replaced. Defaults to None.

None
token_value str

The value of the token to pass to the HTML file. Defaults to None.

None

Returns:

Type Description
streamlit.components

components.html object.

Source code in leafmap/common.py
def cesium_to_streamlit(
    html,
    width=800,
    height=600,
    responsive=True,
    scrolling=False,
    token_name=None,
    token_value=None,
    **kwargs,
):
    """Renders a Cesium HTML file in a Streamlit app. This method is a static Streamlit Component, meaning no information is passed back from the map on browser interaction.

    This is a thin wrapper around html_to_streamlit() that fills in
    Cesium-specific defaults for the access-token placeholder.

    Args:
        html (str): The HTML file to render. It can be a local file path or a URL.
        width (int, optional): Width of the map. Defaults to 800.
        height (int, optional): Height of the map. Defaults to 600.
        responsive (bool, optional): Whether to make the map responsive. Defaults to True.
        scrolling (bool, optional): Whether to allow the map to scroll. Defaults to False.
        token_name (str, optional): The name of the token in the HTML file to be replaced.
            Defaults to None, in which case "your_access_token" is used.
        token_value (str, optional): The value of the token to pass to the HTML file.
            Defaults to None, in which case the CESIUM_TOKEN environment variable is used.
        **kwargs: Accepted for compatibility; currently not forwarded.

    Returns:
        streamlit.components: components.html object (whatever html_to_streamlit() returns).
    """
    if token_name is None:
        token_name = "your_access_token"

    if token_value is None:
        token_value = os.environ.get("CESIUM_TOKEN")

    # Hand the result back to the caller; previously it was silently dropped
    # even though the docstring promises a return value.
    return html_to_streamlit(
        html, width, height, responsive, scrolling, token_name, token_value
    )

check_cmap(cmap)

Check the colormap and return a list of colors.

Parameters:

Name Type Description Default
cmap str | list | Box

The colormap to check.

required

Returns:

Type Description
list

A list of colors.

Source code in leafmap/common.py
def check_cmap(cmap):
    """Resolve a colormap specification into a list of colors.

    Args:
        cmap (str | list | Box): The colormap to check. A string is looked up
            with get_palette(), a Box yields its "default" entry, and a list
            or tuple is returned unchanged.

    Raises:
        Exception: If the colormap name cannot be resolved or the type is not supported.

    Returns:
        list: A list of colors.
    """

    from box import Box
    from .colormaps import get_palette

    if isinstance(cmap, str):
        try:
            return get_palette(cmap)
        except Exception:
            raise Exception(f"{cmap} is not a valid colormap.")

    if isinstance(cmap, Box):
        return list(cmap["default"])

    if isinstance(cmap, (list, tuple)):
        return cmap

    raise Exception(f"{cmap} is not a valid colormap.")

check_color(in_color)

Checks the input color and returns the corresponding hex color code.

Parameters:

Name Type Description Default
in_color str or tuple or list

It can be a string (e.g., 'red', '#ffff00', 'ffff00', 'ff0') or RGB tuple (e.g., (255, 127, 0)).

required

Returns:

Type Description
str

A hex color code.

Source code in leafmap/common.py
def check_color(in_color: Union[str, Tuple]) -> str:
    """Normalize a color specification to a hex color code.

    Args:
        in_color (str or tuple or list): It can be a string (e.g., 'red', '#ffff00', 'ffff00', 'ff0') or RGB tuple (e.g., (255, 127, 0)).

    Returns:
        str: A hex color code. Black ("#000000") is returned, after printing a
            message, when a non-RGB input cannot be interpreted.
    """
    import colour

    is_rgb_triple = isinstance(in_color, (tuple, list)) and len(in_color) == 3
    if is_rgb_triple:
        channels = in_color
        # All-integer triples are taken to be on a 0-255 scale and rescaled to 0-1.
        if all(isinstance(channel, int) for channel in channels):
            channels = [channel / 255.0 for channel in channels]
        return colour.Color(rgb=tuple(channels)).hex_l

    # First attempt: let the colour package interpret the value as given.
    try:
        return colour.Color(in_color).hex_l
    except Exception:
        pass

    # Second attempt: prepend '#' (GEE handles hex codes without it).
    try:
        return colour.Color(f"#{in_color}").hex_l
    except Exception as err:
        print(
            f"The provided color ({in_color}) is invalid. Using the default black color."
        )
        print(err)

    return "#000000"

check_dir(dir_path, make_dirs=True)

Checks if a directory exists and creates it if it does not.

Parameters:

Name Type Description Default
dir_path str

The path to the directory.

required
make_dirs bool

Whether to create the directory if it does not exist. Defaults to True.

True

Exceptions:

Type Description
FileNotFoundError

If the directory could not be found.

TypeError

If the input directory path is not a string.

Returns:

Type Description
str

The path to the directory.

Source code in leafmap/common.py
def check_dir(dir_path, make_dirs=True):
    """Resolve a directory path, optionally creating the directory.

    A path starting with "~" is expanded with os.path.expanduser(); any other
    path is made absolute with os.path.abspath().

    Args:
        dir_path (str): The path to the directory.
        make_dirs (bool, optional): Whether to create the directory if it does not exist. Defaults to True.

    Raises:
        FileNotFoundError: If the directory could not be found.
        TypeError: If the input directory path is not a string.

    Returns:
        str: The path to the directory.
    """

    if not isinstance(dir_path, str):
        raise TypeError("The provided directory path must be a string.")

    if dir_path.startswith("~"):
        resolved = os.path.expanduser(dir_path)
    else:
        resolved = os.path.abspath(dir_path)

    if make_dirs and not os.path.exists(resolved):
        os.makedirs(resolved)

    if not os.path.exists(resolved):
        raise FileNotFoundError("The provided directory could not be found.")

    return resolved

check_file_path(file_path, make_dirs=True)

Gets the absolute file path.

Parameters:

Name Type Description Default
file_path str

The path to the file.

required
make_dirs bool

Whether to create the directory if it does not exist. Defaults to True.

True

Exceptions:

Type Description
FileNotFoundError

If the directory could not be found.

TypeError

If the input directory path is not a string.

Returns:

Type Description
str

The absolute path to the file.

Source code in leafmap/common.py
def check_file_path(file_path, make_dirs=True):
    """Resolve a file path and optionally create its parent directory.

    A path starting with "~" is expanded with os.path.expanduser(); any other
    path is made absolute with os.path.abspath(). The file itself is neither
    created nor required to exist.

    Args:
        file_path (str): The path to the file.
        make_dirs (bool, optional): Whether to create the directory if it does not exist. Defaults to True.

    Raises:
        TypeError: If the input file path is not a string.

    Returns:
        str: The absolute path to the file.
    """
    if not isinstance(file_path, str):
        raise TypeError("The provided file path must be a string.")

    if file_path.startswith("~"):
        resolved = os.path.expanduser(file_path)
    else:
        resolved = os.path.abspath(file_path)

    parent_dir = os.path.dirname(resolved)
    if make_dirs and not os.path.exists(parent_dir):
        os.makedirs(parent_dir)

    return resolved

check_html_string(html_string)

Check if an HTML string contains local images and convert them to base64.

Parameters:

Name Type Description Default
html_string str

The HTML string.

required

Returns:

Type Description
str

The HTML string with local images converted to base64.

Source code in leafmap/common.py
def check_html_string(html_string):
    """Check if an HTML string contains local images and convert them to base64.

    Every ``<img>`` tag whose ``src`` value contains no colon (i.e. is not a
    URL or an existing ``data:`` URI) is treated as a local file path. The file
    is read and the attribute value is replaced in place by a base64 data URI.
    Both single- and double-quoted attributes are handled, and the original
    quote characters and the rest of the tag are preserved.

    Args:
        html_string (str): The HTML string.

    Raises:
        OSError: If a referenced local image cannot be opened (for example
            FileNotFoundError when it does not exist).

    Returns:
        str: The HTML string with local images converted to base64.
    """
    import base64
    import mimetypes
    import re

    # Groups: tag text up to the value, opening quote, path, closing quote,
    # remainder of the tag. The path may not contain quotes or a colon.
    img_regex = re.compile(r'(<img[^>]+src\s*=\s*)(["\'])([^"\':]+)(["\'])([^>]*>)')

    data_uris = {}  # path -> data URI, so a repeated image is read only once

    def _embed(match):
        prefix, open_quote, path, close_quote, suffix = match.groups()
        if path not in data_uris:
            with open(path, "rb") as img_file:
                payload = base64.b64encode(img_file.read()).decode("utf-8")
            # Use the real image type when it can be guessed from the file
            # name (an SVG labelled as PNG will not render); otherwise keep
            # the historical image/png default.
            mime_type = mimetypes.guess_type(path)[0]
            if not mime_type or not mime_type.startswith("image/"):
                mime_type = "image/png"
            data_uris[path] = f"data:{mime_type};base64,{payload}"
        return f"{prefix}{open_quote}{data_uris[path]}{close_quote}{suffix}"

    return img_regex.sub(_embed, html_string)

check_url(url)

Check if an HTTP URL is working.

Parameters:

Name Type Description Default
url str

The URL to check.

required

Returns:

Type Description
bool

True if the URL is working (returns a 200 status code), False otherwise.

Source code in leafmap/common.py
def check_url(url: str, timeout: float = None) -> bool:
    """Check if an HTTP URL is working.

    Args:
        url (str): The URL to check.
        timeout (float, optional): Number of seconds to wait for the server before
            giving up; passed straight to requests.get(). Defaults to None, which
            waits indefinitely (the previous behavior). A timed-out request counts
            as a non-working URL.

    Returns:
        bool: True if the URL is working (returns a 200 status code), False otherwise.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException:
        # Connection errors, invalid URLs, timeouts, etc. all mean "not working".
        return False
    return response.status_code == 200

classify(data, column, cmap=None, colors=None, labels=None, scheme='Quantiles', k=5, legend_kwds=None, classification_kwds=None)

Classify a dataframe column using a variety of classification schemes.

Parameters:

Name Type Description Default
data str | pd.DataFrame | gpd.GeoDataFrame

The data to classify. It can be a filepath to a vector dataset, a pandas dataframe, or a geopandas geodataframe.

required
column str

The column to classify.

required
cmap str

The name of a colormap recognized by matplotlib. Defaults to None.

None
colors list

A list of colors to use for the classification. Defaults to None.

None
labels list

A list of labels to use for the legend. Defaults to None.

None
scheme str

Name of a choropleth classification scheme (requires mapclassify). A mapclassify.MapClassifier object will be used under the hood. Supported are all schemes provided by mapclassify (e.g. 'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled', 'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced', 'JenksCaspallSampled', 'MaxP', 'MaximumBreaks', 'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean', 'UserDefined'). Arguments can be passed in classification_kwds.

'Quantiles'
k int

Number of classes (ignored if scheme is None or if column is categorical). Default to 5.

5
legend_kwds dict

Keyword arguments to pass to :func:matplotlib.pyplot.legend or matplotlib.pyplot.colorbar. Defaults to None. Additional accepted keywords when scheme is specified: fmt : string A formatting specification for the bin edges of the classes in the legend. For example, to have no decimals: {"fmt": "{:.0f}"}. labels : list-like A list of legend labels to override the auto-generated labels. Needs to have the same number of elements as the number of classes (k). interval : boolean (default False) An option to control brackets from mapclassify legend. If True, open/closed interval brackets are shown in the legend.

None
classification_kwds dict

Keyword arguments to pass to mapclassify. Defaults to None.

None

Returns:

Type Description
pd.DataFrame, dict

A pandas dataframe with the classification applied and a legend dictionary.

Source code in leafmap/common.py
def classify(
    data,
    column,
    cmap=None,
    colors=None,
    labels=None,
    scheme="Quantiles",
    k=5,
    legend_kwds=None,
    classification_kwds=None,
):
    """Classify a dataframe column using a variety of classification schemes.

    Each row is assigned to a class with mapclassify, and a color is attached
    to every row. The input dataframe is modified in place: a ``category``
    column (1-based class index) and a ``color`` column (hex string) are
    added or overwritten.

    Args:
        data (str | pd.DataFrame | gpd.GeoDataFrame): The data to classify. It can be a filepath to a vector dataset,
            a pandas dataframe, or a geopandas geodataframe.
        column (str): The column to classify. An object-dtype column is treated as categorical: its sorted
            unique values become the classes and ``k`` is set to the number of unique values.
        cmap (str, optional): The name of a colormap recognized by matplotlib. Defaults to None, in which case
            "Blues" is used.
        colors (list | str, optional): A list of colors to use for the classification, or a single color that is
            repeated for every class. Defaults to None, in which case colors are sampled from ``cmap``.
        labels (list, optional): A list of labels to use for the legend. Must have the same length as the
            list of colors. Defaults to None.
        scheme (str, optional): Name of a choropleth classification scheme (requires mapclassify).
            A mapclassify.MapClassifier object will be used
            under the hood. Supported are all schemes provided by mapclassify (e.g.
            'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled',
            'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced',
            'JenksCaspallSampled', 'MaxP', 'MaximumBreaks',
            'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean',
            'UserDefined'). The name is matched case-insensitively.
            Arguments can be passed in classification_kwds.
        k (int, optional): Number of classes (overridden if column is categorical). Defaults to 5.
        legend_kwds (dict, optional): Options controlling the generated legend labels. Defaults to None.
            The dictionary is copied, so the caller's object is left untouched. Accepted keywords:
            fmt : string
                A formatting specification for the bin edges of the classes in the
                legend. For example, to have no decimals: ``{"fmt": "{:.0f}"}``.
                Defaults to ``"{:.2f}"`` for floating point columns and ``"{:.0f}"`` otherwise.
            labels : list-like
                A list of legend labels to override the auto-generated labels.
                Needs to have the same number of elements as the number of
                classes (`k`).
            interval : boolean (default True)
                An option to control brackets from mapclassify legend.
                If True, open/closed interval brackets are shown in the legend.
        classification_kwds (dict, optional): Keyword arguments to pass to mapclassify. Defaults to None.
            The dictionary is copied before ``k`` is filled in, so the caller's object is left untouched.

    Raises:
        ImportError: If mapclassify is not installed.
        TypeError: If ``data`` is not a dataframe and cannot be read by geopandas.
        ValueError: If ``column`` is not a column of the data, ``scheme`` is not supported, or the
            number of labels does not match the number of bins/colors.

    Returns:
        pd.DataFrame, dict: A pandas dataframe with the classification applied and a legend dictionary
            mapping labels to colors. Returns None (after issuing a warning) if the dataframe is empty.
    """

    import numpy as np
    import pandas as pd
    import geopandas as gpd
    import matplotlib as mpl
    import matplotlib.pyplot as plt

    try:
        import mapclassify
    except ImportError:
        raise ImportError(
            "mapclassify is required for this function. Install with `pip install mapclassify`."
        )

    if isinstance(data, (gpd.GeoDataFrame, pd.DataFrame, pd.Series)):
        df = data
    else:
        try:
            df = gpd.read_file(data)
        except Exception:
            raise TypeError(
                "Data must be a GeoDataFrame or a path to a file that can be read by geopandas.read_file()."
            )

    if df.empty:
        warnings.warn(
            "The GeoDataFrame you are attempting to plot is "
            "empty. Nothing has been displayed.",
            UserWarning,
        )
        return

    columns = df.columns.values.tolist()
    if column not in columns:
        raise ValueError(
            f"{column} is not a column in the GeoDataFrame. It must be one of {columns}."
        )

    # Convert categorical data to numeric codes (position in the sorted
    # list of unique values) so that it can be binned like any other column.
    init_column = None
    value_list = None
    # np.object_ replaces the np.object0 alias, which was removed in NumPy 2.0.
    if np.issubdtype(df[column].dtype, np.object_):
        value_list = df[column].unique().tolist()
        value_list.sort()
        df["category"] = df[column].replace(value_list, range(0, len(value_list)))
        init_column = column
        column = "category"
        k = len(value_list)

    if legend_kwds is not None:
        legend_kwds = legend_kwds.copy()

    # `column` is guaranteed to be a column label at this point (checked
    # above), so the values are always read from the dataframe.
    values = df[column]
    nan_idx = np.asarray(pd.isna(values), dtype="bool")

    if cmap is None:
        cmap = "Blues"
    try:
        cmap = plt.get_cmap(cmap, k)
    except Exception:
        # Fallback kept from the original implementation for matplotlib
        # versions where the call above fails.
        cmap = plt.cm.get_cmap(cmap, k)
    if colors is None:
        colors = [mpl.colors.rgb2hex(cmap(i)) for i in range(cmap.N)]
    elif isinstance(colors, list):
        colors = [check_color(i) for i in colors]
    elif isinstance(colors, str):
        colors = [check_color(colors)] * k

    allowed_schemes = [
        "BoxPlot",
        "EqualInterval",
        "FisherJenks",
        "FisherJenksSampled",
        "HeadTailBreaks",
        "JenksCaspall",
        "JenksCaspallForced",
        "JenksCaspallSampled",
        "MaxP",
        "MaximumBreaks",
        "NaturalBreaks",
        "Quantiles",
        "Percentiles",
        "StdMean",
        "UserDefined",
    ]

    if scheme.lower() not in [s.lower() for s in allowed_schemes]:
        raise ValueError(
            f"{scheme} is not a valid scheme. It must be one of {allowed_schemes}."
        )

    # Work on a copy so that filling in "k" does not leak into the caller's dict.
    classification_kwds = dict(classification_kwds) if classification_kwds else {}
    classification_kwds.setdefault("k", k)

    binning = mapclassify.classify(
        np.asarray(values[~nan_idx]), scheme, **classification_kwds
    )
    # NOTE(review): binning.yb only covers the non-NaN values; if the column
    # contains NaN this assignment presumably fails on a length mismatch.
    df["category"] = binning.yb
    df["color"] = [colors[i] for i in df["category"]]

    if legend_kwds is None:
        legend_kwds = {}

    if "interval" not in legend_kwds:
        legend_kwds["interval"] = True

    if "fmt" not in legend_kwds:
        if np.issubdtype(df[column].dtype, np.floating):
            legend_kwds["fmt"] = "{:.2f}"
        else:
            legend_kwds["fmt"] = "{:.0f}"

    if labels is None:
        if "labels" in legend_kwds:
            if len(legend_kwds["labels"]) != binning.k:
                raise ValueError(
                    "Number of labels must match number of bins, "
                    "received {} labels for {} bins".format(
                        len(legend_kwds["labels"]), binning.k
                    )
                )
            labels = list(legend_kwds["labels"])
        else:
            # "fmt" and "interval" are always present here (defaults set above).
            labels = binning.get_legend_classes(legend_kwds["fmt"])
            if not legend_kwds["interval"]:
                # Strip the leading/trailing bracket characters.
                labels = [c[1:-1] for c in labels]

        # Categorical input: the legend shows the original category values.
        if init_column is not None:
            labels = value_list
    elif isinstance(labels, list):
        if len(labels) != len(colors):
            raise ValueError("The number of labels must match the number of colors.")
    else:
        raise ValueError("labels must be a list or None.")

    legend_dict = dict(zip(labels, colors))
    # Expose 1-based class indices to the caller.
    df["category"] = df["category"] + 1
    return df, legend_dict

clip_image(image, mask, output, to_cog=True)

Clip an image by mask.

Parameters:

Name Type Description Default
image str

Path to the image file in GeoTIFF format.

required
mask str | list | dict

The mask used to extract the image. It can be a path to vector datasets (e.g., GeoJSON, Shapefile), a list of coordinates, or m.user_roi.

required
output str

Path to the output file.

required
to_cog bool

Flags to indicate if you want to convert the output to COG. Defaults to True.

True

Exceptions:

Type Description
ImportError

If the fiona or rasterio package is not installed.

FileNotFoundError

If the image is not found.

ValueError

If the mask is not a valid GeoJSON or raster file.

FileNotFoundError

If the mask file is not found.

Source code in leafmap/common.py
def clip_image(image, mask, output, to_cog=True):
    """Clip an image by mask.

    Args:
        image (str): Path to the image file in GeoTIFF format.
        mask (str | list | dict): The mask used to extract the image. It can be a path or HTTP URL to a
            vector dataset (e.g., GeoJSON, Shapefile), a list of coordinates forming a polygon ring,
            or a GeoJSON feature dictionary such as m.user_roi.
        output (str): Path to the output file. Must end with ".tif".
        to_cog (bool, optional): Flags to indicate if you want to convert the output to COG. Defaults to True.

    Raises:
        ImportError: If the fiona or rasterio package is not installed.
        FileNotFoundError: If the image is not found.
        ValueError: If the output path does not end with ".tif".
        FileNotFoundError: If the mask file is not found.
    """
    try:
        import json
        import fiona
        import rasterio
        import rasterio.mask
    except ImportError as e:
        raise ImportError(e)

    if not os.path.exists(image):
        raise FileNotFoundError(f"{image} does not exist.")

    if not output.endswith(".tif"):
        raise ValueError("Output must be a tif file.")

    output = check_file_path(output)

    if isinstance(mask, str):
        if mask.startswith("http"):
            from urllib.parse import urlparse

            # Download the remote mask to its own temporary file. It used to
            # be written to `output`, i.e. the very path the clipped raster
            # is written to further down.
            mask_ext = os.path.splitext(urlparse(mask).path)[1] or ".geojson"
            mask = download_file(mask, temp_file_path(mask_ext))
        if not os.path.exists(mask):
            raise FileNotFoundError(f"{mask} does not exist.")
    elif isinstance(mask, (list, dict)):
        if isinstance(mask, list):
            # A bare coordinate list is wrapped as the single ring of a polygon.
            feature = {
                "type": "Feature",
                "properties": {},
                "geometry": {"type": "Polygon", "coordinates": [mask]},
            }
        else:
            feature = mask
        geojson = {"type": "FeatureCollection", "features": [feature]}
        # Persist the in-memory mask so that fiona can read it like a file.
        mask = temp_file_path(".geojson")
        with open(mask, "w") as f:
            json.dump(geojson, f)

    with fiona.open(mask, "r") as shapefile:
        shapes = [feature["geometry"] for feature in shapefile]

    with rasterio.open(image) as src:
        out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)
        out_meta = src.meta

    # The cropped array has a new size and georeferencing.
    out_meta.update(
        {
            "driver": "GTiff",
            "height": out_image.shape[1],
            "width": out_image.shape[2],
            "transform": out_transform,
        }
    )

    with rasterio.open(output, "w", **out_meta) as dest:
        dest.write(out_image)

    if to_cog:
        image_to_cog(output, output)

cog_validate(source, verbose=False)

Validate Cloud Optimized Geotiff.

Parameters:

Name Type Description Default
source str

A dataset path or URL. Will be opened in "r" mode.

required
verbose bool

If True, the detailed report produced by rio-cogeo's cog_info is returned instead of the validation tuple. Defaults to False.

False

Exceptions:

Type Description
ImportError

If the rio-cogeo package is not installed.

FileNotFoundError

If the provided file could not be found.

Returns:

Type Description
tuple

A tuple containing the validation results (True if source is a valid COG, a list of validation errors, and a list of validation warnings).

Source code in leafmap/common.py
def cog_validate(source, verbose=False):
    """Check whether a raster is a valid Cloud Optimized GeoTIFF.

    Args:
        source (str): A dataset path or URL. Anything that does not start with "http" is
            treated as a local path and must exist.
        verbose (bool, optional): If True, the result of rio-cogeo's ``cog_info`` is returned
            instead of the result of its ``cog_validate``. Defaults to False.

    Raises:
        ImportError: If the rio-cogeo package is not installed.
        FileNotFoundError: If the provided local file could not be found.

    Returns:
        tuple: The validation results (True if source is a valid COG, a list of validation errors,
            and a list of validation warnings). With ``verbose=True`` the ``cog_info`` report is
            returned instead.
    """
    try:
        # Aliased so the rio-cogeo function does not shadow this function's own name.
        from rio_cogeo.cogeo import cog_info as _describe, cog_validate as _validate
    except ImportError:
        raise ImportError(
            "The rio-cogeo package is not installed. Please install it with `pip install rio-cogeo` or `conda install rio-cogeo -c conda-forge`."
        )

    is_remote = source.startswith("http")
    if not is_remote:
        # Only local paths are normalised and checked for existence.
        source = check_file_path(source)
        if not os.path.exists(source):
            raise FileNotFoundError("The provided input file could not be found.")

    inspector = _describe if verbose else _validate
    return inspector(source)

connect_postgis(database, host='localhost', user=None, password=None, port=5432, use_env_var=False)

Connects to a PostGIS database.

Parameters:

Name Type Description Default
database str

Name of the database

required
host str

Hosting server for the database. Defaults to "localhost".

'localhost'
user str

User name to access the database. Defaults to None.

None
password str

Password to access the database. Defaults to None.

None
port int

Port number to connect to at the server host. Defaults to 5432.

5432
use_env_var bool

Whether to use environment variables. If set to True, user and password are treated as the names of environment variables, with default names user="SQL_USER" and password="SQL_PASSWORD". Defaults to False.

False

Exceptions:

Type Description
ValueError

If user is not specified.

ValueError

If password is not specified.

Returns:

Type Description
sqlalchemy.engine.Engine

The SQLAlchemy engine created for the PostGIS database.

Source code in leafmap/common.py
def connect_postgis(
    database, host="localhost", user=None, password=None, port=5432, use_env_var=False
):
    """Connects to a PostGIS database.

    Args:
        database (str): Name of the database
        host (str, optional): Hosting server for the database. Defaults to "localhost".
        user (str, optional): User name to access the database. Defaults to None.
        password (str, optional): Password to access the database. Defaults to None.
        port (int, optional): Port number to connect to at the server host. Defaults to 5432.
        use_env_var (bool, optional): Whether to use environment variables. If set to True, user and password
            are treated as the names of environment variables, with default names user="SQL_USER" and
            password="SQL_PASSWORD". Defaults to False.

    Raises:
        ValueError: If user is not specified.
        ValueError: If password is not specified.

    Returns:
        sqlalchemy.engine.Engine: The engine created by ``sqlalchemy.create_engine`` for the database.
    """
    check_package(name="geopandas", URL="https://geopandas.org")
    check_package(
        name="sqlalchemy",
        URL="https://docs.sqlalchemy.org/en/14/intro.html#installation",
    )

    from sqlalchemy import create_engine

    if use_env_var:
        # `user` / `password` name the environment variables to read.
        user = os.getenv(user if user is not None else "SQL_USER")
        password = os.getenv(password if password is not None else "SQL_PASSWORD")

    # Validate in both modes: previously these checks only ran when
    # use_env_var was True, so a missing credential was silently written
    # into the connection string as the text "None".
    if user is None:
        raise ValueError("user is not specified.")
    if password is None:
        raise ValueError("password is not specified.")

    # NOTE(review): user/password are interpolated verbatim; values containing
    # URL-special characters such as "@" or "/" presumably need to be
    # percent-encoded by the caller.
    connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
    engine = create_engine(connection_string)

    return engine

convert_coordinates(x, y, source_crs, target_crs='epsg:4326')

Convert coordinates from the source EPSG code to the target EPSG code.

Parameters:

Name Type Description Default
x float

The x-coordinate of the point.

required
y float

The y-coordinate of the point.

required
source_crs str

The EPSG code of the source coordinate system.

required
target_crs str

The EPSG code of the target coordinate system. Defaults to 'epsg:4326' (EPSG code for WGS84).

'epsg:4326'

Returns:

Type Description
tuple

A tuple containing the converted longitude and latitude.

Source code in leafmap/common.py
def convert_coordinates(x, y, source_crs, target_crs="epsg:4326"):
    """Reproject a point from one coordinate reference system to another.

    Args:
        x (float): The x-coordinate of the point.
        y (float): The y-coordinate of the point.
        source_crs (str): The EPSG code of the source coordinate system.
        target_crs (str, optional): The EPSG code of the target coordinate system.
            Defaults to 'epsg:4326' (EPSG code for WGS84).

    Returns:
        tuple: A tuple containing the converted x (longitude) and y (latitude) values.
    """
    from pyproj import Transformer

    # always_xy=True keeps the (x, y) argument order for input and output.
    reprojector = Transformer.from_crs(source_crs, target_crs, always_xy=True)
    out_x, out_y = reprojector.transform(x, y)
    return out_x, out_y

convert_lidar(source, destination=None, point_format_id=None, file_version=None, **kwargs)

Converts a Las from one point format to another. Automatically upgrades the file version if the source file version is not compatible with the new point_format_id.

Parameters:

Name Type Description Default
source str | laspy.lasdatas.base.LasBase

The source data to be converted.

required
destination str

The destination file path. Defaults to None.

None
point_format_id int

The new point format id (the default is None, which won't change the source format id).

None
file_version str

The new file version. None by default which means that the file_version may be upgraded for compatibility with the new point_format. The file version will not be downgraded.

None

Returns:

Type Description
laspy.lasdatas.base.LasBase | str

The converted LasData object, or the destination file path if destination is provided.

Source code in leafmap/common.py
def convert_lidar(
    source, destination=None, point_format_id=None, file_version=None, **kwargs
):
    """Convert LAS data from one point format to another.

    The file version is upgraded automatically if the source file version
    is not compatible with the new point_format_id.

    Args:
        source (str | laspy.lasdatas.base.LasBase): The source data to be converted. A string is read with read_lidar().
        destination (str, optional): The destination file path. Defaults to None.
        point_format_id (int, optional): The new point format id (the default is None, which won't change the source format id).
        file_version (str, optional): The new file version. None by default which means that the file_version may be upgraded
            for compatibility with the new point_format. The file version will not be downgraded.
        **kwargs: Additional keyword arguments forwarded to write_lidar() when a destination is given.

    Returns:
        laspy.lasdatas.base.LasBase | str: The converted LasData object when no destination is given,
            otherwise the path the converted data was written to. None if laspy is not installed.
    """
    try:
        import laspy
    except ImportError:
        print(
            "The laspy package is required for this function. Use `pip install laspy[lazrs,laszip]` to install it."
        )
        return None

    las_input = read_lidar(source) if isinstance(source, str) else source
    converted = laspy.convert(
        las_input, point_format_id=point_format_id, file_version=file_version
    )

    if destination is not None:
        out_path = check_file_path(destination)
        write_lidar(converted, out_path, **kwargs)
        return out_path

    return converted

coords_to_geojson(coords)

Convert a list of bbox coordinates representing [left, bottom, right, top] to geojson FeatureCollection.

Parameters:

Name Type Description Default
coords list

A list of bounding boxes, each given as bbox coordinates representing [left, bottom, right, top].

required

Returns:

Type Description
dict

A geojson FeatureCollection.

Source code in leafmap/common.py
def coords_to_geojson(coords):
    """Build a GeoJSON FeatureCollection from a list of bounding boxes.

    Args:
        coords (list): A list of bounding boxes, each given as bbox coordinates
            representing [left, bottom, right, top]. Every item is converted with
            bbox_to_geojson().

    Returns:
        dict: A geojson FeatureCollection with one feature per bounding box.
    """
    return {
        "type": "FeatureCollection",
        "features": [bbox_to_geojson(box) for box in coords],
    }

coords_to_vector(coords, output=None, crs='EPSG:4326', **kwargs)

Convert a list of coordinates to a GeoDataFrame or a vector file.

Parameters:

Name Type Description Default
coords list

A list of coordinates in the format of [(x1, y1), (x2, y2), ...].

required
output str

The path to the output vector file. Defaults to None.

None
crs str

The CRS of the output GeoDataFrame. The input coordinates are read as EPSG:4326 and reprojected to this CRS. Defaults to "EPSG:4326".

'EPSG:4326'

Returns:

Type Description
gpd.GeoDataFrame

A GeoDataFrame of the coordinates.

Source code in leafmap/common.py
def coords_to_vector(coords, output=None, crs="EPSG:4326", **kwargs):
    """Turn a list of coordinates into a point GeoDataFrame or a vector file.

    The points are created in EPSG:4326 and then reprojected to ``crs``.

    Args:
        coords (list): A list of coordinates in the format of [(x1, y1), (x2, y2), ...].
            A flat [x, y] list is accepted as a single point.
        output (str, optional): The path to the output vector file. Defaults to None.
        crs (str, optional): The CRS the resulting GeoDataFrame is reprojected to. Defaults to "EPSG:4326".
        **kwargs: Additional keyword arguments passed to GeoDataFrame.to_file().

    Raises:
        TypeError: If coords is not a list.

    Returns:
        gpd.GeoDataFrame: A GeoDataFrame of the coordinates, or None when the result is
            written to ``output`` instead.
    """
    import geopandas as gpd
    from shapely.geometry import Point

    if not isinstance(coords, list):
        raise TypeError("coords must be a list of coordinates")

    # A flat [x, y] pair is promoted to a one-element list of points.
    first = coords[0]
    if isinstance(first, (int, float)):
        coords = [(first, coords[1])]

    points = [Point(pair) for pair in coords]
    gdf = gpd.GeoDataFrame(geometry=points, crs="EPSG:4326").to_crs(crs)

    if output is None:
        return gdf

    gdf.to_file(output, **kwargs)

coords_to_xy(src_fp, coords, coord_crs='epsg:4326', request_payer='bucket-owner', env_args={}, open_args={}, **kwargs)

Converts a list of coordinates to pixel coordinates, i.e., (col, row) coordinates.

Parameters:

Name Type Description Default
src_fp str

The source raster file path.

required
coords list

A list of coordinates in the format of [[x1, y1], [x2, y2], ...]

required
coord_crs str

The coordinate CRS of the input coordinates. Defaults to "epsg:4326".

'epsg:4326'
request_payer

Specifies who pays for the download from S3. Can be "bucket-owner" or "requester". Defaults to "bucket-owner".

'bucket-owner'
env_args

Additional keyword arguments to pass to rasterio.Env.

{}
open_args

Additional keyword arguments to pass to rasterio.open.

{}
**kwargs

Additional keyword arguments to pass to rasterio.transform.rowcol.

{}

Returns:

Type Description
list

A list of pixel coordinates in the format of [[x1, y1], [x2, y2], ...]

Source code in leafmap/common.py
def coords_to_xy(
    src_fp: str,
    coords: list,
    coord_crs: str = "epsg:4326",
    request_payer="bucket-owner",
    env_args={},
    open_args={},
    **kwargs,
) -> list:
    """Map a list of coordinates onto pixel positions, i.e., (col, row), of a raster.

    Coordinates that fall outside the raster extent are dropped from the result
    and a message is printed.

    Args:
        src_fp: The source raster file path.
        coords: A list of coordinates in the format of [[x1, y1], [x2, y2], ...].
            A flat sequence of four numbers is read as two points
            [[c0, c1], [c2, c3]]; a numpy array is converted to a list first.
        coord_crs: The coordinate CRS of the input coordinates. Defaults to "epsg:4326".
        request_payer: Specifies who pays for the download from S3.
            Can be "bucket-owner" or "requester". Defaults to "bucket-owner".
        env_args: Additional keyword arguments to pass to rasterio.Env.
        open_args: Additional keyword arguments to pass to rasterio.open.
        **kwargs: Additional keyword arguments to pass to rasterio.transform.rowcol
            (they are also forwarded to transform_coords when reprojection is needed).

    Returns:
        A list of pixel coordinates in the format of [[col1, row1], [col2, row2], ...]
    """
    import numpy as np
    import rasterio

    if isinstance(coords, np.ndarray):
        coords = coords.tolist()

    is_flat_quad = len(coords) == 4 and all(
        isinstance(value, (int, float)) for value in coords
    )
    if is_flat_quad:
        c0, c1, c2, c3 = coords
        coords = [[c0, c1], [c2, c3]]

    xs, ys = zip(*coords)
    with rasterio.Env(AWS_REQUEST_PAYER=request_payer, **env_args):
        with rasterio.open(src_fp, **open_args) as src:
            width, height = src.width, src.height
            # Bring the input into the raster's CRS before the pixel lookup.
            if coord_crs != src.crs:
                xs, ys = transform_coords(xs, ys, coord_crs, src.crs, **kwargs)
            rows, cols = rasterio.transform.rowcol(src.transform, xs, ys, **kwargs)

        # Keep only the positions that fall inside the raster.
        inside = [
            [col, row]
            for col, row in zip(cols, rows)
            if col >= 0 and row >= 0 and col < width and row < height
        ]

        if not inside:
            print("No valid pixel coordinates found.")
        elif len(inside) < len(coords):
            print("Some coordinates are out of the image boundary.")

        return inside

create_code_cell(code='', where='below')

Creates a code cell in the IPython Notebook.

Parameters:

Name Type Description Default
code str

Code to fill the new code cell with. Defaults to ''.

''
where str

Where to add the new code cell. It can be one of the following: above, below, at_bottom. Defaults to 'below'.

'below'
Source code in leafmap/common.py
def create_code_cell(code="", where="below"):
    """Insert a new code cell into the IPython Notebook.

    Args:
        code (str, optional): Code to fill the new code cell with. Defaults to ''.
        where (str, optional): Where to add the new code cell. It can be one of the following: above, below, at_bottom. Defaults to 'below'.
    """

    import base64

    from IPython.display import Javascript, display

    # The code is shipped base64-encoded and decoded in the browser with
    # atob(), so its content is never pasted raw into the JavaScript source.
    payload = base64.b64encode(str.encode(code)).decode()

    # `where` selects the IPython.notebook.insert_cell_<where> call.
    script = """
        var code = IPython.notebook.insert_cell_{0}('code');
        code.set_text(atob("{1}"));
    """.format(
        where, payload
    )
    display(Javascript(script))

Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578

Parameters:

Name Type Description Default
filename str

The file path to the file to download

required
title str

The link text shown before the file name. Defaults to "Click here to download: ".

'Click here to download: '

Returns:

Type Description
str

HTML download URL.

Source code in leafmap/common.py
def create_download_link(filename, title="Click here to download: "):
    """Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578

    The file content is embedded in the link itself as a base64 data URI, so
    the whole file is read into memory.

    Args:
        filename (str): The file path to the file to download
        title (str, optional): The link text shown before the file name. Defaults to "Click here to download: ".

    Returns:
        IPython.display.HTML: An HTML object wrapping the download link.
    """
    import base64
    from IPython.display import HTML

    # Read through a context manager so the file handle is always closed.
    with open(filename, "rb") as f:
        payload = base64.b64encode(f.read()).decode()

    basename = os.path.basename(filename)
    # NOTE(review): the MIME type is fixed to text/csv regardless of the file type.
    html = '<a download="{filename}" href="data:text/csv;base64,{payload}" style="color:#0000FF;" target="_blank">{title}</a>'
    html = html.format(payload=payload, title=title + f" {basename}", filename=basename)
    return HTML(html)

create_legend(title='Legend', labels=None, colors=None, legend_dict=None, builtin_legend=None, opacity=1.0, position='bottomright', draggable=True, output=None, style={})

Create a legend in HTML format. Reference: https://bit.ly/3oV6vnH

Parameters:

Name Type Description Default
title str

Title of the legend. Defaults to "Legend".

'Legend'
colors list

A list of legend colors. Defaults to None.

None
labels list

A list of legend labels. Defaults to None.

None
legend_dict dict

A dictionary containing legend items as keys and color as values. If provided, legend_keys and legend_colors will be ignored. Defaults to None.

None
builtin_legend str

Name of the builtin legend to add to the map. Defaults to None.

None
opacity float

The opacity of the legend. Defaults to 1.0.

1.0
position str

The position of the legend, can be one of the following: "topleft", "topright", "bottomleft", "bottomright". Defaults to "bottomright".

'bottomright'
draggable bool

If True, the legend can be dragged to a new position. Defaults to True.

True
output str

The output file path (*.html) to save the legend. Defaults to None.

None
style

Additional keyword arguments to style the legend, such as position, bottom, right, z-index, border, background-color, border-radius, padding, font-size, etc. The default style is: style = { 'position': 'fixed', 'z-index': '9999', 'border': '2px solid grey', 'background-color': 'rgba(255, 255, 255, 0.8)', 'border-radius': '5px', 'padding': '10px', 'font-size': '14px', 'bottom': '20px', 'right': '5px' }

{}

Returns:

Type Description
str

The HTML code of the legend.

Source code in leafmap/common.py
def create_legend(
    title="Legend",
    labels=None,
    colors=None,
    legend_dict=None,
    builtin_legend=None,
    opacity=1.0,
    position="bottomright",
    draggable=True,
    output=None,
    style={},
):
    """Create a legend in HTML format. Reference: https://bit.ly/3oV6vnH

    Args:
        title (str, optional): Title of the legend. Defaults to 'Legend'.
        colors (list, optional): A list of legend colors, either RGB tuples or hex strings
            (with or without the leading '#'). Defaults to None.
        labels (list, optional): A list of legend labels. Defaults to None.
        legend_dict (dict, optional): A dictionary containing legend items as keys and color as values.
            If provided, labels and colors will be ignored. Defaults to None.
        builtin_legend (str, optional): Name of the builtin legend to add to the map. Defaults to None.
        opacity (float, optional): The opacity of the legend. Defaults to 1.0.
        position (str, optional): The position of the legend, can be one of the following:
            "topleft", "topright", "bottomleft", "bottomright". Defaults to "bottomright".
        draggable (bool, optional): If True, the legend can be dragged to a new position. Defaults to True.
        output (str, optional): The output file path (*.html) to save the legend. Defaults to None.
        style (dict, optional): CSS properties used to style the legend, such as position, bottom, right,
            z-index, border, background-color, border-radius, padding, font-size, etc. The style is only
            written into the HTML when draggable is False. The dictionary passed in is never modified.
            Keys that are missing are filled in with these defaults (the edge offsets depend on
            ``position``; the values shown are for "bottomright"):
            style = {
                'bottom': '20px',
                'right': '5px',
                'position': 'fixed',
                'z-index': '9999',
                'background-color': 'rgba(255, 255, 255, 0.8)',
                'padding': '10px',
                'border-radius': '5px',
                'font-size': '14px'
            }

    Returns:
        str: The HTML code of the legend. None is returned when the legend is written to ``output``
            or when the inputs fail validation (a message is printed in that case).

    Raises:
        FileNotFoundError: If the legend template file does not exist.
        ValueError: If position is not one of the allowed values.
    """

    import pkg_resources
    from .legends import builtin_legends

    # Work on a private copy: the position handling below adds and deletes keys, which
    # would otherwise leak into the shared default dict and into the caller's dict.
    style = dict(style) if style else {}

    pkg_dir = os.path.dirname(pkg_resources.resource_filename("leafmap", "leafmap.py"))
    legend_template = os.path.join(pkg_dir, "data/template/legend_style.html")

    if draggable:
        legend_template = os.path.join(pkg_dir, "data/template/legend.txt")

    if not os.path.exists(legend_template):
        raise FileNotFoundError("The legend template does not exist.")

    if labels is not None:
        if not isinstance(labels, list):
            print("The legend keys must be a list.")
            return
    else:
        labels = ["One", "Two", "Three", "Four", "etc"]

    if colors is not None:
        if not isinstance(colors, list):
            print("The legend colors must be a list.")
            return
        elif all(isinstance(item, tuple) for item in colors):
            try:
                colors = [rgb_to_hex(x) for x in colors]
            except Exception as e:
                print(e)
        elif all(
            isinstance(item, str) and item.startswith("#") and len(item) == 7
            for item in colors
        ):
            pass
        elif all(isinstance(item, str) and len(item) == 6 for item in colors):
            pass
        else:
            # Mixed or unsupported item types end up here instead of raising.
            print("The legend colors must be a list of tuples.")
            return
    else:
        colors = [
            "#8DD3C7",
            "#FFFFB3",
            "#BEBADA",
            "#FB8072",
            "#80B1D3",
        ]

    if len(labels) != len(colors):
        print("The legend keys and values must be the same length.")
        return

    allowed_builtin_legends = builtin_legends.keys()
    if builtin_legend is not None:
        if builtin_legend not in allowed_builtin_legends:
            print(
                "The builtin legend must be one of the following: {}".format(
                    ", ".join(allowed_builtin_legends)
                )
            )
            return
        else:
            # A builtin legend replaces any user-supplied legend_dict/labels/colors.
            legend_dict = builtin_legends[builtin_legend]
            labels = list(legend_dict.keys())
            colors = list(legend_dict.values())

    if legend_dict is not None:
        if not isinstance(legend_dict, dict):
            print("The legend dict must be a dictionary.")
            return
        else:
            labels = list(legend_dict.keys())
            colors = list(legend_dict.values())
            if all(isinstance(item, tuple) for item in colors):
                try:
                    colors = [rgb_to_hex(x) for x in colors]
                except Exception as e:
                    print(e)

    # Default edge offsets for each anchor corner. Insertion order matters because the
    # style dict is later written out key by key.
    position_defaults = {
        "bottomright": {"bottom": "20px", "right": "5px"},
        "bottomleft": {"bottom": "5px", "left": "5px"},
        "topright": {"top": "5px", "right": "5px"},
        "topleft": {"top": "5px", "left": "5px"},
    }
    if position not in position_defaults:
        raise ValueError(
            "The position must be one of the following: {}".format(
                ", ".join(["topleft", "topright", "bottomleft", "bottomright"])
            )
        )

    anchors = position_defaults[position]
    for edge, offset in anchors.items():
        style.setdefault(edge, offset)
    # Drop offsets for the opposite edges so they cannot conflict with the anchor corner.
    for edge in ("left", "right", "top", "bottom"):
        if edge not in anchors:
            style.pop(edge, None)

    style.setdefault("position", "fixed")
    style.setdefault("z-index", "9999")
    style.setdefault("background-color", "rgba(255, 255, 255, 0.8)")
    style.setdefault("padding", "10px")
    style.setdefault("border-radius", "5px")
    style.setdefault("font-size", "14px")

    content = []

    with open(legend_template) as f:
        lines = f.readlines()

    # The line indices below are tied to the layout of the bundled template files.
    if draggable:
        for index, line in enumerate(lines):
            if index < 36:
                content.append(line)
            elif index == 36:
                content.append(line.replace("Legend", title))
            elif index < 39:
                content.append(line)
            elif index == 39:
                # Replace the template's placeholder entries with the real legend items.
                for label, color in zip(labels, colors):
                    item = f"    <li><span style='background:{check_color(color)};opacity:{opacity};'></span>{label}</li>\n"
                    content.append(item)
            elif index > 41:
                content.append(line)
        content = content[3:-1]

    else:
        for index, line in enumerate(lines):
            if index < 8:
                content.append(line)
            elif index == 8:
                # Replace the template's style rules with the resolved style dict.
                for key, value in style.items():
                    content.append(
                        "              {}: {};\n".format(key.replace("_", "-"), value)
                    )
            elif index < 17:
                pass
            elif index < 19:
                content.append(line)
            elif index == 19:
                content.append(line.replace("Legend", title))
            elif index < 22:
                content.append(line)
            elif index == 22:
                for label, color in zip(labels, colors):
                    if not color.startswith("#"):
                        color = "#" + color
                    item = "                    <li><span style='background:{};opacity:{};'></span>{}</li>\n".format(
                        color, opacity, label
                    )
                    content.append(item)
            elif index < 33:
                pass
            else:
                content.append(line)

    legend_text = "".join(content)

    if output is not None:
        with open(output, "w") as f:
            f.write(legend_text)
    else:
        return legend_text

create_timelapse(images, out_gif, ext='.tif', bands=None, size=None, bbox=None, fps=5, loop=0, add_progress_bar=True, progress_bar_color='blue', progress_bar_height=5, add_text=False, text_xy=None, text_sequence=None, font_type='arial.ttf', font_size=20, font_color='black', mp4=False, quiet=True, reduce_size=False, clean_up=True, **kwargs)

Creates a timelapse gif from a list of images.

Parameters:

Name Type Description Default
images list | str

The list of images or input directory to create the gif from. For example, '/path/to/images/*.tif' or ['/path/to/image1.tif', '/path/to/image2.tif', ...]

required
out_gif str

File path to the output gif.

required
ext str

The extension of the images. Defaults to '.tif'.

'.tif'
bands list

The bands to use for the gif. For example, [0, 1, 2] for RGB, and [0] for grayscale. Defaults to None.

None
size tuple

The size of the gif. For example, (500, 500). Defaults to None, using the original size.

None
bbox list

The bounding box of the gif. For example, [xmin, ymin, xmax, ymax]. Defaults to None, using the original bounding box.

None
fps int

The frames per second of the gif. Defaults to 5.

5
loop int

The number of times to loop the gif. Defaults to 0, looping forever.

0
add_progress_bar bool

Whether to add a progress bar to the gif. Defaults to True.

True
progress_bar_color str

The color of the progress bar, can be color name or hex code. Defaults to 'blue'.

'blue'
progress_bar_height int

The height of the progress bar. Defaults to 5.

5
add_text bool

Whether to add text to the gif. Defaults to False.

False
text_xy tuple

The x, y coordinates of the text. For example, ('10%', '10%'). Defaults to None, using the bottom left corner.

None
text_sequence list

The sequence of text to add to the gif. For example, ['year 1', 'year 2', ...].

None
font_type str

The font type of the text, can be 'arial.ttf' or 'alibaba.otf', or any system font. Defaults to 'arial.ttf'.

'arial.ttf'
font_size int

The font size of the text. Defaults to 20.

20
font_color str

The color of the text, can be color name or hex code. Defaults to 'black'.

'black'
mp4 bool

Whether to convert the gif to mp4. Defaults to False.

False
quiet bool

Whether to suppress the progress messages. Defaults to True.

True
reduce_size bool

Whether to reduce the size of the gif using ffmpeg. Defaults to False.

False
clean_up bool

Whether to clean up the temporary files. Defaults to True.

True
Source code in leafmap/common.py
def create_timelapse(
    images: Union[List, str],
    out_gif: str,
    ext: str = ".tif",
    bands: Optional[List] = None,
    size: Optional[Tuple] = None,
    bbox: Optional[List] = None,
    fps: int = 5,
    loop: int = 0,
    add_progress_bar: bool = True,
    progress_bar_color: str = "blue",
    progress_bar_height: int = 5,
    add_text: bool = False,
    text_xy: Optional[Tuple] = None,
    text_sequence: Optional[List] = None,
    font_type: str = "arial.ttf",
    font_size: int = 20,
    font_color: str = "black",
    mp4: bool = False,
    quiet: bool = True,
    reduce_size: bool = False,
    clean_up: bool = True,
    **kwargs,
):
    """Creates a timelapse gif from a list of images.

    Each image is converted to an intermediate frame in a temporary directory, the frames
    are assembled into a gif, and text and/or a progress bar are optionally drawn on top.
    Errors raised while processing are printed rather than propagated.

    Args:
        images (list | str): The list of images or input directory to create the gif from.
            For example, '/path/to/images/*.tif' or ['/path/to/image1.tif', '/path/to/image2.tif', ...]
            A list passed in is not modified; a sorted copy is used.
        out_gif (str): File path to the output gif.
        ext (str, optional): The extension of the images. Defaults to '.tif'.
        bands (list, optional): The bands to use for the gif. For example, [0, 1, 2] for RGB, and [0] for grayscale. Defaults to None.
        size (tuple, optional): The size of the gif. For example, (500, 500). Defaults to None, using the original size.
        bbox (list, optional): The bounding box of the gif. For example, [xmin, ymin, xmax, ymax]. Defaults to None, using the original bounding box.
        fps (int, optional): The frames per second of the gif. Defaults to 5.
        loop (int, optional): The number of times to loop the gif. Defaults to 0, looping forever.
        add_progress_bar (bool, optional): Whether to add a progress bar to the gif. Defaults to True.
        progress_bar_color (str, optional): The color of the progress bar, can be color name or hex code. Defaults to 'blue'.
        progress_bar_height (int, optional): The height of the progress bar. Defaults to 5.
        add_text (bool, optional): Whether to add text to the gif. Defaults to False.
        text_xy (tuple, optional): The x, y coordinates of the text. For example, ('10%', '10%').
            Defaults to None, using the bottom left corner.
        text_sequence (list, optional): The sequence of text to add to the gif. For example, ['year 1', 'year 2', ...].
        font_type (str, optional): The font type of the text, can be 'arial.ttf' or 'alibaba.otf', or any system font. Defaults to 'arial.ttf'.
        font_size (int, optional): The font size of the text. Defaults to 20.
        font_color (str, optional): The color of the text, can be color name or hex code. Defaults to 'black'.
        mp4 (bool, optional): Whether to convert the gif to mp4. Defaults to False.
        quiet (bool, optional): Whether to suppress the per-image progress messages. Defaults to True.
        reduce_size (bool, optional): Whether to reduce the size of the gif using ffmpeg. Defaults to False.
        clean_up (bool, optional): Whether to clean up the temporary files. Defaults to True.
        **kwargs: Extra options read by this function:
            out_ext (str): Extension of the intermediate frames (lower-cased). Defaults to '.jpg'.
            add_prefix: If this key is present, each frame name is prefixed with its
                zero-padded 1-based index.

    Raises:
        ValueError: If images is neither a list nor a string.
    """

    import glob
    import tempfile

    if isinstance(images, str):
        if not images.endswith(ext):
            images = os.path.join(images, f"*{ext}")
        images = list(glob.glob(images))

    if not isinstance(images, list):
        raise ValueError("images must be a list or a path to the image directory.")

    # Sort a copy so the caller's list is left untouched.
    images = sorted(images)
    total = len(images)

    temp_dir = os.path.join(tempfile.gettempdir(), "timelapse")
    os.makedirs(temp_dir, exist_ok=True)

    if bbox is not None:
        clip_dir = os.path.join(tempfile.gettempdir(), "clip")
        os.makedirs(clip_dir, exist_ok=True)

        if len(bbox) == 4:
            bbox = bbox_to_geojson(bbox)

    else:
        clip_dir = None

    output = widgets.Output()

    out_ext = kwargs["out_ext"].lower() if "out_ext" in kwargs else ".jpg"

    try:
        for index, image in enumerate(images):
            if bbox is not None:
                clip_file = os.path.join(clip_dir, os.path.basename(image))
                with output:
                    clip_image(image, mask=bbox, output=clip_file, to_cog=False)
                image = clip_file

            basename = os.path.basename(image).replace(ext, out_ext)
            if "add_prefix" in kwargs:
                # Zero-pad the index to the width of the image count so names sort correctly.
                basename = str(index + 1).zfill(len(str(total))) + "-" + basename
            if not quiet:
                print(f"Processing {index+1}/{len(images)}: {basename} ...")

            # Run inside the Output widget context (original note: ignore GDAL warnings).
            with output:
                numpy_to_image(
                    image, os.path.join(temp_dir, basename), bands=bands, size=size
                )
        make_gif(
            temp_dir,
            out_gif,
            ext=out_ext,
            fps=fps,
            loop=loop,
            mp4=mp4,
            clean_up=clean_up,
        )

        if add_text:
            add_text_to_gif(
                out_gif,
                out_gif,
                text_xy,
                text_sequence,
                font_type,
                font_size,
                font_color,
                add_progress_bar,
                progress_bar_color,
                progress_bar_height,
                1000 / fps,
                loop,
            )
        elif add_progress_bar:
            add_progress_bar_to_gif(
                out_gif,
                out_gif,
                progress_bar_color,
                progress_bar_height,
                1000 / fps,
                loop,
            )

        if reduce_size:
            reduce_gif_size(out_gif)
    except Exception as e:
        # Best-effort by design: report the problem instead of raising.
        print(e)
    finally:
        # Remove the clipped intermediates even when a step above failed.
        if clip_dir is not None:
            shutil.rmtree(clip_dir, ignore_errors=True)

csv_points_to_shp(in_csv, out_shp, latitude='latitude', longitude='longitude')

Converts a csv file containing points (latitude, longitude) into a shapefile.

Parameters:

Name Type Description Default
in_csv str

File path or HTTP URL to the input csv file. For example, https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv

required
out_shp str

File path to the output shapefile.

required
latitude str

Column name for the latitude column. Defaults to 'latitude'.

'latitude'
longitude str

Column name for the longitude column. Defaults to 'longitude'.

'longitude'
Source code in leafmap/common.py
def csv_points_to_shp(in_csv, out_shp, latitude="latitude", longitude="longitude"):
    """Converts a csv file containing points (latitude, longitude) into a shapefile.

    A URL starting with "http" and ending with ".csv" is first downloaded into the
    user's ~/Downloads folder; the conversion itself is done with WhiteboxTools
    using EPSG:4326.

    Args:
        in_csv (str): File path or HTTP URL to the input csv file. For example, https://raw.githubusercontent.com/opengeos/data/main/world/world_cities.csv
        out_shp (str): File path to the output shapefile.
        latitude (str, optional): Column name for the latitude column. Defaults to 'latitude'.
        longitude (str, optional): Column name for the longitude column. Defaults to 'longitude'.

    Raises:
        Exception: If the csv file does not exist.
    """

    is_remote_csv = in_csv.startswith("http") and in_csv.endswith(".csv")
    if is_remote_csv:
        download_dir = os.path.join(os.path.expanduser("~"), "Downloads")
        csv_name = os.path.basename(in_csv)

        if not os.path.exists(download_dir):
            os.makedirs(download_dir)
        download_from_url(in_csv, out_dir=download_dir)
        in_csv = os.path.join(download_dir, csv_name)

    wbt = whitebox.WhiteboxTools()
    in_csv = os.path.abspath(in_csv)
    out_shp = os.path.abspath(out_shp)

    if not os.path.exists(in_csv):
        raise Exception("The provided csv file does not exist.")

    # Only the header row is needed to locate the coordinate columns.
    with open(in_csv, encoding="utf-8") as csv_file:
        columns = csv.DictReader(csv_file).fieldnames

    # WhiteboxTools takes the coordinate columns as zero-based positions.
    x_index = columns.index(longitude)
    y_index = columns.index(latitude)

    wbt.csv_points_to_vector(
        in_csv, out_shp, xfield=x_index, yfield=y_index, epsg=4326
    )

csv_to_df(in_csv, **kwargs)

Converts a CSV file to pandas dataframe.

Parameters:

Name Type Description Default
in_csv str

File path to the input CSV.

required

Returns:

Type Description
pd.DataFrame

pandas DataFrame

Source code in leafmap/common.py
def csv_to_df(in_csv, **kwargs):
    """Reads a CSV file into a pandas DataFrame.

    Args:
        in_csv (str): File path to the input CSV.
        **kwargs: Additional keyword arguments forwarded to pandas.read_csv.

    Returns:
        pd.DataFrame: pandas DataFrame

    Raises:
        Exception: Any error raised while reading is re-raised as a plain Exception.
    """
    import pandas as pd

    try:
        frame = pd.read_csv(in_csv, **kwargs)
    except Exception as err:
        raise Exception(err)
    return frame

csv_to_gdf(in_csv, latitude='latitude', longitude='longitude', encoding='utf-8')

Creates points for a CSV file and converts them to a GeoDataFrame.

Parameters:

Name Type Description Default
in_csv str

The file path to the input CSV file.

required
latitude str

The name of the column containing latitude coordinates. Defaults to "latitude".

'latitude'
longitude str

The name of the column containing longitude coordinates. Defaults to "longitude".

'longitude'
encoding str

The encoding of characters. Defaults to "utf-8".

'utf-8'

Returns:

Type Description
object

GeoDataFrame.

Source code in leafmap/common.py
def csv_to_gdf(in_csv, latitude="latitude", longitude="longitude", encoding="utf-8"):
    """Creates points for a CSV file and converts them to a GeoDataFrame.

    The CSV is first converted to a temporary GeoJSON file, which is read back with
    geopandas and then deleted.

    Args:
        in_csv (str): The file path to the input CSV file.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".

    Returns:
        object: GeoDataFrame.
    """

    check_package(name="geopandas", URL="https://geopandas.org")

    import tempfile

    import geopandas as gpd

    # Use the system temp directory so the working directory is not polluted and
    # does not need to be writable.
    out_geojson = os.path.join(tempfile.gettempdir(), random_string() + ".geojson")

    try:
        csv_to_geojson(in_csv, out_geojson, latitude, longitude, encoding)
        return gpd.read_file(out_geojson)
    finally:
        # Clean up even when the conversion or the read fails; the file may not
        # exist if the failure happened before it was written.
        if os.path.exists(out_geojson):
            os.remove(out_geojson)

csv_to_geojson(in_csv, out_geojson=None, latitude='latitude', longitude='longitude', encoding='utf-8')

Creates points for a CSV file and exports data as a GeoJSON.

Parameters:

Name Type Description Default
in_csv str

The file path to the input CSV file.

required
out_geojson str

The file path to the exported GeoJSON. Default to None.

None
latitude str

The name of the column containing latitude coordinates. Defaults to "latitude".

'latitude'
longitude str

The name of the column containing longitude coordinates. Defaults to "longitude".

'longitude'
encoding str

The encoding of characters. Defaults to "utf-8".

'utf-8'
Source code in leafmap/common.py
def csv_to_geojson(
    in_csv,
    out_geojson=None,
    latitude="latitude",
    longitude="longitude",
    encoding="utf-8",
):
    """Builds point features from a CSV file and returns or saves them as GeoJSON.

    Args:
        in_csv (str): The file path to the input CSV file.
        out_geojson (str): The file path to the exported GeoJSON. Default to None,
            in which case the GeoJSON object is returned instead of written.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".

    Returns:
        The GeoJSON object when out_geojson is None; otherwise nothing is returned.
    """

    import pandas as pd

    in_csv = github_raw_url(in_csv)

    if out_geojson is not None:
        out_geojson = check_file_path(out_geojson)

    table = pd.read_csv(in_csv)
    collection = df_to_geojson(
        table, latitude=latitude, longitude=longitude, encoding=encoding
    )

    # No output path: hand the object back to the caller.
    if out_geojson is None:
        return collection

    serialized = json.dumps(collection)
    with open(out_geojson, "w", encoding=encoding) as f:
        f.write(serialized)

csv_to_shp(in_csv, out_shp, latitude='latitude', longitude='longitude', encoding='utf-8')

Converts a csv file with latlon info to a point shapefile.

Parameters:

Name Type Description Default
in_csv str

The input csv file containing longitude and latitude columns.

required
out_shp str

The file path to the output shapefile.

required
latitude str

The column name of the latitude column. Defaults to 'latitude'.

'latitude'
longitude str

The column name of the longitude column. Defaults to 'longitude'.

'longitude'
Source code in leafmap/common.py
def csv_to_shp(
    in_csv, out_shp, latitude="latitude", longitude="longitude", encoding="utf-8"
):
    """Converts a csv file with latlon info to a point shapefile.

    Every CSV column becomes an attribute field of the shapefile, and a .prj file
    describing WGS 1984 geographic coordinates is written next to it.

    Args:
        in_csv (str): The input csv file containing longitude and latitude columns.
            A URL starting with "http" and ending with ".csv" is downloaded first.
        out_shp (str): The file path to the output shapefile.
        latitude (str, optional): The column name of the latitude column. Defaults to 'latitude'.
        longitude (str, optional): The column name of the longitude column. Defaults to 'longitude'.
        encoding (str, optional): The encoding used to read the csv file. Defaults to 'utf-8'.

    Raises:
        Exception: Any error raised during the conversion is re-raised as a plain Exception.
    """
    import shapefile as shp

    if in_csv.startswith("http") and in_csv.endswith(".csv"):
        in_csv = github_raw_url(in_csv)
        in_csv = download_file(in_csv, quiet=True, overwrite=True)

    try:
        # The context manager closes the writer, which flushes the .shp/.shx/.dbf files.
        with shp.Writer(out_shp, shapeType=shp.POINT) as points:
            with open(in_csv, encoding=encoding) as csvfile:
                csvreader = csv.DictReader(csvfile)
                header = csvreader.fieldnames
                for field in header:
                    points.field(field)
                for row in csvreader:
                    points.point(float(row[longitude]), float(row[latitude]))
                    points.record(*[row[f] for f in header])

        # Derive the .prj path from the extension only, so names that contain ".shp"
        # elsewhere, or that lack a lower-case ".shp" suffix, cannot produce a wrong
        # path or overwrite the shapefile itself.
        out_prj = os.path.splitext(out_shp)[0] + ".prj"
        with open(out_prj, "w") as f:
            prj_str = 'GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137,298.257223563]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]] '
            f.write(prj_str)

    except Exception as e:
        raise Exception(e) from e

csv_to_vector(in_csv, output, latitude='latitude', longitude='longitude', encoding='utf-8', **kwargs)

Creates points for a CSV file and converts them to a vector dataset.

Parameters:

Name Type Description Default
in_csv str

The file path to the input CSV file.

required
output str

The file path to the output vector dataset.

required
latitude str

The name of the column containing latitude coordinates. Defaults to "latitude".

'latitude'
longitude str

The name of the column containing longitude coordinates. Defaults to "longitude".

'longitude'
encoding str

The encoding of characters. Defaults to "utf-8".

'utf-8'
Source code in leafmap/common.py
def csv_to_vector(
    in_csv,
    output,
    latitude="latitude",
    longitude="longitude",
    encoding="utf-8",
    **kwargs,
):
    """Builds point geometries from a CSV file and saves them as a vector dataset.

    Args:
        in_csv (str): The file path to the input CSV file.
        output (str): The file path to the output vector dataset.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".
        **kwargs: Additional keyword arguments forwarded to GeoDataFrame.to_file.

    """
    points_gdf = csv_to_gdf(
        in_csv, latitude=latitude, longitude=longitude, encoding=encoding
    )
    points_gdf.to_file(output, **kwargs)

delete_shp(in_shp, verbose=False)

Deletes a shapefile.

Parameters:

Name Type Description Default
in_shp str

The input shapefile to delete.

required
verbose bool

Whether to print out descriptive text. Defaults to False.

False
Source code in leafmap/common.py
def delete_shp(in_shp, verbose=False):
    """Deletes a shapefile and its sidecar files.

    Every regular file located in the same directory as the shapefile and named
    "<basename>.<anything>" (e.g. .shp, .shx, .dbf, .prj) is removed. Subdirectories
    are not searched.

    Args:
        in_shp (str): The input shapefile to delete.
        verbose (bool, optional): Whether to print out descriptive text. Defaults to False.

    Raises:
        ValueError: If the shapefile name has no base name (e.g. ".shp").
    """
    import glob
    from pathlib import Path

    in_shp = os.path.abspath(in_shp)
    in_dir = os.path.dirname(in_shp)
    filename = os.path.basename(in_shp)

    # Strip only a trailing ".shp"; a ".shp" elsewhere in the name is part of the name.
    stem = filename[:-4] if filename.lower().endswith(".shp") else filename
    if not stem:
        # An empty stem would turn the pattern into ".*" and match every dotfile.
        raise ValueError("The provided shapefile name has no base name.")

    # Escape glob metacharacters in the name and search the shapefile's own directory
    # only, so same-named files in nested folders are left alone.
    pattern = glob.escape(stem) + ".*"

    for file in Path(in_dir).glob(pattern):
        if not file.is_file():
            continue
        filepath = str(file)
        os.remove(filepath)
        if verbose:
            print(f"Deleted {filepath}")

df_to_gdf(df, geometry='geometry', src_crs='EPSG:4326', dst_crs=None, **kwargs)

Converts a pandas DataFrame to a GeoPandas GeoDataFrame.

Parameters:

Name Type Description Default
df pandas.DataFrame

The pandas DataFrame to convert.

required
geometry str

The name of the geometry column in the DataFrame.

'geometry'
src_crs str

The coordinate reference system (CRS) of the GeoDataFrame. Default is "EPSG:4326".

'EPSG:4326'
dst_crs str

The target CRS of the GeoDataFrame. Default is None

None

Returns:

Type Description
geopandas.GeoDataFrame

The converted GeoPandas GeoDataFrame.

Source code in leafmap/common.py
def df_to_gdf(df, geometry="geometry", src_crs="EPSG:4326", dst_crs=None, **kwargs):
    """
    Converts a pandas DataFrame to a GeoPandas GeoDataFrame.

    The geometry column is parsed as WKT. The input DataFrame is left unchanged, so
    the function can be called repeatedly on the same frame.

    Args:
        df (pandas.DataFrame): The pandas DataFrame to convert.
        geometry (str): The name of the geometry column in the DataFrame, holding WKT strings.
        src_crs (str): The coordinate reference system (CRS) of the GeoDataFrame. Default is "EPSG:4326".
        dst_crs (str): The target CRS of the GeoDataFrame. Default is None, meaning no reprojection.
        **kwargs: Additional keyword arguments forwarded to geopandas.GeoDataFrame.

    Returns:
        geopandas.GeoDataFrame: The converted GeoPandas GeoDataFrame.
    """
    import geopandas as gpd
    from shapely import wkt

    # Copy first: assigning the parsed geometries to the caller's frame would replace
    # its WKT strings, and a second call on that frame would then fail in wkt.loads.
    df = df.copy()

    # Convert the geometry column to Shapely geometry objects
    df[geometry] = df[geometry].apply(wkt.loads)

    # Convert the pandas DataFrame to a GeoPandas GeoDataFrame
    gdf = gpd.GeoDataFrame(df, geometry=geometry, crs=src_crs, **kwargs)
    if dst_crs is not None and dst_crs != src_crs:
        gdf = gdf.to_crs(dst_crs)

    return gdf

df_to_geojson(df, out_geojson=None, latitude='latitude', longitude='longitude', encoding='utf-8')

Creates points for a Pandas DataFrame and exports data as a GeoJSON.

Parameters:

Name Type Description Default
df pandas.DataFrame

The input Pandas DataFrame.

required
out_geojson str

The file path to the exported GeoJSON. Default to None.

None
latitude str

The name of the column containing latitude coordinates. Defaults to "latitude".

'latitude'
longitude str

The name of the column containing longitude coordinates. Defaults to "longitude".

'longitude'
encoding str

The encoding of characters. Defaults to "utf-8".

'utf-8'
Source code in leafmap/common.py
def df_to_geojson(
    df,
    out_geojson=None,
    latitude="latitude",
    longitude="longitude",
    encoding="utf-8",
):
    """Builds point features from a Pandas DataFrame and returns or saves them as GeoJSON.

    Each row becomes one point feature whose properties are all of the row's columns.

    Args:
        df (pandas.DataFrame): The input Pandas DataFrame.
        out_geojson (str): The file path to the exported GeoJSON. Default to None,
            in which case the feature collection is returned instead of written.
        latitude (str, optional): The name of the column containing latitude coordinates. Defaults to "latitude".
        longitude (str, optional): The name of the column containing longitude coordinates. Defaults to "longitude".
        encoding (str, optional): The encoding of characters. Defaults to "utf-8".

    Returns:
        The feature collection when out_geojson is None; otherwise nothing is returned.
    """

    import json
    from geojson import Feature, FeatureCollection, Point

    if out_geojson is not None:
        # Make sure the destination folder exists before any work is done.
        target_dir = os.path.dirname(os.path.abspath(out_geojson))
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

    def _row_to_feature(row):
        # GeoJSON positions are ordered (x, y), i.e. (longitude, latitude).
        lon = float(row[longitude])
        lat = float(row[latitude])
        return Feature(geometry=Point((lon, lat)), properties=dict(row))

    features = df.apply(_row_to_feature, axis=1).tolist()
    collection = FeatureCollection(features=features)

    if out_geojson is None:
        return collection

    serialized = json.dumps(collection)
    with open(out_geojson, "w", encoding=encoding) as f:
        f.write(serialized)

dict_to_json(data, file_path, indent=4)

Writes a dictionary to a JSON file.

Parameters:

Name Type Description Default
data dict

A dictionary.

required
file_path str

The path to the JSON file.

required
indent int

The indentation of the JSON file. Defaults to 4.

4

Exceptions:

Type Description
TypeError

If the input data is not a dictionary.

Source code in leafmap/common.py
def dict_to_json(data, file_path, indent=4):
    """Writes a dictionary to a JSON file.

    Args:
        data (dict): A dictionary.
        file_path (str): The path to the JSON file.
        indent (int, optional): The indentation of the JSON file. Defaults to 4.

    Raises:
        TypeError: If the input data is not a dictionary.
    """
    import json

    # Validate the payload first so that bad input fails before
    # check_file_path runs (that helper is defined elsewhere in this module
    # and may touch the file system -- TODO confirm).
    if not isinstance(data, dict):
        raise TypeError("The provided data must be a dictionary.")

    file_path = check_file_path(file_path)

    with open(file_path, "w") as f:
        json.dump(data, f, indent=indent)

disjoint(input_features, selecting_features, output=None, **kwargs)

Find the features in the input_features that do not intersect the selecting_features.

Parameters:

Name Type Description Default
input_features str | GeoDataFrame

The input features to select from. Can be a file path or a GeoDataFrame.

required
selecting_features str | GeoDataFrame

The features in the Input Features parameter will be selected based on their relationship to the features from this layer.

required
output str

The output path to save the GeoDataFrame in a vector format (e.g., shapefile). Defaults to None.

None

Returns:

Type Description
str | GeoDataFrame

The path to the output file or the GeoDataFrame.

Source code in leafmap/common.py
def disjoint(input_features, selecting_features, output=None, **kwargs):
    """Find the features in the input_features that do not intersect the selecting_features.

    Args:
        input_features (str | GeoDataFrame): The input features to select from. Can be a file path or a GeoDataFrame.
        selecting_features (str | GeoDataFrame): The features in the Input Features parameter will be selected based
            on their relationship to the features from this layer.
        output (str, optional): The output path to save the GeoDataFrame in a vector format (e.g., shapefile). Defaults to None.
        **kwargs: Forwarded to gpd.read_file when an input is given as a path
            and to GeoDataFrame.to_file when output is given.

    Returns:
        str | GeoDataFrame: The path to the output file or the GeoDataFrame.

    Raises:
        TypeError: If input_features or selecting_features is neither a file
            path nor a GeoDataFrame.
    """
    import geopandas as gpd

    if isinstance(input_features, str):
        input_features = gpd.read_file(input_features, **kwargs)
    elif not isinstance(input_features, gpd.GeoDataFrame):
        raise TypeError("input_features must be a file path or a GeoDataFrame")

    if isinstance(selecting_features, str):
        selecting_features = gpd.read_file(selecting_features, **kwargs)
    elif not isinstance(selecting_features, gpd.GeoDataFrame):
        raise TypeError("selecting_features must be a file path or a GeoDataFrame")

    # The spatial join requires both layers to share one CRS.
    selecting_features = selecting_features.to_crs(input_features.crs)

    # Tag each input row with its own index on a *copy*, so a GeoDataFrame
    # passed in by the caller is not left with an extra "savedindex" column.
    tagged = input_features.assign(savedindex=input_features.index)
    intersecting = selecting_features.sjoin(tagged, how="inner")["savedindex"]

    # Positional boolean mask: keep the rows whose tag never showed up in the join.
    keep = ~tagged["savedindex"].isin(intersecting).to_numpy()
    results = input_features[keep]

    if output is not None:
        results.to_file(output, **kwargs)
        # Honour the documented contract: return the path that was written.
        return output

    return results

display_html(filename, width='100%', height='600px', **kwargs)

Show an HTML file in a Jupyter notebook.

Parameters:

Name Type Description Default
filename str

The path to the HTML file.

required
width str

The width of the HTML file. Defaults to "100%".

'100%'
height str

The height of the HTML file. Defaults to "600px".

'600px'

Returns:

Type Description
IFrame

An IFrame object.

Source code in leafmap/common.py
def display_html(filename, width="100%", height="600px", **kwargs):
    """Embed a local HTML file in a Jupyter notebook through an IFrame.

    Args:
        filename (str): The path to the HTML file.
        width (str, optional): The width of the HTML file. Defaults to "100%".
        height (str, optional): The height of the HTML file. Defaults to "600px".
        **kwargs: Extra keyword arguments forwarded to IPython.display.IFrame.

    Returns:
        IFrame: An IFrame object.

    Raises:
        Exception: If filename does not exist.
    """

    from IPython.display import IFrame

    if os.path.exists(filename):
        return IFrame(filename, width=width, height=height, **kwargs)

    raise Exception(f"File {filename} does not exist")

download_file(url=None, output=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False)

Download a file from URL, including Google Drive shared URL.

Parameters:

Name Type Description Default
url str

Google Drive URL is also supported. Defaults to None.

None
output str

Output filename. Default is basename of URL.

None
quiet bool

Suppress terminal output. Default is False.

False
proxy str

Proxy. Defaults to None.

None
speed float

Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.

None
use_cookies bool

Flag to use cookies. Defaults to True.

True
verify bool | str

Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True.

True
id str

Google Drive's file ID. Defaults to None.

None
fuzzy bool

Fuzzy extraction of Google Drive's file Id. Defaults to False.

False
resume bool

Resume the download from existing tmp file if possible. Defaults to False.

False
unzip bool

Unzip the file. Defaults to True.

True
overwrite bool

Overwrite the file if it already exists. Defaults to False.

False
subfolder bool

Create a subfolder with the same name as the file. Defaults to False.

False

Returns:

Type Description
str

The output file path.

Source code in leafmap/common.py
def download_file(
    url=None,
    output=None,
    quiet=False,
    proxy=None,
    speed=None,
    use_cookies=True,
    verify=True,
    id=None,
    fuzzy=False,
    resume=False,
    unzip=True,
    overwrite=False,
    subfolder=False,
):
    """Download a file from URL, including Google Drive shared URL.

    Args:
        url (str, optional): Google Drive URL is also supported. Defaults to None.
        output (str, optional): Output filename. Default is basename of URL.
        quiet (bool, optional): Suppress terminal output. Default is False.
        proxy (str, optional): Proxy. Defaults to None.
        speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
        use_cookies (bool, optional): Flag to use cookies. Defaults to True.
        verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string,
            in which case it must be a path to a CA bundle to use. Defaults to True.
        id (str, optional): Google Drive's file ID. Defaults to None.
        fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
        resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
        unzip (bool, optional): Unzip the file. Defaults to True.
        overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
        subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.

    Returns:
        str: The output file path (the extraction folder when an archive is
            unpacked with subfolder=True). None if gdown is not installed or
            the download did not produce a file.
    """
    try:
        import gdown
    except ImportError:
        print(
            "The gdown package is required for this function. Use `pip install gdown` to install it."
        )
        return

    if output is None and isinstance(url, str) and url.startswith("http"):
        output = os.path.basename(url)

    # output can legitimately still be None here (e.g. a download by Google
    # Drive `id` only); the file name is then left to gdown and the current
    # working directory is used as the base folder.
    if output is None:
        out_dir = os.getcwd()
    else:
        out_dir = os.path.abspath(os.path.dirname(output))
        os.makedirs(out_dir, exist_ok=True)

    if isinstance(url, str):
        if (
            output is not None
            and not overwrite
            and os.path.exists(os.path.abspath(output))
        ):
            print(
                f"{output} already exists. Skip downloading. Set overwrite=True to overwrite."
            )
            return os.path.abspath(output)
        url = github_raw_url(url)

    # Shared "file/d/<id>/view" links only resolve with fuzzy id extraction.
    if isinstance(url, str) and "https://drive.google.com/file/d/" in url:
        fuzzy = True

    output = gdown.download(
        url=url,
        output=output,
        quiet=quiet,
        proxy=proxy,
        speed=speed,
        use_cookies=use_cookies,
        verify=verify,
        id=id,
        fuzzy=fuzzy,
        resume=resume,
    )

    if output is None:
        # gdown may return None instead of a path when it cannot fetch the
        # file; report it rather than failing on `None.endswith` below.
        print("The download failed: gdown did not return an output file.")
        return None

    if unzip:
        if output.endswith(".zip"):
            with zipfile.ZipFile(output, "r") as zip_ref:
                if not quiet:
                    print("Extracting files...")
                if subfolder:
                    basename = os.path.splitext(os.path.basename(output))[0]
                    # From here on `output` is the extraction folder, which
                    # is also what gets returned.
                    output = os.path.join(out_dir, basename)
                    os.makedirs(output, exist_ok=True)
                    zip_ref.extractall(output)
                else:
                    zip_ref.extractall(os.path.dirname(os.path.abspath(output)))
        elif output.endswith((".tar.gz", ".tar")):
            mode = "r:gz" if output.endswith(".tar.gz") else "r"

            # NOTE(review): members are extracted without path validation;
            # only use with archives from trusted sources.
            with tarfile.open(output, mode) as tar_ref:
                if not quiet:
                    print("Extracting files...")
                if subfolder:
                    basename = os.path.splitext(os.path.basename(output))[0]
                    output = os.path.join(out_dir, basename)
                    os.makedirs(output, exist_ok=True)
                    tar_ref.extractall(output)
                else:
                    tar_ref.extractall(os.path.dirname(os.path.abspath(output)))

    return os.path.abspath(output)

download_file_lite(url, output=None, binary=False, overwrite=False, **kwargs) async

Download a file using Pyodide. This function is only available on JupyterLite. Call the function with await, such as await download_file_lite(url).

Parameters:

Name Type Description Default
url str

The URL of the file.

required
output str

The local path to save the file. Defaults to None.

None
binary bool

Whether the file is binary. Defaults to False.

False
overwrite bool

Whether to overwrite the file if it exists. Defaults to False.

False
Source code in leafmap/common.py
async def download_file_lite(url, output=None, binary=False, overwrite=False, **kwargs):
    """Download a file using Pyodide. This function is only available on JupyterLite. Call the function with await, such as await download_file_lite(url).

    Args:
        url (str): The URL of the file.
        output (str, optional): The local path to save the file. Defaults to None.
        binary (bool, optional): Whether the file is binary. Defaults to False.
            Forced to True for well-known binary extensions
            (.png, .jpg, .tif, .tiff, .zip, .gz, .bz2, .xz).
        overwrite (bool, optional): Whether to overwrite the file if it exists. Defaults to False.
        **kwargs: Accepted for call compatibility; currently unused.

    Returns:
        str: The absolute path of the local file.

    Raises:
        ValueError: If the code is not running under Pyodide.
    """
    import sys

    # The check must come before the import: once `import pyodide` has run,
    # the module is always in sys.modules and the check could never fire
    # (outside Pyodide the import itself would fail with ImportError first).
    if "pyodide" not in sys.modules:
        raise ValueError("Pyodide is not available.")

    import pyodide

    if output is None:
        output = os.path.basename(url)

    output = os.path.abspath(output)

    # os.path.splitext keeps the leading dot, so every entry needs one too.
    ext = os.path.splitext(output)[1].lower()
    if ext in {".png", ".jpg", ".tif", ".tiff", ".zip", ".gz", ".bz2", ".xz"}:
        binary = True

    if os.path.exists(output) and not overwrite:
        print(f"{output} already exists, skip downloading.")
        return output

    if binary:
        response = await pyodide.http.pyfetch(url)
        with open(output, "wb") as f:
            f.write(await response.bytes())
    else:
        obj = pyodide.http.open_url(url)
        with open(output, "w") as fd:
            shutil.copyfileobj(obj, fd)

    return output

download_files(urls, out_dir=None, filenames=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False, multi_part=False)

Download files from URLs, including Google Drive shared URL.

Parameters:

Name Type Description Default
urls list

The list of urls to download. Google Drive URL is also supported.

required
out_dir str

The output directory. Defaults to None.

None
filenames list

Output filename. Default is basename of URL.

None
quiet bool

Suppress terminal output. Default is False.

False
proxy str

Proxy. Defaults to None.

None
speed float

Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.

None
use_cookies bool

Flag to use cookies. Defaults to True.

True
verify bool | str

Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True.

True
id str

Google Drive's file ID. Defaults to None.

None
fuzzy bool

Fuzzy extraction of Google Drive's file Id. Defaults to False.

False
resume bool

Resume the download from existing tmp file if possible. Defaults to False.

False
unzip bool

Unzip the file. Defaults to True.

True
overwrite bool

Overwrite the file if it already exists. Defaults to False.

False
subfolder bool

Create a subfolder with the same name as the file. Defaults to False.

False
multi_part bool

If the file is a multi-part file. Defaults to False.

False

Examples:

files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"] base_url = "https://github.com/opengeos/datasets/releases/download/models/" urls = [base_url + f for f in files] leafmap.download_files(urls, out_dir="models", multi_part=True)

Source code in leafmap/common.py
def download_files(
    urls,
    out_dir=None,
    filenames=None,
    quiet=False,
    proxy=None,
    speed=None,
    use_cookies=True,
    verify=True,
    id=None,
    fuzzy=False,
    resume=False,
    unzip=True,
    overwrite=False,
    subfolder=False,
    multi_part=False,
):
    """Download files from URLs, including Google Drive shared URL.

    Args:
        urls (list): The list of urls to download. Google Drive URL is also supported.
        out_dir (str, optional): The output directory. Defaults to None.
        filenames (list, optional): Output filename. Default is basename of URL.
        quiet (bool, optional): Suppress terminal output. Default is False.
        proxy (str, optional): Proxy. Defaults to None.
        speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
        use_cookies (bool, optional): Flag to use cookies. Defaults to True.
        verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True.
        id (str, optional): Google Drive's file ID. Defaults to None.
        fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
        resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
        unzip (bool, optional): Unzip the file. Defaults to True.
        overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
        subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.
        multi_part (bool, optional): If the file is a multi-part file. Defaults to False.

    Raises:
        ValueError: If filenames is given but its length differs from urls.

    Examples:

        files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
        base_url = "https://github.com/opengeos/datasets/releases/download/models/"
        urls = [base_url + f for f in files]
        leafmap.download_files(urls, out_dir="models", multi_part=True)
    """

    if out_dir is None:
        out_dir = os.getcwd()

    if filenames is None:
        filenames = [None] * len(urls)
    elif len(filenames) != len(urls):
        # zip() would silently skip the unmatched URLs otherwise.
        raise ValueError("filenames must have the same length as urls.")

    # The parts of a multi-part archive cannot be unpacked one by one; they
    # are extracted together once every part has been downloaded.
    if multi_part:
        unzip = False

    filepaths = []
    for url, name in zip(urls, filenames):
        basename = os.path.basename(url) if name is None else name
        filename = os.path.join(out_dir, basename)
        filepaths.append(filename)

        download_file(
            url,
            filename,
            quiet=quiet,
            proxy=proxy,
            speed=speed,
            use_cookies=use_cookies,
            verify=verify,
            id=id,
            fuzzy=fuzzy,
            resume=resume,
            unzip=unzip,
            overwrite=overwrite,
            subfolder=subfolder,
        )

    # Guard on filepaths: with no URLs there is no archive to assemble.
    if multi_part and filepaths:
        # All parts share one stem; the ".zip" part is the archive entry point.
        last_part = filepaths[-1]
        archive = os.path.splitext(last_part)[0] + ".zip"
        extract_archive(archive, os.path.dirname(last_part))

        # The downloaded parts are no longer needed once extracted.
        for part in filepaths:
            os.remove(part)

download_folder(url=None, id=None, output=None, quiet=False, proxy=None, speed=None, use_cookies=True, remaining_ok=False)

Downloads the entire folder from URL.

Parameters:

Name Type Description Default
url str

URL of the Google Drive folder. Must be of the format 'https://drive.google.com/drive/folders/{url}'. Defaults to None.

None
id str

Google Drive's folder ID. Defaults to None.

None
output str

String containing the path of the output folder. Defaults to current working directory.

None
quiet bool

Suppress terminal output. Defaults to False.

False
proxy str

Proxy. Defaults to None.

None
speed float

Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.

None
use_cookies bool

Flag to use cookies. Defaults to True.

True
remaining_ok bool

Passed through to gdown.download_folder as its remaining_ok option. Defaults to False.

False

Returns:

Type Description
list

List of files downloaded, or None if failed.

Source code in leafmap/common.py
def download_folder(
    url=None,
    id=None,
    output=None,
    quiet=False,
    proxy=None,
    speed=None,
    use_cookies=True,
    remaining_ok=False,
):
    """Download a whole Google Drive folder by delegating to gdown.

    Args:
        url (str, optional): URL of the Google Drive folder. Must be of the format 'https://drive.google.com/drive/folders/{url}'. Defaults to None.
        id (str, optional): Google Drive's folder ID. Defaults to None.
        output (str, optional):  String containing the path of the output folder. Defaults to current working directory.
        quiet (bool, optional): Suppress terminal output. Defaults to False.
        proxy (str, optional): Proxy. Defaults to None.
        speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
        use_cookies (bool, optional): Flag to use cookies. Defaults to True.
        remaining_ok (bool, optional): Passed through to gdown.download_folder
            as its remaining_ok option. Defaults to False.

    Returns:
        list: List of files downloaded, or None if failed.
    """

    try:
        import gdown
    except ImportError:
        # Without gdown there is nothing to do; tell the user how to get it.
        print(
            "The gdown package is required for this function. Use `pip install gdown` to install it."
        )
        return None

    return gdown.download_folder(
        url=url,
        id=id,
        output=output,
        quiet=quiet,
        proxy=proxy,
        speed=speed,
        use_cookies=use_cookies,
        remaining_ok=remaining_ok,
    )

download_from_gdrive(gfile_url, file_name, out_dir='.', unzip=True, verbose=True)

Download a file shared via Google Drive (e.g., https://drive.google.com/file/d/18SUo_HcDGltuWYZs1s7PpOmOq_FvFn04/view?usp=sharing)

Parameters:

Name Type Description Default
gfile_url str

The Google Drive shared file URL

required
file_name str

The output file name to use.

required
out_dir str

The output directory. Defaults to '.'.

'.'
unzip bool

Whether to unzip the output file if it is a zip file. Defaults to True.

True
verbose bool

Whether to display or not the output of the function

True
Source code in leafmap/common.py
def download_from_gdrive(gfile_url, file_name, out_dir=".", unzip=True, verbose=True):
    """Fetch a file that was shared through a Google Drive link
       (e.g., https://drive.google.com/file/d/18SUo_HcDGltuWYZs1s7PpOmOq_FvFn04/view?usp=sharing)

    Args:
        gfile_url (str): The Google Drive shared file URL
        file_name (str): The output file name to use.
        out_dir (str, optional): The output directory. Defaults to '.'.
        unzip (bool, optional): Whether to unzip the output file if it is a zip file. Defaults to True.
        verbose (bool, optional): Whether to display or not the output of the function

    Raises:
        ImportError: If the googledrivedownloader package is not installed.
    """
    try:
        from google_drive_downloader import GoogleDriveDownloader as gdd
    except ImportError:
        raise ImportError(
            'Please install googledrivedownloader using "pip install googledrivedownloader"'
        )

    # In ".../file/d/<id>/view" the id is the sixth "/"-separated piece.
    url_parts = gfile_url.split("/")
    file_id = url_parts[5]
    if verbose:
        print(f"Google Drive file id: {file_id}")

    dest_path = os.path.join(check_dir(out_dir), file_name)
    # NOTE(review): the third positional argument (True) is presumably the
    # downloader's overwrite flag -- confirm against googledrivedownloader.
    gdd.download_file_from_google_drive(file_id, dest_path, True, unzip)

download_from_url(url, out_file_name=None, out_dir='.', unzip=True, verbose=True)

Download a file from a URL (e.g., https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip)

Parameters:

Name Type Description Default
url str

The HTTP URL to download.

required
out_file_name str

The output file name to use. Defaults to None.

None
out_dir str

The output directory to use. Defaults to '.'.

'.'
unzip bool

Whether to unzip the downloaded file if it is a zip file. Defaults to True.

True
verbose bool

Whether to display or not the output of the function

True
Source code in leafmap/common.py
def download_from_url(
    url: str,
    out_file_name: Optional[str] = None,
    out_dir: Optional[str] = ".",
    unzip: Optional[bool] = True,
    verbose: Optional[bool] = True,
):
    """Download a file from a URL (e.g., https://github.com/opengeos/whitebox-python/raw/master/examples/testdata.zip)

    Args:
        url (str): The HTTP URL to download.
        out_file_name (str, optional): The output file name to use. Defaults to None.
        out_dir (str, optional): The output directory to use. Defaults to '.'.
        unzip (bool, optional): Whether to unzip the downloaded file if it is a zip file. Defaults to True.
        verbose (bool, optional): Whether to display or not the output of the function

    Raises:
        Exception: If the URL cannot be retrieved, or if a tar archive
            contains a member that would be extracted outside out_dir.
    """

    def _is_within_directory(directory, target):
        # Compare whole path components. A plain string prefix would accept
        # "/data/out_evil" as being inside "/data/out".
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        try:
            return os.path.commonpath([abs_directory, abs_target]) == abs_directory
        except ValueError:
            # No common path at all (e.g. different drives on Windows).
            return False

    def _safe_extract(tar, path="."):
        # Refuse the whole archive if any member would land outside `path`.
        for member in tar.getmembers():
            member_path = os.path.join(path, member.name)
            if not _is_within_directory(path, member_path):
                raise Exception("Attempted Path Traversal in Tar File")

        tar.extractall(path)

    in_file_name = os.path.basename(url)
    out_dir = check_dir(out_dir)

    if out_file_name is None:
        out_file_name = in_file_name
    out_file_path = os.path.join(out_dir, out_file_name)

    if verbose:
        print("Downloading {} ...".format(url))

    try:
        urllib.request.urlretrieve(url, out_file_path)
    except Exception as err:
        # Keep the underlying error attached so the real cause stays visible.
        raise Exception("The URL is invalid. Please double check the URL.") from err

    final_path = out_file_path

    if unzip:
        # Substring checks are kept on purpose: names taken from URLs such as
        # "data.zip?raw=true" must still be recognised as archives.

        # if it is a zip file
        if ".zip" in out_file_name:
            if verbose:
                print("Unzipping {} ...".format(out_file_name))
            with zipfile.ZipFile(out_file_path, "r") as zip_ref:
                zip_ref.extractall(out_dir)
            final_path = os.path.join(
                os.path.abspath(out_dir), out_file_name.replace(".zip", "")
            )

        # if it is a tar file
        if ".tar" in out_file_name:
            if verbose:
                print("Unzipping {} ...".format(out_file_name))
            with tarfile.open(out_file_path, "r") as tar_ref:
                _safe_extract(tar_ref, out_dir)

            # Report the archive stem ("data" for "data.tar.gz"), mirroring
            # what the zip branch does.
            final_path = os.path.join(
                os.path.abspath(out_dir), out_file_name.split(".tar", 1)[0]
            )

    if verbose:
        print("Data downloaded to: {}".format(final_path))

download_google_buildings(location, out_dir=None, merge_output=None, head=None, keep_geojson=False, overwrite=False, quiet=False, **kwargs)

Download Google Open Building dataset for a specific location. Check the dataset links from https://sites.research.google/open-buildings.

Parameters:

Name Type Description Default
location str

The location name for which to download the dataset.

required
out_dir Optional[str]

The output directory to save the downloaded files. If not provided, the current working directory is used.

None
merge_output Optional[str]

Optional. The output file path for merging the downloaded files into a single GeoDataFrame.

None
head Optional[int]

Optional. The number of files to download. If not provided, all files will be downloaded.

None
keep_geojson bool

Optional. If True, each downloaded CSV tile is also saved as a GeoJSON file.

False
overwrite bool

Optional. If True, overwrite the existing files.

False
quiet bool

Optional. If True, suppresses the download progress messages.

False
**kwargs

Additional keyword arguments to be passed to the gpd.to_file function.

{}

Returns:

Type Description
List[str]

A list of file paths of the downloaded files.

Source code in leafmap/common.py
def download_google_buildings(
    location: str,
    out_dir: Optional[str] = None,
    merge_output: Optional[str] = None,
    head: Optional[int] = None,
    keep_geojson: bool = False,
    overwrite: bool = False,
    quiet: bool = False,
    **kwargs,
) -> List[str]:
    """
    Download Google Open Building dataset for a specific location. Check the dataset links from
        https://sites.research.google/open-buildings.

    Args:
        location: The location name for which to download the dataset.
        out_dir: The output directory to save the downloaded files. If not provided, the current working directory is used.
        merge_output: Optional. The output file path for merging the downloaded files into a single GeoDataFrame.
        head: Optional. The number of files to download. If not provided, all files will be downloaded.
        keep_geojson: Optional. If True, each downloaded CSV tile is also saved as a GeoJSON file next to it.
        overwrite: Optional. If True, an existing merge_output file is overwritten.
        quiet: Optional. If True, suppresses the download progress messages.
        **kwargs: Additional keyword arguments to be passed to the `gpd.to_file` function.
            Keyword arguments named after `download_files` options
            (e.g. proxy, verify) are routed to `download_files` instead.

    Returns:
        A list of file paths of the downloaded files (empty if no tile
        intersects the location).

    Raises:
        ValueError: If the location is not found in the Natural Earth dataset.
    """

    import pandas as pd
    import geopandas as gpd
    from shapely import wkt

    building_url = "https://sites.research.google/open-buildings/tiles.geojson"
    country_url = (
        "https://naciscdn.org/naturalearth/110m/cultural/ne_110m_admin_0_countries.zip"
    )

    if out_dir is None:
        out_dir = os.getcwd()
    os.makedirs(out_dir, exist_ok=True)

    # kwargs is documented for to_file; handing all of it to download_files
    # raises TypeError for any genuine to_file option. Split it by name.
    download_keys = (
        "filenames",
        "proxy",
        "speed",
        "use_cookies",
        "verify",
        "id",
        "fuzzy",
        "resume",
        "unzip",
        "subfolder",
        "multi_part",
    )
    download_kwargs = {k: v for k, v in kwargs.items() if k in download_keys}
    to_file_kwargs = {k: v for k, v in kwargs.items() if k not in download_keys}

    building_gdf = gpd.read_file(building_url)
    country_gdf = gpd.read_file(country_url)

    # Match on the short name first, then fall back to the long name.
    country = country_gdf[country_gdf["NAME"] == location]
    if len(country) == 0:
        country = country_gdf[country_gdf["NAME_LONG"] == location]
        if len(country) == 0:
            raise ValueError(f"Could not find {location} in the Natural Earth dataset.")

    tiles = building_gdf[building_gdf.intersects(country.geometry.iloc[0])]
    # Smallest tiles first; not in place, because `tiles` is a slice of
    # building_gdf.
    tiles = tiles.sort_values(by="size_mb")

    print(f"Found {len(tiles)} links for {location}.")
    if head is not None:
        tiles = tiles.head(head)

    if len(tiles) == 0:
        print(f"No buildings found for {location}.")
        return []

    links = tiles["tile_url"].tolist()
    download_files(links, out_dir=out_dir, quiet=quiet, **download_kwargs)
    filenames = [os.path.join(out_dir, os.path.basename(link)) for link in links]

    do_merge = False
    if merge_output:
        if os.path.exists(merge_output) and not overwrite:
            print(f"File {merge_output} already exists, skip merging...")
        else:
            do_merge = True

    # Parsing every tile is expensive; only do it when something uses the result.
    if keep_geojson or do_merge:
        gdfs = []
        for filename in filenames:
            # Read the CSV file into a pandas DataFrame
            df = pd.read_csv(filename)

            # Create a geometry column from the "geometry" column in the DataFrame
            df["geometry"] = df["geometry"].apply(wkt.loads)

            # Convert the pandas DataFrame to a GeoDataFrame
            tile_gdf = gpd.GeoDataFrame(df, geometry="geometry")
            tile_gdf.crs = "EPSG:4326"
            if keep_geojson:
                # The per-tile copy is always GeoJSON, whatever driver the
                # caller asked for on the merged output.
                tile_gdf.to_file(
                    filename.replace(".csv.gz", ".geojson"),
                    **{**to_file_kwargs, "driver": "GeoJSON"},
                )
            gdfs.append(tile_gdf)

        if do_merge:
            if not quiet:
                print("Merging GeoDataFrames ...")
            merged = gpd.GeoDataFrame(
                pd.concat(gdfs, ignore_index=True), crs="EPSG:4326"
            )
            merged.to_file(merge_output, **to_file_kwargs)

    return filenames

download_ms_buildings(location, out_dir=None, merge_output=None, head=None, quiet=False, **kwargs)

Download Microsoft Buildings dataset for a specific location. Check the dataset links from https://minedbuildings.blob.core.windows.net/global-buildings/dataset-links.csv.

Parameters:

Name Type Description Default
location str

The location name for which to download the dataset.

required
out_dir Optional[str]

The output directory to save the downloaded files. If not provided, the current working directory is used.

None
merge_output Optional[str]

Optional. The output file path for merging the downloaded files into a single GeoDataFrame.

None
head

Optional. The number of files to download. If not provided, all files will be downloaded.

None
quiet bool

Optional. If True, suppresses the download progress messages.

False
**kwargs

Additional keyword arguments to be passed to the gpd.to_file function.

{}

Returns:

Type Description
List[str]

A list of file paths of the downloaded files.

Source code in leafmap/common.py
def download_ms_buildings(
    location: str,
    out_dir: Optional[str] = None,
    merge_output: Optional[str] = None,
    head: Optional[int] = None,
    quiet: bool = False,
    **kwargs,
) -> List[str]:
    """
    Download Microsoft Buildings dataset for a specific location. Check the dataset links from
        https://minedbuildings.blob.core.windows.net/global-buildings/dataset-links.csv.

    Each matching row of the links table is read as line-delimited JSON and
    written to ``<out_dir>/<QuadKey>.geojson``. Files that already exist are
    not downloaded again but are still included in the returned list.

    Args:
        location: The location name for which to download the dataset.
        out_dir: The output directory to save the downloaded files. If not provided, the current working directory is used.
        merge_output: Optional. The output file path for merging the downloaded files into a single GeoDataFrame.
            If the file already exists, merging is skipped.
        head: Optional. The number of files to download. If not provided, all files will be downloaded.
        quiet: Optional. If True, suppresses the download progress messages.
        **kwargs: Additional keyword arguments to be passed to the `gpd.to_file` function.

    Returns:
        A list of file paths of the downloaded files.

    """

    import pandas as pd
    import geopandas as gpd
    from shapely.geometry import shape

    if out_dir is None:
        out_dir = os.getcwd()

    # exist_ok avoids the race between a separate existence check and creation.
    os.makedirs(out_dir, exist_ok=True)

    dataset_links = pd.read_csv(
        "https://minedbuildings.blob.core.windows.net/global-buildings/dataset-links.csv"
    )
    country_links = dataset_links[dataset_links.Location == location]

    if not quiet:
        print(f"Found {len(country_links)} links for {location}")
    if head is not None:
        country_links = country_links.head(head)

    total = len(country_links)
    filenames = []

    for i, (_, row) in enumerate(country_links.iterrows(), start=1):
        if not quiet:
            print(f"Downloading {i} of {total}: {row.QuadKey}.geojson")
        filename = os.path.join(out_dir, f"{row.QuadKey}.geojson")
        filenames.append(filename)
        if os.path.exists(filename):
            print(f"File {filename} already exists, skipping...")
            continue
        # Each line of the remote file is one JSON record with a GeoJSON-like
        # "geometry" member, which shapely turns into a geometry object.
        df = pd.read_json(row.Url, lines=True)
        df["geometry"] = df["geometry"].apply(shape)
        gdf = gpd.GeoDataFrame(df, crs=4326)
        gdf.to_file(filename, driver="GeoJSON", **kwargs)

    if merge_output is not None:
        if os.path.exists(merge_output):
            print(f"File {merge_output} already exists, skip merging...")
            return filenames
        merge_vector(filenames, merge_output, quiet=quiet)

    return filenames

download_ned(region, out_dir=None, return_url=False, download_args={}, geopandas_args={}, query={})

Download the US National Elevation Datasets (NED) for a region.

Parameters:

Name Type Description Default
region str | list

A filepath to a vector dataset or a list of bounds in the form of [minx, miny, maxx, maxy].

required
out_dir str

The directory to download the files to. Defaults to None, which uses the current working directory.

None
return_url bool

Whether to return the download URLs of the files. Defaults to False.

False
download_args dict

A dictionary of arguments to pass to the download_file function. Defaults to {}.

{}
geopandas_args dict

A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath.

{}
query dict

A dictionary of arguments to pass to the The_national_map_USGS.find_details() function. See https://apps.nationalmap.gov/tnmaccess/#/product for more information.

{}

Returns:

Type Description
list

A list of the download URLs of the files if return_url is True.

Source code in leafmap/common.py
def download_ned(
    region,
    out_dir=None,
    return_url=False,
    download_args={},
    geopandas_args={},
    query={},
) -> Union[None, List]:
    """Download the US National Elevation Datasets (NED) for a region.

    The work is delegated to ``The_national_map_USGS``: ``find_tiles`` when
    only the URLs are wanted, ``download_tiles`` otherwise.

    Args:
        region (str | list): A filepath to a vector dataset or a list of bounds in the form of [minx, miny, maxx, maxy].
        out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
        return_url (bool, optional): Whether to return the download URLs of the files. Defaults to False.
        download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
        geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
            Used for reading a region URL|filepath.
        query (dict, optional): A dictionary of arguments to pass to the The_national_map_USGS.find_details() function.
            When empty, the 1/3 arc-second NED dataset in GeoTIFF format is requested.
            See https://apps.nationalmap.gov/tnmaccess/#/product for more information.

    Returns:
        list: A list of the download URLs of the files if return_url is True.
    """

    # Do nothing when USE_MKDOCS is set (presumably during the docs build).
    if "USE_MKDOCS" in os.environ:
        return None

    ned_defaults = {
        "datasets": "National Elevation Dataset (NED) 1/3 arc-second",
        "prodFormats": "GeoTIFF",
    }
    api_params = query if query else ned_defaults

    tnm = The_national_map_USGS()
    if not return_url:
        return tnm.download_tiles(
            region=region,
            out_dir=out_dir,
            download_args=download_args,
            geopandas_args=geopandas_args,
            API=api_params,
        )
    return tnm.find_tiles(
        region=region, geopandas_args=geopandas_args, API=api_params
    )

download_tnm(region=None, out_dir=None, return_url=False, download_args={}, geopandas_args={}, API={})

Download datasets from The National Map (TNM) of the USGS for a region. The products to retrieve are selected through the API parameters.

Parameters:

Name Type Description Default
region str | list

A URL or file path to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy]. Alternatively, you can use API parameters such as polygon or bbox.

None
out_dir str

The directory to download the files to. Defaults to None, which uses the current working directory.

None
return_url bool

Whether to return the download URLs of the files. Defaults to False.

False
download_args dict

A dictionary of arguments to pass to the download_file function. Defaults to {}.

{}
geopandas_args dict

A dictionary of arguments to pass to the geopandas.read_file() function. Used for reading a region URL|filepath.

{}
API dict

A dictionary of arguments to pass to the The_national_map_USGS.find_details() function. Exposes most of the documented API. Defaults to {}

{}

Returns:

Type Description
list

A list of the download URLs of the files if return_url is True.

Source code in leafmap/common.py
def download_tnm(
    region=None,
    out_dir=None,
    return_url=False,
    download_args={},
    geopandas_args={},
    API={},
) -> Union[None, List]:
    """Find or download products from The National Map (TNM) for a region.

    The work is delegated to ``The_national_map_USGS``: ``find_tiles`` when
    only the URLs are wanted, ``download_tiles`` otherwise.

    Args:
        region (str | list, optional): A URL or filepath to a vector dataset, or a list of bounds in the form of [minx, miny, maxx, maxy].
            Alternatively you could use API parameters such as polygon or bbox.
        out_dir (str, optional): The directory to download the files to. Defaults to None, which uses the current working directory.
        return_url (bool, optional): Whether to return the download URLs of the files. Defaults to False.
        download_args (dict, optional): A dictionary of arguments to pass to the download_file function. Defaults to {}.
        geopandas_args (dict, optional): A dictionary of arguments to pass to the geopandas.read_file() function.
            Used for reading a region URL|filepath.
        API (dict, optional): A dictionary of arguments to pass to the The_national_map_USGS.find_details() function.
            Exposes most of the documented API. Defaults to {}

    Returns:
        list: A list of the download URLs of the files if return_url is True.
    """

    # Do nothing when USE_MKDOCS is set (presumably during the docs build).
    if "USE_MKDOCS" in os.environ:
        return None

    tnm = The_national_map_USGS()
    if not return_url:
        return tnm.download_tiles(
            region=region,
            out_dir=out_dir,
            download_args=download_args,
            geopandas_args=geopandas_args,
            API=API,
        )
    return tnm.find_tiles(region=region, geopandas_args=geopandas_args, API=API)

edit_download_html(htmlWidget, filename, title='Click here to download: ')

Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578#issuecomment-617668058

Parameters:

Name Type Description Default
htmlWidget object

The HTML widget to display the URL.

required
filename str

File path to download.

required
title str

Download description. Defaults to "Click here to download: ".

'Click here to download: '
Source code in leafmap/common.py
def edit_download_html(htmlWidget, filename, title="Click here to download: "):
    """Downloads a file from voila. Adopted from https://github.com/voila-dashboards/voila/issues/578#issuecomment-617668058

    The file content is base64-encoded and embedded in a ``data:`` URI, which
    is written to the widget as a download link. The link always declares the
    ``text/csv`` MIME type, whatever the file actually contains.

    Args:
        htmlWidget (object): The HTML widget to display the URL. Only its ``value`` attribute is assigned.
        filename (str): File path to download.
        title (str, optional): Download description. Defaults to "Click here to download: ".
    """

    import base64

    # Change widget html temporarily to a font-awesome spinner
    htmlWidget.value = '<i class="fa fa-spinner fa-spin fa-2x fa-fw"></i><span class="sr-only">Loading...</span>'

    # Read through a context manager so the file handle is always closed.
    with open(filename, "rb") as f:
        payload = base64.b64encode(f.read()).decode()

    basename = os.path.basename(filename)

    # Create and assign html to widget
    html = '<a download="{filename}" href="data:text/csv;base64,{payload}" target="_blank">{title}</a>'
    htmlWidget.value = html.format(
        payload=payload, title=title + basename, filename=basename
    )

explode(coords)

Explode a GeoJSON geometry's coordinates object and yield coordinate tuples. As long as the input is conforming, the type of the geometry doesn't matter. From Fiona 1.4.8

Parameters:

Name Type Description Default
coords list

A list of coordinates.

required

Yields:

Type Description
list | tuple

Each innermost coordinate sequence (a sequence whose first element is a number) found in coords.

Source code in leafmap/common.py
def explode(coords):
    """Walk a GeoJSON geometry's ``coordinates`` object and yield every
    individual coordinate it contains, regardless of how deeply the
    coordinates are nested. From Fiona 1.4.8

    Args:
        coords (list): A list of coordinates.

    Yields:
        list | tuple: Each innermost sequence whose first element is a number,
            i.e. a single coordinate such as ``[x, y]``.
    """

    for item in coords:
        if not isinstance(item, (float, int)):
            # Still a container: descend one nesting level.
            yield from explode(item)
            continue
        # The first element is a number, so coords itself is one coordinate.
        yield coords
        return

extract_archive(archive, outdir=None, **kwargs)

Extracts a multipart archive.

This function uses the patoolib library to extract a multipart archive. If the patoolib library is not installed, it attempts to install it. If the archive does not end with ".zip", it appends ".zip" to the archive name. If the extraction fails (for example, if the files already exist), it skips the extraction.

Parameters:

Name Type Description Default
archive str

The path to the archive file.

required
outdir str

The directory where the archive should be extracted.

None
**kwargs

Arbitrary keyword arguments for the patoolib.extract_archive function.

{}

Returns:

Type Description

None

Exceptions:

Type Description
Exception

Note: in the current implementation any exception raised during extraction is caught and a message is printed instead, so extraction errors do not propagate to the caller.

Examples:

files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
base_url = "https://github.com/opengeos/datasets/releases/download/models/"
urls = [base_url + f for f in files]
leafmap.download_files(urls, out_dir="models", multi_part=True)

Source code in leafmap/common.py
def extract_archive(archive, outdir=None, **kwargs):
    """
    Extracts a multipart archive.

    This function uses the patoolib library to extract a multipart archive.
    If the patoolib library is not installed, it attempts to install it.
    If the archive does not end with ".zip", it appends ".zip" to the archive name.
    Any exception raised by the extraction itself is caught: a message is
    printed and the function returns without extracting.

    Args:
        archive (str): The path to the archive file.
        outdir (str): The directory where the archive should be extracted.
            Defaults to None, which uses the directory part of the archive path.
        **kwargs: Arbitrary keyword arguments for the patoolib.extract_archive function.

    Returns:
        None

    Example:

        files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
        base_url = "https://github.com/opengeos/datasets/releases/download/models/"
        urls = [base_url + f for f in files]
        leafmap.download_files(urls, out_dir="models", multi_part=True)

    """
    try:
        import patoolib
    except ImportError:
        install_package("patool")
        import patoolib

    archive_path = archive if archive.endswith(".zip") else archive + ".zip"
    target_dir = os.path.dirname(archive_path) if outdir is None else outdir

    try:
        patoolib.extract_archive(archive_path, outdir=target_dir, **kwargs)
    except Exception:
        # Best effort: a failure is assumed to mean the files are already there.
        print("The unzipped files might already exist. Skipping extraction.")

filter_bounds(data, bbox, within=False, align=True, **kwargs)

Filters a GeoDataFrame or GeoSeries by a bounding box.

Parameters:

Name Type Description Default
data str | GeoDataFrame

The input data to filter. Can be a file path or a GeoDataFrame.

required
bbox list | GeoDataFrame

The bounding box to filter by. Can be a list of 4 coordinates or a file path or a GeoDataFrame.

required
within bool

If True, keep only the features that lie within the bounding box; if False, keep every feature that intersects it. Defaults to False.

False
align bool

If True, automatically aligns GeoSeries based on their indices. If False, the order of elements is preserved.

True

Returns:

Type Description
GeoDataFrame

The filtered data.

Source code in leafmap/common.py
def filter_bounds(data, bbox, within=False, align=True, **kwargs):
    """Filters a GeoDataFrame or GeoSeries by a bounding box.

    Args:
        data (str | GeoDataFrame): The input data to filter. Can be a file path or a GeoDataFrame.
        bbox (list | GeoDataFrame): The bounding box to filter by. Can be a list of 4 coordinates or a file path or a GeoDataFrame.
        within (bool, optional): If True, keep only the features within the bounding box;
            if False, keep the features that intersect it. Defaults to False.
        align (bool, optional): If True, automatically aligns GeoSeries based on their indices. If False, the order of elements is preserved.
        **kwargs: Passed to geopandas.read_file() when data or bbox is given as a file path.

    Raises:
        TypeError: If data is neither a file path nor a GeoDataFrame/GeoSeries.
        ValueError: If bbox is a list that does not have exactly 4 items.

    Returns:
        GeoDataFrame: The filtered data.
    """
    import geopandas as gpd

    if isinstance(data, str):
        data = gpd.read_file(data, **kwargs)
    elif not isinstance(data, (gpd.GeoDataFrame, gpd.GeoSeries)):
        raise TypeError("data must be a file path or a GeoDataFrame or GeoSeries")

    # Normalise bbox to something exposing ``unary_union``.
    if isinstance(bbox, str):
        bbox = gpd.read_file(bbox, **kwargs)
    elif isinstance(bbox, list):
        if len(bbox) != 4:
            raise ValueError("bbox must be a list of 4 coordinates")
        bbox = bbox_to_gdf(bbox)

    spatial_test = data.within if within else data.intersects
    return data[spatial_test(bbox.unary_union, align=align)]

filter_date(data, start_date=None, end_date=None, date_field='date', date_args={}, **kwargs)

Filters a DataFrame, GeoDataFrame or GeoSeries by a date range.

Parameters:

Name Type Description Default
data str | DataFrame | GeoDataFrame

The input data to filter. Can be a file path or a DataFrame or GeoDataFrame.

required
start_date str

The start date, e.g., 2023-01-01. Defaults to None.

None
end_date str

The end date, e.g., 2023-12-31. Defaults to None.

None
date_field str

The name of the date field. Defaults to "date".

'date'
date_args dict

Additional arguments for pd.to_datetime. Defaults to {}.

{}

Returns:

Type Description
DataFrame

The filtered data.

Source code in leafmap/common.py
def filter_date(
    data, start_date=None, end_date=None, date_field="date", date_args={}, **kwargs
):
    """Filters a DataFrame, GeoDataFrame or GeoSeries by a date range.

    Rows whose parsed ``date_field`` value lies between ``start_date`` and
    ``end_date`` (both inclusive) are returned. The input object is not
    modified.

    Args:
        data (str | DataFrame | GeoDataFrame): The input data to filter. Can be a file path or a DataFrame or GeoDataFrame.
        start_date (str, optional): The start date, e.g., 2023-01-01. Defaults to None,
            which uses the earliest date found in the data.
        end_date (str, optional): The end date, e.g., 2023-12-31. Defaults to None,
            which uses today's date.
        date_field (str, optional): The name of the date field. Defaults to "date".
        date_args (dict, optional): Additional arguments for pd.to_datetime. Defaults to {}.
        **kwargs: Passed to geopandas.read_file() when data is a file path.

    Raises:
        TypeError: If data is not a file path or a pandas/geopandas object.
        ValueError: If date_field is not a column of data.

    Returns:
        DataFrame: The filtered data.
    """

    import datetime
    import pandas as pd

    if isinstance(data, str):
        # geopandas is only required for reading a file from disk.
        import geopandas as gpd

        data = gpd.read_file(data, **kwargs)
    # GeoDataFrame and GeoSeries subclass the pandas types, so this single
    # check covers them as well.
    elif not isinstance(data, (pd.DataFrame, pd.Series)):
        raise TypeError("data must be a file path or a GeoDataFrame or GeoSeries")

    # NOTE(review): a Series passes the type check above but has no
    # ``columns`` attribute, so it fails here with AttributeError (unchanged
    # from the previous behavior).
    if date_field not in data.columns:
        raise ValueError(f"date_field must be one of {data.columns}")

    # Keep the parsed dates in a separate Series instead of writing a
    # temporary column into the caller's frame.
    dates = pd.to_datetime(data[date_field], **date_args)

    if end_date is None:
        end_date = datetime.datetime.now().strftime("%Y-%m-%d")

    if start_date is None:
        start_date = dates.min()

    mask = (dates >= start_date) & (dates <= end_date)
    return data.loc[mask]

find_files(input_dir, ext=None, fullpath=True, recursive=True)

Find files in a directory.

Parameters:

Name Type Description Default
input_dir str

The input directory.

required
ext str

The file extension to match. Defaults to None.

None
fullpath bool

Whether to return the full path. Defaults to True.

True
recursive bool

Whether to search recursively. Defaults to True.

True

Returns:

Type Description
list

A list of matching files.

Source code in leafmap/common.py
def find_files(input_dir, ext=None, fullpath=True, recursive=True):
    """Find files in a directory.

    Entries are matched with the glob pattern ``*.<ext>``, so only names that
    contain a dot are returned.

    Args:
        input_dir (str): The input directory.
        ext (str, optional): The file extension to match, with or without the
            leading dot (e.g., "tif", ".tif" or "tar.gz"). Defaults to None,
            which matches any extension.
        fullpath (bool, optional): Whether to return the full path. Defaults to True.
        recursive (bool, optional): Whether to search recursively. Defaults to True.

    Returns:
        list: A sorted list of matching files.
    """

    from pathlib import Path

    # Strip only leading dots so that multi-part extensions such as "tar.gz"
    # keep their interior dot.
    suffix = "*" if ext is None else ext.lstrip(".")
    pattern = f"*.{suffix}"

    root = Path(input_dir)
    matches = root.rglob(pattern) if recursive else root.glob(pattern)

    files = [str(path) if fullpath else path.name for path in matches]
    files.sort()
    return files

gdb_layer_names(gdb_path)

Get a list of layer names in a File Geodatabase (GDB).

Parameters:

Name Type Description Default
gdb_path str

The path to the File Geodatabase (GDB).

required

Returns:

Type Description
List[str]

A list of layer names in the GDB.

Source code in leafmap/common.py
def gdb_layer_names(gdb_path: str) -> List[str]:
    """List the names of all layers stored in a File Geodatabase (GDB).

    Args:
        gdb_path (str): The path to the File Geodatabase (GDB).

    Returns:
        List[str]: The layer names, in the order the layers are indexed in the GDB.
    """

    from osgeo import ogr

    # Open the geodatabase through the OpenFileGDB driver.
    dataset = ogr.GetDriverByName("OpenFileGDB").Open(gdb_path, 0)

    names = [
        dataset.GetLayerByIndex(index).GetName()
        for index in range(dataset.GetLayerCount())
    ]

    # Drop the reference to close the GDB dataset.
    dataset = None
    return names

gdb_to_vector(gdb_path, out_dir, layers=None, filenames=None, gdal_driver='GPKG', file_extension=None, overwrite=False, quiet=False, **kwargs)

Converts layers from a File Geodatabase (GDB) to a vector format.

Parameters:

Name Type Description Default
gdb_path str

The path to the File Geodatabase (GDB).

required
out_dir str

The output directory to save the converted files.

required
layers Optional[List[str]]

A list of layer names to convert. If None, all layers will be converted. Default is None.

None
filenames Optional[List[str]]

A list of output file names. If None, the layer names will be used as the file names. Default is None.

None
gdal_driver str

The GDAL driver name for the output vector format. Default is "GPKG".

'GPKG'
file_extension Optional[str]

The file extension for the output files. If None, it will be determined automatically based on the gdal_driver. Default is None.

None
overwrite bool

Whether to overwrite the existing output files. Default is False.

False
quiet bool

If True, suppress the log output. Defaults to False.

False

Returns:

Type Description

None

Source code in leafmap/common.py
def gdb_to_vector(
    gdb_path: str,
    out_dir: str,
    layers: Optional[List[str]] = None,
    filenames: Optional[List[str]] = None,
    gdal_driver: str = "GPKG",
    file_extension: Optional[str] = None,
    overwrite: bool = False,
    quiet=False,
    **kwargs,
):
    """Converts layers from a File Geodatabase (GDB) to a vector format.

    Each selected layer is copied into its own output file inside ``out_dir``.

    Args:
        gdb_path (str): The path to the File Geodatabase (GDB).
        out_dir (str): The output directory to save the converted files.
        layers (Optional[List[str]]): A list of layer names to convert. If None, all layers will be converted. Default is None.
        filenames (Optional[List[str]]): A list of output file names (without extension). ``filenames[k]`` is used
            for ``layers[k]``, or for the k-th layer of the GDB when layers is None.
            If None, the layer names will be used as the file names. Default is None.
        gdal_driver (str): The GDAL driver name for the output vector format. Default is "GPKG".
        file_extension (Optional[str]): The file extension for the output files. If None, it will be determined automatically based on the gdal_driver. Default is None.
        overwrite (bool): Whether to overwrite the existing output files. Default is False.
        quiet (bool): If True, suppress the log output. Defaults to False.
        **kwargs: Accepted but currently not used.

    Raises:
        ValueError: If the number of filenames does not match the number of layers to convert.

    Returns:
        None
    """
    from osgeo import ogr

    # Open the GDB
    gdb_driver = ogr.GetDriverByName("OpenFileGDB")
    gdb_dataset = gdb_driver.Open(gdb_path, 0)

    # Get the number of layers in the GDB
    layer_count = gdb_dataset.GetLayerCount()

    if isinstance(layers, str):
        layers = [layers]

    if isinstance(filenames, str):
        filenames = [filenames]

    if filenames is not None:
        # With no explicit layer selection every layer of the GDB is converted,
        # so the names must cover all of them.
        expected = layer_count if layers is None else len(layers)
        if len(filenames) != expected:
            raise ValueError("The length of filenames must match the length of layers.")

    os.makedirs(out_dir, exist_ok=True)

    # Iterate over the layers
    for i in range(layer_count):
        layer = gdb_dataset.GetLayerByIndex(i)
        feature_class_name = layer.GetName()

        if layers is not None and feature_class_name not in layers:
            continue

        if file_extension is None:
            file_extension = get_gdal_file_extension(gdal_driver)

        # Pick the output name that belongs to this layer. Looking it up by
        # the layer's position in ``layers`` keeps names and layers paired even
        # when the GDB stores the layers in a different order.
        if filenames is None:
            stem = feature_class_name
        elif layers is None:
            stem = filenames[i]
        else:
            stem = filenames[layers.index(feature_class_name)]
        output_file = os.path.join(out_dir, stem + "." + file_extension)

        if os.path.exists(output_file) and not overwrite:
            print(f"File {output_file} already exists. Skipping...")
            continue
        if not quiet:
            print(f"Converting layer {feature_class_name} to {output_file}...")

        # Create the output driver
        output_driver = ogr.GetDriverByName(gdal_driver)
        output_dataset = output_driver.CreateDataSource(output_file)

        # Copy the input layer to the output format
        output_dataset.CopyLayer(layer, feature_class_name)

        # Drop the reference so the output file is released.
        output_dataset = None

    # Close the GDB dataset
    gdb_dataset = None

gdf_bounds(gdf, return_geom=False)

Returns the bounding box of a GeoDataFrame.

Parameters:

Name Type Description Default
gdf gpd.GeoDataFrame

A GeoDataFrame.

required
return_geom bool

Whether to return the bounding box as a GeoDataFrame. Defaults to False.

False

Returns:

Type Description
list | gpd.GeoDataFrame

A bounding box in the form of a list (minx, miny, maxx, maxy) or GeoDataFrame.

Source code in leafmap/common.py
def gdf_bounds(gdf, return_geom=False):
    """Returns the bounding box of a GeoDataFrame.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        return_geom (bool, optional): Whether to return the bounding box as a GeoDataFrame. Defaults to False.

    Returns:
        list | gpd.GeoDataFrame: The value of ``gdf.total_bounds`` (minx, miny, maxx, maxy),
            or the result of ``bbox_to_gdf`` for those bounds when return_geom is True.
    """
    extent = gdf.total_bounds
    return bbox_to_gdf(bbox=extent) if return_geom else extent

gdf_centroid(gdf, return_geom=False)

Returns the centroid of a GeoDataFrame.

Parameters:

Name Type Description Default
gdf gpd.GeoDataFrame

A GeoDataFrame.

required
return_geom bool

Whether to return the centroid as a geometry object instead of a coordinate pair. Defaults to False.

False

Returns:

Type Description
list | gpd.GeoDataFrame

The centroid in the form of a (lon, lat) pair, or as a geometry object when return_geom is True.

Source code in leafmap/common.py
def gdf_centroid(gdf, return_geom=False):
    """Returns the centroid of a GeoDataFrame.

    The centroid is that of the GeoDataFrame's bounding box, as produced by
    ``gdf_bounds(gdf, return_geom=True)``.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        return_geom (bool, optional): Whether to return the centroid as a geometry
            object instead of a coordinate pair. Defaults to False.

    Returns:
        tuple | geometry object: The centroid as an (x, y) pair, or the ``centroid``
            attribute of the bounding-box geometry when return_geom is True.
    """

    # Silence warnings only for the duration of this computation rather than
    # changing the process-wide warning filters.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        centroid = gdf_bounds(gdf, return_geom=True).centroid
        if return_geom:
            return centroid
        return centroid.x[0], centroid.y[0]

gdf_geom_type(gdf, first_only=True)

Returns the geometry type of a GeoDataFrame.

Parameters:

Name Type Description Default
gdf gpd.GeoDataFrame

A GeoDataFrame.

required
first_only bool

Whether to return the geometry type of the first feature in the GeoDataFrame. Defaults to True.

True

Returns:

Type Description
str

The geometry type of the GeoDataFrame, such as Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon. For more info, see https://shapely.readthedocs.io/en/stable/manual.html

Source code in leafmap/common.py
def gdf_geom_type(gdf, first_only=True):
    """Returns the geometry type of a GeoDataFrame.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        first_only (bool, optional): Whether to return the geometry type of the
            first feature in the GeoDataFrame. Defaults to True.

    Returns:
        str: The geometry type of the GeoDataFrame, such as Point, LineString,
            Polygon, MultiPoint, MultiLineString, MultiPolygon. When first_only
            is False, the geometry types of all features are returned instead.
            For more info, see https://shapely.readthedocs.io/en/stable/manual.html
    """
    geom_types = gdf.geometry.type
    if first_only:
        # Positional access: the first row need not carry the index label 0
        # (e.g., after filtering or re-indexing).
        return geom_types.iloc[0]
    return geom_types

gdf_to_bokeh(gdf)

Function to convert a GeoPandas GeoDataFrame to a Bokeh ColumnDataSource object.

:param: (GeoDataFrame) gdf: GeoPandas GeoDataFrame with polygon(s) under the column name 'geometry.'

:return: ColumnDataSource for Bokeh.

Source code in leafmap/common.py
def gdf_to_bokeh(gdf):
    """Converts a GeoPandas GeoDataFrame to a Bokeh ColumnDataSource.

    The "geometry" column is replaced by "x" and "y" columns computed
    row by row with ``get_geometry_coords`` (called with ``mercator=True``).

    Args:
        gdf (GeoDataFrame): GeoPandas GeoDataFrame with the geometries under
            the column name "geometry".

    Returns:
        ColumnDataSource: The data source for Bokeh.
    """
    from bokeh.plotting import ColumnDataSource

    geom_kind = gdf_geom_type(gdf)

    table = gdf.drop("geometry", axis=1).copy()
    for coord_name in ("x", "y"):
        table[coord_name] = gdf.apply(
            get_geometry_coords,
            geom="geometry",
            coord_type=coord_name,
            shape_type=geom_kind,
            mercator=True,
            axis=1,
        )

    return ColumnDataSource(table)

gdf_to_df(gdf, drop_geom=True)

Converts a GeoDataFrame to a pandas DataFrame.

Parameters:

Name Type Description Default
gdf gpd.GeoDataFrame

A GeoDataFrame.

required
drop_geom bool

Whether to drop the geometry column. Defaults to True.

True

Returns:

Type Description
pd.DataFrame

A pandas DataFrame containing the GeoDataFrame.

Source code in leafmap/common.py
def gdf_to_df(gdf, drop_geom=True):
    """Converts a GeoDataFrame to a pandas DataFrame.

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame.
        drop_geom (bool, optional): Whether to drop the column named "geometry". Defaults to True.

    Returns:
        pd.DataFrame: A pandas DataFrame built from the GeoDataFrame.
    """
    import pandas as pd

    source = gdf.drop(columns=["geometry"]) if drop_geom else gdf
    return pd.DataFrame(source)

gdf_to_geojson(gdf, out_geojson=None, epsg=None, tuple_to_list=False, encoding='utf-8')

Converts a GeoDataFrame to GeoJSON.

Parameters:

Name Type Description Default
gdf GeoDataFrame

A GeoPandas GeoDataFrame.

required
out_geojson str

File path to the output GeoJSON. Defaults to None.

None
epsg str

An EPSG string, e.g., "4326". Defaults to None.

None
tuple_to_list bool

Whether to convert tuples to lists. Defaults to False.

False
encoding str

The encoding to use for the GeoJSON. Defaults to "utf-8".

'utf-8'

Exceptions:

Type Description
TypeError

When the output file extension is incorrect.

Exception

When the conversion fails.

Returns:

Type Description
dict

When out_geojson is None, the GeoJSON is returned as a dict.

Source code in leafmap/common.py
def gdf_to_geojson(
    gdf, out_geojson=None, epsg=None, tuple_to_list=False, encoding="utf-8"
):
    """Converts a GeoDataFrame to GeoJSON.

    Args:
        gdf (GeoDataFrame): A GeoPandas GeoDataFrame.
        out_geojson (str, optional): File path to the output GeoJSON. Defaults to None.
        epsg (str, optional): An EPSG string, e.g., "4326". Defaults to None.
        tuple_to_list (bool, optional): Whether to convert tuples to lists. Defaults to False.
        encoding (str, optional): The encoding to use for the GeoJSON. Defaults to "utf-8".

    Raises:
        TypeError: When the output file extension is incorrect.
        Exception: When the conversion fails.

    Returns:
        dict: The GeoJSON dict when out_geojson is None. When out_geojson is
            given, the file is written and None is returned.
    """
    check_package(name="geopandas", URL="https://geopandas.org")

    def listit(t):
        return list(map(listit, t)) if isinstance(t, (list, tuple)) else t

    # Validate the extension up front so the documented TypeError reaches the
    # caller instead of being re-wrapped as a generic Exception below.
    if out_geojson is not None:
        ext = os.path.splitext(out_geojson)[1]
        if ext.lower() not in [".json", ".geojson"]:
            raise TypeError(
                "The output file extension must be either .json or .geojson"
            )

    try:
        if epsg is not None:
            # Compare as strings: epsg may be passed as "4326" or 4326, while
            # to_epsg() returns an int, so a direct != would always be True.
            if gdf.crs is not None and str(gdf.crs.to_epsg()) != str(epsg):
                gdf = gdf.to_crs(epsg=epsg)

        if out_geojson is None:
            geojson = gdf.__geo_interface__

            if tuple_to_list:
                for feature in geojson["features"]:
                    geometry = feature.get("geometry")
                    # Null geometries and geometries without a "coordinates"
                    # member have nothing to convert.
                    if geometry and "coordinates" in geometry:
                        geometry["coordinates"] = listit(geometry["coordinates"])

            return geojson

        # abspath() makes a bare filename resolve to the current directory
        # rather than "", which os.makedirs cannot create.
        out_dir = os.path.dirname(os.path.abspath(out_geojson))
        os.makedirs(out_dir, exist_ok=True)

        gdf.to_file(out_geojson, driver="GeoJSON", encoding=encoding)
    except Exception as e:
        raise Exception(e) from e

gedi_download_file(url, filename=None, username=None, password=None)

Downloads a file from the given URL and saves it to the specified filename. If no filename is provided, the name of the file from the URL will be used.

Parameters:

Name Type Description Default
url str

The URL of the file to download. e.g., https://daac.ornl.gov/daacdata/gedi/GEDI_L4A_AGB_Density_V2_1/data/GEDI04_A_2019298202754_O04921_01_T02899_02_002_02_V002.h5

required
filename str

The name of the file to save the downloaded content to. Defaults to None.

None
username str

Username for authentication. Can also be set using the EARTHDATA_USERNAME environment variable. Defaults to None. Create an account at https://urs.earthdata.nasa.gov

None
password str

Password for authentication. Can also be set using the EARTHDATA_PASSWORD environment variable. Defaults to None.

None

Returns:

Type Description
None

None

Source code in leafmap/common.py
def gedi_download_file(
    url: str, filename: str = None, username: str = None, password: str = None
) -> None:
    """
    Downloads a file from the given URL and saves it to the specified filename.
    If no filename is provided, the name of the file from the URL will be used.

    Args:
        url (str): The URL of the file to download.
            e.g., https://daac.ornl.gov/daacdata/gedi/GEDI_L4A_AGB_Density_V2_1/data/GEDI04_A_2019298202754_O04921_01_T02899_02_002_02_V002.h5
        filename (str, optional): The name of the file to save the downloaded content to. Defaults to None.
        username (str, optional): Username for authentication. Can also be set using the EARTHDATA_USERNAME environment variable. Defaults to None.
            Create an account at https://urs.earthdata.nasa.gov
        password (str, optional): Password for authentication. Can also be set using the EARTHDATA_PASSWORD environment variable. Defaults to None.

    Raises:
        ValueError: If no username or password is given and the corresponding
            environment variable is not set.

    Returns:
        None. If the server does not answer with HTTP 200, a message is printed
        and no file is written.
    """
    import requests
    from tqdm import tqdm
    from urllib.parse import urlparse

    if username is None:
        username = os.environ.get("EARTHDATA_USERNAME", None)
    if password is None:
        password = os.environ.get("EARTHDATA_PASSWORD", None)

    if username is None or password is None:
        raise ValueError(
            "Username and password must be provided. Create an account at https://urs.earthdata.nasa.gov."
        )

    with requests.Session() as session:
        # The first request is sent without credentials; the credentials are
        # then sent to whatever URL that request ended up at (r1.url).
        r1 = session.request("get", url, stream=True)
        r = session.get(r1.url, auth=(username, password), stream=True)
        print(r.status_code)

        if r.status_code != 200:
            # Report the failure instead of returning silently.
            print(f"Failed to download {url} (HTTP status {r.status_code}).")
            return

        total_size = int(r.headers.get("content-length", 0))
        block_size = 1024  # 1 KB

        # Use the filename from the URL if not provided
        if not filename:
            parsed_url = urlparse(url)
            filename = parsed_url.path.split("/")[-1]

        # Both the file and the progress bar are closed even if the
        # transfer is interrupted by an exception.
        with open(filename, "wb") as file, tqdm(
            total=total_size, unit="B", unit_scale=True
        ) as progress_bar:
            for data in r.iter_content(block_size):
                progress_bar.update(len(data))
                file.write(data)

gedi_download_files(urls, outdir=None, filenames=None, username=None, password=None, overwrite=False)

Downloads files from the given URLs and saves them to the specified directory. If no directory is provided, the current directory will be used. If no filenames are provided, the names of the files from the URLs will be used.

Parameters:

Name Type Description Default
urls List[str]

The URLs of the files to download. e.g., ["https://example.com/file1.txt", "https://example.com/file2.txt"]

required
outdir str

The directory to save the downloaded files to. Defaults to None.

None
filenames List[str]

The names of the files to save the downloaded content to. Defaults to None.

None
username str

Username for authentication. Can also be set using the EARTHDATA_USERNAME environment variable. Defaults to None. Create an account at https://urs.earthdata.nasa.gov

None
password str

Password for authentication. Can also be set using the EARTHDATA_PASSWORD environment variable. Defaults to None.

None
overwrite bool

Whether to overwrite the existing output files. Default is False.

False

Returns:

Type Description
None

None

Source code in leafmap/common.py
def gedi_download_files(
    urls: List[str],
    outdir: str = None,
    filenames: List[str] = None,
    username: str = None,
    password: str = None,
    overwrite: bool = False,
) -> None:
    """
    Downloads files from the given URLs and saves them to the specified directory.
    If no directory is provided, the current directory will be used.
    If no filenames are provided, the names of the files from the URLs will be used.

    Args:
        urls (List[str]): The URLs of the files to download.
            e.g., ["https://example.com/file1.txt", "https://example.com/file2.txt"]
            A GeoDataFrame with a "granule_url" column is also accepted.
        outdir (str, optional): The directory to save the downloaded files to. Defaults to None.
        filenames (List[str], optional): The names of the files to save the downloaded content to.
            Names are consumed in order, one per non-None URL; once they run out,
            the name from the URL is used. A single string is treated as a
            one-element list. The list passed in is not modified. Defaults to None.
        username (str, optional): Username for authentication. Can also be set using the EARTHDATA_USERNAME environment variable. Defaults to None.
            Create an account at https://urs.earthdata.nasa.gov
        password (str, optional): Password for authentication. Can also be set using the EARTHDATA_PASSWORD environment variable. Defaults to None.
        overwrite (bool): Whether to overwrite the existing output files. Default is False.

    Returns:
        None
    """

    import requests
    from tqdm import tqdm
    from urllib.parse import urlparse
    import geopandas as gpd

    if isinstance(urls, gpd.GeoDataFrame):
        urls = urls["granule_url"].tolist()

    if username is None:
        username = os.environ.get("EARTHDATA_USERNAME", None)
    if password is None:
        password = os.environ.get("EARTHDATA_PASSWORD", None)

    # Checked before any session is opened so the early return leaks nothing.
    if username is None or password is None:
        print("Username and password must be provided.")
        return

    if outdir is None:
        outdir = os.getcwd()

    os.makedirs(outdir, exist_ok=True)

    if isinstance(filenames, str):
        filenames = [filenames]
    # Consume names through a private iterator so the caller's list is
    # left untouched (the names used to be popped off the argument itself).
    pending_names = iter(filenames or [])

    # The context manager closes the session even if a download raises.
    with requests.Session() as session:
        for index, url in enumerate(urls):
            print(f"Downloading file {index+1} of {len(urls)}...")

            if url is None:
                continue

            # Use the filename from the URL if not provided
            filename = next(pending_names, None)
            if not filename:
                filename = urlparse(url).path.split("/")[-1]

            filepath = os.path.join(outdir, filename)
            if os.path.exists(filepath) and not overwrite:
                print(f"File {filepath} already exists. Skipping...")
                continue

            # The first request is sent without credentials; the credentials
            # are then sent to whatever URL that request ended up at.
            r1 = session.request("get", url, stream=True)
            r = session.get(r1.url, auth=(username, password), stream=True)

            if r.status_code != 200:
                # Report the failure instead of skipping the file silently.
                print(f"Failed to download {url} (HTTP status {r.status_code}).")
                continue

            total_size = int(r.headers.get("content-length", 0))
            block_size = 1024  # 1 KB

            with open(filepath, "wb") as file, tqdm(
                total=total_size, unit="B", unit_scale=True
            ) as progress_bar:
                for data in r.iter_content(block_size):
                    progress_bar.update(len(data))
                    file.write(data)

Searches for GEDI data using the Common Metadata Repository (CMR) API. The source code for this function is adapted from https://github.com/ornldaac/gedi_tutorials. Credits to ORNL DAAC and Rupesh Shrestha.

Parameters:

Name Type Description Default
roi

A list, tuple, or file path representing the bounding box coordinates in the format (min_lon, min_lat, max_lon, max_lat), or a GeoDataFrame containing the region of interest geometry.

required
start_date Optional[str]

The start date of the temporal range to search for data in the format 'YYYY-MM-DD'.

None
end_date Optional[str]

The end date of the temporal range to search for data in the format 'YYYY-MM-DD'.

None
add_roi bool

A boolean value indicating whether to include the region of interest as a granule in the search results. Default is False.

False
return_type str

The type of the search results to return. Must be one of 'df' (DataFrame), 'gdf' (GeoDataFrame), or 'csv' (CSV file). Default is 'gdf'.

'gdf'
output Optional[str]

The file path to save the CSV output when return_type is 'csv'. Optional and only applicable when return_type is 'csv'.

None
sort_filesize bool

A boolean value indicating whether to sort the search results.

False
**kwargs

Additional keyword arguments to be passed to the CMR API.

{}

Returns:

Type Description

The search results as a pandas DataFrame (return_type='df'), geopandas GeoDataFrame (return_type='gdf'), or a CSV file (return_type='csv').

Exceptions:

Type Description
ValueError

If roi is not a list, tuple, or file path.

Source code in leafmap/common.py
def gedi_search(
    roi,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    add_roi: bool = False,
    return_type: str = "gdf",
    output: Optional[str] = None,
    sort_filesize: bool = False,
    **kwargs,
):
    """
    Searches for GEDI data using the Common Metadata Repository (CMR) API.
    The source code for this function is adapted from https://github.com/ornldaac/gedi_tutorials.
    Credits to ORNL DAAC and Rupesh Shrestha.

    Args:
        roi: A list, tuple, or file path representing the bounding box coordinates
            in the format (min_lon, min_lat, max_lon, max_lat), or a GeoDataFrame
            containing the region of interest geometry. A GeoDataFrame passed in
            is copied before its geometry is re-oriented.
        start_date: The start date of the temporal range to search for data
            in the format 'YYYY-MM-DD'. The temporal filter is only applied when
            both start_date and end_date are given.
        end_date: The end date of the temporal range to search for data
            in the format 'YYYY-MM-DD'.
        add_roi: A boolean value indicating whether to include the region of interest
            as a granule in the search results. Default is False.
        return_type: The type of the search results to return. Must be one of 'df'
            (DataFrame), 'gdf' (GeoDataFrame), or 'csv' (CSV file). Default is 'gdf'.
        output: The file path to save the CSV output when return_type is 'csv'.
            Optional and only applicable when return_type is 'csv'.
        sort_filesize: A boolean value indicating whether to sort the search results
            by granule size (ascending).
        **kwargs: Additional keyword arguments to be passed to the CMR API.

    Returns:
        The search results as a pandas DataFrame (return_type='df'), geopandas GeoDataFrame
        (return_type='gdf'), or a CSV file (return_type='csv').

    Raises:
        ValueError: If roi is not a list, tuple, file path, or GeoDataFrame, or if
            return_type is not one of 'df', 'gdf', or 'csv'.
        requests.HTTPError: If a CMR request returns an error status.

    """

    import requests
    import datetime as dt
    import pandas as pd
    import geopandas as gpd
    from shapely.geometry import MultiPolygon, Polygon, box
    from shapely.ops import orient

    # Reject a bad return_type before any network request is made.
    if return_type not in ("df", "gdf", "csv"):
        raise ValueError("return_type must be one of 'df', 'gdf', or 'csv'.")

    # CMR API base url
    cmrurl = "https://cmr.earthdata.nasa.gov/search/"

    doi = "10.3334/ORNLDAAC/2056"  # GEDI L4A DOI

    # Construct the DOI search URL
    doisearch = cmrurl + "collections.json?doi=" + doi

    # Send a request to the CMR API to get the concept ID
    response = requests.get(doisearch)
    response.raise_for_status()
    concept_id = response.json()["feed"]["entry"][0]["id"]

    # CMR formatted start and end times
    if start_date is not None and end_date is not None:
        dt_format = "%Y-%m-%dT%H:%M:%SZ"
        start_date = dt.datetime.strptime(start_date, "%Y-%m-%d")
        end_date = dt.datetime.strptime(end_date, "%Y-%m-%d")
        temporal_str = (
            start_date.strftime(dt_format) + "," + end_date.strftime(dt_format)
        )
    else:
        temporal_str = None

    # CMR formatted bounding box
    use_bbox = isinstance(roi, (list, tuple))
    if use_bbox:
        bound_str = ",".join(map(str, roi))
    elif isinstance(roi, (str, gpd.GeoDataFrame)):
        if isinstance(roi, str):
            roi = gpd.read_file(roi)
        else:
            # Work on a copy so the caller's GeoDataFrame is not modified.
            roi = roi.copy()
        roi.geometry = roi.geometry.apply(orient, args=(1,))  # make counter-clockwise
    else:
        raise ValueError("roi must be a list, tuple, or a file path.")

    granulesearch = cmrurl + "granules.json"

    page_num = 1
    page_size = 2000  # CMR page size limit

    granule_arr = []

    while True:
        # Define CMR search parameters
        cmr_param = {
            "collection_concept_id": concept_id,
            "page_size": page_size,
            "page_num": page_num,
        }

        if temporal_str is not None:
            cmr_param["temporal"] = temporal_str

        if kwargs:
            cmr_param.update(kwargs)

        # Send a request to the CMR API to get the granules
        if use_bbox:
            cmr_param["bounding_box[]"] = bound_str
            response = requests.get(granulesearch, params=cmr_param)
        else:
            cmr_param["simplify-shapefile"] = "true"
            geojson = {
                "shapefile": (
                    "region.geojson",
                    roi.geometry.to_json(),
                    "application/geo+json",
                )
            }
            response = requests.post(granulesearch, data=cmr_param, files=geojson)

        # Fail with a clear HTTP error for either request type instead of an
        # opaque KeyError/JSON error when parsing an error payload.
        response.raise_for_status()

        granules = response.json()["feed"]["entry"]

        # An empty page marks the end of the results.
        if not granules:
            break

        for g in granules:
            granule_url = ""
            granule_poly = ""

            # Read file size
            granule_size = float(g["granule_size"])

            # Read bounding geometries
            if "polygons" in g:
                multipolygons = []
                for poly in g["polygons"]:
                    # The ring is a space-separated list of numbers taken in
                    # consecutive pairs; each pair is swapped when building the
                    # polygon (presumably "lat lon" -> (lon, lat); confirm
                    # against the CMR response format).
                    tokens = poly[0].split(" ")
                    multipolygons.append(
                        Polygon(
                            [
                                [float(second), float(first)]
                                for first, second in zip(tokens[0::2], tokens[1::2])
                            ]
                        )
                    )
                granule_poly = MultiPolygon(multipolygons)

            # Get URL to HDF5 files
            for links in g["links"]:
                if (
                    "title" in links
                    and links["title"].startswith("Download")
                    and links["title"].endswith(".h5")
                ):
                    granule_url = links["href"]

            granule_arr.append(
                [
                    g["id"],
                    g["title"],
                    g["time_start"],
                    g["time_end"],
                    granule_size,
                    granule_url,
                    granule_poly,
                ]
            )

        page_num += 1

    # Add bound as the last row into the dataframe
    if add_roi:
        if use_bbox:
            b = list(roi)
            granule_arr.append(
                ["roi", None, None, None, 0, None, box(b[0], b[1], b[2], b[3])]
            )
        else:
            granule_arr.append(["roi", None, None, None, 0, None, roi.geometry.item()])

    # Create a pandas dataframe
    columns = [
        "id",
        "title",
        "time_start",
        "time_end",
        "granule_size",
        "granule_url",
        "granule_poly",
    ]
    l4adf = pd.DataFrame(granule_arr, columns=columns)

    # Drop granules with empty geometry
    l4adf = l4adf[l4adf["granule_poly"] != ""]

    if sort_filesize:
        l4adf = l4adf.sort_values(by=["granule_size"], ascending=True)

    if return_type == "df":
        return l4adf

    if return_type == "gdf":
        gdf = gpd.GeoDataFrame(l4adf, geometry="granule_poly")
        gdf.crs = "EPSG:4326"
        return gdf

    # return_type == "csv": the geometry column is left out of the CSV.
    csv_columns = [name for name in columns if name != "granule_poly"]
    return l4adf.to_csv(output, index=False, columns=csv_columns)

gedi_subset(spatial=None, start_date=None, end_date=None, out_dir=None, collection=None, variables=['all'], max_results=None, username=None, password=None, overwrite=False, **kwargs)

Subsets GEDI data using the Harmony API.

Parameters:

Name Type Description Default
spatial Union[str, gpd.GeoDataFrame, List[float]]

Spatial extent for subsetting. Can be a file path to a shapefile, a GeoDataFrame, or a list of bounding box coordinates [minx, miny, maxx, maxy]. Defaults to None.

None
start_date str

Start date for subsetting in 'YYYY-MM-DD' format. Defaults to None.

None
end_date str

End date for subsetting in 'YYYY-MM-DD' format. Defaults to None.

None
out_dir str

Output directory to save the subsetted files. Defaults to None, which will use the current working directory.

None
collection Collection

GEDI data collection. If not provided, the default collection with DOI '10.3334/ORNLDAAC/2056' will be used. Defaults to None.

None
variables List[str]

List of variable names to subset. Defaults to ['all'], which subsets all available variables.

['all']
max_results int

Maximum number of results to return. Defaults to None, which returns all results.

None
username str

Earthdata username. Defaults to None, which will attempt to read from the 'EARTHDATA_USERNAME' environment variable.

None
password str

Earthdata password. Defaults to None, which will attempt to read from the 'EARTHDATA_PASSWORD' environment variable.

None
overwrite bool

Whether to overwrite existing files in the output directory. Defaults to False.

False
**kwargs

Additional keyword arguments to pass to the Harmony API request.

{}

Exceptions:

Type Description
ImportError

If the 'harmony' package is not installed.

ValueError

If the 'spatial', 'start_date', or 'end_date' arguments are not valid.

Returns:

Type Description
None

This function does not return any value.

Source code in leafmap/common.py
def gedi_subset(
    spatial=None,
    start_date=None,
    end_date=None,
    out_dir=None,
    collection=None,
    variables=["all"],
    max_results=None,
    username=None,
    password=None,
    overwrite=False,
    **kwargs,
):
    """
    Subsets GEDI data using the Harmony API.

    Args:
        spatial (Union[str, gpd.GeoDataFrame, List[float]], optional): Spatial extent for subsetting.
            Can be a file path to a shapefile, a GeoDataFrame, or a list or tuple of bounding box
            coordinates [minx, miny, maxx, maxy]. Defaults to None, which raises a ValueError.
        start_date (str, optional): Start date for subsetting in 'YYYY-MM-DD' format.
            Defaults to None.
        end_date (str, optional): End date for subsetting in 'YYYY-MM-DD' format.
            Defaults to None. If either date is missing, a message is printed and
            the request is made without a temporal range.
        out_dir (str, optional): Output directory to save the subsetted files.
            Defaults to None, which will use the current working directory.
        collection (Collection, optional): GEDI data collection. If not provided,
            the default collection with DOI '10.3334/ORNLDAAC/2056' will be used.
            Defaults to None.
        variables (List[str], optional): List of variable names to subset.
            Defaults to ['all'], which subsets all available variables.
        max_results (int, optional): Maximum number of results to return.
            Defaults to None, which returns all results.
        username (str, optional): Earthdata username.
            Defaults to None, which will attempt to read from the 'EARTHDATA_USERNAME' environment variable.
        password (str, optional): Earthdata password.
            Defaults to None, which will attempt to read from the 'EARTHDATA_PASSWORD' environment variable.
        overwrite (bool, optional): Whether to overwrite existing files in the output directory.
            Defaults to False.
        **kwargs: Additional keyword arguments to pass to the Harmony API request.

    Raises:
        ImportError: If the 'harmony' package is not installed.
        ValueError: If the credentials are missing or the 'spatial' argument is not valid.
        requests.HTTPError: If the CMR collection lookup returns an error status.

    Returns:
        None: This function does not return any value.
    """

    try:
        import harmony  # noqa: F401 -- only probes whether the package is available
    except ImportError:
        install_package("harmony-py")

    import requests
    import geopandas as gpd
    from datetime import datetime
    from harmony import BBox, Client, Collection, Environment, Request

    if out_dir is None:
        out_dir = os.getcwd()

    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    if collection is None:
        # GEDI L4A DOI
        doi = "10.3334/ORNLDAAC/2056"

        # CMR API base url
        doisearch = f"https://cmr.earthdata.nasa.gov/search/collections.json?doi={doi}"
        response = requests.get(doisearch)
        # Surface HTTP failures directly rather than as a KeyError below.
        response.raise_for_status()
        concept_id = response.json()["feed"]["entry"][0]["id"]
        collection = Collection(id=concept_id)

    if username is None:
        username = os.environ.get("EARTHDATA_USERNAME", None)
    if password is None:
        password = os.environ.get("EARTHDATA_PASSWORD", None)

    if username is None or password is None:
        raise ValueError("username and password must be provided.")

    harmony_client = Client(auth=(username, password))

    if isinstance(spatial, str):
        spatial = gpd.read_file(spatial)

    if isinstance(spatial, gpd.GeoDataFrame):
        spatial = spatial.total_bounds.tolist()

    if isinstance(spatial, (list, tuple)) and len(spatial) == 4:
        bounding_box = BBox(spatial[0], spatial[1], spatial[2], spatial[3])
    else:
        raise ValueError(
            "spatial must be a list of bounding box coordinates or a GeoDataFrame, or a file path."
        )

    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, "%Y-%m-%d")

    if isinstance(end_date, str):
        end_date = datetime.strptime(end_date, "%Y-%m-%d")

    if start_date is None or end_date is None:
        print("start_date and end_date must be provided.")
        temporal_range = None
    else:
        temporal_range = {"start": start_date, "end": end_date}

    # Hand the request its own list so the shared default ["all"] can never
    # be altered through the Request object.
    if isinstance(variables, list):
        variables = list(variables)

    request = Request(
        collection=collection,
        variables=variables,
        temporal=temporal_range,
        spatial=bounding_box,
        ignore_errors=True,
        max_results=max_results,
        **kwargs,
    )

    # submit harmony request, will return job id
    subset_job_id = harmony_client.submit(request)

    print(f"Processing job: {subset_job_id}")

    print("Waiting for the job to finish")
    # Called for its waiting/progress side effect; the returned JSON is not used.
    harmony_client.result_json(subset_job_id, show_progress=True)

    print("Downloading subset files...")
    futures = harmony_client.download_all(
        subset_job_id, directory=out_dir, overwrite=overwrite
    )
    for f in futures:
        downloaded = f.result()
        # all subsetted files have this suffix
        if downloaded.endswith("subsetted.h5"):
            print(f"Downloaded: {downloaded}")

    print("Done downloading files.")

geojson_to_df(in_geojson, encoding='utf-8', drop_geometry=True)

Converts a GeoJSON object to a pandas DataFrame.

Parameters:

Name Type Description Default
in_geojson str | dict

The input GeoJSON file or dict.

required
encoding str

The encoding of the GeoJSON object. Defaults to "utf-8".

'utf-8'
drop_geometry bool

Whether to drop the geometry column. Defaults to True.

True

Exceptions:

Type Description
FileNotFoundError

If the input GeoJSON file could not be found.

Returns:

Type Description
pd.DataFrame

A pandas DataFrame containing the GeoJSON object.

Source code in leafmap/common.py
def geojson_to_df(in_geojson, encoding="utf-8", drop_geometry=True):
    """Converts a GeoJSON object to a pandas DataFrame.

    Each feature becomes one row. Nested members are flattened with "." as the
    separator, and the leading "properties." prefix is stripped from column names.

    Args:
        in_geojson (str | dict): The input GeoJSON file path, HTTP(S) URL, or dict.
        encoding (str, optional): The encoding of the GeoJSON file. Defaults to "utf-8".
        drop_geometry (bool, optional): Whether to drop the geometry column(s)
            ("geometry" and "geometry.*"). Defaults to True.

    Raises:
        FileNotFoundError: If the input GeoJSON file could not be found.
        TypeError: If in_geojson is neither a string nor a dict.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the GeoJSON object.
    """

    import json
    import pandas as pd
    from urllib.request import urlopen

    if isinstance(in_geojson, str):
        if in_geojson.startswith("http"):
            with urlopen(in_geojson) as f:
                data = json.load(f)
        else:
            in_geojson = os.path.abspath(in_geojson)
            if not os.path.exists(in_geojson):
                raise FileNotFoundError("The provided GeoJSON file could not be found.")

            with open(in_geojson, encoding=encoding) as f:
                data = json.load(f)

    elif isinstance(in_geojson, dict):
        data = in_geojson
    else:
        # Any other type used to surface as an UnboundLocalError further down.
        raise TypeError("in_geojson must be a file path, a URL, or a dict.")

    df = pd.json_normalize(data["features"])

    if drop_geometry:
        # Decide on the flattened names, before the "properties." prefix is
        # removed, so a feature property whose name merely contains
        # "geometry" (e.g. "geometry_area") is kept.
        geometry_cols = [
            col
            for col in df.columns
            if col == "geometry" or col.startswith("geometry.")
        ]
        df = df.drop(columns=geometry_cols)

    # Strip only the leading prefix, not every occurrence inside the name.
    prefix = "properties."
    df.columns = [
        col[len(prefix):] if col.startswith(prefix) else col for col in df.columns
    ]
    return df

geojson_to_gdf(in_geojson, encoding='utf-8', **kwargs)

Converts a GeoJSON object to a geopandas GeoDataFrame.

Parameters:

Name Type Description Default
in_geojson str | dict

The input GeoJSON file or GeoJSON object as a dict.

required
encoding str

The encoding of the GeoJSON object. Defaults to "utf-8".

'utf-8'

Returns:

Type Description
geopandas.GeoDataFrame

A geopandas GeoDataFrame containing the GeoJSON object.

Source code in leafmap/common.py
def geojson_to_gdf(in_geojson, encoding="utf-8", **kwargs):
    """Converts a GeoJSON object to a geopandas GeoDataFrame.

    A dict input is first written to a temporary GeoJSON file, which is read
    with geopandas and then removed.

    Args:
        in_geojson (str | dict): The input GeoJSON file or GeoJSON object as a dict.
        encoding (str, optional): The encoding of the GeoJSON object. Defaults to "utf-8".
        **kwargs: Additional keyword arguments passed to geopandas.read_file().

    Returns:
        geopandas.GeoDataFrame: A geopandas GeoDataFrame containing the GeoJSON object.
    """

    import json
    import geopandas as gpd

    tmp_file = None
    if isinstance(in_geojson, dict):
        tmp_file = temp_file_path(extension="geojson")
        with open(tmp_file, "w") as f:
            json.dump(in_geojson, f)
        in_geojson = tmp_file

    try:
        gdf = gpd.read_file(in_geojson, encoding=encoding, **kwargs)
    finally:
        # Remove the temporary file so repeated calls do not accumulate files.
        if tmp_file is not None:
            try:
                os.remove(tmp_file)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not worth failing for.
                pass
    return gdf

geojson_to_gpkg(in_geojson, out_gpkg, **kwargs)

Converts a GeoJSON object to GeoPackage.

Parameters:

Name Type Description Default
in_geojson str | dict

The input GeoJSON file or dict.

required
out_gpkg str

The output GeoPackage path.

required
Source code in leafmap/common.py
def geojson_to_gpkg(in_geojson, out_gpkg, **kwargs):
    """Converts a GeoJSON object to GeoPackage.

    The layer written to the GeoPackage is named after the output file
    (base name without extension). A dict input is first written to a
    temporary GeoJSON file, which is removed after it has been read.

    Args:
        in_geojson (str | dict): The input GeoJSON file or dict.
        out_gpkg (str): The output GeoPackage path. ".gpkg" is appended if the
            path does not already end with that extension.
        **kwargs: Additional keyword arguments passed to geopandas.read_file().
    """
    import geopandas as gpd
    import json

    ext = os.path.splitext(out_gpkg)[1]
    if ext.lower() != ".gpkg":
        out_gpkg = out_gpkg + ".gpkg"
    out_gpkg = check_file_path(out_gpkg)

    tmp_file = None
    if isinstance(in_geojson, dict):
        tmp_file = temp_file_path(extension="geojson")
        with open(tmp_file, "w") as f:
            json.dump(in_geojson, f)
        in_geojson = tmp_file

    try:
        gdf = gpd.read_file(in_geojson, **kwargs)
    finally:
        # Remove the temporary file so repeated calls do not accumulate files.
        if tmp_file is not None:
            try:
                os.remove(tmp_file)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not worth failing for.
                pass

    name = os.path.splitext(os.path.basename(out_gpkg))[0]
    gdf.to_file(out_gpkg, layer=name, driver="GPKG")

geojson_to_mbtiles(input_file, output_file, layer_name=None, options=None, quiet=False)

Converts vector data to .mbtiles using Tippecanoe.

Parameters:

Name Type Description Default
input_file str

Path to the input vector data file (e.g., .geojson).

required
output_file str

Path to the output .mbtiles file.

required
layer_name Optional[str]

Optional name for the layer. Defaults to None.

None
options Optional[List[str]]

List of additional arguments for tippecanoe. For example '-zg' for auto maxzoom. Defaults to None.

None
quiet bool

If True, suppress the log output. Defaults to False.

False

Returns:

Type Description
Optional[str]

Output from the Tippecanoe command, or None if there was an error or if Tippecanoe is not installed.

Exceptions:

Type Description
subprocess.CalledProcessError

If there's an error executing the tippecanoe command.

Source code in leafmap/common.py
def geojson_to_mbtiles(
    input_file: str,
    output_file: str,
    layer_name: Optional[str] = None,
    options: Optional[List[str]] = None,
    quiet: bool = False,
) -> Optional[str]:
    """
    Converts vector data to .mbtiles using Tippecanoe.

    Args:
        input_file (str): Path to the input vector data file (e.g., .geojson).
        output_file (str): Path to the output .mbtiles file.
        layer_name (Optional[str]): Optional name for the layer. Defaults to None.
        options (Optional[List[str]]): List of additional arguments for tippecanoe. For example '-zg' for auto maxzoom. Defaults to None.
        quiet (bool): If True, suppress the log output. Defaults to False.

    Returns:
        Optional[str]: Output from the Tippecanoe command, or None if there was an error or if Tippecanoe is not installed.

    Raises:
        subprocess.CalledProcessError: If there's an error executing the tippecanoe command.
    """

    import subprocess
    import shutil

    # Check if tippecanoe exists
    if shutil.which("tippecanoe") is None:
        print("Error: tippecanoe is not installed.")
        print("You can install it using conda with the following command:")
        print("conda install -c conda-forge tippecanoe")
        return None

    command = ["tippecanoe", "-o", output_file]

    # Add layer name specification if provided
    if layer_name:
        command.extend(["-L", f"{layer_name}:{input_file}"])
    else:
        command.append(input_file)

    # Append additional arguments if provided
    if options:
        command.extend(options)

    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
        )

        if not quiet:
            for line in process.stdout:
                print(line, end="")

        exit_code = process.wait()
        if exit_code != 0:
            raise subprocess.CalledProcessError(exit_code, command)

    except subprocess.CalledProcessError