.NET8でCSVファイルをPostgreSQLに定期的にインポートしたい
やりたいこと
- CSVファイルからデータを読み取り、そのデータをデータベース(PostgreSQL)に定期的に登録したい。
- csvは複数存在し、上書き(デリートインサート)か追加(既に存在するものは変更しない)を選べるようにしたい。
- すでにBlazorのシステムがあるのでこのシステム内で実装したい。
前提
-
.Net8 ASP.NET Core Blazor(Server)
- 今回Blazorはあまり関係ない。
-
postgreSQLに次のテーブルが存在し、EntityFrameworkCoreでアクセス可能な状態
社員(employees) : employee_code text,employee_name text, employee_age integer
部署(departments) : department_code text, department_name text -
ひとまず以下のようなcsvを想定
- 社員一覧csv(Employee.csv)
社員コード | 名前 | 年齢 | 余計な情報 |
---|---|---|---|
S001 | hoge | 10 | ... |
S002 | huga | 20 | ... |
- 部署一覧csv(Department.csv)
部署コード | 部署名 |
---|---|
B001 | ○○部 |
B002 | ××部 |
できたもの
とりあえず動くものはこれ。
作業手順
バッチ処理の作成
定時実行する処理はHostedService
を利用する。
ここではCsvToDBServiceというクラスを作る。
/// <summary>
/// バッチ処理でcsvファイルをDBに登録するサービス
/// </summary>
public class CsvToDBService : IHostedService, IDisposable
{
private readonly ILogger<CsvToDBService> _logger;
private Timer? _timer;
public CsvToDBService(ILogger<CsvToDBService> logger, IConfiguration configuration)
{
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("CsvToDBService is starting.");
//とりあえず30秒ごとに処理を行う
_timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
return Task.CompletedTask;
}
private void DoWork(object? state)
{
_logger.LogInformation("CsvToDBService is working.");
//ここでcsvファイルをDBに登録する処理を行う
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("CsvToDBService is stopping.");
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
public void Dispose()
{
_timer?.Dispose();
}
}
このDoWork
内に今回やりたいことを書いていく。
CSVHelperのインストール
csvの利用にはCSVHelperを利用する。
手順に従ってインストールする。csvを読み込むクラスの作成
csvの読み込みを行うクラスを作成する。
using CsvHelper;
using CsvHelper.Configuration;
using System.Globalization;
namespace BlazorApp9.Helper
{
/// <summary>
/// CSVの読み込みを行うクラスのインターフェース
/// </summary>
public interface ICsvFileProcessor
{
IEnumerable<T> ReadCsvFile<T, U>(string filePath) where U : ClassMap<T>;
}
/// <summary>
/// CSVの読み込みを行うクラス
/// </summary>
public class CsvFileProcessor : ICsvFileProcessor
{
public IEnumerable<T> ReadCsvFile<T, U>(string filePath) where U : ClassMap<T>
{
using (var reader = new StreamReader(filePath, System.Text.Encoding.GetEncoding("shift_jis"))
) using (var csv = new CsvReader(reader, CultureInfo.InvariantCulture))
{
csv.Context.RegisterClassMap<U>();
return csv.GetRecords<T>().ToList();
}
}
}
}
shift_jisの場合はProgram.csに
Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
が必要
Repositoryクラスの作成
今回のcsv取り込みは以下の2パターンのみ対応できれば良い。
- 既存のデータをすべて上書きする(デリートインサート)
- 存在しないデータのみ追加
DBとのやり取りはRepositoryクラスに任せる。
ここではCsvImportRepository
クラスを作成する。
using System.Reflection;
namespace BlazorApp.Data
{
/// <summary>
/// CSVインポート機能を提供するリポジトリのインターフェース
/// </summary>
public interface ICsvImportRepository
{
void ReplaceRecords(Type entityType, IEnumerable<object> records);
void AddRecordsWithIgnore(Type modelType, IEnumerable<object> records);
}
/// <summary>
/// CSVインポート機能の実装クラス
/// </summary>
public class CsvImportRepository : ICsvImportRepository
{
private readonly MyDbContext _context;
public CsvImportRepository(MyDbContext context)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
}
public void AddRecords(Type entityType, IEnumerable<object> records)
{
MethodInfo method = this.GetType().GetMethod("AddRecordsGeneric", BindingFlags.NonPublic | BindingFlags.Instance)
?? throw new InvalidOperationException("AddRecordsGeneric method not found.");
MethodInfo generic = method.MakeGenericMethod(entityType);
generic.Invoke(this, [records]);
}
private void AddRecordsGeneric<TEntity>(IEnumerable<TEntity> records) where TEntity : class
{
var dbSet = _context.Set<TEntity>();
dbSet.AddRange(records);
_context.SaveChanges();
}
/// <summary>
/// DBのデータを全削除して新しいデータを追加する
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <param name="records"></param>
public void ReplaceRecords(Type entityType, IEnumerable<object> records)
{
MethodInfo method = this.GetType().GetMethod("ReplaceRecordsGeneric", BindingFlags.NonPublic | BindingFlags.Instance)
?? throw new InvalidOperationException("ReplaceRecordsGeneric method not found.");
MethodInfo generic = method.MakeGenericMethod(entityType);
generic.Invoke(this, [records]);
}
/// <summary>
/// DBのデータを全削除して新しいデータを追加する
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <param name="records"></param>
private void ReplaceRecordsGeneric<TEntity>(IEnumerable<TEntity> records) where TEntity : class
{
var dbSet = _context.Set<TEntity>();
dbSet.RemoveRange(dbSet);
dbSet.AddRange(records);
_context.SaveChanges();
}
public void AddRecordsWithIgnore(Type entityType, IEnumerable<object> records)
{
MethodInfo method = this.GetType().GetMethod("AddRecordsWithIgnoreGeneric", BindingFlags.NonPublic | BindingFlags.Instance)
?? throw new InvalidOperationException("AddRecordsWithIgnoreGeneric method not found.");
MethodInfo generic = method.MakeGenericMethod(entityType);
generic.Invoke(this, [records]);
}
private void AddRecordsWithIgnoreGeneric<TEntity>(IEnumerable<TEntity> records) where TEntity : class
{
var dbSet = _context.Set<TEntity>();
//主キー情報
var keyProperties = _context.Model.FindEntityType(typeof(TEntity)).FindPrimaryKey().Properties;
//主キーが重複している場合は無視する
foreach (var record in records)
{
var keyValues = keyProperties.Select(p => record.GetType().GetProperty(p.Name).GetValue(record)).ToArray();
var exists = dbSet.Find(keyValues) != null;
if (!exists)
{
dbSet.Add(record);
}
}
_context.SaveChanges();
}
}
}
csvとModelクラスのマッピング
現状ではcsvの列名と対象のテーブル(モデルクラス)の名称が異なっているため、マッピング用のクラスを作成する。
public class EmployeeMap : ClassMap<Employee>
{
public EmployeeMap()
{
Map(e => e.EmployeeCode).Name("社員コード");
Map(e => e.EmployeeName).Name("名前");
Map(e => e.Age).Name("年齢").TypeConverter<ZeroIfEmptyIntConverter>();
}
}
public class DepartmentMap : ClassMap<Department>
{
public DepartmentMap()
{
Map(d => d.DepartmentCode).Name("部署コード");
Map(d => d.DepartmentName).Name("部署名");
}
}
public class ZeroIfEmptyIntConverter : Int32Converter
{
public override object ConvertFromString(string text, IReaderRow row, MemberMapData memberMapData)
{
if (string.IsNullOrWhiteSpace(text))
{
return 0;
}
return base.ConvertFromString(text, row, memberMapData);
}
}
public class FileModelMapping
{
public string FileName { get; set; } = "";
public string ModelType { get; set; } = "";
public string MapModelType { get; set; } = "";
public string UpdateMethod { get; set; } = "";//"add" or "replace"
}
ついでに年齢が空欄の場合は0で埋めるようにしている。
appsettings.jsonに各種設定情報をセット
これらの情報をappsettings.jsonで管理する。
セキュリティ的にまずい?
- csvファイルの保存先フォルダ
- csvファイル名とモデルクラスの対応
"CsvFolderPath": "c:\\path\\csvfolder",
"FileModelMappings": [
{
"FileName": "Employee",
"ModelType": "BlazorApp.Models.Employee",
"MapModelType": "BlazorApp.Models.CsvMap.EmployeeMap",
"UpdateMethod": "add"
},
{
"FileName": "Department",
"ModelType": "BlazorApp.Models.Department",
"MapModelType": "BlazorApp.Models.CsvMap.DepartmentMap",
"UpdateMethod": "replace"
}
]
これらを組み合わせる
CsvToDBService
に肉付けをする。
まずはコンストラクタで必要なものを持ってくる。
public class CsvToDBService : IHostedService, IDisposable
{
private readonly ILogger<CsvToDBService> _logger;
private Timer? _timer;
private readonly string _csvFolderPath;
private readonly IServiceScopeFactory _scopeFactory;
private readonly List<FileModelMapping> _fileModelMappings;
public CsvToDBService(ILogger<CsvToDBService> logger, IConfiguration configuration, IServiceScopeFactory scopeFactory)
{
_logger = logger;
_csvFolderPath = configuration.GetValue<string>("CsvFolderPath") ?? throw new ArgumentNullException(nameof(configuration));
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(configuration));
_fileModelMappings = configuration.GetSection("FileModelMappings").Get<List<FileModelMapping>>() ?? throw new ArgumentNullException(nameof(configuration));
}
//
}
ICsvFileProcessorではなく一旦Scopeを持ってくる必要がある...
DoWork
内の処理を完成させる。
private void DoWork(object? state)
{
_logger.LogInformation("CsvToDBService is working.");
//csvファイルをDBに登録する処理
foreach (var filePath in Directory.GetFiles(_csvFolderPath, "*.csv"))
{
using (var scope = _scopeFactory.CreateScope())
{
try
{
var fileName = Path.GetFileNameWithoutExtension(filePath);
var mapping = GetMappingFromFileName(fileName);
if (mapping == null)
{
_logger.LogWarning($"No mapping found for file {fileName}");
continue;
}
var csvFileProcessor = scope.ServiceProvider.GetRequiredService<ICsvFileProcessor>();
var csvImportRepository = scope.ServiceProvider.GetRequiredService<ICsvImportRepository>();
Type modelType = Type.GetType(mapping.ModelType) ?? throw new ArgumentNullException(nameof(mapping.ModelType));
Type mapModelType = Type.GetType(mapping.MapModelType) ?? throw new ArgumentNullException(nameof(mapping.MapModelType));
var method = typeof(CsvFileProcessor).GetMethod("ReadCsvFile")?.MakeGenericMethod(modelType, mapModelType);
if (method == null)
{
_logger.LogError($"The generic method 'ReadCsvFile<{modelType.Name}, {mapModelType.Name}>' could not be found.");
continue;
}
var records = method.Invoke(csvFileProcessor, [ filePath ]) as IEnumerable<object>;
if (records == null)
{
_logger.LogError($"Failed to read CSV file '{filePath}' with model type '{modelType.Name}' and map model type '{mapModelType.Name}'.");
continue;
}
if (mapping.UpdateMethod == "add")
{
csvImportRepository.AddRecordsWithIgnore(modelType, records);
}
else if (mapping.UpdateMethod == "replace")
{
csvImportRepository.ReplaceRecords(modelType, records);
}
_logger.LogInformation($"Successfully processed {mapping.UpdateMethod} for file: {filePath}");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in CsvToDBService");
}
}
}
}
private FileModelMapping GetMappingFromFileName(string fileName)
{
return _fileModelMappings.FirstOrDefault(m => fileName.StartsWith(m.FileName));
}
リフレクションって処理速度に影響する?
Program.csでDIの追加
これらのクラスを依存性注入(DI)するためにProgram.csを修正。
builder.Services.AddScoped<ICsvImportRepository, CsvImportRepository>();
builder.Services.AddTransient<ICsvFileProcessor, CsvFileProcessor>();
builder.Services.AddHostedService<CsvToDBService>();
おわり。
参考サイト
Discussion