Service Integration Guide

Complete guide for integrating with all workbench services

Environment Setup

Required Python Packages

Install the following packages in your conda environment:

pip install mlflow feast pyspark kafka-python requests

Quick Start

All services are automatically configured when you install PySpark in a conda environment. The system will create necessary configuration files and set up environment variables automatically.
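
To see what was configured for your environment, you can inspect the environment variables from a notebook. The sketch below makes no assumption about specific variable names; it simply prints anything whose name mentions one of the workbench services:

import os

# Print environment variables that look related to the workbench services.
# The exact variable names depend on the workbench setup, so we scan by keyword.
keywords = ("MLFLOW", "FEAST", "SPARK", "KAFKA", "OLLAMA", "VLLM")
for name, value in sorted(os.environ.items()):
    if any(key in name.upper() for key in keywords):
        print(f"{name}={value}")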

Service URLs

  • MLflow: http://mlflow:5000/ (model tracking and experiment management)
  • Feast: http://feast-ui:8080/ (feature store for ML features)
  • Ollama: http://ollama:11434/ (local LLM inference)
  • vLLM: http://vllm:8000/v1 (high-performance LLM serving)
  • Spark: https://spark.workbench.relativelyobjective.com/ (distributed computing cluster)
  • Kafka: kafka:9092 (streaming platform, internal only)

Conda Terminal

🆕 Automatic Kernel Registration

The workbench now automatically detects and registers new conda environments (including upgrades) as Jupyter kernels. No manual intervention required - just create or upgrade environments and they'll appear instantly in JupyterLab and VS Code.

Quick Start Commands

# Create a new environment
conda create -n my-env python=3.11 -y

# Activate the environment
conda activate my-env

# Install Jupyter kernel
conda install ipykernel -y

# Environment automatically appears in JupyterLab & VS Code!

Common Commands

  • conda env list - List all environments
  • conda list - List packages in current environment
  • conda info - Show conda configuration
  • conda search package-name - Search for packages
  • conda install package-name - Install package
  • conda remove package-name - Remove package
  • conda update package-name - Update package
  • conda env remove -n env-name - Delete environment
  • conda clean --all - Clean unused packages
  • ls /shared_kernels/share/jupyter/kernels/ - List registered kernels

How It Works

When you install ipykernel in any conda environment, the system automatically:

  • Detects the new environment
  • Creates a kernel specification
  • Registers it in JupyterLab
  • Registers it in VS Code

No restart needed - kernels appear immediately!

MLflow Integration

Getting Started with MLflow

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Set tracking URI
mlflow.set_tracking_uri("http://mlflow:5000/")

# Set experiment
mlflow.set_experiment("my-first-experiment")

Experiment Tracking

# Start a run
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)
    
    # Train model (X_train/y_train and X_test/y_test are assumed to be prepared beforehand)
    model = RandomForestClassifier(n_estimators=100, max_depth=10)
    model.fit(X_train, y_train)
    
    # Log metrics
    mlflow.log_metric("accuracy", model.score(X_test, y_test))
    
    # Log model
    mlflow.sklearn.log_model(model, "random-forest-model")

Model Loading

# Load a specific model version by its run ID (see the lookup snippet below)
run_id = "<your-run-id>"
model_uri = f"runs:/{run_id}/random-forest-model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Make predictions
predictions = loaded_model.predict(X_new)
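
If you do not have the run ID at hand, you can look it up with mlflow.search_runs; a minimal sketch, assuming the experiment name from the Getting Started example above:

import mlflow
import mlflow.sklearn

mlflow.set_tracking_uri("http://mlflow:5000/")

# Fetch the most recent run of the experiment and build the model URI from it
runs = mlflow.search_runs(
    experiment_names=["my-first-experiment"],
    order_by=["start_time DESC"],
    max_results=1,
)
run_id = runs.iloc[0]["run_id"]
model_uri = f"runs:/{run_id}/random-forest-model"
loaded_model = mlflow.sklearn.load_model(model_uri)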

PySpark Integration

Initializing Spark Session

Connect to the Spark Cluster

To use the distributed Spark cluster, specify the master URL in your SparkSession builder. If you omit this, Spark will run in local mode and not use the cluster.

from pyspark.sql import SparkSession

# Create Spark session (connects to the cluster)
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("spark://spark-master:7077") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

Working with Data

from pyspark.sql import functions as F

# Read data
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Transform data: total value and row count per category
df_transformed = df.filter(df.value > 0) \
    .groupBy("category") \
    .agg(F.sum("value").alias("total_value"),
         F.count("*").alias("row_count"))

# Write data
df_transformed.write.mode("overwrite").parquet("output/")

Spark Configuration

The Spark session is automatically configured with settings tuned for the workbench environment. Key configurations include (a snippet to verify them at runtime follows this list):

  • Adaptive query execution enabled
  • Dynamic partition pruning
  • Optimized memory management
  • Web UI accessible at the Spark service URL
  • Automatic Python version alignment between Jupyter and Spark workers
  • Seamless integration with conda-store environments
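
To confirm these settings on a live session, read them back from the runtime configuration. A small sketch, assuming the spark session created in the example above:

# Read selected settings back from the active session
for key in ("spark.master",
            "spark.sql.adaptive.enabled",
            "spark.sql.adaptive.coalescePartitions.enabled"):
    print(key, "=", spark.conf.get(key, "not set"))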

🆕 PySpark Environment Compatibility (2025-07 Update)

PySpark now works seamlessly with conda-store environments. The system automatically ensures Python version compatibility between Jupyter kernels and Spark workers, eliminating version mismatch errors. Environment upgrades (e.g., Python 3.10 → 3.12) are automatically handled.
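
To check the alignment yourself, compare the driver's Python version with the one actually running on the workers; a minimal sketch, assuming the spark session from the earlier cluster example:

import sys

# Python version in the notebook (driver)
driver_version = f"{sys.version_info.major}.{sys.version_info.minor}"

# Ask a worker which Python it runs by executing a tiny job on the cluster
def worker_python(_):
    import sys
    return f"{sys.version_info.major}.{sys.version_info.minor}"

worker_version = spark.sparkContext.parallelize([0], 1).map(worker_python).first()
print(f"Driver Python: {driver_version}, Worker Python: {worker_version}")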

Feast Integration

Feature Store Configuration

from feast import FeatureStore

# Initialize feature store
store = FeatureStore(repo_path="feature_repo")

# The feature_store.yaml inside feature_repo/ defines the registry and the
# online/offline store backends, so no separate connection arguments are needed

Feature Retrieval

# Get features for inference
features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate"
    ],
    entity_rows=[{"driver_id": 1001}]
).to_dict()

print(features)
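
For building training sets, the offline counterpart is get_historical_features, which performs a point-in-time join against the offline store. A hedged sketch, reusing the store object and feature names from above (the entity dataframe contents are illustrative):

import pandas as pd
from datetime import datetime

# Entities and event timestamps to join features for
entity_df = pd.DataFrame({
    "driver_id": [1001, 1002],
    "event_timestamp": [datetime(2024, 1, 1), datetime(2024, 1, 1)],
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
    ],
).to_df()

print(training_df.head())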

Feature Definitions

from datetime import timedelta

from feast import Entity, Feature, FeatureView, ValueType

# Define entity
driver = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="driver id",
)

# Define feature view (driver_stats_source is a batch source, e.g. a FileSource,
# defined elsewhere in the feature repo)
driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(hours=1),
    schema=[
        Feature(name="conv_rate", dtype=ValueType.FLOAT),
        Feature(name="acc_rate", dtype=ValueType.FLOAT),
    ],
    online=True,
    source=driver_stats_source,
)
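
After running feast apply, features still need to be materialized into the online store before get_online_features returns values. A minimal sketch, assuming the store object from the configuration example:

from datetime import datetime, timedelta

# Load the last day of feature values from the offline store into the online store
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)

# Or materialize only what changed since the last run
store.materialize_incremental(end_date=datetime.utcnow())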

Ollama Integration

Basic Usage

import requests
import json

OLLAMA_URL = "http://ollama:11434/api"

def generate_text(prompt, model="llama2"):
    response = requests.post(f"{OLLAMA_URL}/generate", json={
        "model": model,
        "prompt": prompt,
        "stream": False
    })
    return response.json()["response"]

# Example usage
response = generate_text("Explain machine learning in simple terms")
print(response)

Available Models

# List available models
response = requests.get(f"{OLLAMA_URL}/tags")
models = response.json()["models"]
for model in models:
    print(f"Model: {model['name']}, Size: {model['size']}")

Streaming Responses

def generate_streaming(prompt, model="llama2"):
    response = requests.post(f"{OLLAMA_URL}/generate", json={
        "model": model,
        "prompt": prompt,
        "stream": True
    }, stream=True)
    
    for line in response.iter_lines():
        if line:
            data = json.loads(line)
            if data.get("done"):
                break
            yield data.get("response", "")

# Usage
for chunk in generate_streaming("Write a short story"):
    print(chunk, end="", flush=True)
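
Ollama also exposes a chat-style endpoint (/api/chat) that accepts a message history instead of a single prompt. A short sketch reusing OLLAMA_URL from above (the model name is an assumption, as before):

def chat(messages, model="llama2"):
    # /api/chat takes a list of {"role", "content"} messages and returns one assistant message
    response = requests.post(f"{OLLAMA_URL}/chat", json={
        "model": model,
        "messages": messages,
        "stream": False
    })
    return response.json()["message"]["content"]

# Usage
reply = chat([{"role": "user", "content": "Summarize what a feature store does."}])
print(reply)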

vLLM Integration

OpenAI-Compatible API

import requests
import json

VLLM_URL = "http://vllm:8000/v1"

def generate_text(prompt, model="llama2-7b-chat"):
    response = requests.post(f"{VLLM_URL}/completions", json={
        "model": model,
        "prompt": prompt,
        "max_tokens": 100,
        "temperature": 0.7,
        "stream": False
    })
    return response.json()["choices"][0]["text"]

# Example usage
response = generate_text("What is the capital of France?")
print(response)

Chat Completions

def chat_completion(messages, model="llama2-7b-chat"):
    response = requests.post(f"{VLLM_URL}/chat/completions", json={
        "model": model,
        "messages": messages,
        "max_tokens": 150,
        "temperature": 0.7
    })
    return response.json()["choices"][0]["message"]["content"]

# Example conversation
messages = [
    {"role": "user", "content": "Hello, how are you?"},
    {"role": "assistant", "content": "I'm doing well, thank you!"},
    {"role": "user", "content": "What can you help me with?"}
]

response = chat_completion(messages)
print(response)
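
Since the API is OpenAI-compatible, streaming responses arrive as server-sent events (data: lines ending with data: [DONE]). A hedged sketch that yields content deltas as they arrive, with the same assumed model name:

def chat_completion_stream(messages, model="llama2-7b-chat"):
    response = requests.post(f"{VLLM_URL}/chat/completions", json={
        "model": model,
        "messages": messages,
        "max_tokens": 150,
        "stream": True
    }, stream=True)

    for line in response.iter_lines():
        if not line:
            continue
        payload = line.decode("utf-8").removeprefix("data: ")
        if payload.strip() == "[DONE]":
            break
        chunk = json.loads(payload)
        # Each chunk carries an incremental "delta" with the next piece of content
        yield chunk["choices"][0]["delta"].get("content", "")

# Usage
for token in chat_completion_stream([{"role": "user", "content": "Tell me a joke"}]):
    print(token, end="", flush=True)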

Available Models

# List available models
response = requests.get(f"{VLLM_URL}/models")
models = response.json()["data"]
for model in models:
    print(f"Model: {model['id']}")

Kafka Integration

Producer Setup

from kafka import KafkaProducer
import json

# Create producer
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

Sending Messages

# Send message
message = {"user_id": 123, "action": "login", "timestamp": "2024-01-01T12:00:00Z"}
producer.send('user_events', key='user_123', value=message)

# Flush to ensure delivery
producer.flush()
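
send() is asynchronous, so a successful call does not guarantee delivery. To confirm that the broker accepted the message, block on the returned future, as in this short sketch:

# send() returns a future; get() blocks until the broker acknowledges (or raises on failure)
future = producer.send('user_events', key='user_123', value=message)
record_metadata = future.get(timeout=10)
print(f"Delivered to {record_metadata.topic} "
      f"(partition {record_metadata.partition}, offset {record_metadata.offset})")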

Consumer Setup

from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    group_id='my_consumer_group'
)

# Consume messages
for message in consumer:
    print(f"Received: {message.value}")
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")

Integration Examples

End-to-End ML Pipeline

import mlflow
from pyspark.sql import SparkSession
from feast import FeatureStore
import requests

# 1. Initialize services
mlflow.set_tracking_uri("http://mlflow:5000/")
spark = SparkSession.builder.appName("MLPipeline").getOrCreate()
store = FeatureStore(repo_path="feature_repo")

# 2. Load and process data with Spark
df = spark.read.csv("data.csv", header=True, inferSchema=True)
processed_data = df.filter(df.quality > 0.5).toPandas()

# 3. Get features from Feast
features = store.get_online_features(
    features=["user_features:age", "user_features:income"],
    entity_rows=[{"user_id": 123}]
).to_dict()

# 4. Train model and log with MLflow
with mlflow.start_run():
    # Your training code here produces a fitted scikit-learn `model`
    mlflow.log_metric("accuracy", 0.95)
    mlflow.sklearn.log_model(model, "my_model")

# 5. Generate insights with vLLM
insights = requests.post("http://vllm:8000/v1/completions", json={
    "model": "llama2-7b-chat",
    "prompt": f"Analyze this model performance: accuracy={0.95}",
    "max_tokens": 100
}).json()["choices"][0]["text"]

print(insights)

Real-time Data Processing

from kafka import KafkaProducer, KafkaConsumer
from pyspark.sql import SparkSession
import json

# Setup
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

consumer = KafkaConsumer(
    'sensor_data',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

spark = SparkSession.builder.appName("Streaming").getOrCreate()

# Process streaming data
for message in consumer:
    data = message.value
    
    # Process with Spark
    df = spark.createDataFrame([data])
    result = df.select("*").filter(df.temperature > 25)
    
    # Send processed data to another topic
    producer.send('processed_data', value=result.toPandas().to_dict('records')[0])
    producer.flush()

Troubleshooting

Connection Tests

Test connectivity to the services with the following snippets. Run them individually, or paste them into a single cell to check all connections at once:

MLflow Connection

import requests
try:
    response = requests.get("http://mlflow:5000/")
    print(f"MLflow: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"MLflow: Error - {e}")

Feast Connection

try:
    response = requests.get("http://feast-ui:8080/")
    print(f"Feast: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"Feast: Error - {e}")

vLLM Connection

try:
    response = requests.get("http://vllm:8000/v1/models")
    print(f"vLLM: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"vLLM: Error - {e}")

Spark Connection

try:
    response = requests.get("https://spark.workbench.relativelyobjective.com/")
    print(f"Spark: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"Spark: Error - {e}")

Common Issues

If a connection fails, first confirm that you are running inside the workbench network: the hostnames used in this guide (mlflow, feast-ui, ollama, vllm, kafka) resolve only from containers attached to it. Then check that the corresponding service container is actually running.
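
A quick way to tell a networking problem from a service problem is to check whether the service hostnames resolve at all; a small sketch using the standard library:

import socket

# These hostnames should resolve from containers attached to the workbench network
for host in ("mlflow", "feast-ui", "ollama", "vllm", "kafka"):
    try:
        print(f"{host} -> {socket.gethostbyname(host)}")
    except socket.gaierror as error:
        print(f"{host} -> cannot resolve ({error})")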

🆕 Environment & Kernel Issues (2025-07 Update)

If you encounter issues with Jupyter kernels or PySpark compatibility:

Kernel Not Appearing in JupyterLab

# Check if the watcher script is running
docker exec jupyterlab ps aux | grep utility-watch-envs

# Check watcher logs
docker exec jupyterlab tail -n 20 /scripts/utility-watch-envs.sh.log

# Manually trigger kernel registration (if needed)
docker exec jupyterlab bash -c "cd /scripts && ./utility-watch-envs.sh"

PySpark Python Version Mismatch

# Check your Jupyter kernel Python version
import sys
print(f"Python version: {sys.version}")

# Check if using the correct conda-store environment
import os
print(f"Python executable: {sys.executable}")

# Should show path like: /opt/conda_store/data/store/default/.../bin/python

Automatic Resolution

The watcher script automatically handles most environment issues. If problems persist, restart JupyterLab to pick up new kernel registrations.