For Spark <3.5.0, this option must be unspecified. For Spark >=3.5.0, use --auth-type APPLICATION_DEFAULT for your laptop. Use --auth-type COMPUTE_ENGINE for GCE VMs. Use SERVICE_ACCOUNT_JSON_KEYFILE with --key-file-path for an explicit key file location. For Spark >=3.5.0, we default to APPLICATION_DEFAULT.") p.add_argument("-k", "--key-file-path", help="Required for Spark <3.5.0. Service account key .json path. This path is just added to the spark config file. The .json file itself doesn't need to exist until the GCS connector is first used.") p.add_argument("--gcs-requester-pays-project", "--gcs-requestor-pays-project", help="If specified, this google cloud project will be charged for access to " "requester pays buckets via spark/hadoop. See https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#cloud-storage-requester-pays-feature-configuration") args = p.parse_args() if args.key_file_path and not os.path.isfile(args.key_file_path): logging.warning(f"{args.key_file_path} file doesn't exist") if THE_SPARK_VERSION >= (3, 5, 0): if args.key_file_path is not None and args.auth_type != "SERVICE_ACCOUNT_JSON_KEYFILE": raise ValueError(f"In Spark >=3.5.0, when --key-file-path is specified, --auth-type must be SERVICE_ACCOUNT_JSON_KEYFILE") if args.auth_type is None: args.auth_type = "APPLICATION_DEFAULT" valid_auth_types = { "ACCESS_TOKEN_PROVIDER", "APPLICATION_DEFAULT", "COMPUTE_ENGINE", "SERVICE_ACCOUNT_JSON_KEYFILE", "UNAUTHENTICATED", "USER_CREDENTIALS" } if args.auth_type not in valid_auth_types: raise ValueError(f'--auth-type must be one of {" ".join(valid_auth_types)}, found: {args.auth_type}') else: if args.auth_type is not None: raise ValueError(f"--auth-type cannot be used with Spark <3.5.0. We think you have spark {THE_SPARK_VERSION}.") if not args.key_file_path: # look for existing key files in ~/.config key_file_regexps = [ "~/.config/gcloud/application_default_credentials.json", "~/.config/gcloud/legacy_credentials/*/adc.json", ] # if more than one file matches a glob pattern, select the newest. key_file_sort = lambda file_path: -1 * os.path.getctime(file_path) for key_file_regexp in key_file_regexps: paths = sorted(glob.glob(os.path.expanduser(key_file_regexp)), key=key_file_sort) if paths: args.key_file_path = next(iter(paths)) logging.info(f"Using key file: {args.key_file_path}") break else: regexps_string = ' '.join(key_file_regexps) p.error(f"No json key files found in these locations: \n\n {regexps_string}\n\n" "Run \n\n gcloud auth application-default login \n\nthen rerun this script, " "or use --key-file-path to specify where the key file exists (or will exist later).\n") return args def is_dataproc_VM(): """Check if this installation is running on a Dataproc VM""" try: dataproc_metadata = urllib.request.urlopen("http://metadata.google.internal/0.1/meta-data/attributes/dataproc-bucket").read() if dataproc_metadata.decode("UTF-8").startswith("dataproc"): return True except: pass return False def main(): if is_dataproc_VM(): logging.info("This is a Dataproc VM which should already have the GCS cloud connector installed. Exiting...") return args = parse_args() spark_home = _find_spark_home() # download GCS connector jar try: gcs_connector_url = get_gcs_connector_url() except Exception as e: logging.error(f"get_gcs_connector_url() failed: {e}") return local_jar_path = os.path.join(spark_home, "jars", os.path.basename(gcs_connector_url)) logging.info(f"Downloading {gcs_connector_url}") logging.info(f" to {local_jar_path}") try: urllib.request.urlretrieve(gcs_connector_url, local_jar_path) except Exception as e: logging.error(f"Unable to download GCS connector to {local_jar_path}. {e}") return # update spark-defaults.conf spark_config_dir = os.path.join(spark_home, "conf") if not os.path.exists(spark_config_dir): os.mkdir(spark_config_dir) spark_config_file_path = os.path.join(spark_config_dir, "spark-defaults.conf") if THE_SPARK_VERSION >= (3, 5, 0): logging.info(f"Updating {spark_config_file_path} fs.gs.auth.type") logging.info(f"Setting fs.gs.auth.type = {args.auth_type}") spark_config_lines = [ f"spark.hadoop.fs.gs.auth.type {args.auth_type}\n", ] if args.key_file_path: spark_config_lines = [ f"spark.hadoop.fs.gs.auth.service.account.json.keyfile {args.key_file_path}\n", ] else: logging.info(f"Updating {spark_config_file_path} json.keyfile") logging.info(f"Setting json.keyfile = {args.key_file_path}") spark_config_lines = [ "spark.hadoop.google.cloud.auth.service.account.enable true\n", f"spark.hadoop.google.cloud.auth.service.account.json.keyfile {args.key_file_path}\n", ] if args.gcs_requester_pays_project: spark_config_lines.extend([ "spark.hadoop.fs.gs.requester.pays.mode AUTO\n", f"spark.hadoop.fs.gs.requester.pays.project.id {args.gcs_requester_pays_project}\n", ]) try: # spark hadoop options docs @ https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#cloud-storage-requester-pays-feature-configuration if os.path.isfile(spark_config_file_path): with open(spark_config_file_path, "rt") as f: for line in f: # avoid duplicating options if any([option.split(' ')[0] in line for option in spark_config_lines]): continue spark_config_lines.append(line) with open(spark_config_file_path, "wt") as f: for line in spark_config_lines: f.write(line) except Exception as e: logging.error(f"Unable to update spark config {spark_config_file_path}. {e}") return if __name__ == "__main__": main()