# Phabricator Paste P15638 (Active, Public)
#
# image classification gpu
#
# Authored by AikoChou on Apr 29 2021, 6:44 AM.
import os
from os import environ
from subprocess import Popen, PIPE
# Point the JVM/Hadoop tooling at this host's installation directories. The
# shell expressions evaluated further down expand ${JAVA_HOME} and
# ${HADOOP_HOME}, so these have to be in place first.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "HADOOP_HOME": "/usr/lib/hadoop",
    "HADOOP_CONF_DIR": "/usr/lib/hadoop/etc/hadoop",
    "HADOOP_HDFS_HOME": "/usr/lib/hadoop-hdfs",
})
def set_env_variable(key, value):
    """Evaluate ``value`` with the shell and store the result in the environment.

    ``value`` is handed to ``echo`` through a shell, so ``${VAR}`` references
    and ``$(command)`` substitutions inside it are expanded before the result
    is written to ``environ[key]``.

    Args:
        key: Name of the environment variable to set.
        value: Shell expression whose expansion becomes the variable's value.
            It is interpolated into the command line unquoted, so it must
            only ever come from trusted code.

    Returns:
        None. On success ``environ[key]`` holds the shell output with
        surrounding whitespace stripped. If the shell exits non-zero, an
        error is printed and ``environ`` is left untouched.
    """
    # Only stdout is piped; the shell's stderr stays attached to the caller's
    # stderr so diagnostics from the expansion remain visible.
    capture = Popen(f"echo {value}", stdout=PIPE, shell=True)
    std_out, _ = capture.communicate()
    if capture.returncode != 0:
        print(f"Error: Unable to evaluate {value!r} for environment variable {key}")
        return
    environ[key] = std_out.decode().strip()
# Shell expressions evaluated strictly in this order. LD_LIBRARY_PATH is
# listed twice on purpose: the second entry expands the value that the first
# one has just written to the environment.
for _name, _expression in (
    ("LD_LIBRARY_PATH", "${LD_LIBRARY_PATH}:${JAVA_HOME}/jre/lib/amd64/server"),
    ("LD_LIBRARY_PATH", "${LD_LIBRARY_PATH}:/usr/lib"),
    ("CLASSPATH", "$(${HADOOP_HOME}/bin/hadoop classpath --glob)"),
    ("KRB5CCNAME", "/tmp/krb5cc_$(id -u)"),
):
    set_env_variable(_name, _expression)
# Configure the root logger at INFO level before the remaining imports run.
# NOTE(review): the "# noqa" presumably silences the linter warning about a
# statement placed between imports — confirm.
import logging
logging.basicConfig(level="INFO") # noqa
import sys
import getpass
import gzip
import subprocess
import os
import skein
import numpy as np
from functools import partial
from datetime import datetime
from enum import Enum
from tensorflow import keras
import cluster_pack
from tf_yarn import event, TaskSpec, Experiment, run_on_yarn, get_safe_experiment_fn
import tensorflow as tf
# Let TensorFlow allocate GPU memory on demand instead of reserving it all up
# front. The flag is applied to every visible GPU, not just the first one:
# TensorFlow requires the memory-growth setting to be the same across GPUs,
# so configuring a single device breaks on multi-GPU hosts.
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
for gpu_device in gpu_devices:
    tf.config.experimental.set_memory_growth(gpu_device, True)
# Name of the user running the script; used to build the per-user output path.
USER = getpass.getuser()
# Location of the TFRecord file read by the training and evaluation inputs.
DATA_DIR = "hdfs://analytics-hadoop/user/aikochou/maiolica_vs_sculptures.tfrecords"
# Output directory under the default filesystem, made unique per run by the
# current Unix timestamp (whole seconds).
HDFS_DIR = (
    f"{cluster_pack.get_default_fs()}user/{USER}"
    f"/tf_yarn_test/tf_yarn_{int(datetime.now().timestamp())}"
)
class NodeLabel(Enum):
    """Node labels used by this script.

    Members are passed as the ``label`` of each ``TaskSpec`` and used as the
    keys of the environment-archive mapping in ``main``. The member values
    are the label strings themselves; the CPU label is the empty string.
    """

    CPU = ""
    GPU = "GPU"
def experiment_fn(hdfs_dir: str) -> Experiment:
    """Build the Experiment for the binary image classifier.

    A frozen MobileNetV2 base (weights loaded from the local file
    ``mobilenet_v2_weights.h5``) is topped with global average pooling and a
    single-logit dense layer. The Keras model is converted to an Estimator
    and paired with train/eval input functions that read the TFRecord file
    at ``DATA_DIR``.

    Args:
        hdfs_dir: Directory given to ``RunConfig`` as the estimator's
            ``model_dir``.

    Returns:
        An ``Experiment`` holding the estimator, a ``TrainSpec`` capped at
        100 steps and an ``EvalSpec`` that runs 10 evaluation steps.
    """
    IMG_SIZE = 160  # All images will be resized to 160x160
    IMG_SHAPE = (IMG_SIZE, IMG_SIZE, 3)
    # Length of the flattened image stored in each record
    # (160 * 160 * 3 = 76800), derived so it cannot drift from IMG_SIZE.
    feature_dim = IMG_SIZE * IMG_SIZE * 3
    train_fraction = 0.7

    def preprocess(image, label):
        """Reshape the flat image to HxWx3 and scale its values by /127.5 - 1."""
        image = tf.reshape(image, [IMG_SIZE, IMG_SIZE, 3])
        image = tf.cast(image, tf.float32)
        image = (image / 127.5) - 1
        # The image is already IMG_SIZE x IMG_SIZE after the reshape; the
        # resize is kept from the original pipeline.
        image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
        return image, label

    def parse_example(example):
        """Decode one serialized example into an (image, label) pair."""
        features = tf.io.parse_single_example(example, features={
            "image": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
            "label": tf.io.FixedLenFeature([], dtype=tf.float32)
        })
        x = features["image"]
        y = features["label"]
        return x, y

    def in_training_set(*row):
        """Return a boolean tensor: does this record hash into the training split?

        The filter runs before parsing, so ``row`` is the raw serialized
        record. It is hashed into one of 100 buckets; buckets below
        ``train_fraction * 100`` form the training split.
        """
        num_buckets = 100
        key = tf.strings.join(list(row))
        bucket_id = tf.strings.to_hash_bucket_fast(key, num_buckets)
        return bucket_id < int(train_fraction * num_buckets)

    def in_test_set(*row):
        """Return the complement of ``in_training_set`` for the same record."""
        return ~in_training_set(*row)

    def train_input_fn():
        """Endless, shuffled stream of training batches of size 8."""
        data = tf.data.TFRecordDataset(DATA_DIR).filter(in_training_set).map(parse_example)
        return (data.map(preprocess)
                .shuffle(100)
                .batch(8)
                .repeat())

    def eval_input_fn():
        """Single pass over the held-out records in batches of size 8."""
        data = tf.data.TFRecordDataset(DATA_DIR).filter(in_test_set).map(parse_example)
        return (data.map(preprocess)
                .shuffle(100)
                .batch(8))

    # Build the architecture without weights, then load them from the local
    # file named below.
    keras_mobilenet_v2 = tf.keras.applications.MobileNetV2(
        input_shape=IMG_SHAPE, include_top=False, weights=None)
    keras_mobilenet_v2.load_weights("mobilenet_v2_weights.h5")
    # Freeze the base so only the layers added on top of it are trainable.
    keras_mobilenet_v2.trainable = False

    model = tf.keras.Sequential([
        keras_mobilenet_v2,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(1)
    ])
    model.summary()
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        # The model emits raw logits, so the decision boundary is 0. The
        # plain 'accuracy' string would threshold the logits at 0.5 and
        # misreport accuracy; the metric keeps the name 'accuracy'.
        metrics=[tf.keras.metrics.BinaryAccuracy(threshold=0.0, name='accuracy')])

    config = tf.estimator.RunConfig(model_dir=hdfs_dir)
    estimator = tf.keras.estimator.model_to_estimator(model, config=config)
    return Experiment(
        estimator,
        tf.estimator.TrainSpec(
            train_input_fn,
            max_steps=100),
        tf.estimator.EvalSpec(
            eval_input_fn,
            steps=10,
            start_delay_secs=0,
            throttle_secs=30))
# There seem to be pickling issues with Keras, so the experiment function is
# uploaded without pickling the experiment itself.
# Also see https://github.com/tensorflow/tensorflow/issues/32159
def get_safe_exp_fn():
    """Return the result of ``get_safe_experiment_fn`` for this module's ``experiment_fn``.

    ``experiment_fn`` is referenced by its dotted name
    ``image_classification-gpu.experiment_fn`` rather than as an object.
    ``HDFS_DIR`` is passed along with it, presumably as the argument
    ``experiment_fn`` is eventually called with.
    """
    return get_safe_experiment_fn("image_classification-gpu.experiment_fn", HDFS_DIR)
def main():
    """Build the experiment once locally, then submit the job via ``run_on_yarn``.

    The submission requests one chief, four workers, two parameter servers
    and one evaluator, all with the GPU node label, on the ``fifo`` queue.
    It ships this script and the MobileNetV2 weights file with the job.
    """
    # Forcing a call to model_to_estimator._save_first_checkpoint (l457 of
    # https://github.com/tensorflow/estimator/blob/
    # 1d55f01d8af871a35ef83fc3354b9feaa671cbe1/tensorflow_estimator/python/estimator/keras.py),
    # otherwise there is a race condition when all workers try to save the
    # first checkpoint at the same time.
    experiment_fn(HDFS_DIR)

    # Packaged Python environment to use for each node label.
    pyenv_zip_path = {
        NodeLabel.GPU: "/home/aikochou/tf-yarn-rocm2.zip",
        NodeLabel.CPU: "/home/aikochou/tf-yarn-env.zip",
    }
    editable_requirements = cluster_pack.get_editable_requirements()
    run_on_yarn(
        pyenv_zip_path,
        get_safe_exp_fn(),
        task_specs={
            "chief": TaskSpec(memory="2 GiB", vcores=4, label=NodeLabel.GPU),
            "worker": TaskSpec(memory="2 GiB", vcores=4, instances=4, label=NodeLabel.GPU),
            "ps": TaskSpec(memory="2 GiB", vcores=4, instances=2, label=NodeLabel.GPU),
            "evaluator": TaskSpec(memory="2 GiB", vcores=1, label=NodeLabel.GPU)
        },
        queue="fifo",
        files={
            **editable_requirements,
            'image_classification-gpu.py': '/home/aikochou/tf-yarn/image_classification-gpu.py',
            'mobilenet_v2_weights.h5': '/home/aikochou/tf-yarn/mobilenet_v2_weights.h5'
        },
        # Shell snippet that extends LD_LIBRARY_PATH and CLASSPATH with the
        # JVM and Hadoop locations.
        pre_script_hook=(
            'export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server:/usr/lib && '
            'export CLASSPATH=$CLASSPATH:`hadoop classpath --glob`'
        ),
    )
# Run main() only when the file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()